Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace deleted elements at addition #418

Merged
merged 28 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressing review comments
  • Loading branch information
Dmitry Yashunin committed Dec 13, 2022
commit aabd0df49d2dbea129851d7d67f6b394e1d6a7cd
2 changes: 1 addition & 1 deletion hnswlib/bruteforce.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class BruteforceSearch : public AlgorithmInterface<dist_t> {
}


void addPoint(const void *datapoint, labeltype label) {
void addPoint(const void *datapoint, labeltype label, bool replace_deleted = false) {
int idx;
{
std::unique_lock<std::mutex> lock(index_lock);
Expand Down
42 changes: 12 additions & 30 deletions hnswlib/hnswalg.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
mutable std::atomic<long> metric_distance_computations{0};
mutable std::atomic<long> metric_hops{0};

bool replace_deleted_ = false;
bool replace_deleted_ = false; // flag to replace deleted elements (marked as deleted) during insertions

std::mutex deleted_elements_lock; // lock for deleted_elements
std::unordered_set<tableint> deleted_elements;
std::unordered_set<tableint> deleted_elements; // contains internal ids of deleted elements


HierarchicalNSW(SpaceInterface<dist_t> *s) {
Expand Down Expand Up @@ -785,7 +785,7 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
* Removes the deleted mark of the node, does NOT really change the current graph.
*
* Note: the method is not safe to use when replacement of deleted elements is enabled,
* because elements marked as deleted can be completely removed by addPointToVacantPlace
* because elements marked as deleted can be completely removed by addPoint
*/
void unmarkDelete(labeltype label) {
// lock all operations with element by label
Expand Down Expand Up @@ -843,21 +843,19 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {


/*
* Adds point and replaces previously deleted point if any, updating it with new point
* If deleted point was replaced returns its label, else returns label of added or updated point
*
* Note:
* Methods that can work with deleted elements unmarkDelete and addPoint are not safe to use
* with this method, because addPointToVacantPlace removes deleted elements from the index.
* Adds point. Updates the point if it is already in the index.
* If replacement of deleted elements is enabled: replaces previously deleted point if any, updating it with new point
*/
labeltype addPointToVacantPlace(const void* data_point, labeltype label) {
void addPoint(const void *data_point, labeltype label, bool replace_deleted = false) {
if ((replace_deleted_ == false) && (replace_deleted == true)) {
dyashuni marked this conversation as resolved.
Show resolved Hide resolved
throw std::runtime_error("Replacement of deleted elements is disabled in constructor");
}

// lock all operations with element by label
std::unique_lock <std::mutex> lock_label(getLabelOpMutex(label));

if (!replace_deleted_) {
throw std::runtime_error("Can't use addPointToVacantPlace when replacement of deleted elements is disabled");
if (!replace_deleted) {
addPoint(data_point, label, -1);
}

// check if there is vacant place
tableint internal_id_replaced;
std::unique_lock <std::mutex> lock_deleted_elements(deleted_elements_lock);
Expand All @@ -872,7 +870,6 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
// else add point to vacant place
if (!is_vacant_place) {
addPoint(data_point, label, -1);
return label;
} else {
// we assume that there are no concurrent operations on deleted element
labeltype label_replaced = getExternalLabel(internal_id_replaced);
Expand All @@ -885,25 +882,10 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {

unmarkDeletedInternal(internal_id_replaced);
updatePoint(data_point, internal_id_replaced, 1.0);

return label_replaced;
}
}


/*
* Adds point. Updates the point if it is already in the index
*
* Note: the method is not safe to use to update elements when replacement of deleted elements is enabled,
* because elements marked as deleted can be completely removed by addPointToVacantPlace:
*/
void addPoint(const void *data_point, labeltype label) {
// lock all operations with element by label
std::unique_lock <std::mutex> lock_label(getLabelOpMutex(label));
addPoint(data_point, label, -1);
}


void updatePoint(const void *dataPoint, tableint internalId, float updateNeighborProbability) {
// update the feature vector associated with existing point with new vector
memcpy(getDataByInternalId(internalId), dataPoint, data_size_);
Expand Down
2 changes: 1 addition & 1 deletion hnswlib/hnswlib.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class SpaceInterface {
template<typename dist_t>
class AlgorithmInterface {
public:
virtual void addPoint(const void *datapoint, labeltype label) = 0;
virtual void addPoint(const void *datapoint, labeltype label, bool replace_deleted = false) = 0;

virtual std::priority_queue<std::pair<dist_t, labeltype>>
searchKnn(const void*, size_t, BaseFilterFunctor* isIdAllowed = nullptr) const = 0;
Expand Down
88 changes: 6 additions & 82 deletions python_bindings/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,79 +245,7 @@ class Index {
}


/*
 * Adds a batch of vectors to the index through addPointToVacantPlace and reports,
 * for every input row, the label that call returned.
 *
 * input       - array-like of vectors; converted to a C-contiguous array of dist_t
 * ids_        - optional labels, passed through get_input_ids_and_check_shapes;
 *               when no ids are given, row i is labelled cur_l + i
 * num_threads - number of workers handed to ParallelFor; values <= 0 select
 *               num_threads_default, and small batches are forced onto one thread
 *
 * Returns a 1-D numpy array of hnswlib::labeltype with one entry per input row.
 * The array owns its buffer through a capsule that frees it when the array is collected.
 *
 * Throws std::runtime_error("wrong dimensionality of the vectors") if the number of
 * features differs from dim; exceptions raised while inserting are propagated.
 */
py::object add_items_to_vacant_place_return_numpy(py::object input, py::object ids_ = py::none(), int num_threads = -1) {
    size_t rows, features;
    hnswlib::labeltype* data_numpy_l = NULL;

    py::array_t < dist_t, py::array::c_style | py::array::forcecast > items(input);
    auto buffer = items.request();
    if (num_threads <= 0)
        num_threads = num_threads_default;
    get_input_array_shapes(buffer, &rows, &features);
    if (features != dim)
        throw std::runtime_error("wrong dimensionality of the vectors");

    // avoid using threads when the number of insertions is small:
    if (rows <= num_threads * 4) {
        num_threads = 1;
    }

    std::vector<size_t> ids = get_input_ids_and_check_shapes(ids_, rows);

    {
        int start = 0;
        data_numpy_l = new hnswlib::labeltype[rows];

        // Until the capsule below takes ownership, nothing else would free the buffer,
        // so release it here if any insertion throws and then re-raise.
        try {
            if (!ep_added) {
                // The first element is inserted on the calling thread before any
                // parallel work starts.
                size_t id = ids.size() ? ids.at(0) : (cur_l);
                float* vector_data = (float*)items.data(0);
                std::vector<float> norm_array(dim);
                if (normalize) {
                    normalize_vector(vector_data, norm_array.data());
                    vector_data = norm_array.data();
                }
                hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)vector_data, (size_t)id);
                data_numpy_l[start] = label;
                start = 1;
                ep_added = true;
            }

            py::gil_scoped_release l;
            if (normalize == false) {
                ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) {
                    size_t id = ids.size() ? ids.at(row) : (cur_l + row);
                    hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)items.data(row), (size_t)id);
                    data_numpy_l[row] = label;
                });
            }
            else {
                // One scratch slot of dim floats per worker, indexed by threadId.
                std::vector<float> norm_array(num_threads * dim);
                ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) {
                    // normalize vector:
                    size_t start_idx = threadId * dim;
                    normalize_vector((float*)items.data(row), (norm_array.data() + start_idx));

                    size_t id = ids.size() ? ids.at(row) : (cur_l + row);
                    hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)(norm_array.data() + start_idx), (size_t)id);
                    data_numpy_l[row] = label;
                });
            }
            cur_l += rows;
        } catch (...) {
            delete[] data_numpy_l;
            throw;
        }
    }

    // The capsule hands the pointer back as void*; restore the element type before
    // delete[], because deleting through void* is undefined behaviour.
    py::capsule free_when_done_l(data_numpy_l, [](void* f) {
        delete[] static_cast<hnswlib::labeltype*>(f);
    });

    return py::array_t<hnswlib::labeltype>(
        { rows },  // shape
        { sizeof(hnswlib::labeltype) },  // C-style contiguous strides for each index
        data_numpy_l,  // the data pointer
        free_when_done_l);
}


void addItems(py::object input, py::object ids_ = py::none(), int num_threads = -1) {
void addItems(py::object input, py::object ids_ = py::none(), int num_threads = -1, bool replace_deleted = false) {
py::array_t < dist_t, py::array::c_style | py::array::forcecast > items(input);
auto buffer = items.request();
if (num_threads <= 0)
Expand Down Expand Up @@ -346,7 +274,7 @@ class Index {
normalize_vector(vector_data, norm_array.data());
vector_data = norm_array.data();
}
appr_alg->addPoint((void*)vector_data, (size_t)id);
appr_alg->addPoint((void*)vector_data, (size_t)id, replace_deleted);
start = 1;
ep_added = true;
}
Expand All @@ -355,7 +283,7 @@ class Index {
if (normalize == false) {
ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) {
size_t id = ids.size() ? ids.at(row) : (cur_l + row);
appr_alg->addPoint((void*)items.data(row), (size_t)id);
appr_alg->addPoint((void*)items.data(row), (size_t)id, replace_deleted);
});
} else {
std::vector<float> norm_array(num_threads * dim);
Expand All @@ -365,7 +293,7 @@ class Index {
normalize_vector((float*)items.data(row), (norm_array.data() + start_idx));

size_t id = ids.size() ? ids.at(row) : (cur_l + row);
appr_alg->addPoint((void*)(norm_array.data() + start_idx), (size_t)id);
appr_alg->addPoint((void*)(norm_array.data() + start_idx), (size_t)id, replace_deleted);
});
}
cur_l += rows;
Expand Down Expand Up @@ -969,12 +897,8 @@ PYBIND11_PLUGIN(hnswlib) {
&Index<float>::addItems,
py::arg("data"),
py::arg("ids") = py::none(),
py::arg("num_threads") = -1)
.def("add_items_to_vacant_place",
&Index<float>::add_items_to_vacant_place_return_numpy,
py::arg("data"),
py::arg("ids") = py::none(),
py::arg("num_threads") = -1)
py::arg("num_threads") = -1,
py::arg("replace_deleted") = false)
.def("get_items", &Index<float, float>::getDataReturnList, py::arg("ids") = py::none())
.def("get_ids_list", &Index<float>::getIdsList)
.def("set_ef", &Index<float>::set_ef, py::arg("ef"))
Expand Down
12 changes: 4 additions & 8 deletions python_bindings/tests/bindings_test_replace.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,13 @@ def testRandomSelf(self):
print("Inserting batch 3 by replacing deleted elements")
# Maximum number of elements is reached, therefore we cannot add new items,
# but we can replace the deleted ones
labels_replaced = hnsw_index.add_items_to_vacant_place(data3, labels3)
labels2_deleted_list = [l[0] for l in labels2_deleted]
labels_replaced_list = labels_replaced.tolist()
labels2_deleted_list.sort()
labels_replaced_list.sort()
self.assertSequenceEqual(labels2_deleted_list, labels_replaced_list)
hnsw_index.add_items(data3, labels3, replace_deleted=True)

