morphik-core/start_server.py
Adityavardhan Agrawal 1792275cb8
Format fix, UI package update (#100)
Co-authored-by: Arnav Agrawal <aa779@cornell.edu>
2025-04-20 16:34:29 -07:00

319 lines
12 KiB
Python

import argparse
import atexit
import logging
import os
import signal
import socket
import subprocess
import sys
import time
import requests
import tomli
import uvicorn
from dotenv import load_dotenv
from core.config import get_settings
from core.logging_config import setup_logging
# Handle (subprocess.Popen) of the background ARQ worker; set by
# start_arq_worker() and stopped by cleanup_processes(). None until started.
worker_process = None
def wait_for_redis(host="localhost", port=6379, timeout=20):
    """
    Poll until a TCP connection to Redis succeeds or the timeout expires.

    Args:
        host: Redis host address
        port: Redis port number
        timeout: Maximum time to wait in seconds

    Returns:
        True if Redis becomes available within the timeout, False otherwise
    """
    logging.info(f"Waiting for Redis to be available at {host}:{port}...")
    started = time.monotonic()
    deadline = started + timeout
    while time.monotonic() < deadline:
        try:
            conn = socket.create_connection((host, port), timeout=1)
        except (OSError, socket.error):
            elapsed = int(time.monotonic() - started)
            logging.debug(f"Redis not available yet, retrying... ({elapsed}s elapsed)")
            time.sleep(0.3)
        else:
            conn.close()
            logging.info("Redis is accepting connections.")
            return True
    logging.error(f"Redis not reachable after {timeout}s")
    return False
def check_and_start_redis():
    """Ensure the `morphik-redis` Docker container is up, creating or starting it if needed."""
    try:
        # `docker ps -q` prints matching container IDs; a non-empty result
        # means the container is already running.
        name_filter = ["-q", "-f", "name=morphik-redis"]
        if subprocess.check_output(["docker", "ps", *name_filter]).strip():
            logging.info("Redis container (morphik-redis) is already running.")
            return

        # `docker ps -a` also lists stopped containers.
        if subprocess.check_output(["docker", "ps", "-a", *name_filter]).strip():
            logging.info("Starting existing Redis container (morphik-redis)...")
            subprocess.run(["docker", "start", "morphik-redis"], check=True, capture_output=True)
            logging.info("Redis container started.")
        else:
            logging.info("Creating and starting Redis container (morphik-redis)...")
            subprocess.run(
                ["docker", "run", "-d", "--name", "morphik-redis", "-p", "6379:6379", "redis"],
                check=True,
                capture_output=True,
            )
            logging.info("Redis container created and started.")
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to manage Redis container: {e}")
        logging.error(f"Stderr: {e.stderr.decode() if e.stderr else 'N/A'}")
        sys.exit(1)
    except FileNotFoundError:
        logging.error("Docker command not found. Please ensure Docker is installed and in PATH.")
        sys.exit(1)
def start_arq_worker():
    """Start the ARQ ingestion worker as a background subprocess.

    The worker's stdout/stderr are appended to logs/worker.log, and the
    Popen handle is stored in the module-global `worker_process` so
    cleanup_processes() can stop it on exit. Exits the program if the
    worker cannot be spawned.
    """
    global worker_process
    try:
        logging.info("Starting ARQ worker...")

        # Ensure the logs directory exists
        log_dir = os.path.join(os.getcwd(), "logs")
        os.makedirs(log_dir, exist_ok=True)

        worker_log_path = os.path.join(log_dir, "worker.log")
        # Deliberately left open: the file object is handed to Popen below
        # and serves as the worker's stdout/stderr for its whole lifetime.
        worker_log = open(worker_log_path, "a")

        # Write a start marker. time.strftime replaces shelling out to the
        # external `date` command, which is non-portable (absent on Windows)
        # and needlessly spawns a process.
        timestamp = time.strftime("%a %b %d %H:%M:%S %Z %Y")
        worker_log.write(f"\n\n--- Worker started at {timestamp} ---\n\n")
        worker_log.flush()

        # Use sys.executable so the worker runs in this same Python environment
        worker_cmd = [sys.executable, "-m", "arq", "core.workers.ingestion_worker.WorkerSettings"]

        # Start the worker with output redirected to the log file
        worker_process = subprocess.Popen(
            worker_cmd,
            stdout=worker_log,
            stderr=worker_log,
            env=dict(os.environ, PYTHONUNBUFFERED="1"),  # Ensure unbuffered output
        )
        logging.info(f"ARQ worker started with PID: {worker_process.pid}")
        logging.info(f"Worker logs available at: {worker_log_path}")
    except Exception as e:
        logging.error(f"Failed to start ARQ worker: {e}")
        sys.exit(1)
def cleanup_processes():
    """Stop the ARQ worker process on exit (registered via atexit and signal handlers)."""
    global worker_process
    if worker_process and worker_process.poll() is None:  # Check if process is still running
        logging.info(f"Stopping ARQ worker (PID: {worker_process.pid})...")

        # Best-effort: append a stop marker to the worker log.
        try:
            log_dir = os.path.join(os.getcwd(), "logs")
            worker_log_path = os.path.join(log_dir, "worker.log")
            with open(worker_log_path, "a") as worker_log:
                # time.strftime replaces the external `date` command, which is
                # non-portable (absent on Windows) and spawns a needless process.
                timestamp = time.strftime("%a %b %d %H:%M:%S %Z %Y")
                worker_log.write(f"\n\n--- Worker stopping at {timestamp} ---\n\n")
        except Exception as e:
            logging.warning(f"Could not write worker stop message to log: {e}")

        # SIGTERM first for a graceful shutdown; escalate to SIGKILL after 5s.
        worker_process.terminate()
        try:
            worker_process.wait(timeout=5)
            logging.info("ARQ worker stopped gracefully.")
        except subprocess.TimeoutExpired:
            logging.warning("ARQ worker did not terminate gracefully, sending SIGKILL.")
            worker_process.kill()  # Force kill if it doesn't stop
            logging.info("ARQ worker killed.")

        # stdout/stderr were redirected to a file handle (not PIPE), so these
        # are normally None; close defensively in case pipes are ever used.
        # (Popen always defines .stdout/.stderr, so hasattr checks are redundant.)
        if worker_process.stdout:
            worker_process.stdout.close()
        if worker_process.stderr:
            worker_process.stderr.close()

    # Optional: also stop the Redis container here if desired, e.g.
    #   subprocess.run(["docker", "stop", "morphik-redis"], check=False, capture_output=True)
# Register the cleanup function to be called on script exit
atexit.register(cleanup_processes)

# Translate SIGINT (Ctrl+C) and SIGTERM into SystemExit so the interpreter
# unwinds normally and the atexit hook above gets a chance to run.
signal.signal(signal.SIGINT, lambda sig, frame: sys.exit(0))
signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit(0))
def check_ollama_running(base_url):
    """Return True if an Ollama server answers at *base_url*, else False."""
    tags_url = f"{base_url}/api/tags"
    try:
        response = requests.get(tags_url, timeout=2)
    except requests.RequestException:
        return False
    return response.status_code == 200
