diff --git a/core/api.py b/core/api.py index fd68380..733c66d 100644 --- a/core/api.py +++ b/core/api.py @@ -13,7 +13,16 @@ import logging import arq from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from core.limits_utils import check_and_increment_limits -from core.models.request import GenerateUriRequest, RetrieveRequest, CompletionQueryRequest, IngestTextRequest, CreateGraphRequest, UpdateGraphRequest, BatchIngestResponse, SetFolderRuleRequest +from core.models.request import ( + GenerateUriRequest, + RetrieveRequest, + CompletionQueryRequest, + IngestTextRequest, + CreateGraphRequest, + UpdateGraphRequest, + BatchIngestResponse, + SetFolderRuleRequest, +) from core.models.completion import ChunkSource, CompletionResponse from core.models.documents import Document, DocumentResult, ChunkResult from core.models.graph import Graph @@ -71,7 +80,7 @@ FastAPIInstrumentor.instrument_app( exclude_spans=["send", "receive"], # Exclude HTTP send/receive spans to reduce telemetry volume http_capture_headers_server_request=None, # Don't capture request headers http_capture_headers_server_response=None, # Don't capture response headers - tracer_provider=None # Use the global tracer provider + tracer_provider=None, # Use the global tracer provider ) # Add CORS middleware @@ -106,12 +115,13 @@ async def initialize_database(): # We don't raise an exception here to allow the app to continue starting # even if there are initialization errors + @app.on_event("startup") async def initialize_vector_store(): """Initialize vector store tables and indexes on application startup.""" # First initialize the primary vector store (PGVectorStore if using pgvector) logger.info("Initializing primary vector store...") - if hasattr(vector_store, 'initialize'): + if hasattr(vector_store, "initialize"): success = await vector_store.initialize() if success: logger.info("Primary vector store initialization successful") @@ -119,43 +129,46 @@ async def initialize_vector_store(): logger.error("Primary vector store initialization failed") else: logger.warning("Primary vector store does not have an initialize method") - + # Then initialize the multivector store if enabled if settings.ENABLE_COLPALI and colpali_vector_store: logger.info("Initializing multivector store...") # Handle both synchronous and asynchronous initialize methods - if hasattr(colpali_vector_store.initialize, '__awaitable__'): + if hasattr(colpali_vector_store.initialize, "__awaitable__"): success = await colpali_vector_store.initialize() else: success = colpali_vector_store.initialize() - + if success: logger.info("Multivector store initialization successful") else: logger.error("Multivector store initialization failed") + @app.on_event("startup") async def initialize_user_limits_database(): """Initialize user service on application startup.""" logger.info("Initializing user service...") if settings.MODE == "cloud": from core.database.user_limits_db import UserLimitsDatabase + user_limits_db = UserLimitsDatabase(uri=settings.POSTGRES_URI) await user_limits_db.initialize() + @app.on_event("startup") async def initialize_redis_pool(): """Initialize the Redis connection pool for background tasks.""" global redis_pool logger.info("Initializing Redis connection pool...") - + # Get Redis settings from configuration redis_host = settings.REDIS_HOST redis_port = settings.REDIS_PORT - + # Log the Redis connection details logger.info(f"Connecting to Redis at {redis_host}:{redis_port}") - + redis_settings = arq.connections.RedisSettings( 
host=redis_host, port=redis_port, @@ -164,6 +177,7 @@ async def initialize_redis_pool(): redis_pool = await arq.create_pool(redis_settings) logger.info("Redis connection pool initialized successfully") + @app.on_event("shutdown") async def close_redis_pool(): """Close the Redis connection pool on application shutdown.""" @@ -174,6 +188,7 @@ async def close_redis_pool(): await redis_pool.wait_closed() logger.info("Redis connection pool closed") + # Initialize vector store if not settings.POSTGRES_URI: raise ValueError("PostgreSQL URI is required for pgvector store") @@ -250,9 +265,7 @@ cache_factory = LlamaCacheFactory(Path(settings.STORAGE_PATH)) # Initialize ColPali embedding model if enabled colpali_embedding_model = ColpaliEmbeddingModel() if settings.ENABLE_COLPALI else None -colpali_vector_store = ( - MultiVectorStore(uri=settings.POSTGRES_URI) if settings.ENABLE_COLPALI else None -) +colpali_vector_store = MultiVectorStore(uri=settings.POSTGRES_URI) if settings.ENABLE_COLPALI else None # Initialize document service with configured components document_service = DocumentService( @@ -302,7 +315,7 @@ async def verify_token(authorization: str = Header(None)) -> AuthContext: entity_type_field = payload.get("type") or payload.get("entity_type") if not entity_type_field: raise HTTPException(status_code=401, detail="Missing entity type in token") - + return AuthContext( entity_type=EntityType(entity_type_field), entity_id=payload["entity_id"], @@ -315,6 +328,7 @@ async def verify_token(authorization: str = Header(None)) -> AuthContext: @app.post("/ingest/text", response_model=Document) +@telemetry.track(operation_type="ingest_text", metadata_resolver=telemetry.ingest_text_metadata) async def ingest_text( request: IngestTextRequest, auth: AuthContext = Depends(verify_token), @@ -338,28 +352,16 @@ async def ingest_text( Document: Metadata of ingested document """ try: - async with telemetry.track_operation( - operation_type="ingest_text", - user_id=auth.entity_id, - tokens_used=len(request.content.split()), # Approximate token count - metadata={ - "metadata": request.metadata, - "rules": request.rules, - "use_colpali": request.use_colpali, - "folder_name": request.folder_name, - "end_user_id": request.end_user_id, - }, - ): - return await document_service.ingest_text( - content=request.content, - filename=request.filename, - metadata=request.metadata, - rules=request.rules, - use_colpali=request.use_colpali, - auth=auth, - folder_name=request.folder_name, - end_user_id=request.end_user_id, - ) + return await document_service.ingest_text( + content=request.content, + filename=request.filename, + metadata=request.metadata, + rules=request.rules, + use_colpali=request.use_colpali, + auth=auth, + folder_name=request.folder_name, + end_user_id=request.end_user_id, + ) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @@ -367,11 +369,14 @@ async def ingest_text( # Redis pool for background tasks redis_pool = None + def get_redis_pool(): """Get the global Redis connection pool for background tasks.""" return redis_pool + @app.post("/ingest/file", response_model=Document) +@telemetry.track(operation_type="queue_ingest_file", metadata_resolver=telemetry.ingest_file_metadata) async def ingest_file( file: UploadFile, metadata: str = Form("{}"), @@ -404,153 +409,133 @@ async def ingest_file( # Parse metadata and rules metadata_dict = json.loads(metadata) rules_list = json.loads(rules) + # Fix bool conversion: ensure string "false" is properly converted to False - def 
str2bool(v): return v if isinstance(v, bool) else str(v).lower() in {"true", "1", "yes"} + def str2bool(v): + return v if isinstance(v, bool) else str(v).lower() in {"true", "1", "yes"} + use_colpali = str2bool(use_colpali) - + # Ensure user has write permission if "write" not in auth.permissions: raise PermissionError("User does not have write permission") - async with telemetry.track_operation( - operation_type="queue_ingest_file", - user_id=auth.entity_id, - metadata={ - "filename": file.filename, - "content_type": file.content_type, - "metadata": metadata_dict, - "rules": rules_list, - "use_colpali": use_colpali, - "folder_name": folder_name, - "end_user_id": end_user_id, + logger.debug(f"API: Queueing file ingestion with use_colpali: {use_colpali}") + + # Create a document with processing status + doc = Document( + content_type=file.content_type, + filename=file.filename, + metadata=metadata_dict, + owner={"type": auth.entity_type.value, "id": auth.entity_id}, + access_control={ + "readers": [auth.entity_id], + "writers": [auth.entity_id], + "admins": [auth.entity_id], + "user_id": [auth.user_id] if auth.user_id else [], }, - ): - logger.debug(f"API: Queueing file ingestion with use_colpali: {use_colpali}") + system_metadata={"status": "processing"}, + ) + + # Add folder_name and end_user_id to system_metadata if provided + if folder_name: + doc.system_metadata["folder_name"] = folder_name + if end_user_id: + doc.system_metadata["end_user_id"] = end_user_id + + # Set processing status + doc.system_metadata["status"] = "processing" + + # Store the document in the database + success = await database.store_document(doc) + if not success: + raise Exception("Failed to store document metadata") + + # If folder_name is provided, ensure the folder exists and add document to it + if folder_name: + try: + await document_service._ensure_folder_exists(folder_name, doc.external_id, auth) + logger.debug(f"Ensured folder '{folder_name}' exists and contains document {doc.external_id}") + except Exception as e: + # Log error but don't raise - we want document ingestion to continue even if folder operation fails + logger.error(f"Error ensuring folder exists: {e}") + + # Read file content + file_content = await file.read() + + # Generate a unique key for the file + file_key = f"ingest_uploads/{uuid.uuid4()}/{file.filename}" + + # Store the file in the configured storage + file_content_base64 = base64.b64encode(file_content).decode() + bucket, stored_key = await storage.upload_from_base64(file_content_base64, file_key, file.content_type) + logger.debug(f"Stored file in bucket {bucket} with key {stored_key}") + + # Update document with storage info + doc.storage_info = {"bucket": bucket, "key": stored_key} - # Create a document with processing status - doc = Document( - content_type=file.content_type, - filename=file.filename, - metadata=metadata_dict, - owner={"type": auth.entity_type.value, "id": auth.entity_id}, - access_control={ - "readers": [auth.entity_id], - "writers": [auth.entity_id], - "admins": [auth.entity_id], - "user_id": [auth.user_id] if auth.user_id else [], - }, - system_metadata={"status": "processing"} - ) - - # Add folder_name and end_user_id to system_metadata if provided - if folder_name: - doc.system_metadata["folder_name"] = folder_name - if end_user_id: - doc.system_metadata["end_user_id"] = end_user_id - - # Set processing status - doc.system_metadata["status"] = "processing" - - # Store the document in the database - success = await database.store_document(doc) - if not 
success: - raise Exception("Failed to store document metadata") - - # If folder_name is provided, ensure the folder exists and add document to it - if folder_name: - try: - await document_service._ensure_folder_exists(folder_name, doc.external_id, auth) - logger.debug(f"Ensured folder '{folder_name}' exists and contains document {doc.external_id}") - except Exception as e: - # Log error but don't raise - we want document ingestion to continue even if folder operation fails - logger.error(f"Error ensuring folder exists: {e}") - - # Read file content - file_content = await file.read() - - # Generate a unique key for the file - file_key = f"ingest_uploads/{uuid.uuid4()}/{file.filename}" - - # Store the file in the configured storage - file_content_base64 = base64.b64encode(file_content).decode() - bucket, stored_key = await storage.upload_from_base64( - file_content_base64, - file_key, - file.content_type - ) - logger.debug(f"Stored file in bucket {bucket} with key {stored_key}") - - # Update document with storage info - doc.storage_info = {"bucket": bucket, "key": stored_key} - - # Initialize storage_files array with the first file - from core.models.documents import StorageFileInfo - from datetime import datetime, UTC - - # Create a StorageFileInfo for the initial file - initial_file_info = StorageFileInfo( - bucket=bucket, - key=stored_key, - version=1, - filename=file.filename, - content_type=file.content_type, - timestamp=datetime.now(UTC) - ) - doc.storage_files = [initial_file_info] - - # Log storage files - logger.debug(f"Initial storage_files for {doc.external_id}: {doc.storage_files}") + # Initialize storage_files array with the first file + from core.models.documents import StorageFileInfo + from datetime import datetime, UTC + + # Create a StorageFileInfo for the initial file + initial_file_info = StorageFileInfo( + bucket=bucket, + key=stored_key, + version=1, + filename=file.filename, + content_type=file.content_type, + timestamp=datetime.now(UTC) + ) + doc.storage_files = [initial_file_info] + + # Log storage files + logger.debug(f"Initial storage_files for {doc.external_id}: {doc.storage_files}") # Update both storage_info and storage_files - await database.update_document( - document_id=doc.external_id, - updates={ - "storage_info": doc.storage_info, - "storage_files": doc.storage_files - }, - auth=auth - ) - - # Convert auth context to a dictionary for serialization - auth_dict = { - "entity_type": auth.entity_type.value, - "entity_id": auth.entity_id, - "app_id": auth.app_id, - "permissions": list(auth.permissions), - "user_id": auth.user_id - } - - # Enqueue the background job - job = await redis.enqueue_job( - 'process_ingestion_job', - document_id=doc.external_id, - file_key=stored_key, - bucket=bucket, - original_filename=file.filename, - content_type=file.content_type, - metadata_json=metadata, - auth_dict=auth_dict, - rules_list=rules_list, - use_colpali=use_colpali, - folder_name=folder_name, - end_user_id=end_user_id - ) - - logger.info(f"File ingestion job queued with ID: {job.job_id} for document: {doc.external_id}") - - # Return the document with processing status - return doc - + await database.update_document( + document_id=doc.external_id, updates={"storage_info": doc.storage_info, "storage_files": doc.storage_files}, auth=auth + ) + + # Convert auth context to a dictionary for serialization + auth_dict = { + "entity_type": auth.entity_type.value, + "entity_id": auth.entity_id, + "app_id": auth.app_id, + "permissions": list(auth.permissions), + "user_id": 
auth.user_id, + } + + # Enqueue the background job + job = await redis.enqueue_job( + "process_ingestion_job", + document_id=doc.external_id, + file_key=stored_key, + bucket=bucket, + original_filename=file.filename, + content_type=file.content_type, + metadata_json=metadata, + auth_dict=auth_dict, + rules_list=rules_list, + use_colpali=use_colpali, + folder_name=folder_name, + end_user_id=end_user_id, + ) + + logger.info(f"File ingestion job queued with ID: {job.job_id} for document: {doc.external_id}") + + return doc except json.JSONDecodeError as e: raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) except Exception as e: - logger.error(f"Error queueing file ingestion: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error queueing file ingestion: {str(e)}") + logger.error(f"Error during file ingestion: {str(e)}") + raise HTTPException(status_code=500, detail=f"Error during file ingestion: {str(e)}") @app.post("/ingest/files", response_model=BatchIngestResponse) +@telemetry.track(operation_type="queue_batch_ingest", metadata_resolver=telemetry.batch_ingest_metadata) async def batch_ingest_files( files: List[UploadFile] = File(...), metadata: str = Form("{}"), @@ -564,7 +549,7 @@ async def batch_ingest_files( ) -> BatchIngestResponse: """ Batch ingest multiple files using the task queue. - + Args: files: List of files to ingest metadata: JSON string of metadata (either a single dict or list of dicts) @@ -583,18 +568,18 @@ async def batch_ingest_files( - errors: List of errors that occurred during the batch operation """ if not files: - raise HTTPException( - status_code=400, - detail="No files provided for batch ingestion" - ) + raise HTTPException(status_code=400, detail="No files provided for batch ingestion") try: metadata_value = json.loads(metadata) rules_list = json.loads(rules) + # Fix bool conversion: ensure string "false" is properly converted to False - def str2bool(v): return str(v).lower() in {"true", "1", "yes"} + def str2bool(v): + return str(v).lower() in {"true", "1", "yes"} + use_colpali = str2bool(use_colpali) - + # Ensure user has write permission if "write" not in auth.permissions: raise PermissionError("User does not have write permission") @@ -607,7 +592,7 @@ async def batch_ingest_files( if isinstance(metadata_value, list) and len(metadata_value) != len(files): raise HTTPException( status_code=400, - detail=f"Number of metadata items ({len(metadata_value)}) must match number of files ({len(files)})" + detail=f"Number of metadata items ({len(metadata_value)}) must match number of files ({len(files)})", ) # Validate rules if it's a list of lists @@ -615,7 +600,7 @@ async def batch_ingest_files( if len(rules_list) != len(files): raise HTTPException( status_code=400, - detail=f"Number of rule lists ({len(rules_list)}) must match number of files ({len(files)})" + detail=f"Number of rule lists ({len(rules_list)}) must match number of files ({len(files)})", ) # Convert auth context to a dictionary for serialization @@ -624,130 +609,113 @@ async def batch_ingest_files( "entity_id": auth.entity_id, "app_id": auth.app_id, "permissions": list(auth.permissions), - "user_id": auth.user_id + "user_id": auth.user_id, } created_documents = [] - async with telemetry.track_operation( - operation_type="queue_batch_ingest", - user_id=auth.entity_id, - metadata={ - "file_count": len(files), - "metadata_type": "list" if isinstance(metadata_value, list) else "single", - 
"rules_type": "per_file" if isinstance(rules_list, list) and rules_list and isinstance(rules_list[0], list) else "shared", - "folder_name": folder_name, - "end_user_id": end_user_id, - }, - ): - try: - for i, file in enumerate(files): - # Get the metadata and rules for this file - metadata_item = metadata_value[i] if isinstance(metadata_value, list) else metadata_value - file_rules = rules_list[i] if isinstance(rules_list, list) and rules_list and isinstance(rules_list[0], list) else rules_list - - # Create a document with processing status - doc = Document( - content_type=file.content_type, - filename=file.filename, - metadata=metadata_item, - owner={"type": auth.entity_type.value, "id": auth.entity_id}, - access_control={ - "readers": [auth.entity_id], - "writers": [auth.entity_id], - "admins": [auth.entity_id], - "user_id": [auth.user_id] if auth.user_id else [], - }, - ) - - # Add folder_name and end_user_id to system_metadata if provided - if folder_name: - doc.system_metadata["folder_name"] = folder_name - if end_user_id: - doc.system_metadata["end_user_id"] = end_user_id - - # Set processing status - doc.system_metadata["status"] = "processing" - - # Store the document in the database - success = await database.store_document(doc) - if not success: - raise Exception(f"Failed to store document metadata for {file.filename}") - - # If folder_name is provided, ensure the folder exists and add document to it - if folder_name: - try: - await document_service._ensure_folder_exists(folder_name, doc.external_id, auth) - logger.debug(f"Ensured folder '{folder_name}' exists and contains document {doc.external_id}") - except Exception as e: - # Log error but don't raise - we want document ingestion to continue even if folder operation fails - logger.error(f"Error ensuring folder exists: {e}") - - # Read file content - file_content = await file.read() - - # Generate a unique key for the file - file_key = f"ingest_uploads/{uuid.uuid4()}/{file.filename}" - - # Store the file in the configured storage - file_content_base64 = base64.b64encode(file_content).decode() - bucket, stored_key = await storage.upload_from_base64( - file_content_base64, - file_key, - file.content_type - ) - logger.debug(f"Stored file in bucket {bucket} with key {stored_key}") - - # Update document with storage info - doc.storage_info = {"bucket": bucket, "key": stored_key} - await database.update_document( - document_id=doc.external_id, - updates={"storage_info": doc.storage_info}, - auth=auth - ) - - # Convert metadata to JSON string for job - metadata_json = json.dumps(metadata_item) - - # Enqueue the background job - job = await redis.enqueue_job( - 'process_ingestion_job', - document_id=doc.external_id, - file_key=stored_key, - bucket=bucket, - original_filename=file.filename, - content_type=file.content_type, - metadata_json=metadata_json, - auth_dict=auth_dict, - rules_list=file_rules, - use_colpali=use_colpali, - folder_name=folder_name, - end_user_id=end_user_id - ) - - logger.info(f"File ingestion job queued with ID: {job.job_id} for document: {doc.external_id}") - - # Add document to the list - created_documents.append(doc) - - # Return information about created documents - return BatchIngestResponse( - documents=created_documents, - errors=[] + try: + for i, file in enumerate(files): + # Get the metadata and rules for this file + metadata_item = metadata_value[i] if isinstance(metadata_value, list) else metadata_value + file_rules = ( + rules_list[i] + if isinstance(rules_list, list) and rules_list and 
isinstance(rules_list[0], list) + else rules_list ) - - except Exception as e: - logger.error(f"Error queueing batch file ingestion: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error queueing batch file ingestion: {str(e)}") + # Create a document with processing status + doc = Document( + content_type=file.content_type, + filename=file.filename, + metadata=metadata_item, + owner={"type": auth.entity_type.value, "id": auth.entity_id}, + access_control={ + "readers": [auth.entity_id], + "writers": [auth.entity_id], + "admins": [auth.entity_id], + "user_id": [auth.user_id] if auth.user_id else [], + }, + ) + # Add folder_name and end_user_id to system_metadata if provided + if folder_name: + doc.system_metadata["folder_name"] = folder_name + if end_user_id: + doc.system_metadata["end_user_id"] = end_user_id + + # Set processing status + doc.system_metadata["status"] = "processing" + + # Store the document in the database + success = await database.store_document(doc) + if not success: + raise Exception(f"Failed to store document metadata for {file.filename}") + + # If folder_name is provided, ensure the folder exists and add document to it + if folder_name: + try: + await document_service._ensure_folder_exists(folder_name, doc.external_id, auth) + logger.debug(f"Ensured folder '{folder_name}' exists and contains document {doc.external_id}") + except Exception as e: + # Log error but don't raise - we want document ingestion to continue even if folder operation fails + logger.error(f"Error ensuring folder exists: {e}") + + # Read file content + file_content = await file.read() + + # Generate a unique key for the file + file_key = f"ingest_uploads/{uuid.uuid4()}/{file.filename}" + + # Store the file in the configured storage + file_content_base64 = base64.b64encode(file_content).decode() + bucket, stored_key = await storage.upload_from_base64(file_content_base64, file_key, file.content_type) + logger.debug(f"Stored file in bucket {bucket} with key {stored_key}") + + # Update document with storage info + doc.storage_info = {"bucket": bucket, "key": stored_key} + await database.update_document( + document_id=doc.external_id, updates={"storage_info": doc.storage_info}, auth=auth + ) + + # Convert metadata to JSON string for job + metadata_json = json.dumps(metadata_item) + + # Enqueue the background job + job = await redis.enqueue_job( + "process_ingestion_job", + document_id=doc.external_id, + file_key=stored_key, + bucket=bucket, + original_filename=file.filename, + content_type=file.content_type, + metadata_json=metadata_json, + auth_dict=auth_dict, + rules_list=file_rules, + use_colpali=use_colpali, + folder_name=folder_name, + end_user_id=end_user_id, + ) + + logger.info(f"File ingestion job queued with ID: {job.job_id} for document: {doc.external_id}") + + # Add document to the list + created_documents.append(doc) + + # Return information about created documents + return BatchIngestResponse(documents=created_documents, errors=[]) + + except Exception as e: + logger.error(f"Error queueing batch file ingestion: {str(e)}") + raise HTTPException(status_code=500, detail=f"Error queueing batch file ingestion: {str(e)}") @app.post("/retrieve/chunks", response_model=List[ChunkResult]) +@telemetry.track(operation_type="retrieve_chunks", metadata_resolver=telemetry.retrieve_chunks_metadata) async def retrieve_chunks(request: RetrieveRequest, auth: AuthContext = Depends(verify_token)): """ Retrieve relevant chunks. 
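The `process_ingestion_job` task that both ingestion endpoints enqueue above is defined outside this diff. Going only by the keyword arguments passed to `redis.enqueue_job`, its arq entry point presumably looks something like the sketch below; the function body and the `WorkerSettings` wiring are assumptions, not part of this change.

```python
from typing import Any, Dict, List, Optional

from arq.connections import RedisSettings


async def process_ingestion_job(
    ctx: Dict[str, Any],  # arq passes the worker context as the first positional argument
    *,
    document_id: str,
    file_key: str,
    bucket: str,
    original_filename: str,
    content_type: str,
    metadata_json: str,
    auth_dict: Dict[str, Any],
    rules_list: List[Any],
    use_colpali: bool,
    folder_name: Optional[str],
    end_user_id: Optional[str],
) -> None:
    """Download the stored file, run parsing/embedding, and update the document's status."""
    ...  # the actual ingestion logic lives in the worker module, not in api.py


class WorkerSettings:
    # Registered with the arq CLI; the exact module path is an assumption.
    functions = [process_ingestion_job]
    redis_settings = RedisSettings()
```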
- + Args: request: RetrieveRequest containing: - query: Search query text @@ -759,43 +727,32 @@ async def retrieve_chunks(request: RetrieveRequest, auth: AuthContext = Depends( - folder_name: Optional folder to scope the search to - end_user_id: Optional end-user ID to scope the search to auth: Authentication context - + Returns: List[ChunkResult]: List of relevant chunks """ try: - async with telemetry.track_operation( - operation_type="retrieve_chunks", - user_id=auth.entity_id, - metadata={ - "k": request.k, - "min_score": request.min_score, - "use_reranking": request.use_reranking, - "use_colpali": request.use_colpali, - "folder_name": request.folder_name, - "end_user_id": request.end_user_id, - }, - ): - return await document_service.retrieve_chunks( - request.query, - auth, - request.filters, - request.k, - request.min_score, - request.use_reranking, - request.use_colpali, - request.folder_name, - request.end_user_id, - ) + return await document_service.retrieve_chunks( + request.query, + auth, + request.filters, + request.k, + request.min_score, + request.use_reranking, + request.use_colpali, + request.folder_name, + request.end_user_id, + ) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.post("/retrieve/docs", response_model=List[DocumentResult]) +@telemetry.track(operation_type="retrieve_docs", metadata_resolver=telemetry.retrieve_docs_metadata) async def retrieve_documents(request: RetrieveRequest, auth: AuthContext = Depends(verify_token)): """ Retrieve relevant documents. - + Args: request: RetrieveRequest containing: - query: Search query text @@ -807,53 +764,39 @@ async def retrieve_documents(request: RetrieveRequest, auth: AuthContext = Depen - folder_name: Optional folder to scope the search to - end_user_id: Optional end-user ID to scope the search to auth: Authentication context - + Returns: List[DocumentResult]: List of relevant documents """ try: - async with telemetry.track_operation( - operation_type="retrieve_docs", - user_id=auth.entity_id, - metadata={ - "k": request.k, - "min_score": request.min_score, - "use_reranking": request.use_reranking, - "use_colpali": request.use_colpali, - "folder_name": request.folder_name, - "end_user_id": request.end_user_id, - }, - ): - return await document_service.retrieve_docs( - request.query, - auth, - request.filters, - request.k, - request.min_score, - request.use_reranking, - request.use_colpali, - request.folder_name, - request.end_user_id, - ) + return await document_service.retrieve_docs( + request.query, + auth, + request.filters, + request.k, + request.min_score, + request.use_reranking, + request.use_colpali, + request.folder_name, + request.end_user_id, + ) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.post("/batch/documents", response_model=List[Document]) -async def batch_get_documents( - request: Dict[str, Any], - auth: AuthContext = Depends(verify_token) -): +@telemetry.track(operation_type="batch_get_documents", metadata_resolver=telemetry.batch_documents_metadata) +async def batch_get_documents(request: Dict[str, Any], auth: AuthContext = Depends(verify_token)): """ Retrieve multiple documents by their IDs in a single batch operation. 
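`/batch/documents` (and `/batch/chunks` below) accept a plain dictionary rather than a dedicated Pydantic request model, so the expected payload shape is only visible from the handler body. A minimal client call, with the base URL, document IDs, and token as placeholders:

```python
import httpx

payload = {
    "document_ids": ["doc_123", "doc_456"],
    "folder_name": "quarterly-reports",  # optional scoping
    "end_user_id": "user-42",            # optional scoping
}

resp = httpx.post(
    "http://localhost:8000/batch/documents",
    json=payload,
    headers={"Authorization": "Bearer <token>"},
)
resp.raise_for_status()
documents = resp.json()  # JSON list of Document objects
```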
- + Args: request: Dictionary containing: - document_ids: List of document IDs to retrieve - folder_name: Optional folder to scope the operation to - end_user_id: Optional end-user ID to scope the operation to auth: Authentication context - + Returns: List[Document]: List of documents matching the IDs """ @@ -862,39 +805,28 @@ async def batch_get_documents( document_ids = request.get("document_ids", []) folder_name = request.get("folder_name") end_user_id = request.get("end_user_id") - + if not document_ids: return [] - - async with telemetry.track_operation( - operation_type="batch_get_documents", - user_id=auth.entity_id, - metadata={ - "document_count": len(document_ids), - "folder_name": folder_name, - "end_user_id": end_user_id, - }, - ): - return await document_service.batch_retrieve_documents(document_ids, auth, folder_name, end_user_id) + + return await document_service.batch_retrieve_documents(document_ids, auth, folder_name, end_user_id) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.post("/batch/chunks", response_model=List[ChunkResult]) -async def batch_get_chunks( - request: Dict[str, Any], - auth: AuthContext = Depends(verify_token) -): +@telemetry.track(operation_type="batch_get_chunks", metadata_resolver=telemetry.batch_chunks_metadata) +async def batch_get_chunks(request: Dict[str, Any], auth: AuthContext = Depends(verify_token)): """ Retrieve specific chunks by their document ID and chunk number in a single batch operation. - + Args: request: Dictionary containing: - sources: List of ChunkSource objects (with document_id and chunk_number) - folder_name: Optional folder to scope the operation to - end_user_id: Optional end-user ID to scope the operation to auth: Authentication context - + Returns: List[ChunkResult]: List of chunk results """ @@ -904,43 +836,32 @@ async def batch_get_chunks( folder_name = request.get("folder_name") end_user_id = request.get("end_user_id") use_colpali = request.get("use_colpali") - + if not sources: return [] - - async with telemetry.track_operation( - operation_type="batch_get_chunks", - user_id=auth.entity_id, - metadata={ - "chunk_count": len(sources), - "folder_name": folder_name, - "end_user_id": end_user_id, - "use_colpali": use_colpali, - }, - ): - # Convert sources to ChunkSource objects if needed - chunk_sources = [] - for source in sources: - if isinstance(source, dict): - chunk_sources.append(ChunkSource(**source)) - else: - chunk_sources.append(source) - - return await document_service.batch_retrieve_chunks(chunk_sources, auth, folder_name, end_user_id, use_colpali) + + # Convert sources to ChunkSource objects if needed + chunk_sources = [] + for source in sources: + if isinstance(source, dict): + chunk_sources.append(ChunkSource(**source)) + else: + chunk_sources.append(source) + + return await document_service.batch_retrieve_chunks(chunk_sources, auth, folder_name, end_user_id, use_colpali) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.post("/query", response_model=CompletionResponse) -async def query_completion( - request: CompletionQueryRequest, auth: AuthContext = Depends(verify_token) -): +@telemetry.track(operation_type="query", metadata_resolver=telemetry.query_metadata) +async def query_completion(request: CompletionQueryRequest, auth: AuthContext = Depends(verify_token)): """ Generate completion using relevant chunks as context. 
- - When graph_name is provided, the query will leverage the knowledge graph + + When graph_name is provided, the query will leverage the knowledge graph to enhance retrieval by finding relevant entities and their connected documents. - + Args: request: CompletionQueryRequest containing: - query: Query text @@ -958,7 +879,7 @@ async def query_completion( - folder_name: Optional folder to scope the operation to - end_user_id: Optional end-user ID to scope the operation to auth: Authentication context - + Returns: CompletionResponse: Generated completion """ @@ -966,46 +887,29 @@ async def query_completion( # Validate prompt overrides before proceeding if request.prompt_overrides: validate_prompt_overrides_with_http_exception(request.prompt_overrides, operation_type="query") - + # Check query limits if in cloud mode if settings.MODE == "cloud" and auth.user_id: # Check limits before proceeding await check_and_increment_limits(auth, "query", 1) - - async with telemetry.track_operation( - operation_type="query", - user_id=auth.entity_id, - metadata={ - "k": request.k, - "min_score": request.min_score, - "max_tokens": request.max_tokens, - "temperature": request.temperature, - "use_reranking": request.use_reranking, - "use_colpali": request.use_colpali, - "graph_name": request.graph_name, - "hop_depth": request.hop_depth, - "include_paths": request.include_paths, - "folder_name": request.folder_name, - "end_user_id": request.end_user_id, - }, - ): - return await document_service.query( - request.query, - auth, - request.filters, - request.k, - request.min_score, - request.max_tokens, - request.temperature, - request.use_reranking, - request.use_colpali, - request.graph_name, - request.hop_depth, - request.include_paths, - request.prompt_overrides, - request.folder_name, - request.end_user_id, - ) + + return await document_service.query( + request.query, + auth, + request.filters, + request.k, + request.min_score, + request.max_tokens, + request.temperature, + request.use_reranking, + request.use_colpali, + request.graph_name, + request.hop_depth, + request.include_paths, + request.prompt_overrides, + request.folder_name, + request.end_user_id, + ) except ValueError as e: validate_prompt_overrides_with_http_exception(operation_type="query", error=e) except PermissionError as e: @@ -1023,7 +927,7 @@ async def list_documents( ): """ List accessible documents. - + Args: auth: Authentication context skip: Number of documents to skip @@ -1031,7 +935,7 @@ async def list_documents( filters: Optional metadata filters folder_name: Optional folder to scope the operation to end_user_id: Optional end-user ID to scope the operation to - + Returns: List[Document]: List of accessible documents """ @@ -1041,7 +945,7 @@ async def list_documents( system_filters["folder_name"] = folder_name if end_user_id: system_filters["end_user_id"] = end_user_id - + return await document_service.db.get_documents(auth, skip, limit, filters, system_filters) @@ -1057,16 +961,17 @@ async def get_document(document_id: str, auth: AuthContext = Depends(verify_toke except HTTPException as e: logger.error(f"Error getting document: {e}") raise e - + + @app.get("/documents/{document_id}/status", response_model=Dict[str, Any]) async def get_document_status(document_id: str, auth: AuthContext = Depends(verify_token)): """ Get the processing status of a document. 
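Because `/ingest/file` now returns immediately with `system_metadata["status"] == "processing"`, clients are expected to poll this status endpoint until the document settles. A rough client-side loop, with the URL and token as placeholders:

```python
import time

import httpx


def wait_for_document(doc_id: str, token: str, timeout: float = 300.0) -> dict:
    """Poll /documents/{doc_id}/status until the document leaves the 'processing' state."""
    headers = {"Authorization": f"Bearer {token}"}
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = httpx.get(f"http://localhost:8000/documents/{doc_id}/status", headers=headers)
        resp.raise_for_status()
        status = resp.json()
        if status["status"] != "processing":
            return status  # terminal state; "failed" responses include an "error" field
        time.sleep(2.0)
    raise TimeoutError(f"document {doc_id} still processing after {timeout}s")
```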
- + Args: document_id: ID of the document to check auth: Authentication context - + Returns: Dict containing status information for the document """ @@ -1074,10 +979,10 @@ async def get_document_status(document_id: str, auth: AuthContext = Depends(veri doc = await document_service.db.get_document(document_id, auth) if not doc: raise HTTPException(status_code=404, detail="Document not found") - + # Extract status information status = doc.system_metadata.get("status", "unknown") - + response = { "document_id": doc.external_id, "status": status, @@ -1085,11 +990,11 @@ async def get_document_status(document_id: str, auth: AuthContext = Depends(veri "created_at": doc.system_metadata.get("created_at"), "updated_at": doc.system_metadata.get("updated_at"), } - + # Add error information if failed if status == "failed": response["error"] = doc.system_metadata.get("error", "Unknown error") - + return response except HTTPException: raise @@ -1099,6 +1004,7 @@ async def get_document_status(document_id: str, auth: AuthContext = Depends(veri @app.delete("/documents/{document_id}") +@telemetry.track(operation_type="delete_document", metadata_resolver=telemetry.document_delete_metadata) async def delete_document(document_id: str, auth: AuthContext = Depends(verify_token)): """ Delete a document and all associated data. @@ -1116,35 +1022,30 @@ async def delete_document(document_id: str, auth: AuthContext = Depends(verify_t Deletion status """ try: - async with telemetry.track_operation( - operation_type="delete_document", - user_id=auth.entity_id, - metadata={"document_id": document_id}, - ): - success = await document_service.delete_document(document_id, auth) - if not success: - raise HTTPException(status_code=404, detail="Document not found or delete failed") - return {"status": "success", "message": f"Document {document_id} deleted successfully"} + success = await document_service.delete_document(document_id, auth) + if not success: + raise HTTPException(status_code=404, detail="Document not found or delete failed") + return {"status": "success", "message": f"Document {document_id} deleted successfully"} except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.get("/documents/filename/{filename}", response_model=Document) async def get_document_by_filename( - filename: str, + filename: str, auth: AuthContext = Depends(verify_token), folder_name: Optional[str] = None, end_user_id: Optional[str] = None, ): """ Get document by filename. 
- + Args: filename: Filename of the document to retrieve auth: Authentication context folder_name: Optional folder to scope the operation to end_user_id: Optional end-user ID to scope the operation to - + Returns: Document: Document metadata if found and accessible """ @@ -1155,7 +1056,7 @@ async def get_document_by_filename( system_filters["folder_name"] = folder_name if end_user_id: system_filters["end_user_id"] = end_user_id - + doc = await document_service.db.get_document_by_filename(filename, auth, system_filters) logger.debug(f"Found document by filename: {doc}") if not doc: @@ -1167,55 +1068,47 @@ async def get_document_by_filename( @app.post("/documents/{document_id}/update_text", response_model=Document) +@telemetry.track(operation_type="update_document_text", metadata_resolver=telemetry.document_update_text_metadata) async def update_document_text( document_id: str, request: IngestTextRequest, update_strategy: str = "add", - auth: AuthContext = Depends(verify_token) + auth: AuthContext = Depends(verify_token), ): """ Update a document with new text content using the specified strategy. - + Args: document_id: ID of the document to update request: Text content and metadata for the update update_strategy: Strategy for updating the document (default: 'add') - + Returns: Document: Updated document metadata """ try: - async with telemetry.track_operation( - operation_type="update_document_text", - user_id=auth.entity_id, - metadata={ - "document_id": document_id, - "update_strategy": update_strategy, - "use_colpali": request.use_colpali, - "has_filename": request.filename is not None, - }, - ): - doc = await document_service.update_document( - document_id=document_id, - auth=auth, - content=request.content, - file=None, - filename=request.filename, - metadata=request.metadata, - rules=request.rules, - update_strategy=update_strategy, - use_colpali=request.use_colpali, - ) + doc = await document_service.update_document( + document_id=document_id, + auth=auth, + content=request.content, + file=None, + filename=request.filename, + metadata=request.metadata, + rules=request.rules, + update_strategy=update_strategy, + use_colpali=request.use_colpali, + ) - if not doc: - raise HTTPException(status_code=404, detail="Document not found or update failed") + if not doc: + raise HTTPException(status_code=404, detail="Document not found or update failed") - return doc + return doc except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.post("/documents/{document_id}/update_file", response_model=Document) +@telemetry.track(operation_type="update_document_file", metadata_resolver=telemetry.document_update_file_metadata) async def update_document_file( document_id: str, file: UploadFile, @@ -1223,7 +1116,7 @@ async def update_document_file( rules: str = Form("[]"), update_strategy: str = Form("add"), use_colpali: Optional[bool] = None, - auth: AuthContext = Depends(verify_token) + auth: AuthContext = Depends(verify_token), ): """ Update a document with content from a file using the specified strategy. 
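The `@telemetry.track(operation_type=..., metadata_resolver=...)` decorator applied throughout this diff replaces the inline `async with telemetry.track_operation(...)` blocks, but its implementation is not part of the change. A plausible shape, shown only to illustrate how the resolver and the existing context manager could fit together:

```python
import functools
from typing import Any, Callable, Dict, Optional


class TelemetryService:  # sketch only; the real service lives in core's telemetry module
    def track(
        self,
        *,
        operation_type: str,
        metadata_resolver: Optional[Callable[..., Dict[str, Any]]] = None,
    ) -> Callable:
        """Decorator equivalent of the removed inline `track_operation` blocks."""

        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            async def wrapper(*args: Any, **kwargs: Any) -> Any:
                auth = kwargs.get("auth")  # AuthContext injected via Depends(verify_token)
                metadata = metadata_resolver(*args, **kwargs) if metadata_resolver else {}
                # track_operation is the existing async context manager used by the old code
                async with self.track_operation(
                    operation_type=operation_type,
                    user_id=getattr(auth, "entity_id", None),
                    metadata=metadata,
                ):
                    return await func(*args, **kwargs)

            return wrapper

        return decorator
```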
@@ -1243,33 +1136,22 @@ async def update_document_file( metadata_dict = json.loads(metadata) rules_list = json.loads(rules) - async with telemetry.track_operation( - operation_type="update_document_file", - user_id=auth.entity_id, - metadata={ - "document_id": document_id, - "filename": file.filename, - "content_type": file.content_type, - "update_strategy": update_strategy, - "use_colpali": use_colpali, - }, - ): - doc = await document_service.update_document( - document_id=document_id, - auth=auth, - content=None, - file=file, - filename=file.filename, - metadata=metadata_dict, - rules=rules_list, - update_strategy=update_strategy, - use_colpali=use_colpali, - ) + doc = await document_service.update_document( + document_id=document_id, + auth=auth, + content=None, + file=file, + filename=file.filename, + metadata=metadata_dict, + rules=rules_list, + update_strategy=update_strategy, + use_colpali=use_colpali, + ) - if not doc: - raise HTTPException(status_code=404, detail="Document not found or update failed") + if not doc: + raise HTTPException(status_code=404, detail="Document not found or update failed") - return doc + return doc except json.JSONDecodeError as e: raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") except PermissionError as e: @@ -1277,10 +1159,12 @@ async def update_document_file( @app.post("/documents/{document_id}/update_metadata", response_model=Document) +@telemetry.track( + operation_type="update_document_metadata", + metadata_resolver=telemetry.document_update_metadata_resolver, +) async def update_document_metadata( - document_id: str, - metadata: Dict[str, Any], - auth: AuthContext = Depends(verify_token) + document_id: str, metadata: Dict[str, Any], auth: AuthContext = Depends(verify_token) ): """ Update only a document's metadata. 
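Correspondingly, the `metadata_resolver` callables named in the decorators (such as `telemetry.document_update_metadata_resolver` just above) presumably rebuild the metadata dictionaries that the removed inline blocks used to record. Two hypothetical resolvers, reconstructed from the fields those blocks logged:

```python
from typing import Any, Dict


def document_update_metadata_resolver(*args: Any, **kwargs: Any) -> Dict[str, Any]:
    """Metadata the old inline block recorded for update_document_metadata."""
    return {"document_id": kwargs.get("document_id")}


def retrieve_chunks_metadata(*args: Any, **kwargs: Any) -> Dict[str, Any]:
    """Fields the removed track_operation call logged for /retrieve/chunks."""
    request = kwargs["request"]
    return {
        "k": request.k,
        "min_score": request.min_score,
        "use_reranking": request.use_reranking,
        "use_colpali": request.use_colpali,
        "folder_name": request.folder_name,
        "end_user_id": request.end_user_id,
    }
```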
@@ -1293,44 +1177,38 @@ async def update_document_metadata( Document: Updated document metadata """ try: - async with telemetry.track_operation( - operation_type="update_document_metadata", - user_id=auth.entity_id, - metadata={ - "document_id": document_id, - }, - ): - doc = await document_service.update_document( - document_id=document_id, - auth=auth, - content=None, - file=None, - filename=None, - metadata=metadata, - rules=[], - update_strategy="add", - use_colpali=None, - ) + doc = await document_service.update_document( + document_id=document_id, + auth=auth, + content=None, + file=None, + filename=None, + metadata=metadata, + rules=[], + update_strategy="add", + use_colpali=None, + ) - if not doc: - raise HTTPException(status_code=404, detail="Document not found or update failed") + if not doc: + raise HTTPException(status_code=404, detail="Document not found or update failed") - return doc + return doc except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) # Usage tracking endpoints @app.get("/usage/stats") +@telemetry.track(operation_type="get_usage_stats", metadata_resolver=telemetry.usage_stats_metadata) async def get_usage_stats(auth: AuthContext = Depends(verify_token)) -> Dict[str, int]: """Get usage statistics for the authenticated user.""" - async with telemetry.track_operation(operation_type="get_usage_stats", user_id=auth.entity_id): - if not auth.permissions or "admin" not in auth.permissions: - return telemetry.get_user_usage(auth.entity_id) + if not auth.permissions or "admin" not in auth.permissions: return telemetry.get_user_usage(auth.entity_id) + return telemetry.get_user_usage(auth.entity_id) @app.get("/usage/recent") +@telemetry.track(operation_type="get_recent_usage", metadata_resolver=telemetry.recent_usage_metadata) async def get_recent_usage( auth: AuthContext = Depends(verify_token), operation_type: Optional[str] = None, @@ -1338,40 +1216,30 @@ async def get_recent_usage( status: Optional[str] = None, ) -> List[Dict]: """Get recent usage records.""" - async with telemetry.track_operation( - operation_type="get_recent_usage", - user_id=auth.entity_id, - metadata={ - "operation_type": operation_type, - "since": since.isoformat() if since else None, - "status": status, - }, - ): - if not auth.permissions or "admin" not in auth.permissions: - records = telemetry.get_recent_usage( - user_id=auth.entity_id, operation_type=operation_type, since=since, status=status - ) - else: - records = telemetry.get_recent_usage( - operation_type=operation_type, since=since, status=status - ) + if not auth.permissions or "admin" not in auth.permissions: + records = telemetry.get_recent_usage( + user_id=auth.entity_id, operation_type=operation_type, since=since, status=status + ) + else: + records = telemetry.get_recent_usage(operation_type=operation_type, since=since, status=status) - return [ - { - "timestamp": record.timestamp, - "operation_type": record.operation_type, - "tokens_used": record.tokens_used, - "user_id": record.user_id, - "duration_ms": record.duration_ms, - "status": record.status, - "metadata": record.metadata, - } - for record in records - ] + return [ + { + "timestamp": record.timestamp, + "operation_type": record.operation_type, + "tokens_used": record.tokens_used, + "user_id": record.user_id, + "duration_ms": record.duration_ms, + "status": record.status, + "metadata": record.metadata, + } + for record in records + ] # Cache endpoints @app.post("/cache/create") +@telemetry.track(operation_type="create_cache", 
metadata_resolver=telemetry.cache_create_metadata) async def create_cache( name: str, model: str, @@ -1386,97 +1254,66 @@ async def create_cache( if settings.MODE == "cloud" and auth.user_id: # Check limits before proceeding await check_and_increment_limits(auth, "cache", 1) - - async with telemetry.track_operation( - operation_type="create_cache", - user_id=auth.entity_id, - metadata={ - "name": name, - "model": model, - "gguf_file": gguf_file, - "filters": filters, - "docs": docs, - }, - ): - filter_docs = set(await document_service.db.get_documents(auth, filters=filters)) - additional_docs = ( - { - await document_service.db.get_document(document_id=doc_id, auth=auth) - for doc_id in docs - } - if docs - else set() - ) - docs_to_add = list(filter_docs.union(additional_docs)) - if not docs_to_add: - raise HTTPException(status_code=400, detail="No documents to add to cache") - response = await document_service.create_cache( - name, model, gguf_file, docs_to_add, filters - ) - return response + + filter_docs = set(await document_service.db.get_documents(auth, filters=filters)) + additional_docs = ( + {await document_service.db.get_document(document_id=doc_id, auth=auth) for doc_id in docs} + if docs + else set() + ) + docs_to_add = list(filter_docs.union(additional_docs)) + if not docs_to_add: + raise HTTPException(status_code=400, detail="No documents to add to cache") + response = await document_service.create_cache(name, model, gguf_file, docs_to_add, filters) + return response except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.get("/cache/{name}") +@telemetry.track(operation_type="get_cache", metadata_resolver=telemetry.cache_get_metadata) async def get_cache(name: str, auth: AuthContext = Depends(verify_token)) -> Dict[str, Any]: """Get cache configuration by name.""" try: - async with telemetry.track_operation( - operation_type="get_cache", - user_id=auth.entity_id, - metadata={"name": name}, - ): - exists = await document_service.load_cache(name) - return {"exists": exists} + exists = await document_service.load_cache(name) + return {"exists": exists} except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.post("/cache/{name}/update") +@telemetry.track(operation_type="update_cache", metadata_resolver=telemetry.cache_update_metadata) async def update_cache(name: str, auth: AuthContext = Depends(verify_token)) -> Dict[str, bool]: """Update cache with new documents matching its filter.""" try: - async with telemetry.track_operation( - operation_type="update_cache", - user_id=auth.entity_id, - metadata={"name": name}, - ): - if name not in document_service.active_caches: - exists = await document_service.load_cache(name) - if not exists: - raise HTTPException(status_code=404, detail=f"Cache '{name}' not found") - cache = document_service.active_caches[name] - docs = await document_service.db.get_documents(auth, filters=cache.filters) - docs_to_add = [doc for doc in docs if doc.id not in cache.docs] - return cache.add_docs(docs_to_add) + if name not in document_service.active_caches: + exists = await document_service.load_cache(name) + if not exists: + raise HTTPException(status_code=404, detail=f"Cache '{name}' not found") + cache = document_service.active_caches[name] + docs = await document_service.db.get_documents(auth, filters=cache.filters) + docs_to_add = [doc for doc in docs if doc.id not in cache.docs] + return cache.add_docs(docs_to_add) except PermissionError as e: raise HTTPException(status_code=403, 
detail=str(e)) @app.post("/cache/{name}/add_docs") -async def add_docs_to_cache( - name: str, docs: List[str], auth: AuthContext = Depends(verify_token) -) -> Dict[str, bool]: +@telemetry.track(operation_type="add_docs_to_cache", metadata_resolver=telemetry.cache_add_docs_metadata) +async def add_docs_to_cache(name: str, docs: List[str], auth: AuthContext = Depends(verify_token)) -> Dict[str, bool]: """Add specific documents to the cache.""" try: - async with telemetry.track_operation( - operation_type="add_docs_to_cache", - user_id=auth.entity_id, - metadata={"name": name, "docs": docs}, - ): - cache = document_service.active_caches[name] - docs_to_add = [ - await document_service.db.get_document(doc_id, auth) - for doc_id in docs - if doc_id not in cache.docs - ] - return cache.add_docs(docs_to_add) + cache = document_service.active_caches[name] + docs_to_add = [ + await document_service.db.get_document(doc_id, auth) for doc_id in docs if doc_id not in cache.docs + ] + return cache.add_docs(docs_to_add) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.post("/cache/{name}/query") +@telemetry.track(operation_type="query_cache", metadata_resolver=telemetry.cache_query_metadata) async def query_cache( name: str, query: str, @@ -1490,25 +1327,16 @@ async def query_cache( if settings.MODE == "cloud" and auth.user_id: # Check limits before proceeding await check_and_increment_limits(auth, "cache_query", 1) - - async with telemetry.track_operation( - operation_type="query_cache", - user_id=auth.entity_id, - metadata={ - "name": name, - "query": query, - "max_tokens": max_tokens, - "temperature": temperature, - }, - ): - cache = document_service.active_caches[name] - print(f"Cache state: {cache.state.n_tokens}", file=sys.stderr) - return cache.query(query) # , max_tokens, temperature) + + cache = document_service.active_caches[name] + logger.info(f"Cache state: {cache.state.n_tokens}") + return cache.query(query) # , max_tokens, temperature) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) @app.post("/graph/create", response_model=Graph) +@telemetry.track(operation_type="create_graph", metadata_resolver=telemetry.create_graph_metadata) async def create_graph( request: CreateGraphRequest, auth: AuthContext = Depends(verify_token), @@ -1536,38 +1364,27 @@ async def create_graph( # Validate prompt overrides before proceeding if request.prompt_overrides: validate_prompt_overrides_with_http_exception(request.prompt_overrides, operation_type="graph") - + # Check graph creation limits if in cloud mode if settings.MODE == "cloud" and auth.user_id: # Check limits before proceeding await check_and_increment_limits(auth, "graph", 1) - - async with telemetry.track_operation( - operation_type="create_graph", - user_id=auth.entity_id, - metadata={ - "name": request.name, - "filters": request.filters, - "documents": request.documents, - "folder_name": request.folder_name, - "end_user_id": request.end_user_id, - }, - ): - # Create system filters for folder and user scoping - system_filters = {} - if request.folder_name: - system_filters["folder_name"] = request.folder_name - if request.end_user_id: - system_filters["end_user_id"] = request.end_user_id - - return await document_service.create_graph( - name=request.name, - auth=auth, - filters=request.filters, - documents=request.documents, - prompt_overrides=request.prompt_overrides, - system_filters=system_filters, - ) + + # Create system filters for folder and user scoping + system_filters = 
{} + if request.folder_name: + system_filters["folder_name"] = request.folder_name + if request.end_user_id: + system_filters["end_user_id"] = request.end_user_id + + return await document_service.create_graph( + name=request.name, + auth=auth, + filters=request.filters, + documents=request.documents, + prompt_overrides=request.prompt_overrides, + system_filters=system_filters, + ) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) except ValueError as e: @@ -1581,11 +1398,11 @@ async def create_folder( ) -> Folder: """ Create a new folder. - + Args: folder_create: Folder creation request containing name and optional description auth: Authentication context - + Returns: Folder: Created folder """ @@ -1599,52 +1416,54 @@ async def create_folder( ): # Create a folder object with explicit ID import uuid + folder_id = str(uuid.uuid4()) logger.info(f"Creating folder with ID: {folder_id}, auth.user_id: {auth.user_id}") - + # Set up access control with user_id access_control = { "readers": [auth.entity_id], "writers": [auth.entity_id], "admins": [auth.entity_id], } - + if auth.user_id: access_control["user_id"] = [auth.user_id] logger.info(f"Adding user_id {auth.user_id} to folder access control") - + folder = Folder( id=folder_id, name=folder_create.name, description=folder_create.description, owner={ - "type": auth.entity_type.value, + "type": auth.entity_type.value, "id": auth.entity_id, }, access_control=access_control, ) - + # Store in database success = await document_service.db.create_folder(folder) - + if not success: raise HTTPException(status_code=500, detail="Failed to create folder") - + return folder except Exception as e: logger.error(f"Error creating folder: {e}") raise HTTPException(status_code=500, detail=str(e)) - + + @app.get("/folders", response_model=List[Folder]) async def list_folders( auth: AuthContext = Depends(verify_token), ) -> List[Folder]: """ List all folders the user has access to. - + Args: auth: Authentication context - + Returns: List[Folder]: List of folders """ @@ -1658,7 +1477,8 @@ async def list_folders( except Exception as e: logger.error(f"Error listing folders: {e}") raise HTTPException(status_code=500, detail=str(e)) - + + @app.get("/folders/{folder_id}", response_model=Folder) async def get_folder( folder_id: str, @@ -1666,11 +1486,11 @@ async def get_folder( ) -> Folder: """ Get a folder by ID. - + Args: folder_id: ID of the folder auth: Authentication context - + Returns: Folder: Folder if found and accessible """ @@ -1683,17 +1503,18 @@ async def get_folder( }, ): folder = await document_service.db.get_folder(folder_id, auth) - + if not folder: raise HTTPException(status_code=404, detail=f"Folder {folder_id} not found") - + return folder except HTTPException: raise except Exception as e: logger.error(f"Error getting folder: {e}") raise HTTPException(status_code=500, detail=str(e)) - + + @app.post("/folders/{folder_id}/documents/{document_id}") async def add_document_to_folder( folder_id: str, @@ -1702,12 +1523,12 @@ async def add_document_to_folder( ): """ Add a document to a folder. 
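The folder endpoints rely on the `access_control` lists written by `create_folder` above, and `set_folder_rule` further down calls `document_service.db._check_folder_access(folder, auth, "write")`, which is not shown in this diff. A hypothetical reconstruction of such a check, using only the fields stored here:

```python
def _check_folder_access(folder, auth, permission: str = "read") -> bool:
    """Hypothetical sketch: allow access if the caller appears in the relevant ACL list."""
    acl = folder.access_control or {}
    allowed = {
        "read": acl.get("readers", []),
        "write": acl.get("writers", []),
        "admin": acl.get("admins", []),
    }.get(permission, [])
    if auth.entity_id in allowed or auth.entity_id in acl.get("admins", []):
        return True
    # create_folder also records the cloud user's id on the folder
    return bool(auth.user_id and auth.user_id in acl.get("user_id", []))
```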
- + Args: folder_id: ID of the folder document_id: ID of the document auth: Authentication context - + Returns: Success status """ @@ -1721,15 +1542,16 @@ async def add_document_to_folder( }, ): success = await document_service.db.add_document_to_folder(folder_id, document_id, auth) - + if not success: raise HTTPException(status_code=500, detail="Failed to add document to folder") - + return {"status": "success"} except Exception as e: logger.error(f"Error adding document to folder: {e}") raise HTTPException(status_code=500, detail=str(e)) - + + @app.delete("/folders/{folder_id}/documents/{document_id}") async def remove_document_from_folder( folder_id: str, @@ -1738,12 +1560,12 @@ async def remove_document_from_folder( ): """ Remove a document from a folder. - + Args: folder_id: ID of the folder document_id: ID of the document auth: Authentication context - + Returns: Success status """ @@ -1757,10 +1579,10 @@ async def remove_document_from_folder( }, ): success = await document_service.db.remove_document_from_folder(folder_id, document_id, auth) - + if not success: raise HTTPException(status_code=500, detail="Failed to remove document from folder") - + return {"status": "success"} except Exception as e: logger.error(f"Error removing document from folder: {e}") @@ -1768,6 +1590,7 @@ async def remove_document_from_folder( @app.get("/graph/{name}", response_model=Graph) +@telemetry.track(operation_type="get_graph", metadata_resolver=telemetry.get_graph_metadata) async def get_graph( name: str, auth: AuthContext = Depends(verify_token), @@ -1789,26 +1612,17 @@ async def get_graph( Graph: The requested graph object """ try: - async with telemetry.track_operation( - operation_type="get_graph", - user_id=auth.entity_id, - metadata={ - "name": name, - "folder_name": folder_name, - "end_user_id": end_user_id - }, - ): - # Create system filters for folder and user scoping - system_filters = {} - if folder_name: - system_filters["folder_name"] = folder_name - if end_user_id: - system_filters["end_user_id"] = end_user_id - - graph = await document_service.db.get_graph(name, auth, system_filters) - if not graph: - raise HTTPException(status_code=404, detail=f"Graph '{name}' not found") - return graph + # Create system filters for folder and user scoping + system_filters = {} + if folder_name: + system_filters["folder_name"] = folder_name + if end_user_id: + system_filters["end_user_id"] = end_user_id + + graph = await document_service.db.get_graph(name, auth, system_filters) + if not graph: + raise HTTPException(status_code=404, detail=f"Graph '{name}' not found") + return graph except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) except Exception as e: @@ -1816,6 +1630,7 @@ async def get_graph( @app.get("/graphs", response_model=List[Graph]) +@telemetry.track(operation_type="list_graphs", metadata_resolver=telemetry.list_graphs_metadata) async def list_graphs( auth: AuthContext = Depends(verify_token), folder_name: Optional[str] = None, @@ -1835,22 +1650,14 @@ async def list_graphs( List[Graph]: List of graph objects """ try: - async with telemetry.track_operation( - operation_type="list_graphs", - user_id=auth.entity_id, - metadata={ - "folder_name": folder_name, - "end_user_id": end_user_id - }, - ): - # Create system filters for folder and user scoping - system_filters = {} - if folder_name: - system_filters["folder_name"] = folder_name - if end_user_id: - system_filters["end_user_id"] = end_user_id - - return await document_service.db.list_graphs(auth, system_filters) 
+ # Create system filters for folder and user scoping + system_filters = {} + if folder_name: + system_filters["folder_name"] = folder_name + if end_user_id: + system_filters["end_user_id"] = end_user_id + + return await document_service.db.list_graphs(auth, system_filters) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) except Exception as e: @@ -1858,6 +1665,7 @@ async def list_graphs( @app.post("/graph/{name}/update", response_model=Graph) +@telemetry.track(operation_type="update_graph", metadata_resolver=telemetry.update_graph_metadata) async def update_graph( name: str, request: UpdateGraphRequest, @@ -1866,8 +1674,8 @@ async def update_graph( """ Update an existing graph with new documents. - This endpoint processes additional documents based on the original graph filters - and/or new filters/document IDs, extracts entities and relationships, and + This endpoint processes additional documents based on the original graph filters + and/or new filters/document IDs, extracts entities and relationships, and updates the graph with new information. Args: @@ -1887,33 +1695,22 @@ async def update_graph( # Validate prompt overrides before proceeding if request.prompt_overrides: validate_prompt_overrides_with_http_exception(request.prompt_overrides, operation_type="graph") - - async with telemetry.track_operation( - operation_type="update_graph", - user_id=auth.entity_id, - metadata={ - "name": name, - "additional_filters": request.additional_filters, - "additional_documents": request.additional_documents, - "folder_name": request.folder_name, - "end_user_id": request.end_user_id, - }, - ): - # Create system filters for folder and user scoping - system_filters = {} - if request.folder_name: - system_filters["folder_name"] = request.folder_name - if request.end_user_id: - system_filters["end_user_id"] = request.end_user_id - - return await document_service.update_graph( - name=name, - auth=auth, - additional_filters=request.additional_filters, - additional_documents=request.additional_documents, - prompt_overrides=request.prompt_overrides, - system_filters=system_filters, - ) + + # Create system filters for folder and user scoping + system_filters = {} + if request.folder_name: + system_filters["folder_name"] = request.folder_name + if request.end_user_id: + system_filters["end_user_id"] = request.end_user_id + + return await document_service.update_graph( + name=name, + auth=auth, + additional_filters=request.additional_filters, + additional_documents=request.additional_documents, + prompt_overrides=request.prompt_overrides, + system_filters=system_filters, + ) except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) except ValueError as e: @@ -1947,9 +1744,7 @@ async def generate_local_uri( # Read config for host/port with open("morphik.toml", "rb") as f: config = tomli.load(f) - base_url = f"{config['api']['host']}:{config['api']['port']}".replace( - "localhost", "127.0.0.1" - ) + base_url = f"{config['api']['host']}:{config['api']['port']}".replace("localhost", "127.0.0.1") # Generate URI uri = f"morphik://{name}:{token}@{base_url}" @@ -1999,13 +1794,14 @@ async def generate_cloud_uri( if not (token_user_id == user_id or "admin" in payload.get("permissions", [])): raise HTTPException( status_code=403, - detail="You can only create apps for your own account unless you have admin permissions" + detail="You can only create apps for your own account unless you have admin permissions", ) except jwt.InvalidTokenError as e: raise 
HTTPException(status_code=401, detail=str(e)) # Import UserService here to avoid circular imports from core.services.user_service import UserService + user_service = UserService() # Initialize user service if needed @@ -2019,10 +1815,7 @@ async def generate_cloud_uri( if not uri: logger.debug("Application limit reached for this account tier with user_id: %s", user_id) - raise HTTPException( - status_code=403, - detail="Application limit reached for this account tier" - ) + raise HTTPException(status_code=403, detail="Application limit reached for this account tier") return {"uri": uri, "app_id": app_id} except HTTPException: @@ -2034,6 +1827,7 @@ async def generate_cloud_uri( @app.post("/folders/{folder_id}/set_rule") +@telemetry.track(operation_type="set_folder_rule", metadata_resolver=telemetry.set_folder_rule_metadata) async def set_folder_rule( folder_id: str, request: SetFolderRuleRequest, @@ -2042,215 +1836,209 @@ async def set_folder_rule( ): """ Set extraction rules for a folder. - + Args: folder_id: ID of the folder to set rules for request: SetFolderRuleRequest containing metadata extraction rules auth: Authentication context apply_to_existing: Whether to apply rules to existing documents in the folder - + Returns: Success status with processing results """ # Import text here to ensure it's available in this function's scope from sqlalchemy import text - try: - async with telemetry.track_operation( - operation_type="set_folder_rule", - user_id=auth.entity_id, - metadata={ - "folder_id": folder_id, - "rule_count": len(request.rules), - "apply_to_existing": apply_to_existing, - }, - ): - # Log detailed information about the rules - logger.debug(f"Setting rules for folder {folder_id}") - logger.debug(f"Number of rules: {len(request.rules)}") - - for i, rule in enumerate(request.rules): - logger.debug(f"\nRule {i + 1}:") - logger.debug(f"Type: {rule.type}") - logger.debug("Schema:") - for field_name, field_config in rule.schema.items(): - logger.debug(f" Field: {field_name}") - logger.debug(f" Type: {field_config.get('type', 'unknown')}") - logger.debug(f" Description: {field_config.get('description', 'No description')}") - if 'schema' in field_config: - logger.debug(f" Has JSON schema: Yes") - logger.debug(f" Schema: {field_config['schema']}") - - # Get the folder - folder = await document_service.db.get_folder(folder_id, auth) - if not folder: - raise HTTPException(status_code=404, detail=f"Folder {folder_id} not found") - - # Check if user has write access to the folder - if not document_service.db._check_folder_access(folder, auth, "write"): - raise HTTPException(status_code=403, detail="You don't have write access to this folder") - - # Update folder with rules - # Convert rules to dicts for JSON serialization - rules_dicts = [rule.model_dump() for rule in request.rules] - - # Update the folder in the database - async with document_service.db.async_session() as session: - # Execute update query - await session.execute( - text( - """ - UPDATE folders - SET rules = :rules - WHERE id = :folder_id - """ - ), - {"folder_id": folder_id, "rules": json.dumps(rules_dicts)}, - ) - await session.commit() - - logger.info(f"Successfully updated folder {folder_id} with {len(request.rules)} rules") - - # Get updated folder - updated_folder = await document_service.db.get_folder(folder_id, auth) - - # If apply_to_existing is True, apply these rules to all existing documents in the folder - processing_results = {"processed": 0, "errors": []} - - if apply_to_existing and folder.document_ids: - 
logger.info(f"Applying rules to {len(folder.document_ids)} existing documents in folder") - - # Import rules processor - from core.services.rules_processor import RulesProcessor - rules_processor = RulesProcessor() - - # Get all documents in the folder - documents = await document_service.db.get_documents_by_id(folder.document_ids, auth) - - # Process each document - for doc in documents: - try: - # Get document content - logger.info(f"Processing document {doc.external_id}") - - # For each document, apply the rules from the folder - doc_content = None - - # Get content from system_metadata if available - if doc.system_metadata and "content" in doc.system_metadata: - doc_content = doc.system_metadata["content"] - logger.info(f"Retrieved content from system_metadata for document {doc.external_id}") - - # If we still have no content, log error and continue - if not doc_content: - error_msg = f"No content found in system_metadata for document {doc.external_id}" - logger.error(error_msg) - processing_results["errors"].append({ - "document_id": doc.external_id, - "error": error_msg - }) - continue - - # Process document with rules - try: - # Convert request rules to actual rule models and apply them - from core.models.rules import MetadataExtractionRule - - for rule_request in request.rules: - if rule_request.type == "metadata_extraction": - # Create the actual rule model - rule = MetadataExtractionRule( - type=rule_request.type, - schema=rule_request.schema - ) - - # Apply the rule with retries - max_retries = 3 - base_delay = 1 # seconds - extracted_metadata = None - last_error = None - for retry_count in range(max_retries): - try: - if retry_count > 0: - # Exponential backoff - delay = base_delay * (2 ** (retry_count - 1)) - logger.info(f"Retry {retry_count}/{max_retries} after {delay}s delay") - await asyncio.sleep(delay) - - extracted_metadata, _ = await rule.apply(doc_content) - logger.info(f"Successfully extracted metadata on attempt {retry_count + 1}: {extracted_metadata}") - break # Success, exit retry loop - - except Exception as rule_apply_error: - last_error = rule_apply_error - logger.warning(f"Metadata extraction attempt {retry_count + 1} failed: {rule_apply_error}") - if retry_count == max_retries - 1: # Last attempt - logger.error(f"All {max_retries} metadata extraction attempts failed") - processing_results["errors"].append({ - "document_id": doc.external_id, - "error": f"Failed to extract metadata after {max_retries} attempts: {str(last_error)}" - }) - continue # Skip to next document - - # Update document metadata if extraction succeeded - if extracted_metadata: - # Merge new metadata with existing - doc.metadata.update(extracted_metadata) - - # Create an updates dict that only updates metadata - # We need to create system_metadata with all preserved fields - # Note: In the database, metadata is stored as 'doc_metadata', not 'metadata' - updates = { - "doc_metadata": doc.metadata, # Use doc_metadata for the database - "system_metadata": {} # Will be merged with existing in update_document - } - - # Explicitly preserve the content field in system_metadata - if "content" in doc.system_metadata: - updates["system_metadata"]["content"] = doc.system_metadata["content"] - - # Log the updates we're making - logger.info(f"Updating document {doc.external_id} with metadata: {extracted_metadata}") - logger.info(f"Full metadata being updated: {doc.metadata}") - logger.info(f"Update object being sent to database: {updates}") - logger.info(f"Preserving content in system_metadata: {'content' 
in doc.system_metadata}") - - # Update document in database - success = await document_service.db.update_document( - doc.external_id, - updates, - auth + try: + # Log detailed information about the rules + logger.debug(f"Setting rules for folder {folder_id}") + logger.debug(f"Number of rules: {len(request.rules)}") + + for i, rule in enumerate(request.rules): + logger.debug(f"\nRule {i + 1}:") + logger.debug(f"Type: {rule.type}") + logger.debug("Schema:") + for field_name, field_config in rule.schema.items(): + logger.debug(f" Field: {field_name}") + logger.debug(f" Type: {field_config.get('type', 'unknown')}") + logger.debug(f" Description: {field_config.get('description', 'No description')}") + if "schema" in field_config: + logger.debug(f" Has JSON schema: Yes") + logger.debug(f" Schema: {field_config['schema']}") + + # Get the folder + folder = await document_service.db.get_folder(folder_id, auth) + if not folder: + raise HTTPException(status_code=404, detail=f"Folder {folder_id} not found") + + # Check if user has write access to the folder + if not document_service.db._check_folder_access(folder, auth, "write"): + raise HTTPException(status_code=403, detail="You don't have write access to this folder") + + # Update folder with rules + # Convert rules to dicts for JSON serialization + rules_dicts = [rule.model_dump() for rule in request.rules] + + # Update the folder in the database + async with document_service.db.async_session() as session: + # Execute update query + await session.execute( + text( + """ + UPDATE folders + SET rules = :rules + WHERE id = :folder_id + """ + ), + {"folder_id": folder_id, "rules": json.dumps(rules_dicts)}, + ) + await session.commit() + + logger.info(f"Successfully updated folder {folder_id} with {len(request.rules)} rules") + + # Get updated folder + updated_folder = await document_service.db.get_folder(folder_id, auth) + + # If apply_to_existing is True, apply these rules to all existing documents in the folder + processing_results = {"processed": 0, "errors": []} + + if apply_to_existing and folder.document_ids: + logger.info(f"Applying rules to {len(folder.document_ids)} existing documents in folder") + + # Import rules processor + from core.services.rules_processor import RulesProcessor + + rules_processor = RulesProcessor() + + # Get all documents in the folder + documents = await document_service.db.get_documents_by_id(folder.document_ids, auth) + + # Process each document + for doc in documents: + try: + # Get document content + logger.info(f"Processing document {doc.external_id}") + + # For each document, apply the rules from the folder + doc_content = None + + # Get content from system_metadata if available + if doc.system_metadata and "content" in doc.system_metadata: + doc_content = doc.system_metadata["content"] + logger.info(f"Retrieved content from system_metadata for document {doc.external_id}") + + # If we still have no content, log error and continue + if not doc_content: + error_msg = f"No content found in system_metadata for document {doc.external_id}" + logger.error(error_msg) + processing_results["errors"].append({"document_id": doc.external_id, "error": error_msg}) + continue + + # Process document with rules + try: + # Convert request rules to actual rule models and apply them + from core.models.rules import MetadataExtractionRule + + for rule_request in request.rules: + if rule_request.type == "metadata_extraction": + # Create the actual rule model + rule = MetadataExtractionRule(type=rule_request.type, 
schema=rule_request.schema) + + # Apply the rule with retries + max_retries = 3 + base_delay = 1 # seconds + extracted_metadata = None + last_error = None + + for retry_count in range(max_retries): + try: + if retry_count > 0: + # Exponential backoff + delay = base_delay * (2 ** (retry_count - 1)) + logger.info(f"Retry {retry_count}/{max_retries} after {delay}s delay") + await asyncio.sleep(delay) + + extracted_metadata, _ = await rule.apply(doc_content) + logger.info( + f"Successfully extracted metadata on attempt {retry_count + 1}: {extracted_metadata}" ) - - if success: - logger.info(f"Updated metadata for document {doc.external_id}") - processing_results["processed"] += 1 - else: - logger.error(f"Failed to update metadata for document {doc.external_id}") - processing_results["errors"].append({ + break # Success, exit retry loop + + except Exception as rule_apply_error: + last_error = rule_apply_error + logger.warning( + f"Metadata extraction attempt {retry_count + 1} failed: {rule_apply_error}" + ) + if retry_count == max_retries - 1: # Last attempt + logger.error(f"All {max_retries} metadata extraction attempts failed") + processing_results["errors"].append( + { + "document_id": doc.external_id, + "error": f"Failed to extract metadata after {max_retries} attempts: {str(last_error)}", + } + ) + continue # Skip to next document + + # Update document metadata if extraction succeeded + if extracted_metadata: + # Merge new metadata with existing + doc.metadata.update(extracted_metadata) + + # Create an updates dict that only updates metadata + # We need to create system_metadata with all preserved fields + # Note: In the database, metadata is stored as 'doc_metadata', not 'metadata' + updates = { + "doc_metadata": doc.metadata, # Use doc_metadata for the database + "system_metadata": {}, # Will be merged with existing in update_document + } + + # Explicitly preserve the content field in system_metadata + if "content" in doc.system_metadata: + updates["system_metadata"]["content"] = doc.system_metadata["content"] + + # Log the updates we're making + logger.info( + f"Updating document {doc.external_id} with metadata: {extracted_metadata}" + ) + logger.info(f"Full metadata being updated: {doc.metadata}") + logger.info(f"Update object being sent to database: {updates}") + logger.info( + f"Preserving content in system_metadata: {'content' in doc.system_metadata}" + ) + + # Update document in database + success = await document_service.db.update_document(doc.external_id, updates, auth) + + if success: + logger.info(f"Updated metadata for document {doc.external_id}") + processing_results["processed"] += 1 + else: + logger.error(f"Failed to update metadata for document {doc.external_id}") + processing_results["errors"].append( + { "document_id": doc.external_id, - "error": "Failed to update document metadata" - }) - except Exception as rule_error: - logger.error(f"Error processing rules for document {doc.external_id}: {rule_error}") - processing_results["errors"].append({ + "error": "Failed to update document metadata", + } + ) + except Exception as rule_error: + logger.error(f"Error processing rules for document {doc.external_id}: {rule_error}") + processing_results["errors"].append( + { "document_id": doc.external_id, - "error": f"Error processing rules: {str(rule_error)}" - }) - - except Exception as doc_error: - logger.error(f"Error processing document {doc.external_id}: {doc_error}") - processing_results["errors"].append({ - "document_id": doc.external_id, - "error": str(doc_error) - }) - + 
"error": f"Error processing rules: {str(rule_error)}", + } + ) + + except Exception as doc_error: + logger.error(f"Error processing document {doc.external_id}: {doc_error}") + processing_results["errors"].append({"document_id": doc.external_id, "error": str(doc_error)}) + return { "status": "success", "message": "Rules set successfully", "folder_id": folder_id, "rules": updated_folder.rules, - "processing_results": processing_results + "processing_results": processing_results, } except HTTPException as e: # Re-raise HTTP exceptions diff --git a/core/services/telemetry.py b/core/services/telemetry.py index 270771b..04f13e5 100644 --- a/core/services/telemetry.py +++ b/core/services/telemetry.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Callable, TypeVar, Union, cast from dataclasses import dataclass import threading from collections import defaultdict @@ -11,6 +11,7 @@ from pathlib import Path import uuid import hashlib import logging +import functools from core.config import get_settings @@ -337,6 +338,101 @@ class UsageRecord: metadata: Optional[Dict] = None +# Type variable for function return type +T = TypeVar('T') + +class MetadataField: + """Defines a metadata field to extract and how to extract it.""" + + def __init__(self, key: str, source: str, attr_name: Optional[str] = None, + default: Any = None, transform: Optional[Callable[[Any], Any]] = None): + """ + Initialize a metadata field definition. + + Args: + key: The key to use in the metadata dictionary + source: The source of the data ('request', 'kwargs', etc.) + attr_name: The attribute name to extract (if None, uses key) + default: Default value if not found + transform: Optional function to transform the extracted value + """ + self.key = key + self.source = source + self.attr_name = attr_name or key + self.default = default + self.transform = transform + + def extract(self, args: tuple, kwargs: dict) -> Any: + """Extract the field value from args/kwargs based on configuration.""" + value = self.default + + if self.source == 'kwargs': + value = kwargs.get(self.attr_name, self.default) + elif self.source == 'request': + request = kwargs.get('request') + if request: + if hasattr(request, 'get') and callable(request.get): + value = request.get(self.attr_name, self.default) + else: + value = getattr(request, self.attr_name, self.default) + + if self.transform and value is not None: + value = self.transform(value) + + return value + + +class MetadataExtractor: + """Base class for metadata extractors with common functionality.""" + + def __init__(self, fields: List[MetadataField] = None): + """Initialize with a list of field definitions.""" + self.fields = fields or [] + + def extract(self, args: tuple, kwargs: dict) -> dict: + """Extract metadata using the field definitions.""" + metadata = {} + + for field in self.fields: + value = field.extract(args, kwargs) + if value is not None: # Only include non-None values + metadata[field.key] = value + + return metadata + + def __call__(self, *args, **kwargs) -> dict: + """Make the extractor callable as an instance method.""" + # If called as an instance method, the first arg will be the instance + # which we don't need for extraction, so we slice it off if there are any args + actual_args = args[1:] if len(args) > 0 else () + return self.extract(actual_args, kwargs) + + +# Common transforms and utilities for metadata extraction +def parse_json(value, default=None): + """Parse a JSON string 
+
+
+# Common transforms and utilities for metadata extraction
+def parse_json(value, default=None):
+    """Parse a JSON string safely, returning default on error."""
+    if not isinstance(value, str):
+        return default
+    try:
+        return json.loads(value)
+    except (json.JSONDecodeError, TypeError):
+        return default
+
+def get_json_type(value):
+    """Determine if a JSON value is a list or single object."""
+    return "list" if isinstance(value, list) else "single"
+
+def get_list_len(value, default=0):
+    """Get the length of a list safely."""
+    if value and isinstance(value, list):
+        return len(value)
+    return default
+
+def is_not_none(value):
+    """Check if a value is not None."""
+    return value is not None
+
 
 class TelemetryService:
     _instance = None
     _lock = threading.Lock()
@@ -449,6 +545,234 @@ class TelemetryService:
                 unit="ms",
             )
 
+        # Initialize metadata extractors
+        self._setup_metadata_extractors()
+
+    def _setup_metadata_extractors(self):
+        """Set up all the metadata extractors with their field definitions."""
+        # Common fields that appear in many requests
+        common_request_fields = [
+            MetadataField("use_colpali", "request"),
+            MetadataField("folder_name", "request"),
+            MetadataField("end_user_id", "request"),
+        ]
+        retrieval_fields = common_request_fields + [
+            MetadataField("k", "request"),
+            MetadataField("min_score", "request"),
+            MetadataField("use_reranking", "request"),
+        ]
+
+        # Set up all the metadata extractors
+        self.ingest_text_metadata = MetadataExtractor(
+            common_request_fields + [
+                MetadataField("metadata", "request", default={}),
+                MetadataField("rules", "request", default=[]),
+            ]
+        )
+
+        self.ingest_file_metadata = MetadataExtractor([
+            MetadataField("filename", "kwargs", transform=lambda file: file.filename if file else None),
+            MetadataField("content_type", "kwargs", transform=lambda file: file.content_type if file else None),
+            MetadataField("metadata", "kwargs", transform=lambda v: parse_json(v, {})),
+            MetadataField("rules", "kwargs", transform=lambda v: parse_json(v, [])),
+            MetadataField("use_colpali", "kwargs"),
+            MetadataField("folder_name", "kwargs"),
+            MetadataField("end_user_id", "kwargs"),
+        ])
+
+        self.batch_ingest_metadata = MetadataExtractor([
+            MetadataField("file_count", "kwargs", "files", transform=get_list_len),
+            MetadataField("metadata_type", "kwargs", "metadata",
+                          transform=lambda v: get_json_type(parse_json(v, {}))),
+            MetadataField("rules_type", "kwargs", "rules",
+                          transform=lambda v: "per_file" if isinstance(parse_json(v, []), list)
+                          and parse_json(v, []) and isinstance(parse_json(v, [])[0], list)
+                          else "shared"),
+            MetadataField("folder_name", "kwargs"),
+            MetadataField("end_user_id", "kwargs"),
+        ])
+
+        self.retrieve_chunks_metadata = MetadataExtractor(retrieval_fields)
+        self.retrieve_docs_metadata = MetadataExtractor(retrieval_fields)
+
+        self.batch_documents_metadata = MetadataExtractor([
+            MetadataField("document_count", "request", transform=lambda req:
+                          len(req.get("document_ids", [])) if req else 0),
+            MetadataField("folder_name", "request"),
+            MetadataField("end_user_id", "request"),
+        ])
+
+        self.batch_chunks_metadata = MetadataExtractor([
+            MetadataField("chunk_count", "request", transform=lambda req:
+                          len(req.get("sources", [])) if req else 0),
+            MetadataField("folder_name", "request"),
+            MetadataField("end_user_id", "request"),
+            MetadataField("use_colpali", "request"),
+        ])
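+
+        # Illustrative example (values invented for documentation only): a retrieval request
+        # with k=5, min_score=0.4 and folder_name="reports" resolves through
+        # retrieve_chunks_metadata to {"k": 5, "min_score": 0.4, "folder_name": "reports"};
+        # fields that resolve to None are dropped by MetadataExtractor.extract().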
+ MetadataField("has_prompt_overrides", "request", "prompt_overrides", + transform=lambda v: v is not None), + ] + ) + + self.document_delete_metadata = MetadataExtractor([ + MetadataField("document_id", "kwargs"), + ]) + + self.document_update_text_metadata = MetadataExtractor([ + MetadataField("document_id", "kwargs"), + MetadataField("update_strategy", "kwargs", default="add"), + MetadataField("use_colpali", "request"), + MetadataField("has_filename", "request", "filename", transform=is_not_none), + ]) + + self.document_update_file_metadata = MetadataExtractor([ + MetadataField("document_id", "kwargs"), + MetadataField("update_strategy", "kwargs", default="add"), + MetadataField("use_colpali", "kwargs"), + MetadataField("filename", "kwargs", transform=lambda file: file.filename if file else None), + MetadataField("content_type", "kwargs", transform=lambda file: file.content_type if file else None), + ]) + + self.document_update_metadata_resolver = MetadataExtractor([ + MetadataField("document_id", "kwargs"), + ]) + + self.usage_stats_metadata = MetadataExtractor([]) + + self.recent_usage_metadata = MetadataExtractor([ + MetadataField("operation_type", "kwargs"), + MetadataField("since", "kwargs", transform=lambda dt: dt.isoformat() if dt else None), + MetadataField("status", "kwargs"), + ]) + + self.cache_create_metadata = MetadataExtractor([ + MetadataField("name", "kwargs"), + MetadataField("model", "kwargs"), + MetadataField("gguf_file", "kwargs"), + MetadataField("filters", "kwargs"), + MetadataField("docs", "kwargs"), + ]) + + self.cache_get_metadata = MetadataExtractor([ + MetadataField("name", "kwargs"), + ]) + + self.cache_update_metadata = self.cache_get_metadata + + self.cache_add_docs_metadata = MetadataExtractor([ + MetadataField("name", "kwargs"), + MetadataField("docs", "kwargs"), + ]) + + self.cache_query_metadata = MetadataExtractor([ + MetadataField("name", "kwargs"), + MetadataField("query", "kwargs"), + MetadataField("max_tokens", "kwargs"), + MetadataField("temperature", "kwargs"), + ]) + + self.create_graph_metadata = MetadataExtractor([ + MetadataField("name", "request"), + MetadataField("has_filters", "request", "filters", transform=is_not_none), + MetadataField("document_count", "request", "documents", + transform=lambda docs: len(docs) if docs else 0), + MetadataField("has_prompt_overrides", "request", "prompt_overrides", transform=is_not_none), + MetadataField("folder_name", "request"), + MetadataField("end_user_id", "request"), + ]) + + self.get_graph_metadata = MetadataExtractor([ + MetadataField("name", "kwargs"), + MetadataField("folder_name", "kwargs"), + MetadataField("end_user_id", "kwargs"), + ]) + + self.list_graphs_metadata = MetadataExtractor([ + MetadataField("folder_name", "kwargs"), + MetadataField("end_user_id", "kwargs"), + ]) + + self.update_graph_metadata = MetadataExtractor([ + MetadataField("name", "kwargs"), + MetadataField("has_additional_filters", "request", "additional_filters", transform=is_not_none), + MetadataField("additional_document_count", "request", "additional_documents", + transform=lambda docs: len(docs) if docs else 0), + MetadataField("has_prompt_overrides", "request", "prompt_overrides", transform=is_not_none), + MetadataField("folder_name", "request"), + MetadataField("end_user_id", "request"), + ]) + + self.set_folder_rule_metadata = MetadataExtractor([ + MetadataField("folder_id", "kwargs"), + MetadataField("apply_to_existing", "kwargs", default=True), + MetadataField("rule_count", "request", "rules", + transform=lambda 
+
+    def track(self, operation_type: Optional[str] = None, metadata_resolver: Optional[Callable] = None):
+        """
+        Decorator for tracking API operations with telemetry.
+
+        Args:
+            operation_type: Type of operation or function name if None
+            metadata_resolver: Function that extracts metadata from the request/args/kwargs
+        """
+        def decorator(func: Callable[..., T]) -> Callable[..., T]:
+            @functools.wraps(func)
+            async def wrapper(*args, **kwargs):
+                # Extract auth from kwargs
+                auth = kwargs.get('auth')
+                if not auth:
+                    # Try to find auth in positional arguments (unlikely, but possible)
+                    for arg in args:
+                        if hasattr(arg, 'entity_id') and hasattr(arg, 'permissions'):
+                            auth = arg
+                            break
+
+                # If we don't have auth, we can't track the operation
+                if not auth:
+                    return await func(*args, **kwargs)
+
+                # Use function name if operation_type not provided
+                op_type = operation_type or func.__name__
+
+                # Generate metadata using resolver or create empty dict
+                meta = {}
+                if metadata_resolver:
+                    meta = metadata_resolver(*args, **kwargs)
+
+                # Get approximate token count for text ingestion
+                tokens = 0
+                # Try to extract tokens for text ingestion
+                request = kwargs.get('request')
+                if request and hasattr(request, 'content') and isinstance(request.content, str):
+                    tokens = len(request.content.split())  # Approximate token count
+
+                # Run the function within the telemetry context
+                async with self.track_operation(
+                    operation_type=op_type,
+                    user_id=auth.entity_id,
+                    tokens_used=tokens,
+                    metadata=meta,
+                ) as span:
+                    # Call the original function
+                    result = await func(*args, **kwargs)
+                    return result
+
+            return wrapper
+        return decorator
+
 
     @asynccontextmanager
     async def track_operation(
         self,