Lots of cleanup of the ECC
ahmed-tg committed Jun 27, 2024
1 parent da374a0 commit 57aa08d
Showing 2 changed files with 120 additions and 66 deletions.
181 changes: 117 additions & 64 deletions eventual-consistency-service/app/eventual_consistency_checker.py
@@ -191,87 +191,140 @@ def _upsert_rels(self, src_id, src_type, relationships):
],
)


def fetch_and_process_vertex(self):
v_types_to_scan = self.embedding_indices
vertex_ids_content_map: dict = {}

for v_type in v_types_to_scan:
start_time = time.time()
LogWriter.info(f"Fetching {self.batch_size} vertex ids and content for vertex type: {v_type}")
vertex_ids_content_map = self.conn.runInstalledQuery(
"Scan_For_Updates", {"v_type": v_type, "num_samples": self.batch_size}
)[0]["@@v_and_text"]

vertex_ids_content_map = self._fetch_unprocessed_vertices(v_type)
vertex_ids = [vertex_id for vertex_id in vertex_ids_content_map.keys()]
LogWriter.info(
f"Remove existing entries from Milvus with vertex_ids in {str(vertex_ids)}"
)
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"{self.vertex_field} in {str(vertex_ids)}"
)

LogWriter.info(f"Embedding content from vertex type: {v_type}")
for vertex_id, content in vertex_ids_content_map.items():
if content != "":
vec = self.embedding_service.embed_query(content)
self.embedding_stores[self.graphname + "_" + v_type].add_embeddings(
[(content, vec)], [{self.vertex_field: vertex_id}]
)

if v_type == "Document":
LogWriter.info(f"Chunking the content from vertex type: {v_type}")
for vertex_id, content in vertex_ids_content_map.items():
chunks = self._chunk_document(content)
for i, chunk in enumerate(chunks):
self._upsert_chunk(vertex_id, f"{vertex_id}_chunk_{i}", chunk)

if v_type == "Document" or v_type == "DocumentChunk":
LogWriter.info(
f"Extracting and upserting entities from the content from vertex type: {v_type}"
)
for vertex_id, content in vertex_ids_content_map.items():
extracted = self._extract_entities(content)
if len(extracted["nodes"]) > 0:
self._upsert_entities(vertex_id, v_type, extracted["nodes"])
if len(extracted["rels"]) > 0:
self._upsert_rels(vertex_id, v_type, extracted["rels"])

LogWriter.info(
f"Updating the TigerGraph vertex ids to confirm that processing was completed"
)
if vertex_ids:
vertex_ids = [{"id": vertex_id, "type": v_type} for vertex_id in vertex_ids]
self.conn.runInstalledQuery(
"Update_Vertices_Processing_Status",
{"processed_vertices": vertex_ids},
usePost=True
)
end_time = time.time()
elapsed_time = end_time - start_time
LogWriter.info(f"Time elapsed for processing vertex_ids {vertex_ids}: {elapsed_time:.2f} seconds")
self._remove_existing_entries(v_type, vertex_ids)
self._process_content(v_type, vertex_ids_content_map)
self._update_processing_status(v_type, vertex_ids)
self._log_elapsed_time(start_time, v_type)
else:
LogWriter.error(f"No changes detected for vertex type: {v_type}")

return len(vertex_ids_content_map) != 0
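
The boolean return value presumably tells the scheduler whether any work was found; a minimal polling loop under that assumption (checker and poll_interval_seconds are hypothetical names) might look like:

    import time

    while True:
        had_work = checker.fetch_and_process_vertex()
        if not had_work:
            # back off when the scan returns no unprocessed vertices
            time.sleep(poll_interval_seconds)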

def _fetch_unprocessed_vertices(self, v_type):
return self.conn.runInstalledQuery(
"Scan_For_Updates", {"v_type": v_type, "num_samples": self.batch_size}
)[0]["@@v_and_text"]
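
Scan_For_Updates is an installed GSQL query, and the indexing above implies its first result set carries a map accumulator from vertex id to text content. An illustrative payload (ids and text are hypothetical):

    result = [{"@@v_and_text": {"doc_42": "TigerGraph is a graph database...", "doc_43": ""}}]
    vertex_ids_content_map = result[0]["@@v_and_text"]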

def _remove_existing_entries(self, v_type, vertex_ids):
LogWriter.info(f"Remove existing entries from Milvus with vertex_ids in {str(vertex_ids)}")
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"{self.vertex_field} in {str(vertex_ids)}"
)
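
Interpolating str(vertex_ids) yields a Python list literal, which Milvus boolean expressions accept for an in filter. For example, with a hypothetical field name and ids:

    ids = ["doc_42", "doc_43"]
    expr = f"vertex_id in {str(ids)}"  # -> "vertex_id in ['doc_42', 'doc_43']"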

def _process_content(self, v_type, vertex_ids_content_map):
LogWriter.info(f"Embedding content from vertex type: {v_type}")
for vertex_id, content in vertex_ids_content_map.items():
if content:
vec = self.embedding_service.embed_query(content)
self.embedding_stores[self.graphname + "_" + v_type].add_embeddings(
[(content, vec)], [{self.vertex_field: vertex_id}]
)

if v_type == "Document":
self._process_document_content(v_type, vertex_id, content)

if v_type in ["Document", "DocumentChunk"]:
self._extract_and_upsert_entities(v_type, vertex_id, content)

def _process_document_content(self, v_type, vertex_id, content):
LogWriter.info(f"Chunking the content from vertex type: {v_type}")
chunks = self._chunk_document(content)
for i, chunk in enumerate(chunks):
self._upsert_chunk(vertex_id, f"{vertex_id}_chunk_{i}", chunk)
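
_chunk_document is defined elsewhere in the class; a stand-in splitter using fixed character windows with overlap (the sizes, and the character-based strategy itself, are assumptions) could read:

    def _chunk_document(self, content, chunk_size=1000, overlap=100):
        # fixed-size character windows; the real splitter may work on
        # tokens or sentences instead
        step = chunk_size - overlap
        return [content[i:i + chunk_size] for i in range(0, len(content), step)]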

def _extract_and_upsert_entities(self, v_type, vertex_id, content):
LogWriter.info(f"Extracting and upserting entities from the content from vertex type: {v_type}")
extracted = self._extract_entities(content)
if extracted["nodes"]:
self._upsert_entities(vertex_id, v_type, extracted["nodes"])
if extracted["rels"]:
self._upsert_rels(vertex_id, v_type, extracted["rels"])
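
The checks above imply _extract_entities returns a dict with "nodes" and "rels" lists; an illustrative return value (the inner structure is an assumption) might be:

    extracted = {
        "nodes": [{"id": "TigerGraph", "type": "Company"}],
        "rels": [{"source": "TigerGraph", "target": "GSQL", "type": "DEVELOPS"}],
    }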

def _update_processing_status(self, v_type, vertex_ids):
LogWriter.info(f"Updating the TigerGraph vertex ids for type {v_type} to confirm that processing was completed")
processed_vertices = [{"id": vertex_id, "type": v_type} for vertex_id in vertex_ids]
self.conn.runInstalledQuery(
"Update_Vertices_Processing_Status",
{"processed_vertices": processed_vertices},
usePost=True
)
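
The request body mirrors what the pre-refactor code built inline; with hypothetical ids it would be:

    {"processed_vertices": [{"id": "doc_42", "type": "Document"}, {"id": "doc_43", "type": "Document"}]}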

def _log_elapsed_time(self, start_time, v_type):
end_time = time.time()
elapsed_time = end_time - start_time
LogWriter.info(f"Time elapsed for processing vertex_ids for type {v_type}: {elapsed_time:.2f} seconds")


def verify_and_cleanup_embeddings(self):
def verify_and_cleanup_embeddings(self, batch_size=10):
for v_type in self.embedding_indices:
query_result = self.embedding_stores[self.graphname + "_" + v_type].query("pk > 0", [self.vertex_field])
if query_result is not None:
vertex_ids = [item.get(self.vertex_field) for item in query_result]

if vertex_ids:
non_existent_vertices = self.conn.runInstalledQuery(
"Check_Nonexistent_Vertices",
{"v_type": v_type, "vertex_ids": vertex_ids}
)[0]["@@missing_vertices"]

for vertex_id in non_existent_vertices:
LogWriter.info(f"Vertex {vertex_id} no longer exists in TigerGraph. Removing from Milvus.")
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"{self.vertex_field} == '{vertex_id}'"
)
LogWriter.info(f"Running cleanup for vertex type {v_type}")

query_result = self.embedding_stores[self.graphname + "_" + v_type].query("pk > 0", [self.vertex_field, 'pk'])
if not query_result:
LogWriter.info(f"No vertices to process for vertex type {v_type}")
continue

vertex_id_map, duplicates_to_remove = self._identify_duplicates(query_result)
self._remove_duplicates(v_type, duplicates_to_remove)

unique_vertex_ids = list(vertex_id_map.keys())
self._process_vertex_batches(v_type, unique_vertex_ids, batch_size)

LogWriter.info(f"Finished cleanup for vertex type {v_type}")

def _identify_duplicates(self, query_result):
vertex_id_map = {}
duplicates_to_remove = []

for item in query_result:
vertex_id = item.get(self.vertex_field)
pk = item.get('pk')
if vertex_id not in vertex_id_map:
vertex_id_map[vertex_id] = pk
else:
duplicates_to_remove.append(pk)
LogWriter.info(f"Duplicate vertex id found with pk {pk} and will be removed")

return vertex_id_map, duplicates_to_remove
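
The first pk seen for a vertex id wins and later ones are queued for removal; with hypothetical rows (using vertex_id as the field name):

    rows = [{"vertex_id": "doc_42", "pk": 1}, {"vertex_id": "doc_42", "pk": 7}]
    # -> vertex_id_map == {"doc_42": 1}, duplicates_to_remove == [7]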

def _remove_duplicates(self, v_type, duplicates_to_remove):
for pk in duplicates_to_remove:
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"pk == {pk}"
)
LogWriter.info(f"Removed duplicate with pk {pk} from Milvus")

def _process_vertex_batches(self, v_type, unique_vertex_ids, batch_size):
for i in range(0, len(unique_vertex_ids), batch_size):
batch_vertex_ids = unique_vertex_ids[i:i + batch_size]

non_existent_vertices = self.conn.runInstalledQuery(
"Check_Nonexistent_Vertices",
{"v_type": v_type, "vertex_ids": batch_vertex_ids}
)[0]["@@missing_vertices"]

if non_existent_vertices:
self._cleanup_nonexistent_vertices(v_type, non_existent_vertices)
else:
LogWriter.info(f"No cleanup needed for current batch of vertex type {v_type}")

def _cleanup_nonexistent_vertices(self, v_type, non_existent_vertices):
for vertex_id in non_existent_vertices:
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"{self.vertex_field} == '{vertex_id}'"
)

def initialize(self):
LogWriter.info(
5 changes: 3 additions & 2 deletions eventual-consistency-service/app/main.py
@@ -129,8 +129,9 @@ def initialize_eventual_consistency_checker(graphname: str, conn: TigerGraphConn
consistency_checkers[graphname] = checker

# start the longer cleanup process that will run in further spaced-out intervals
cleanup_thread = Thread(target=checker.initialize_cleanup, daemon=True)
cleanup_thread.start()
if milvus_config.get("cleanup_enabled", True):
cleanup_thread = Thread(target=checker.initialize_cleanup, daemon=True)
cleanup_thread.start()

# start the main ECC process that searches for new vertices that need to be processed
checker.initialize()
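
Because the flag is read with dict.get and defaults to True, existing deployments keep the cleanup thread unless they opt out. A hypothetical milvus_config illustrating the toggle (the other keys are assumptions):

    milvus_config = {
        "host": "localhost",
        "port": 19530,
        "cleanup_enabled": False,  # do not start the background cleanup thread
    }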
