morphik-core/core/parser/combined_parser.py

123 lines
4.0 KiB
Python
Raw Normal View History

2024-12-29 11:10:51 +05:30
from typing import Any, Dict, List, Optional, Tuple
import logging
import os
import tempfile
import magic
2024-12-30 11:58:53 -05:00
from core.models.chunk import Chunk
2024-12-28 19:41:05 +05:30
from core.parser.base_parser import BaseParser
from core.parser.unstructured_parser import UnstructuredParser
2024-12-28 19:41:05 +05:30
from core.parser.video.parse_video import VideoParser
logger = logging.getLogger(__name__)
class CombinedParser(BaseParser):
def __init__(
self,
use_unstructured_api: bool,
unstructured_api_key: str,
assemblyai_api_key: str,
chunk_size: int,
chunk_overlap: int,
frame_sample_rate: int,
):
self.unstructured_parser = UnstructuredParser(
use_api=use_unstructured_api,
api_key=unstructured_api_key,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
self.assemblyai_api_key = assemblyai_api_key
self.frame_sample_rate = frame_sample_rate
self.magic = magic.Magic(mime=True)
def _is_video_file(
2024-12-28 19:41:05 +05:30
self,
file_path: Optional[str] = None,
file_bytes: Optional[bytes] = None,
filename: Optional[str] = None,
) -> bool:
"""
Detect if a file is a video using multiple methods:
1. Magic numbers/file signatures
2. File extension (as fallback)
"""
video_extensions = {
".mp4",
".avi",
".mov",
".mkv",
".wmv",
".flv",
".webm",
".m4v",
".mpeg",
".mpg",
}
# Check magic numbers if we have file content
if file_bytes:
mime_type = self.magic.from_buffer(file_bytes)
if mime_type.startswith("video/"):
return True
elif file_path:
mime_type = self.magic.from_file(file_path)
if mime_type.startswith("video/"):
return True
# Fallback to extension check if we have a filename
if filename:
ext = os.path.splitext(filename.lower())[1]
if ext in video_extensions:
return True
return False
2024-12-28 19:41:05 +05:30
async def split_text(self, text: str) -> List[Chunk]:
"""Split plain text into chunks using unstructured parser"""
return await self.unstructured_parser.split_text(text)
2024-12-29 11:10:51 +05:30
async def parse_file(
self, file: bytes, content_type: str, filename: str
2024-12-29 11:10:51 +05:30
) -> Tuple[Dict[str, Any], List[Chunk]]:
"""Parse file content into text chunks. Returns document metadata and a list of chunks"""
2024-12-28 19:41:05 +05:30
is_video = self._is_video_file(file_bytes=file)
if is_video:
return await self._parse_video(file)
else:
return await self.unstructured_parser.parse_file(file, content_type, filename)
2024-12-29 11:10:51 +05:30
async def _parse_video(self, file: bytes) -> Tuple[Dict[str, Any], List[Chunk]]:
"""Parse video file and combine transcript and frame descriptions into chunks"""
# Save video to temporary file if needed
2024-12-28 19:41:05 +05:30
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
temp_file.write(file)
temp_file.close()
video_path = temp_file.name
try:
# Process video
parser = VideoParser(
video_path,
assemblyai_api_key=self.assemblyai_api_key,
frame_sample_rate=self.frame_sample_rate,
)
results = parser.process_video()
# Get all frame descriptions
2024-12-29 11:10:51 +05:30
frame_descriptions = results.frame_descriptions
# Get all transcript text
2024-12-29 11:10:51 +05:30
transcript_text = results.transcript
additional_metadata = {
"frame_descriptions": frame_descriptions.time_to_content,
"transcript": transcript_text.time_to_content,
"video_metadata": results.metadata,
}
chunks = frame_descriptions.to_chunks() + transcript_text.to_chunks()
return additional_metadata, chunks
finally:
# Clean up temporary file
os.unlink(video_path)