Source code for kuhl_haus.mdp.components.finlight_data_queues

"""RabbitMQ publisher for Finlight WebSocket article messages.

Routes incoming article messages from Finlight to a dedicated RabbitMQ queue.
Uses a single per-queue channel for publishing to maximize throughput and
minimize latency.
"""
import json
import logging

from datetime import datetime
from typing import List, Dict, Union

from aio_pika import Connection, Channel, connect_robust, Message
from aio_pika import DeliveryMode
from aio_pika.abc import AbstractConnection, AbstractChannel, AbstractExchange

from kuhl_haus.mdp.enum.finlight_data_queue import FinlightDataQueue
from kuhl_haus.mdp.helpers.observability import get_tracer, get_meter

tracer = get_tracer(__name__)


[docs] class FinlightDataQueues: """Publish Finlight article messages to RabbitMQ with a dedicated channel. Creates a dedicated RabbitMQ channel for the articles queue to publish incoming article data with configurable TTL and delivery mode. Accepts either Pydantic models (with model_dump) or plain dicts. Uses NOT_PERSISTENT delivery mode to prioritize throughput over durability, relying on queue TTL to expire unprocessed messages rather than fsync on every publish. Designed for real-time pipelines where fresh data is more valuable than stale data. """ rabbitmq_url: str queues: List[str] message_ttl: int publisher_confirms: bool connection: Union[Connection, AbstractConnection] channels: Dict[str, Union[Channel, AbstractChannel]] exchanges: Dict[str, AbstractExchange] connection_status: dict
[docs] def __init__( self, rabbitmq_url, message_ttl: int, publisher_confirms: bool = True ): self.logger = logging.getLogger(__name__) self.rabbitmq_url = rabbitmq_url self.queues = [ FinlightDataQueue.NEWS.value, ] self.message_ttl = message_ttl self.publisher_confirms = publisher_confirms self.channels = {} self.exchanges = {} self.connection_status = { "connected": False, "last_message_time": None, "messages_received": 0, FinlightDataQueue.NEWS.value: 0, "reconnect_attempts": 0, } # Metrics meter = get_meter(__name__) self.error_counter = meter.create_counter( name="fdq.error", description=( "Number of errors while processing messages " "from the Finlight data provider" ), unit="1" ) self.message_counter = meter.create_counter( name="fdq.messages", description=( "Number of messages received from the Finlight data provider" ), unit="1" ) self.counters = { FinlightDataQueue.NEWS.value: meter.create_counter( name=f"fdq.{FinlightDataQueue.NEWS.value}", description=( "Number of articles received from the Finlight data provider" ), unit="1" ), }
[docs] @tracer.start_as_current_span("fdq.connect") async def connect(self): """Establish RabbitMQ connection and create a per-queue channel. Opens a robust connection (auto-reconnect on failure), creates a dedicated channel for the articles queue with publisher confirms enabled, and verifies the target queue exists (passive declaration). Side effects: Opens network socket; creates RabbitMQ channel. """ self.connection = await connect_robust(self.rabbitmq_url) # Create a dedicated channel per queue for parallel I/O for q in self.queues: channel = await self.connection.channel( publisher_confirms=self.publisher_confirms, ) self.channels[q] = channel self.exchanges[q] = channel.default_exchange try: # Verify queues exist using the first available channel first_channel = next(iter(self.channels.values())) for q in self.queues: _ = await first_channel.declare_queue( q, passive=True ) # Don't create, just check self.connection_status["connected"] = self.connection is not None except Exception as e: self.logger.error(f"Fatal error while processing request: {e}") raise
[docs] @tracer.start_as_current_span("fdq.handle_message") async def handle_message(self, article): """Serialize and publish a single Finlight article to RabbitMQ. Accepts a Pydantic model (with model_dump) or a plain dict. Serializes to JSON, encodes to UTF-8 bytes, and publishes to the articles queue via its dedicated channel. Args: article: A Pydantic model or dict representing the article data. Side effects: Publishes to RabbitMQ; increments metrics counters. """ if not self.channels: self.logger.error("RabbitMQ channels not initialized") raise Exception("RabbitMQ channels not initialized") if not self.connection: self.logger.error("RabbitMQ connection not initialized") raise Exception("RabbitMQ connection not initialized") if not hasattr(article, "model_dump") and not isinstance(article, dict): raise Exception( f"Invalid article type: expected pydantic model or dict, " f"got {type(article).__name__}" ) try: self.message_counter.add(1) self.connection_status["messages_received"] += 1 self.connection_status["last_message_time"] = ( datetime.now().isoformat() ) if hasattr(article, "model_dump"): serialized = json.dumps(article.model_dump()) else: serialized = json.dumps(article) encoded = serialized.encode() rabbit_message = Message( body=encoded, # Persistent mode forces RabbitMQ to fsync to disk, # which adds significant latency per message. # Since we have short TTLs on the queues, the messages # are ephemeral by nature. delivery_mode=DeliveryMode.NOT_PERSISTENT, content_type="application/json", timestamp=datetime.now(), ) await self._publish_message( rabbit_message, FinlightDataQueue.NEWS.value ) except Exception as e: self.logger.error(f"Fatal error while processing message: {e}") self.error_counter.add(1) raise
@tracer.start_as_current_span("fdq._publish_message") async def _publish_message( self, rabbit_message: Message, queue_name: str ): """Publish message to RabbitMQ using the queue-specific channel. Retrieves the exchange for the target queue and publishes with routing_key equal to queue name (default exchange behavior). Logs errors but does not raise to avoid failing the caller. Side effects: Publishes to RabbitMQ; increments counter. """ try: exchange = self.exchanges[queue_name] await exchange.publish(rabbit_message, routing_key=queue_name) self.counters[queue_name].add(1) self.connection_status[queue_name] += 1 except Exception as e: self.logger.error( f"Error publishing to RabbitMQ queue {queue_name}: {e}" )
[docs] @tracer.start_as_current_span("fdq.shutdown") async def shutdown(self): """Close the channel and RabbitMQ connection. Closes the articles channel, then closes the connection. Clears internal state and marks the component as disconnected. Side effects: Closes network sockets. """ self.connection_status["connected"] = False for queue_name, channel in self.channels.items(): self.logger.info(f"Closing RabbitMQ channel for {queue_name}") await channel.close() self.logger.info(f"RabbitMQ channel for {queue_name} closed") self.channels.clear() self.exchanges.clear() self.logger.info("Closing RabbitMQ connection") await self.connection.close() self.logger.info("RabbitMQ connection closed")
[docs] @tracer.start_as_current_span("fdq.setup_queues") async def setup_queues(self): """Create the articles RabbitMQ queue with TTL and durability settings. Called during deployment/setup to provision the queue. Opens connection, creates a per-queue channel, and declares the queue with x-message-ttl. Should be idempotent (queue settings must match if queue already exists). Side effects: Creates durable RabbitMQ queue with configured TTL. """ self.connection = await connect_robust(self.rabbitmq_url) # Create a dedicated channel per queue for queue in self.queues: channel = await self.connection.channel( publisher_confirms=self.publisher_confirms, ) self.channels[queue] = channel self.exchanges[queue] = channel.default_exchange # Declare queue with message TTL await channel.declare_queue( queue, durable=True, arguments={"x-message-ttl": self.message_ttl} ) self.logger.info( f"{queue} queue created with {self.message_ttl}ms TTL " "(channel dedicated)" ) self.connection_status["connected"] = self.connection is not None