Add app_id scoping and fix raw image extraction (#129)

Adityavardhan Agrawal 2025-05-03 12:29:44 -07:00 committed by GitHub
parent ae5297e5dc
commit 1897485c87
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 187 additions and 95 deletions

View File

@@ -434,6 +434,7 @@ async def ingest_file(
             "writers": [auth.entity_id],
             "admins": [auth.entity_id],
             "user_id": [auth.user_id] if auth.user_id else [],
+            "app_access": ([auth.app_id] if auth.app_id else []),
         },
         system_metadata={"status": "processing"},
     )
@@ -443,6 +444,8 @@ async def ingest_file(
         doc.system_metadata["folder_name"] = folder_name
     if end_user_id:
         doc.system_metadata["end_user_id"] = end_user_id
+    if auth.app_id:
+        doc.system_metadata["app_id"] = auth.app_id
 
     # Set processing status
     doc.system_metadata["status"] = "processing"
@@ -639,6 +642,7 @@ async def batch_ingest_files(
             "writers": [auth.entity_id],
             "admins": [auth.entity_id],
             "user_id": [auth.user_id] if auth.user_id else [],
+            "app_access": ([auth.app_id] if auth.app_id else []),
         },
     )
@@ -647,6 +651,8 @@ async def batch_ingest_files(
         doc.system_metadata["folder_name"] = folder_name
     if end_user_id:
         doc.system_metadata["end_user_id"] = end_user_id
+    if auth.app_id:
+        doc.system_metadata["app_id"] = auth.app_id
 
     # Set processing status
     doc.system_metadata["status"] = "processing"
@@ -813,6 +819,15 @@ async def batch_get_documents(request: Dict[str, Any], auth: AuthContext = Depen
         if not document_ids:
             return []
 
+        # Create system filters for folder and user scoping
+        system_filters = {}
+        if folder_name:
+            system_filters["folder_name"] = folder_name
+        if end_user_id:
+            system_filters["end_user_id"] = end_user_id
+        if auth.app_id:
+            system_filters["app_id"] = auth.app_id
+
         return await document_service.batch_retrieve_documents(document_ids, auth, folder_name, end_user_id)
     except PermissionError as e:
         raise HTTPException(status_code=403, detail=str(e))
@@ -852,6 +867,15 @@ async def batch_get_chunks(request: Dict[str, Any], auth: AuthContext = Depends(
             else:
                 chunk_sources.append(source)
 
+        # Create system filters for folder and user scoping
+        system_filters = {}
+        if folder_name:
+            system_filters["folder_name"] = folder_name
+        if end_user_id:
+            system_filters["end_user_id"] = end_user_id
+        if auth.app_id:
+            system_filters["app_id"] = auth.app_id
+
         return await document_service.batch_retrieve_chunks(chunk_sources, auth, folder_name, end_user_id, use_colpali)
     except PermissionError as e:
         raise HTTPException(status_code=403, detail=str(e))
@@ -966,6 +990,8 @@ async def list_documents(
         system_filters["folder_name"] = folder_name
     if end_user_id:
         system_filters["end_user_id"] = end_user_id
+    if auth.app_id:
+        system_filters["app_id"] = auth.app_id
 
     return await document_service.db.get_documents(auth, skip, limit, filters, system_filters)
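
The same three-way scoping (folder, end user, app) is assembled inline in each endpoint above. A small helper like the hypothetical one below, shown only to make the shape of `system_filters` concrete, captures the pattern; it is not part of the diff:

from typing import Any, Dict, Optional


def build_system_filters(
    folder_name: Optional[str],
    end_user_id: Optional[str],
    app_id: Optional[str],
) -> Dict[str, Any]:
    """Collect the optional scoping keys the endpoints above build inline."""
    system_filters: Dict[str, Any] = {}
    if folder_name:
        system_filters["folder_name"] = folder_name
    if end_user_id:
        system_filters["end_user_id"] = end_user_id
    if app_id:
        # Developer tokens carry an app_id; scope every query to that app.
        system_filters["app_id"] = app_id
    return system_filters


# Example: a developer-scoped request with no folder restriction.
print(build_system_filters(None, "end_user_7", "app_abc"))
# -> {'end_user_id': 'end_user_7', 'app_id': 'app_abc'}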

View File

@@ -8,7 +8,7 @@ from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
 from sqlalchemy.orm import declarative_base, sessionmaker
 
-from ..models.auth import AuthContext
+from ..models.auth import AuthContext, EntityType
 from ..models.documents import Document, StorageFileInfo
 from ..models.folders import Folder
 from ..models.graph import Graph
@@ -200,7 +200,7 @@ class PostgresDatabase(BaseDatabase):
                 )
                 logger.info("Added storage_files column to documents table")
 
-            # Create indexes for folder_name and end_user_id in system_metadata for documents
+            # Create indexes for folder_name, end_user_id and app_id in system_metadata for documents
            await conn.execute(
                text(
                    """
@@ -210,6 +210,24 @@ class PostgresDatabase(BaseDatabase):
                 )
             )
 
+            await conn.execute(
+                text(
+                    """
+                    CREATE INDEX IF NOT EXISTS idx_system_metadata_app_id
+                    ON documents ((system_metadata->>'app_id'));
+                    """
+                )
+            )
+
+            await conn.execute(
+                text(
+                    """
+                    CREATE INDEX IF NOT EXISTS idx_system_metadata_end_user_id
+                    ON documents ((system_metadata->>'end_user_id'));
+                    """
+                )
+            )
+
             # Create folders table if it doesn't exist
             await conn.execute(
                 text(
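
As an aside, the expression indexes created above only help when a query's predicate uses the exact same expression, e.g. `system_metadata->>'app_id'`. A rough sketch of such a query, using SQLAlchemy's `text()` as the diff already does (the statement and parameter value here are illustrative, and no live session is shown):

from sqlalchemy import text

# A predicate on system_metadata->>'app_id' can use idx_system_metadata_app_id.
stmt = text(
    """
    SELECT external_id
    FROM documents
    WHERE system_metadata->>'app_id' = :app_id
    """
)

# With a live AsyncSession this would be executed roughly as:
#     result = await session.execute(stmt, {"app_id": "app_abc"})
print(stmt)
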
@@ -256,15 +274,6 @@ class PostgresDatabase(BaseDatabase):
                 text("CREATE INDEX IF NOT EXISTS idx_folder_access_control ON folders USING gin (access_control);")
             )
 
-            await conn.execute(
-                text(
-                    """
-                    CREATE INDEX IF NOT EXISTS idx_system_metadata_end_user_id
-                    ON documents ((system_metadata->>'end_user_id'));
-                    """
-                )
-            )
-
             # Check if system_metadata column exists in graphs table
             result = await conn.execute(
                 text(
@@ -754,26 +763,38 @@ class PostgresDatabase(BaseDatabase):
             return False
 
     def _build_access_filter(self, auth: AuthContext) -> str:
-        """Build PostgreSQL filter for access control."""
-        filters = [
+        """Build PostgreSQL filter for access control.
+
+        For developer-scoped tokens (i.e. those that include an ``app_id``) we *must* ensure
+        that the caller only ever sees documents that belong to that application. Simply
+        checking the developer entity ID is **insufficient**, because multiple apps created
+        by the same developer share the same entity ID. Therefore, when an ``app_id`` is
+        present, we additionally scope the filter by the ``app_id`` that is stored either
+        in ``system_metadata.app_id`` or in the ``access_control->app_access`` list.
+        """
+        # Base clauses that will always be AND-ed with any additional application scoping.
+        base_clauses = [
             f"owner->>'id' = '{auth.entity_id}'",
             f"access_control->'readers' ? '{auth.entity_id}'",
             f"access_control->'writers' ? '{auth.entity_id}'",
             f"access_control->'admins' ? '{auth.entity_id}'",
         ]
 
-        if auth.entity_type == "DEVELOPER" and auth.app_id:
-            # Add app-specific access for developers
-            filters.append(f"access_control->'app_access' ? '{auth.app_id}'")
+        # Developer token with app_id → restrict strictly by that app_id.
+        if auth.entity_type == EntityType.DEVELOPER and auth.app_id:
+            filters = [f"system_metadata->>'app_id' = '{auth.app_id}'"]
+        else:
+            filters = base_clauses.copy()
 
-        # Add user_id filter in cloud mode
+        # In cloud mode further restrict by user_id when available (used for multi-tenant
+        # end-user isolation).
         if auth.user_id:
             from core.config import get_settings
 
             settings = get_settings()
 
             if settings.MODE == "cloud":
-                # Filter by user_id in access_control
                 filters.append(f"access_control->>'user_id' = '{auth.user_id}'")
 
         return " OR ".join(filters)

View File

@@ -159,17 +159,19 @@ class DocumentService:
         should_rerank = use_reranking if use_reranking is not None else settings.USE_RERANKING
         using_colpali = use_colpali if use_colpali is not None else False
 
-        # Launch embedding queries concurrently
-        embedding_tasks = [self.embedding_model.embed_for_query(query)]
-        if using_colpali and self.colpali_embedding_model:
-            embedding_tasks.append(self.colpali_embedding_model.embed_for_query(query))
-
         # Build system filters for folder_name and end_user_id
         system_filters = {}
         if folder_name:
             system_filters["folder_name"] = folder_name
         if end_user_id:
             system_filters["end_user_id"] = end_user_id
+        if auth.app_id:
+            system_filters["app_id"] = auth.app_id
+
+        # Launch embedding queries concurrently
+        embedding_tasks = [self.embedding_model.embed_for_query(query)]
+        if using_colpali and self.colpali_embedding_model:
+            embedding_tasks.append(self.colpali_embedding_model.embed_for_query(query))
 
         # Run embeddings and document authorization in parallel
         results = await asyncio.gather(
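
The reordering above only moves the embedding kick-off after the filter construction; the concurrency pattern itself is unchanged. A minimal runnable sketch of that pattern, with dummy coroutines standing in for the real embedding models and authorization lookup:

import asyncio


async def embed_for_query(query: str) -> list[float]:
    # Stand-in for a real embedding call.
    await asyncio.sleep(0.01)
    return [0.0, 1.0]


async def authorize_documents(query: str) -> list[str]:
    # Stand-in for the document authorization lookup.
    await asyncio.sleep(0.01)
    return ["doc_1", "doc_2"]


async def main() -> None:
    embedding_tasks = [embed_for_query("hello")]
    # Optionally add a second (e.g. ColPali-style) embedding task.
    embedding_tasks.append(embed_for_query("hello"))

    # Run embeddings and authorization concurrently, as the service does.
    results = await asyncio.gather(*embedding_tasks, authorize_documents("hello"))
    *embeddings, authorized_ids = results
    print(len(embeddings), authorized_ids)


asyncio.run(main())
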
@@ -357,6 +359,8 @@ class DocumentService:
             system_filters["folder_name"] = folder_name
         if end_user_id:
             system_filters["end_user_id"] = end_user_id
+        if auth.app_id:
+            system_filters["app_id"] = auth.app_id
 
         # Use the database's batch retrieval method
         documents = await self.db.get_documents_by_id(document_ids, auth, system_filters)
@@ -582,9 +586,7 @@ class DocumentService:
                 "readers": [auth.entity_id],
                 "writers": [auth.entity_id],
                 "admins": [auth.entity_id],
-                "user_id": (
-                    [auth.user_id] if auth.user_id else []
-                ),  # Add user_id to access control for filtering (as a list)
+                "user_id": [auth.user_id] if auth.user_id else [],  # user scoping
             },
         )
@@ -597,6 +599,11 @@ class DocumentService:
         if end_user_id:
             doc.system_metadata["end_user_id"] = end_user_id
 
+        # Tag document with app_id for segmentation
+        if auth.app_id:
+            doc.system_metadata["app_id"] = auth.app_id
+
         logger.debug(f"Created text document record with ID {doc.external_id}")
 
         if settings.MODE == "cloud" and auth.user_id:
@@ -711,10 +718,47 @@ class DocumentService:
         mime_type = file_type.mime if file_type is not None else "text/plain"
         logger.info(f"Creating chunks for multivector embedding for file type {mime_type}")
 
-        # If file_type is None, treat it as a text file
+        # If file_type is None, attempt a light-weight heuristic to detect images
+        # Some JPGs with uncommon EXIF markers fail `filetype.guess`, leading to
+        # false "text" classification and, eventually, empty chunk lists. Try to
+        # open the bytes with Pillow; if that succeeds, treat it as an image.
         if file_type is None:
-            logger.info("File type is None, treating as text")
-            return [Chunk(content=chunk.content, metadata=(chunk.metadata | {"is_image": False})) for chunk in chunks]
+            try:
+                from PIL import Image as PILImage
+
+                PILImage.open(BytesIO(file_content)).verify()
+                logger.info("Heuristic image detection succeeded (Pillow). Treating as image.")
+                return [Chunk(content=file_content_base64, metadata={"is_image": True})]
+            except Exception:
+                logger.info("File type is None and not an image, treating as text")
+                return [
+                    Chunk(content=chunk.content, metadata=(chunk.metadata | {"is_image": False})) for chunk in chunks
+                ]
+
+        # Treat any direct image MIME (e.g. "image/jpeg") as an image regardless of
+        # the more specialised pattern matching below. This is more robust for files
+        # where `filetype.guess` fails but we still know from the upload metadata that
+        # it is an image.
+        if mime_type.startswith("image/"):
+            try:
+                from PIL import Image as PILImage
+
+                img = PILImage.open(BytesIO(file_content))
+
+                # Resize the image to a max width of 512 while preserving aspect ratio to
+                # keep the base64 payload smaller (helps avoid context window errors).
+                max_width = 512
+                if img.width > max_width:
+                    ratio = max_width / float(img.width)
+                    new_height = int(float(img.height) * ratio)
+                    img = img.resize((max_width, new_height))
+
+                buffered = BytesIO()
+                img.save(buffered, format="PNG", optimize=True)
+                img_b64 = "data:image/png;base64," + base64.b64encode(buffered.getvalue()).decode()
+                return [Chunk(content=img_b64, metadata={"is_image": True})]
+            except Exception as e:
+                logger.error(f"Error resizing image for base64 encoding: {e}. Falling back to original size.")
+                return [Chunk(content=file_content_base64, metadata={"is_image": True})]
 
         match mime_type:
             case file_type if file_type in IMAGE:
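
For reference, a self-contained sketch of the same Pillow-based fallback, generating a throwaway image in memory so it runs without any input file. The 512-pixel cap and PNG/base64 data URL mirror the diff; everything else (function name, test image) is illustrative:

import base64
from io import BytesIO

from PIL import Image  # Pillow


def image_bytes_to_data_url(file_content: bytes, max_width: int = 512) -> str:
    """Downscale to max_width (keeping aspect ratio) and return a PNG data URL."""
    img = Image.open(BytesIO(file_content))
    if img.width > max_width:
        ratio = max_width / float(img.width)
        img = img.resize((max_width, int(float(img.height) * ratio)))
    buffered = BytesIO()
    img.save(buffered, format="PNG", optimize=True)
    return "data:image/png;base64," + base64.b64encode(buffered.getvalue()).decode()


# Build a throwaway 1024x256 JPEG so the example is runnable end to end.
buf = BytesIO()
Image.new("RGB", (1024, 256), color=(200, 30, 30)).save(buf, format="JPEG")
data_url = image_bytes_to_data_url(buf.getvalue())
print(data_url[:40], "...", f"({len(data_url)} chars)")
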
@@ -1475,64 +1519,6 @@ class DocumentService:
             logger.warning(f"Unknown update strategy '{update_strategy}', defaulting to 'add'")
             return current_content + "\n\n" + update_content
 
-    def _update_metadata_and_version(
-        self,
-        doc: Document,
-        metadata: Optional[Dict[str, Any]],
-        update_strategy: str,
-        file: Optional[UploadFile],
-    ):
-        """Update document metadata and version tracking."""
-        # Update metadata if provided - additive but replacing existing keys
-        if metadata:
-            doc.metadata.update(metadata)
-
-        # Ensure external_id is preserved in metadata
-        doc.metadata["external_id"] = doc.external_id
-
-        # Increment version
-        current_version = doc.system_metadata.get("version", 1)
-        doc.system_metadata["version"] = current_version + 1
-        doc.system_metadata["updated_at"] = datetime.now(UTC)
-
-        # Track update history
-        if "update_history" not in doc.system_metadata:
-            doc.system_metadata["update_history"] = []
-
-        update_entry = {
-            "timestamp": datetime.now(UTC).isoformat(),
-            "version": current_version + 1,
-            "strategy": update_strategy,
-        }
-
-        if file:
-            update_entry["filename"] = file.filename
-
-        if metadata:
-            update_entry["metadata_updated"] = True
-
-        doc.system_metadata["update_history"].append(update_entry)
-
-        # Ensure storage_files models are properly typed as StorageFileInfo objects
-        if hasattr(doc, "storage_files") and doc.storage_files:
-            # Convert to StorageFileInfo objects if they're dicts or ensure they're properly serializable
-            doc.storage_files = [
-                (
-                    StorageFileInfo(**file)
-                    if isinstance(file, dict)
-                    else (
-                        file
-                        if isinstance(file, StorageFileInfo)
-                        else (
-                            StorageFileInfo(**file.model_dump())
-                            if hasattr(file, "model_dump")
-                            else StorageFileInfo(**file.dict()) if hasattr(file, "dict") else file
-                        )
-                    )
-                )
-                for file in doc.storage_files
-            ]
-
     async def _update_document_metadata_only(self, doc: Document, auth: AuthContext) -> Optional[Document]:
         """Update document metadata without reprocessing chunks."""
         updates = {
@@ -1809,3 +1795,38 @@ class DocumentService:
         """Close all resources."""
         # Close any active caches
         self.active_caches.clear()
+
+    def _update_metadata_and_version(
+        self,
+        doc: Document,
+        metadata: Optional[Dict[str, Any]],
+        update_strategy: str,
+        file: Optional[UploadFile],
+    ):
+        """Update document metadata and version tracking."""
+        # Merge/replace metadata
+        if metadata:
+            doc.metadata.update(metadata)
+
+        # Ensure external_id is preserved
+        doc.metadata["external_id"] = doc.external_id
+
+        # Increment version counter
+        current_version = doc.system_metadata.get("version", 1)
+        doc.system_metadata["version"] = current_version + 1
+        doc.system_metadata["updated_at"] = datetime.now(UTC)
+
+        # Maintain simple history list
+        history = doc.system_metadata.setdefault("update_history", [])
+        entry = {
+            "timestamp": datetime.now(UTC).isoformat(),
+            "version": current_version + 1,
+            "strategy": update_strategy,
+        }
+        if file:
+            entry["filename"] = file.filename
+        if metadata:
+            entry["metadata_updated"] = True
+        history.append(entry)
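
A quick usage sketch of the relocated helper's bookkeeping, using a stand-in object (the real `Document` model and the `UTC` constant used in the diff are not reproduced; `timezone.utc` is substituted so the snippet runs on older Python versions):

from datetime import datetime, timezone
from types import SimpleNamespace

# Stand-in for the Document model: just the two dicts the helper touches.
doc = SimpleNamespace(
    external_id="doc_123",
    metadata={"title": "old"},
    system_metadata={"version": 1},
)

# Mirror of the version/history bookkeeping shown above.
doc.metadata.update({"title": "new", "lang": "en"})
doc.metadata["external_id"] = doc.external_id

current_version = doc.system_metadata.get("version", 1)
doc.system_metadata["version"] = current_version + 1
doc.system_metadata["updated_at"] = datetime.now(timezone.utc)

history = doc.system_metadata.setdefault("update_history", [])
history.append(
    {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": current_version + 1,
        "strategy": "add",
        "metadata_updated": True,
    }
)

print(doc.system_metadata["version"], len(history))  # -> 2 1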

View File

@@ -245,20 +245,37 @@ async def process_ingestion_job(
         chunking_start = time.time()
         parsed_chunks = await document_service.parser.split_text(text)
         if not parsed_chunks:
-            raise ValueError("No content chunks extracted after rules processing")
-        logger.debug(f"Split processed text into {len(parsed_chunks)} chunks")
+            # No text was extracted from the file. In many cases (e.g. pure images)
+            # we can still proceed if ColPali multivector chunks are produced later.
+            # Therefore we defer the fatal check until after ColPali chunk creation.
+            logger.warning(
+                "No text chunks extracted after parsing. Will attempt to continue "
+                "and rely on image-based chunks if available."
+            )
+
         chunking_time = time.time() - chunking_start
         phase_times["split_into_chunks"] = chunking_time
         logger.info(f"Text chunking took {chunking_time:.2f}s to create {len(parsed_chunks)} chunks")
 
-        # 8. Handle ColPali embeddings if enabled - IMPORTANT: Do this BEFORE applying chunk rules
-        # so that image chunks can be processed by rules when use_images=True
-        colpali_processing_start = time.time()
+        # Decide whether we need image chunks either for ColPali embedding or because
+        # there are image-based rules (use_images=True) that must process them.
+        has_image_rules = any(
+            r.get("stage", "post_parsing") == "post_chunking"
+            and r.get("type") == "metadata_extraction"
+            and r.get("use_images", False)
+            for r in rules_list or []
+        )
         using_colpali = (
             use_colpali and document_service.colpali_embedding_model and document_service.colpali_vector_store
         )
+        should_create_image_chunks = has_image_rules or using_colpali
+
+        # Start timer for optional image chunk creation / multivector processing
+        colpali_processing_start = time.time()
+
         chunks_multivector = []
-        if using_colpali:
+        if should_create_image_chunks:
             import base64
             import filetype
@@ -266,16 +283,23 @@ async def process_ingestion_job(
             file_type = filetype.guess(file_content)
             file_content_base64 = base64.b64encode(file_content).decode()
 
-            # Use the parsed chunks for Colpali - this will create image chunks if appropriate
+            # Use the parsed chunks for ColPali/image rules - this will create image chunks if appropriate
             chunks_multivector = document_service._create_chunks_multivector(
                 file_type, file_content_base64, file_content, parsed_chunks
            )
-            logger.debug(f"Created {len(chunks_multivector)} chunks for multivector embedding")
+            logger.debug(
+                f"Created {len(chunks_multivector)} multivector/image chunks "
+                f"(has_image_rules={has_image_rules}, using_colpali={using_colpali})"
+            )
 
         colpali_create_chunks_time = time.time() - colpali_processing_start
         phase_times["colpali_create_chunks"] = colpali_create_chunks_time
         if using_colpali:
             logger.info(f"Colpali chunk creation took {colpali_create_chunks_time:.2f}s")
 
+        # If we still have no chunks at all (neither text nor image) abort early
+        if not parsed_chunks and not chunks_multivector:
+            raise ValueError("No content chunks (text or image) could be extracted from the document")
+
         # 9. Apply post_chunking rules and aggregate metadata
         processed_chunks = []
         processed_chunks_multivector = []
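
The worker's new control flow can be summarised as: create image chunks when either ColPali is active or a post_chunking rule needs images, and only fail once neither text nor image chunks are available. A compact approximation (rule dictionaries follow the shape used in the diff; the real worker re-checks after the image chunks are actually created):

from typing import Any, Dict, List


def plan_chunking(
    parsed_chunks: List[str],
    rules_list: List[Dict[str, Any]],
    use_colpali: bool,
    colpali_available: bool,
) -> bool:
    """Return True if image chunks should be created; raise if nothing can be chunked."""
    has_image_rules = any(
        r.get("stage", "post_parsing") == "post_chunking"
        and r.get("type") == "metadata_extraction"
        and r.get("use_images", False)
        for r in rules_list or []
    )
    using_colpali = use_colpali and colpali_available
    should_create_image_chunks = has_image_rules or using_colpali

    # Approximation of the deferred check: image chunks may still rescue a
    # document that produced no text (e.g. a pure image upload).
    if not parsed_chunks and not should_create_image_chunks:
        raise ValueError("No content chunks (text or image) could be extracted from the document")
    return should_create_image_chunks


rules = [{"stage": "post_chunking", "type": "metadata_extraction", "use_images": True}]
print(plan_chunking([], rules, use_colpali=False, colpali_available=False))  # -> True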