Source code for kuhl_haus.mdp.analyzers.finlight_data_analyzer
"""Analyzer for Finlight news articles.
Processes articles from the Finlight WebSocket feed (via FinlightDataProcessor)
and publishes them to Redis for real-time distribution to WDS clients.
Publishing strategy:
- All articles → WidgetDataCacheKeys.NEWS_FEED_LATEST (news:feed:latest)
- Enhanced articles → WidgetDataCacheKeys.NEWS_TICKER per US-listed company
- Raw articles → WidgetDataCacheKeys.NEWS_TICKER for tickers parsed from title/summary
US exchange codes (MIC):
- XNYS → NYSE
- XASE → NYSE American (AMEX)
- XNAS → NASDAQ
"""
import logging
import re
from typing import Optional, List
from kuhl_haus.mdp.analyzers.analyzer import Analyzer, AnalyzerOptions
from kuhl_haus.mdp.data.market_data_analyzer_result import MarketDataAnalyzerResult
from kuhl_haus.mdp.enum.widget_data_cache_keys import WidgetDataCacheKeys
from kuhl_haus.mdp.enum.widget_data_cache_ttl import WidgetDataCacheTTL
from kuhl_haus.mdp.enum.finlight_data_cache import FinlightDataCache
[docs]
class FinlightDataAnalyzer(Analyzer):
"""Stateless analyzer that routes Finlight news articles to Redis pub/sub.
Receives article dicts (serialized via serde.to_dict) from FinlightDataProcessor
and returns MarketDataAnalyzerResult objects for caching and publication.
Two modes depending on whether the article includes entity data:
- Enhanced: companies field present → extract tickers via primaryListing.exchangeCode
- Raw: no companies field → extract tickers via regex on title + summary
"""
[docs]
def __init__(self, options: AnalyzerOptions):
    """Set up logging, ticker-matching rules, and cache tuning values.

    Args:
        options: Analyzer options passed through to the base class
            initializer. ``options.kwargs`` may supply any of
            ``news_feed_list_max``, ``news_ticker_list_max``,
            ``news_feed_cache_ttl`` and ``news_ticker_cache_ttl``; a key
            that is absent falls back to the matching enum value.
    """
    super().__init__(options)
    self.logger = logging.getLogger(__name__)

    # Primary listings on any other exchange (MIC code) are not routed
    # to per-ticker channels.
    self.valid_exchanges = {"XNYS", "XASE", "XNAS"}

    # Captures the symbol from exchange-qualified mentions such as
    # "(Nasdaq: ATOS)", "(NYSE:GM)" or "(AMEX: XYZ)"; case-insensitive.
    self.ticker_re = re.compile(
        r'\(\s*(?:nasdaq|nyse|amex)\s*:\s*([A-Z]{1,6})\s*\)',
        flags=re.IGNORECASE,
    )

    # Single lookup helper: caller-supplied override, else the default.
    setting = options.kwargs.get

    # Upper bounds on the cached article lists.
    self.news_feed_list_max: int = setting(
        "news_feed_list_max", FinlightDataCache.NEWS_FEED_LIST_MAX.value
    )
    self.news_ticker_list_max: int = setting(
        "news_ticker_list_max", FinlightDataCache.NEWS_TICKER_LIST_MAX.value
    )

    # Time-to-live values applied to the cached entries.
    self.news_feed_cache_ttl: int = setting(
        "news_feed_cache_ttl", WidgetDataCacheTTL.NEWS_FEED_LATEST.value
    )
    self.news_ticker_cache_ttl: int = setting(
        "news_ticker_cache_ttl", WidgetDataCacheTTL.NEWS_TICKER.value
    )
[docs]
async def rehydrate(self):
    """Do nothing: this analyzer is stateless, so there is no Redis state to restore."""
    return None
