Merge pull request #227 from tigergraph/GML-1797-Investigate-ECC-Issue
GML-1797 - Fix ECC - processing nodes
RobRossmiller-TG authored Jun 27, 2024
2 parents 4d6cc0a + 57aa08d commit ed0cad5
Showing 4 changed files with 146 additions and 77 deletions.
7 changes: 4 additions & 3 deletions README.md
@@ -212,16 +212,17 @@ You can also disable the consistency_checker, which reconciles Milvus and TigerG
```

##### Milvus configuration
Copy the below into `configs/milvus_config.json` and edit the `host` and `port` fields to match your Milvus configuration (keeping in mind docker configuration). `username` and `password` can also be configured below if required by your Milvus setup. `enabled` should always be set to "true" for now as Milvus is only the embedding store supported. `sync_interval_seconds` is the number of seconds which the eventual-consistency-checker (ECC) service will be scheduled to check for new vertices in TigerGraph in order to create embeddings in Milvus. In the same way `cleanup_interval_seconds` is the number of seconds the ECC service will be scheduled to check for stale Milvus embeddings (e.g. if TigerGraph is restored from backup, or a vertex is deleted).
Copy the below into `configs/milvus_config.json` and edit the `host` and `port` fields to match your Milvus configuration (keeping your docker configuration in mind). `username` and `password` can also be configured below if required by your Milvus setup. `enabled` should always be set to "true" for now, as Milvus is currently the only supported embedding store. `process_interval_seconds` is the interval, in seconds, at which the eventual-consistency-checker (ECC) service checks TigerGraph for new vertices that need embeddings created in Milvus. Likewise, `cleanup_interval_seconds` is the interval, in seconds, at which the ECC service checks for stale Milvus embeddings (e.g. after TigerGraph is restored from a backup, or a vertex is deleted). `batch_size` is the number of vertices that ECC will process in one workload; it is optional and defaults to 10.
```json
{
"host": "milvus-standalone",
"port": 19530,
"username": "",
"password": "",
"enabled": "true",
"sync_interval_seconds": 1800,
"cleanup_interval_seconds": 2592000
"process_interval_seconds": 1800,
"cleanup_interval_seconds": 2592000,
"batch_size": 10
}
```
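
For orientation: with the values above, new vertices are scanned every 30 minutes (1800 s) and stale-embedding cleanup runs every 30 days (2592000 s), while the code falls back to a shorter one-day cleanup default (86400 s) when the key is omitted. A minimal Python sketch of how these keys are read, with defaults as they appear in `main.py` in this diff (the surrounding service wiring is elided):

```python
import json

# Load the Milvus settings and apply the same fallbacks main.py uses
# when a key is missing from configs/milvus_config.json.
with open("configs/milvus_config.json") as f:
    milvus_config = json.load(f)

process_interval_seconds = milvus_config.get("process_interval_seconds", 1800)  # 30 minutes
cleanup_interval_seconds = milvus_config.get("cleanup_interval_seconds", 86400)  # 1 day
batch_size = milvus_config.get("batch_size", 10)
```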

4 changes: 2 additions & 2 deletions common/gsql/supportai/Scan_For_Updates.gsql
@@ -11,8 +11,8 @@ CREATE DISTRIBUTED QUERY Scan_For_Updates(STRING v_type = "Document",
seeds = {v_type};

to_process = SELECT s FROM seeds:s
WHERE (s.epoch_processing == 0 AND s.epoch_added > 0)
OR s.epoch_processing > s.epoch_added + expire_window
WHERE s.epoch_processed == 0 AND ((s.epoch_processing == 0 AND s.epoch_added > 0)
OR datetime_to_epoch(now()) > s.epoch_processing + expire_window)
POST-ACCUM @@ToProcess += ProcessInfo(s, s.epoch_added);

FOREACH tup IN @@ToProcess DO
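
The reworked `WHERE` clause above is the heart of the fix: a vertex is selected only if it has never been fully processed (`epoch_processed == 0`), and either it was never claimed for processing or its previous claim is older than `expire_window`. A rough Python rendering of that predicate for illustration (field and parameter names taken from the hunk above; the deployed logic is the GSQL itself):

```python
import time

def needs_processing(epoch_added, epoch_processing, epoch_processed, expire_window):
    # Already fully processed: never pick it up again.
    if epoch_processed != 0:
        return False
    # New vertex that no worker has claimed yet.
    never_claimed = epoch_processing == 0 and epoch_added > 0
    # Claimed before, but the claim expired without completing.
    claim_expired = time.time() > epoch_processing + expire_window
    return never_claimed or claim_expired
```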
197 changes: 131 additions & 66 deletions eventual-consistency-service/app/eventual_consistency_checker.py
@@ -16,7 +16,7 @@
class EventualConsistencyChecker:
def __init__(
self,
interval_seconds,
process_interval_seconds,
cleanup_interval_seconds,
graphname,
vertex_field,
@@ -29,7 +29,7 @@ def __init__(
batch_size = 10,
run_forever = True
):
self.interval_seconds = interval_seconds
self.process_interval_seconds = process_interval_seconds
self.cleanup_interval_seconds = cleanup_interval_seconds
self.graphname = graphname
self.conn = conn
@@ -194,91 +194,153 @@ def _upsert_rels(self, src_id, src_type, relationships):
def fetch_and_process_vertex(self):
v_types_to_scan = self.embedding_indices
vertex_ids_content_map: dict = {}

for v_type in v_types_to_scan:
LogWriter.info(f"Fetching vertex ids and content for vertex type: {v_type}")
vertex_ids_content_map = self.conn.runInstalledQuery(
"Scan_For_Updates", {"v_type": v_type, "num_samples": self.batch_size}
)[0]["@@v_and_text"]
start_time = time.time()
LogWriter.info(f"Fetching {self.batch_size} vertex ids and content for vertex type: {v_type}")

vertex_ids_content_map = self._fetch_unprocessed_vertices(v_type)
vertex_ids = [vertex_id for vertex_id in vertex_ids_content_map.keys()]
LogWriter.info(
f"Remove existing entries from Milvus with vertex_ids in {str(vertex_ids)}"
)
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"{self.vertex_field} in {str(vertex_ids)}"
)

LogWriter.info(f"Embedding content from vertex type: {v_type}")
for vertex_id, content in vertex_ids_content_map.items():
if content != "":
vec = self.embedding_service.embed_query(content)
self.embedding_stores[self.graphname + "_" + v_type].add_embeddings(
[(content, vec)], [{self.vertex_field: vertex_id}]
)

if v_type == "Document":
LogWriter.info(f"Chunking the content from vertex type: {v_type}")
for vertex_id, content in vertex_ids_content_map.items():
chunks = self._chunk_document(content)
for i, chunk in enumerate(chunks):
self._upsert_chunk(vertex_id, f"{vertex_id}_chunk_{i}", chunk)

if v_type == "Document" or v_type == "DocumentChunk":
LogWriter.info(
f"Extracting and upserting entities from the content from vertex type: {v_type}"
)
for vertex_id, content in vertex_ids_content_map.items():
extracted = self._extract_entities(content)
if len(extracted["nodes"]) > 0:
self._upsert_entities(vertex_id, v_type, extracted["nodes"])
if len(extracted["rels"]) > 0:
self._upsert_rels(vertex_id, v_type, extracted["rels"])

LogWriter.info(
f"Updating the TigerGraph vertex ids to confirm that processing was completed"
)
if vertex_ids:
vertex_ids = [{"id": vertex_id, "type": v_type} for vertex_id in vertex_ids]
self.conn.runInstalledQuery(
"Update_Vertices_Processing_Status",
{"processed_vertices": vertex_ids},
usePost=True
)
self._remove_existing_entries(v_type, vertex_ids)
self._process_content(v_type, vertex_ids_content_map)
self._update_processing_status(v_type, vertex_ids)
self._log_elapsed_time(start_time, v_type)
else:
LogWriter.error(f"No changes detected for vertex type: {v_type}")

return len(vertex_ids_content_map) != 0

def verify_and_cleanup_embeddings(self):
def _fetch_unprocessed_vertices(self, v_type):
return self.conn.runInstalledQuery(
"Scan_For_Updates", {"v_type": v_type, "num_samples": self.batch_size}
)[0]["@@v_and_text"]

def _remove_existing_entries(self, v_type, vertex_ids):
LogWriter.info(f"Remove existing entries from Milvus with vertex_ids in {str(vertex_ids)}")
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"{self.vertex_field} in {str(vertex_ids)}"
)

def _process_content(self, v_type, vertex_ids_content_map):
LogWriter.info(f"Embedding content from vertex type: {v_type}")
for vertex_id, content in vertex_ids_content_map.items():
if content:
vec = self.embedding_service.embed_query(content)
self.embedding_stores[self.graphname + "_" + v_type].add_embeddings(
[(content, vec)], [{self.vertex_field: vertex_id}]
)

if v_type == "Document":
self._process_document_content(v_type, vertex_id, content)

if v_type in ["Document", "DocumentChunk"]:
self._extract_and_upsert_entities(v_type, vertex_id, content)

def _process_document_content(self, v_type, vertex_id, content):
LogWriter.info(f"Chunking the content from vertex type: {v_type}")
chunks = self._chunk_document(content)
for i, chunk in enumerate(chunks):
self._upsert_chunk(vertex_id, f"{vertex_id}_chunk_{i}", chunk)

def _extract_and_upsert_entities(self, v_type, vertex_id, content):
LogWriter.info(f"Extracting and upserting entities from the content from vertex type: {v_type}")
extracted = self._extract_entities(content)
if extracted["nodes"]:
self._upsert_entities(vertex_id, v_type, extracted["nodes"])
if extracted["rels"]:
self._upsert_rels(vertex_id, v_type, extracted["rels"])

def _update_processing_status(self, v_type, vertex_ids):
LogWriter.info(f"Updating the TigerGraph vertex ids for type {v_type} to confirm that processing was completed")
processed_vertices = [{"id": vertex_id, "type": v_type} for vertex_id in vertex_ids]
self.conn.runInstalledQuery(
"Update_Vertices_Processing_Status",
{"processed_vertices": processed_vertices},
usePost=True
)

def _log_elapsed_time(self, start_time, v_type):
end_time = time.time()
elapsed_time = end_time - start_time
LogWriter.info(f"Time elapsed for processing vertex_ids for type {v_type}: {elapsed_time:.2f} seconds")


def verify_and_cleanup_embeddings(self, batch_size=10):
for v_type in self.embedding_indices:
query_result = self.embedding_stores[self.graphname + "_" + v_type].query("pk > 0", [self.vertex_field])
if query_result is not None:
vertex_ids = [item.get(self.vertex_field) for item in query_result]

if vertex_ids:
non_existent_vertices = self.conn.runInstalledQuery(
"Check_Nonexistent_Vertices",
{"v_type": v_type, "vertex_ids": vertex_ids}
)[0]["@@missing_vertices"]

for vertex_id in non_existent_vertices:
LogWriter.info(f"Vertex {vertex_id} no longer exists in TigerGraph. Removing from Milvus.")
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"{self.vertex_field} == '{vertex_id}'"
)
LogWriter.info(f"Running cleanup for vertex type {v_type}")

query_result = self.embedding_stores[self.graphname + "_" + v_type].query("pk > 0", [self.vertex_field, 'pk'])
if not query_result:
LogWriter.info(f"No vertices to process for vertex type {v_type}")
continue

vertex_id_map, duplicates_to_remove = self._identify_duplicates(query_result)
self._remove_duplicates(v_type, duplicates_to_remove)

unique_vertex_ids = list(vertex_id_map.keys())
self._process_vertex_batches(v_type, unique_vertex_ids, batch_size)

LogWriter.info(f"Finished cleanup for vertex type {v_type}")

def _identify_duplicates(self, query_result):
vertex_id_map = {}
duplicates_to_remove = []

for item in query_result:
vertex_id = item.get(self.vertex_field)
pk = item.get('pk')
if vertex_id not in vertex_id_map:
vertex_id_map[vertex_id] = pk
else:
duplicates_to_remove.append(pk)
LogWriter.info(f"Duplicate vertex id found with pk {pk} and will be removed")

return vertex_id_map, duplicates_to_remove

def _remove_duplicates(self, v_type, duplicates_to_remove):
for pk in duplicates_to_remove:
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"pk == {pk}"
)
LogWriter.info(f"Removed duplicate with pk {pk} from Milvus")

def _process_vertex_batches(self, v_type, unique_vertex_ids, batch_size):
for i in range(0, len(unique_vertex_ids), batch_size):
batch_vertex_ids = unique_vertex_ids[i:i + batch_size]

non_existent_vertices = self.conn.runInstalledQuery(
"Check_Nonexistent_Vertices",
{"v_type": v_type, "vertex_ids": batch_vertex_ids}
)[0]["@@missing_vertices"]

if non_existent_vertices:
self._cleanup_nonexistent_vertices(v_type, non_existent_vertices)
else:
LogWriter.info(f"No cleanup needed for current batch of vertex type {v_type}")

def _cleanup_nonexistent_vertices(self, v_type, non_existent_vertices):
for vertex_id in non_existent_vertices:
self.embedding_stores[self.graphname + "_" + v_type].remove_embeddings(
expr=f"{self.vertex_field} == '{vertex_id}'"
)

def initialize(self):
LogWriter.info(
f"Eventual Consistency Check running for graphname {self.graphname} "
)
self.is_initialized = True
while True:
self.fetch_and_process_vertex()
worked = self.fetch_and_process_vertex()

if not self.run_forever:
break
else:
time.sleep(self.interval_seconds)
elif not worked:
LogWriter.info(
f"Eventual Consistency Check waiting to process for graphname {self.graphname} for {self.process_interval_seconds} seconds"
)
time.sleep(self.process_interval_seconds)


def initialize_cleanup(self):
@@ -292,6 +354,9 @@ def initialize_cleanup(self):
if not self.run_forever:
break
else:
LogWriter.info(
f"Eventual Consistency Check waiting to cleanup for graphname {self.graphname} for {self.cleanup_interval_seconds} seconds"
)
time.sleep(self.cleanup_interval_seconds)

def get_status(self):
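
Net effect of the refactor above: ECC now runs on two cadences. The processing loop drains the backlog batch by batch and only sleeps for `process_interval_seconds` once a scan returns nothing, while cleanup (duplicate removal plus deletion of embeddings whose vertices no longer exist) runs on the much longer `cleanup_interval_seconds` timer. A condensed sketch of the two loops, with method and attribute names as in the diff and logging omitted:

```python
import time

def processing_loop(checker):
    while True:
        worked = checker.fetch_and_process_vertex()
        if not checker.run_forever:
            break
        if not worked:
            # Back off only when the last scan found no new vertices;
            # otherwise keep draining the backlog without sleeping.
            time.sleep(checker.process_interval_seconds)

def cleanup_loop(checker):
    while True:
        checker.verify_and_cleanup_embeddings()
        if not checker.run_forever:
            break
        time.sleep(checker.cleanup_interval_seconds)
```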
15 changes: 9 additions & 6 deletions eventual-consistency-service/app/main.py
@@ -47,8 +47,9 @@ def initialize_eventual_consistency_checker(graphname: str, conn: TigerGraphConn
return consistency_checkers[graphname]

try:
check_interval_seconds = milvus_config.get("sync_interval_seconds", 1800) # default 30 minutes
cleanup_interval_seconds = milvus_config.get("cleanup_interval_seconds", 86400) # default 30 days
process_interval_seconds = milvus_config.get("process_interval_seconds", 1800) # default 30 minutes
cleanup_interval_seconds = milvus_config.get("cleanup_interval_seconds", 86400) # default 1 day
batch_size = milvus_config.get("batch_size", 10)
vector_indices = {}
vertex_field = None

@@ -69,7 +70,7 @@ def initialize_eventual_consistency_checker(graphname: str, conn: TigerGraphConn
password=milvus_config.get("password", ""),
vector_field=milvus_config.get("vector_field", "document_vector"),
text_field=milvus_config.get("text_field", "document_content"),
vertex_field=vertex_field,
vertex_field=vertex_field
)

if doc_processing_config.get("chunker") == "semantic":
@@ -113,7 +114,7 @@ def initialize_eventual_consistency_checker(graphname: str, conn: TigerGraphConn
raise ValueError("vertex_field is not defined. Ensure Milvus is enabled in the configuration.")

checker = EventualConsistencyChecker(
check_interval_seconds,
process_interval_seconds,
cleanup_interval_seconds,
graphname,
vertex_field,
@@ -123,12 +124,14 @@ def initialize_eventual_consistency_checker(graphname: str, conn: TigerGraphConn
conn,
chunker,
extractor,
batch_size
)
consistency_checkers[graphname] = checker

# start the longer cleanup process that will run in further spaced-out intervals
cleanup_thread = Thread(target=checker.initialize_cleanup, daemon=True)
cleanup_thread.start()
if milvus_config.get("cleanup_enabled", True):
cleanup_thread = Thread(target=checker.initialize_cleanup, daemon=True)
cleanup_thread.start()

# start the main ECC process that searches for new vertices that need to be processed
checker.initialize()
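
The startup sequence in `main.py` now gates the cleanup thread behind an optional `cleanup_enabled` key (not shown in the README example; it defaults to true). A stripped-down sketch of the startup order, with the checker construction stubbed out:

```python
from threading import Thread

milvus_config = {"cleanup_enabled": True}  # loaded from configs/milvus_config.json in practice

class Checker:  # stand-in for EventualConsistencyChecker
    def initialize_cleanup(self): ...
    def initialize(self): ...

checker = Checker()

# Cleanup runs on a daemon thread so it can never keep the process alive;
# the blocking processing loop then takes over the main thread.
if milvus_config.get("cleanup_enabled", True):
    Thread(target=checker.initialize_cleanup, daemon=True).start()

checker.initialize()
```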
