kuhl_haus.mdp.components package¶
Components for market data ingestion and processing.
Provides the core pipeline components including WebSocket listeners, data processors, queue management, market data caching and scanning, and widget data services for the Massive.com market data stream.
Submodules¶
kuhl_haus.mdp.components.finlight_data_listener module¶
WebSocket client wrapper for Finlight financial news streams.
Manages a persistent WebSocket connection to the Finlight news API, handles reconnection logic on disconnect, and delegates incoming articles to a user-provided handler. Designed to run as a long-lived background task that survives temporary connection failures.
Key design decisions (mirroring FinlightSimpleListener):
- WebSocketOptions(takeover=True) to prevent multi-session conflicts
- Direct await on connect coroutine (not asyncio.gather with swallowed exceptions)
- while-loop reconnect in a single background task (no recursive start())
- includeEntities=True by default on enhanced subscriptions
- Async handler support via loop.create_task() when message_handler is a coroutine
- class kuhl_haus.mdp.components.finlight_data_listener.FinlightDataListener(api_key: str, message_handler: Callable[[...], Awaitable] | Callable, query: str | None = None, tickers: List[str] | None = None, sources: List[str] | None = None, language: str | None = None, raw: bool = False, include_entities: bool = True, max_reconnects: int | None = None, **kwargs)[source]¶
Bases: object
Maintain Finlight WebSocket connection with auto-reconnect logic.
Wraps the official Finlight SDK FinlightApi, providing lifecycle management (start/stop), connection health tracking, and automatic reconnection. When the WebSocket disconnects, automatically attempts to reconnect via a while-loop in a single background task. Delegates each incoming article to the provided message_handler callable.
Supports both sync and async message handlers. Because the Finlight SDK calls on_article synchronously, async handlers are scheduled via loop.create_task() rather than awaited.
Threading: Spawns a single asyncio.Task; caller is responsible for awaiting or managing the task lifecycle via start()/stop().
- __init__(api_key: str, message_handler: Callable[[...], Awaitable] | Callable, query: str | None = None, tickers: List[str] | None = None, sources: List[str] | None = None, language: str | None = None, raw: bool = False, include_entities: bool = True, max_reconnects: int | None = None, **kwargs)[source]¶
- async start()[source]¶
Spawn the background connection task.
Creates a single asyncio.Task that connects to Finlight and handles reconnection via a while-loop. Does not block.
Side effects: Spawns asyncio.Task; updates _running flag.
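The reconnect design described for this class can be illustrated with a minimal, SDK-free sketch. Everything here is an assumption made for illustration: ReconnectingListener, the connect callable, and retry_delay are hypothetical names, and the connect coroutine stands in for whatever the Finlight SDK actually exposes. It shows only the shape of the pattern: one background task, a while-loop that awaits the connect coroutine directly, and a sync callback that schedules coroutine handlers with create_task().

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Optional


class ReconnectingListener:
    """Hypothetical stand-in; not the FinlightDataListener implementation."""

    def __init__(
        self,
        connect: Callable[[Callable[[Any], None]], Awaitable[None]],
        message_handler: Callable[[Any], Any],
        max_reconnects: Optional[int] = None,
        retry_delay: float = 1.0,
    ) -> None:
        self._connect = connect
        self._handler = message_handler
        self._max_reconnects = max_reconnects
        self._retry_delay = retry_delay
        self._running = False
        self._task: Optional[asyncio.Task] = None
        self._pending: set = set()
        self.reconnects = 0

    def _on_article(self, article: Any) -> None:
        # Sync callback: coroutine handlers are scheduled, not awaited.
        if inspect.iscoroutinefunction(self._handler):
            task = asyncio.get_running_loop().create_task(self._handler(article))
            self._pending.add(task)  # hold a reference until the task finishes
            task.add_done_callback(self._pending.discard)
        else:
            self._handler(article)

    async def _run(self) -> None:
        # Single task, while-loop reconnect (no recursive start()).
        while self._running:
            try:
                await self._connect(self._on_article)  # awaited directly
            except asyncio.CancelledError:
                raise
            except Exception:
                pass  # a real implementation would log the failure here
            if not self._running:
                break
            if self._max_reconnects is not None and self.reconnects >= self._max_reconnects:
                self._running = False
                break
            self.reconnects += 1
            await asyncio.sleep(self._retry_delay)

    async def start(self) -> None:
        # Does not block: only spawns the background task.
        if self._task is None or self._task.done():
            self._running = True
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        self._running = False
        if self._task is not None:
            self._task.cancel()
            await asyncio.gather(self._task, return_exceptions=True)
            self._task = None
```

Keeping the loop in one task means a failed connection attempt simply falls through to the next iteration, so there is never more than one connection attempt in flight.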
kuhl_haus.mdp.components.finlight_data_processor module¶
RabbitMQ consumer that processes Finlight news articles through pluggable analyzers.
Pulls messages from a dedicated RabbitMQ queue, deserializes JSON news articles, delegates analysis, and publishes results to Redis. Designed for high-throughput scenarios where multiple processors can scale horizontally, each consuming from the same queue with automatic load balancing.
- class kuhl_haus.mdp.components.finlight_data_processor.FinlightDataProcessor(rabbitmq_url: str, queue_name: str, redis_url: str, analyzer_class: Any, analyzer_options: AnalyzerOptions, prefetch_count: int = 100, max_concurrent_tasks: int = 500)[source]¶
Bases: object
Consume RabbitMQ Finlight news queue with async concurrency control.
Connects to RabbitMQ and Redis, consumes messages from a single queue (e.g., ‘news’), deserializes JSON articles, passes through an Analyzer subclass, and writes results to Redis cache with pub/sub notifications. Prefetch and semaphore-based concurrency prevent memory exhaustion during traffic spikes.
Rehydration from Redis cache occurs on startup to restore analyzer state after restarts. Graceful shutdown waits for in-flight tasks to complete.
Concurrency: Uses asyncio.Semaphore (default 500) to cap concurrent message processing; prefetch_count (default 100) controls how many messages RabbitMQ delivers before waiting for ACKs.
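As a rough illustration of the semaphore-based cap described above, the following sketch bounds the number of in-flight processing tasks. It is not the processor's code: consume_with_cap and its arguments are hypothetical, and a plain iterable stands in for the RabbitMQ delivery stream.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Set


async def consume_with_cap(
    messages: Iterable[bytes],
    process: Callable[[bytes], Awaitable[None]],
    max_concurrent_tasks: int = 500,
) -> None:
    """Process messages concurrently, never more than the cap at once."""
    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    in_flight: Set[asyncio.Task] = set()

    async def guarded(body: bytes) -> None:
        try:
            await process(body)
        finally:
            semaphore.release()  # free a slot even if processing failed

    for body in messages:
        await semaphore.acquire()  # intake pauses once the cap is reached
        task = asyncio.create_task(guarded(body))
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)

    # Graceful shutdown: wait for whatever is still running.
    if in_flight:
        await asyncio.gather(*in_flight, return_exceptions=True)
```

Acquiring the semaphore before creating the task bounds the number of task objects as well as the number of running handlers, which is what keeps memory flat during a spike.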
- __init__(rabbitmq_url: str, queue_name: str, redis_url: str, analyzer_class: Any, analyzer_options: AnalyzerOptions, prefetch_count: int = 100, max_concurrent_tasks: int = 500)[source]¶
Initialize the Finlight data processor.
- Parameters:
rabbitmq_url – RabbitMQ connection URL.
queue_name – Name of the RabbitMQ queue to consume from (e.g., ‘news’).
redis_url – Redis connection URL used for result caching and pub/sub.
analyzer_class – Analyzer subclass to instantiate for each message.
analyzer_options – Configuration for the analyzer instance. Supplies API keys (Massive.com, Finlight) and any subclass-specific kwargs to the analyzer.
prefetch_count – RabbitMQ prefetch count. Higher values increase throughput at the cost of memory. Defaults to 100.
max_concurrent_tasks – Maximum number of messages processed concurrently via asyncio.Semaphore. Defaults to 500.
- async connect(force: bool = False)[source]¶
Establish async connections to RabbitMQ and Redis.
Creates RabbitMQ channel with prefetch QoS, verifies target queue exists, creates Redis connection pool, and tests connectivity via PING.
- Parameters:
force – Reconnect even if already connected.
- async start()[source]¶
Connect, rehydrate analyzer state, and begin consuming RabbitMQ queue.
Retries connection up to 5 times with exponential backoff, instantiates analyzer and rehydrates from cache, then starts consuming messages via _callback. Blocks until self.running is set to False or CancelledError.
Side effects: Spawns background message processing tasks; writes to Redis during analyzer rehydration.
kuhl_haus.mdp.components.finlight_data_queues module¶
RabbitMQ publisher for Finlight WebSocket article messages.
Routes incoming article messages from Finlight to a dedicated RabbitMQ queue. Uses a single per-queue channel for publishing to maximize throughput and minimize latency.
- class kuhl_haus.mdp.components.finlight_data_queues.FinlightDataQueues(rabbitmq_url, message_ttl: int, publisher_confirms: bool = True)[source]¶
Bases: object
Publish Finlight article messages to RabbitMQ with a dedicated channel.
Creates a dedicated RabbitMQ channel for the articles queue to publish incoming article data with configurable TTL and delivery mode. Accepts either Pydantic models (with model_dump) or plain dicts.
Uses NOT_PERSISTENT delivery mode to prioritize throughput over durability, relying on queue TTL to expire unprocessed messages rather than fsync on every publish. Designed for real-time pipelines where fresh data is more valuable than stale data.
- connection: Connection | AbstractConnection¶
- async connect()[source]¶
Establish RabbitMQ connection and create a per-queue channel.
Opens a robust connection (auto-reconnect on failure), creates a dedicated channel for the articles queue with publisher confirms enabled, and verifies the target queue exists (passive declaration).
Side effects: Opens network socket; creates RabbitMQ channel.
- async handle_message(article)[source]¶
Serialize and publish a single Finlight article to RabbitMQ.
Accepts a Pydantic model (with model_dump) or a plain dict. Serializes to JSON, encodes to UTF-8 bytes, and publishes to the articles queue via its dedicated channel.
- Parameters:
article – A Pydantic model or dict representing the article data.
Side effects: Publishes to RabbitMQ; increments metrics counters.
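The serialization step of handle_message might look roughly like the sketch below. serialize_article is a hypothetical helper, not part of this package, and the use of default=str for non-JSON-native values is an assumption.

```python
import json
from typing import Any


def serialize_article(article: Any) -> bytes:
    """Return UTF-8 JSON bytes for a Pydantic-style model or a plain dict."""
    if hasattr(article, "model_dump"):
        payload = article.model_dump()  # Pydantic v2 models expose model_dump()
    elif isinstance(article, dict):
        payload = article
    else:
        raise TypeError(f"unsupported article type: {type(article).__name__}")
    # Assumption: stringify values json cannot encode natively (e.g. datetimes).
    return json.dumps(payload, default=str).encode("utf-8")
```

Duck-typing on model_dump keeps the publisher independent of any particular model class.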
- async shutdown()[source]¶
Close the channel and RabbitMQ connection.
Closes the articles channel, then closes the connection. Clears internal state and marks the component as disconnected.
Side effects: Closes network sockets.
- async setup_queues()[source]¶
Create the articles RabbitMQ queue with TTL and durability settings.
Called during deployment/setup to provision the queue. Opens connection, creates a per-queue channel, and declares the queue with x-message-ttl. Should be idempotent (queue settings must match if queue already exists).
Side effects: Creates durable RabbitMQ queue with configured TTL.
kuhl_haus.mdp.components.finlight_simple_listener module¶
Simplified Finlight WebSocket listener for real-time news article ingestion.
Uses the exact pattern verified to work with the Finlight SDK:
- Sync on_article callback (SDK calls it synchronously)
- WebSocketOptions(takeover=True) / RawWebSocketOptions(takeover=True) to prevent multi-session conflicts
- includeEntities=True on enhanced (non-raw) subscription for entity tagging
Serializes each incoming article to JSON and publishes to the RabbitMQ news queue via FinlightDataQueues.
- class kuhl_haus.mdp.components.finlight_simple_listener.FinlightSimpleListener(api_key: str, queues: FinlightDataQueues, raw: bool = False, include_entities: bool = True)[source]¶
Bases: object
Simplified Finlight WebSocket listener that publishes articles to RabbitMQ.
Mirrors the pattern from the verified working Finlight SDK examples:
- Sync on_article callback scheduled onto the event loop via create_task
- WebSocketOptions(takeover=True) to avoid session conflicts
- includeEntities=True for entity-tagged articles (enhanced mode)
Lifecycle: call start() to connect and begin consuming. Call stop() to disconnect. Designed to run as a background asyncio task.
kuhl_haus.mdp.components.market_data_cache module¶
Redis-backed cache for Massive.com API responses with TTL management.
Wraps Massive.com REST API calls with transparent Redis caching to reduce API load and improve latency. Provides specialized methods for ticker snapshots, average volume, and free float data with per-metric TTL policies. Negative caching prevents repeated API failures from overwhelming the service.
- class kuhl_haus.mdp.components.market_data_cache.MarketDataCache(rest_client: RESTClient, redis_client: Redis)[source]¶
Bases: object
Async cache layer for Massive.com API with configurable TTLs.
Provides read, write, broadcast, delete primitives plus specialized methods for retrieving ticker snapshots, average volume (30-day), and free float. Cache-aside pattern: check Redis first, fetch from Massive API on miss, then cache with appropriate TTL. Negative caching (short TTL) for missing/error responses prevents API hammering.
- async write(data: Any, cache_key: str, cache_ttl: int = 0)[source]¶
Write JSON-serialized data to Redis with optional TTL.
- Parameters:
data – Python object serializable to JSON.
cache_key – Redis key.
cache_ttl – Expiration in seconds; 0 = no expiration.
Side effects: Writes to Redis.
- async broadcast(data: Any, publish_key: str = None)[source]¶
Publish JSON-serialized data to Redis pub/sub channel.
Side effects: Publishes to Redis channel.
- async delete_ticker_snapshot(ticker: str)[source]¶
Invalidate cached snapshot for a ticker.
Called when fresh snapshot data arrives via WebSocket, ensuring cache coherence with real-time stream.
- async get_ticker_snapshot(ticker: str) TickerSnapshot[source]¶
Fetch ticker snapshot from cache or Massive API.
Returns cached TickerSnapshot if available. On a cache miss, uses a two-layer coordination strategy to prevent a cache stampede:
- In-process: An asyncio.Event per ticker collapses N concurrent coroutines into one Redis lock attempt, eliminating redundant polling from redis.asyncio.lock waiters.
- Cross-instance: A Redis distributed lock ensures only one process calls the Massive API for a given ticker at a time.
If the winning coroutine fails (exception), the event is still signaled so waiters wake up, find the cache empty, and fall through to contend for the distributed lock themselves.
The underlying REST client includes retry logic with exponential backoff; exceptions bubble up to the caller. The API call is instrumented with a histogram so the lock TTL can be tuned from production data.
Side effects: Calls Massive API on cache miss; writes to Redis.
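The in-process layer of this coordination strategy can be sketched without Redis. In the example below a dict stands in for the cache, the distributed lock is omitted, and SingleFlightCache and its fetch argument are hypothetical names chosen for illustration. It shows how one asyncio.Event per key lets a single coroutine do the fetch while the others wait, and how signalling in a finally block wakes waiters even when the fetch fails.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class SingleFlightCache:
    """Hypothetical in-process layer only; a dict stands in for Redis."""

    def __init__(self, fetch: Callable[[str], Awaitable[Any]]) -> None:
        self._fetch = fetch
        self._store: Dict[str, Any] = {}
        self._events: Dict[str, asyncio.Event] = {}

    async def get(self, key: str) -> Any:
        while True:
            if key in self._store:
                return self._store[key]
            event = self._events.get(key)
            if event is None:
                break  # no fetch in flight: this coroutine becomes the winner
            await event.wait()  # wake when the winner finishes or fails

        event = asyncio.Event()
        self._events[key] = event
        try:
            # The real component would acquire a Redis distributed lock here.
            value = await self._fetch(key)
            self._store[key] = value
            return value
        finally:
            # Signal even on failure so waiters re-check the cache and retry.
            del self._events[key]
            event.set()
```

There is no await between checking for an existing event and registering a new one, so within a single event loop only one coroutine can become the winner for a key.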
- async get_avg_volume(ticker: str)[source]¶
Retrieve 30-day average volume from cache or Massive API.
Returns cached average volume if available. On a cache miss, uses a two-layer coordination strategy to prevent a cache stampede:
- In-process: An asyncio.Event per ticker collapses N concurrent coroutines into one Redis lock attempt, eliminating redundant polling from redis.asyncio.lock waiters.
- Cross-instance: A Redis distributed lock ensures only one process calls the Massive API for a given ticker at a time.
If the winning coroutine fails (exception), the event is still signaled so waiters wake up, find the cache empty, and fall through to contend for the distributed lock themselves.
Computes average volume from last 30 trading days if ratio data unavailable. Applies negative caching (short TTL) on failures. Returns 0 if no data available.
Side effects: Calls Massive API on cache miss; writes to Redis.
- async get_free_float(ticker: str)[source]¶
Retrieve free float shares from cache or experimental Massive API.
Returns cached free float if available. On a cache miss, uses a two-layer coordination strategy to prevent a cache stampede:
- In-process: An asyncio.Event per ticker collapses N concurrent coroutines into one Redis lock attempt, eliminating redundant polling from redis.asyncio.lock waiters.
- Cross-instance: A Redis distributed lock ensures only one process calls the Massive API for a given ticker at a time.
If the winning coroutine fails (exception), the event is still signaled so waiters wake up, find the cache empty, and fall through to contend for the distributed lock themselves.
Applies negative caching on timeouts or HTTP errors to avoid hammering the API during outages. Returns 0 if unavailable or on error.
Side effects: HTTP GET to Massive API on cache miss; writes to Redis.
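Negative caching, as mentioned for the average volume and free float lookups, can be illustrated with a small in-memory sketch. The class name, the TTL values, and the injectable clock are all assumptions; the real component stores entries in Redis and its TTLs are not stated here.

```python
import time
from typing import Awaitable, Callable, Dict, Tuple


class NegativeCachingLookup:
    """Hypothetical sketch: cache failures briefly so outages are not hammered."""

    def __init__(
        self,
        fetch: Callable[[str], Awaitable[float]],
        ttl: float = 3600.0,          # assumed TTL for successful lookups
        negative_ttl: float = 60.0,   # assumed short TTL for failures
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl
        self._negative_ttl = negative_ttl
        self._clock = clock
        self._entries: Dict[str, Tuple[float, float]] = {}

    async def get(self, ticker: str) -> float:
        now = self._clock()
        entry = self._entries.get(ticker)
        if entry is not None and entry[1] > now:
            return entry[0]  # fresh hit, positive or negative
        try:
            value, ttl = await self._fetch(ticker), self._ttl
        except Exception:
            value, ttl = 0, self._negative_ttl  # remember the failure, briefly
        self._entries[ticker] = (value, now + ttl)
        return value
```

The short negative TTL bounds how long a transient failure is remembered while still absorbing repeated requests during an outage.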
kuhl_haus.mdp.components.market_data_scanner module¶
Redis pub/sub consumer that analyzes market data and publishes derived results.
Scans post-processed market data using pluggable analyzers performing secondary processes such as: event correlation, alert generation, trend analysis, pattern recognition, etc.
- class kuhl_haus.mdp.components.market_data_scanner.MarketDataScanner(redis_url: str, subscriptions: List[str], analyzer_class: Any, analyzer_options: AnalyzerOptions)[source]¶
Bases: object
Process Redis pub/sub market data through pluggable analyzers.
Listens to Redis channels and passes messages through an Analyzer subclass, then caches results and publishes them back to Redis. Unlike RabbitMQ-fed processors, this component works entirely within Redis as a processor of enriched market data for event correlation, alert generation, trend analysis, pattern recognition, etc.
Pattern/wildcard subscriptions are supported; rehydration from cache happens on startup to recover analyzer state after restarts.
Threading: Single pub/sub task runs in asyncio event loop; message processing is sequential but does not block new messages (pub/sub buffer holds them).
- __init__(redis_url: str, subscriptions: List[str], analyzer_class: Any, analyzer_options: AnalyzerOptions)[source]¶
- async start()[source]¶
Initialize Redis connections and begin consuming subscribed channels.
Establishes async Redis client, creates pub/sub client, instantiates the analyzer, rehydrates analyzer state from cache, subscribes to configured channels, and spawns the pub/sub message handler task.
Side effects: Spawns background asyncio task; writes to Redis during analyzer rehydration.
- async stop()[source]¶
Shutdown pub/sub task and close Redis connections.
Cancels background task, unsubscribes from all channels, closes pub/sub client, and releases Redis connection pool.
Side effects: Writes unsubscribe commands to Redis; closes network sockets.
- async connect(force: bool = False)[source]¶
Establish async Redis (WDC) connection for result storage.
- Parameters:
force – Reconnect even if already connected.
- async restart()[source]¶
Cycle scanner: stop, wait 1s, start.
Increments restart counter for observability.
- async cache_result(analyzer_result: MarketDataAnalyzerResult)[source]¶
Write analyzer output to Redis and publish notification.
Uses a non-transactional pipeline to SET the cache key (with optional TTL) and PUBLISH to the notification channel. Both commands are sent in a single network round trip, but because the pipeline is non-transactional they are not guaranteed to execute atomically.
- Parameters:
analyzer_result – Contains cache_key, cache_ttl, publish_key, and data.
Side effects: Writes to Redis (SET + PUBLISH).
kuhl_haus.mdp.components.massive_data_listener module¶
WebSocket client wrapper for Massive.com market data streams.
Manages a persistent WebSocket connection to Massive.com, handles reconnection logic with market status awareness, and delegates incoming messages to a user-provided handler. Designed to run as a long-lived background task that survives temporary connection failures and market closures.
- class kuhl_haus.mdp.components.massive_data_listener.MassiveDataListener(message_handler: Callable[[List[WebSocketMessage]], Awaitable] | Callable[[str | bytes], Awaitable], api_key: str, feed: Feed, market: Market, subscriptions: List[str], raw: bool = False, verbose: bool = False, max_reconnects: int | None = 5, secure: bool = True, **kwargs)[source]¶
Bases: object
Maintain Massive.com WebSocket connection with auto-reconnect logic.
Wraps the official Massive SDK WebSocketClient, providing lifecycle management (start/stop/restart), connection health tracking, and market-aware reconnection. When the WebSocket disconnects during market hours, automatically attempts to reconnect. During market closures, polls market status every 60s and reconnects when the market reopens.
Threading: Spawns async task for ws_connection.connect(); caller is responsible for awaiting or managing the task lifecycle.
- property feed: Feed¶
Current WebSocket feed.
- property market: Market¶
Current WebSocket market.
- __init__(message_handler: Callable[[List[WebSocketMessage]], Awaitable] | Callable[[str | bytes], Awaitable], api_key: str, feed: Feed, market: Market, subscriptions: List[str], raw: bool = False, verbose: bool = False, max_reconnects: int | None = 5, secure: bool = True, **kwargs)[source]¶
- async start()[source]¶
Instantiate WebSocketClient and spawn connection task.
Creates WebSocketClient with configured feed/market/subscriptions, schedules async_task as a background task. Does not block; caller must await the task or manage it separately.
Side effects: Spawns asyncio.Task; updates connection_status dict.
- async stop()[source]¶
Shutdown WebSocket connection gracefully.
Cancels coroutine task, unsubscribes from all feeds, closes WebSocket, and resets connection status. Waits 1s between steps to allow in-flight messages to flush.
Side effects: Closes network socket; updates connection_status dict.
- async market_is_open() bool[source]¶
Check market status via REST API with exponential backoff.
Retries up to max_reconnects times on transient network failures (DNS errors, timeouts, etc.). Returns False on all failures so the caller can decide whether to retry or give up.
- Returns:
True if market is open, False if closed or on any exception.
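A retry loop with exponential backoff that reports False once every attempt has failed could be sketched as follows. check_with_backoff, the probe callable, and the 1-second base delay are assumptions for illustration; the actual delays used by market_is_open are not documented here.

```python
import asyncio
from typing import Awaitable, Callable


async def check_with_backoff(
    probe: Callable[[], Awaitable[bool]],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Return the probe's answer, or False once every attempt has failed."""
    for attempt in range(max_attempts):
        try:
            return await probe()
        except Exception:
            if attempt == max_attempts - 1:
                break  # out of attempts; fall through to False
            await sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    return False
```

Returning False instead of raising leaves the retry-or-give-up decision with the caller, as the method description states.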
- async async_task()[source]¶
Connect to Massive WebSocket and handle disconnections.
Calls ws_connection.connect(message_handler) and awaits it. On disconnect, checks market status via REST API: if market is open, reconnects immediately; if closed, polls every 60s until market reopens. Increments reconnect counter for observability.
Side effects: Network I/O (WebSocket + REST API); calls message_handler for each incoming message.
kuhl_haus.mdp.components.massive_data_processor module¶
RabbitMQ consumer that processes market data through pluggable analyzers.
Pulls messages from a dedicated RabbitMQ queue, deserializes WebSocket messages, delegates analysis, and publishes results to Redis. Designed for high-throughput scenarios where multiple processors can scale horizontally, each consuming from the same queue with automatic load balancing.
- class kuhl_haus.mdp.components.massive_data_processor.MassiveDataProcessor(rabbitmq_url: str, queue_name: str, redis_url: str, analyzer_class: Any, analyzer_options: AnalyzerOptions, prefetch_count: int = 100, max_concurrent_tasks: int = 500)[source]¶
Bases: object
Consume RabbitMQ market data queue with async concurrency control.
Connects to RabbitMQ and Redis, consumes messages from a single queue (e.g., ‘trades’, ‘quotes’), deserializes them, passes through an Analyzer subclass, and writes results to Redis cache with pub/sub notifications. Prefetch and semaphore-based concurrency prevent memory exhaustion during traffic spikes.
Rehydration from Redis cache occurs on startup to restore analyzer state after restarts. Graceful shutdown waits for in-flight tasks to complete.
Concurrency: Uses asyncio.Semaphore (default 500) to cap concurrent message processing; prefetch_count (default 100) controls how many messages RabbitMQ delivers before waiting for ACKs.
- __init__(rabbitmq_url: str, queue_name: str, redis_url: str, analyzer_class: Any, analyzer_options: AnalyzerOptions, prefetch_count: int = 100, max_concurrent_tasks: int = 500)[source]¶
- async connect(force: bool = False)[source]¶
Establish async connections to RabbitMQ and Redis.
Creates RabbitMQ channel with prefetch QoS, verifies target queue exists, creates Redis connection pool, and tests connectivity via PING.
- Parameters:
force – Reconnect even if already connected.
- async start()[source]¶
Connect, rehydrate analyzer state, and begin consuming RabbitMQ queue.
Retries connection up to 5 times with exponential backoff, instantiates analyzer and rehydrates from cache, then starts consuming messages via _callback. Blocks until self.running is set to False or CancelledError.
Side effects: Spawns background message processing tasks; writes to Redis during analyzer rehydration.
kuhl_haus.mdp.components.massive_data_queues module¶
RabbitMQ publisher for Massive.com WebSocket messages with multi-channel concurrency.
Routes incoming WebSocket messages from Massive.com to dedicated RabbitMQ queues by message type (trades, quotes, aggregates, halts). Uses per-queue dedicated channels for parallel publishing to maximize throughput and minimize latency.
- class kuhl_haus.mdp.components.massive_data_queues.MassiveDataQueues(rabbitmq_url, message_ttl: int, publisher_confirms: bool = True)[source]¶
Bases: object
Publish Massive.com WebSocket messages to RabbitMQ with per-queue channels.
Creates a dedicated RabbitMQ channel for each message type (trades, quotes, etc.) to enable concurrent publishing without channel contention. Messages are routed by type, serialized, and published with configurable TTL and delivery mode.
Uses NOT_PERSISTENT delivery mode to prioritize throughput over durability, relying on queue TTL to expire unprocessed messages rather than fsync on every publish. Designed for real-time pipelines where fresh data is more valuable than stale data.
Concurrency: All publications within a batch (handle_messages) run concurrently via asyncio.gather, each using its own channel for true parallelism.
- connection: Connection | AbstractConnection¶
- async connect()[source]¶
Establish RabbitMQ connection and create per-queue channels.
Opens a robust connection (auto-reconnect on failure), creates a dedicated channel for each queue with publisher confirms enabled, and verifies all target queues exist (passive declaration).
Side effects: Opens network socket; creates RabbitMQ channels.
- async handle_messages(msgs: List[WebSocketMessage])[source]¶
Route and publish batch of WebSocket messages to RabbitMQ.
Serializes all messages upfront, resolves target queue per message type, then publishes concurrently via asyncio.gather. Each message uses its queue’s dedicated channel for true parallel I/O.
Called by MassiveDataListener’s message handler; typically handles 1-100 messages per WebSocket frame during peak market activity.
- Parameters:
msgs – List of WebSocketMessage objects from Massive SDK.
Side effects: Publishes to RabbitMQ; increments metrics counters.
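The route-then-gather flow described for handle_messages can be sketched with in-memory stand-ins. The event-type codes, the QUEUE_BY_EVENT mapping, FakeChannel, and the dict-shaped messages are all hypothetical; the real method works with Massive SDK WebSocketMessage objects and RabbitMQ channels.

```python
import asyncio
import json
from typing import Any, Dict, List, Tuple

# Hypothetical mapping from an event-type code to a queue name.
QUEUE_BY_EVENT = {"T": "trades", "Q": "quotes", "A": "aggregates"}


class FakeChannel:
    """In-memory stand-in for a per-queue RabbitMQ channel."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, bytes]] = []

    async def publish(self, body: bytes, routing_key: str) -> None:
        await asyncio.sleep(0)  # yield, as real network I/O would
        self.published.append((routing_key, body))


async def handle_messages(
    msgs: List[Dict[str, Any]], channels: Dict[str, FakeChannel]
) -> int:
    """Serialize upfront, route by type, publish concurrently; return successes."""
    jobs = []
    for msg in msgs:
        queue = QUEUE_BY_EVENT.get(msg.get("ev"))
        if queue is None:
            continue  # unknown message type: nothing to route
        body = json.dumps(msg).encode("utf-8")
        jobs.append(channels[queue].publish(body, routing_key=queue))
    results = await asyncio.gather(*jobs, return_exceptions=True)
    return sum(1 for result in results if not isinstance(result, Exception))
```

Passing return_exceptions=True keeps one failed publish from cancelling the rest of the batch.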
- async shutdown()[source]¶
Close all channels and RabbitMQ connection.
Iterates over channels dict, closing each in sequence, then closes the connection. Clears internal state.
Side effects: Closes network sockets.
- async setup_queues()[source]¶
Create RabbitMQ queues with TTL and durability settings.
Called during deployment/setup to provision queues. Opens connection, creates per-queue channels, and declares each queue with x-message-ttl argument. Should be idempotent (queue settings must match if queue already exists).
Side effects: Creates durable RabbitMQ queues with configured TTL.
kuhl_haus.mdp.components.widget_data_service module¶
WebSocket-to-Redis bridge for real-time market data delivery to clients.
Manages client WebSocket subscriptions to Redis pub/sub channels, handles fan-out from Redis messages to connected clients, and provides snapshot API for cached data. Designed for scenarios where multiple browser/dashboard clients subscribe to the same market data feeds with minimal latency.
- exception kuhl_haus.mdp.components.widget_data_service.UnauthorizedException[source]¶
Bases: Exception
Raised when client attempts unauthorized operation.
- class kuhl_haus.mdp.components.widget_data_service.WidgetDataService(redis_client: Redis, pubsub_client: PubSub)[source]¶
Bases: object
Fan out Redis pub/sub messages to WebSocket clients.
Maintains a registry of WebSocket connections per Redis channel, subscribes to channels on-demand as clients join, and fans out incoming pub/sub messages to all subscribers. The background pub/sub task starts lazily on first subscription and stops when the last client disconnects.
Pattern/wildcard subscriptions are supported. Clients can fetch snapshots from Redis cache via get_cache for initial state before streaming updates.
Concurrency: Single background task (_handle_pubsub) polls Redis and sends to all WebSockets in the event loop. Lock protects subscription dict mutations.
- async start()[source]¶
Verify Redis connectivity.
Pub/sub task starts lazily on first client subscription; this method simply PINGs Redis to confirm the connection is healthy.
- async stop()[source]¶
Cancel pub/sub task if running.
Does not close Redis client (caller owns that lifecycle).
- async subscribe(feed: str, websocket: WebSocket)[source]¶
Add WebSocket client to Redis channel subscription.
Creates Redis subscription if this is the first client for the channel. Starts background pub/sub task if this is the first subscription overall. Supports wildcard patterns (e.g., leaderboard:*).
- Parameters:
feed – Redis channel name or pattern.
websocket – FastAPI WebSocket connection.
Side effects: Subscribes to Redis channel; spawns background task on first use.
- async unsubscribe(feed: str, websocket: WebSocket)[source]¶
Remove WebSocket client from Redis channel subscription.
Unsubscribes from Redis if this was the last client for the channel. Stops background pub/sub task if this was the last subscription overall.
- Parameters:
feed – Redis channel name or pattern.
websocket – FastAPI WebSocket connection.
Side effects: Unsubscribes from Redis channel; cancels background task if idle.
- async disconnect(websocket: WebSocket)[source]¶
Unsubscribe client from all channels.
Iterates over all feeds the WebSocket is subscribed to and calls unsubscribe for each. Called when WebSocket connection closes or client disconnects.
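The subscribe, unsubscribe, and disconnect bookkeeping can be sketched as a small registry. FanOutRegistry and its redis_subscriptions set are hypothetical: the set merely records which channels would be subscribed upstream, and clients are assumed to expose an async send_text method as FastAPI WebSocket objects do.

```python
import asyncio
from typing import Any, Dict, Set


class FanOutRegistry:
    """Hypothetical sketch of per-channel client bookkeeping."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, Set[Any]] = {}
        self._lock = asyncio.Lock()
        self.redis_subscriptions: Set[str] = set()  # stands in for Redis state

    async def subscribe(self, feed: str, client: Any) -> None:
        async with self._lock:
            if feed not in self._subscribers:
                self._subscribers[feed] = set()
                self.redis_subscriptions.add(feed)  # first client: subscribe upstream
            self._subscribers[feed].add(client)

    async def unsubscribe(self, feed: str, client: Any) -> None:
        async with self._lock:
            clients = self._subscribers.get(feed)
            if clients is None:
                return
            clients.discard(client)
            if not clients:
                del self._subscribers[feed]
                self.redis_subscriptions.discard(feed)  # last client left

    async def disconnect(self, client: Any) -> None:
        async with self._lock:
            feeds = [f for f, members in self._subscribers.items() if client in members]
        for feed in feeds:
            await self.unsubscribe(feed, client)

    async def fan_out(self, feed: str, message: str) -> int:
        async with self._lock:
            targets = list(self._subscribers.get(feed, ()))
        # Send outside the lock so a slow client cannot block subscriptions.
        await asyncio.gather(
            *(client.send_text(message) for client in targets),
            return_exceptions=True,
        )
        return len(targets)
```

Snapshotting the target list under the lock and sending afterwards keeps mutation of the subscription dict safe without holding the lock across network writes.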
- async get_cache(cache_key: str, limit: int = 0) list[Any] | None | Any[source]¶
Retrieve cached market data for initial client snapshot.
Clients typically call this before subscribing to a feed to get current state, then receive incremental updates via WebSocket. Returns None if key not found.
Supports both Redis string keys (returns dict) and list keys (returns list of dicts). List keys are used for rolling news feed caches (news:feed:latest, news:ticker:*).
- Parameters:
cache_key – Redis key to fetch.
limit – Maximum number of items to return from list keys. 0 means fetch all.