Replace deleted elements at addition (#418)
* Replace deleted elements at insertion
* Add multithread stress tests
* Add timeout to jobs in actions
* Add locks by label
* Remove python 3.6 tests as it is not available in Ubuntu 22.04
* Fix multithread update of elements
* Update readme and refactoring
dyashuni authored Jan 12, 2023
1 parent 983cea9 commit 3e006ea
Showing 15 changed files with 884 additions and 81 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/build.yml
@@ -8,7 +8,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, windows-latest]
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
python-version: ["3.7", "3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
@@ -19,6 +19,7 @@ jobs:
run: python -m pip install .

- name: Test
timeout-minutes: 15
run: python -m unittest discover -v --start-directory python_bindings/tests --pattern "*_test*.py"

test_cpp:
@@ -52,13 +53,16 @@ jobs:
shell: bash

- name: Test
timeout-minutes: 15
run: |
cd build
if [ "$RUNNER_OS" == "Windows" ]; then
cp ./Release/* ./
fi
./searchKnnCloserFirst_test
./searchKnnWithFilter_test
./multiThreadLoad_test
./multiThread_replace_test
./test_updates
./test_updates update
shell: bash
1 change: 1 addition & 0 deletions .gitignore
@@ -9,3 +9,4 @@ var/
.idea/
.vscode/
.vs/
**.DS_Store
6 changes: 6 additions & 0 deletions CMakeLists.txt
@@ -25,6 +25,12 @@ if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
add_executable(searchKnnWithFilter_test examples/searchKnnWithFilter_test.cpp)
target_link_libraries(searchKnnWithFilter_test hnswlib)

add_executable(multiThreadLoad_test examples/multiThreadLoad_test.cpp)
target_link_libraries(multiThreadLoad_test hnswlib)

add_executable(multiThread_replace_test examples/multiThread_replace_test.cpp)
target_link_libraries(multiThread_replace_test hnswlib)

add_executable(main main.cpp sift_1b.cpp)
target_link_libraries(main hnswlib)
endif()
117 changes: 109 additions & 8 deletions README.md
@@ -54,33 +54,38 @@ For other spaces use the nmslib library https://github.com/nmslib/nmslib.
* `hnswlib.Index(space, dim)` creates a non-initialized HNSW index in space `space` with integer dimension `dim`.

`hnswlib.Index` methods:
* `init_index(max_elements, M = 16, ef_construction = 200, random_seed = 100)` initializes the index from with no elements.
* `init_index(max_elements, M = 16, ef_construction = 200, random_seed = 100, allow_replace_deleted = False)` initializes the index with no elements.
* `max_elements` defines the maximum number of elements that can be stored in the structure (can be increased/shrunk).
* `ef_construction` defines a construction time/accuracy trade-off (see [ALGO_PARAMS.md](ALGO_PARAMS.md)).
* `M` defines the maximum number of outgoing connections in the graph ([ALGO_PARAMS.md](ALGO_PARAMS.md)).
* `allow_replace_deleted` enables replacing deleted elements with newly added ones.

* `add_items(data, ids, num_threads = -1)` - inserts the `data`(numpy array of vectors, shape:`N*dim`) into the structure.
* `add_items(data, ids, num_threads = -1, replace_deleted = False)` - inserts the `data` (numpy array of vectors, shape `N*dim`) into the structure.
* `num_threads` sets the number of cpu threads to use (-1 means use default).
* `ids` is an optional N-sized numpy array of integer labels for all elements in `data`.
- If the index already contains elements with the same labels, their features will be updated. Note that the update procedure is slower than insertion of a new element, but more memory- and query-efficient.
* `replace_deleted` replaces deleted elements with the newly added ones, which saves memory.
- To use it, `init_index` should be called with `allow_replace_deleted=True`.
* Thread-safe with other `add_items` calls, but not with `knn_query`.

* `mark_deleted(label)` - marks the element as deleted, so it will be omitted from search results. Throws an exception if it is already deleted.

* `unmark_deleted(label)` - unmarks the element as deleted, so it will not be omitted from search results.

* `resize_index(new_size)` - changes the maximum capacity of the index. Not thread safe with `add_items` and `knn_query`.

* `set_ef(ef)` - sets the query time accuracy/speed trade-off, defined by the `ef` parameter (
[ALGO_PARAMS.md](ALGO_PARAMS.md)). Note that the parameter is currently not saved along with the index, so you need to set it manually after loading.

* `knn_query(data, k = 1, num_threads = -1)` make a batch query for `k` closest elements for each element of the
* `knn_query(data, k = 1, num_threads = -1, filter = None)` makes a batch query for the `k` closest elements for each element of `data` (shape `N*dim`). Returns two numpy arrays of shape `N*k`.
* `num_threads` sets the number of cpu threads to use (-1 means use default).
* `filter` filters elements by their labels; only elements with allowed ids are returned.
* Thread-safe with other `knn_query` calls, but not with `add_items`.

* `load_index(path_to_index, max_elements = 0)` loads the index from persistence to the uninitialized index.
* `load_index(path_to_index, max_elements = 0, allow_replace_deleted = False)` loads a saved index from persistent storage into an uninitialized index.
* `max_elements`(optional) resets the maximum number of elements in the structure.
* `allow_replace_deleted` specifies whether the index being loaded has enabled replacing of deleted elements.

* `save_index(path_to_index)` saves the index to persistent storage.
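As a point of reference for the `knn_query` contract described above, the same labels/distances shapes can be reproduced with a brute-force numpy sketch. This is an illustration of the semantics only, not hnswlib's algorithm; note that for the `l2` space hnswlib reports squared distances:

```python
import numpy as np

def brute_force_knn(index_data, queries, k=1):
    """Exact k-NN by squared L2 distance; mirrors knn_query's output
    shapes: labels and distances are both N*k numpy arrays."""
    # Pairwise squared L2 distances, shape (num_queries, num_elements)
    dists = ((queries[:, None, :] - index_data[None, :, :]) ** 2).sum(axis=-1)
    labels = np.argsort(dists, axis=1)[:, :k]             # ids of the k closest
    distances = np.take_along_axis(dists, labels, axis=1)  # their distances
    return labels, distances

data = np.float32(np.random.random((100, 8)))
labels, distances = brute_force_knn(data, data, k=1)
# Querying the dataset for itself: each point's nearest neighbor is itself
assert (labels.reshape(-1) == np.arange(len(data))).all()
assert np.allclose(distances, 0)
```

An approximate index trades this O(N) scan per query for the HNSW graph search, which is why recall (rather than exactness) is the quality metric used in the examples below.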

@@ -142,7 +147,7 @@ p.add_items(data, ids)
# Controlling the recall by setting ef:
p.set_ef(50) # ef should always be > k

# Query dataset, k - number of closest elements (returns 2 numpy arrays)
# Query dataset, k - number of the closest elements (returns 2 numpy arrays)
labels, distances = p.knn_query(data, k = 1)

# Index objects support pickling
@@ -155,7 +160,6 @@ print(f"Parameters passed to constructor: space={p_copy.space}, dim={p_copy.dim
print(f"Index construction: M={p_copy.M}, ef_construction={p_copy.ef_construction}")
print(f"Index size is {p_copy.element_count} and index capacity is {p_copy.max_elements}")
print(f"Search speed/quality trade-off parameter: ef={p_copy.ef}")

```

An example with updates after serialization/deserialization:
@@ -196,7 +200,6 @@ p.set_ef(10)
# By default using all available cores
p.set_num_threads(4)


print("Adding first batch of %d elements" % (len(data1)))
p.add_items(data1)

@@ -226,6 +229,104 @@ labels, distances = p.knn_query(data, k=1)
print("Recall for two batches:", np.mean(labels.reshape(-1) == np.arange(len(data))), "\n")
```

An example with a filter:
```python
import hnswlib
import numpy as np

dim = 16
num_elements = 10000

# Generating sample data
data = np.float32(np.random.random((num_elements, dim)))

# Declaring index
hnsw_index = hnswlib.Index(space='l2', dim=dim) # possible options are l2, cosine or ip

# Initiating index
# max_elements - the maximum number of elements, should be known beforehand
# (probably will be made optional in the future)
#
# ef_construction - controls index search speed/build speed tradeoff
# M - is tightly connected with internal dimensionality of the data
# strongly affects the memory consumption

hnsw_index.init_index(max_elements=num_elements, ef_construction=100, M=16)

# Controlling the recall by setting ef:
# higher ef leads to better accuracy, but slower search
hnsw_index.set_ef(10)

# Set number of threads used during batch search/construction
# By default using all available cores
hnsw_index.set_num_threads(4)

print("Adding %d elements" % (len(data)))
# Added elements will have consecutive ids
hnsw_index.add_items(data, ids=np.arange(num_elements))

print("Querying only even elements")
# Define filter function that allows only even ids
filter_function = lambda idx: idx%2 == 0
# Query the elements for themselves and search only for even elements:
labels, distances = hnsw_index.knn_query(data, k=1, filter=filter_function)
# labels contain only elements with even id
```

An example with replacing of deleted elements:
```python
import hnswlib
import numpy as np

dim = 16
num_elements = 1_000
max_num_elements = 2 * num_elements

# Generating sample data
labels1 = np.arange(0, num_elements)
data1 = np.float32(np.random.random((num_elements, dim))) # batch 1
labels2 = np.arange(num_elements, 2 * num_elements)
data2 = np.float32(np.random.random((num_elements, dim))) # batch 2
labels3 = np.arange(2 * num_elements, 3 * num_elements)
data3 = np.float32(np.random.random((num_elements, dim))) # batch 3

# Declaring index
hnsw_index = hnswlib.Index(space='l2', dim=dim)

# Initiating index
# max_elements - the maximum number of elements, should be known beforehand
# (probably will be made optional in the future)
#
# ef_construction - controls index search speed/build speed tradeoff
# M - is tightly connected with internal dimensionality of the data
# strongly affects the memory consumption

# Enable replacing of deleted elements
hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, allow_replace_deleted=True)

# Controlling the recall by setting ef:
# higher ef leads to better accuracy, but slower search
hnsw_index.set_ef(10)

# Set number of threads used during batch search/construction
# By default using all available cores
hnsw_index.set_num_threads(4)

# Add batch 1 and 2 data
hnsw_index.add_items(data1, labels1)
hnsw_index.add_items(data2, labels2) # Note: maximum number of elements is reached

# Delete data of batch 2
for label in labels2:
hnsw_index.mark_deleted(label)

# Replace deleted elements
# Maximum number of elements is reached therefore we cannot add new items,
# but we can replace the deleted ones by using replace_deleted=True
hnsw_index.add_items(data3, labels3, replace_deleted=True)
# hnsw_index contains the data of batch 1 and batch 3 only
```
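The capacity bookkeeping in the example above can be modeled as a fixed-capacity store with a free list of deleted slots. The sketch below is a hypothetical pure-Python model of that behavior (the class and method names are invented for illustration; this is not hnswlib's internal implementation):

```python
class ReplaceDeletedModel:
    """Toy model of allow_replace_deleted: slots freed by mark_deleted
    can be reused by add(..., replace_deleted=True)."""

    def __init__(self, max_elements):
        self.max_elements = max_elements
        self.slot_of = {}   # label -> slot currently holding it
        self.free = []      # slots freed by mark_deleted
        self.used = 0       # number of slots ever allocated

    def add(self, label, replace_deleted=False):
        if replace_deleted and self.free:
            slot = self.free.pop()      # reuse a deleted element's slot
        elif self.used < self.max_elements:
            slot = self.used            # allocate a fresh slot
            self.used += 1
        else:
            raise RuntimeError("capacity reached; mark elements deleted "
                               "and add with replace_deleted=True")
        self.slot_of[label] = slot

    def mark_deleted(self, label):
        self.free.append(self.slot_of.pop(label))

store = ReplaceDeletedModel(max_elements=2)
store.add("a")
store.add("b")                        # capacity reached
store.mark_deleted("a")
store.add("c", replace_deleted=True)  # "c" reuses the slot freed by "a"
```

The key point the model captures is that `replace_deleted=True` does not raise the index's capacity; it only recycles slots that `mark_deleted` has released.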

### Bindings installation

You can install from sources:
140 changes: 140 additions & 0 deletions examples/multiThreadLoad_test.cpp
@@ -0,0 +1,140 @@
#include "../hnswlib/hnswlib.h"
#include <thread>
#include <chrono>


int main() {
std::cout << "Running multithread load test" << std::endl;
int d = 16;
int max_elements = 1000;

std::mt19937 rng;
rng.seed(47);
std::uniform_real_distribution<> distrib_real;

hnswlib::L2Space space(d);
hnswlib::HierarchicalNSW<float>* alg_hnsw = new hnswlib::HierarchicalNSW<float>(&space, 2 * max_elements);

std::cout << "Building index" << std::endl;
int num_threads = 40;
int num_labels = 10;

int num_iterations = 10;
int start_label = 0;

// run threads that will add elements to the index
// about 7 threads (the number depends on num_threads and num_labels)
// will add/update elements with the same label simultaneously
while (true) {
// add elements by batches
std::uniform_int_distribution<> distrib_int(start_label, start_label + num_labels - 1);
std::vector<std::thread> threads;
for (size_t thread_id = 0; thread_id < num_threads; thread_id++) {
threads.push_back(
std::thread(
[&] {
for (int iter = 0; iter < num_iterations; iter++) {
std::vector<float> data(d);
hnswlib::labeltype label = distrib_int(rng);
for (int i = 0; i < d; i++) {
data[i] = distrib_real(rng);
}
alg_hnsw->addPoint(data.data(), label);
}
}
)
);
}
for (auto &thread : threads) {
thread.join();
}
if (alg_hnsw->cur_element_count > max_elements - num_labels) {
break;
}
start_label += num_labels;
}

// insert remaining elements if needed
for (hnswlib::labeltype label = 0; label < max_elements; label++) {
auto search = alg_hnsw->label_lookup_.find(label);
if (search == alg_hnsw->label_lookup_.end()) {
std::cout << "Adding " << label << std::endl;
std::vector<float> data(d);
for (int i = 0; i < d; i++) {
data[i] = distrib_real(rng);
}
alg_hnsw->addPoint(data.data(), label);
}
}

std::cout << "Index is created" << std::endl;

bool stop_threads = false;
std::vector<std::thread> threads;

// create threads that will do markDeleted and unmarkDeleted of random elements
// each thread works with specific range of labels
std::cout << "Starting markDeleted and unmarkDeleted threads" << std::endl;
num_threads = 20;
int chunk_size = max_elements / num_threads;
for (size_t thread_id = 0; thread_id < num_threads; thread_id++) {
threads.push_back(
std::thread(
[&, thread_id] {
std::uniform_int_distribution<> distrib_int(0, chunk_size - 1);
int start_id = thread_id * chunk_size;
std::vector<bool> marked_deleted(chunk_size);
while (!stop_threads) {
int id = distrib_int(rng);
hnswlib::labeltype label = start_id + id;
if (marked_deleted[id]) {
alg_hnsw->unmarkDelete(label);
marked_deleted[id] = false;
} else {
alg_hnsw->markDelete(label);
marked_deleted[id] = true;
}
}
}
)
);
}

// create threads that will add and update random elements
std::cout << "Starting add and update elements threads" << std::endl;
num_threads = 20;
std::uniform_int_distribution<> distrib_int_add(max_elements, 2 * max_elements - 1);
for (size_t thread_id = 0; thread_id < num_threads; thread_id++) {
threads.push_back(
std::thread(
[&] {
std::vector<float> data(d);
while (!stop_threads) {
hnswlib::labeltype label = distrib_int_add(rng);
for (int i = 0; i < d; i++) {
data[i] = distrib_real(rng);
}
alg_hnsw->addPoint(data.data(), label);
std::vector<float> data = alg_hnsw->getDataByLabel<float>(label);
float max_val = *max_element(data.begin(), data.end());
// never happens but prevents compiler from deleting unused code
if (max_val > 10) {
throw std::runtime_error("Unexpected value in data");
}
}
}
)
);
}

std::cout << "Sleep and continue operations with index" << std::endl;
int sleep_ms = 60 * 1000;
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
stop_threads = true;
for (auto &thread : threads) {
thread.join();
}

std::cout << "Finish" << std::endl;
return 0;
}
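The commit message also mentions "locks by label". A common way to realize that idea is lock striping: each label hashes to one of a fixed pool of mutexes, so concurrent operations on the same label serialize while operations on different labels usually proceed in parallel. The following is a hypothetical Python sketch of the pattern (names invented for illustration; it is not hnswlib's actual C++ implementation):

```python
import threading

class LabelLockPool:
    """Lock striping: label -> one of num_stripes locks. Two operations
    on the same label always take the same lock; operations on different
    labels rarely collide when num_stripes is large enough."""

    def __init__(self, num_stripes=64):
        self.locks = [threading.Lock() for _ in range(num_stripes)]

    def lock_for(self, label):
        return self.locks[hash(label) % len(self.locks)]

pool = LabelLockPool()
counts = {}

def update(label):
    # Every thread touching `label` contends on the same stripe,
    # so the read-modify-write below never interleaves.
    with pool.lock_for(label):
        counts[label] = counts.get(label, 0) + 1

threads = [threading.Thread(target=update, args=(7,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# counts[7] == 8: all eight increments on label 7 were serialized
```

A fixed pool bounds memory regardless of how many labels exist, at the cost of occasional false sharing when two distinct labels hash to the same stripe.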