# Source code for kuhl_haus.mdp.helpers.process_manager

"""Multiprocess orchestration for async workers with lifecycle management.

Spawns, monitors, and gracefully terminates worker processes in the real-time
data pipeline. Handles asyncio event loop creation, signal propagation, and
OpenTelemetry context propagation across process boundaries.
"""
from multiprocessing import Process, Queue
from queue import Empty, Full
from multiprocessing.synchronize import Event as MPEvent
from typing import Dict, Type, Any, Optional
import signal
import asyncio
import logging

logger = logging.getLogger(__name__)


class ProcessManager:
    """Spawn and manage async worker processes with graceful shutdown and status reporting.

    Each worker is tracked by name in three parallel dicts: the ``Process``
    handle, the multiprocessing event the parent sets to request a graceful
    stop, and a single-slot queue the child publishes status snapshots into.
    """

    def __init__(self):
        # name -> OS process running ``_run_worker``
        self.processes: Dict[str, Process] = {}
        # name -> event set by the parent to request a graceful shutdown
        self.shutdown_events: Dict[str, MPEvent] = {}
        # name -> maxsize=1 queue holding the child's most recent status dict
        self.status_queues: Dict[str, Queue] = {}

    def start_worker(self, name: str, worker_class: Type, **kwargs):
        """Start a worker process with its own event loop and the caller's trace context.

        Args:
            name: Key used to track the worker; also used as the process name.
            worker_class: Class instantiated in the child as ``worker_class(**kwargs)``.
            **kwargs: Constructor arguments forwarded to ``worker_class``.

        Side effects: creates and starts a non-daemon OS process and registers
        it (plus its shutdown event and status queue) under ``name``.
        """
        import multiprocessing as mp  # Keep factory import local
        from opentelemetry.context import get_current
        from opentelemetry.propagate import inject

        existing = self.processes.get(name)
        if existing is not None and existing.is_alive():
            # The old process becomes untracked: stop_all() can no longer signal it.
            logger.warning(
                f"Replacing tracked process {name} (PID: {existing.pid}) that is still "
                f"running; it will no longer be managed"
            )

        # Serialize the parent's current trace context so the child can attach it.
        trace_context = {}
        inject(trace_context, get_current())

        shutdown_event = mp.Event()  # Factory call
        # A single slot: the child only ever needs to hand over its latest snapshot.
        status_queue = mp.Queue(maxsize=1)
        process = Process(
            target=self._run_worker,
            args=(worker_class, shutdown_event, status_queue, trace_context),
            kwargs=kwargs,
            name=name,
            daemon=False,
        )

        self.processes[name] = process
        self.shutdown_events[name] = shutdown_event
        self.status_queues[name] = status_queue

        process.start()
        logger.info(f"Started process: {name} (PID: {process.pid})")

    @staticmethod
    def _collect_status(worker_instance: Any) -> dict:
        """Snapshot a worker's counters and flags, using defaults for missing attributes."""
        return {
            "processed": getattr(worker_instance, 'processed', 0),
            # Accept either ``errors`` or the singular ``error`` attribute.
            "errors": getattr(worker_instance, 'errors', getattr(worker_instance, 'error', 0)),
            "decoding_errors": getattr(worker_instance, 'decoding_error', 0),
            "published": getattr(worker_instance, 'published', 0),
            "processing_errors": getattr(worker_instance, 'processing_error', 0),
            "mdq_connected": getattr(worker_instance, 'mdq_connected', False),
            "mdc_connected": getattr(worker_instance, 'mdc_connected', False),
            "restarts": getattr(worker_instance, 'restarts', 0),
            "running": getattr(worker_instance, 'running', False),
        }

    @staticmethod
    def _put_latest_status(status_queue, status: dict) -> None:
        """Publish ``status``, displacing an unread older snapshot if the slot is full.

        Best-effort: if the slot cannot be freed right now (for example the
        parent consumed it concurrently), this update is dropped and the next
        reporting tick tries again.
        """
        try:
            status_queue.put_nowait(status)
            return
        except Full:
            pass
        try:
            status_queue.get_nowait()  # Discard the stale snapshot nobody read.
        except Empty:
            pass
        try:
            status_queue.put_nowait(status)
        except Full:
            pass  # Still full; skip this update

    @staticmethod
    def _run_worker(
        worker_class: Type[Any],
        shutdown_event: MPEvent,
        status_queue: Queue,
        trace_context: Optional[Dict] = None,
        **kwargs: Any
    ) -> None:
        """Entry point for the worker subprocess.

        Initializes OpenTelemetry, attaches the parent's trace context,
        installs SIGTERM/SIGINT handlers that set ``shutdown_event``, and runs
        a fresh asyncio event loop. The loop does the following:

        - Instantiates ``worker_class(**kwargs)``.
        - Runs ``worker.start()`` as a task.
        - Publishes a status snapshot about once per second.
        - When ``shutdown_event`` is set, sets ``worker.running = False`` (if
          the attribute exists), cancels the start task (5 s grace), and calls
          ``worker.stop()`` exactly once.
        """
        # Set up OpenTelemetry instrumentation in this child process
        _setup_otel_auto_instrumentation()

        # Attach trace context from parent if provided
        if trace_context:
            try:
                from opentelemetry.context import attach
                from opentelemetry.propagate import extract
                attach(extract(trace_context))
                logger.debug("Attached trace context from parent process")
            except Exception as e:
                logger.warning(f"Failed to attach trace context: {e}")

        # Signals only flip the cross-process event; the loop reacts to it.
        def shutdown_handler(signum, frame):
            shutdown_event.set()

        signal.signal(signal.SIGTERM, shutdown_handler)
        signal.signal(signal.SIGINT, shutdown_handler)

        # Create new event loop for this process
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        # Asyncio-side mirror of shutdown_event, so coroutines can await it
        async_shutdown = asyncio.Event()

        async def monitor_shutdown():
            """Poll the multiprocessing shutdown_event and set the asyncio event."""
            while not shutdown_event.is_set():
                await asyncio.sleep(0.1)  # Poll frequently without blocking
            async_shutdown.set()
            logger.info(f"Shutdown signal detected for {worker_class.__name__}")

        async def status_reporter(worker_instance):
            """Periodically report worker status to the parent process."""
            while not async_shutdown.is_set():
                try:
                    ProcessManager._put_latest_status(
                        status_queue, ProcessManager._collect_status(worker_instance)
                    )
                except Exception as ex:
                    logger.error(f"Error updating status for {worker_class.__name__}: {ex}")

                # Wait up to 1 second before the next update, waking early on shutdown
                try:
                    await asyncio.wait_for(async_shutdown.wait(), timeout=1.0)
                except asyncio.TimeoutError:
                    pass  # Expected - just continue loop

        def report_start_failure(task):
            """Log an exception raised by worker.start() as soon as its task finishes."""
            if task.cancelled():
                return
            exc = task.exception()
            if exc is not None:
                logger.error(f"{worker_class.__name__}.start() raised: {exc}", exc_info=exc)

        async def run_worker():
            """Main worker coroutine that manages worker lifecycle."""
            worker = None
            stop_attempted = False
            try:
                worker = worker_class(**kwargs)
                logger.info(f"Instantiated {worker_class.__name__}")

                monitor_task = asyncio.create_task(monitor_shutdown())
                status_task = asyncio.create_task(status_reporter(worker))

                # start() may block indefinitely or return immediately
                logger.info(f"Starting {worker_class.__name__}")
                worker_task = asyncio.create_task(worker.start())
                worker_task.add_done_callback(report_start_failure)

                # Wait for shutdown signal while worker and status tasks run
                await async_shutdown.wait()
                logger.info(f"Shutdown initiated for {worker_class.__name__}")

                # Signal worker to stop if it has a running flag
                if hasattr(worker, 'running'):
                    worker.running = False
                    logger.debug(f"Set running=False for {worker_class.__name__}")

                if not worker_task.done():
                    logger.info(f"Cancelling worker task for {worker_class.__name__}")
                    worker_task.cancel()
                    try:
                        await asyncio.wait_for(worker_task, timeout=5.0)
                    except asyncio.CancelledError:
                        # Normal outcome: the task honored the cancellation.
                        logger.debug(f"Worker task cancelled for {worker_class.__name__}")
                    except asyncio.TimeoutError:
                        logger.warning(
                            f"Worker task cancellation timeout for {worker_class.__name__}"
                        )
                    except Exception as ex:
                        logger.error(f"Error during worker task cancellation: {ex}")

                logger.info(f"Calling stop() for {worker_class.__name__}")
                # Mark before awaiting so the finally block never calls stop() a second time.
                stop_attempted = True
                await worker.stop()

                # Cancel monitoring tasks and wait briefly for them to finish
                monitor_task.cancel()
                status_task.cancel()
                try:
                    await asyncio.wait_for(
                        asyncio.gather(monitor_task, status_task, return_exceptions=True),
                        timeout=2.0,
                    )
                except asyncio.TimeoutError:
                    logger.warning(
                        f"Monitoring task cleanup timeout for {worker_class.__name__}"
                    )

                logger.info(f"Worker {worker_class.__name__} stopped cleanly")
            except Exception as ex:
                logger.error(f"Worker error in {worker_class.__name__}: {ex}", exc_info=True)
            finally:
                # Ensure stop() runs if an error prevented the shutdown path from reaching it
                if worker is not None and not stop_attempted:
                    try:
                        await worker.stop()
                    except Exception as ex:
                        logger.error(f"Error during final stop() call: {ex}", exc_info=True)

        try:
            loop.run_until_complete(run_worker())
        except Exception as e:
            logger.error(
                f"Fatal error in worker process {worker_class.__name__}: {e}", exc_info=True
            )
        finally:
            try:
                # Cancel any remaining tasks and give them a chance to clean up
                pending = asyncio.all_tasks(loop)
                for task in pending:
                    task.cancel()
                if pending:
                    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception as e:
                logger.error(f"Error during event loop cleanup: {e}")
            finally:
                loop.close()

    def stop_process(self, name: str, timeout: float = 10.0):
        """Signal graceful shutdown and wait; force-kill if the worker doesn't terminate.

        Unknown names are ignored. The process stays registered afterwards, so
        ``get_status`` keeps reporting it (as not alive).
        """
        if name not in self.processes:
            return

        process = self.processes[name]
        shutdown_event = self.shutdown_events[name]

        logger.info(f"Stopping process: {name}")
        shutdown_event.set()
        process.join(timeout=timeout)

        if process.is_alive():
            logger.warning(f"Force killing process: {name}")
            process.kill()
            process.join()

    def stop_all(self, timeout: float = 10.0):
        """Gracefully stop all managed workers; force-kill any that exceed ``timeout`` each."""
        for name in list(self.processes.keys()):
            self.stop_process(name, timeout)

    def get_status(self, name: str) -> dict:
        """Return liveness plus the newest unread worker metrics, without blocking.

        Unknown names yield ``{"alive": False}``. Metric keys are present only
        when the child has published a snapshot since the previous read.
        """
        if name not in self.processes:
            return {"alive": False}

        process = self.processes[name]
        status_queue = self.status_queues[name]
        status = {"alive": process.is_alive(), "pid": process.pid}

        try:
            status.update(status_queue.get_nowait())
        except Empty:
            pass
        except Exception as e:
            logger.error(f"Error getting status for process name {name}: {e}", exc_info=True)

        return status
def _setup_otel_auto_instrumentation():
    """Initialize OpenTelemetry auto-instrumentation in a child process from OTEL_* env vars.

    Importing ``auto_instrumentation.sitecustomize`` already runs initialization
    as an import side effect, so calling ``sitecustomize.initialize()`` afterwards
    would initialize twice. Prefer the package-level ``initialize()``; fall back
    to the import side effect on releases that do not expose it.
    Failures are logged and swallowed so a worker can still run uninstrumented.
    """
    try:
        try:
            from opentelemetry.instrumentation.auto_instrumentation import initialize
        except ImportError:
            # NOTE(review): older opentelemetry-instrumentation releases — importing
            # sitecustomize performs the initialization; confirm against the pinned version.
            from opentelemetry.instrumentation.auto_instrumentation import (  # noqa: F401
                sitecustomize,
            )
        else:
            # Reads OTEL_* environment variables and configures distro/instrumentors
            initialize()
        logger.info("OpenTelemetry auto-instrumentation configured from environment variables")
    except Exception as e:
        logger.error(f"Failed to set up OTel auto-instrumentation: {e}")