kuhl_haus.mdp.helpers package

Shared utilities for logging, observability, serialization, and process management.

Submodules

kuhl_haus.mdp.helpers.observability module

OpenTelemetry tracer and meter factory with version resolution.

Provides centralized access to instrumented tracers and meters with automatic package version detection. Supports multi-package deployments where servers and libraries report distinct versions.
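
The version detection described above can be approximated with the standard library alone. The following is a minimal sketch, not the module's actual implementation: the helper name resolve_version and the "0.0.0" fallback are assumptions made for illustration, and only the default package name comes from the parameter documentation.

```python
from importlib import metadata

# Default distribution name, as stated in the pkg_name parameter docs.
DEFAULT_PACKAGE = "kuhl-haus-mdp"


def resolve_version(pkg_name=None, fallback="0.0.0"):
    """Return the installed version of pkg_name, or fallback if it is not installed.

    Hypothetical helper: the real module may resolve versions differently.
    """
    try:
        return metadata.version(pkg_name or DEFAULT_PACKAGE)
    except metadata.PackageNotFoundError:
        # The package is not installed in this environment (e.g. a source checkout).
        return fallback
```

The resolved string could then be supplied as the instrumenting library version when requesting a tracer or meter from the OpenTelemetry API, which is how a server package and a library package would end up reporting distinct versions.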

kuhl_haus.mdp.helpers.observability.get_tracer(name, pkg_name=None)

Get an OpenTelemetry tracer.

Parameters:
  • name – Tracer name (typically __name__).

  • pkg_name – Optional package name to resolve version. Defaults to kuhl-haus-mdp.

Examples

Default usage (resolves kuhl-haus-mdp version):

from kuhl_haus.mdp.helpers.observability import (
    get_tracer,
)
tracer = get_tracer(__name__)

Cross-package usage (resolves another package’s version):

tracer = get_tracer(
    __name__,
    pkg_name="kuhl-haus-mdp-servers",
)

kuhl_haus.mdp.helpers.observability.get_meter(name, pkg_name=None)

Get an OpenTelemetry meter.

Parameters:
  • name – Meter name (typically __name__).

  • pkg_name – Optional package name to resolve version. Defaults to kuhl-haus-mdp.

Examples

Default usage (resolves kuhl-haus-mdp version):

from kuhl_haus.mdp.helpers.observability import (
    get_meter,
)
meter = get_meter(__name__)

Cross-package usage (resolves another package’s version):

meter = get_meter(
    __name__,
    pkg_name="kuhl-haus-mdp-servers",
)

kuhl_haus.mdp.helpers.process_manager module

Multiprocess orchestration for async workers with lifecycle management.

Spawns, monitors, and gracefully terminates worker processes in the real-time data pipeline. Handles asyncio event loop creation, signal propagation, and OpenTelemetry context propagation across process boundaries.

class kuhl_haus.mdp.helpers.process_manager.ProcessManager

Bases: object

Spawn and manage async worker processes with graceful shutdown and status reporting.

__init__()

start_worker(name: str, worker_class: Type, **kwargs)

Fork worker process with dedicated event loop and OpenTelemetry trace context.

Side effects: Creates Process, starts OS-level process, registers in internal tracking.

stop_process(name: str, timeout: float = 10.0)

Signal graceful shutdown and wait; force-kill if worker doesn’t terminate.

stop_all(timeout: float = 10.0)

Gracefully stop all managed workers; force-kill any that exceed timeout.
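
The graceful-then-forced shutdown described for stop_process and stop_all usually follows a "signal, wait, kill" pattern. The sketch below shows that pattern against the join/is_alive/kill interface of multiprocessing.Process. It is an illustration only: the function name stop_with_timeout and the use of a shared shutdown event are assumptions, since this reference does not say how ProcessManager signals its workers.

```python
import multiprocessing


def stop_with_timeout(process: multiprocessing.Process, shutdown_event, timeout: float = 10.0) -> bool:
    """Ask a worker to stop, wait up to timeout seconds, then force-kill it.

    Returns True if the worker exited on its own, False if it had to be killed.
    shutdown_event is a hypothetical Event the worker is assumed to poll.
    """
    shutdown_event.set()       # request a graceful shutdown
    process.join(timeout)      # wait for the worker to finish its cleanup
    if process.is_alive():
        process.kill()         # the worker ignored the request: force termination
        process.join()         # reap the killed process
        return False
    return True
```

Stopping all workers would then amount to applying the same function to each tracked process, each with the same timeout.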

get_status(name: str) → dict

Retrieve latest worker metrics from status queue without blocking.
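
Reading the latest status without blocking typically means draining the queue and keeping only the newest entry. The following sketch assumes the status queue behaves like queue.Queue or multiprocessing.Queue (both raise queue.Empty from get_nowait); the function name latest_status is hypothetical, and the real method may also cache the last value it saw.

```python
import queue


def latest_status(status_queue):
    """Drain status_queue without blocking and return the newest item, or None if it was empty."""
    latest = None
    while True:
        try:
            latest = status_queue.get_nowait()  # never waits for a producer
        except queue.Empty:
            return latest                       # queue drained: the last item read wins
```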

kuhl_haus.mdp.helpers.queue_name_resolver module

Route WebSocket messages to appropriate Redis queues by message type.

Maps Massive WebSocket message types (trade, quote, agg, halt) to named Redis queues for downstream processing. Called once per incoming message.

class kuhl_haus.mdp.helpers.queue_name_resolver.QueueNameResolver

Bases: object

Determine Redis queue name for a given WebSocket message type.

static queue_name_for_web_socket_message(message: WebSocketMessage)

Return Redis queue name based on message type; UNKNOWN for unrecognized types.
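
Type-based routing with an UNKNOWN fallback can be sketched as a dictionary lookup. Everything named below is hypothetical: the message classes are stand-ins for the SDK models, and the QueueName members and their string values are invented for illustration, because the actual queue names are not listed in this reference.

```python
from dataclasses import dataclass
from enum import Enum


class QueueName(Enum):
    # Hypothetical queue names; the real constants may differ.
    TRADES = "trades"
    QUOTES = "quotes"
    AGGREGATES = "aggregates"
    HALTS = "halts"
    UNKNOWN = "unknown"


@dataclass
class Trade:   # stand-in for the SDK trade model
    symbol: str


@dataclass
class Quote:   # stand-in for the SDK quote model
    symbol: str


@dataclass
class Agg:     # stand-in for the SDK aggregate model
    symbol: str


@dataclass
class Halt:    # stand-in for the SDK halt (limit-up/limit-down) model
    symbol: str


_ROUTES = {
    Trade: QueueName.TRADES,
    Quote: QueueName.QUOTES,
    Agg: QueueName.AGGREGATES,
    Halt: QueueName.HALTS,
}


def queue_name_for(message) -> QueueName:
    """Return the queue for this message's exact type, or UNKNOWN if unrecognized."""
    return _ROUTES.get(type(message), QueueName.UNKNOWN)
```

A dictionary keyed on the exact type keeps the per-message cost to a single hash lookup, which matters when the resolver runs once per incoming message.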

kuhl_haus.mdp.helpers.serde module

kuhl_haus.mdp.helpers.serde.to_dict(obj: Any) → Any

Recursively convert an object to a dictionary, handling nested objects and lists.

Parameters:

obj – The object to convert

Returns:

A JSON-serializable representation of the object, with all nested objects also converted to dicts.
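
A recursive conversion of this kind is commonly written as a chain of type checks. The sketch below is one plausible shape, not the module's code: the name to_plain is hypothetical, and the choices to turn tuples and sets into lists and to fall back to str() for unknown leaf values are assumptions.

```python
def to_plain(obj):
    """Recursively reduce obj to dicts, lists, and JSON-compatible scalars."""
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj                                   # already JSON-compatible
    if isinstance(obj, dict):
        return {str(key): to_plain(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return [to_plain(item) for item in obj]      # sequences become lists
    if hasattr(obj, "__dict__"):
        # Plain objects: convert their instance attributes recursively.
        return {key: to_plain(value) for key, value in vars(obj).items()}
    return str(obj)                                  # assumed fallback for other leaf types
```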

kuhl_haus.mdp.helpers.structured_logging module

Structured JSON logging for Kubernetes/OpenObserve with human-readable dev mode.

Configures python-json-logger for production deployments (stdout → K8s → OpenObserve) or human-readable formatting for local development. Call setup_logging() once at application entry point; all subsequent getLogger() calls inherit configuration.

Usage:

In server entry point (ONCE at startup):

from kuhl_haus.mdp.helpers.structured_logging import setup_logging
setup_logging()

In ALL modules (library and servers):

import logging
logger = logging.getLogger(__name__)
logger.info("message here")

kuhl_haus.mdp.helpers.structured_logging.setup_logging(log_level: str | None = None, enable_console: bool = True, json_format: bool = True, include_trace_fields: bool = True) → None

Configure root logger for JSON or human-readable output; call once at application entry point.

Side effects: Modifies global logging.root configuration via dictConfig.

All subsequent getLogger() calls inherit this configuration automatically.

Parameters:
  • log_level – Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Defaults to LOG_LEVEL env var or INFO.

  • enable_console – Output to stdout (default: True).

  • json_format – Use JSON formatting (default: True). If False, uses human-readable format for local development.

  • include_trace_fields – Include detailed trace fields in JSON (filename, function, line, pid, thread).
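
The parameters above could map onto a dictConfig dictionary roughly as follows. This is a sketch under stated assumptions: it uses a small standard-library formatter (MiniJsonFormatter, a hypothetical name) in place of python-json-logger so that it runs without extra dependencies, the build_config helper and the JSON key names are invented for illustration, and enable_console is left out for brevity.

```python
import json
import logging
import logging.config
import os


class MiniJsonFormatter(logging.Formatter):
    """Stdlib stand-in for a JSON formatter: emits one JSON object per record."""

    def __init__(self, include_trace_fields=True):
        super().__init__()
        self.include_trace_fields = include_trace_fields

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if self.include_trace_fields:
            # The trace fields listed in the parameter docs.
            payload.update(
                filename=record.filename,
                function=record.funcName,
                line=record.lineno,
                pid=record.process,
                thread=record.threadName,
            )
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def build_config(log_level=None, json_format=True, include_trace_fields=True):
    """Build a dictConfig dictionary; the level falls back to LOG_LEVEL, then INFO."""
    level = (log_level or os.environ.get("LOG_LEVEL", "INFO")).upper()
    if json_format:
        # "()" tells dictConfig to call this factory with the remaining keys as kwargs.
        formatter = {"()": MiniJsonFormatter, "include_trace_fields": include_trace_fields}
    else:
        formatter = {"format": "%(asctime)s %(levelname)-8s %(name)s: %(message)s"}
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {"default": formatter},
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stdout",
                "formatter": "default",
            }
        },
        "root": {"level": level, "handlers": ["console"]},
    }
```

Passing the result to logging.config.dictConfig() replaces the root logger's handlers, which is why a configuration like this is applied only once at startup.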

Examples

Production (JSON to stdout for K8s/OpenObserve):

setup_logging()

Local development (human-readable):

setup_logging(log_level='DEBUG', json_format=False)

Minimal JSON (no trace fields):

setup_logging(include_trace_fields=False)

kuhl_haus.mdp.helpers.structured_logging.get_logger(name: str | None = None) → Logger

Return a logger instance.

Convenience wrapper around logging.getLogger(name). In production modules, prefer calling logging.getLogger(__name__) directly.

Parameters:

name – Logger name. If None, returns root logger.

Returns:

Configured logger instance.

Example:

logger = get_logger(__name__)
logger.info("Started processing")

kuhl_haus.mdp.helpers.structured_logging.log_exception(logger: Logger, message: str, exc_info: bool = True, **extra_fields) → None

Log exception with traceback and optional structured fields for debugging.

Parameters:
  • logger – Logger instance.

  • message – Error message.

  • exc_info – Include exception traceback (default: True).

  • **extra_fields – Additional fields to include in log.
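
A helper with this signature can be built directly on the standard logging API. The following is a minimal sketch, assuming the extra fields are forwarded through the extra argument of Logger.error; the name log_exception_sketch is hypothetical and the real function may attach the fields differently.

```python
import logging


def log_exception_sketch(logger, message, exc_info=True, **extra_fields):
    """Log message at ERROR level, with the active traceback and extra structured fields."""
    # Keys in `extra` become attributes on the LogRecord, so a JSON formatter can emit them.
    # They must not clash with built-in record attributes such as "message" or "name".
    logger.error(message, exc_info=exc_info, extra=extra_fields)
```

With exc_info=True the traceback is taken from the exception currently being handled, so the call belongs inside an except block.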

Example:

try:
    risky_operation()
except Exception:
    log_exception(
        logger,
        "Operation failed",
        error_code="WDS001",
        user_id=user.id
    )

kuhl_haus.mdp.helpers.structured_logging.ensure_logging_configured()

Auto-configure logging if setup_logging() was never called; safety net for library usage.
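
One common way to implement such a safety net is to check whether the target logger already has handlers and install a default only if it does not. The sketch below illustrates that idea; the name ensure_configured, the optional logger argument, and the chosen default format are assumptions, and the real function may track configuration state in another way.

```python
import logging


def ensure_configured(logger=None, level=logging.INFO):
    """Attach a default stream handler if logger (root by default) has none.

    Returns True if a handler was installed, False if logging was already configured.
    """
    target = logging.getLogger() if logger is None else logger
    if target.handlers:
        return False                      # someone already configured logging
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    target.addHandler(handler)
    target.setLevel(level)
    return True
```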

kuhl_haus.mdp.helpers.utils module

Utility functions for Massive API integration and data conversion.

Handles API key resolution from environment/file and TickerSnapshot serialization for caching and network transmission.

kuhl_haus.mdp.helpers.utils.get_massive_api_key()

Resolve Massive API key from environment variables or Docker secret file.

Resolution order:

  1. MASSIVE_API_KEY environment variable

  2. POLYGON_API_KEY environment variable (legacy fallback)

  3. /app/massive_api_key.txt (Docker secret mount)

Raises:

ValueError – When no API key found via any method.
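
The resolution order above can be sketched as follows. The function name resolve_api_key and its parameters are hypothetical, added so the lookup can be exercised without touching the real environment; the variable names, file path, and ValueError come from the description above, while stripping whitespace and skipping empty values are assumptions.

```python
import os
from pathlib import Path

SECRET_FILE = Path("/app/massive_api_key.txt")  # Docker secret mount from the docs


def resolve_api_key(environ=None, secret_file=SECRET_FILE):
    """Return the first API key found: MASSIVE_API_KEY, POLYGON_API_KEY, then the secret file."""
    env = os.environ if environ is None else environ
    for variable in ("MASSIVE_API_KEY", "POLYGON_API_KEY"):
        value = env.get(variable, "").strip()
        if value:
            return value
    if secret_file.is_file():
        value = secret_file.read_text().strip()
        if value:
            return value
    raise ValueError("No Massive API key found in environment variables or secret file")
```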

kuhl_haus.mdp.helpers.utils.ticker_snapshot_to_dict(snapshot: TickerSnapshot) → Dict[str, Any]

Convert TickerSnapshot to JSON-serializable dict with camelCase keys matching Massive API format.

kuhl_haus.mdp.helpers.web_socket_message_serde module

Serialize and deserialize Massive WebSocket messages to/from JSON.

Handles conversion between Massive SDK model objects (EquityTrade, EquityAgg, EquityQuote, LimitUpLimitDown) and JSON strings for Redis storage and network transmission. Called at high frequency (1,000+ messages/sec at peak).

class kuhl_haus.mdp.helpers.web_socket_message_serde.WebSocketMessageSerde

Bases: object

Convert between Massive WebSocket message objects and JSON representations.

static serialize(message: WebSocketMessage) → str

Convert any Massive WebSocket message to JSON string for storage/transmission.

static to_dict(message: WebSocketMessage) → dict

Convert any Massive WebSocket message to dictionary for manipulation.

static deserialize(serialized_message: str) → LimitUpLimitDown | EquityAgg | EquityTrade | EquityQuote

Reconstruct Massive WebSocket message object from JSON string.

Raises:

ArgumentTypeError – When event_type is not recognized.
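
The serialize/deserialize round trip, including dispatch on event_type, can be illustrated with stand-in models. Everything below is a sketch under assumptions: TradeMsg and QuoteMsg are hypothetical dataclasses rather than the SDK's EquityTrade and EquityQuote, the "T" and "Q" event codes are chosen for illustration, and ArgumentTypeError is assumed to be the one from argparse.

```python
import json
from argparse import ArgumentTypeError  # assumed origin of the documented exception
from dataclasses import asdict, dataclass


@dataclass
class TradeMsg:            # hypothetical stand-in for EquityTrade
    symbol: str
    price: float
    event_type: str = "T"


@dataclass
class QuoteMsg:            # hypothetical stand-in for EquityQuote
    symbol: str
    bid_price: float
    ask_price: float
    event_type: str = "Q"


# Dispatch table from the serialized event_type to the class that rebuilds it.
_BY_EVENT_TYPE = {"T": TradeMsg, "Q": QuoteMsg}


def serialize(message) -> str:
    """Convert a message to a JSON string that keeps its event_type."""
    return json.dumps(asdict(message))


def deserialize(serialized_message: str):
    """Rebuild the message object selected by the payload's event_type."""
    payload = json.loads(serialized_message)
    event_type = payload.get("event_type")
    message_class = _BY_EVENT_TYPE.get(event_type)
    if message_class is None:
        raise ArgumentTypeError(f"Unrecognized event_type: {event_type!r}")
    return message_class(**payload)
```

Keeping event_type inside the serialized payload is what lets a single deserialize entry point return any of the supported message types.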

static decode_limit_up_limit_down(message: LimitUpLimitDown) → dict

static serialize_limit_up_limit_down(message: LimitUpLimitDown) → str

static decode_equity_agg(message: EquityAgg) → dict

static serialize_equity_agg(message: EquityAgg) → str

static decode_equity_trade(message: EquityTrade) → dict

static serialize_equity_trade(message: EquityTrade) → str

static decode_equity_quote(message: EquityQuote) → dict

static serialize_equity_quote(message: EquityQuote) → str