diff --git a/db/conversations.db b/db/conversations.db index b74d334..e02ea48 100644 Binary files a/db/conversations.db and b/db/conversations.db differ diff --git a/new.py b/new.py index 14b27ec..8b82006 100644 --- a/new.py +++ b/new.py @@ -49,7 +49,7 @@ def load_prompts(file_path='prompts.yaml'): # model = ChatAnthropic(model="claude-3-5-haiku-20241022", api_key=anthropic_api_key, max_tokens=300, max_retries=2, temperature=0.7) # in_memory_store = InMemoryStore() -store = InMemoryStore() +# store = InMemoryStore() # Define the state class State(TypedDict): @@ -58,54 +58,54 @@ class State(TypedDict): # Define the conversational prompt conversational_prompt = ChatPromptTemplate.from_messages([ ("system", prompts['MYCA']), - ("system", "Context from past conversations:\n{memory_context}"), + # ("system", "Context from past conversations:\n{memory_context}"), ("human", "{input}"), ]) class ConversationalAgent: - def __init__(self, model, store: BaseStore, max_memories: int = 10): + def __init__(self, model): # , store: BaseStore, max_memories: int = 10 # self.prompt_template = prompt_template self.model = model - self.store = store - self.max_memories = max_memories + # self.store = store + # self.max_memories = max_memories # Cache the memory prompt template for reuse self.conversational_prompt = ChatPromptTemplate.from_messages([ ("system", prompts['MYCA']), - ("system", "Context from past conversations:\n{memory_context}"), + # ("system", "Context from past conversations:\n{memory_context}"), ("human", "{input}") ]) # Initialize SQLite database - self._init_database() + # self._init_database() - def _init_database(self): - """Initialize SQLite database with required table""" - conn = sqlite3.connect('db/conversations.db') - cursor = conn.cursor() - cursor.execute(''' - CREATE TABLE IF NOT EXISTS conversations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - namespace TEXT, - message TEXT, - timestamp DATETIME DEFAULT CURRENT_TIMESTAMP - ) - ''') - conn.commit() - conn.close() + # def _init_database(self): + # """Initialize SQLite database with required table""" + # conn = sqlite3.connect('db/conversations.db') + # cursor = conn.cursor() + # cursor.execute(''' + # CREATE TABLE IF NOT EXISTS conversations ( + # id INTEGER PRIMARY KEY AUTOINCREMENT, + # namespace TEXT, + # message TEXT, + # timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + # ) + # ''') + # conn.commit() + # conn.close() - def fetch_conversations_from_sqlite(self, namespace): - # Implement the logic to fetch past conversations from SQLite - # For example, retrieve the last N messages for the user - try: - conn = sqlite3.connect('db/conversations.db') - cursor = conn.cursor() - cursor.execute("SELECT message FROM conversations WHERE namespace=? ORDER BY timestamp DESC LIMIT ?", (str(namespace),self.max_memories)) - rows = cursor.fetchall() - conn.close() - return [row[0] for row in rows] - except Exception as e: - logging.error(f"Error fetching conversations: {str(e)}") - return [] + # def fetch_conversations_from_sqlite(self, namespace): + # # Implement the logic to fetch past conversations from SQLite + # # For example, retrieve the last N messages for the user + # try: + # conn = sqlite3.connect('db/conversations.db') + # cursor = conn.cursor() + # cursor.execute("SELECT message FROM conversations WHERE namespace=? ORDER BY timestamp DESC LIMIT ?", (str(namespace),self.max_memories)) + # rows = cursor.fetchall() + # conn.close() + # return [row[0] for row in rows] + # except Exception as e: + # logging.error(f"Error fetching conversations: {str(e)}") + # return [] def summarize_conversations(self, conversations): try: @@ -122,68 +122,68 @@ def summarize_conversations(self, conversations): return "" # Fixed method to handle different message types - def store_conversation_in_sqlite(self, namespace, messages): - try: - conn = sqlite3.connect('db/conversations.db') - cursor = conn.cursor() - for message in messages: - # Handle different message types - content = message.content if hasattr(message, 'content') else str(message) - cursor.execute( - "INSERT INTO conversations (namespace, message, timestamp) VALUES (?, ?, ?)", - (str(namespace), content, datetime.now()) - ) - conn.commit() - conn.close() + # def store_conversation_in_sqlite(self, namespace, messages): + # try: + # conn = sqlite3.connect('db/conversations.db') + # cursor = conn.cursor() + # for message in messages: + # # Handle different message types + # content = message.content if hasattr(message, 'content') else str(message) + # cursor.execute( + # "INSERT INTO conversations (namespace, message, timestamp) VALUES (?, ?, ?)", + # (str(namespace), content, datetime.now()) + # ) + # conn.commit() + # conn.close() - # Cleanup old messages - self._cleanup_old_messages(namespace) - except Exception as e: - logging.error(f"Error storing conversation: {str(e)}") + # # Cleanup old messages + # self._cleanup_old_messages(namespace) + # except Exception as e: + # logging.error(f"Error storing conversation: {str(e)}") # Added new method to cleanup old messages - def _cleanup_old_messages(self, namespace): - """Keep only the latest max_memories messages""" - try: - conn = sqlite3.connect('db/conversations.db') - cursor = conn.cursor() - cursor.execute(""" - DELETE FROM conversations - WHERE namespace = ? - AND id NOT IN ( - SELECT id FROM conversations - WHERE namespace = ? - ORDER BY timestamp DESC - LIMIT ? - ) - """, (str(namespace), str(namespace), self.max_memories)) - conn.commit() - conn.close() - except Exception as e: - logging.error(f"Error cleaning up messages: {str(e)}") + # def _cleanup_old_messages(self, namespace): + # """Keep only the latest max_memories messages""" + # try: + # conn = sqlite3.connect('db/conversations.db') + # cursor = conn.cursor() + # cursor.execute(""" + # DELETE FROM conversations + # WHERE namespace = ? + # AND id NOT IN ( + # SELECT id FROM conversations + # WHERE namespace = ? + # ORDER BY timestamp DESC + # LIMIT ? + # ) + # """, (str(namespace), str(namespace), self.max_memories)) + # conn.commit() + # conn.close() + # except Exception as e: + # logging.error(f"Error cleaning up messages: {str(e)}") # Updated main method to use class methods and handle streaming def run_conversational_agent(self, state: State): try: # Get user ID from state config - user_id = state.get("configurable", {}).get("user_id", "default") - namespace = f"memories_user_{user_id}" + # user_id = state.get("configurable", {}).get("user_id", "default") + # namespace = f"memories_user_{user_id}" - # Fetch and summarize past conversations - past_conversations = self.fetch_conversations_from_sqlite(namespace) - summary = self.summarize_conversations(past_conversations) + # # Fetch and summarize past conversations + # past_conversations = self.fetch_conversations_from_sqlite(namespace) + # summary = self.summarize_conversations(past_conversations) # Format messages with context - formatted_messages = self.conversational_prompt.format_messages( - memory_context=summary, - input=state["messages"][-1].content - ) + # formatted_messages = self.conversational_prompt.format_messages( + # memory_context=summary, + # input=state["messages"][-1].content + # ) # Get streaming response from model - response = self.model.invoke(formatted_messages) - + # response = self.model.invoke(formatted_messages) + response = self.model.invoke(state["messages"][-1].content) # Store the conversation - self.store_conversation_in_sqlite(namespace, state["messages"] + [response]) + # self.store_conversation_in_sqlite(namespace, state["messages"] + [response]) return {"messages": state["messages"] + [response]} @@ -192,7 +192,8 @@ def run_conversational_agent(self, state: State): return {"messages": [AIMessage(content="I'm here to help. Could you please rephrase that?")]} # Instantiate the conversational agent with tools -conversational_agent = ConversationalAgent(model, store, max_memories=10) # conversational_prompt, llm_with_tools +# conversational_agent = ConversationalAgent(model, store, max_memories=10) # conversational_prompt, llm_with_tools +conversational_agent = ConversationalAgent(model) # agent = ConversationalAgent(prompt_template=conversational_prompt,model=model,store=store,max_memories=10) # # Define the router function @@ -213,7 +214,8 @@ def run_conversational_agent(self, state: State): # Compile the graph memory = MemorySaver() -graph = workflow.compile(checkpointer=memory, store=store) +graph = workflow.compile(checkpointer=memory) +# graph = workflow.compile(checkpointer=memory, store=store) # Function to run a conversation turn def chat(message: str, config: dict, history: List): @@ -224,9 +226,11 @@ def chat(message: str, config: dict, history: List): for msg in history: # Add user message if it exists and is not empty if msg.get("user"): + logging.info(f"Adding user message to history: {msg['user']}") messages.append(HumanMessage(content=msg["user"])) # Add AI response if it exists and is not empty if msg.get("response"): + logging.info(f"Adding AI response to history: {msg['response']}") messages.append(AIMessage(content=msg["response"])) # Add current message @@ -235,7 +239,7 @@ def chat(message: str, config: dict, history: List): # Add current message messages.append(HumanMessage(content=message)) - + logging.info(f"\n current FULL MESSAGE is: \n{messages}\n") # Invoke the model with messages and config try: result = graph.invoke({"messages": messages}, config=config) diff --git a/sukoon_api.py b/sukoon_api.py index a351375..3af60d8 100644 --- a/sukoon_api.py +++ b/sukoon_api.py @@ -89,6 +89,7 @@ async def process_query(request: MYCARequest, supabase: SupabaseManager = Depend # Process chat. If chat() is blocking, consider running it in a threadpool using run_in_executor. history = supabase.get_chat_history(mobile=mobile) + logger.info("Retrieved chat history for mobile %s: %s", mobile, history) response = chat(user_input, config, history) chat_response = response.content diff --git a/utils/supabase_manager.py b/utils/supabase_manager.py index 51048f6..157808d 100644 --- a/utils/supabase_manager.py +++ b/utils/supabase_manager.py @@ -43,54 +43,113 @@ def log_chat(self, mobile: str, user: str, response: str) -> bool: return False def get_chat_history(self, mobile: str) -> List[Dict[str, Any]]: - """Get chat history for a user from Supabase""" + """ + Get chat history for a user from Supabase + + Args: + mobile: User's mobile number to fetch chat history + + Returns: + List of dictionaries containing user messages and responses + """ try: - # Build the URL with query parameters - query_url = f"{self.url}?select=user,response&mobile=eq.{mobile}&order=created_at.asc" + # Properly encode the mobile number for URL + encoded_mobile = requests.utils.quote(mobile) + + # Build the query with specific columns and conditions + query_url = ( + f"{self.url}" + f"?select=user,response" + f"&mobile=eq.{encoded_mobile}" + f"&order=created_at.asc" + ) - # Log the request details for debugging - print(f"\nFetching chat history:") + self.logger.debug(f"Fetching chat history for mobile: {mobile}") response = requests.get( query_url, - headers=self.headers + headers={ + **self.headers, + 'Prefer': 'return=representation' # Ensures Supabase returns the data + } ) - # Log response details - print(f"\nResponse status: {response.status_code}") - print(f"Response headers: {response.headers}") + response.raise_for_status() # Raise exception for non-200 status codes + + data = response.json() + self.logger.debug(f"Retrieved {len(data)} chat records") + + # Filter out any records where either user or response is None/empty + messages = [ + { + "user": msg["user"], + "response": msg["response"] + } + for msg in data + if msg.get("user") and msg.get("response") # Only include complete conversations + ] - # Check if response is successful - if response.status_code == 200: - data = response.json() - print(f"\nRaw response data: {data}") + return messages + + except requests.exceptions.HTTPError as e: + self.logger.error(f"HTTP Error in get_chat_history: {str(e)}") + if response.status_code == 409: + self.logger.error("Conflict error - possible duplicate or constraint violation") + return [] + + except Exception as e: + self.logger.error(f"Error in get_chat_history: {str(e)}") + return [] + + # def get_chat_history(self, mobile: str) -> List[Dict[str, Any]]: + # """Get chat history for a user from Supabase""" + # try: + # # Build the URL with query parameters + # query_url = f"{self.url}?select=user,response&mobile=eq.{mobile}&order=created_at.asc" + + # # Log the request details for debugging + # print(f"\nFetching chat history:") + + # response = requests.get( + # query_url, + # headers=self.headers + # ) + + # # Log response details + # print(f"\nResponse status: {response.status_code}") + # print(f"Response headers: {response.headers}") + + # # Check if response is successful + # if response.status_code == 200: + # data = response.json() + # print(f"\nRaw response data: {data}") - messages = [ - { - "user": msg["user"], - "response": msg["response"] - } - for msg in data - ] + # messages = [ + # { + # "user": msg["user"], + # "response": msg["response"] + # } + # for msg in data + # ] - print(f"\nFormatted messages: {messages}") - self.logger.info(f"Successfully retrieved {len(messages)} messages for {mobile}") - return messages + # print(f"\nFormatted messages: {messages}") + # self.logger.info(f"Successfully retrieved {len(messages)} messages for {mobile}") + # return messages - else: - print(f"\nError response: {response.text}") - self.logger.error(f"Failed to get chat history. Status code: {response.status_code}") - self.logger.error(f"Response: {response.text}") - return [] + # else: + # print(f"\nError response: {response.text}") + # self.logger.error(f"Failed to get chat history. Status code: {response.status_code}") + # self.logger.error(f"Response: {response.text}") + # return [] - except requests.exceptions.RequestException as e: - print(f"\nRequest error: {str(e)}") - self.logger.error(f"Request failed: {str(e)}") - return [] + # except requests.exceptions.RequestException as e: + # print(f"\nRequest error: {str(e)}") + # self.logger.error(f"Request failed: {str(e)}") + # return [] - except Exception as e: - print(f"\nUnexpected error: {str(e)}") - self.logger.error(f"Unexpected error: {str(e)}") - return [] + # except Exception as e: + # print(f"\nUnexpected error: {str(e)}") + # self.logger.error(f"Unexpected error: {str(e)}") + # return [] \ No newline at end of file