2025-03-18 23:27:53 -04:00
|
|
|
from typing import Dict, Any, List, Literal, Optional, Union, BinaryIO
|
2025-03-17 17:36:43 -04:00
|
|
|
from pathlib import Path
|
|
|
|
from datetime import datetime
|
2024-12-22 19:46:53 -05:00
|
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
|
|
|
|
|
|
|
|
class Document(BaseModel):
    """Metadata record for a single document.

    In addition to the metadata fields, the model offers ``update_*``
    convenience methods that forward to the client stored in ``_client``.
    Those methods raise ``ValueError`` while ``_client`` is ``None``.
    """

    external_id: str = Field(..., description="Unique document identifier")
    content_type: str = Field(..., description="Content type of the document")
    filename: Optional[str] = Field(default=None, description="Original filename if available")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="User-defined metadata")
    storage_info: Dict[str, str] = Field(default_factory=dict, description="Storage-related information")
    system_metadata: Dict[str, Any] = Field(default_factory=dict, description="System-managed metadata")
    access_control: Dict[str, Any] = Field(default_factory=dict, description="Access control information")
    chunk_ids: List[str] = Field(default_factory=list, description="IDs of document chunks")

    # Client that the update_* methods delegate to; None until one is attached.
    _client = None

    def update_with_text(
        self,
        content: str,
        filename: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        rules: Optional[List] = None,
        update_strategy: str = "add",
        use_colpali: Optional[bool] = None,
    ) -> "Document":
        """
        Apply new text content to this document via the attached client.

        Args:
            content: The new content to add
            filename: Optional new filename for the document
            metadata: Additional metadata to update (optional)
            rules: Optional list of rules to apply to the content
            update_strategy: Strategy for updating the document (currently only 'add' is supported)
            use_colpali: Whether to use multi-vector embedding

        Returns:
            Document: Updated document metadata

        Raises:
            ValueError: If no client is attached to this instance.
        """
        client = self._client
        if client is None:
            raise ValueError("Document instance not connected to a client. Use a document returned from a DataBridge client method.")

        # Forward every argument unchanged, keyed by this document's ID.
        return client.update_document_with_text(
            document_id=self.external_id,
            content=content,
            filename=filename,
            metadata=metadata,
            rules=rules,
            update_strategy=update_strategy,
            use_colpali=use_colpali,
        )

    def update_with_file(
        self,
        file: "Union[str, bytes, BinaryIO, Path]",
        filename: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        rules: Optional[List] = None,
        update_strategy: str = "add",
        use_colpali: Optional[bool] = None,
    ) -> "Document":
        """
        Apply the content of a file to this document via the attached client.

        Args:
            file: File to add (path string, bytes, file object, or Path)
            filename: Name of the file
            metadata: Additional metadata to update (optional)
            rules: Optional list of rules to apply to the content
            update_strategy: Strategy for updating the document (currently only 'add' is supported)
            use_colpali: Whether to use multi-vector embedding

        Returns:
            Document: Updated document metadata

        Raises:
            ValueError: If no client is attached to this instance.
        """
        client = self._client
        if client is None:
            raise ValueError("Document instance not connected to a client. Use a document returned from a DataBridge client method.")

        # Forward every argument unchanged, keyed by this document's ID.
        return client.update_document_with_file(
            document_id=self.external_id,
            file=file,
            filename=filename,
            metadata=metadata,
            rules=rules,
            update_strategy=update_strategy,
            use_colpali=use_colpali,
        )

    def update_metadata(
        self,
        metadata: Dict[str, Any],
    ) -> "Document":
        """
        Update only the metadata of this document via the attached client.

        Args:
            metadata: Metadata to update

        Returns:
            Document: Updated document metadata

        Raises:
            ValueError: If no client is attached to this instance.
        """
        client = self._client
        if client is None:
            raise ValueError("Document instance not connected to a client. Use a document returned from a DataBridge client method.")

        return client.update_document_metadata(
            document_id=self.external_id,
            metadata=metadata,
        )
|
2024-12-22 19:46:53 -05:00
|
|
|
|
|
|
|
|
|
|
|
class ChunkResult(BaseModel):
    """A single chunk returned by a query, with its score and parent document info."""

    # Required fields.
    content: str = Field(..., description="Chunk content")
    score: float = Field(..., description="Relevance score")
    document_id: str = Field(..., description="Parent document ID")
    chunk_number: int = Field(..., description="Chunk sequence number")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Document metadata")
    content_type: str = Field(..., description="Content type")

    # Optional fields; both default to None.
    filename: Optional[str] = Field(default=None, description="Original filename")
    download_url: Optional[str] = Field(default=None, description="URL to download full document")
|
2024-12-22 19:46:53 -05:00
|
|
|
|
|
|
|
|
|
|
|
class DocumentContent(BaseModel):
    """Represents either a URL or content string.

    ``filename`` must be provided when ``type`` is ``"url"`` and must be left
    unset when ``type`` is ``"string"``.
    """

    type: Literal["url", "string"] = Field(..., description="Content type (url or string)")
    value: str = Field(..., description="The actual content or URL")
    # validate_default=True forces the validator below to run even when the
    # caller omits ``filename``. Pydantic skips field validators for defaulted
    # values otherwise, so the "required when type is url" rule would never
    # trigger for a missing filename.
    filename: Optional[str] = Field(
        None,
        description="Filename when type is url",
        validate_default=True,
    )

    @field_validator("filename")
    @classmethod
    def filename_only_for_url(cls, v, values):
        """Enforce that ``filename`` is present exactly when ``type`` is ``"url"``.

        Args:
            v: The ``filename`` value being validated (may be ``None``).
            values: Pydantic validation info; ``values.data`` holds the fields
                validated so far, which includes ``type`` when it was valid.

        Returns:
            The unchanged ``filename`` value.

        Raises:
            ValueError: If a filename is given for a ``"string"`` content, or
                is missing for a ``"url"`` content.
        """
        # ``type`` is absent from ``data`` if its own validation failed; in
        # that case neither rule applies and the type error is reported alone.
        content_kind = values.data.get("type")
        if content_kind == "string" and v is not None:
            raise ValueError("filename can only be set when type is url")
        if content_kind == "url" and v is None:
            raise ValueError("filename is required when type is url")
        return v
