Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace deleted elements at addition #418

Merged
merged 28 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add locks by label
  • Loading branch information
Dmitry Yashunin committed Nov 26, 2022
commit c2fb5740a97e20bc4cdef39bc40ed767cba73941
3 changes: 2 additions & 1 deletion examples/multiThreadLoad_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ int main() {
hnswlib::L2Space space(d);
hnswlib::HierarchicalNSW<float>* alg_hnsw = new hnswlib::HierarchicalNSW<float>(&space, max_elements);

// with these parameters about 7 threads will do operations with the same label simultaneously
int num_threads = 40;
int num_ids = 1;
int num_ids = 10;

int num_iterations = 10;
int start_id = 0;
Expand Down
83 changes: 58 additions & 25 deletions hnswlib/hnswalg.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ typedef unsigned int linklistsizeint;
template<typename dist_t>
class HierarchicalNSW : public AlgorithmInterface<dist_t> {
public:
static const tableint max_update_element_locks = 65536;
static const tableint MAX_ELEMENT_UPDATE_LOCKS = 65536;
static const tableint MAX_LABEL_OPERATION_LOCKS = 65536;
static const unsigned char DELETE_MARK = 0x01;

size_t max_elements_{0};
Expand All @@ -38,7 +39,9 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
// Locks to prevent race condition during update/insert of an element at same time.
// Note: Locks for additions can also be used to prevent this race condition
// if the querying of KNN is not exposed along with update/inserts i.e multithread insert/update/query in parallel.
mutable std::vector<std::mutex> link_list_update_locks_;
mutable std::vector<std::mutex> element_update_locks_;
// Locks operations with element by label value
mutable std::vector<std::mutex> label_op_locks_;

std::mutex global;
std::vector<std::mutex> link_list_locks_;
Expand Down Expand Up @@ -95,7 +98,8 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
size_t random_seed = 100,
bool replace_deleted = false)
: link_list_locks_(max_elements),
link_list_update_locks_(max_update_element_locks),
element_update_locks_(MAX_ELEMENT_UPDATE_LOCKS),
label_op_locks_(MAX_LABEL_OPERATION_LOCKS),
element_levels_(max_elements),
replace_deleted_(replace_deleted) {
max_elements_ = max_elements;
Expand Down Expand Up @@ -163,6 +167,20 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
}


inline std::mutex& getUpdateElMutex(tableint internal_id) const {
// calculate hash
size_t lock_id = internal_id & (MAX_ELEMENT_UPDATE_LOCKS - 1);
return element_update_locks_[lock_id];
}


inline std::mutex& getLabelOpMutex(labeltype label) const {
// calculate hash
size_t lock_id = label & (MAX_LABEL_OPERATION_LOCKS - 1);
return label_op_locks_[lock_id];
}


inline labeltype getExternalLabel(tableint internal_id) const {
labeltype return_label;
memcpy(&return_label, (data_level0_memory_ + internal_id * size_data_per_element_ + label_offset_), sizeof(labeltype));
Expand Down Expand Up @@ -673,7 +691,8 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {

size_links_level0_ = maxM0_ * sizeof(tableint) + sizeof(linklistsizeint);
std::vector<std::mutex>(max_elements).swap(link_list_locks_);
std::vector<std::mutex>(max_update_element_locks).swap(link_list_update_locks_);
std::vector<std::mutex>(MAX_ELEMENT_UPDATE_LOCKS).swap(element_update_locks_);
std::vector<std::mutex>(MAX_LABEL_OPERATION_LOCKS).swap(label_op_locks_);

visited_list_pool_ = new VisitedListPool(1, max_elements);

Expand Down Expand Up @@ -714,13 +733,17 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {

template<typename data_t>
std::vector<data_t> getDataByLabel(labeltype label) const {
// lock all operations with element by label
std::unique_lock <std::mutex> lock_label(getLabelOpMutex(label));

std::unique_lock <std::mutex> lock_table(label_lookup_lock);
auto search = label_lookup_.find(label);
if (search == label_lookup_.end() || isMarkedDeleted(search->second)) {
throw std::runtime_error("Label not found");
}
tableint internalId = search->second;
lock_table.unlock();

char* data_ptrv = getDataByInternalId(internalId);
size_t dim = *((size_t *) dist_func_param_);
std::vector<data_t> data;
Expand All @@ -733,25 +756,29 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
}


/**
* Marks an element with the given label deleted, does NOT really change the current graph.
*/
/*
* Marks an element with the given label deleted, does NOT really change the current graph.
*/
void markDelete(labeltype label) {
// lock all operations with element by label
std::unique_lock <std::mutex> lock_label(getLabelOpMutex(label));

std::unique_lock <std::mutex> lock_table(label_lookup_lock);
auto search = label_lookup_.find(label);
if (search == label_lookup_.end()) {
throw std::runtime_error("Label not found");
}
tableint internalId = search->second;
lock_table.unlock();

markDeletedInternal(internalId);
}


/**
* Uses the last 16 bits of the memory for the linked list size to store the mark,
* whereas maxM0_ has to be limited to the lower 16 bits, however, still large enough in almost all cases.
*/
/*
* Uses the last 16 bits of the memory for the linked list size to store the mark,
* whereas maxM0_ has to be limited to the lower 16 bits, however, still large enough in almost all cases.
*/
void markDeletedInternal(tableint internalId) {
assert(internalId < cur_element_count);
if (!isMarkedDeleted(internalId)) {
Expand All @@ -768,28 +795,32 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
}


/**
* Remove the deleted mark of the node, does NOT really change the current graph.
/*
* Removes the deleted mark of the node, does NOT really change the current graph.
*
* Note: the method is not safe to use when replacement of deleted elements is enabled
* bacause elements marked as deleted can be completely removed from the index
*/
void unmarkDelete(labeltype label) {
// lock all operations with element by label
std::unique_lock <std::mutex> lock_label(getLabelOpMutex(label));

std::unique_lock <std::mutex> lock_table(label_lookup_lock);
auto search = label_lookup_.find(label);
if (search == label_lookup_.end()) {
throw std::runtime_error("Label not found");
}
tableint internalId = search->second;
lock_table.unlock();

unmarkDeletedInternal(internalId);
}



/**
* Remove the deleted mark of the node.
*/
/*
* Remove the deleted mark of the node.
*/
void unmarkDeletedInternal(tableint internalId) {
assert(internalId < cur_element_count);
if (isMarkedDeleted(internalId)) {
Expand All @@ -806,9 +837,9 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
}


/**
* Checks the first 16 bits of the memory to see if the element is marked deleted.
*/
/*
* Checks the first 16 bits of the memory to see if the element is marked deleted.
*/
bool isMarkedDeleted(tableint internalId) const {
unsigned char *ll_cur = ((unsigned char*)get_linklist0(internalId)) + 2;
return *ll_cur & DELETE_MARK;
Expand All @@ -825,7 +856,7 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
}


/**
/*
* Adds point and replaces previously deleted point if any, updating it with new point
* If deleted point was replaced returns its label, else returns label of added or updated point
*
Expand Down Expand Up @@ -872,10 +903,12 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
}


/**
* Adds point. Updates the point if it is already in the index
/*
* Adds point. Updates the point if it is already in the index
*/
void addPoint(const void *data_point, labeltype label) {
// lock all operations with element by label
std::unique_lock <std::mutex> lock_label(getLabelOpMutex(label));
addPoint(data_point, label, -1);
}

Expand Down Expand Up @@ -1049,14 +1082,14 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
tableint existingInternalId = search->second;
if (replace_deleted_) {
// wait for element addition or update
std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(existingInternalId & (max_update_element_locks - 1))]);
std::unique_lock <std::mutex> lock_el_update(getUpdateElMutex(existingInternalId));
if (isMarkedDeleted(existingInternalId)) {
throw std::runtime_error("Can't use addPoint to update deleted elements if replacement of deleted elements is enabled.");
}
}
lock_table.unlock();

std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(existingInternalId & (max_update_element_locks - 1))]);
std::unique_lock <std::mutex> lock_el_update(getUpdateElMutex(existingInternalId));

if (isMarkedDeleted(existingInternalId)) {
unmarkDeletedInternal(existingInternalId);
Expand All @@ -1076,7 +1109,7 @@ class HierarchicalNSW : public AlgorithmInterface<dist_t> {
}

// Take update lock to prevent race conditions on an element with insertion/update at the same time.
std::unique_lock <std::mutex> lock_el_update(link_list_update_locks_[(cur_c & (max_update_element_locks - 1))]);
std::unique_lock <std::mutex> lock_el_update(getUpdateElMutex(cur_c));
std::unique_lock <std::mutex> lock_el(link_list_locks_[cur_c]);
int curlevel = getRandomLevel(mult_);
if (level > 0)
Expand Down