Skip to content

popoto.fields.co_occurrence_field

popoto.fields.co_occurrence_field

CoOccurrenceField — weighted association edges with graph propagation.

Maintains weighted bidirectional (or unidirectional) edges between model instances using Redis sorted sets. Weights strengthen via co-retrieval and decay when not reinforced. BFS graph propagation with exponential weight decay per hop enables multi-hop associative retrieval.

Each CoOccurrenceField instance owns per-PK sorted sets

$CoOcF:{ClassName}:{field_name}:{pk} -> ZSET { target_pk: weight, ... }

When symmetric=True (default), link/strengthen/unlink operations mirror on both source and target sorted sets.

Example

class Memory(Model): key = UniqueKeyField() content = StringField() associations = CoOccurrenceField(symmetric=True, max_edges=100)

Create edges (link, strengthen, get_linked and propagate are instance methods, so call them on the field instance declared on the model, not on the CoOccurrenceField class itself)

field = Memory._meta.fields["associations"]
field.link(Memory, "pk_a", "pk_b", initial_weight=0.1)
field.strengthen(Memory, "pk_a", "pk_b", delta=0.05)

Query associations

linked = field.get_linked(Memory, "pk_a")

=> [("pk_b", 0.15)]

Multi-hop propagation

scores = field.propagate(Memory, ["pk_a"], depth=2)

=> {"pk_b": 0.5, "pk_c": 0.25}

CoOccurrenceField

Bases: Field

A field that maintains weighted association edges between model instances.

Uses per-PK Redis sorted sets to store weighted edges to other PKs. Supports symmetric (bidirectional) and asymmetric (unidirectional) modes.

Parameters:

Name Type Description Default
symmetric

If True, edges are bidirectional. Default True.

True
max_edges

Maximum edges per PK. Lowest-weight edges are pruned by link() when exceeded. Must be >= 1. Default 500.

500
decay_factor

Multiplicative decay factor for weaken_all(). Must satisfy 0 <= decay_factor < 1. Default 0.95 (Defaults.CO_OCCURRENCE_DECAY_FACTOR).

Defaults.CO_OCCURRENCE_DECAY_FACTOR
Example

class Memory(Model): key = UniqueKeyField() content = StringField() associations = CoOccurrenceField(symmetric=True, max_edges=100)

Source code in src/popoto/fields/co_occurrence_field.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
class CoOccurrenceField(Field):
    """A field that maintains weighted association edges between model instances.

    Uses per-PK Redis sorted sets to store weighted edges to other PKs.
    Supports symmetric (bidirectional) and asymmetric (unidirectional) modes.
    The field stores no value on the model instance itself; all state lives
    in the ``$CoOcF:{ClassName}:{field_name}:{pk}`` sorted sets.

    Args:
        symmetric: If True, edges are bidirectional. Default True.
        max_edges: Maximum edges per PK. Lowest-weight edges pruned by
            ``link()`` when exceeded. Must be >= 1. Default 500.
        decay_factor: Multiplicative decay factor for weaken_all(). Must
            satisfy ``0 <= decay_factor < 1``. Default
            ``Defaults.CO_OCCURRENCE_DECAY_FACTOR`` (documented as 0.95).

    Raises:
        ModelException: If max_edges < 1 or decay_factor is outside [0, 1).

    Example:
        class Memory(Model):
            key = UniqueKeyField()
            content = StringField()
            associations = CoOccurrenceField(symmetric=True, max_edges=100)
    """

    def __init__(self, **kwargs):
        self.symmetric = kwargs.pop("symmetric", True)
        self.max_edges = kwargs.pop("max_edges", 500)
        decay_factor = kwargs.pop("decay_factor", None)
        self.decay_factor = (
            decay_factor
            if decay_factor is not None
            else Defaults.CO_OCCURRENCE_DECAY_FACTOR
        )

        if self.max_edges < 1:
            from ..exceptions import ModelException

            raise ModelException(f"max_edges must be >= 1 (got {self.max_edges})")
        if not (0 <= self.decay_factor < 1):
            from ..exceptions import ModelException

            raise ModelException(
                f"decay_factor must be >= 0 and < 1 (got {self.decay_factor})"
            )

        # CoOccurrenceField stores no value on the model instance itself
        kwargs.setdefault("type", str)
        kwargs.setdefault("null", True)
        kwargs.setdefault("default", None)
        super().__init__(**kwargs)

    def get_edge_key(self, model_class, pk):
        """Build the Redis key for a PK's edge sorted set.

        Public API for external callers that need direct Redis access to
        a PK's edge set (e.g., bulk edge inspection, custom graph queries,
        monitoring). Prefer the higher-level methods (link, get_linked,
        propagate) for normal operations.

        Pattern: $CoOcF:{ClassName}:{field_name}:{pk}

        Args:
            model_class: The Model class (or instance).
            pk: The primary key string.

        Returns:
            str: The Redis key for this PK's edge sorted set.
        """
        base_key = self.get_special_use_field_db_key(model_class, self.name)
        return base_key.redis_key + ":" + str(pk)

    def get_edge_key_prefix(self, model_class):
        """Build the Redis key prefix for BFS propagation.

        Public API for external callers that need to scan or iterate over
        all edge sorted sets for a field (e.g., graph analytics, bulk cleanup).

        Pattern: $CoOcF:{ClassName}:{field_name}:

        Args:
            model_class: The Model class.

        Returns:
            str: The key prefix (ending with colon).
        """
        base_key = self.get_special_use_field_db_key(model_class, self.name)
        return base_key.redis_key + ":"

    def link(
        self,
        model_class,
        source_pk,
        target_pk,
        initial_weight=_UNSET,
        pipeline=None,
    ):
        """Create a weighted edge between two PKs.

        If symmetric=True, creates edges in both directions. Uses an atomic
        Lua script to handle ZADD + ZCARD + conditional pruning.

        Args:
            model_class: The Model class.
            source_pk: Source primary key string.
            target_pk: Target primary key string.
            initial_weight: Weight for the new edge. Default from
                ``Defaults.CO_OCCURRENCE_INITIAL_WEIGHT``.
            pipeline: Optional Redis pipeline (unused for Lua eval).

        Returns:
            float: The weight of the edge (existing weight if already linked).

        Raises:
            ValueError: If source_pk == target_pk (no self-loops).
        """
        if initial_weight is _UNSET:
            initial_weight = Defaults.CO_OCCURRENCE_INITIAL_WEIGHT
        source_pk = str(source_pk)
        target_pk = str(target_pk)

        if source_pk == target_pk:
            raise ValueError("Cannot link a PK to itself (no self-loops)")

        source_key = self.get_edge_key(model_class, source_pk)
        result = POPOTO_REDIS_DB.eval(
            LINK_WITH_PRUNE_LUA,
            1,
            source_key,
            target_pk,
            str(initial_weight),
            str(self.max_edges),
        )

        if self.symmetric:
            # The reverse edge is written by a second, separate EVAL, so the
            # pair of writes is not atomic as a whole; its result is ignored.
            target_key = self.get_edge_key(model_class, target_pk)
            POPOTO_REDIS_DB.eval(
                LINK_WITH_PRUNE_LUA,
                1,
                target_key,
                source_pk,
                str(initial_weight),
                str(self.max_edges),
            )

        return float(result)

    def strengthen(
        self,
        model_class,
        source_pk,
        target_pk,
        delta=0.05,
        pipeline=None,
    ):
        """Increase the weight of an edge.

        Uses ZINCRBY to atomically increment the edge weight. ZINCRBY creates
        the member when it is missing, so strengthening an absent edge creates
        it with weight ``delta``. No pruning happens here, so ``max_edges`` is
        only enforced by ``link()``.

        If the model is an ``EventStreamMixin`` subclass and no pipeline is
        given, a best-effort ``strengthen`` event is appended to the model's
        stream; any error while logging the event is ignored.

        Args:
            model_class: The Model class (a model instance is also accepted).
            source_pk: Source primary key string.
            target_pk: Target primary key string.
            delta: Amount to increase weight by. Must be > 0. Default 0.05.
            pipeline: Optional Redis pipeline. When given, the increments are
                queued on it and no stream event is logged.

        Returns:
            float | None: The new source->target weight, or None when a
                pipeline is used (execution is deferred).

        Raises:
            ValueError: If delta <= 0.
        """
        if delta <= 0:
            raise ValueError(f"delta must be > 0 (got {delta})")

        source_pk = str(source_pk)
        target_pk = str(target_pk)

        source_key = self.get_edge_key(model_class, source_pk)
        db = pipeline if pipeline else POPOTO_REDIS_DB
        new_weight = db.zincrby(source_key, delta, target_pk)

        if self.symmetric:
            target_key = self.get_edge_key(model_class, target_pk)
            db.zincrby(target_key, delta, source_pk)

        # EventStreamMixin: log strengthen event
        from .event_stream import EventStreamMixin

        # get_edge_key() accepts an instance as well as a class, but
        # issubclass() raises TypeError for non-class arguments, and this check
        # sits outside the best-effort try below, so normalise to a class.
        model_type = (
            model_class if isinstance(model_class, type) else type(model_class)
        )

        if not pipeline and issubclass(model_type, EventStreamMixin):
            try:
                import time

                stream_name = getattr(
                    model_type, "_stream_name", EventStreamMixin._stream_name
                )
                max_length = getattr(
                    model_type,
                    "_stream_max_length",
                    EventStreamMixin._stream_max_length,
                )
                stream_key = f"stream:{stream_name}"
                partition_field = getattr(model_type, "_stream_partition_field", None)
                if partition_field:
                    stream_key = f"{stream_key}:{source_pk}"
                entry = {
                    "model": model_type.__name__,
                    "pk": str(source_pk),
                    "op": "strengthen",
                    "ts": str(time.time()),
                    "changed_fields": "",
                    "source_pk": str(source_pk),
                    "target_pk": str(target_pk),
                    "delta": str(delta),
                }
                POPOTO_REDIS_DB.xadd(
                    stream_key, entry, maxlen=max_length, approximate=True
                )
            except Exception:
                pass  # Best-effort, don't block strengthen

        if pipeline:
            return None  # Pipeline defers execution
        return float(new_weight)

    def unlink(self, model_class, source_pk, target_pk, pipeline=None):
        """Remove an edge between two PKs.

        If symmetric=True, removes edges in both directions.

        Args:
            model_class: The Model class.
            source_pk: Source primary key string.
            target_pk: Target primary key string.
            pipeline: Optional Redis pipeline.
        """
        source_pk = str(source_pk)
        target_pk = str(target_pk)

        source_key = self.get_edge_key(model_class, source_pk)
        db = pipeline if pipeline else POPOTO_REDIS_DB
        db.zrem(source_key, target_pk)

        if self.symmetric:
            target_key = self.get_edge_key(model_class, target_pk)
            db.zrem(target_key, source_pk)

    def weaken_all(self, model_class, pk, factor=None, pipeline=None):
        """Multiplicatively decay all edge weights for a PK.

        Only this PK's own edge sorted set is modified, even when
        ``symmetric=True``; reverse edges stored on linked PKs are not decayed.

        For ``0 < factor <= 1`` the decay runs in ``WEAKEN_ALL_LUA``, which
        also prunes edges whose weight falls below a fixed threshold of 0.001.
        ``factor=0`` is special-cased to delete the whole edge set.

        Args:
            model_class: The Model class.
            pk: Primary key whose edges to weaken.
            factor: Decay factor, ``0 <= factor <= 1``. Defaults to
                self.decay_factor. factor=0 removes all edges.
            pipeline: Unused; commands always run directly against Redis.

        Returns:
            int: Number of edges removed (pruned by the script, or the whole
                edge count when factor=0).

        Raises:
            ValueError: If factor > 1 or factor < 0.
        """
        if factor is None:
            factor = self.decay_factor

        if factor < 0 or factor > 1:
            raise ValueError(f"factor must be between 0 and 1 inclusive (got {factor})")

        pk = str(pk)
        edge_key = self.get_edge_key(model_class, pk)

        if factor == 0:
            # Special case: remove all edges
            count = POPOTO_REDIS_DB.zcard(edge_key)
            POPOTO_REDIS_DB.delete(edge_key)
            return int(count)

        # Fixed pruning threshold, independent of ``factor``.
        threshold = 0.001
        result = POPOTO_REDIS_DB.eval(
            WEAKEN_ALL_LUA,
            1,
            edge_key,
            str(factor),
            str(threshold),
        )
        return int(result)

    def get_linked(self, model_class, pk, min_weight=0.01, limit=20):
        """Get linked PKs sorted by weight descending.

        Args:
            model_class: The Model class.
            pk: Primary key to get links for.
            min_weight: Minimum weight threshold (inclusive). Default 0.01.
            limit: Maximum number of results. Default 20. Pass None to return
                every edge whose weight is >= min_weight.

        Returns:
            list[tuple[str, float]]: List of (pk, weight) tuples,
                sorted by weight descending.
        """
        pk = str(pk)
        edge_key = self.get_edge_key(model_class, pk)

        # redis-py requires ``start`` and ``num`` together; omit both (no
        # LIMIT clause) when the caller asks for an unbounded result.
        paging = {} if limit is None else {"start": 0, "num": limit}

        # ZREVRANGEBYSCORE: highest to lowest, with score filter
        results = POPOTO_REDIS_DB.zrevrangebyscore(
            edge_key,
            "+inf",
            str(min_weight),
            withscores=True,
            **paging,
        )

        return [
            (
                member.decode("utf-8") if isinstance(member, bytes) else member,
                float(score),
            )
            for member, score in results
        ]

    def propagate(
        self,
        model_class,
        seed_pks,
        depth=2,
        decay_per_hop=_UNSET,
        threshold=0.01,
    ):
        """BFS graph propagation with exponential weight decay per hop.

        Traverses edges starting from seed PKs, applying multiplicative
        decay at each hop. When the same PK is reached via multiple paths,
        the maximum weight is kept.

        Args:
            model_class: The Model class.
            seed_pks: List of starting primary keys.
            depth: Maximum BFS depth. Default 2. depth=0 returns seeds only.
            decay_per_hop: Weight multiplier per hop. Default from
                ``Defaults.CO_OCCURRENCE_DECAY_PER_HOP``.
            threshold: Minimum propagated weight to continue. Default 0.01.

        Returns:
            dict[str, float]: Mapping of discovered PKs to their propagated
                weights. Seeds are not included in results (except for depth=0).
        """
        if decay_per_hop is _UNSET:
            decay_per_hop = Defaults.CO_OCCURRENCE_DECAY_PER_HOP
        if not seed_pks:
            return {}

        seed_pks = [str(pk) for pk in seed_pks]

        if depth == 0:
            return {pk: 1.0 for pk in seed_pks}

        key_prefix = self.get_edge_key_prefix(model_class)

        result = POPOTO_REDIS_DB.eval(
            PROPAGATE_BFS_LUA,
            1,
            key_prefix,
            json.dumps(seed_pks),
            str(depth),
            str(decay_per_hop),
            str(threshold),
            str(self.max_edges),
        )

        # Parse flat array [pk1, weight1, pk2, weight2, ...]
        scores = {}
        if result:
            for i in range(0, len(result), 2):
                pk = result[i]
                if isinstance(pk, bytes):
                    pk = pk.decode("utf-8")
                weight = float(result[i + 1])
                scores[pk] = weight

        return scores

    @classmethod
    def on_delete(
        cls,
        model_instance,
        field_name,
        field_value,
        pipeline=None,
        **kwargs,
    ):
        """Clean up edges when a model instance is deleted.

        Deletes the instance's own edge sorted set. For symmetric fields, the
        instance is also removed (ZREM) from the edge set of every PK it links
        to, found by reading its own edge set first. No keyspace SCAN is
        performed, so for asymmetric fields inbound edges held by other PKs
        are left in place.

        The linked-PK lookup always runs immediately against Redis; only the
        ZREM/DELETE writes are queued when a pipeline is given.

        Args:
            model_instance: The Model instance being deleted.
            field_name: Name of this CoOccurrenceField.
            field_value: Current field value (unused).
            pipeline: Optional Redis pipeline.
            **kwargs: Additional context; ``saved_redis_key`` overrides the
                instance's ``db_key.redis_key`` when present.
        """
        field = model_instance._meta.fields.get(field_name)
        if not isinstance(field, CoOccurrenceField):
            return super().on_delete(
                model_instance, field_name, field_value, pipeline=pipeline, **kwargs
            )

        # Get this instance's PK from its redis key
        # NOTE(review): edges are addressed by the instance's redis key here,
        # whereas link()/strengthen() use whatever PK string the caller passes;
        # confirm callers link by redis key, otherwise cleanup misses edges.
        member_key = kwargs.get("saved_redis_key") or model_instance.db_key.redis_key

        # Get the edge key for this instance
        edge_key = field.get_edge_key(model_instance, member_key)

        if field.symmetric:
            # Get all linked PKs so we can remove reverse edges
            linked = POPOTO_REDIS_DB.zrange(edge_key, 0, -1)
            for target_pk in linked:
                if isinstance(target_pk, bytes):
                    target_pk = target_pk.decode("utf-8")
                target_edge_key = field.get_edge_key(model_instance, target_pk)
                if pipeline:
                    pipeline.zrem(target_edge_key, member_key)
                else:
                    POPOTO_REDIS_DB.zrem(target_edge_key, member_key)

        # Delete this instance's edge sorted set
        if pipeline:
            pipeline.delete(edge_key)
        else:
            POPOTO_REDIS_DB.delete(edge_key)

        return super().on_delete(
            model_instance, field_name, field_value, pipeline=pipeline, **kwargs
        )

get_edge_key(model_class, pk)

Build the Redis key for a PK's edge sorted set.

Public API for external callers that need direct Redis access to a PK's edge set (e.g., bulk edge inspection, custom graph queries, monitoring). Prefer the higher-level methods (link, get_linked, propagate) for normal operations.

Pattern: $CoOcF:{ClassName}:{field_name}:{pk}

Parameters:

Name Type Description Default
model_class

The Model class (or instance).

required
pk

The primary key string.

required

Returns:

Name Type Description
str

The Redis key for this PK's edge sorted set.

Source code in src/popoto/fields/co_occurrence_field.py
def get_edge_key(self, model_class, pk):
    """Return the Redis key of the sorted set that holds one PK's edges.

    Intended for callers that need raw Redis access to a single PK's edge
    set (inspection, custom graph queries, monitoring). Normal code should
    go through link / get_linked / propagate instead.

    Pattern: $CoOcF:{ClassName}:{field_name}:{pk}

    Args:
        model_class: The Model class (or instance).
        pk: The primary key; it is converted with ``str()``.

    Returns:
        str: The field's special-use key, a colon, then the PK.
    """
    field_key = self.get_special_use_field_db_key(model_class, self.name)
    return ":".join([field_key.redis_key, str(pk)])

get_edge_key_prefix(model_class)

Build the Redis key prefix for BFS propagation.

Public API for external callers that need to scan or iterate over all edge sorted sets for a field (e.g., graph analytics, bulk cleanup).

Pattern: $CoOcF:{ClassName}:{field_name}:

Parameters:

Name Type Description Default
model_class

The Model class.

required

Returns:

Name Type Description
str

The key prefix (ending with colon).

Source code in src/popoto/fields/co_occurrence_field.py
def get_edge_key_prefix(self, model_class):
    """Return the shared key prefix of every edge sorted set for this field.

    Useful to callers that scan or iterate over all edge sets of the field
    (graph analytics, bulk cleanup); propagate() hands it to the BFS script.

    Pattern: $CoOcF:{ClassName}:{field_name}:

    Args:
        model_class: The Model class.

    Returns:
        str: The field's special-use key followed by a trailing colon.
    """
    field_key = self.get_special_use_field_db_key(model_class, self.name)
    return "".join((field_key.redis_key, ":"))

link(model_class, source_pk, target_pk, initial_weight=_UNSET, pipeline=None)

Create a weighted edge between two PKs.

If symmetric=True, creates edges in both directions. Uses an atomic Lua script to handle ZADD + ZCARD + conditional pruning.

Parameters:

Name Type Description Default
model_class

The Model class.

required
source_pk

Source primary key string.

required
target_pk

Target primary key string.

required
initial_weight

Weight for the new edge. Default from Defaults.CO_OCCURRENCE_INITIAL_WEIGHT.

_UNSET
pipeline

Optional Redis pipeline (unused for Lua eval).

None

Returns:

Name Type Description
float

The weight of the edge (existing weight if already linked).

Raises:

Type Description
ValueError

If source_pk == target_pk (no self-loops).

Source code in src/popoto/fields/co_occurrence_field.py
def link(
    self,
    model_class,
    source_pk,
    target_pk,
    initial_weight=_UNSET,
    pipeline=None,
):
    """Add a weighted edge between two PKs, pruning the edge set if it is full.

    Each direction is written by one EVAL of ``LINK_WITH_PRUNE_LUA`` (ZADD,
    ZCARD and conditional pruning in a single script). With
    ``symmetric=True`` the reverse direction is written by a second EVAL.

    Args:
        model_class: The Model class.
        source_pk: Source primary key (converted with ``str()``).
        target_pk: Target primary key (converted with ``str()``).
        initial_weight: Weight for a new edge. Falls back to
            ``Defaults.CO_OCCURRENCE_INITIAL_WEIGHT`` when omitted.
        pipeline: Accepted for API symmetry; not used, since the Lua scripts
            are evaluated directly.

    Returns:
        float: The source->target weight reported by the script (the existing
            weight if the edge was already present).

    Raises:
        ValueError: If source and target are the same PK (no self-loops).
    """
    weight = (
        Defaults.CO_OCCURRENCE_INITIAL_WEIGHT
        if initial_weight is _UNSET
        else initial_weight
    )
    src, dst = str(source_pk), str(target_pk)

    if src == dst:
        raise ValueError("Cannot link a PK to itself (no self-loops)")

    def _add_edge(owner, member):
        # One atomic add-and-prune on ``owner``'s edge set.
        return POPOTO_REDIS_DB.eval(
            LINK_WITH_PRUNE_LUA,
            1,
            self.get_edge_key(model_class, owner),
            member,
            str(weight),
            str(self.max_edges),
        )

    forward_weight = _add_edge(src, dst)
    if self.symmetric:
        _add_edge(dst, src)
    return float(forward_weight)

strengthen(model_class, source_pk, target_pk, delta=0.05, pipeline=None)

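Increase the weight of an edge.

Uses ZINCRBY to atomically increment the edge weight. ZINCRBY creates the edge if it does not exist yet. No pruning is performed, so max_edges is only enforced by link().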

Parameters:

Name Type Description Default
model_class

The Model class.

required
source_pk

Source primary key string.

required
target_pk

Target primary key string.

required
delta

Amount to increase weight by. Must be > 0. Default 0.05.

0.05
pipeline

Optional Redis pipeline.

None

Returns:

Name Type Description
float

The new weight after increment.

Raises:

Type Description
ValueError

If delta <= 0.

Source code in src/popoto/fields/co_occurrence_field.py
def strengthen(
    self,
    model_class,
    source_pk,
    target_pk,
    delta=0.05,
    pipeline=None,
):
    """Increase the weight of an edge.

    Uses ZINCRBY to atomically increment the edge weight. ZINCRBY creates
    the member when it is missing, so strengthening an absent edge creates
    it with weight ``delta``. No pruning happens here, so ``max_edges`` is
    only enforced by ``link()``.

    If the model is an ``EventStreamMixin`` subclass and no pipeline is
    given, a best-effort ``strengthen`` event is appended to the model's
    stream; any error while logging the event is ignored.

    Args:
        model_class: The Model class (a model instance is also accepted).
        source_pk: Source primary key string.
        target_pk: Target primary key string.
        delta: Amount to increase weight by. Must be > 0. Default 0.05.
        pipeline: Optional Redis pipeline. When given, the increments are
            queued on it and no stream event is logged.

    Returns:
        float | None: The new source->target weight, or None when a
            pipeline is used (execution is deferred).

    Raises:
        ValueError: If delta <= 0.
    """
    if delta <= 0:
        raise ValueError(f"delta must be > 0 (got {delta})")

    source_pk = str(source_pk)
    target_pk = str(target_pk)

    source_key = self.get_edge_key(model_class, source_pk)
    db = pipeline if pipeline else POPOTO_REDIS_DB
    new_weight = db.zincrby(source_key, delta, target_pk)

    if self.symmetric:
        target_key = self.get_edge_key(model_class, target_pk)
        db.zincrby(target_key, delta, source_pk)

    # EventStreamMixin: log strengthen event
    from .event_stream import EventStreamMixin

    # get_edge_key() accepts an instance as well as a class, but
    # issubclass() raises TypeError for non-class arguments, and this check
    # sits outside the best-effort try below, so normalise to a class.
    model_type = model_class if isinstance(model_class, type) else type(model_class)

    if not pipeline and issubclass(model_type, EventStreamMixin):
        try:
            import time

            stream_name = getattr(
                model_type, "_stream_name", EventStreamMixin._stream_name
            )
            max_length = getattr(
                model_type,
                "_stream_max_length",
                EventStreamMixin._stream_max_length,
            )
            stream_key = f"stream:{stream_name}"
            partition_field = getattr(model_type, "_stream_partition_field", None)
            if partition_field:
                stream_key = f"{stream_key}:{source_pk}"
            entry = {
                "model": model_type.__name__,
                "pk": str(source_pk),
                "op": "strengthen",
                "ts": str(time.time()),
                "changed_fields": "",
                "source_pk": str(source_pk),
                "target_pk": str(target_pk),
                "delta": str(delta),
            }
            POPOTO_REDIS_DB.xadd(
                stream_key, entry, maxlen=max_length, approximate=True
            )
        except Exception:
            pass  # Best-effort, don't block strengthen

    if pipeline:
        return None  # Pipeline defers execution
    return float(new_weight)

unlink(model_class, source_pk, target_pk, pipeline=None)

Remove an edge between two PKs.

If symmetric=True, removes edges in both directions.

Parameters:

Name Type Description Default
model_class

The Model class.

required
source_pk

Source primary key string.

required
target_pk

Target primary key string.

required
pipeline

Optional Redis pipeline.

None
Source code in src/popoto/fields/co_occurrence_field.py
def unlink(self, model_class, source_pk, target_pk, pipeline=None):
    """Delete the edge between two PKs.

    With ``symmetric=True`` the reverse edge is deleted as well. Commands are
    queued on ``pipeline`` when one is supplied; otherwise they run directly.

    Args:
        model_class: The Model class.
        source_pk: Source primary key (converted with ``str()``).
        target_pk: Target primary key (converted with ``str()``).
        pipeline: Optional Redis pipeline.
    """
    src = str(source_pk)
    dst = str(target_pk)
    conn = pipeline or POPOTO_REDIS_DB

    conn.zrem(self.get_edge_key(model_class, src), dst)
    if self.symmetric:
        conn.zrem(self.get_edge_key(model_class, dst), src)

weaken_all(model_class, pk, factor=None, pipeline=None)

Multiplicatively decay all edge weights for a PK.

Edges whose weight falls below a fixed pruning threshold of 0.001 after weakening are automatically pruned. Only this PK's own edge set is modified, even when symmetric=True.

Parameters:

Name Type Description Default
model_class

The Model class.

required
pk

Primary key whose edges to weaken.

required
factor

Decay factor (0 <= factor <= 1). Defaults to self.decay_factor. factor=0 removes all edges.

None
pipeline

Optional Redis pipeline (unused for Lua eval).

None

Returns:

Name Type Description
int

Number of edges removed by pruning.

Raises:

Type Description
ValueError

If factor > 1 or factor < 0.

Source code in src/popoto/fields/co_occurrence_field.py
def weaken_all(self, model_class, pk, factor=None, pipeline=None):
    """Multiplicatively decay all edge weights for a PK.

    Only this PK's own edge sorted set is modified, even when
    ``symmetric=True``; reverse edges stored on linked PKs are not decayed.

    For ``0 < factor <= 1`` the decay runs in ``WEAKEN_ALL_LUA``, which
    also prunes edges whose weight falls below a fixed threshold of 0.001.
    ``factor=0`` is special-cased to delete the whole edge set.

    Args:
        model_class: The Model class.
        pk: Primary key whose edges to weaken.
        factor: Decay factor, ``0 <= factor <= 1``. Defaults to
            self.decay_factor. factor=0 removes all edges.
        pipeline: Unused; commands always run directly against Redis.

    Returns:
        int: Number of edges removed (pruned by the script, or the whole
            edge count when factor=0).

    Raises:
        ValueError: If factor > 1 or factor < 0.
    """
    if factor is None:
        factor = self.decay_factor

    if factor < 0 or factor > 1:
        raise ValueError(f"factor must be between 0 and 1 inclusive (got {factor})")

    pk = str(pk)
    edge_key = self.get_edge_key(model_class, pk)

    if factor == 0:
        # Special case: remove all edges
        count = POPOTO_REDIS_DB.zcard(edge_key)
        POPOTO_REDIS_DB.delete(edge_key)
        return int(count)

    # Fixed pruning threshold, independent of ``factor``.
    threshold = 0.001
    result = POPOTO_REDIS_DB.eval(
        WEAKEN_ALL_LUA,
        1,
        edge_key,
        str(factor),
        str(threshold),
    )
    return int(result)

get_linked(model_class, pk, min_weight=0.01, limit=20)

Get linked PKs sorted by weight descending.

Parameters:

Name Type Description Default
model_class

The Model class.

required
pk

Primary key to get links for.

required
min_weight

Minimum weight threshold. Default 0.01.

0.01
limit

Maximum number of results. Default 20.

20

Returns:

Type Description

list[tuple[str, float]]: List of (pk, weight) tuples, sorted by weight descending.

Source code in src/popoto/fields/co_occurrence_field.py
def get_linked(self, model_class, pk, min_weight=0.01, limit=20):
    """Get linked PKs sorted by weight descending.

    Args:
        model_class: The Model class.
        pk: Primary key to get links for.
        min_weight: Minimum weight threshold (inclusive). Default 0.01.
        limit: Maximum number of results. Default 20. Pass None to return
            every edge whose weight is >= min_weight.

    Returns:
        list[tuple[str, float]]: List of (pk, weight) tuples,
            sorted by weight descending.
    """
    pk = str(pk)
    edge_key = self.get_edge_key(model_class, pk)

    # redis-py requires ``start`` and ``num`` together; omit both (no
    # LIMIT clause) when the caller asks for an unbounded result.
    paging = {} if limit is None else {"start": 0, "num": limit}

    # ZREVRANGEBYSCORE: highest to lowest, with score filter
    results = POPOTO_REDIS_DB.zrevrangebyscore(
        edge_key,
        "+inf",
        str(min_weight),
        withscores=True,
        **paging,
    )

    return [
        (
            member.decode("utf-8") if isinstance(member, bytes) else member,
            float(score),
        )
        for member, score in results
    ]

propagate(model_class, seed_pks, depth=2, decay_per_hop=_UNSET, threshold=0.01)

BFS graph propagation with exponential weight decay per hop.

Traverses edges starting from seed PKs, applying multiplicative decay at each hop. When the same PK is reached via multiple paths, the maximum weight is kept.

Parameters:

Name Type Description Default
model_class

The Model class.

required
seed_pks

List of starting primary keys.

required
depth

Maximum BFS depth. Default 2. depth=0 returns seeds only.

2
decay_per_hop

Weight multiplier per hop. Default from Defaults.CO_OCCURRENCE_DECAY_PER_HOP.

_UNSET
threshold

Minimum propagated weight to continue. Default 0.01.

0.01

Returns:

Type Description

dict[str, float]: Mapping of discovered PKs to their propagated weights. Seeds are not included in results (except for depth=0).

Source code in src/popoto/fields/co_occurrence_field.py
def propagate(
    self,
    model_class,
    seed_pks,
    depth=2,
    decay_per_hop=_UNSET,
    threshold=0.01,
):
    """Score PKs reachable from the seeds via BFS with per-hop weight decay.

    The traversal runs server-side in ``PROPAGATE_BFS_LUA``. Per this API's
    contract, weights are multiplied by ``decay_per_hop`` on every hop, and a
    PK reached along several paths keeps its highest weight.

    Args:
        model_class: The Model class whose edge sets are traversed.
        seed_pks: List of starting primary keys (each converted with ``str()``).
        depth: Maximum BFS depth. Default 2. ``depth=0`` short-circuits and
            returns every seed with weight 1.0 without querying Redis.
        decay_per_hop: Per-hop weight multiplier. Defaults to
            ``Defaults.CO_OCCURRENCE_DECAY_PER_HOP``.
        threshold: Minimum propagated weight required to keep expanding.
            Default 0.01.

    Returns:
        dict[str, float]: Discovered PK -> propagated weight. Seeds are not
            included, except in the ``depth=0`` case.
    """
    if decay_per_hop is _UNSET:
        decay_per_hop = Defaults.CO_OCCURRENCE_DECAY_PER_HOP
    if not seed_pks:
        return {}

    seeds = list(map(str, seed_pks))

    if depth == 0:
        return dict.fromkeys(seeds, 1.0)

    raw = POPOTO_REDIS_DB.eval(
        PROPAGATE_BFS_LUA,
        1,
        self.get_edge_key_prefix(model_class),
        json.dumps(seeds),
        str(depth),
        str(decay_per_hop),
        str(threshold),
        str(self.max_edges),
    )
    if not raw:
        return {}

    # The script replies with a flat list: [pk1, weight1, pk2, weight2, ...]
    scores = {}
    for idx in range(0, len(raw), 2):
        member = raw[idx]
        if isinstance(member, bytes):
            member = member.decode("utf-8")
        scores[member] = float(raw[idx + 1])
    return scores

on_delete(model_instance, field_name, field_value, pipeline=None, **kwargs) classmethod

Clean up edges when a model instance is deleted.

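Deletes the instance's own edge sorted set. For symmetric fields, it also removes the instance (ZREM) from the edge sorted set of every PK it links to. No keyspace SCAN is performed, so inbound edges on asymmetric fields are not cleaned up.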

Parameters:

Name Type Description Default
model_instance

The Model instance being deleted.

required
field_name

Name of this CoOccurrenceField.

required
field_value

Current field value (unused).

required
pipeline

Optional Redis pipeline.

None
**kwargs

Additional context.

{}
Source code in src/popoto/fields/co_occurrence_field.py
@classmethod
def on_delete(
    cls,
    model_instance,
    field_name,
    field_value,
    pipeline=None,
    **kwargs,
):
    """Clean up edges when a model instance is deleted.

    Deletes the instance's own edge sorted set. For symmetric fields, the
    instance is also removed (ZREM) from the edge set of every PK it links
    to, found by reading its own edge set first. No keyspace SCAN is
    performed, so for asymmetric fields inbound edges held by other PKs
    are left in place.

    The linked-PK lookup always runs immediately against Redis; only the
    ZREM/DELETE writes are queued when a pipeline is given.

    Args:
        model_instance: The Model instance being deleted.
        field_name: Name of this CoOccurrenceField.
        field_value: Current field value (unused).
        pipeline: Optional Redis pipeline.
        **kwargs: Additional context; ``saved_redis_key`` overrides the
            instance's ``db_key.redis_key`` when present.
    """
    field = model_instance._meta.fields.get(field_name)
    if not isinstance(field, CoOccurrenceField):
        return super().on_delete(
            model_instance, field_name, field_value, pipeline=pipeline, **kwargs
        )

    # Get this instance's PK from its redis key
    # NOTE(review): edges are addressed by the instance's redis key here,
    # whereas link()/strengthen() use whatever PK string the caller passes;
    # confirm callers link by redis key, otherwise cleanup misses edges.
    member_key = kwargs.get("saved_redis_key") or model_instance.db_key.redis_key

    # Get the edge key for this instance
    edge_key = field.get_edge_key(model_instance, member_key)

    if field.symmetric:
        # Get all linked PKs so we can remove reverse edges
        linked = POPOTO_REDIS_DB.zrange(edge_key, 0, -1)
        for target_pk in linked:
            if isinstance(target_pk, bytes):
                target_pk = target_pk.decode("utf-8")
            target_edge_key = field.get_edge_key(model_instance, target_pk)
            if pipeline:
                pipeline.zrem(target_edge_key, member_key)
            else:
                POPOTO_REDIS_DB.zrem(target_edge_key, member_key)

    # Delete this instance's edge sorted set
    if pipeline:
        pipeline.delete(edge_key)
    else:
        POPOTO_REDIS_DB.delete(edge_key)

    return super().on_delete(
        model_instance, field_name, field_value, pipeline=pipeline, **kwargs
    )