Source code for kuhl_haus.mdp.analyzers.massive_data_analyzer

"""Stateless event router for raw Massive.com WebSocket messages.

Dispatches incoming events (LULD, EquityAgg, Trade, Quote) to type-specific
handlers that return cache/publish results. No state maintained—purely a
pass-through with observability instrumentation.
"""
import logging
from time import time
from typing import List, Optional
from massive.websocket.models import EventType

from kuhl_haus.mdp.analyzers.analyzer import Analyzer, AnalyzerOptions
from kuhl_haus.mdp.data.market_data_analyzer_result import MarketDataAnalyzerResult
from kuhl_haus.mdp.enum.market_data_cache_keys import MarketDataCacheKeys
from kuhl_haus.mdp.enum.market_data_cache_ttl import MarketDataCacheTTL
from kuhl_haus.mdp.helpers.observability import get_meter, get_tracer

tracer = get_tracer(__name__)


[docs] class MassiveDataAnalyzer(Analyzer): """Stateless event router for Massive.com WebSocket messages. Maps event_type to handler functions that return cache/publish results. No external I/O or state—purely routing logic with metrics. Designed for simplicity; heavier analysis lives in specialized analyzers. """
[docs] def __init__(self, options: AnalyzerOptions): super().__init__(options) self.logger = logging.getLogger(__name__) self.event_handlers = { EventType.LimitUpLimitDown.value: self.handle_luld_event, EventType.EquityAgg.value: self.handle_equity_agg_event, EventType.EquityAggMin.value: self.handle_equity_agg_event, EventType.EquityTrade.value: self.handle_equity_trade_event, EventType.EquityQuote.value: self.handle_equity_quote_event, } # Metrics self.meter = get_meter(__name__) self.processed_counter = self.meter.create_counter( name="mda.processed", description="Massive Data Analyzer processed events", unit="1" ) self.luld_counter = self.meter.create_counter( name="mda.luld", description="Massive Data Analyzer processed LULD events", unit="1" ) self.agg_counter = self.meter.create_counter( name="mda.agg", description="Massive Data Analyzer processed Agg events", unit="1" ) self.trade_counter = self.meter.create_counter( name="mda.trade", description="Massive Data Analyzer processed Trade events", unit="1" ) self.quote_counter = self.meter.create_counter( name="mda.quote", description="Massive Data Analyzer processed Quote events", unit="1" ) self.unknown_counter = self.meter.create_counter( name="mda.unknown", description="Massive Data Analyzer processed unknown events", unit="1" )
[docs] @tracer.start_as_current_span("mda.analyze_data") async def analyze_data(self, data: dict) -> Optional[List[MarketDataAnalyzerResult]]: """Process raw market data message. Validates presence of event_type and symbol, then dispatches to handler. Unknown or malformed events are routed to unknown handler for debugging visibility. Args: data: Deserialized message from Massive/Polygon.io WebSocket. """ if "event_type" not in data: self.logger.info("Message missing 'event_type'") return self.handle_unknown_event(data) event_type = data.get("event_type") if "symbol" not in data: self.logger.info("Message missing 'symbol'") return self.handle_unknown_event(data) symbol = data.get("symbol") if event_type in self.event_handlers: self.processed_counter.add(1) return self.event_handlers[event_type](**{"data": data, "symbol": symbol}) else: self.logger.warning(f"Unsupported message type: {event_type}") return self.handle_unknown_event(data)
[docs] @tracer.start_as_current_span("mda.handle_luld_event") def handle_luld_event(self, data: dict, symbol: str) -> Optional[List[MarketDataAnalyzerResult]]: """Handle Limit Up/Limit Down events (halts).""" self.luld_counter.add(1) return [MarketDataAnalyzerResult( data=data, cache_key=f"{MarketDataCacheKeys.HALTS.value}:{symbol}", cache_ttl=MarketDataCacheTTL.HALTS.value, publish_key=f"{MarketDataCacheKeys.HALTS.value}:{symbol}", )]
[docs] @tracer.start_as_current_span("mda.handle_equity_agg_event") def handle_equity_agg_event(self, data: dict, symbol: str) -> Optional[List[MarketDataAnalyzerResult]]: """Handle EquityAgg and EquityAggMin events (aggregated bars).""" self.agg_counter.add(1) return [MarketDataAnalyzerResult( data=data, cache_key=f"{MarketDataCacheKeys.AGGREGATE.value}:{symbol}", cache_ttl=MarketDataCacheTTL.AGGREGATE.value, publish_key=f"{MarketDataCacheKeys.AGGREGATE.value}:{symbol}", )]
[docs] @tracer.start_as_current_span("mda.handle_equity_trade_event") def handle_equity_trade_event(self, data: dict, symbol: str) -> Optional[List[MarketDataAnalyzerResult]]: """Handle EquityTrade events (individual trades).""" self.trade_counter.add(1) return [MarketDataAnalyzerResult( data=data, cache_key=f"{MarketDataCacheKeys.TRADES.value}:{symbol}", cache_ttl=MarketDataCacheTTL.TRADES.value, publish_key=f"{MarketDataCacheKeys.TRADES.value}:{symbol}", )]
[docs] @tracer.start_as_current_span("mda.handle_equity_quote_event") def handle_equity_quote_event(self, data: dict, symbol: str) -> Optional[List[MarketDataAnalyzerResult]]: """Handle EquityQuote events (bid/ask updates).""" self.quote_counter.add(1) return [MarketDataAnalyzerResult( data=data, cache_key=f"{MarketDataCacheKeys.QUOTES.value}:{symbol}", cache_ttl=MarketDataCacheTTL.QUOTES.value, publish_key=f"{MarketDataCacheKeys.QUOTES.value}:{symbol}", )]
[docs] @tracer.start_as_current_span("mda.handle_unknown_event") def handle_unknown_event(self, data: dict) -> Optional[List[MarketDataAnalyzerResult]]: """Handle unknown or malformed events for debugging visibility.""" self.unknown_counter.add(1) timestamp = f"{time()}".replace('.', '') cache_key = f"{MarketDataCacheKeys.UNKNOWN.value}:{timestamp}" return [MarketDataAnalyzerResult( data=data, cache_key=cache_key, cache_ttl=MarketDataCacheTTL.UNKNOWN.value, publish_key=f"{MarketDataCacheKeys.UNKNOWN.value}", )]