def get_ollama_usage_info():
    """Scan morphik.toml and report which components are configured to use Ollama.

    Returns:
        A list of {"component": <name>, "base_url": <ollama api_base>} dicts,
        or an empty list if nothing uses Ollama or the config cannot be read.
    """
    # Prefer the stdlib TOML parser (Python 3.11+) and fall back to the
    # third-party `tomli` package on older interpreters; both expose the
    # same load() API.
    try:
        import tomllib as _toml
    except ModuleNotFoundError:
        import tomli as _toml

    try:
        with open("morphik.toml", "rb") as f:
            config = _toml.load(f)

        # Map registered model key -> api_base for every Ollama-backed model
        # (identified by "ollama" appearing in its model_name).
        ollama_models = {}
        for model_key, model_config in config.get("registered_models", {}).items():
            if "ollama" in model_config.get("model_name", ""):
                api_base = model_config.get("api_base")
                if api_base:
                    ollama_models[model_key] = api_base

        ollama_configs = []

        # Check which components reference one of those Ollama models.
        components_to_check = ["embedding", "completion", "rules", "graph", "parser.vision"]
        for component in components_to_check:
            if component == "parser.vision":
                # Nested table: [parser.vision]
                model_key = config.get("parser", {}).get("vision", {}).get("model")
            else:
                model_key = config.get(component, {}).get("model")
            if model_key in ollama_models:
                ollama_configs.append({"component": component, "base_url": ollama_models[model_key]})

        # Contextual chunking model (only relevant when the feature is enabled).
        parser_cfg = config.get("parser", {})
        if parser_cfg.get("use_contextual_chunking") and "contextual_chunking_model" in parser_cfg:
            model_key = parser_cfg["contextual_chunking_model"]
            if model_key in ollama_models:
                ollama_configs.append(
                    {
                        "component": "parser.contextual_chunking",
                        "base_url": ollama_models[model_key],
                    }
                )

        return ollama_configs
    except Exception as e:
        logging.error(f"Error checking Ollama configuration: {e}")
        return []
