popoto.pubsub.publisher
Publisher module for Redis pub/sub messaging in Popoto.
This module provides the Publisher mixin class that enables any Python object to broadcast messages to Redis channels. It forms one half of Popoto's pub/sub system (paired with Subscriber), enabling decoupled, event-driven architectures where components communicate without direct knowledge of each other.
Design Philosophy
The Publisher is designed as an abstract base class (ABC) that can be mixed into any class hierarchy. This allows Model subclasses to gain publishing capabilities simply by inheriting from Publisher alongside Model. The mixin approach keeps publishing logic separate from persistence logic while allowing seamless integration.
Channel names default to the class name, supporting the convention that each model type broadcasts on its own channel. This creates natural topic boundaries without requiring explicit configuration.
Serialization
Messages are serialized using msgpack with numpy support (msgpack-numpy), making this publisher suitable for high-performance numerical data streams such as financial time-series or scientific computing applications.
Example
Publishing from a custom model on save::

    import popoto

    class StockPrice(popoto.Model, popoto.Publisher):
        symbol = popoto.KeyField()
        price = popoto.FloatField()

        def save(self, pipeline=None, **kwargs):
            super().save(pipeline=pipeline, **kwargs)
            # Queue the notification on the same pipeline as the save, if any.
            self.publish({"symbol": self.symbol, "price": self.price},
                         pipeline=pipeline)

See Also
- Subscriber: The receiving end of the pub/sub system
- redis_db: Connection management for Redis
PublisherException
Bases: Exception
Raised when a publish operation fails (e.g. missing channel name).
This exception signals a problem with the publish call itself, such as publishing without a channel name. Redis connection errors are a separate case: they propagate as exceptions from redis.exceptions.
Source code in src/popoto/pubsub/publisher.py
Publisher
Bases: ABC
Abstract base class for publishing msgpack-encoded messages to Redis channels.
Subclass and call publish() to send data. The channel name defaults to the class name but can be overridden via the constructor or per call.
Publisher enables any class to broadcast messages to Redis channels, forming the sender side of Popoto's publish-subscribe messaging system. It is designed to be mixed into class hierarchies (especially with Model) to add event broadcasting without coupling business logic to specific subscribers.
The class uses cooperative multiple inheritance via super().__init__(), allowing it to be safely combined with Model and other base classes. Channel names default to the class name, establishing a convention where each model type has its own broadcast channel.
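The cooperative pattern described above can be sketched with two stand-in classes. SketchModel and SketchPublisher are hypothetical names invented for this illustration and are not Popoto's implementation; the sketch only assumes that each `__init__` forwards to `super().__init__()` and that the channel name falls back to the class name.

```python
from abc import ABC


class SketchModel:
    """Hypothetical stand-in for popoto.Model; stores keyword fields."""

    def __init__(self, **fields):
        # Forward to the next class in the MRO so mixins are initialized too.
        super().__init__()
        for name, value in fields.items():
            setattr(self, name, value)


class SketchPublisher(ABC):
    """Hypothetical stand-in for Publisher; shows only the channel default."""

    def __init__(self, *args, channel_name=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Fall back to the concrete class name when no channel is given.
        self._channel_name = channel_name or type(self).__name__

    @property
    def channel_name(self):
        return self._channel_name

    @channel_name.setter
    def channel_name(self, value):
        self._channel_name = value


class Order(SketchModel, SketchPublisher):
    pass


order = Order(total=19.99)
print(order.channel_name)  # Order

# The property is writable, so routing can be changed per instance.
order.channel_name = "orders"
print(order.channel_name)  # orders
```

Because both initializers call `super().__init__()`, the order of the bases decides only the order of initialization, not whether the publisher state gets set up.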
Design Decisions
- ABC inheritance signals this is meant to be subclassed, not instantiated directly in production code
- Class-level _publish_data allows stateful publishing where data can be set incrementally before calling publish()
- Pipeline support enables atomic publish-with-save operations, ensuring messages are only sent if the associated database write succeeds
Attributes:
| Name | Type | Description |
|---|---|---|
| _channel_name | str | The Redis channel to publish to. Defaults to the class name. |
| _publish_data | dict | Cached data for the next publish operation. Useful for building up complex messages across multiple method calls. |
Example
Basic standalone publisher::

    publisher = Publisher(channel_name="price_updates")
    publisher.publish({"symbol": "AAPL", "price": 150.25})

Mixin with Model for automatic save notifications::

    class Order(popoto.Model, Publisher):
        def save(self, **kwargs):
            super().save(**kwargs)
            self.publish({"event": "order_created", "id": self.db_key})

channel_name (property, writable)
The Redis channel name this publisher broadcasts to.
Defaults to the class name, supporting the convention that each model type has its own dedicated channel. Can be overridden per-instance for custom routing scenarios.
Returns:
| Type | Description |
|---|---|
| str | The channel name for Redis pub/sub operations. |
publish(data=None, channel_name=None, pipeline=None)
Publish data as msgpack to the given (or default) channel.
Serializes the provided data using msgpack (with numpy array support) and publishes it to the specified Redis channel. Supports both immediate publishing and deferred publishing via Redis pipelines.
Pipeline Support
When a pipeline is provided, the publish command is queued within that pipeline rather than executed immediately. This enables atomic operations where a model save and its corresponding notification either both succeed or both fail together. This is critical for maintaining consistency between persisted state and published events.
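As a rough illustration of the queueing behaviour described above, the sketch below uses a hypothetical in-memory FakePipeline instead of a real Redis pipeline, so it runs without a server. The class, its attributes, and the key and payload values are assumptions made for illustration; only the idea that queued commands are held until execute() is taken from the text.

```python
class FakePipeline:
    """Hypothetical in-memory stand-in for a Redis pipeline."""

    def __init__(self):
        self.queued = []     # commands waiting for execute()
        self.delivered = []  # commands that have been "sent"

    def set(self, key, value):
        self.queued.append(("SET", key, value))
        return self  # queued commands hand back the pipeline itself

    def publish(self, channel, message):
        self.queued.append(("PUBLISH", channel, message))
        return self

    def execute(self):
        # Flush everything queued so far in a single step.
        sent, self.queued = self.queued, []
        self.delivered.extend(sent)
        return sent


pipeline = FakePipeline()
pipeline.set("StockPrice:AAPL", b"150.25")   # stands in for model.save(pipeline=...)
pipeline.publish("StockPrice", b"payload")   # stands in for model.publish(..., pipeline=...)

print(pipeline.delivered)       # []
pipeline.execute()
print(len(pipeline.delivered))  # 2
```

Until execute() runs, neither the write nor the notification has left the queue, which is what lets the caller treat the two as one unit.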
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict | Dict payload to publish. Falls back to _publish_data. | None |
| channel_name | str | Override the default channel for this call. | None |
| pipeline | Pipeline | Optional Redis pipeline for batching. | None |
Returns:
| Type | Description |
|---|---|
| | The number of subscribers that received the message, or the pipeline when batching. Returns None if no data to publish. |
Raises:
| Type | Description |
|---|---|
| PublisherException | If no channel name is available (neither provided nor set on the instance). |
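The return and error behaviour listed above can be sketched as a standalone function. Everything here is a hypothetical stand-in: sketch_publish, FakeConnection, and the local PublisherException are invented for illustration, json replaces msgpack so that only the standard library is needed, and the order of the two checks is an assumption rather than something the documentation states. The pipeline branch is left out.

```python
import json


class PublisherException(Exception):
    """Local stand-in for popoto's exception of the same name."""


class FakeConnection:
    """Hypothetical stand-in for a Redis client with no subscribers."""

    def __init__(self):
        self.sent = []

    def publish(self, channel, message):
        self.sent.append((channel, message))
        return 0  # number of subscribers that received the message


def sketch_publish(conn, data=None, channel_name=None,
                   default_channel=None, cached_data=None):
    payload = data or cached_data
    if not payload:
        return None  # nothing to publish
    channel = channel_name or default_channel
    if not channel:
        raise PublisherException("no channel name available")
    message = json.dumps(payload).encode()  # the real class uses msgpack
    return conn.publish(channel, message)


conn = FakeConnection()
empty_result = sketch_publish(conn, default_channel="Order")
count = sketch_publish(conn, {"event": "updated"}, default_channel="Order")

try:
    sketch_publish(conn, {"event": "updated"})
    raised = False
except PublisherException:
    raised = True

print(empty_result, count, raised)  # None 0 True
```

A caller that needs to tell "nothing was sent" apart from "sent to zero subscribers" should compare the result against None rather than testing its truthiness.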
Example
Immediate publish::

    publisher.publish({"event": "updated", "value": 42})

Atomic save-and-publish with pipeline::

    pipeline = redis_conn.pipeline()
    model.save(pipeline=pipeline)
    model.publish({"saved": True}, pipeline=pipeline)
    pipeline.execute()  # Both operations succeed or fail together