import json
import logging
import sqlite3
from contextlib import closing
from datetime import datetime
from typing import Any, Dict, Optional

logging.basicConfig(
    level=logging.DEBUG,  # Set to DEBUG to capture the debug logs
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("llm_proxy.log"),
        logging.StreamHandler(),
    ],
)

logger = logging.getLogger(__name__)

DATABASE_NAME = "llm_proxy.db"


def init_db():
    """Create the ``requests`` table in ``DATABASE_NAME`` if it does not exist.

    The table has an auto-incrementing ``id``, a ``timestamp`` that defaults
    to ``CURRENT_TIMESTAMP``, and four TEXT columns: ``client_request``,
    ``llm_request``, ``llm_response`` and ``client_response``.
    """
    # sqlite3's own connection context manager only commits/rolls back; it
    # does not close the connection, so `closing` is used to release it.
    with closing(sqlite3.connect(DATABASE_NAME)) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS requests (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                client_request TEXT,
                llm_request TEXT,
                llm_response TEXT,
                client_response TEXT
            )
        """)
        conn.commit()


def log_request(client_request: Dict[str, Any]) -> int:
    """Insert a new row holding the initial client request.

    Args:
        client_request: The request payload; stored JSON-encoded in the
            ``client_request`` column.

    Returns:
        The ``id`` of the newly inserted row.

    Raises:
        TypeError: If ``client_request`` is not JSON-serializable.
        sqlite3.Error: If the insert fails (errors are not caught here).
    """
    with closing(sqlite3.connect(DATABASE_NAME)) as conn:
        cursor = conn.execute(
            "INSERT INTO requests (client_request) VALUES (?)",
            (json.dumps(client_request),),
        )
        conn.commit()
        return cursor.lastrowid


def update_request_log(
    log_id: int,
    llm_request: Optional[Dict[str, Any]] = None,
    llm_response: Optional[Dict[str, Any]] = None,
    client_response: Optional[Dict[str, Any]] = None,
):
    """Update an existing log row with any of the optional payloads.

    Only arguments that are not ``None`` are written; each is stored
    JSON-encoded in the column of the same name. When every payload is
    ``None`` the database is not touched.

    Database errors (and any other exception raised while talking to the
    database) are logged and swallowed, so this call is best-effort. JSON
    encoding happens before that guarded section, so a non-serializable
    payload still raises ``TypeError`` to the caller.

    Args:
        log_id: ``id`` of the row to update.
        llm_request: Request sent to the LLM, if it should be recorded.
        llm_response: Response received from the LLM, if any.
        client_response: Response returned to the client, if any.
    """
    # Column names are fixed literals here, never caller-supplied, so
    # interpolating them into the SQL text below is safe.
    candidates = (
        ("llm_request", llm_request),
        ("llm_response", llm_response),
        ("client_response", client_response),
    )

    fields_to_update = []
    values = []
    for column, payload in candidates:
        if payload is not None:
            fields_to_update.append(f"{column} = ?")
            values.append(json.dumps(payload))

    if not fields_to_update:
        logger.debug(
            "No fields to update for log ID %s. Skipping database update.",
            log_id,
        )
        return

    sql = f"UPDATE requests SET {', '.join(fields_to_update)} WHERE id = ?"
    values.append(log_id)

    try:
        with closing(sqlite3.connect(DATABASE_NAME)) as conn:
            conn.execute(sql, tuple(values))
            logger.debug(
                "Attempting to commit update for log ID %s with fields: %s",
                log_id,
                fields_to_update,
            )
            conn.commit()
            logger.debug("Successfully committed update for log ID %s.", log_id)
    except sqlite3.Error as e:
        logger.error("Database error updating log ID %s: %s", log_id, e)
    except Exception as e:
        # Deliberately broad: a logging failure must not break the caller.
        # `exception` keeps the traceback so the cause is not lost.
        logger.exception(
            "An unexpected error occurred while updating log ID %s: %s",
            log_id,
            e,
        )


def get_latest_log_entry() -> Optional[dict]:
    """Return the row with the highest ``id`` as a plain dict.

    Returns:
        A dict mapping column names to values, or ``None`` when the table
        is empty or a database error occurred (the error is logged).
    """
    try:
        with closing(sqlite3.connect(DATABASE_NAME)) as conn:
            # Row objects expose column names, which dict() needs below.
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT * FROM requests ORDER BY id DESC LIMIT 1"
            ).fetchone()
            if row is not None:
                return dict(row)
    except sqlite3.Error as e:
        logger.error("Database error: %s", e)
    return None