diff --git a/core/workers/ingestion_worker.py b/core/workers/ingestion_worker.py
index 5294b8f..4ac6352 100644
--- a/core/workers/ingestion_worker.py
+++ b/core/workers/ingestion_worker.py
@@ -4,8 +4,11 @@
 from typing import Dict, Any, List, Optional
 from datetime import datetime, UTC
 from pathlib import Path
 import asyncio
+import os
+import urllib.parse as up
 import arq
+from arq.connections import RedisSettings
 from core.models.auth import AuthContext, EntityType
 from core.models.documents import Document
 from core.database.postgres_database import PostgresDatabase
@@ -461,6 +464,26 @@ async def shutdown(ctx):
     # Close any other open connections or resources that need cleanup
     logger.info("Worker shutdown complete.")
 
+def redis_settings_from_env() -> RedisSettings:
+    """
+    Create RedisSettings from environment variables for ARQ worker.
+
+    Returns:
+        RedisSettings configured for Redis connection with optimized performance
+    """
+    url = up.urlparse(os.getenv("REDIS_URL", "redis://127.0.0.1:6379/0"))
+
+    # Use ARQ's supported parameters with optimized values for stability
+    # For high-volume ingestion (100+ documents), these settings help prevent timeouts
+    return RedisSettings(
+        host=url.hostname or os.getenv("REDIS_HOST", "127.0.0.1"),
+        port=url.port or int(os.getenv("REDIS_PORT", "6379")),
+        database=int(url.path.lstrip("/") or 0),
+        conn_timeout=5,  # Increased connection timeout (seconds)
+        conn_retries=15,  # More retries for transient connection issues
+        conn_retry_delay=1  # Quick retry delay (seconds)
+    )
+
 # ARQ Worker Settings
 class WorkerSettings:
     """
@@ -472,40 +495,92 @@ class WorkerSettings:
     functions = [process_ingestion_job]
     on_startup = startup
     on_shutdown = shutdown
-    # Redis settings will be loaded from environment variables by default
-    # Other optional settings:
-    # redis_settings = arq.connections.RedisSettings(host='localhost', port=6379)
+
+    # Use robust Redis settings that handle connection issues
+    redis_settings = redis_settings_from_env()
+
+    # Result storage settings
     keep_result_ms = 24 * 60 * 60 * 1000  # Keep results for 24 hours (24 * 60 * 60 * 1000 ms)
-    max_jobs = 5  # Reduce concurrent jobs to prevent connection pool exhaustion
-    health_check_interval = 300  # Check worker health every 5 minutes instead of 30 seconds to reduce connection overhead
-    job_timeout = 3600  # 1 hour timeout for jobs
-    max_tries = 3  # Retry failed jobs up to 3 times
-    poll_delay = 0.5  # Poll delay to prevent excessive Redis queries
+
+    # Concurrency settings - optimized for high-volume ingestion
+    max_jobs = 3  # Reduced to prevent resource contention during batch processing
+
+    # Resource management
+    health_check_interval = 600  # Extended to 10 minutes to reduce Redis overhead
+    job_timeout = 7200  # Extended to 2 hours for large document processing
+    max_tries = 5  # Retry failed jobs up to 5 times
+    poll_delay = 2.0  # Increased poll delay to prevent Redis connection saturation
+
+    # High reliability settings
+    allow_abort_jobs = False  # Don't abort jobs on worker shutdown
+    retry_jobs = True  # Always retry failed jobs
+
+    # Prevent queue blocking on error
+    skip_queue_when_queues_read_fails = True  # Continue processing other queues if one fails
 
     # Log Redis and connection pool information for debugging
     @staticmethod
     async def health_check(ctx):
-        """Periodic health check to log connection status and job stats."""
+        """
+        Enhanced periodic health check to log connection status and job stats.
+        Monitors Redis memory, database connections, and job processing metrics.
+        """
         database = ctx.get('database')
         vector_store = ctx.get('vector_store')
         job_stats = ctx.get('job_stats', {})
 
-        redis_info = await ctx['redis'].info()
-        logger.info(f"Health check: Redis v{redis_info.get('redis_version', 'unknown')} "
-                    f"mem_usage={redis_info.get('used_memory_human', 'unknown')} "
-                    f"clients_connected={redis_info.get('connected_clients', 'unknown')} "
-                    f"db_keys={redis_info.get('db0', {}).get('keys', 0)}"
-                    )
+        # Get detailed Redis info
+        try:
+            redis_info = await ctx['redis'].info(section=['Server', 'Memory', 'Clients', 'Stats'])
+
+            # Server and resource usage info
+            redis_version = redis_info.get('redis_version', 'unknown')
+            used_memory = redis_info.get('used_memory_human', 'unknown')
+            used_memory_peak = redis_info.get('used_memory_peak_human', 'unknown')
+            clients_connected = redis_info.get('connected_clients', 'unknown')
+            rejected_connections = redis_info.get('rejected_connections', 0)
+            total_commands = redis_info.get('total_commands_processed', 0)
+
+            # DB keys
+            db_info = redis_info.get('db0', {})
+            keys_count = db_info.get('keys', 0) if isinstance(db_info, dict) else 0
+
+            # Log comprehensive server status
+            logger.info(
+                f"Redis Status: v{redis_version} | "
+                f"Memory: {used_memory} (peak: {used_memory_peak}) | "
+                f"Clients: {clients_connected} (rejected: {rejected_connections}) | "
+                f"DB Keys: {keys_count} | Commands: {total_commands}"
+            )
+
+            # Check for memory warning thresholds
+            if isinstance(used_memory, str) and used_memory.endswith('G'):
+                memory_value = float(used_memory[:-1])
+                if memory_value > 1.0:  # More than 1GB used
+                    logger.warning(f"Redis memory usage is high: {used_memory}")
+
+            # Check for connection issues
+            if rejected_connections and int(rejected_connections) > 0:
+                logger.warning(f"Redis has rejected {rejected_connections} connections")
+        except Exception as e:
+            logger.error(f"Failed to get Redis info: {str(e)}")
+
+        # Log job statistics with detailed processing metrics
+        ongoing = job_stats.get('ongoing', 0)
+        queued = job_stats.get('queued', 0)
+
+        logger.info(
+            f"Job Stats: completed={job_stats.get('complete', 0)} | "
+            f"failed={job_stats.get('failed', 0)} | "
+            f"retried={job_stats.get('retried', 0)} | "
+            f"ongoing={ongoing} | queued={queued}"
+        )
 
-        # Log job statistics
-        logger.info(f"Job stats: completed={job_stats.get('complete', 0)} "
-                    f"failed={job_stats.get('failed', 0)} "
-                    f"retried={job_stats.get('retried', 0)} "
-                    f"ongoing={job_stats.get('ongoing', 0)} "
-                    f"queued={job_stats.get('queued', 0)}"
-                    )
+        # Warn if too many jobs are queued/backed up
+        if queued > 50:
+            logger.warning(f"Large job queue backlog: {queued} jobs waiting")
 
-        # Test database connectivity
+        # Test database connectivity with extended timeout
         if database and hasattr(database, 'async_session'):
             try:
                 async with database.async_session() as session:
diff --git a/start_server.py b/start_server.py
index 7e54318..f914bbd 100644
--- a/start_server.py
+++ b/start_server.py
@@ -8,6 +8,8 @@ import subprocess
 import signal
 import os
 import atexit
+import socket
+import time
 from dotenv import load_dotenv
 from core.config import get_settings
 from core.logging_config import setup_logging
@@ -15,6 +17,32 @@ from core.logging_config import setup_logging
 # Global variable to store the worker process
 worker_process = None
 
+def wait_for_redis(host="localhost", port=6379, timeout=20):
+    """
+    Wait for Redis to become available.
+
+    Args:
+        host: Redis host address
+        port: Redis port number
+        timeout: Maximum time to wait in seconds
+
+    Returns:
+        True if Redis becomes available within the timeout, False otherwise
+    """
+    logging.info(f"Waiting for Redis to be available at {host}:{port}...")
+    t0 = time.monotonic()
+    while time.monotonic() - t0 < timeout:
+        try:
+            with socket.create_connection((host, port), timeout=1):
+                logging.info("Redis is accepting connections.")
+                return True
+        except (OSError, socket.error):
+            logging.debug(f"Redis not available yet, retrying... ({int(time.monotonic() - t0)}s elapsed)")
+            time.sleep(0.3)
+
+    logging.error(f"Redis not reachable after {timeout}s")
+    return False
+
 def check_and_start_redis():
     """Check if the Redis container is running, start if necessary."""
     try:
@@ -265,6 +293,13 @@ def main():
 
     # Load settings (this will validate all required env vars)
     settings = get_settings()
+
+    # Wait for Redis to be available (using environment variables or defaults)
+    redis_host = os.environ.get("REDIS_HOST", "127.0.0.1")
+    redis_port = int(os.environ.get("REDIS_PORT", "6379"))
+    if not wait_for_redis(host=redis_host, port=redis_port):
+        logging.error("Cannot start server without Redis. Please ensure Redis is running.")
+        sys.exit(1)
 
     # Start ARQ worker in the background
     start_arq_worker()
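A quick way to sanity-check the new Redis configuration helper outside the worker (this snippet is not part of the patch): it is a minimal sketch that assumes arq is installed and that core.workers.ingestion_worker imports cleanly from the project root, and the hostname, port, and database number are illustrative values only.

    import os

    # Hypothetical REDIS_URL; host, port, and database here are illustrative only.
    os.environ["REDIS_URL"] = "redis://redis.internal:6380/2"

    from core.workers.ingestion_worker import redis_settings_from_env

    # The helper should pick the connection details out of REDIS_URL.
    settings = redis_settings_from_env()
    assert settings.host == "redis.internal"
    assert settings.port == 6380
    assert settings.database == 2

With these settings in place the worker is still launched the usual arq way (for example: arq core.workers.ingestion_worker.WorkerSettings), and start_server.py now exits early if wait_for_redis() finds nothing listening on the configured Redis port within its timeout.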