# morphik-core/evaluations/hotpot_ragas_eval.py
#
# Repository-viewer header (not part of the program):
# 250 lines, 8.6 KiB, Python

import argparse
# Viewer annotation (not code): 2025-04-12 00:24:38 -07:00
import sys
import uuid
from pathlib import Path
import pandas as pd
from datasets import Dataset, load_dataset
from dotenv import load_dotenv
from ragas import evaluate
from ragas.metrics import answer_correctness, context_precision, faithfulness
from tqdm import tqdm
# Viewer annotation (not code): 2025-04-12 00:24:38 -07:00
# The Morphik Python SDK is imported from the repository checkout: it sits in
# "sdks/python", two directory levels above this script, so that directory is
# put at the front of the import search path.
sdk_path = str(Path(__file__).parent.parent.joinpath("sdks", "python"))
sys.path.insert(0, sdk_path)

# This import has to come after the sys.path change above (hence the E402 waiver).
from morphik import Morphik  # noqa: E402

# Load environment variables
load_dotenv()

# Module-level Morphik client used by process_with_morphik().
db = Morphik(timeout=10000, is_local=True)
# Viewer annotation (not code): 2025-04-12 00:24:38 -07:00
# Run identifiers
def generate_run_id():
    """Return a freshly generated random UUID (version 4) as a string."""
    run_uuid = uuid.uuid4()
    return str(run_uuid)
def load_hotpotqa_dataset(num_samples=10, split="validation"):
    """Load the HotpotQA "distractor" configuration and keep its leading rows.

    Args:
        num_samples: Upper bound on the number of rows to keep.
        split: Name of the dataset split to load.

    Returns:
        The loaded split restricted to its first
        ``min(num_samples, len(split))`` rows.
    """
    full_split = load_dataset("hotpot_qa", "distractor", split=split, trust_remote_code=True)
    # Never request more rows than the split actually contains.
    keep = min(num_samples, len(full_split))
    return full_split.select(range(keep))
def _build_context_text(item):
    """Flatten one item's titled paragraphs into a single text document."""
    titles = item["context"]["title"]
    sentence_lists = item["context"]["sentences"]
    # Each paragraph is rendered as "<title>:\n<sentences joined by spaces>\n\n".
    return "".join(
        f"{title}:\n{' '.join(sentences)}\n\n" for title, sentences in zip(titles, sentence_lists)
    )


def process_with_morphik(dataset, run_id=None):
    """
    Process dataset with Morphik and prepare data for RAGAS evaluation.

    For each item, in order: its context paragraphs are ingested into Morphik
    as one text document tagged with the run id, the question is answered via
    ``db.query`` and the supporting chunks are fetched via
    ``db.retrieve_chunks``. Both lookups are filtered to documents of this run.
    Items with an empty question/answer are skipped; items that raise are
    reported (with traceback) and skipped, so one failure does not abort the run.

    Args:
        dataset: Iterable of HotpotQA items (mappings with "question",
            "answer" and "context" keys, where "context" holds parallel
            "title" and "sentences" lists).
        run_id: Unique identifier for this evaluation run. If None, a new one
            is generated.

    Returns:
        A ``(data_samples, run_id)`` tuple. ``data_samples`` is a dict of
        parallel lists keyed "question", "answer", "contexts", "ground_truth"
        and "run_id", holding one entry per successfully processed item.
    """
    import traceback

    # Generate a run_id if not provided
    if run_id is None:
        run_id = generate_run_id()

    print(f"Using run identifier: {run_id}")

    # Loop-invariant prompt asking for short answers.
    prompt_template = "Answer the following question based on the provided context. Your answer should be as concise as possible. If a yes/no answer is appropriate, just respond with 'Yes' or 'No'. Do not provide explanations or additional context unless absolutely necessary.\n\nQuestion: {question}\n\nContext: {context}"

    data_samples = {
        "question": [],
        "answer": [],
        "contexts": [],
        "ground_truth": [],
        "run_id": [],  # Store run_id for each sample
    }

    for i, item in enumerate(tqdm(dataset, desc="Processing documents")):
        try:
            # Extract question and ground truth
            question = item["question"].strip()
            ground_truth = item["answer"].strip()
            if not question or not ground_truth:
                print(f"Skipping item {i}: Empty question or answer")
                continue

            # Ingest the document's context into Morphik, tagged with the run id
            # so the lookups below can be restricted to this run.
            db.ingest_text(
                _build_context_text(item),
                metadata={
                    "source": "hotpotqa",
                    # The Hugging Face loader names this field "id"; the raw
                    # HotpotQA JSON uses "_id". Accept either.
                    "question_id": item.get("id") or item.get("_id", ""),
                    "item_index": i,
                    "evaluation_run_id": run_id,
                },
                use_colpali=False,
            )

            # Query Morphik for the answer with the concise prompt override
            response = db.query(
                question,
                use_colpali=False,
                k=10,
                filters={"evaluation_run_id": run_id},
                prompt_overrides={"query": {"prompt_template": prompt_template}},
            )
            answer = response.completion
            if not answer:
                print(f"Warning: Empty answer for question: {question[:50]}...")
                answer = "No answer provided"

            # Fetch the chunks reported to RAGAS as contexts. The settings
            # (k, run filter, use_colpali) mirror the query above so the
            # contexts correspond to the retrieval mode that produced the answer.
            chunks = db.retrieve_chunks(
                query=question,
                k=10,
                filters={"evaluation_run_id": run_id},
                use_colpali=False,
            )
            context_texts = [chunk.content for chunk in chunks]
            if not context_texts:
                print(f"Warning: No contexts retrieved for question: {question[:50]}...")
                context_texts = ["No context retrieved"]

            # Add to our dataset
            data_samples["question"].append(question)
            data_samples["answer"].append(answer)
            data_samples["contexts"].append(context_texts)
            data_samples["ground_truth"].append(ground_truth)
            data_samples["run_id"].append(run_id)
        except Exception as e:
            # Best-effort: report the failing item and move on to the next one.
            print(f"Error processing item {i}:")
            print(f"Question: {item.get('question', 'N/A')[:50]}...")
            print(f"Error: {e}")
            traceback.print_exc()
            continue

    return data_samples, run_id
