popoto.recipes.context_assembler

ContextAssembler — Retrieval-to-injection bridge with token budgets.

A capstone recipe composing all shipped Popoto memory primitives into a single assemble() call. Orchestrates pull-path (query-driven) and push-path (proactive surfacing) retrieval, applies token budgets, and formats output for LLM context injection.

Metacognitive extensions (opt-in, off-by-default):

  • RetrievalQuality dataclass: surfaces average confidence, score spread, feeling-of-knowing (FOK), and staleness for a retrieval.
  • ContextAssembler.assess(query_cues): pre-retrieval FOK probe.
  • ContextAssembler.assemble(..., assess_quality=True): attaches a RetrievalQuality to AssemblyResult.metadata["quality"] without changing the default behavior.

Pipeline

Pull path: ExistenceFilter pre-check → CompositeScoreQuery → CoOccurrence propagation
Push path: CyclicDecayField temporal scan above surfacing threshold
Merge: Deduplicate, re-rank, budget-select, post-effects, format
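
The merge stage can be pictured with a small standalone sketch. It does not call Popoto; `merge_candidates` and the `(key, score)` pair shape are hypothetical, and keeping the higher score when a record appears on both paths is an assumption rather than documented behavior.

```python
def merge_candidates(pull, push, max_items):
    """Merge (key, score) pairs from both paths, keeping the best score per key."""
    best = {}
    for key, score in list(pull) + list(push):
        # Deduplicate: a record seen on both paths is kept once.
        # Assumption: the higher of the two scores wins.
        if key not in best or score > best[key]:
            best[key] = score
    # Re-rank by descending score; ties are broken by key for determinism.
    ranked = sorted(best.items(), key=lambda kv: (-kv[1], kv[0]))
    # Budget-select by item count only; token budgets are handled separately.
    return ranked[:max_items]


pull = [("m1", 0.9), ("m2", 0.4)]
push = [("m2", 0.7), ("m3", 0.6)]
merged = merge_candidates(pull, push, max_items=2)
```

Here `m2` arrives on both paths and is kept once, and the item budget then drops the lowest-ranked candidate.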

Synergy with Popoto Primitives

┌────────────────────────┬───────────────────────────────────────┐
│ Primitive              │ Role in ContextAssembler              │
├────────────────────────┼───────────────────────────────────────┤
│ DecayingSortedField    │ Score index for CompositeScoreQuery   │
│ CyclicDecayField       │ Push-path proactive surfacing         │
│ ConfidenceField        │ Score index + competitive suppression │
│ CoOccurrenceField      │ Pull-path candidate expansion         │
│ ExistenceFilter        │ Pull-path pre-check (skip if absent)  │
│ AccessTrackerMixin     │ on_read post-effect tracking          │
│ ObservationProtocol    │ on_read / on_surfaced dispatch        │
│ RecallProposal         │ Created for push-path records         │
│ WriteFilterMixin       │ Priority score in composite           │
│ EventStreamMixin       │ Mutation logging (via model save)     │
│ PredictionLedgerMixin  │ Outcome tracking (via model save)     │
│ CompositeScoreQuery    │ Multi-factor ranked retrieval         │
└────────────────────────┴───────────────────────────────────────┘

Dependencies

All 12 shipped Popoto primitives (Steps 1-12 of the memory roadmap). No external dependencies beyond Popoto itself.

Example

from popoto.recipes.context_assembler import ContextAssembler

assembler = ContextAssembler(
    model_class=Memory,
    score_weights={"relevance": 0.6, "confidence": 0.3},
    max_items=10,
    max_tokens=4000,
)
result = assembler.assemble(
    query_cues={"topic": "deployment"},
    agent_id="agent-1",
)

# result.records: selected instances
# result.proactive: push-path subset
# result.formatted: LLM-ready string
# result.metadata: scores, timing, token counts

COMPETITIVE_SUPPRESSION_SIGNAL = Defaults.COMPETITIVE_SUPPRESSION_SIGNAL module-attribute

Signal strength for competitive suppression of non-selected pull-path candidates. Applied via ConfidenceField.update_confidence(). Values < 0.5 act as contradiction signals, mildly reducing future ranking. Optimal range: [0.1, 0.7]. Insensitive to retrieval quality.

DEFAULT_SURFACING_THRESHOLD = Defaults.DEFAULT_SURFACING_THRESHOLD module-attribute

Minimum score for push-path records to be surfaced. Records from CyclicDecayField scan below this threshold are filtered out. Optimal range: [0.1, 0.9]. Insensitive to retrieval quality.

DEFAULT_MAX_ITEMS = 10 module-attribute

Default maximum number of records returned by assemble().

DEFAULT_PROPAGATION_DEPTH = 2 module-attribute

Default BFS depth for CoOccurrence propagation.
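
To make the depth parameter concrete, here is a minimal sketch of depth-limited breadth-first expansion over a co-occurrence graph. The adjacency-dict representation and the `propagate` helper are hypothetical; the actual CoOccurrenceField storage and traversal may differ.

```python
from collections import deque


def propagate(graph, seeds, depth):
    """Return every node reachable from the seeds within `depth` hops (seeds included)."""
    seen = set(seeds)
    queue = deque((seed, 0) for seed in seeds)
    while queue:
        node, dist = queue.popleft()
        if dist == depth:
            continue  # do not expand beyond the depth limit
        for neighbour in graph.get(node, ()):
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append((neighbour, dist + 1))
    return seen


# Hypothetical co-occurrence chain: a -> b -> c -> d
graph = {"a": ["b"], "b": ["c"], "c": ["d"]}
reached = propagate(graph, ["a"], depth=2)
```

With a depth of 2, the expansion from `a` stops at `c`; `d` is three hops away and is never visited.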

RRF_K = 60 module-attribute

RRF rank-fusion constant (Cormack et al. 2009). Do not expose as user config; tuning experiments belong in a separate follow-up.

HYBRID_CANDIDATE_MULTIPLIER = 5 module-attribute

candidate_limit = max_items * HYBRID_CANDIDATE_MULTIPLIER for per-signal retrieval in the hybrid pull path before RRF fusion.
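
As an illustration of how these two constants interact, the following standalone sketch fuses two ranked lists with Reciprocal Rank Fusion. The `rrf_fuse` helper is hypothetical, and the 1-based rank convention follows the usual RRF formulation; whether Popoto numbers ranks from 0 or 1 is not stated here.

```python
RRF_K = 60
HYBRID_CANDIDATE_MULTIPLIER = 5


def rrf_fuse(rankings, k=RRF_K):
    """Fuse ranked key lists: each list contributes 1 / (k + rank) per key."""
    scores = {}
    for ranking in rankings:
        for rank, key in enumerate(ranking, start=1):  # assumption: ranks start at 1
            scores[key] = scores.get(key, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties are broken by key for determinism.
    return sorted(scores, key=lambda key: (-scores[key], key))


max_items = 2
candidate_limit = max_items * HYBRID_CANDIDATE_MULTIPLIER

# Each signal is truncated to candidate_limit before fusion.
bm25_ranking = ["a", "b", "c"][:candidate_limit]
vector_ranking = ["b", "c", "a"][:candidate_limit]
fused = rrf_fuse([bm25_ranking, vector_ranking])
```

Record `b` ranks second and first in the two lists, which edges out `a` (first and third) once the reciprocal ranks are summed.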

AssemblyResult dataclass

Return type for ContextAssembler.assemble().

Attributes:

Name Type Description
records list

All selected instances (pull + push, deduplicated).

proactive list

Push-path subset of records (proactively surfaced).

formatted str

LLM-ready formatted string (JSON, XML, or natural).

metadata dict

Dict with scores, token_count, timing_ms, pull_count, push_count.

Source code in src/popoto/recipes/context_assembler.py
@dataclass
class AssemblyResult:
    """Return type for ContextAssembler.assemble().

    Attributes:
        records: All selected instances (pull + push, deduplicated).
        proactive: Push-path subset of records (proactively surfaced).
        formatted: LLM-ready formatted string (JSON, XML, or natural).
        metadata: Dict with scores, token_count, timing_ms, pull_count,
            push_count.
    """

    records: list = field(default_factory=list)
    proactive: list = field(default_factory=list)
    formatted: str = ""
    metadata: dict = field(default_factory=dict)

RetrievalQuality dataclass

Metacognitive signal describing retrieval trustworthiness.

Surfaces four machine-readable metrics about a retrieval so an agent can decide whether to trust its context, retry with different cues, widen scope, or caveat its downstream answer. This is a purely mechanical signal — no LLM self-reporting — following the research finding that GPT-4's self-reported confidence reflects output structure rather than internal uncertainty.

Attributes:

Name Type Description
avg_confidence float

Mean of ConfidenceField.get_confidence() across selected records. 1.0 when the model has no ConfidenceField (no evidence against the retrieval).

score_spread float

Coefficient of variation (stddev / mean) of the per-record composite scores. High spread means one or two records dominate; low spread means results are roughly equivalent. Falls back to 0.0 when abs(mean) < 1e-9 — stddev/mean is undefined when mean is zero.

fok_score float

Feeling-of-knowing — 0.4 * cue_familiarity + 0.4 * partial_retrieval_count + 0.2 * subthreshold_activation, averaged across query cues. 0.0 when no cues were provided.

staleness_ratio float

Fraction of selected records with DecayingSortedField score below the field's decay threshold. 0.0 when the model has no DecayingSortedField.

score_distribution list

Optional full list of per-record composite scores for histogram analysis; empty when unavailable.

per_cue_fok dict

Optional dict mapping cue value -> dict with the three FOK components for that cue (for debugging).
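
The arithmetic behind these attributes can be sketched without Popoto. The helper names below are hypothetical, the inputs are plain floats rather than model records, and the use of the population standard deviation is an assumption because the description only says "stddev".

```python
from statistics import mean, pstdev


def score_spread(scores):
    """Coefficient of variation, with the documented zero-mean fallback."""
    if not scores:
        return 0.0
    mu = mean(scores)
    if abs(mu) < 1e-9:
        return 0.0  # stddev / mean is undefined when the mean is zero
    # Assumption: population standard deviation.
    return pstdev(scores) / mu


def fok_for_cue(cue_familiarity, partial_retrieval_count, subthreshold_activation):
    """Weighted sum of the three FOK components for a single cue."""
    return (
        0.4 * cue_familiarity
        + 0.4 * partial_retrieval_count
        + 0.2 * subthreshold_activation
    )


def fok_score(per_cue_components):
    """Average the per-cue FOK values; 0.0 when no cues were provided."""
    if not per_cue_components:
        return 0.0
    return mean(fok_for_cue(*parts) for parts in per_cue_components)


def staleness_ratio(decay_scores, threshold):
    """Fraction of scores strictly below the threshold."""
    if not decay_scores:
        return 0.0
    return sum(score < threshold for score in decay_scores) / len(decay_scores)
```

For example, scores of 1.0 and 3.0 have a mean of 2.0 and a population standard deviation of 1.0, so their spread is 0.5.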

Example

quality = assembler.assess({"topic": "deploy"})
if quality.fok_score < 0.3:
    # Skip the expensive retrieval; we don't know this domain
    return
result = assembler.assemble({"topic": "deploy"}, assess_quality=True)
if result.metadata["quality"].avg_confidence < 0.4:
    # Caveat the downstream response
    ...

Source code in src/popoto/recipes/context_assembler.py
@dataclass
class RetrievalQuality:
    """Metacognitive signal describing retrieval trustworthiness.

    Surfaces four machine-readable metrics about a retrieval so an agent
    can decide whether to trust its context, retry with different cues,
    widen scope, or caveat its downstream answer. This is a purely
    *mechanical* signal — no LLM self-reporting — following the research
    finding that GPT-4's self-reported confidence reflects output structure
    rather than internal uncertainty.

    Attributes:
        avg_confidence: Mean of ``ConfidenceField.get_confidence()`` across
            selected records. ``1.0`` when the model has no ConfidenceField
            (no evidence against the retrieval).
        score_spread: Coefficient of variation (stddev / mean) of the
            per-record composite scores. High spread means one or two records
            dominate; low spread means results are roughly equivalent.
            Falls back to ``0.0`` when ``abs(mean) < 1e-9`` — stddev/mean is
            undefined when mean is zero.
        fok_score: Feeling-of-knowing — 0.4 * cue_familiarity + 0.4 *
            partial_retrieval_count + 0.2 * subthreshold_activation,
            averaged across query cues. ``0.0`` when no cues were provided.
        staleness_ratio: Fraction of selected records with
            DecayingSortedField score below the field's decay threshold.
            ``0.0`` when the model has no DecayingSortedField.
        score_distribution: Optional full list of per-record composite
            scores for histogram analysis; empty when unavailable.
        per_cue_fok: Optional dict mapping cue value -> dict with the
            three FOK components for that cue (for debugging).

    Example:
        quality = assembler.assess({"topic": "deploy"})
        if quality.fok_score < 0.3:
            # Skip the expensive retrieval; we don't know this domain
            return
        result = assembler.assemble({"topic": "deploy"}, assess_quality=True)
        if result.metadata["quality"].avg_confidence < 0.4:
            # Caveat the downstream response
            ...
    """

    avg_confidence: float = 0.0
    score_spread: float = 0.0
    fok_score: float = 0.0
    staleness_ratio: float = 0.0
    score_distribution: list = field(default_factory=list)
    per_cue_fok: dict = field(default_factory=dict)

    @classmethod
    def from_records(
        cls,
        records,
        query_cues=None,
        score_weights=None,
        max_items=DEFAULT_MAX_ITEMS,
        surfacing_threshold=DEFAULT_SURFACING_THRESHOLD,
    ) -> "RetrievalQuality":
        """Build a RetrievalQuality over an already-retrieved list of records.

        Intended for custom retrieval pipelines (BM25, RRF, hybrid) that
        want the metacognitive layer without adopting
        :class:`ContextAssembler`. All model capabilities (ConfidenceField,
        ExistenceFilter, DecayingSortedField) are introspected from
        ``records[0]._meta.fields``. Heterogeneous record lists are
        rejected with ``TypeError`` — score weights and capability field
        names are per-model-class, so a mixed list would silently produce
        incorrect FOK / score_spread / staleness_ratio values.

        Args:
            records: Non-empty list of Popoto Model instances of a single
                concrete class. When empty, returns a zero-valued
                :class:`RetrievalQuality`.
            query_cues: Optional dict of query cues — same shape as
                ``ContextAssembler.assess(query_cues=...)``. When falsy,
                ``fok_score`` is 0.0 and ``per_cue_fok`` is empty.
            score_weights: Optional dict mapping sorted-field names to
                weights. Used for ``score_spread`` and ``staleness_ratio``.
                When None, both default to 0.0 and ``score_distribution``
                is empty.
            max_items: Denominator for ``partial_retrieval_count`` in the
                FOK formula. Default matches ``ContextAssembler``.
            surfacing_threshold: Threshold for subthreshold_activation and
                staleness_ratio. Default matches ``ContextAssembler``.

        Returns:
            A :class:`RetrievalQuality` dataclass. Field semantics match
            the assembler path exactly — see class docstring.

        Raises:
            TypeError: If ``records`` contains instances of more than one
                concrete model class.

        Example:
            >>> from popoto import RetrievalQuality
            >>> records = my_bm25_pipeline(query)  # custom retrieval
            >>> quality = RetrievalQuality.from_records(
            ...     records,
            ...     query_cues={"topic": query},
            ...     score_weights={"relevance": 1.0},
            ... )
            >>> if quality.fok_score < 0.3:
            ...     return "low confidence retrieval"
        """
        # Empty list -> zero-valued quality, no warning.
        if not records:
            return cls()

        # Mixed-model guard (C4): fail loudly, not silently.
        distinct_types = {type(r) for r in records}
        if len(distinct_types) > 1:
            class_names = sorted(t.__name__ for t in distinct_types)
            raise TypeError(
                "RetrievalQuality.from_records requires a homogeneous list "
                f"of records; got {len(distinct_types)} distinct model "
                f"classes: {class_names}"
            )

        # All records are the same class — introspect once at the entry point.
        model_class = type(records[0])
        existence_filter = None
        confidence_field_name = None
        decaying_sorted_field_name = None
        for name, f in model_class._meta.fields.items():
            if isinstance(f, ExistenceFilter) and existence_filter is None:
                existence_filter = f
            if isinstance(f, ConfidenceField) and confidence_field_name is None:
                confidence_field_name = name
            if (
                isinstance(f, DecayingSortedField)
                and decaying_sorted_field_name is None
            ):
                decaying_sorted_field_name = name

        # Delegate numeric work to the pure module-level helpers (C2).
        avg_conf = _avg_confidence(
            records,
            confidence_field_name=confidence_field_name,
            model_class=model_class,
        )

        if score_weights is None:
            # Skip score_spread / staleness_ratio entirely.
            score_spread = 0.0
            distribution: list = []
            staleness = 0.0
        else:
            score_spread, distribution = _compute_score_spread(
                records,
                model_class=model_class,
                score_weights=score_weights,
            )
            staleness = _staleness_ratio(
                records,
                model_class=model_class,
                score_weights=score_weights,
                surfacing_threshold=surfacing_threshold,
                decaying_sorted_field_name=decaying_sorted_field_name,
            )

        if not query_cues:
            fok_score = 0.0
            per_cue: dict = {}
        else:
            # FOK needs pull-candidates; we have already-retrieved records,
            # so those are the candidates by construction.
            fok_score, per_cue = _compute_fok(
                query_cues,
                records,
                model_class=model_class,
                score_weights=score_weights or {},
                max_items=max_items,
                surfacing_threshold=surfacing_threshold,
                existence_filter=existence_filter,
            )

        return cls(
            avg_confidence=avg_conf,
            score_spread=score_spread,
            fok_score=fok_score,
            staleness_ratio=staleness,
            score_distribution=distribution,
            per_cue_fok=per_cue,
        )

from_records(records, query_cues=None, score_weights=None, max_items=DEFAULT_MAX_ITEMS, surfacing_threshold=DEFAULT_SURFACING_THRESHOLD) classmethod

Build a RetrievalQuality over an already-retrieved list of records.

Intended for custom retrieval pipelines (BM25, RRF, hybrid) that want the metacognitive layer without adopting ContextAssembler. All model capabilities (ConfidenceField, ExistenceFilter, DecayingSortedField) are introspected from records[0]._meta.fields. Heterogeneous record lists are rejected with TypeError: score weights and capability field names are per-model-class, so a mixed list would silently produce incorrect FOK / score_spread / staleness_ratio values.

Parameters:

Name Type Description Default
records

List of Popoto Model instances of a single concrete class. An empty list returns a zero-valued RetrievalQuality.

required
query_cues

Optional dict of query cues — same shape as ContextAssembler.assess(query_cues=...). When falsy, fok_score is 0.0 and per_cue_fok is empty.

None
score_weights

Optional dict mapping sorted-field names to weights. Used for score_spread and staleness_ratio. When None, both default to 0.0 and score_distribution is empty.

None
max_items

Denominator for partial_retrieval_count in the FOK formula. Default matches ContextAssembler.

DEFAULT_MAX_ITEMS
surfacing_threshold

Threshold for subthreshold_activation and staleness_ratio. Default matches ContextAssembler.

DEFAULT_SURFACING_THRESHOLD

Returns:

Type Description
RetrievalQuality

A RetrievalQuality dataclass. Field semantics match the assembler path exactly; see the class docstring.

Raises:

Type Description
TypeError

If records contains instances of more than one concrete model class.

Example

>>> from popoto import RetrievalQuality
>>> records = my_bm25_pipeline(query)  # custom retrieval
>>> quality = RetrievalQuality.from_records(
...     records,
...     query_cues={"topic": query},
...     score_weights={"relevance": 1.0},
... )
>>> if quality.fok_score < 0.3:
...     return "low confidence retrieval"

ContextAssembler

Orchestrates memory retrieval into a single assemble() call.

Combines pull-path (query-driven) and push-path (proactive via CyclicDecayField) retrieval, applies token budgets, and formats output for LLM context injection.

The pull path supports three modes, selected by retrieval_mode:

  • "hybrid" — BM25 (lexical) + vector (semantic) signals fused via Reciprocal Rank Fusion (RRF, k=60) followed by optional CoOccurrence graph propagation. Requires BM25Field and EmbeddingField on the model.
  • "composite" — original CompositeScoreQuery weighted-sum (unchanged from pre-v1.7 behaviour). Requires score_weights.
  • "auto" (default) — selects "hybrid" when both BM25Field and EmbeddingField are detected on the model, otherwise falls back to "composite".
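
The mode selection described above can be sketched as a small pure function. This is only an illustration: `resolve_retrieval_mode` is a hypothetical name, the two boolean flags stand in for field detection on the model, and the local `QueryException` class is a stand-in so the snippet runs without Popoto.

```python
class QueryException(Exception):
    """Local stand-in for Popoto's QueryException so the sketch runs alone."""


def resolve_retrieval_mode(retrieval_mode, has_bm25, has_embedding):
    """Return the effective pull-path mode for the requested retrieval_mode."""
    hybrid_capable = has_bm25 and has_embedding
    if retrieval_mode == "auto":
        # "auto" prefers hybrid and falls back to composite.
        return "hybrid" if hybrid_capable else "composite"
    if retrieval_mode == "hybrid" and not hybrid_capable:
        raise QueryException("hybrid mode needs both BM25Field and EmbeddingField")
    return retrieval_mode
```

Under this reading, a model that has a BM25Field but no EmbeddingField resolves `"auto"` to `"composite"`, while an explicit `"hybrid"` request on the same model raises.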

Parameters:

Name Type Description Default
model_class

Popoto Model class to query.

required
score_weights

Dict mapping field names to weights for the composite pull path (e.g., {"relevance": 0.6, "confidence": 0.3}). Ignored when the effective retrieval mode is "hybrid".

required
max_items

Maximum records to return. Default 10.

DEFAULT_MAX_ITEMS
max_tokens

Optional enforced token budget over the serialized per-record output. Packing is greedy first-fit in rank order: a record that does not fit is skipped (not a packing terminator) and later smaller records may still be admitted. The first record is always admitted, so assemble() never returns zero records when candidates exist — a single oversized record can therefore overshoot the budget, and that overshoot is visible in metadata["token_count"]. Wrapper framing (JSON array brackets, <records> envelope, enumeration prefixes) is excluded from counting; it is a fixed handful of tokens per assembly.

None
surfacing_threshold

Minimum score for push-path records. Default 0.5.

DEFAULT_SURFACING_THRESHOLD
propagation_depth

BFS depth for CoOccurrence. Default 2.

DEFAULT_PROPAGATION_DEPTH
output_format

"structured" (JSON), "xml", or "natural". Default "structured".

'structured'
token_counter

Optional callable(serialized_text: str) -> int. Receives the exact serialized per-record string the formatter emits for the active output_format (never the record object) and must return a non-negative int. Counters that raise, or return anything else, fall back to the stdlib estimator for that record (with a diagnostic warning). Old-contract callable(record) counters trigger a DeprecationWarning at construction. Default: a stdlib character-class heuristic over the serialized text (_estimate_tokens).

None
retrieval_mode str

"auto" (default), "hybrid", or "composite". See class docstring for semantics.

'auto'
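
A custom token_counter and the per-record fallback described above might look roughly like the sketch below. All three function names are hypothetical, and `fallback_estimate` is a crude four-characters-per-token stand-in, not the character-class heuristic used by `_estimate_tokens`.

```python
import warnings


def fallback_estimate(serialized_text):
    # Stand-in estimator (assumption): roughly four characters per token.
    return max(1, len(serialized_text) // 4)


def word_counter(serialized_text: str) -> int:
    """Example counter: receives the serialized string, returns a non-negative int."""
    return len(serialized_text.split())


def count_tokens(counter, serialized_text):
    """Call the counter; fall back when it raises or returns an invalid value."""
    try:
        value = counter(serialized_text)
    except Exception:
        value = None
    # bool is a subclass of int, so it is rejected explicitly.
    if isinstance(value, int) and not isinstance(value, bool) and value >= 0:
        return value
    warnings.warn("token_counter failed or returned an invalid value; using fallback")
    return fallback_estimate(serialized_text)
```

A tokenizer-backed counter would follow the same shape: accept the serialized string and return a non-negative integer.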

Raises:

Type Description
QueryException

If retrieval_mode="hybrid" is requested but the model lacks BM25Field or EmbeddingField.
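
The max_tokens packing rule can be illustrated with a short standalone sketch. `pack_first_fit` is a hypothetical helper that works on precomputed per-record token costs in rank order; it is a reading of the description above, not the library's code.

```python
def pack_first_fit(token_costs, max_tokens):
    """Return (admitted indices, tokens used), scanning records in rank order."""
    admitted, used = [], 0
    for index, cost in enumerate(token_costs):
        # The first record is always admitted, even if it overshoots the budget.
        if not admitted or used + cost <= max_tokens:
            admitted.append(index)
            used += cost
        # A record that does not fit is skipped; later records are still tried.
    return admitted, used


admitted, used = pack_first_fit([50, 70, 30, 10], max_tokens=100)
oversized = pack_first_fit([500, 20], max_tokens=100)
```

In the first call the 70-token record is skipped, yet the smaller records after it are still admitted. In the second call the single oversized record is kept and the reported usage exceeds the budget, which corresponds to the overshoot visible in metadata["token_count"].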

Source code in src/popoto/recipes/context_assembler.py
class ContextAssembler:
    """Orchestrates memory retrieval into a single assemble() call.

    Combines pull-path (query-driven) and push-path (proactive via
    CyclicDecayField) retrieval, applies token budgets, and formats output
    for LLM context injection.

    The pull path supports three modes selected by ``retrieval_mode``:

    * ``"hybrid"`` — BM25 (lexical) + vector (semantic) signals fused via
      Reciprocal Rank Fusion (RRF, k=60) followed by optional CoOccurrence
      graph propagation. Requires ``BM25Field`` and ``EmbeddingField`` on the
      model.
    * ``"composite"`` — original CompositeScoreQuery weighted-sum (unchanged
      from pre-v1.7 behaviour). Requires ``score_weights``.
    * ``"auto"`` *(default)* — selects ``"hybrid"`` when both ``BM25Field``
      and ``EmbeddingField`` are detected on the model, otherwise falls back
      to ``"composite"``.

    Args:
        model_class: Popoto Model class to query.
        score_weights: Dict mapping field names to weights for the composite
            pull path (e.g., ``{"relevance": 0.6, "confidence": 0.3}``).
            Ignored when the effective retrieval mode is ``"hybrid"``.
        max_items: Maximum records to return. Default 10.
        max_tokens: Optional enforced token budget over the serialized
            per-record output. Packing is greedy first-fit in rank order:
            a record that does not fit is skipped (not a packing
            terminator) and later smaller records may still be admitted.
            The first record is always admitted, so ``assemble()`` never
            returns zero records when candidates exist — a single
            oversized record can therefore overshoot the budget, and that
            overshoot is visible in ``metadata["token_count"]``. Wrapper
            framing (JSON array brackets, ``<records>`` envelope,
            enumeration prefixes) is excluded from counting; it is a fixed
            handful of tokens per assembly.
        surfacing_threshold: Minimum score for push-path records. Default 0.5.
        propagation_depth: BFS depth for CoOccurrence. Default 2.
        output_format: ``"structured"`` (JSON), ``"xml"``, or ``"natural"``.
            Default ``"structured"``.
        token_counter: Optional ``callable(serialized_text: str) -> int``.
            Receives the exact serialized per-record string the formatter
            emits for the active ``output_format`` (never the record
            object) and must return a non-negative ``int``. Counters that
            raise, or return anything else, fall back to the stdlib
            estimator for that record (with a diagnostic warning).
            Old-contract ``callable(record)`` counters trigger a
            ``DeprecationWarning`` at construction. Default: a stdlib
            character-class heuristic over the serialized text
            (``_estimate_tokens``).
        retrieval_mode: ``"auto"`` (default), ``"hybrid"``, or
            ``"composite"``. See class docstring for semantics.

    Raises:
        QueryException: If ``retrieval_mode="hybrid"`` is requested but the
            model lacks ``BM25Field`` or ``EmbeddingField``.
    """

    def __init__(
        self,
        model_class,
        score_weights,
        max_items=DEFAULT_MAX_ITEMS,
        max_tokens=None,
        surfacing_threshold=DEFAULT_SURFACING_THRESHOLD,
        propagation_depth=DEFAULT_PROPAGATION_DEPTH,
        output_format="structured",
        token_counter=None,
        *,
        retrieval_mode: str = "auto",
    ):
        self.model_class = model_class
        self.score_weights = score_weights
        self.max_items = max_items
        self.max_tokens = max_tokens
        self.surfacing_threshold = surfacing_threshold
        self.propagation_depth = propagation_depth
        self.output_format = output_format
        if token_counter is None:
            self._token_counter = _estimate_tokens
        else:
            # Construction-time contract probe: token_counter receives the
            # serialized record STRING, not the record object. Old-contract
            # callable(record) counters typically raise TypeError or
            # AttributeError when handed a str — surface that loudly here
            # instead of silently degrading per-call in production.
            try:
                token_counter("popoto token_counter contract probe")
            except (TypeError, AttributeError):
                warnings.warn(
                    "token_counter now receives the serialized record string "
                    "(str), not the record object — update your counter",
                    DeprecationWarning,
                    stacklevel=2,
                )
            except Exception:
                # Other probe failures are not contract signals; per-call
                # validation in _count_record_tokens handles them.
                pass
            self._token_counter = token_counter

        # Detect field capabilities on model
        self._existence_filter = None
        self._co_occurrence_field = None
        self._co_occurrence_field_name = None
        self._cyclic_decay_field_name = None
        self._confidence_field_name = None
        self._decaying_sorted_field_name = None
        self._bm25_field = None
        self._bm25_field_name = None
        self._embedding_field = None
        self._embedding_field_name = None

        for name, f in model_class._meta.fields.items():
            if isinstance(f, ExistenceFilter) and self._existence_filter is None:
                self._existence_filter = f
            if isinstance(f, CoOccurrenceField) and self._co_occurrence_field is None:
                self._co_occurrence_field = f
                self._co_occurrence_field_name = name
            if (
                isinstance(f, CyclicDecayField)
                and self._cyclic_decay_field_name is None
            ):
                self._cyclic_decay_field_name = name
            if isinstance(f, ConfidenceField) and self._confidence_field_name is None:
                self._confidence_field_name = name
            if (
                isinstance(f, DecayingSortedField)
                and self._decaying_sorted_field_name is None
            ):
                self._decaying_sorted_field_name = name
            if isinstance(f, BM25Field) and self._bm25_field is None:
                self._bm25_field = f
                self._bm25_field_name = name
            if isinstance(f, EmbeddingField) and self._embedding_field is None:
                self._embedding_field = f
                self._embedding_field_name = name

        # Resolve effective retrieval mode
        if retrieval_mode == "auto":
            self._effective_mode = (
                "hybrid"
                if (self._bm25_field is not None and self._embedding_field is not None)
                else "composite"
            )
        elif retrieval_mode == "hybrid":
            if self._bm25_field is None or self._embedding_field is None:
                from ..exceptions import QueryException

                missing = []
                if self._bm25_field is None:
                    missing.append("BM25Field")
                if self._embedding_field is None:
                    missing.append("EmbeddingField")
                raise QueryException(
                    f"retrieval_mode='hybrid' requires {' and '.join(missing)} "
                    f"on {model_class.__name__}"
                )
            self._effective_mode = "hybrid"
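        else:
            # "composite", and also any unrecognized retrieval_mode value,
            # which is accepted silently rather than rejected.
            self._effective_mode = "composite"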

        if self._effective_mode == "hybrid" and score_weights:
            logger.debug(
                "ContextAssembler: retrieval_mode resolved to 'hybrid'; "
                "score_weights are ignored for the pull path"
            )

    def assemble(
        self,
        query_cues=None,
        agent_id=None,
        partition_filters=None,
        assess_quality=False,
    ):
        """Execute the full retrieval pipeline.

        Args:
            query_cues: Optional dict of query cues (e.g., {"topic": "deploy"}).
                If None, pull path is skipped.
            agent_id: Optional agent ID for partition filtering. Added to
                partition_filters as {"agent_id": agent_id}.
            partition_filters: Optional dict of partition key-value pairs
                for filtering queries.
            assess_quality: When True, compute a ``RetrievalQuality`` over
                the selected records and attach it to
                ``AssemblyResult.metadata["quality"]``. Default False;
                when False the result shape is bit-for-bit identical to
                the pre-metacognitive-layer behavior. Turning this on adds
                bounded overhead (one ``might_exist`` per cue plus one
                ``get_confidence`` per selected record).

        Returns:
            AssemblyResult with records, proactive, formatted, and metadata.
        """
        t0 = time.time()

        # Build partition filters
        filters = dict(partition_filters or {})
        if agent_id is not None:
            filters["agent_id"] = agent_id

        pull_records = []
        push_records = []
        all_pull_candidates = []  # For competitive suppression

        # --- Pull path ---
        if query_cues:
            pull_records, all_pull_candidates = self._pull_path(query_cues, filters)

        # --- Push path ---
        if self._cyclic_decay_field_name is not None:
            push_records = self._push_path(filters)

        # --- Merge + deduplicate ---
        seen_keys = set()
        merged = []
        pull_keys = set()
        push_keys = set()

        for record in pull_records:
            rk = _get_key(record)
            if rk not in seen_keys:
                seen_keys.add(rk)
                merged.append(record)
                pull_keys.add(rk)

        for record in push_records:
            rk = _get_key(record)
            if rk not in seen_keys:
                seen_keys.add(rk)
                merged.append(record)
                push_keys.add(rk)

        # --- Budget selection ---
        # max_items cap
        selected = merged[: self.max_items]

        # max_tokens cap — greedy first-fit in rank order (skip, not break):
        # a record that does not fit is skipped and the loop continues, so
        # later smaller records may still be admitted. The first record is
        # always admitted (never-zero-records guarantee); a single oversized
        # record can overshoot the budget, visibly in metadata["token_count"].
        # The serialized strings captured here at count time are the exact
        # strings the formatter composes below — counting can never diverge
        # from emission, even if _post_effects mutates record state.
        total_tokens = 0
        selected_serialized = []
        if self.max_tokens is not None:
            budget_selected = []
            for record in selected:
                tokens, serialized = self._count_record_tokens(record)
                if budget_selected and total_tokens + tokens > self.max_tokens:
                    continue
                total_tokens += tokens
                budget_selected.append(record)
                selected_serialized.append(serialized)
            selected = budget_selected
        else:
            for record in selected:
                tokens, serialized = self._count_record_tokens(record)
                total_tokens += tokens
                selected_serialized.append(serialized)

        # Identify proactive records in final selection
        proactive = [r for r in selected if _get_key(r) in push_keys]

        # --- Post-retrieval effects ---
        self._post_effects(
            selected, pull_keys, push_keys, all_pull_candidates, agent_id
        )

        # --- Format ---
        # Compose from the count-time serialized strings (NOT a re-serialize
        # after _post_effects): what was counted is byte-for-byte what is
        # emitted, plus fixed wrapper framing.
        compose = {
            "structured": _compose_structured,
            "xml": _compose_xml,
            "natural": _compose_natural,
        }.get(self.output_format, _compose_structured)

        formatted = compose(selected_serialized)

        timing_ms = round((time.time() - t0) * 1000, 2)

        metadata = {
            "pull_count": len([r for r in selected if _get_key(r) in pull_keys]),
            "push_count": len(proactive),
            "token_count": total_tokens,
            "timing_ms": timing_ms,
            "total_candidates": len(merged),
        }

        # [METACOGNITIVE] Quality assessment — opt-in, off-by-default so existing
        # callers see bit-for-bit identical metadata.
        if assess_quality:
            try:
                metadata["quality"] = self._compute_quality(
                    selected=selected,
                    all_pull_candidates=all_pull_candidates,
                    query_cues=query_cues or {},
                )
            except Exception as e:
                logger.warning("_compute_quality failed: %s", e)
                metadata["quality"] = RetrievalQuality()

        return AssemblyResult(
            records=selected,
            proactive=proactive,
            formatted=formatted,
            metadata=metadata,
        )

    def _count_record_tokens(self, record):
        """Serialize ``record`` for the active output format and count tokens.

        Single counting path used by both the budgeted and unbudgeted
        branches of budget selection. The configured ``token_counter``
        receives the serialized string; its return value is validated (a
        non-negative ``int``, not a ``bool``). Any exception or invalid
        return falls back to :func:`_estimate_tokens` over the same string,
        with a diagnostic warning.

        Returns:
            Tuple ``(tokens, serialized)`` — the token count and the exact
            per-record string the formatter will emit for this record.
        """
        serialized = _serialize_record(record, self.output_format)
        try:
            tokens = self._token_counter(serialized)
            if not isinstance(tokens, int) or isinstance(tokens, bool) or tokens < 0:
                raise TypeError(
                    f"token_counter returned {tokens!r}; expected a non-negative int"
                )
        except Exception as e:
            logger.warning(
                "token_counter raised %s on serialized text (first 80 chars: "
                "%r); falling back to _estimate_tokens. Contract: "
                "callable(str) -> int.",
                type(e).__name__,
                serialized[:80],
            )
            tokens = _estimate_tokens(serialized)
        return tokens, serialized

    def _pull_path(self, query_cues, filters):
        """Dispatch pull-path retrieval based on ``self._effective_mode``.

        Returns:
            Tuple of (selected_records, all_candidates).
        """
        if self._effective_mode == "hybrid":
            return self._pull_path_hybrid(query_cues, filters)
        return self._pull_path_composite(query_cues, filters)

    def _pull_path_composite(self, query_cues, filters):
        """Execute pull-path retrieval via CompositeScoreQuery (original path).

        Returns:
            Tuple of (selected_records, all_candidates) where all_candidates
            includes records that may not make the final cut.
        """
        # ExistenceFilter pre-check
        if self._existence_filter is not None:
            all_missing = True
            for cue_value in query_cues.values():
                if not self._existence_filter.definitely_missing(
                    self.model_class, str(cue_value)
                ):
                    all_missing = False
                    break
            if all_missing:
                logger.debug(
                    "ExistenceFilter: all cues definitely missing, skipping pull"
                )
                return [], []

        # CoOccurrence boost (first pass without boost, then propagate)
        co_occurrence_boost = None

        try:
            # Initial composite score query
            query = self.model_class.query
            if filters:
                query = query.filter(**filters)

            candidates = query.composite_score(
                indexes=self.score_weights,
                limit=self.max_items * 2,
                co_occurrence_boost=co_occurrence_boost,
            )
        except Exception as e:
            logger.warning("CompositeScoreQuery failed: %s", e)
            return [], []

        if not candidates:
            return [], []

        # CoOccurrence propagation to discover associated records
        if self._co_occurrence_field is not None and candidates:
            seed_pks = [_get_key(c) for c in candidates[: self.max_items]]
            try:
                propagated = self._co_occurrence_field.propagate(
                    self.model_class,
                    seed_pks,
                    depth=self.propagation_depth,
                    decay_per_hop=0.5,
                    threshold=0.01,
                )
                if propagated:
                    # Re-run composite score with co-occurrence boost
                    query = self.model_class.query
                    if filters:
                        query = query.filter(**filters)
                    candidates = query.composite_score(
                        indexes=self.score_weights,
                        limit=self.max_items * 2,
                        co_occurrence_boost=propagated,
                    )
            except Exception as e:
                logger.warning("CoOccurrence propagation failed: %s", e)

        all_candidates = list(candidates)
        return candidates, all_candidates

    def _pull_path_hybrid(self, query_cues, filters):
        """Hybrid pull path: BM25 (lexical) + vector (semantic) + graph via RRF.

        Collects up to three ranked signals then fuses them with
        ``QueryBuilder.fuse()`` (RRF, k=RRF_K). Falls back to the composite
        path when both lexical and vector signals are empty, or when the
        fusion call itself raises.

        Returns:
            Tuple of (selected_records, all_candidates).
        """
        query_text = " ".join(str(v) for v in query_cues.values())

        # ExistenceFilter pre-check (same short-circuit as composite path)
        if self._existence_filter is not None:
            all_missing = all(
                self._existence_filter.definitely_missing(self.model_class, str(v))
                for v in query_cues.values()
            )
            if all_missing:
                logger.debug(
                    "ExistenceFilter: all cues definitely missing, skipping hybrid pull"
                )
                return [], []

        candidate_limit = self.max_items * HYBRID_CANDIDATE_MULTIPLIER

        keyword_results: list = []
        vector_results: list = []
        graph_results: list = []

        # --- BM25 lexical retrieval ---
        try:
            keyword_results = BM25Field.search(
                self.model_class,
                self._bm25_field_name,
                query_text,
                limit=candidate_limit,
            )
        except Exception as e:
            logger.warning("BM25 search failed in hybrid path: %s", e)

        # --- Vector semantic retrieval (raw scored tuples, not hydrated) ---
        try:
            from ..models.query import QueryBuilder as _QueryBuilder

            _q = self.model_class.query
            if filters:
                _qb = _q.filter(**filters)
            else:
                _qb = _QueryBuilder(_q)
            vector_results = _qb._get_vector_scores(query_text, limit=candidate_limit)
        except Exception as e:
            logger.warning("Vector search failed in hybrid path: %s", e)

        if not keyword_results and not vector_results:
            logger.debug(
                "Hybrid path: no BM25 or vector signal collected, "
                "falling back to composite"
            )
            return self._pull_path_composite(query_cues, filters)

        # --- Graph propagation (seeds from BM25 top results) ---
        if self._co_occurrence_field is not None and keyword_results:
            seed_pks = [k for k, _ in keyword_results[:5]]
            try:
                propagated = self._co_occurrence_field.propagate(
                    self.model_class,
                    seed_pks,
                    depth=self.propagation_depth,
                    decay_per_hop=0.5,
                    threshold=0.01,
                )
                graph_results = list(propagated.items())
            except Exception as e:
                logger.warning("Graph propagation failed in hybrid path: %s", e)

        # --- RRF fusion ---
        fuse_kwargs: dict = {}
        if keyword_results:
            fuse_kwargs["keyword"] = keyword_results
        if vector_results:
            fuse_kwargs["vector"] = vector_results
        if graph_results:
            fuse_kwargs["graph"] = graph_results

        try:
            query = self.model_class.query
            if filters:
                query = query.filter(**filters)
            candidates = query.fuse(
                k=RRF_K,
                limit=self.max_items * 2,
                **fuse_kwargs,
            )
        except Exception as e:
            logger.warning("RRF fusion failed, falling back to composite: %s", e)
            return self._pull_path_composite(query_cues, filters)

        all_candidates = list(candidates)
        return candidates, all_candidates

    def _push_path(self, filters):
        """Execute push-path retrieval via CyclicDecayField.

        Uses composite_score with min_score for threshold filtering instead
        of top_by_decay, since composite_score supports server-side score
        thresholds via ZREVRANGEBYSCORE.
        """
        try:
            query = self.model_class.query
            if filters:
                query = query.filter(**filters)

            # Build weights using only the CyclicDecayField for push-path scoring
            push_weights = {self._cyclic_decay_field_name: 1.0}

            results = query.composite_score(
                indexes=push_weights,
                limit=self.max_items,
                min_score=(
                    self.surfacing_threshold if self.surfacing_threshold > 0 else None
                ),
            )
        except Exception as e:
            logger.warning("Push path failed: %s", e)
            return []

        if not results:
            logger.debug("Push path: 0 records above surfacing threshold")

        return results

    def _post_effects(
        self, selected, pull_keys, push_keys, all_pull_candidates, agent_id
    ):
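        """Apply post-retrieval effects.

        Queues ``on_read`` (pull-path records) and ``on_surfaced`` (push-path
        records) on a single Redis pipeline, then applies competitive
        suppression to pull candidates that were not selected. Pipeline
        failures are logged, not raised.
        """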
        if not selected and not all_pull_candidates:
            return

        pipeline = POPOTO_REDIS_DB.pipeline()

        # on_read for pull-path selected records
        for record in selected:
            if _get_key(record) in pull_keys:
                ObservationProtocol.on_read(record, pipeline=pipeline)

        # on_surfaced for push-path selected records
        proactive_records = [r for r in selected if _get_key(r) in push_keys]
        if proactive_records:
            ObservationProtocol.on_surfaced(
                proactive_records,
                reason="proactive",
                partition=agent_id,
                pipeline=pipeline,
            )

        # Competitive suppression for non-selected pull candidates
        if self._confidence_field_name is not None:
            selected_keys = {_get_key(r) for r in selected}
            for candidate in all_pull_candidates:
                if _get_key(candidate) not in selected_keys:
                    try:
                        ConfidenceField.update_confidence(
                            candidate,
                            self._confidence_field_name,
                            signal=COMPETITIVE_SUPPRESSION_SIGNAL,
                        )
                    except (TypeError, ValueError):
                        pass  # Model may not have confidence on this instance

        try:
            pipeline.execute()
        except Exception as e:
            logger.warning("Post-effects pipeline failed: %s", e)

    # ------------------------------------------------------------------
    # Metacognitive layer: RetrievalQuality helpers + public assess()
    # ------------------------------------------------------------------

    def _cue_familiarity(self, cue_value) -> float:
        """Thin wrapper around the module-level :func:`_cue_familiarity`.

        Introspects ``self`` exactly once and forwards explicit kwargs so
        the pure helper is the single source of truth (issue #370 C2).
        """
        return _cue_familiarity(
            cue_value,
            existence_filter=self._existence_filter,
            model_class=self.model_class,
        )

    def _compute_fok(self, query_cues, pull_candidates):
        """Thin wrapper around the module-level :func:`_compute_fok`."""
        return _compute_fok(
            query_cues,
            pull_candidates,
            model_class=self.model_class,
            score_weights=self.score_weights,
            max_items=self.max_items,
            surfacing_threshold=self.surfacing_threshold,
            existence_filter=self._existence_filter,
        )

    def _score_proxy_for_records(self, records):
        """Thin wrapper around the module-level
        :func:`_score_proxy_for_records`.
        """
        return _score_proxy_for_records(
            records,
            model_class=self.model_class,
            score_weights=self.score_weights,
        )

    def _compute_score_spread(self, records):
        """Thin wrapper around the module-level
        :func:`_compute_score_spread`.
        """
        return _compute_score_spread(
            records,
            model_class=self.model_class,
            score_weights=self.score_weights,
        )

    def _avg_confidence(self, records):
        """Thin wrapper around the module-level :func:`_avg_confidence`."""
        return _avg_confidence(
            records,
            confidence_field_name=self._confidence_field_name,
            model_class=self.model_class,
        )

    def _staleness_ratio(self, records):
        """Thin wrapper around the module-level :func:`_staleness_ratio`."""
        return _staleness_ratio(
            records,
            model_class=self.model_class,
            score_weights=self.score_weights,
            surfacing_threshold=self.surfacing_threshold,
            decaying_sorted_field_name=self._decaying_sorted_field_name,
        )

    def _compute_quality(self, selected, all_pull_candidates, query_cues):
        """Assemble a RetrievalQuality over the selected records.

        Side-effect-free; reads from ConfidenceField, ExistenceFilter, and
        sorted-set indexes via their existing public APIs. Called at the
        end of ``assemble()`` (post selection); the standalone ``assess()``
        probe builds its result from the same helpers directly.
        """
        avg_conf = self._avg_confidence(selected)
        score_spread, distribution = self._compute_score_spread(selected)
        fok_score, per_cue = self._compute_fok(query_cues, all_pull_candidates)
        staleness = self._staleness_ratio(selected)
        return RetrievalQuality(
            avg_confidence=avg_conf,
            score_spread=score_spread,
            fok_score=fok_score,
            staleness_ratio=staleness,
            score_distribution=distribution,
            per_cue_fok=per_cue,
        )

    def assess(self, query_cues=None, partition_filters=None, probe_limit=None):
        """Probe retrieval quality without running the full pipeline.

        Runs a cheap pre-retrieval check: ExistenceFilter lookups for
        cue_familiarity + a single low-limit composite_score probe to
        gather pull candidates for FOK computation. Does NOT run
        CoOccurrence propagation, does NOT run the push path, does NOT
        apply post-effects.

        Intended use: call ``assess()`` before ``assemble()`` to decide
        whether the full retrieval is worth the round-trip cost. When
        ``assess().fok_score < some_threshold``, the agent can skip the
        retrieval entirely and widen the cue or caveat its answer.

        Args:
            query_cues: Optional dict of query cues. When empty, all
                metrics default to 0.0 with a logged warning.
            partition_filters: Optional dict of partition filters. Same
                semantics as ``assemble()``.
            probe_limit: Optional cap on the number of candidates fetched
                for the probe. Defaults to ``self.max_items``.

        Returns:
            RetrievalQuality computed over the probe candidates rather
            than a final selection; it reflects what's *available* for
            retrieval, not what was actually retrieved.
        """
        filters = dict(partition_filters or {})

        if not query_cues:
            logger.warning("assess() called with no query_cues")
            return RetrievalQuality()

        limit = probe_limit if probe_limit is not None else self.max_items
        probe_candidates = []

        # ExistenceFilter pre-check — same short-circuit as _pull_path.
        if self._existence_filter is not None:
            all_missing = True
            for cue_value in query_cues.values():
                if not self._existence_filter.definitely_missing(
                    self.model_class, str(cue_value)
                ):
                    all_missing = False
                    break
            if all_missing:
                # No probe — everything is definitely absent.
                fok_score, per_cue = self._compute_fok(query_cues, [])
                return RetrievalQuality(
                    avg_confidence=1.0 if not self._confidence_field_name else 0.5,
                    score_spread=0.0,
                    fok_score=fok_score,
                    staleness_ratio=0.0,
                    score_distribution=[],
                    per_cue_fok=per_cue,
                )

        try:
            query = self.model_class.query
            if filters:
                query = query.filter(**filters)
            probe_candidates = query.composite_score(
                indexes=self.score_weights,
                limit=limit,
            )
        except Exception as e:
            logger.warning("assess() composite_score probe failed: %s", e)
            probe_candidates = []

        avg_conf = self._avg_confidence(probe_candidates)
        score_spread, distribution = self._compute_score_spread(probe_candidates)
        fok_score, per_cue = self._compute_fok(query_cues, probe_candidates)
        staleness = self._staleness_ratio(probe_candidates)

        return RetrievalQuality(
            avg_confidence=avg_conf,
            score_spread=score_spread,
            fok_score=fok_score,
            staleness_ratio=staleness,
            score_distribution=distribution,
            per_cue_fok=per_cue,
        )
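The `token_counter` contract described in the constructor docs above (a callable that takes the serialized string and returns a non-negative `int`, with a fallback when it fails) can be sketched in isolation. This is a minimal sketch, not the library's code: `fallback_estimate`, `whitespace_counter`, and `safe_count` are hypothetical names, and the chars-divided-by-4 fallback is an assumed stand-in for `_estimate_tokens`, whose actual heuristic is not shown here.

```python
import logging

logger = logging.getLogger(__name__)


def fallback_estimate(text: str) -> int:
    """Hypothetical stand-in for _estimate_tokens: roughly 4 chars per token."""
    return max(1, len(text) // 4) if text else 0


def whitespace_counter(serialized_text: str) -> int:
    """Example new-contract counter: receives a str, returns a non-negative int."""
    return len(serialized_text.split())


def safe_count(counter, serialized: str) -> int:
    """Call counter(serialized); fall back if it raises or returns a bad value."""
    try:
        tokens = counter(serialized)
        # bool is a subclass of int, so it has to be rejected explicitly.
        if not isinstance(tokens, int) or isinstance(tokens, bool) or tokens < 0:
            raise TypeError(f"counter returned {tokens!r}")
    except Exception as exc:
        logger.warning("counter failed (%s); using fallback", type(exc).__name__)
        tokens = fallback_estimate(serialized)
    return tokens


text = '{"topic": "deploy", "note": "rollback plan"}'
good = safe_count(whitespace_counter, text)
# An old-contract counter expects a record object and fails on a str.
old_style = safe_count(lambda record: record.token_count, text)
bad_return = safe_count(lambda s: -1, text)
```

Both the old-contract counter and the counter returning `-1` end up with the fallback estimate, so one misbehaving counter degrades accuracy for that record without aborting the assembly.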

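The hybrid pull path fuses its ranked signals with reciprocal rank fusion (RRF). The sketch below applies the standard RRF formula, where each record's fused score is the sum over signals of 1 / (k + rank), to plain `(key, score)` lists. It is an illustration under assumptions, not the implementation behind `QueryBuilder.fuse()`: `rrf_fuse` is a hypothetical helper, and `k=60` is a commonly used constant that may differ from the value of `RRF_K`.

```python
from collections import defaultdict


def rrf_fuse(k=60, limit=None, **signals):
    """Fuse ranked (key, score) lists by reciprocal rank.

    Each signal is assumed to be sorted best-first; only the rank position
    matters, so raw scores on different scales never need normalizing.
    """
    fused = defaultdict(float)
    for ranked in signals.values():
        for rank, (key, _score) in enumerate(ranked, start=1):
            fused[key] += 1.0 / (k + rank)
    # Sort by fused score descending; break ties by key for determinism.
    ordered = sorted(fused.items(), key=lambda kv: (-kv[1], kv[0]))
    return ordered[:limit] if limit is not None else ordered


keyword = [("doc:a", 9.1), ("doc:b", 7.4), ("doc:c", 2.0)]
vector = [("doc:a", 0.93), ("doc:b", 0.90), ("doc:d", 0.51)]
graph = [("doc:d", 0.5)]

fused = rrf_fuse(keyword=keyword, vector=vector, graph=graph, limit=3)
top_keys = [key for key, _ in fused]
```

In this toy input, `doc:d` is only third in the vector list, yet its first place in the graph signal lifts it above `doc:b`, which is second in two lists. That is the effect the optional graph signal is meant to have.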
assemble(query_cues=None, agent_id=None, partition_filters=None, assess_quality=False)

Execute the full retrieval pipeline.
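One stage of that pipeline, the merge of pull-path and push-path results with de-duplication, can be sketched on plain dicts. This is a minimal sketch under assumptions: `merge_paths` is a hypothetical helper, and the `"id"` field stands in for whatever `_get_key` extracts from a real record.

```python
def merge_paths(pull_records, push_records, key=lambda r: r["id"]):
    """Merge pull then push results, dropping duplicates by key.

    Pull results come first, so a record found by both paths is credited
    to the pull path only.
    """
    seen, merged = set(), []
    pull_keys, push_keys = set(), set()
    for records, provenance in ((pull_records, pull_keys), (push_records, push_keys)):
        for record in records:
            rk = key(record)
            if rk in seen:
                continue
            seen.add(rk)
            merged.append(record)
            provenance.add(rk)
    return merged, pull_keys, push_keys


pull = [{"id": "m1"}, {"id": "m2"}, {"id": "m1"}]
push = [{"id": "m2"}, {"id": "m3"}]
merged, pull_keys, push_keys = merge_paths(pull, push)
merged_ids = [r["id"] for r in merged]
```

Because `m2` appears in both lists, it counts as a pull record only, so it would not be reported as proactive.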

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query_cues | | Optional dict of query cues (e.g., {"topic": "deploy"}). If None, pull path is skipped. | None |
| agent_id | | Optional agent ID for partition filtering. Added to partition_filters as {"agent_id": agent_id}. | None |
| partition_filters | | Optional dict of partition key-value pairs for filtering queries. | None |
| assess_quality | | When True, compute a RetrievalQuality over the selected records and attach it to AssemblyResult.metadata["quality"]. Default False; when False the result shape is bit-for-bit identical to the pre-metacognitive-layer behavior. Turning this on adds bounded overhead (one might_exist per cue plus one get_confidence per selected record). | False |

Returns:

| Type | Description |
|---|---|
| | AssemblyResult with records, proactive, formatted, and metadata. |
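
The `max_tokens` rule that `assemble()` applies (greedy first-fit in rank order, skipping rather than stopping at a record that does not fit, with the first record always admitted) can be sketched on plain token counts. `budget_select` is a hypothetical helper written for illustration; it mirrors the selection rule but works on `(name, tokens)` pairs rather than real records.

```python
def budget_select(ranked, max_tokens):
    """Greedy first-fit over (name, tokens) pairs in rank order."""
    selected = []
    total = 0
    for name, tokens in ranked:
        # Skip (not break) when the record does not fit, unless nothing has
        # been admitted yet: the first record is always taken.
        if selected and total + tokens > max_tokens:
            continue
        total += tokens
        selected.append(name)
    return selected, total


ranked = [("a", 40), ("b", 70), ("c", 30), ("d", 50), ("e", 25)]
picked, used = budget_select(ranked, max_tokens=100)

# A single oversized first record overshoots the budget, and the total shows it.
over_picked, over_used = budget_select([("big", 500), ("small", 10)], max_tokens=100)
```

With a budget of 100, `b` and `d` are skipped but the smaller `c` and `e` behind them still fit, which a break-on-first-miss loop would have missed. In the second call the total of 500 exceeds the budget, matching the documented overshoot that surfaces in `metadata["token_count"]`.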

Source code in src/popoto/recipes/context_assembler.py
def assemble(
    self,
    query_cues=None,
    agent_id=None,
    partition_filters=None,
    assess_quality=False,
):
    """Execute the full retrieval pipeline.

    Args:
        query_cues: Optional dict of query cues (e.g., {"topic": "deploy"}).
            If None, pull path is skipped.
        agent_id: Optional agent ID for partition filtering. Added to
            partition_filters as {"agent_id": agent_id}.
        partition_filters: Optional dict of partition key-value pairs
            for filtering queries.
        assess_quality: When True, compute a ``RetrievalQuality`` over
            the selected records and attach it to
            ``AssemblyResult.metadata["quality"]``. Default False;
            when False the result shape is bit-for-bit identical to
            the pre-metacognitive-layer behavior. Turning this on adds
            bounded overhead (one ``might_exist`` per cue plus one
            ``get_confidence`` per selected record).

    Returns:
        AssemblyResult with records, proactive, formatted, and metadata.
    """
    t0 = time.time()

    # Build partition filters
    filters = dict(partition_filters or {})
    if agent_id is not None:
        filters["agent_id"] = agent_id

    pull_records = []
    push_records = []
    all_pull_candidates = []  # For competitive suppression

    # --- Pull path ---
    if query_cues:
        pull_records, all_pull_candidates = self._pull_path(query_cues, filters)

    # --- Push path ---
    if self._cyclic_decay_field_name is not None:
        push_records = self._push_path(filters)

    # --- Merge + deduplicate ---
    seen_keys = set()
    merged = []
    pull_keys = set()
    push_keys = set()

    for record in pull_records:
        rk = _get_key(record)
        if rk not in seen_keys:
            seen_keys.add(rk)
            merged.append(record)
            pull_keys.add(rk)

    for record in push_records:
        rk = _get_key(record)
        if rk not in seen_keys:
            seen_keys.add(rk)
            merged.append(record)
            push_keys.add(rk)

    # --- Budget selection ---
    # max_items cap
    selected = merged[: self.max_items]

    # max_tokens cap — greedy first-fit in rank order (skip, not break):
    # a record that does not fit is skipped and the loop continues, so
    # later smaller records may still be admitted. The first record is
    # always admitted (never-zero-records guarantee); a single oversized
    # record can overshoot the budget, visibly in metadata["token_count"].
    # The serialized strings captured here at count time are the exact
    # strings the formatter composes below — counting can never diverge
    # from emission, even if _post_effects mutates record state.
    total_tokens = 0
    selected_serialized = []
    if self.max_tokens is not None:
        budget_selected = []
        for record in selected:
            tokens, serialized = self._count_record_tokens(record)
            if budget_selected and total_tokens + tokens > self.max_tokens:
                continue
            total_tokens += tokens
            budget_selected.append(record)
            selected_serialized.append(serialized)
        selected = budget_selected
    else:
        for record in selected:
            tokens, serialized = self._count_record_tokens(record)
            total_tokens += tokens
            selected_serialized.append(serialized)

    # Identify proactive records in final selection
    proactive = [r for r in selected if _get_key(r) in push_keys]

    # --- Post-retrieval effects ---
    self._post_effects(
        selected, pull_keys, push_keys, all_pull_candidates, agent_id
    )

    # --- Format ---
    # Compose from the count-time serialized strings (NOT a re-serialize
    # after _post_effects): what was counted is byte-for-byte what is
    # emitted, plus fixed wrapper framing.
    compose = {
        "structured": _compose_structured,
        "xml": _compose_xml,
        "natural": _compose_natural,
    }.get(self.output_format, _compose_structured)

    formatted = compose(selected_serialized)

    timing_ms = round((time.time() - t0) * 1000, 2)

    metadata = {
        "pull_count": len([r for r in selected if _get_key(r) in pull_keys]),
        "push_count": len(proactive),
        "token_count": total_tokens,
        "timing_ms": timing_ms,
        "total_candidates": len(merged),
    }

    # [METACOGNITIVE] Quality assessment — opt-in, off-by-default so existing
    # callers see bit-for-bit identical metadata.
    if assess_quality:
        try:
            metadata["quality"] = self._compute_quality(
                selected=selected,
                all_pull_candidates=all_pull_candidates,
                query_cues=query_cues or {},
            )
        except Exception as e:
            logger.warning("_compute_quality failed: %s", e)
            metadata["quality"] = RetrievalQuality()

    return AssemblyResult(
        records=selected,
        proactive=proactive,
        formatted=formatted,
        metadata=metadata,
    )
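
The budget-selection step in the listing above can be isolated as a small standalone function. The following is a minimal sketch, not the library's code: `select_within_budget` is a hypothetical name, and token cost is assumed to be the string length purely for illustration (the real method uses `self._count_record_tokens`).

```python
def select_within_budget(items, max_tokens, count_tokens=len):
    """Greedy first-fit in rank order: skip records that do not fit, keep scanning.

    `count_tokens` defaults to len() as an illustrative assumption only.
    """
    selected = []
    total = 0
    for item in items:
        tokens = count_tokens(item)
        # The first record is always admitted, even if it alone exceeds the budget.
        if selected and total + tokens > max_tokens:
            continue  # skip, do not break: a later, smaller record may still fit
        total += tokens
        selected.append(item)
    return selected, total


# Items are already in rank order; cost is the string length (assumption).
ranked = ["aaaa", "bbbbbbbb", "cc", "ddd"]
picked, used = select_within_budget(ranked, max_tokens=7)

# A single oversized first record overshoots the budget but is still returned.
oversized, overshoot = select_within_budget(["xxxxxxxxxx", "y"], max_tokens=3)
```

Using `continue` rather than `break` trades strict rank order for better use of the budget: a large mid-ranked record is dropped, while smaller lower-ranked records can still be admitted.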

assess(query_cues=None, partition_filters=None, probe_limit=None)

Probe retrieval quality without running the full pipeline.

Runs a cheap pre-retrieval check: ExistenceFilter lookups for cue_familiarity, plus a single low-limit composite_score probe that gathers pull candidates for the FOK computation. It does NOT run CoOccurrence propagation, the push path, or post-effects.

Intended use: call assess() before assemble() to decide whether the full retrieval is worth the round-trip cost. When assess().fok_score falls below a caller-chosen threshold, the agent can skip the retrieval entirely and either widen the cue or caveat its answer.
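
The gating pattern described above might look like the following sketch. `StubAssembler` and `StubQuality` are hypothetical stand-ins so the example runs without Popoto or Redis; only the method names `assess()` and `assemble()` and the `fok_score` attribute come from this page. The threshold value and the way the stub computes FOK are assumptions, not library behavior.

```python
from dataclasses import dataclass


@dataclass
class StubQuality:
    """Hypothetical stand-in for RetrievalQuality; only fok_score is modeled."""
    fok_score: float = 0.0


class StubAssembler:
    """Hypothetical stand-in exposing assess() and assemble()."""

    def __init__(self, known_cues):
        self._known = set(known_cues)
        self.assemble_calls = 0

    def assess(self, query_cues=None):
        if not query_cues:
            return StubQuality()
        # Assumption: FOK is the fraction of cue values the store has seen.
        hits = sum(1 for value in query_cues.values() if value in self._known)
        return StubQuality(fok_score=hits / len(query_cues))

    def assemble(self, query_cues=None):
        self.assemble_calls += 1
        return f"context for {sorted(query_cues.values())}"


FOK_THRESHOLD = 0.5  # assumption: tune per application


def retrieve_if_promising(assembler, query_cues, threshold=FOK_THRESHOLD):
    """Run the full assemble() only when the cheap probe looks promising."""
    quality = assembler.assess(query_cues)
    if quality.fok_score < threshold:
        return None  # caller can widen the cue or caveat its answer
    return assembler.assemble(query_cues)


assembler = StubAssembler(known_cues={"redis"})
hit = retrieve_if_promising(assembler, {"topic": "redis"})
miss = retrieve_if_promising(assembler, {"topic": "kafka"})
```

Returning `None` on a low score keeps the decision with the caller, who can retry with broader cues or answer with an explicit caveat.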

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query_cues | | Optional dict of query cues. When empty, all metrics default to 0.0 with a logged warning. | None |
| partition_filters | | Optional dict of partition filters. Same semantics as assemble(). | None |
| probe_limit | | Optional cap on the number of candidates fetched for the probe. Defaults to self.max_items. | None |

Returns:

| Type | Description |
|---|---|
| RetrievalQuality | selected is treated as empty; the quality reflects what's available for retrieval, not what was actually retrieved. |

Source code in src/popoto/recipes/context_assembler.py
def assess(self, query_cues=None, partition_filters=None, probe_limit=None):
    """Probe retrieval quality without running the full pipeline.

    Runs a cheap pre-retrieval check: ExistenceFilter lookups for
    cue_familiarity + a single low-limit composite_score probe to
    gather pull candidates for FOK computation. Does NOT run
    CoOccurrence propagation, does NOT run the push path, does NOT
    apply post-effects.

    Intended use: call ``assess()`` before ``assemble()`` to decide
    whether the full retrieval is worth the round-trip cost. When
    ``assess().fok_score < some_threshold``, the agent can skip the
    retrieval entirely and widen the cue or caveat its answer.

    Args:
        query_cues: Optional dict of query cues. When empty, all
            metrics default to 0.0 with a logged warning.
        partition_filters: Optional dict of partition filters. Same
            semantics as ``assemble()``.
        probe_limit: Optional cap on the number of candidates fetched
            for the probe. Defaults to ``self.max_items``.

    Returns:
        RetrievalQuality. ``selected`` is treated as empty; the quality
        reflects what's *available* for retrieval, not what was
        actually retrieved.
    """
    filters = dict(partition_filters or {})

    if not query_cues:
        logger.warning("assess() called with no query_cues")
        return RetrievalQuality()

    limit = probe_limit if probe_limit is not None else self.max_items
    probe_candidates = []

    # ExistenceFilter pre-check — same short-circuit as _pull_path.
    if self._existence_filter is not None:
        all_missing = True
        for cue_value in query_cues.values():
            if not self._existence_filter.definitely_missing(
                self.model_class, str(cue_value)
            ):
                all_missing = False
                break
        if all_missing:
            # No probe — everything is definitely absent.
            fok_score, per_cue = self._compute_fok(query_cues, [])
            return RetrievalQuality(
                avg_confidence=1.0 if not self._confidence_field_name else 0.5,
                score_spread=0.0,
                fok_score=fok_score,
                staleness_ratio=0.0,
                score_distribution=[],
                per_cue_fok=per_cue,
            )

    try:
        query = self.model_class.query
        if filters:
            query = query.filter(**filters)
        probe_candidates = query.composite_score(
            indexes=self.score_weights,
            limit=limit,
        )
    except Exception as e:
        logger.warning("assess() composite_score probe failed: %s", e)
        probe_candidates = []

    avg_conf = self._avg_confidence(probe_candidates)
    score_spread, distribution = self._compute_score_spread(probe_candidates)
    fok_score, per_cue = self._compute_fok(query_cues, probe_candidates)
    staleness = self._staleness_ratio(probe_candidates)

    return RetrievalQuality(
        avg_confidence=avg_conf,
        score_spread=score_spread,
        fok_score=fok_score,
        staleness_ratio=staleness,
        score_distribution=distribution,
        per_cue_fok=per_cue,
    )
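
The ExistenceFilter short-circuit in the listing above reduces to one question: is every cue value definitely absent? A minimal sketch of that check follows, assuming a hypothetical `SetBackedFilter` stand-in. It copies the `definitely_missing(model_class, value)` call shape seen in the source but uses exact set membership rather than a probabilistic filter.

```python
class SetBackedFilter:
    """Hypothetical stand-in for an existence filter, backed by an exact set."""

    def __init__(self, present_values):
        self._present = set(present_values)

    def definitely_missing(self, model_class, value):
        # model_class is accepted only to mirror the call shape; it is unused here.
        return value not in self._present


def all_cues_missing(existence_filter, model_class, query_cues):
    """True when no cue value could possibly match, so the probe can be skipped.

    all() stops at the first cue that might exist, like the explicit
    loop-and-break in the listing. Note that all() over an empty dict is
    True; the real method returns early on empty cues before reaching
    this check.
    """
    return all(
        existence_filter.definitely_missing(model_class, str(value))
        for value in query_cues.values()
    )


existence = SetBackedFilter({"redis", "42"})
skip_probe = all_cues_missing(existence, None, {"topic": "kafka", "id": 7})
run_probe = not all_cues_missing(existence, None, {"topic": "kafka", "id": 42})
```

Cue values are passed through `str()` before the lookup, as in the source, so the integer cue `42` matches the stored string `"42"`.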

format_structured(records)

Format records as a JSON array.

Source code in src/popoto/recipes/context_assembler.py
def format_structured(records) -> str:
    """Format records as JSON array."""
    return _compose_structured([_serialize_record(r, "structured") for r in records])

format_xml(records)

Format records as XML tags.

Source code in src/popoto/recipes/context_assembler.py
def format_xml(records) -> str:
    """Format records as XML tags."""
    return _compose_xml([_serialize_record(r, "xml") for r in records])

format_natural(records)

Format records as a natural-language summary.

Source code in src/popoto/recipes/context_assembler.py
def format_natural(records) -> str:
    """Format records as natural language summary."""
    return _compose_natural([_serialize_record(r, "natural") for r in records])
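
All three helpers share a two-stage shape: serialize each record to a string, then compose the strings with fixed framing. The sketch below illustrates that shape for the JSON case only. `serialize_record`, `compose_structured`, and `format_structured_sketch` are hypothetical names, and the serialization shown (plain dicts through `json.dumps`) is an assumption, not what `_serialize_record` or `_compose_structured` actually do.

```python
import json


def serialize_record(record):
    """Hypothetical serializer: one JSON object per record, with stable key order."""
    return json.dumps(record, sort_keys=True)


def compose_structured(serialized):
    """Wrap pre-serialized JSON objects in array framing without re-serializing."""
    return "[" + ", ".join(serialized) + "]"


def format_structured_sketch(records):
    return compose_structured([serialize_record(r) for r in records])


records = [{"id": 1, "text": "alpha"}, {"id": 2, "text": "beta"}]
serialized = [serialize_record(r) for r in records]
formatted = format_structured_sketch(records)
```

Because the composer only adds framing around strings it is handed, each serialized string appears verbatim in the output. This separation is what lets a caller count tokens on the serialized strings and later emit exactly those strings.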