kuhl_haus.mdp.analyzers package
Analyzers for real-time market data streams.
Provides base analyzer abstractions and concrete implementations for processing WebSocket events from Massive.com and Finlight, including top stocks, top trades, leaderboard, daily range, and news analysis backed by Redis caching.
Submodules
kuhl_haus.mdp.analyzers.analyzer module
Base analyzer and configuration for market data processing.
Provides abstract interface and shared client factories for Redis-backed analyzers that consume WebSocket events from Massive.com and Finlight.
- class kuhl_haus.mdp.analyzers.analyzer.AnalyzerOptions(redis_url: str | None = None, massive_api_key: str | None = None, finlight_api_key: str | None = None, kwargs: Dict[str, Any] = <factory>)
Bases: object
Configuration for analyzer instances.
Encapsulates the API keys for the two market data sources (Massive.com and Finlight) and the Redis connection URL. Factory methods ensure consistent client instantiation across all analyzer implementations.
- redis_url
Redis connection URL.
- Type:
str | None
- massive_api_key
Massive.com REST API key. Required for analyzers that fetch reference data (snapshots, floats, avg volume).
- Type:
str | None
- finlight_api_key
Finlight REST/WebSocket API key. Required for analyzers that process or query financial news data.
- Type:
str | None
- kwargs
Arbitrary keyword arguments for subclass-specific configuration. Allows analyzer subclasses to accept additional parameters without modifying this class or breaking existing implementations.
- Type:
Dict[str, Any]
- new_redis_client(encoding: str = 'utf-8', decode_responses: bool = True, max_connections: int = 1000, connect_timeout: int = 10, **kwargs)
Create async Redis client with connection pooling.
Pool size defaults to 1000 to support high-throughput pipelines under concurrent load (1,000+ events/sec).
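The `<factory>` default in the signature suggests that `AnalyzerOptions` is a dataclass whose `kwargs` field uses a default factory. The sketch below reproduces only that shape with the standard `dataclasses` module and omits `new_redis_client`. The `publish_interval` key is a hypothetical subclass setting, not a documented option.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class AnalyzerOptions:
    """Sketch of the documented constructor signature (not the real class)."""

    redis_url: Optional[str] = None
    massive_api_key: Optional[str] = None
    finlight_api_key: Optional[str] = None
    # default_factory builds a fresh dict per instance, so option objects
    # never share (and accidentally mutate) the same kwargs mapping.
    kwargs: Dict[str, Any] = field(default_factory=dict)


# "publish_interval" is a hypothetical subclass-specific setting.
opts = AnalyzerOptions(redis_url="redis://localhost:6379/0",
                       kwargs={"publish_interval": 5})
defaults = AnalyzerOptions()
```

Using `field(default_factory=dict)` rather than a literal `{}` default (which dataclasses reject) gives each instance its own mapping, so extra settings for one analyzer cannot leak into another.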
- class kuhl_haus.mdp.analyzers.analyzer.Analyzer(options: AnalyzerOptions)
Bases: object
Abstract base for stateless market data analyzers.
Subclasses implement analyze_data to process WebSocket events and return results for caching and pub/sub distribution. Analyzers built on this base are intended to run concurrently across multiple instances, coordinating only through Redis atomic operations; subclasses that keep state in process memory (TopStocksAnalyzer, and the per-instance HOD/LOD state of DailyRangeAnalyzer) are the documented exceptions.
- __init__(options: AnalyzerOptions)
- options: AnalyzerOptions
- async rehydrate()
Restore analyzer state from Redis on startup.
Optional hook for analyzers that need to load cached data before processing events. Base implementation is a no-op.
- async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None
Process a single WebSocket event.
- Parameters:
data – Deserialized JSON from Massive.com WebSocket stream.
- Returns:
Results to cache and/or publish, or None if the event should be discarded (e.g., throttled, filtered, or incomplete).
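A minimal sketch of the contract described above: an optional async `rehydrate()` hook and an async `analyze_data()` that returns a list of results, or `None` to discard the event. `BaseAnalyzer`, `EchoAnalyzer`, and the `Result` dataclass are hypothetical stand-ins for `Analyzer` and `MarketDataAnalyzerResult`, whose fields are not documented here.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Result:
    """Hypothetical stand-in for MarketDataAnalyzerResult."""
    cache_key: str
    data: Dict[str, Any]


class BaseAnalyzer(ABC):
    """Sketch of the documented contract, not the library class."""

    async def rehydrate(self) -> None:
        # Optional startup hook; the documented base implementation is a no-op.
        return None

    @abstractmethod
    async def analyze_data(self, data: dict) -> Optional[List[Result]]:
        ...


class EchoAnalyzer(BaseAnalyzer):
    """Hypothetical analyzer: discard events without a symbol, echo the rest."""

    async def analyze_data(self, data: dict) -> Optional[List[Result]]:
        symbol = data.get("symbol")
        if not symbol:
            return None  # filtered: nothing to cache or publish
        return [Result(cache_key=f"echo:{symbol}", data=data)]


async def main() -> list:
    analyzer = EchoAnalyzer()
    await analyzer.rehydrate()
    return [await analyzer.analyze_data(e) for e in ({"symbol": "AAPL"}, {})]


outputs = asyncio.run(main())
```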
kuhl_haus.mdp.analyzers.daily_range_analyzer module
Daily range analyzer — session HOD/LOD tracking.
Subscribes to the quote:* feed and publishes per-symbol session high/low (pre-market, regular, after-hours) to daily_range:{symbol}.
All state is maintained in process memory, with Redis-backed rehydration on startup so that restarts do not lose intraday data.
Session detection uses the Massive get_market_status() API (with a 60-second in-memory cache), so exchange holidays and early closes are handled correctly.
Pre-market H/L is frozen (not cleared) when the regular session opens — it remains visible in published payloads throughout the trading day.
Day boundary reset:
- Triggered by observing a transition to the pre_market session after a period of None (market closed). All six HOD/LOD dicts (high and low for each of the three sessions) are cleared. A Redis SET NX key scoped to the calendar date prevents duplicate resets across restarts and replicas.
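A minimal sketch of the day-boundary guard described above, using an in-memory stand-in for Redis `SET key value NX`. `DayBoundaryTracker` and the date-scoped key `daily_range:day_boundary:{date}` are hypothetical; only `DAY_BOUNDARY_KEY = 'daily_range:day_boundary'` is documented, and the real analyzer may scope the date differently.

```python
from typing import Dict, Optional, Tuple

SESSIONS = ("pre_market", "regular", "after_hours")


class FakeRedis:
    """In-memory stand-in for the Redis command SET key value NX."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}

    def set_nx(self, key: str, value: str) -> bool:
        if key in self.store:
            return False  # key exists: this date was already reset
        self.store[key] = value
        return True


class DayBoundaryTracker:
    """Hypothetical holder of the six HOD/LOD dicts plus the reset guard."""

    def __init__(self, redis: FakeRedis) -> None:
        self.redis = redis
        self.last_session: Optional[str] = None
        # (session, "high" | "low") -> {symbol: price}: six dicts in total.
        self.extremes: Dict[Tuple[str, str], Dict[str, float]] = {
            (s, side): {} for s in SESSIONS for side in ("high", "low")
        }

    def observe(self, session: Optional[str], today: str) -> bool:
        """Return True if this call performed the day-boundary reset."""
        did_reset = False
        if self.last_session is None and session == "pre_market":
            key = f"daily_range:day_boundary:{today}"  # hypothetical layout
            if self.redis.set_nx(key, "1"):
                for d in self.extremes.values():
                    d.clear()
                did_reset = True
        self.last_session = session
        return did_reset


shared = FakeRedis()
primary = DayBoundaryTracker(shared)
primary.extremes[("regular", "high")]["AAPL"] = 190.0  # yesterday's value
primary.observe(None, "2024-05-01")                     # market closed
first_reset = primary.observe("pre_market", "2024-05-01")
restarted = DayBoundaryTracker(shared)                   # e.g. after a restart
second_reset = restarted.observe("pre_market", "2024-05-01")
```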
- class kuhl_haus.mdp.analyzers.daily_range_analyzer.DailyRangeAnalyzer(options: AnalyzerOptions)
Bases: Analyzer
Tracks intraday session highs and lows for real-time quote events.
Maintains per-symbol high/low for three intraday sessions (pre-market, regular, after-hours) in process memory. Publishes results to daily_range:{symbol} in the WDC on every quote tick.
Concurrency: HOD/LOD state is per-instance (not coordinated across replicas). Each instance maintains its own in-memory window.
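The per-session tracking can be pictured as one high dict and one low dict per session. This sketch models a single session and also applies the first-tick alert suppression documented for analyze_data. It assumes the quote has already been reduced to a single `price`; which quote field (bid, ask, or a midpoint) the analyzer actually uses is not documented, and `SessionExtremes` is a hypothetical name.

```python
from typing import Dict, List, Tuple

Alert = Tuple[str, str, float]  # (channel, symbol, price)


class SessionExtremes:
    """Per-symbol high/low for one session, with first-tick alert suppression."""

    def __init__(self) -> None:
        self.high: Dict[str, float] = {}
        self.low: Dict[str, float] = {}

    def update(self, symbol: str, price: float) -> List[Alert]:
        alerts: List[Alert] = []
        if symbol not in self.high:
            # First tick: seed both extremes, but there is nothing to compare
            # against, so no alert is emitted.
            self.high[symbol] = self.low[symbol] = price
            return alerts
        if price > self.high[symbol]:
            self.high[symbol] = price
            alerts.append(("daily_range_hod_alert", symbol, price))
        if price < self.low[symbol]:
            self.low[symbol] = price
            alerts.append(("daily_range_lod_alert", symbol, price))
        return alerts


tracker = SessionExtremes()
ticks = [tracker.update("AAPL", p) for p in (100.0, 101.5, 99.0, 100.0)]
```

Clearing both dicts at a day boundary makes the next tick a "first tick" again, which is one simple way to get the documented suppression after resets.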
- DAY_BOUNDARY_KEY = 'daily_range:day_boundary'
- __init__(options: AnalyzerOptions)
- async rehydrate()
Restore session HOD/LOD state from Redis on startup.
Scans all daily_range:* keys and restores per-symbol H/L values into the in-memory session dicts for sessions that have already elapsed today.
Only fields from sessions that have already occurred are restored:
- pre_market → pre-market fields only
- regular → pre-market and regular-session fields
- after_hours → all six fields
- None → nothing (the market is closed, so the trading day the cached data belongs to cannot be determined)
This prevents yesterday’s regular/after-hours values from leaking into today’s pre-market display on restart.
Pre-market H/L is intentionally preserved through the regular session open and remains visible in published payloads throughout the day.
- async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None
Process a quote event and publish session HOD/LOD results.
Returns a list of one or more MarketDataAnalyzerResult entries:
- Always: one state result published to daily_range:{symbol}.
- Conditionally: one alert result per new session extreme (HOD or LOD), published to daily_range_hod_alert or daily_range_lod_alert. Alerts are suppressed on the first tick for a symbol (no prior value to compare against) and after day-boundary resets.
- Parameters:
data – Quote event dict from the quote:* feed.
- Returns:
List of MarketDataAnalyzerResult (state first, alerts appended), or None if the event could not be processed.
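The session-dependent restore rule in rehydrate() amounts to "restore the fields of every session up to and including the current one". A sketch of that filter, assuming hypothetical field names such as `pre_market_high`; the actual keys inside the cached daily_range:{symbol} payload are not documented.

```python
from typing import Dict, Optional, Tuple

# Hypothetical names for the six cached fields.
FIELDS_BY_SESSION: Dict[str, Tuple[str, ...]] = {
    "pre_market": ("pre_market_high", "pre_market_low"),
    "regular": ("regular_high", "regular_low"),
    "after_hours": ("after_hours_high", "after_hours_low"),
}
SESSION_ORDER = ("pre_market", "regular", "after_hours")


def restorable_fields(current_session: Optional[str]) -> Tuple[str, ...]:
    """Fields belonging to sessions that have already started today."""
    if current_session is None:
        return ()  # market closed: cached data may belong to another day
    elapsed = SESSION_ORDER[: SESSION_ORDER.index(current_session) + 1]
    return tuple(f for s in elapsed for f in FIELDS_BY_SESSION[s])


def filter_cached(payload: Dict[str, float],
                  current_session: Optional[str]) -> Dict[str, float]:
    allowed = set(restorable_fields(current_session))
    return {k: v for k, v in payload.items() if k in allowed}


# A restart during pre-market keeps only the pre-market value.
cached = {"pre_market_high": 101.0, "regular_high": 105.0, "after_hours_low": 99.0}
restored = filter_cached(cached, "pre_market")
```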
kuhl_haus.mdp.analyzers.finlight_data_analyzer module
Analyzer for Finlight news articles.
Processes articles from the Finlight WebSocket feed (via FinlightDataProcessor) and publishes them to Redis for real-time distribution to WDS clients.
Publishing strategy:
- All articles → WidgetDataCacheKeys.NEWS_FEED_LATEST (news:feed:latest)
- Enhanced articles → WidgetDataCacheKeys.NEWS_TICKER per US-listed company
- Raw articles → WidgetDataCacheKeys.NEWS_TICKER for tickers parsed from title/summary
US exchange codes (MIC):
- XNYS → NYSE
- XASE → NYSE American (AMEX)
- XNAS → NASDAQ
- class kuhl_haus.mdp.analyzers.finlight_data_analyzer.FinlightDataAnalyzer(options: AnalyzerOptions)
Bases: Analyzer
Stateless analyzer that routes Finlight news articles to Redis pub/sub.
Receives article dicts (serialized via serde.to_dict) from FinlightDataProcessor and returns MarketDataAnalyzerResult objects for caching and publication.
Two modes, depending on whether the article includes entity data:
- Enhanced: companies field present → extract tickers for companies whose primaryListing.exchangeCode is a US MIC
- Raw: no companies field → extract tickers via regex on title + summary
- __init__(options: AnalyzerOptions)
- async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None
Process a single Finlight article and return routing results.
- Parameters:
data – Article dict serialized by serde.to_dict. Must be a non-empty dict.
- Returns:
One result for news:feed:latest plus one per associated US ticker, or None for empty or invalid input.
- Return type:
List[MarketDataAnalyzerResult] | None
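A sketch of the two ticker-extraction modes and the resulting fan-out (one feed entry plus one entry per US ticker). The US MIC set comes from the module description; the `ticker` field on company objects, the regex used for raw articles, and the `news:ticker:{symbol}` key format are assumptions, and results are plain key strings rather than MarketDataAnalyzerResult objects.

```python
import re
from typing import List, Optional

# Documented US MICs: NYSE, NYSE American (AMEX), NASDAQ.
US_MICS = {"XNYS", "XASE", "XNAS"}
# Hypothetical raw-mode pattern for mentions such as "(NASDAQ: ABC)".
TICKER_RE = re.compile(r"\((?:NYSE American|NYSE|NASDAQ|AMEX)\s*:\s*([A-Z]{1,5})\)")


def extract_tickers(article: dict) -> List[str]:
    companies = article.get("companies")
    if companies is not None:
        # Enhanced mode: keep companies whose primary listing is a US exchange.
        tickers = [
            c.get("ticker")  # hypothetical field name
            for c in companies
            if (c.get("primaryListing") or {}).get("exchangeCode") in US_MICS
        ]
    else:
        # Raw mode: scan the title and summary text.
        text = f"{article.get('title', '')} {article.get('summary', '')}"
        tickers = TICKER_RE.findall(text)
    # Drop empties and duplicates while preserving first-seen order.
    return list(dict.fromkeys(t for t in tickers if t))


def route(article: object) -> Optional[List[str]]:
    """Destination keys: the shared feed plus one per US ticker, or None."""
    if not isinstance(article, dict) or not article:
        return None
    keys = ["news:feed:latest"]
    # "news:ticker:{symbol}" is a hypothetical stand-in for NEWS_TICKER.
    keys += [f"news:ticker:{t}" for t in extract_tickers(article)]
    return keys


enhanced = {"title": "Earnings roundup", "companies": [
    {"ticker": "AAPL", "primaryListing": {"exchangeCode": "XNAS"}},
    {"ticker": "SAP", "primaryListing": {"exchangeCode": "XETR"}},  # not US
]}
raw = {"title": "Acme (NYSE: ACME) beats estimates",
       "summary": "Rival (NASDAQ: RVL) slips"}
```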
kuhl_haus.mdp.analyzers.leaderboard_analyzer module
Real-time leaderboard analyzer using Redis sorted sets.
Processes EquityAgg events to maintain three leaderboards (volume, gappers, gainers) with automatic reset logic at market boundaries (4 AM ET, 9:30 AM ET). Designed for horizontal scaling—multiple instances coordinate via Redis atomics.
- class kuhl_haus.mdp.analyzers.leaderboard_analyzer.LeaderboardAnalyzer(options: AnalyzerOptions)
Bases: Analyzer
Redis-backed leaderboard analyzer using Sorted Sets.
Maintains real-time rankings for top volume, gappers (% change from previous close), and gainers (% change from open). Updates are batched via pipelines; publishing is throttled to ~1/sec cluster-wide. Sorted sets are trimmed to the top 500 entries to prevent unbounded growth.
Concurrency: Safe for multiple instances. Redis atomics prevent race conditions on leaderboard updates and day/market boundary resets.
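A sketch of how the gapper and gainer scores and the top-500 trim could map onto sorted-set operations, using an in-memory stand-in whose comments name the corresponding Redis commands (`ZADD`, `ZREMRANGEBYRANK`, `ZREVRANGE`). The score formulas follow the definitions above; `SortedSetSketch` is hypothetical and does not batch writes through a pipeline.

```python
from typing import Dict, List, Tuple

TOP_N = 500  # documented trim size


def gap_pct(price: float, prev_close: float) -> float:
    """Gappers score: % change from the previous close."""
    return (price - prev_close) / prev_close * 100.0


def gain_pct(price: float, open_price: float) -> float:
    """Gainers score: % change from today's open."""
    return (price - open_price) / open_price * 100.0


class SortedSetSketch:
    """In-memory stand-in for one leaderboard sorted set."""

    def __init__(self, top_n: int = TOP_N) -> None:
        self.top_n = top_n
        self.scores: Dict[str, float] = {}

    def _ranked(self) -> List[Tuple[str, float]]:
        return sorted(self.scores.items(), key=lambda kv: kv[1], reverse=True)

    def zadd(self, member: str, score: float) -> None:
        self.scores[member] = score  # ZADD overwrites an existing member's score
        # ZREMRANGEBYRANK key 0 -(top_n + 1): drop everything below the top N.
        if len(self.scores) > self.top_n:
            self.scores = dict(self._ranked()[: self.top_n])

    def top(self, n: int) -> List[Tuple[str, float]]:
        return self._ranked()[:n]  # ZREVRANGE key 0 n-1 WITHSCORES


board = SortedSetSketch(top_n=2)
board.zadd("AAA", gap_pct(11.0, 10.0))   # about +10%
board.zadd("BBB", gap_pct(9.0, 10.0))    # about -10%
board.zadd("CCC", gap_pct(12.5, 10.0))   # +25%
leaders = [member for member, _ in board.top(5)]
```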
- LEADERBOARD_TOP_VOLUME = 'leaderboard:top_volume'
- LEADERBOARD_TOP_GAPPERS = 'leaderboard:top_gappers'
- LEADERBOARD_TOP_GAINERS = 'leaderboard:top_gainers'
- MARKET_DAY_KEY = 'leaderboard:market:current_day_start'
- MARKET_OPEN_RESET_KEY = 'leaderboard:market:open_reset:{date}'
- PUBLISH_THROTTLE_KEY = 'leaderboard:last_publish'
- __init__(options: AnalyzerOptions)
- cache: MarketDataCache
- async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None
Process EquityAgg event and update Redis leaderboards.
Updates sorted sets atomically, checks day/market boundaries, and returns throttled leaderboard snapshots (~1/sec cluster-wide).
Side effects: Updates Redis sorted sets, hash keys for symbol metadata, and may reset leaderboards at market boundaries (4 AM ET, 9:30 AM ET).
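One race-free way to implement the boundary checks with the documented keys is sketched below: an atomic `SET ... GET` swap (Redis 6.2+) on MARKET_DAY_KEY for the 4 AM day start, and a date-scoped `SET NX` on MARKET_OPEN_RESET_KEY for the 9:30 AM open. This is an assumption about the mechanism, not the analyzer's confirmed logic: the value stored under MARKET_DAY_KEY is assumed to be an ISO date, weekends and holidays are ignored, and which leaderboards each boundary clears is left to the caller.

```python
from datetime import datetime, time
from typing import Dict, List, Optional

MARKET_DAY_KEY = "leaderboard:market:current_day_start"
MARKET_OPEN_RESET_KEY = "leaderboard:market:open_reset:{date}"
DAY_START = time(4, 0)      # 4 AM ET
MARKET_OPEN = time(9, 30)   # 9:30 AM ET


class FakeRedis:
    """In-memory stand-ins for SET ... GET and SET ... NX."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}

    def set_get(self, key: str, value: str) -> Optional[str]:
        old = self.store.get(key)  # SET key value GET returns the previous value
        self.store[key] = value
        return old

    def set_nx(self, key: str, value: str) -> bool:
        if key in self.store:
            return False
        self.store[key] = value
        return True


def boundaries_to_reset(now_et: datetime, redis: FakeRedis) -> List[str]:
    """Boundaries this caller won the right to reset (now_et: naive ET time)."""
    resets: List[str] = []
    if now_et.time() < DAY_START:
        return resets
    today = now_et.date().isoformat()
    # Only the caller that swaps out a different date performs the day reset.
    if redis.set_get(MARKET_DAY_KEY, today) != today:
        resets.append("day_start")
    # The date-scoped NX key lets exactly one caller handle the 9:30 reset.
    if now_et.time() >= MARKET_OPEN and redis.set_nx(
        MARKET_OPEN_RESET_KEY.format(date=today), "1"
    ):
        resets.append("market_open")
    return resets


r = FakeRedis()
same_day = [boundaries_to_reset(datetime(2024, 5, 1, h, m), r)
            for h, m in ((3, 59), (4, 0), (4, 1), (9, 30), (9, 31))]
next_day = boundaries_to_reset(datetime(2024, 5, 2, 10, 0), r)
```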
kuhl_haus.mdp.analyzers.massive_data_analyzer module
Stateless event router for raw Massive.com WebSocket messages.
Dispatches incoming events (LULD, EquityAgg, Trade, Quote) to type-specific handlers that return cache/publish results. No state maintained—purely a pass-through with observability instrumentation.
- class kuhl_haus.mdp.analyzers.massive_data_analyzer.MassiveDataAnalyzer(options: AnalyzerOptions)
Bases: Analyzer
Stateless event router for Massive.com WebSocket messages.
Maps event_type to handler functions that return cache/publish results. No external I/O or state—purely routing logic with metrics. Designed for simplicity; heavier analysis lives in specialized analyzers.
- __init__(options: AnalyzerOptions)
- async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None
Process raw market data message.
Validates presence of event_type and symbol, then dispatches to handler. Unknown or malformed events are routed to unknown handler for debugging visibility.
- Parameters:
data – Deserialized message from Massive/Polygon.io WebSocket.
- handle_luld_event(data: dict, symbol: str) → List[MarketDataAnalyzerResult] | None
Handle Limit Up/Limit Down events (halts).
- handle_equity_agg_event(data: dict, symbol: str) → List[MarketDataAnalyzerResult] | None
Handle EquityAgg and EquityAggMin events (aggregated bars).
- handle_equity_trade_event(data: dict, symbol: str) → List[MarketDataAnalyzerResult] | None
Handle EquityTrade events (individual trades).
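The dispatch described above maps naturally onto a dictionary from event type to handler, with a fallback for anything unrecognized or missing a symbol. This synchronous sketch uses hypothetical event-type strings, cache-key prefixes, and plain dicts in place of MarketDataAnalyzerResult; in the real class the handlers are methods. Because the sketch registers only the three documented handlers, any other event type falls through to the unknown handler here.

```python
from typing import Callable, Dict, List, Optional

Result = Dict[str, object]  # hypothetical stand-in for MarketDataAnalyzerResult
Handler = Callable[[dict, str], Optional[List[Result]]]


def keyed(prefix: str) -> Handler:
    """Build a trivial handler that caches the event under prefix:{symbol}."""
    def handler(data: dict, symbol: str) -> Optional[List[Result]]:
        return [{"key": f"{prefix}:{symbol}", "data": data}]
    return handler


HANDLERS: Dict[str, Handler] = {
    "LULD": keyed("luld"),
    "EquityAgg": keyed("agg"),
    "EquityAggMin": keyed("agg"),  # both aggregate types share one handler
    "EquityTrade": keyed("trade"),
}


def handle_unknown(data: dict) -> List[Result]:
    # Keep unrecognized payloads visible for debugging instead of dropping them.
    return [{"key": "unknown", "data": data}]


def route(data: dict) -> Optional[List[Result]]:
    event_type, symbol = data.get("event_type"), data.get("symbol")
    handler = HANDLERS.get(event_type) if event_type else None
    if handler is None or not symbol:
        return handle_unknown(data)
    return handler(data, symbol)
```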
kuhl_haus.mdp.analyzers.top_stocks module
In-memory leaderboard analyzer (legacy prototype pattern).
Maintains top volume/gainers/gappers in a single TopStocksCacheItem instance. Rehydrates state from Redis on startup. Single-instance design—does not coordinate across replicas. Retained for historical context; production deployments prefer LeaderboardAnalyzer (Redis-native, stateless).
- class kuhl_haus.mdp.analyzers.top_stocks.TopStocksAnalyzer(options: AnalyzerOptions)
Bases: Analyzer
In-memory leaderboard analyzer (single-instance).
Maintains TopStocksCacheItem in process memory with day/market boundary resets. Fetches external metadata (snapshot, avg volume, free float) on first symbol encounter, then caches in memory. Publishes once per second.
Not suitable for horizontal scaling—state lives in memory. Use LeaderboardAnalyzer for multi-instance deployments.
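The two in-memory behaviors described above, fetching metadata only on a symbol's first appearance and publishing at most once per second, can be sketched as a memoizing lookup and a local time-based gate. Both classes are hypothetical; the injectable clock exists only so the throttle can be exercised deterministically. Because the gate lives in one process, two replicas would each publish once per second, which is one more reason this design is single-instance.

```python
import time
from typing import Callable, Dict, Optional


class LocalPublishThrottle:
    """Per-process gate: at most one publish per interval (not shared across replicas)."""

    def __init__(self, interval: float = 1.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self.clock = clock
        self._last: Optional[float] = None

    def should_publish(self) -> bool:
        now = self.clock()
        if self._last is None or now - self._last >= self.interval:
            self._last = now
            return True
        return False


class MetadataCache:
    """Fetch per-symbol metadata on first encounter, then serve it from memory."""

    def __init__(self, fetch: Callable[[str], dict]) -> None:
        self.fetch = fetch  # hypothetical callable wrapping the REST lookups
        self._by_symbol: Dict[str, dict] = {}

    def get(self, symbol: str) -> dict:
        if symbol not in self._by_symbol:
            self._by_symbol[symbol] = self.fetch(symbol)
        return self._by_symbol[symbol]


# Usage with a fake clock and a fake fetcher that records its calls.
fake_now = [0.0]
throttle = LocalPublishThrottle(interval=1.0, clock=lambda: fake_now[0])
decisions = []
for t in (0.0, 0.4, 1.0, 1.5, 2.1):
    fake_now[0] = t
    decisions.append(throttle.should_publish())

fetched = []
meta = MetadataCache(lambda s: fetched.append(s) or {"symbol": s})
meta.get("AAPL")
meta.get("AAPL")
meta.get("MSFT")
```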
- __init__(options: AnalyzerOptions)
- async rehydrate()
Restore in-memory state from Redis on startup.
Clears the cache if the current time is outside trading hours (Mon-Fri, 04:00-19:59 ET), so stale data is not carried over from the previous session.
- async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None
Process EquityAgg events and maintain in-memory leaderboards.
Checks day/market boundaries (4 AM ET, 9:30 AM ET) and resets state. Fetches external metadata on first symbol encounter. Publishes once per second (local throttle, not distributed).
Side effects: Updates in-memory cache_item, fetches REST API data for new symbols, writes results to Redis on publish.
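The trading-hours check applied by rehydrate() can be written as a small predicate over Eastern wall-clock time. This sketch follows the documented window (Mon-Fri, 04:00-19:59 ET) literally: it assumes the caller has already converted the timestamp to ET and, unlike the market-status lookup used by DailyRangeAnalyzer, it does not account for exchange holidays. `within_trading_hours` is a hypothetical name.

```python
from datetime import datetime, time

SESSION_START = time(4, 0)
SESSION_END = time(19, 59)  # documented window 04:00-19:59 ET, inclusive


def within_trading_hours(now_et: datetime) -> bool:
    """now_et is assumed to be naive US/Eastern wall-clock time."""
    if now_et.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return False
    # Compare at minute resolution so 19:59:30 still counts as 19:59.
    minute = now_et.time().replace(second=0, microsecond=0)
    return SESSION_START <= minute <= SESSION_END
```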
kuhl_haus.mdp.analyzers.top_trades_analyzer module
Real-time trade analyzer using Redis Lists.
Stores recent trades per symbol in Redis Lists (sliding window), then aggregates stats (volume, trade count, max size) on publish. Publishes every 5 seconds cluster-wide via distributed throttle. Stateless design suitable for horizontal scaling.
- class kuhl_haus.mdp.analyzers.top_trades_analyzer.TopTradesAnalyzer(options: AnalyzerOptions)
Bases: Analyzer
Redis-backed trade analyzer using Lists for time-series data.
Maintains per-symbol trade history in Redis Lists (last 1,000 trades). On publish, scans for active symbols and computes aggregated stats from Lists without maintaining in-memory state. Throttles publishing to every 5 seconds cluster-wide.
Concurrency: Safe for multiple instances. Atomic LPUSH + LTRIM ensures clean sliding windows without coordination.
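The sliding window described above is the common `LPUSH` followed by `LTRIM key 0 N-1` pattern; sending both commands in one MULTI/EXEC transaction keeps each list bounded even with concurrent writers. The in-memory sketch below mirrors those two commands and the aggregation done at publish time. `TradeLists` and the `size` field name are assumptions, and TRADE_TTL expiry is not modeled.

```python
from collections import defaultdict
from typing import Dict, List

MAX_TRADES_PER_SYMBOL = 1000  # documented window size


class TradeLists:
    """In-memory stand-in for per-symbol Redis Lists (newest first)."""

    def __init__(self, max_len: int = MAX_TRADES_PER_SYMBOL) -> None:
        self.max_len = max_len
        self.lists: Dict[str, List[dict]] = defaultdict(list)

    def push(self, symbol: str, trade: dict) -> None:
        lst = self.lists[symbol]
        lst.insert(0, trade)      # LPUSH tta:{symbol}:recent <trade>
        del lst[self.max_len:]    # LTRIM tta:{symbol}:recent 0 max_len-1

    def stats(self, symbol: str) -> dict:
        trades = self.lists.get(symbol, [])
        sizes = [t["size"] for t in trades]  # "size" is a hypothetical field name
        return {
            "volume": sum(sizes),
            "trade_count": len(trades),
            "max_size": max(sizes, default=0),
        }


window = TradeLists(max_len=3)
for size in (100, 500, 200, 50):
    window.push("AAPL", {"size": size})
```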
- TRADES_RECENT_PREFIX = 'tta:{symbol}:recent'
- TRADES_STATS_PREFIX = 'tta:{symbol}:stats'
- PUBLISH_THROTTLE_KEY = 'tta:last_publish'
- TRADE_TTL = 300
- TOP_TRADES_ALL_SYMBOLS_CACHE_KEY = 'tta:all_symbols:widget'
- TOP_TRADES_ALL_SYMBOLS_CACHE_TTL = 60
- TOP_TRADES_WIDGET_CACHE_KEY = 'tta:{symbol}:widget'
- TOP_TRADES_WIDGET_CACHE_TTL = 60
- MAX_TRADES_PER_SYMBOL = 1000
- PUBLISH_INTERVAL = 5
- __init__(options: AnalyzerOptions)
- cache: MarketDataCache
- async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None
Process Trade event and update Redis trade history.
Stores trade in Redis List, then checks distributed throttle. If elected to publish, scans for all active symbols and aggregates stats.
Side effects: Writes to Redis List, may publish aggregated results every 5 seconds (one instance elected cluster-wide).
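The cluster-wide election can be expressed as a single `SET key value NX EX 5`: whichever instance creates PUBLISH_THROTTLE_KEY first publishes, and the key's expiry reopens the window five seconds later. With redis-py this would be `await client.set(key, value, nx=True, ex=5)`, which returns True only for the winner. The documentation does not confirm this exact mechanism, so treat the in-memory stand-in below (with a hypothetical `elected_to_publish` helper) as a sketch under that assumption.

```python
from typing import Callable, Dict, Tuple

PUBLISH_THROTTLE_KEY = "tta:last_publish"
PUBLISH_INTERVAL = 5  # seconds


class FakeRedis:
    """In-memory stand-in for SET key value NX EX seconds, with an injected clock."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self.clock = clock
        self.store: Dict[str, Tuple[str, float]] = {}  # key -> (value, expires_at)

    def set_nx_ex(self, key: str, value: str, ex: int) -> bool:
        now = self.clock()
        entry = self.store.get(key)
        if entry is not None and entry[1] > now:
            return False  # key still alive: another instance owns this window
        self.store[key] = (value, now + ex)
        return True


def elected_to_publish(redis: FakeRedis, instance_id: str) -> bool:
    # Exactly one caller per interval sees True; expiry reopens the window.
    return redis.set_nx_ex(PUBLISH_THROTTLE_KEY, instance_id, PUBLISH_INTERVAL)


fake_now = [0.0]
redis = FakeRedis(clock=lambda: fake_now[0])
winners = []
for t, instance in ((0.0, "a"), (0.0, "b"), (4.9, "b"), (5.0, "b"), (6.0, "a")):
    fake_now[0] = t
    winners.append(elected_to_publish(redis, instance))
```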