From 7830b42c6b4cb5c3e7aec6eb31b2381d5d1c2577 Mon Sep 17 00:00:00 2001 From: Arnav Agrawal Date: Sun, 29 Dec 2024 11:10:51 +0530 Subject: [PATCH] fix typing errors --- core/models/documents.py | 4 +++ core/models/{time_series.py => video.py} | 10 +++++++ core/parser/base_parser.py | 7 +++-- core/parser/combined_parser.py | 21 +++++++++---- core/parser/unstructured_parser.py | 8 +++-- core/parser/video/parse_video.py | 38 ++++++++++++++---------- core/services/document_service.py | 11 +++---- core/vector_store/base_vector_store.py | 1 - 8 files changed, 67 insertions(+), 33 deletions(-) rename core/models/{time_series.py => video.py} (93%) diff --git a/core/models/documents.py b/core/models/documents.py index 45eeb5f..e9df927 100644 --- a/core/models/documents.py +++ b/core/models/documents.py @@ -21,6 +21,7 @@ class Document(BaseModel): content_type: str filename: Optional[str] = None metadata: Dict[str, Any] = Field(default_factory=dict) + """user-defined metadata""" storage_info: Dict[str, str] = Field(default_factory=dict) system_metadata: Dict[str, Any] = Field( default_factory=lambda: { @@ -29,6 +30,9 @@ class Document(BaseModel): "version": 1, } ) + """metadata such as creation date etc.""" + additional_metadata: Dict[str, Any] = Field(default_factory=dict) + """metadata to help with querying eg. frame descriptions and time-stamped transcript for videos""" access_control: Dict[str, List[str]] = Field( default_factory=lambda: {"readers": [], "writers": [], "admins": []} ) diff --git a/core/models/time_series.py b/core/models/video.py similarity index 93% rename from core/models/time_series.py rename to core/models/video.py index db8bda5..5b5a4cd 100644 --- a/core/models/time_series.py +++ b/core/models/video.py @@ -1,8 +1,11 @@ from collections import defaultdict +from numbers import Number from typing import List, Tuple, Optional, Union, Dict from bisect import bisect_left import logging +from pydantic import BaseModel + from core.models.documents import Chunk logger = logging.getLogger(__name__) @@ -18,6 +21,7 @@ class TimeSeriesData: """ # Sort timestamps and content for binary search sorted_items = sorted(time_to_content.items(), key=lambda x: x[0]) + self.time_to_content = time_to_content self.timestamps = [t for t, _ in sorted_items] self.contents = [c for _, c in sorted_items] @@ -90,3 +94,9 @@ class TimeSeriesData: Chunk(content=content, metadata={"timestamp": timestamp}) for content, timestamp in zip(self.contents, self.timestamps) ] + + +class ParseVideoResult(BaseModel): + metadata: Dict[str, Number] + frame_descriptions: TimeSeriesData + transcript: TimeSeriesData diff --git a/core/parser/base_parser.py b/core/parser/base_parser.py index c46724e..d5fcfe3 100644 --- a/core/parser/base_parser.py +++ b/core/parser/base_parser.py @@ -1,6 +1,5 @@ from abc import ABC, abstractmethod -from typing import List, Union -from fastapi import UploadFile +from typing import Any, Dict, List, Tuple from core.models.documents import Chunk @@ -13,6 +12,8 @@ class BaseParser(ABC): pass @abstractmethod - async def parse_file(self, file: bytes, content_type: str) -> List[Chunk]: + async def parse_file( + self, file: bytes, content_type: str + ) -> Tuple[Dict[str, Any], List[Chunk]]: """Parse file content into text chunks""" pass diff --git a/core/parser/combined_parser.py b/core/parser/combined_parser.py index 8dfac5b..8fbc14a 100644 --- a/core/parser/combined_parser.py +++ b/core/parser/combined_parser.py @@ -1,10 +1,11 @@ -from typing import List, Optional +from typing import Any, Dict, List, Optional, Tuple import logging import os import tempfile import magic from core.models.documents import Chunk +from core.models.video import TimeSeriesData from core.parser.base_parser import BaseParser from core.parser.unstructured_parser import UnstructuredAPIParser from core.parser.video.parse_video import VideoParser @@ -76,7 +77,9 @@ class CombinedParser(BaseParser): """Split plain text into chunks using unstructured parser""" return await self.unstructured_parser.split_text(text) - async def parse_file(self, file: bytes, content_type: str) -> List[Chunk]: + async def parse_file( + self, file: bytes, content_type: str + ) -> Tuple[Dict[str, Any], List[Chunk]]: """Parse file content into text chunks""" is_video = self._is_video_file(file_bytes=file) @@ -85,7 +88,7 @@ class CombinedParser(BaseParser): else: return await self.unstructured_parser.parse_file(file, content_type) - async def _parse_video(self, file: bytes) -> List[Chunk]: + async def _parse_video(self, file: bytes) -> Tuple[Dict[str, Any], List[Chunk]]: """Parse video file and combine transcript and frame descriptions into chunks""" # Save video to temporary file if needed temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") @@ -102,10 +105,16 @@ class CombinedParser(BaseParser): ) results = parser.process_video() # Get all frame descriptions - frame_chunks = results["frame_descriptions"].to_chunks() + frame_descriptions = results.frame_descriptions # Get all transcript text - transcript_chunks = results["transcript"].to_chunks() - return frame_chunks + transcript_chunks + transcript_text = results.transcript + additional_metadata = { + "frame_descriptions": frame_descriptions.time_to_content, + "transcript": transcript_text.time_to_content, + "video_metadata": results.metadata, + } + chunks = frame_descriptions.to_chunks() + transcript_text.to_chunks() + return additional_metadata, chunks finally: # Clean up temporary file diff --git a/core/parser/unstructured_parser.py b/core/parser/unstructured_parser.py index 77ac3b6..46c25fd 100644 --- a/core/parser/unstructured_parser.py +++ b/core/parser/unstructured_parser.py @@ -1,4 +1,4 @@ -from typing import List +from typing import Any, Dict, List, Tuple import io from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_unstructured import UnstructuredLoader @@ -32,7 +32,9 @@ class UnstructuredAPIParser(BaseParser): for chunk in self.text_splitter.split_text(text) ] - async def parse_file(self, file: bytes, content_type: str) -> List[Chunk]: + async def parse_file( + self, file: bytes, content_type: str + ) -> Tuple[Dict[str, Any], List[Chunk]]: """Parse file content using unstructured""" # Parse with unstructured loader = UnstructuredLoader( @@ -42,6 +44,6 @@ class UnstructuredAPIParser(BaseParser): chunking_strategy="by_title", ) elements = loader.load() - return [ + return {}, [ Chunk(content=element.page_content, metadata={}) for element in elements ] diff --git a/core/parser/video/parse_video.py b/core/parser/video/parse_video.py index 76e6661..090b92a 100644 --- a/core/parser/video/parse_video.py +++ b/core/parser/video/parse_video.py @@ -1,10 +1,11 @@ +from numbers import Number import cv2 -from typing import Dict +from typing import Dict, Union import base64 from openai import OpenAI import assemblyai as aai import logging -from core.models.time_series import TimeSeriesData +from core.models.video import TimeSeriesData, ParseVideoResult logger = logging.getLogger(__name__) @@ -42,7 +43,9 @@ class VideoParser: speaker_labels=True ) # speech_model=aai.SpeechModel.nano self.transcriber = aai.Transcriber(config=aai_config) - self.transcript = None + self.transcript = TimeSeriesData( + time_to_content={} + ) # empty transcript initially - TODO: have this be a lateinit somehow self.gpt = OpenAI() logger.info(f"Video loaded: {self.duration:.2f}s duration, {self.fps:.2f} FPS") @@ -80,7 +83,11 @@ class VideoParser: logger.info("Starting video transcription") transcript = self.get_transcript_object() # divide by 1000 because assemblyai timestamps are in milliseconds - time_to_text = {u.start / 1000: u.text for u in transcript.utterances} + time_to_text = ( + {u.start / 1000: u.text for u in transcript.utterances} + if transcript.utterances + else {} + ) debug_object("Time to text", time_to_text) self.transcript = TimeSeriesData(time_to_text) return self.transcript @@ -125,7 +132,7 @@ class VideoParser: {last_description if last_description else 'No previous frame description available, this is the first frame'} --- - In your response, only provide the description of the current frame, using the above information as context. + In your response, only provide the description of the current frame, using the above information as context. """, }, { @@ -147,7 +154,7 @@ class VideoParser: logger.info(f"Generated descriptions for {len(time_to_description)} frames") return TimeSeriesData(time_to_description) - def process_video(self) -> Dict: + def process_video(self) -> ParseVideoResult: """ Process the video to get both transcript and frame descriptions @@ -155,16 +162,17 @@ class VideoParser: Dictionary containing transcript and frame descriptions as TimeSeriesData objects """ logger.info("Starting full video processing") - result = { - "metadata": { - "duration": self.duration, - "fps": self.fps, - "total_frames": self.total_frames, - "frame_sample_rate": self.frame_sample_rate, - }, - "transcript": self.get_transcript(), - "frame_descriptions": self.get_frame_descriptions(), + metadata = { + "duration": self.duration, + "fps": self.fps, + "total_frames": self.total_frames, + "frame_sample_rate": self.frame_sample_rate, } + result = ParseVideoResult( + metadata=metadata, + transcript=self.get_transcript(), + frame_descriptions=self.get_frame_descriptions(), + ) logger.info("Video processing completed successfully") return result diff --git a/core/services/document_service.py b/core/services/document_service.py index e7650e7..36a3873 100644 --- a/core/services/document_service.py +++ b/core/services/document_service.py @@ -163,7 +163,11 @@ class DocumentService: if "write" not in auth.permissions: raise PermissionError("User does not have write permission") - # 1. Create document record + file_content = await file.read() + additional_metadata, chunks = await self.parser.parse_file( + file_content, file.content_type or "" + ) + doc = Document( content_type=file.content_type or "", filename=file.filename, @@ -174,11 +178,10 @@ class DocumentService: "writers": [auth.entity_id], "admins": [auth.entity_id], }, + additional_metadata=additional_metadata, ) logger.info(f"Created file document record with ID {doc.external_id}") - # 2. Read and store file - file_content = await file.read() storage_info = await self.storage.upload_from_base64( base64.b64encode(file_content).decode(), doc.external_id, file.content_type ) @@ -187,8 +190,6 @@ class DocumentService: f"Stored file in bucket `{storage_info[0]}` with key `{storage_info[1]}`" ) - # 3. Parse content into chunks - chunks = await self.parser.parse_file(file_content, file.content_type or "") if not chunks: raise ValueError("No content chunks extracted from file") logger.info(f"Parsed file into {len(chunks)} chunks") diff --git a/core/vector_store/base_vector_store.py b/core/vector_store/base_vector_store.py index be591c6..48da6d4 100644 --- a/core/vector_store/base_vector_store.py +++ b/core/vector_store/base_vector_store.py @@ -1,6 +1,5 @@ from abc import ABC, abstractmethod from typing import List, Optional, Tuple -from core.models.auth import AuthContext from core.models.documents import DocumentChunk