"""Document retrieval and management tools."""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import UTC, datetime
|
|
from typing import Any, Dict, List, Literal, Optional
|
|
|
|
from core.models.auth import AuthContext
|
|
from core.services.document_service import DocumentService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ToolError(Exception):
|
|
"""Exception raised when a tool execution fails."""
|
|
|
|
pass
|
|
|
|
|
|


async def retrieve_chunks(
    query: str,
    k: int = 5,
    filters: Optional[Dict[str, Any]] = None,
    min_relevance: float = 0.7,
    use_colpali: bool = True,
    folder_name: Optional[str] = None,
    end_user_id: Optional[str] = None,
    document_service: Optional[DocumentService] = None,
    auth: Optional[AuthContext] = None,
) -> List[Dict[str, Any]]:
    """
    Retrieve the most relevant text and image chunks from the knowledge base.

    Args:
        query: The search query or question
        k: Number of chunks to retrieve (default: 5)
        filters: Metadata filters to narrow results
        min_relevance: Minimum relevance score threshold (0-1)
        use_colpali: Whether to use multimodal features
        folder_name: Optional folder to scope the search to
        end_user_id: Optional end-user ID to scope the search to
        document_service: DocumentService instance
        auth: Authentication context

    Returns:
        List of content items with text and images
    """
    if document_service is None:
        raise ToolError("Document service not provided")

    try:
        # Directly await the document service method
        chunks = await document_service.retrieve_chunks(
            query=query,
            auth=auth,
            filters=filters,
            k=k,
            min_score=min_relevance,
            use_colpali=use_colpali,
            folder_name=folder_name,
            end_user_id=end_user_id,
        )

        # Format the results for LiteLLM tool response
        content = []

        # Add a header text element
        content.append({"type": "text", "text": f"Found {len(chunks)} relevant chunks:"})

        for chunk in chunks:
            # Check if this is an image chunk
            if chunk.metadata.get("is_image", False):
                # Add image to content
                if chunk.content.startswith("data:"):
                    # Already in data URL format
                    content.append({"type": "image_url", "image_url": {"url": chunk.content}})
                else:
                    # Assume base64 and convert to a data URL
                    # TODO: potential bug here, if the base64 image is not a PNG
                    content.append(
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{chunk.content}"}}
                    )
            else:
                # Add text content with metadata
                text = f"Document: {chunk.filename or 'Unnamed'} (Score: {chunk.score:.2f})\n\n{chunk.content}"
                content.append(
                    {
                        "type": "text",
                        "text": text,
                    }
                )
        return content
    except Exception as e:
        raise ToolError(f"Error retrieving chunks: {str(e)}") from e
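

# Illustrative usage sketch (not part of the tool surface): how a caller that
# already holds a DocumentService instance and an AuthContext might invoke
# retrieve_chunks. The query text and the "department" filter key below are
# hypothetical examples, not values defined by this module.
async def _example_retrieve_chunks(
    document_service: DocumentService, auth: AuthContext
) -> List[Dict[str, Any]]:
    return await retrieve_chunks(
        query="What were the key findings of the latest audit?",  # hypothetical query
        k=3,
        filters={"department": "finance"},  # hypothetical metadata key
        min_relevance=0.5,
        document_service=document_service,
        auth=auth,
    )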


async def retrieve_document(
    document_id: str,
    format: Optional[Literal["text", "metadata"]] = "text",
    document_service: Optional[DocumentService] = None,
    auth: Optional[AuthContext] = None,
    end_user_id: Optional[str] = None,
) -> str:
    """
    Retrieve the full content of a specific document.

    Args:
        document_id: ID of the document to retrieve
        format: Desired format of the returned document ("text" or "metadata")
        document_service: DocumentService instance
        auth: Authentication context
        end_user_id: Optional end-user ID to retrieve as

    Returns:
        Document content or metadata as a string
    """
    if document_service is None:
        raise ToolError("Document service not provided")

    try:
        # Directly await the document service method
        docs = await document_service.batch_retrieve_documents(
            document_ids=[document_id], auth=auth, end_user_id=end_user_id
        )

        if not docs:
            return f"Document {document_id} not found or not accessible"

        doc = docs[0]  # Only one document was requested

        if format == "text":
            return doc.system_metadata.get("content", "No content available")
        else:
            # Return both user-defined metadata and system metadata separately
            result: Dict[str, Any] = {}
            # User metadata
            if hasattr(doc, "metadata") and doc.metadata:
                result["metadata"] = doc.metadata
            # System metadata without the content field
            if hasattr(doc, "system_metadata") and doc.system_metadata:
                system_metadata = doc.system_metadata.copy()
                if "content" in system_metadata:
                    del system_metadata["content"]
                result["system_metadata"] = system_metadata
            return json.dumps(result, indent=2, default=str)

    except Exception as e:
        raise ToolError(f"Error retrieving document: {str(e)}") from e
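

# Illustrative usage sketch: fetch a document's metadata view and decode the
# JSON string that retrieve_document returns. The document ID is a placeholder
# supplied by the caller, and the sketch assumes the document exists; if it
# does not, retrieve_document returns a plain "not found" message rather than
# JSON and json.loads would fail.
async def _example_retrieve_document_metadata(
    document_service: DocumentService, auth: AuthContext, document_id: str
) -> Dict[str, Any]:
    raw = await retrieve_document(
        document_id=document_id,
        format="metadata",
        document_service=document_service,
        auth=auth,
    )
    # The "metadata" format returns a JSON object with user and system metadata.
    return json.loads(raw)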


async def save_to_memory(
    content: str,
    memory_type: Literal["session", "long_term", "research_thread"],
    tags: Optional[List[str]] = None,
    document_service: Optional[DocumentService] = None,
    auth: Optional[AuthContext] = None,
    end_user_id: Optional[str] = None,
) -> str:
    """
    Save important information to persistent memory.

    Args:
        content: Content to save
        memory_type: Type of memory to save to
        tags: Tags for categorizing the memory
        document_service: DocumentService instance
        auth: Authentication context
        end_user_id: Optional end-user ID to save as

    Returns:
        Save operation result as a string
    """
    if document_service is None:
        raise ToolError("Document service not provided")

    try:
        # Create metadata for the saved memory
        metadata: Dict[str, Any] = {"memory_type": memory_type, "source": "agent"}

        if tags:
            metadata["tags"] = tags

        # Use the document service to ingest the content as a document
        timestamp = await get_timestamp()
        result = await document_service.ingest_text(
            content=content,
            filename=f"memory_{memory_type}_{timestamp}",
            metadata=metadata,
            auth=auth,
            end_user_id=end_user_id,
        )

        return json.dumps({"success": True, "memory_id": result.external_id, "memory_type": memory_type})
    except Exception as e:
        raise ToolError(f"Error saving to memory: {str(e)}") from e
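

# Illustrative usage sketch: persist a short note as long-term memory and
# return the generated memory ID. The note text and tag are hypothetical.
async def _example_save_note(document_service: DocumentService, auth: AuthContext) -> str:
    result = await save_to_memory(
        content="User prefers concise answers with citations.",  # hypothetical note
        memory_type="long_term",
        tags=["preferences"],  # hypothetical tag
        document_service=document_service,
        auth=auth,
    )
    # save_to_memory returns a JSON string such as
    # {"success": true, "memory_id": "...", "memory_type": "long_term"}.
    return json.loads(result)["memory_id"]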


async def list_documents(
    filters: Optional[Dict[str, Any]] = None,
    skip: int = 0,
    limit: int = 100,
    folder_name: Optional[str] = None,
    end_user_id: Optional[str] = None,
    document_service: Optional[DocumentService] = None,
    auth: Optional[AuthContext] = None,
) -> str:
    """
    List accessible documents, showing their IDs and filenames.

    Args:
        filters: Optional metadata filters
        skip: Number of documents to skip (default: 0)
        limit: Maximum number of documents to return (default: 100)
        folder_name: Optional folder to scope the listing to
        end_user_id: Optional end-user ID to scope the listing to
        document_service: DocumentService instance
        auth: Authentication context

    Returns:
        JSON string list of documents with id and filename
    """
    if document_service is None:
        raise ToolError("Document service not provided")

    try:
        # Create system filters for folder and user scoping
        system_filters = {}
        if folder_name:
            system_filters["folder_name"] = folder_name
        if end_user_id:
            system_filters["end_user_id"] = end_user_id

        # Retrieve documents from the database
        docs = await document_service.db.get_documents(
            auth=auth, skip=skip, limit=limit, filters=filters, system_filters=system_filters
        )

        # Format the results to include only ID and filename
        formatted_docs = [{"id": doc.external_id, "filename": doc.filename} for doc in docs]

        return json.dumps({"count": len(formatted_docs), "documents": formatted_docs}, indent=2)

    except PermissionError as e:
        # Re-raise PermissionError as ToolError for consistent handling
        raise ToolError(str(e)) from e
    except Exception as e:
        raise ToolError(f"Error listing documents: {str(e)}") from e
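

# Illustrative usage sketch: list the first page of documents scoped to a
# hypothetical folder and extract just the filenames from the JSON payload.
async def _example_list_filenames(document_service: DocumentService, auth: AuthContext) -> List[str]:
    listing = await list_documents(
        limit=25,
        folder_name="research",  # hypothetical folder name
        document_service=document_service,
        auth=auth,
    )
    parsed = json.loads(listing)
    return [doc["filename"] for doc in parsed["documents"]]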


async def get_timestamp() -> str:
    """Get the current UTC timestamp in a filename-safe ISO-like format."""
    return datetime.now(UTC).isoformat().replace(":", "-").replace(".", "-")
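

# Example output shape (illustrative): "2025-05-09T19-32-38-123456+00-00".
# Colons and dots are replaced so the value can be embedded in filenames,
# such as the memory filenames produced by save_to_memory.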