"""In-memory leaderboard analyzer (legacy prototype pattern).
Maintains top volume/gainers/gappers in a single TopStocksCacheItem instance.
Rehydrates state from Redis on startup. Single-instance design—does not
coordinate across replicas. Retained for historical context; production
deployments prefer LeaderboardAnalyzer (Redis-native, stateless).
"""
import logging
import time
from datetime import datetime, timezone
from typing import Optional, List
from zoneinfo import ZoneInfo
from massive.websocket.models import (
EquityAgg,
EventType
)
from kuhl_haus.mdp.analyzers.analyzer import Analyzer, AnalyzerOptions
from kuhl_haus.mdp.components.market_data_cache import MarketDataCache
from kuhl_haus.mdp.data.market_data_analyzer_result import MarketDataAnalyzerResult
from kuhl_haus.mdp.data.top_stocks_cache_item import TopStocksCacheItem
from kuhl_haus.mdp.enum.widget_data_cache_keys import WidgetDataCacheKeys
from kuhl_haus.mdp.enum.widget_data_cache_ttl import WidgetDataCacheTTL
class TopStocksAnalyzer(Analyzer):
    """In-memory leaderboard analyzer (single-instance).

    Keeps one ``TopStocksCacheItem`` in process memory and updates it from
    equity aggregate events. Per-symbol reference data (previous-day
    snapshot, average volume, free float) is fetched through
    ``MarketDataCache`` the first time a symbol is seen and is then reused
    from the in-memory symbol cache. Results are returned at most once per
    wall-clock second.

    State lives in this process only, so the class is not suitable for
    horizontal scaling; use LeaderboardAnalyzer for multi-instance
    deployments.
    """

    # Number of rows requested from each leaderboard when building results.
    _LEADERBOARD_SIZE = 500
    # Attempt budgets for the per-symbol reference-data lookups.
    _SNAPSHOT_MAX_TRIES = 3
    _AVG_VOLUME_MAX_TRIES = 3
    _FREE_FLOAT_MAX_TRIES = 2  # free float comes from an experimental API

    def __init__(self, options: AnalyzerOptions):
        """Create the Redis/REST clients and start with an empty leaderboard.

        Args:
            options: Analyzer options; used to build the Redis and REST
                clients that back the ``MarketDataCache``.
        """
        super().__init__(options)
        self.redis_client = options.new_redis_client()
        self.rest_client = options.new_rest_client()
        self.cache = MarketDataCache(
            rest_client=self.rest_client,
            redis_client=self.redis_client,
        )
        self.cache_key = WidgetDataCacheKeys.TOP_STOCKS_SCANNER.value
        self.logger = logging.getLogger(__name__)
        self.cache_item = TopStocksCacheItem()
        # Whole-second epoch of the last returned result set (throttle).
        self.last_update_time = 0
        # True once the 09:30 ET reset has been applied for the current day.
        self.pre_market_reset = False

    @staticmethod
    def _now_eastern() -> datetime:
        """Return the current time as an aware datetime in America/New_York."""
        return datetime.now(timezone.utc).astimezone(ZoneInfo("America/New_York"))

    async def rehydrate(self):
        """Restore in-memory state from the cache on startup.

        Outside Mon-Fri 04:00-19:59 ET the in-memory item is replaced with an
        empty one and nothing is read, so data from a previous session is not
        carried over. Otherwise the stored item under ``self.cache_key`` is
        loaded; if nothing is stored an empty item is used.
        """
        et_now = self._now_eastern()
        is_weekday = et_now.weekday() < 5
        is_trading_hours = 4 <= et_now.hour < 20
        if not (is_weekday and is_trading_hours):
            self.cache_item = TopStocksCacheItem()
            self.logger.info(
                "Outside market hours (%s), clearing cache.",
                et_now.strftime('%H:%M:%S %Z'),
            )
            return

        data = await self.cache.read(self.cache_key)
        if not data:
            self.cache_item = TopStocksCacheItem()
            self.logger.info("No data to rehydrate TopStocksCacheItem.")
            return

        self.cache_item = TopStocksCacheItem(**data)
        self.logger.info("Rehydrated TopStocksCacheItem")

    async def analyze_data(self, data: dict) -> Optional[List[MarketDataAnalyzerResult]]:
        """Process one event and return leaderboard results at most once a second.

        The day/market-open reset check runs first (even for events that are
        later discarded). Events without an ``event_type`` or ``symbol``, or
        with an event type other than EquityAgg / EquityAggMin, are logged and
        dropped.

        Args:
            data: Raw event dict; must contain ``event_type`` and ``symbol``
                and be usable as keyword arguments for ``EquityAgg``.

        Returns:
            ``None`` when the event is discarded or when results were already
            returned during the current wall-clock second; otherwise four
            results: the full cache item plus the top-volume, top-gainers and
            top-gappers leaderboards.
        """
        self._roll_session_if_needed(self._now_eastern())

        event_type = data.get("event_type")
        symbol = data.get("symbol")
        if event_type == EventType.EquityAgg.value:
            kind = "EquityAgg"
        elif event_type == EventType.EquityAggMin.value:
            kind = "EquityAggMin"
        else:
            kind = None

        if not event_type or not symbol or kind is None:
            self.logger.info("Discarding data: %s", data)
            return None

        # Both aggregate flavours are parsed and handled identically.
        self.logger.debug("Processing %s: %s", kind, symbol)
        await self.handle_equity_agg(EquityAgg(**data))

        # Local (per-process) throttle: return results once per second.
        current_time = int(time.time())
        if current_time <= self.last_update_time:
            return None
        self.last_update_time = current_time
        return self._build_results()

    def _roll_session_if_needed(self, et_now: datetime) -> None:
        """Reset the leaderboard on a new day key or at the 09:30 ET minute.

        The day key is the timestamp of 04:00 ET on ``et_now``'s calendar
        date, so it changes when the ET calendar date changes.
        NOTE(review): this means the "new day" reset fires at 00:00 ET rather
        than at 04:00 ET - confirm that is the intended boundary.
        """
        current_day = et_now.replace(hour=4, minute=0, second=0, microsecond=0).timestamp()
        if current_day != self.cache_item.day_start_time:
            self.logger.info("New day: %s - resetting cache.", current_day)
            self._reset_cache(current_day)
            self.pre_market_reset = False
        elif et_now.hour == 9 and et_now.minute == 30 and not self.pre_market_reset:
            # Only applied when an event arrives during the 09:30 minute;
            # the flag prevents repeated resets within that minute.
            self.logger.info("Market is now open; resetting cache.")
            self._reset_cache(current_day)
            self.pre_market_reset = True

    def _reset_cache(self, day_start_time: float) -> None:
        """Replace the in-memory item with an empty one stamped with the day key."""
        self.cache_item = TopStocksCacheItem()
        self.cache_item.day_start_time = day_start_time

    def _build_results(self) -> List[MarketDataAnalyzerResult]:
        """Build the full-state result and the three leaderboard results."""
        limit = self._LEADERBOARD_SIZE
        return [
            # Full state; this is what rehydrate() reads back via cache_key.
            MarketDataAnalyzerResult(
                data=self.cache_item.to_dict(),
                cache_key=self.cache_key,
                cache_ttl=WidgetDataCacheTTL.TOP_STOCKS_SCANNER.value,
            ),
            MarketDataAnalyzerResult(
                data=self.cache_item.top_volume(limit),
                cache_key=WidgetDataCacheKeys.TOP_VOLUME_SCANNER.value,
                cache_ttl=WidgetDataCacheTTL.TOP_VOLUME_SCANNER.value,
                publish_key=WidgetDataCacheKeys.TOP_VOLUME_SCANNER.value,
            ),
            MarketDataAnalyzerResult(
                data=self.cache_item.top_gainers(limit),
                cache_key=WidgetDataCacheKeys.TOP_GAINERS_SCANNER.value,
                cache_ttl=WidgetDataCacheTTL.TOP_GAINERS_SCANNER.value,
                publish_key=WidgetDataCacheKeys.TOP_GAINERS_SCANNER.value,
            ),
            MarketDataAnalyzerResult(
                data=self.cache_item.top_gappers(limit),
                cache_key=WidgetDataCacheKeys.TOP_GAPPERS_SCANNER.value,
                cache_ttl=WidgetDataCacheTTL.TOP_GAPPERS_SCANNER.value,
                publish_key=WidgetDataCacheKeys.TOP_GAPPERS_SCANNER.value,
            ),
        ]

    async def handle_equity_agg(self, event: EquityAgg):
        """Update the in-memory leaderboards from one aggregate event.

        Reference data is taken from the symbol cache when the symbol has
        been seen before, otherwise it is fetched (with retries). If the
        snapshot or the average volume cannot be fetched the event is
        dropped without touching any map; a missing free float is tolerated
        and stored as 0.

        Args:
            event: Parsed aggregate for a single symbol.
        """
        symbol = event.symbol
        symbol_cache = self.cache_item.symbol_data_cache
        if symbol in symbol_cache:
            reference = symbol_cache[symbol]
        else:
            reference = await self._fetch_reference_data(symbol)
            if reference is None:
                return

        avg_volume = reference["avg_volume"]
        prev_day_close = reference["prev_day_close"]
        prev_day_volume = reference["prev_day_volume"]
        prev_day_vwap = reference["prev_day_vwap"]
        free_float = reference["free_float"]

        # Relative volume; a zero (or missing) average yields 0 instead of
        # dividing.
        relative_volume = event.accumulated_volume / avg_volume if avg_volume else 0

        # Change versus the previous day's close; 0 when the close is zero
        # (or missing).
        if prev_day_close:
            change = event.close - prev_day_close
            pct_change = change / prev_day_close * 100
        else:
            change = 0
            pct_change = 0

        # Change versus the official opening price, when one is present.
        change_since_open = 0
        pct_change_since_open = 0
        if event.official_open_price:
            change_since_open = event.close - event.official_open_price
            pct_change_since_open = change_since_open / event.official_open_price * 100

        # Ranking inputs: volume by accumulated volume, gappers by change
        # since the previous close, gainers by change since the open.
        self.cache_item.top_volume_map[symbol] = event.accumulated_volume
        self.cache_item.top_gappers_map[symbol] = pct_change
        self.cache_item.top_gainers_map[symbol] = pct_change_since_open

        # Latest per-symbol row; also carries the reference data so the next
        # event for this symbol skips the external lookups.
        symbol_cache[symbol] = {
            "symbol": symbol,
            "volume": event.volume,
            "free_float": free_float,
            "accumulated_volume": event.accumulated_volume,
            "relative_volume": relative_volume,
            "official_open_price": event.official_open_price,
            "vwap": event.vwap,
            "open": event.open,
            "close": event.close,
            "high": event.high,
            "low": event.low,
            "aggregate_vwap": event.aggregate_vwap,
            "average_size": event.average_size,
            "avg_volume": avg_volume,
            "prev_day_close": prev_day_close,
            "prev_day_volume": prev_day_volume,
            "prev_day_vwap": prev_day_vwap,
            "change": change,
            "pct_change": pct_change,
            "change_since_open": change_since_open,
            "pct_change_since_open": pct_change_since_open,
            "start_timestamp": event.start_timestamp,
            "end_timestamp": event.end_timestamp,
        }

    async def _fetch_reference_data(self, symbol: str) -> Optional[dict]:
        """Fetch snapshot, average volume and free float for a new symbol.

        Lookups run in that order. Returns ``None`` as soon as the snapshot
        or the average volume is exhausted; a failed free-float lookup falls
        back to 0.
        """
        prev_day = await self._fetch_prev_day(symbol)
        if prev_day is None:
            return None
        prev_day_close, prev_day_volume, prev_day_vwap = prev_day

        fetched, avg_volume = await self._fetch_with_retries(
            self.cache.get_avg_volume, symbol, "average volume", self._AVG_VOLUME_MAX_TRIES
        )
        if not fetched:
            return None

        fetched, free_float = await self._fetch_with_retries(
            self.cache.get_free_float, symbol, "free float", self._FREE_FLOAT_MAX_TRIES
        )
        if not fetched:
            free_float = 0

        return {
            "avg_volume": avg_volume,
            "prev_day_close": prev_day_close,
            "prev_day_volume": prev_day_volume,
            "prev_day_vwap": prev_day_vwap,
            "free_float": free_float,
        }

    async def _fetch_prev_day(self, symbol: str) -> Optional[tuple]:
        """Return ``(close, volume, vwap)`` of the previous day, or ``None``.

        After every failed attempt the cached snapshot for the symbol is
        deleted before the next try. The three fields are read together, so
        an attempt either yields all of them or counts as a failure.
        """
        for _ in range(self._SNAPSHOT_MAX_TRIES):
            try:
                snapshot = await self.cache.get_ticker_snapshot(symbol)
                prev_day = snapshot.prev_day
                return prev_day.close, prev_day.volume, prev_day.vwap
            except AttributeError as exc:
                # Snapshot (or its prev_day) lacks the expected attributes.
                self.logger.error(
                    "AttributeError exception thrown when getting snapshot for %s: %s",
                    symbol, exc,
                )
            except Exception as exc:
                self.logger.error(
                    "Failed to get snapshot for %s: %s",
                    symbol, exc, stack_info=True, exc_info=True,
                )
            await self.cache.delete_ticker_snapshot(symbol)

        self.logger.error(
            "Failed to get snapshot for %s after %s tries.",
            symbol, self._SNAPSHOT_MAX_TRIES,
        )
        return None

    async def _fetch_with_retries(self, fetch, symbol: str, label: str, max_tries: int) -> tuple:
        """Await ``fetch(symbol)`` up to ``max_tries`` times.

        Returns:
            ``(True, value)`` on the first successful attempt, or
            ``(False, None)`` once every attempt has raised. The flag lets a
            legitimately falsy value be told apart from a failure.
        """
        for _ in range(max_tries):
            try:
                return True, await fetch(symbol)
            except Exception as exc:
                # Best-effort lookup: log and retry rather than propagate.
                self.logger.error("Failed to get %s for %s: %s", label, symbol, exc)

        self.logger.error("Failed to get %s for %s after %s tries.", label, symbol, max_tries)
        return False, None