Mirror of https://github.com/james-m-jordan/morphik-core.git (synced 2025-05-09 19:32:38 +00:00)
Robustify redis connection
parent 1ec1efe176
commit f161b7dd2a
@@ -4,8 +4,11 @@ from typing import Dict, Any, List, Optional
 from datetime import datetime, UTC
 from pathlib import Path
 import asyncio
+import os
+import urllib.parse as up
 
 import arq
+from arq.connections import RedisSettings
 from core.models.auth import AuthContext, EntityType
 from core.models.documents import Document
 from core.database.postgres_database import PostgresDatabase
@@ -461,6 +464,26 @@ async def shutdown(ctx):
     # Close any other open connections or resources that need cleanup
     logger.info("Worker shutdown complete.")
 
+def redis_settings_from_env() -> RedisSettings:
+    """
+    Create RedisSettings from environment variables for the ARQ worker.
+
+    Returns:
+        RedisSettings configured for the Redis connection with optimized performance
+    """
+    url = up.urlparse(os.getenv("REDIS_URL", "redis://127.0.0.1:6379/0"))
+
+    # Use ARQ's supported parameters with optimized values for stability.
+    # For high-volume ingestion (100+ documents), these settings help prevent timeouts.
+    return RedisSettings(
+        host=url.hostname or os.getenv("REDIS_HOST", "127.0.0.1"),
+        port=url.port or int(os.getenv("REDIS_PORT", "6379")),
+        database=int(url.path.lstrip("/") or 0),
+        conn_timeout=5,  # increased connection timeout (seconds)
+        conn_retries=15,  # more retries for transient connection issues
+        conn_retry_delay=1,  # quick retry delay (seconds)
+    )
+
 # ARQ Worker Settings
 class WorkerSettings:
     """
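A quick smoke test, not part of the commit: the sketch below assumes arq is installed, that the worker file above is importable as `ingestion_worker` (hypothetical module name), and uses arq's `create_pool` helper, which applies the conn_timeout / conn_retries / conn_retry_delay values when connecting.

import asyncio
import os

from arq.connections import create_pool

from ingestion_worker import redis_settings_from_env  # hypothetical module name

async def smoke_test():
    # Point the helper at a local Redis; it falls back to this default anyway.
    os.environ.setdefault("REDIS_URL", "redis://127.0.0.1:6379/0")
    redis = await create_pool(redis_settings_from_env())
    print(await redis.ping())  # True when the connection is healthy
    await redis.close()

asyncio.run(smoke_test())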
@@ -472,40 +495,92 @@ class WorkerSettings:
     functions = [process_ingestion_job]
     on_startup = startup
     on_shutdown = shutdown
-    # Redis settings will be loaded from environment variables by default
-    # Other optional settings:
-    # redis_settings = arq.connections.RedisSettings(host='localhost', port=6379)
 
+    # Use robust Redis settings that handle connection issues
+    redis_settings = redis_settings_from_env()
 
     # Result storage settings
     keep_result_ms = 24 * 60 * 60 * 1000  # Keep results for 24 hours
-    max_jobs = 5  # Reduce concurrent jobs to prevent connection pool exhaustion
-    health_check_interval = 300  # Check worker health every 5 minutes instead of 30 seconds to reduce connection overhead
-    job_timeout = 3600  # 1 hour timeout for jobs
-    max_tries = 3  # Retry failed jobs up to 3 times
-    poll_delay = 0.5  # Poll delay to prevent excessive Redis queries
+
+    # Concurrency settings - optimized for high-volume ingestion
+    max_jobs = 3  # Reduced to prevent resource contention during batch processing
+
+    # Resource management
+    health_check_interval = 600  # Extended to 10 minutes to reduce Redis overhead
+    job_timeout = 7200  # Extended to 2 hours for large document processing
+    max_tries = 5  # Retry failed jobs up to 5 times
+    poll_delay = 2.0  # Increased poll delay to prevent Redis connection saturation
+
+    # High reliability settings
+    allow_abort_jobs = False  # Don't abort jobs on worker shutdown
+    retry_jobs = True  # Always retry failed jobs
+
+    # Prevent queue blocking on error
+    skip_queue_when_queues_read_fails = True  # Continue processing other queues if one fails
+
+    # Log Redis and connection pool information for debugging
     @staticmethod
     async def health_check(ctx):
-        """Periodic health check to log connection status and job stats."""
+        """
+        Enhanced periodic health check to log connection status and job stats.
+        Monitors Redis memory, database connections, and job processing metrics.
+        """
         database = ctx.get('database')
         vector_store = ctx.get('vector_store')
         job_stats = ctx.get('job_stats', {})
-        redis_info = await ctx['redis'].info()
 
-        logger.info(f"Health check: Redis v{redis_info.get('redis_version', 'unknown')} "
-                    f"mem_usage={redis_info.get('used_memory_human', 'unknown')} "
-                    f"clients_connected={redis_info.get('connected_clients', 'unknown')} "
-                    f"db_keys={redis_info.get('db0', {}).get('keys', 0)}")
+        # Get detailed Redis info
+        try:
+            redis_info = await ctx['redis'].info(section=['Server', 'Memory', 'Clients', 'Stats'])
+
+            # Server and resource usage info
+            redis_version = redis_info.get('redis_version', 'unknown')
+            used_memory = redis_info.get('used_memory_human', 'unknown')
+            used_memory_peak = redis_info.get('used_memory_peak_human', 'unknown')
+            clients_connected = redis_info.get('connected_clients', 'unknown')
+            rejected_connections = redis_info.get('rejected_connections', 0)
+            total_commands = redis_info.get('total_commands_processed', 0)
+
+            # DB keys
+            db_info = redis_info.get('db0', {})
+            keys_count = db_info.get('keys', 0) if isinstance(db_info, dict) else 0
+
+            # Log comprehensive server status
+            logger.info(
+                f"Redis Status: v{redis_version} | "
+                f"Memory: {used_memory} (peak: {used_memory_peak}) | "
+                f"Clients: {clients_connected} (rejected: {rejected_connections}) | "
+                f"DB Keys: {keys_count} | Commands: {total_commands}"
+            )
+
+            # Check for memory warning thresholds
+            if isinstance(used_memory, str) and used_memory.endswith('G'):
+                memory_value = float(used_memory[:-1])
+                if memory_value > 1.0:  # More than 1 GB used
+                    logger.warning(f"Redis memory usage is high: {used_memory}")
+
+            # Check for connection issues
+            if rejected_connections and int(rejected_connections) > 0:
+                logger.warning(f"Redis has rejected {rejected_connections} connections")
+        except Exception as e:
+            logger.error(f"Failed to get Redis info: {str(e)}")
+
+        # Log job statistics with detailed processing metrics
+        ongoing = job_stats.get('ongoing', 0)
+        queued = job_stats.get('queued', 0)
+
+        logger.info(
+            f"Job Stats: completed={job_stats.get('complete', 0)} | "
+            f"failed={job_stats.get('failed', 0)} | "
+            f"retried={job_stats.get('retried', 0)} | "
+            f"ongoing={ongoing} | queued={queued}"
+        )
 
-        # Log job statistics
-        logger.info(f"Job stats: completed={job_stats.get('complete', 0)} "
-                    f"failed={job_stats.get('failed', 0)} "
-                    f"retried={job_stats.get('retried', 0)} "
-                    f"ongoing={job_stats.get('ongoing', 0)} "
-                    f"queued={job_stats.get('queued', 0)}"
-        )
+        # Warn if too many jobs are queued/backed up
+        if queued > 50:
+            logger.warning(f"Large job queue backlog: {queued} jobs waiting")
 
-        # Test database connectivity
+        # Test database connectivity with extended timeout
         if database and hasattr(database, 'async_session'):
             try:
                 async with database.async_session() as session:
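Usage note, not part of the diff: arq reads these class attributes when the worker is launched. A minimal sketch, again assuming the worker file is importable as `ingestion_worker` (hypothetical name); `run_worker` is arq's programmatic entry point.

from arq.worker import run_worker

import ingestion_worker  # hypothetical module name for the worker file above

if __name__ == "__main__":
    # Blocking call; equivalent to the CLI invocation `arq ingestion_worker.WorkerSettings`
    run_worker(ingestion_worker.WorkerSettings)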
@@ -8,6 +8,8 @@ import subprocess
 import signal
 import os
 import atexit
+import socket
+import time
 from dotenv import load_dotenv
 from core.config import get_settings
 from core.logging_config import setup_logging
@@ -15,6 +17,32 @@ from core.logging_config import setup_logging
 # Global variable to store the worker process
 worker_process = None
 
+def wait_for_redis(host="localhost", port=6379, timeout=20):
+    """
+    Wait for Redis to become available.
+
+    Args:
+        host: Redis host address
+        port: Redis port number
+        timeout: Maximum time to wait in seconds
+
+    Returns:
+        True if Redis becomes available within the timeout, False otherwise
+    """
+    logging.info(f"Waiting for Redis to be available at {host}:{port}...")
+    t0 = time.monotonic()
+    while time.monotonic() - t0 < timeout:
+        try:
+            with socket.create_connection((host, port), timeout=1):
+                logging.info("Redis is accepting connections.")
+                return True
+        except (OSError, socket.error):
+            logging.debug(f"Redis not available yet, retrying... ({int(time.monotonic() - t0)}s elapsed)")
+            time.sleep(0.3)
+
+    logging.error(f"Redis not reachable after {timeout}s")
+    return False
+
 def check_and_start_redis():
     """Check if the Redis container is running, start if necessary."""
     try:
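Not part of the diff: since wait_for_redis is a plain TCP probe (it never speaks the Redis protocol), it can be exercised against any listener. A minimal sketch, assuming the function is importable from the launcher module and that port 6399 is free (hypothetical choice):

import socket
import threading

from start_server import wait_for_redis  # hypothetical module name

srv = socket.create_server(("127.0.0.1", 6399))  # stand-in listener instead of Redis
threading.Thread(target=srv.accept, daemon=True).start()
assert wait_for_redis(host="127.0.0.1", port=6399, timeout=5)
srv.close()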
@@ -265,6 +293,13 @@ def main():
 
     # Load settings (this will validate all required env vars)
     settings = get_settings()
 
+    # Wait for Redis to be available (using environment variables or defaults)
+    redis_host = os.environ.get("REDIS_HOST", "127.0.0.1")
+    redis_port = int(os.environ.get("REDIS_PORT", "6379"))
+    if not wait_for_redis(host=redis_host, port=redis_port):
+        logging.error("Cannot start server without Redis. Please ensure Redis is running.")
+        sys.exit(1)
+
     # Start ARQ worker in the background
     start_arq_worker()
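A startup sketch, not part of the diff: the worker's connection helper reads REDIS_URL (falling back to REDIS_HOST/REDIS_PORT), while the readiness probe in main() reads REDIS_HOST/REDIS_PORT, so all three should point at the same instance. The module name `start_server` is hypothetical.

import os

# Keep the worker's URL and the probe's host/port in sync.
os.environ.setdefault("REDIS_URL", "redis://127.0.0.1:6379/0")
os.environ.setdefault("REDIS_HOST", "127.0.0.1")
os.environ.setdefault("REDIS_PORT", "6379")

import start_server  # hypothetical module name for the launcher above

start_server.main()  # waits for Redis, exits if unreachable, then starts the ARQ worker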