PubSub¶
Popoto provides real-time messaging through Redis pub/sub channels. In a food delivery app, order status changes need to reach drivers, customers, and restaurant dashboards the moment they happen. The pub/sub pattern lets you broadcast these updates without tight coupling between services -- publishers send messages to named channels, and all active subscribers receive them simultaneously.
Data is serialized with msgpack (with numpy support), so you can publish dicts containing numbers, strings, lists, and numpy arrays.
Publisher¶
The Publisher class sends messages to Redis channels. Subclass it and call publish() to broadcast
order status changes across your system.
from popoto.pubsub import Publisher
class OrderStatusPublisher(Publisher):
pass
publisher = OrderStatusPublisher(channel_name="order_updates")
publisher.publish(data={
"order_id": "abc123",
"status": "picked_up",
"driver": "Maria",
})
The publish() method returns the number of active subscribers that received the message.
subscriber_count = publisher.publish(data={
"order_id": "abc123",
"status": "delivered",
})
print(subscriber_count)
# => 3
Custom Channel Names¶
By default, the channel name is the class name. You can override it at initialization with the
channel_name parameter, or on a per-call basis when publishing.
# Defaults to class name
publisher = OrderStatusPublisher()
print(publisher.channel_name)
# => "OrderStatusPublisher"
# Override at init
publisher = OrderStatusPublisher(channel_name="order_updates")
print(publisher.channel_name)
# => "order_updates"
# Override per call
publisher.publish(
data={"order_id": "abc123", "status": "cancelled"},
channel_name="order_cancellations",
)
Note
The per-call channel_name override only applies to that single publish. Subsequent calls
without a channel_name argument revert to the instance default.
Pipeline Support¶
Use a Redis pipeline to batch multiple order updates into a single round-trip to Redis. This is useful when processing a batch of orders at once, such as marking all orders from a closing restaurant as delayed.
from popoto.redis_db import POPOTO_REDIS_DB
publisher = OrderStatusPublisher(channel_name="order_updates")
pipeline = POPOTO_REDIS_DB.pipeline()
publisher.publish(data={"order_id": "order_1", "status": "delayed"}, pipeline=pipeline)
publisher.publish(data={"order_id": "order_2", "status": "delayed"}, pipeline=pipeline)
publisher.publish(data={"order_id": "order_3", "status": "delayed"}, pipeline=pipeline)
pipeline.execute() # All three messages sent in one round-trip
Tip
Pipeline batching significantly improves throughput when publishing many messages in quick succession. Instead of one network round-trip per message, you pay the latency cost once for the entire batch.
Subscriber¶
The Subscriber class listens for messages on one or more channels. Subclass it and override
handle() to process incoming order updates.
from popoto.pubsub import Subscriber
class OrderStatusSubscriber(Subscriber):
sub_channel_names = ["order_updates"]
def handle(self, channel, data, *args, **kwargs):
order_id = data.get("order_id")
status = data.get("status")
print(f"Order {order_id} is now: {status}")
The data argument is automatically deserialized from msgpack, so you receive the original
Python dict that was published. See Publisher for how messages are sent.
Subscriber Lifecycle¶
A subscriber follows three steps: initialize, poll, and handle.
- Init -- Creates a Redis pubsub connection and subscribes to all channels in
sub_channel_names - Poll -- Call the instance to check for the next message (non-blocking)
- Handle -- If a message is available,
pre_handle()thenhandle()are called
subscriber = OrderStatusSubscriber()
# Poll for messages in a loop
import time
while True:
subscriber() # Checks for next message, calls handle() if available
time.sleep(0.01) # Small delay to avoid busy-waiting
Each call to the subscriber instance polls Redis once. If no message is waiting, the call returns immediately without invoking the handler.
Warning
The poll loop is blocking in the sense that your code must actively call the subscriber.
If your application needs to do other work between polls, keep the sleep interval short
or integrate the poll into your event loop.
pre_handle Hook¶
Override pre_handle() to run logic before the main handler. This is useful for logging
incoming events, filtering out irrelevant updates, or collecting metrics.
class OrderStatusSubscriber(Subscriber):
sub_channel_names = ["order_updates"]
def pre_handle(self, channel, data, *args, **kwargs):
print(f"[{channel}] Incoming: order {data.get('order_id')}")
def handle(self, channel, data, *args, **kwargs):
if data.get("status") == "delivered":
print(f"Order {data['order_id']} delivered successfully")
Both pre_handle() and handle() receive the same channel and data arguments. If you
need to filter messages, raise an exception in pre_handle() to prevent handle() from
running (though the exception will be wrapped in a SubscriberException).
Multi-Channel Subscription¶
A single subscriber can listen to multiple channels. The channel argument in handle() tells
you which channel the message arrived on, so you can route to different processing logic.
class DeliverySubscriber(Subscriber):
sub_channel_names = ["order_updates", "driver_location", "restaurant_alerts"]
def handle(self, channel, data, *args, **kwargs):
if channel == "order_updates":
self.process_order_update(data)
elif channel == "driver_location":
self.update_driver_position(data)
elif channel == "restaurant_alerts":
self.handle_restaurant_alert(data)
def process_order_update(self, data):
print(f"Order {data['order_id']}: {data['status']}")
def update_driver_position(self, data):
print(f"Driver {data['driver']}: lat={data['lat']}, lon={data['lon']}")
def handle_restaurant_alert(self, data):
print(f"Alert from {data['restaurant']}: {data['message']}")
Tip
Using one subscriber for related channels is more efficient than running separate instances when you need coordinated handling, such as matching a driver location update with an active order.
Exception Handling¶
The subscriber handles exceptions at two levels. Format errors from malformed msgpack data or
missing fields are logged as warnings and silently discarded. Exceptions raised in your
handle() or pre_handle() methods are wrapped in a SubscriberException.
from popoto.pubsub import Subscriber, SubscriberException
subscriber = OrderStatusSubscriber()
while True:
try:
subscriber()
except SubscriberException as e:
print(f"Error processing order update: {e}")
# Log the error, notify ops, or add to a dead letter queue
time.sleep(0.01)
Warning
An unhandled SubscriberException will break your poll loop. Always wrap subscriber()
calls in a try/except block in production to keep the listener running.
Logging¶
The pub/sub system uses two standard Python loggers that you can configure independently.
POPOTO-publisher-- Logs publish events and subscriber counts at DEBUG levelPOPOTO-subscriber-- Logs subscription setup, message handling, and format warnings
import logging
logging.getLogger("POPOTO-publisher").setLevel(logging.DEBUG)
logging.getLogger("POPOTO-subscriber").setLevel(logging.DEBUG)
Enable DEBUG level during development to see every message published and received. In production, INFO level shows subscription setup without the per-message noise.