morphik-core/core/tools/document_tools.py
2025-05-01 17:02:22 -07:00

253 lines
8.5 KiB
Python

"""Document retrieval and management tools."""
import json
import logging
from datetime import UTC, datetime
from typing import Any, Dict, List, Literal, Optional
from core.models.auth import AuthContext
from core.services.document_service import DocumentService
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ToolError(Exception):
    """Signal that a tool call could not be completed.

    The tool functions in this module raise this both for a missing
    document service and when wrapping an exception caught during execution.
    """
# Leading characters of base64-encoded image data for common formats.
# Each prefix is the base64 form of that format's magic number.
_BASE64_IMAGE_SIGNATURES = (
    ("iVBORw0KGgo", "image/png"),  # \x89PNG\r\n\x1a\n
    ("/9j/", "image/jpeg"),  # \xff\xd8\xff
    ("R0lGOD", "image/gif"),  # GIF87a / GIF89a
    ("UklGR", "image/webp"),  # RIFF container; for an image chunk this is WebP
)


def _guess_image_mime(encoded: str) -> str:
    """Guess the MIME type of a base64-encoded image from its leading characters.

    Falls back to ``image/png`` when no known signature matches.
    """
    for prefix, mime in _BASE64_IMAGE_SIGNATURES:
        if encoded.startswith(prefix):
            return mime
    return "image/png"


async def retrieve_chunks(
    query: str,
    k: int = 5,
    filters: Optional[Dict[str, Any]] = None,
    min_relevance: float = 0.7,
    use_colpali: bool = True,
    folder_name: Optional[str] = None,
    end_user_id: Optional[str] = None,
    document_service: DocumentService = None,
    auth: AuthContext = None,
) -> List[Dict[str, Any]]:
    """
    Retrieve the most relevant text and image chunks from the knowledge base.

    Args:
        query: The search query or question
        k: Number of chunks to retrieve (default: 5)
        filters: Metadata filters to narrow results
        min_relevance: Minimum relevance score threshold (0-1); forwarded to the
            document service as ``min_score``
        use_colpali: Whether to use multimodal features
        folder_name: Optional folder to scope the search to
        end_user_id: Optional end-user ID to scope the search to
        document_service: DocumentService instance
        auth: Authentication context

    Returns:
        List of content items. The first item is a text header with the chunk
        count; each following item is either a ``{"type": "text", ...}`` entry
        or a ``{"type": "image_url", ...}`` entry whose URL is a data URL.

    Raises:
        ToolError: If no document service is provided, or if retrieval or
            formatting fails (the original exception is attached as the cause).
    """
    if document_service is None:
        raise ToolError("Document service not provided")

    try:
        chunks = await document_service.retrieve_chunks(
            query=query,
            auth=auth,
            filters=filters,
            k=k,
            min_score=min_relevance,
            use_colpali=use_colpali,
            folder_name=folder_name,
            end_user_id=end_user_id,
        )

        # Format the results for LiteLLM tool response, starting with a header.
        content: List[Dict[str, Any]] = [
            {"type": "text", "text": f"Found {len(chunks)} relevant chunks:"}
        ]

        for chunk in chunks:
            if chunk.metadata.get("is_image", False):
                if chunk.content.startswith("data:"):
                    # Already in data URL format
                    url = chunk.content
                else:
                    # Raw base64: label it with the detected image type rather
                    # than assuming PNG, so JPEG/GIF/WebP payloads are not mislabeled.
                    url = f"data:{_guess_image_mime(chunk.content)};base64,{chunk.content}"
                content.append({"type": "image_url", "image_url": {"url": url}})
            else:
                # Text chunk: prefix with the source filename and relevance score.
                text = f"Document: {chunk.filename or 'Unnamed'} (Score: {chunk.score:.2f})\n\n{chunk.content}"
                content.append({"type": "text", "text": text})

        return content
    except Exception as e:
        raise ToolError(f"Error retrieving chunks: {str(e)}") from e
