Staged Rule Execution (#111)

Adityavardhan Agrawal 2025-04-23 23:15:03 -07:00 committed by GitHub
parent 792f082e05
commit de1a7d2fd7
6 changed files with 357 additions and 250 deletions

View File

@@ -1956,7 +1956,7 @@ async def set_folder_rule(
logger.info(f"Retry {retry_count}/{max_retries} after {delay}s delay")
await asyncio.sleep(delay)
extracted_metadata, _ = await rule.apply(doc_content)
extracted_metadata, _ = await rule.apply(doc_content, {})
logger.info(
f"Successfully extracted metadata on attempt {retry_count + 1}: "
f"{extracted_metadata}"

View File

@@ -1,6 +1,6 @@
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Literal
from typing import Any, Dict, Literal, Optional
import litellm
from pydantic import BaseModel
@@ -15,14 +15,16 @@ class BaseRule(BaseModel, ABC):
"""Base model for all rules"""
type: str
stage: Literal["post_parsing", "post_chunking"]
@abstractmethod
async def apply(self, content: str) -> tuple[Dict[str, Any], str]:
async def apply(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> tuple[Dict[str, Any], str]:
"""
Apply the rule to the content.
Args:
content: The content to apply the rule to
metadata: Optional existing metadata that may be used or modified by the rule
Returns:
tuple[Dict[str, Any], str]: (metadata, modified_content)
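
For orientation, a minimal sketch of a rule written against this updated signature. The `BaseRule` import path matches the `core.models.rules` module diffed here; `WordCountRule` itself is hypothetical and is not wired into `RulesProcessor._parse_rule`, so it is illustrative only.

from typing import Any, Dict, Literal, Optional
from core.models.rules import BaseRule  # module updated in this commit

class WordCountRule(BaseRule):
    """Hypothetical metadata-only rule: records a word count without calling an LLM."""
    type: Literal["word_count"] = "word_count"
    stage: Literal["post_parsing", "post_chunking"] = "post_parsing"

    async def apply(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> tuple[Dict[str, Any], str]:
        # Return extracted metadata and leave the content unchanged.
        return {"word_count": len(content.split())}, content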
@@ -42,7 +44,7 @@ class MetadataExtractionRule(BaseRule):
type: Literal["metadata_extraction"]
schema: Dict[str, Any]
async def apply(self, content: str) -> tuple[Dict[str, Any], str]:
async def apply(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> tuple[Dict[str, Any], str]:
"""Extract metadata according to schema"""
import instructor
from pydantic import create_model
@@ -87,8 +89,11 @@ class MetadataExtractionRule(BaseRule):
schema_text = "\n".join(schema_descriptions)
# Adjust prompt based on whether it's a chunk or full document
prompt_context = "chunk of text" if self.stage == "post_chunking" else "text"
prompt = f"""
Extract metadata from the following text according to this schema:
Extract metadata from the following {prompt_context} according to this schema:
{schema_text}
@@ -96,7 +101,8 @@ class MetadataExtractionRule(BaseRule):
{content}
Follow these guidelines:
1. Extract all requested information as simple strings, numbers, or booleans (not as objects or nested structures)
1. Extract all requested information as simple strings, numbers, or booleans
(not as objects or nested structures)
2. If information is not present, indicate this with null instead of making something up
3. Answer directly with the requested information - don't include explanations or reasoning
4. Be concise but accurate in your extractions
@@ -109,7 +115,11 @@ class MetadataExtractionRule(BaseRule):
system_message = {
"role": "system",
"content": "You are a metadata extraction assistant. Extract structured metadata from text precisely following the provided schema. Always return the metadata as direct values (strings, numbers, booleans), not as objects with additional properties.",
"content": (
"You are a metadata extraction assistant. Extract structured metadata from text "
"precisely following the provided schema. Always return the metadata as direct values "
"(strings, numbers, booleans), not as objects with additional properties."
),
}
user_message = {"role": "user", "content": prompt}
@@ -133,13 +143,14 @@ class MetadataExtractionRule(BaseRule):
)
# Convert pydantic model to dict
metadata = response.model_dump()
extracted_metadata = response.model_dump()
except Exception as e:
logger.error(f"Error in instructor metadata extraction: {str(e)}")
metadata = {}
extracted_metadata = {}
return metadata, content
# Metadata extraction doesn't modify content
return extracted_metadata, content
class TransformationOutput(BaseModel):
@@ -154,12 +165,15 @@ class NaturalLanguageRule(BaseRule):
type: Literal["natural_language"]
prompt: str
async def apply(self, content: str) -> tuple[Dict[str, Any], str]:
async def apply(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> tuple[Dict[str, Any], str]:
"""Transform content according to prompt"""
import instructor
# Adjust prompt based on whether it's a chunk or full document
prompt_context = "chunk of text" if self.stage == "post_chunking" else "text"
prompt = f"""
Your task is to transform the following text according to this instruction:
Your task is to transform the following {prompt_context} according to this instruction:
{self.prompt}
Text to transform:
@@ -175,7 +189,10 @@ class NaturalLanguageRule(BaseRule):
system_message = {
"role": "system",
"content": "You are a text transformation assistant. Transform text precisely following the provided instructions.",
"content": (
"You are a text transformation assistant. Transform text precisely following "
"the provided instructions."
),
}
user_message = {"role": "user", "content": prompt}
@@ -205,4 +222,5 @@ class NaturalLanguageRule(BaseRule):
logger.error(f"Error in instructor text transformation: {str(e)}")
transformed_text = content # Return original content on error
# Natural language rules modify content, don't add metadata directly
return {}, transformed_text

View File

@@ -132,6 +132,9 @@ class DocumentService:
# Maps cache name to active cache object
self.active_caches: Dict[str, BaseCache] = {}
# Store for aggregated metadata from chunk rules
self._last_aggregated_metadata: Dict[str, Any] = {}
async def retrieve_chunks(
self,
query: str,
@@ -603,42 +606,83 @@ class DocumentService:
num_pages = int(len(content) / (CHARS_PER_TOKEN * TOKENS_PER_PAGE)) #
await check_and_increment_limits(auth, "ingest", num_pages, doc.external_id)
# Apply rules if provided
# === Apply post_parsing rules ===
document_rule_metadata = {}
if rules:
rule_metadata, modified_text = await self.rules_processor.process_rules(content, rules)
logger.info("Applying post-parsing rules...")
document_rule_metadata, content = await self.rules_processor.process_document_rules(content, rules)
# Update document metadata with extracted metadata from rules
metadata.update(rule_metadata)
metadata.update(document_rule_metadata)
doc.metadata = metadata # Update doc metadata after rules
if modified_text:
content = modified_text
logger.info("Updated content with modified text from rules")
logger.info(f"Document metadata after post-parsing rules: {metadata}")
logger.info(f"Content length after post-parsing rules: {len(content)}")
# Store full content before chunking
doc.system_metadata["content"] = content
# Split into chunks after all processing is done
chunks = await self.parser.split_text(content)
if not chunks:
raise ValueError("No content chunks extracted")
logger.debug(f"Split processed text into {len(chunks)} chunks")
# Split text into chunks
parsed_chunks = await self.parser.split_text(content)
if not parsed_chunks:
raise ValueError("No content chunks extracted after rules processing")
logger.debug(f"Split processed text into {len(parsed_chunks)} chunks")
# Generate embeddings for chunks
embeddings = await self.embedding_model.embed_for_ingestion(chunks)
# === Apply post_chunking rules and aggregate metadata ===
processed_chunks = []
aggregated_chunk_metadata: Dict[str, Any] = {} # Initialize dict for aggregated metadata
chunk_contents = [] # Initialize list to collect chunk contents efficiently
if rules:
logger.info("Applying post-chunking rules...")
for chunk_obj in parsed_chunks:
# Get metadata *and* the potentially modified chunk
chunk_rule_metadata, processed_chunk = await self.rules_processor.process_chunk_rules(chunk_obj, rules)
processed_chunks.append(processed_chunk)
chunk_contents.append(processed_chunk.content) # Collect content as we process
# Aggregate the metadata extracted from this chunk
aggregated_chunk_metadata.update(chunk_rule_metadata)
logger.info(f"Finished applying post-chunking rules to {len(processed_chunks)} chunks.")
logger.info(f"Aggregated metadata from all chunks: {aggregated_chunk_metadata}")
# Update the document content with the stitched content from processed chunks
if processed_chunks:
logger.info("Updating document content with processed chunks...")
stitched_content = "\n".join(chunk_contents)
doc.system_metadata["content"] = stitched_content
logger.info(f"Updated document content with stitched chunks (length: {len(stitched_content)})")
else:
processed_chunks = parsed_chunks # No rules, use original chunks
# Generate embeddings for processed chunks
embeddings = await self.embedding_model.embed_for_ingestion(processed_chunks)
logger.debug(f"Generated {len(embeddings)} embeddings")
chunk_objects = self._create_chunk_objects(doc.external_id, chunks, embeddings)
# Create chunk objects with processed chunk content
chunk_objects = self._create_chunk_objects(doc.external_id, processed_chunks, embeddings)
logger.debug(f"Created {len(chunk_objects)} chunk objects")
chunk_objects_multivector = []
if use_colpali and self.colpali_embedding_model:
embeddings_multivector = await self.colpali_embedding_model.embed_for_ingestion(chunks)
embeddings_multivector = await self.colpali_embedding_model.embed_for_ingestion(processed_chunks)
logger.info(f"Generated {len(embeddings_multivector)} embeddings for multivector embedding")
chunk_objects_multivector = self._create_chunk_objects(doc.external_id, chunks, embeddings_multivector)
chunk_objects_multivector = self._create_chunk_objects(
doc.external_id, processed_chunks, embeddings_multivector
)
logger.info(f"Created {len(chunk_objects_multivector)} chunk objects for multivector embedding")
# Create and store chunk objects
# === Merge aggregated chunk metadata into document metadata ===
if aggregated_chunk_metadata:
logger.info("Merging aggregated chunk metadata into document metadata...")
# Make sure doc.metadata exists
if not hasattr(doc, "metadata") or doc.metadata is None:
doc.metadata = {}
doc.metadata.update(aggregated_chunk_metadata)
logger.info(f"Final document metadata after merge: {doc.metadata}")
# ===========================================================
# Store everything
await self._store_chunks_and_doc(chunk_objects, doc, use_colpali, chunk_objects_multivector)
logger.debug(f"Successfully stored text document {doc.external_id}")
@@ -654,146 +698,6 @@ class DocumentService:
return doc
# TODO: check if it's unused. if so, remove it.
async def ingest_file(
self,
file: UploadFile,
metadata: Dict[str, Any],
auth: AuthContext,
rules: Optional[List[str]] = None,
use_colpali: Optional[bool] = None,
folder_name: Optional[str] = None,
end_user_id: Optional[str] = None,
) -> Document:
"""Ingest a file document."""
if "write" not in auth.permissions:
raise PermissionError("User does not have write permission")
# Read file content
file_content = await file.read()
file_size = len(file_content) # Get file size in bytes for limit checking
# Check limits before doing any expensive processing
from core.config import get_settings
settings = get_settings()
if settings.MODE == "cloud" and auth.user_id:
# Check limits before proceeding with parsing
from core.api import check_and_increment_limits
await check_and_increment_limits(auth, "storage_file", 1)
await check_and_increment_limits(auth, "storage_size", file_size)
# Now proceed with parsing and processing the file
file_type = filetype.guess(file_content)
# Set default mime type for cases where filetype.guess returns None
mime_type = ""
if file_type is not None:
mime_type = file_type.mime
elif file.filename:
# Try to determine by file extension as fallback
import mimetypes
guessed_type = mimetypes.guess_type(file.filename)[0]
if guessed_type:
mime_type = guessed_type
else:
# Default for text files
mime_type = "text/plain"
else:
mime_type = "application/octet-stream" # Generic binary data
logger.info(f"Determined MIME type: {mime_type} for file {file.filename}")
# Parse file to text first
additional_metadata, text = await self.parser.parse_file_to_text(file_content, file.filename)
logger.debug(f"Parsed file into text of length {len(text)}")
# Apply rules if provided
if rules:
rule_metadata, modified_text = await self.rules_processor.process_rules(text, rules)
# Update document metadata with extracted metadata from rules
metadata.update(rule_metadata)
if modified_text:
text = modified_text
logger.info("Updated text with modified content from rules")
doc = Document(
content_type=mime_type,
filename=file.filename,
metadata=metadata,
owner={"type": auth.entity_type, "id": auth.entity_id},
access_control={
"readers": [auth.entity_id],
"writers": [auth.entity_id],
"admins": [auth.entity_id],
"user_id": (
[auth.user_id] if auth.user_id else []
), # Add user_id to access control for filtering (as a list)
},
additional_metadata=additional_metadata,
)
# Add folder_name and end_user_id to system_metadata if provided
if folder_name:
doc.system_metadata["folder_name"] = folder_name
# Check if the folder exists, if not create it
await self._ensure_folder_exists(folder_name, doc.external_id, auth)
if end_user_id:
doc.system_metadata["end_user_id"] = end_user_id
if settings.MODE == "cloud" and auth.user_id:
# Check limits before proceeding with parsing
from core.api import check_and_increment_limits
num_pages = int(len(text) / (CHARS_PER_TOKEN * TOKENS_PER_PAGE)) #
await check_and_increment_limits(auth, "ingest", num_pages, doc.external_id)
# Store full content
doc.system_metadata["content"] = text
logger.debug(f"Created file document record with ID {doc.external_id}")
file_content_base64 = base64.b64encode(file_content).decode()
# Store the original file
storage_info = await self.storage.upload_from_base64(file_content_base64, doc.external_id, file.content_type)
doc.storage_info = {"bucket": storage_info[0], "key": storage_info[1]}
logger.debug(f"Stored file in bucket `{storage_info[0]}` with key `{storage_info[1]}`")
# Split into chunks after all processing is done
chunks = await self.parser.split_text(text)
if not chunks:
raise ValueError("No content chunks extracted")
logger.debug(f"Split processed text into {len(chunks)} chunks")
# Generate embeddings for chunks
embeddings = await self.embedding_model.embed_for_ingestion(chunks)
logger.debug(f"Generated {len(embeddings)} embeddings")
# Create and store chunk objects
chunk_objects = self._create_chunk_objects(doc.external_id, chunks, embeddings)
logger.debug(f"Created {len(chunk_objects)} chunk objects")
chunk_objects_multivector = []
logger.debug(f"use_colpali: {use_colpali}")
if use_colpali and self.colpali_embedding_model:
chunks_multivector = self._create_chunks_multivector(file_type, file_content_base64, file_content, chunks)
logger.debug(f"Created {len(chunks_multivector)} chunks for multivector embedding")
colpali_embeddings = await self.colpali_embedding_model.embed_for_ingestion(chunks_multivector)
logger.debug(f"Generated {len(colpali_embeddings)} embeddings for multivector embedding")
chunk_objects_multivector = self._create_chunk_objects(
doc.external_id, chunks_multivector, colpali_embeddings
)
# Store everything
doc.chunk_ids = await self._store_chunks_and_doc(chunk_objects, doc, use_colpali, chunk_objects_multivector)
logger.debug(f"Successfully stored file document {doc.external_id}")
return doc
def img_to_base64_str(self, img: Image):
buffered = BytesIO()
img.save(buffered, format="PNG")
@@ -1373,10 +1277,32 @@ class DocumentService:
return await self._update_document_metadata_only(doc, auth)
# Process content into chunks and generate embeddings
chunks, chunk_objects = await self._process_chunks_and_embeddings(doc.external_id, updated_content)
chunks, chunk_objects = await self._process_chunks_and_embeddings(doc.external_id, updated_content, rules)
if not chunks:
return None
# If we have rules processing, the chunks may have modified content
# Update document content with stitched content from processed chunks
if rules and chunks:
chunk_contents = [chunk.content for chunk in chunks]
stitched_content = "\n".join(chunk_contents)
# Check if content actually changed
if stitched_content != updated_content:
logger.info("Updating document content with stitched content from processed chunks...")
doc.system_metadata["content"] = stitched_content
logger.info(f"Updated document content with stitched chunks (length: {len(stitched_content)})")
# Merge any aggregated metadata from chunk rules
if hasattr(self, "_last_aggregated_metadata") and self._last_aggregated_metadata:
logger.info("Merging aggregated chunk metadata into document metadata...")
# Make sure doc.metadata exists
if not hasattr(doc, "metadata") or doc.metadata is None:
doc.metadata = {}
doc.metadata.update(self._last_aggregated_metadata)
logger.info(f"Final document metadata after merge: {doc.metadata}")
# Clear the temporary metadata
self._last_aggregated_metadata = {}
# Handle colpali (multi-vector) embeddings if needed
chunk_objects_multivector = await self._process_colpali_embeddings(
use_colpali, doc.external_id, chunks, file, file_type, file_content, file_content_base64
@@ -1423,16 +1349,16 @@ class DocumentService:
if filename:
doc.filename = filename
# Apply rules if provided for text content
# Apply post_parsing rules if provided
if rules:
rule_metadata, modified_text = await self.rules_processor.process_rules(content, rules)
logger.info("Applying post-parsing rules to text update...")
rule_metadata, modified_content = await self.rules_processor.process_document_rules(content, rules)
# Update metadata with extracted metadata from rules
if metadata is not None:
metadata.update(rule_metadata)
if modified_text:
update_content = modified_text
logger.info("Updated content with modified text from rules")
update_content = modified_content
logger.info(f"Content length after post-parsing rules: {len(update_content)}")
return update_content
@@ -1451,16 +1377,16 @@ class DocumentService:
additional_file_metadata, file_text = await self.parser.parse_file_to_text(file_content, file.filename)
logger.info(f"Parsed file into text of length {len(file_text)}")
# Apply rules if provided for file content
# Apply post_parsing rules if provided for file content
if rules:
rule_metadata, modified_text = await self.rules_processor.process_rules(file_text, rules)
logger.info("Applying post-parsing rules to file update...")
rule_metadata, modified_text = await self.rules_processor.process_document_rules(file_text, rules)
# Update metadata with extracted metadata from rules
if metadata is not None:
metadata.update(rule_metadata)
if modified_text:
file_text = modified_text
logger.info("Updated file content with modified text from rules")
file_text = modified_text
logger.info(f"File content length after post-parsing rules: {len(file_text)}")
# Add additional metadata from file if available
if additional_file_metadata:
@@ -1628,26 +1554,50 @@ class DocumentService:
return doc
async def _process_chunks_and_embeddings(
self, doc_id: str, content: str
self, doc_id: str, content: str, rules: Optional[List[Dict[str, Any]]] = None
) -> tuple[List[Chunk], List[DocumentChunk]]:
"""Process content into chunks and generate embeddings."""
# Split content into chunks
chunks = await self.parser.split_text(content)
if not chunks:
parsed_chunks = await self.parser.split_text(content)
if not parsed_chunks:
logger.error("No content chunks extracted after update")
return None, None
logger.info(f"Split updated text into {len(chunks)} chunks")
logger.info(f"Split updated text into {len(parsed_chunks)} chunks")
# Generate embeddings for new chunks
embeddings = await self.embedding_model.embed_for_ingestion(chunks)
# Apply post_chunking rules and aggregate metadata if provided
processed_chunks = []
aggregated_chunk_metadata: Dict[str, Any] = {} # Initialize dict for aggregated metadata
chunk_contents = [] # Initialize list to collect chunk contents efficiently
if rules:
logger.info("Applying post-chunking rules...")
for chunk_obj in parsed_chunks:
# Get metadata *and* the potentially modified chunk
chunk_rule_metadata, processed_chunk = await self.rules_processor.process_chunk_rules(chunk_obj, rules)
processed_chunks.append(processed_chunk)
chunk_contents.append(processed_chunk.content) # Collect content as we process
# Aggregate the metadata extracted from this chunk
aggregated_chunk_metadata.update(chunk_rule_metadata)
logger.info(f"Finished applying post-chunking rules to {len(processed_chunks)} chunks.")
logger.info(f"Aggregated metadata from all chunks: {aggregated_chunk_metadata}")
# Return this metadata so the calling method can update the document metadata
self._last_aggregated_metadata = aggregated_chunk_metadata
else:
processed_chunks = parsed_chunks # No rules, use original chunks
self._last_aggregated_metadata = {}
# Generate embeddings for processed chunks
embeddings = await self.embedding_model.embed_for_ingestion(processed_chunks)
logger.info(f"Generated {len(embeddings)} embeddings")
# Create new chunk objects
chunk_objects = self._create_chunk_objects(doc_id, chunks, embeddings)
chunk_objects = self._create_chunk_objects(doc_id, processed_chunks, embeddings)
logger.info(f"Created {len(chunk_objects)} chunk objects")
return chunks, chunk_objects
return processed_chunks, chunk_objects
async def _process_colpali_embeddings(
self,

View File

@@ -4,6 +4,7 @@ from typing import Any, Dict, List, Tuple
from pydantic import BaseModel
from core.config import get_settings
from core.models.chunk import Chunk
from core.models.rules import BaseRule, MetadataExtractionRule, NaturalLanguageRule
logger = logging.getLogger(__name__)
@@ -42,48 +43,21 @@ class RulesProcessor:
f"Initializing RulesProcessor with {settings.RULES_PROVIDER} provider using model {settings.RULES_MODEL}"
)
async def process_rules(self, content: str, rules: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], str]:
"""
Process a list of rules on content.
Args:
content: The document content
rules: List of rule dictionaries
Returns:
Tuple[Dict[str, Any], str]: (extracted_metadata, modified_content)
"""
logger.debug(f"Processing {len(rules)} rules on content of length {len(content)}")
metadata = {}
modified_content = content
try:
# Parse all rules first to fail fast if any are invalid
parsed_rules = [self._parse_rule(rule) for rule in rules]
logger.debug(f"Successfully parsed {len(parsed_rules)} rules: {[r.type for r in parsed_rules]}")
# Apply rules in order
for i, rule in enumerate(parsed_rules, 1):
try:
logger.debug(f"Applying rule {i}/{len(parsed_rules)}: {rule.type}")
rule_metadata, modified_content = await rule.apply(modified_content)
logger.debug(f"Rule {i} extracted metadata: {rule_metadata}")
metadata.update(rule_metadata)
except Exception as e:
logger.error(f"Failed to apply rule {rule.type}: {str(e)}")
continue
except Exception as e:
logger.error(f"Failed to process rules: {str(e)}")
return metadata, content
logger.debug(f"Completed processing {len(rules)} rules. Final metadata: {metadata}")
return metadata, modified_content
def _parse_rule(self, rule_dict: Dict[str, Any]) -> BaseRule:
"""Parse a rule dictionary into a rule object"""
rule_type = rule_dict.get("type")
logger.debug(f"Parsing rule of type: {rule_type}")
stage = rule_dict.get("stage")
if not stage:
# Handle missing stage - default to post_parsing for backward compatibility
logger.warning(f"Rule is missing 'stage' field, defaulting to 'post_parsing': {rule_dict}")
rule_dict["stage"] = "post_parsing"
stage = "post_parsing"
if stage not in ["post_parsing", "post_chunking"]:
raise ValueError(f"Invalid stage '{stage}' in rule: {rule_dict}")
logger.debug(f"Parsing rule of type: {rule_type}, stage: {stage}")
if rule_type == "metadata_extraction":
return MetadataExtractionRule(**rule_dict)
@@ -91,3 +65,113 @@ class RulesProcessor:
return NaturalLanguageRule(**rule_dict)
else:
raise ValueError(f"Unknown rule type: {rule_type}")
async def process_document_rules(self, content: str, rules: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], str]:
"""
Process rules intended for the post-parsing stage (full document text).
Args:
content: The full document content (text)
rules: The original list of rule dictionaries
Returns:
Tuple[Dict[str, Any], str]: (extracted_metadata, modified_content)
"""
logger.debug(f"Processing document-level rules (post_parsing) on content length {len(content)}")
document_metadata = {}
modified_content = content
parsed_rules = []
# Process rules for post_parsing stage
for rule_dict in rules:
try:
# Let _parse_rule handle stage defaulting
rule = self._parse_rule(rule_dict)
# Only include rules for post_parsing stage
if rule.stage == "post_parsing":
parsed_rules.append(rule)
except ValueError as e:
logger.warning(f"Skipping invalid document rule: {e}")
continue # Skip this rule
logger.debug(f"Applying {len(parsed_rules)} post_parsing rules.")
for i, rule in enumerate(parsed_rules, 1):
try:
logger.debug(f"Applying document rule {i}/{len(parsed_rules)}: {rule.type}")
# Pass document metadata accumulated so far
rule_metadata, modified_content = await rule.apply(modified_content, document_metadata)
logger.debug(f"Rule {i} extracted metadata: {rule_metadata}")
document_metadata.update(rule_metadata) # Accumulate metadata
except Exception as e:
logger.error(f"Failed to apply document rule {rule.type}: {str(e)}")
# Decide: stop or continue? Let's continue for robustness.
continue
logger.debug(f"Finished post_parsing rules. Final metadata: {document_metadata}")
return document_metadata, modified_content
async def process_chunk_rules(self, chunk: Chunk, rules: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], Chunk]:
"""
Process rules intended for the post-chunking stage on a single chunk.
Modifies the chunk's content if necessary and returns any extracted metadata.
Args:
chunk: The Chunk object to process
rules: The original list of rule dictionaries
Returns:
Tuple[Dict[str, Any], Chunk]: (extracted_metadata_for_doc, potentially_modified_chunk)
"""
logger.debug(f"Processing chunk-level rules (post_chunking) on chunk content length {len(chunk.content)}")
parsed_rules = []
# Process rules for post_chunking stage
for rule_dict in rules:
try:
# Let _parse_rule handle stage defaulting
rule = self._parse_rule(rule_dict)
# Only include rules for post_chunking stage
if rule.stage == "post_chunking":
parsed_rules.append(rule)
except ValueError as e:
logger.warning(f"Skipping invalid chunk rule: {e}")
continue
if not parsed_rules:
return {}, chunk # No applicable rules, return empty metadata and original chunk
logger.debug(f"Applying {len(parsed_rules)} post_chunking rules to chunk.")
modified_content = chunk.content
# Metadata extracted by rules in *this chunk* to be aggregated at the document level
aggregated_chunk_rule_metadata = {}
for i, rule in enumerate(parsed_rules, 1):
try:
logger.debug(f"Applying chunk rule {i}/{len(parsed_rules)}: {rule.type}")
# Pass original chunk metadata (if any) for context, but don't modify it here
rule_metadata, rule_modified_content = await rule.apply(modified_content, chunk.metadata)
# If it's a metadata rule, aggregate its findings
if isinstance(rule, MetadataExtractionRule):
logger.debug(f"Rule {i} (Metadata) extracted metadata: {rule_metadata}")
# Aggregate metadata - simple update, last one wins for a key
aggregated_chunk_rule_metadata.update(rule_metadata)
# If it's an NL rule, update the content for the next rule in this chunk
elif isinstance(rule, NaturalLanguageRule):
logger.debug(f"Rule {i} (NL) modified content.")
modified_content = rule_modified_content
except Exception as e:
logger.error(f"Failed to apply chunk rule {rule.type}: {str(e)}")
continue
# Update the chunk's content if it was modified by NL rules
# Note: We are NOT updating chunk.metadata here with aggregated_chunk_rule_metadata
chunk.content = modified_content
logger.debug(f"Finished post_chunking rules for chunk. Metadata to aggregate: {aggregated_chunk_rule_metadata}")
# Return the aggregated metadata from this chunk and the potentially modified chunk
return aggregated_chunk_rule_metadata, chunk
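
Taken together, the two new methods replace the old single-pass process_rules. A minimal sketch of how a caller might drive both stages, assuming `processor` is a `RulesProcessor` instance and `chunks` were split from the stage-1 output, as `DocumentService` does above.

from typing import Any, Dict, List, Tuple

async def apply_staged_rules(processor, text: str, chunks: list, rules: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], str, list]:
    """Drive both stages with a RulesProcessor instance and pre-split Chunk objects."""
    # Stage 1: post_parsing rules see (and may rewrite) the full document text.
    doc_metadata, text = await processor.process_document_rules(text, rules)
    # Stage 2: post_chunking rules run per chunk; their metadata is aggregated at document level.
    for i, chunk in enumerate(chunks):
        chunk_metadata, chunks[i] = await processor.process_chunk_rules(chunk, rules)
        doc_metadata.update(chunk_metadata)
    return doc_metadata, text, chunks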

View File

@@ -143,15 +143,16 @@ async def process_ingestion_job(
additional_metadata, text = await document_service.parser.parse_file_to_text(file_content, original_filename)
logger.debug(f"Parsed file into text of length {len(text)}")
# 5. Apply rules if provided
# === Apply post_parsing rules ===
document_rule_metadata = {}
if rules_list:
rule_metadata, modified_text = await document_service.rules_processor.process_rules(text, rules_list)
# Update document metadata with extracted metadata from rules
metadata.update(rule_metadata)
if modified_text:
text = modified_text
logger.info("Updated text with modified content from rules")
logger.info("Applying post-parsing rules...")
document_rule_metadata, text = await document_service.rules_processor.process_document_rules(
text, rules_list
)
metadata.update(document_rule_metadata) # Merge metadata into main doc metadata
logger.info(f"Document metadata after post-parsing rules: {metadata}")
logger.info(f"Content length after post-parsing rules: {len(text)}")
# 6. Retrieve the existing document
logger.debug(f"Retrieving document with ID: {document_id}")
@@ -201,17 +202,46 @@ async def process_ingestion_job(
logger.debug("Updated document in database with parsed content")
# 7. Split text into chunks
chunks = await document_service.parser.split_text(text)
if not chunks:
raise ValueError("No content chunks extracted")
logger.debug(f"Split processed text into {len(chunks)} chunks")
parsed_chunks = await document_service.parser.split_text(text)
if not parsed_chunks:
raise ValueError("No content chunks extracted after rules processing")
logger.debug(f"Split processed text into {len(parsed_chunks)} chunks")
# 8. Generate embeddings for chunks
embeddings = await document_service.embedding_model.embed_for_ingestion(chunks)
# === Apply post_chunking rules and aggregate metadata ===
processed_chunks = []
aggregated_chunk_metadata: Dict[str, Any] = {} # Initialize dict for aggregated metadata
chunk_contents = [] # Initialize list to collect chunk contents as we process them
if rules_list:
logger.info("Applying post-chunking rules...")
for chunk_obj in parsed_chunks:
# Get metadata *and* the potentially modified chunk
chunk_rule_metadata, processed_chunk = await document_service.rules_processor.process_chunk_rules(
chunk_obj, rules_list
)
processed_chunks.append(processed_chunk)
chunk_contents.append(processed_chunk.content) # Collect content as we process
# Aggregate the metadata extracted from this chunk
aggregated_chunk_metadata.update(chunk_rule_metadata)
logger.info(f"Finished applying post-chunking rules to {len(processed_chunks)} chunks.")
logger.info(f"Aggregated metadata from all chunks: {aggregated_chunk_metadata}")
# Update the document content with the stitched content from processed chunks
if processed_chunks:
logger.info("Updating document content with processed chunks...")
stitched_content = "\n".join(chunk_contents)
doc.system_metadata["content"] = stitched_content
logger.info(f"Updated document content with stitched chunks (length: {len(stitched_content)})")
else:
processed_chunks = parsed_chunks # No rules, use original chunks
# 8. Generate embeddings for processed chunks
embeddings = await document_service.embedding_model.embed_for_ingestion(processed_chunks)
logger.debug(f"Generated {len(embeddings)} embeddings")
# 9. Create chunk objects
chunk_objects = document_service._create_chunk_objects(doc.external_id, chunks, embeddings)
# 9. Create chunk objects with potentially modified chunk content and metadata
chunk_objects = document_service._create_chunk_objects(doc.external_id, processed_chunks, embeddings)
logger.debug(f"Created {len(chunk_objects)} chunk objects")
# 10. Handle ColPali embeddings if enabled
@@ -229,8 +259,9 @@ async def process_ingestion_job(
file_content_base64 = base64.b64encode(file_content).decode()
# Use the processed chunks for Colpali
chunks_multivector = document_service._create_chunks_multivector(
file_type, file_content_base64, file_content, chunks
file_type, file_content_base64, file_content, processed_chunks
)
logger.debug(f"Created {len(chunks_multivector)} chunks for multivector embedding")
@@ -241,6 +272,16 @@ async def process_ingestion_job(
doc.external_id, chunks_multivector, colpali_embeddings
)
# === Merge aggregated chunk metadata into document metadata ===
if aggregated_chunk_metadata:
logger.info("Merging aggregated chunk metadata into document metadata...")
# Make sure doc.metadata exists
if not hasattr(doc, "metadata") or doc.metadata is None:
doc.metadata = {}
doc.metadata.update(aggregated_chunk_metadata)
logger.info(f"Final document metadata after merge: {doc.metadata}")
# ===========================================================
# Update document status to completed before storing
doc.system_metadata["status"] = "completed"
doc.system_metadata["updated_at"] = datetime.now(UTC)

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Type, Union
from typing import Any, Dict, Literal, Type, Union
from pydantic import BaseModel
@@ -16,8 +16,19 @@ class Rule(ABC):
class MetadataExtractionRule(Rule):
"""Server-side rule for extracting metadata using a schema"""
def __init__(self, schema: Union[Type[BaseModel], Dict[str, Any]]):
def __init__(
self,
schema: Union[Type[BaseModel], Dict[str, Any]],
stage: Literal["post_parsing", "post_chunking"] = "post_parsing",
):
"""
Args:
schema: Pydantic model or dict schema defining metadata fields to extract
stage: When to apply the rule - either "post_parsing" (full document text) or
"post_chunking" (individual chunks). Defaults to "post_parsing" for backward compatibility.
"""
self.schema = schema
self.stage = stage
def to_dict(self) -> Dict[str, Any]:
if isinstance(self.schema, type) and issubclass(self.schema, BaseModel):
@@ -27,22 +38,25 @@ class MetadataExtractionRule(Rule):
# Assume it's already a dict schema
schema_dict = self.schema
return {"type": "metadata_extraction", "schema": schema_dict}
return {"type": "metadata_extraction", "schema": schema_dict, "stage": self.stage}
class NaturalLanguageRule(Rule):
"""Server-side rule for transforming content using natural language"""
def __init__(self, prompt: str):
def __init__(self, prompt: str, stage: Literal["post_parsing", "post_chunking"] = "post_parsing"):
"""
Args:
prompt: Instruction for how to transform the content
e.g. "Remove any personal information" or "Convert to bullet points"
stage: When to apply the rule - either "post_parsing" (full document text) or
"post_chunking" (individual chunks). Defaults to "post_parsing" for backward compatibility.
"""
self.prompt = prompt
self.stage = stage
def to_dict(self) -> Dict[str, Any]:
return {"type": "natural_language", "prompt": self.prompt}
return {"type": "natural_language", "prompt": self.prompt, "stage": self.stage}
__all__ = ["Rule", "MetadataExtractionRule", "NaturalLanguageRule"]
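
To round out the change, a hedged sketch of client-side usage: both SDK rule classes now carry a `stage` that is serialized by `to_dict()`, and callers that omit it keep the old post_parsing behaviour. The `InvoiceFields` model is an assumption, and the exact serialized form of a pydantic schema depends on the branch elided above; the `stage` key and defaults come straight from the code in this diff.

from pydantic import BaseModel
# MetadataExtractionRule and NaturalLanguageRule come from the SDK module diffed above
# (its import path is not shown in this view).

class InvoiceFields(BaseModel):  # hypothetical schema
    vendor: str
    total: float

doc_rule = MetadataExtractionRule(schema=InvoiceFields)  # stage defaults to "post_parsing"
chunk_rule = NaturalLanguageRule("Convert to bullet points", stage="post_chunking")

doc_rule.to_dict()    # {"type": "metadata_extraction", "schema": <serialized schema>, "stage": "post_parsing"}
chunk_rule.to_dict()  # {"type": "natural_language", "prompt": "Convert to bullet points", "stage": "post_chunking"}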