Skip to content

Commit

Permalink
Try to reuse existing chunks. Close #3793
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Dec 11, 2024
1 parent beeacd3 commit f6d2246
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 6 deletions.
5 changes: 3 additions & 2 deletions api/apps/document_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,9 @@ def run():
e, doc = DocumentService.get_by_id(id)
if not e:
return get_data_error_result(message="Document not found!")
if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id):
settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id)
if req.get("delete", False):
if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id):
settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id)

if str(req["run"]) == TaskStatus.RUNNING.value:
TaskService.filter_delete([Task.doc_id == id])
Expand Down
3 changes: 2 additions & 1 deletion conf/infinity_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@
"available_int": {"type": "integer", "default": 1},
"knowledge_graph_kwd": {"type": "varchar", "default": ""},
"entities_kwd": {"type": "varchar", "default": ""},
"pagerank_fea": {"type": "integer", "default": 0}
"pagerank_fea": {"type": "integer", "default": 0},
"task_digest": {"type": "varchar", "default": ""}
}
2 changes: 1 addition & 1 deletion conf/mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
{
"string": {
"match_pattern": "regex",
"match": "^.*_(with_weight|list)$",
"match": "^.*_(with_weight|list|digest)$",
"mapping": {
"type": "text",
"index": "false",
Expand Down
27 changes: 25 additions & 2 deletions rag/svr/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from rag.utils import rmSpace, num_tokens_from_string
from rag.utils.redis_conn import REDIS_CONN, Payload
from rag.utils.storage_factory import STORAGE_IMPL
from rag.utils.doc_store_conn import OrderByExpr

BATCH_SIZE = 64

Expand Down Expand Up @@ -358,6 +359,22 @@ def run_raptor(row, chat_mdl, embd_mdl, callback=None):
return res, tk_count, vector_size


def task_chunks_exist(task: dict) -> tuple[str, bool]:
    """Check whether chunks produced by an identical task already exist in the doc store.

    A stable digest is computed over the fields that determine a task's output
    (parsing range, parser config, embedding model, etc.); if any stored chunk
    carries the same ``task_digest``, the task's work was already done and can
    be skipped.

    Args:
        task: Task row as a dict; must contain at least ``tenant_id`` and
            ``kb_id``. Missing digest fields default to the empty string so the
            digest stays stable across schema additions.

    Returns:
        A ``(task_digest, exists)`` pair: the hex MD5 digest of the task's
        identity fields, and True when at least one chunk with that digest is
        already stored.
    """
    # MD5 is used as a cheap content fingerprint here, not for security.
    md5 = hashlib.md5()
    for field in ["task_type", "tenant_id", "kb_id", "doc_id", "name", "from_page", "to_page", "parser_config", "embd_id",
                  "language", "llm_id"]:
        md5.update(str(task.get(field, "")).encode("utf-8"))
    task_digest = md5.hexdigest()

    # Fetch at most one matching chunk id — existence is all we need.
    fields = ["id"]
    condition = {"task_digest": task_digest}
    tenant_id = task["tenant_id"]
    kb_ids = [task["kb_id"]]
    res = settings.docStoreConn.search(fields, [], condition, [], OrderByExpr(), 0, 1, search.index_name(tenant_id), kb_ids)
    dict_chunks = settings.docStoreConn.getFields(res, fields)
    return task_digest, len(dict_chunks) > 0


def do_handle_task(task):
task_id = task["id"]
task_from_page = task["from_page"]
Expand All @@ -373,6 +390,11 @@ def do_handle_task(task):

# prepare the progress callback function
progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)
task_digest, found = task_chunks_exist(task)
if found:
progress_callback(1., msg=f"Chunks of task {task_digest} already exist, skip.")
return

try:
# bind embedding model
embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
Expand Down Expand Up @@ -420,6 +442,9 @@ def do_handle_task(task):
progress_message = "Embedding chunks ({:.2f}s)".format(timer() - start_ts)
logging.info(progress_message)
progress_callback(msg=progress_message)

for chunk in chunks:
chunk["task_digest"] = task_digest
# logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}")
init_kb(task, vector_size)
chunk_count = len(set([chunk["id"] for chunk in chunks]))
Expand All @@ -434,8 +459,6 @@ def do_handle_task(task):
if doc_store_result:
error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
progress_callback(-1, msg=error_message)
settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
logging.error(error_message)
raise Exception(error_message)

if TaskService.do_cancel(task_id):
Expand Down

0 comments on commit f6d2246

Please sign in to comment.