
popoto.recipes.context_assembler

ContextAssembler — Retrieval-to-injection bridge with token budgets.

A capstone recipe composing all shipped Popoto memory primitives into a single assemble() call. Orchestrates pull-path (query-driven) and push-path (proactive surfacing) retrieval, applies token budgets, and formats output for LLM context injection.

Metacognitive extensions (opt-in, off-by-default):

  • RetrievalQuality dataclass surfaces avg confidence, score spread, feeling-of-knowing (FOK), and staleness for the retrieval.
  • ContextAssembler.assess(query_cues) — pre-retrieval FOK probe.
  • ContextAssembler.assemble(..., assess_quality=True) — attaches a RetrievalQuality to AssemblyResult.metadata["quality"] without changing the default behavior.
Pipeline

Pull path: ExistenceFilter pre-check → CompositeScoreQuery → CoOccurrence propagation

Push path: CyclicDecayField temporal scan above surfacing threshold

Merge: Deduplicate, re-rank, budget-select, post-effects, format

Synergy with Popoto Primitives

┌────────────────────────┬───────────────────────────────────────┐
│ Primitive              │ Role in ContextAssembler              │
├────────────────────────┼───────────────────────────────────────┤
│ DecayingSortedField    │ Score index for CompositeScoreQuery   │
│ CyclicDecayField       │ Push-path proactive surfacing         │
│ ConfidenceField        │ Score index + competitive suppression │
│ CoOccurrenceField      │ Pull-path candidate expansion         │
│ ExistenceFilter        │ Pull-path pre-check (skip if absent)  │
│ AccessTrackerMixin     │ on_read post-effect tracking          │
│ ObservationProtocol    │ on_read / on_surfaced dispatch        │
│ RecallProposal         │ Created for push-path records         │
│ WriteFilterMixin       │ Priority score in composite           │
│ EventStreamMixin       │ Mutation logging (via model save)     │
│ PredictionLedgerMixin  │ Outcome tracking (via model save)     │
│ CompositeScoreQuery    │ Multi-factor ranked retrieval         │
└────────────────────────┴───────────────────────────────────────┘

Dependencies

All 12 shipped Popoto primitives (Steps 1-12 of the memory roadmap). No external dependencies beyond Popoto itself.

Example

from popoto.recipes.context_assembler import ContextAssembler

assembler = ContextAssembler(
    model_class=Memory,
    score_weights={"relevance": 0.6, "confidence": 0.3},
    max_items=10,
    max_tokens=4000,
)
result = assembler.assemble(
    query_cues={"topic": "deployment"},
    agent_id="agent-1",
)

result.records — selected instances

result.proactive — push-path subset

result.formatted — LLM-ready string

result.metadata — scores, timing, token counts

COMPETITIVE_SUPPRESSION_SIGNAL = Defaults.COMPETITIVE_SUPPRESSION_SIGNAL module-attribute

Signal strength for competitive suppression of non-selected pull-path candidates. Applied via ConfidenceField.update_confidence(). Values < 0.5 act as contradiction signals, mildly reducing future ranking. Optimal range: [0.1, 0.7]. Insensitive to retrieval quality.
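The contradiction behavior can be illustrated with a toy update rule — a plain convex step toward the signal, assumed here purely for intuition and not the actual ConfidenceField math:

```python
# Hypothetical illustration of why signals below 0.5 act as contradictions.
# NOT the real ConfidenceField update -- a simple convex step toward the
# signal, assumed only to show the direction of the effect.
def toy_update_confidence(current: float, signal: float, rate: float = 0.2) -> float:
    """Pull confidence toward the signal: <0.5 reduces it, >0.5 reinforces it."""
    return current + rate * (signal - current)

conf = 0.8
for _ in range(3):
    conf = toy_update_confidence(conf, signal=0.3)  # mild contradiction signal
print(round(conf, 3))  # → 0.556, drifting down toward the 0.3 signal
```

Under this toy rule, repeated sub-0.5 signals mildly erode a record's confidence, which is exactly the "mildly reducing future ranking" effect described above.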

DEFAULT_SURFACING_THRESHOLD = Defaults.DEFAULT_SURFACING_THRESHOLD module-attribute

Minimum score for push-path records to be surfaced. Records from CyclicDecayField scan below this threshold are filtered out. Optimal range: [0.1, 0.9]. Insensitive to retrieval quality.
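The threshold's effect on the push path reduces to a simple filter; a minimal sketch with made-up scan results:

```python
# Illustrative push-path filtering: only scan results at or above the
# surfacing threshold survive. Keys and scores are made up for the example.
scan_results = [("mem-1", 0.82), ("mem-2", 0.31), ("mem-3", 0.55)]
threshold = 0.5

surfaced = [key for key, score in scan_results if score >= threshold]
print(surfaced)  # → ['mem-1', 'mem-3']
```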

DEFAULT_MAX_ITEMS = 10 module-attribute

Default maximum number of records returned by assemble().

DEFAULT_PROPAGATION_DEPTH = 2 module-attribute

Default BFS depth for CoOccurrence propagation.
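What a depth of 2 means can be sketched with a plain depth-limited BFS over a made-up co-occurrence adjacency map — direct neighbors plus neighbors-of-neighbors, nothing deeper:

```python
from collections import deque

# Illustrative depth-limited BFS, mirroring CoOccurrence propagation.
# The adjacency map is invented for the example.
graph = {
    "deploy": ["rollback", "ci"],
    "rollback": ["incident"],
    "ci": ["tests"],
    "incident": ["postmortem"],
}

def propagate(seeds, depth):
    seen = set(seeds)
    frontier = deque((s, 0) for s in seeds)
    while frontier:
        node, d = frontier.popleft()
        if d == depth:
            continue  # depth budget exhausted along this branch
        for neighbor in graph.get(node, []):
            if neighbor not in seen:
                seen.add(neighbor)
                frontier.append((neighbor, d + 1))
    return seen

print(sorted(propagate({"deploy"}, depth=2)))
# → ['ci', 'deploy', 'incident', 'rollback', 'tests'] -- "postmortem" is
#   at depth 3 and stays excluded
```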

AssemblyResult dataclass

Return type for ContextAssembler.assemble().

Attributes:

Name Type Description
records list

All selected instances (pull + push, deduplicated).

proactive list

Push-path subset of records (proactively surfaced).

formatted str

LLM-ready formatted string (JSON, XML, or natural).

metadata dict

Dict with scores, token_count, timing_ms, pull_count, push_count.

Source code in src/popoto/recipes/context_assembler.py
@dataclass
class AssemblyResult:
    """Return type for ContextAssembler.assemble().

    Attributes:
        records: All selected instances (pull + push, deduplicated).
        proactive: Push-path subset of records (proactively surfaced).
        formatted: LLM-ready formatted string (JSON, XML, or natural).
        metadata: Dict with scores, token_count, timing_ms, pull_count,
            push_count.
    """

    records: list = field(default_factory=list)
    proactive: list = field(default_factory=list)
    formatted: str = ""
    metadata: dict = field(default_factory=dict)

RetrievalQuality dataclass

Metacognitive signal describing retrieval trustworthiness.

Surfaces four machine-readable metrics about a retrieval so an agent can decide whether to trust its context, retry with different cues, widen scope, or caveat its downstream answer. This is a purely mechanical signal — no LLM self-reporting — following the research finding that GPT-4's self-reported confidence reflects output structure rather than internal uncertainty.

Attributes:

Name Type Description
avg_confidence float

Mean of ConfidenceField.get_confidence() across selected records. 1.0 when the model has no ConfidenceField (no evidence against the retrieval).

score_spread float

Coefficient of variation (stddev / mean) of the per-record composite scores. High spread means one or two records dominate; low spread means results are roughly equivalent. Falls back to 0.0 when abs(mean) < 1e-9 — stddev/mean is undefined when mean is zero.

fok_score float

Feeling-of-knowing — 0.4 * cue_familiarity + 0.4 * partial_retrieval_count + 0.2 * subthreshold_activation, averaged across query cues. 0.0 when no cues were provided.

staleness_ratio float

Fraction of selected records with DecayingSortedField score below the field's decay threshold. 0.0 when the model has no DecayingSortedField.

score_distribution list

Optional full list of per-record composite scores for histogram analysis; empty when unavailable.

per_cue_fok dict

Optional dict mapping cue value -> dict with the three FOK components for that cue (for debugging).
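Under these definitions the metrics reduce to simple arithmetic. A minimal sketch with made-up scores and FOK components — only the formulas come from the attribute descriptions above, and population stddev is assumed for the coefficient of variation:

```python
import statistics

# Made-up per-record composite scores; only the formulas are from the docs.
scores = [0.9, 0.5, 0.4]
mean = statistics.mean(scores)
# Coefficient of variation, with the documented zero-mean guard.
score_spread = 0.0 if abs(mean) < 1e-9 else statistics.pstdev(scores) / mean

# fok_score = 0.4*cue_familiarity + 0.4*partial_retrieval + 0.2*subthreshold
fok_score = 0.4 * 1.0 + 0.4 * 0.5 + 0.2 * 0.25

# staleness_ratio: fraction of records whose decay score fell below threshold.
decay_scores, decay_threshold = [0.9, 0.2, 0.1], 0.5
staleness_ratio = sum(s < decay_threshold for s in decay_scores) / len(decay_scores)

print(round(score_spread, 3), round(fok_score, 2), round(staleness_ratio, 3))
```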

Example

quality = assembler.assess({"topic": "deploy"})
if quality.fok_score < 0.3:
    # Skip the expensive retrieval; we don't know this domain
    return
result = assembler.assemble({"topic": "deploy"}, assess_quality=True)
if result.metadata["quality"].avg_confidence < 0.4:
    # Caveat the downstream response
    ...

Source code in src/popoto/recipes/context_assembler.py
@dataclass
class RetrievalQuality:
    """Metacognitive signal describing retrieval trustworthiness.

    Surfaces four machine-readable metrics about a retrieval so an agent
    can decide whether to trust its context, retry with different cues,
    widen scope, or caveat its downstream answer. This is a purely
    *mechanical* signal — no LLM self-reporting — following the research
    finding that GPT-4's self-reported confidence reflects output structure
    rather than internal uncertainty.

    Attributes:
        avg_confidence: Mean of ``ConfidenceField.get_confidence()`` across
            selected records. ``1.0`` when the model has no ConfidenceField
            (no evidence against the retrieval).
        score_spread: Coefficient of variation (stddev / mean) of the
            per-record composite scores. High spread means one or two records
            dominate; low spread means results are roughly equivalent.
            Falls back to ``0.0`` when ``abs(mean) < 1e-9`` — stddev/mean is
            undefined when mean is zero.
        fok_score: Feeling-of-knowing — 0.4 * cue_familiarity + 0.4 *
            partial_retrieval_count + 0.2 * subthreshold_activation,
            averaged across query cues. ``0.0`` when no cues were provided.
        staleness_ratio: Fraction of selected records with
            DecayingSortedField score below the field's decay threshold.
            ``0.0`` when the model has no DecayingSortedField.
        score_distribution: Optional full list of per-record composite
            scores for histogram analysis; empty when unavailable.
        per_cue_fok: Optional dict mapping cue value -> dict with the
            three FOK components for that cue (for debugging).

    Example:
        quality = assembler.assess({"topic": "deploy"})
        if quality.fok_score < 0.3:
            # Skip the expensive retrieval; we don't know this domain
            return
        result = assembler.assemble({"topic": "deploy"}, assess_quality=True)
        if result.metadata["quality"].avg_confidence < 0.4:
            # Caveat the downstream response
            ...
    """

    avg_confidence: float = 0.0
    score_spread: float = 0.0
    fok_score: float = 0.0
    staleness_ratio: float = 0.0
    score_distribution: list = field(default_factory=list)
    per_cue_fok: dict = field(default_factory=dict)

    @classmethod
    def from_records(
        cls,
        records,
        query_cues=None,
        score_weights=None,
        max_items=DEFAULT_MAX_ITEMS,
        surfacing_threshold=DEFAULT_SURFACING_THRESHOLD,
    ) -> "RetrievalQuality":
        """Build a RetrievalQuality over an already-retrieved list of records.

        Intended for custom retrieval pipelines (BM25, RRF, hybrid) that
        want the metacognitive layer without adopting
        :class:`ContextAssembler`. All model capabilities (ConfidenceField,
        ExistenceFilter, DecayingSortedField) are introspected from
        ``records[0]._meta.fields``. Heterogeneous record lists are
        rejected with ``TypeError`` — score weights and capability field
        names are per-model-class, so a mixed list would silently produce
        incorrect FOK / score_spread / staleness_ratio values.

        Args:
            records: Non-empty list of Popoto Model instances of a single
                concrete class. When empty, returns a zero-valued
                :class:`RetrievalQuality`.
            query_cues: Optional dict of query cues — same shape as
                ``ContextAssembler.assess(query_cues=...)``. When falsy,
                ``fok_score`` is 0.0 and ``per_cue_fok`` is empty.
            score_weights: Optional dict mapping sorted-field names to
                weights. Used for ``score_spread`` and ``staleness_ratio``.
                When None, both default to 0.0 and ``score_distribution``
                is empty.
            max_items: Denominator for ``partial_retrieval_count`` in the
                FOK formula. Default matches ``ContextAssembler``.
            surfacing_threshold: Threshold for subthreshold_activation and
                staleness_ratio. Default matches ``ContextAssembler``.

        Returns:
            A :class:`RetrievalQuality` dataclass. Field semantics match
            the assembler path exactly — see class docstring.

        Raises:
            TypeError: If ``records`` contains instances of more than one
                concrete model class.

        Example:
            >>> from popoto import RetrievalQuality
            >>> records = my_bm25_pipeline(query)  # custom retrieval
            >>> quality = RetrievalQuality.from_records(
            ...     records,
            ...     query_cues={"topic": query},
            ...     score_weights={"relevance": 1.0},
            ... )
            >>> if quality.fok_score < 0.3:
            ...     return "low confidence retrieval"
        """
        # Empty list -> zero-valued quality, no warning.
        if not records:
            return cls()

        # Mixed-model guard (C4): fail loudly, not silently.
        distinct_types = {type(r) for r in records}
        if len(distinct_types) > 1:
            class_names = sorted(t.__name__ for t in distinct_types)
            raise TypeError(
                "RetrievalQuality.from_records requires a homogeneous list "
                f"of records; got {len(distinct_types)} distinct model "
                f"classes: {class_names}"
            )

        # All records are the same class — introspect once at the entry point.
        model_class = type(records[0])
        existence_filter = None
        confidence_field_name = None
        decaying_sorted_field_name = None
        for name, f in model_class._meta.fields.items():
            if isinstance(f, ExistenceFilter) and existence_filter is None:
                existence_filter = f
            if isinstance(f, ConfidenceField) and confidence_field_name is None:
                confidence_field_name = name
            if (
                isinstance(f, DecayingSortedField)
                and decaying_sorted_field_name is None
            ):
                decaying_sorted_field_name = name

        # Delegate numeric work to the pure module-level helpers (C2).
        avg_conf = _avg_confidence(
            records,
            confidence_field_name=confidence_field_name,
            model_class=model_class,
        )

        if score_weights is None:
            # Skip score_spread / staleness_ratio entirely.
            score_spread = 0.0
            distribution: list = []
            staleness = 0.0
        else:
            score_spread, distribution = _compute_score_spread(
                records,
                model_class=model_class,
                score_weights=score_weights,
            )
            staleness = _staleness_ratio(
                records,
                model_class=model_class,
                score_weights=score_weights,
                surfacing_threshold=surfacing_threshold,
                decaying_sorted_field_name=decaying_sorted_field_name,
            )

        if not query_cues:
            fok_score = 0.0
            per_cue: dict = {}
        else:
            # FOK needs pull-candidates; we have already-retrieved records,
            # so those are the candidates by construction.
            fok_score, per_cue = _compute_fok(
                query_cues,
                records,
                model_class=model_class,
                score_weights=score_weights or {},
                max_items=max_items,
                surfacing_threshold=surfacing_threshold,
                existence_filter=existence_filter,
            )

        return cls(
            avg_confidence=avg_conf,
            score_spread=score_spread,
            fok_score=fok_score,
            staleness_ratio=staleness,
            score_distribution=distribution,
            per_cue_fok=per_cue,
        )

from_records(records, query_cues=None, score_weights=None, max_items=DEFAULT_MAX_ITEMS, surfacing_threshold=DEFAULT_SURFACING_THRESHOLD) classmethod

Build a RetrievalQuality over an already-retrieved list of records.

Intended for custom retrieval pipelines (BM25, RRF, hybrid) that want the metacognitive layer without adopting ContextAssembler. All model capabilities (ConfidenceField, ExistenceFilter, DecayingSortedField) are introspected from records[0]._meta.fields. Heterogeneous record lists are rejected with TypeError — score weights and capability field names are per-model-class, so a mixed list would silently produce incorrect FOK / score_spread / staleness_ratio values.

Parameters:

Name Type Description Default
records

Non-empty list of Popoto Model instances of a single concrete class. When empty, returns a zero-valued RetrievalQuality.

required
query_cues

Optional dict of query cues — same shape as ContextAssembler.assess(query_cues=...). When falsy, fok_score is 0.0 and per_cue_fok is empty.

None
score_weights

Optional dict mapping sorted-field names to weights. Used for score_spread and staleness_ratio. When None, both default to 0.0 and score_distribution is empty.

None
max_items

Denominator for partial_retrieval_count in the FOK formula. Default matches ContextAssembler.

DEFAULT_MAX_ITEMS
surfacing_threshold

Threshold for subthreshold_activation and staleness_ratio. Default matches ContextAssembler.

DEFAULT_SURFACING_THRESHOLD

Returns:

Type Description
RetrievalQuality

A RetrievalQuality dataclass. Field semantics match the assembler path exactly — see class docstring.

Raises:

Type Description
TypeError

If records contains instances of more than one concrete model class.

Example

from popoto import RetrievalQuality

records = my_bm25_pipeline(query)  # custom retrieval
quality = RetrievalQuality.from_records(
    records,
    query_cues={"topic": query},
    score_weights={"relevance": 1.0},
)
if quality.fok_score < 0.3:
    return "low confidence retrieval"

Source code in src/popoto/recipes/context_assembler.py
@classmethod
def from_records(
    cls,
    records,
    query_cues=None,
    score_weights=None,
    max_items=DEFAULT_MAX_ITEMS,
    surfacing_threshold=DEFAULT_SURFACING_THRESHOLD,
) -> "RetrievalQuality":
    """Build a RetrievalQuality over an already-retrieved list of records.

    Intended for custom retrieval pipelines (BM25, RRF, hybrid) that
    want the metacognitive layer without adopting
    :class:`ContextAssembler`. All model capabilities (ConfidenceField,
    ExistenceFilter, DecayingSortedField) are introspected from
    ``records[0]._meta.fields``. Heterogeneous record lists are
    rejected with ``TypeError`` — score weights and capability field
    names are per-model-class, so a mixed list would silently produce
    incorrect FOK / score_spread / staleness_ratio values.

    Args:
        records: Non-empty list of Popoto Model instances of a single
            concrete class. When empty, returns a zero-valued
            :class:`RetrievalQuality`.
        query_cues: Optional dict of query cues — same shape as
            ``ContextAssembler.assess(query_cues=...)``. When falsy,
            ``fok_score`` is 0.0 and ``per_cue_fok`` is empty.
        score_weights: Optional dict mapping sorted-field names to
            weights. Used for ``score_spread`` and ``staleness_ratio``.
            When None, both default to 0.0 and ``score_distribution``
            is empty.
        max_items: Denominator for ``partial_retrieval_count`` in the
            FOK formula. Default matches ``ContextAssembler``.
        surfacing_threshold: Threshold for subthreshold_activation and
            staleness_ratio. Default matches ``ContextAssembler``.

    Returns:
        A :class:`RetrievalQuality` dataclass. Field semantics match
        the assembler path exactly — see class docstring.

    Raises:
        TypeError: If ``records`` contains instances of more than one
            concrete model class.

    Example:
        >>> from popoto import RetrievalQuality
        >>> records = my_bm25_pipeline(query)  # custom retrieval
        >>> quality = RetrievalQuality.from_records(
        ...     records,
        ...     query_cues={"topic": query},
        ...     score_weights={"relevance": 1.0},
        ... )
        >>> if quality.fok_score < 0.3:
        ...     return "low confidence retrieval"
    """
    # Empty list -> zero-valued quality, no warning.
    if not records:
        return cls()

    # Mixed-model guard (C4): fail loudly, not silently.
    distinct_types = {type(r) for r in records}
    if len(distinct_types) > 1:
        class_names = sorted(t.__name__ for t in distinct_types)
        raise TypeError(
            "RetrievalQuality.from_records requires a homogeneous list "
            f"of records; got {len(distinct_types)} distinct model "
            f"classes: {class_names}"
        )

    # All records are the same class — introspect once at the entry point.
    model_class = type(records[0])
    existence_filter = None
    confidence_field_name = None
    decaying_sorted_field_name = None
    for name, f in model_class._meta.fields.items():
        if isinstance(f, ExistenceFilter) and existence_filter is None:
            existence_filter = f
        if isinstance(f, ConfidenceField) and confidence_field_name is None:
            confidence_field_name = name
        if (
            isinstance(f, DecayingSortedField)
            and decaying_sorted_field_name is None
        ):
            decaying_sorted_field_name = name

    # Delegate numeric work to the pure module-level helpers (C2).
    avg_conf = _avg_confidence(
        records,
        confidence_field_name=confidence_field_name,
        model_class=model_class,
    )

    if score_weights is None:
        # Skip score_spread / staleness_ratio entirely.
        score_spread = 0.0
        distribution: list = []
        staleness = 0.0
    else:
        score_spread, distribution = _compute_score_spread(
            records,
            model_class=model_class,
            score_weights=score_weights,
        )
        staleness = _staleness_ratio(
            records,
            model_class=model_class,
            score_weights=score_weights,
            surfacing_threshold=surfacing_threshold,
            decaying_sorted_field_name=decaying_sorted_field_name,
        )

    if not query_cues:
        fok_score = 0.0
        per_cue: dict = {}
    else:
        # FOK needs pull-candidates; we have already-retrieved records,
        # so those are the candidates by construction.
        fok_score, per_cue = _compute_fok(
            query_cues,
            records,
            model_class=model_class,
            score_weights=score_weights or {},
            max_items=max_items,
            surfacing_threshold=surfacing_threshold,
            existence_filter=existence_filter,
        )

    return cls(
        avg_confidence=avg_conf,
        score_spread=score_spread,
        fok_score=fok_score,
        staleness_ratio=staleness,
        score_distribution=distribution,
        per_cue_fok=per_cue,
    )

ContextAssembler

Orchestrates memory retrieval into a single assemble() call.

Combines pull-path (query-driven via CompositeScoreQuery) and push-path (proactive via CyclicDecayField) retrieval, applies token budgets, and formats output for LLM context injection.

Parameters:

Name Type Description Default
model_class

Popoto Model class to query.

required
score_weights

Dict mapping field names to weights for CompositeScoreQuery (e.g., {"relevance": 0.6, "confidence": 0.3}).

required
max_items

Maximum records to return. Default 10.

DEFAULT_MAX_ITEMS
max_tokens

Optional soft token budget. Records are dropped to fit.

None
surfacing_threshold

Minimum score for push-path records. Default 0.5.

DEFAULT_SURFACING_THRESHOLD
propagation_depth

BFS depth for CoOccurrence. Default 2.

DEFAULT_PROPAGATION_DEPTH
output_format

"structured" (JSON), "xml", or "natural". Default "structured".

'structured'
token_counter

Optional callable(record) -> int. Default: len(str(r)) // 4.

None
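Any callable(record) -> int satisfies the token_counter contract. A minimal sketch using a whitespace-word heuristic — the 1.3 tokens-per-word factor is an assumption; swap in a real tokenizer (e.g. tiktoken) in production:

```python
# Sketch of a custom token_counter matching the documented contract
# callable(record) -> int. The 1.3 tokens-per-word factor is a rough,
# assumed estimate, not part of the library.
def word_token_counter(record) -> int:
    return int(len(str(record).split()) * 1.3)

class FakeRecord:  # hypothetical stand-in for a Popoto model instance
    def __str__(self):
        return "deployment rolled back after failing health checks"

print(word_token_counter(FakeRecord()))  # → 9 (7 words * 1.3, truncated)
```

Passed as `ContextAssembler(..., token_counter=word_token_counter)`, it replaces the default len(str(r)) // 4 heuristic for budget selection.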
Source code in src/popoto/recipes/context_assembler.py
class ContextAssembler:
    """Orchestrates memory retrieval into a single assemble() call.

    Combines pull-path (query-driven via CompositeScoreQuery) and push-path
    (proactive via CyclicDecayField) retrieval, applies token budgets, and
    formats output for LLM context injection.

    Args:
        model_class: Popoto Model class to query.
        score_weights: Dict mapping field names to weights for
            CompositeScoreQuery (e.g., {"relevance": 0.6, "confidence": 0.3}).
        max_items: Maximum records to return. Default 10.
        max_tokens: Optional soft token budget. Records are dropped to fit.
        surfacing_threshold: Minimum score for push-path records. Default 0.5.
        propagation_depth: BFS depth for CoOccurrence. Default 2.
        output_format: "structured" (JSON), "xml", or "natural". Default "structured".
        token_counter: Optional callable(record) -> int. Default: len(str(r)) // 4.
    """

    def __init__(
        self,
        model_class,
        score_weights,
        max_items=DEFAULT_MAX_ITEMS,
        max_tokens=None,
        surfacing_threshold=DEFAULT_SURFACING_THRESHOLD,
        propagation_depth=DEFAULT_PROPAGATION_DEPTH,
        output_format="structured",
        token_counter=None,
    ):
        self.model_class = model_class
        self.score_weights = score_weights
        self.max_items = max_items
        self.max_tokens = max_tokens
        self.surfacing_threshold = surfacing_threshold
        self.propagation_depth = propagation_depth
        self.output_format = output_format
        self._token_counter = token_counter or (lambda r: len(str(r)) // 4)

        # Detect field capabilities on model
        self._existence_filter = None
        self._co_occurrence_field = None
        self._co_occurrence_field_name = None
        self._cyclic_decay_field_name = None
        self._confidence_field_name = None
        self._decaying_sorted_field_name = None

        for name, f in model_class._meta.fields.items():
            if isinstance(f, ExistenceFilter) and self._existence_filter is None:
                self._existence_filter = f
            if isinstance(f, CoOccurrenceField) and self._co_occurrence_field is None:
                self._co_occurrence_field = f
                self._co_occurrence_field_name = name
            if (
                isinstance(f, CyclicDecayField)
                and self._cyclic_decay_field_name is None
            ):
                self._cyclic_decay_field_name = name
            if isinstance(f, ConfidenceField) and self._confidence_field_name is None:
                self._confidence_field_name = name
            if (
                isinstance(f, DecayingSortedField)
                and self._decaying_sorted_field_name is None
            ):
                self._decaying_sorted_field_name = name

    def assemble(
        self,
        query_cues=None,
        agent_id=None,
        partition_filters=None,
        assess_quality=False,
    ):
        """Execute the full retrieval pipeline.

        Args:
            query_cues: Optional dict of query cues (e.g., {"topic": "deploy"}).
                If None, pull path is skipped.
            agent_id: Optional agent ID for partition filtering. Added to
                partition_filters as {"agent_id": agent_id}.
            partition_filters: Optional dict of partition key-value pairs
                for filtering queries.
            assess_quality: When True, compute a ``RetrievalQuality`` over
                the selected records and attach it to
                ``AssemblyResult.metadata["quality"]``. Default False;
                when False the result shape is bit-for-bit identical to
                the pre-metacognitive-layer behavior. Turning this on adds
                bounded overhead (one ``might_exist`` per cue plus one
                ``get_confidence`` per selected record).

        Returns:
            AssemblyResult with records, proactive, formatted, and metadata.
        """
        t0 = time.time()

        # Build partition filters
        filters = dict(partition_filters or {})
        if agent_id is not None:
            filters["agent_id"] = agent_id

        pull_records = []
        push_records = []
        all_pull_candidates = []  # For competitive suppression

        # --- Pull path ---
        if query_cues:
            pull_records, all_pull_candidates = self._pull_path(query_cues, filters)

        # --- Push path ---
        if self._cyclic_decay_field_name is not None:
            push_records = self._push_path(filters)

        # --- Merge + deduplicate ---
        seen_keys = set()
        merged = []
        pull_keys = set()
        push_keys = set()

        for record in pull_records:
            rk = _get_key(record)
            if rk not in seen_keys:
                seen_keys.add(rk)
                merged.append(record)
                pull_keys.add(rk)

        for record in push_records:
            rk = _get_key(record)
            if rk not in seen_keys:
                seen_keys.add(rk)
                merged.append(record)
                push_keys.add(rk)

        # --- Budget selection ---
        # max_items cap
        selected = merged[: self.max_items]

        # max_tokens cap
        total_tokens = 0
        if self.max_tokens is not None:
            budget_selected = []
            for record in selected:
                try:
                    tokens = self._token_counter(record)
                except Exception:
                    tokens = len(str(record)) // 4
                    logger.warning("Token counter failed, falling back to heuristic")
                if total_tokens + tokens > self.max_tokens and budget_selected:
                    break
                total_tokens += tokens
                budget_selected.append(record)
            selected = budget_selected
        else:
            for record in selected:
                try:
                    total_tokens += self._token_counter(record)
                except Exception:
                    total_tokens += len(str(record)) // 4

        # Identify proactive records in final selection
        proactive = [r for r in selected if _get_key(r) in push_keys]

        # --- Post-retrieval effects ---
        self._post_effects(
            selected, pull_keys, push_keys, all_pull_candidates, agent_id
        )

        # --- Format ---
        formatter = {
            "structured": format_structured,
            "xml": format_xml,
            "natural": format_natural,
        }.get(self.output_format, format_structured)

        formatted = formatter(selected)

        timing_ms = round((time.time() - t0) * 1000, 2)

        metadata = {
            "pull_count": len([r for r in selected if _get_key(r) in pull_keys]),
            "push_count": len(proactive),
            "token_count": total_tokens,
            "timing_ms": timing_ms,
            "total_candidates": len(merged),
        }

        # [METACOGNITIVE] Quality assessment — opt-in, off-by-default so existing
        # callers see bit-for-bit identical metadata.
        if assess_quality:
            try:
                metadata["quality"] = self._compute_quality(
                    selected=selected,
                    all_pull_candidates=all_pull_candidates,
                    query_cues=query_cues or {},
                )
            except Exception as e:
                logger.warning("_compute_quality failed: %s", e)
                metadata["quality"] = RetrievalQuality()

        return AssemblyResult(
            records=selected,
            proactive=proactive,
            formatted=formatted,
            metadata=metadata,
        )

    def _pull_path(self, query_cues, filters):
        """Execute pull-path retrieval.

        Returns:
            Tuple of (selected_records, all_candidates) where all_candidates
            includes records that may not make the final cut.
        """
        # ExistenceFilter pre-check
        if self._existence_filter is not None:
            all_missing = True
            for cue_value in query_cues.values():
                if not self._existence_filter.definitely_missing(
                    self.model_class, str(cue_value)
                ):
                    all_missing = False
                    break
            if all_missing:
                logger.debug(
                    "ExistenceFilter: all cues definitely missing, skipping pull"
                )
                return [], []

        # CoOccurrence boost (first pass without boost, then propagate)
        co_occurrence_boost = None

        try:
            # Initial composite score query
            query = self.model_class.query
            if filters:
                query = query.filter(**filters)

            candidates = query.composite_score(
                indexes=self.score_weights,
                limit=self.max_items * 2,
                co_occurrence_boost=co_occurrence_boost,
            )
        except Exception as e:
            logger.warning("CompositeScoreQuery failed: %s", e)
            return [], []

        if not candidates:
            return [], []

        # CoOccurrence propagation to discover associated records
        if self._co_occurrence_field is not None and candidates:
            seed_pks = [_get_key(c) for c in candidates[: self.max_items]]
            try:
                propagated = self._co_occurrence_field.propagate(
                    self.model_class,
                    seed_pks,
                    depth=self.propagation_depth,
                    decay_per_hop=0.5,
                    threshold=0.01,
                )
                if propagated:
                    # Re-run composite score with co-occurrence boost
                    query = self.model_class.query
                    if filters:
                        query = query.filter(**filters)
                    candidates = query.composite_score(
                        indexes=self.score_weights,
                        limit=self.max_items * 2,
                        co_occurrence_boost=propagated,
                    )
            except Exception as e:
                logger.warning("CoOccurrence propagation failed: %s", e)

        all_candidates = list(candidates)
        return candidates, all_candidates

    def _push_path(self, filters):
        """Execute push-path retrieval via CyclicDecayField.

        Uses composite_score with min_score for threshold filtering instead
        of top_by_decay, since composite_score supports server-side score
        thresholds via ZREVRANGEBYSCORE.
        """
        try:
            query = self.model_class.query
            if filters:
                query = query.filter(**filters)

            # Build weights using only the CyclicDecayField for push-path scoring
            push_weights = {self._cyclic_decay_field_name: 1.0}

            results = query.composite_score(
                indexes=push_weights,
                limit=self.max_items,
                min_score=(
                    self.surfacing_threshold if self.surfacing_threshold > 0 else None
                ),
            )
        except Exception as e:
            logger.warning("Push path failed: %s", e)
            return []

        if not results:
            logger.debug("Push path: 0 records above surfacing threshold")

        return results

    def _post_effects(
        self, selected, pull_keys, push_keys, all_pull_candidates, agent_id
    ):
        """Apply post-retrieval effects using Redis pipeline."""
        if not selected and not all_pull_candidates:
            return

        pipeline = POPOTO_REDIS_DB.pipeline()

        # on_read for pull-path selected records
        for record in selected:
            if _get_key(record) in pull_keys:
                ObservationProtocol.on_read(record, pipeline=pipeline)

        # on_surfaced for push-path selected records
        proactive_records = [r for r in selected if _get_key(r) in push_keys]
        if proactive_records:
            ObservationProtocol.on_surfaced(
                proactive_records,
                reason="proactive",
                partition=agent_id,
                pipeline=pipeline,
            )

        # Competitive suppression for non-selected pull candidates
        if self._confidence_field_name is not None:
            selected_keys = {_get_key(r) for r in selected}
            for candidate in all_pull_candidates:
                if _get_key(candidate) not in selected_keys:
                    try:
                        ConfidenceField.update_confidence(
                            candidate,
                            self._confidence_field_name,
                            signal=COMPETITIVE_SUPPRESSION_SIGNAL,
                        )
                    except (TypeError, ValueError):
                        pass  # Model may not have confidence on this instance

        try:
            pipeline.execute()
        except Exception as e:
            logger.warning("Post-effects pipeline failed: %s", e)

    # ------------------------------------------------------------------
    # Metacognitive layer: RetrievalQuality helpers + public assess()
    # ------------------------------------------------------------------

    def _cue_familiarity(self, cue_value) -> float:
        """Thin wrapper around the module-level :func:`_cue_familiarity`.

        Introspects ``self`` exactly once and forwards explicit kwargs so
        the pure helper is the single source of truth (issue #370 C2).
        """
        return _cue_familiarity(
            cue_value,
            existence_filter=self._existence_filter,
            model_class=self.model_class,
        )

    def _compute_fok(self, query_cues, pull_candidates):
        """Thin wrapper around the module-level :func:`_compute_fok`."""
        return _compute_fok(
            query_cues,
            pull_candidates,
            model_class=self.model_class,
            score_weights=self.score_weights,
            max_items=self.max_items,
            surfacing_threshold=self.surfacing_threshold,
            existence_filter=self._existence_filter,
        )

    def _score_proxy_for_records(self, records):
        """Thin wrapper around the module-level
        :func:`_score_proxy_for_records`.
        """
        return _score_proxy_for_records(
            records,
            model_class=self.model_class,
            score_weights=self.score_weights,
        )

    def _compute_score_spread(self, records):
        """Thin wrapper around the module-level
        :func:`_compute_score_spread`.
        """
        return _compute_score_spread(
            records,
            model_class=self.model_class,
            score_weights=self.score_weights,
        )

    def _avg_confidence(self, records):
        """Thin wrapper around the module-level :func:`_avg_confidence`."""
        return _avg_confidence(
            records,
            confidence_field_name=self._confidence_field_name,
            model_class=self.model_class,
        )

    def _staleness_ratio(self, records):
        """Thin wrapper around the module-level :func:`_staleness_ratio`."""
        return _staleness_ratio(
            records,
            model_class=self.model_class,
            score_weights=self.score_weights,
            surfacing_threshold=self.surfacing_threshold,
            decaying_sorted_field_name=self._decaying_sorted_field_name,
        )

    def _compute_quality(self, selected, all_pull_candidates, query_cues):
        """Assemble a RetrievalQuality over the selected records.

        Side-effect-free; reads from ConfidenceField, ExistenceFilter, and
        sorted-set indexes via their existing public APIs. Intended for
        calls at the end of ``assemble()`` (post selection) and from the
        standalone ``assess()`` probe.
        """
        avg_conf = self._avg_confidence(selected)
        score_spread, distribution = self._compute_score_spread(selected)
        fok_score, per_cue = self._compute_fok(query_cues, all_pull_candidates)
        staleness = self._staleness_ratio(selected)
        return RetrievalQuality(
            avg_confidence=avg_conf,
            score_spread=score_spread,
            fok_score=fok_score,
            staleness_ratio=staleness,
            score_distribution=distribution,
            per_cue_fok=per_cue,
        )

    def assess(self, query_cues=None, partition_filters=None, probe_limit=None):
        """Probe retrieval quality without running the full pipeline.

        Runs a cheap pre-retrieval check: ExistenceFilter lookups for
        cue_familiarity + a single low-limit composite_score probe to
        gather pull candidates for FOK computation. Does NOT run
        CoOccurrence propagation, does NOT run the push path, does NOT
        apply post-effects.

        Intended use: call ``assess()`` before ``assemble()`` to decide
        whether the full retrieval is worth the round-trip cost. When
        ``assess().fok_score < some_threshold``, the agent can skip the
        retrieval entirely and widen the cue or caveat its answer.

        Args:
            query_cues: Optional dict of query cues. When empty, all
                metrics default to 0.0 with a logged warning.
            partition_filters: Optional dict of partition filters. Same
                semantics as ``assemble()``.
            probe_limit: Optional cap on the number of candidates fetched
                for the probe. Defaults to ``self.max_items``.

        Returns:
            RetrievalQuality. ``selected`` is treated as empty; the quality
            reflects what's *available* for retrieval, not what was
            actually retrieved.
        """
        filters = dict(partition_filters or {})

        if not query_cues:
            logger.warning("assess() called with no query_cues")
            return RetrievalQuality()

        limit = probe_limit if probe_limit is not None else self.max_items
        probe_candidates = []

        # ExistenceFilter pre-check — same short-circuit as _pull_path.
        if self._existence_filter is not None:
            all_missing = True
            for cue_value in query_cues.values():
                if not self._existence_filter.definitely_missing(
                    self.model_class, str(cue_value)
                ):
                    all_missing = False
                    break
            if all_missing:
                # No probe — everything is definitely absent.
                fok_score, per_cue = self._compute_fok(query_cues, [])
                return RetrievalQuality(
                    avg_confidence=1.0 if not self._confidence_field_name else 0.5,
                    score_spread=0.0,
                    fok_score=fok_score,
                    staleness_ratio=0.0,
                    score_distribution=[],
                    per_cue_fok=per_cue,
                )

        try:
            query = self.model_class.query
            if filters:
                query = query.filter(**filters)
            probe_candidates = query.composite_score(
                indexes=self.score_weights,
                limit=limit,
            )
        except Exception as e:
            logger.warning("assess() composite_score probe failed: %s", e)
            probe_candidates = []

        avg_conf = self._avg_confidence(probe_candidates)
        score_spread, distribution = self._compute_score_spread(probe_candidates)
        fok_score, per_cue = self._compute_fok(query_cues, probe_candidates)
        staleness = self._staleness_ratio(probe_candidates)

        return RetrievalQuality(
            avg_confidence=avg_conf,
            score_spread=score_spread,
            fok_score=fok_score,
            staleness_ratio=staleness,
            score_distribution=distribution,
            per_cue_fok=per_cue,
        )

assemble(query_cues=None, agent_id=None, partition_filters=None, assess_quality=False)

Execute the full retrieval pipeline.

Parameters:

| Name | Description | Default |
| --- | --- | --- |
| `query_cues` | Optional dict of query cues (e.g., `{"topic": "deploy"}`). If None, the pull path is skipped. | `None` |
| `agent_id` | Optional agent ID for partition filtering. Added to `partition_filters` as `{"agent_id": agent_id}`. | `None` |
| `partition_filters` | Optional dict of partition key-value pairs for filtering queries. | `None` |
| `assess_quality` | When True, compute a `RetrievalQuality` over the selected records and attach it to `AssemblyResult.metadata["quality"]`. Default False; when False the result shape is bit-for-bit identical to the pre-metacognitive-layer behavior. Enabling it adds bounded overhead (one `might_exist` per cue plus one `get_confidence` per selected record). | `False` |

Returns:

| Type | Description |
| --- | --- |
| `AssemblyResult` | `AssemblyResult` with `records`, `proactive`, `formatted`, and `metadata`. |
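
The budget-selection step inside `assemble()` (a `max_items` cap followed by a `max_tokens` cap, with a chars-divided-by-4 heuristic when the token counter fails) can be exercised in isolation. This standalone sketch mirrors that logic; `budget_select` is an illustrative name, not part of the recipe's API:

```python
def budget_select(records, max_items, max_tokens=None, token_counter=None):
    """Mirror of assemble()'s budget selection: cap by item count, then by tokens."""
    def _count(record):
        try:
            return token_counter(record)
        except Exception:
            return len(str(record)) // 4  # heuristic fallback, ~4 chars per token

    selected = records[:max_items]
    if max_tokens is None:
        return selected, sum(_count(r) for r in selected)

    kept, total = [], 0
    for record in selected:
        tokens = _count(record)
        # The `and kept` guard keeps at least one record even when it
        # alone exceeds the budget, matching the source above.
        if total + tokens > max_tokens and kept:
            break
        total += tokens
        kept.append(record)
    return kept, total
```

Note the asymmetry: `max_items` truncates unconditionally, while the token cap never returns an empty selection when candidates exist.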

Source code in src/popoto/recipes/context_assembler.py
def assemble(
    self,
    query_cues=None,
    agent_id=None,
    partition_filters=None,
    assess_quality=False,
):
    """Execute the full retrieval pipeline.

    Args:
        query_cues: Optional dict of query cues (e.g., {"topic": "deploy"}).
            If None, pull path is skipped.
        agent_id: Optional agent ID for partition filtering. Added to
            partition_filters as {"agent_id": agent_id}.
        partition_filters: Optional dict of partition key-value pairs
            for filtering queries.
        assess_quality: When True, compute a ``RetrievalQuality`` over
            the selected records and attach it to
            ``AssemblyResult.metadata["quality"]``. Default False;
            when False the result shape is bit-for-bit identical to
            the pre-metacognitive-layer behavior. Turning this on adds
            bounded overhead (one ``might_exist`` per cue plus one
            ``get_confidence`` per selected record).

    Returns:
        AssemblyResult with records, proactive, formatted, and metadata.
    """
    t0 = time.time()

    # Build partition filters
    filters = dict(partition_filters or {})
    if agent_id is not None:
        filters["agent_id"] = agent_id

    pull_records = []
    push_records = []
    all_pull_candidates = []  # For competitive suppression

    # --- Pull path ---
    if query_cues:
        pull_records, all_pull_candidates = self._pull_path(query_cues, filters)

    # --- Push path ---
    if self._cyclic_decay_field_name is not None:
        push_records = self._push_path(filters)

    # --- Merge + deduplicate ---
    seen_keys = set()
    merged = []
    pull_keys = set()
    push_keys = set()

    for record in pull_records:
        rk = _get_key(record)
        if rk not in seen_keys:
            seen_keys.add(rk)
            merged.append(record)
            pull_keys.add(rk)

    for record in push_records:
        rk = _get_key(record)
        if rk not in seen_keys:
            seen_keys.add(rk)
            merged.append(record)
            push_keys.add(rk)

    # --- Budget selection ---
    # max_items cap
    selected = merged[: self.max_items]

    # max_tokens cap
    total_tokens = 0
    if self.max_tokens is not None:
        budget_selected = []
        for record in selected:
            try:
                tokens = self._token_counter(record)
            except Exception:
                tokens = len(str(record)) // 4
                logger.warning("Token counter failed, falling back to heuristic")
            if total_tokens + tokens > self.max_tokens and budget_selected:
                break
            total_tokens += tokens
            budget_selected.append(record)
        selected = budget_selected
    else:
        for record in selected:
            try:
                total_tokens += self._token_counter(record)
            except Exception:
                total_tokens += len(str(record)) // 4

    # Identify proactive records in final selection
    proactive = [r for r in selected if _get_key(r) in push_keys]

    # --- Post-retrieval effects ---
    self._post_effects(
        selected, pull_keys, push_keys, all_pull_candidates, agent_id
    )

    # --- Format ---
    formatter = {
        "structured": format_structured,
        "xml": format_xml,
        "natural": format_natural,
    }.get(self.output_format, format_structured)

    formatted = formatter(selected)

    timing_ms = round((time.time() - t0) * 1000, 2)

    metadata = {
        "pull_count": len([r for r in selected if _get_key(r) in pull_keys]),
        "push_count": len(proactive),
        "token_count": total_tokens,
        "timing_ms": timing_ms,
        "total_candidates": len(merged),
    }

    # [METACOGNITIVE] Quality assessment — opt-in, off-by-default so existing
    # callers see bit-for-bit identical metadata.
    if assess_quality:
        try:
            metadata["quality"] = self._compute_quality(
                selected=selected,
                all_pull_candidates=all_pull_candidates,
                query_cues=query_cues or {},
            )
        except Exception as e:
            logger.warning("_compute_quality failed: %s", e)
            metadata["quality"] = RetrievalQuality()

    return AssemblyResult(
        records=selected,
        proactive=proactive,
        formatted=formatted,
        metadata=metadata,
    )

assess(query_cues=None, partition_filters=None, probe_limit=None)

Probe retrieval quality without running the full pipeline.

Runs a cheap pre-retrieval check: ExistenceFilter lookups for cue_familiarity + a single low-limit composite_score probe to gather pull candidates for FOK computation. Does NOT run CoOccurrence propagation, does NOT run the push path, does NOT apply post-effects.

Intended use: call assess() before assemble() to decide whether the full retrieval is worth the round-trip cost. When assess().fok_score < some_threshold, the agent can skip the retrieval entirely and widen the cue or caveat its answer.

Parameters:

| Name | Description | Default |
| --- | --- | --- |
| `query_cues` | Optional dict of query cues. When empty, all metrics default to 0.0 with a logged warning. | `None` |
| `partition_filters` | Optional dict of partition filters. Same semantics as `assemble()`. | `None` |
| `probe_limit` | Optional cap on the number of candidates fetched for the probe. Defaults to `self.max_items`. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `RetrievalQuality` | `selected` is treated as empty; the quality reflects what's *available* for retrieval, not what was actually retrieved. |
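
The assess-then-assemble gating pattern described above can be sketched as a small decision helper. `RetrievalQuality` here is a minimal stand-in carrying only the two fields the gate reads, and the 0.3 / 0.8 thresholds are illustrative assumptions, not library defaults:

```python
from dataclasses import dataclass


@dataclass
class RetrievalQuality:  # minimal stand-in: only the fields the gate reads
    fok_score: float = 0.0
    staleness_ratio: float = 0.0


def should_retrieve(quality, fok_threshold=0.3, staleness_limit=0.8):
    """Decide whether a full assemble() is worth the round-trip cost."""
    if quality.fok_score < fok_threshold:
        return False  # low feeling-of-knowing: widen the cue or caveat the answer
    if quality.staleness_ratio > staleness_limit:
        return False  # most available candidates have decayed past usefulness
    return True
```

In an agent loop this would wrap the real calls: `if should_retrieve(assembler.assess(cues)): result = assembler.assemble(cues)`.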

Source code in src/popoto/recipes/context_assembler.py
def assess(self, query_cues=None, partition_filters=None, probe_limit=None):
    """Probe retrieval quality without running the full pipeline.

    Runs a cheap pre-retrieval check: ExistenceFilter lookups for
    cue_familiarity + a single low-limit composite_score probe to
    gather pull candidates for FOK computation. Does NOT run
    CoOccurrence propagation, does NOT run the push path, does NOT
    apply post-effects.

    Intended use: call ``assess()`` before ``assemble()`` to decide
    whether the full retrieval is worth the round-trip cost. When
    ``assess().fok_score < some_threshold``, the agent can skip the
    retrieval entirely and widen the cue or caveat its answer.

    Args:
        query_cues: Optional dict of query cues. When empty, all
            metrics default to 0.0 with a logged warning.
        partition_filters: Optional dict of partition filters. Same
            semantics as ``assemble()``.
        probe_limit: Optional cap on the number of candidates fetched
            for the probe. Defaults to ``self.max_items``.

    Returns:
        RetrievalQuality. ``selected`` is treated as empty; the quality
        reflects what's *available* for retrieval, not what was
        actually retrieved.
    """
    filters = dict(partition_filters or {})

    if not query_cues:
        logger.warning("assess() called with no query_cues")
        return RetrievalQuality()

    limit = probe_limit if probe_limit is not None else self.max_items
    probe_candidates = []

    # ExistenceFilter pre-check — same short-circuit as _pull_path.
    if self._existence_filter is not None:
        all_missing = True
        for cue_value in query_cues.values():
            if not self._existence_filter.definitely_missing(
                self.model_class, str(cue_value)
            ):
                all_missing = False
                break
        if all_missing:
            # No probe — everything is definitely absent.
            fok_score, per_cue = self._compute_fok(query_cues, [])
            return RetrievalQuality(
                avg_confidence=1.0 if not self._confidence_field_name else 0.5,
                score_spread=0.0,
                fok_score=fok_score,
                staleness_ratio=0.0,
                score_distribution=[],
                per_cue_fok=per_cue,
            )

    try:
        query = self.model_class.query
        if filters:
            query = query.filter(**filters)
        probe_candidates = query.composite_score(
            indexes=self.score_weights,
            limit=limit,
        )
    except Exception as e:
        logger.warning("assess() composite_score probe failed: %s", e)
        probe_candidates = []

    avg_conf = self._avg_confidence(probe_candidates)
    score_spread, distribution = self._compute_score_spread(probe_candidates)
    fok_score, per_cue = self._compute_fok(query_cues, probe_candidates)
    staleness = self._staleness_ratio(probe_candidates)

    return RetrievalQuality(
        avg_confidence=avg_conf,
        score_spread=score_spread,
        fok_score=fok_score,
        staleness_ratio=staleness,
        score_distribution=distribution,
        per_cue_fok=per_cue,
    )

format_structured(records)

Format records as JSON array.

Source code in src/popoto/recipes/context_assembler.py
def format_structured(records) -> str:
    """Format records as JSON array."""
    dicts = [_record_to_dict(r) for r in records]
    return json.dumps(dicts, default=str, indent=2)

format_xml(records)

Format records as XML tags.

Source code in src/popoto/recipes/context_assembler.py
def format_xml(records) -> str:
    """Format records as XML tags."""
    lines = ["<records>"]
    for record in records:
        d = _record_to_dict(record)
        lines.append("  <record>")
        for key, val in d.items():
            escaped = (
                str(val).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
            )
            lines.append(f"    <{key}>{escaped}</{key}>")
        lines.append("  </record>")
    lines.append("</records>")
    return "\n".join(lines)

format_natural(records)

Format records as natural language summary.

Source code in src/popoto/recipes/context_assembler.py
def format_natural(records) -> str:
    """Format records as natural language summary."""
    if not records:
        return ""
    parts = []
    for i, record in enumerate(records, 1):
        d = _record_to_dict(record)
        fields_str = ", ".join(f"{k}: {v}" for k, v in d.items() if v is not None)
        parts.append(f"{i}. {fields_str}")
    return "\n".join(parts)
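
The formatters can be tried on plain dicts; this sketch substitutes an identity `_record_to_dict` (the real helper introspects Popoto model fields) to show the XML escaping and natural-language layouts side by side:

```python
def _record_to_dict(record):
    # stand-in: the real helper introspects Popoto model fields
    return dict(record)


def format_xml(records) -> str:
    lines = ["<records>"]
    for record in records:
        lines.append("  <record>")
        for key, val in _record_to_dict(record).items():
            # escape the XML-significant characters, ampersand first
            escaped = (
                str(val).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
            )
            lines.append(f"    <{key}>{escaped}</{key}>")
        lines.append("  </record>")
    lines.append("</records>")
    return "\n".join(lines)


def format_natural(records) -> str:
    if not records:
        return ""
    parts = []
    for i, record in enumerate(records, 1):
        d = _record_to_dict(record)
        parts.append(f"{i}. " + ", ".join(f"{k}: {v}" for k, v in d.items() if v is not None))
    return "\n".join(parts)


records = [{"topic": "deploy", "note": "retries < 3 & > 0"}]
print(format_xml(records))
print(format_natural(records))
```

Replacing `&` before `<` and `>` matters: doing it last would double-escape the `&` inside `&lt;` and `&gt;`.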