diff --git a/core/models/rules.py b/core/models/rules.py
index 9518e6a..726346d 100644
--- a/core/models/rules.py
+++ b/core/models/rules.py
@@ -43,6 +43,7 @@ class MetadataExtractionRule(BaseRule):
 
     type: Literal["metadata_extraction"]
     schema: Dict[str, Any]
+    use_images: bool = False
 
     async def apply(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> tuple[Dict[str, Any], str]:
         """Extract metadata according to schema"""
@@ -89,15 +90,18 @@ class MetadataExtractionRule(BaseRule):
 
         schema_text = "\n".join(schema_descriptions)
 
-        # Adjust prompt based on whether it's a chunk or full document
-        prompt_context = "chunk of text" if self.stage == "post_chunking" else "text"
+        # Adjust prompt based on whether it's a chunk or full document, and whether it's an image
+        if self.use_images:
+            prompt_context = "image" if self.stage == "post_chunking" else "document with images"
+        else:
+            prompt_context = "chunk of text" if self.stage == "post_chunking" else "text"
 
         prompt = f"""
         Extract metadata from the following {prompt_context} according to this schema:
 
         {schema_text}
 
-        Text to extract from:
+        {"Image to analyze:" if self.use_images else "Text to extract from:"}
         {content}
 
         Follow these guidelines:
@@ -113,16 +117,49 @@ class MetadataExtractionRule(BaseRule):
         if not model_config:
             raise ValueError(f"Model '{settings.RULES_MODEL}' not found in registered_models configuration")
 
+        # Prepare base64 data for the vision model if this is an image rule
+        vision_messages = []
+        if self.use_images:
+            try:
+                # For image content, check whether it's a bare base64 string or a data URI
+                # such as "data:image/png;base64,...", and keep the original media type
+                # instead of assuming JPEG.
+                data_uri_prefix = "data:image/jpeg"  # default when no data URI prefix is present
+                if content.startswith("data:"):
+                    data_uri_prefix, content = content.split(";base64,", 1)
+
+                # User message with image content
+                vision_messages = [
+                    {
+                        "role": "user",
+                        "content": [
+                            {"type": "text", "text": prompt},
+                            {"type": "image_url", "image_url": {"url": f"{data_uri_prefix};base64,{content}"}},
+                        ],
+                    }
+                ]
+            except Exception as e:
+                logger.error(f"Error preparing image content for vision model: {str(e)}")
+                # Fall back to text-only if image processing fails
+                vision_messages = []
+
         system_message = {
             "role": "system",
             "content": (
-                "You are a metadata extraction assistant. Extract structured metadata from text "
+                "You are a metadata extraction assistant. Extract structured metadata "
+                f"from {'images' if self.use_images else 'text'} "
                 "precisely following the provided schema. Always return the metadata as direct values "
                 "(strings, numbers, booleans), not as objects with additional properties."
             ),
         }
 
-        user_message = {"role": "user", "content": prompt}
+        # If we built vision messages, use those; otherwise use the standard text message
+        messages = []
+        if vision_messages and self.use_images:
+            messages = [system_message] + vision_messages
+        else:
+            user_message = {"role": "user", "content": prompt}
+            messages = [system_message, user_message]
 
         # Use instructor with litellm to get structured responses
         client = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON)
@@ -137,7 +172,7 @@ class MetadataExtractionRule(BaseRule):
         # Use instructor's client to create a structured response
         response = await client.chat.completions.create(
             model=model,
-            messages=[system_message, user_message],
+            messages=messages,
             response_model=DynamicMetadataModel,
             **model_kwargs,
        )
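Note on the message shape: when `use_images=True`, the rule sends an OpenAI-style multimodal message through litellm. A minimal standalone sketch of just that payload construction (the helper name is ours, not part of the codebase; the JPEG default mirrors the assumption in the hunk above):

```python
# Hypothetical helper mirroring the vision-message construction above.
# Assumes an OpenAI-style multimodal message format, which litellm forwards as-is.
def build_vision_message(prompt: str, content: str) -> dict:
    data_uri_prefix = "data:image/jpeg"  # assumed default when content is raw base64
    if content.startswith("data:"):
        # Preserve the original media type from a "data:image/png;base64,..." URI
        data_uri_prefix, content = content.split(";base64,", 1)
    return {
        "role": "user",
        "content": [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": f"{data_uri_prefix};base64,{content}"}},
        ],
    }
```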
diff --git a/core/services/rules_processor.py b/core/services/rules_processor.py
index 71b736f..13a92f4 100644
--- a/core/services/rules_processor.py
+++ b/core/services/rules_processor.py
@@ -60,6 +60,9 @@ class RulesProcessor:
         logger.debug(f"Parsing rule of type: {rule_type}, stage: {stage}")
 
         if rule_type == "metadata_extraction":
+            # Default use_images to False if not present
+            if "use_images" not in rule_dict:
+                rule_dict["use_images"] = False
             return MetadataExtractionRule(**rule_dict)
         elif rule_type == "natural_language":
             return NaturalLanguageRule(**rule_dict)
@@ -119,11 +122,19 @@ class RulesProcessor:
         Args:
             chunk: The Chunk object to process
             rules: The original list of rule dictionaries
+                (these should already be filtered by the caller based on stage and chunk type)
 
         Returns:
             Tuple[Dict[str, Any], Chunk]: (extracted_metadata_for_doc, potentially_modified_chunk)
         """
-        logger.debug(f"Processing chunk-level rules (post_chunking) on chunk content length {len(chunk.content)}")
+        is_image_chunk = chunk.metadata.get("is_image", False)
+        chunk_type = "image" if is_image_chunk else "text"
+
+        logger.debug(
+            f"Processing chunk-level rules (post_chunking) on {chunk_type} chunk "
+            f"with content length {len(chunk.content)}"
+        )
+
         parsed_rules = []
 
         # Process rules for post_chunking stage
@@ -134,6 +145,16 @@ class RulesProcessor:
 
                 # Only include rules for post_chunking stage
                 if rule.stage == "post_chunking":
+                    # Skip image rules for text chunks and vice versa
+                    if isinstance(rule, MetadataExtractionRule):
+                        # If it's an image rule but the chunk is not an image, or vice versa, skip it
+                        if (rule.use_images and not is_image_chunk) or (not rule.use_images and is_image_chunk):
+                            logger.debug(
+                                f"Skipping rule with use_images={rule.use_images} "
+                                f"for chunk with is_image={is_image_chunk}"
+                            )
+                            continue
+
                     parsed_rules.append(rule)
             except ValueError as e:
                 logger.warning(f"Skipping invalid chunk rule: {e}")
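The skip condition above is an inequality test in disguise: a metadata rule runs only when its `use_images` flag matches the chunk's `is_image` flag. A tiny sketch:

```python
# Equivalent predicate for the skip logic above: run a metadata rule only when
# its use_images flag matches the chunk type.
def rule_matches_chunk(use_images: bool, is_image_chunk: bool) -> bool:
    return use_images == is_image_chunk

assert rule_matches_chunk(True, True)        # image rule on image chunk: run
assert rule_matches_chunk(False, False)      # text rule on text chunk: run
assert not rule_matches_chunk(True, False)   # image rule on text chunk: skip
assert not rule_matches_chunk(False, True)   # text rule on image chunk: skip
```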
diff --git a/core/workers/ingestion_worker.py b/core/workers/ingestion_worker.py
index 9388e61..d1b4bc7 100644
--- a/core/workers/ingestion_worker.py
+++ b/core/workers/ingestion_worker.py
@@ -14,6 +14,7 @@ from core.database.postgres_database import PostgresDatabase
 from core.embedding.colpali_embedding_model import ColpaliEmbeddingModel
 from core.embedding.litellm_embedding import LiteLLMEmbeddingModel
 from core.models.auth import AuthContext, EntityType
+from core.models.rules import MetadataExtractionRule
 from core.parser.morphik_parser import MorphikParser
 from core.services.document_service import DocumentService
 from core.services.rules_processor import RulesProcessor
@@ -207,24 +208,87 @@ async def process_ingestion_job(
             raise ValueError("No content chunks extracted after rules processing")
         logger.debug(f"Split processed text into {len(parsed_chunks)} chunks")
 
-        # === Apply post_chunking rules and aggregate metadata ===
+        # 8. Handle ColPali embeddings if enabled - IMPORTANT: do this BEFORE applying chunk rules
+        # so that image chunks can be processed by rules when use_images=True
+        using_colpali = (
+            use_colpali and document_service.colpali_embedding_model and document_service.colpali_vector_store
+        )
+        chunks_multivector = []
+        if using_colpali:
+            import base64
+
+            import filetype
+
+            file_type = filetype.guess(file_content)
+            file_content_base64 = base64.b64encode(file_content).decode()
+
+            # Use the parsed chunks for ColPali - this will create image chunks if appropriate
+            chunks_multivector = document_service._create_chunks_multivector(
+                file_type, file_content_base64, file_content, parsed_chunks
+            )
+            logger.debug(f"Created {len(chunks_multivector)} chunks for multivector embedding")
+
+        # 9. Apply post_chunking rules and aggregate metadata
         processed_chunks = []
+        processed_chunks_multivector = []
         aggregated_chunk_metadata: Dict[str, Any] = {}  # Initialize dict for aggregated metadata
         chunk_contents = []  # Initialize list to collect chunk contents as we process them
 
         if rules_list:
             logger.info("Applying post-chunking rules...")
 
-            for chunk_obj in parsed_chunks:
-                # Get metadata *and* the potentially modified chunk
-                chunk_rule_metadata, processed_chunk = await document_service.rules_processor.process_chunk_rules(
-                    chunk_obj, rules_list
-                )
-                processed_chunks.append(processed_chunk)
-                chunk_contents.append(processed_chunk.content)  # Collect content as we process
-                # Aggregate the metadata extracted from this chunk
-                aggregated_chunk_metadata.update(chunk_rule_metadata)
-            logger.info(f"Finished applying post-chunking rules to {len(processed_chunks)} chunks.")
+            # Partition rules by type
+            text_rules = []
+            image_rules = []
+
+            for rule_dict in rules_list:
+                rule = document_service.rules_processor._parse_rule(rule_dict)
+                if rule.stage == "post_chunking":
+                    if isinstance(rule, MetadataExtractionRule) and rule.use_images:
+                        image_rules.append(rule_dict)
+                    else:
+                        text_rules.append(rule_dict)
+
+            logger.info(f"Partitioned rules: {len(text_rules)} text rules, {len(image_rules)} image rules")
+
+            # Process regular text chunks with text rules only
+            if text_rules:
+                logger.info(f"Applying {len(text_rules)} text rules to text chunks...")
+                for chunk_obj in parsed_chunks:
+                    # Get metadata *and* the potentially modified chunk
+                    chunk_rule_metadata, processed_chunk = await document_service.rules_processor.process_chunk_rules(
+                        chunk_obj, text_rules
+                    )
+                    processed_chunks.append(processed_chunk)
+                    chunk_contents.append(processed_chunk.content)  # Collect content as we process
+                    # Aggregate the metadata extracted from this chunk
+                    aggregated_chunk_metadata.update(chunk_rule_metadata)
+            else:
+                processed_chunks = parsed_chunks  # No text rules, use original chunks
+
+            # Process ColPali image chunks with image rules if they exist
+            if chunks_multivector and image_rules:
+                logger.info(f"Applying {len(image_rules)} image rules to image chunks...")
+                for chunk_obj in chunks_multivector:
+                    # Only process if it's an image chunk - pass the image content to the rule
+                    if chunk_obj.metadata.get("is_image", False):
+                        # Get metadata *and* the potentially modified chunk
+                        chunk_rule_metadata, processed_chunk = (
+                            await document_service.rules_processor.process_chunk_rules(chunk_obj, image_rules)
+                        )
+                        processed_chunks_multivector.append(processed_chunk)
+                        # Aggregate the metadata extracted from this chunk
+                        aggregated_chunk_metadata.update(chunk_rule_metadata)
+                    else:
+                        # Non-image chunks from multivector don't need further processing
+                        processed_chunks_multivector.append(chunk_obj)
+
+                logger.info(f"Finished applying image rules to {len(processed_chunks_multivector)} multivector chunks.")
+            elif chunks_multivector:
+                # No image rules, use original multivector chunks
+                processed_chunks_multivector = chunks_multivector
+
+            logger.info(f"Finished applying post-chunking rules to {len(processed_chunks)} regular chunks.")
             logger.info(f"Aggregated metadata from all chunks: {aggregated_chunk_metadata}")
 
             # Update the document content with the stitched content from processed chunks
@@ -235,41 +299,26 @@ async def process_ingestion_job(
                 logger.info(f"Updated document content with stitched chunks (length: {len(stitched_content)})")
         else:
             processed_chunks = parsed_chunks  # No rules, use original chunks
+            processed_chunks_multivector = chunks_multivector  # No rules, use original multivector chunks
 
-        # 8. Generate embeddings for processed chunks
+        # 10. Generate embeddings for processed chunks
         embeddings = await document_service.embedding_model.embed_for_ingestion(processed_chunks)
         logger.debug(f"Generated {len(embeddings)} embeddings")
 
-        # 9. Create chunk objects with potentially modified chunk content and metadata
+        # 11. Create chunk objects with potentially modified chunk content and metadata
         chunk_objects = document_service._create_chunk_objects(doc.external_id, processed_chunks, embeddings)
         logger.debug(f"Created {len(chunk_objects)} chunk objects")
 
-        # 10. Handle ColPali embeddings if enabled
-        using_colpali = (
-            use_colpali and document_service.colpali_embedding_model and document_service.colpali_vector_store
-        )
+        # 12. Handle ColPali embeddings
         chunk_objects_multivector = []
         if using_colpali:
-            import filetype
-
-            file_type = filetype.guess(file_content)
-
-            # For ColPali we need the base64 encoding of the file
-            import base64
-
-            file_content_base64 = base64.b64encode(file_content).decode()
-
-            # Use the processed chunks for ColPali
-            chunks_multivector = document_service._create_chunks_multivector(
-                file_type, file_content_base64, file_content, processed_chunks
-            )
-            logger.debug(f"Created {len(chunks_multivector)} chunks for multivector embedding")
-
-            colpali_embeddings = await document_service.colpali_embedding_model.embed_for_ingestion(chunks_multivector)
+            colpali_embeddings = await document_service.colpali_embedding_model.embed_for_ingestion(
+                processed_chunks_multivector
+            )
             logger.debug(f"Generated {len(colpali_embeddings)} embeddings for multivector embedding")
 
             chunk_objects_multivector = document_service._create_chunk_objects(
-                doc.external_id, chunks_multivector, colpali_embeddings
+                doc.external_id, processed_chunks_multivector, colpali_embeddings
             )
 
         # === Merge aggregated chunk metadata into document metadata ===
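The key ordering change here is that ColPali multivector chunks are now created before post-chunking rules run, so image chunks exist by the time `use_images=True` rules are applied. The partitioning itself is simple; a standalone sketch, with `parse_rule` standing in for `RulesProcessor._parse_rule`:

```python
from typing import Any, Callable, Dict, List, Tuple

# Sketch of the rule partitioning done in the worker above. parse_rule stands
# in for RulesProcessor._parse_rule; getattr with a False default mirrors the
# isinstance check for rule types that lack a use_images flag.
def partition_post_chunking_rules(
    rules_list: List[Dict[str, Any]],
    parse_rule: Callable[[Dict[str, Any]], Any],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    text_rules: List[Dict[str, Any]] = []
    image_rules: List[Dict[str, Any]] = []
    for rule_dict in rules_list:
        rule = parse_rule(rule_dict)
        if rule.stage != "post_chunking":
            continue  # post_parsing rules were already applied earlier in the pipeline
        if getattr(rule, "use_images", False):
            image_rules.append(rule_dict)
        else:
            text_rules.append(rule_dict)
    return text_rules, image_rules
```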
diff --git a/examples/image_rules_example.py b/examples/image_rules_example.py
new file mode 100644
index 0000000..4e40be7
--- /dev/null
+++ b/examples/image_rules_example.py
@@ -0,0 +1,84 @@
+import time
+
+from sdks.python.morphik.sync import Morphik
+
+# Connect to Morphik
+db = Morphik(timeout=10000, is_local=True)
+
+
+# Helper function to wait for document ingestion to complete
+def wait_for_ingestion_completion(document_id, max_wait_time=600, check_interval=30):
+    """
+    Poll the system until document ingestion is completed or the maximum wait time is reached.
+
+    Args:
+        document_id: The ID of the document to check
+        max_wait_time: Maximum time to wait in seconds
+        check_interval: Time between checks in seconds
+
+    Returns:
+        True if ingestion completed, False if timed out
+    """
+    start_time = time.time()
+    while (time.time() - start_time) < max_wait_time:
+        # Get the document status info directly using the status API
+        status_info = db.get_document_status(document_id)
+
+        # Check if ingestion is completed
+        if status_info.get("status") == "completed":
+            print(f"Document ingestion completed for {document_id}")
+            return True
+
+        print(f"Document status: {status_info.get('status')}. Waiting {check_interval} seconds...")
+        time.sleep(check_interval)
+
+    print(f"Warning: Maximum wait time reached for document {document_id}")
+    return False
+
+
+# Define a single image-focused post_chunking rule
+image_rules = [
+    {
+        "type": "metadata_extraction",
+        "stage": "post_chunking",
+        "use_images": True,
+        "schema": {
+            "graph_details": {
+                "type": "string",
+                "description": "Detailed description of any graphs, charts, or diagrams visible "
+                "in the image, including axis labels, trends, and key data points",
+            },
+            "technical_elements": {
+                "type": "array",
+                "description": "List of technical elements visible in the image such as formulas, "
+                "equations, or technical diagrams",
+            },
+            "visual_content_summary": {
+                "type": "string",
+                "description": "Brief summary of the visual content in the technical document",
+            },
+        },
+    }
+]
+
+# Ingest a document with the image-focused post_chunking rule
+print("Ingesting document with image-focused post_chunking rule...")
+doc = db.ingest_file(
+    "examples/assets/colpali_example.pdf",
+    rules=image_rules,
+    metadata={"source": "example", "rules_stage": "image_analysis"},
+    use_colpali=True,  # Enable ColPali for image processing, critical for handling images
+)
+
+# Wait for ingestion to complete
+wait_for_ingestion_completion(doc.external_id)
+
+# Get updated document information with processed image metadata
+updated_doc = db.get_document(doc.external_id)
+
+print("\n" + "=" * 50)
+print("DOCUMENT WITH IMAGE PROCESSING RULES")
+print("=" * 50)
+print(f"Document ID: {updated_doc.external_id}")
+print(f"Document metadata: {updated_doc.metadata}")
+print(f"Document system metadata: {updated_doc.system_metadata}")
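For orientation, the aggregated metadata this rule merges into the document might look roughly like the following. The keys come from the schema above, but the values are invented for illustration, not actual output:

```python
# Illustrative only: values are invented; keys follow the schema in the example above.
example_aggregated_metadata = {
    "graph_details": "Line chart of accuracy vs. page count; x-axis: pages, y-axis: accuracy (0-1)",
    "technical_elements": ["similarity score formula", "attention heatmap diagram"],
    "visual_content_summary": "A report page combining prose with charts and a model diagram",
}
```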
""" self.schema = schema self.stage = stage + self.use_images = use_images def to_dict(self) -> Dict[str, Any]: if isinstance(self.schema, type) and issubclass(self.schema, BaseModel): @@ -38,7 +41,12 @@ class MetadataExtractionRule(Rule): # Assume it's already a dict schema schema_dict = self.schema - return {"type": "metadata_extraction", "schema": schema_dict, "stage": self.stage} + return { + "type": "metadata_extraction", + "schema": schema_dict, + "stage": self.stage, + "use_images": self.use_images, + } class NaturalLanguageRule(Rule):