From 196655fea3e27f0717279fe9e1b40bfda757e607 Mon Sep 17 00:00:00 2001
From: Arnav Agrawal
Date: Sat, 28 Dec 2024 19:41:05 +0530
Subject: [PATCH] pipethrough video timestamps on query

---
 core/embedding/base_embedding_model.py   |  6 ++-
 core/embedding/ollama_embedding_model.py | 15 +++---
 core/embedding/openai_embedding_model.py |  8 +++-
 core/models/documents.py                 | 21 ++++++++-
 core/models/time_series.py               |  8 ++++
 core/parser/base_parser.py               |  7 ++-
 core/parser/combined_parser.py           | 59 ++++++++----------------
 core/parser/unstructured_parser.py       | 14 ++++--
 core/services/document_service.py        | 36 +++++----------
 core/vector_store/base_vector_store.py   |  7 +--
 10 files changed, 96 insertions(+), 85 deletions(-)

diff --git a/core/embedding/base_embedding_model.py b/core/embedding/base_embedding_model.py
index 7670595..9a549d5 100644
--- a/core/embedding/base_embedding_model.py
+++ b/core/embedding/base_embedding_model.py
@@ -1,10 +1,14 @@
 from abc import ABC, abstractmethod
 from typing import List, Union
 
+from core.models.documents import Chunk
+
 
 class BaseEmbeddingModel(ABC):
     @abstractmethod
-    async def embed_for_ingestion(self, text: Union[str, List[str]]) -> List[float]:
+    async def embed_for_ingestion(
+        self, chunks: Union[Chunk, List[Chunk]]
+    ) -> List[List[float]]:
         """Generate embeddings for input text"""
         pass
 
diff --git a/core/embedding/ollama_embedding_model.py b/core/embedding/ollama_embedding_model.py
index 95cf936..2c02175 100644
--- a/core/embedding/ollama_embedding_model.py
+++ b/core/embedding/ollama_embedding_model.py
@@ -1,6 +1,7 @@
 from typing import List, Union
 from ollama import AsyncClient
 from core.embedding.base_embedding_model import BaseEmbeddingModel
+from core.models.documents import Chunk
 
 
 class OllamaEmbeddingModel(BaseEmbeddingModel):
@@ -9,14 +10,16 @@ class OllamaEmbeddingModel(BaseEmbeddingModel):
         self.client = AsyncClient(host=base_url)
 
     async def embed_for_ingestion(
-        self, text: Union[str, List[str]]
+        self, chunks: Union[Chunk, List[Chunk]]
     ) -> List[List[float]]:
-        if isinstance(text, str):
-            text = [text]
+        if isinstance(chunks, Chunk):
+            chunks = [chunks]
 
         embeddings: List[List[float]] = []
-        for t in text:
-            response = await self.client.embeddings(model=self.model_name, prompt=t)
+        for c in chunks:
+            response = await self.client.embeddings(
+                model=self.model_name, prompt=c.content
+            )
             embedding = list(response["embedding"])
             embeddings.append(embedding)
 
@@ -24,4 +27,4 @@ class OllamaEmbeddingModel(BaseEmbeddingModel):
 
     async def embed_for_query(self, text: str) -> List[float]:
         response = await self.client.embeddings(model=self.model_name, prompt=text)
-        return response["embedding"]
+        return list(response["embedding"])
diff --git a/core/embedding/openai_embedding_model.py b/core/embedding/openai_embedding_model.py
index d4d4bea..c27e2dc 100644
--- a/core/embedding/openai_embedding_model.py
+++ b/core/embedding/openai_embedding_model.py
@@ -1,6 +1,8 @@
 from typing import List, Union
 from openai import OpenAI
-from .base_embedding_model import BaseEmbeddingModel
+
+from core.models.documents import Chunk
+from core.embedding.base_embedding_model import BaseEmbeddingModel
 
 
 class OpenAIEmbeddingModel(BaseEmbeddingModel):
@@ -9,8 +11,10 @@ class OpenAIEmbeddingModel(BaseEmbeddingModel):
         self.model_name = model_name
 
     async def embed_for_ingestion(
-        self, text: Union[str, List[str]]
+        self, chunks: Union[Chunk, List[Chunk]]
    ) -> List[List[float]]:
+        chunks = [chunks] if isinstance(chunks, Chunk) else chunks
+        text = [c.content for c in chunks]
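Usage note on the embedding interface above: embed_for_ingestion now takes Chunk objects (a single Chunk or a list) instead of raw strings and returns one vector per chunk. A minimal ingestion-side sketch, assuming OllamaEmbeddingModel is constructed with base_url and model_name (its __init__ is not shown in this patch) and that a local Ollama server is available:

    # Illustrative only, not part of the patch.
    from core.models.documents import Chunk
    from core.embedding.ollama_embedding_model import OllamaEmbeddingModel

    async def embed_example() -> None:
        # Constructor arguments are assumed; only embed_for_ingestion is exercised here.
        model = OllamaEmbeddingModel(
            base_url="http://localhost:11434", model_name="nomic-embed-text"
        )
        chunks = [
            Chunk(content="first page of the report", metadata={}),
            Chunk(content="frame description at 12.5s", metadata={"timestamp": 12.5}),
        ]
        vectors = await model.embed_for_ingestion(chunks)
        assert len(vectors) == len(chunks)  # one embedding per chunk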
        response = self.client.embeddings.create(model=self.model_name, input=text)
         return [item.embedding for item in response.data]
diff --git a/core/models/documents.py b/core/models/documents.py
index 831f2eb..45eeb5f 100644
--- a/core/models/documents.py
+++ b/core/models/documents.py
@@ -39,15 +39,32 @@ class DocumentChunk(BaseModel):
     """Represents a chunk stored in VectorStore"""
 
     document_id: str  # external_id of parent document
-    # TODO: This might be suboptimal due to storage size. consider moving to separate store.
     content: str
     embedding: List[float]
     chunk_number: int
-    version: int = 1
+    # chunk-specific metadata
     metadata: Dict[str, Any] = Field(default_factory=dict)
     score: float = 0.0
 
 
+class Chunk(BaseModel):
+    """Represents a chunk containing content and metadata"""
+
+    content: str
+    metadata: Dict[str, Any] = Field(default_factory=dict)
+
+    def to_document_chunk(
+        self, document_id: str, chunk_number: int, embedding: List[float]
+    ) -> DocumentChunk:
+        return DocumentChunk(
+            document_id=document_id,
+            content=self.content,
+            embedding=embedding,
+            chunk_number=chunk_number,
+            metadata=self.metadata,
+        )
+
+
 class ChunkResult(BaseModel):
     """Query result at chunk level"""
diff --git a/core/models/time_series.py b/core/models/time_series.py
index d649b2b..db8bda5 100644
--- a/core/models/time_series.py
+++ b/core/models/time_series.py
@@ -3,6 +3,8 @@
 from typing import List, Tuple, Optional, Union, Dict
 from bisect import bisect_left
 import logging
 
+from core.models.documents import Chunk
+
 logger = logging.getLogger(__name__)
@@ -82,3 +84,9 @@ class TimeSeriesData:
     def times_for_content(self, content: str) -> List[float]:
         """Get all timestamps where this content appears"""
         return self.content_to_times[content]
+
+    def to_chunks(self) -> List[Chunk]:
+        return [
+            Chunk(content=content, metadata={"timestamp": timestamp})
+            for content, timestamp in zip(self.contents, self.timestamps)
+        ]
diff --git a/core/parser/base_parser.py b/core/parser/base_parser.py
index 5fa4038..c46724e 100644
--- a/core/parser/base_parser.py
+++ b/core/parser/base_parser.py
@@ -1,19 +1,18 @@
 from abc import ABC, abstractmethod
 from typing import List, Union
 from fastapi import UploadFile
+from core.models.documents import Chunk
 
 
 class BaseParser(ABC):
     """Base class for document parsing"""
 
     @abstractmethod
-    async def split_text(self, text: str) -> List[str]:
+    async def split_text(self, text: str) -> List[Chunk]:
         """Split plain text into chunks"""
         pass
 
     @abstractmethod
-    async def parse_file(
-        self, file: Union[UploadFile, bytes], content_type: str
-    ) -> List[str]:
+    async def parse_file(self, file: bytes, content_type: str) -> List[Chunk]:
         """Parse file content into text chunks"""
         pass
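The model changes above are what actually carry video timestamps through to storage: TimeSeriesData.to_chunks() writes each timestamp into Chunk.metadata, and Chunk.to_document_chunk() copies that metadata onto the stored DocumentChunk. A minimal sketch; the TimeSeriesData constructor call is assumed (the patch only shows .contents and .timestamps being zipped) and the embedding values are placeholders:

    from core.models.time_series import TimeSeriesData

    # Assumed constructor; only the contents/timestamps pairing is relied on here.
    transcript = TimeSeriesData(
        timestamps=[0.0, 5.2],
        contents=["hello and welcome", "today we cover ingestion"],
    )
    chunks = transcript.to_chunks()
    doc_chunk = chunks[1].to_document_chunk(
        document_id="doc-123", chunk_number=1, embedding=[0.1, 0.2, 0.3]
    )
    print(doc_chunk.metadata)  # {'timestamp': 5.2}, which rides along into the vector store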
diff --git a/core/parser/combined_parser.py b/core/parser/combined_parser.py
index ee7b2b0..8dfac5b 100644
--- a/core/parser/combined_parser.py
+++ b/core/parser/combined_parser.py
@@ -1,17 +1,13 @@
-from typing import List, Union
+from typing import List, Optional
 import logging
 import os
-from fastapi import UploadFile
 import tempfile
 import magic
+from core.models.documents import Chunk
 
-from core.models.time_series import (
-    TimeSeriesData,
-)  # python-magic library for file type detection
-
-from .base_parser import BaseParser
-from .unstructured_parser import UnstructuredAPIParser
-from .video.parse_video import VideoParser
+from core.parser.base_parser import BaseParser
+from core.parser.unstructured_parser import UnstructuredAPIParser
+from core.parser.video.parse_video import VideoParser
 
 logger = logging.getLogger(__name__)
@@ -35,7 +31,10 @@ class CombinedParser(BaseParser):
         self.magic = magic.Magic(mime=True)
 
     def _is_video_file(
-        self, file_path: str = None, file_bytes: bytes = None, filename: str = None
+        self,
+        file_path: Optional[str] = None,
+        file_bytes: Optional[bytes] = None,
+        filename: Optional[str] = None,
     ) -> bool:
         """
         Detect if a file is a video using multiple methods:
@@ -73,44 +72,26 @@ class CombinedParser(BaseParser):
 
         return False
 
-    async def split_text(self, text: str) -> List[str]:
+    async def split_text(self, text: str) -> List[Chunk]:
         """Split plain text into chunks using unstructured parser"""
         return await self.unstructured_parser.split_text(text)
 
-    async def parse_file(
-        self, file: Union[UploadFile, bytes], content_type: str
-    ) -> List[str]:
+    async def parse_file(self, file: bytes, content_type: str) -> List[Chunk]:
         """Parse file content into text chunks"""
-        # For UploadFile, check both filename and content
-        if isinstance(file, UploadFile):
-            content = await file.read()
-            is_video = self._is_video_file(file_bytes=content, filename=file.filename)
-            # Reset file position for later use
-            await file.seek(0)
-        else:
-            # For bytes, we can only check content
-            is_video = self._is_video_file(file_bytes=file)
+        is_video = self._is_video_file(file_bytes=file)
 
         if is_video:
             return await self._parse_video(file)
         else:
             return await self.unstructured_parser.parse_file(file, content_type)
 
-    async def _parse_video(self, file: Union[UploadFile, bytes]) -> List[str]:
+    async def _parse_video(self, file: bytes) -> List[Chunk]:
         """Parse video file and combine transcript and frame descriptions into chunks"""
         # Save video to temporary file if needed
-        if isinstance(file, bytes):
-            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
-            temp_file.write(file)
-            temp_file.close()
-            video_path = temp_file.name
-        else:
-            # For UploadFile, save to temp file
-            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
-            content = await file.read()
-            temp_file.write(content)
-            temp_file.close()
-            video_path = temp_file.name
+        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
+        temp_file.write(file)
+        temp_file.close()
+        video_path = temp_file.name
 
         try:
             # Process video
@@ -121,10 +102,10 @@ class CombinedParser(BaseParser):
             )
             results = parser.process_video()
             # Get all frame descriptions
-            frame_descriptions: TimeSeriesData = results["frame_descriptions"]
+            frame_chunks = results["frame_descriptions"].to_chunks()
             # Get all transcript text
-            transcript: TimeSeriesData = results["transcript"]
-            return frame_descriptions.contents + transcript.contents
+            transcript_chunks = results["transcript"].to_chunks()
+            return frame_chunks + transcript_chunks
 
         finally:
             # Clean up temporary file
diff --git a/core/parser/unstructured_parser.py b/core/parser/unstructured_parser.py
index 03fc5b6..77ac3b6 100644
--- a/core/parser/unstructured_parser.py
+++ b/core/parser/unstructured_parser.py
@@ -2,6 +2,7 @@
 from typing import List
 import io
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from langchain_unstructured import UnstructuredLoader
+from core.models.documents import Chunk
 import logging
 from .base_parser import BaseParser
@@ -24,11 +25,14 @@ class UnstructuredAPIParser(BaseParser):
             separators=["\n\n", "\n", ". ", " ", ""],
         )
 
-    async def split_text(self, text: str) -> List[str]:
+    async def split_text(self, text: str) -> List[Chunk]:
         """Split plain text into chunks"""
-        return self.text_splitter.split_text(text)
+        return [
+            Chunk(content=chunk, metadata={})
+            for chunk in self.text_splitter.split_text(text)
+        ]
 
-    async def parse_file(self, file: bytes, content_type: str) -> List[str]:
+    async def parse_file(self, file: bytes, content_type: str) -> List[Chunk]:
         """Parse file content using unstructured"""
         # Parse with unstructured
         loader = UnstructuredLoader(
@@ -38,4 +42,6 @@ class UnstructuredAPIParser(BaseParser):
             chunking_strategy="by_title",
         )
         elements = loader.load()
-        return [element.page_content for element in elements]
+        return [
+            Chunk(content=element.page_content, metadata={}) for element in elements
+        ]
diff --git a/core/services/document_service.py b/core/services/document_service.py
index ba41146..e7650e7 100644
--- a/core/services/document_service.py
+++ b/core/services/document_service.py
@@ -3,7 +3,8 @@
 from typing import Dict, Any, List, Optional
 from fastapi import UploadFile
 from core.models.request import IngestTextRequest
-from ..models.documents import (
+from core.models.documents import (
+    Chunk,
     Document,
     DocumentChunk,
     ChunkResult,
@@ -62,9 +63,7 @@ class DocumentService:
 
         # Search chunks with vector similarity
         chunks = await self.vector_store.query_similar(
-            query_embedding,
-            k=k,
-            doc_ids=doc_ids,
+            query_embedding, k=k, doc_ids=doc_ids
         )
         logger.info(f"Found {len(chunks)} similar chunks")
 
@@ -148,9 +147,7 @@ class DocumentService:
         logger.info(f"Generated {len(embeddings)} embeddings")
 
         # 4. Create and store chunk objects
-        chunk_objects = self._create_chunk_objects(
-            doc.external_id, chunks, embeddings, doc.metadata
-        )
+        chunk_objects = self._create_chunk_objects(doc.external_id, chunks, embeddings)
         logger.info(f"Created {len(chunk_objects)} chunk objects")
 
         # 5. Store everything
@@ -168,7 +165,7 @@ class DocumentService:
 
         # 1. Create document record
         doc = Document(
-            content_type=file.content_type,
+            content_type=file.content_type or "",
             filename=file.filename,
             metadata=metadata,
             owner={"type": auth.entity_type, "id": auth.entity_id},
@@ -191,7 +188,7 @@ class DocumentService:
         )
 
         # 3. Parse content into chunks
-        chunks = await self.parser.parse_file(file_content, file.content_type)
+        chunks = await self.parser.parse_file(file_content, file.content_type or "")
         if not chunks:
             raise ValueError("No content chunks extracted from file")
         logger.info(f"Parsed file into {len(chunks)} chunks")
@@ -201,9 +198,7 @@ class DocumentService:
         logger.info(f"Generated {len(embeddings)} embeddings")
 
         # 5. Create and store chunk objects
-        chunk_objects = self._create_chunk_objects(
-            doc.external_id, chunks, embeddings, doc.metadata
-        )
+        chunk_objects = self._create_chunk_objects(doc.external_id, chunks, embeddings)
         logger.info(f"Created {len(chunk_objects)} chunk objects")
 
         # 6. Store everything
@@ -215,20 +210,13 @@ class DocumentService:
     def _create_chunk_objects(
         self,
         doc_id: str,
-        chunks: List[str],
+        chunks: List[Chunk],
         embeddings: List[List[float]],
-        metadata: Dict[str, Any],
     ) -> List[DocumentChunk]:
         """Helper to create chunk objects"""
         return [
-            DocumentChunk(
-                document_id=doc_id,
-                content=content,
-                embedding=embedding,
-                chunk_number=i,
-                metadata=metadata,
-            )
-            for i, (content, embedding) in enumerate(zip(chunks, embeddings))
+            c.to_document_chunk(chunk_number=i, embedding=embedding, document_id=doc_id)
+            for i, (embedding, c) in enumerate(zip(embeddings, chunks))
         ]
 
     async def _store_chunks_and_doc(
@@ -287,11 +275,11 @@ class DocumentService:
         return results
 
     async def _create_document_results(
-        self, auth: AuthContext, chunks: List[DocumentChunk]
+        self, auth: AuthContext, chunks: List[ChunkResult]
     ) -> List[DocumentResult]:
         """Group chunks by document and create DocumentResult objects."""
         # Group chunks by document and get highest scoring chunk per doc
-        doc_chunks: Dict[str, DocumentChunk] = {}
+        doc_chunks: Dict[str, ChunkResult] = {}
         for chunk in chunks:
             if (
                 chunk.document_id not in doc_chunks
diff --git a/core/vector_store/base_vector_store.py b/core/vector_store/base_vector_store.py
index 86cb2b8..be591c6 100644
--- a/core/vector_store/base_vector_store.py
+++ b/core/vector_store/base_vector_store.py
@@ -6,16 +6,17 @@ from core.models.documents import DocumentChunk
 
 class BaseVectorStore(ABC):
     @abstractmethod
-    def store_embeddings(self, chunks: List[DocumentChunk]) -> Tuple[bool, List[str]]:
+    async def store_embeddings(
+        self, chunks: List[DocumentChunk]
+    ) -> Tuple[bool, List[str]]:
         """Store document chunks and their embeddings"""
         pass
 
     @abstractmethod
-    def query_similar(
+    async def query_similar(
         self,
         query_embedding: List[float],
         k: int,
-        auth: AuthContext,
         doc_ids: Optional[List[str]] = None,
     ) -> List[DocumentChunk]:
         """Find similar chunks"""
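Net effect at query time: chunks that came from video keep their timestamp in metadata all the way through the vector store, so callers can see where in the video a hit occurred. A minimal query-side sketch, assuming an already configured vector store and embedding model implementing the interfaces above (names and the query string are illustrative):

    async def query_example(vector_store, embedding_model) -> None:
        # Embed the query, fetch similar chunks, and read back the piped-through timestamps.
        query_embedding = await embedding_model.embed_for_query("what happens during the demo?")
        chunks = await vector_store.query_similar(query_embedding, k=4)
        for chunk in chunks:
            ts = chunk.metadata.get("timestamp")  # set for video-derived chunks, absent otherwise
            print(ts, chunk.content[:80])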