def run_evaluation(num_samples=5, output_file="ragas_results.csv", run_id=None):
    """
    Drive one complete evaluation: load HotpotQA, run it through Morphik,
    score the outcome with RAGAS and write the per-sample scores to a CSV file.

    Args:
        num_samples: Number of samples to use from the dataset
        output_file: Path to save the results CSV. When left at the default
            name, the run id is inserted before the ".csv" extension.
        run_id: Optional run identifier. If None, a new one will be generated

    Returns:
        ``(results DataFrame, run_id)`` on success; ``None`` when no sample
        was processed successfully or when any step raised an exception.
    """
    try:
        # Step 1: dataset
        print("Loading HotpotQA dataset...")
        hotpot_dataset = load_hotpotqa_dataset(num_samples=num_samples)
        print(f"Loaded {len(hotpot_dataset)} samples from HotpotQA")

        # Step 2: Morphik ingestion + querying
        print("Processing with Morphik...")
        data_samples, run_id = process_with_morphik(hotpot_dataset, run_id=run_id)

        # Nothing to score if every item was skipped or failed.
        processed_count = len(data_samples["question"])
        if processed_count == 0:
            print("Error: No samples were successfully processed. Exiting.")
            return None
        print(f"Successfully processed {processed_count} samples")

        # Step 3: RAGAS scoring
        ragas_dataset = Dataset.from_dict(data_samples)
        print("Running RAGAS evaluation...")
        scores = evaluate(
            ragas_dataset,
            metrics=[faithfulness, answer_correctness, context_precision],
        )

        # Step 4: tabulate, tagging every row with the run id
        df_result = scores.to_pandas()
        df_result["run_id"] = run_id

        print("\nRAGAS Evaluation Results:")
        print(df_result)

        # Step 5: per-metric averages
        print("\nDetailed Metric Analysis:")
        for column in ("faithfulness", "answer_correctness", "context_precision"):
            if column not in df_result.columns:
                continue
            try:
                # Non-numeric entries become NaN and are left out of the mean.
                df_result[column] = pd.to_numeric(df_result[column], errors="coerce")
                mean_value = df_result[column].mean(skipna=True)
                if pd.isna(mean_value):
                    print(f"{column}: No valid numeric values found")
                else:
                    print(f"{column}: {mean_value:.4f}")
            except Exception as exc:
                print(f"Error processing {column}: {exc}")
                print(f"Values: {df_result[column].head().tolist()}")

        # Step 6: the default output name gets the run id inserted before
        # the extension.
        if output_file == "ragas_results.csv":
            stem = output_file.rpartition(".")[0]
            output_file = f"{stem}_{run_id}.csv"

        # Step 7: persist
        df_result.to_csv(output_file, index=False)
        print(f"Results saved to {output_file}")
        return df_result, run_id
    except Exception as exc:
        import traceback

        print(f"Error in evaluation: {exc}")
        traceback.print_exc()
        print("Exiting due to error.")
        return None
def main():
    """Parse the command-line options and launch one evaluation run."""
    cli = argparse.ArgumentParser(description="Run RAGAS evaluation on Morphik using HotpotQA dataset")

    # (flag, add_argument keyword arguments) for every supported option.
    option_specs = (
        ("--samples", {"type": int, "default": 5, "help": "Number of samples to use (default: 5)"}),
        (
            "--output",
            {
                "type": str,
                "default": "ragas_results.csv",
                "help": "Output file for results (default: ragas_results.csv)",
            },
        ),
        (
            "--run-id",
            {
                "type": str,
                "default": None,
                "help": "Specific run identifier to use (default: auto-generated UUID)",
            },
        ),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    options = cli.parse_args()
    run_evaluation(
        num_samples=options.samples,
        output_file=options.output,
        run_id=options.run_id,
    )


# Start a run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()