# Source code for kuhl_haus.mdp.components.market_data_scanner

"""Redis pub/sub consumer that analyzes market data and publishes derived results.

Scans post-processed market data using pluggable analyzers performing secondary processes such as:
event correlation, alert generation, trend analysis, pattern recognition, etc.
"""
import asyncio
import json
import logging
from typing import Any, Union, Optional, List

import redis.asyncio as aioredis
from redis.exceptions import ConnectionError

from kuhl_haus.mdp.analyzers.analyzer import Analyzer, AnalyzerOptions
from kuhl_haus.mdp.data.market_data_analyzer_result import MarketDataAnalyzerResult
from kuhl_haus.mdp.helpers.observability import get_tracer

tracer = get_tracer(__name__)


class MarketDataScanner:
    """Process Redis pub/sub market data through pluggable analyzers.

    Listens to Redis channels and passes messages through an Analyzer
    subclass, then caches results and publishes them back to Redis. Unlike
    RabbitMQ-fed processors, this component works entirely within Redis as a
    processor of enriched market data for event correlation, alert
    generation, trend analysis, pattern recognition, etc.

    Pattern/wildcard subscriptions are supported; rehydration from cache
    happens on startup to recover analyzer state after restarts.

    Threading:
        Single pub/sub task runs in asyncio event loop; message processing is
        sequential but does not block new messages (pub/sub buffer holds
        them).
    """

    # Connection state and metric counters. The names match the attributes
    # that __init__ actually assigns.
    mdc_connected: bool
    processed: int
    decoding_errors: int
    empty_results: int
    published_results: int
    dropped: int
    errors: int
    restarts: int

    def __init__(self, redis_url: str, subscriptions: List[str], analyzer_class: Any,
                 analyzer_options: AnalyzerOptions):
        """Store configuration and reset state; no I/O happens here.

        Args:
            redis_url: URL handed to ``aioredis.from_url`` in ``connect``.
            subscriptions: Channel names; entries ending in ``*`` are
                subscribed as patterns.
            analyzer_class: Callable invoked as
                ``analyzer_class(options=analyzer_options)`` in ``start``.
            analyzer_options: Options forwarded to the analyzer constructor.
        """
        self.redis_url = redis_url
        self.analyzer_options = analyzer_options
        self.logger = logging.getLogger(__name__)
        self.analyzer: Optional[Analyzer] = None
        self.analyzer_class = analyzer_class

        # Connection objects
        self.redis_client = None  # aioredis.Redis, created in connect()
        self.pubsub_client: Optional[aioredis.client.PubSub] = None

        # State
        self.mdc_connected = False
        self.running = False
        self.subscriptions: List[str] = subscriptions
        self._pubsub_task: Optional[asyncio.Task] = None

        # Metrics
        self.restarts = 0
        self.processed = 0
        self.decoding_errors = 0
        self.empty_results = 0
        self.published_results = 0
        # Must be initialised: _handle_pubsub increments it for unknown
        # message types.
        self.dropped = 0
        self.errors = 0

    @staticmethod
    def _is_pattern(subscription: str) -> bool:
        """Return True when the subscription must use (p)subscribe pattern calls."""
        return subscription.endswith("*")

    @tracer.start_as_current_span("mds.start")
    async def start(self):
        """Initialize Redis connections and begin consuming subscribed channels.

        Establishes async Redis client, creates pub/sub client, instantiates
        the analyzer, rehydrates analyzer state from cache, subscribes to
        configured channels, and spawns the pub/sub message handler task.

        Side effects:
            Spawns background asyncio task; calls ``analyzer.rehydrate()``.
        """
        self.logger.info("mds.starting")
        await self.connect()
        self.pubsub_client = self.redis_client.pubsub()

        self.analyzer = self.analyzer_class(options=self.analyzer_options)
        self.logger.info("mds rehydrating from cache")
        await self.analyzer.rehydrate()
        self.logger.info("mds rehydration complete")

        for subscription in self.subscriptions:
            if self._is_pattern(subscription):
                await self.pubsub_client.psubscribe(subscription)
            else:
                await self.pubsub_client.subscribe(subscription)

        self._pubsub_task = asyncio.create_task(self._handle_pubsub())
        self.logger.info("mds.started")

    @tracer.start_as_current_span("mds.stop")
    async def stop(self):
        """Shutdown pub/sub task and close Redis connections.

        Cancels the background task, unsubscribes from all channels, closes
        the pub/sub client, and closes the Redis client. Cleanup is
        best-effort: a failure in one step is logged and the remaining steps
        still run, so a broken connection cannot prevent a restart.

        Side effects:
            Writes unsubscribe commands to Redis; closes network sockets.
        """
        self.logger.info("mds.stopping")

        task = self._pubsub_task
        self._pubsub_task = None
        # restart() may be invoked from inside the pub/sub task itself (on a
        # connection error); a task must not cancel and await itself.
        if task is not None and task is not asyncio.current_task():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                # The task already failed and logged its own error; do not
                # let that stored exception abort the cleanup below.
                self.logger.warning(f"mds.stop pubsub task ended with error:{repr(e)}")

        if self.pubsub_client:
            try:
                for subscription in self.subscriptions:
                    if self._is_pattern(subscription):
                        await self.pubsub_client.punsubscribe(subscription)
                    else:
                        await self.pubsub_client.unsubscribe(subscription)
                await self.pubsub_client.close()
            except Exception as e:
                self.logger.warning(f"mds.stop pubsub cleanup failed error:{repr(e)}")
            finally:
                self.pubsub_client = None

        if self.redis_client:
            try:
                await self.redis_client.close()
            except Exception as e:
                self.logger.warning(f"mds.stop redis close failed error:{repr(e)}")
            finally:
                self.redis_client = None
                self.mdc_connected = False

        self.logger.info("mds.stopped")

    @tracer.start_as_current_span("mds.connect")
    async def connect(self, force: bool = False):
        """Establish the async Redis connection used for pub/sub and result storage.

        Args:
            force: Reconnect even if already connected.

        Raises:
            Exception: Whatever client creation or ``ping`` raised; it is
                logged and re-raised unchanged.
        """
        if not self.mdc_connected or force:
            # Redis connection pool
            try:
                self.redis_client = aioredis.from_url(
                    self.redis_url,
                    encoding="utf-8",
                    decode_responses=True,
                    max_connections=1000,
                    socket_connect_timeout=10,  # Add timeout
                )

                # Test Redis connection
                await self.redis_client.ping()
                self.mdc_connected = True
                self.logger.debug(f"Connected to Redis: {self.redis_url}")
            except Exception as e:
                self.logger.error(f"Failed to connect to Redis: {e}")
                raise

    @tracer.start_as_current_span("mds.restart")
    async def restart(self):
        """Cycle scanner: stop, wait 1s, start.

        Increments restart counter for observability. Errors are logged and
        swallowed; the counter only advances when ``start`` succeeds.
        """
        try:
            await self.stop()
            await asyncio.sleep(1)
            await self.start()
            self.restarts += 1
        except Exception as e:
            self.logger.error(f"Error restarting Market Data Scanner: {e}")

    @tracer.start_as_current_span("mds._handle_pubsub")
    async def _handle_pubsub(self):
        """Background task that polls Redis pub/sub and delegates messages to analyzer.

        Runs indefinitely until cancelled. Handles subscription lifecycle
        events, processes data messages via _process_message, and
        auto-restarts on connection errors. Uses exponential backoff
        (max 60s) when no messages arrive. Payloads that are not valid JSON
        are counted in ``decoding_errors`` and skipped.

        Side effects:
            Calls analyzer.analyze_data (may mutate analyzer state); writes
            results to Redis cache.
        """
        try:
            self.logger.info("mds.pubsub.starting")
            message_count = 0
            retry_count = 0
            max_retry_interval = 60
            # Smallest exponent whose power of two already exceeds the cap;
            # clamping to it keeps 2**n bounded during long idle periods
            # without changing any resulting sleep interval.
            max_exponent = max_retry_interval.bit_length()
            self.running = True
            while True:
                # get_message() requires active subscriptions
                message = await self.pubsub_client.get_message(
                    ignore_subscribe_messages=False, timeout=1.0
                )
                if message is None:
                    # Timeout reached, no message available: sleep with
                    # exponential backoff to a maximum duration of
                    # max_retry_interval seconds
                    retry_count += 1
                    self.logger.debug(
                        f"mds.pubsub.message timeout reached, no message available. "
                        f"Retry count: {retry_count}")
                    sleep_interval = min(
                        2 ** min(retry_count, max_exponent), max_retry_interval
                    )
                    await asyncio.sleep(sleep_interval)
                    continue

                retry_count = 0
                msg_type = message.get("type")
                channel = message.get("channel")
                data = message.get("data")

                # Log subscription lifecycle events
                if msg_type in ("subscribe", "psubscribe"):
                    self.logger.info(f"mds.pubsub.subscribed channel:{channel}, num_subs:{data}")
                elif msg_type in ("unsubscribe", "punsubscribe"):
                    self.logger.info(f"mds.pubsub.unsubscribed channel:{channel}, num_subs:{data}")
                # Process actual data messages
                elif msg_type in ("message", "pmessage"):
                    message_count += 1
                    # Guarded so the full payload is not formatted on the hot
                    # path when debug logging is disabled.
                    if self.logger.isEnabledFor(logging.DEBUG):
                        self.logger.debug(
                            f"mds.pubsub.message channel:{channel}, "
                            f"data_len:{len(data)}, msg_num:{message_count}, data:{data}"
                        )
                    try:
                        payload = json.loads(data)
                    except (json.JSONDecodeError, TypeError) as e:
                        # One malformed payload must not kill the consumer.
                        self.logger.error(f"JSON decode error: {e}")
                        self.decoding_errors += 1
                        continue
                    await self._process_message(data=payload)
                else:
                    self.logger.warning(f"mds.pubsub.unknown message type: {msg_type}")
                    self.dropped += 1
        except ConnectionError as e:
            self.logger.error(f"mds.pubsub.connection_error error:{repr(e)}", exc_info=True)
            self.running = False
            self.mdc_connected = False
            await self.restart()
        except asyncio.CancelledError:
            self.logger.info("mds.pubsub.cancelled")
            self.running = False
            self.mdc_connected = False
            raise
        except Exception as e:
            self.logger.error(f"mds.pubsub.error error:{repr(e)}", exc_info=True)
            self.running = False
            self.mdc_connected = False
            raise

    @tracer.start_as_current_span("mds._process_message")
    async def _process_message(self, data: dict):
        """Delegate message to analyzer and cache results.

        Passes the decoded payload to analyzer.analyze_data (async), iterates
        over the returned results, and caches each via cache_result. Tracks
        metrics for processed, empty, decode, and processing errors; no
        exception derived from ``Exception`` escapes this method.

        Args:
            data: Deserialized JSON payload from Redis pub/sub channel.

        Side effects:
            Writes to Redis (cache write + PUBLISH per result).
        """
        try:
            # Delegate to analyzer (async)
            self.logger.debug(f"Processing message - data_len:{len(data)}")
            analyzer_results = await self.analyzer.analyze_data(data)
            self.processed += 1

            if analyzer_results:
                for analyzer_result in analyzer_results:
                    # Cache in Redis
                    self.logger.debug(f"Caching message {analyzer_result.cache_key}")
                    await self.cache_result(analyzer_result)
                    self.published_results += 1
            else:
                # Empty result - nothing to cache
                self.empty_results += 1
        except json.JSONDecodeError as e:
            # Kept for decode failures raised further down the call chain;
            # the raw payload itself is decoded by the caller.
            self.logger.error(f"JSON decode error: {e}")
            self.decoding_errors += 1
        except Exception as e:
            self.logger.error(f"Processing error: {e}", exc_info=True)
            self.errors += 1

    @tracer.start_as_current_span("mds.get_cache")
    async def get_cache(self, cache_key: str) -> Optional[dict]:
        """Retrieve cached analyzer result by key.

        Args:
            cache_key: Redis key to read with GET.

        Returns:
            The JSON-decoded value, or None when the key is missing or holds
            an empty value.
        """
        value = await self.redis_client.get(cache_key)
        if value:
            return json.loads(value)
        return None

    @tracer.start_as_current_span("mds.cache_result")
    async def cache_result(self, analyzer_result: MarketDataAnalyzerResult):
        """Write analyzer output to Redis and publish notification.

        Queues the cache write (with optional TTL) and the PUBLISH on a
        non-transactional pipeline and sends them together with one
        ``execute()`` call. Because ``transaction=False`` is used, the
        commands are batched but NOT atomic.

        Args:
            analyzer_result: Contains cache_key, cache_ttl, cache_list_max,
                publish_key, and data.

        Side effects:
            Writes to Redis (SET/SETEX or LPUSH+LTRIM[+EXPIRE], and PUBLISH).
        """
        result_json = json.dumps(analyzer_result.data)
        cache_key = analyzer_result.cache_key
        # Treat a missing TTL as "no expiry" instead of failing on `None > 0`.
        # NOTE(review): assumes cache_ttl is numeric or None - confirm
        # against MarketDataAnalyzerResult.
        cache_ttl = analyzer_result.cache_ttl or 0

        # Pipeline - no async context manager, no await on queue methods
        pipe = self.redis_client.pipeline(transaction=False)

        if cache_key:
            if analyzer_result.cache_list_max is not None:
                # List-backed cache: LPUSH + LTRIM to cap at cache_list_max entries.
                # Used for event streams (e.g. HOD/LOD alert channels) where consumers
                # want the N most recent events rather than a single current value.
                pipe.lpush(cache_key, result_json)
                pipe.ltrim(cache_key, 0, analyzer_result.cache_list_max - 1)
                if cache_ttl > 0:
                    pipe.expire(cache_key, cache_ttl)
            elif cache_ttl > 0:
                pipe.setex(cache_key, cache_ttl, result_json)
            else:
                pipe.set(cache_key, result_json)

        if analyzer_result.publish_key:
            pipe.publish(analyzer_result.publish_key, result_json)

        await pipe.execute()

        self.logger.debug(f"Cached result for {analyzer_result.cache_key}")