# After replacing, all labels should be retrievable
print("Checking that remaining labels are in index")
# Get remaining data from batch 1 and batch 2 after deletion of elements
remaining_labels = set(labels1) | set(labels2)
labels2_deleted_list = [l[0] for l in labels2_deleted]
remaining_labels = remaining_labels - set(labels2_deleted_list)
remaining_labels_list = list(remaining_labels)
comb_data = np.concatenate((data1, data2), axis=0)
Expand Down Expand Up @@ -114,7 +110,7 @@ def testRandomSelf(self):

# Insert batch 4
print("Inserting batch 4 by replacing deleted elements")
labels_replaced = hnsw_index.add_items_to_vacant_place(data4, labels4)
hnsw_index.add_items(data4, labels4, replace_deleted=True)

# Check recall
print("Checking recall")
Expand All @@ -133,7 +129,7 @@ def testRandomSelf(self):
del hnsw_index
# Insert batch 3
print("Inserting batch 3 by replacing deleted elements")
labels_replaced = hnsw_index_pckl.add_items_to_vacant_place(data3, labels3)
hnsw_index_pckl.add_items(data3, labels3, replace_deleted=True)

# Check recall
print("Checking recall")
Expand Down
2 changes: 1 addition & 1 deletion python_bindings/tests/bindings_test_stress_mt_replace.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ def testRandomSelf(self):
# Replace deleted elements
# Maximum number of elements is reached, therefore we cannot add new items,
# but we can replace the deleted ones
labels_replaced = hnsw_index.add_items_to_vacant_place(data3, labels3)
labels_replaced = hnsw_index.add_items(data3, labels3, replace_deleted=True)