diff --git a/core/api.py b/core/api.py
index 5fea590..270145f 100644
--- a/core/api.py
+++ b/core/api.py
@@ -880,6 +880,7 @@ async def batch_get_chunks(
     sources = request.get("sources", [])
     folder_name = request.get("folder_name")
     end_user_id = request.get("end_user_id")
+    use_colpali = request.get("use_colpali")
 
     if not sources:
         return []
@@ -891,6 +892,7 @@ async def batch_get_chunks(
             "chunk_count": len(sources),
             "folder_name": folder_name,
             "end_user_id": end_user_id,
+            "use_colpali": use_colpali,
         },
     ):
         # Convert sources to ChunkSource objects if needed
@@ -901,7 +903,7 @@ async def batch_get_chunks(
             else:
                 chunk_sources.append(source)
 
-        return await document_service.batch_retrieve_chunks(chunk_sources, auth, folder_name, end_user_id)
+        return await document_service.batch_retrieve_chunks(chunk_sources, auth, folder_name, end_user_id, use_colpali)
 
     except PermissionError as e:
         raise HTTPException(status_code=403, detail=str(e))
diff --git a/core/services/document_service.py b/core/services/document_service.py
index 48b7161..b561946 100644
--- a/core/services/document_service.py
+++ b/core/services/document_service.py
@@ -297,7 +297,8 @@ class DocumentService:
         chunk_ids: List[ChunkSource],
         auth: AuthContext,
         folder_name: Optional[str] = None,
-        end_user_id: Optional[str] = None
+        end_user_id: Optional[str] = None,
+        use_colpali: Optional[bool] = None
     ) -> List[ChunkResult]:
         """
         Retrieve specific chunks by their document ID and chunk number in a single batch operation.
@@ -305,6 +306,9 @@ class DocumentService:
         Args:
             chunk_ids: List of ChunkSource objects with document_id and chunk_number
             auth: Authentication context
+            folder_name: Optional folder to scope the operation to
+            end_user_id: Optional end-user ID to scope the operation to
+            use_colpali: Whether to use colpali multimodal features for image chunks
 
         Returns:
             List of ChunkResult objects
@@ -337,6 +341,32 @@ class DocumentService:
         # Retrieve the chunks from vector store in a single query
         chunks = await self.vector_store.get_chunks_by_id(chunk_identifiers)
 
+        # Check if we should use colpali for image chunks
+        if use_colpali and self.colpali_vector_store:
+            logger.info("Trying to retrieve chunks from colpali vector store")
+            try:
+                # Also try to retrieve from the colpali vector store
+                colpali_chunks = await self.colpali_vector_store.get_chunks_by_id(chunk_identifiers)
+
+                if colpali_chunks:
+                    # Create a dictionary of (doc_id, chunk_number) -> chunk for fast lookup
+                    chunk_dict = {(c.document_id, c.chunk_number): c for c in chunks}
+
+                    logger.debug(f"Found {len(colpali_chunks)} chunks in colpali store")
+                    for colpali_chunk in colpali_chunks:
+                        logger.debug(f"Found colpali chunk: doc={colpali_chunk.document_id}, chunk={colpali_chunk.chunk_number}")
+                        key = (colpali_chunk.document_id, colpali_chunk.chunk_number)
+                        # Replace chunks with colpali chunks when available
+                        chunk_dict[key] = colpali_chunk
+
+                    # Update chunks list with the combined/replaced chunks
+                    chunks = list(chunk_dict.values())
+                    logger.info(f"Enhanced {len(colpali_chunks)} chunks with colpali/multimodal data")
+                else:
+                    logger.warning("No chunks found in colpali vector store")
+            except Exception as e:
+                logger.error(f"Error retrieving chunks from colpali vector store: {e}", exc_info=True)
+
         # Convert to chunk results
         results = await self._create_chunk_results(auth, chunks)
         logger.info(f"Batch retrieved {len(results)} chunks out of {len(chunk_ids)} requested")
diff --git a/core/workers/ingestion_worker.py b/core/workers/ingestion_worker.py
index b4a658e..295e062 100644
--- a/core/workers/ingestion_worker.py
+++ b/core/workers/ingestion_worker.py
@@ -217,8 +217,9 @@ async def process_ingestion_job(
         logger.debug(f"Created {len(chunk_objects)} chunk objects")
 
         # 10. Handle ColPali embeddings if enabled
+        using_colpali = use_colpali and document_service.colpali_embedding_model and document_service.colpali_vector_store
         chunk_objects_multivector = []
-        if use_colpali and document_service.colpali_embedding_model and document_service.colpali_vector_store:
+        if using_colpali:
             import filetype
 
             file_type = filetype.guess(file_content)
diff --git a/scripts/scrub_metadata.py b/scripts/scrub_metadata.py
new file mode 100755
index 0000000..fcd4648
--- /dev/null
+++ b/scripts/scrub_metadata.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+import asyncio
+import os
+import logging
+import json
+from typing import Optional, Dict, Any
+import argparse
+
+from sqlalchemy import text
+from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
+from sqlalchemy.orm import sessionmaker
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+async def get_postgres_session(uri: str) -> AsyncSession:
+    """Create and return a PostgreSQL session."""
+    engine = create_async_engine(uri, echo=False)
+    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
+    return async_session()
+
+async def scrub_document_metadata(
+    postgres_uri: str,
+    preserve_external_id_only: bool = True,
+    batch_size: int = 100
+) -> None:
+    """
+    Scrub metadata from all documents in the database,
+    keeping only external_id if preserve_external_id_only is True.
+
+    Args:
+        postgres_uri: PostgreSQL connection URI
+        preserve_external_id_only: If True, preserve only external_id in metadata
+        batch_size: Number of documents to process in each batch
+    """
+    try:
+        async with await get_postgres_session(postgres_uri) as session:
+            # Get total count of documents
+            count_result = await session.execute(text("SELECT COUNT(*) FROM documents"))
+            total_docs = count_result.scalar()
+            logger.info(f"Found {total_docs} documents to process")
+
+            # Process in batches
+            offset = 0
+            total_processed = 0
+            total_updated = 0
+
+            while offset < total_docs:
+                # Get batch of document IDs
+                id_result = await session.execute(
+                    text(f"SELECT external_id FROM documents LIMIT {batch_size} OFFSET {offset}")
+                )
+                doc_ids = [row[0] for row in id_result.all()]
+
+                # Process each document in the batch
+                for doc_id in doc_ids:
+                    # Create new metadata object with only external_id
+                    if preserve_external_id_only:
+                        new_metadata = {"external_id": doc_id}
+                    else:
+                        # This would be where you could implement more complex preservation rules
+                        new_metadata = {"external_id": doc_id}
+
+                    # Use fully manual query with directly inserted values
+                    json_string = json.dumps(new_metadata).replace("'", "''")
+                    # Use a direct query using string formatting (safe in this context since we control the values)
+                    query = text(f"""
+                        UPDATE documents
+                        SET doc_metadata = '{json_string}'::jsonb
+                        WHERE external_id = '{doc_id}'
+                    """)
+
+                    await session.execute(query)
+                    total_updated += 1
+
+                # Commit changes for this batch
+                await session.commit()
+                total_processed += len(doc_ids)
+                offset += batch_size
+                logger.info(f"Processed {total_processed}/{total_docs} documents, updated {total_updated}")
+
+            logger.info(f"Metadata scrubbing complete. Processed {total_processed} documents, updated {total_updated}.")
+
+    except Exception as e:
+        logger.error(f"Error scrubbing document metadata: {e}")
+        raise
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description='Scrub metadata from documents, preserving only external_id')
+    parser.add_argument('--env', choices=['docker', 'local'], required=True,
+                        help='Environment to run in (docker or local)')
+    parser.add_argument('--batch-size', type=int, default=100,
+                        help='Number of documents to process in each batch')
+
+    args = parser.parse_args()
+
+    # Get database URI based on environment
+    if args.env == 'docker':
+        # Using Docker container
+        postgres_uri = "postgresql+asyncpg://morphik:morphik@localhost:5432/morphik"
+    else:
+        # Using local .env file
+        try:
+            # Try to load from .env file
+            from dotenv import load_dotenv
+            load_dotenv()
+            postgres_uri = os.environ.get("POSTGRES_URI")
+            if not postgres_uri:
+                raise ValueError("POSTGRES_URI not found in environment variables")
+        except ImportError:
+            # If python-dotenv is not installed
+            postgres_uri = os.environ.get("POSTGRES_URI")
+            if not postgres_uri:
+                raise ValueError("POSTGRES_URI not found in environment variables. Install python-dotenv or set POSTGRES_URI manually.")
+
+    logger.info(f"Starting metadata scrubbing in {args.env} environment")
+    asyncio.run(scrub_document_metadata(postgres_uri, batch_size=args.batch_size))
\ No newline at end of file
diff --git a/ui-component/components/chat/ChatSection.tsx b/ui-component/components/chat/ChatSection.tsx
index 4cd066f..bee6cdf 100644
--- a/ui-component/components/chat/ChatSection.tsx
+++ b/ui-component/components/chat/ChatSection.tsx
@@ -236,28 +236,39 @@ const ChatSection: React.FC = ({ apiBaseUrl, authToken }) => {
           },
           body: JSON.stringify({
             sources: data.sources,
-            folder_name: queryOptions.folder_name
+            folder_name: queryOptions.folder_name,
+            use_colpali: true,
           })
         });
 
         if (sourcesResponse.ok) {
           const sourcesData = await sourcesResponse.json();
-          
-          // Process source data
+          
+          // Check if we have any image sources
+          const imageSources = sourcesData.filter((source: Source) =>
+            source.content_type?.startsWith('image/') ||
+            (source.content && (
+              source.content.startsWith('data:image/png;base64,') ||
+              source.content.startsWith('data:image/jpeg;base64,')
+            ))
+          );
+          console.log('Image sources found:', imageSources.length);
 
           // Update the message with detailed source information
           const updatedMessage = {
             ...assistantMessage,
-            sources: sourcesData.map((source: Source) => ({
-              document_id: source.document_id,
-              chunk_number: source.chunk_number,
-              score: source.score,
-              content: source.content,
-              content_type: source.content_type || 'text/plain',
-              filename: source.filename,
-              metadata: source.metadata,
-              download_url: source.download_url
-            }))
+            sources: sourcesData.map((source: Source) => {
+              return {
+                document_id: source.document_id,
+                chunk_number: source.chunk_number,
+                score: source.score,
+                content: source.content,
+                content_type: source.content_type || 'text/plain',
+                filename: source.filename,
+                metadata: source.metadata,
+                download_url: source.download_url
+              };
+            })
           };
 
           // Update the message with detailed sources
@@ -284,7 +295,7 @@ const ChatSection: React.FC = ({ apiBaseUrl, authToken }) => {
   };
 
   return (
-    
+    
        Chat with Your Documents