# morphik-core/core/models/documents.py
from typing import Dict, Any, List, Optional, Literal
from enum import Enum
from datetime import UTC, datetime
from pydantic import BaseModel, Field, field_validator
import uuid
import logging

from core.models.video import TimeSeriesData

logger = logging.getLogger(__name__)


class QueryReturnType(str, Enum):
    CHUNKS = "chunks"
    DOCUMENTS = "documents"
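

# Usage sketch (illustrative, not part of the original module): QueryReturnType is a
# str-backed enum, so members compare equal to their raw string values and round-trip
# from plain strings such as API query parameters.
def _example_query_return_type() -> QueryReturnType:
    assert QueryReturnType.CHUNKS == "chunks"
    return QueryReturnType("documents")  # parses the raw string back to the enum member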


class Document(BaseModel):
    """Represents a document stored in the MongoDB documents collection"""

    external_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    owner: Dict[str, str]
    content_type: str
    filename: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    """user-defined metadata"""
    storage_info: Dict[str, str] = Field(default_factory=dict)
    system_metadata: Dict[str, Any] = Field(
        default_factory=lambda: {
            "created_at": datetime.now(UTC),
            "updated_at": datetime.now(UTC),
            "version": 1,
        }
    )
    """metadata such as creation date etc."""
    additional_metadata: Dict[str, Any] = Field(default_factory=dict)
    """metadata to help with querying, e.g. frame descriptions and a time-stamped transcript for videos"""
    access_control: Dict[str, List[str]] = Field(
        default_factory=lambda: {"readers": [], "writers": [], "admins": []}
    )
    chunk_ids: List[str] = Field(default_factory=list)
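

# Usage sketch (illustrative, not part of the original module; the field values are
# hypothetical): only owner and content_type are required. external_id, the
# system_metadata timestamps, and the empty access_control lists are filled in by
# the Field default factories.
def _example_document() -> Document:
    return Document(
        owner={"id": "user-123", "name": "alice"},
        content_type="text/plain",
        filename="notes.txt",
        metadata={"topic": "demo"},  # user-defined metadata
    )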


class DocumentChunk(BaseModel):
    """Represents a chunk stored in VectorStore"""

    document_id: str  # external_id of parent document
    content: str
    embedding: List[float]
    chunk_number: int
    # chunk-specific metadata
    metadata: Dict[str, Any] = Field(default_factory=dict)
    score: float = 0.0


class Chunk(BaseModel):
    """Represents a chunk containing content and metadata"""

    content: str
    metadata: Dict[str, Any] = Field(default_factory=dict)

    def to_document_chunk(
        self, document_id: str, chunk_number: int, embedding: List[float]
    ) -> DocumentChunk:
        return DocumentChunk(
            document_id=document_id,
            content=self.content,
            embedding=embedding,
            chunk_number=chunk_number,
            metadata=self.metadata,
        )
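

# Usage sketch (illustrative, not part of the original module; the embedding values
# are placeholders): a parsed Chunk is bound to its parent document and its embedding
# via to_document_chunk, which yields the DocumentChunk stored in the vector store.
def _example_chunk_conversion() -> DocumentChunk:
    chunk = Chunk(content="hello world", metadata={"page": 1})
    return chunk.to_document_chunk(
        document_id="doc-123",      # external_id of the parent Document
        chunk_number=0,
        embedding=[0.1, 0.2, 0.3],  # placeholder vector
    )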


class DocumentContent(BaseModel):
    """Represents either a URL or content string"""

    type: Literal["url", "string"]
    value: str
    filename: Optional[str] = Field(None, description="Filename when type is url")

    @field_validator("filename")
    def filename_only_for_url(cls, v, values):
        logger.debug(f"Value looks like: {values}")
        if values.data.get("type") == "string" and v is not None:
            raise ValueError("filename can only be set when type is url")
        if values.data.get("type") == "url" and v is None:
            raise ValueError("filename is required when type is url")
        return v
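

# Usage sketch (illustrative, not part of the original module): the validator ties
# filename to the content type, so a url must carry a filename while a plain string
# must not. Pydantic surfaces the validator's ValueError as a ValidationError, which
# is itself a ValueError subclass.
def _example_document_content() -> DocumentContent:
    ok = DocumentContent(type="url", value="https://example.com/doc.pdf", filename="doc.pdf")
    try:
        DocumentContent(type="string", value="inline text", filename="doc.pdf")
    except ValueError:
        pass  # rejected: filename can only be set when type is url
    return ok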


class DocumentResult(BaseModel):
    """Query result at document level"""

    score: float  # highest chunk score
    document_id: str  # external_id
    metadata: Dict[str, Any]
    content: DocumentContent
    additional_metadata: Dict[str, Any]


class ChunkResult(BaseModel):
    """Query result at chunk level"""

    content: str
    score: float
    document_id: str  # external_id
    chunk_number: int
    metadata: Dict[str, Any]
    content_type: str
    filename: Optional[str] = None
    download_url: Optional[str] = None

    def augmented_content(self, doc: DocumentResult) -> str:
        match self.metadata:
            case m if "timestamp" in m:
                # A timestamp in the metadata means this chunk comes from a video.
                # Use the parent document's frame descriptions and time-stamped
                # transcript to augment the chunk content.
                frame_description = doc.additional_metadata.get("frame_description")
                transcript = doc.additional_metadata.get("transcript")
                if not isinstance(frame_description, dict) or not isinstance(transcript, dict):
                    logger.warning("Invalid frame description or transcript - not a dictionary")
                    return self.content
                ts_frame = TimeSeriesData(frame_description)
                ts_transcript = TimeSeriesData(transcript)
                timestamps = (
                    ts_frame.content_to_times[self.content]
                    + ts_transcript.content_to_times[self.content]
                )
                augmented_contents = [
                    f"Frame description: {ts_frame.at_time(t)} \n \n Transcript: {ts_transcript.at_time(t)}"
                    for t in timestamps
                ]
                return "\n\n".join(augmented_contents)
            case _:
                return self.content
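

# Usage sketch (illustrative, not part of the original module; identifiers and scores
# are hypothetical): for non-video chunks there is no "timestamp" key in metadata, so
# augmented_content falls through to the plain chunk text; video chunks instead pull
# frame descriptions and transcripts from the parent document's additional_metadata.
def _example_augmented_content() -> str:
    chunk = ChunkResult(
        content="plain text chunk",
        score=0.87,
        document_id="doc-123",
        chunk_number=0,
        metadata={},  # no "timestamp" key, so the match falls to the default case
        content_type="text/plain",
    )
    doc = DocumentResult(
        score=0.87,
        document_id="doc-123",
        metadata={},
        content=DocumentContent(type="string", value="full document text"),
        additional_metadata={},
    )
    return chunk.augmented_content(doc)  # returns "plain text chunk" unchanged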