SDK changes, add a sync client

This commit is contained in:
Adityavardhan Agrawal 2024-12-22 19:46:53 -05:00
parent 58726e1f2f
commit a925dcd7ac
5 changed files with 381 additions and 69 deletions

View File

@ -1,10 +1,15 @@
from .client import DataBridge
from .exceptions import DataBridgeError, AuthenticationError
__version__ = "0.1.4"
"""
DataBridge Python SDK for document ingestion and querying.
"""
from .sync import DataBridge
from .async_ import AsyncDataBridge
from .models import Document, IngestTextRequest
# Names listed here form the package's public API (what `from <package> import *` exposes).
__all__ = [
"DataBridge",
"DataBridgeError",
"AuthenticationError",
"AsyncDataBridge",
"Document",
"IngestTextRequest",
]
# Package version string.
# NOTE(review): presumably must match the `version` field in the project metadata — confirm.
__version__ = "0.1.5"

View File

@ -1,77 +1,27 @@
import json
from typing import Dict, Any, List, Literal, Optional, Union, BinaryIO
import httpx
from urllib.parse import urlparse
import jwt
from pydantic import BaseModel, Field, field_validator
from pathlib import Path
from io import BytesIO
import json
from pathlib import Path
from typing import Dict, Any, List, Optional, Union, BinaryIO
from urllib.parse import urlparse
import httpx
import jwt
from .models import Document, IngestTextRequest, ChunkResult, DocumentResult
class IngestTextRequest(BaseModel):
    """Request model for text ingestion.

    Attributes:
        content: Text content to ingest.
        metadata: Optional metadata dictionary; empty when not supplied.
    """

    content: str
    # default_factory states the "fresh dict per instance" intent explicitly
    # instead of relying on a shared mutable literal being copied.
    metadata: Dict[str, Any] = Field(default_factory=dict)
class Document(BaseModel):
    """Document metadata model.

    Attributes:
        external_id: Identifier of the document.
        content_type: Content type of the document.
        filename: Filename, when one is available.
        metadata: Metadata dictionary; empty when not supplied.
        storage_info: Storage-related string key/value pairs.
        system_metadata: Additional metadata dictionary.
        access_control: Access-control dictionary.
        chunk_ids: IDs of the document's chunks.
    """

    external_id: str
    content_type: str
    filename: Optional[str] = None
    # Container defaults use default_factory so every instance visibly gets
    # its own fresh object rather than a mutable literal in the class body.
    metadata: Dict[str, Any] = Field(default_factory=dict)
    storage_info: Dict[str, str] = Field(default_factory=dict)
    system_metadata: Dict[str, Any] = Field(default_factory=dict)
    access_control: Dict[str, Any] = Field(default_factory=dict)
    chunk_ids: List[str] = Field(default_factory=list)
class ChunkResult(BaseModel):
    """Query result at chunk level.

    Attributes:
        content: Content of the chunk.
        score: Score assigned to the chunk.
        document_id: ID of the document the chunk belongs to.
        chunk_number: Number of the chunk.
        metadata: Metadata dictionary (required).
        content_type: Content type.
        filename: Filename, when one is available.
        download_url: Download URL, when one is available.
    """

    # Required fields.
    content: str
    score: float
    document_id: str
    chunk_number: int
    metadata: Dict[str, Any]
    content_type: str

    # Optional fields, absent unless provided.
    filename: Optional[str] = Field(default=None)
    download_url: Optional[str] = Field(default=None)
class DocumentContent(BaseModel):
    """Represents either a URL or content string.

    ``filename`` must be provided when ``type`` is "url" and must be left
    unset when ``type`` is "string".
    """

    type: Literal["url", "string"]
    value: str
    # validate_default=True makes the validator below run even when
    # ``filename`` is omitted. Without it pydantic skips validators for
    # defaulted fields, so the "required when type is url" rule would never
    # fire for a missing filename.
    filename: Optional[str] = Field(
        None,
        validate_default=True,
        description="Filename when type is url",
    )

    @field_validator('filename')
    @classmethod
    def filename_only_for_url(cls, v, info):
        """Enforce the filename/type pairing.

        Args:
            v: The filename value being validated (may be None).
            info: Validation info; ``info.data`` holds the already-validated
                fields, including ``type`` when it validated successfully.

        Returns:
            The unchanged filename value.

        Raises:
            ValueError: If a filename is given for a "string" content, or is
                missing for a "url" content.
        """
        content_kind = info.data.get('type')
        if content_kind == 'string' and v is not None:
            raise ValueError('filename can only be set when type is url')
        if content_kind == 'url' and v is None:
            raise ValueError('filename is required when type is url')
        return v
