Agent Memory

Getting Started? See the Agent Memory Quickstart for a progressive 5-level adoption guide.

Popoto Agent Memory gives you ORM primitives for programmable memory — records that decay over time, strengthen through use, track confidence, form associations, and surface the right context at the right moment.

These are generic Redis-backed field types, mixins, and query methods. They don't encode any specific agent architecture. You compose them into memory models the same way you'd compose KeyField, SortedField, and Relationship into any Popoto model.

Why your agent needs memory primitives

LLMs reason well over content you put in their context window. What they can't do on their own:

  • Prioritize by recency and frequency — know which records are "hot" right now
  • Learn from outcomes — track what worked and what didn't
  • Manage certainty — downweight contradicted knowledge automatically
  • Retrieve associatively — surface related records without explicit graph queries
  • Filter noise — avoid storing low-value observations in the first place

Popoto Agent Memory gives you these capabilities as composable ORM building blocks. Each one is independently useful; together they form a complete agent memory system.

Primitives overview

The primitives ship incrementally. Each builds on the ones before it.

Primitive What it does Status
DecayingSortedField Time-weighted scoring — records lose relevance over time unless refreshed Shipped (PR #199)
CyclicDecayField Temporal rhythms + homeostatic pressure on top of decay Shipped (PR #201)
AccessTracker Tracks read patterns — access count, timestamps, spacing effects Shipped (PR #203)
ObservationProtocol Outcome-driven memory effects — acted/dismissed/deferred/contradicted/used Shipped (PR #206)
WriteFilter Gates persistence — low-value records silently discarded at write time Shipped (PR #214)
ConfidenceField Bayesian certainty — corroboration strengthens, contradiction weakens Shipped (PR #215)
CoOccurrenceField Weighted associations — co-accessed records strengthen their link Shipped (PR #218)
EventStreamMixin Append-only mutation log via Redis Streams Shipped
CompositeScoreQuery Multi-factor retrieval — combine N sorted indexes with weights Shipped
ExistenceFilter Bloom filter for O(1) "do I know anything about X?" checks Shipped
BM25Field Ranked keyword search with BM25 scoring in Redis sorted sets Shipped (PR #306)
Hybrid Retrieval (RRF) Multi-signal fusion — combine keyword, semantic, and graph signals via Reciprocal Rank Fusion Shipped (PR #306)
FrequencySketch Count-Min Sketch for approximate frequency counting Shipped
PredictionLedger Outcome tracking — record predictions, observe results, compute error Shipped
StreamConsumer Background processing framework for Redis Streams Shipped (PR #238)
PolicyCache Learned action selection — crystallized state-action-outcome patterns Shipped (PR #239)
ContextAssembler Retrieval-to-injection bridge — assemble LLM-ready context within token budgets Shipped

DecayingSortedField

A SortedField subclass where records lose retrieval weight over time following a power-law decay curve. This is the foundational primitive — most subsequent primitives depend on time-weighted scoring.

The sorted set score is always a timestamp. A Lua script computes decay-ranked results at query time:

decayed_score = base_score × elapsed_days ^ (-decay_rate)

With the default decay_rate=0.1 (empirically tuned in sweep 2026-04-17; prior default was 0.5), a record scores 1.0 after 1 day, 0.87 after 4 days, and 0.63 after 100 days.
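
The curve is easy to sanity-check in plain Python:

# Reproducing the documented decay curve with the default decay_rate=0.1
decay_rate = 0.1
for elapsed_days in (1, 4, 100):
    decayed_score = elapsed_days ** (-decay_rate)  # base_score = 1.0
    print(f"{elapsed_days:>3} days -> {decayed_score:.2f}")
# 1 -> 1.00, 4 -> 0.87, 100 -> 0.63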

Basic usage

from popoto import Model, KeyField, Field
from popoto.fields import DecayingSortedField

class Memory(Model):
    agent_id = KeyField()
    content = Field(type=str)
    relevance = DecayingSortedField()

The relevance field automatically timestamps records on save. Query for the most relevant recent records with top_by_decay():

memories = (
    Memory.query.filter(agent_id="agent-1")
    .top_by_decay(10)
)

Parameters

Parameter Type Default Description
decay_rate float 0.1 Controls how fast scores drop. Higher = faster decay. (Empirically tuned in sweep 2026-04-17; prior default was 0.5.)
base_score_field str None Name of a companion field whose value multiplies the decay curve. When None, base score is 1.0.
partition_by str or tuple () Partition the sorted set by key field values, inherited from SortedField.

Base score weighting

By default, all records decay at the same rate — ranking is purely by recency. To weight records differently, point base_score_field at another field:

class Memory(Model):
    agent_id = KeyField()
    content = Field(type=str)
    importance = Field(type=float, default=1.0)
    relevance = DecayingSortedField(base_score_field="importance")

A record with importance=5.0 stays relevant longer than one with importance=1.0 — the effective-lifetime multiplier is importance^(1/decay_rate). At the default decay_rate=0.1, importance strongly dominates recency; at decay_rate=0.5, importance=5.0 yields a 25× longer lifetime.
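
The multiplier falls out of the decay formula: a record stays above any fixed score threshold until base_score × t^(−decay_rate) drops below it, so time-to-threshold scales as importance^(1/decay_rate). A quick check:

# Effective-lifetime multiplier relative to a record with importance=1.0
def lifetime_multiplier(importance: float, decay_rate: float) -> float:
    return importance ** (1 / decay_rate)

print(lifetime_multiplier(5.0, 0.5))  # 25.0 -> 25x lifetime
print(lifetime_multiplier(5.0, 0.1))  # ~9.8 million -> importance dwarfs recency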

Source weighting for teamwork

In multi-agent teams with human oversight, human interactions are rare but high-signal. Agent-to-agent interactions are frequent but lower-signal. Use InteractionWeight constants to ensure human directives don't get drowned out by agent chatter:

from popoto.fields.constants import InteractionWeight

class TeamMemory(Model):
    agent_id = KeyField()
    source = Field(type=str)
    role = Field(type=str)
    importance = Field(type=float, default=InteractionWeight.AGENT)
    content = Field(type=str)
    relevance = DecayingSortedField(base_score_field="importance")

# CEO gives a directive — stays relevant for years
TeamMemory(agent_id="pm-1", source="human", role="executive",
           importance=InteractionWeight.combine(
               InteractionWeight.HUMAN, InteractionWeight.EXECUTIVE),
           content="We're pivoting to enterprise").save()

# Agent colleague logs a finding — moderate lifetime
TeamMemory(agent_id="pm-1", source="agent", role="peer",
           importance=InteractionWeight.combine(
               InteractionWeight.AGENT, InteractionWeight.PEER),
           content="Found 3 broken API contracts in staging").save()

Weights are split across two axes — source (human vs agent) and role (authority level) — combined by addition:

class InteractionWeight:
    # Source axis — what kind of entity
    HUMAN = 6.0
    AGENT = 1.0
    SYSTEM = 0.2

    # Role axis — authority level
    EXECUTIVE = 44.0
    MANAGER = 16.0
    PEER = 6.0
    SUBORDINATE = 1.0

    @staticmethod
    def combine(source, role):
        return source + role

With decay_rate=0.5, lifetime ≈ score² days (at the current default decay_rate=0.1, scores decay far more slowly, so lifetime ≈ score^10 days and the spread below widens dramatically — see Tuning Magic Numbers for the empirical rationale):

Combination Score Effective lifetime
Human executive 50.0 ~7 years
Human manager 22.0 ~1.3 years
Human peer 12.0 ~5 months
Agent executive 45.0 ~5.5 years
Agent manager 17.0 ~9 months
Agent peer 7.0 ~7 weeks
Agent subordinate 2.0 ~4 days
System 0.2 ~1 hour

These are just floats — override them freely for your domain. The values encode two principles: human interactions are stickier than agent interactions, and authority level determines how long directives persist.

Query-time overrides

Both decay_rate and base_score_field can be overridden per query for different retrieval contexts:

# Default decay rate from field definition
recent = Memory.query.filter(agent_id="agent-1").top_by_decay(5)

# Aggressive decay — only very recent records
hot = Memory.query.filter(agent_id="agent-1").top_by_decay(5, decay_rate=1.0)

# Weight by a different field for this query
by_urgency = Memory.query.filter(agent_id="agent-1").top_by_decay(5, base_score_field="urgency")

Refreshing timestamps

Call touch() to update a record's timestamp without a full save. This resets the decay clock:

memory = Memory.query.get(agent_id="agent-1", content="deployment procedure")
memory.touch("relevance")

Existing SortedField queries

All standard range queries work against the timestamp score:

import time

one_week_ago = time.time() - 86400 * 7
recent = Memory.query.filter(agent_id="agent-1", relevance__gte=one_week_ago)

CyclicDecayField

A DecayingSortedField subclass that adds cyclical resonance and homeostatic pressure to time-weighted scoring. When cycles and pressure are both zero, behavior is identical to DecayingSortedField.

The effective score at query time: decay + cyclic_resonance + pressure

  • Cyclical resonance — periodic boosts following cosine curves. A record about Q1 renewals resurfaces every January.
  • Homeostatic pressure — urgency that builds linearly while an item goes unresolved. Discharged by calling resolve_pressure().

Basic usage

from popoto import Model, KeyField, Field, CyclicDecayField
from popoto.fields.constants import TemporalPeriod

class Directive(Model):
    agent_id = KeyField()
    content = Field(type=str)
    relevance = CyclicDecayField(
        decay_rate=0.5,  # override default (0.1) for faster forgetting
        cycles=[(TemporalPeriod.QUARTERLY, 5.0, 0)],
        pressure_rate=0.1,
    )

Query with the same top_by_decay() interface:

top = Directive.query.filter(agent_id="agent-1").top_by_decay(n=10)

Discharge accumulated urgency when the agent acts on a record:

directive.resolve_pressure("relevance")

Parameters

Parameter Type Default Description
decay_rate float 0.1 Power-law decay exponent (inherited). Empirically tuned in sweep 2026-04-17; prior default was 0.5.
base_score_field str None Companion field whose value multiplies the decay curve (inherited).
cycles list [] List of (period, amplitude, phase) tuples. Use TemporalPeriod constants for period values.
pressure_rate float 0.0 Rate of urgency buildup per unresolved day.
partition_by str/tuple () Partition sorted set by key fields (inherited).

TemporalPeriod constants

Import from popoto.fields.constants:

Constant Value (seconds) Usage
TemporalPeriod.DAILY 86,400 Daily check-ins
TemporalPeriod.WEEKLY 604,800 Weekly reviews
TemporalPeriod.MONTHLY 2,592,000 Monthly reports
TemporalPeriod.QUARTERLY 7,776,000 Quarterly planning
TemporalPeriod.YEARLY 31,536,000 Annual cycles

See CyclicDecayField feature docs for the full reference including the scoring formula, Redis data model, and error handling.

AccessTracker

Tracks read access patterns on any model using a two-stage pipeline: reads are first staged (cheap), then atomically promoted to a confirmed access log. This prevents naive "every read strengthens" behavior — only meaningful reads count.

Shipped in PR #203.

Basic usage

from popoto import Model, KeyField, Field, AccessTrackerMixin, DecayingSortedField

class Memory(Model, AccessTrackerMixin):
    agent_id = KeyField()
    content = Field(type=str)
    relevance = DecayingSortedField()

# Reading triggers on_read() automatically via query hooks
memories = Memory.query.filter(agent_id="agent-1").top_by_decay(5)

# After the agent acts on a memory, confirm the read
memories[0].confirm_access()       # promotes staged → confirmed
memories[1].discard_staged_access() # clears staging without promoting

# Inspect access patterns
print(memories[0].access_count)    # 42
print(memories[0].last_accessed)   # 1741872000.0

How staging works

on_read() fires automatically when instances are hydrated via Query.get(), Query.filter(), top_by_decay(), and their async variants. Each call appends a timestamp to a per-instance staging list (RPUSH — single Redis command, batched via pipeline).

Staged reads are not yet "real" — they represent candidate accesses. Your application decides which reads were meaningful:

  • confirm_access() — atomically promotes all staged timestamps to the confirmed access log using a Lua script. Updates access_count and last_accessed.
  • discard_staged_access() — clears staging without affecting confirmed data.

Configuration

Attribute Type Default Description
_max_access_log int 100 Maximum timestamps kept in the confirmed access log. Older entries trimmed on confirm.
_track_reads bool True Set to False to disable automatic on_read() from queries.

Suppressing tracking for bulk operations

Use no_track() on the query builder to prevent on_read() from firing during internal operations like reindexing or migration:

# These reads won't be tracked
Memory.query.filter(agent_id="agent-1").no_track().all()

Delete cleanup

When a tracked model instance is deleted, all three AccessTracker Redis keys (staged, access_log, meta) are automatically removed.

Redis key patterns

Key Type Purpose
$AT:{ClassName}:staged:{pk} List Pending read timestamps
$AT:{ClassName}:access_log:{pk} List Confirmed access timestamps (capped)
$AT:{ClassName}:meta:{pk} Hash access_count (int) and last_accessed (float)

ObservationProtocol

Provides lifecycle hooks for outcome-driven memory effects. The application reports how the agent used retrieved memories (acted, dismissed, deferred, contradicted, used); the ORM applies effects atomically.

Shipped in PR #206.

Why observation matters

An LLM cannot manage its own memory mechanics. Asking it to call touch(), resolve predictions, and update confidence is like asking a person to regulate their heartbeat. The ORM must provide hooks that fire automatically and infer memory outcomes from the agent's downstream behavior.

Basic usage

from popoto import (
    Model, KeyField, Field, AccessTrackerMixin,
    CyclicDecayField, ObservationProtocol, RecallProposal,
)
from popoto.fields.constants import TemporalPeriod

class Memory(AccessTrackerMixin, Model):
    agent_id = KeyField()
    content = Field(type=str)
    relevance = CyclicDecayField(
        decay_rate=0.5,  # override default (0.1) for faster forgetting
        cycles=[(TemporalPeriod.QUARTERLY, 5.0, 0)],
        pressure_rate=0.1,
    )

# 1. Agent retrieves memories (on_read fires automatically via query hooks)
memories = Memory.query.filter(agent_id="agent-1").top_by_decay(n=10)

# 2. Optional: mark proactively surfaced memories
ObservationProtocol.on_surfaced(memories[:3], reason="pressure_threshold")

# 3. Agent processes memories and generates a response...

# 4. Application infers outcomes from agent behavior
outcome_map = {
    memories[0].db_key.redis_key: "acted",        # Agent used this
    memories[1].db_key.redis_key: "dismissed",     # Agent rejected this
    memories[2].db_key.redis_key: "contradicted",  # Agent contradicted this
    # memories[3:] not in map → default to "deferred"
}
ObservationProtocol.on_context_used(memories, outcome_map)

Three hooks

Hook When it fires Effect
on_read(instance) Query hydrates an instance Delegates to AccessTrackerMixin staging
on_surfaced(instances, reason) Proactive system pushes memories into context Creates RecallProposal entries
on_context_used(instances, outcome_map) Application reports how agent responded Applies outcome-specific effects

Five outcomes

Outcome Meaning Effects
acted Agent used the memory touch(), confirm_access(), strengthen_cycle(1.2), resolve_pressure(), corroborate confidence (signal=0.9)
dismissed Agent explicitly rejected discard_staged_access(), weaken_cycle(0.8)
deferred Agent didn't address it discard_staged_access() only — pressure keeps building
contradicted Agent explicitly contradicted discard_staged_access(), weaken_cycle(0.5), contradict confidence (signal=0.1), auto-discharge pressure if confidence < 0.1
used Agent reasoned over it without citing confirm_access() only — no strength signal emitted

See Metacognitive Layer for when to use "used" vs "acted" and how AdaptiveAssembler tracks outcome quality over time.

Graceful degradation

Each effect checks whether the model supports it before applying. A model with DecayingSortedField but no CyclicDecayField still gets touch() on acted — just no cycle/pressure effects. A model without AccessTrackerMixin skips staging operations entirely. A model without ConfidenceField skips confidence updates.
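
A minimal sketch of what that means in practice, assuming the query patterns shown earlier: a lean model still gets touch() on acted, and everything else is a no-op.

from popoto import Model, KeyField, Field, ObservationProtocol
from popoto.fields import DecayingSortedField

class LeanMemory(Model):
    agent_id = KeyField()
    content = Field(type=str)
    relevance = DecayingSortedField()  # no cycles, staging, or confidence

memories = LeanMemory.query.filter(agent_id="agent-1").top_by_decay(5)

# "acted" applies touch() to relevance; cycle, pressure, staging,
# and confidence effects are skipped because the model lacks them
ObservationProtocol.on_context_used(
    memories, {memories[0].db_key.redis_key: "acted"}
)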

Cycle amplitude adjustment

Two Model methods provide direct cycle control:

# Strengthen: multiply all cycle amplitudes by 1.5x
memory.strengthen_cycle("relevance", factor=1.5)

# Weaken: multiply all cycle amplitudes by 0.6x
memory.weaken_cycle("relevance", factor=0.6)

Amplitudes are clamped to [0.0, 100.0]. Values below 0.01 snap to zero (effectively dead cycle).

RecallProposal

Internal ORM infrastructure for tracking proactively surfaced memories. Not a user-facing Model.

# Create proposals when surfacing memories
RecallProposal.create_batch(instances, reason="proactive", partition="agent-1")

# Check pending proposals
pending = RecallProposal.get_pending(Memory, partition="agent-1")

# Expire stale proposals (older than TTL, default 1 hour)
expired = RecallProposal.expire_stale(Memory, partition="agent-1", ttl=3600)

Proposals are stored in Redis ZSETs keyed by $RP:{ClassName}:pending:{partition}, scored by surfaced_at timestamp. Resolved proposals are removed from the pending set. Expired proposals (past TTL) are treated as deferred.

Redis key patterns

Key Type Purpose
$RP:{ClassName}:pending:{partition} ZSET Pending recall proposals, scored by surfaced_at

WriteFilter

Gates persistence based on a scoring function evaluated at write time. Records below a threshold are silently discarded. Records above a high threshold are tagged for priority processing in a Redis sorted set.

from popoto import Model, KeyField, Field
from popoto.fields.write_filter import WriteFilterMixin

class Memory(WriteFilterMixin, Model):
    agent_id = KeyField()
    content = Field(type=str)
    importance = Field(type=float, default=0.5)

    def compute_filter_score(self):
        return self.importance or 0.0

# Score 0.05 < min_threshold (0.1) — silently discarded
Memory(agent_id="a1", content="noise", importance=0.05).save()

# Score 0.5 — persisted normally
Memory(agent_id="a1", content="useful", importance=0.5).save()

# Score 0.9 >= priority_threshold (0.7) — persisted AND added to priority set
Memory(agent_id="a1", content="critical", importance=0.9).save()

You provide the scoring logic; Popoto provides the gate. This keeps low-value observations out of the index, reducing noise in retrieval.

Parameter Default Description
_wf_min_threshold 0.1 Below this score, save() silently discards via SkipSaveException. (Empirically tuned in sweep 2026-04-17; prior default was 0.2.)
_wf_priority_threshold 0.7 At or above this score, record is added to $WF:{ClassName}:priority sorted set

Priority-tagged records are stored in a Redis sorted set keyed $WF:{ClassName}:priority, scored by the filter score. On delete(), cleanup removes the record from the priority set automatically.
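
Both thresholds follow the class-attribute convention used by the other mixins, so they can be tuned per model (a sketch; see fields.md for exact semantics):

from popoto import Model, KeyField, Field
from popoto.fields.write_filter import WriteFilterMixin

class StrictMemory(WriteFilterMixin, Model):
    agent_id = KeyField()
    content = Field(type=str)
    importance = Field(type=float, default=0.5)

    _wf_min_threshold = 0.3       # discard anything scoring below 0.3
    _wf_priority_threshold = 0.9  # priority-tag only near-certain records

    def compute_filter_score(self):
        return self.importance or 0.0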

See fields.md for the full field reference.

ConfidenceField

A Field subclass that tracks Bayesian confidence metadata per member, updated atomically via Lua script. Precision grows with sqrt(n) — early evidence has outsized effect while established beliefs resist change.

Shipped in PR #215.

Basic usage

from popoto import Model, UniqueKeyField, StringField, ConfidenceField

class Knowledge(Model):
    key = UniqueKeyField()
    claim = StringField()
    certainty = ConfidenceField(initial_confidence=0.5)

knowledge = Knowledge.create(key="fact1", claim="The sky is blue")

# Corroborate (signal >= 0.5 increases confidence)
ConfidenceField.update_confidence(knowledge, "certainty", signal=0.9)

# Contradict (signal < 0.5 decreases confidence)
ConfidenceField.update_confidence(knowledge, "certainty", signal=0.1)

# Read current confidence
confidence = ConfidenceField.get_confidence(knowledge, "certainty")

# Read all metadata
data = ConfidenceField.get_confidence_data(knowledge, "certainty")
# Returns: {confidence: 0.5, evidence_count: 2, corroborations: 1, contradictions: 1}

Bayesian update formula

new_confidence = prior + (signal - prior) / sqrt(evidence_count + 1)

Early updates move confidence significantly; later updates have diminishing effect as evidence accumulates. Results are clamped to [0, 1].
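
A quick simulation of the rule shows both behaviors: the first signal moves confidence fully, while later signals are damped by accumulated evidence.

import math

def update(prior: float, signal: float, evidence_count: int) -> float:
    """new = prior + (signal - prior) / sqrt(evidence_count + 1), clamped to [0, 1]."""
    new = prior + (signal - prior) / math.sqrt(evidence_count + 1)
    return min(1.0, max(0.0, new))

confidence = 0.5
for n, signal in enumerate([0.9, 0.9, 0.9, 0.1]):
    confidence = update(confidence, signal, n)
    print(f"evidence={n + 1}  confidence={confidence:.3f}")
# evidence=1  confidence=0.900  <- first corroboration moves it fully
# evidence=4  confidence=0.500  <- a contradiction is damped by 1/sqrt(4)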

Entrainment with ObservationProtocol

When used with ObservationProtocol.on_context_used(), confidence is automatically updated based on how the agent uses retrieved memories:

Outcome Effect on Confidence
acted Corroborate (signal=0.9)
dismissed No change
deferred No change
contradicted Contradict (signal=0.1); auto-discharge pressure if confidence drops below 0.1

When confidence drops below 0.1 due to a contradicted outcome, homeostatic pressure on any CyclicDecayField is automatically resolved (discharged). This prevents low-confidence memories from building urgency.

Parameters

Parameter Type Default Description
initial_confidence float 0.5 Starting confidence for new members (0-1).

See ConfidenceField feature docs for the full reference including convergence behavior and Redis key patterns. See API Reference: ConfidenceField for the complete method signatures.

When combined with DecayingSortedField or CompositeScoreQuery, confidence acts as a multiplier on retrieval weight — low-confidence records are naturally deprioritized.

CoOccurrenceField

Maintains weighted association edges between model instances using per-PK Redis sorted sets. Weights strengthen via link() and strengthen(), and decay via weaken_all(). Supports symmetric (bidirectional) and asymmetric (unidirectional) modes.

Shipped in PR #218.

from popoto import Model, UniqueKeyField, StringField
from popoto.fields.co_occurrence_field import CoOccurrenceField

class Memory(Model):
    key = UniqueKeyField()
    content = StringField()
    associations = CoOccurrenceField(symmetric=True, max_edges=100)

# Create instances and access the field
mem_a = Memory.create(key="ml", content="Machine learning")
mem_b = Memory.create(key="nn", content="Neural networks")
field = Memory._meta.fields["associations"]

# Link and strengthen
field.link(Memory, mem_a.db_key.redis_key, mem_b.db_key.redis_key)
field.strengthen(Memory, mem_a.db_key.redis_key, mem_b.db_key.redis_key, delta=0.05)

# BFS graph propagation — multi-hop associative retrieval
scores = field.propagate(Memory, [mem_a.db_key.redis_key], depth=2, decay_per_hop=0.5)

Graph propagation uses a server-side Lua BFS script with exponential weight decay per hop. When the same PK is reached via multiple paths, max(weight) is used. Automatic edge pruning via max_edges prevents unbounded memory growth.

Parameter Type Default Description
symmetric bool True If True, edges are bidirectional
max_edges int 500 Maximum edges per PK; lowest-weight pruned when exceeded
decay_factor float 0.95 Default multiplicative decay for weaken_all()

See CoOccurrenceField field docs for the full reference including methods, Redis key patterns, and synergy with other memory fields.

EventStreamMixin

Automatically appends to a Redis Stream on every save(), update(), or delete(). This is infrastructure for background processing — the mixin writes events, your application consumes them via Redis Streams' consumer group API.

Quick Start

from popoto import Model, EventStreamMixin, UniqueKeyField, StringField

class Memory(EventStreamMixin, Model):
    _stream_name = "memory_mutations"       # Stream key: stream:memory_mutations
    _stream_max_length = 10000              # Approximate MAXLEN trimming
    _stream_metadata_fields = ("source",)   # Extra fields in each entry

    key = UniqueKeyField()
    content = StringField()
    source = StringField(default="")

memory = Memory(key="fact1", content="hello", source="user")
memory.save()    # XADD with op="create"
memory.content = "updated"
memory.save()    # XADD with op="update"
memory.delete()  # XADD with op="delete"

Configuration

Attribute Type Default Description
_stream_name str "mutations" Name for the Redis Stream (key: stream:{name})
_stream_partition_field str None Field name to partition streams by (key becomes stream:{name}:{value})
_stream_max_length int 10000 Approximate max entries via MAXLEN ~ trimming
_stream_metadata_fields tuple () Field names whose values are included in stream entries

Stream Entry Fields

Every stream entry contains these string fields:

Field Description
model Model class name
pk Redis key of the instance
op Operation: "create", "update", "delete", or custom
ts Unix timestamp
changed_fields Comma-separated list of updated fields (from update_fields)

Plus any fields listed in _stream_metadata_fields.

Partitioned Streams

Route events to different streams based on a field value:

class TenantMemory(EventStreamMixin, Model):
    _stream_name = "mutations"
    _stream_partition_field = "tenant"

    key = UniqueKeyField()
    tenant = StringField()

# Writes to stream:mutations:acme
TenantMemory(key="x", tenant="acme").save()

# Writes to stream:mutations:beta
TenantMemory(key="y", tenant="beta").save()

Custom Events (Non-Save Operations)

Operations that bypass Model.save() (like ConfidenceField.update_confidence() and CoOccurrenceField.strengthen()) can log events via the public _xadd_event() method:

# Called automatically by ConfidenceField.update_confidence()
instance._xadd_event(
    op="confidence_update",
    extra_fields={"field": "trust", "signal": "0.8"},
)

Error Handling

  • Non-pipeline mode: XADD failures are caught and logged — save() always succeeds if the data write succeeded.
  • Pipeline mode: XADD is queued atomically with the save. If the pipeline fails, both data and stream entry fail together.

WriteFilter Interaction

When WriteFilterMixin discards a record (score below threshold), the save() returns before reaching the EventStreamMixin hook. No stream entry is produced for filtered records.

Redis Key Patterns

  • stream:{stream_name} — default stream key
  • stream:{stream_name}:{partition_value} — partitioned stream key

CompositeScoreQuery

Combines multiple sorted set indexes via ZUNIONSTORE with configurable weights, returning top-K results ranked by composite score. This is where all the scoring primitives converge into a single retrieval call.

Without composite_score(), retrieving by multiple factors requires application-level code: fetch by decay, fetch confidence data, fetch access counts, compute composite in Python, re-rank. This is slow (multiple round trips), error-prone, and not composable via the query API. composite_score() does it all server-side in a single call.

Basic usage

from popoto import Model, KeyField, Field
from popoto.fields import DecayingSortedField, ConfidenceField
from popoto.fields.access_tracker import AccessTrackerMixin
from popoto.fields.write_filter import WriteFilterMixin

class Memory(AccessTrackerMixin, WriteFilterMixin, Model):
    agent_id = KeyField()
    content = Field(type=str)
    importance = Field(type=float, default=1.0)
    relevance = DecayingSortedField(
        base_score_field="importance",
        partition_by="agent_id",
    )
    certainty = ConfidenceField(initial_confidence=0.5)

    def compute_filter_score(self):
        return self.importance or 0.0

Retrieve the top-10 memories ranked by a weighted composite of decay, confidence, access frequency, and write filter priority:

results = Memory.query.filter(agent_id="agent-1").composite_score(
    indexes={
        "relevance": 0.4,      # DecayingSortedField (decay-computed scores)
        "certainty": 0.3,      # ConfidenceField (Bayesian confidence)
        "access_count": 0.2,   # AccessTracker (read frequency)
        "priority": 0.1,       # WriteFilter priority set
    },
    limit=10,
)

Parameters

Parameter Type Default Description
indexes dict[str, float] required Mapping of field names to weights. Weights are arbitrary positive floats; relative ratios matter, not absolute values.
limit int 10 Maximum number of results to return.
aggregate str "SUM" How ZUNIONSTORE combines scores: "SUM", "MIN", or "MAX".
min_score float None Optional minimum composite score. Results below this threshold are excluded.
post_filter Callable[[str, float], bool] None Optional (redis_key, score) -> bool callback. Applied after scoring but before hydration. Return True to keep.
co_occurrence_boost dict None Optional {redis_key: weight} dict from CoOccurrenceField.propagate(). Injected as an additional scoring signal.
temperature float 1.0 Scales composite scores by dividing each by this value. Low values (0.02-0.1) sharpen discrimination; high values (2.0+) flatten scores. Must be > 0. min_score filters on raw scores before temperature scaling; post_filter receives scaled scores.
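
For example, sharpening scores ahead of a post-filter (which, per the table above, sees the scaled values):

# Low temperature widens the gap between strong and weak candidates
results = Memory.query.filter(agent_id="agent-1").composite_score(
    indexes={"relevance": 0.5, "certainty": 0.5},
    temperature=0.05,  # each composite score divided by 0.05 (20x spread)
    post_filter=lambda key, score: score > 10.0,  # illustrative cutoff on scaled scores
    limit=10,
)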

Supported index types

Index name Field type Resolution strategy
Any DecayingSortedField DecayingSortedField / CyclicDecayField Materializes decay-computed scores into a temp ZSET via the existing Lua decay script
Any SortedField SortedFieldMixin Uses the existing sorted set directly
Any ConfidenceField ConfidenceField Materializes confidence values from the companion hash into a temp ZSET
"access_count" or "access_score" AccessTrackerMixin Materializes access_count from meta hashes into a temp ZSET (uses SMEMBERS — see scaling note below)
"priority" WriteFilterMixin Uses the $WF:{Class}:priority sorted set directly

Scaling note: The access_count/access_score index uses SMEMBERS to discover all model instances before materializing scores. For models with 100K+ instances, this scan can be expensive. Use post_filter or partitioned queries to narrow the result set at that scale.

CoOccurrence boost

Inject associative retrieval scores from CoOccurrenceField.propagate():

from popoto.fields.co_occurrence_field import CoOccurrenceField

# Get propagated association scores from a seed memory
assoc_field = Memory._meta.fields["associations"]
co_scores = assoc_field.propagate(Memory, seed_pks=["memory_key_1"], depth=2)

# Inject as a boost signal in composite scoring
results = Memory.query.filter(agent_id="agent-1").composite_score(
    indexes={"relevance": 0.3, "certainty": 0.3},
    co_occurrence_boost=co_scores,
    limit=10,
)

A record with mediocre decay and confidence scores but a strong co-occurrence association to the seed will surface higher in results.

Post-filter callback

Use post_filter to exclude specific records after scoring but before model hydration:

# Exclude already-used memories
used_keys = {"Memory:key1", "Memory:key2"}
results = Memory.query.filter(agent_id="agent-1").composite_score(
    indexes={"relevance": 0.5, "certainty": 0.5},
    post_filter=lambda key, score: key not in used_keys,
    limit=10,
)

Temp key management

All temporary Redis keys use a $CSQ: prefix with a UUID suffix and a 5-second EXPIRE for cleanup safety. Keys are deleted immediately after the query completes. Even if the process crashes mid-query, keys auto-expire within 5 seconds.

Error handling

  • Empty indexes dict raises QueryException
  • Invalid field name raises QueryException with list of valid fields
  • Field without a sorted set index raises QueryException
  • "priority" on a non-WriteFilterMixin model raises QueryException
  • "access_count" on a non-AccessTrackerMixin model raises QueryException
  • Missing partition filters for partitioned fields raises QueryException
  • limit=0 returns an empty list (no error)

ExistenceFilter

A Bloom filter for O(1) probabilistic membership checks. Answers "have I ever stored anything about X?" without touching any sorted set or hash. False positives are possible; false negatives are impossible.

Shipped in PR #225.

Implemented entirely with Redis strings (SETBIT/GETBIT) and Lua scripts. No Redis modules required — works on both Redis and Valkey. The original roadmap specified RedisBloom module commands (BF.ADD/BF.EXISTS), but the implementation uses pure Lua with the Kirsch-Mitzenmacher double hashing optimization to maintain Valkey compatibility.

Basic usage

from popoto import Model, KeyField, Field
from popoto.fields.existence_filter import ExistenceFilter

class Memory(Model):
    agent_id = KeyField()
    topic = Field(type=str)
    bloom = ExistenceFilter(
        error_rate=0.01,
        capacity=100_000,
        fingerprint_fn=lambda inst: inst.topic,
    )

The bloom field automatically adds fingerprints to the Bloom filter on save. Check membership before running expensive queries:

# Fast pre-check before expensive retrieval
if not Memory.bloom.definitely_missing(Memory, "kubernetes deployments"):
    results = Memory.query.filter(agent_id="agent-1").top_by_decay(5)
else:
    results = []  # skip retrieval entirely

Parameters

Parameter Type Default Description
error_rate float 0.01 Target false positive rate. Lower = more bits required.
capacity int 100_000 Expected number of distinct items. Exceeding this degrades the error rate.
fingerprint_fn Callable None Takes a model instance, returns a string fingerprint. Falls back to redis_key if not set.

Methods

Method Returns Description
might_exist(model_class, fingerprint) bool True if fingerprint is possibly present (may be false positive).
definitely_missing(model_class, fingerprint) bool True if fingerprint is guaranteed absent. Use this for pre-filtering.
fill_ratio(model_class) float Proportion of set bits (0.0-1.0). Monitor for capacity warnings.

How it works

A Bloom filter is a bit array of size m with k hash functions. Parameters are derived automatically from error_rate and capacity:

  • m = -capacity * ln(error_rate) / (ln(2)^2) -- total bits
  • k = (m / capacity) * ln(2) -- optimal number of hash functions

Hash functions use the Kirsch-Mitzenmacher double hashing optimization (DJB2 + FNV-1), computed in Lua for atomic operations.
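
Plugging the defaults into those formulas shows the scale involved:

import math

error_rate, capacity = 0.01, 100_000
m = -capacity * math.log(error_rate) / (math.log(2) ** 2)  # total bits
k = (m / capacity) * math.log(2)                           # hash functions
print(f"m = {m:,.0f} bits (~{m / 8 / 1024:.0f} KB), k = {k:.1f}")
# m = 958,506 bits (~117 KB), k = 6.6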

Design decisions

  • on_delete() is a no-op: Bloom filters do not support removal. Stale positives are harmless for a pre-filter use case.
  • WriteFilter integration: Records rejected by WriteFilterMixin are never added to the Bloom filter (the existing save flow raises SkipSaveException before on_save() hooks run).
  • Pipeline support: on_save() accepts an optional Redis pipeline for atomic batch saves.

FrequencySketch

A Count-Min Sketch for approximate frequency counting. Tracks how many times a fingerprint has been saved, with possible overcounting but never undercounting.

Shipped in PR #225.

Implemented entirely with Redis hashes (HINCRBY/HGET) and Lua scripts. No Redis modules required -- works on both Redis and Valkey. Like ExistenceFilter, the original roadmap specified RedisBloom module commands (CMS.INCRBY/CMS.QUERY), but the implementation uses pure Lua for Valkey compatibility.

Basic usage

from popoto import Model, KeyField, Field
from popoto.fields.existence_filter import FrequencySketch

class Memory(Model):
    agent_id = KeyField()
    topic = Field(type=str)
    freq = FrequencySketch(
        fingerprint_fn=lambda inst: inst.topic,
    )

Query approximate frequency:

count = Memory.freq.get_frequency(Memory, "kubernetes")

Parameters

Parameter Type Default Description
width int 2000 Number of counters per row. Higher = less overcounting.
depth int 7 Number of hash functions (rows). Higher = more accurate.
fingerprint_fn Callable None Takes a model instance, returns a string fingerprint. Falls back to redis_key if not set.
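
Under standard Count-Min Sketch analysis (assuming the implementation uses the usual sizing), the defaults bound the overcount at about e/width of total insertions, with failure probability around e^(−depth):

import math

width, depth = 2000, 7
epsilon = math.e / width   # overcount <= epsilon * N, where N = total increments
delta = math.exp(-depth)   # probability that the bound is exceeded
print(f"overcount <= {epsilon:.4f} * N with probability >= {1 - delta:.3f}")
# overcount <= 0.0014 * N with probability >= 0.999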

Methods

Method Returns Description
get_frequency(model_class, fingerprint) int Approximate count. May overcount, never undercounts. Returns 0 for unseen fingerprints.

Design decisions

  • on_delete() is a no-op: CMS counters are monotonically increasing. Decrementing would violate the "never undercount" guarantee.
  • Pipeline support: on_save() accepts an optional Redis pipeline for atomic batch saves.

Combining ExistenceFilter and FrequencySketch

Both fields can be used together on the same model for fast existence checks and frequency tracking:

class Memory(Model):
    agent_id = KeyField()
    topic = Field(type=str)
    bloom = ExistenceFilter(
        error_rate=0.01,
        capacity=100_000,
        fingerprint_fn=lambda inst: inst.topic,
    )
    freq = FrequencySketch(
        fingerprint_fn=lambda inst: inst.topic,
    )

# Pre-filter, then retrieve; `frequency` is available to weight or gate results
if not Memory.bloom.definitely_missing(Memory, "kubernetes"):
    frequency = Memory.freq.get_frequency(Memory, "kubernetes")
    results = Memory.query.filter(agent_id="agent-1").top_by_decay(10)

PredictionLedger

Records prediction-outcome pairs. Before an action, the agent writes what it expects. After, it writes what happened. The mixin computes the delta and stores it as a learning signal. High prediction errors feed back into ConfidenceField to reduce trust in the knowledge that informed the bad prediction. Auto-resolution via ObservationProtocol handles the common case where outcomes are inferred from behavior rather than explicitly reported.

Setup

Add PredictionLedgerMixin as a base class alongside Model:

from popoto import Model, UniqueKeyField, StringField
from popoto.fields.prediction_ledger import PredictionLedgerMixin
from popoto.fields.confidence_field import ConfidenceField

class Memory(PredictionLedgerMixin, Model):
    key = UniqueKeyField()
    content = StringField()
    certainty = ConfidenceField()

    _pl_partition = "default"

Recording and resolving predictions

# Agent records what it expects before acting
memory = Memory.create(key="fact1", content="sky is blue")
PredictionLedgerMixin.record_prediction(memory, predicted={"relevance": 0.9})

# After acting, resolve with actual outcome
error = PredictionLedgerMixin.resolve_prediction(memory, actual={"relevance": 0.3})
# error ≈ 0.6  (normalized absolute error)

Auto-resolution via ObservationProtocol

When ObservationProtocol.on_context_used() fires with an acted, dismissed, contradicted, or used outcome, it automatically calls auto_resolve() on instances that use PredictionLedgerMixin:

Outcome Default Error Interpretation
acted 0.1 Prediction was roughly correct
dismissed 0.5 Wrong timing or relevance
contradicted 0.9 Factually wrong
used 0.3 Prediction approximately correct — memory informed reasoning

These values are configurable via the _pl_auto_resolve_errors class attribute.
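
For example, to treat dismissals as stronger evidence of a bad prediction (TunedMemory is a hypothetical model name):

from popoto import Model, UniqueKeyField, StringField
from popoto.fields.prediction_ledger import PredictionLedgerMixin
from popoto.fields.confidence_field import ConfidenceField

class TunedMemory(PredictionLedgerMixin, Model):
    key = UniqueKeyField()
    content = StringField()
    certainty = ConfidenceField()

    # Override the outcome-to-error mapping: dismissals count as nearly wrong
    _pl_auto_resolve_errors = {
        "acted": 0.1, "dismissed": 0.7, "contradicted": 0.9, "used": 0.3,
    }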

ConfidenceField feedback

When prediction error exceeds _pl_confidence_error_threshold (default 0.7), PredictionLedgerMixin automatically calls ConfidenceField.update_confidence(signal=0.2) to reduce trust in the knowledge that informed the prediction.

Querying prediction errors

# Find instances with highest prediction errors
worst = PredictionLedgerMixin.get_highest_errors(Memory, partition="default", limit=10)
# Returns: [(redis_key, error_score), ...]

# Aggregate errors grouped by time or error band
summary = PredictionLedgerMixin.error_summary(Memory, partition="default")
# Returns: {"mean": 0.42, "p90": 0.81, ...} — see Metacognitive Layer for group_by options

# Read prediction data for a specific instance
data = PredictionLedgerMixin.get_prediction_data(memory)
# Returns: {"predicted": {...}, "resolved": True, "prediction_error": 0.6, ...}

For temporal and band-grouped aggregations via error_summary(group_by=...), see Metacognitive Layer.

Class attributes

Attribute Default Description
_pl_partition "default" Partition key for the error sorted set
_pl_confidence_error_threshold 0.7 Error above which confidence is reduced
_pl_confidence_low_signal 0.2 Signal value sent to ConfidenceField
_pl_auto_resolve_errors {"acted": 0.1, "dismissed": 0.5, "contradicted": 0.9, "used": 0.3} Outcome-to-error mapping

Redis key patterns

  • $PL:{ClassName}:meta:{pk} -- hash storing prediction metadata (msgpack-encoded)
  • $PL:{ClassName}:errors:{partition} -- sorted set of instance keys scored by |error|

Key properties

  • Idempotent resolution: Resolving an already-resolved prediction is a no-op (returns None)
  • Atomic: Resolution uses a Lua script for atomic read-compute-write-ZADD
  • Graceful degradation: Works without ConfidenceField or EventStreamMixin (features are additive)
  • Valkey compatible: Uses only HSET, HGET, ZADD, EVAL -- no Redis modules
  • Pipeline support: All operations accept an optional pipeline parameter

StreamConsumer

A Redis Streams consumer group framework for background processing. Manages consumer group creation, batch reading, acknowledgment, dead-letter handling, and pending entry recovery via XCLAIM.

Shipped in PR #238.

Basic usage

from popoto.streams import StreamConsumer

async def my_handler(entries):
    """Process a batch of stream entries."""
    for entry_id, fields in entries:
        print(f"Processing {entry_id}: {fields}")

consumer = StreamConsumer(
    stream_key="stream:memory_mutations:agent_1",
    group_name="compaction",
    consumer_name="worker-1",
    handler=my_handler,
)

# Blocking loop — runs until stopped
await consumer.run()

# Or process one batch (useful for testing)
count = await consumer.process_batch()

Sync wrappers are available for scripts without an event loop:

consumer.run_sync()              # blocking loop
count = consumer.process_batch_sync()  # single batch

Parameters

Parameter Type Default Description
stream_key str required Redis stream key (e.g., "stream:memory_mutations").
group_name str required Consumer group name for XREADGROUP.
consumer_name str required This consumer's name within the group.
handler Callable required Async function receiving list[(entry_id, fields_dict)]. All values decoded to str.
batch_size int 50 XREADGROUP COUNT — entries per batch.
block_ms int 5000 XREADGROUP BLOCK timeout in milliseconds.
max_retries int 3 Delivery count threshold before dead-lettering.
claim_timeout_ms int 180_000 XCLAIM idle timeout (3 minutes). Entries idle longer are reclaimed.
dead_letter_max_length int None Optional MAXLEN for the dead-letter stream.

Dead-letter handling

Entries that fail processing more than max_retries times (tracked via XPENDING delivery count) are moved to dead:{stream_key} with metadata:

Field Description
original_stream Source stream key
original_id Original entry ID
failure_count Number of delivery attempts
last_error Error description
dead_letter_ts Timestamp when dead-lettered

XCLAIM recovery

On each process_batch() call, the consumer checks for pending entries from crashed consumers. Entries idle longer than claim_timeout_ms are reclaimed via XCLAIM for reprocessing. Entries exceeding max_retries are dead-lettered instead.

Graceful shutdown

consumer.stop()  # exits run() after current batch completes

EventStreamMixin synergy

StreamConsumer reads the streams that EventStreamMixin writes. No import dependency — it operates on raw stream keys:

from popoto import Model, EventStreamMixin, UniqueKeyField, StringField
from popoto.streams import StreamConsumer

class Memory(EventStreamMixin, Model):
    _stream_name = "memory_mutations"
    key = UniqueKeyField()
    content = StringField()

# Producer: saves write to stream:memory_mutations
Memory(key="fact1", content="hello").save()

# Consumer: reads from the same stream
consumer = StreamConsumer(
    stream_key="stream:memory_mutations",
    group_name="compaction",
    consumer_name="worker-1",
    handler=my_handler,
)
count = consumer.process_batch_sync()

This is generic infrastructure — the processing logic is application code. One use case: pattern crystallization from raw events into PolicyCache entries.

PolicyCache

A reference recipe (not core ORM) composing all shipped primitives into a reinforcement-learning-style action selection cache. When a StreamConsumer detects repeated successful patterns, it crystallizes them into reusable PolicyEntry records. Agents query policies by state fingerprint and select actions based on Q-value, confidence, and co-occurrence.

For the full guide with architecture diagrams, tuning advice, and design rationale, see PolicyCache Recipe.

Shipped in PR #239.

Basic usage

from popoto.recipes.policy_cache import (
    PolicyEntry, compute_fingerprint, update_q_value,
    initialize_q_value, crystallization_handler,
)

# Create a policy directly
fp = compute_fingerprint({"task": "deploy", "env": "staging"})
policy = PolicyEntry(
    agent_id="agent-1",
    state_fingerprint=fp,
    state_features={"task": "deploy", "env": "staging"},
    action_type="run_playbook",
    action_spec={"playbook": "deploy.yml"},
)
policy.save()

# Initialize and update Q-value via temporal difference learning
initialize_q_value(policy, initial_q=0.0)
td_error = update_q_value(policy, reward=1.0)

# Or let crystallization detect patterns automatically
from popoto.streams import StreamConsumer

consumer = StreamConsumer(
    stream_key="stream:policy_mutations",
    group_name="crystallizer",
    consumer_name="worker-1",
    handler=crystallization_handler,
)

PolicyEntry model

class PolicyEntry(EventStreamMixin, AccessTrackerMixin, PredictionLedgerMixin, Model):
    entry_id = AutoKeyField()
    agent_id = KeyField()
    state_fingerprint = KeyField()
    state_features = Field()                      # JSON dict
    action_type = KeyField()
    action_spec = Field()                         # JSON dict
    expected_value = DecayingSortedField(
        partition_by="agent_id",
    )
    confidence = ConfidenceField(initial_confidence=0.5)
    related_policies = CoOccurrenceField(symmetric=True, max_edges=100)
    bloom = ExistenceFilter(
        error_rate=0.01,
        capacity=100_000,
        fingerprint_fn=lambda inst: inst.state_fingerprint,
    )

WriteFilterMixin is intentionally excluded — the crystallization handler IS the write gate (Wilson CI > 0.6 threshold).
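
To get a feel for the 0.6 gate, here is the standard Wilson lower bound (the recipe's wilson_ci_lower should behave equivalently): 5 successes out of 6 attempts stay below the threshold, while 6 out of 6 cross it.

import math

def wilson_lower(successes: int, total: int, z: float = 1.96) -> float:
    """Standard Wilson score interval lower bound."""
    if total == 0:
        return 0.0
    p = successes / total
    center = p + z * z / (2 * total)
    margin = z * math.sqrt(p * (1 - p) / total + z * z / (4 * total * total))
    return (center - margin) / (1 + z * z / total)

print(round(wilson_lower(5, 6), 3))  # 0.436 -> below 0.6, not crystallized
print(round(wilson_lower(6, 6), 3))  # 0.610 -> crosses the gate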

Key functions

Function Description
compute_fingerprint(features, include_fields=None, include_timestamp=False) SHA-256 hash (16 hex chars) of state features
update_q_value(instance, reward, max_future_q=0.0, alpha=0.1, gamma=0.95) Atomic TD(0) Q-value update via Lua script
initialize_q_value(instance, initial_q=0.0) Set initial Q-value (overrides timestamp from save)
wilson_ci_lower(successes, total, z=1.96) Wilson score confidence interval lower bound
chi_squared_uniform(observed, expected_per_bucket) Chi-squared test against uniform distribution
crystallization_handler(entries) StreamConsumer handler for pattern detection
temporal_discovery_handler(entries) StreamConsumer handler for cycle discovery

Tuning constants

Constant Default Description
MIN_EVENTS_FOR_CRYSTALLIZATION 3 Min events before crystallization
WILSON_CI_THRESHOLD 0.6 Wilson CI lower bound threshold
TD_ALPHA 0.1 Q-value learning rate
TD_GAMMA 0.95 Q-value discount factor
CHI_SQUARED_P_THRESHOLD 0.05 p-value threshold for temporal discovery
INITIAL_CYCLE_AMPLITUDE 0.5 Initial amplitude for discovered cycles

ContextAssembler

A capstone recipe that orchestrates all shipped Popoto memory primitives into a single assemble() call. It combines query-driven retrieval (pull path) with proactive surfacing (push path), applies token budgets, and formats output for LLM context injection.

This is Step 12 of the Popoto Memory Roadmap.

Pipeline

Application
assemble(query_cues, agent_id)
    ├─── Pull Path (query-driven) ──────────────────────────┐
    │    1. ExistenceFilter pre-check                       │
    │       └─ Skip retrieval if all cues definitely_missing│
    │    2. CompositeScoreQuery                             │
    │       └─ Multi-factor ranked retrieval via ZUNIONSTORE│
    │    3. CoOccurrence propagation                        │
    │       └─ BFS expansion to discover associated records │
    │                                                       │
    ├─── Push Path (proactive surfacing) ───────────────────┤
    │    1. CyclicDecayField scan via top_by_decay()        │
    │    2. Filter by surfacing_threshold                   │
    │                                                       │
    ├─── Merge + Re-rank ───────────────────────────────────┤
    │    1. Deduplicate by redis_key                        │
    │    2. Apply max_items cap                             │
    │    3. Apply max_tokens budget (if set)                │
    │                                                       │
    ├─── Post-retrieval Effects ────────────────────────────┤
    │    1. on_read() for pull-path records                 │
    │    2. on_surfaced() for push-path (creates Recalls)   │
    │    3. Competitive suppression of non-selected pulls   │
    │                                                       │
    └─── Format ────────────────────────────────────────────┘
         structured (JSON) | xml | natural
    AssemblyResult

Synergy Table

Primitive Role in ContextAssembler
DecayingSortedField Score index for CompositeScoreQuery
CyclicDecayField Push-path proactive surfacing
ConfidenceField Score index + competitive suppression
CoOccurrenceField Pull-path candidate expansion via BFS
ExistenceFilter Pull-path pre-check (skip if absent)
AccessTrackerMixin on_read post-effect tracking
ObservationProtocol on_read / on_surfaced dispatch
RecallProposal Created for push-path records
WriteFilterMixin Priority score in composite
EventStreamMixin Mutation logging (via model save)
PredictionLedgerMixin Outcome tracking (via model save)
CompositeScoreQuery Multi-factor ranked retrieval

API Reference

ContextAssembler.__init__()

from popoto.recipes.context_assembler import ContextAssembler

assembler = ContextAssembler(
    model_class,              # Popoto Model class to query
    score_weights,            # Dict mapping field names to weights for CompositeScoreQuery
    max_items=10,             # Maximum records to return
    max_tokens=None,          # Optional soft token budget
    surfacing_threshold=0.5,  # Minimum score for push-path records
    propagation_depth=2,      # BFS depth for CoOccurrence propagation
    output_format="structured",  # "structured" (JSON), "xml", or "natural"
    token_counter=None,       # Optional callable(record) -> int; default: len(str(r)) // 4
)

The assembler auto-detects which fields are present on model_class and adapts the pipeline accordingly. Models without ExistenceFilter skip the pre-check; models without CoOccurrenceField skip propagation; models without CyclicDecayField skip the push path entirely.

ContextAssembler.assemble()

result = assembler.assemble(
    query_cues=None,          # Dict of query cues (e.g., {"topic": "deploy"})
    agent_id=None,            # Agent ID for partition filtering
    partition_filters=None,   # Dict of partition key-value pairs
)

Returns an AssemblyResult. If query_cues is None, the pull path is skipped and only push-path (proactive) results are returned.

Pass assess_quality=True to also run a pre-retrieval quality probe; the resulting RetrievalQuality dataclass is available in result.metadata["quality"]. For self-calibrating assemblies that fall back gracefully when quality is low, see AdaptiveAssembler in the Metacognitive Layer.

AssemblyResult

@dataclass
class AssemblyResult:
    records: list       # All selected instances (pull + push, deduplicated)
    proactive: list     # Push-path subset of records
    formatted: str      # LLM-ready formatted string
    metadata: dict      # Scores, token counts, timing info

The metadata dict contains:

Key Description
pull_count Number of pull-path records in final selection
push_count Number of push-path records in final selection
token_count Estimated total tokens across selected records
timing_ms Wall-clock time for the full pipeline
total_candidates Total candidates before budget selection

Usage Examples

Pull-only assembly (query-driven retrieval)

from popoto.recipes.context_assembler import ContextAssembler

assembler = ContextAssembler(
    model_class=Memory,
    score_weights={"relevance": 0.6, "confidence": 0.3},
    max_items=10,
    max_tokens=4000,
)

result = assembler.assemble(
    query_cues={"topic": "deployment"},
    agent_id="agent-1",
)
print(result.formatted)       # JSON array of records
print(result.metadata)        # {"pull_count": 5, "push_count": 0, ...}

Push-only assembly (proactive surfacing)

assembler = ContextAssembler(
    model_class=Memory,
    score_weights={"relevance": 1.0},
    surfacing_threshold=0.3,
)

# No query_cues: only push-path runs
result = assembler.assemble(agent_id="agent-1")
for record in result.proactive:
    print(f"Proactively surfaced: {record.content}")

Combined pull + push assembly

assembler = ContextAssembler(
    model_class=Memory,
    score_weights={"relevance": 0.5, "urgency": 0.3, "confidence": 0.2},
    max_items=15,
    max_tokens=8000,
    surfacing_threshold=0.4,
    output_format="xml",
)

result = assembler.assemble(
    query_cues={"topic": "incident response"},
    agent_id="agent-1",
)
# result.records contains both pull and push results, deduplicated
# result.proactive is the push-path subset
# result.formatted is XML-formatted for LLM injection

Custom token counter

import tiktoken

enc = tiktoken.encoding_for_model("gpt-4")

assembler = ContextAssembler(
    model_class=Memory,
    score_weights={"relevance": 1.0},
    max_tokens=4000,
    token_counter=lambda record: len(enc.encode(str(record))),
)

Tuning Constants

Constant Default Description
COMPETITIVE_SUPPRESSION_SIGNAL 0.3 Signal strength for competitive suppression of non-selected pull candidates. Values < 0.5 act as contradiction signals via ConfidenceField.update_confidence().
DEFAULT_SURFACING_THRESHOLD 0.5 Minimum score for push-path records to be surfaced.
DEFAULT_MAX_ITEMS 10 Default maximum number of records returned.
DEFAULT_PROPAGATION_DEPTH 2 Default BFS depth for CoOccurrence propagation.

Graceful Degradation

ContextAssembler adapts to whatever fields are present on the model class:

Missing Field Behavior
ExistenceFilter Pre-check skipped; proceeds directly to CompositeScoreQuery
CoOccurrenceField Propagation skipped; uses CompositeScoreQuery results directly
CyclicDecayField Push path skipped entirely; returns pull-path results only
ConfidenceField Competitive suppression skipped for non-selected candidates

This means ContextAssembler works with minimal models (just a DecayingSortedField) all the way up to full-featured models with all 12 primitives.
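
A minimal end-to-end sketch, assuming nothing beyond a decay field on the model:

from popoto import Model, KeyField, Field
from popoto.fields import DecayingSortedField
from popoto.recipes.context_assembler import ContextAssembler

class MinimalMemory(Model):
    agent_id = KeyField()
    content = Field(type=str)
    relevance = DecayingSortedField(partition_by="agent_id")

assembler = ContextAssembler(
    model_class=MinimalMemory,
    score_weights={"relevance": 1.0},
    max_items=5,
)
# Pre-check, propagation, and the push path are all skipped automatically
result = assembler.assemble(query_cues={"topic": "deploy"}, agent_id="agent-1")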

Design principles

These primitives follow Popoto's existing patterns:

  1. ORM primitives, not application logic. Popoto provides fields, mixins, hooks, and query methods. Domain-specific agent memory models are built on top by application developers.

  2. Redis-native everything. No external brokers, job queues, or Redis modules. Lua scripts, sorted sets, streams, Bloom filters (via SETBIT/GETBIT) — all within the Redis process. Works identically on Redis and Valkey.

  3. Composable. Each primitive is independently useful. Use DecayingSortedField alone for time-weighted ranking, add CyclicDecayField for temporal rhythms and urgency, or combine all twelve for a full cognitive memory system.

  4. Pipeline-safe. Every operation accepts an optional pipeline parameter for atomic execution, consistent with all Popoto field hooks.

  5. Centralized tuning. All ~24 behavioral constants are consolidated in Defaults, importable from the package root. Override globally or per-field — explicit kwargs always win.

from popoto import Defaults

# Global override — all DecayingSortedFields default to 0.7
Defaults.DECAY_RATE = 0.7

# Per-field override still wins
relevance = DecayingSortedField(decay_rate=0.3)  # uses 0.3, not 0.7

See Defaults API reference and Tuning Magic Numbers for the full constant table and benchmark-validated guidance.

Further reading