basic ingestion using unstructured and k-nearest retrieval works

Adityavardhan Agrawal 2024-11-14 23:18:37 -05:00
parent db013e4713
commit 30577dc0ff
18 changed files with 763 additions and 171 deletions

.gitignore (new file, 23 lines)

@@ -0,0 +1,23 @@
# Python-related files
*__pycache__/
*.pyc
*.pyo
*.pyd
.Python
env/
.env
venv/*
ENV/
dist/
build/
*.egg-info/
.eggs/
*.egg
*.pytest_cache/
# Virtual environment
.venv/
.vscode/
*.DS_Store

__init__.py (new file, empty)

base_embedding_model.py (new file, 9 lines)

@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from typing import List, Union


class BaseEmbeddingModel(ABC):
    @abstractmethod
    async def embed(self, text: Union[str, List[str]]) -> List[float]:
        """Generate embeddings for input text"""
        pass

base_parser.py (new file, 9 lines)

@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from typing import Dict, Any, List


class BaseParser(ABC):
    @abstractmethod
    def parse(self, content: str, metadata: Dict[str, Any]) -> List[str]:
        """Parse content into chunks"""
        pass

base_planner.py (new file, 9 lines)

@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from typing import Dict, Any


class BasePlanner(ABC):
    @abstractmethod
    def plan_retrieval(self, query: str, **kwargs) -> Dict[str, Any]:
        """Create execution plan for retrieval"""
        pass

base_vector_store.py (new file, 15 lines)

@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
from typing import List
from document import DocumentChunk


class BaseVectorStore(ABC):
    @abstractmethod
    def store_embeddings(self, chunks: List[DocumentChunk]) -> bool:
        """Store document chunks and their embeddings"""
        pass

    @abstractmethod
    def query_similar(self, query_embedding: List[float], k: int, owner_id: str) -> List[DocumentChunk]:
        """Find similar chunks"""
        pass

core.py (deleted, 171 lines)

@@ -1,171 +0,0 @@
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod
from datetime import datetime
import uuid


# Base Classes and Interfaces
class Document:
    def __init__(self, content: str, metadata: Dict[str, Any], owner_id: str):
        self.id = str(uuid.uuid4())
        self.content = content
        self.metadata = metadata
        self.owner_id = owner_id
        self.created_at = datetime.utcnow()
        self.chunks: List[DocumentChunk] = []


class DocumentChunk:
    def __init__(self, content: str, embedding: List[float], doc_id: str):
        self.id = str(uuid.uuid4())
        self.content = content
        self.embedding = embedding
        self.doc_id = doc_id


class BaseParser(ABC):
    @abstractmethod
    def parse(self, content: str, metadata: Dict[str, Any]) -> List[str]:
        """Parse content into chunks"""
        pass


class BasePlanner(ABC):
    @abstractmethod
    def plan_retrieval(self, query: str, **kwargs) -> Dict[str, Any]:
        """Create execution plan for retrieval"""
        pass


class BaseVectorStore(ABC):
    @abstractmethod
    def store_embeddings(self, chunks: List[DocumentChunk]) -> bool:
        """Store document chunks and their embeddings"""
        pass

    @abstractmethod
    def query_similar(self, query_embedding: List[float], k: int, owner_id: str) -> List[DocumentChunk]:
        """Find similar chunks"""
        pass


# Concrete Implementations
class SimpleParser(BaseParser):
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def parse(self, content: str, metadata: Dict[str, Any]) -> List[str]:
        # Simple implementation - split by chunk_size
        chunks = []
        for i in range(0, len(content), self.chunk_size - self.chunk_overlap):
            chunk = content[i:i + self.chunk_size]
            if chunk:
                chunks.append(chunk)
        return chunks


class SimpleRAGPlanner(BasePlanner):
    def __init__(self, k: int = 3):
        self.k = k

    def plan_retrieval(self, query: str, **kwargs) -> Dict[str, Any]:
        return {
            "strategy": "simple_rag",
            "k": kwargs.get("k", self.k),
            "query": query
        }


# Main DataBridge Class
class DataBridge:
    def __init__(
        self,
        parser: BaseParser,
        planner: BasePlanner,
        vector_store: BaseVectorStore,
        embedding_model: Any  # This would be your chosen embedding model
    ):
        self.parser = parser
        self.planner = planner
        self.vector_store = vector_store
        self.embedding_model = embedding_model

    async def ingest_document(
        self,
        content: str,
        metadata: Dict[str, Any],
        owner_id: str
    ) -> Document:
        # Create document
        doc = Document(content, metadata, owner_id)

        # Parse into chunks
        chunk_texts = self.parser.parse(content, metadata)

        # Create embeddings and chunks
        for chunk_text in chunk_texts:
            embedding = await self.embedding_model.embed(chunk_text)
            chunk = DocumentChunk(chunk_text, embedding, doc.id)
            doc.chunks.append(chunk)

        # Store in vector store
        success = self.vector_store.store_embeddings(doc.chunks)
        if not success:
            raise Exception("Failed to store embeddings")
        return doc

    async def query(
        self,
        query: str,
        owner_id: str,
        **kwargs
    ) -> List[Dict[str, Any]]:
        # Create plan
        plan = self.planner.plan_retrieval(query, **kwargs)

        # Get query embedding
        query_embedding = await self.embedding_model.embed(query)

        # Execute plan
        chunks = self.vector_store.query_similar(
            query_embedding,
            k=plan["k"],
            owner_id=owner_id
        )

        # Format results
        results = []
        for chunk in chunks:
            results.append({
                "content": chunk.content,
                "doc_id": chunk.doc_id,
                "chunk_id": chunk.id,
                "score": chunk.score if hasattr(chunk, "score") else None
            })
        return results


# Example usage
"""
# Initialize components
parser = SimpleParser()
planner = SimpleRAGPlanner()
vector_store = YourVectorStore()  # Implement with chosen backend
embedding_model = YourEmbeddingModel()  # Implement with chosen model

# Create DataBridge instance
db = DataBridge(parser, planner, vector_store, embedding_model)

# Ingest a document
doc = await db.ingest_document(
    content="Your document content here",
    metadata={"source": "pdf", "title": "Example Doc"},
    owner_id="user123"
)

# Query the system
results = await db.query(
    query="Your query here",
    owner_id="user123",
    k=5  # optional override
)
"""

databridge.py (new file, 106 lines)

@@ -0,0 +1,106 @@
from typing import Dict, Any, List
from databridge_uri import DataBridgeURI
from document import Document, DocumentChunk
from mongo_vector_store import MongoDBAtlasVectorStore
from openai_embedding_model import OpenAIEmbeddingModel
from unstructured_parser import UnstructuredAPIParser
from simple_planner import SimpleRAGPlanner


class DataBridge:
    """
    DataBridge with owner authentication and authorization.
    Configured via URI containing owner credentials.
    """
    def __init__(self, uri: str):
        # Parse URI and initialize configuration
        self.config = DataBridgeURI(uri)

        # Initialize components
        self._init_components()

    def _init_components(self):
        """Initialize all required components using the URI configuration"""
        self.embedding_model = OpenAIEmbeddingModel(
            api_key=self.config.openai_api_key,
            model_name=self.config.embedding_model
        )
        self.parser = UnstructuredAPIParser(
            api_key=self.config.unstructured_api_key,
            chunk_size=1000,
            chunk_overlap=200
        )
        self.vector_store = MongoDBAtlasVectorStore(
            connection_string=self.config.mongo_uri,
            database_name=self.config.db_name,
            collection_name=self.config.collection_name
        )
        self.planner = SimpleRAGPlanner(default_k=4)

    async def ingest_document(
        self,
        content: str,
        metadata: Dict[str, Any]
    ) -> Document:
        """
        Ingest a document using the owner ID from the URI configuration.
        """
        # Add owner_id to metadata
        metadata['owner_id'] = self.config.owner_id

        # Create document
        doc = Document(content, metadata, self.config.owner_id)

        # Parse into chunks
        chunk_texts = self.parser.parse(content, metadata)

        # Create embeddings and chunks
        for chunk_text in chunk_texts:
            embedding = await self.embedding_model.embed(chunk_text)
            chunk = DocumentChunk(chunk_text, embedding, doc.id)
            chunk.metadata = {'owner_id': self.config.owner_id}
            doc.chunks.append(chunk)

        # Store in vector store
        success = self.vector_store.store_embeddings(doc.chunks)
        if not success:
            raise Exception("Failed to store embeddings")
        return doc

    async def query(
        self,
        query: str,
        **kwargs
    ) -> List[Dict[str, Any]]:
        """
        Query the document store using the owner ID from the URI configuration.
        """
        # Create plan
        plan = self.planner.plan_retrieval(query, **kwargs)

        # Get query embedding
        query_embedding = await self.embedding_model.embed(query)

        # Execute plan
        chunks = self.vector_store.query_similar(
            query_embedding,
            k=plan["k"],
            owner_id=self.config.owner_id
        )

        # Format results
        results = []
        for chunk in chunks:
            results.append({
                "content": chunk.content,
                "doc_id": chunk.doc_id,
                "chunk_id": chunk.id,
                "score": chunk.score if hasattr(chunk, "score") else None
            })
        return results

databridge_uri.py (new file, 61 lines)

@@ -0,0 +1,61 @@
from urllib.parse import urlparse, parse_qs
from typing import Optional, Dict, Any
import os
import jwt
from datetime import datetime, timedelta


class DataBridgeURI:
    """
    Handles parsing and validation of DataBridge URIs with owner authentication
    Format: databridge://<owner_id>:<auth_token>@host/path?params
    """
    def __init__(self, uri: str):
        self.uri = uri
        self._parse_uri()

    def _parse_uri(self):
        parsed = urlparse(self.uri)
        query_params = parse_qs(parsed.query)

        # Parse authentication info from netloc
        auth_parts = parsed.netloc.split('@')[0].split(':')
        if len(auth_parts) != 2:
            raise ValueError("URI must include owner_id and auth_token")
        self.owner_id = auth_parts[0]
        self.auth_token = auth_parts[1]

        # Validate and decode auth token
        try:
            self._validate_auth_token()
        except Exception as e:
            raise ValueError(f"Invalid auth token: {str(e)}")

        # Get the original MongoDB URI from environment - use it as is
        self.mongo_uri = os.getenv("MONGODB_URI")
        if not self.mongo_uri:
            raise ValueError("MONGODB_URI environment variable not set")

        # Get configuration from query parameters
        self.openai_api_key = query_params.get('openai_key', [os.getenv('OPENAI_API_KEY', '')])[0]
        self.unstructured_api_key = query_params.get('unstructured_key', [os.getenv('UNSTRUCTURED_API_KEY', '')])[0]
        self.db_name = query_params.get('db', ['brandsyncaidb'])[0]
        self.collection_name = query_params.get('collection', ['kb_chunked_embeddings'])[0]
        self.embedding_model = query_params.get('embedding_model', ['text-embedding-3-small'])[0]

        # Validate required fields
        if not all([self.mongo_uri, self.openai_api_key, self.unstructured_api_key]):
            raise ValueError("Missing required configuration in DataBridge URI")

    def _validate_auth_token(self):
        """Validate the auth token and extract any additional claims"""
        try:
            decoded = jwt.decode(self.auth_token, 'your-secret-key', algorithms=['HS256'])
            if decoded.get('owner_id') != self.owner_id:
                raise ValueError("Token owner_id mismatch")
            self.auth_claims = decoded
        except jwt.ExpiredSignatureError:
            raise ValueError("Auth token has expired")
        except jwt.InvalidTokenError:
            raise ValueError("Invalid auth token")

document.py (new file, 21 lines)

@@ -0,0 +1,21 @@
from typing import Dict, Any, List
import uuid
from datetime import datetime


class Document:
    def __init__(self, content: str, metadata: Dict[str, Any], owner_id: str):
        self.id = str(uuid.uuid4())
        self.content = content
        self.metadata = metadata
        self.owner_id = owner_id
        self.created_at = datetime.utcnow()
        self.chunks: List[DocumentChunk] = []


class DocumentChunk:
    def __init__(self, content: str, embedding: List[float], doc_id: str):
        self.id = str(uuid.uuid4())
        self.content = content
        self.embedding = embedding
        self.doc_id = doc_id

mongo_vector_store.py (new file, 111 lines)

@@ -0,0 +1,111 @@
from typing import List, Dict, Any
from pymongo import MongoClient
from base_vector_store import BaseVectorStore
from document import DocumentChunk


class MongoDBAtlasVectorStore(BaseVectorStore):
    def __init__(
        self,
        connection_string: str,
        database_name: str,
        collection_name: str = "kb_chunked_embeddings",
        index_name: str = "vector_index"
    ):
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.collection = self.db[collection_name]
        self.index_name = index_name
        # Ensure vector search index exists
        # self._ensure_index()

    def _ensure_index(self):
        """Ensure the vector search index exists"""
        try:
            # Check if index exists
            indexes = self.collection.list_indexes()
            index_exists = any(index.get('name') == self.index_name for index in indexes)
            if not index_exists:
                # Create the vector search index if it doesn't exist
                self.collection.create_index(
                    [("embedding", "vectorSearch")],
                    name=self.index_name,
                    vectorSearchOptions={
                        "dimensions": 1536,  # For OpenAI embeddings
                        "similarity": "cosine"
                    }
                )
        except Exception as e:
            print(f"Warning: Could not create vector index: {str(e)}")

    def store_embeddings(self, chunks: List[DocumentChunk]) -> bool:
        try:
            documents = []
            for chunk in chunks:
                doc = {
                    "_id": chunk.id,  # Use chunk.id as MongoDB _id
                    "text": chunk.content,
                    "embedding": chunk.embedding,
                    "doc_id": chunk.doc_id,
                    "owner_id": chunk.metadata.get("owner_id"),
                    "metadata": chunk.metadata
                }
                documents.append(doc)

            if documents:
                # Use ordered=False to continue even if some inserts fail
                result = self.collection.insert_many(documents, ordered=False)
                return len(result.inserted_ids) > 0
            return True
        except Exception as e:
            print(f"Error storing embeddings: {str(e)}")
            return False

    def query_similar(
        self,
        query_embedding: List[float],
        k: int,
        owner_id: str,
        filters: Dict[str, Any] = None
    ) -> List[DocumentChunk]:
        """Find similar chunks using MongoDB Atlas Vector Search."""
        base_filter = {"owner_id": owner_id}
        if filters:
            base_filter.update(filters)

        try:
            pipeline = [
                {
                    "$vectorSearch": {
                        "index": self.index_name,
                        "path": "embedding",
                        "queryVector": query_embedding,
                        "numCandidates": k * 10,
                        "limit": k,
                        "filter": base_filter
                    }
                }
            ]
            results = list(self.collection.aggregate(pipeline))
            chunks = []
            for result in results:
                chunk = DocumentChunk(
                    content=result["text"],
                    embedding=result["embedding"],
                    doc_id=result["doc_id"]
                )
                chunk.score = result.get("score", 0)
                # Add metadata back to chunk
                chunk.metadata = result.get("metadata", {})
                chunks.append(chunk)
            return chunks
        except Exception as e:
            print(f"Error querying similar documents: {str(e)}")
            return []
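
The commented-out _ensure_index above goes through create_index, which manages regular collection indexes; Atlas Vector Search indexes are managed as search indexes instead. A sketch of one plausible way to create a compatible index with pymongo 4.10 (as pinned in requirements.txt); here store is a MongoDBAtlasVectorStore instance and the field names mirror the documents written by store_embeddings:

from pymongo.operations import SearchIndexModel

def ensure_vector_index(store: MongoDBAtlasVectorStore) -> None:
    # Sketch, not verified against a live cluster: index the "embedding"
    # field and expose "owner_id" to the $vectorSearch "filter" clause
    # used in query_similar above.
    model = SearchIndexModel(
        definition={
            "fields": [
                {
                    "type": "vector",
                    "path": "embedding",
                    "numDimensions": 1536,  # text-embedding-3-small
                    "similarity": "cosine",
                },
                {"type": "filter", "path": "owner_id"},
            ]
        },
        name=store.index_name,
        type="vectorSearch",
    )
    store.collection.create_search_index(model)

Note also that $vectorSearch does not emit a score field by default, so chunk.score above will stay at 0 unless the pipeline appends a projection such as {"$project": {"text": 1, "embedding": 1, "doc_id": 1, "metadata": 1, "score": {"$meta": "vectorSearchScore"}}}.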

openai_embedding_model.py (new file, 23 lines)

@@ -0,0 +1,23 @@
from typing import List, Union
import openai
from base_embedding_model import BaseEmbeddingModel


class OpenAIEmbeddingModel(BaseEmbeddingModel):
    def __init__(self, api_key: str, model_name: str = "text-embedding-3-small"):
        self.client = openai.Client(api_key=api_key)
        self.model_name = model_name

    async def embed(self, text: Union[str, List[str]]) -> List[float]:
        if isinstance(text, str):
            text = [text]
        response = self.client.embeddings.create(
            model=self.model_name,
            input=text
        )
        if len(text) == 1:
            return response.data[0].embedding
        return [item.embedding for item in response.data]
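
One caveat: embed is declared async but calls the synchronous client, so it blocks the event loop for the duration of the HTTP request. A sketch of a non-blocking variant using the async client from the same openai package (an alternative, not part of this commit):

from typing import List, Union
import openai
from base_embedding_model import BaseEmbeddingModel


class AsyncOpenAIEmbeddingModel(BaseEmbeddingModel):
    def __init__(self, api_key: str, model_name: str = "text-embedding-3-small"):
        self.client = openai.AsyncClient(api_key=api_key)
        self.model_name = model_name

    async def embed(self, text: Union[str, List[str]]) -> List[float]:
        texts = [text] if isinstance(text, str) else text
        # Awaiting the async client yields control back to the event loop
        # while the embedding request is in flight.
        response = await self.client.embeddings.create(model=self.model_name, input=texts)
        if len(texts) == 1:
            return response.data[0].embedding
        return [item.embedding for item in response.data]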

requirements.txt (new file, 141 lines)

@@ -0,0 +1,141 @@
aiohappyeyeballs==2.4.3
aiohttp==3.11.2
aiosignal==1.3.1
annotated-types==0.7.0
antlr4-python3-runtime==4.9.3
anyio==4.6.2.post1
attrs==24.2.0
backoff==2.2.1
beautifulsoup4==4.12.3
cachetools==5.5.0
certifi==2024.8.30
cffi==1.17.1
chardet==5.2.0
charset-normalizer==3.4.0
click==8.1.7
coloredlogs==15.0.1
contourpy==1.3.1
cryptography==43.0.3
cycler==0.12.1
dataclasses-json==0.6.7
Deprecated==1.2.14
distro==1.9.0
dnspython==2.7.0
effdet==0.4.1
emoji==2.14.0
eval_type_backport==0.2.0
filelock==3.16.1
filetype==1.2.0
flatbuffers==24.3.25
fonttools==4.55.0
frozenlist==1.5.0
fsspec==2024.10.0
google-api-core==2.23.0
google-auth==2.36.0
google-cloud-vision==3.8.1
googleapis-common-protos==1.66.0
grpcio==1.67.1
grpcio-status==1.67.1
h11==0.14.0
html5lib==1.1
httpcore==1.0.6
httpx==0.27.2
huggingface-hub==0.26.2
humanfriendly==10.0
idna==3.10
iopath==0.1.10
Jinja2==3.1.4
jiter==0.7.1
joblib==1.4.2
jsonpatch==1.33
jsonpath-python==1.0.6
jsonpointer==3.0.0
jwt==1.3.1
kiwisolver==1.4.7
langchain==0.3.7
langchain-core==0.3.18
langchain-text-splitters==0.3.2
langdetect==1.0.9
langsmith==0.1.143
layoutparser==0.3.4
lxml==5.3.0
MarkupSafe==3.0.2
marshmallow==3.23.1
matplotlib==3.9.2
mpmath==1.3.0
multidict==6.1.0
mypy-extensions==1.0.0
nest-asyncio==1.6.0
networkx==3.4.2
nltk==3.9.1
numpy==1.26.4
olefile==0.47
omegaconf==2.3.0
onnx==1.17.0
onnxruntime==1.20.0
openai==1.54.4
opencv-python==4.10.0.84
orjson==3.10.11
packaging==24.2
pandas==2.2.3
pdf2image==1.17.0
pdfminer.six==20231228
pdfplumber==0.11.4
pi_heif==0.20.0
pikepdf==9.4.1
pillow==11.0.0
portalocker==2.10.1
propcache==0.2.0
proto-plus==1.25.0
protobuf==5.28.3
psutil==6.1.0
pyasn1==0.6.1
pyasn1_modules==0.4.1
pycocotools==2.0.8
pycparser==2.22
pydantic==2.9.2
pydantic_core==2.23.4
PyJWT==2.9.0
pymongo==4.10.1
pyparsing==3.2.0
pypdf==5.1.0
pypdfium2==4.30.0
python-dateutil==2.8.2
python-dotenv==1.0.1
python-iso639==2024.10.22
python-magic==0.4.27
python-multipart==0.0.17
python-oxmsg==0.0.1
pytz==2024.2
PyYAML==6.0.2
RapidFuzz==3.10.1
regex==2024.11.6
requests==2.32.3
requests-toolbelt==1.0.0
rsa==4.9
safetensors==0.4.5
scipy==1.14.1
setuptools==75.5.0
six==1.16.0
sniffio==1.3.1
soupsieve==2.6
SQLAlchemy==2.0.36
sympy==1.13.1
tenacity==9.0.0
timm==1.0.11
tokenizers==0.20.3
torch==2.5.1
torchvision==0.20.1
tqdm==4.67.0
transformers==4.46.2
typing-inspect==0.9.0
typing_extensions==4.12.2
tzdata==2024.2
unstructured==0.16.5
unstructured-client==0.27.0
unstructured-inference==0.8.1
unstructured.pytesseract==0.3.13
urllib3==2.2.3
webencodings==0.5.1
wrapt==1.16.0
yarl==1.17.1

sample.pdf (new binary file, not shown)

sanity_checks/mongo.py (new file, 72 lines)

@@ -0,0 +1,72 @@
from pymongo import MongoClient
from dotenv import load_dotenv
import os
import datetime


def test_mongo_operations():
    # Load environment variables
    load_dotenv()

    # Get MongoDB URI from environment variable
    mongo_uri = os.getenv("MONGODB_URI")
    if not mongo_uri:
        raise ValueError("MONGODB_URI environment variable not set")

    try:
        # Connect to MongoDB
        client = MongoClient(mongo_uri)

        # Test connection
        client.admin.command('ping')
        print("✅ Connected successfully to MongoDB")

        # Get database and collection
        db = client.brandsyncaidb  # Using a test database
        collection = db.kb_chunked_embeddings

        # Insert a single document
        test_doc = {
            "name": "Test Document",
            "timestamp": datetime.datetime.now(),
            "value": 42
        }
        result = collection.insert_one(test_doc)
        print(f"✅ Inserted document with ID: {result.inserted_id}")

        # Insert multiple documents
        test_docs = [
            {"name": "Doc 1", "value": 1},
            {"name": "Doc 2", "value": 2},
            {"name": "Doc 3", "value": 3}
        ]
        result = collection.insert_many(test_docs)
        print(f"✅ Inserted {len(result.inserted_ids)} documents")

        # Retrieve documents
        print("\nRetrieving documents:")
        for doc in collection.find():
            print(f"Found document: {doc}")

        # Find specific documents
        print("\nFinding documents with value >= 2:")
        query = {"value": {"$gte": 2}}
        for doc in collection.find(query):
            print(f"Found document: {doc}")

        # Clean up - delete all test documents
        # DON'T DELETE IF IT'S BRANDSYNCAI
        # result = collection.delete_many({})
        # print(f"\n✅ Cleaned up {result.deleted_count} test documents")
        # (Print stays commented out with the delete: at this point `result`
        # is an InsertManyResult, which has no deleted_count attribute.)
    except Exception as e:
        print(f"❌ Error: {str(e)}")
    finally:
        client.close()
        print("\n✅ Connection closed")


if __name__ == "__main__":
    test_mongo_operations()

simple_example.py (new file, 74 lines)

@@ -0,0 +1,74 @@
from datetime import datetime, timedelta, UTC  # Note: using UTC for timezone awareness
import base64
from databridge import DataBridge
import jwt
import os
from dotenv import load_dotenv


def create_databridge_uri() -> str:
    """Create DataBridge URI from environment variables"""
    load_dotenv()

    # Get credentials from environment
    mongo_uri = os.getenv("MONGODB_URI")
    openai_key = os.getenv("OPENAI_API_KEY")
    unstructured_key = os.getenv("UNSTRUCTURED_API_KEY")
    owner_id = os.getenv("DATABRIDGE_OWNER", "admin")

    # Validate required credentials
    if not all([mongo_uri, openai_key, unstructured_key]):
        raise ValueError("Missing required environment variables")

    # Generate auth token
    auth_token = jwt.encode(
        {
            'owner_id': owner_id,
            'exp': datetime.now(UTC) + timedelta(days=30)
        },
        'your-secret-key',  # In production, use a proper secret
        algorithm='HS256'
    )

    # For the DataBridge URI, use any host identifier (it won't affect the MongoDB connection)
    uri = (
        f"databridge://{owner_id}:{auth_token}@databridge.local"
        f"?openai_key={openai_key}"
        f"&unstructured_key={unstructured_key}"
        f"&db=brandsyncaidb"
        f"&collection=kb_chunked_embeddings"
    )
    return uri


async def main():
    # Initialize DataBridge
    bridge = DataBridge(create_databridge_uri())

    # Example: Ingest a PDF document
    with open("sample.pdf", "rb") as f:
        pdf_content = base64.b64encode(f.read()).decode()

    await bridge.ingest_document(
        content=pdf_content,
        metadata={
            "content_type": "application/pdf",
            "is_base64": True,
            "title": "Sample PDF"
        }
    )

    # Example: Query documents
    results = await bridge.query(
        query="What is machine learning?",
        k=4
    )
    for result in results:
        print(f"Content: {result['content'][:200]}...")
        print(f"Score: {result['score']}\n")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

simple_planner.py (new file, 17 lines)

@@ -0,0 +1,17 @@
from typing import Dict, Any
from base_planner import BasePlanner


class SimpleRAGPlanner(BasePlanner):
    def __init__(self, default_k: int = 3):
        self.default_k = default_k

    def plan_retrieval(self, query: str, **kwargs) -> Dict[str, Any]:
        """Create a simple retrieval plan."""
        return {
            "strategy": "simple_retrieval",
            "k": kwargs.get("k", self.default_k),
            "query": query,
            "filters": kwargs.get("filters", {}),
            "min_score": kwargs.get("min_score", 0.0)
        }
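
For concreteness, the plan this produces (values follow directly from the code above):

planner = SimpleRAGPlanner(default_k=3)
plan = planner.plan_retrieval("what is DataBridge?", k=5)
# {'strategy': 'simple_retrieval', 'k': 5, 'query': 'what is DataBridge?',
#  'filters': {}, 'min_score': 0.0}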

unstructured_parser.py (new file, 72 lines)

@@ -0,0 +1,72 @@
from typing import Dict, Any, List
from base_parser import BaseParser
from unstructured.partition.auto import partition
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os
import tempfile
import base64


class UnstructuredAPIParser(BaseParser):
    def __init__(
        self,
        api_key: str,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        api_url: str = "https://api.unstructuredapp.io"
    ):
        self.api_key = api_key
        self.api_url = api_url
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

    def parse(self, content: str, metadata: Dict[str, Any]) -> List[str]:
        """Parse content using Unstructured API and split into chunks."""
        try:
            # Create temporary file for content
            with tempfile.NamedTemporaryFile(delete=False, suffix=self._get_file_extension(metadata)) as temp_file:
                if metadata.get("is_base64", False):
                    temp_file.write(base64.b64decode(content))
                else:
                    temp_file.write(content.encode('utf-8'))
                temp_file_path = temp_file.name

            try:
                # Use Unstructured API for parsing
                elements = partition(
                    filename=temp_file_path,
                    api_key=self.api_key,
                    api_url=self.api_url,
                    partition_via_api=True
                )
                # Combine elements and split into chunks
                full_text = "\n\n".join(str(element) for element in elements)
                chunks = self.text_splitter.split_text(full_text)
                return chunks
            finally:
                # Clean up temporary file
                os.unlink(temp_file_path)
        except Exception as e:
            raise Exception(f"Error parsing document: {str(e)}")

    def _get_file_extension(self, metadata: Dict[str, Any]) -> str:
        """Get appropriate file extension based on content type."""
        content_type_mapping = {
            'application/pdf': '.pdf',
            'application/msword': '.doc',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
            'image/jpeg': '.jpg',
            'image/png': '.png',
            'text/plain': '.txt',
            'text/html': '.html'
        }
        return content_type_mapping.get(metadata.get('content_type'), '.txt')
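
A direct-usage sketch (hypothetical API key; the metadata keys are exactly the ones parse and _get_file_extension read):

parser = UnstructuredAPIParser(api_key="<UNSTRUCTURED_API_KEY>")
chunks = parser.parse(
    "Some long plain-text document...",
    {"content_type": "text/plain", "is_base64": False},
)
print(f"{len(chunks)} chunks")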