feat: Add Azure CosmosDB MongoDB Integration #649

Open · wants to merge 19 commits into base: main
126 changes: 126 additions & 0 deletions pkg/ragengine/services/tests/vector_store/test_azuremongodb_store.py
@@ -0,0 +1,126 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import os
from tempfile import TemporaryDirectory
from unittest.mock import patch

import pytest

from services.vector_store.base import BaseVectorStore
from services.vector_store.azuremongodb_store import AzureCosmosDBMongoDBVectorStoreHandler
from services.models import Document
from services.embedding.huggingface_local import LocalHuggingFaceEmbedding
from services.config import MODEL_ID, INFERENCE_URL, INFERENCE_ACCESS_SECRET
from services.config import PERSIST_DIR

@pytest.fixture(scope='session')
def init_embed_manager():
    return LocalHuggingFaceEmbedding(MODEL_ID)

@pytest.fixture
def vector_store_manager(init_embed_manager):
    with TemporaryDirectory() as temp_dir:
        print(f"Saving temporary test storage at: {temp_dir}")
        # Point the handler at the test instance via the connection-string env var
        os.environ['AZURE_COSMOSDB_MONGODB_URI'] = "<URI_HERE>"
        manager = AzureCosmosDBMongoDBVectorStoreHandler(init_embed_manager)
        manager._clear_collection_and_indexes()
        yield manager
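# NOTE: "<URI_HERE>" above is a placeholder; running this suite against a live
# Azure Cosmos DB for MongoDB instance requires a real connection string in
# AZURE_COSMOSDB_MONGODB_URI.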

def test_index_documents(vector_store_manager):
    first_doc_text, second_doc_text = "First document", "Second document"
    documents = [
        Document(text=first_doc_text, metadata={"type": "text"}),
        Document(text=second_doc_text, metadata={"type": "text"})
    ]

    doc_ids = vector_store_manager.index_documents("test_index", documents)

    assert len(doc_ids) == 2
    assert set(doc_ids) == {BaseVectorStore.generate_doc_id(first_doc_text),
                            BaseVectorStore.generate_doc_id(second_doc_text)}

def test_index_documents_isolation(vector_store_manager):
    documents1 = [
        Document(text="First document in index1", metadata={"type": "text"}),
    ]
    documents2 = [
        Document(text="First document in index2", metadata={"type": "text"}),
    ]

    # Index documents in separate indices
    index_name_1, index_name_2 = "index1", "index2"
    vector_store_manager.index_documents(index_name_1, documents1)
    vector_store_manager.index_documents(index_name_2, documents2)

    # Each index should only report its own document; 384 is the configured
    # embedding model's vector dimension
    indexed_docs = vector_store_manager.list_all_indexed_documents()
    assert len(indexed_docs) == 2
    assert list(indexed_docs[index_name_1].values())[0]["text"] == "First document in index1"
    assert list(indexed_docs[index_name_1].values())[0]["content_vector"] == "Vector of dimension 384"
    assert list(indexed_docs[index_name_2].values())[0]["text"] == "First document in index2"
    assert list(indexed_docs[index_name_2].values())[0]["content_vector"] == "Vector of dimension 384"

@patch('requests.post')
def test_query_documents(mock_post, vector_store_manager):
    # Define mock response for the custom inference API
    mock_response = {
        "result": "This is the completion from the API"
    }

    mock_post.return_value.json.return_value = mock_response

    # Add documents to index
    documents = [
        Document(text="First document", metadata={"type": "text"}),
        Document(text="Second document", metadata={"type": "text"})
    ]
    vector_store_manager.index_documents("test_index", documents)

    params = {"temperature": 0.7}
    # Mock query and results
    query_result = vector_store_manager.query("test_index", "First", top_k=1, llm_params=params)

    assert query_result is not None
    assert query_result["response"] == "{'result': 'This is the completion from the API'}"
    assert query_result["source_nodes"][0]["text"] == "First document"
    assert query_result["source_nodes"][0]["score"] == pytest.approx(0.7102378952219661, rel=1e-6)

    mock_post.assert_called_once_with(
        INFERENCE_URL,
        # Auto-generated by LlamaIndex
        json={"prompt": "Context information is below.\n---------------------\ntype: text\n\nFirst document\n---------------------\nGiven the context information and not prior knowledge, answer the query.\nQuery: First\nAnswer: ", "formatted": True, 'temperature': 0.7},
        headers={"Authorization": f"Bearer {INFERENCE_ACCESS_SECRET}"}
    )

def test_add_document(vector_store_manager):
documents = [Document(text="Third document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)

# Add a document to the existing index
new_document = [Document(text="Fourth document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", new_document)

# Assert that the document exists
assert vector_store_manager.document_exists("test_index", new_document[0],
BaseVectorStore.generate_doc_id("Fourth document"))

def test_persist_index_1(vector_store_manager):
"""Test that the index store is persisted."""
# Add a document and persist the index
documents = [Document(text="Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)
vector_store_manager._persist("test_index")
assert os.path.exists(PERSIST_DIR)

def test_persist_index_2(vector_store_manager):
"""Test that an index store is persisted."""
# Add a document and persist the index
documents = [Document(text="Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)

documents = [Document(text="Another Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("another_test_index", documents)

vector_store_manager._persist_all()
assert os.path.exists(PERSIST_DIR)
93 changes: 93 additions & 0 deletions pkg/ragengine/services/vector_store/azuremongodb_store.py
@@ -0,0 +1,93 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import logging
from typing import List, Dict
import os
from services.models import Document

import pymongo
import json
from llama_index.vector_stores.azurecosmosmongo import (
    AzureCosmosDBMongoDBVectorSearch,
)

from .base import BaseVectorStore

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AzureCosmosDBMongoDBVectorStoreHandler(BaseVectorStore):
    def __init__(self, embedding_manager):
        super().__init__(embedding_manager)
        self.db_name = os.getenv("AZURE_COSMOSDB_MONGODB_DB_NAME", "vector-store")
        self.collection_name = os.getenv("AZURE_COSMOSDB_MONGODB_COLLECTION_NAME", "vector-store")
        self.connection_string = os.getenv("AZURE_COSMOSDB_MONGODB_URI", "mongodb+srv://myDatabaseUser:D1fficultP%[email protected]/?retryWrites=true&w=majority")
        self.dimension = self.embedding_manager.get_embedding_dimension()
        try:
            self.mongodb_client = pymongo.MongoClient(self.connection_string)
            self.mongodb_client.admin.command('ping')  # Test the connection
        except Exception as e:
            raise Exception(f"Failed to connect to MongoDB: {e}") from e
        # Ensure the collection exists
        try:
            self.collection = self.mongodb_client[self.db_name][self.collection_name]
        except Exception as e:
            raise ValueError(f"Failed to access collection '{self.collection_name}' in database '{self.db_name}': {e}") from e


    def _create_new_index(self, index_name: str, documents: List[Document]) -> List[str]:
        vector_store = AzureCosmosDBMongoDBVectorSearch(
            mongodb_client=self.mongodb_client,
            db_name=self.db_name,
            collection_name=self.collection_name,
            index_name=index_name,
            embedding_key=f"{index_name}_embedding",  # Unique field for each index
            cosmos_search_kwargs={
                # TODO: "kind": "vector-hnsw", # or "vector-ivf", "vector-diskann" (search type)
                "dimensions": self.dimension,
            }
        )
        return self._create_index_common(index_name, documents, vector_store)
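    # Illustration of the resulting document shape (assumed, based on the
    # per-index embedding key above): multiple indexes share one collection and
    # are distinguished by which embedding field each document carries, e.g.
    #   {"id": "...", "text": "...", "index1_embedding": [0.12, ...]}
    #   {"id": "...", "text": "...", "index2_embedding": [0.34, ...]}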

    def list_all_indexed_documents(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        indexed_docs = {}  # Accumulate documents across all indexes
        for index_name in self.index_map.keys():
            embedding_key = f"{index_name}_embedding"
            documents = self.collection.find({embedding_key: {"$exists": True}})
            for doc in documents:
                doc_id = doc.get("id")
                if doc_id is None:
                    continue  # Skip if no document ID is found
                indexed_docs.setdefault(index_name, {})[doc_id] = {
                    "text": doc.get("text", ""),
                    "metadata": json.dumps(doc.get("metadata", {})),
                    "content_vector": f"Vector of dimension {len(doc.get(embedding_key, []))}"
                }
        return indexed_docs

    def document_exists(self, index_name: str, doc: Document, doc_id: str) -> bool:
        """Check whether a document with the same text already exists in the given index."""
        if index_name not in self.index_map:
            logger.warning(f"Index '{index_name}' does not exist in the vector store.")
            return False
        # Scan documents belonging to this index; avoid shadowing the `doc` parameter
        return any(d.get("text") == doc.text for d in self.collection.find({f"{index_name}_embedding": {"$exists": True}}))

    def _clear_collection_and_indexes(self):
        """Clear all documents and drop all indexes in the collection.

        This method is primarily intended for testing purposes to ensure
        a clean state between tests, preventing index and document conflicts.
        """
        try:
            # Delete all documents in the collection
            self.collection.delete_many({})
            logger.info(f"All documents in collection '{self.collection_name}' have been deleted.")

            # Drop all indexes in the collection
            self.collection.drop_indexes()
            logger.info(f"All indexes in collection '{self.collection_name}' have been dropped.")

        except Exception as e:
            logger.error(f"Failed to clear collection and indexes in '{self.collection_name}': {e}")