popoto.fields.embedding_field

EmbeddingField: Automatic embedding generation and storage.

Generates vector embeddings from a source field on save, stores them as .npy files on the filesystem, and maintains an in-memory cache for fast similarity search at query time.

Design
  • on_save() reads the source field value, calls the provider to generate an embedding, and writes it as a .npy file.
  • The embedding cache is a class-level dict mapping model class names to pre-normalized numpy matrices for fast cosine similarity.
  • Cache is invalidated on save/delete within the same process, and across worker processes via the POPOTO_EMBEDDING_INVALIDATION mechanism (see EmbeddingField docstring: pubsub / mtime / none).
  • numpy is optional -- follows the DataFrameField pattern.
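The pre-normalization mentioned in the design notes can be illustrated with a small dependency-free sketch. It is not the library's code (which uses numpy matrices); the helper names `normalize_rows` and `cosine_scores` are hypothetical. Once every stored vector has unit length, cosine similarity against a unit-length query reduces to a plain dot product.

```python
import math

def normalize_rows(rows):
    """Scale each vector to unit length; zero vectors are left as zeros."""
    normalized = []
    for row in rows:
        norm = math.sqrt(sum(x * x for x in row))
        norm = norm if norm != 0 else 1.0  # avoid division by zero
        normalized.append([x / norm for x in row])
    return normalized

def cosine_scores(unit_rows, query):
    """Cosine similarity of `query` against rows that are already unit length."""
    q = normalize_rows([query])[0]
    # Dot product of unit vectors equals cosine similarity.
    return [sum(a * b for a, b in zip(row, q)) for row in unit_rows]

# Normalize once (as a cache would), then score a query.
stored = normalize_rows([[3.0, 4.0], [1.0, 0.0], [0.0, 0.0]])
scores = cosine_scores(stored, [6.0, 8.0])
```

Normalizing once at load time moves the per-row norm computation out of the query path, which is presumably why the cache stores the matrix in that form.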
Example

class Memory(popoto.Model):
    topic = popoto.KeyField()
    content = ContentField()
    embedding = EmbeddingField(source="content")

m = Memory(topic="revenue", content="Revenue trends...")
m.save()  # embedding generated automatically via provider

EmbeddingField

Bases: Field

Field type for automatic embedding generation and storage.

Generates a vector embedding from a source field value on save, stores it as a .npy file, and maintains a cached numpy matrix for fast similarity computation.

Parameters:

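============  ======  ==========================================================  =======
Name          Type    Description                                                 Default
============  ======  ==========================================================  =======
source        str     Name of the field to read content from for embedding.       None
provider              An AbstractEmbeddingProvider instance, or None to use the   None
                      globally configured default.
auto_embed    bool    If True (default), generate embeddings automatically on     True
                      save. If False, only embed on explicit calls.
cache         bool    If True (default), cache embeddings in memory for fast      True
                      similarity search. If False, load from disk per query.
**kwargs              Standard Field keyword arguments.                           {}
============  ======  ==========================================================  =======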
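To make the `provider` parameter more concrete, here is a minimal sketch of an object exposing the two members that `on_save` in the source listing actually uses: a `dimensions` attribute and an `embed(texts, input_type=...)` method returning one vector per input text. `HashingProvider` is a hypothetical stand-in for testing, not part of popoto, and it does not subclass `AbstractEmbeddingProvider` because that class's import path and required methods are not shown here.

```python
import hashlib

class HashingProvider:
    """Hypothetical deterministic provider for tests (not a real embedding model)."""

    dimensions = 8  # length of every vector returned by embed()

    def embed(self, texts, input_type="document"):
        vectors = []
        for text in texts:
            digest = hashlib.sha256(text.encode("utf-8")).digest()
            # Map the first `dimensions` bytes of the digest to floats in [0, 1].
            vectors.append([b / 255.0 for b in digest[: self.dimensions]])
        return vectors

provider = HashingProvider()
vectors = provider.embed(["Revenue trends..."], input_type="document")
```

In a model definition, such an object would presumably be passed as `EmbeddingField(source="content", provider=provider)`. Whether the field accepts a duck-typed provider or requires a real `AbstractEmbeddingProvider` subclass is an assumption to verify against the library.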

Requires numpy: pip install popoto[embeddings]

Multi-worker cache invalidation

The embedding matrix is cached in a process-local dict. In a multi-worker deployment (gunicorn, multiple containers/pods) a write on one worker must invalidate peers' caches, or they serve stale semantic-search results. The POPOTO_EMBEDDING_INVALIDATION env var selects the mechanism:

================  ===========================================  ============================================
Mode              Staleness window                             Notes
================  ===========================================  ============================================
pubsub (default)  Valkey RTT + poll interval (~100 ms)         Default. One daemon PubSubWorkerThread +
                                                               one Valkey connection per model class, plus
                                                               one PUBLISH per save/delete. If the listener
                                                               cannot start (Valkey down / ACL / pool
                                                               exhaustion) it degrades to the on-disk
                                                               _version check (never to the stale bug).
mtime             Next semantic_search() after the disk write  No Valkey connection needed. Compares a
                                                               monotonic integer _version in
                                                               _index.json (mtime is only a cheap
                                                               pre-check), so it is granularity-proof
                                                               against same-tick writes on any filesystem.
none              Never invalidated across processes           Pre-fix single-process behavior. Zero extra
                                                               threads, connections, or os.stat() calls.
================  ===========================================  ============================================

The default pubsub mode is NOT byte-for-byte free for single-process apps (it starts a daemon thread, holds a connection, and PUBLISHes a self-loopback per write — the handler skips its own message via a per-process worker id). Single-process deployments wanting zero overhead should set POPOTO_EMBEDDING_INVALIDATION=none. Functional results are identical across all three modes for a single process.
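The `mtime` mode's check can be sketched roughly as follows, mirroring the staleness test visible in `load_embeddings`: compare the index file's mtime first, and only read the `_version` counter when the mtime differs. The helper names (`read_version`, `write_index`, `is_stale`) and the way `_version` is incremented are assumptions made for illustration; the library's own `_read_index` / `_write_index` helpers are not shown in this section.

```python
import json
import os
import tempfile

def read_version(index_path):
    """Read the integer _version counter from the index file."""
    with open(index_path, "r", encoding="utf-8") as fh:
        return json.load(fh).get("_version", 0)

def write_index(index_path, index, bump_version=True):
    """Write the index; a real write bumps _version, a read-path repair does not."""
    if bump_version:
        index["_version"] = index.get("_version", 0) + 1
    with open(index_path, "w", encoding="utf-8") as fh:
        json.dump(index, fh)

def is_stale(entry, index_path):
    """Return True if the cached entry no longer matches the index on disk."""
    try:
        current_mtime = os.stat(index_path).st_mtime
    except OSError:
        return True  # index gone: force a reload
    if current_mtime == entry.get("index_mtime"):
        return False  # cheap pre-check: file untouched
    return read_version(index_path) != entry.get("index_version")

workdir = tempfile.mkdtemp()
index_path = os.path.join(workdir, "_index.json")
write_index(index_path, {})  # first write: _version becomes 1
entry = {
    "index_mtime": os.stat(index_path).st_mtime,
    "index_version": read_version(index_path),
}
fresh_before = is_stale(entry, index_path)

# Simulate a peer process saving a record: _version becomes 2.
write_index(index_path, {"_version": read_version(index_path)})
# Force a distinct mtime so the demo does not depend on clock granularity.
later = entry["index_mtime"] + 5
os.utime(index_path, (later, later))
stale_after = is_stale(entry, index_path)
```

To opt out of cross-process invalidation altogether, the section above says to set `POPOTO_EMBEDDING_INVALIDATION=none`. Since the source reads a module-level `_INVALIDATION_MODE`, the variable most likely needs to be set before popoto is imported, though that timing is an inference rather than something the documentation states.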

Example

class Doc(popoto.Model):
    name = popoto.KeyField()
    content = ContentField()
    embedding = EmbeddingField(source="content")

Source code in src/popoto/fields/embedding_field.py
class EmbeddingField(Field):
    """Field type for automatic embedding generation and storage.

    Generates a vector embedding from a source field value on save,
    stores it as a .npy file, and maintains a cached numpy matrix
    for fast similarity computation.

    Args:
        source: Name of the field to read content from for embedding.
        provider: An AbstractEmbeddingProvider instance, or None to use
            the globally configured default.
        auto_embed: If True (default), generate embeddings automatically
            on save. If False, only embed on explicit calls.
        cache: If True (default), cache embeddings in memory for fast
            similarity search. If False, load from disk per query.
        **kwargs: Standard Field keyword arguments.

    Requires numpy: pip install popoto[embeddings]

    Multi-worker cache invalidation:
        The embedding matrix is cached in a process-local dict. In a
        multi-worker deployment (gunicorn, multiple containers/pods) a write
        on one worker must invalidate peers' caches, or they serve stale
        semantic-search results. The ``POPOTO_EMBEDDING_INVALIDATION`` env var
        selects the mechanism:

        ===========  =================================================  ==========================================
        Mode         Staleness window                                   Notes
        ===========  =================================================  ==========================================
        ``pubsub``   Valkey RTT + poll interval (~100 ms)               Default. One daemon PubSubWorkerThread +
        (default)                                                       one Valkey connection per model class, plus
                                                                        one PUBLISH per save/delete. If the listener
                                                                        cannot start (Valkey down / ACL / pool
                                                                        exhaustion) it degrades to the on-disk
                                                                        ``_version`` check (never to the stale bug).
        ``mtime``    Next ``semantic_search()`` after the disk write    No Valkey connection needed. Compares a
                                                                        monotonic integer ``_version`` in
                                                                        ``_index.json`` (mtime is only a cheap
                                                                        pre-check), so it is granularity-proof
                                                                        against same-tick writes on any filesystem.
        ``none``     Never invalidated across processes                 Pre-fix single-process behavior. Zero extra
                                                                        threads, connections, or os.stat() calls.
        ===========  =================================================  ==========================================

        The default ``pubsub`` mode is NOT byte-for-byte free for single-process
        apps (it starts a daemon thread, holds a connection, and PUBLISHes a
        self-loopback per write — the handler skips its own message via a
        per-process worker id). Single-process deployments wanting zero overhead
        should set ``POPOTO_EMBEDDING_INVALIDATION=none``. Functional results are
        identical across all three modes for a single process.

    Example:
        class Doc(popoto.Model):
            name = popoto.KeyField()
            content = ContentField()
            embedding = EmbeddingField(source="content")
    """

    def __init__(
        self,
        source: str = None,
        provider=None,
        auto_embed: bool = True,
        cache: bool = True,
        **kwargs,
    ):
        if not _numpy_available:
            raise ImportError(
                "numpy is required to use EmbeddingField. "
                "Install it with: pip install popoto[embeddings]"
            )
        kwargs.setdefault("type", int)  # Stores dimension count in Redis
        kwargs.setdefault("null", True)
        kwargs.setdefault("default", None)
        super().__init__(**kwargs)
        self.type = int  # Redis stores the dimension count
        self.source = source
        self._provider = provider
        self.auto_embed = auto_embed
        self.cache_enabled = cache

    @property
    def provider(self):
        """Get the embedding provider instance."""
        if self._provider is not None:
            return self._provider
        return get_default_provider()

    @classmethod
    def _embedding_path(cls, model_class_name: str, redis_key: str) -> str:
        """Return the .npy file path for a model instance's embedding.

        Uses SHA-256 hash of the Redis key to produce a fixed-length
        filename (64 hex chars + '.npy' = 68 bytes), avoiding the
        255-byte filesystem limit that hex encoding could exceed with
        long Redis keys.

        Since SHA-256 is one-way, a sidecar ``_index.json`` file maps
        hash filenames back to Redis keys for reverse lookup.
        """
        base = _get_embeddings_dir()
        hash_key = hashlib.sha256(redis_key.encode("utf-8")).hexdigest()
        return os.path.join(base, model_class_name, f"{hash_key}.npy")

    @classmethod
    def _legacy_embedding_path(cls, model_class_name: str, redis_key: str) -> str:
        """Return the legacy hex-encoded .npy file path for migration."""
        base = _get_embeddings_dir()
        hex_key = redis_key.encode("utf-8").hex()
        return os.path.join(base, model_class_name, f"{hex_key}.npy")

    @classmethod
    def on_save(
        cls,
        model_instance,
        field_name: str,
        field_value,
        pipeline=None,
        **kwargs,
    ):
        """Generate and store embedding on save.

        Reads the source field value, calls the provider to generate an
        embedding vector, saves it as a .npy file, and stores the
        dimension count in Redis.

        Args:
            model_instance: The Model instance being saved.
            field_name: Name of this field on the model.
            field_value: Current value (dimension count or None).
            pipeline: Redis pipeline for batched operations.
            **kwargs: Additional context.

        Returns:
            The pipeline.
        """
        field_instance = model_instance._meta.fields.get(field_name)
        if not isinstance(field_instance, EmbeddingField):
            return pipeline if pipeline else None

        if not field_instance.auto_embed:
            return pipeline if pipeline else None

        provider = field_instance.provider
        if provider is None:
            return pipeline if pipeline else None

        # Read source field content
        source_name = field_instance.source
        if source_name is None:
            return pipeline if pipeline else None

        source_value = getattr(model_instance, source_name, None)
        if source_value is None or source_value == "":
            return pipeline if pipeline else None

        # If source value is a $CF reference, we need the actual content
        # The ContentField descriptor should have already resolved it,
        # but if accessed via __dict__ it might still be a reference.
        if isinstance(source_value, str) and source_value.startswith("$CF:"):
            # Try to load via the content field's store
            source_field = model_instance._meta.fields.get(source_name)
            if hasattr(source_field, "store"):
                try:
                    content_bytes = source_field.store.load(source_value)
                    source_value = content_bytes.decode("utf-8")
                except Exception:
                    logger.warning(
                        f"Could not load content for embedding from {source_name}"
                    )
                    return pipeline if pipeline else None

        # Generate embedding
        try:
            vectors = provider.embed([source_value], input_type="document")
            if not vectors or not vectors[0]:
                return pipeline if pipeline else None
            embedding = vectors[0]
        except Exception as e:
            raise RuntimeError(
                f"Embedding provider failed for {field_name}: {e}"
            ) from e

        # Validate dimensions
        if len(embedding) != provider.dimensions:
            raise ValueError(
                f"Provider returned {len(embedding)} dimensions, "
                f"expected {provider.dimensions}"
            )

        # Save as .npy file
        vector = np.array(embedding, dtype=np.float32)
        redis_key = model_instance._redis_key or model_instance.db_key.redis_key
        npy_path = cls._embedding_path(model_instance.__class__.__name__, redis_key)

        directory = os.path.dirname(npy_path)
        os.makedirs(directory, exist_ok=True)

        # Atomic write: temp file + rename
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".npy")
        try:
            os.close(fd)
            np.save(tmp_path, vector)
            os.rename(tmp_path, npy_path)
        except Exception:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise

        # Update the _index.json sidecar with the hash-to-key mapping
        model_class_name = model_instance.__class__.__name__
        npy_filename = os.path.basename(npy_path)
        index = _read_index(model_class_name)
        index[npy_filename] = redis_key
        _write_index(model_class_name, index)

        # Migrate legacy hex-encoded file if it exists
        legacy_path = cls._legacy_embedding_path(model_class_name, redis_key)
        if legacy_path != npy_path and os.path.exists(legacy_path):
            try:
                os.unlink(legacy_path)
                # Remove legacy filename from index if present
                legacy_filename = os.path.basename(legacy_path)
                if legacy_filename in index:
                    del index[legacy_filename]
                    _write_index(model_class_name, index)
            except OSError:
                logger.warning(f"Failed to remove legacy embedding file {legacy_path}")

        # Store dimension count in Redis field
        dim_count = len(embedding)
        setattr(model_instance, field_name, dim_count)

        # Update Redis with dimension count
        import msgpack
        from ..redis_db import ENCODING

        encoded_dim = msgpack.packb(dim_count, use_bin_type=True)
        if pipeline:
            pipeline.hset(redis_key, field_name.encode(ENCODING), encoded_dim)
        else:
            from ..redis_db import POPOTO_REDIS_DB

            POPOTO_REDIS_DB.hset(redis_key, field_name.encode(ENCODING), encoded_dim)

        # Invalidate the in-memory cache for this model class
        invalidate_cache(model_instance.__class__.__name__)
        # Signal peer processes to invalidate their caches (pubsub mode).
        _publish_invalidation(model_instance.__class__.__name__)

        return pipeline if pipeline else None

    @classmethod
    def on_delete(
        cls,
        model_instance,
        field_name: str,
        field_value,
        pipeline=None,
        **kwargs,
    ):
        """Remove the embedding .npy file on delete."""
        model_class_name = model_instance.__class__.__name__
        redis_key = (
            kwargs.get("saved_redis_key")
            or model_instance._redis_key
            or model_instance.db_key.redis_key
        )
        npy_path = cls._embedding_path(model_class_name, redis_key)

        if os.path.exists(npy_path):
            os.unlink(npy_path)

        # Remove entry from _index.json sidecar
        npy_filename = os.path.basename(npy_path)
        index = _read_index(model_class_name)
        if npy_filename in index:
            del index[npy_filename]
            _write_index(model_class_name, index)

        invalidate_cache(model_class_name)
        # Signal peer processes to invalidate their caches (pubsub mode).
        _publish_invalidation(model_class_name)
        return pipeline if pipeline else None

    @classmethod
    def load_embeddings(cls, model_class) -> tuple:
        """Load all embeddings for a model class into a numpy matrix.

        Returns a pre-normalized matrix and corresponding Redis key list,
        using the in-memory cache when available.

        Args:
            model_class: The Model class to load embeddings for.

        Returns:
            Tuple of (matrix, keys) where matrix is a 2D numpy array
            of shape (N, dimensions) and keys is a list of Redis key
            strings. Returns (None, []) if no embeddings found.
        """
        if not _numpy_available:
            return None, []

        model_name = model_class.__name__

        # Ensure a cross-process invalidation listener is running (pubsub mode;
        # no-op in mtime/none mode or if already started).
        _start_invalidation_listener(model_name)

        # Cross-process staleness backstop: in mtime mode, or in pubsub mode
        # when no live listener exists (Valkey down / ACL / pool exhaustion),
        # confirm the cached entry against the on-disk _version counter. mtime
        # is a cheap "did anything change at all" pre-check; the integer
        # _version is authoritative (granularity-proof against same-tick writes).
        if model_name in _embedding_cache and _should_version_check(model_name):
            entry = _embedding_cache[model_name]
            try:
                current_mtime = os.stat(_index_path(model_name)).st_mtime
                if current_mtime != entry.get("index_mtime"):
                    disk_version = _read_index(model_name).get("_version", 0)
                    if disk_version != entry.get("index_version"):
                        _embedding_cache.pop(model_name, None)  # force reload
            except OSError:
                _embedding_cache.pop(model_name, None)  # index gone, reload

        # Check cache first
        if model_name in _embedding_cache:
            cached = _embedding_cache[model_name]
            return cached["matrix"], cached["keys"]

        # Load from disk
        emb_dir = os.path.join(_get_embeddings_dir(), model_name)
        if not os.path.isdir(emb_dir):
            return None, []

        vectors = []
        keys = []

        # Read the index for hash-to-key mapping
        index = _read_index(model_name)
        index_updated = False

        for filename in os.listdir(emb_dir):
            if not filename.endswith(".npy"):
                continue
            filepath = os.path.join(emb_dir, filename)
            try:
                vec = np.load(filepath)

                # Look up Redis key from the index first
                if filename in index:
                    redis_key = index[filename]
                else:
                    # Fallback: try legacy hex-decode for backward compatibility
                    hex_key = filename[:-4]  # strip .npy
                    try:
                        redis_key = bytes.fromhex(hex_key).decode("utf-8")
                        # Add to index for future lookups
                        index[filename] = redis_key
                        index_updated = True
                    except (ValueError, UnicodeDecodeError):
                        # Downgraded to DEBUG: orphan files are reconciled by
                        # garbage_collect / sweep_stale_tempfiles; logging at
                        # WARNING produced thousands of lines per query before
                        # cleanup. After reconciliation this should never fire
                        # on a clean corpus.
                        logger.debug(
                            f"Skipping unrecognized embedding file {filename}: "
                            "not in index and not a valid hex-encoded key"
                        )
                        continue

                vectors.append(vec.astype(np.float32))
                keys.append(redis_key)
            except Exception as e:
                logger.warning(f"Failed to load embedding {filepath}: {e}")
                continue

        # Persist any index updates from legacy fallback. This is a READ path —
        # pass bump_version=False so a pure read never advances _version (else
        # peers see a phantom write and reload; reload storm on orphan corpora).
        if index_updated:
            try:
                _write_index(model_name, index, bump_version=False)
            except Exception:
                logger.warning(f"Failed to update index for {model_name}")

        if not vectors:
            return None, []

        matrix = np.stack(vectors)

        # Pre-normalize for cosine similarity (dot product on unit vectors)
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        norms = np.where(norms == 0, 1, norms)  # Avoid division by zero
        matrix = matrix / norms

        # Cache the result. Populate the staleness-check fields in all modes
        # except "none" so the pubsub backstop / mtime path can use them.
        entry = {"matrix": matrix, "keys": keys}
        if _INVALIDATION_MODE != "none":
            try:
                entry["index_mtime"] = os.stat(_index_path(model_name)).st_mtime
            except OSError:
                entry["index_mtime"] = None
            entry["index_version"] = _read_index(model_name).get("_version", 0)
        _embedding_cache[model_name] = entry

        return matrix, keys

    @classmethod
    def garbage_collect(cls, model_class, min_age_seconds: int = 300):
        """Remove orphaned .npy files not referenced by live model instances.

        Walks the on-disk embedding directory for ``model_class``, computes
        the set of expected-to-survive filenames via the shared
        :func:`_compute_expected_keep` helper, and unlinks any non-tempfile
        ``.npy`` whose name is not in that set AND whose mtime is older
        than ``min_age_seconds``. The mtime guard is the only protection
        against deleting files that a concurrent ``Memory.save()`` has
        landed on disk but not yet recorded in ``$Class:{Name}`` — the
        save path order is rename → ``_index.json`` mutation → Ollama embed
        call (network, possibly retried) → Redis ``hset`` of dimension
        count. During that window the file exists on disk but is in
        NEITHER ``_index.json`` NOR the class set; the mtime guard bounds
        the race.

        ``_index.json`` is reconciled in the same pass — entries whose
        filename is not in ``expected_keep`` are removed.

        Required class-level invariants for the caller:

        1. ``model_class`` is registered with Popoto (i.e.,
           ``$Class:{model_class.__name__}`` is the canonical live-record
           set; the legacy ``{Name}:_all`` key is empty in production
           and MUST NOT be used).
        2. ``EmbeddingField.on_save`` writes via SHA-256-hashed filenames
           (this is the contract enforced by ``_embedding_path``).
        3. The opt-in marker ``__embedding_garbage_collect__ = True`` is
           set on ``model_class``. Without it, this method is a no-op
           (returns 0) so that future Popoto consumers attaching an
           ``EmbeddingField`` cannot have their embeddings deleted by
           accident on a routine pull.

        Args:
            model_class: The Model class to garbage collect embeddings for.
            min_age_seconds: Mtime guard threshold. Files newer than this
                are skipped to avoid racing with concurrent saves. Default
                300 seconds (5 minutes), which covers Ollama timeout/retry
                pathologies.

        Returns:
            int: Number of orphaned files removed. Returns 0 if the
            opt-in marker is missing, the embedding directory does not
            exist, or no orphans are found.
        """
        # Opt-in marker: prevent accidental cross-model destruction
        if not getattr(model_class, "__embedding_garbage_collect__", False):
            logger.info(
                "garbage_collect called for %s; opt-in marker "
                "__embedding_garbage_collect__ not set — skipping",
                model_class.__name__,
            )
            return 0

        model_name = model_class.__name__
        emb_dir = os.path.join(_get_embeddings_dir(), model_name)
        if not os.path.isdir(emb_dir):
            return 0

        expected_keep = _compute_expected_keep(model_class)

        try:
            disk_files = os.listdir(emb_dir)
        except OSError as e:
            logger.warning("garbage_collect: listdir(%s) failed: %s", emb_dir, e)
            return 0

        now = time.time()
        removed_count = 0
        index = _read_index(model_name)
        index_dirty = False

        for filename in disk_files:
            if not filename.endswith(".npy"):
                continue
            if _TMP_NPY_RE.match(filename):
                # tempfiles handled separately by sweep_stale_tempfiles
                continue
            if filename in expected_keep:
                continue

            filepath = os.path.join(emb_dir, filename)

            # Mtime guard: skip recently-written files (race protection)
            try:
                mtime = os.stat(filepath).st_mtime
            except OSError:
                # File vanished between listdir and stat — treat as already gone
                continue
            if (now - mtime) < min_age_seconds:
                continue

            try:
                os.unlink(filepath)
                removed_count += 1
            except FileNotFoundError:
                # Concurrent delete — converged on the same end state
                pass
            except OSError as e:
                logger.warning(
                    "garbage_collect: unlink(%s) failed: %s — skipping",
                    filepath,
                    e,
                )
                continue

            # Drop the index entry if present
            if filename in index:
                del index[filename]
                index_dirty = True

        # Also drop index entries whose filename is no longer on disk and
        # whose redis_key is not a live record. We do this conservatively
        # by intersecting with expected_keep — never trusting _index.json
        # as a source of truth for "live".
        for filename in list(index.keys()):
            if filename not in expected_keep:
                # filename may already be removed above; only drop here if
                # it still exists in the index (idempotent).
                if filename in index:
                    del index[filename]
                    index_dirty = True

        if index_dirty:
            try:
                _write_index(model_name, index)
            except Exception as e:
                logger.warning("garbage_collect: failed to update _index.json: %s", e)

        if removed_count > 0:
            logger.info(
                "garbage_collect(%s): removed %d orphan .npy files",
                model_name,
                removed_count,
            )
        return removed_count

    @classmethod
    def sweep_stale_tempfiles(cls, model_class, max_age_seconds: int = 3600):
        """Remove ``tmp*.npy`` atomic-write tempfiles older than the cutoff.

        ``EmbeddingField.on_save`` uses ``tempfile.mkstemp`` + ``os.rename``
        for atomic writes. If a process crashes between ``mkstemp`` and
        ``rename``, the tempfile is leaked. Atomic writes complete in
        milliseconds; anything older than the cutoff is unambiguously a
        leaked file.

        Args:
            model_class: The Model class whose embedding directory should
                be swept.
            max_age_seconds: Tempfiles with mtime older than this many
                seconds are removed. Default 3600 (1 hour).

        Returns:
            int: Number of tempfiles removed.
        """
        model_name = model_class.__name__
        emb_dir = os.path.join(_get_embeddings_dir(), model_name)
        if not os.path.isdir(emb_dir):
            return 0

        try:
            disk_files = os.listdir(emb_dir)
        except OSError as e:
            logger.warning("sweep_stale_tempfiles: listdir(%s) failed: %s", emb_dir, e)
            return 0

        now = time.time()
        removed_count = 0

        for filename in disk_files:
            if not _TMP_NPY_RE.match(filename):
                continue
            filepath = os.path.join(emb_dir, filename)
            try:
                mtime = os.stat(filepath).st_mtime
            except OSError:
                continue
            if (now - mtime) < max_age_seconds:
                continue
            try:
                os.unlink(filepath)
                removed_count += 1
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.warning(
                    "sweep_stale_tempfiles: unlink(%s) failed: %s — skipping",
                    filepath,
                    e,
                )

        if removed_count > 0:
            logger.info(
                "sweep_stale_tempfiles(%s): removed %d stale tempfiles",
                model_name,
                removed_count,
            )
        return removed_count
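The interplay between the atomic write in `on_save` and `sweep_stale_tempfiles` can be reproduced in isolation. The sketch below is a simplified stand-in, not the library code: it writes raw bytes instead of calling `np.save`, uses `os.replace` (which overwrites an existing destination on every platform) where the listing uses `os.rename`, and assumes a tempfile pattern of `tmp*.npy` because the real `_TMP_NPY_RE` is not shown. The function names and the `now` parameter are hypothetical.

```python
import os
import re
import tempfile
import time

# Assumed pattern for mkstemp-created files; the real _TMP_NPY_RE is not shown.
TMP_RE = re.compile(r"^tmp.*\.npy$")

def atomic_write(path, payload):
    """Write bytes to a temp file in the target directory, then swap it in."""
    directory = os.path.dirname(path)
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".npy")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(payload)
        os.replace(tmp_path, path)  # readers see the old file or the new one
    except Exception:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise

def sweep(directory, max_age_seconds=3600, now=None):
    """Remove leaked tempfiles older than the cutoff; return how many were removed."""
    now = time.time() if now is None else now
    removed = 0
    for name in os.listdir(directory):
        if not TMP_RE.match(name):
            continue
        filepath = os.path.join(directory, name)
        try:
            if now - os.stat(filepath).st_mtime < max_age_seconds:
                continue  # too young: may belong to an in-flight write
            os.unlink(filepath)
            removed += 1
        except OSError:
            continue  # vanished or not removable: skip
    return removed

workdir = tempfile.mkdtemp()
atomic_write(os.path.join(workdir, "abc.npy"), b"vector-bytes")

# Simulate a crash between mkstemp and the rename: the tempfile is leaked.
fd, leaked = tempfile.mkstemp(dir=workdir, suffix=".npy")
os.close(fd)

removed_now = sweep(workdir)                            # leaked file is too young
removed_later = sweep(workdir, now=time.time() + 7200)  # two hours later
```

Creating the tempfile in the destination directory keeps the final rename on one filesystem, which is what makes the swap atomic.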

provider property

Get the embedding provider instance.

on_save(model_instance, field_name, field_value, pipeline=None, **kwargs) classmethod

Generate and store embedding on save.

Reads the source field value, calls the provider to generate an embedding vector, saves it as a .npy file, and stores the dimension count in Redis.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_instance` | | The Model instance being saved. | required |
| `field_name` | `str` | Name of this field on the model. | required |
| `field_value` | | Current value (dimension count or None). | required |
| `pipeline` | | Redis pipeline for batched operations. | `None` |
| `**kwargs` | | Additional context. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| | The pipeline that was passed in, or None if no pipeline was given. |
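
The "saves it as a .npy file" step is written atomically in the source listing (temp file in the target directory, then rename). The following is a minimal, dependency-free sketch of that pattern, not the library's code: `atomic_write_bytes` is a hypothetical helper, it writes raw bytes rather than calling `np.save`, and it swaps in `os.replace` for the listing's `os.rename` so that an existing destination is also overwritten on Windows.

```python
import os
import tempfile


def atomic_write_bytes(path: str, payload: bytes) -> None:
    """Write payload to path so readers never observe a partial file."""
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    # Create the temp file next to the destination so the final rename
    # stays on one filesystem (a cross-device rename is not atomic).
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".npy")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
        # Swap the finished file into place in a single step.
        os.replace(tmp_path, path)
    except Exception:
        # Clean up the temp file if anything failed before the swap.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


# Usage: the second write replaces the first and leaves no temp file behind.
target = os.path.join(tempfile.mkdtemp(), "Memory", "abc.npy")
atomic_write_bytes(target, b"first")
atomic_write_bytes(target, b"second")
```

If the process dies between `mkstemp` and the swap, the temp file stays on disk, which is the case `sweep_stale_tempfiles` exists to clean up.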

Source code in src/popoto/fields/embedding_field.py
@classmethod
def on_save(
    cls,
    model_instance,
    field_name: str,
    field_value,
    pipeline=None,
    **kwargs,
):
    """Generate and store embedding on save.

    Reads the source field value, calls the provider to generate an
    embedding vector, saves it as a .npy file, and stores the
    dimension count in Redis.

    Args:
        model_instance: The Model instance being saved.
        field_name: Name of this field on the model.
        field_value: Current value (dimension count or None).
        pipeline: Redis pipeline for batched operations.
        **kwargs: Additional context.

    Returns:
        The pipeline.
    """
    field_instance = model_instance._meta.fields.get(field_name)
    if not isinstance(field_instance, EmbeddingField):
        return pipeline if pipeline else None

    if not field_instance.auto_embed:
        return pipeline if pipeline else None

    provider = field_instance.provider
    if provider is None:
        return pipeline if pipeline else None

    # Read source field content
    source_name = field_instance.source
    if source_name is None:
        return pipeline if pipeline else None

    source_value = getattr(model_instance, source_name, None)
    if source_value is None or source_value == "":
        return pipeline if pipeline else None

    # If source value is a $CF reference, we need the actual content
    # The ContentField descriptor should have already resolved it,
    # but if accessed via __dict__ it might still be a reference.
    if isinstance(source_value, str) and source_value.startswith("$CF:"):
        # Try to load via the content field's store
        source_field = model_instance._meta.fields.get(source_name)
        if hasattr(source_field, "store"):
            try:
                content_bytes = source_field.store.load(source_value)
                source_value = content_bytes.decode("utf-8")
            except Exception:
                logger.warning(
                    f"Could not load content for embedding from {source_name}"
                )
                return pipeline if pipeline else None

    # Generate embedding
    try:
        vectors = provider.embed([source_value], input_type="document")
        if not vectors or not vectors[0]:
            return pipeline if pipeline else None
        embedding = vectors[0]
    except Exception as e:
        raise RuntimeError(
            f"Embedding provider failed for {field_name}: {e}"
        ) from e

    # Validate dimensions
    if len(embedding) != provider.dimensions:
        raise ValueError(
            f"Provider returned {len(embedding)} dimensions, "
            f"expected {provider.dimensions}"
        )

    # Save as .npy file
    vector = np.array(embedding, dtype=np.float32)
    redis_key = model_instance._redis_key or model_instance.db_key.redis_key
    npy_path = cls._embedding_path(model_instance.__class__.__name__, redis_key)

    directory = os.path.dirname(npy_path)
    os.makedirs(directory, exist_ok=True)

    # Atomic write: temp file + rename
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".npy")
    try:
        os.close(fd)
        np.save(tmp_path, vector)
        os.rename(tmp_path, npy_path)
    except Exception:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise

    # Update the _index.json sidecar with the hash-to-key mapping
    model_class_name = model_instance.__class__.__name__
    npy_filename = os.path.basename(npy_path)
    index = _read_index(model_class_name)
    index[npy_filename] = redis_key
    _write_index(model_class_name, index)

    # Migrate legacy hex-encoded file if it exists
    legacy_path = cls._legacy_embedding_path(model_class_name, redis_key)
    if legacy_path != npy_path and os.path.exists(legacy_path):
        try:
            os.unlink(legacy_path)
            # Remove legacy filename from index if present
            legacy_filename = os.path.basename(legacy_path)
            if legacy_filename in index:
                del index[legacy_filename]
                _write_index(model_class_name, index)
        except OSError:
            logger.warning(f"Failed to remove legacy embedding file {legacy_path}")

    # Store dimension count in Redis field
    dim_count = len(embedding)
    setattr(model_instance, field_name, dim_count)

    # Update Redis with dimension count
    import msgpack
    from ..redis_db import ENCODING

    encoded_dim = msgpack.packb(dim_count, use_bin_type=True)
    if pipeline:
        pipeline.hset(redis_key, field_name.encode(ENCODING), encoded_dim)
    else:
        from ..redis_db import POPOTO_REDIS_DB

        POPOTO_REDIS_DB.hset(redis_key, field_name.encode(ENCODING), encoded_dim)

    # Invalidate the in-memory cache for this model class
    invalidate_cache(model_instance.__class__.__name__)
    # Signal peer processes to invalidate their caches (pubsub mode).
    _publish_invalidation(model_instance.__class__.__name__)

    return pipeline if pipeline else None

on_delete(model_instance, field_name, field_value, pipeline=None, **kwargs) classmethod

Remove the embedding .npy file on delete.

Source code in src/popoto/fields/embedding_field.py
@classmethod
def on_delete(
    cls,
    model_instance,
    field_name: str,
    field_value,
    pipeline=None,
    **kwargs,
):
    """Remove the embedding .npy file on delete."""
    model_class_name = model_instance.__class__.__name__
    redis_key = (
        kwargs.get("saved_redis_key")
        or model_instance._redis_key
        or model_instance.db_key.redis_key
    )
    npy_path = cls._embedding_path(model_class_name, redis_key)

    if os.path.exists(npy_path):
        os.unlink(npy_path)

    # Remove entry from _index.json sidecar
    npy_filename = os.path.basename(npy_path)
    index = _read_index(model_class_name)
    if npy_filename in index:
        del index[npy_filename]
        _write_index(model_class_name, index)

    invalidate_cache(model_class_name)
    # Signal peer processes to invalidate their caches (pubsub mode).
    _publish_invalidation(model_class_name)
    return pipeline if pipeline else None

load_embeddings(model_class) classmethod

Load all embeddings for a model class into a numpy matrix.

Returns a pre-normalized matrix and corresponding Redis key list, using the in-memory cache when available.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_class` | | The Model class to load embeddings for. | required |

Returns:

| Type | Description |
| --- | --- |
| `tuple` | Tuple of (matrix, keys) where matrix is a 2D numpy array of shape (N, dimensions) and keys is a list of Redis key strings. Returns (None, []) if no embeddings found. |
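
Because the returned matrix is pre-normalized, cosine similarity against a query reduces to a dot product once the query itself is scaled to unit length. The sketch below illustrates only that arithmetic, using plain Python lists instead of numpy so it runs without extra dependencies. `normalize_rows`, `rank_by_cosine`, and the `Memory:a`-style keys are hypothetical names for illustration, not part of popoto.

```python
import math
from typing import List, Sequence, Tuple


def normalize_rows(rows: Sequence[Sequence[float]]) -> List[List[float]]:
    """Scale each row to unit length; all-zero rows are left unchanged."""
    normalized = []
    for row in rows:
        # Substitute 1.0 for a zero norm to avoid dividing by zero.
        norm = math.sqrt(sum(x * x for x in row)) or 1.0
        normalized.append([x / norm for x in row])
    return normalized


def rank_by_cosine(
    matrix: Sequence[Sequence[float]],
    keys: Sequence[str],
    query: Sequence[float],
) -> List[Tuple[str, float]]:
    """Return (key, score) pairs sorted by descending cosine similarity.

    matrix must already be row-normalized, so only the query needs scaling.
    """
    (unit_query,) = normalize_rows([query])
    scores = [sum(a * b for a, b in zip(row, unit_query)) for row in matrix]
    return sorted(zip(keys, scores), key=lambda pair: pair[1], reverse=True)


matrix = normalize_rows([[3.0, 4.0], [0.0, 2.0], [0.0, 0.0]])
keys = ["Memory:a", "Memory:b", "Memory:c"]
ranked = rank_by_cosine(matrix, keys, [0.0, 5.0])
```

With numpy the same ranking would be a single matrix-vector product over the cached matrix, which is why normalizing once at load time pays off across repeated queries.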

Source code in src/popoto/fields/embedding_field.py
@classmethod
def load_embeddings(cls, model_class) -> tuple:
    """Load all embeddings for a model class into a numpy matrix.

    Returns a pre-normalized matrix and corresponding Redis key list,
    using the in-memory cache when available.

    Args:
        model_class: The Model class to load embeddings for.

    Returns:
        Tuple of (matrix, keys) where matrix is a 2D numpy array
        of shape (N, dimensions) and keys is a list of Redis key
        strings. Returns (None, []) if no embeddings found.
    """
    if not _numpy_available:
        return None, []

    model_name = model_class.__name__

    # Ensure a cross-process invalidation listener is running (pubsub mode;
    # no-op in mtime/none mode or if already started).
    _start_invalidation_listener(model_name)

    # Cross-process staleness backstop: in mtime mode, or in pubsub mode
    # when no live listener exists (Valkey down / ACL / pool exhaustion),
    # confirm the cached entry against the on-disk _version counter. mtime
    # is a cheap "did anything change at all" pre-check; the integer
    # _version is authoritative (granularity-proof against same-tick writes).
    if model_name in _embedding_cache and _should_version_check(model_name):
        entry = _embedding_cache[model_name]
        try:
            current_mtime = os.stat(_index_path(model_name)).st_mtime
            if current_mtime != entry.get("index_mtime"):
                disk_version = _read_index(model_name).get("_version", 0)
                if disk_version != entry.get("index_version"):
                    _embedding_cache.pop(model_name, None)  # force reload
        except OSError:
            _embedding_cache.pop(model_name, None)  # index gone, reload

    # Check cache first
    if model_name in _embedding_cache:
        cached = _embedding_cache[model_name]
        return cached["matrix"], cached["keys"]

    # Load from disk
    emb_dir = os.path.join(_get_embeddings_dir(), model_name)
    if not os.path.isdir(emb_dir):
        return None, []

    vectors = []
    keys = []

    # Read the index for hash-to-key mapping
    index = _read_index(model_name)
    index_updated = False

    for filename in os.listdir(emb_dir):
        if not filename.endswith(".npy"):
            continue
        filepath = os.path.join(emb_dir, filename)
        try:
            vec = np.load(filepath)

            # Look up Redis key from the index first
            if filename in index:
                redis_key = index[filename]
            else:
                # Fallback: try legacy hex-decode for backward compatibility
                hex_key = filename[:-4]  # strip .npy
                try:
                    redis_key = bytes.fromhex(hex_key).decode("utf-8")
                    # Add to index for future lookups
                    index[filename] = redis_key
                    index_updated = True
                except (ValueError, UnicodeDecodeError):
                    # Downgraded to DEBUG: orphan files are reconciled by
                    # garbage_collect / sweep_stale_tempfiles; logging at
                    # WARNING produced thousands of lines per query before
                    # cleanup. After reconciliation this should never fire
                    # on a clean corpus.
                    logger.debug(
                        f"Skipping unrecognized embedding file {filename}: "
                        "not in index and not a valid hex-encoded key"
                    )
                    continue

            vectors.append(vec.astype(np.float32))
            keys.append(redis_key)
        except Exception as e:
            logger.warning(f"Failed to load embedding {filepath}: {e}")
            continue

    # Persist any index updates from legacy fallback. This is a READ path —
    # pass bump_version=False so a pure read never advances _version (else
    # peers see a phantom write and reload; reload storm on orphan corpora).
    if index_updated:
        try:
            _write_index(model_name, index, bump_version=False)
        except Exception:
            logger.warning(f"Failed to update index for {model_name}")

    if not vectors:
        return None, []

    matrix = np.stack(vectors)

    # Pre-normalize for cosine similarity (dot product on unit vectors)
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    norms = np.where(norms == 0, 1, norms)  # Avoid division by zero
    matrix = matrix / norms

    # Cache the result. Populate the staleness-check fields in all modes
    # except "none" so the pubsub backstop / mtime path can use them.
    entry = {"matrix": matrix, "keys": keys}
    if _INVALIDATION_MODE != "none":
        try:
            entry["index_mtime"] = os.stat(_index_path(model_name)).st_mtime
        except OSError:
            entry["index_mtime"] = None
        entry["index_version"] = _read_index(model_name).get("_version", 0)
    _embedding_cache[model_name] = entry

    return matrix, keys
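
The staleness backstop in the listing above uses the index file's mtime as a cheap pre-check and the integer `_version` as the authoritative signal. A minimal sketch of that two-step check follows. It assumes `_index.json` is a JSON object carrying a `_version` key (inferred from the `.get("_version", 0)` calls above); `is_cache_stale` is a hypothetical helper, not a popoto function.

```python
import json
import os
import tempfile


def is_cache_stale(entry: dict, index_path: str) -> bool:
    """Return True when a cached entry no longer matches the on-disk index."""
    try:
        current_mtime = os.stat(index_path).st_mtime
    except OSError:
        return True  # index missing: force a reload
    if current_mtime == entry.get("index_mtime"):
        return False  # cheap pre-check: file untouched since caching
    # mtime moved, so consult the authoritative version counter.
    with open(index_path, encoding="utf-8") as handle:
        disk_version = json.load(handle).get("_version", 0)
    return disk_version != entry.get("index_version")


def write_index(path: str, version: int, mtime: float) -> None:
    """Test helper: write an index with the given version and a fixed mtime."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump({"_version": version}, handle)
    os.utime(path, (mtime, mtime))


index_path = os.path.join(tempfile.mkdtemp(), "_index.json")
write_index(index_path, version=1, mtime=1_000_000.0)
entry = {"index_mtime": os.stat(index_path).st_mtime, "index_version": 1}

unchanged = is_cache_stale(entry, index_path)            # same mtime
write_index(index_path, version=1, mtime=1_000_010.0)
touched_only = is_cache_stale(entry, index_path)         # mtime moved, version same
write_index(index_path, version=2, mtime=1_000_020.0)
peer_write = is_cache_stale(entry, index_path)           # version advanced
```

The middle case is why the read path passes `bump_version=False`: a rewrite that does not advance `_version` changes the mtime but should not make peers reload.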

garbage_collect(model_class, min_age_seconds=300) classmethod

Remove orphaned .npy files not referenced by live model instances.

Walks the on-disk embedding directory for model_class, computes the set of expected-to-survive filenames via the shared _compute_expected_keep helper, and unlinks any non-tempfile .npy whose name is not in that set AND whose mtime is older than min_age_seconds.

The mtime guard is the only protection against deleting files that a concurrent Memory.save() has landed on disk but not yet recorded in $Class:{Name}. The save path order is rename → _index.json mutation → Ollama embed call (network, possibly retried) → Redis hset of dimension count. During that window the file exists on disk but is in NEITHER _index.json NOR the class set; the mtime guard bounds the race.

_index.json is reconciled in the same pass — entries whose filename is not in expected_keep are removed.

Required class-level invariants for the caller:

  1. model_class is registered with Popoto (i.e., $Class:{model_class.__name__} is the canonical live-record set; the legacy {Name}:_all key is empty in production and MUST NOT be used).
  2. EmbeddingField.on_save writes via SHA-256-hashed filenames (this is the contract enforced by _embedding_path).
  3. The opt-in marker __embedding_garbage_collect__ = True is set on model_class. Without it, this method is a no-op (returns 0) so that future Popoto consumers attaching an EmbeddingField cannot have their embeddings deleted by accident on a routine pull.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_class` | | The Model class to garbage collect embeddings for. | required |
| `min_age_seconds` | `int` | Mtime guard threshold. Files newer than this are skipped to avoid racing with concurrent saves. Default 300 seconds (5 minutes), which covers Ollama timeout/retry pathologies. | `300` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | `int` | Number of orphaned files removed. Returns 0 if the opt-in marker is missing, the embedding directory does not exist, or no orphans are found. |
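
The two safety checks described above, the opt-in marker and the mtime guard, can be sketched in isolation as follows. This is a simplified illustration under stated assumptions: `collect_orphans` is a hypothetical helper, the expected-keep set is passed in directly rather than computed from Redis, and index reconciliation and tempfile handling are omitted.

```python
import os
import tempfile
import time


def collect_orphans(model_class, emb_dir, expected_keep, min_age_seconds=300):
    """Delete .npy files that are not in expected_keep and are old enough."""
    if not getattr(model_class, "__embedding_garbage_collect__", False):
        return 0  # opt-in marker missing: do nothing
    now = time.time()
    removed = 0
    for filename in os.listdir(emb_dir):
        if not filename.endswith(".npy") or filename in expected_keep:
            continue
        filepath = os.path.join(emb_dir, filename)
        if now - os.stat(filepath).st_mtime < min_age_seconds:
            continue  # too new: may belong to an in-flight save
        os.unlink(filepath)
        removed += 1
    return removed


class OptedIn:
    __embedding_garbage_collect__ = True


class NotOptedIn:
    pass


# Build a directory with one live file, one old orphan, one fresh orphan.
emb_dir = tempfile.mkdtemp()
an_hour_ago = time.time() - 3600
for name, is_old in [("live.npy", True), ("orphan_old.npy", True), ("orphan_new.npy", False)]:
    path = os.path.join(emb_dir, name)
    open(path, "wb").close()
    if is_old:
        os.utime(path, (an_hour_ago, an_hour_ago))

skipped = collect_orphans(NotOptedIn, emb_dir, {"live.npy"})
removed = collect_orphans(OptedIn, emb_dir, {"live.npy"})
remaining = sorted(os.listdir(emb_dir))
```

Only the old orphan is deleted: the live file is protected by the keep set, and the fresh orphan by the mtime guard.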

Source code in src/popoto/fields/embedding_field.py
@classmethod
def garbage_collect(cls, model_class, min_age_seconds: int = 300):
    """Remove orphaned .npy files not referenced by live model instances.

    Walks the on-disk embedding directory for ``model_class``, computes
    the set of expected-to-survive filenames via the shared
    :func:`_compute_expected_keep` helper, and unlinks any non-tempfile
    ``.npy`` whose name is not in that set AND whose mtime is older
    than ``min_age_seconds``. The mtime guard is the only protection
    against deleting files that a concurrent ``Memory.save()`` has
    landed on disk but not yet recorded in ``$Class:{Name}`` — the
    save path order is rename → ``_index.json`` mutation → Ollama embed
    call (network, possibly retried) → Redis ``hset`` of dimension
    count. During that window the file exists on disk but is in
    NEITHER ``_index.json`` NOR the class set; the mtime guard bounds
    the race.

    ``_index.json`` is reconciled in the same pass — entries whose
    filename is not in ``expected_keep`` are removed.

    Required class-level invariants for the caller:

    1. ``model_class`` is registered with Popoto (i.e.,
       ``$Class:{model_class.__name__}`` is the canonical live-record
       set; the legacy ``{Name}:_all`` key is empty in production
       and MUST NOT be used).
    2. ``EmbeddingField.on_save`` writes via SHA-256-hashed filenames
       (this is the contract enforced by ``_embedding_path``).
    3. The opt-in marker ``__embedding_garbage_collect__ = True`` is
       set on ``model_class``. Without it, this method is a no-op
       (returns 0) so that future Popoto consumers attaching an
       ``EmbeddingField`` cannot have their embeddings deleted by
       accident on a routine pull.

    Args:
        model_class: The Model class to garbage collect embeddings for.
        min_age_seconds: Mtime guard threshold. Files newer than this
            are skipped to avoid racing with concurrent saves. Default
            300 seconds (5 minutes), which covers Ollama timeout/retry
            pathologies.

    Returns:
        int: Number of orphaned files removed. Returns 0 if the
        opt-in marker is missing, the embedding directory does not
        exist, or no orphans are found.
    """
    # Opt-in marker: prevent accidental cross-model destruction
    if not getattr(model_class, "__embedding_garbage_collect__", False):
        logger.info(
            "garbage_collect called for %s; opt-in marker "
            "__embedding_garbage_collect__ not set — skipping",
            model_class.__name__,
        )
        return 0

    model_name = model_class.__name__
    emb_dir = os.path.join(_get_embeddings_dir(), model_name)
    if not os.path.isdir(emb_dir):
        return 0

    expected_keep = _compute_expected_keep(model_class)

    try:
        disk_files = os.listdir(emb_dir)
    except OSError as e:
        logger.warning("garbage_collect: listdir(%s) failed: %s", emb_dir, e)
        return 0

    now = time.time()
    removed_count = 0
    index = _read_index(model_name)
    index_dirty = False

    for filename in disk_files:
        if not filename.endswith(".npy"):
            continue
        if _TMP_NPY_RE.match(filename):
            # tempfiles handled separately by sweep_stale_tempfiles
            continue
        if filename in expected_keep:
            continue

        filepath = os.path.join(emb_dir, filename)

        # Mtime guard: skip recently-written files (race protection)
        try:
            mtime = os.stat(filepath).st_mtime
        except OSError:
            # File vanished between listdir and stat — treat as already gone
            continue
        if (now - mtime) < min_age_seconds:
            continue

        try:
            os.unlink(filepath)
            removed_count += 1
        except FileNotFoundError:
            # Concurrent delete — converged on the same end state
            pass
        except OSError as e:
            logger.warning(
                "garbage_collect: unlink(%s) failed: %s — skipping",
                filepath,
                e,
            )
            continue

        # Drop the index entry if present
        if filename in index:
            del index[filename]
            index_dirty = True

    # Also drop index entries whose filename is no longer on disk and
    # whose redis_key is not a live record. We do this conservatively
    # by intersecting with expected_keep — never trusting _index.json
    # as a source of truth for "live".
    for filename in list(index.keys()):
        if filename not in expected_keep:
            # filename may already be removed above; only drop here if
            # it still exists in the index (idempotent).
            if filename in index:
                del index[filename]
                index_dirty = True

    if index_dirty:
        try:
            _write_index(model_name, index)
        except Exception as e:
            logger.warning("garbage_collect: failed to update _index.json: %s", e)

    if removed_count > 0:
        logger.info(
            "garbage_collect(%s): removed %d orphan .npy files",
            model_name,
            removed_count,
        )
    return removed_count

sweep_stale_tempfiles(model_class, max_age_seconds=3600) classmethod

Remove tmp*.npy atomic-write tempfiles older than the cutoff.

EmbeddingField.on_save uses tempfile.mkstemp + os.rename for atomic writes. If a process crashes between mkstemp and rename, the tempfile is leaked. Atomic writes complete in milliseconds; anything older than the cutoff is unambiguously a leaked file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_class` | | The Model class whose embedding directory should be swept. | required |
| `max_age_seconds` | `int` | Tempfiles with mtime older than this many seconds are removed. Default 3600 (1 hour). | `3600` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | `int` | Number of tempfiles removed. |
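
To make the leak scenario concrete, the sketch below simulates a crash between `mkstemp` and `rename` and then sweeps the directory. The actual `_TMP_NPY_RE` pattern is not shown on this page, so `TMP_NPY_RE` here is an assumed pattern based on `tempfile.mkstemp` naming files with the default `tmp` prefix; `sweep_tempfiles` is likewise a hypothetical stand-in.

```python
import os
import re
import tempfile
import time

# Assumed pattern: mkstemp's default prefix "tmp", random characters, ".npy".
TMP_NPY_RE = re.compile(r"^tmp.*\.npy$")


def sweep_tempfiles(emb_dir: str, max_age_seconds: int = 3600) -> int:
    """Remove tmp*.npy files whose mtime is older than the cutoff."""
    now = time.time()
    removed = 0
    for filename in os.listdir(emb_dir):
        if not TMP_NPY_RE.match(filename):
            continue  # finished embeddings are never touched here
        filepath = os.path.join(emb_dir, filename)
        try:
            if now - os.stat(filepath).st_mtime < max_age_seconds:
                continue  # recent: may be a write still in progress
            os.unlink(filepath)
            removed += 1
        except FileNotFoundError:
            continue  # another process removed it first
    return removed


emb_dir = tempfile.mkdtemp()
two_hours_ago = time.time() - 7200

# A tempfile "leaked" by a crashed writer two hours ago.
fd, leaked = tempfile.mkstemp(dir=emb_dir, suffix=".npy")
os.close(fd)
os.utime(leaked, (two_hours_ago, two_hours_ago))

# A tempfile that belongs to a write happening right now.
fd, in_flight = tempfile.mkstemp(dir=emb_dir, suffix=".npy")
os.close(fd)

# A finished embedding file, old but not a tempfile.
final = os.path.join(emb_dir, "0a1b2c.npy")
open(final, "wb").close()
os.utime(final, (two_hours_ago, two_hours_ago))

removed = sweep_tempfiles(emb_dir)
```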

Source code in src/popoto/fields/embedding_field.py
@classmethod
def sweep_stale_tempfiles(cls, model_class, max_age_seconds: int = 3600):
    """Remove ``tmp*.npy`` atomic-write tempfiles older than the cutoff.

    ``EmbeddingField.on_save`` uses ``tempfile.mkstemp`` + ``os.rename``
    for atomic writes. If a process crashes between ``mkstemp`` and
    ``rename``, the tempfile is leaked. Atomic writes complete in
    milliseconds; anything older than the cutoff is unambiguously a
    leaked file.

    Args:
        model_class: The Model class whose embedding directory should
            be swept.
        max_age_seconds: Tempfiles with mtime older than this many
            seconds are removed. Default 3600 (1 hour).

    Returns:
        int: Number of tempfiles removed.
    """
    model_name = model_class.__name__
    emb_dir = os.path.join(_get_embeddings_dir(), model_name)
    if not os.path.isdir(emb_dir):
        return 0

    try:
        disk_files = os.listdir(emb_dir)
    except OSError as e:
        logger.warning("sweep_stale_tempfiles: listdir(%s) failed: %s", emb_dir, e)
        return 0

    now = time.time()
    removed_count = 0

    for filename in disk_files:
        if not _TMP_NPY_RE.match(filename):
            continue
        filepath = os.path.join(emb_dir, filename)
        try:
            mtime = os.stat(filepath).st_mtime
        except OSError:
            continue
        if (now - mtime) < max_age_seconds:
            continue
        try:
            os.unlink(filepath)
            removed_count += 1
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(
                "sweep_stale_tempfiles: unlink(%s) failed: %s — skipping",
                filepath,
                e,
            )

    if removed_count > 0:
        logger.info(
            "sweep_stale_tempfiles(%s): removed %d stale tempfiles",
            model_name,
            removed_count,
        )
    return removed_count

stop_invalidation_listeners()

Stop every registered PubSubWorkerThread and clear the registry.

Used for test teardown and graceful shutdown. Safe to call when no listeners are running. Each thread's pubsub connection is closed so it is returned to the connection pool rather than leaked.

Source code in src/popoto/fields/embedding_field.py
def stop_invalidation_listeners() -> None:
    """Stop every registered PubSubWorkerThread and clear the registry.

    Used for test teardown and graceful shutdown. Safe to call when no
    listeners are running. Each thread's pubsub connection is closed so it is
    returned to the connection pool rather than leaked.
    """
    for thread in list(_listener_threads.values()):
        _stop_listener_thread(thread)
    _listener_threads.clear()

get_default_provider()

Get the configured default embedding provider.

Source code in src/popoto/fields/embedding_field.py
def get_default_provider():
    """Get the configured default embedding provider."""
    return _default_embedding_provider

set_default_provider(provider)

Set the default embedding provider. Called by popoto.configure().
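
For orientation, `on_save` only relies on two things from a provider: an `embed(texts, input_type=...)` call returning one vector per text, and a `dimensions` attribute. The toy class below satisfies that shape with a deterministic hash, which is useful for tests but carries no semantic meaning. `ToyHashProvider` is hypothetical; whether popoto additionally requires subclassing `AbstractEmbeddingProvider` is not confirmed here, so the registration call is left as a comment.

```python
import hashlib
from typing import List


class ToyHashProvider:
    """Deterministic stand-in for an embedding provider (not semantic)."""

    dimensions = 8

    def embed(self, texts: List[str], input_type: str = "document") -> List[List[float]]:
        vectors = []
        for text in texts:
            digest = hashlib.sha256(text.encode("utf-8")).digest()
            # Map the first `dimensions` bytes of the digest into [0, 1].
            vectors.append([byte / 255.0 for byte in digest[: self.dimensions]])
        return vectors


provider = ToyHashProvider()
vectors = provider.embed(["Revenue trends..."], input_type="document")
embedding = vectors[0]

# The same dimension check that on_save applies before writing the file.
if len(embedding) != provider.dimensions:
    raise ValueError(
        f"Provider returned {len(embedding)} dimensions, "
        f"expected {provider.dimensions}"
    )

# Assumed registration step (requires popoto, so not executed here):
# from popoto.fields.embedding_field import set_default_provider
# set_default_provider(provider)
```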

Source code in src/popoto/fields/embedding_field.py
def set_default_provider(provider):
    """Set the default embedding provider. Called by popoto.configure()."""
    global _default_embedding_provider
    _default_embedding_provider = provider

invalidate_cache(model_class_name=None)

Invalidate the embedding cache for a model class (or all).

Source code in src/popoto/fields/embedding_field.py
def invalidate_cache(model_class_name: str = None):
    """Invalidate the embedding cache for a model class (or all)."""
    if model_class_name:
        _embedding_cache.pop(model_class_name, None)
    else:
        _embedding_cache.clear()