diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/api.py b/core/api.py new file mode 100644 index 0000000..f2cf8f3 --- /dev/null +++ b/core/api.py @@ -0,0 +1,274 @@ +from fastapi import FastAPI, HTTPException, Depends, Header, Request, status +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +from typing import Dict, Any, List, Optional, Annotated +from pydantic import BaseModel, Field +import jwt +import os +from datetime import datetime, UTC +import logging +from .vector_store.mongo_vector_store import MongoDBAtlasVectorStore +from .embedding_model.openai_embedding_model import OpenAIEmbeddingModel +from .parser.unstructured_parser import UnstructuredAPIParser +from .planner.simple_planner import SimpleRAGPlanner +from .document import Document, DocumentChunk + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize FastAPI app +app = FastAPI( + title="DataBridge API", + description="REST API for DataBridge document ingestion and querying", + version="1.0.0" +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +class DataBridgeException(HTTPException): + def __init__(self, detail: str, status_code: int = 400): + super().__init__(status_code=status_code, detail=detail) + + +class AuthenticationError(DataBridgeException): + def __init__(self, detail: str = "Authentication failed"): + super().__init__(detail=detail, status_code=status.HTTP_401_UNAUTHORIZED) + + +class ServiceConfig: + """Service-wide configuration and component management""" + def __init__(self): + self.jwt_secret = os.getenv("JWT_SECRET_KEY") + if not self.jwt_secret: + raise ValueError("JWT_SECRET_KEY environment variable not set") + + # Required environment variables + required_vars = { + "MONGODB_URI": "MongoDB connection string", + "OPENAI_API_KEY": "OpenAI API key", + "UNSTRUCTURED_API_KEY": "Unstructured API key" + } + + missing = [f"{var} ({desc})" for var, desc in required_vars.items() if not os.getenv(var)] + if missing: + raise ValueError(f"Missing required environment variables: {', '.join(missing)}") + + # Initialize core components + self._init_components() + + def _init_components(self): + """Initialize service components""" + try: + self.vector_store = MongoDBAtlasVectorStore( + connection_string=os.getenv("MONGODB_URI"), + database_name=os.getenv("DB_NAME", "databridge"), + collection_name=os.getenv("COLLECTION_NAME", "embeddings") + ) + + self.embedding_model = OpenAIEmbeddingModel( + api_key=os.getenv("OPENAI_API_KEY"), + model_name=os.getenv("EMBEDDING_MODEL", "text-embedding-3-small") + ) + + self.parser = UnstructuredAPIParser( + api_key=os.getenv("UNSTRUCTURED_API_KEY"), + chunk_size=int(os.getenv("CHUNK_SIZE", "1000")), + chunk_overlap=int(os.getenv("CHUNK_OVERLAP", "200")) + ) + + self.planner = SimpleRAGPlanner( + default_k=int(os.getenv("DEFAULT_K", "4")) + ) + except Exception as e: + raise ValueError(f"Failed to initialize components: {str(e)}") + + async def verify_token(self, token: str, owner_id: str) -> bool: + """Verify JWT token and owner_id""" + try: + payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"]) + if payload.get("owner_id") != owner_id: + raise AuthenticationError("Owner ID mismatch") + if datetime.fromtimestamp(payload["exp"], UTC) < datetime.now(UTC): + raise AuthenticationError("Token 
has expired") + return True + except jwt.InvalidTokenError: + raise AuthenticationError("Invalid token") + except Exception as e: + raise AuthenticationError(f"Authentication failed: {str(e)}") + + +# Initialize service +service = ServiceConfig() + + +# Request/Response Models +class IngestRequest(BaseModel): + content: str = Field(..., description="Document content (text or base64)") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Document metadata") + + +class QueryRequest(BaseModel): + query: str = Field(..., description="Query string") + k: Optional[int] = Field(default=4, description="Number of results to return") + filters: Optional[Dict[str, Any]] = Field(default=None, + description="Optional metadata filters") + + +class IngestResponse(BaseModel): + document_id: str = Field(..., description="Ingested document ID") + message: str = Field(default="Document ingested successfully") + + +class QueryResponse(BaseModel): + results: List[Dict[str, Any]] = Field(..., description="Query results") + total_results: int = Field(..., description="Total number of results") + + +# Authentication dependency +async def verify_auth( + owner_id: Annotated[str, Header(alias="X-Owner-ID")], + auth_token: Annotated[str, Header(alias="X-Auth-Token")] +) -> str: + """Verify authentication headers""" + await service.verify_token(auth_token, owner_id) + return owner_id + + +# Error handler middleware +@app.middleware("http") +async def error_handler(request: Request, call_next): + try: + return await call_next(request) + except DataBridgeException as e: + return JSONResponse( + status_code=e.status_code, + content={"error": e.detail} + ) + except Exception as e: + logger.exception("Unexpected error") + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"error": "Internal server error"} + ) + + +# API Routes +@app.post("/ingest", response_model=IngestResponse) +async def ingest_document( + request: IngestRequest, + owner_id: str = Depends(verify_auth) +) -> IngestResponse: + """ + Ingest a document into DataBridge. + All configuration and credentials are handled server-side. + """ + logger.info(f"Ingesting document for owner {owner_id}") + + # Add owner_id to metadata + request.metadata['owner_id'] = owner_id + + # Create document + doc = Document(request.content, request.metadata, owner_id) + + # Parse into chunks + chunk_texts = service.parser.parse(request.content, request.metadata) + # Create embeddings and chunks + chunks = [] + for chunk_text in chunk_texts: + embedding = await service.embedding_model.embed(chunk_text) + chunk = DocumentChunk(chunk_text, embedding, doc.id) + chunk.metadata = { + 'owner_id': owner_id, + **request.metadata + } + chunks.append(chunk) + + # Store in vector store + if not service.vector_store.store_embeddings(chunks): + raise DataBridgeException( + "Failed to store embeddings", + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + return IngestResponse(document_id=doc.id) + + +@app.post("/query", response_model=QueryResponse) +async def query_documents( + request: QueryRequest, + owner_id: str = Depends(verify_auth) +) -> QueryResponse: + """ + Query documents in DataBridge. + All configuration and credentials are handled server-side. 
+ """ + logger.info(f"Processing query for owner {owner_id}") + print("ADILOG ") + # Create plan + plan = service.planner.plan_retrieval(request.query, k=request.k) + + # Get query embedding + query_embedding = await service.embedding_model.embed(request.query) + + # Query vector store + chunks = service.vector_store.query_similar( + query_embedding, + k=plan["k"], + owner_id=owner_id, + filters=request.filters + ) + + # Format results + results = [ + { + "content": chunk.content, + "doc_id": chunk.doc_id, + "chunk_id": chunk.id, + "score": getattr(chunk, "score", None), + "metadata": {k:v for k,v in chunk.metadata.items() if k != 'owner_id'} + } + for chunk in chunks + ] + + return QueryResponse( + results=results, + total_results=len(results) + ) + + +# Health check endpoint +@app.get("/health") +async def health_check(): + """Check service health""" + try: + # Verify MongoDB connection + service.vector_store.collection.find_one({}) + return {"status": "healthy"} + except Exception as e: + raise DataBridgeException( + f"Service unhealthy: {str(e)}", + status_code=status.HTTP_503_SERVICE_UNAVAILABLE + ) + + +# Startup and shutdown events +@app.on_event("startup") +async def startup_event(): + """Verify all connections on startup""" + logger.info("Starting DataBridge service") + await health_check() + + +@app.on_event("shutdown") +async def shutdown_event(): + """Cleanup on shutdown""" + logger.info("Shutting down DataBridge service") diff --git a/core/auth.py b/core/auth.py new file mode 100644 index 0000000..42413cf --- /dev/null +++ b/core/auth.py @@ -0,0 +1,35 @@ +from fastapi import Request, HTTPException +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +import jwt + +security = HTTPBearer() + + +class DataBridgeAuth: + def __init__(self, secret_key: str): + self.secret_key = secret_key + + async def __call__(self, request: Request, credentials: HTTPAuthorizationCredentials = Depends(security)) -> str: + try: + token = credentials.credentials + payload = jwt.decode(token, self.secret_key, algorithms=["HS256"]) + + # Validate owner_id from token matches header + owner_id = request.headers.get("X-Owner-ID") + if owner_id != payload.get("owner_id"): + raise HTTPException( + status_code=401, + detail="Owner ID mismatch" + ) + + return owner_id + except jwt.ExpiredSignatureError: + raise HTTPException( + status_code=401, + detail="Token has expired" + ) + except jwt.InvalidTokenError: + raise HTTPException( + status_code=401, + detail="Invalid token" + ) diff --git a/databridge.py b/core/databridge.py similarity index 89% rename from databridge.py rename to core/databridge.py index 2035249..0a37eaa 100644 --- a/databridge.py +++ b/core/databridge.py @@ -1,10 +1,10 @@ from typing import Dict, Any, List -from databridge_uri import DataBridgeURI -from document import Document, DocumentChunk -from mongo_vector_store import MongoDBAtlasVectorStore -from openai_embedding_model import OpenAIEmbeddingModel -from unstructured_parser import UnstructuredAPIParser -from simple_planner import SimpleRAGPlanner +from .databridge_uri import DataBridgeURI +from .document import Document, DocumentChunk +from .vector_store.mongo_vector_store import MongoDBAtlasVectorStore +from .embedding_model.openai_embedding_model import OpenAIEmbeddingModel +from .parser.unstructured_parser import UnstructuredAPIParser +from .planner.simple_planner import SimpleRAGPlanner class DataBridge: diff --git a/databridge_uri.py b/core/databridge_uri.py similarity index 100% rename from databridge_uri.py 
rename to core/databridge_uri.py diff --git a/document.py b/core/document.py similarity index 100% rename from document.py rename to core/document.py diff --git a/core/embedding_model/__init__.py b/core/embedding_model/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/base_embedding_model.py b/core/embedding_model/base_embedding_model.py similarity index 100% rename from base_embedding_model.py rename to core/embedding_model/base_embedding_model.py diff --git a/openai_embedding_model.py b/core/embedding_model/openai_embedding_model.py similarity index 92% rename from openai_embedding_model.py rename to core/embedding_model/openai_embedding_model.py index e3176f2..043265f 100644 --- a/openai_embedding_model.py +++ b/core/embedding_model/openai_embedding_model.py @@ -1,6 +1,6 @@ from typing import List, Union import openai -from base_embedding_model import BaseEmbeddingModel +from .base_embedding_model import BaseEmbeddingModel class OpenAIEmbeddingModel(BaseEmbeddingModel): diff --git a/core/main.py b/core/main.py new file mode 100644 index 0000000..645175a --- /dev/null +++ b/core/main.py @@ -0,0 +1,28 @@ +from fastapi import FastAPI, Depends, Request, HTTPException +from fastapi.responses import JSONResponse +from .api import app as api_app +from .auth import DataBridgeAuth +import os + +app = FastAPI() +auth = DataBridgeAuth(secret_key=os.getenv("JWT_SECRET_KEY", "your-secret-key")) + +# Mount the API with authentication +app.mount("/api/v1", api_app) + +# Add authentication middleware to all routes +@app.middleware("http") +async def authenticate_requests(request: Request, call_next): + if request.url.path.startswith("/api/v1"): + try: + await auth(request) + except HTTPException as e: + return JSONResponse( + status_code=e.status_code, + content={"detail": e.detail} + ) + return await call_next(request) + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/core/parser/__init__.py b/core/parser/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/base_parser.py b/core/parser/base_parser.py similarity index 100% rename from base_parser.py rename to core/parser/base_parser.py diff --git a/unstructured_parser.py b/core/parser/unstructured_parser.py similarity index 98% rename from unstructured_parser.py rename to core/parser/unstructured_parser.py index 25b43eb..0a2071c 100644 --- a/unstructured_parser.py +++ b/core/parser/unstructured_parser.py @@ -1,5 +1,5 @@ from typing import Dict, Any, List -from base_parser import BaseParser +from .base_parser import BaseParser from unstructured.partition.auto import partition from langchain.text_splitter import RecursiveCharacterTextSplitter import os diff --git a/core/planner/__init__.py b/core/planner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/base_planner.py b/core/planner/base_planner.py similarity index 100% rename from base_planner.py rename to core/planner/base_planner.py diff --git a/simple_planner.py b/core/planner/simple_planner.py similarity index 93% rename from simple_planner.py rename to core/planner/simple_planner.py index df59915..733e063 100644 --- a/simple_planner.py +++ b/core/planner/simple_planner.py @@ -1,5 +1,5 @@ from typing import Dict, Any -from base_planner import BasePlanner +from .base_planner import BasePlanner class SimpleRAGPlanner(BasePlanner): diff --git a/core/vector_store/__init__.py b/core/vector_store/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/base_vector_store.py b/core/vector_store/base_vector_store.py similarity index 91% rename from
base_vector_store.py rename to core/vector_store/base_vector_store.py index 6d2bcbd..630c0ad 100644 --- a/base_vector_store.py +++ b/core/vector_store/base_vector_store.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from typing import List -from document import DocumentChunk +from core.document import DocumentChunk class BaseVectorStore(ABC): diff --git a/mongo_vector_store.py b/core/vector_store/mongo_vector_store.py similarity index 92% rename from mongo_vector_store.py rename to core/vector_store/mongo_vector_store.py index 319007a..587e34c 100644 --- a/mongo_vector_store.py +++ b/core/vector_store/mongo_vector_store.py @@ -1,7 +1,7 @@ from typing import List, Dict, Any from pymongo import MongoClient -from base_vector_store import BaseVectorStore -from document import DocumentChunk +from .base_vector_store import BaseVectorStore +from core.document import DocumentChunk class MongoDBAtlasVectorStore(BaseVectorStore): @@ -45,18 +45,18 @@ class MongoDBAtlasVectorStore(BaseVectorStore): documents = [] for chunk in chunks: doc = { - "_id": chunk.id, # Use chunk.id as MongoDB _id + "_id": chunk.id, "text": chunk.content, "embedding": chunk.embedding, "doc_id": chunk.doc_id, "owner_id": chunk.metadata.get("owner_id"), "metadata": chunk.metadata } documents.append(doc) if documents: # Use ordered=False to continue even if some inserts fail result = self.collection.insert_many(documents, ordered=False) return len(result.inserted_ids) > 0 return True diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sample.pdf b/examples/sample.pdf similarity index 100% rename from sample.pdf rename to examples/sample.pdf diff --git a/simple_example.py b/examples/simple_example.py similarity index 91% rename from simple_example.py rename to examples/simple_example.py index 2ce760b..9637c8c 100644 --- a/simple_example.py +++ b/examples/simple_example.py @@ -1,6 +1,8 @@ -from datetime import datetime, timedelta, UTC # Note: using UTC for timezone awareness +import sys; sys.path.append('.') + +from datetime import datetime, timedelta, UTC import base64 -from databridge import DataBridge +from core.databridge import DataBridge import jwt import os from dotenv import load_dotenv @@ -47,7 +49,7 @@ async def main(): bridge = DataBridge(create_databridge_uri()) # Example: Ingest a PDF document - with open("sample.pdf", "rb") as f: + with open("examples/sample.pdf", "rb") as f: pdf_content = base64.b64encode(f.read()).decode() await bridge.ingest_document( diff --git a/printer.py b/printer.py new file mode 100644 index 0000000..0c1823c --- /dev/null +++ b/printer.py @@ -0,0 +1,89 @@ +import os +from pathlib import Path + +def should_ignore_directory(dirname): + """ + Check if directory should be ignored.
+ + Args: + dirname (str): Name of the directory + + Returns: + bool: True if directory should be ignored, False otherwise + """ + # List of directory names to ignore + ignore_dirs = { + 'venv', + 'env', + '.venv', + 'virtualenv', + '__pycache__', + '.pytest_cache', + '.mypy_cache', + '.tox' + } + return dirname in ignore_dirs + +def aggregate_python_files(root_dir, output_file, script_name): + """ + Recursively search through directories starting from root_dir, + find all Python files, and write their contents to a single output file. + Ignores virtual environment directories, __init__.py files and the script itself. + + Args: + root_dir (str): The root directory to start the search from + output_file (str): The name of the output file to create + script_name (str): Name of this script to ignore + """ + # Convert root_dir to absolute path + root_dir = os.path.abspath(root_dir) + + # Use with statement to properly handle file opening/closing + with open(output_file, 'w', encoding='utf-8') as outfile: + # Walk through all directories + for dirpath, dirnames, filenames in os.walk(root_dir, topdown=True): + # Modify dirnames in place to skip ignored directories + dirnames[:] = [d for d in dirnames if not should_ignore_directory(d)] + + # Filter for Python files, excluding __init__.py and this script + python_files = [ + f for f in filenames + if f.endswith('.py') + and f != '__init__.py' + and f != script_name + and f != output_file + ] + + for py_file in python_files: + # Get the full file path + file_path = os.path.join(dirpath, py_file) + # Get relative path from root_dir + rel_path = os.path.relpath(file_path, root_dir) + + try: + # Read the content of the Python file + with open(file_path, 'r', encoding='utf-8') as infile: + content = infile.read() + + # Write the file path and contents to the output file + outfile.write(f"{rel_path}\n") + outfile.write(content) + outfile.write("\n\n" + "="*80 + "\n\n") # Separator between files + + except Exception as e: + outfile.write(f"Error reading {rel_path}: {str(e)}\n\n") + +if __name__ == "__main__": + # Get the name of this script + script_name = os.path.basename(__file__) + + # Get current directory as default root + current_dir = os.getcwd() + + # Output file name + output_file = "python_files_contents.txt" + + print(f"Starting to process Python files from: {current_dir}") + print(f"Ignoring {script_name}, all __init__.py files, and virtual environment directories") + aggregate_python_files(current_dir, output_file, script_name) + print(f"Finished! 
Results written to: {output_file}") diff --git a/sdks/python/README.md b/sdks/python/README.md new file mode 100644 index 0000000..e69de29 diff --git a/sdks/python/databridge/__init__.py b/sdks/python/databridge/__init__.py new file mode 100644 index 0000000..5de8ab5 --- /dev/null +++ b/sdks/python/databridge/__init__.py @@ -0,0 +1,5 @@ +from .client import DataBridge +from .exceptions import DataBridgeError +from .types import ContentType + +__all__ = ['DataBridge', 'DataBridgeError', 'ContentType'] \ No newline at end of file diff --git a/sdks/python/databridge/client.py b/sdks/python/databridge/client.py new file mode 100644 index 0000000..d465453 --- /dev/null +++ b/sdks/python/databridge/client.py @@ -0,0 +1,198 @@ +from typing import Dict, Any, List, Optional, Union +import httpx +from urllib.parse import urlparse +import jwt +from datetime import datetime, UTC +import asyncio +from dataclasses import dataclass +from .exceptions import AuthenticationError +from .types import ContentType +import logging + +logger = logging.getLogger(__name__) + + +@dataclass +class QueryResult: + """Structured query result""" + content: str + doc_id: str + chunk_id: str + score: Optional[float] + metadata: Dict[str, Any] + + +class DataBridge: + """ + DataBridge client for document ingestion and querying. + + Usage: + db = DataBridge("databridge://owner123:token@databridge.local") + doc_id = await db.ingest_document("content", {"title": "My Doc"}) + results = await db.query("What is...") + """ + def __init__( + self, + uri: str, + base_url: str = "https://api.databridge.ai/v1", + timeout: int = 30, + max_retries: int = 3 + ): + self._base_url = base_url.rstrip('/') + self._timeout = timeout + self._max_retries = max_retries + self._client = httpx.AsyncClient(timeout=timeout) + self._setup_auth(uri) + + def _setup_auth(self, uri: str) -> None: + """Setup authentication from URI""" + try: + parsed = urlparse(uri) + if not parsed.netloc: + raise ValueError("Invalid URI format") + + auth_parts = parsed.netloc.split('@')[0].split(':') + if len(auth_parts) != 2: + raise ValueError("URI must include owner_id and auth_token") + + self._owner_id = auth_parts[0] + self._auth_token = auth_parts[1] + + # Validate token structure (not signature) + try: + decoded = jwt.decode(self._auth_token, options={"verify_signature": False}) + self._token_expiry = datetime.fromtimestamp(decoded['exp'], UTC) + except jwt.InvalidTokenError as e: + raise ValueError(f"Invalid auth token format: {str(e)}") + + except Exception as e: + raise AuthenticationError(f"Failed to setup authentication: {str(e)}") + + async def _make_request( + self, + method: str, + endpoint: str, + data: Dict[str, Any] = None, + retry_count: int = 0 + ) -> Dict[str, Any]: + """Make authenticated HTTP request with retries""" + # if datetime.now(UTC) > self._token_expiry: + # raise AuthenticationError("Authentication token has expired") + headers = { + "X-Owner-ID": self._owner_id, + "X-Auth-Token": self._auth_token, + "Content-Type": "application/json" + } + + try: + response = await self._client.request( + method, + f"http://localhost:8000/{endpoint.lstrip('/')}", + json=data, + headers=headers + ) + + response.raise_for_status() + return response.json() + + except httpx.HTTPStatusError as e: + if e.response.status_code == 401: + raise AuthenticationError("Authentication failed: " + str(e)) + elif e.response.status_code >= 500 and retry_count < self._max_retries: + await asyncio.sleep(2 ** retry_count) # Exponential backoff + return await 
self._make_request(method, endpoint, data, retry_count + 1) + else: + raise ConnectionError(f"Request failed: {e.response.text}") + except Exception as e: + raise ConnectionError(f"Request failed: {str(e)}") + + async def ingest_document( + self, + content: Union[str, bytes], + metadata: Optional[Dict[str, Any]] = None, + content_type: ContentType = ContentType.TEXT + ) -> str: + """ + Ingest a document into DataBridge. + + Args: + content: Document content (string or bytes) + metadata: Optional document metadata + content_type: Type of the content being ingested + + Returns: + Document ID of the ingested document + """ + if isinstance(content, bytes): + import base64 + content = base64.b64encode(content).decode() + metadata = metadata or {} + metadata["is_base64"] = True + + metadata = metadata or {} + metadata["content_type"] = content_type + + response = await self._make_request( + "POST", + "ingest", + { + "content": content, + "metadata": metadata + } + ) + + return response["document_id"] + + async def query( + self, + query: str, + k: int = 4, + filters: Optional[Dict[str, Any]] = None + ) -> List[QueryResult]: + """ + Query documents in DataBridge. + + Args: + query: Query string + k: Number of results to return + filters: Optional metadata filters + + Returns: + List of QueryResult objects + """ + response = await self._make_request( + "POST", + "query", + { + "query": query, + "k": k, + "filters": filters + } + ) + + return [ + QueryResult( + content=result["content"], + doc_id=result["doc_id"], + chunk_id=result["chunk_id"], + score=result.get("score"), + metadata=result.get("metadata", {}) + ) + for result in response["results"] + ] + + async def close(self): + """Close the HTTP client""" + await self._client.aclose() + + async def __aenter__(self): + """Async context manager entry""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit""" + await self.close() + + def __repr__(self) -> str: + """Safe string representation""" + return f"DataBridge(owner_id='{self._owner_id}')" diff --git a/sdks/python/databridge/exceptions.py b/sdks/python/databridge/exceptions.py new file mode 100644 index 0000000..fe1e7f6 --- /dev/null +++ b/sdks/python/databridge/exceptions.py @@ -0,0 +1,13 @@ +class DataBridgeError(Exception): + """Base exception for DataBridge SDK""" + pass + + +class AuthenticationError(DataBridgeError): + """Authentication related errors""" + pass + + +class ConnectionError(DataBridgeError): + """Connection related errors""" + pass \ No newline at end of file diff --git a/sdks/python/databridge/types.py b/sdks/python/databridge/types.py new file mode 100644 index 0000000..6fccf7c --- /dev/null +++ b/sdks/python/databridge/types.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class ContentType(str, Enum): + """Supported content types""" + TEXT = "text/plain" + PDF = "application/pdf" + DOCX = "application/vnd.openxmlformats-officedocument.wordprocessingml.document" + HTML = "text/html" \ No newline at end of file diff --git a/sdks/python/examples/basic_usage.py b/sdks/python/examples/basic_usage.py new file mode 100644 index 0000000..0eda359 --- /dev/null +++ b/sdks/python/examples/basic_usage.py @@ -0,0 +1,197 @@ +import asyncio +import os +import sys +from pathlib import Path +from datetime import UTC, datetime, timedelta + +from dotenv import load_dotenv +import jwt + +# we can't pip install, this basically acts like pip install. 
+sdk_root = Path(__file__).parent.parent +sys.path.append(str(sdk_root)) + +from databridge import DataBridge, ContentType, DataBridgeError + + +def create_test_uri(): + """Create a test URI with a valid JWT token""" + token = jwt.encode( + { + 'owner_id': 'test_user_123', + 'exp': datetime.now(UTC) + timedelta(days=30) + }, + "your-secret-key-for-signing-tokens", + algorithm='HS256' + ) + return f"databridge://test_user_123:{token}@localhost:8000" + + +async def example_text(): + """Example of ingesting and querying text documents""" + print("\n=== Text Document Example ===") + load_dotenv() + uri = os.getenv("DATABRIDGE_URI") + if not uri: + raise ValueError("Please set DATABRIDGE_URI environment variable") + + db = DataBridge(create_test_uri()) + + try: + # Ingest a simple text document + content = """ + Machine learning (ML) is a type of artificial intelligence (AI) that allows + software applications to become more accurate at predicting outcomes without + being explicitly programmed to do so. Machine learning algorithms use historical + data as input to predict new output values. + """ + + doc_id = await db.ingest_document( + content=content, + metadata={ + "title": "ML Introduction", + "category": "tech", + "tags": ["ML", "AI", "technology"] + } + ) + print(f"✓ Document ingested successfully (ID: {doc_id})") + + # Query the document + results = await db.query( + query="What is machine learning?", + k=1 # Get top result + ) + + print("\nQuery Results:") + for result in results: + print(f"Content: {result.content.strip()}") + print(f"Score: {result.score:.2f}") + print(f"Metadata: {result.metadata}") + + except DataBridgeError as e: + print(f"× Error: {str(e)}") + + finally: + await db.close() + + +async def example_pdf(): + """Example of ingesting and querying PDF documents""" + print("\n=== PDF Document Example ===") + + uri = os.getenv("DATABRIDGE_URI") + if not uri: + raise ValueError("Please set DATABRIDGE_URI environment variable") + + # Path to a sample PDF in the examples directory + pdf_path = Path(__file__).parent / "sample.pdf" + if not pdf_path.exists(): + print("× sample.pdf not found in examples directory") + return + + db = DataBridge(uri) + + try: + # Read and ingest PDF + with open(pdf_path, "rb") as f: + pdf_content = f.read() + + doc_id = await db.ingest_document( + content=pdf_content, + metadata={ + "title": "Sample Document", + "source": "examples", + "file_type": "pdf" + }, + content_type=ContentType.PDF + ) + print(f"✓ PDF ingested successfully (ID: {doc_id})") + + # Query the PDF content + results = await db.query( + query="What is the main topic of this document?", + k=2, # Get top 2 results + filters={"file_type": "pdf"} # Only search PDF documents + ) + + print("\nQuery Results:") + for i, result in enumerate(results, 1): + print(f"\nResult {i}:") + print(f"Content: {result.content[:200]}...") + print(f"Score: {result.score:.2f}") + print(f"Document ID: {result.doc_id}") + + except DataBridgeError as e: + print(f"× Error: {str(e)}") + + finally: + await db.close() + + +async def example_batch(): + """Example of batch operations""" + print("\n=== Batch Operations Example ===") + + uri = os.getenv("DATABRIDGE_URI") + if not uri: + raise ValueError("Please set DATABRIDGE_URI environment variable") + + db = DataBridge(uri) + + try: + # Prepare multiple documents + documents = [ + { + "content": "Python is a programming language.", + "metadata": {"category": "programming", "level": "basic"} + }, + { + "content": "JavaScript runs in the browser.", + "metadata": 
{"category": "programming", "level": "basic"} + }, + { + "content": "Docker containers package applications.", + "metadata": {"category": "devops", "level": "intermediate"} + } + ] + + # Ingest multiple documents + doc_ids = [] + for doc in documents: + doc_id = await db.ingest_document( + content=doc["content"], + metadata=doc["metadata"] + ) + doc_ids.append(doc_id) + print(f"✓ Ingested {len(doc_ids)} documents") + + # Query with filters + results = await db.query( + query="What is Python?", + filters={"category": "programming"} + ) + + print("\nQuery Results (Programming category only):") + for result in results: + print(f"\nContent: {result.content}") + print(f"Category: {result.metadata['category']}") + print(f"Level: {result.metadata['level']}") + + except DataBridgeError as e: + print(f"× Error: {str(e)}") + + finally: + await db.close() + + +async def main(): + """Run all examples""" + try: + await example_text() + await example_pdf() + await example_batch() + except Exception as e: + print(f"× Main error: {str(e)}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml new file mode 100644 index 0000000..365f679 --- /dev/null +++ b/sdks/python/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "databridge-client" +version = "0.1.0" +description = "Python client for DataBridge RAG service" +readme = "README.md" +requires-python = ">=3.8" +dependencies = [ + "httpx>=0.24.0", + "pyjwt>=2.0.0" +] + +[project.optional-dependencies] +dev = ["pytest", "pytest-asyncio", "black", "isort"] diff --git a/sdks/python/setup.py b/sdks/python/setup.py new file mode 100644 index 0000000..0cdd1d2 --- /dev/null +++ b/sdks/python/setup.py @@ -0,0 +1,12 @@ +from setuptools import setup, find_packages + +setup( + name="databridge", + version="0.1.0", + packages=find_packages(), + install_requires=[ + "httpx", + "pyjwt", + ], + python_requires=">=3.7", +) \ No newline at end of file diff --git a/start_server.py b/start_server.py new file mode 100644 index 0000000..10d0e06 --- /dev/null +++ b/start_server.py @@ -0,0 +1,30 @@ +import uvicorn +import os +from dotenv import load_dotenv + +def main(): + # Load environment variables from .env file + load_dotenv() + + # Verify required environment variables + required_vars = [ + "MONGODB_URI", + "OPENAI_API_KEY", + "UNSTRUCTURED_API_KEY", + "JWT_SECRET_KEY" + ] + + missing = [var for var in required_vars if not os.getenv(var)] + if missing: + raise ValueError(f"Missing required environment variables: {', '.join(missing)}") + + # Start server + uvicorn.run( + "core.api:app", + host="0.0.0.0", # Listen on all available interfaces + port=8000, + reload=True # Enable auto-reload during development + ) + +if __name__ == "__main__": + main() \ No newline at end of file