import logging
import uuid
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Dict, List, Literal, Optional

from PIL import Image
from pydantic import BaseModel, Field, field_validator

from core.models.video import TimeSeriesData

logger = logging.getLogger(__name__)


class QueryReturnType(str, Enum):
    CHUNKS = "chunks"
    DOCUMENTS = "documents"


class StorageFileInfo(BaseModel):
    """Information about a file persisted in the storage backend."""

    bucket: str
    key: str
    version: int = 1
    filename: Optional[str] = None
    content_type: Optional[str] = None
    timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC))


class Document(BaseModel):
    """Represents a document stored in the database documents collection."""

    external_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    owner: Dict[str, str]
    content_type: str
    filename: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    """User-defined metadata."""
    storage_info: Dict[str, str] = Field(default_factory=dict)
    """Legacy field for backwards compatibility - single-file storage."""
    storage_files: List[StorageFileInfo] = Field(default_factory=list)
    """List of files associated with this document."""
    system_metadata: Dict[str, Any] = Field(
        default_factory=lambda: {
            "created_at": datetime.now(UTC),
            "updated_at": datetime.now(UTC),
            "version": 1,
            "folder_name": None,
            "end_user_id": None,
            "status": "processing",  # Status can be: processing, completed, failed
        }
    )
    """System metadata such as creation date, version, and processing status."""
    additional_metadata: Dict[str, Any] = Field(default_factory=dict)
    """Metadata to help with querying, e.g. frame descriptions and time-stamped transcripts for videos."""
    access_control: Dict[str, List[str]] = Field(default_factory=lambda: {"readers": [], "writers": [], "admins": []})
    chunk_ids: List[str] = Field(default_factory=list)

    def __hash__(self):
        return hash(self.external_id)

    def __eq__(self, other):
        if not isinstance(other, Document):
            return False
        return self.external_id == other.external_id


class DocumentContent(BaseModel):
    """Represents either a URL or a content string."""

    type: Literal["url", "string"]
    value: str
    filename: Optional[str] = Field(None, description="Filename when type is url")

    @field_validator("filename")
    def filename_only_for_url(cls, v, values):
        logger.debug(f"Value looks like: {values}")
        if values.data.get("type") == "string" and v is not None:
            raise ValueError("filename can only be set when type is url")
        if values.data.get("type") == "url" and v is None:
            raise ValueError("filename is required when type is url")
        return v


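# Illustrative sketch (not part of the original module): the filename validator
# above ties `filename` to the content type, e.g.:
#
#   DocumentContent(type="url", value="https://example.com/a.pdf", filename="a.pdf")  # accepted
#   DocumentContent(type="string", value="hello world")                               # accepted
#   DocumentContent(type="string", value="hello world", filename="a.pdf")             # ValidationError
#
# The URL and filenames here are made up purely for illustration.

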
class DocumentResult(BaseModel):
    """Query result at document level"""

    score: float  # Highest chunk score
    document_id: str  # external_id
    metadata: Dict[str, Any]
    content: DocumentContent
    additional_metadata: Dict[str, Any]


class ChunkResult(BaseModel):
    """Query result at chunk level"""

    content: str
    score: float
    document_id: str  # external_id
    chunk_number: int
    metadata: Dict[str, Any]
    content_type: str
    filename: Optional[str] = None
    download_url: Optional[str] = None

    def augmented_content(self, doc: DocumentResult) -> str | Image.Image:
        match self.metadata:
            case m if "timestamp" in m:
                # If a timestamp is present, the chunk must come from a video.
                # Obtain the original document and augment the chunk content
                # with frame/transcript information as well.
                frame_description = doc.additional_metadata.get("frame_description")
                transcript = doc.additional_metadata.get("transcript")
                if not isinstance(frame_description, dict) or not isinstance(transcript, dict):
                    logger.warning("Invalid frame description or transcript - not a dictionary")
                    return self.content
                ts_frame = TimeSeriesData(time_to_content=frame_description)
                ts_transcript = TimeSeriesData(time_to_content=transcript)
                timestamps = ts_frame.content_to_times[self.content] + ts_transcript.content_to_times[self.content]
                augmented_contents = [
                    f"Frame description: {ts_frame.at_time(t)} \n \n Transcript: {ts_transcript.at_time(t)}"
                    for t in timestamps
                ]
                return "\n\n".join(augmented_contents)
            # Disabled image branch: re-enabling it would also require
            # `import base64` and `import io` at module level.
            # case m if m.get("is_image", False):
            #     try:
            #         # Handle data URI format "data:image/png;base64,..."
            #         content = self.content
            #         if content.startswith('data:'):
            #             # Extract the base64 part after the comma
            #             content = content.split(',', 1)[1]
            #
            #         # Now decode the base64 string
            #         image_bytes = base64.b64decode(content)
            #         content = Image.open(io.BytesIO(image_bytes))
            #         return content
            #     except Exception as e:
            #         print(f"Error processing image: {str(e)}")
            #         # Fall back to using the content as text
            #         return self.content
            case _:
                return self.content
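

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: it exercises the
    # identity semantics of Document and the plain-text fallback of
    # ChunkResult.augmented_content. All values below are hypothetical.
    doc = Document(owner={"id": "user-1"}, content_type="text/plain", filename="notes.txt")
    duplicate = Document(external_id=doc.external_id, owner={"id": "user-1"}, content_type="text/plain")
    assert doc == duplicate and len({doc, duplicate}) == 1  # equality/hashing key on external_id

    chunk = ChunkResult(
        content="hello world",
        score=0.42,
        document_id=doc.external_id,
        chunk_number=0,
        metadata={},
        content_type="text/plain",
    )
    doc_result = DocumentResult(
        score=0.42,
        document_id=doc.external_id,
        metadata={},
        content=DocumentContent(type="string", value="hello world"),
        additional_metadata={},
    )
    # With no "timestamp" entry in chunk.metadata, augmented_content returns
    # the chunk text unchanged.
    print(chunk.augmented_content(doc_result))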