Skip to content

Commit

Permalink
Manage data: add update feature (#33)
Browse files Browse the repository at this point in the history
* add update collection func

* update readme

* [pre-commit.ci] Add auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix pre-commit errors

* fix update_data.py

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
a-kore and pre-commit-ci[bot] authored Nov 26, 2024
1 parent 5b5a0c1 commit 8715fc1
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 2 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,29 @@ OPENAI_API_KEY=$YOUR_OPENAI_API_KEY python3 health_rec/manage_data.py load --nam
python3 health_rec/manage_data.py list
```

#### Remove data

To remove specific data entries from a collection:

```bash
python3 health_rec/manage_data.py remove --name <collection_name> --data_ids <data_id1> <data_id2> ...
```

These commands allow you to manage your collections efficiently without the need to reload all data, saving time and resources.


Careful while loading embeddings, it uses the OpenAI API, and hence make sure the data you want to use is correct. Test with a small amount of data first.

#### Updating Collections

If you need to update the collections with new or modified data without reloading everything, you can use the following method:

```bash
python3 health_rec/manage_data.py update --name <collection_name> --data_dir /data --load_embeddings
```

This method will sparsely update the collection based on the IDs of the data entries. If the service is not present in the collection, it will be added. If the service is already present, it will be updated with the new data and embeddings will be generated.

#### Navigate to the UI on the browser

```bash
Expand Down
61 changes: 59 additions & 2 deletions health_rec/manage_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import argparse
import glob
import hashlib
import json
import logging
import os
from typing import Any, Dict, List
Expand All @@ -17,6 +19,7 @@

from api.config import Config
from load_data import load_data
from update_data import update_data


logging.basicConfig(
Expand Down Expand Up @@ -151,14 +154,60 @@ def print_collection_details(details: Dict[str, Any]) -> None:
print(f"Number of Documents: {details['count']}")


def calculate_hash(content: Dict[str, Any]) -> str:
"""Calculate a hash for a dictionary content."""
# Sort dictionary to ensure consistent hash
content_str = json.dumps(content, sort_keys=True)
return hashlib.sha256(content_str.encode()).hexdigest()


def update_data_in_collection(
collection_name: str,
data_dir: str,
load_embeddings: bool,
) -> None:
"""
Update a ChromaDB collection by comparing existing entries with new data.
Only generates new embeddings for changed or new entries.
Parameters
----------
collection_name : str
Name of the collection to update
data_dir : str
Directory containing JSON files to process
Notes
-----
Expects data files in the format: data-XX.json where XX is a number.
"""
files = sorted(glob.glob(os.path.join(data_dir, "*.json")))
logger.info(
f"Loading data from {len(files)} files into collection: {collection_name}"
)
for file_path in files:
update_data(
file_path=file_path,
host=Config.CHROMA_HOST,
port=Config.CHROMA_PORT,
collection_name=collection_name,
openai_api_key=Config.OPENAI_API_KEY,
load_embeddings=load_embeddings,
)

logger.info(f"Finished updating data into collection: {collection_name}")


def main() -> None:
"""Manage Chroma collections for Health-Rec RAG application."""
parser = argparse.ArgumentParser(
description="Manage Chroma collections for Health-Rec RAG application."
)
parser.add_argument(
"action",
choices=["list", "create", "delete", "load", "inspect"],
choices=["list", "create", "delete", "load", "inspect", "update"],
help="Action to perform",
)
parser.add_argument(
Expand All @@ -172,7 +221,7 @@ def main() -> None:
parser.add_argument(
"--load_embeddings",
action="store_true",
help="Generate and load embeddings (for load action)",
help="Generate and load embeddings (for load and update action)",
)

args = parser.parse_args()
Expand All @@ -199,6 +248,14 @@ def main() -> None:
parser.error("--name is required for inspect action")
details = get_collection_details(args.name)
print_collection_details(details)
elif args.action == "update":
if not args.name or not args.data_dir:
parser.error("--name and --data_dir are required for update action")
update_data_in_collection(
collection_name=args.name,
data_dir=args.data_dir,
load_embeddings=args.load_embeddings,
)


if __name__ == "__main__":
Expand Down
180 changes: 180 additions & 0 deletions health_rec/update_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""
211 Service Update Data Loader.
This module provides functionality to update a ChromaDB collection with new data.
It compares existing entries with new data and
generates embeddings for changed or new entries.
"""

import hashlib
import json
import logging
from typing import Any, Dict, Optional, Tuple

from api.config import Config
from load_data import OpenAIEmbedding, get_or_create_collection, load_json_data


logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def calculate_hash(content: Dict[str, Any]) -> str:
"""Calculate a hash for a dictionary content."""
# Sort dictionary to ensure consistent hash
content_str = json.dumps(content, sort_keys=True)
return hashlib.sha256(content_str.encode()).hexdigest()


def prepare_document(service: Dict[str, Any]) -> Tuple[str, Dict[str, Any], str]:
"""Prepare a document and metadata for a service entry."""
metadata = {
key: ", ".join(map(str, value))
if isinstance(value, list)
else str(value)
if value is not None
else ""
for key, value in service.items()
}

doc = " | ".join(f"{key}: {value}" for key, value in metadata.items() if value)
service_id = str(service.get("id", " "))

return doc, metadata, service_id


def update_data(
file_path: str,
host: str,
port: int,
collection_name: str,
openai_api_key: Optional[str] = None,
embedding_model: str = Config.OPENAI_EMBEDDING,
load_embeddings: bool = False,
) -> None:
"""
Update a ChromaDB collection by comparing existing entries with new data.
Only generates new embeddings for changed or new entries.
Parameters
----------
file_path : str
Path to the JSON file containing the new data
host : str
The host address of the ChromaDB instance
port : int
The port number of the ChromaDB instance
collection_name : str
Name of the collection to update
data_dir : str
Directory containing JSON files to process
openai_api_key : Optional[str]
OpenAI API key for generating embeddings. If None, embeddings won't be generated
embedding_model : str
The OpenAI embedding model to use
load_embeddings : bool
Whether to load embeddings for the new data
"""
logger.info("Starting update process")
logger.info(f"File path: {file_path}")
logger.info(f"Host: {host}")
logger.info(f"Port: {port}")
logger.info(f"Collection name: {collection_name}")
try:
services = load_json_data(file_path)
logger.info(f"Loaded {len(services)} services from JSON file")

collection = get_or_create_collection(host, port, collection_name)

# Initialize OpenAI embedding function if API key provided
openai_embedding = None
if load_embeddings and openai_api_key:
logger.info("Initializing OpenAI embedding function")
openai_embedding = OpenAIEmbedding(
api_key=openai_api_key, model=embedding_model
)

# Process each JSON file
total_processed = 0
total_updated = 0
total_added = 0

for service in services:
total_processed += 1

doc, metadata, service_id = prepare_document(service)

try:
# Check if the document exists
existing_result = collection.get(
ids=[service_id], include=["documents", "metadatas"]
)

needs_update = False
if existing_result["ids"]:
# Compare existing document and metadata with new ones
existing_doc = existing_result["documents"][0]
existing_metadata = existing_result["metadatas"][0]

# generate new hash for metadata and document
new_metadata_hash, new_doc_hash = (
calculate_hash(metadata),
calculate_hash({"document": doc}),
)
old_metadata_hash, old_doc_hash = (
calculate_hash(existing_metadata),
calculate_hash({"document": existing_doc}),
)

if (
new_metadata_hash != old_metadata_hash
or new_doc_hash != old_doc_hash
):
needs_update = True
logger.info(f"Update needed for service {service_id}")
else:
# Document doesn't exist, mark for addition
needs_update = True
logger.info(f"New service found: {service_id}")

if needs_update:
# Generate embedding if API key provided
embedding = None
if openai_embedding:
embedding = openai_embedding([doc])[0]

# Update or add the document
if existing_result["ids"]:
collection.update(
ids=[service_id],
embeddings=[embedding],
metadatas=[metadata],
documents=[doc],
)
total_updated += 1
else:
collection.add(
ids=[service_id],
embeddings=[embedding],
metadatas=[metadata],
documents=[doc],
)
total_added += 1

except Exception as e:
logger.error(f"Error processing service {service_id}: {e}")
continue

logger.info(
f"Update complete. Processed: {total_processed}, "
f"Updated: {total_updated}, Added: {total_added}"
)

except Exception as e:
logger.error(f"Error updating collection: {e}")
raise

0 comments on commit 8715fc1

Please sign in to comment.