class DocumentResult(BaseModel):
    """Query result at document level.

    Attributes:
        score: Score assigned to the document.
        document_id: ID of the document.
        metadata: Metadata dictionary (required).
        content: The document content, as a DocumentContent.
    """

    # All four fields are required; Field(...) spells that out explicitly.
    score: float = Field(...)
    document_id: str = Field(...)
    metadata: Dict[str, Any] = Field(...)
    content: DocumentContent = Field(...)
class DataBridge:
class AsyncDataBridge:
"""
DataBridge client for document operations.
Args:
uri (str): DataBridge URI in the format "databridge://<owner_id>:<token>@<host>"
timeout (int, optional): Request timeout in seconds. Defaults to 30.
is_local (bool, optional): Whether to connect to a local server. Defaults to False.
Examples:
```python
async with DataBridge("databridge://owner_id:token@api.databridge.ai") as db:
async with AsyncDataBridge("databridge://owner_id:token@api.databridge.ai") as db:
# Ingest text
doc = await db.ingest_text(
"Sample content",

View File

@ -0,0 +1,55 @@
from typing import Dict, Any, List, Literal, Optional
from pydantic import BaseModel, Field, field_validator
class Document(BaseModel):
    """Metadata record describing a single document."""

    # Required identification fields.
    external_id: str = Field(
        default=...,
        description="Unique document identifier",
    )
    content_type: str = Field(
        default=...,
        description="Content type of the document",
    )

    # Optional filename.
    filename: Optional[str] = Field(
        default=None,
        description="Original filename if available",
    )

    # Container fields; each instance gets a fresh empty container.
    metadata: Dict[str, Any] = Field(
        description="User-defined metadata",
        default_factory=dict,
    )
    storage_info: Dict[str, str] = Field(
        description="Storage-related information",
        default_factory=dict,
    )
    system_metadata: Dict[str, Any] = Field(
        description="System-managed metadata",
        default_factory=dict,
    )
    access_control: Dict[str, Any] = Field(
        description="Access control information",
        default_factory=dict,
    )
    chunk_ids: List[str] = Field(
        description="IDs of document chunks",
        default_factory=list,
    )
class IngestTextRequest(BaseModel):
    """Payload sent when ingesting a text document."""

    # The text itself is mandatory.
    content: str = Field(
        default=...,
        description="Text content to ingest",
    )
    # Metadata is optional and defaults to a fresh empty dict.
    metadata: Dict[str, Any] = Field(
        description="Optional metadata",
        default_factory=dict,
    )
class ChunkResult(BaseModel):
    """A single chunk-level hit returned by a query."""

    content: str = Field(
        default=...,
        description="Chunk content",
    )
    score: float = Field(
        default=...,
        description="Relevance score",
    )
    document_id: str = Field(
        default=...,
        description="Parent document ID",
    )
    chunk_number: int = Field(
        default=...,
        description="Chunk sequence number",
    )
    # Defaults to a fresh empty dict when the field is absent.
    metadata: Dict[str, Any] = Field(
        description="Document metadata",
        default_factory=dict,
    )
    content_type: str = Field(
        default=...,
        description="Content type",
    )
    # The two fields below are optional and default to None.
    filename: Optional[str] = Field(
        default=None,
        description="Original filename",
    )
    download_url: Optional[str] = Field(
        default=None,
        description="URL to download full document",
    )
class DocumentContent(BaseModel):
    """Represents either a URL or content string.

    ``filename`` must be provided when ``type`` is "url" and must be left
    unset when ``type`` is "string".
    """

    type: Literal["url", "string"] = Field(..., description="Content type (url or string)")
    value: str = Field(..., description="The actual content or URL")
    # validate_default=True makes the validator below run even when
    # ``filename`` is omitted. Without it pydantic skips validators for
    # defaulted fields, so the "required when type is url" rule would never
    # fire for a missing filename.
    filename: Optional[str] = Field(
        None,
        validate_default=True,
        description="Filename when type is url",
    )

    @field_validator('filename')
    @classmethod
    def filename_only_for_url(cls, v, info):
        """Enforce the filename/type pairing.

        Args:
            v: The filename value being validated (may be None).
            info: Validation info; ``info.data`` holds the already-validated
                fields, including ``type`` when it validated successfully.

        Returns:
            The unchanged filename value.

        Raises:
            ValueError: If a filename is given for a "string" content, or is
                missing for a "url" content.
        """
        content_kind = info.data.get('type')
        if content_kind == 'string' and v is not None:
            raise ValueError('filename can only be set when type is url')
        if content_kind == 'url' and v is None:
            raise ValueError('filename is required when type is url')
        return v
class DocumentResult(BaseModel):
    """A single document-level hit returned by a query."""

    score: float = Field(
        default=...,
        description="Relevance score",
    )
    document_id: str = Field(
        default=...,
        description="Document ID",
    )
    # Defaults to a fresh empty dict when the field is absent.
    metadata: Dict[str, Any] = Field(
        description="Document metadata",
        default_factory=dict,
    )
    content: DocumentContent = Field(
        default=...,
        description="Document content or URL",
    )

View File

@ -0,0 +1,302 @@
from io import BytesIO
import json
from pathlib import Path
from typing import Dict, Any, List, Optional, Union, BinaryIO
from urllib.parse import urlparse
import jwt
import requests
from .models import Document, IngestTextRequest, ChunkResult, DocumentResult
class DataBridge:
    """
    Synchronous DataBridge client for document operations.

    Args:
        uri (str): DataBridge URI in the format "databridge://<owner_id>:<token>@<host>"
        timeout (int, optional): Request timeout in seconds. Defaults to 30.
        is_local (bool, optional): Whether connecting to local development server. Defaults to False.

    Raises:
        ValueError: If the URI does not match the expected format.

    Examples:
        ```python
        with DataBridge("databridge://owner_id:token@api.databridge.ai") as db:
            # Ingest text
            doc = db.ingest_text(
                "Sample content",
                metadata={"category": "sample"}
            )

            # Query documents
            results = db.query("search query")
        ```
    """

    def __init__(self, uri: str, timeout: int = 30, is_local: bool = False):
        self._timeout = timeout
        self._is_local = is_local
        # Validate the URI/token before opening the HTTP session, so a bad
        # URI does not leave an unclosed session behind.
        self._setup_auth(uri)
        self._session = requests.Session()
        if is_local:
            # NOTE: this turns off TLS certificate verification for the
            # session; only intended for local development servers.
            self._session.verify = False

    @staticmethod
    def _parse_uri(uri: str) -> tuple:
        """Split a DataBridge URI into ``(owner_id, token, host)``.

        Raises:
            ValueError: If the URI has no network location, or lacks the
                ``<owner_id>:<token>@<host>`` structure.
        """
        netloc = urlparse(uri).netloc
        if not netloc:
            raise ValueError("Invalid URI format")

        # The host is whatever follows the last "@"; the owner id is whatever
        # precedes the first ":" of the credentials part.
        auth, at_sign, host = netloc.rpartition("@")
        owner_id, colon, token = auth.partition(":")
        if not (at_sign and colon and host):
            raise ValueError(
                'Invalid URI format, expected "databridge://<owner_id>:<token>@<host>"'
            )
        return owner_id, token, host

    def _setup_auth(self, uri: str) -> None:
        """Setup authentication from URI.

        Stores the owner id, auth token and base URL on the instance. The
        scheme is http for local servers and https otherwise.

        Raises:
            ValueError: If the URI is malformed.
        """
        self._owner_id, self._auth_token, host = self._parse_uri(uri)

        # Set base URL
        self._base_url = f"{'http' if self._is_local else 'https'}://{host}"

        # Basic token validation: the token must decode as a JWT. The
        # signature is deliberately not verified here.
        jwt.decode(self._auth_token, options={"verify_signature": False})

    def _request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        files: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Make an authenticated HTTP request and return the decoded JSON body.

        Args:
            method: HTTP method name.
            endpoint: Path relative to the base URL (leading "/" is ignored).
            data: Sent as the JSON body, or as form fields when ``files`` is given.
            files: Multipart file parts, if any.

        Raises:
            requests.HTTPError: If the response status indicates an error.
        """
        headers = {"Authorization": f"Bearer {self._auth_token}"}

        # For multipart uploads the Content-Type (with its boundary) must be
        # generated by requests, so it is only set explicitly for JSON calls.
        if not files:
            headers["Content-Type"] = "application/json"

        response = self._session.request(
            method,
            f"{self._base_url}/{endpoint.lstrip('/')}",
            json=data if not files else None,
            files=files,
            data=data if files else None,
            headers=headers,
            timeout=self._timeout
        )
        response.raise_for_status()
        return response.json()

    def ingest_text(
        self,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Document:
        """
        Ingest a text document into DataBridge.

        Args:
            content: Text content to ingest
            metadata: Optional metadata dictionary

        Returns:
            Document: Metadata of the ingested document

        Example:
            ```python
            doc = db.ingest_text(
                "Machine learning is fascinating...",
                metadata={
                    "title": "ML Introduction",
                    "category": "tech"
                }
            )
            ```
        """
        request = IngestTextRequest(
            content=content,
            metadata=metadata or {}
        )

        response = self._request(
            "POST",
            "ingest/text",
            request.model_dump()
        )
        return Document(**response)

    def ingest_file(
        self,
        file: Union[str, bytes, BinaryIO, Path],
        filename: str,
        content_type: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Document:
        """
        Ingest a file document into DataBridge.

        Args:
            file: File to ingest (path string, bytes, file object, or Path)
            filename: Name of the file
            content_type: MIME type (optional, guessed from ``filename`` if not
                provided, falling back to "application/octet-stream")
            metadata: Optional metadata dictionary

        Returns:
            Document: Metadata of the ingested document

        Raises:
            ValueError: If ``file`` is a path that does not exist.

        Example:
            ```python
            # From file path
            doc = db.ingest_file(
                "document.pdf",
                filename="document.pdf",
                content_type="application/pdf",
                metadata={"department": "research"}
            )

            # From file object
            with open("document.pdf", "rb") as f:
                doc = db.ingest_file(f, "document.pdf")
            ```
        """
        # Normalise the input to a binary file-like object. Buffers created
        # here are ours to close; a caller-supplied file object is not.
        owns_buffer = False
        if isinstance(file, (str, Path)):
            file_path = Path(file)
            if not file_path.exists():
                raise ValueError(f"File not found: {file}")
            file_obj = BytesIO(file_path.read_bytes())
            owns_buffer = True
        elif isinstance(file, bytes):
            file_obj = BytesIO(file)
            owns_buffer = True
        else:
            file_obj = file

        if not content_type:
            # Honour the documented contract: guess from the file name first.
            import mimetypes

            content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"

        try:
            # Prepare multipart form data
            files = {"file": (filename, file_obj, content_type)}

            # Metadata travels as a JSON-encoded form field next to the file.
            data = {"metadata": json.dumps(metadata or {})}

            response = self._request(
                "POST",
                "ingest/file",
                data=data,
                files=files
            )
            return Document(**response)
        finally:
            if owns_buffer:
                file_obj.close()

    def query(
        self,
        query: str,
        return_type: str = "chunks",
        filters: Optional[Dict[str, Any]] = None,
        k: int = 4,
        min_score: float = 0.0
    ) -> Union[List[ChunkResult], List[DocumentResult]]:
        """
        Query documents in DataBridge.

        Args:
            query: Search query text
            return_type: Type of results ("chunks" or "documents")
            filters: Optional metadata filters
            k: Number of results (default: 4)
            min_score: Minimum similarity threshold (default: 0.0)

        Returns:
            List[ChunkResult] when return_type is "chunks"; otherwise
            List[DocumentResult].

        Example:
            ```python
            # Query for chunks
            chunks = db.query(
                "What are the key findings?",
                return_type="chunks",
                filters={"department": "research"}
            )

            # Query for documents
            docs = db.query(
                "machine learning",
                return_type="documents",
                k=5
            )
            ```
        """
        request = {
            "query": query,
            "return_type": return_type,
            "filters": filters,
            "k": k,
            "min_score": min_score
        }

        response = self._request("POST", "query", request)

        if return_type == "chunks":
            return [ChunkResult(**r) for r in response]
        return [DocumentResult(**r) for r in response]

    @staticmethod
    def _documents_endpoint(
        skip: int,
        limit: int,
        filters: Optional[Dict[str, Any]] = None
    ) -> str:
        """Build the ``documents`` endpoint with a properly encoded query string.

        ``filters`` is serialised as JSON and URL-encoded; it is left out
        entirely when None.
        """
        from urllib.parse import urlencode

        params = {"skip": skip, "limit": limit}
        if filters is not None:
            params["filters"] = json.dumps(filters)
        return f"documents?{urlencode(params)}"

    def list_documents(
        self,
        skip: int = 0,
        limit: int = 100,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        List accessible documents.

        Args:
            skip: Number of documents to skip
            limit: Maximum number of documents to return
            filters: Optional filters

        Returns:
            List[Document]: List of accessible documents

        Example:
            ```python
            # Get first page
            docs = db.list_documents(limit=10)

            # Get next page
            next_page = db.list_documents(skip=10, limit=10, filters={"department": "research"})
            ```
        """
        response = self._request(
            "GET",
            self._documents_endpoint(skip, limit, filters)
        )
        return [Document(**doc) for doc in response]

    def get_document(self, document_id: str) -> Document:
        """
        Get document metadata by ID.

        Args:
            document_id: ID of the document

        Returns:
            Document: Document metadata

        Example:
            ```python
            doc = db.get_document("doc_123")
            print(f"Title: {doc.metadata.get('title')}")
            ```
        """
        response = self._request("GET", f"documents/{document_id}")
        return Document(**response)

    def close(self):
        """Close the HTTP session"""
        self._session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

View File

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "databridge-client"
version = "0.1.4"
version = "0.1.5"
authors = [
{ name = "DataBridge", email = "databridgesuperuser@gmail.com" },
]