Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace deleted elements at addition #418

Merged
merged 28 commits into from
Jan 12, 2023
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix possible multithreading issues
  • Loading branch information
Dmitry Yashunin committed Nov 14, 2022
commit 34fe7f19a02b51e8bd5c21932770ded1ef844898
116 changes: 75 additions & 41 deletions hnswlib/hnswalg.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
// Locks to prevent race condition during update/insert of an element at same time.
// Note: Locks for additions can also be used to prevent this race condition
// if the querying of KNN is not exposed along with update/inserts i.e multithread insert/update/query in parallel.
std::vector<std::mutex> link_list_update_locks_;
mutable std::vector<std::mutex> link_list_update_locks_;

std::mutex global;
std::mutex cur_element_count_guard_;
std::vector<std::mutex> link_list_locks_;

tableint enterpoint_node_{0};
Expand All @@ -57,7 +56,8 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {

DISTFUNC<dist_t> fstdistfunc_;
void *dist_func_param_{nullptr};
std::mutex label_lookup_lock;

mutable std::mutex label_lookup_lock; // lock for label_lookup_
dyashuni marked this conversation as resolved.
Show resolved Hide resolved
std::unordered_map<labeltype, tableint> label_lookup_;

std::default_random_engine level_generator_;
Expand All @@ -68,7 +68,7 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {

bool replace_deleted_ = false;
dyashuni marked this conversation as resolved.
Show resolved Hide resolved

std::mutex deleted_elements_lock;
std::mutex deleted_elements_lock; // lock for deleted_elements
std::unordered_set<tableint> deleted_elements;


Expand Down Expand Up @@ -714,14 +714,16 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {

template<typename data_t>
std::vector<data_t> getDataByLabel(labeltype label) const {
tableint label_internal;
std::unique_lock <std::mutex> lock_table(label_lookup_lock);
auto search = label_lookup_.find(label);
if (search == label_lookup_.end() || isMarkedDeleted(search->second)) {
throw std::runtime_error("Label not found");
}
label_internal = search->second;

char* data_ptrv = getDataByInternalId(label_internal);
tableint internalId = search->second;
lock_table.unlock();
// wait for element addition or update
std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]);
char* data_ptrv = getDataByInternalId(internalId);
size_t dim = *((size_t *) dist_func_param_);
std::vector<data_t> data;
data_t* data_ptr = (data_t*) data_ptrv;
Expand All @@ -737,11 +739,15 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
* Marks an element with the given label deleted, does NOT really change the current graph.
*/
void markDelete(labeltype label) {
std::unique_lock <std::mutex> lock_table(label_lookup_lock);
auto search = label_lookup_.find(label);
if (search == label_lookup_.end()) {
throw std::runtime_error("Label not found");
}
tableint internalId = search->second;
lock_table.unlock();
// wait for element addition or update
std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]);
markDeletedInternal(internalId);
}

Expand All @@ -756,7 +762,10 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId))+2;
*ll_cur |= DELETE_MARK;
num_deleted_ += 1;
if (replace_deleted_) deleted_elements.insert(internalId);
if (replace_deleted_) {
std::unique_lock <std::mutex> lock_deleted_elements(deleted_elements_lock);
deleted_elements.insert(internalId);
}
} else {
throw std::runtime_error("The requested to delete element is already deleted");
}
Expand All @@ -767,25 +776,36 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
* Remove the deleted mark of the node, does NOT really change the current graph.
*/
void unmarkDelete(labeltype label) {
std::unique_lock <std::mutex> lock_table(label_lookup_lock);
auto search = label_lookup_.find(label);
if (search == label_lookup_.end()) {
throw std::runtime_error("Label not found");
}
tableint internalId = search->second;
lock_table.unlock();
// wait for element addition or update
std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]);
unmarkDeletedInternal(internalId);
}