|
|
|
|
|
|
|
|
|
|
|
|
class DocumentResult(BaseModel):
    """A single document returned by a query, with its score and content."""

    score: float = Field(..., description="Relevance score")
    document_id: str = Field(..., description="Document ID")
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Document metadata",
    )
    content: DocumentContent = Field(..., description="Document content or URL")
|
2024-12-26 08:52:25 -05:00
|
|
|
|
|
|
|
|
2025-03-09 18:42:04 -04:00
|
|
|
class ChunkSource(BaseModel):
    """Identifies a chunk that was used as a source for a completion."""

    document_id: str = Field(..., description="ID of the source document")
    chunk_number: int = Field(..., description="Chunk number within the document")
    # The score is optional and defaults to None.
    score: Optional[float] = Field(default=None, description="Relevance score")
|
2025-03-09 18:42:04 -04:00
|
|
|
|
|
|
|
|
2024-12-26 08:52:25 -05:00
|
|
|
class CompletionResponse(BaseModel):
    """Response payload of a completion request."""

    # Both of these are required (no default).
    completion: str
    usage: Dict[str, int]

    sources: List[ChunkSource] = Field(
        default_factory=list,
        description="Sources of chunks used in the completion",
    )
    metadata: Optional[Dict[str, Any]] = None
|
2025-02-07 21:08:40 -05:00
|
|
|
|
|
|
|
|
|
|
|
class IngestTextRequest(BaseModel):
    """Request payload for ingesting text content."""

    # Only ``content`` is required; every other field has a default.
    content: str
    filename: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    rules: List[Dict[str, Any]] = Field(default_factory=list)
    use_colpali: bool = Field(default=False)
|
2025-03-17 17:36:43 -04:00
|
|
|
|
|
|
|
|
|
|
|
class Entity(BaseModel):
    """Represents an entity in a knowledge graph.

    Identity is defined by ``id`` alone: two entities compare equal, and hash
    identically, whenever their ``id`` values match, regardless of the other
    fields.
    """

    id: str = Field(..., description="Unique entity identifier")
    label: str = Field(..., description="Display label for the entity")
    type: str = Field(..., description="Entity type")
    properties: Dict[str, Any] = Field(default_factory=dict, description="Entity properties")
    document_ids: List[str] = Field(default_factory=list, description="Source document IDs")
    chunk_sources: Dict[str, List[int]] = Field(default_factory=dict, description="Source chunk numbers by document ID")

    def __hash__(self):
        """Hash on ``id`` only, consistent with ``__eq__``."""
        return hash(self.id)

    def __eq__(self, other):
        """Compare by ``id`` against another ``Entity``.

        Returns ``NotImplemented`` for any other type so that Python can try
        the other operand's reflected comparison before concluding that the
        objects are unequal.
        """
        if not isinstance(other, Entity):
            return NotImplemented
        return self.id == other.id
|
|
|
|
|
|
|
|
|
|
|
|
class Relationship(BaseModel):
    """Represents a relationship between entities in a knowledge graph.

    Identity is defined by ``id`` alone: two relationships compare equal, and
    hash identically, whenever their ``id`` values match, regardless of the
    other fields.
    """

    id: str = Field(..., description="Unique relationship identifier")
    source_id: str = Field(..., description="Source entity ID")
    target_id: str = Field(..., description="Target entity ID")
    type: str = Field(..., description="Relationship type")
    document_ids: List[str] = Field(default_factory=list, description="Source document IDs")
    chunk_sources: Dict[str, List[int]] = Field(default_factory=dict, description="Source chunk numbers by document ID")

    def __hash__(self):
        """Hash on ``id`` only, consistent with ``__eq__``."""
        return hash(self.id)

    def __eq__(self, other):
        """Compare by ``id`` against another ``Relationship``.

        Returns ``NotImplemented`` for any other type so that Python can try
        the other operand's reflected comparison before concluding that the
        objects are unequal.
        """
        if not isinstance(other, Relationship):
            return NotImplemented
        return self.id == other.id
|
|
|
|
|
|
|
|
|
|
|
|
class Graph(BaseModel):
    """A knowledge graph: its entities, relationships and bookkeeping data."""

    # Identification.
    id: str = Field(..., description="Unique graph identifier")
    name: str = Field(..., description="Graph name")

    # Graph content.
    entities: List[Entity] = Field(default_factory=list, description="Entities in the graph")
    relationships: List[Relationship] = Field(default_factory=list, description="Relationships in the graph")

    # Provenance.
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Graph metadata")
    document_ids: List[str] = Field(default_factory=list, description="Source document IDs")
    filters: Optional[Dict[str, Any]] = Field(default=None, description="Document filters used to create the graph")

    # Timestamps (both required).
    created_at: datetime = Field(..., description="Creation timestamp")
    updated_at: datetime = Field(..., description="Last update timestamp")

    # Ownership and permissions.
    owner: Dict[str, str] = Field(default_factory=dict, description="Graph owner information")
    access_control: Dict[str, List[str]] = Field(default_factory=dict, description="Access control information")
|