ingestsql.py
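"""
Ingest the Chinook schema documents into a Postgres/pgvector store.

Converts the Chinook .docx reference documents with Docling, embeds them with
BAAI/bge-small-en-v1.5 through LlamaIndex, persists the vectors in a
PGVectorStore table, and runs a sample query against the resulting index.
"""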
from pathlib import Path
#from typing import Iterable
import json
import os
import openai
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import ConversionStatus
from llama_index.vector_stores.postgres import PGVectorStore
from llama_index.core import SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings
from llama_index.readers.docling import DoclingReader
from llama_index.node_parser.docling import DoclingNodeParser
from tools.db import DatabaseManager
from dotenv import load_dotenv
# Customized ingest script for the evaluation framework
# Load environment variables
load_dotenv()
# Embedding model configuration
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
# Explicitly set TOKENIZERS_PARALLELISM to false to silence the HuggingFace tokenizers fork warning
os.environ["TOKENIZERS_PARALLELISM"] = "false"

class VectorSearch:
    def __init__(self, db_manager: DatabaseManager):
        self.db_manager = db_manager
        connection_string = db_manager.get_connection_string()
        async_connection_string = connection_string.replace("postgresql://", "postgresql+asyncpg://")
        self.table_name = "vector_store"
        self.vector_store = PGVectorStore(
            connection_string=connection_string,
            async_connection_string=async_connection_string,
            table_name=self.table_name,
            schema_name="public",
            embed_dim=384  # matches the 384-dimensional BAAI/bge-small-en-v1.5 embeddings
        )

        # Check if vectors exist, create them if needed
        self.has_vectors = self._check_vectors_exist()
        if not self.has_vectors:
            self._initialize_vectors()

        # Fail fast if vector initialization did not succeed
        if not self.has_vectors:
            raise RuntimeError("Error initializing vector db")

    def _initialize_vectors(self):
        """Initialize the vector store with documents."""
        try:
            # Base directory: default to the Docker layout (/app) unless ENV says otherwise
            is_docker = os.getenv('ENV', 'true').lower() == 'true'
            base_dir = Path("/app") if is_docker else Path(__file__).parent.parent
            print("base_dir is:", base_dir)

            # Define document paths relative to the base directory
            doc_paths = [
                "Chinook Data Dictionary.docx",
                "Chinook Data Model.docx"
            ]
            # Construct full paths
            input_docs = [base_dir / "db" / doc for doc in doc_paths]

            # Verify all files exist
            if not all(path.exists() for path in input_docs):
                raise FileNotFoundError(f"Documents not found in {base_dir / 'db'}")

            # Set output directory
            output_dir = base_dir / "db" / "converted"
            output_dir.mkdir(parents=True, exist_ok=True)

            # Convert documents and build the index
            self.convert_documents(input_docs, output_dir)
            self.create_index(output_dir)
            self.has_vectors = True
        except Exception as e:
            print(f"Error initializing vectors: {e}")
            self.has_vectors = False

    def _check_vectors_exist(self) -> bool:
        """Check if vectors exist in the database."""
        # PGVectorStore creates its table with a "data_" prefix, so look for data_<table_name>
        query = f"""
        SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_name = 'data_{self.table_name}'
        );
        """
        try:
            result = self.db_manager.execute_query(query)
            if not (isinstance(result, list) and result[0][0]):
                return False

            # If the table exists, check whether it has any records
            records_query = f"""
            SELECT EXISTS (
                SELECT 1 FROM data_{self.table_name} LIMIT 1
            );
            """
            records_result = self.db_manager.execute_query(records_query)
            return isinstance(records_result, list) and records_result[0][0]
        except Exception as e:
            print(f"Error checking vector store: {e}")
            return False

    def convert_documents(self, input_paths: list[Path], output_dir: Path) -> tuple[int, int, int]:
        """Convert documents with Docling and save them in JSON format."""
        output_dir.mkdir(parents=True, exist_ok=True)
        converter = DocumentConverter()
        success_count = partial_success_count = failure_count = 0

        results = converter.convert_all(input_paths, raises_on_error=False)
        for result in results:
            if result.status == ConversionStatus.SUCCESS:
                success_count += 1
                doc_filename = result.input.file.stem
                with (output_dir / f"{doc_filename}.json").open("w") as fp:
                    fp.write(json.dumps(result.document.export_to_dict()))
            elif result.status == ConversionStatus.PARTIAL_SUCCESS:
                partial_success_count += 1
            else:
                failure_count += 1

        if failure_count > 0:
            raise RuntimeError(f"Failed converting {failure_count} of {len(input_paths)} documents.")
        return success_count, partial_success_count, failure_count

    def create_index(self, docs_dir: Path, force_rebuild: bool = False) -> VectorStoreIndex:
        """Create or load the vector index."""
        if self.has_vectors and not force_rebuild:
            return self.load_index()

        reader = SimpleDirectoryReader(
            input_dir=str(docs_dir),
            file_extractor={".*": DoclingReader(export_type=DoclingReader.ExportType.JSON)}
        )
        documents = reader.load_data()

        # Embeddings are created and stored
        storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
        index = VectorStoreIndex.from_documents(
            documents,
            transformations=[DoclingNodeParser()],
            storage_context=storage_context,
            show_progress=True
        )
        return index

    def load_index(self) -> VectorStoreIndex:
        """Load existing index from vector store."""
        storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
        return VectorStoreIndex.from_vector_store(
            vector_store=self.vector_store,
            storage_context=storage_context
        )

    def query(self, index: VectorStoreIndex, query_text: str) -> str:
        """Query the vector index."""
        return index.as_query_engine().query(query_text)


def main(force_rebuild: bool = False):
    # 1. Document conversion inputs
    input_docs = [
        Path("./db/Chinook Data Dictionary.docx"),
        Path("./db/Chinook Data Model.docx")
    ]
    output_dir = Path("./db/converted")

    # 2. Database setup with the imported DatabaseManager
    db_manager = DatabaseManager(db_type='vecdb')
    if not db_manager.test_connection():
        raise ConnectionError("Database connection failed")

    # 3. OpenAI setup
    openai.api_key = os.getenv('OPENAI_API_KEY')
    if not openai.api_key:
        raise ValueError("OPENAI_API_KEY not found in environment variables")

    # 4. Vector search setup and execution
    searcher = VectorSearch(db_manager)

    # Only convert documents and create the index if needed
    if force_rebuild or not searcher.has_vectors:
        searcher.convert_documents(input_docs, output_dir)
        index = searcher.create_index(output_dir, force_rebuild=force_rebuild)
    else:
        index = searcher.load_index()

    result = searcher.query(index, "What is the album table?")
    print(result)


if __name__ == "__main__":
    main()
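
# Usage sketch (assumed invocation, not shown in the original script):
#   python ingestsql.py        # ingest the documents if needed, then run the sample query
#   main(force_rebuild=True)   # force re-conversion and re-indexing of the documents
# Both assume a .env file supplying OPENAI_API_KEY and the 'vecdb' connection
# settings expected by tools.db.DatabaseManager.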