popoto.pubsub.subscriber
Redis Pub/Sub subscriber base class for reactive event handling.
This module provides the abstract Subscriber class, which enables a reactive, event-driven architecture within Popoto. While Popoto Models handle data persistence, Subscribers handle real-time data flow, allowing components to react to changes as they happen rather than polling for updates.
Design Philosophy
The Subscriber implements the Observer pattern over Redis pub/sub channels. This decouples publishers (who emit events) from subscribers (who react to them), enabling loose coupling between system components. A price update, for example, can trigger multiple independent calculations (moving averages, RSI, etc.) without the price publisher needing to know about any of them.
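The decoupling described above can be illustrated without Redis. The following is a minimal in-memory sketch, not Popoto's implementation: InMemoryBus, MovingAverage, and LastPrice are hypothetical names invented for this illustration, and a real deployment would publish over Redis channels instead.

```python
from collections import defaultdict, deque


class InMemoryBus:
    """Hypothetical stand-in for Redis pub/sub channels (illustration only)."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def subscribe(self, channel, callback):
        self._listeners[channel].append(callback)

    def publish(self, channel, data):
        # The publisher only knows the channel name, never the listeners.
        for callback in self._listeners[channel]:
            callback(channel, data)


class MovingAverage:
    """Keeps the mean of the last `window` prices."""

    def __init__(self, window):
        self.prices = deque(maxlen=window)
        self.value = None

    def __call__(self, channel, data):
        self.prices.append(data["price"])
        self.value = sum(self.prices) / len(self.prices)


class LastPrice:
    """Remembers only the most recent price."""

    def __init__(self):
        self.value = None

    def __call__(self, channel, data):
        self.value = data["price"]


bus = InMemoryBus()
average = MovingAverage(window=3)
last = LastPrice()
bus.subscribe("prices", average)
bus.subscribe("prices", last)

# One publish call reaches both independent listeners.
for price in (10.0, 20.0, 30.0, 40.0):
    bus.publish("prices", {"price": price})
```

Adding a third calculation means registering one more listener; the publishing loop does not change.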
The callable interface (via __call__) is intentionally designed for polling loops. Rather than blocking until a message arrives, each call checks for a message and returns immediately if none is available. This allows subscribers to be integrated into event loops, async frameworks, or simple while-true loops with sleep intervals.
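The non-blocking call pattern can be sketched with a standard-library queue in place of the Redis connection. PollingSubscriber is a hypothetical stand-in, and the boolean return value is an assumption made for this illustration; the source does not state what the real callable returns.

```python
import queue
import time


class PollingSubscriber:
    """Hypothetical stand-in: an in-memory queue replaces the Redis connection."""

    def __init__(self):
        self.inbox = queue.Queue()
        self.handled = []

    def __call__(self):
        # Check once and return immediately when nothing is waiting.
        try:
            message = self.inbox.get_nowait()
        except queue.Empty:
            return False  # assumption: signal "no message" to the caller
        self.handled.append(message)
        return True


subscriber = PollingSubscriber()
subscriber.inbox.put({"price": 101.5})

results = []
for _ in range(3):        # bounded here; a long-running loop would use `while True`
    results.append(subscriber())
    time.sleep(0.01)      # the caller, not the subscriber, sets the pace
```

Because each call returns right away, the sleep interval is the only thing that determines how often the channel is checked.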
Key Design Decisions
- Non-blocking by default: get_message() returns immediately, giving the caller control over timing and concurrency.
- Two-phase handling: pre_handle() runs before handle(), allowing subclasses to extract and validate data before the main logic executes. This separation keeps parsing concerns separate from business logic.
- Graceful error handling: Malformed messages are logged and discarded rather than crashing the subscriber. Only unexpected errors raise SubscriberException.
- msgpack serialization: Uses msgpack (with numpy support) for efficient binary serialization, matching the Publisher's format.
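The two-phase handling and error policy listed above might look roughly like the sketch below. It is not the library's source: SketchSubscriber, SketchSubscriberError, and dispatch are hypothetical names, and the standard-library json module stands in for msgpack so the example has no third-party dependencies.

```python
import json
import logging

logger = logging.getLogger(__name__)


class SketchSubscriberError(Exception):
    """Hypothetical stand-in for SubscriberException."""


class SketchSubscriber:
    def __init__(self):
        self.seen = []

    def pre_handle(self, channel, data):
        pass  # parsing and validation hook

    def handle(self, channel, data):
        self.seen.append((channel, data))

    def dispatch(self, raw_channel, raw_payload):
        # Decode first: a malformed payload is logged and dropped.
        try:
            channel = raw_channel.decode("utf-8")
            data = json.loads(raw_payload)  # json stands in for msgpack here
        except ValueError as exc:  # covers JSON and Unicode decode errors
            logger.warning("Discarding malformed message: %s", exc)
            return False
        # Then run both phases: unexpected errors are wrapped and raised.
        try:
            self.pre_handle(channel, data)
            self.handle(channel, data)
        except Exception as exc:
            raise SketchSubscriberError(
                f"{type(self).__name__} failed: {exc}"
            ) from exc
        return True


class Broken(SketchSubscriber):
    def handle(self, channel, data):
        raise KeyError("price")  # simulates a bug in subclass logic


good = SketchSubscriber()
ok = good.dispatch(b"prices", b'{"price": 1.5}')
dropped = good.dispatch(b"prices", b"{not json")

try:
    Broken().dispatch(b"prices", b'{"price": 1.5}')
    wrapped = None
except SketchSubscriberError as exc:
    wrapped = str(exc)
```

Including the subclass name in the wrapped message mirrors the behavior the documentation describes for SubscriberException.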
Integration with Finance Module
The finance module extends this pattern with specialized subscribers like IndicatorSubscriber that automatically compute technical indicators when new price data arrives. This creates a reactive pipeline: price updates flow through channels, triggering cascading indicator calculations.
Usage Example
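    import time

    from popoto.pubsub.subscriber import Subscriber

    class PriceAlertSubscriber(Subscriber):
        sub_channel_names = ['PriceStorage']
        threshold = 100.0  # hypothetical alert level; choose your own

        def handle(self, channel, data):
            if data['price'] > self.threshold:
                # send_alert is a placeholder for your notification function
                send_alert(f"Price crossed {self.threshold}")

    # In your event loop:
    subscriber = PriceAlertSubscriber()
    while True:
        subscriber()  # Check for and process one message
        time.sleep(0.1)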
See Also
- Publisher: The counterpart that emits messages to channels
- popoto.finance.subscribers: Concrete implementations for financial data
SubscriberException
Bases: Exception
Raised when a subscriber's message handler fails.
This exception wraps errors that occur within handle() or pre_handle() methods, providing context about which subscriber class failed. It is intentionally NOT raised for malformed messages, which are discarded rather than treated as failures. It is raised only for unexpected errors that indicate a bug in the subscriber implementation.
The exception message includes the subscriber class name to aid debugging in systems with many subscriber types running concurrently.
Source code in src/popoto/pubsub/subscriber.py
Subscriber
Bases: ABC
Abstract base class for consuming messages from Redis pub/sub channels.
Set sub_channel_names to the list of channels to subscribe to, override handle() to process incoming messages, and call the instance (subscriber()) in a loop to poll for new messages.
Subscriber provides a framework for building reactive components that respond to messages published on Redis channels. Subclasses define which channels to listen to and implement handle() to process incoming messages.
The class follows a pull-based model: you call the subscriber instance as a callable to check for and process one message at a time. This gives you full control over when and how often to check for messages, making it easy to integrate with various concurrency models (threads, asyncio, simple loops).
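As one illustration of the pull-based model, the sketch below polls two subscribers side by side on an asyncio event loop. CountingSubscriber and poll are hypothetical helpers written for this example, not Popoto APIs, and the sketch assumes each call returns quickly enough not to stall the loop.

```python
import asyncio


class CountingSubscriber:
    """Hypothetical stand-in for a Subscriber instance; no Redis involved."""

    def __init__(self, pending):
        self.pending = list(pending)
        self.processed = []

    def __call__(self):
        # Process at most one message per call, then return.
        if self.pending:
            self.processed.append(self.pending.pop(0))


async def poll(subscriber, interval, iterations):
    for _ in range(iterations):        # a service would loop until cancelled
        subscriber()
        await asyncio.sleep(interval)  # yield so other tasks can run


async def main():
    first = CountingSubscriber(["a", "b"])
    second = CountingSubscriber(["x"])
    await asyncio.gather(
        poll(first, 0.001, 3),
        poll(second, 0.001, 3),
    )
    return first.processed, second.processed


first_done, second_done = asyncio.run(main())
```

The same callable could equally be driven from a thread or a plain loop; only the scheduling code around it changes.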
Attributes:
| Name | Type | Description |
|---|---|---|
| sub_channel_names | list | List of Redis channel names to subscribe to. Define this as a class attribute in your subclass. Multiple channels allow a single subscriber to react to events from different sources. |
Design Notes
- Each Subscriber instance creates its own Redis pubsub connection. This isolation prevents subscribers from interfering with each other.
- The ABC inheritance signals that this class should not be instantiated directly; subclasses are expected to override handle().
- Channel subscriptions happen at construction time, so the subscriber is immediately ready to receive messages after __init__ completes.
Example
    class OrderSubscriber(Subscriber):
        sub_channel_names = ['new_orders', 'order_updates']

        def handle(self, channel, data):
            if channel == 'new_orders':
                process_new_order(data)
            else:
                update_order(data)
pre_handle(channel, data, *args, **kwargs)
Hook called before handle(). Override for logging, filtering, etc.
Override this method to perform data extraction, validation, or transformation before the main handle() logic runs. This separation allows subclasses to standardize data parsing (e.g., extracting ticker symbols, timestamps) independently of business logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel | | The channel name the message was received on (decoded string). | required |
| data | | The deserialized message payload (typically a dict). | required |
| *args | | Reserved for future use. | () |
| **kwargs | | Reserved for future use. | {} |
Note
Errors raised here will propagate as SubscriberException, stopping message processing. For validation that should skip invalid messages without crashing, catch exceptions and return early from handle().
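The skip-on-invalid approach mentioned in this note could look like the following sketch. SafeHandler is a hypothetical class shown on its own rather than as a Subscriber subclass so that it runs without Redis, and the ticker and price fields are illustrative payload keys.

```python
class SafeHandler:
    """Hypothetical handler showing the skip-on-invalid pattern."""

    def __init__(self):
        self.accepted = []
        self.skipped = 0

    def handle(self, channel, data):
        try:
            ticker = str(data["ticker"])
            price = float(data["price"])
        except (KeyError, TypeError, ValueError):
            # Invalid payload: count it and return instead of raising.
            self.skipped += 1
            return
        self.accepted.append((ticker, price))


handler = SafeHandler()
handler.handle("prices", {"ticker": "BTC", "price": "101.5"})
handler.handle("prices", {"ticker": "BTC"})                 # missing price
handler.handle("prices", {"ticker": "BTC", "price": None})  # wrong type
handler.handle("prices", None)                              # not a dict
```

Catching only the expected exception types keeps genuine bugs visible, since anything else still propagates to the caller.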
handle(channel, data, *args, **kwargs)
Process an incoming message. Override this in your subclass.
This is the main extension point for Subscriber subclasses. When a message arrives on any subscribed channel, this method is called with the decoded channel name and deserialized data payload. Implement your business logic here.
The default implementation logs a warning and discards the message, serving as a reminder to implement this method in subclasses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel | | The channel name the message arrived on. | required |
| data | | The deserialized (msgpack-unpacked) message payload. | required |
| *args | | Reserved for future use. | () |
| **kwargs | | Reserved for future use. | {} |
Returns:
| Type | Description |
|---|---|
| | None. Results should be achieved through side effects (saving to database, sending alerts, updating state, etc.). |
Example
    def handle(self, channel, data):
        price = Price(
            ticker=data['ticker'],
            value=data['price'],
            timestamp=data['timestamp'],
        )
        price.save()