"""Real-time leaderboard analyzer using Redis sorted sets.
Processes EquityAgg events to maintain three leaderboards (volume, gappers,
gainers) with automatic reset logic at market boundaries (4 AM ET, 9:30 AM ET).
Designed for horizontal scaling—multiple instances coordinate via Redis atomics.
"""
import logging
import math
from datetime import datetime, timedelta, timezone
from typing import Optional, List
from zoneinfo import ZoneInfo

from kuhl_haus.mdp.analyzers.analyzer import Analyzer, AnalyzerOptions
from kuhl_haus.mdp.components.market_data_cache import MarketDataCache
from kuhl_haus.mdp.data.market_data_analyzer_result import MarketDataAnalyzerResult
from kuhl_haus.mdp.enum.market_data_cache_keys import MarketDataCacheKeys
from kuhl_haus.mdp.enum.market_data_cache_ttl import MarketDataCacheTTL
from kuhl_haus.mdp.enum.widget_data_cache_keys import WidgetDataCacheKeys
from kuhl_haus.mdp.enum.widget_data_cache_ttl import WidgetDataCacheTTL
from kuhl_haus.mdp.exceptions.data_analysis_exception import DataAnalysisException
from kuhl_haus.mdp.helpers.observability import get_tracer, get_meter
# Module-level tracer; used by the span decorators on the analyzer's methods.
tracer = get_tracer(__name__)
class LeaderboardAnalyzer(Analyzer):
    """Redis-backed leaderboard analyzer using Sorted Sets.

    Maintains real-time rankings for top volume, gappers (% from prev close),
    and gainers (% from open). Updates are batched via pipelines; publishing
    is throttled to ~1/sec cluster-wide. Sorted sets are trimmed to the top
    ``MAX_LEADERBOARD_SIZE`` entries to prevent unbounded growth.

    Concurrency: intended for multiple instances. The day-boundary reset runs
    as a single Lua script, and the market-open reset and publish throttle
    are elected with ``SET NX``.
    """

    # Redis key constants
    LEADERBOARD_TOP_VOLUME = MarketDataCacheKeys.LEADERBOARD_TOP_VOLUME.value
    LEADERBOARD_TOP_GAPPERS = MarketDataCacheKeys.LEADERBOARD_TOP_GAPPERS.value
    LEADERBOARD_TOP_GAINERS = MarketDataCacheKeys.LEADERBOARD_TOP_GAINERS.value
    MARKET_DAY_KEY = MarketDataCacheKeys.LEADERBOARD_MARKET_DAY_KEY.value
    MARKET_OPEN_RESET_KEY = MarketDataCacheKeys.LEADERBOARD_MARKET_OPEN_RESET_KEY.value
    PUBLISH_THROTTLE_KEY = MarketDataCacheKeys.LEADERBOARD_PUBLISH_THROTTLE_KEY.value

    # Tunables
    MAX_LEADERBOARD_SIZE = 500  # entries kept per sorted set
    OPEN_PRICE_TTL_SECONDS = 86400  # 24h TTL for cached per-symbol open price
    MARKET_TZ = ZoneInfo("America/New_York")
    DAY_START_HOUR = 4  # trading day rolls over at 4 AM ET

    # Atomic check-and-reset: KEYS[1] is the stored day marker, KEYS[2..4] are
    # the leaderboards to drop, ARGV[1] is the current trading-day marker.
    _DAY_RESET_LUA = """
    local stored = redis.call('GET', KEYS[1])
    if stored ~= ARGV[1] then
        redis.call('SET', KEYS[1], ARGV[1])
        redis.call('DEL', KEYS[2], KEYS[3], KEYS[4])
        return 1
    end
    return 0
    """

    cache: MarketDataCache

    def __init__(self, options: AnalyzerOptions):
        """Create Redis/REST clients, the market data cache and metric counters."""
        super().__init__(options)
        self.logger = logging.getLogger(__name__)
        self.redis_client = options.new_redis_client()
        self.rest_client = options.new_rest_client()
        self.cache = MarketDataCache(
            rest_client=self.rest_client,
            redis_client=self.redis_client,
        )
        meter = get_meter(__name__)
        self.processed_counter = meter.create_counter(
            name="lba.processed", description="Leaderboard Analyzer processed events", unit="1"
        )
        self.published_counter = meter.create_counter(
            name="lba.published", description="Leaderboard Analyzer published results", unit="1"
        )
        self.errors_counter = meter.create_counter(
            name="lba.errors", description="Leaderboard Analyzer errors", unit="1"
        )

    @tracer.start_as_current_span("lba.analyze_data")
    async def analyze_data(self, data: dict) -> Optional[List[MarketDataAnalyzerResult]]:
        """Process an EquityAgg event and update the Redis leaderboards.

        Runs the day-boundary and market-open checks, updates the sorted sets
        and the symbol hash, and returns the results to publish.

        Returns:
            The leaderboard snapshots plus the per-symbol quote when this call
            wins the publish throttle; otherwise a one-element list with the
            quote, or ``None`` when no quote could be built.

        Raises:
            DataAnalysisException: wraps (and chains) any error raised while
                processing the event.
        """
        try:
            self.processed_counter.add(1)

            # Check and handle day/market boundaries
            await self._check_day_boundary()
            await self._check_market_open_reset()

            # _update_leaderboards returns the mapping it just wrote to Redis,
            # so the quote is built without a redundant HGETALL round-trip.
            symbol_data = await self._update_leaderboards(data)

            # The per-symbol quote is published on every event by every
            # instance; only the leaderboard snapshots are throttled.
            symbol = data.get("symbol")
            quote_result = None
            if symbol_data:
                quote_key = f"{WidgetDataCacheKeys.QUOTE.value}:{symbol}"
                quote_result = MarketDataAnalyzerResult(
                    data=symbol_data,
                    cache_key=quote_key,
                    cache_ttl=WidgetDataCacheTTL.QUOTE.value,
                    publish_key=quote_key,
                )

            # Throttled leaderboard publish (one instance per second)
            if await self._check_publish_throttle():
                results = await self._build_leaderboard_results()
                if quote_result:
                    results.append(quote_result)
                self.published_counter.add(len(results))
                return results

            # Not publishing leaderboards: return just the quote, if any
            if quote_result:
                self.published_counter.add(1)
                return [quote_result]
            return None
        except Exception as e:
            self.errors_counter.add(1)
            # Do not let a malformed payload mask the original error.
            symbol = data.get("symbol", "unknown symbol") if isinstance(data, dict) else "unknown symbol"
            raise DataAnalysisException(f"Error processing {symbol}", e) from e

    @staticmethod
    def _prev_day_value(prev_day, field: str, default):
        """Return ``prev_day.<field>``, or ``default`` if prev_day or the field is missing/None."""
        if not prev_day:
            return default
        value = getattr(prev_day, field)
        return default if value is None else value

    @tracer.start_as_current_span("lba._update_leaderboards")
    async def _update_leaderboards(self, event: dict) -> Optional[dict]:
        """Update the sorted sets and the symbol metadata hash in one pipeline.

        Fetches the snapshot, average volume and free float through
        ``MarketDataCache``, derives the change/relative-volume metrics, then
        writes scores, trims each set and stores the hash with a TTL.

        Returns:
            The mapping written to the symbol hash, or ``None`` when the event
            has no symbol or the pipeline fails (the failure is logged).
        """
        symbol = event.get("symbol")
        if not symbol:
            return None

        # `or 0` also covers keys that are present but None, which would
        # otherwise reach ZADD/HSET and fail.
        accumulated_volume = event.get("accumulated_volume") or 0
        close = event.get("close") or 0
        volume = event.get("volume") or 0
        vwap = event.get("vwap") or 0

        # Fetch external metadata through MarketDataCache; the lookups may
        # return None, so normalise to 0 before comparing.
        snapshot = await self.cache.get_ticker_snapshot(symbol)
        avg_volume = (await self.cache.get_avg_volume(symbol)) or 0
        free_float = (await self.cache.get_free_float(symbol)) or 0

        # Previous-day values, falling back to the current event where the
        # original did (close/volume/vwap) and to 0 otherwise.
        prev_day = snapshot.prev_day if snapshot else None
        prev_day_close = self._prev_day_value(prev_day, "close", close)
        prev_day_open = self._prev_day_value(prev_day, "open", 0)
        prev_day_high = self._prev_day_value(prev_day, "high", 0)
        prev_day_low = self._prev_day_value(prev_day, "low", 0)
        prev_day_volume = self._prev_day_value(prev_day, "volume", volume)
        prev_day_vwap = self._prev_day_value(prev_day, "vwap", vwap)

        # Derived metrics; zero denominators yield 0 rather than raising.
        change = close - prev_day_close if prev_day_close else 0
        pct_change = ((close - prev_day_close) / prev_day_close * 100) if prev_day_close else 0
        relative_volume = (accumulated_volume / avg_volume) if avg_volume > 0 else 0

        # Intraday baseline
        open_price = await self._get_symbol_open_price(symbol, event)
        change_since_open = close - open_price if open_price else 0
        pct_change_since_open = ((close - open_price) / open_price * 100) if open_price else 0

        pipe = self.redis_client.pipeline()

        # Update sorted sets (scores)
        pipe.zadd(self.LEADERBOARD_TOP_VOLUME, {symbol: accumulated_volume})
        pipe.zadd(self.LEADERBOARD_TOP_GAPPERS, {symbol: pct_change})
        pipe.zadd(self.LEADERBOARD_TOP_GAINERS, {symbol: pct_change_since_open})

        # Keep only the highest-scoring entries: drop ranks 0..-(N+1).
        trim_stop = -(self.MAX_LEADERBOARD_SIZE + 1)
        pipe.zremrangebyrank(self.LEADERBOARD_TOP_VOLUME, 0, trim_stop)
        pipe.zremrangebyrank(self.LEADERBOARD_TOP_GAPPERS, 0, trim_stop)
        pipe.zremrangebyrank(self.LEADERBOARD_TOP_GAINERS, 0, trim_stop)

        # Store symbol metadata (hash)
        symbol_key = f"symbol:{symbol}:data"
        mapping = {
            "symbol": symbol,
            "volume": volume,
            "free_float": free_float,
            "accumulated_volume": accumulated_volume,
            "relative_volume": relative_volume,
            "official_open_price": event.get("official_open_price") or 0,
            "vwap": vwap,
            "open": open_price,
            "close": close,
            "high": event.get("high") or 0,
            "low": event.get("low") or 0,
            "aggregate_vwap": event.get("aggregate_vwap") or 0,
            "average_size": event.get("average_size") or 0,
            "avg_volume": avg_volume,
            "prev_day_close": prev_day_close,
            "prev_day_open": prev_day_open,
            "prev_day_high": prev_day_high,
            "prev_day_low": prev_day_low,
            "prev_day_volume": prev_day_volume,
            "prev_day_vwap": prev_day_vwap,
            "change": change,
            "pct_change": pct_change,
            "change_since_open": change_since_open,
            "pct_change_since_open": pct_change_since_open,
            "start_timestamp": event.get("start_timestamp") or 0,
            "end_timestamp": event.get("end_timestamp") or 0,
        }
        pipe.hset(symbol_key, mapping=mapping)
        pipe.expire(symbol_key, MarketDataCacheTTL.LEADERBOARD_ANALYZER.value)

        try:
            await pipe.execute()
        except Exception as e:
            # Best effort: a failed write drops this event's quote but does
            # not abort the caller.
            self.logger.error(f"mapping: {mapping}")
            self.logger.error(f"Error updating leaderboards for {symbol}: {e}")
            return None
        return mapping

    @tracer.start_as_current_span("lba._build_leaderboard_results")
    async def _build_leaderboard_results(
        self, limit: int = MAX_LEADERBOARD_SIZE
    ) -> List[MarketDataAnalyzerResult]:
        """Fetch the top ``limit`` of each leaderboard as analyzer results.

        Returns one ``MarketDataAnalyzerResult`` per non-empty leaderboard
        (volume, gappers, gainers), in that order. The list is mutable so the
        caller can append to it.
        """
        leaderboards = await self._fetch_leaderboards(limit)
        if not leaderboards:
            return []

        # (leaderboard name, widget cache/publish key, widget TTL)
        widgets = (
            ("top_volume", WidgetDataCacheKeys.TOP_VOLUME_SCANNER,
             WidgetDataCacheTTL.TOP_VOLUME_SCANNER),
            ("top_gappers", WidgetDataCacheKeys.TOP_GAPPERS_SCANNER,
             WidgetDataCacheTTL.TOP_GAPPERS_SCANNER),
            ("top_gainers", WidgetDataCacheKeys.TOP_GAINERS_SCANNER,
             WidgetDataCacheTTL.TOP_GAINERS_SCANNER),
        )
        results = []
        for name, key, ttl in widgets:
            entries = leaderboards[name]
            if not entries:
                continue
            results.append(MarketDataAnalyzerResult(
                data=entries,
                cache_key=key.value,
                cache_ttl=ttl.value,
                publish_key=key.value,
            ))
        return results

    @tracer.start_as_current_span("lba._fetch_leaderboards")
    async def _fetch_leaderboards(self, limit: int = MAX_LEADERBOARD_SIZE) -> dict:
        """Fetch the top ``limit`` of each sorted set with hydrated symbol data.

        Returns a dict with keys ``top_volume``, ``top_gappers`` and
        ``top_gainers``, each a ranked list of entry dicts. A non-positive
        ``limit`` yields empty lists.
        """
        if limit <= 0:
            # ZREVRANGE 0 -1 would return the whole set, not nothing.
            return {"top_volume": [], "top_gappers": [], "top_gainers": []}

        stop = limit - 1
        pipe = self.redis_client.pipeline()
        pipe.zrevrange(self.LEADERBOARD_TOP_VOLUME, 0, stop, withscores=True)
        pipe.zrevrange(self.LEADERBOARD_TOP_GAPPERS, 0, stop, withscores=True)
        pipe.zrevrange(self.LEADERBOARD_TOP_GAINERS, 0, stop, withscores=True)
        volume_list, gappers_list, gainers_list = await pipe.execute()

        # Hydrate each leaderboard with symbol metadata
        return {
            "top_volume": await self._hydrate_leaderboard(volume_list, "accumulated_volume"),
            "top_gappers": await self._hydrate_leaderboard(gappers_list, "pct_change"),
            "top_gainers": await self._hydrate_leaderboard(gainers_list, "pct_change_since_open"),
        }

    @tracer.start_as_current_span("lba._hydrate_leaderboard")
    async def _hydrate_leaderboard(self, symbol_scores: List, score_field: str) -> List[dict]:
        """Attach symbol hash data to ``(symbol, score)`` leaderboard entries.

        Batch-fetches ``symbol:<symbol>:data`` for every entry. Entries whose
        hash is missing are skipped; ``rank`` is the position in
        ``symbol_scores`` (1-based), so skipped entries leave gaps. Fields
        from the hash override ``symbol`` and ``score_field`` when present.
        """
        if not symbol_scores:
            return []

        # Batch fetch symbol data
        pipe = self.redis_client.pipeline()
        for symbol, _score in symbol_scores:
            pipe.hgetall(f"symbol:{symbol}:data")
        hashes = await pipe.execute()

        ranked = []
        for rank, ((symbol, score), symbol_data) in enumerate(zip(symbol_scores, hashes), start=1):
            if not symbol_data:
                continue
            entry = {"rank": rank, "symbol": symbol, score_field: float(score)}
            for field, raw in symbol_data.items():
                # The ticker is text; never coerce it to a number.
                entry[field] = raw if field == "symbol" else self._convert_value(raw)
            ranked.append(entry)
        return ranked

    @staticmethod
    def _convert_value(value: str):
        """Convert a Redis string value to int or float where applicable.

        Tries ``int`` first, then ``float`` (which also covers scientific
        notation such as ``"1e-05"``). Non-finite results (``nan``/``inf``)
        and anything that does not parse are returned unchanged, as is any
        non-str input.
        """
        if not isinstance(value, str):
            return value
        try:
            return int(value)
        except ValueError:
            pass
        try:
            number = float(value)
        except ValueError:
            return value
        # "NAN"/"INF" parse as floats but are far more likely to be text.
        return number if math.isfinite(number) else value

    @tracer.start_as_current_span("lba._check_publish_throttle")
    async def _check_publish_throttle(self) -> bool:
        """Return True if this call wins the one-per-second publish slot.

        Uses ``SET NX`` with a 1-second TTL on the throttle key, so at most
        one caller per key lifetime gets True.
        """
        now = datetime.now(timezone.utc).timestamp()
        was_set = await self.redis_client.set(
            self.PUBLISH_THROTTLE_KEY,
            str(now),
            ex=1,  # 1 second TTL
            nx=True,  # only set if not exists
        )
        return bool(was_set)

    @tracer.start_as_current_span("lba._get_symbol_open_price")
    async def _get_symbol_open_price(self, symbol: str, event: dict) -> float:
        """Get or set the symbol's opening price for the trading day.

        Returns the cached value when present and parseable. Otherwise uses
        the event's ``open``, then ``close``, caches it for
        ``OPEN_PRICE_TTL_SECONDS`` and returns it. When the event carries
        neither price, returns ``0.0`` without caching, so a later event can
        still establish the baseline.
        """
        open_key = f"symbol:{symbol}:open_price"

        cached_open = await self.redis_client.get(open_key)
        if cached_open:
            try:
                return float(cached_open)
            except (TypeError, ValueError):
                # e.g. a literal "None" cached from an event with open=None;
                # fall through and overwrite it.
                self.logger.warning(f"Ignoring invalid cached open price for {symbol}: {cached_open!r}")

        # `or` (not a .get default) so that open=None falls through to close.
        open_price = event.get("open") or event.get("close") or 0
        if not open_price:
            return 0.0

        await self.redis_client.setex(open_key, self.OPEN_PRICE_TTL_SECONDS, str(open_price))
        return float(open_price)

    @tracer.start_as_current_span("lba._check_day_boundary")
    async def _check_day_boundary(self):
        """Reset all three leaderboards when the trading day (4 AM ET) changes.

        The trading day is identified by the timestamp of its 4 AM ET start;
        before 4 AM the current trading day is still the previous calendar
        day's. A Lua script compares that marker with the stored one and, if
        they differ, stores the new marker and deletes the leaderboards in
        one atomic step.
        """
        et_now = datetime.now(timezone.utc).astimezone(self.MARKET_TZ)
        current_day = et_now.replace(hour=self.DAY_START_HOUR, minute=0, second=0, microsecond=0)
        if et_now < current_day:
            # Between midnight and 4 AM ET we are still in yesterday's
            # trading day; without this the reset would fire at midnight.
            current_day -= timedelta(days=1)
        current_day_ts = str(int(current_day.timestamp()))

        reset = await self.redis_client.eval(
            self._DAY_RESET_LUA,
            4,
            self.MARKET_DAY_KEY,
            self.LEADERBOARD_TOP_VOLUME,
            self.LEADERBOARD_TOP_GAPPERS,
            self.LEADERBOARD_TOP_GAINERS,
            current_day_ts,
        )
        if reset:
            self.logger.info(f"Day boundary reset at {current_day}")

    @tracer.start_as_current_span("lba._check_market_open_reset")
    async def _check_market_open_reset(self):
        """Reset the gainers leaderboard and cached open prices at 9:30 AM ET.

        Only acts during the 9:30-9:31 AM ET minute. ``SET NX`` on a per-date
        key elects a single caller, which deletes the gainers sorted set and
        every ``symbol:*:open_price`` key in one pipeline.
        """
        et_now = datetime.now(timezone.utc).astimezone(self.MARKET_TZ)

        # Only trigger during the 9:30 AM minute
        if et_now.hour != 9 or et_now.minute != 30:
            return

        today = et_now.strftime("%Y-%m-%d")
        reset_key = self.MARKET_OPEN_RESET_KEY.format(date=today)

        # Atomic set-if-not-exists with 1 hour TTL
        was_set = await self.redis_client.set(reset_key, "1", ex=3600, nx=True)
        if not was_set:
            return

        self.logger.info(f"Market open reset for {today}")

        # Clear gainers leaderboard and open prices
        pipe = self.redis_client.pipeline()
        pipe.delete(self.LEADERBOARD_TOP_GAINERS)

        # SCAN until the cursor wraps back to 0; int() tolerates a client
        # that returns the cursor as a string.
        cursor = 0
        while True:
            cursor, keys = await self.redis_client.scan(
                cursor=cursor,
                match="symbol:*:open_price",
                count=100,
            )
            if keys:
                pipe.delete(*keys)
            cursor = int(cursor)
            if cursor == 0:
                break

        await pipe.execute()