kuhl_haus.mdp.analyzers package

Analyzers for real-time market data streams.

Provides base analyzer abstractions and concrete implementations for processing WebSocket events from Massive.com and Finlight, including session high/low tracking, news routing, raw event routing, top stocks, top trades, and leaderboard analysis backed by Redis caching.

Submodules

kuhl_haus.mdp.analyzers.analyzer module

Base analyzer and configuration for market data processing.

Provides abstract interface and shared client factories for Redis-backed analyzers that consume WebSocket events from Massive.com and Finlight.

class kuhl_haus.mdp.analyzers.analyzer.AnalyzerOptions(redis_url: str | None = None, massive_api_key: str | None = None, finlight_api_key: str | None = None, kwargs: Dict[str, Any] = <factory>)

Bases: object

Configuration for analyzer instances.

Encapsulates API keys and connection URLs for the two upstream data sources (Massive.com market data and Finlight news) and Redis. Factory methods ensure consistent client instantiation across all analyzer implementations.

redis_url

Redis connection URL. Required for all stateful analyzers.

Type:

str | None

massive_api_key

Massive.com REST API key. Required for analyzers that fetch reference data (snapshots, floats, avg volume).

Type:

str | None

finlight_api_key

Finlight REST/WebSocket API key. Required for analyzers that process or query financial news data.

Type:

str | None

kwargs

Arbitrary keyword arguments for subclass-specific configuration. Allows analyzer subclasses to accept additional parameters without modifying this class or breaking existing implementations.

Type:

Dict[str, Any]

redis_url: str | None = None
massive_api_key: str | None = None
finlight_api_key: str | None = None
kwargs: Dict[str, Any]
new_rest_client()

Create Massive.com REST client if API key is configured.

new_finlight_client()

Create Finlight API client if API key is configured.

new_redis_client(encoding: str = 'utf-8', decode_responses: bool = True, max_connections: int = 1000, connect_timeout: int = 10, **kwargs)

Create async Redis client with connection pooling.

Pool size defaults to 1000 to support high-throughput pipelines under concurrent load (1,000+ events/sec).

__init__(redis_url: str | None = None, massive_api_key: str | None = None, finlight_api_key: str | None = None, kwargs: Dict[str, Any] = <factory>) → None
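Sphinx renders the kwargs default as <factory>, its notation for a dataclass field built by default_factory. A minimal sketch of an equivalent definition follows, assuming AnalyzerOptions is a plain @dataclass; it is not the package's source, and the option values (including the hypothetical "publish_interval" key) are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class AnalyzerOptions:
    """Sketch mirroring the documented fields; assumes a plain @dataclass."""

    redis_url: Optional[str] = None
    massive_api_key: Optional[str] = None
    finlight_api_key: Optional[str] = None
    # dataclasses reject a mutable default such as {}; default_factory builds
    # a fresh dict per instance (rendered by Sphinx as "<factory>").
    kwargs: Dict[str, Any] = field(default_factory=dict)


# Illustrative values only; "publish_interval" is a hypothetical subclass option.
opts = AnalyzerOptions(
    redis_url="redis://localhost:6379/0",
    kwargs={"publish_interval": 5},
)
defaults = AnalyzerOptions()
print(defaults.kwargs)  # {}
```

Each instance gets its own kwargs dict, so a subclass that mutates its options cannot leak settings into another analyzer.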
class kuhl_haus.mdp.analyzers.analyzer.Analyzer(options: AnalyzerOptions)

Bases: object

Abstract base for market data analyzers.

Subclasses implement analyze_data to process WebSocket events and return results for caching and pub/sub distribution. Most analyzers are designed to run concurrently across multiple instances without coordination beyond Redis atomics; exceptions are noted on the subclass (TopStocksAnalyzer is single-instance, and DailyRangeAnalyzer keeps per-instance state).

__init__(options: AnalyzerOptions)
options: AnalyzerOptions
async rehydrate()

Restore analyzer state from Redis on startup.

Optional hook for analyzers that need to load cached data before processing events. Base implementation is a no-op.

async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None

Process a single WebSocket event.

Parameters:

data – Deserialized JSON from Massive.com WebSocket stream.

Returns:

Results to cache and/or publish, or None if the event should be discarded (e.g., throttled, filtered, or incomplete).
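The contract above (optional rehydrate, analyze_data returning a list or None) can be illustrated with a small subclass. This is a sketch only: MarketDataAnalyzerResult is replaced by a hypothetical stand-in whose fields (data, cache_key, publish_key) are assumptions, the base class is re-declared without its options argument so the example runs on its own, and EchoSymbolAnalyzer and the echo:{symbol} key are invented for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class MarketDataAnalyzerResult:
    """Hypothetical stand-in; the real class's fields are not documented here."""

    data: Any
    cache_key: Optional[str] = None
    publish_key: Optional[str] = None


class Analyzer:
    """Stand-in for the documented base class (options omitted for brevity)."""

    async def rehydrate(self) -> None:
        # Base implementation is a no-op, as documented.
        return None

    async def analyze_data(self, data: dict) -> Optional[List[MarketDataAnalyzerResult]]:
        raise NotImplementedError


class EchoSymbolAnalyzer(Analyzer):
    """Hypothetical subclass: republishes any event that carries a symbol."""

    async def analyze_data(self, data: dict) -> Optional[List[MarketDataAnalyzerResult]]:
        symbol = data.get("symbol")
        if not symbol:
            # Returning None tells the caller to discard the event.
            return None
        key = f"echo:{symbol}"
        return [MarketDataAnalyzerResult(data=data, cache_key=key, publish_key=key)]


async def main() -> None:
    analyzer = EchoSymbolAnalyzer()
    await analyzer.rehydrate()  # called once on startup
    print(await analyzer.analyze_data({"symbol": "AAPL", "price": 190.1}))
    print(await analyzer.analyze_data({"price": 1.0}))


if __name__ == "__main__":
    asyncio.run(main())
```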

kuhl_haus.mdp.analyzers.daily_range_analyzer module

Daily range analyzer — session HOD/LOD tracking.

Subscribes to the quote:* feed and publishes per-symbol session high/low (pre-market, regular, after-hours) to daily_range:{symbol}.

All state is maintained in process memory with Redis-backed rehydration on startup so restarts do not lose intraday data.

Session detection uses the Massive get_market_status() API (60-second in-memory cache) so exchange holidays and early closes are handled correctly.

Pre-market H/L is frozen (not cleared) when the regular session opens — it remains visible in published payloads throughout the trading day.

Day boundary reset:

  • Triggered by observing a transition to the pre_market session after a period of None (market closed).

  • All six HOD/LOD dicts are cleared.

  • A Redis SET NX key scoped to the calendar date prevents duplicate resets across restarts and replicas.
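The reset rule can be sketched as follows, assuming an in-memory dict stands in for Redis SET NX and that the date-scoped key is DAY_BOUNDARY_KEY plus the ISO date (the actual key layout is not documented here). DayBoundaryTracker and InMemorySetNX are hypothetical names. Against a real server the same check would be a single SET key value NX, ideally with an expiry so old date keys age out.

```python
from datetime import date
from typing import Dict, Optional

DAY_BOUNDARY_KEY = "daily_range:day_boundary"  # documented constant
SESSIONS = ("pre_market", "regular", "after_hours")


class InMemorySetNX:
    """Stand-in for Redis `SET key value NX` (expiry omitted for brevity)."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}

    def set_nx(self, key: str, value: str) -> bool:
        if key in self.store:
            return False
        self.store[key] = value
        return True


class DayBoundaryTracker:
    """Hypothetical helper: clears the six HOD/LOD dicts once per calendar date."""

    def __init__(self, store: InMemorySetNX) -> None:
        self.store = store
        self.last_session: Optional[str] = None  # None = market closed / unknown
        self.highs: Dict[str, Dict[str, float]] = {s: {} for s in SESSIONS}
        self.lows: Dict[str, Dict[str, float]] = {s: {} for s in SESSIONS}

    def observe(self, session: Optional[str], today: date) -> bool:
        """Record the current session; return True if this call reset state."""
        did_reset = False
        if self.last_session is None and session == "pre_market":
            # Assumed key layout: documented prefix plus the ISO calendar date.
            key = f"{DAY_BOUNDARY_KEY}:{today.isoformat()}"
            if self.store.set_nx(key, "1"):
                for per_symbol in (*self.highs.values(), *self.lows.values()):
                    per_symbol.clear()
                did_reset = True
        self.last_session = session
        return did_reset
```

Because a restarted process starts with last_session set to None, its first pre_market tick looks like a fresh transition; in this sketch it is the date-scoped NX key that keeps that tick from wiping state rehydrated earlier the same day.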

class kuhl_haus.mdp.analyzers.daily_range_analyzer.DailyRangeAnalyzer(options: AnalyzerOptions)

Bases: Analyzer

Tracks intraday session highs and lows for real-time quote events.

Maintains per-symbol high/low for three intraday sessions (pre-market, regular, after-hours) in process memory. Publishes results to daily_range:{symbol} in the WDC on every quote tick.

Concurrency: HOD/LOD state is per-instance (not coordinated across replicas). Each instance maintains its own in-memory window.

DAY_BOUNDARY_KEY = 'daily_range:day_boundary'
__init__(options: AnalyzerOptions)
async rehydrate()

Restore session HOD/LOD state from Redis on startup.

Scans all daily_range:* keys and restores per-symbol H/L values into the in-memory session dicts for sessions that have already elapsed today.

Only fields from sessions that have already occurred are restored:

  • pre_market → pre-market fields only

  • regular → pre-market + regular-session fields

  • after_hours → all six fields

  • None → nothing (market closed; cannot determine which trading day the cached data belongs to)

This prevents yesterday’s regular/after-hours values from leaking into today’s pre-market display on restart.

Pre-market H/L is intentionally preserved through the regular session open and remains visible in published payloads throughout the day.
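The session-to-fields rule above amounts to a lookup table. A sketch, assuming the cached payload is a flat mapping of field name to price; the six field names and the filter_cached_fields helper are hypothetical, since the real payload layout is not documented here.

```python
from typing import Dict, FrozenSet, Optional

# Hypothetical field names; the real cached field names are not documented here.
PRE = frozenset({"pre_market_high", "pre_market_low"})
REG = frozenset({"regular_high", "regular_low"})
AH = frozenset({"after_hours_high", "after_hours_low"})

RESTORABLE: Dict[Optional[str], FrozenSet[str]] = {
    "pre_market": PRE,
    "regular": PRE | REG,
    "after_hours": PRE | REG | AH,
    None: frozenset(),  # market closed: cannot tell which day the data is from
}


def filter_cached_fields(cached: Dict[str, float], session: Optional[str]) -> Dict[str, float]:
    """Keep only cached fields for sessions that have already occurred today."""
    allowed = RESTORABLE.get(session, frozenset())
    return {name: value for name, value in cached.items() if name in allowed}
```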

async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None

Process a quote event and publish session HOD/LOD results.

Returns a list of one or more MarketDataAnalyzerResult entries:

  • Always: one state result published to daily_range:{symbol}.

  • Conditionally: one alert result per new session extreme (HOD or LOD), published to daily_range_hod_alert or daily_range_lod_alert. Alerts are suppressed on the first tick for a symbol (no prior value to compare against) and after day-boundary resets.

Parameters:

data – Quote event dict from the quote:* feed.

Returns:

List of MarketDataAnalyzerResult (state first, alerts appended), or None if the event could not be processed.
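For a single session, the new-extreme and first-tick rules can be sketched as below. The alert names match the documented daily_range_hod_alert and daily_range_lod_alert keys; SessionRange is a hypothetical helper, and which quote field supplies the price (bid, ask, or midpoint) is an assumption left to the caller.

```python
from typing import Dict, List, Tuple


class SessionRange:
    """Hypothetical per-session HOD/LOD tracker (e.g. for "regular")."""

    def __init__(self) -> None:
        self.high: Dict[str, float] = {}
        self.low: Dict[str, float] = {}

    def update(self, symbol: str, price: float) -> List[Tuple[str, float]]:
        """Record a price; return (alert_key, price) pairs for new extremes."""
        alerts: List[Tuple[str, float]] = []
        prev_high = self.high.get(symbol)
        prev_low = self.low.get(symbol)
        if prev_high is None or prev_low is None:
            # First tick for this symbol (or after a reset): seed, no alert.
            self.high[symbol] = price
            self.low[symbol] = price
            return alerts
        if price > prev_high:
            self.high[symbol] = price
            alerts.append(("daily_range_hod_alert", price))
        if price < prev_low:
            self.low[symbol] = price
            alerts.append(("daily_range_lod_alert", price))
        return alerts
```

The first tick for a symbol only seeds both extremes, so it produces no alert; after a day-boundary reset the dicts are empty again, which gives the same suppression.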

kuhl_haus.mdp.analyzers.finlight_data_analyzer module

Analyzer for Finlight news articles.

Processes articles from the Finlight WebSocket feed (via FinlightDataProcessor) and publishes them to Redis for real-time distribution to WDS clients.

Publishing strategy:

  • All articles → WidgetDataCacheKeys.NEWS_FEED_LATEST (news:feed:latest)

  • Enhanced articles → WidgetDataCacheKeys.NEWS_TICKER per US-listed company

  • Raw articles → WidgetDataCacheKeys.NEWS_TICKER for tickers parsed from title/summary

US exchange codes (MIC):

  • XNYS → NYSE

  • XASE → NYSE American (AMEX)

  • XNAS → NASDAQ

class kuhl_haus.mdp.analyzers.finlight_data_analyzer.FinlightDataAnalyzer(options: AnalyzerOptions)

Bases: Analyzer

Stateless analyzer that routes Finlight news articles to Redis pub/sub.

Receives article dicts (serialized via serde.to_dict) from FinlightDataProcessor and returns MarketDataAnalyzerResult objects for caching and publication.

Two modes, depending on whether the article includes entity data:

  • Enhanced: companies field present → extract tickers for companies whose primaryListing.exchangeCode is one of the US MIC codes listed for this module

  • Raw: no companies field → extract tickers via regex on title + summary
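The two extraction modes might look like this. It is a sketch under stated assumptions: the enhanced branch assumes each company's symbol is at primaryListing.ticker, and TICKER_RE is a hypothetical pattern (cashtags such as $AAPL and mentions such as "(NASDAQ: AAPL)"); the analyzer's actual regex is not documented.

```python
import re
from typing import Any, Dict, List

# Documented US exchange MIC codes.
US_MICS = {"XNYS": "NYSE", "XASE": "NYSE American (AMEX)", "XNAS": "NASDAQ"}

# Hypothetical raw-mode pattern: cashtags ($AAPL) and "(NASDAQ: AAPL)" style mentions.
TICKER_RE = re.compile(
    r"\$([A-Z]{1,5})\b|\((?:NYSE American|NYSE|NASDAQ|AMEX):\s*([A-Z]{1,5})\)"
)


def extract_tickers(article: Dict[str, Any]) -> List[str]:
    tickers: List[str] = []
    companies = article.get("companies")
    if companies is not None:
        # Enhanced mode: keep companies whose primary listing is on a US exchange.
        # Assumes the symbol lives at primaryListing.ticker.
        for company in companies:
            listing = company.get("primaryListing") or {}
            if listing.get("exchangeCode") in US_MICS and listing.get("ticker"):
                tickers.append(listing["ticker"])
    else:
        # Raw mode: regex over title + summary.
        text = f"{article.get('title') or ''} {article.get('summary') or ''}"
        for match in TICKER_RE.finditer(text):
            tickers.append(match.group(1) or match.group(2))
    # De-duplicate while preserving first-seen order.
    return list(dict.fromkeys(tickers))
```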

__init__(options: AnalyzerOptions)
async rehydrate()

No-op: stateless analyzer; no Redis state to restore.

async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None

Process a single Finlight article and return routing results.

Parameters:

data – Article dict serialized by serde.to_dict. Must be a non-empty dict.

Returns:

One result for news:feed:latest plus one per associated US ticker, or None for empty or invalid input.

Return type:

List[MarketDataAnalyzerResult] | None
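Combining the publishing strategy with the return contract, routing an article could be sketched as below. NEWS_FEED_LATEST uses the documented news:feed:latest value; the per-ticker key template news:ticker:{symbol} and the two-field result stand-in are hypothetical, because the actual value of WidgetDataCacheKeys.NEWS_TICKER and the fields of MarketDataAnalyzerResult are not given here. The tickers argument would come from the extraction step described for the class.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

NEWS_FEED_LATEST = "news:feed:latest"  # documented value
NEWS_TICKER_TEMPLATE = "news:ticker:{symbol}"  # hypothetical; real key not documented


@dataclass
class MarketDataAnalyzerResult:
    """Hypothetical stand-in for the real result type."""

    data: Dict[str, Any]
    publish_key: str


def route_article(article: Any, tickers: List[str]) -> Optional[List[MarketDataAnalyzerResult]]:
    """One result for the global feed, plus one per associated US ticker."""
    if not isinstance(article, dict) or not article:
        return None  # empty or invalid input
    results = [MarketDataAnalyzerResult(data=article, publish_key=NEWS_FEED_LATEST)]
    results.extend(
        MarketDataAnalyzerResult(data=article, publish_key=NEWS_TICKER_TEMPLATE.format(symbol=t))
        for t in tickers
    )
    return results
```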

kuhl_haus.mdp.analyzers.leaderboard_analyzer module

Real-time leaderboard analyzer using Redis sorted sets.

Processes EquityAgg events to maintain three leaderboards (volume, gappers, gainers) with automatic reset logic at market boundaries (4 AM ET, 9:30 AM ET). Designed for horizontal scaling—multiple instances coordinate via Redis atomics.

class kuhl_haus.mdp.analyzers.leaderboard_analyzer.LeaderboardAnalyzer(options: AnalyzerOptions)

Bases: Analyzer

Redis-backed leaderboard analyzer using Sorted Sets.

Maintains real-time rankings for top volume, gappers (% from prev close), and gainers (% from open). Updates are batched via pipelines; publishing is throttled to ~1/sec cluster-wide. Sorted sets are trimmed to top 500 to prevent unbounded growth.

Concurrency: Safe for multiple instances. Redis atomics prevent race conditions on leaderboard updates and day/market boundary resets.
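A minimal sketch of the per-event update, assuming a redis-py style pipeline (zadd(name, mapping) and zremrangebyrank(name, min, max) are real redis-py methods). The scoring inputs are assumptions: price is taken to be the latest price from the EquityAgg event, and prev_close and open_price come from whatever reference data the analyzer holds. queue_leaderboard_updates and RecordingPipeline are hypothetical names; the latter only records commands so the example runs without a server.

```python
from typing import Any, Dict, List, Tuple

LEADERBOARD_TOP_VOLUME = "leaderboard:top_volume"
LEADERBOARD_TOP_GAPPERS = "leaderboard:top_gappers"
LEADERBOARD_TOP_GAINERS = "leaderboard:top_gainers"
TOP_N = 500  # documented trim size


def pct_change(value: float, base: float) -> float:
    """Percent change from base; 0.0 when base is missing or zero."""
    return (value - base) / base * 100.0 if base else 0.0


def queue_leaderboard_updates(
    pipe: Any, symbol: str, volume: float, price: float, prev_close: float, open_price: float
) -> Dict[str, float]:
    """Queue ZADD + trim commands on a redis-py style pipeline; return the scores."""
    scores = {
        LEADERBOARD_TOP_VOLUME: volume,
        LEADERBOARD_TOP_GAPPERS: pct_change(price, prev_close),  # % from prev close
        LEADERBOARD_TOP_GAINERS: pct_change(price, open_price),  # % from open
    }
    for key, score in scores.items():
        pipe.zadd(key, {symbol: score})
        # Sorted sets rank ascending, so removing ranks 0..-(TOP_N + 1)
        # keeps only the TOP_N highest scores.
        pipe.zremrangebyrank(key, 0, -(TOP_N + 1))
    return scores


class RecordingPipeline:
    """Demo stand-in that records queued commands instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, ...]] = []

    def zadd(self, name: str, mapping: Dict[str, float]) -> None:
        self.calls.append(("zadd", name, mapping))

    def zremrangebyrank(self, name: str, start: int, end: int) -> None:
        self.calls.append(("zremrangebyrank", name, start, end))
```

With redis.asyncio, commands are typically queued inside async with client.pipeline(transaction=True) as pipe: and sent with await pipe.execute(), so all six commands reach Redis in one round trip as a single MULTI/EXEC block.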

LEADERBOARD_TOP_VOLUME = 'leaderboard:top_volume'
LEADERBOARD_TOP_GAPPERS = 'leaderboard:top_gappers'
LEADERBOARD_TOP_GAINERS = 'leaderboard:top_gainers'
MARKET_DAY_KEY = 'leaderboard:market:current_day_start'
MARKET_OPEN_RESET_KEY = 'leaderboard:market:open_reset:{date}'
PUBLISH_THROTTLE_KEY = 'leaderboard:last_publish'
__init__(options: AnalyzerOptions)
cache: MarketDataCache
async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None

Process EquityAgg event and update Redis leaderboards.

Updates sorted sets atomically, checks day/market boundaries, and returns throttled leaderboard snapshots (~1/sec cluster-wide).

Side effects: Updates Redis sorted sets, hash keys for symbol metadata, and may reset leaderboards at market boundaries (4 AM ET, 9:30 AM ET).
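One common way to get a ~1/sec cluster-wide throttle is SET with NX and a short expiry on PUBLISH_THROTTLE_KEY: whichever instance sets the key first publishes, and the others skip until it expires. The sketch below assumes that approach (the analyzer's exact mechanism is not documented here) and includes a hypothetical in-memory FakeRedis so it runs without a server. The same NX pattern applied to a date-scoped key such as MARKET_OPEN_RESET_KEY would let exactly one instance perform each boundary reset.

```python
import asyncio
from typing import Any, Dict, Optional

PUBLISH_THROTTLE_KEY = "leaderboard:last_publish"  # documented constant


async def try_acquire_publish_slot(redis: Any, interval_ms: int = 1000) -> bool:
    """Return True for at most one caller per interval across all instances.

    SET ... NX succeeds only when the key is absent; PX expires it after
    interval_ms, reopening the slot. redis-py returns True on success, None otherwise.
    """
    return bool(await redis.set(PUBLISH_THROTTLE_KEY, "1", nx=True, px=interval_ms))


class FakeRedis:
    """Minimal in-memory stand-in with a manually advanced clock (demo only)."""

    def __init__(self) -> None:
        self.now = 0.0
        self.expiry: Dict[str, float] = {}

    async def set(self, key: str, value: str, nx: bool = False, px: Optional[int] = None) -> Optional[bool]:
        expires_at = self.expiry.get(key)
        if nx and expires_at is not None and expires_at > self.now:
            return None
        self.expiry[key] = self.now + px / 1000.0 if px is not None else float("inf")
        return True


async def demo() -> None:
    r = FakeRedis()
    print(await try_acquire_publish_slot(r))  # True: first caller wins
    print(await try_acquire_publish_slot(r))  # False: slot still held
    r.now = 1.5
    print(await try_acquire_publish_slot(r))  # True: key expired


if __name__ == "__main__":
    asyncio.run(demo())
```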

kuhl_haus.mdp.analyzers.massive_data_analyzer module

Stateless event router for raw Massive.com WebSocket messages.

Dispatches incoming events (LULD, EquityAgg, Trade, Quote) to type-specific handlers that return cache/publish results. No state maintained—purely a pass-through with observability instrumentation.

class kuhl_haus.mdp.analyzers.massive_data_analyzer.MassiveDataAnalyzer(options: AnalyzerOptions)

Bases: Analyzer

Stateless event router for Massive.com WebSocket messages.

Maps event_type to handler functions that return cache/publish results. No external I/O or state—purely routing logic with metrics. Designed for simplicity; heavier analysis lives in specialized analyzers.

__init__(options: AnalyzerOptions)
async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None

Process raw market data message.

Validates the presence of event_type and symbol, then dispatches to the matching handler. Unknown or malformed events are routed to handle_unknown_event for debugging visibility.

Parameters:

data – Deserialized message from Massive/Polygon.io WebSocket.

handle_luld_event(data: dict, symbol: str) → List[MarketDataAnalyzerResult] | None

Handle Limit Up/Limit Down events (halts).

handle_equity_agg_event(data: dict, symbol: str) → List[MarketDataAnalyzerResult] | None

Handle EquityAgg and EquityAggMin events (aggregated bars).

handle_equity_trade_event(data: dict, symbol: str) → List[MarketDataAnalyzerResult] | None

Handle EquityTrade events (individual trades).

handle_equity_quote_event(data: dict, symbol: str) → List[MarketDataAnalyzerResult] | None

Handle EquityQuote events (bid/ask updates).

handle_unknown_event(data: dict) → List[MarketDataAnalyzerResult] | None

Handle unknown or malformed events for debugging visibility.
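The routing described above fits a dispatch table. A sketch, assuming event_type strings that mirror the handler docs (the real analyzer may key on the client library's enum values instead) and hypothetical cache keys; results are plain dicts standing in for MarketDataAnalyzerResult, and EventRouter is an invented name.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional

# Hypothetical stand-in for List[MarketDataAnalyzerResult] | None.
Results = Optional[List[Dict[str, Any]]]


class EventRouter:
    """Sketch of an event_type -> handler dispatch table."""

    def __init__(self) -> None:
        # Illustrative event_type strings, not confirmed library values.
        self.handlers: Dict[str, Callable[[dict, str], Results]] = {
            "LULD": self.handle_luld_event,
            "EquityAgg": self.handle_equity_agg_event,
            "EquityAggMin": self.handle_equity_agg_event,
            "EquityTrade": self.handle_equity_trade_event,
            "EquityQuote": self.handle_equity_quote_event,
        }

    async def analyze_data(self, data: dict) -> Results:
        event_type = data.get("event_type")
        symbol = data.get("symbol")
        handler = self.handlers.get(event_type) if event_type and symbol else None
        if handler is None:
            # Missing fields or unrecognized type: keep it visible for debugging.
            return self.handle_unknown_event(data)
        return handler(data, symbol)

    # Hypothetical cache keys below; real key names are not documented here.
    def handle_luld_event(self, data: dict, symbol: str) -> Results:
        return [{"key": f"luld:{symbol}", "data": data}]

    def handle_equity_agg_event(self, data: dict, symbol: str) -> Results:
        return [{"key": f"agg:{symbol}", "data": data}]

    def handle_equity_trade_event(self, data: dict, symbol: str) -> Results:
        return [{"key": f"trade:{symbol}", "data": data}]

    def handle_equity_quote_event(self, data: dict, symbol: str) -> Results:
        return [{"key": f"quote:{symbol}", "data": data}]

    def handle_unknown_event(self, data: dict) -> Results:
        return [{"key": "unknown", "data": data}]


if __name__ == "__main__":
    print(asyncio.run(EventRouter().analyze_data({"event_type": "EquityTrade", "symbol": "AAPL"})))
```

A table keeps routing declarative: EquityAgg and EquityAggMin share one handler simply by pointing at the same method.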

kuhl_haus.mdp.analyzers.top_stocks module

In-memory leaderboard analyzer (legacy prototype pattern).

Maintains top volume/gainers/gappers in a single TopStocksCacheItem instance. Rehydrates state from Redis on startup. Single-instance design—does not coordinate across replicas. Retained for historical context; production deployments prefer LeaderboardAnalyzer (Redis-native, stateless).

class kuhl_haus.mdp.analyzers.top_stocks.TopStocksAnalyzer(options: AnalyzerOptions)

Bases: Analyzer

In-memory leaderboard analyzer (single-instance).

Maintains TopStocksCacheItem in process memory with day/market boundary resets. Fetches external metadata (snapshot, avg volume, free float) on first symbol encounter, then caches in memory. Publishes once per second.

Not suitable for horizontal scaling—state lives in memory. Use LeaderboardAnalyzer for multi-instance deployments.

__init__(options: AnalyzerOptions)
async rehydrate()

Restore in-memory state from Redis on startup.

Clears cache if outside trading hours (Mon-Fri, 04:00-19:59 ET). Avoids stale data carryover from previous session.
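The trading-hours window used by rehydrate could be checked as below. This is a sketch: within_trading_hours is a hypothetical helper, the caller is assumed to pass a datetime already converted to Eastern Time (for example with zoneinfo.ZoneInfo("America/New_York")), 19:59 is read as running through 19:59:59, and exchange holidays are ignored.

```python
from datetime import datetime, time

SESSION_START = time(4, 0)  # 04:00 ET
SESSION_END = time(20, 0)   # exclusive bound: 19:59:59 is the last covered second


def within_trading_hours(now_et: datetime) -> bool:
    """True on Mon-Fri from 04:00:00 through 19:59:59, Eastern Time.

    `now_et` must already be in ET, e.g. datetime.now(ZoneInfo("America/New_York")).
    Exchange holidays are not considered.
    """
    if now_et.weekday() >= 5:  # Saturday (5) or Sunday (6)
        return False
    return SESSION_START <= now_et.time() < SESSION_END
```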

async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None

Process EquityAgg events and maintain in-memory leaderboards.

Checks day/market boundaries (4 AM ET, 9:30 AM ET) and resets state. Fetches external metadata on first symbol encounter. Publishes once per second (local throttle, not distributed).

Side effects: Updates in-memory cache_item, fetches REST API data for new symbols, writes results to Redis on publish.

async handle_equity_agg(event: EquityAgg)

Update in-memory leaderboards for EquityAgg event.

Fetches snapshot, avg volume, and free float on first symbol encounter (with retries). Calculates derived metrics and updates three in-memory maps (volume, gappers, gainers) plus symbol data cache.

kuhl_haus.mdp.analyzers.top_trades_analyzer module

Real-time trade analyzer using Redis Lists.

Stores recent trades per symbol in Redis Lists (sliding window), then aggregates stats (volume, trade count, max size) on publish. Publishes every 5 seconds cluster-wide via distributed throttle. Stateless design suitable for horizontal scaling.

class kuhl_haus.mdp.analyzers.top_trades_analyzer.TopTradesAnalyzer(options: AnalyzerOptions)

Bases: Analyzer

Redis-backed trade analyzer using Lists for time-series data.

Maintains per-symbol trade history in Redis Lists (last 1,000 trades). On publish, scans for active symbols and computes aggregated stats from Lists without maintaining in-memory state. Throttles publishing to every 5 seconds cluster-wide.

Concurrency: Safe for multiple instances. Atomic LPUSH + LTRIM ensures clean sliding windows without coordination.
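The sliding window can be sketched as three queued commands per trade, assuming a redis-py style pipeline (lpush, ltrim, and expire are real redis-py methods). Applying TRADE_TTL as a key expiry in seconds is an assumption, as is JSON-encoding each trade; queue_trade and InMemoryListPipeline are hypothetical names, the latter applying commands immediately so the example runs without a server.

```python
import json
from typing import Any, Dict, List

TRADES_RECENT_PREFIX = "tta:{symbol}:recent"
TRADE_TTL = 300  # assumed to be seconds
MAX_TRADES_PER_SYMBOL = 1000


def queue_trade(pipe: Any, symbol: str, trade: Dict[str, Any]) -> None:
    key = TRADES_RECENT_PREFIX.format(symbol=symbol)
    pipe.lpush(key, json.dumps(trade))             # newest trade at index 0
    pipe.ltrim(key, 0, MAX_TRADES_PER_SYMBOL - 1)  # keep only the newest 1,000
    pipe.expire(key, TRADE_TTL)                    # idle symbols age out


class InMemoryListPipeline:
    """Demo stand-in applying list commands immediately (no networking)."""

    def __init__(self) -> None:
        self.lists: Dict[str, List[str]] = {}
        self.ttls: Dict[str, int] = {}

    def lpush(self, name: str, *values: str) -> None:
        for value in values:
            self.lists.setdefault(name, []).insert(0, value)

    def ltrim(self, name: str, start: int, end: int) -> None:
        # Non-negative indices only, which is all this demo uses.
        self.lists[name] = self.lists.get(name, [])[start:end + 1]

    def expire(self, name: str, seconds: int) -> None:
        self.ttls[name] = seconds
```

When the three commands are queued on a transactional pipeline (MULTI/EXEC), the push and trim apply together, so other clients never observe a list longer than MAX_TRADES_PER_SYMBOL.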

TRADES_RECENT_PREFIX = 'tta:{symbol}:recent'
TRADES_STATS_PREFIX = 'tta:{symbol}:stats'
PUBLISH_THROTTLE_KEY = 'tta:last_publish'
TRADE_TTL = 300
TOP_TRADES_ALL_SYMBOLS_CACHE_KEY = 'tta:all_symbols:widget'
TOP_TRADES_ALL_SYMBOLS_CACHE_TTL = 60
TOP_TRADES_WIDGET_CACHE_KEY = 'tta:{symbol}:widget'
TOP_TRADES_WIDGET_CACHE_TTL = 60
MAX_TRADES_PER_SYMBOL = 1000
PUBLISH_INTERVAL = 5
__init__(options: AnalyzerOptions)
cache: MarketDataCache
async analyze_data(data: dict) → List[MarketDataAnalyzerResult] | None

Process Trade event and update Redis trade history.

Stores trade in Redis List, then checks distributed throttle. If elected to publish, scans for all active symbols and aggregates stats.

Side effects: Writes to Redis List, may publish aggregated results every 5 seconds (one instance elected cluster-wide).
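On publish, the per-symbol stats could be computed from the list contents (for example, the result of LRANGE key 0 -1 on each tta:{symbol}:recent key found via SCAN). A sketch, assuming each trade is JSON with a numeric "size" field; the field name and the aggregate_trades helper are hypothetical.

```python
import json
from typing import Any, Dict, Iterable


def aggregate_trades(raw_trades: Iterable[str]) -> Dict[str, Any]:
    """Summarize JSON-encoded trades read back from one symbol's list.

    Assumes each trade has a numeric "size" field (hypothetical field name).
    """
    sizes = [float(json.loads(raw)["size"]) for raw in raw_trades]
    return {
        "volume": sum(sizes),
        "trade_count": len(sizes),
        "max_size": max(sizes, default=0.0),
    }
```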