morphik-core/core/models/documents.py
Adityavardhan Agrawal 1792275cb8
Format fix, UI package update (#100)
Co-authored-by: Arnav Agrawal <aa779@cornell.edu>
2025-04-20 16:34:29 -07:00

145 lines
5.4 KiB
Python

import logging
import uuid
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Dict, List, Literal, Optional
from PIL import Image
from pydantic import BaseModel, Field, field_validator
from core.models.video import TimeSeriesData
logger = logging.getLogger(__name__)
class QueryReturnType(str, Enum):
    """Granularity of a query result: individual chunks or whole documents."""

    CHUNKS = "chunks"
    DOCUMENTS = "documents"
class StorageFileInfo(BaseModel):
    """Information about a file stored in storage"""

    bucket: str  # storage bucket the file lives in
    key: str  # object key within the bucket
    version: int = 1  # version counter, starts at 1
    filename: Optional[str] = None  # original filename, if known
    content_type: Optional[str] = None  # MIME type, if known
    # Timezone-aware (UTC) creation timestamp, set when the model is instantiated.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC))
class Document(BaseModel):
    """Represents a document stored in the database documents collection.

    Identity is defined by ``external_id`` alone: ``__eq__`` and ``__hash__``
    both key on it, so two Document instances with the same external_id are
    interchangeable in sets and dict keys.
    """

    external_id: str = Field(default_factory=lambda: str(uuid.uuid4()))  # auto-generated UUID4 string
    owner: Dict[str, str]  # assumes keys identify the owning entity (e.g. id/type) — TODO confirm against callers
    content_type: str  # MIME type of the document content
    filename: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    """user-defined metadata"""
    storage_info: Dict[str, str] = Field(default_factory=dict)
    """Legacy field for backwards compatibility - for single file storage"""
    storage_files: List[StorageFileInfo] = Field(default_factory=list)
    """List of files associated with this document"""
    # NOTE: the factory runs per-instance, so each Document gets fresh timestamps.
    system_metadata: Dict[str, Any] = Field(
        default_factory=lambda: {
            "created_at": datetime.now(UTC),
            "updated_at": datetime.now(UTC),
            "version": 1,
            "folder_name": None,
            "end_user_id": None,
            "status": "processing",  # Status can be: processing, completed, failed
        }
    )
    """metadata such as creation date etc."""
    additional_metadata: Dict[str, Any] = Field(default_factory=dict)
    """metadata to help with querying eg. frame descriptions and time-stamped transcript for videos"""
    # Per-role lists of principal identifiers allowed to read/write/administer this document.
    access_control: Dict[str, List[str]] = Field(default_factory=lambda: {"readers": [], "writers": [], "admins": []})
    chunk_ids: List[str] = Field(default_factory=list)  # ids of chunks derived from this document

    def __hash__(self):
        # Hash on external_id only, consistent with __eq__ below.
        return hash(self.external_id)

    def __eq__(self, other):
        # Documents are equal iff they share an external_id; any other type is unequal.
        if not isinstance(other, Document):
            return False
        return self.external_id == other.external_id
class DocumentContent(BaseModel):
    """Represents either a URL or content string.

    Invariant enforced by the validator below: ``filename`` is required when
    ``type`` is ``"url"`` and forbidden when ``type`` is ``"string"``.
    """

    type: Literal["url", "string"]
    value: str
    filename: Optional[str] = Field(None, description="Filename when type is url")

    @field_validator("filename")
    @classmethod  # pydantic v2 recommends stacking @classmethod under @field_validator
    def filename_only_for_url(cls, v, info):
        """Validate that filename is present iff the content type is a url.

        Args:
            v: The candidate ``filename`` value.
            info: pydantic ``ValidationInfo``; ``info.data`` holds the
                previously-validated fields (``type`` validates before
                ``filename`` because of field declaration order).

        Raises:
            ValueError: If filename is set for a string, or missing for a url.
        """
        # Lazy %-formatting: the repr is only built when DEBUG logging is enabled.
        logger.debug("Value looks like: %s", info)
        content_type = info.data.get("type")
        if content_type == "string" and v is not None:
            raise ValueError("filename can only be set when type is url")
        if content_type == "url" and v is None:
            raise ValueError("filename is required when type is url")
        return v
class DocumentResult(BaseModel):
    """Query result at document level"""

    score: float  # Highest chunk score
    document_id: str  # external_id
    metadata: Dict[str, Any]  # user-defined metadata from the source Document
    content: DocumentContent  # the document's content, as a url or inline string
    additional_metadata: Dict[str, Any]  # e.g. frame descriptions / transcript for videos
class ChunkResult(BaseModel):
    """Query result at chunk level"""

    content: str  # the chunk text (or base64 data URI for images — see commented branch below)
    score: float  # similarity/relevance score for this chunk
    document_id: str  # external_id
    chunk_number: int  # position of the chunk within its document
    metadata: Dict[str, Any]
    content_type: str  # MIME type of the parent document
    filename: Optional[str] = None
    download_url: Optional[str] = None

    def augmented_content(self, doc: DocumentResult) -> str | Image.Image:
        """Return the chunk content, enriched for video chunks.

        If the chunk's metadata carries a "timestamp" key, the chunk is
        treated as coming from a video: the parent document's
        frame_description/transcript time-series are consulted and a
        combined "Frame description / Transcript" text is produced for
        every timestamp at which this chunk's content appears. Otherwise
        the raw chunk content is returned unchanged.

        NOTE(review): the return annotation includes Image.Image only for
        the commented-out image branch below; as written, every path
        returns a str.
        """
        match self.metadata:
            case m if "timestamp" in m:
                # if timestamp present, then must be a video. In that case,
                # obtain the original document and augment the content with
                # frame/transcript information as well.
                frame_description = doc.additional_metadata.get("frame_description")
                transcript = doc.additional_metadata.get("transcript")
                if not isinstance(frame_description, dict) or not isinstance(transcript, dict):
                    logger.warning("Invalid frame description or transcript - not a dictionary")
                    return self.content
                ts_frame = TimeSeriesData(time_to_content=frame_description)
                ts_transcript = TimeSeriesData(time_to_content=transcript)
                # Collect every time at which this chunk's content occurs in either series.
                # NOTE(review): assumes content_to_times tolerates a missing key
                # (e.g. a defaultdict) — a plain dict would raise KeyError here; TODO confirm
                # against core.models.video.TimeSeriesData.
                timestamps = ts_frame.content_to_times[self.content] + ts_transcript.content_to_times[self.content]
                augmented_contents = [
                    f"Frame description: {ts_frame.at_time(t)} \n \n Transcript: {ts_transcript.at_time(t)}"
                    for t in timestamps
                ]
                return "\n\n".join(augmented_contents)
            # case m if m.get("is_image", False):
            #     try:
            #         # Handle data URI format "data:image/png;base64,..."
            #         content = self.content
            #         if content.startswith('data:'):
            #             # Extract the base64 part after the comma
            #             content = content.split(',', 1)[1]
            #         # Now decode the base64 string
            #         image_bytes = base64.b64decode(content)
            #         content = Image.open(io.BytesIO(image_bytes))
            #         return content
            #     except Exception as e:
            #         print(f"Error processing image: {str(e)}")
            #         # Fall back to using the content as text
            #         return self.content
            case _:
                # Non-video chunk: return content as-is.
                return self.content