def main():
    """Entry point: prepare dependencies (Redis, Ollama, ARQ worker) and run the API server."""
    arg_parser = argparse.ArgumentParser(description="Start the Morphik server")
    arg_parser.add_argument(
        "--log",
        choices=["debug", "info", "warning", "error"],
        default="info",
        help="Set the logging level",
    )
    arg_parser.add_argument(
        "--skip-ollama-check",
        action="store_true",
        help="Skip Ollama availability check",
    )
    args = arg_parser.parse_args()

    # Configure logging first so all startup messages are captured.
    setup_logging(log_level=args.log.upper())

    # Make sure the Redis container exists and is running.
    check_and_start_redis()

    # Pull environment variables from .env before settings are validated.
    load_dotenv()

    # Verify every configured Ollama endpoint is reachable (unless skipped).
    if not args.skip_ollama_check:
        ollama_configs = get_ollama_usage_info()
        if ollama_configs:
            # Group components by base_url so each URL is probed only once.
            base_urls = {}
            for cfg in ollama_configs:
                base_urls.setdefault(cfg["base_url"], []).append(cfg["component"])

            unreachable = []
            for base_url, components in base_urls.items():
                if not check_ollama_running(base_url):
                    print(f"ERROR: Ollama is not accessible at {base_url}")
                    print(f"This URL is used by these components: {', '.join(components)}")
                    unreachable.append(base_url)

            if unreachable:
                print("\nPlease ensure Ollama is running at the configured URLs before starting the server")
                print("Run with --skip-ollama-check to bypass this check")
                sys.exit(1)

            component_list = [cfg["component"] for cfg in ollama_configs]
            print(f"Ollama is running and will be used for: {', '.join(component_list)}")

    # Load settings (this validates all required env vars).
    settings = get_settings()

    # Block until Redis itself accepts connections (env overrides or defaults).
    redis_host = os.environ.get("REDIS_HOST", "127.0.0.1")
    redis_port = int(os.environ.get("REDIS_PORT", "6379"))
    if not wait_for_redis(host=redis_host, port=redis_port):
        logging.error("Cannot start server without Redis. Please ensure Redis is running.")
        sys.exit(1)

    # Launch the background ingestion worker.
    start_arq_worker()

    # Run the API server (blocking call).
    logging.info("Starting Uvicorn server...")
    uvicorn.run(
        "core.api:app",
        host=settings.HOST,
        port=settings.PORT,
        loop="asyncio",
        log_level=args.log,
        # reload=settings.RELOAD # Reload might interfere with subprocess management
    )
# Run the launcher only when executed as a script (not on import).
if __name__ == "__main__":
    main()