async def retrieve_document(
    document_id: str,
    format: Optional[Literal["text", "metadata"]] = "text",
    document_service: DocumentService = None,
    auth: AuthContext = None,
    end_user_id: Optional[str] = None,
) -> str:
    """
    Retrieve full content of a specific document.

    Args:
        document_id: ID of the document to retrieve
        format: Desired format of the returned document. ``"text"`` returns the
            ``content`` entry of the document's system metadata; any other
            value returns the metadata as JSON.
        document_service: DocumentService instance
        auth: Authentication context
        end_user_id: Optional end-user ID to retrieve as

    Returns:
        Document content, a JSON string with ``metadata`` and
        ``system_metadata`` (without the ``content`` field), or a
        "not found or not accessible" message when no document is returned.

    Raises:
        ToolError: If no document service is provided, or if retrieval fails
            (the original exception is attached as the cause).
    """
    if document_service is None:
        raise ToolError("Document service not provided")

    try:
        docs = await document_service.batch_retrieve_documents(
            document_ids=[document_id], auth=auth, end_user_id=end_user_id
        )
        if not docs:
            return f"Document {document_id} not found or not accessible"

        doc = docs[0]  # Only one ID was requested, so use the first result
        # Guard against a missing/None system_metadata in both branches.
        system_metadata = getattr(doc, "system_metadata", None) or {}

        if format == "text":
            content = system_metadata.get("content")
            # A stored None must not leak out of a function declared to return str.
            return "No content available" if content is None else content

        # Return both user-defined metadata and system metadata separately
        result: Dict[str, Any] = {}

        user_metadata = getattr(doc, "metadata", None)
        if user_metadata:
            result["metadata"] = user_metadata

        if system_metadata:
            # Exclude the (potentially large) content field from the metadata view.
            result["system_metadata"] = {
                key: value for key, value in system_metadata.items() if key != "content"
            }

        return json.dumps(result, indent=2, default=str)
    except Exception as e:
        raise ToolError(f"Error retrieving document: {str(e)}") from e
async def save_to_memory(
    content: str,
    memory_type: Literal["session", "long_term", "research_thread"],
    tags: Optional[List[str]] = None,
    document_service: DocumentService = None,
    auth: AuthContext = None,
    end_user_id: Optional[str] = None,
) -> str:
    """
    Save important information to persistent memory.

    The content is ingested through the document service as a text document
    named ``memory_<memory_type>_<timestamp>``.

    Args:
        content: Content to save
        memory_type: Type of memory to save to
        tags: Tags for categorizing the memory
        document_service: DocumentService instance
        auth: Authentication context
        end_user_id: Optional end-user ID to save as

    Returns:
        JSON string with ``success``, ``memory_id`` (the ingested document's
        external ID) and ``memory_type``.

    Raises:
        ToolError: If no document service is provided, or if ingestion fails
            (the original exception is attached as the cause).
    """
    if document_service is None:
        raise ToolError("Document service not provided")

    try:
        # Annotated explicitly: values are strings plus an optional list of tags.
        metadata: Dict[str, Any] = {"memory_type": memory_type, "source": "agent"}
        if tags:
            metadata["tags"] = tags

        # Use document service to ingest the content as a document
        timestamp = await get_timestamp()
        result = await document_service.ingest_text(
            content=content,
            filename=f"memory_{memory_type}_{timestamp}",
            metadata=metadata,
            auth=auth,
            end_user_id=end_user_id,
        )

        return json.dumps({"success": True, "memory_id": result.external_id, "memory_type": memory_type})
    except Exception as e:
        raise ToolError(f"Error saving to memory: {str(e)}") from e
async def list_documents(
    filters: Optional[Dict[str, Any]] = None,
    skip: int = 0,
    limit: int = 100,
    folder_name: Optional[str] = None,
    end_user_id: Optional[str] = None,
    document_service: DocumentService = None,
    auth: AuthContext = None,
) -> str:
    """
    List accessible documents, showing their IDs and filenames.

    Args:
        filters: Optional metadata filters
        skip: Number of documents to skip (default: 0)
        limit: Maximum number of documents to return (default: 100)
        folder_name: Optional folder to scope the listing to
        end_user_id: Optional end-user ID to scope the listing to
        document_service: DocumentService instance
        auth: Authentication context

    Returns:
        JSON string of an object with ``count`` and ``documents``, where
        ``documents`` is a list of ``{"id", "filename"}`` entries.

    Raises:
        ToolError: If no document service is provided, on a PermissionError
            (message passed through unchanged), or on any other failure
            (the original exception is attached as the cause).
    """
    if document_service is None:
        raise ToolError("Document service not provided")

    try:
        # Create system filters for folder and user scoping
        system_filters: Dict[str, Any] = {}
        if folder_name:
            system_filters["folder_name"] = folder_name
        if end_user_id:
            system_filters["end_user_id"] = end_user_id

        # Retrieve documents from the database
        docs = await document_service.db.get_documents(
            auth=auth, skip=skip, limit=limit, filters=filters, system_filters=system_filters
        )

        # Format the results to only include ID and filename
        formatted_docs = [{"id": doc.external_id, "filename": doc.filename} for doc in docs]
        return json.dumps({"count": len(formatted_docs), "documents": formatted_docs}, indent=2)
    except PermissionError as e:
        # Re-raise PermissionError as ToolError for consistent handling
        raise ToolError(str(e)) from e
    except Exception as e:
        raise ToolError(f"Error listing documents: {str(e)}") from e
async def get_timestamp() -> str:
"""Get current timestamp in ISO format."""
return datetime.now(UTC).isoformat().replace(":", "-").replace(".", "-")