diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2c51b24
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,23 @@
+# Python-related files
+*__pycache__/
+*.pyc
+*.pyo
+*.pyd
+.Python
+env/
+.env
+venv/*
+ENV/
+dist/
+build/
+*.egg-info/
+.eggs/
+*.egg
+*.pytest_cache/
+
+
+# Virtual environment
+.venv/
+.vscode/
+
+*.DS_Store
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/base_embedding_model.py b/base_embedding_model.py
new file mode 100644
index 0000000..7ea1070
--- /dev/null
+++ b/base_embedding_model.py
@@ -0,0 +1,9 @@
+from abc import ABC, abstractmethod
+from typing import List, Union
+
+
+class BaseEmbeddingModel(ABC):
+    @abstractmethod
+    async def embed(self, text: Union[str, List[str]]) -> Union[List[float], List[List[float]]]:
+        """Generate embeddings: one vector for a str, a list of vectors for a list of strs"""
+        pass
diff --git a/base_parser.py b/base_parser.py
new file mode 100644
index 0000000..e69af50
--- /dev/null
+++ b/base_parser.py
@@ -0,0 +1,9 @@
+from abc import ABC, abstractmethod
+from typing import Dict, Any, List
+
+
+class BaseParser(ABC):
+    @abstractmethod
+    def parse(self, content: str, metadata: Dict[str, Any]) -> List[str]:
+        """Parse content into chunks"""
+        pass
diff --git a/base_planner.py b/base_planner.py
new file mode 100644
index 0000000..ed2d7ad
--- /dev/null
+++ b/base_planner.py
@@ -0,0 +1,9 @@
+from abc import ABC, abstractmethod
+from typing import Dict, Any
+
+
+class BasePlanner(ABC):
+    @abstractmethod
+    def plan_retrieval(self, query: str, **kwargs) -> Dict[str, Any]:
+        """Create execution plan for retrieval"""
+        pass
diff --git a/base_vector_store.py b/base_vector_store.py
new file mode 100644
index 0000000..6d2bcbd
--- /dev/null
+++ b/base_vector_store.py
@@ -0,0 +1,15 @@
+from abc import ABC, abstractmethod
+from typing import List
+from document import DocumentChunk
+
+
+class BaseVectorStore(ABC):
+    @abstractmethod
+    def store_embeddings(self, chunks: List[DocumentChunk]) -> bool:
+        """Store document chunks and their embeddings"""
+        pass
+
+    @abstractmethod
+    def query_similar(self, query_embedding: List[float], k: int, owner_id: str) -> List[DocumentChunk]:
+        """Find similar chunks"""
+        pass
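The four abstract bases above define the whole plugin surface (parse, embed, store, plan). For tests that shouldn't need Atlas, a throwaway in-memory store is enough to exercise the pipeline end to end. This sketch is not part of the diff; it assumes chunks carry `owner_id` in `chunk.metadata`, as `databridge.py` sets it:

```python
import math
from typing import List

from base_vector_store import BaseVectorStore
from document import DocumentChunk


class InMemoryVectorStore(BaseVectorStore):
    """Brute-force store for tests; mirrors the owner_id filtering the Mongo store does server-side."""

    def __init__(self) -> None:
        self._chunks: List[DocumentChunk] = []

    def store_embeddings(self, chunks: List[DocumentChunk]) -> bool:
        self._chunks.extend(chunks)
        return True

    def query_similar(self, query_embedding: List[float], k: int, owner_id: str) -> List[DocumentChunk]:
        def cosine(a: List[float], b: List[float]) -> float:
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
            return dot / norm if norm else 0.0

        # Only consider this owner's chunks, like the Mongo store's pre-filter
        owned = [c for c in self._chunks if getattr(c, "metadata", {}).get("owner_id") == owner_id]
        for c in owned:
            c.score = cosine(query_embedding, c.embedding)  # same ad-hoc attribute the Mongo store sets
        return sorted(owned, key=lambda c: c.score, reverse=True)[:k]
```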
diff --git a/core.py b/core.py
deleted file mode 100644
index 1053d25..0000000
--- a/core.py
+++ /dev/null
@@ -1,171 +0,0 @@
-from typing import List, Dict, Any, Optional
-from abc import ABC, abstractmethod
-from datetime import datetime
-import uuid
-
-# Base Classes and Interfaces
-
-class Document:
-    def __init__(self, content: str, metadata: Dict[str, Any], owner_id: str):
-        self.id = str(uuid.uuid4())
-        self.content = content
-        self.metadata = metadata
-        self.owner_id = owner_id
-        self.created_at = datetime.utcnow()
-        self.chunks: List[DocumentChunk] = []
-
-class DocumentChunk:
-    def __init__(self, content: str, embedding: List[float], doc_id: str):
-        self.id = str(uuid.uuid4())
-        self.content = content
-        self.embedding = embedding
-        self.doc_id = doc_id
-
-class BaseParser(ABC):
-    @abstractmethod
-    def parse(self, content: str, metadata: Dict[str, Any]) -> List[str]:
-        """Parse content into chunks"""
-        pass
-
-
-class BasePlanner(ABC):
-    @abstractmethod
-    def plan_retrieval(self, query: str, **kwargs) -> Dict[str, Any]:
-        """Create execution plan for retrieval"""
-        pass
-
-
-class BaseVectorStore(ABC):
-    @abstractmethod
-    def store_embeddings(self, chunks: List[DocumentChunk]) -> bool:
-        """Store document chunks and their embeddings"""
-        pass
-
-    @abstractmethod
-    def query_similar(self, query_embedding: List[float], k: int, owner_id: str) -> List[DocumentChunk]:
-        """Find similar chunks"""
-        pass
-
-# Concrete Implementations
-
-class SimpleParser(BaseParser):
-    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
-        self.chunk_size = chunk_size
-        self.chunk_overlap = chunk_overlap
-
-    def parse(self, content: str, metadata: Dict[str, Any]) -> List[str]:
-        # Simple implementation - split by chunk_size
-        chunks = []
-        for i in range(0, len(content), self.chunk_size - self.chunk_overlap):
-            chunk = content[i:i + self.chunk_size]
-            if chunk:
-                chunks.append(chunk)
-        return chunks
-
-class SimpleRAGPlanner(BasePlanner):
-    def __init__(self, k: int = 3):
-        self.k = k
-
-    def plan_retrieval(self, query: str, **kwargs) -> Dict[str, Any]:
-        return {
-            "strategy": "simple_rag",
-            "k": kwargs.get("k", self.k),
-            "query": query
-        }
-
-# Main DataBridge Class
-
-class DataBridge:
-    def __init__(
-        self,
-        parser: BaseParser,
-        planner: BasePlanner,
-        vector_store: BaseVectorStore,
-        embedding_model: Any  # This would be your chosen embedding model
-    ):
-        self.parser = parser
-        self.planner = planner
-        self.vector_store = vector_store
-        self.embedding_model = embedding_model
-
-    async def ingest_document(
-        self,
-        content: str,
-        metadata: Dict[str, Any],
-        owner_id: str
-    ) -> Document:
-        # Create document
-        doc = Document(content, metadata, owner_id)
-
-        # Parse into chunks
-        chunk_texts = self.parser.parse(content, metadata)
-
-        # Create embeddings and chunks
-        for chunk_text in chunk_texts:
-            embedding = await self.embedding_model.embed(chunk_text)
-            chunk = DocumentChunk(chunk_text, embedding, doc.id)
-            doc.chunks.append(chunk)
-
-        # Store in vector store
-        success = self.vector_store.store_embeddings(doc.chunks)
-        if not success:
-            raise Exception("Failed to store embeddings")
-
-        return doc
-
-    async def query(
-        self,
-        query: str,
-        owner_id: str,
-        **kwargs
-    ) -> List[Dict[str, Any]]:
-        # Create plan
-        plan = self.planner.plan_retrieval(query, **kwargs)
-
-        # Get query embedding
-        query_embedding = await self.embedding_model.embed(query)
-
-        # Execute plan
-        chunks = self.vector_store.query_similar(
-            query_embedding,
-            k=plan["k"],
-            owner_id=owner_id
-        )
-
-        # Format results
-        results = []
-        for chunk in chunks:
-            results.append({
-                "content": chunk.content,
-                "doc_id": chunk.doc_id,
-                "chunk_id": chunk.id,
-                "score": chunk.score if hasattr(chunk, "score") else None
-            })
-
-        return results
-
-# Example usage
-"""
-# Initialize components
-parser = SimpleParser()
-planner = SimpleRAGPlanner()
-vector_store = YourVectorStore()  # Implement with chosen backend
-embedding_model = YourEmbeddingModel()  # Implement with chosen model
-
-# Create DataBridge instance
-db = DataBridge(parser, planner, vector_store, embedding_model)
-
-# Ingest a document
-doc = await db.ingest_document(
-    content="Your document content here",
-    metadata={"source": "pdf", "title": "Example Doc"},
-    owner_id="user123"
-)
-
-# Query the system
-results = await db.query(
-    query="Your query here",
-    owner_id="user123",
-    k=5  # optional override
-)
-"""
\ No newline at end of file
diff --git a/databridge.py b/databridge.py
new file mode 100644
index 0000000..2035249
--- /dev/null
+++ b/databridge.py
@@ -0,0 +1,106 @@
+from typing import Dict, Any, List
+from databridge_uri import DataBridgeURI
+from document import Document, DocumentChunk
+from mongo_vector_store import MongoDBAtlasVectorStore
+from openai_embedding_model import OpenAIEmbeddingModel
+from unstructured_parser import UnstructuredAPIParser
+from simple_planner import SimpleRAGPlanner
+
+
+class DataBridge:
+    """
+    DataBridge with owner authentication and authorization.
+    Configured via URI containing owner credentials.
+    """
+
+    def __init__(self, uri: str):
+        # Parse URI and initialize configuration
+        self.config = DataBridgeURI(uri)
+
+        # Initialize components
+        self._init_components()
+
+    def _init_components(self):
+        """Initialize all required components using the URI configuration"""
+        self.embedding_model = OpenAIEmbeddingModel(
+            api_key=self.config.openai_api_key,
+            model_name=self.config.embedding_model
+        )
+
+        self.parser = UnstructuredAPIParser(
+            api_key=self.config.unstructured_api_key,
+            chunk_size=1000,
+            chunk_overlap=200
+        )
+
+        self.vector_store = MongoDBAtlasVectorStore(
+            connection_string=self.config.mongo_uri,
+            database_name=self.config.db_name,
+            collection_name=self.config.collection_name
+        )
+
+        self.planner = SimpleRAGPlanner(default_k=4)
+
+    async def ingest_document(
+        self,
+        content: str,
+        metadata: Dict[str, Any]
+    ) -> Document:
+        """
+        Ingest a document using the owner ID from the URI configuration.
+        """
+        # Add owner_id to metadata
+        metadata['owner_id'] = self.config.owner_id
+
+        # Create document
+        doc = Document(content, metadata, self.config.owner_id)
+
+        # Parse into chunks
+        chunk_texts = self.parser.parse(content, metadata)
+
+        # Create embeddings and chunks
+        for chunk_text in chunk_texts:
+            embedding = await self.embedding_model.embed(chunk_text)
+            chunk = DocumentChunk(chunk_text, embedding, doc.id)
+            chunk.metadata = {'owner_id': self.config.owner_id}
+            doc.chunks.append(chunk)
+
+        # Store in vector store
+        success = self.vector_store.store_embeddings(doc.chunks)
+        if not success:
+            raise Exception("Failed to store embeddings")
+
+        return doc
+
+    async def query(
+        self,
+        query: str,
+        **kwargs
+    ) -> List[Dict[str, Any]]:
+        """
+        Query the document store using the owner ID from the URI configuration.
+        """
+        # Create plan
+        plan = self.planner.plan_retrieval(query, **kwargs)
+
+        # Get query embedding
+        query_embedding = await self.embedding_model.embed(query)
+
+        # Execute plan
+        chunks = self.vector_store.query_similar(
+            query_embedding,
+            k=plan["k"],
+            owner_id=self.config.owner_id
+        )
+
+        # Format results
+        results = []
+        for chunk in chunks:
+            results.append({
+                "content": chunk.content,
+                "doc_id": chunk.doc_id,
+                "chunk_id": chunk.id,
+                "score": chunk.score if hasattr(chunk, "score") else None
+            })
+
+        return results
+ """ + # Create plan + plan = self.planner.plan_retrieval(query, **kwargs) + + # Get query embedding + query_embedding = await self.embedding_model.embed(query) + + # Execute plan + chunks = self.vector_store.query_similar( + query_embedding, + k=plan["k"], + owner_id=self.config.owner_id + ) + + # Format results + results = [] + for chunk in chunks: + results.append({ + "content": chunk.content, + "doc_id": chunk.doc_id, + "chunk_id": chunk.id, + "score": chunk.score if hasattr(chunk, "score") else None + }) + + return results diff --git a/databridge_uri.py b/databridge_uri.py new file mode 100644 index 0000000..2af80d0 --- /dev/null +++ b/databridge_uri.py @@ -0,0 +1,61 @@ +from urllib.parse import urlparse, parse_qs +from typing import Optional, Dict, Any +import os +import jwt +from datetime import datetime, timedelta + + +class DataBridgeURI: + """ + Handles parsing and validation of DataBridge URIs with owner authentication + Format: databridge://:@host/path?params + """ + def __init__(self, uri: str): + self.uri = uri + self._parse_uri() + + def _parse_uri(self): + parsed = urlparse(self.uri) + query_params = parse_qs(parsed.query) + + # Parse authentication info from netloc + auth_parts = parsed.netloc.split('@')[0].split(':') + if len(auth_parts) != 2: + raise ValueError("URI must include owner_id and auth_token") + + self.owner_id = auth_parts[0] + self.auth_token = auth_parts[1] + + # Validate and decode auth token + try: + self._validate_auth_token() + except Exception as e: + raise ValueError(f"Invalid auth token: {str(e)}") + + # Get the original MongoDB URI from environment - use it as is + self.mongo_uri = os.getenv("MONGODB_URI") + if not self.mongo_uri: + raise ValueError("MONGODB_URI environment variable not set") + + # Get configuration from query parameters + self.openai_api_key = query_params.get('openai_key', [os.getenv('OPENAI_API_KEY', '')])[0] + self.unstructured_api_key = query_params.get('unstructured_key', [os.getenv('UNSTRUCTURED_API_KEY', '')])[0] + self.db_name = query_params.get('db', ['brandsyncaidb'])[0] + self.collection_name = query_params.get('collection', ['kb_chunked_embeddings'])[0] + self.embedding_model = query_params.get('embedding_model', ['text-embedding-3-small'])[0] + + # Validate required fields + if not all([self.mongo_uri, self.openai_api_key, self.unstructured_api_key]): + raise ValueError("Missing required configuration in DataBridge URI") + + def _validate_auth_token(self): + """Validate the auth token and extract any additional claims""" + try: + decoded = jwt.decode(self.auth_token, 'your-secret-key', algorithms=['HS256']) + if decoded.get('owner_id') != self.owner_id: + raise ValueError("Token owner_id mismatch") + self.auth_claims = decoded + except jwt.ExpiredSignatureError: + raise ValueError("Auth token has expired") + except jwt.InvalidTokenError: + raise ValueError("Invalid auth token") diff --git a/document.py b/document.py new file mode 100644 index 0000000..4b24257 --- /dev/null +++ b/document.py @@ -0,0 +1,21 @@ +from typing import Dict, Any, List +import uuid +from datetime import datetime + + +class Document: + def __init__(self, content: str, metadata: Dict[str, Any], owner_id: str): + self.id = str(uuid.uuid4()) + self.content = content + self.metadata = metadata + self.owner_id = owner_id + self.created_at = datetime.utcnow() + self.chunks: List[DocumentChunk] = [] + + +class DocumentChunk: + def __init__(self, content: str, embedding: List[float], doc_id: str): + self.id = str(uuid.uuid4()) + self.content = content + 
diff --git a/document.py b/document.py
new file mode 100644
index 0000000..4b24257
--- /dev/null
+++ b/document.py
@@ -0,0 +1,22 @@
+from typing import Dict, Any, List
+import uuid
+from datetime import datetime
+
+
+class Document:
+    def __init__(self, content: str, metadata: Dict[str, Any], owner_id: str):
+        self.id = str(uuid.uuid4())
+        self.content = content
+        self.metadata = metadata
+        self.owner_id = owner_id
+        self.created_at = datetime.utcnow()
+        self.chunks: List[DocumentChunk] = []
+
+
+class DocumentChunk:
+    def __init__(self, content: str, embedding: List[float], doc_id: str):
+        self.id = str(uuid.uuid4())
+        self.content = content
+        self.embedding = embedding
+        self.doc_id = doc_id
+        self.metadata: Dict[str, Any] = {}  # populated by callers (e.g. owner_id); read by the vector store
diff --git a/mongo_vector_store.py b/mongo_vector_store.py
new file mode 100644
index 0000000..319007a
--- /dev/null
+++ b/mongo_vector_store.py
@@ -0,0 +1,113 @@
+from typing import List, Dict, Any
+from pymongo import MongoClient
+from base_vector_store import BaseVectorStore
+from document import DocumentChunk
+
+
+class MongoDBAtlasVectorStore(BaseVectorStore):
+    def __init__(
+        self,
+        connection_string: str,
+        database_name: str,
+        collection_name: str = "kb_chunked_embeddings",
+        index_name: str = "vector_index"
+    ):
+        self.client = MongoClient(connection_string)
+        self.db = self.client[database_name]
+        self.collection = self.db[collection_name]
+        self.index_name = index_name
+
+        # Ensure vector search index exists
+        # self._ensure_index()
+
+    def _ensure_index(self):
+        """Ensure the vector search index exists"""
+        try:
+            # Check if index exists
+            indexes = self.collection.list_indexes()
+            index_exists = any(index.get('name') == self.index_name for index in indexes)
+
+            if not index_exists:
+                # Create the vector search index if it doesn't exist
+                self.collection.create_index(
+                    [("embedding", "vectorSearch")],
+                    name=self.index_name,
+                    vectorSearchOptions={
+                        "dimensions": 1536,  # For OpenAI embeddings
+                        "similarity": "cosine"
+                    }
+                )
+        except Exception as e:
+            print(f"Warning: Could not create vector index: {str(e)}")
+
+    def store_embeddings(self, chunks: List[DocumentChunk]) -> bool:
+        try:
+            documents = []
+            for chunk in chunks:
+                doc = {
+                    "_id": chunk.id,  # Use chunk.id as MongoDB _id
+                    "text": chunk.content,
+                    "embedding": chunk.embedding,
+                    "doc_id": chunk.doc_id,
+                    "owner_id": chunk.metadata.get("owner_id"),
+                    "metadata": chunk.metadata
+                }
+                documents.append(doc)
+
+            if documents:
+                # Use ordered=False to continue even if some inserts fail
+                result = self.collection.insert_many(documents, ordered=False)
+                return len(result.inserted_ids) > 0
+            return True
+
+        except Exception as e:
+            print(f"Error storing embeddings: {str(e)}")
+            return False
+
+    def query_similar(
+        self,
+        query_embedding: List[float],
+        k: int,
+        owner_id: str,
+        filters: Dict[str, Any] = None
+    ) -> List[DocumentChunk]:
+        """Find similar chunks using MongoDB Atlas Vector Search."""
+        base_filter = {"owner_id": owner_id}
+        if filters:
+            base_filter.update(filters)
+
+        try:
+            pipeline = [
+                {
+                    "$vectorSearch": {
+                        "index": self.index_name,
+                        "path": "embedding",
+                        "queryVector": query_embedding,
+                        "numCandidates": k * 10,
+                        "limit": k,
+                        "filter": base_filter
+                    }
+                },
+                # Surface the similarity score; $vectorSearch only exposes it via $meta
+                {"$addFields": {"score": {"$meta": "vectorSearchScore"}}}
+            ]
+
+            results = list(self.collection.aggregate(pipeline))
+            chunks = []
+
+            for result in results:
+                chunk = DocumentChunk(
+                    content=result["text"],
+                    embedding=result["embedding"],
+                    doc_id=result["doc_id"]
+                )
+                chunk.score = result.get("score", 0)
+                # Add metadata back to chunk
+                chunk.metadata = result.get("metadata", {})
+                chunks.append(chunk)
+
+            return chunks
+
+        except Exception as e:
+            print(f"Error querying similar documents: {str(e)}")
+            return []
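`query_similar` assumes a vector index named `vector_index` already exists on the collection; PyMongo's `create_index` cannot create Atlas Search indexes, which is presumably why the `_ensure_index` call is commented out. On clusters that allow programmatic search-index management, something like the sketch below would set it up (not part of the diff; the connection string is a placeholder, and `pymongo >= 4.7` is assumed for the `type="vectorSearch"` parameter):

```python
from pymongo import MongoClient
from pymongo.operations import SearchIndexModel

collection = MongoClient("mongodb+srv://user:pass@cluster.example.mongodb.net")["brandsyncaidb"]["kb_chunked_embeddings"]

collection.create_search_index(
    SearchIndexModel(
        name="vector_index",
        type="vectorSearch",
        definition={
            "fields": [
                # 1536 dims / cosine match the text-embedding-3-small vectors stored above
                {"type": "vector", "path": "embedding", "numDimensions": 1536, "similarity": "cosine"},
                # owner_id must be indexed as a filter field for the $vectorSearch "filter" clause
                {"type": "filter", "path": "owner_id"},
            ]
        },
    )
)
```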
diff --git a/openai_embedding_model.py b/openai_embedding_model.py
new file mode 100644
index 0000000..e3176f2
--- /dev/null
+++ b/openai_embedding_model.py
@@ -0,0 +1,23 @@
+from typing import List, Union
+import openai
+from base_embedding_model import BaseEmbeddingModel
+
+
+class OpenAIEmbeddingModel(BaseEmbeddingModel):
+    def __init__(self, api_key: str, model_name: str = "text-embedding-3-small"):
+        self.client = openai.AsyncClient(api_key=api_key)  # async client, so embed() doesn't block the event loop
+        self.model_name = model_name
+
+    async def embed(self, text: Union[str, List[str]]) -> Union[List[float], List[List[float]]]:
+        if isinstance(text, str):
+            text = [text]
+
+        response = await self.client.embeddings.create(
+            model=self.model_name,
+            input=text
+        )
+
+        if len(text) == 1:
+            return response.data[0].embedding
+
+        return [item.embedding for item in response.data]
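Note the asymmetry in the interface: `embed()` returns a single vector for a `str` and a list of vectors for a `list`, hence the `Union` return type. A minimal smoke test, assuming `OPENAI_API_KEY` is set:

```python
import asyncio
import os

from openai_embedding_model import OpenAIEmbeddingModel


async def demo() -> None:
    model = OpenAIEmbeddingModel(api_key=os.environ["OPENAI_API_KEY"])
    single = await model.embed("hello world")       # List[float], one 1536-dim vector
    batch = await model.embed(["hello", "world"])   # List[List[float]], two vectors
    print(len(single), len(batch))                  # -> 1536 2


if __name__ == "__main__":
    asyncio.run(demo())
```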
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..3e33368
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,140 @@
+aiohappyeyeballs==2.4.3
+aiohttp==3.11.2
+aiosignal==1.3.1
+annotated-types==0.7.0
+antlr4-python3-runtime==4.9.3
+anyio==4.6.2.post1
+attrs==24.2.0
+backoff==2.2.1
+beautifulsoup4==4.12.3
+cachetools==5.5.0
+certifi==2024.8.30
+cffi==1.17.1
+chardet==5.2.0
+charset-normalizer==3.4.0
+click==8.1.7
+coloredlogs==15.0.1
+contourpy==1.3.1
+cryptography==43.0.3
+cycler==0.12.1
+dataclasses-json==0.6.7
+Deprecated==1.2.14
+distro==1.9.0
+dnspython==2.7.0
+effdet==0.4.1
+emoji==2.14.0
+eval_type_backport==0.2.0
+filelock==3.16.1
+filetype==1.2.0
+flatbuffers==24.3.25
+fonttools==4.55.0
+frozenlist==1.5.0
+fsspec==2024.10.0
+google-api-core==2.23.0
+google-auth==2.36.0
+google-cloud-vision==3.8.1
+googleapis-common-protos==1.66.0
+grpcio==1.67.1
+grpcio-status==1.67.1
+h11==0.14.0
+html5lib==1.1
+httpcore==1.0.6
+httpx==0.27.2
+huggingface-hub==0.26.2
+humanfriendly==10.0
+idna==3.10
+iopath==0.1.10
+Jinja2==3.1.4
+jiter==0.7.1
+joblib==1.4.2
+jsonpatch==1.33
+jsonpath-python==1.0.6
+jsonpointer==3.0.0
+kiwisolver==1.4.7
+langchain==0.3.7
+langchain-core==0.3.18
+langchain-text-splitters==0.3.2
+langdetect==1.0.9
+langsmith==0.1.143
+layoutparser==0.3.4
+lxml==5.3.0
+MarkupSafe==3.0.2
+marshmallow==3.23.1
+matplotlib==3.9.2
+mpmath==1.3.0
+multidict==6.1.0
+mypy-extensions==1.0.0
+nest-asyncio==1.6.0
+networkx==3.4.2
+nltk==3.9.1
+numpy==1.26.4
+olefile==0.47
+omegaconf==2.3.0
+onnx==1.17.0
+onnxruntime==1.20.0
+openai==1.54.4
+opencv-python==4.10.0.84
+orjson==3.10.11
+packaging==24.2
+pandas==2.2.3
+pdf2image==1.17.0
+pdfminer.six==20231228
+pdfplumber==0.11.4
+pi_heif==0.20.0
+pikepdf==9.4.1
+pillow==11.0.0
+portalocker==2.10.1
+propcache==0.2.0
+proto-plus==1.25.0
+protobuf==5.28.3
+psutil==6.1.0
+pyasn1==0.6.1
+pyasn1_modules==0.4.1
+pycocotools==2.0.8
+pycparser==2.22
+pydantic==2.9.2
+pydantic_core==2.23.4
+PyJWT==2.9.0
+pymongo==4.10.1
+pyparsing==3.2.0
+pypdf==5.1.0
+pypdfium2==4.30.0
+python-dateutil==2.8.2
+python-dotenv==1.0.1
+python-iso639==2024.10.22
+python-magic==0.4.27
+python-multipart==0.0.17
+python-oxmsg==0.0.1
+pytz==2024.2
+PyYAML==6.0.2
+RapidFuzz==3.10.1
+regex==2024.11.6
+requests==2.32.3
+requests-toolbelt==1.0.0
+rsa==4.9
+safetensors==0.4.5
+scipy==1.14.1
+setuptools==75.5.0
+six==1.16.0
+sniffio==1.3.1
+soupsieve==2.6
+SQLAlchemy==2.0.36
+sympy==1.13.1
+tenacity==9.0.0
+timm==1.0.11
+tokenizers==0.20.3
+torch==2.5.1
+torchvision==0.20.1
+tqdm==4.67.0
+transformers==4.46.2
+typing-inspect==0.9.0
+typing_extensions==4.12.2
+tzdata==2024.2
+unstructured==0.16.5
+unstructured-client==0.27.0
+unstructured-inference==0.8.1
+unstructured.pytesseract==0.3.13
+urllib3==2.2.3
+webencodings==0.5.1
+wrapt==1.16.0
+yarl==1.17.1
diff --git a/sample.pdf b/sample.pdf
new file mode 100644
index 0000000..b8dc38f
Binary files /dev/null and b/sample.pdf differ
diff --git a/sanity_checks/mongo.py b/sanity_checks/mongo.py
new file mode 100644
index 0000000..f536036
--- /dev/null
+++ b/sanity_checks/mongo.py
@@ -0,0 +1,72 @@
+from pymongo import MongoClient
+from dotenv import load_dotenv
+import os
+import datetime
+
+
+def test_mongo_operations():
+    # Load environment variables
+    load_dotenv()
+
+    # Get MongoDB URI from environment variable
+    mongo_uri = os.getenv("MONGODB_URI")
+    if not mongo_uri:
+        raise ValueError("MONGODB_URI environment variable not set")
+
+    try:
+        # Connect to MongoDB
+        client = MongoClient(mongo_uri)
+
+        # Test connection
+        client.admin.command('ping')
+        print("✅ Connected successfully to MongoDB")
+
+        # Get database and collection
+        db = client.brandsyncaidb  # Using a test database
+        collection = db.kb_chunked_embeddings
+
+        # Insert a single document
+        test_doc = {
+            "name": "Test Document",
+            "timestamp": datetime.datetime.now(),
+            "value": 42
+        }
+
+        result = collection.insert_one(test_doc)
+        print(f"✅ Inserted document with ID: {result.inserted_id}")
+
+        # Insert multiple documents
+        test_docs = [
+            {"name": "Doc 1", "value": 1},
+            {"name": "Doc 2", "value": 2},
+            {"name": "Doc 3", "value": 3}
+        ]
+
+        result = collection.insert_many(test_docs)
+        print(f"✅ Inserted {len(result.inserted_ids)} documents")
+
+        # Retrieve documents
+        print("\nRetrieving documents:")
+        for doc in collection.find():
+            print(f"Found document: {doc}")
+
+        # Find specific documents
+        print("\nFinding documents with value >= 2:")
+        query = {"value": {"$gte": 2}}
+        for doc in collection.find(query):
+            print(f"Found document: {doc}")
+
+        # Clean up - delete all test documents
+        # DON'T DELETE IF IT'S BRANDSYNCAI
+        # result = collection.delete_many({})
+        # print(f"\n✅ Cleaned up {result.deleted_count} test documents")
+
+    except Exception as e:
+        print(f"❌ Error: {str(e)}")
+    finally:
+        client.close()
+        print("\n✅ Connection closed")
+
+
+if __name__ == "__main__":
+    test_mongo_operations()
diff --git a/simple_example.py b/simple_example.py
new file mode 100644
index 0000000..2ce760b
--- /dev/null
+++ b/simple_example.py
@@ -0,0 +1,74 @@
+from datetime import datetime, timedelta, UTC  # Note: using UTC for timezone awareness
+import base64
+from databridge import DataBridge
+import jwt
+import os
+from dotenv import load_dotenv
+
+
+def create_databridge_uri() -> str:
+    """Create DataBridge URI from environment variables"""
+    load_dotenv()
+
+    # Get credentials from environment
+    mongo_uri = os.getenv("MONGODB_URI")
+    openai_key = os.getenv("OPENAI_API_KEY")
+    unstructured_key = os.getenv("UNSTRUCTURED_API_KEY")
+    owner_id = os.getenv("DATABRIDGE_OWNER", "admin")
+
+    # Validate required credentials
+    if not all([mongo_uri, openai_key, unstructured_key]):
+        raise ValueError("Missing required environment variables")
+
+    # Generate auth token
+    auth_token = jwt.encode(
+        {
+            'owner_id': owner_id,
+            'exp': datetime.now(UTC) + timedelta(days=30)
+        },
+        'your-secret-key',  # In production, use proper secret
+        algorithm='HS256'
+    )
+
+    # For DataBridge URI, use any host identifier (it won't affect MongoDB connection)
+    uri = (
+        f"databridge://{owner_id}:{auth_token}@databridge.local"
+        f"?openai_key={openai_key}"
+        f"&unstructured_key={unstructured_key}"
+        f"&db=brandsyncaidb"
+        f"&collection=kb_chunked_embeddings"
+    )
+
+    return uri
+
+
+async def main():
+    # Initialize DataBridge
+    bridge = DataBridge(create_databridge_uri())
+
+    # Example: Ingest a PDF document
+    with open("sample.pdf", "rb") as f:
+        pdf_content = base64.b64encode(f.read()).decode()
+
+    await bridge.ingest_document(
+        content=pdf_content,
+        metadata={
+            "content_type": "application/pdf",
+            "is_base64": True,
+            "title": "Sample PDF"
+        }
+    )
+
+    # Example: Query documents
+    results = await bridge.query(
+        query="What is machine learning?",
+        k=4
+    )
+
+    for result in results:
+        print(f"Content: {result['content'][:200]}...")
+        print(f"Score: {result['score']}\n")
+
+if __name__ == "__main__":
+    import asyncio
+    asyncio.run(main())
diff --git a/simple_planner.py b/simple_planner.py
new file mode 100644
index 0000000..df59915
--- /dev/null
+++ b/simple_planner.py
@@ -0,0 +1,17 @@
+from typing import Dict, Any
+from base_planner import BasePlanner
+
+
+class SimpleRAGPlanner(BasePlanner):
+    def __init__(self, default_k: int = 3):
+        self.default_k = default_k
+
+    def plan_retrieval(self, query: str, **kwargs) -> Dict[str, Any]:
+        """Create a simple retrieval plan."""
+        return {
+            "strategy": "simple_retrieval",
+            "k": kwargs.get("k", self.default_k),
+            "query": query,
+            "filters": kwargs.get("filters", {}),
+            "min_score": kwargs.get("min_score", 0.0)
+        }
diff --git a/unstructured_parser.py b/unstructured_parser.py
new file mode 100644
index 0000000..25b43eb
--- /dev/null
+++ b/unstructured_parser.py
@@ -0,0 +1,72 @@
+from typing import Dict, Any, List
+from base_parser import BaseParser
+from unstructured.partition.auto import partition
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+import os
+import tempfile
+import base64
+
+
+class UnstructuredAPIParser(BaseParser):
+    def __init__(
+        self,
+        api_key: str,
+        chunk_size: int = 1000,
+        chunk_overlap: int = 200,
+        api_url: str = "https://api.unstructuredapp.io"
+    ):
+        self.api_key = api_key
+        self.api_url = api_url
+        self.chunk_size = chunk_size
+        self.chunk_overlap = chunk_overlap
+        self.text_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=chunk_size,
+            chunk_overlap=chunk_overlap,
+            length_function=len,
+            separators=["\n\n", "\n", ". ", " ", ""]
+        )
+
+    def parse(self, content: str, metadata: Dict[str, Any]) -> List[str]:
+        """Parse content using Unstructured API and split into chunks."""
+        try:
+            # Create temporary file for content
+            with tempfile.NamedTemporaryFile(delete=False, suffix=self._get_file_extension(metadata)) as temp_file:
+                if metadata.get("is_base64", False):
+                    temp_file.write(base64.b64decode(content))
+                else:
+                    temp_file.write(content.encode('utf-8'))
+                temp_file_path = temp_file.name
+
+            try:
+                # Use Unstructured API for parsing
+                elements = partition(
+                    filename=temp_file_path,
+                    api_key=self.api_key,
+                    api_url=self.api_url,
+                    partition_via_api=True
+                )
+
+                # Combine elements and split into chunks
+                full_text = "\n\n".join(str(element) for element in elements)
+                chunks = self.text_splitter.split_text(full_text)
+
+                return chunks
+            finally:
+                # Clean up temporary file
+                os.unlink(temp_file_path)
+
+        except Exception as e:
+            raise Exception(f"Error parsing document: {str(e)}")
+
+    def _get_file_extension(self, metadata: Dict[str, Any]) -> str:
+        """Get appropriate file extension based on content type."""
+        content_type_mapping = {
+            'application/pdf': '.pdf',
+            'application/msword': '.doc',
+            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
+            'image/jpeg': '.jpg',
+            'image/png': '.png',
+            'text/plain': '.txt',
+            'text/html': '.html'
+        }
+        return content_type_mapping.get(metadata.get('content_type'), '.txt')
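The parser can also be exercised on its own: plain text takes the non-base64 path through a `.txt` temp file (the fallback in `_get_file_extension`). A small sketch, assuming `UNSTRUCTURED_API_KEY` is set:

```python
import os

from unstructured_parser import UnstructuredAPIParser

parser = UnstructuredAPIParser(
    api_key=os.environ["UNSTRUCTURED_API_KEY"],
    chunk_size=500,
    chunk_overlap=50,
)
chunks = parser.parse(
    "DataBridge ingests documents, chunks them, and embeds each chunk...",
    metadata={"content_type": "text/plain"},
)
print(f"{len(chunks)} chunk(s)")
```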