diff --git a/core/api.py b/core/api.py index 60d2fa5..1f6dacc 100644 --- a/core/api.py +++ b/core/api.py @@ -1956,7 +1956,7 @@ async def set_folder_rule( logger.info(f"Retry {retry_count}/{max_retries} after {delay}s delay") await asyncio.sleep(delay) - extracted_metadata, _ = await rule.apply(doc_content) + extracted_metadata, _ = await rule.apply(doc_content, {}) logger.info( f"Successfully extracted metadata on attempt {retry_count + 1}: " f"{extracted_metadata}" diff --git a/core/models/rules.py b/core/models/rules.py index 0ce7e91..9518e6a 100644 --- a/core/models/rules.py +++ b/core/models/rules.py @@ -1,6 +1,6 @@ import logging from abc import ABC, abstractmethod -from typing import Any, Dict, Literal +from typing import Any, Dict, Literal, Optional import litellm from pydantic import BaseModel @@ -15,14 +15,16 @@ class BaseRule(BaseModel, ABC): """Base model for all rules""" type: str + stage: Literal["post_parsing", "post_chunking"] @abstractmethod - async def apply(self, content: str) -> tuple[Dict[str, Any], str]: + async def apply(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> tuple[Dict[str, Any], str]: """ Apply the rule to the content. Args: content: The content to apply the rule to + metadata: Optional existing metadata that may be used or modified by the rule Returns: tuple[Dict[str, Any], str]: (metadata, modified_content) @@ -42,7 +44,7 @@ class MetadataExtractionRule(BaseRule): type: Literal["metadata_extraction"] schema: Dict[str, Any] - async def apply(self, content: str) -> tuple[Dict[str, Any], str]: + async def apply(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> tuple[Dict[str, Any], str]: """Extract metadata according to schema""" import instructor from pydantic import create_model @@ -87,8 +89,11 @@ class MetadataExtractionRule(BaseRule): schema_text = "\n".join(schema_descriptions) + # Adjust prompt based on whether it's a chunk or full document + prompt_context = "chunk of text" if self.stage == "post_chunking" else "text" + prompt = f""" - Extract metadata from the following text according to this schema: + Extract metadata from the following {prompt_context} according to this schema: {schema_text} @@ -96,7 +101,8 @@ class MetadataExtractionRule(BaseRule): {content} Follow these guidelines: - 1. Extract all requested information as simple strings, numbers, or booleans (not as objects or nested structures) + 1. Extract all requested information as simple strings, numbers, or booleans + (not as objects or nested structures) 2. If information is not present, indicate this with null instead of making something up 3. Answer directly with the requested information - don't include explanations or reasoning 4. Be concise but accurate in your extractions @@ -109,7 +115,11 @@ class MetadataExtractionRule(BaseRule): system_message = { "role": "system", - "content": "You are a metadata extraction assistant. Extract structured metadata from text precisely following the provided schema. Always return the metadata as direct values (strings, numbers, booleans), not as objects with additional properties.", + "content": ( + "You are a metadata extraction assistant. Extract structured metadata from text " + "precisely following the provided schema. Always return the metadata as direct values " + "(strings, numbers, booleans), not as objects with additional properties." 
+ ), } user_message = {"role": "user", "content": prompt} @@ -133,13 +143,14 @@ class MetadataExtractionRule(BaseRule): ) # Convert pydantic model to dict - metadata = response.model_dump() + extracted_metadata = response.model_dump() except Exception as e: logger.error(f"Error in instructor metadata extraction: {str(e)}") - metadata = {} + extracted_metadata = {} - return metadata, content + # Metadata extraction doesn't modify content + return extracted_metadata, content class TransformationOutput(BaseModel): @@ -154,12 +165,15 @@ class NaturalLanguageRule(BaseRule): type: Literal["natural_language"] prompt: str - async def apply(self, content: str) -> tuple[Dict[str, Any], str]: + async def apply(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> tuple[Dict[str, Any], str]: """Transform content according to prompt""" import instructor + # Adjust prompt based on whether it's a chunk or full document + prompt_context = "chunk of text" if self.stage == "post_chunking" else "text" + prompt = f""" - Your task is to transform the following text according to this instruction: + Your task is to transform the following {prompt_context} according to this instruction: {self.prompt} Text to transform: @@ -175,7 +189,10 @@ class NaturalLanguageRule(BaseRule): system_message = { "role": "system", - "content": "You are a text transformation assistant. Transform text precisely following the provided instructions.", + "content": ( + "You are a text transformation assistant. Transform text precisely following " + "the provided instructions." + ), } user_message = {"role": "user", "content": prompt} @@ -205,4 +222,5 @@ class NaturalLanguageRule(BaseRule): logger.error(f"Error in instructor text transformation: {str(e)}") transformed_text = content # Return original content on error + # Natural language rules modify content, don't add metadata directly return {}, transformed_text diff --git a/core/services/document_service.py b/core/services/document_service.py index bbfe066..f652b65 100644 --- a/core/services/document_service.py +++ b/core/services/document_service.py @@ -132,6 +132,9 @@ class DocumentService: # Maps cache name to active cache object self.active_caches: Dict[str, BaseCache] = {} + # Store for aggregated metadata from chunk rules + self._last_aggregated_metadata: Dict[str, Any] = {} + async def retrieve_chunks( self, query: str, @@ -603,42 +606,83 @@ class DocumentService: num_pages = int(len(content) / (CHARS_PER_TOKEN * TOKENS_PER_PAGE)) # await check_and_increment_limits(auth, "ingest", num_pages, doc.external_id) - # Apply rules if provided + # === Apply post_parsing rules === + document_rule_metadata = {} if rules: - rule_metadata, modified_text = await self.rules_processor.process_rules(content, rules) + logger.info("Applying post-parsing rules...") + document_rule_metadata, content = await self.rules_processor.process_document_rules(content, rules) # Update document metadata with extracted metadata from rules - metadata.update(rule_metadata) + metadata.update(document_rule_metadata) doc.metadata = metadata # Update doc metadata after rules - - if modified_text: - content = modified_text - logger.info("Updated content with modified text from rules") + logger.info(f"Document metadata after post-parsing rules: {metadata}") + logger.info(f"Content length after post-parsing rules: {len(content)}") # Store full content before chunking doc.system_metadata["content"] = content - # Split into chunks after all processing is done - chunks = await self.parser.split_text(content) 
- if not chunks: - raise ValueError("No content chunks extracted") - logger.debug(f"Split processed text into {len(chunks)} chunks") + # Split text into chunks + parsed_chunks = await self.parser.split_text(content) + if not parsed_chunks: + raise ValueError("No content chunks extracted after rules processing") + logger.debug(f"Split processed text into {len(parsed_chunks)} chunks") - # Generate embeddings for chunks - embeddings = await self.embedding_model.embed_for_ingestion(chunks) + # === Apply post_chunking rules and aggregate metadata === + processed_chunks = [] + aggregated_chunk_metadata: Dict[str, Any] = {} # Initialize dict for aggregated metadata + chunk_contents = [] # Initialize list to collect chunk contents efficiently + + if rules: + logger.info("Applying post-chunking rules...") + + for chunk_obj in parsed_chunks: + # Get metadata *and* the potentially modified chunk + chunk_rule_metadata, processed_chunk = await self.rules_processor.process_chunk_rules(chunk_obj, rules) + processed_chunks.append(processed_chunk) + chunk_contents.append(processed_chunk.content) # Collect content as we process + # Aggregate the metadata extracted from this chunk + aggregated_chunk_metadata.update(chunk_rule_metadata) + logger.info(f"Finished applying post-chunking rules to {len(processed_chunks)} chunks.") + logger.info(f"Aggregated metadata from all chunks: {aggregated_chunk_metadata}") + + # Update the document content with the stitched content from processed chunks + if processed_chunks: + logger.info("Updating document content with processed chunks...") + stitched_content = "\n".join(chunk_contents) + doc.system_metadata["content"] = stitched_content + logger.info(f"Updated document content with stitched chunks (length: {len(stitched_content)})") + else: + processed_chunks = parsed_chunks # No rules, use original chunks + + # Generate embeddings for processed chunks + embeddings = await self.embedding_model.embed_for_ingestion(processed_chunks) logger.debug(f"Generated {len(embeddings)} embeddings") - chunk_objects = self._create_chunk_objects(doc.external_id, chunks, embeddings) + + # Create chunk objects with processed chunk content + chunk_objects = self._create_chunk_objects(doc.external_id, processed_chunks, embeddings) logger.debug(f"Created {len(chunk_objects)} chunk objects") chunk_objects_multivector = [] if use_colpali and self.colpali_embedding_model: - embeddings_multivector = await self.colpali_embedding_model.embed_for_ingestion(chunks) + embeddings_multivector = await self.colpali_embedding_model.embed_for_ingestion(processed_chunks) logger.info(f"Generated {len(embeddings_multivector)} embeddings for multivector embedding") - chunk_objects_multivector = self._create_chunk_objects(doc.external_id, chunks, embeddings_multivector) + chunk_objects_multivector = self._create_chunk_objects( + doc.external_id, processed_chunks, embeddings_multivector + ) logger.info(f"Created {len(chunk_objects_multivector)} chunk objects for multivector embedding") # Create and store chunk objects + # === Merge aggregated chunk metadata into document metadata === + if aggregated_chunk_metadata: + logger.info("Merging aggregated chunk metadata into document metadata...") + # Make sure doc.metadata exists + if not hasattr(doc, "metadata") or doc.metadata is None: + doc.metadata = {} + doc.metadata.update(aggregated_chunk_metadata) + logger.info(f"Final document metadata after merge: {doc.metadata}") + # =========================================================== + # Store everything await 
self._store_chunks_and_doc(chunk_objects, doc, use_colpali, chunk_objects_multivector) logger.debug(f"Successfully stored text document {doc.external_id}") @@ -654,146 +698,6 @@ class DocumentService: return doc - # TODO: check if it's unused. if so, remove it. - async def ingest_file( - self, - file: UploadFile, - metadata: Dict[str, Any], - auth: AuthContext, - rules: Optional[List[str]] = None, - use_colpali: Optional[bool] = None, - folder_name: Optional[str] = None, - end_user_id: Optional[str] = None, - ) -> Document: - """Ingest a file document.""" - if "write" not in auth.permissions: - raise PermissionError("User does not have write permission") - - # Read file content - file_content = await file.read() - file_size = len(file_content) # Get file size in bytes for limit checking - - # Check limits before doing any expensive processing - from core.config import get_settings - - settings = get_settings() - - if settings.MODE == "cloud" and auth.user_id: - # Check limits before proceeding with parsing - from core.api import check_and_increment_limits - - await check_and_increment_limits(auth, "storage_file", 1) - await check_and_increment_limits(auth, "storage_size", file_size) - - # Now proceed with parsing and processing the file - file_type = filetype.guess(file_content) - - # Set default mime type for cases where filetype.guess returns None - mime_type = "" - if file_type is not None: - mime_type = file_type.mime - elif file.filename: - # Try to determine by file extension as fallback - import mimetypes - - guessed_type = mimetypes.guess_type(file.filename)[0] - if guessed_type: - mime_type = guessed_type - else: - # Default for text files - mime_type = "text/plain" - else: - mime_type = "application/octet-stream" # Generic binary data - - logger.info(f"Determined MIME type: {mime_type} for file {file.filename}") - - # Parse file to text first - additional_metadata, text = await self.parser.parse_file_to_text(file_content, file.filename) - logger.debug(f"Parsed file into text of length {len(text)}") - - # Apply rules if provided - if rules: - rule_metadata, modified_text = await self.rules_processor.process_rules(text, rules) - # Update document metadata with extracted metadata from rules - metadata.update(rule_metadata) - if modified_text: - text = modified_text - logger.info("Updated text with modified content from rules") - - doc = Document( - content_type=mime_type, - filename=file.filename, - metadata=metadata, - owner={"type": auth.entity_type, "id": auth.entity_id}, - access_control={ - "readers": [auth.entity_id], - "writers": [auth.entity_id], - "admins": [auth.entity_id], - "user_id": ( - [auth.user_id] if auth.user_id else [] - ), # Add user_id to access control for filtering (as a list) - }, - additional_metadata=additional_metadata, - ) - - # Add folder_name and end_user_id to system_metadata if provided - if folder_name: - doc.system_metadata["folder_name"] = folder_name - - # Check if the folder exists, if not create it - await self._ensure_folder_exists(folder_name, doc.external_id, auth) - - if end_user_id: - doc.system_metadata["end_user_id"] = end_user_id - - if settings.MODE == "cloud" and auth.user_id: - # Check limits before proceeding with parsing - from core.api import check_and_increment_limits - - num_pages = int(len(text) / (CHARS_PER_TOKEN * TOKENS_PER_PAGE)) # - await check_and_increment_limits(auth, "ingest", num_pages, doc.external_id) - - # Store full content - doc.system_metadata["content"] = text - logger.debug(f"Created file document record with 
ID {doc.external_id}") - - file_content_base64 = base64.b64encode(file_content).decode() - # Store the original file - storage_info = await self.storage.upload_from_base64(file_content_base64, doc.external_id, file.content_type) - doc.storage_info = {"bucket": storage_info[0], "key": storage_info[1]} - logger.debug(f"Stored file in bucket `{storage_info[0]}` with key `{storage_info[1]}`") - - # Split into chunks after all processing is done - chunks = await self.parser.split_text(text) - if not chunks: - raise ValueError("No content chunks extracted") - logger.debug(f"Split processed text into {len(chunks)} chunks") - - # Generate embeddings for chunks - embeddings = await self.embedding_model.embed_for_ingestion(chunks) - logger.debug(f"Generated {len(embeddings)} embeddings") - - # Create and store chunk objects - chunk_objects = self._create_chunk_objects(doc.external_id, chunks, embeddings) - logger.debug(f"Created {len(chunk_objects)} chunk objects") - - chunk_objects_multivector = [] - logger.debug(f"use_colpali: {use_colpali}") - if use_colpali and self.colpali_embedding_model: - chunks_multivector = self._create_chunks_multivector(file_type, file_content_base64, file_content, chunks) - logger.debug(f"Created {len(chunks_multivector)} chunks for multivector embedding") - colpali_embeddings = await self.colpali_embedding_model.embed_for_ingestion(chunks_multivector) - logger.debug(f"Generated {len(colpali_embeddings)} embeddings for multivector embedding") - chunk_objects_multivector = self._create_chunk_objects( - doc.external_id, chunks_multivector, colpali_embeddings - ) - - # Store everything - doc.chunk_ids = await self._store_chunks_and_doc(chunk_objects, doc, use_colpali, chunk_objects_multivector) - logger.debug(f"Successfully stored file document {doc.external_id}") - - return doc - def img_to_base64_str(self, img: Image): buffered = BytesIO() img.save(buffered, format="PNG") @@ -1373,10 +1277,32 @@ class DocumentService: return await self._update_document_metadata_only(doc, auth) # Process content into chunks and generate embeddings - chunks, chunk_objects = await self._process_chunks_and_embeddings(doc.external_id, updated_content) + chunks, chunk_objects = await self._process_chunks_and_embeddings(doc.external_id, updated_content, rules) if not chunks: return None + # If we have rules processing, the chunks may have modified content + # Update document content with stitched content from processed chunks + if rules and chunks: + chunk_contents = [chunk.content for chunk in chunks] + stitched_content = "\n".join(chunk_contents) + # Check if content actually changed + if stitched_content != updated_content: + logger.info("Updating document content with stitched content from processed chunks...") + doc.system_metadata["content"] = stitched_content + logger.info(f"Updated document content with stitched chunks (length: {len(stitched_content)})") + + # Merge any aggregated metadata from chunk rules + if hasattr(self, "_last_aggregated_metadata") and self._last_aggregated_metadata: + logger.info("Merging aggregated chunk metadata into document metadata...") + # Make sure doc.metadata exists + if not hasattr(doc, "metadata") or doc.metadata is None: + doc.metadata = {} + doc.metadata.update(self._last_aggregated_metadata) + logger.info(f"Final document metadata after merge: {doc.metadata}") + # Clear the temporary metadata + self._last_aggregated_metadata = {} + # Handle colpali (multi-vector) embeddings if needed chunk_objects_multivector = await self._process_colpali_embeddings( 
use_colpali, doc.external_id, chunks, file, file_type, file_content, file_content_base64 @@ -1423,16 +1349,16 @@ class DocumentService: if filename: doc.filename = filename - # Apply rules if provided for text content + # Apply post_parsing rules if provided if rules: - rule_metadata, modified_text = await self.rules_processor.process_rules(content, rules) + logger.info("Applying post-parsing rules to text update...") + rule_metadata, modified_content = await self.rules_processor.process_document_rules(content, rules) # Update metadata with extracted metadata from rules if metadata is not None: metadata.update(rule_metadata) - if modified_text: - update_content = modified_text - logger.info("Updated content with modified text from rules") + update_content = modified_content + logger.info(f"Content length after post-parsing rules: {len(update_content)}") return update_content @@ -1451,16 +1377,16 @@ class DocumentService: additional_file_metadata, file_text = await self.parser.parse_file_to_text(file_content, file.filename) logger.info(f"Parsed file into text of length {len(file_text)}") - # Apply rules if provided for file content + # Apply post_parsing rules if provided for file content if rules: - rule_metadata, modified_text = await self.rules_processor.process_rules(file_text, rules) + logger.info("Applying post-parsing rules to file update...") + rule_metadata, modified_text = await self.rules_processor.process_document_rules(file_text, rules) # Update metadata with extracted metadata from rules if metadata is not None: metadata.update(rule_metadata) - if modified_text: - file_text = modified_text - logger.info("Updated file content with modified text from rules") + file_text = modified_text + logger.info(f"File content length after post-parsing rules: {len(file_text)}") # Add additional metadata from file if available if additional_file_metadata: @@ -1628,26 +1554,50 @@ class DocumentService: return doc async def _process_chunks_and_embeddings( - self, doc_id: str, content: str + self, doc_id: str, content: str, rules: Optional[List[Dict[str, Any]]] = None ) -> tuple[List[Chunk], List[DocumentChunk]]: """Process content into chunks and generate embeddings.""" # Split content into chunks - chunks = await self.parser.split_text(content) - if not chunks: + parsed_chunks = await self.parser.split_text(content) + if not parsed_chunks: logger.error("No content chunks extracted after update") return None, None - logger.info(f"Split updated text into {len(chunks)} chunks") + logger.info(f"Split updated text into {len(parsed_chunks)} chunks") - # Generate embeddings for new chunks - embeddings = await self.embedding_model.embed_for_ingestion(chunks) + # Apply post_chunking rules and aggregate metadata if provided + processed_chunks = [] + aggregated_chunk_metadata: Dict[str, Any] = {} # Initialize dict for aggregated metadata + chunk_contents = [] # Initialize list to collect chunk contents efficiently + + if rules: + logger.info("Applying post-chunking rules...") + + for chunk_obj in parsed_chunks: + # Get metadata *and* the potentially modified chunk + chunk_rule_metadata, processed_chunk = await self.rules_processor.process_chunk_rules(chunk_obj, rules) + processed_chunks.append(processed_chunk) + chunk_contents.append(processed_chunk.content) # Collect content as we process + # Aggregate the metadata extracted from this chunk + aggregated_chunk_metadata.update(chunk_rule_metadata) + logger.info(f"Finished applying post-chunking rules to {len(processed_chunks)} chunks.") + 
logger.info(f"Aggregated metadata from all chunks: {aggregated_chunk_metadata}") + + # Return this metadata so the calling method can update the document metadata + self._last_aggregated_metadata = aggregated_chunk_metadata + else: + processed_chunks = parsed_chunks # No rules, use original chunks + self._last_aggregated_metadata = {} + + # Generate embeddings for processed chunks + embeddings = await self.embedding_model.embed_for_ingestion(processed_chunks) logger.info(f"Generated {len(embeddings)} embeddings") # Create new chunk objects - chunk_objects = self._create_chunk_objects(doc_id, chunks, embeddings) + chunk_objects = self._create_chunk_objects(doc_id, processed_chunks, embeddings) logger.info(f"Created {len(chunk_objects)} chunk objects") - return chunks, chunk_objects + return processed_chunks, chunk_objects async def _process_colpali_embeddings( self, diff --git a/core/services/rules_processor.py b/core/services/rules_processor.py index adeba2a..71b736f 100644 --- a/core/services/rules_processor.py +++ b/core/services/rules_processor.py @@ -4,6 +4,7 @@ from typing import Any, Dict, List, Tuple from pydantic import BaseModel from core.config import get_settings +from core.models.chunk import Chunk from core.models.rules import BaseRule, MetadataExtractionRule, NaturalLanguageRule logger = logging.getLogger(__name__) @@ -42,48 +43,21 @@ class RulesProcessor: f"Initializing RulesProcessor with {settings.RULES_PROVIDER} provider using model {settings.RULES_MODEL}" ) - async def process_rules(self, content: str, rules: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], str]: - """ - Process a list of rules on content. - - Args: - content: The document content - rules: List of rule dictionaries - - Returns: - Tuple[Dict[str, Any], str]: (extracted_metadata, modified_content) - """ - logger.debug(f"Processing {len(rules)} rules on content of length {len(content)}") - metadata = {} - modified_content = content - - try: - # Parse all rules first to fail fast if any are invalid - parsed_rules = [self._parse_rule(rule) for rule in rules] - logger.debug(f"Successfully parsed {len(parsed_rules)} rules: {[r.type for r in parsed_rules]}") - - # Apply rules in order - for i, rule in enumerate(parsed_rules, 1): - try: - logger.debug(f"Applying rule {i}/{len(parsed_rules)}: {rule.type}") - rule_metadata, modified_content = await rule.apply(modified_content) - logger.debug(f"Rule {i} extracted metadata: {rule_metadata}") - metadata.update(rule_metadata) - except Exception as e: - logger.error(f"Failed to apply rule {rule.type}: {str(e)}") - continue - - except Exception as e: - logger.error(f"Failed to process rules: {str(e)}") - return metadata, content - - logger.debug(f"Completed processing {len(rules)} rules. 
Final metadata: {metadata}") - return metadata, modified_content - def _parse_rule(self, rule_dict: Dict[str, Any]) -> BaseRule: """Parse a rule dictionary into a rule object""" rule_type = rule_dict.get("type") - logger.debug(f"Parsing rule of type: {rule_type}") + stage = rule_dict.get("stage") + + if not stage: + # Handle missing stage - default to post_parsing for backward compatibility + logger.warning(f"Rule is missing 'stage' field, defaulting to 'post_parsing': {rule_dict}") + rule_dict["stage"] = "post_parsing" + stage = "post_parsing" + + if stage not in ["post_parsing", "post_chunking"]: + raise ValueError(f"Invalid stage '{stage}' in rule: {rule_dict}") + + logger.debug(f"Parsing rule of type: {rule_type}, stage: {stage}") if rule_type == "metadata_extraction": return MetadataExtractionRule(**rule_dict) @@ -91,3 +65,113 @@ class RulesProcessor: return NaturalLanguageRule(**rule_dict) else: raise ValueError(f"Unknown rule type: {rule_type}") + + async def process_document_rules(self, content: str, rules: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], str]: + """ + Process rules intended for the post-parsing stage (full document text). + + Args: + content: The full document content (text) + rules: The original list of rule dictionaries + + Returns: + Tuple[Dict[str, Any], str]: (extracted_metadata, modified_content) + """ + logger.debug(f"Processing document-level rules (post_parsing) on content length {len(content)}") + document_metadata = {} + modified_content = content + parsed_rules = [] + + # Process rules for post_parsing stage + for rule_dict in rules: + try: + # Let _parse_rule handle stage defaulting + rule = self._parse_rule(rule_dict) + + # Only include rules for post_parsing stage + if rule.stage == "post_parsing": + parsed_rules.append(rule) + except ValueError as e: + logger.warning(f"Skipping invalid document rule: {e}") + continue # Skip this rule + + logger.debug(f"Applying {len(parsed_rules)} post_parsing rules.") + for i, rule in enumerate(parsed_rules, 1): + try: + logger.debug(f"Applying document rule {i}/{len(parsed_rules)}: {rule.type}") + # Pass document metadata accumulated so far + rule_metadata, modified_content = await rule.apply(modified_content, document_metadata) + logger.debug(f"Rule {i} extracted metadata: {rule_metadata}") + document_metadata.update(rule_metadata) # Accumulate metadata + except Exception as e: + logger.error(f"Failed to apply document rule {rule.type}: {str(e)}") + # Decide: stop or continue? Let's continue for robustness. + continue + + logger.debug(f"Finished post_parsing rules. Final metadata: {document_metadata}") + return document_metadata, modified_content + + async def process_chunk_rules(self, chunk: Chunk, rules: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], Chunk]: + """ + Process rules intended for the post-chunking stage on a single chunk. + Modifies the chunk's content if necessary and returns any extracted metadata. 
+ + Args: + chunk: The Chunk object to process + rules: The original list of rule dictionaries + + Returns: + Tuple[Dict[str, Any], Chunk]: (extracted_metadata_for_doc, potentially_modified_chunk) + """ + logger.debug(f"Processing chunk-level rules (post_chunking) on chunk content length {len(chunk.content)}") + parsed_rules = [] + + # Process rules for post_chunking stage + for rule_dict in rules: + try: + # Let _parse_rule handle stage defaulting + rule = self._parse_rule(rule_dict) + + # Only include rules for post_chunking stage + if rule.stage == "post_chunking": + parsed_rules.append(rule) + except ValueError as e: + logger.warning(f"Skipping invalid chunk rule: {e}") + continue + + if not parsed_rules: + return {}, chunk # No applicable rules, return empty metadata and original chunk + + logger.debug(f"Applying {len(parsed_rules)} post_chunking rules to chunk.") + modified_content = chunk.content + # Metadata extracted by rules in *this chunk* to be aggregated at the document level + aggregated_chunk_rule_metadata = {} + + for i, rule in enumerate(parsed_rules, 1): + try: + logger.debug(f"Applying chunk rule {i}/{len(parsed_rules)}: {rule.type}") + # Pass original chunk metadata (if any) for context, but don't modify it here + rule_metadata, rule_modified_content = await rule.apply(modified_content, chunk.metadata) + + # If it's a metadata rule, aggregate its findings + if isinstance(rule, MetadataExtractionRule): + logger.debug(f"Rule {i} (Metadata) extracted metadata: {rule_metadata}") + # Aggregate metadata - simple update, last one wins for a key + aggregated_chunk_rule_metadata.update(rule_metadata) + + # If it's an NL rule, update the content for the next rule in this chunk + elif isinstance(rule, NaturalLanguageRule): + logger.debug(f"Rule {i} (NL) modified content.") + modified_content = rule_modified_content + + except Exception as e: + logger.error(f"Failed to apply chunk rule {rule.type}: {str(e)}") + continue + + # Update the chunk's content if it was modified by NL rules + # Note: We are NOT updating chunk.metadata here with aggregated_chunk_rule_metadata + chunk.content = modified_content + + logger.debug(f"Finished post_chunking rules for chunk. Metadata to aggregate: {aggregated_chunk_rule_metadata}") + # Return the aggregated metadata from this chunk and the potentially modified chunk + return aggregated_chunk_rule_metadata, chunk diff --git a/core/workers/ingestion_worker.py b/core/workers/ingestion_worker.py index 718d716..9388e61 100644 --- a/core/workers/ingestion_worker.py +++ b/core/workers/ingestion_worker.py @@ -143,15 +143,16 @@ async def process_ingestion_job( additional_metadata, text = await document_service.parser.parse_file_to_text(file_content, original_filename) logger.debug(f"Parsed file into text of length {len(text)}") - # 5. 
Apply rules if provided + # === Apply post_parsing rules === + document_rule_metadata = {} if rules_list: - rule_metadata, modified_text = await document_service.rules_processor.process_rules(text, rules_list) - # Update document metadata with extracted metadata from rules - metadata.update(rule_metadata) - - if modified_text: - text = modified_text - logger.info("Updated text with modified content from rules") + logger.info("Applying post-parsing rules...") + document_rule_metadata, text = await document_service.rules_processor.process_document_rules( + text, rules_list + ) + metadata.update(document_rule_metadata) # Merge metadata into main doc metadata + logger.info(f"Document metadata after post-parsing rules: {metadata}") + logger.info(f"Content length after post-parsing rules: {len(text)}") # 6. Retrieve the existing document logger.debug(f"Retrieving document with ID: {document_id}") @@ -201,17 +202,46 @@ async def process_ingestion_job( logger.debug("Updated document in database with parsed content") # 7. Split text into chunks - chunks = await document_service.parser.split_text(text) - if not chunks: - raise ValueError("No content chunks extracted") - logger.debug(f"Split processed text into {len(chunks)} chunks") + parsed_chunks = await document_service.parser.split_text(text) + if not parsed_chunks: + raise ValueError("No content chunks extracted after rules processing") + logger.debug(f"Split processed text into {len(parsed_chunks)} chunks") - # 8. Generate embeddings for chunks - embeddings = await document_service.embedding_model.embed_for_ingestion(chunks) + # === Apply post_chunking rules and aggregate metadata === + processed_chunks = [] + aggregated_chunk_metadata: Dict[str, Any] = {} # Initialize dict for aggregated metadata + chunk_contents = [] # Initialize list to collect chunk contents as we process them + + if rules_list: + logger.info("Applying post-chunking rules...") + + for chunk_obj in parsed_chunks: + # Get metadata *and* the potentially modified chunk + chunk_rule_metadata, processed_chunk = await document_service.rules_processor.process_chunk_rules( + chunk_obj, rules_list + ) + processed_chunks.append(processed_chunk) + chunk_contents.append(processed_chunk.content) # Collect content as we process + # Aggregate the metadata extracted from this chunk + aggregated_chunk_metadata.update(chunk_rule_metadata) + logger.info(f"Finished applying post-chunking rules to {len(processed_chunks)} chunks.") + logger.info(f"Aggregated metadata from all chunks: {aggregated_chunk_metadata}") + + # Update the document content with the stitched content from processed chunks + if processed_chunks: + logger.info("Updating document content with processed chunks...") + stitched_content = "\n".join(chunk_contents) + doc.system_metadata["content"] = stitched_content + logger.info(f"Updated document content with stitched chunks (length: {len(stitched_content)})") + else: + processed_chunks = parsed_chunks # No rules, use original chunks + + # 8. Generate embeddings for processed chunks + embeddings = await document_service.embedding_model.embed_for_ingestion(processed_chunks) logger.debug(f"Generated {len(embeddings)} embeddings") - # 9. Create chunk objects - chunk_objects = document_service._create_chunk_objects(doc.external_id, chunks, embeddings) + # 9. 
Create chunk objects with potentially modified chunk content and metadata + chunk_objects = document_service._create_chunk_objects(doc.external_id, processed_chunks, embeddings) logger.debug(f"Created {len(chunk_objects)} chunk objects") # 10. Handle ColPali embeddings if enabled @@ -229,8 +259,9 @@ async def process_ingestion_job( file_content_base64 = base64.b64encode(file_content).decode() + # Use the processed chunks for Colpali chunks_multivector = document_service._create_chunks_multivector( - file_type, file_content_base64, file_content, chunks + file_type, file_content_base64, file_content, processed_chunks ) logger.debug(f"Created {len(chunks_multivector)} chunks for multivector embedding") @@ -241,6 +272,16 @@ async def process_ingestion_job( doc.external_id, chunks_multivector, colpali_embeddings ) + # === Merge aggregated chunk metadata into document metadata === + if aggregated_chunk_metadata: + logger.info("Merging aggregated chunk metadata into document metadata...") + # Make sure doc.metadata exists + if not hasattr(doc, "metadata") or doc.metadata is None: + doc.metadata = {} + doc.metadata.update(aggregated_chunk_metadata) + logger.info(f"Final document metadata after merge: {doc.metadata}") + # =========================================================== + # Update document status to completed before storing doc.system_metadata["status"] = "completed" doc.system_metadata["updated_at"] = datetime.now(UTC) diff --git a/sdks/python/morphik/rules.py b/sdks/python/morphik/rules.py index 3c44094..e73e054 100644 --- a/sdks/python/morphik/rules.py +++ b/sdks/python/morphik/rules.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Any, Dict, Type, Union +from typing import Any, Dict, Literal, Type, Union from pydantic import BaseModel @@ -16,8 +16,19 @@ class Rule(ABC): class MetadataExtractionRule(Rule): """Server-side rule for extracting metadata using a schema""" - def __init__(self, schema: Union[Type[BaseModel], Dict[str, Any]]): + def __init__( + self, + schema: Union[Type[BaseModel], Dict[str, Any]], + stage: Literal["post_parsing", "post_chunking"] = "post_parsing", + ): + """ + Args: + schema: Pydantic model or dict schema defining metadata fields to extract + stage: When to apply the rule - either "post_parsing" (full document text) or + "post_chunking" (individual chunks). Defaults to "post_parsing" for backward compatibility. + """ self.schema = schema + self.stage = stage def to_dict(self) -> Dict[str, Any]: if isinstance(self.schema, type) and issubclass(self.schema, BaseModel): @@ -27,22 +38,25 @@ class MetadataExtractionRule(Rule): # Assume it's already a dict schema schema_dict = self.schema - return {"type": "metadata_extraction", "schema": schema_dict} + return {"type": "metadata_extraction", "schema": schema_dict, "stage": self.stage} class NaturalLanguageRule(Rule): """Server-side rule for transforming content using natural language""" - def __init__(self, prompt: str): + def __init__(self, prompt: str, stage: Literal["post_parsing", "post_chunking"] = "post_parsing"): """ Args: prompt: Instruction for how to transform the content e.g. "Remove any personal information" or "Convert to bullet points" + stage: When to apply the rule - either "post_parsing" (full document text) or + "post_chunking" (individual chunks). Defaults to "post_parsing" for backward compatibility. 
""" self.prompt = prompt + self.stage = stage def to_dict(self) -> Dict[str, Any]: - return {"type": "natural_language", "prompt": self.prompt} + return {"type": "natural_language", "prompt": self.prompt, "stage": self.stage} __all__ = ["Rule", "MetadataExtractionRule", "NaturalLanguageRule"]