morphik-core/core/services/telemetry.py

570 lines
21 KiB
Python
Raw Normal View History

2024-12-31 10:22:25 +05:30
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import threading
from collections import defaultdict
import time
from contextlib import asynccontextmanager
import os
import json
from pathlib import Path
2025-03-23 17:50:18 -04:00
import uuid
import hashlib
import logging
from core.config import get_settings
2024-12-31 10:22:25 +05:30
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace import Status, StatusCode
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics.export import (
PeriodicExportingMetricReader,
MetricExporter,
AggregationTemporality,
MetricsData,
)
2025-03-23 17:50:18 -04:00
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
import requests
from urllib3.exceptions import ProtocolError, ReadTimeoutError
# Get settings from config
settings = get_settings()
# Telemetry configuration - use settings directly from TOML
TELEMETRY_ENABLED = settings.TELEMETRY_ENABLED
HONEYCOMB_ENABLED = settings.HONEYCOMB_ENABLED
# Honeycomb configuration - using proxy to avoid exposing API key in code
# Default to localhost:8080 for the proxy, but allow override from settings
2025-03-26 17:07:38 -07:00
HONEYCOMB_PROXY_ENDPOINT = getattr(settings, "HONEYCOMB_PROXY_ENDPOINT", "https://otel-proxy.onrender.com")
HONEYCOMB_PROXY_ENDPOINT = HONEYCOMB_PROXY_ENDPOINT if isinstance(HONEYCOMB_PROXY_ENDPOINT, str) and len(HONEYCOMB_PROXY_ENDPOINT) > 0 else "https://otel-proxy.onrender.com"
2025-03-23 17:50:18 -04:00
SERVICE_NAME = settings.SERVICE_NAME
# Headers for OTLP - no API key needed as the proxy will add it
OTLP_HEADERS = {
"Content-Type": "application/x-protobuf"
}
# Configure timeouts and retries directly from TOML config
OTLP_TIMEOUT = settings.OTLP_TIMEOUT
OTLP_MAX_RETRIES = settings.OTLP_MAX_RETRIES
OTLP_RETRY_DELAY = settings.OTLP_RETRY_DELAY
OTLP_MAX_EXPORT_BATCH_SIZE = settings.OTLP_MAX_EXPORT_BATCH_SIZE
OTLP_SCHEDULE_DELAY_MILLIS = settings.OTLP_SCHEDULE_DELAY_MILLIS
OTLP_MAX_QUEUE_SIZE = settings.OTLP_MAX_QUEUE_SIZE
# OTLP endpoints - using our proxy instead of direct Honeycomb connection
OTLP_TRACES_ENDPOINT = f"{HONEYCOMB_PROXY_ENDPOINT}/v1/traces"
OTLP_METRICS_ENDPOINT = f"{HONEYCOMB_PROXY_ENDPOINT}/v1/metrics"
# Enable debug logging for OpenTelemetry
os.environ["OTEL_PYTHON_LOGGING_LEVEL"] = "INFO" # Changed from DEBUG to reduce verbosity
# Add export protocol setting if not already set
if not os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL"):
os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf"
def get_installation_id() -> str:
"""Generate or retrieve a unique anonymous installation ID."""
id_file = Path.home() / ".databridge" / "installation_id"
id_file.parent.mkdir(parents=True, exist_ok=True)
if id_file.exists():
return id_file.read_text().strip()
# Generate a new installation ID
# We hash the machine-id (if available) or a random UUID
machine_id_file = Path("/etc/machine-id")
if machine_id_file.exists():
machine_id = machine_id_file.read_text().strip()
else:
machine_id = str(uuid.uuid4())
# Hash the machine ID to make it anonymous
installation_id = hashlib.sha256(machine_id.encode()).hexdigest()[:32]
# Save it for future use
id_file.write_text(installation_id)
return installation_id
2024-12-31 10:22:25 +05:30
class FileSpanExporter:
def __init__(self, log_dir: str):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self.trace_file = self.log_dir / "traces.log"
def export(self, spans):
with open(self.trace_file, "a") as f:
for span in spans:
f.write(json.dumps(self._format_span(span)) + "\n")
return True
def shutdown(self):
pass
def _format_span(self, span):
return {
"name": span.name,
"trace_id": format(span.context.trace_id, "x"),
"span_id": format(span.context.span_id, "x"),
"parent_id": format(span.parent.span_id, "x") if span.parent else None,
"start_time": span.start_time,
"end_time": span.end_time,
"attributes": dict(span.attributes),
"status": span.status.status_code.name,
}
class FileMetricExporter(MetricExporter):
"""File metric exporter for OpenTelemetry."""
def __init__(self, log_dir: str):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self.metrics_file = self.log_dir / "metrics.log"
super().__init__()
def export(self, metrics_data: MetricsData, **kwargs) -> bool:
"""Export metrics data to a file.
Args:
metrics_data: The metrics data to export.
Returns:
True if the export was successful, False otherwise.
"""
try:
with open(self.metrics_file, "a") as f:
for resource_metrics in metrics_data.resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
f.write(json.dumps(self._format_metric(metric)) + "\n")
return True
except Exception:
return False
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> bool:
"""Shuts down the exporter.
Args:
timeout_millis: Time to wait for the export to complete in milliseconds.
Returns:
True if the shutdown succeeded, False otherwise.
"""
return True
def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Force flush the exporter.
Args:
timeout_millis: Time to wait for the flush to complete in milliseconds.
Returns:
True if the flush succeeded, False otherwise.
"""
return True
def _preferred_temporality(self) -> Dict:
"""Returns the preferred temporality for each instrument kind."""
return {
"counter": AggregationTemporality.CUMULATIVE,
"up_down_counter": AggregationTemporality.CUMULATIVE,
"observable_counter": AggregationTemporality.CUMULATIVE,
"observable_up_down_counter": AggregationTemporality.CUMULATIVE,
"histogram": AggregationTemporality.CUMULATIVE,
"observable_gauge": AggregationTemporality.CUMULATIVE,
}
def _format_metric(self, metric):
return {
"name": metric.name,
"description": metric.description,
"unit": metric.unit,
"data": self._format_data(metric.data),
}
def _format_data(self, data):
if hasattr(data, "data_points"):
return {
"data_points": [
{
"attributes": dict(point.attributes),
"value": point.value if hasattr(point, "value") else None,
"count": point.count if hasattr(point, "count") else None,
"sum": point.sum if hasattr(point, "sum") else None,
"timestamp": point.time_unix_nano,
}
for point in data.data_points
]
}
return {}
2025-03-23 17:50:18 -04:00
class RetryingOTLPMetricExporter(MetricExporter):
"""A wrapper around OTLPMetricExporter that adds better retry logic."""
def __init__(self, endpoint, headers=None, timeout=10):
self.exporter = OTLPMetricExporter(
endpoint=endpoint,
headers=headers,
timeout=timeout
)
self.max_retries = OTLP_MAX_RETRIES
self.retry_delay = OTLP_RETRY_DELAY
self.logger = logging.getLogger(__name__)
super().__init__()
def export(self, metrics_data, **kwargs):
"""Export metrics with retry logic for handling connection issues."""
retries = 0
last_exception = None
while retries <= self.max_retries:
try:
return self.exporter.export(metrics_data, **kwargs)
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
ProtocolError,
ReadTimeoutError) as e:
last_exception = e
retries += 1
if retries <= self.max_retries:
# Use exponential backoff
delay = self.retry_delay * (2 ** (retries - 1))
2025-03-26 17:07:38 -07:00
# self.logger.warning(
# f"Honeycomb export attempt {retries} failed: {str(e)}. "
# f"Retrying in {delay}s..."
# )
2025-03-23 17:50:18 -04:00
time.sleep(delay)
2025-03-26 17:07:38 -07:00
# else:
# self.logger.error(
# f"Failed to export to Honeycomb after {retries} attempts: {str(e)}"
# )
2025-03-23 17:50:18 -04:00
except Exception as e:
# For non-connection errors, don't retry
2025-03-26 17:07:38 -07:00
# self.logger.error(f"Unexpected error exporting to Honeycomb: {str(e)}")
2025-03-23 17:50:18 -04:00
return False
# If we get here, all retries failed
return False
def shutdown(self, timeout_millis=30000, **kwargs):
"""Shutdown the exporter."""
return self.exporter.shutdown(timeout_millis, **kwargs)
def force_flush(self, timeout_millis=10000):
"""Force flush the exporter."""
return self.exporter.force_flush(timeout_millis)
def _preferred_temporality(self):
"""Returns the preferred temporality."""
return self.exporter._preferred_temporality()
class RetryingOTLPSpanExporter:
"""A wrapper around OTLPSpanExporter that adds better retry logic."""
def __init__(self, endpoint, headers=None, timeout=10):
self.exporter = OTLPSpanExporter(
endpoint=endpoint,
headers=headers,
timeout=timeout
)
self.max_retries = OTLP_MAX_RETRIES
self.retry_delay = OTLP_RETRY_DELAY
self.logger = logging.getLogger(__name__)
def export(self, spans):
"""Export spans with retry logic for handling connection issues."""
retries = 0
while retries <= self.max_retries:
try:
return self.exporter.export(spans)
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
ProtocolError,
ReadTimeoutError) as e:
retries += 1
if retries <= self.max_retries:
# Use exponential backoff
delay = self.retry_delay * (2 ** (retries - 1))
self.logger.warning(
f"Honeycomb trace export attempt {retries} failed: {str(e)}. "
f"Retrying in {delay}s..."
)
time.sleep(delay)
else:
self.logger.error(
f"Failed to export traces to Honeycomb after {retries} attempts: {str(e)}"
)
except Exception as e:
# For non-connection errors, don't retry
self.logger.error(f"Unexpected error exporting traces to Honeycomb: {str(e)}")
return False
# If we get here, all retries failed
return False
def shutdown(self):
"""Shutdown the exporter."""
return self.exporter.shutdown()
def force_flush(self):
"""Force flush the exporter."""
try:
return self.exporter.force_flush()
except Exception as e:
self.logger.error(f"Error during trace force_flush: {str(e)}")
return False
2024-12-31 10:22:25 +05:30
@dataclass
class UsageRecord:
timestamp: datetime
operation_type: str
tokens_used: int
user_id: str
duration_ms: float
status: str
metadata: Optional[Dict] = None
class TelemetryService:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
2025-03-23 17:50:18 -04:00
if not TELEMETRY_ENABLED:
return
2024-12-31 10:22:25 +05:30
self._usage_records: List[UsageRecord] = []
self._user_totals = defaultdict(lambda: defaultdict(int))
self._lock = threading.Lock()
2025-03-23 17:50:18 -04:00
self._installation_id = get_installation_id()
# Initialize OpenTelemetry with more detailed resource attributes
resource = Resource.create({
"service.name": SERVICE_NAME,
"service.version": os.getenv("DATABRIDGE_VERSION", "unknown"),
"installation.id": self._installation_id,
"environment": os.getenv("ENVIRONMENT", "production"),
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.language": "python",
"telemetry.sdk.version": "1.0.0"
})
# Initialize tracing with both file and OTLP exporters
tracer_provider = TracerProvider(resource=resource)
# Always use both exporters
2024-12-31 10:22:25 +05:30
log_dir = Path("logs/telemetry")
log_dir.mkdir(parents=True, exist_ok=True)
2025-03-23 17:50:18 -04:00
# Add file exporter for local logging
file_span_processor = BatchSpanProcessor(FileSpanExporter(str(log_dir)))
tracer_provider.add_span_processor(file_span_processor)
# Add Honeycomb OTLP exporter with retry logic
if HONEYCOMB_ENABLED:
# Create BatchSpanProcessor with improved configuration
otlp_span_processor = BatchSpanProcessor(
RetryingOTLPSpanExporter(
endpoint=OTLP_TRACES_ENDPOINT,
headers=OTLP_HEADERS,
timeout=OTLP_TIMEOUT,
),
# Configure batch processing settings
max_queue_size=OTLP_MAX_QUEUE_SIZE,
max_export_batch_size=OTLP_MAX_EXPORT_BATCH_SIZE,
schedule_delay_millis=OTLP_SCHEDULE_DELAY_MILLIS,
)
tracer_provider.add_span_processor(otlp_span_processor)
2024-12-31 10:22:25 +05:30
trace.set_tracer_provider(tracer_provider)
self.tracer = trace.get_tracer(__name__)
2025-03-23 17:50:18 -04:00
# Initialize metrics with both exporters
metric_readers = [
# Local file metrics reader
PeriodicExportingMetricReader(
2024-12-31 10:22:25 +05:30
FileMetricExporter(str(log_dir)),
export_interval_millis=60000, # Export every minute
2025-03-23 17:50:18 -04:00
),
]
# Add Honeycomb metrics reader if API key is available
if HONEYCOMB_ENABLED:
try:
# Configure the OTLP metric exporter with improved error handling
otlp_metric_exporter = RetryingOTLPMetricExporter(
endpoint=OTLP_METRICS_ENDPOINT,
headers=OTLP_HEADERS,
timeout=OTLP_TIMEOUT,
)
# Configure the metrics reader with improved settings
metric_readers.append(
PeriodicExportingMetricReader(
otlp_metric_exporter,
export_interval_millis=OTLP_SCHEDULE_DELAY_MILLIS,
export_timeout_millis=OTLP_TIMEOUT * 1000,
)
)
print(f"Successfully configured Honeycomb metrics exporter to {OTLP_METRICS_ENDPOINT}")
except Exception as e:
print(f"Failed to configure Honeycomb metrics exporter: {str(e)}")
meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers)
2024-12-31 10:22:25 +05:30
metrics.set_meter_provider(meter_provider)
self.meter = metrics.get_meter(__name__)
# Create metrics
self.operation_counter = self.meter.create_counter(
"databridge.operations",
description="Number of operations performed",
)
self.token_counter = self.meter.create_counter(
"databridge.tokens",
description="Number of tokens processed",
)
self.operation_duration = self.meter.create_histogram(
"databridge.operation.duration",
description="Duration of operations",
unit="ms",
)
@asynccontextmanager
async def track_operation(
self,
operation_type: str,
user_id: str,
tokens_used: int = 0,
metadata: Optional[Dict[str, Any]] = None,
):
"""
2025-03-23 17:50:18 -04:00
Context manager for tracking operations with both usage metrics and OpenTelemetry.
The user_id is hashed to ensure anonymity.
2024-12-31 10:22:25 +05:30
"""
2025-03-23 17:50:18 -04:00
if not TELEMETRY_ENABLED:
yield None
return
2024-12-31 10:22:25 +05:30
start_time = time.time()
status = "success"
current_span = trace.get_current_span()
2025-03-23 17:50:18 -04:00
# Hash the user ID for anonymity
hashed_user_id = hashlib.sha256(user_id.encode()).hexdigest()[:16]
2024-12-31 10:22:25 +05:30
try:
# Add operation attributes to the current span
current_span.set_attribute("operation.type", operation_type)
2025-03-23 17:50:18 -04:00
current_span.set_attribute("user.id", hashed_user_id)
2024-12-31 10:22:25 +05:30
if metadata:
2025-03-23 17:50:18 -04:00
# Create a copy of metadata to avoid modifying the original
metadata_copy = metadata.copy()
# Remove the nested 'metadata' field completely if it exists
if 'metadata' in metadata_copy:
del metadata_copy['metadata']
# Set attributes for all remaining metadata fields
for key, value in metadata_copy.items():
2024-12-31 10:22:25 +05:30
current_span.set_attribute(f"metadata.{key}", str(value))
yield current_span
except Exception as e:
status = "error"
current_span.set_status(Status(StatusCode.ERROR))
current_span.record_exception(e)
raise
finally:
duration = (time.time() - start_time) * 1000 # Convert to milliseconds
# Record metrics
2025-03-23 17:50:18 -04:00
attributes = {
"operation": operation_type,
"status": status,
"installation_id": self._installation_id
}
self.operation_counter.add(1, attributes)
2024-12-31 10:22:25 +05:30
if tokens_used > 0:
2025-03-23 17:50:18 -04:00
self.token_counter.add(tokens_used, attributes)
self.operation_duration.record(duration, attributes)
2024-12-31 10:22:25 +05:30
# Record usage
2025-03-23 17:50:18 -04:00
# Create a sanitized copy of metadata for the usage record
sanitized_metadata = None
if metadata:
sanitized_metadata = metadata.copy()
# Remove the nested 'metadata' field completely if it exists
if 'metadata' in sanitized_metadata:
del sanitized_metadata['metadata']
2024-12-31 10:22:25 +05:30
record = UsageRecord(
timestamp=datetime.now(),
operation_type=operation_type,
tokens_used=tokens_used,
2025-03-23 17:50:18 -04:00
user_id=hashed_user_id,
2024-12-31 10:22:25 +05:30
duration_ms=duration,
status=status,
2025-03-23 17:50:18 -04:00
metadata=sanitized_metadata,
2024-12-31 10:22:25 +05:30
)
with self._lock:
self._usage_records.append(record)
2025-03-23 17:50:18 -04:00
self._user_totals[hashed_user_id][operation_type] += tokens_used
2024-12-31 10:22:25 +05:30
def get_user_usage(self, user_id: str) -> Dict[str, int]:
"""Get usage statistics for a user."""
2025-03-23 17:50:18 -04:00
if not TELEMETRY_ENABLED:
return {}
hashed_user_id = hashlib.sha256(user_id.encode()).hexdigest()[:16]
2024-12-31 10:22:25 +05:30
with self._lock:
2025-03-23 17:50:18 -04:00
return dict(self._user_totals[hashed_user_id])
2024-12-31 10:22:25 +05:30
def get_recent_usage(
self,
user_id: Optional[str] = None,
operation_type: Optional[str] = None,
since: Optional[datetime] = None,
status: Optional[str] = None,
) -> List[UsageRecord]:
"""Get recent usage records with optional filtering."""
2025-03-23 17:50:18 -04:00
if not TELEMETRY_ENABLED:
return []
2024-12-31 10:22:25 +05:30
with self._lock:
records = self._usage_records.copy()
# Apply filters
if user_id:
2025-03-23 17:50:18 -04:00
hashed_user_id = hashlib.sha256(user_id.encode()).hexdigest()[:16]
records = [r for r in records if r.user_id == hashed_user_id]
2024-12-31 10:22:25 +05:30
if operation_type:
records = [r for r in records if r.operation_type == operation_type]
if since:
records = [r for r in records if r.timestamp >= since]
if status:
records = [r for r in records if r.status == status]
return records