Architecture

Components Summary

Market Data Platform Context Diagram

Data Plane Components

Finlight Data Listener (FDL)

WebSocket client connecting to the Finlight news API, routing articles to the RabbitMQ news queue with minimal processing overhead.

Finlight Data Processor (FDP)

Async RabbitMQ consumer processing Finlight news articles through pluggable analyzers and writing results to Redis.

Massive Data Listener (MDL)

WebSocket client connecting to Massive.com, routing events to appropriate queues with minimal processing overhead.

Market Data Queues (MDQ)

RabbitMQ-based FIFO queues with 5-second TTL, buffering high-velocity streams for distributed processing.

Market Data Processor (MDP)

Horizontally scalable event processors with semaphore-based concurrency (up to 500 concurrent tasks), delegating to pluggable analyzers.

Market Data Scanner (MDS)

Redis pub/sub consumer that performs secondary analysis on enriched market data — event correlation, alert generation, trend analysis, and pattern recognition — through pluggable analyzers.

Market Data Cache (MDC)

Internal Redis store for analyzer state with TTL policies (5s-24h), atomic operations, and pub/sub distribution. Separate from WDC.

Widget Data Cache (WDC)

Client-facing Redis store for widget-ready results (scanner feeds, quotes, news) with TTL policies optimized for UI consumption. Separate from MDC.

Widget Data Service (WDS)

WebSocket-to-Redis bridge that streams real-time data to client applications using a fan-out pattern.

Control Plane

Service Control Plane (SCP)

OAuth authentication, SPA serving, runtime controls, and management API (external repository: kuhl-haus-mdp-app).

Observability

All components emit OpenTelemetry traces/metrics and structured JSON logs for Kubernetes/OpenObserve integration.

Deployment Model

The platform deploys to Kubernetes with independent scaling per component:

  • Data plane: Internal network only (FDL, FDP, MDL, MDQ, MDP, MDS, MDC, WDC)

  • Client interface: Exposed to client networks (WDS)

  • Control plane: External access (SCP)

All components run as Docker containers with automated deployment via Ansible playbooks and Kubernetes manifests (kuhl-haus-mdp-deployment repository).

Component Descriptions

@startuml Market Data Platform Architecture

skinparam componentStyle rectangle
skinparam backgroundColor white
skinparam shadowing false
skinparam ArrowColor #333333
skinparam ArrowThickness 2

title Kuhl Haus Market Data Platform - Component Architecture

cloud "Massive.com WebSocket API" as Massive #FFEEEE
cloud "Finlight News API" as Finlight #EEF4EE

package "Application Server" #EEEEEE {
    component "Service Control Plane (SCP)" as SCP #7ED321
}

package "Market Data Listener (MDL)" #DDDDDD {
    component "MassiveDataListener" as Listener #0099FF
    component "MassiveDataQueues" as Queues #0099FF
    component "QueueNameResolver" as Resolver #0099FF
}

package "Finlight Data Listener (FDL)" #DDDDDD {
    component "FinlightDataListener" as FDListener #00FF99
    component "FinlightDataQueues" as FDQueues #00FF99
}

queue "Market Data Queues (MDQ)" as MDQ #E67E22

package "Market Data Scanner (MDS)" #DDDDDD {
    component "Market Data Scanner" as MDScanner #DDDD11
    component "DailyRangeAnalyzer" as DRA #DDDD11
}

package "Market Data Processor (MDP)" #DDDDDD {
    component "MassiveDataProcessor" as Processor #0099FF
    component "MassiveDataAnalyzer" as Router #0099FF
    component "LeaderboardAnalyzer" as Leaderboard #0099FF
    component "TopTradesAnalyzer" as TopTrades #0099FF
    component "TopStocksAnalyzer" as TopStocks #0099FF
}

package "Finlight Data Processor (FDP)" #DDDDDD {
    component "FinlightDataProcessor" as FDProcessor #00FF99
    component "FinlightDataAnalyzer" as FDAnalyzer #00FF99
}

database "Market Data Cache (MDC)" as MDC #FF0011
database "Widget Data Cache (WDC)" as WDC #DD00DD

package "Widget Data Service (WDS)" #DDDDDD {
    component "WidgetDataService" as Widget #DD00DD
}

package "Observability" #FFF9E6 {
    component "OpenTelemetry" as OTel
    component "Structured Logging" as Logging
}

package "Shared Libraries" #F0F0F0 {
    component "ProcessManager" as PM
    component "WebSocketMessageSerde" as Serde
    component "Utils" as Utils
}

actor "Web Clients" as Clients

Massive -[#blue,thickness=3]-> Listener : WebSocket stream
Listener --> Resolver : route by type
Resolver --> Queues : publish
Queues -[#blue,thickness=3]-> MDQ : RabbitMQ (TTL: 5s)

Finlight -[#green,thickness=3]-> FDListener : WebSocket stream
FDListener --> FDQueues : handle_message
FDQueues -[#green,thickness=3]-> MDQ : RabbitMQ (news queue)

MDQ -[#green,thickness=3]-> FDProcessor : consume (news queue)
MDQ -[#blue,thickness=3]-> Processor : consume (prefetch: 100)

Processor <--> Router : returns MarketDataAnalyzerResult
Processor <--> Leaderboard : returns MarketDataAnalyzerResult
Processor <--> TopTrades : returns MarketDataAnalyzerResult
Processor <--> TopStocks : returns MarketDataAnalyzerResult

Router -[#red,thickness=3]-> MDC : analyzer cache (internal)
TopTrades -[#red,thickness=3]-> MDC : analyzer cache (internal)
TopStocks -[#red,thickness=3]-> MDC : analyzer cache (internal)
Leaderboard -[#red,thickness=3]-> MDC : analyzer cache (internal)
Processor -[#blue,thickness=3]-> WDC : widget results + pub/sub

FDProcessor <-[#black,thickness=3]-> FDAnalyzer : returns MarketDataAnalyzerResult
FDProcessor -[#green,thickness=3]-> WDC : widget results + pub/sub

DRA -[#DDDD11,thickness=3]-> WDC : analyzer cache (internal)
MDScanner <-[#black,thickness=3]-> DRA : returns MarketDataAnalyzerResult
MDScanner <-[#DDDD11,thickness=3]-> WDC : widget results + pub/sub

WDC -[#purple,thickness=3]-> Widget : Redis pub/sub
Widget -[#purple,thickness=3]-> Clients : WebSocket streaming

SCP -[#green,thickness=3]-> Clients : SPA + auth token

Listener .down.> OTel : traces/metrics
FDListener .down.> OTel : traces/metrics
Processor .down.> OTel : traces/metrics
FDProcessor .down.> OTel : traces/metrics
Widget .down.> OTel : traces/metrics
Listener .down.> Logging : JSON logs
FDListener .down.> Logging : JSON logs
Processor .down.> Logging : JSON logs
FDProcessor .down.> Logging : JSON logs
Widget .down.> Logging : JSON logs

Listener --> Serde : serialize
Processor --> PM : orchestrate
Listener --> Utils : Miscellaneous

note right of MDQ
  RabbitMQ
  Non-persistent messages
  Per-subscription queues
  FIFO ordering
end note

note right of MDC
  Market Data Cache (MDC)
  Redis db 0
  Internal analyzer state
  Leaderboards, snapshots,
  float, avg volume, tickers
  TTL policies (5s-24h)
end note

note right of WDC
  Widget Data Cache (WDC)
  Redis db 1
  Client-facing results
  Scanner feeds, quote feed,
  news feeds, top trades
  Pub/Sub to WDS
end note

note bottom of Processor
  Concurrency Model
  Semaphore limit: 500
  Async processing
  Horizontal scaling
  Stateless design
end note

note bottom of Listener
  Reconnection Strategy
  Market-aware
  Auto-reconnect
  60s polling (closed)
  Health tracking
end note

note as DeploymentNote
  Deployment: Kubernetes + Docker
  Independent scaling per component
  Data plane: internal network only
  WDS: exposed to clients
  SCP: external access
end note

@enduml

Market Data Platform Component Architecture

See the full-page diagram for a larger view.

Finlight Data Listener (FDL)

The FDL connects to the Finlight news WebSocket API and routes incoming articles to the RabbitMQ news queue via FinlightDataQueues. It performs minimal processing — the listener delegates each article directly to FinlightDataQueues.handle_message, which serializes and publishes it. Auto-reconnect is handled by FinlightDataListener.

FDL runs as a container and scales independently of other components. FDL should not be accessible outside the data plane local network.

Code Libraries

Finlight Data Processor (FDP)

The FDP consumes news articles from the RabbitMQ news queue and delegates processing to a pluggable analyzer. Like the MDP, it uses semaphore-based concurrency control and writes results to Redis. The FDP is designed for a lower-throughput news feed rather than high-velocity tick data, so it runs as a single async processor rather than using ProcessManager parallelism.

FDP runs as a container and scales independently of other components. FDP should not be accessible outside the data plane local network.

Code Libraries

  • FinlightDataProcessor (components/finlight_data_processor.py) - Async RabbitMQ consumer with semaphore-based concurrency. Deserializes JSON article payloads directly (no WebSocketMessageSerde) and delegates to pluggable analyzers.

Massive Data Listener (MDL)

The MDL connects to the Massive.com WebSocket API and performs minimal processing on incoming messages: it inspects the message type to select the appropriate serialization method and destination queue. MDL implementations vary by market data provider, so each provider gets its own Listener class (for example, a news-specific listener would be a separate implementation).

MDL runs as a container and scales independently of other components. The MDL should not be accessible outside the data plane local network.
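
The type-based routing described above can be sketched as a simple lookup. This is a minimal illustration, not the QueueNameResolver implementation: the "ev" field name, the event-type codes, and the queue names are all assumptions made for the example.

```python
from typing import Any, Mapping

# Hypothetical event-type codes and queue names, for illustration only.
QUEUE_BY_EVENT_TYPE = {
    "T": "trades",
    "Q": "quotes",
    "AM": "aggregates",
}
DEFAULT_QUEUE = "unrouted"  # hypothetical fallback for unknown types


def resolve_queue(message: Mapping[str, Any]) -> str:
    """Pick a destination queue from the message's event-type field.

    Assumes the event type is carried in an "ev" field; messages with a
    missing or unknown type fall back to DEFAULT_QUEUE.
    """
    event_type = message.get("ev")
    return QUEUE_BY_EVENT_TYPE.get(event_type, DEFAULT_QUEUE)
```

Keeping the listener to a single dictionary lookup per message is one way to honor the "minimal processing" goal: anything heavier is left to the processors downstream of the queues.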

Code Libraries

Market Data Queues (MDQ)

Purpose: Buffer the high-velocity market data stream for server-side processing with aggressive freshness controls.

  • Queue Type: FIFO with TTL (5-second max message age)

  • Cleanup Strategy: Messages are discarded when their TTL expires

  • Message Format: Timestamped JSON preserving original Massive.com structure

  • Durability: Non-persistent messages (speed over reliability for real-time data)

  • Independence: Queues operate completely independently - one queue per subscription

  • Technology: RabbitMQ

The MDQ should not be accessible outside the data plane local network.
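
As a rough sketch of how the queue properties above map onto RabbitMQ settings, the helpers below build the declaration arguments and message properties as plain dictionaries. RabbitMQ's x-message-ttl queue argument is expressed in milliseconds, and AMQP 0-9-1 delivery mode 1 marks a message as non-persistent. The queue naming scheme and the "received_at"/"data" envelope fields are assumptions for illustration, not the platform's actual format.

```python
import json
import time
from typing import Any, Dict, Tuple

MESSAGE_TTL_MS = 5_000  # 5-second maximum message age


def queue_arguments() -> Dict[str, int]:
    """Queue declaration arguments; x-message-ttl is in milliseconds."""
    return {"x-message-ttl": MESSAGE_TTL_MS}


def queue_name_for(subscription: str) -> str:
    """One queue per subscription (hypothetical naming scheme)."""
    return f"mdq.{subscription}"


def build_message(payload: Dict[str, Any]) -> Tuple[bytes, Dict[str, int]]:
    """Wrap the provider payload unchanged in a timestamped JSON envelope.

    The envelope field names are assumptions. delivery_mode 1 means
    non-persistent in AMQP 0-9-1 (2 would be persistent).
    """
    body = json.dumps({"received_at": time.time(), "data": payload}).encode()
    properties = {"delivery_mode": 1}
    return body, properties
```

These dictionaries would be handed to whichever AMQP client library is in use when declaring the queue and publishing; the client calls themselves are omitted here.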

Code Libraries

Market Data Processor (MDP)

The purpose of the MDP is to process raw real-time market data and delegate processing to data-specific handlers. This separation of concerns allows MDPs to handle any type of data and simplifies horizontal scaling. The MDP stores its processed results in the Market Data Cache (MDC).

The MDP:

  • Hydrates the in-memory cache on the MDC

  • Processes market data

  • Publishes messages to pub/sub channels

  • Maintains cache entries in MDC

MDPs run as containers and scale independently of other components. The MDPs should not be accessible outside the data plane local network.
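
The semaphore-based concurrency limit used by the MDP (500 concurrent tasks) can be illustrated with asyncio.Semaphore. This is a simplified sketch under stated assumptions: process_events and the analyze callback are hypothetical names, and a real consumer would spawn a task per delivered message rather than gather a fixed batch.

```python
import asyncio

MAX_CONCURRENT_TASKS = 500  # semaphore limit quoted for the MDP


async def process_events(events, analyze, limit=MAX_CONCURRENT_TASKS):
    """Run analyze(event) for every event with at most `limit` in flight.

    `analyze` is a hypothetical async callable standing in for an analyzer.
    Results are returned in the same order as `events`.
    """
    semaphore = asyncio.Semaphore(limit)

    async def guarded(event):
        # Each task waits here until one of the `limit` slots is free.
        async with semaphore:
            return await analyze(event)

    return await asyncio.gather(*(guarded(event) for event in events))
```

The semaphore bounds work in flight inside a single process; throughput beyond that comes from running more MDP containers, which is why the processors are kept stateless.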

Code Libraries

  • MassiveDataProcessor (components/massive_data_processor.py) - RabbitMQ consumer with semaphore-based concurrency control for high-throughput scenarios (1,000+ events/sec)

  • Analyzers (analyzers/)

    • MassiveDataAnalyzer (massive_data_analyzer.py) - Stateless event router dispatching by event type

    • LeaderboardAnalyzer (leaderboard_analyzer.py) - Redis sorted set leaderboards (volume, gappers, gainers) with day/market boundary resets and distributed throttling

    • TopTradesAnalyzer (top_trades_analyzer.py) - Redis List-based trade history with sliding window (last 1,000 trades/symbol) and aggregated statistics

    • TopStocksAnalyzer (top_stocks.py) - In-memory leaderboard prototype (legacy, single-instance)

  • MarketDataAnalyzerResult (data/market_data_analyzer_result.py) - Result envelope for analyzer output with cache/publish metadata

  • ProcessManager (helpers/process_manager.py) - Multiprocess orchestration for async workers with OpenTelemetry context propagation
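
To illustrate the sliding-window idea behind TopTradesAnalyzer (last 1,000 trades per symbol plus aggregated statistics), here is an in-memory stand-in built on collections.deque. It is not the Redis List implementation; the class name, the (price, size) trade shape, and the chosen statistics are assumptions for the example.

```python
from collections import defaultdict, deque

WINDOW_SIZE = 1_000  # last 1,000 trades per symbol


class TradeWindow:
    """In-memory sliding window of recent trades, keyed by symbol."""

    def __init__(self, window_size=WINDOW_SIZE):
        # A bounded deque silently drops the oldest entry when full.
        self._trades = defaultdict(lambda: deque(maxlen=window_size))

    def add(self, symbol, price, size):
        # Newest trade goes to the front; the oldest falls off the back.
        self._trades[symbol].appendleft((price, size))

    def stats(self, symbol):
        """Aggregate count, volume, and volume-weighted average price."""
        trades = self._trades.get(symbol)
        if not trades:
            return {"count": 0, "volume": 0, "vwap": None}
        volume = sum(size for _, size in trades)
        notional = sum(price * size for price, size in trades)
        vwap = notional / volume if volume else None
        return {"count": len(trades), "volume": volume, "vwap": vwap}
```

A Redis-backed version would keep the window in a shared list so that every MDP instance sees the same history, which is what allows the processors themselves to remain stateless.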

Market Data Scanner (MDS)

The MDS performs secondary analysis on enriched market data already processed and published by MDPs. Unlike the MDP (which consumes raw RabbitMQ streams), the MDS subscribes directly to Redis pub/sub channels to receive post-processed data. This makes the MDS a Redis-only component, suited for secondary processing tasks such as event correlation, alert generation, trend analysis, and pattern recognition.

The MDS:

  • Subscribes to Redis pub/sub channels (including wildcard/pattern subscriptions)

  • Rehydrates analyzer state from the WDC on startup

  • Delegates messages to a pluggable Analyzer subclass

  • Writes results back to the Widget Data Cache (WDC) and publishes notifications

MDS instances run as containers and scale independently of other components. The MDS should not be accessible outside the data plane local network.

Code Libraries

  • MarketDataScanner (components/market_data_scanner.py) - Redis pub/sub consumer with pluggable analyzer pattern. Handles pattern/wildcard subscriptions, exponential-backoff idle polling (1s→60s cap), auto-restart on connection errors, and sequential message processing

  • Analyzers (analyzers/)

    • FinlightDataAnalyzer (finlight_data_analyzer.py) - Maintains capped news feed lists (news:feed:latest, news:ticker:{symbol}) in WDC with configurable TTLs

  • MarketDataAnalyzerResult (data/market_data_analyzer_result.py) - Result envelope for analyzer output with cache/publish metadata

  • AnalyzerOptions (analyzers/analyzer.py) - Shared configuration container (redis_url, finlight_api_key, massive_api_key, kwargs escape hatch for subclass-specific config)

  • WidgetDataCacheKeys enum (enum/widget_data_cache_keys.py) - WDC Redis key and channel name constants for all MDS-published data

  • WidgetDataCacheTTL enum (enum/widget_data_cache_ttl.py) - TTL values for WDC entries (quotes: 4 days, scanners: 4 days, news feed: 2 days, news ticker: 7 days)
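
The exponential-backoff idle polling mentioned for MarketDataScanner (1s rising to a 60s cap) can be sketched as a generator of sleep intervals. The doubling factor is an assumption; the source states only the starting delay and the cap.

```python
from itertools import islice


def idle_delays(initial=1.0, cap=60.0, factor=2.0):
    """Yield sleep intervals that grow from `initial` up to `cap`.

    The growth factor of 2 is an assumption made for this sketch.
    """
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, cap)


# First eight idle intervals, in seconds.
print(list(islice(idle_delays(), 8)))
# → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

In a polling loop, a fresh generator would be created each time a message arrives, so the scanner returns to 1-second polling as soon as traffic resumes.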

Market Data Cache (MDC)

Purpose: Internal Redis store for analyzer state and intermediate market data produced by MDP analyzers.

  • Cache Type: In-memory, either persistent or with TTL

  • Queue Type: pub/sub

  • Technology: Redis (separate instance from WDC)

The MDC should not be accessible outside the data plane local network.

Code Libraries

  • MarketDataCache (components/market_data_cache.py) - Redis cache-aside layer for Massive.com API with TTL policies, negative caching, and specialized metric methods (snapshot, avg volume, free float)

  • MarketDataCacheKeys enum (enum/market_data_cache_keys.py) - Internal Redis cache key patterns and templates

  • MarketDataCacheTTL enum (enum/market_data_cache_ttl.py) - TTL values balancing freshness vs. API quotas vs. memory pressure (5s for trades, 24h for reference data)

  • MarketDataPubSubKeys enum (enum/market_data_pubsub_keys.py) - Redis pub/sub channel names (kept for backward compatibility; prefer WidgetDataCacheKeys for new work)
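
The cache-aside pattern with TTLs and negative caching, as described for MarketDataCache, can be sketched with an in-memory dictionary standing in for Redis. The class name, constructor parameters, and TTL values below are assumptions for illustration, and the fetch callable is a placeholder for an upstream API call.

```python
import time

_MISSING = object()  # sentinel stored for negative cache entries


class CacheAside:
    """Cache-aside lookup with TTLs and negative caching (in-memory sketch)."""

    def __init__(self, fetch, ttl=5.0, negative_ttl=1.0, clock=time.monotonic):
        self._fetch = fetch                # hypothetical upstream lookup
        self._ttl = ttl                    # lifetime of real values
        self._negative_ttl = negative_ttl  # lifetime of "not found" markers
        self._clock = clock
        self._entries = {}                 # key -> (expires_at, value)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is not None and entry[0] > self._clock():
            # Fresh hit: either a real value or a remembered miss.
            return None if entry[1] is _MISSING else entry[1]
        # Miss or expired: go to the source and remember the outcome.
        value = self._fetch(key)
        if value is None:
            expires_at = self._clock() + self._negative_ttl
            self._entries[key] = (expires_at, _MISSING)
        else:
            self._entries[key] = (self._clock() + self._ttl, value)
        return value
```

Negative caching is what keeps repeated lookups for an unknown key from spending API quota on every request; the shorter negative TTL lets a newly available key show up quickly.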

Widget Data Cache (WDC)

Purpose: Client-facing Redis store for widget-ready results produced by Analyzers and consumed by the Widget Data Service.

  • Cache Type: In-memory with TTL

  • Queue Type: pub/sub

  • Technology: Redis (separate instance from MDC)

The WDC holds scanner feeds, quote feeds, and news feeds — all data that flows directly to UI widgets via WDS. Separating WDC from MDC isolates client-facing load from internal analyzer state.

The WDC should not be accessible outside the data plane local network.

Code Libraries

  • WidgetDataCacheKeys enum (enum/widget_data_cache_keys.py) - Redis key and channel name constants for all WDC entries (scanner channels, quote feed, news feeds, top trades widget cache). Replaces MarketDataPubSubKeys for new work.

  • WidgetDataCacheTTL enum (enum/widget_data_cache_ttl.py) - TTL values for all WDC entries (quotes: 4 days, scanners: 4 days, news feed: 2 days, news ticker: 7 days)

Widget Data Service (WDS)

Purpose:

  1. Provide a WebSocket interface that gives client-side code access to processed market data

  2. Act as the network-layer boundary between clients and the data available on the data plane

WDS runs as a container and scales independently of other components. WDS is the only data plane component that should be exposed to client networks.
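
The fan-out pattern used by WDS can be sketched as a small hub that copies each upstream message to every client subscribed to that channel. This is an illustrative model only: the FanOut class and its methods are hypothetical, asyncio queues stand in for WebSocket connections, and the Redis subscription that would feed publish() is omitted.

```python
import asyncio


class FanOut:
    """Copy each published message to every subscriber of its channel."""

    def __init__(self):
        self._subscribers = {}  # channel -> set of asyncio.Queue

    def subscribe(self, channel):
        """Register a client and return the queue it should read from."""
        queue = asyncio.Queue()
        self._subscribers.setdefault(channel, set()).add(queue)
        return queue

    def unsubscribe(self, channel, queue):
        subscribers = self._subscribers.get(channel)
        if subscribers:
            subscribers.discard(queue)
            if not subscribers:
                # Last client left; an upstream subscription could be dropped.
                del self._subscribers[channel]

    def publish(self, channel, message):
        """Deliver one upstream message to all clients; return the count."""
        delivered = 0
        for queue in self._subscribers.get(channel, ()):
            queue.put_nowait(message)
            delivered += 1
        return delivered
```

With this shape, a single upstream subscription per channel serves any number of clients. The queues here are unbounded, so a production bridge would also need a policy for slow consumers.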

Code Libraries

Service Control Plane (SCP)

Purpose:

  1. Authentication and authorization

  2. Serve static and dynamic content via py4web

  3. Serve SPA to authenticated clients

  4. Inject the authentication token and WDS URL into the SPA environment for authenticated access to WDS

  5. Control plane for managing application components at runtime

  6. API for programmatic access to service controls and instrumentation

The SCP requires access to the data plane network for API access to data plane components.

The SCP code is in the kuhl-haus/kuhl-haus-mdp-app repo.

Miscellaneous Code Libraries

  • Observability (helpers/observability.py) - OpenTelemetry tracer/meter factory for distributed tracing and metrics

  • StructuredLogging (helpers/structured_logging.py) - JSON logging for K8s/OpenObserve with dev mode support

  • Utils (helpers/utils.py) - API key resolution (MASSIVE_API_KEY → POLYGON_API_KEY → file) and TickerSnapshot serialization
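
The API key fallback order noted for Utils (MASSIVE_API_KEY, then POLYGON_API_KEY, then a file) can be sketched as follows. The function name and the key_file parameter are assumptions: the source does not say where the file lives, so the path is left to the caller.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_api_key(
    env: Mapping[str, str] = os.environ,
    key_file: Optional[Path] = None,
) -> Optional[str]:
    """Return the first API key found, or None if nothing is configured.

    Order: MASSIVE_API_KEY, then POLYGON_API_KEY, then the contents of
    `key_file` (a hypothetical caller-supplied path).
    """
    for name in ("MASSIVE_API_KEY", "POLYGON_API_KEY"):
        value = env.get(name, "").strip()
        if value:
            return value
    if key_file is not None and key_file.is_file():
        # Treat an empty file the same as a missing key.
        return key_file.read_text().strip() or None
    return None
```

Passing the environment as a parameter keeps the lookup easy to test without mutating os.environ.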