/**
* Remove the deleted mark of the node.
*/
* Remove the deleted mark of the node.
*
* Note: the method is not safe to use when replacement of deleted elements is enabled
* bacause elements marked as deleted can be completely removed from the index
*/
void unmarkDeletedInternal(tableint internalId) {
assert(internalId < cur_element_count);
if (isMarkedDeleted(internalId)) {
unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId)) + 2;
*ll_cur &= ~DELETE_MARK;
num_deleted_ -= 1;
if (replace_deleted_) deleted_elements.erase(internalId);
if (replace_deleted_) {
std::unique_lock <std::mutex> lock_deleted_elements(deleted_elements_lock);
deleted_elements.erase(internalId);
}
} else {
throw std::runtime_error("The requested to undelete element is not deleted");
}
Expand Down Expand Up @@ -813,42 +833,49 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {

/**
* Adds point and replaces previously deleted point if any, updating it with new point
*
* If deleted point was replaced returns its label, else returns label of added point
* If deleted point was replaced returns its label, else returns label of added or updated point
*
* Note:
* Methods that can work with deleted elements unmarkDelete and addPoint are not safe to use
* with this method. Because addPointToVacantPlace removes deleted elements from the index.
*/
labeltype addPointToVacantPlace(const void* data_point, labeltype label) {
dyashuni marked this conversation as resolved.
Show resolved Hide resolved
if (!replace_deleted_) {
throw std::runtime_error("Can't use addPointToVacantPlace when replacement of deleted elements is disabled");
}

std::unique_lock <std::mutex> tmp_del_el_lock(deleted_elements_lock);
bool is_empty = deleted_elements.empty();
tmp_del_el_lock.unlock();

if (is_empty) {
addPoint(data_point, label);
return label;
// check if there is vacant place
tableint internal_id_replaced;
std::unique_lock <std::mutex> lock_deleted_elements(deleted_elements_lock);
bool is_vacant_place = !deleted_elements.empty();
if (is_vacant_place) {
internal_id_replaced = *deleted_elements.begin();
deleted_elements.erase(internal_id_replaced);
}
else {
tmp_del_el_lock.lock();
tableint id_replace = *deleted_elements.begin();
deleted_elements.erase(id_replace);
tmp_del_el_lock.unlock();

// use link list locks to not block calls for other elements
std::unique_lock <std::mutex> lock_label_update(link_list_update_locks_[(id_replace & (max_update_element_locks - 1))]);
labeltype label_replace = getExternalLabel(id_replace);
setExternalLabel(id_replace, label);
lock_label_update.unlock();

std::unique_lock <std::mutex> tmp_label_lookup_lock(label_lookup_lock);
label_lookup_.erase(label_replace);
label_lookup_[label] = id_replace;
tmp_label_lookup_lock.unlock();
lock_deleted_elements.unlock();

// if there is no vacant place then add or update point
// else add point to vacant place
if (!is_vacant_place) {
addPoint(data_point, label);

return label_replace;
return label;
} else {
// wait for element addition or update
std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(internal_id_replaced & (max_update_element_locks - 1))]);
labeltype label_replaced = getExternalLabel(internal_id_replaced);
setExternalLabel(internal_id_replaced, label);
lock_el_update.unlock();

std::unique_lock <std::mutex> lock_table(label_lookup_lock);
label_lookup_.erase(label_replaced);
label_lookup_[label] = internal_id_replaced;
lock_table.unlock();

lock_el_update.lock();
unmarkDeletedInternal(internal_id_replaced);
updatePoint(data_point, internal_id_replaced, 1.0);

return label_replaced;
}
}

Expand Down Expand Up @@ -1024,11 +1051,18 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
{
// Checking if the element with the same label already exists
// if so, updating it *instead* of creating a new element.
std::unique_lock <std::mutex> templock_curr(cur_element_count_guard_);
std::unique_lock <std::mutex> lock_table(label_lookup_lock);
auto search = label_lookup_.find(label);
if (search != label_lookup_.end()) {
tableint existingInternalId = search->second;
templock_curr.unlock();
if (replace_deleted_) {
// wait for element addition or update
std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(existingInternalId & (max_update_element_locks - 1))]);
if (isMarkedDeleted(existingInternalId)) {
throw std::runtime_error("Can't use addPoint to update deleted elements if replacement of deleted elements is enabled.");
}
}
lock_table.unlock();

std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(existingInternalId & (max_update_element_locks - 1))]);

Expand Down