Skip to content

Commit

Permalink
Merge pull request #412 from navneet1v/main
Browse files Browse the repository at this point in the history
Improved OpenSearchKNN setup for doing benchmarks
  • Loading branch information
erikbern authored Jun 14, 2023
2 parents 3cae098 + df3029a commit 9184eac
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 19 deletions.
4 changes: 2 additions & 2 deletions ann_benchmarks/algorithms/opensearchknn/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ RUN set -eux ; \
curl -OsS https://artifacts.opensearch.org/releases/bundle/opensearch/${OPENSEARCH_VERSION}/opensearch-${OPENSEARCH_VERSION}-linux-$ARCH.tar.gz; \
tar -zxf opensearch-${OPENSEARCH_VERSION}-linux-$ARCH.tar.gz; \
mv opensearch-${OPENSEARCH_VERSION} opensearch; \
rm -r opensearch-${OPENSEARCH_VERSION}-linux-$ARCH.tar.gz opensearch/plugins/opensearch-security;
rm -r opensearch-${OPENSEARCH_VERSION}-linux-$ARCH.tar.gz opensearch/plugins/opensearch-security opensearch/plugins/opensearch-alerting opensearch/plugins/opensearch-anomaly-detection opensearch/plugins/opensearch-asynchronous-search opensearch/plugins/opensearch-cross-cluster-replication opensearch/plugins/opensearch-geospatial opensearch/plugins/opensearch-index-management opensearch/plugins/opensearch-job-scheduler opensearch/plugins/opensearch-ml opensearch/plugins/opensearch-neural-search opensearch/plugins/opensearch-notifications opensearch/plugins/opensearch-notifications-core opensearch/plugins/opensearch-observability opensearch/plugins/opensearch-performance-analyzer opensearch/plugins/opensearch-reports-scheduler opensearch/plugins/opensearch-security-analytics opensearch/plugins/opensearch-sql;

# Install OpenSearch in final image:
# - https://opensearch.org/docs/latest/install-and-configure/install-opensearch/tar/
Expand Down Expand Up @@ -50,7 +50,7 @@ thread_pool.search.queue_size: 1' > opensearch/config/opensearch.yml
RUN echo '\
-Xms3G\n\
-Xmx3G\n\
-XX:InitiatingHeapOccupancyPercent=30\n\
-XX:InitiatingHeapOccupancyPercent=80\n\
-XX:+HeapDumpOnOutOfMemoryError\n\
-XX:HeapDumpPath=data\n\
-XX:ErrorFile=logs/hs_err_pid%p.log\n\
Expand Down
48 changes: 31 additions & 17 deletions ann_benchmarks/algorithms/opensearchknn/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from tqdm import tqdm

from ..base.module import BaseANN
import traceback


class OpenSearchKNN(BaseANN):
Expand All @@ -19,10 +20,17 @@ def __init__(self, metric, dimension, method_param):
self.ef_search = None
self._wait_for_health_status()

def _wait_for_health_status(self, wait_seconds=30, status="yellow"):
def _wait_for_health_status(self, wait_seconds=120, status="yellow"):
for _ in range(wait_seconds):
try:
self.client.cluster.health(wait_for_status=status)
body = {
"persistent": {
"knn.memory.circuit_breaker.enabled": False
}
}
# update cluster settings
self.client.cluster.put_settings(body=body)
return
except ConnectionError as e:
pass
Expand All @@ -32,12 +40,11 @@ def _wait_for_health_status(self, wait_seconds=30, status="yellow"):

def fit(self, X):
body = {
"settings": {"index": {"knn": True}, "number_of_shards": 1, "number_of_replicas": 0, "refresh_interval": -1}
"settings": {"index": {"knn": True}, "number_of_shards": 1, "number_of_replicas": 0, "refresh_interval": "10s"}
}

mapping = {
"properties": {
"id": {"type": "keyword", "store": True},
"vec": {
"type": "knn_vector",
"dimension": self.dimension,
Expand All @@ -55,31 +62,38 @@ def fit(self, X):
}

self.client.indices.create(self.index_name, body=body)
self.client.indices.put_mapping(mapping, self.index_name)
self.client.indices.put_mapping(body=mapping, index=self.index_name)

print("Uploading data to the Index:", self.index_name)

def gen():
for i, vec in enumerate(tqdm(X)):
yield {"_op_type": "index", "_index": self.index_name, "vec": vec.tolist(), "id": str(i + 1)}
yield {"_op_type": "index", "_index": self.index_name, "vec": vec.tolist(), "_id": str(i + 1)}

(_, errors) = bulk(self.client, gen(), chunk_size=500, max_retries=2, request_timeout=10)
(_, errors) = bulk(self.client, gen(), chunk_size=100, max_retries=4, request_timeout=20000)
assert len(errors) == 0, errors

print("Force Merge...")
self.client.indices.forcemerge(self.index_name, max_num_segments=1, request_timeout=1000)

i = 1
while i <= 3:
try:
print(f"Force Merge iteration {i}...")
i = i + 1
self.client.indices.forcemerge(index=self.index_name, max_num_segments=5, request_timeout=20000)
# ensuring the force merge is completed
break
except Exception as e:
print(f"Running force again due to error.....")
traceback.print_exc()
print("Refreshing the Index...")
self.client.indices.refresh(self.index_name, request_timeout=1000)

print("Running Warmup API...")
res = urlopen(Request("http://localhost:9200/_plugins/_knn/warmup/" + self.index_name + "?pretty"))
print(res.read().decode("utf-8"))
self.client.indices.refresh(index=self.index_name, request_timeout=20000)

def set_query_arguments(self, ef):
    """Set the HNSW ``ef_search`` parameter on the index, then re-warm the kNN graphs.

    Updates the index dynamic setting ``knn.algo_param.ef_search`` and hits the
    OpenSearch kNN warmup endpoint so the native graphs are loaded into memory
    before benchmark queries run.
    """
    self.ef_search = ef
    # Dynamic index-level setting update for the kNN ef_search knob.
    settings_body = {"settings": {"index": {"knn.algo_param.ef_search": ef}}}
    self.client.indices.put_settings(body=settings_body)
    print("Running Warmup API after setting query arguments...")
    warmup_url = "http://localhost:9200/_plugins/_knn/warmup/" + self.index_name + "?pretty"
    # NOTE(review): timeout is in seconds for urlopen — 20000 s looks like it was
    # meant to mirror the client's request_timeout; confirm the intended unit.
    response = urlopen(Request(warmup_url), timeout=20000)
    print(response.read().decode("utf-8"))

def query(self, q, n):
body = {"query": {"knn": {"vec": {"vector": q.tolist(), "k": n}}}}
Expand All @@ -89,13 +103,13 @@ def query(self, q, n):
body=body,
size=n,
_source=False,
docvalue_fields=["id"],
docvalue_fields=["_id"],
stored_fields="_none_",
filter_path=["hits.hits.fields.id"],
filter_path=["hits.hits.fields._id"],
request_timeout=10,
)

return [int(h["fields"]["id"][0]) - 1 for h in res["hits"]["hits"]]
return [int(h["fields"]["_id"][0]) - 1 for h in res["hits"]["hits"]]

def batch_query(self, X, n):
    """Run ``query`` for every vector in ``X`` (top ``n`` neighbors each).

    Results are accumulated in order and stored on ``self.batch_res`` rather
    than returned, matching the BaseANN batch-query contract.
    """
    collected = []
    for vector in X:
        collected.append(self.query(vector, n))
    self.batch_res = collected
Expand Down

0 comments on commit 9184eac

Please sign in to comment.