[docs]
async def analyze_data(self, data: dict) -> Optional[List[MarketDataAnalyzerResult]]:
    """Process a single Finlight article and return routing results.

    Every valid article produces one result for the shared news feed.
    Additional per-ticker results are produced from one of two sources:

    - Enhanced: a non-empty ``companies`` field is present, so tickers come
      from ``_extract_enhanced_tickers``.
    - Raw: otherwise, tickers are parsed out of the title and summary text
      by ``_extract_raw_tickers``.

    Args:
        data: Article dict serialized by serde.to_dict.

    Returns:
        List of MarketDataAnalyzerResult — one for news:feed:latest plus one
        per associated US ticker. Returns None when ``data`` is empty or is
        not a dict.
    """
    # A truthy non-dict (e.g. a string or list) would pass a bare
    # truthiness check and then fail on ``data.get`` below; reject it here
    # so invalid input yields None as documented.
    if not isinstance(data, dict) or not data:
        return None

    # All articles go to the feed regardless of mode.
    feed_key = WidgetDataCacheKeys.NEWS_FEED_LATEST.value
    results: List[MarketDataAnalyzerResult] = [
        MarketDataAnalyzerResult(
            data=data,
            cache_key=feed_key,
            cache_ttl=self.news_feed_cache_ttl,
            publish_key=feed_key,
            cache_list_max=self.news_feed_list_max,
        )
    ]

    # Determine mode and extract tickers.
    companies = data.get("companies")
    if companies:
        tickers = self._extract_enhanced_tickers(companies)
    else:
        # ``or ""`` also covers keys that are present but set to None, so
        # the literal text "None" is never fed to the ticker regex.
        title = data.get("title") or ""
        summary = data.get("summary") or ""
        tickers = self._extract_raw_tickers(f"{title} {summary}")

    for ticker in tickers:
        # The same key is used for the cache entry and the publish channel.
        ticker_key = WidgetDataCacheKeys.NEWS_TICKER.value.format(ticker=ticker)
        results.append(
            MarketDataAnalyzerResult(
                data=data,
                cache_key=ticker_key,
                cache_ttl=self.news_ticker_cache_ttl,
                publish_key=ticker_key,
                cache_list_max=self.news_ticker_list_max,
            )
        )
    return results
def _extract_enhanced_tickers(self, companies: list) -> List[str]:
    """Extract US-listed tickers from an enhanced article's company list.

    Checks each company's ``primaryListing.exchangeCode`` against
    ``self.valid_exchanges``; companies whose primary listing is on any
    other exchange are excluded. The ticker is read from
    ``primaryListing.ticker``, falling back to the company-level ``ticker``.

    Malformed entries (non-dict companies or listings, non-string exchange
    codes or tickers) are skipped rather than raising.

    Args:
        companies: List of company dicts from the Finlight enhanced message.

    Returns:
        Deduplicated list of uppercase ticker symbols for US-listed
        companies, in first-seen order.
    """
    if not isinstance(companies, (list, tuple)):
        return []

    # Dict keys keep insertion order, so this doubles as an ordered set and
    # prevents one article being routed to the same ticker more than once.
    found = {}
    for company in companies:
        if not isinstance(company, dict):
            continue
        primary = company.get("primaryListing")
        if not isinstance(primary, dict):
            continue
        exchange_code = primary.get("exchangeCode")
        if not isinstance(exchange_code, str) or exchange_code not in self.valid_exchanges:
            continue
        ticker = primary.get("ticker") or company.get("ticker")
        if isinstance(ticker, str) and ticker:
            found[ticker.upper()] = None
    return list(found)
def _extract_raw_tickers(self, text: str) -> List[str]:
    """Extract tickers from raw article text via exchange-qualified patterns.

    Applies ``self.ticker_re`` to the text, which matches mentions such as
    (Nasdaq: ATOS), (NYSE:GM) or (AMEX: XYZ).

    Args:
        text: Combined title and summary string.

    Returns:
        Deduplicated list of uppercase ticker symbols in first-seen order;
        an empty list when ``text`` is empty or not a string.
    """
    if not isinstance(text, str) or not text:
        return []
    symbols = (match.upper() for match in self.ticker_re.findall(text))
    # dict.fromkeys keeps the first occurrence of each key in order, giving
    # an order-preserving dedup without repeated list membership scans.
    return list(dict.fromkeys(symbols))