diff --git a/core/api.py b/core/api.py
index 80db7c1..89b9120 100644
--- a/core/api.py
+++ b/core/api.py
@@ -434,6 +434,7 @@ async def ingest_file(
             "writers": [auth.entity_id],
             "admins": [auth.entity_id],
             "user_id": [auth.user_id] if auth.user_id else [],
+            "app_access": ([auth.app_id] if auth.app_id else []),
         },
         system_metadata={"status": "processing"},
     )
@@ -443,6 +444,8 @@ async def ingest_file(
         doc.system_metadata["folder_name"] = folder_name
     if end_user_id:
         doc.system_metadata["end_user_id"] = end_user_id
+    if auth.app_id:
+        doc.system_metadata["app_id"] = auth.app_id

     # Set processing status
     doc.system_metadata["status"] = "processing"
@@ -639,6 +642,7 @@ async def batch_ingest_files(
             "writers": [auth.entity_id],
             "admins": [auth.entity_id],
             "user_id": [auth.user_id] if auth.user_id else [],
+            "app_access": ([auth.app_id] if auth.app_id else []),
         },
     )
@@ -647,6 +651,8 @@ async def batch_ingest_files(
         doc.system_metadata["folder_name"] = folder_name
     if end_user_id:
         doc.system_metadata["end_user_id"] = end_user_id
+    if auth.app_id:
+        doc.system_metadata["app_id"] = auth.app_id

     # Set processing status
     doc.system_metadata["status"] = "processing"
@@ -966,6 +972,8 @@ async def list_documents(
         system_filters["folder_name"] = folder_name
     if end_user_id:
         system_filters["end_user_id"] = end_user_id
+    if auth.app_id:
+        system_filters["app_id"] = auth.app_id

     return await document_service.db.get_documents(auth, skip, limit, filters, system_filters)
diff --git a/core/database/postgres_database.py b/core/database/postgres_database.py
index 5d82288..5c66e86 100644
--- a/core/database/postgres_database.py
+++ b/core/database/postgres_database.py
@@ -8,7 +8,7 @@ from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
 from sqlalchemy.orm import declarative_base, sessionmaker

-from ..models.auth import AuthContext
+from ..models.auth import AuthContext, EntityType
 from ..models.documents import Document, StorageFileInfo
 from ..models.folders import Folder
 from ..models.graph import Graph
@@ -200,7 +200,7 @@ class PostgresDatabase(BaseDatabase):
                 )
                 logger.info("Added storage_files column to documents table")

-                # Create indexes for folder_name and end_user_id in system_metadata for documents
+                # Create indexes for folder_name, end_user_id and app_id in system_metadata for documents
                 await conn.execute(
                     text(
                         """
@@ -210,6 +210,24 @@ class PostgresDatabase(BaseDatabase):
                     )
                 )

+                await conn.execute(
+                    text(
+                        """
+                        CREATE INDEX IF NOT EXISTS idx_system_metadata_app_id
+                        ON documents ((system_metadata->>'app_id'));
+                        """
+                    )
+                )
+
+                await conn.execute(
+                    text(
+                        """
+                        CREATE INDEX IF NOT EXISTS idx_system_metadata_end_user_id
+                        ON documents ((system_metadata->>'end_user_id'));
+                        """
+                    )
+                )
+
                # Create folders table
if it doesn't exist
                await conn.execute(
                    text(
@@ -256,15 +274,6 @@ class PostgresDatabase(BaseDatabase):
                     text("CREATE INDEX IF NOT EXISTS idx_folder_access_control ON folders USING gin (access_control);")
                 )

-                await conn.execute(
-                    text(
-                        """
-                        CREATE INDEX IF NOT EXISTS idx_system_metadata_end_user_id
-                        ON documents ((system_metadata->>'end_user_id'));
-                        """
-                    )
-                )
-
                 # Check if system_metadata column exists in graphs table
                 result = await conn.execute(
                     text(
@@ -754,26 +763,44 @@ class PostgresDatabase(BaseDatabase):
         return False

     def _build_access_filter(self, auth: AuthContext) -> str:
-        """Build PostgreSQL filter for access control."""
-        filters = [
+        """Build PostgreSQL filter for access control.
+
+        For developer-scoped tokens (i.e. those that include an ``app_id``) we *must* ensure
+        that the caller only ever sees documents that belong to that application. Simply
+        checking the developer entity ID is **insufficient**, because multiple apps created
+        by the same developer share the same entity ID. Therefore, when an ``app_id`` is
+        present, we additionally scope the filter by the ``app_id`` that is stored either
+        in ``system_metadata.app_id`` or in the ``access_control->app_access`` list.
+        """
+
+        # Base clauses that will always be AND-ed with any additional application scoping.
+        base_clauses = [
             f"owner->>'id' = '{auth.entity_id}'",
             f"access_control->'readers' ? '{auth.entity_id}'",
             f"access_control->'writers' ? '{auth.entity_id}'",
             f"access_control->'admins' ? '{auth.entity_id}'",
         ]
-        if auth.entity_type == "DEVELOPER" and auth.app_id:
-            # Add app-specific access for developers
-            filters.append(f"access_control->'app_access' ? '{auth.app_id}'")
+        # All top-level clauses are AND-combined; the ownership/ACL alternatives are
+        # OR-combined among themselves so any one of them grants baseline access.
+        clauses = ["(" + " OR ".join(base_clauses) + ")"]
+
+        # Developer token with app_id → additionally scope to that app's documents.
+        if auth.entity_type == EntityType.DEVELOPER and auth.app_id:
+            clauses.append(
+                f"(system_metadata->>'app_id' = '{auth.app_id}'"
+                f" OR access_control->'app_access' ? '{auth.app_id}')"
+            )

-        # Add user_id filter in cloud mode
+        # In cloud mode further restrict by user_id when available (used for multi-tenant
+        # end-user isolation). AND-ed so the clause narrows access instead of widening it.
         if auth.user_id:
             from core.config import get_settings

             settings = get_settings()
             if settings.MODE == "cloud":
-                # Filter by user_id in access_control
-                filters.append(f"access_control->>'user_id' = '{auth.user_id}'")
+                # ``user_id`` is stored as a JSON list → use the ``?`` containment operator.
+                clauses.append(f"access_control->'user_id' ? '{auth.user_id}'")

-        return " OR ".join(filters)
+        return " AND ".join(clauses)
diff --git a/core/services/document_service.py b/core/services/document_service.py
index 97bac92..60df32d 100644
--- a/core/services/document_service.py
+++ b/core/services/document_service.py
@@ -159,17 +159,19 @@ class DocumentService:
         should_rerank = use_reranking if use_reranking is not None else settings.USE_RERANKING
         using_colpali = use_colpali if use_colpali is not None else False

-        # Launch embedding queries concurrently
-        embedding_tasks = [self.embedding_model.embed_for_query(query)]
-        if using_colpali and self.colpali_embedding_model:
-            embedding_tasks.append(self.colpali_embedding_model.embed_for_query(query))
-
         # Build system filters for folder_name and end_user_id
         system_filters = {}
         if folder_name:
             system_filters["folder_name"] = folder_name
         if end_user_id:
             system_filters["end_user_id"] = end_user_id
+        if auth.app_id:
+            system_filters["app_id"] = auth.app_id
+
+        # Launch embedding queries concurrently
+        embedding_tasks = [self.embedding_model.embed_for_query(query)]
+        if using_colpali and self.colpali_embedding_model:
+            embedding_tasks.append(self.colpali_embedding_model.embed_for_query(query))

         # Run embeddings and document authorization in parallel
         results = await asyncio.gather(
@@ -357,6 +359,8 @@ class DocumentService:
             system_filters["folder_name"] = folder_name
         if end_user_id:
             system_filters["end_user_id"] = end_user_id
+        if auth.app_id:
+            system_filters["app_id"] = auth.app_id

        # Use the database's
batch retrieval method documents = await self.db.get_documents_by_id(document_ids, auth, system_filters) @@ -582,9 +586,7 @@ class DocumentService: "readers": [auth.entity_id], "writers": [auth.entity_id], "admins": [auth.entity_id], - "user_id": ( - [auth.user_id] if auth.user_id else [] - ), # Add user_id to access control for filtering (as a list) + "user_id": [auth.user_id] if auth.user_id else [], # user scoping }, ) @@ -597,6 +599,11 @@ class DocumentService: if end_user_id: doc.system_metadata["end_user_id"] = end_user_id + + # Tag document with app_id for segmentation + if auth.app_id: + doc.system_metadata["app_id"] = auth.app_id + logger.debug(f"Created text document record with ID {doc.external_id}") if settings.MODE == "cloud" and auth.user_id: @@ -711,10 +718,47 @@ class DocumentService: mime_type = file_type.mime if file_type is not None else "text/plain" logger.info(f"Creating chunks for multivector embedding for file type {mime_type}") - # If file_type is None, treat it as a text file + # If file_type is None, attempt a light-weight heuristic to detect images + # Some JPGs with uncommon EXIF markers fail `filetype.guess`, leading to + # false "text" classification and, eventually, empty chunk lists. Try to + # open the bytes with Pillow; if that succeeds, treat it as an image. if file_type is None: - logger.info("File type is None, treating as text") - return [Chunk(content=chunk.content, metadata=(chunk.metadata | {"is_image": False})) for chunk in chunks] + try: + from PIL import Image as PILImage + + PILImage.open(BytesIO(file_content)).verify() + logger.info("Heuristic image detection succeeded (Pillow). Treating as image.") + return [Chunk(content=file_content_base64, metadata={"is_image": True})] + except Exception: + logger.info("File type is None and not an image – treating as text") + return [ + Chunk(content=chunk.content, metadata=(chunk.metadata | {"is_image": False})) for chunk in chunks + ] + + # Treat any direct image MIME (e.g. 
"image/jpeg") as an image regardless of + # the more specialised pattern matching below. This is more robust for files + # where `filetype.guess` fails but we still know from the upload metadata that + # it is an image. + if mime_type.startswith("image/"): + try: + from PIL import Image as PILImage + + img = PILImage.open(BytesIO(file_content)) + # Resize the image to a max width of 512 while preserving aspect ratio to + # keep the base64 payload smaller (helps avoid context window errors). + max_width = 512 + if img.width > max_width: + ratio = max_width / float(img.width) + new_height = int(float(img.height) * ratio) + img = img.resize((max_width, new_height)) + + buffered = BytesIO() + img.save(buffered, format="PNG", optimize=True) + img_b64 = "data:image/png;base64," + base64.b64encode(buffered.getvalue()).decode() + return [Chunk(content=img_b64, metadata={"is_image": True})] + except Exception as e: + logger.error(f"Error resizing image for base64 encoding: {e}. Falling back to original size.") + return [Chunk(content=file_content_base64, metadata={"is_image": True})] match mime_type: case file_type if file_type in IMAGE: @@ -1475,64 +1519,6 @@ class DocumentService: logger.warning(f"Unknown update strategy '{update_strategy}', defaulting to 'add'") return current_content + "\n\n" + update_content - def _update_metadata_and_version( - self, - doc: Document, - metadata: Optional[Dict[str, Any]], - update_strategy: str, - file: Optional[UploadFile], - ): - """Update document metadata and version tracking.""" - # Update metadata if provided - additive but replacing existing keys - if metadata: - doc.metadata.update(metadata) - - # Ensure external_id is preserved in metadata - doc.metadata["external_id"] = doc.external_id - - # Increment version - current_version = doc.system_metadata.get("version", 1) - doc.system_metadata["version"] = current_version + 1 - doc.system_metadata["updated_at"] = datetime.now(UTC) - - # Track update history - if "update_history" 
not in doc.system_metadata:
-            doc.system_metadata["update_history"] = []
-
-        update_entry = {
-            "timestamp": datetime.now(UTC).isoformat(),
-            "version": current_version + 1,
-            "strategy": update_strategy,
-        }
-
-        if file:
-            update_entry["filename"] = file.filename
-
-        if metadata:
-            update_entry["metadata_updated"] = True
-
-        doc.system_metadata["update_history"].append(update_entry)
-
-        # Ensure storage_files models are properly typed as StorageFileInfo objects
-        if hasattr(doc, "storage_files") and doc.storage_files:
-            # Convert to StorageFileInfo objects if they're dicts or ensure they're properly serializable
-            doc.storage_files = [
-                (
-                    StorageFileInfo(**file)
-                    if isinstance(file, dict)
-                    else (
-                        file
-                        if isinstance(file, StorageFileInfo)
-                        else (
-                            StorageFileInfo(**file.model_dump())
-                            if hasattr(file, "model_dump")
-                            else StorageFileInfo(**file.dict()) if hasattr(file, "dict") else file
-                        )
-                    )
-                )
-                for file in doc.storage_files
-            ]
-
     async def _update_document_metadata_only(self, doc: Document, auth: AuthContext) -> Optional[Document]:
         """Update document metadata without reprocessing chunks."""
         updates = {
@@ -1809,3 +1795,59 @@ class DocumentService:
         """Close all resources."""
         # Close any active caches
         self.active_caches.clear()
+
+    def _update_metadata_and_version(
+        self,
+        doc: Document,
+        metadata: Optional[Dict[str, Any]],
+        update_strategy: str,
+        file: Optional[UploadFile],
+    ):
+        """Update document metadata and version tracking."""
+
+        # Merge/replace metadata
+        if metadata:
+            doc.metadata.update(metadata)
+
+        # Ensure external_id is preserved
+        doc.metadata["external_id"] = doc.external_id
+
+        # Increment version counter
+        current_version = doc.system_metadata.get("version", 1)
+        doc.system_metadata["version"] = current_version + 1
+        doc.system_metadata["updated_at"] = datetime.now(UTC)
+
+        # Maintain simple history list
+        history = doc.system_metadata.setdefault("update_history", [])
+        entry = {
+            "timestamp": datetime.now(UTC).isoformat(),
+            "version": current_version + 1,
+            "strategy": update_strategy,
+        }
+        if file:
+            entry["filename"] = file.filename
+        if metadata:
+            entry["metadata_updated"] = True
+
+        history.append(entry)
+
+        # Preserve the previous behaviour of normalising ``storage_files`` entries to
+        # ``StorageFileInfo`` objects (they may arrive as dicts or pydantic models);
+        # silently dropping this step would regress multi-file documents.
+        if getattr(doc, "storage_files", None):
+            doc.storage_files = [
+                (
+                    StorageFileInfo(**f)
+                    if isinstance(f, dict)
+                    else (
+                        f
+                        if isinstance(f, StorageFileInfo)
+                        else (
+                            StorageFileInfo(**f.model_dump())
+                            if hasattr(f, "model_dump")
+                            else StorageFileInfo(**f.dict()) if hasattr(f, "dict") else f
+                        )
+                    )
+                )
+                for f in doc.storage_files
+            ]
diff --git a/core/workers/ingestion_worker.py b/core/workers/ingestion_worker.py
index 8d833ea..166ede7 100644
--- a/core/workers/ingestion_worker.py
+++ b/core/workers/ingestion_worker.py
@@ -245,20 +245,37 @@ async def process_ingestion_job(
         chunking_start = time.time()
         parsed_chunks = await document_service.parser.split_text(text)
         if not parsed_chunks:
-            raise ValueError("No content chunks extracted after rules processing")
-        logger.debug(f"Split processed text into {len(parsed_chunks)} chunks")
+            # No text was extracted from the file. In many cases (e.g. pure images)
+            # we can still proceed if ColPali multivector chunks are produced later.
+            # Therefore we defer the fatal check until after ColPali chunk creation.
+            logger.warning(
+                "No text chunks extracted after parsing. Will attempt to continue "
+                "and rely on image-based chunks if available."
+            )
         chunking_time = time.time() - chunking_start
         phase_times["split_into_chunks"] = chunking_time
         logger.info(f"Text chunking took {chunking_time:.2f}s to create {len(parsed_chunks)} chunks")

-        # 8. Handle ColPali embeddings if enabled - IMPORTANT: Do this BEFORE applying chunk rules
-        #    so that image chunks can be processed by rules when use_images=True
-        colpali_processing_start = time.time()
+        # Decide whether we need image chunks either for ColPali embedding or because
+        # there are image-based rules (use_images=True) that must process them.
+ has_image_rules = any( + r.get("stage", "post_parsing") == "post_chunking" + and r.get("type") == "metadata_extraction" + and r.get("use_images", False) + for r in rules_list or [] + ) + using_colpali = ( use_colpali and document_service.colpali_embedding_model and document_service.colpali_vector_store ) + + should_create_image_chunks = has_image_rules or using_colpali + + # Start timer for optional image chunk creation / multivector processing + colpali_processing_start = time.time() + chunks_multivector = [] - if using_colpali: + if should_create_image_chunks: import base64 import filetype @@ -266,16 +283,23 @@ async def process_ingestion_job( file_type = filetype.guess(file_content) file_content_base64 = base64.b64encode(file_content).decode() - # Use the parsed chunks for Colpali - this will create image chunks if appropriate + # Use the parsed chunks for ColPali/image rules – this will create image chunks if appropriate chunks_multivector = document_service._create_chunks_multivector( file_type, file_content_base64, file_content, parsed_chunks ) - logger.debug(f"Created {len(chunks_multivector)} chunks for multivector embedding") + logger.debug( + f"Created {len(chunks_multivector)} multivector/image chunks " + f"(has_image_rules={has_image_rules}, using_colpali={using_colpali})" + ) colpali_create_chunks_time = time.time() - colpali_processing_start phase_times["colpali_create_chunks"] = colpali_create_chunks_time if using_colpali: logger.info(f"Colpali chunk creation took {colpali_create_chunks_time:.2f}s") + # If we still have no chunks at all (neither text nor image) abort early + if not parsed_chunks and not chunks_multivector: + raise ValueError("No content chunks (text or image) could be extracted from the document") + # 9. Apply post_chunking rules and aggregate metadata processed_chunks = [] processed_chunks_multivector = []