popoto.models.query


Query Layer for Popoto Redis ORM.

This module provides the Query class, which serves as the primary interface for retrieving Model instances from Redis. It follows a Django-inspired query API, allowing developers familiar with Django's ORM to quickly adapt to Popoto.

Design Philosophy:

Popoto treats Redis not as a simple key-value cache, but as a first-class database with rich querying capabilities. The Query class abstracts away the complexity of Redis data structures (sets, sorted sets, hash maps) and presents a unified, intuitive interface.

Key architectural decisions:

  1. Set Intersection Strategy: Filter operations return sets of Redis keys, which are then intersected to combine multiple filters. This leverages Redis's O(N*M) SINTER performance for efficient multi-criteria queries.

  2. Field-Delegated Filtering: Each field type (KeyField, SortedField, etc.) implements its own filter_query() method. The Query class orchestrates these but delegates the actual Redis commands to the field implementations.

  3. Sorted Fields First: When processing filters, sorted fields are evaluated before key fields. Sorted fields often produce smaller result sets (range queries) and may satisfy multiple filter parameters at once due to partitioning.

  4. Pipeline Optimization: Bulk retrieval uses Redis pipelines to batch HGETALL commands, dramatically reducing round-trip latency when fetching multiple objects.
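
The set-intersection strategy can be sketched in plain Python without a Redis server. The sets below stand in for the per-filter key sets that SINTER would intersect server-side; the key names are illustrative only:

```python
# Each filter resolves to a set of Redis keys for matching objects.
# Combining filters is just set intersection -- the in-memory analogue
# of Redis's SINTER command.
category_keys = {"product:1", "product:2", "product:5"}  # category="electronics"
price_keys = {"product:2", "product:4", "product:5"}     # price__lte=100.0

# Intersect to find keys satisfying *all* filter criteria
matching_keys = category_keys & price_keys
print(sorted(matching_keys))  # ['product:2', 'product:5']
```

Because each additional filter only shrinks the candidate set, evaluating the most selective index first (decision 3 above) keeps the intersections cheap.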

Usage Examples:

# Get a single object by key fields
user = User.query.get(username="alice")

# Filter with multiple criteria
products = Product.query.filter(category="electronics", price__lte=100.0)

# Count without loading objects
count = Order.query.count(status="pending")

# Retrieve specific fields only (projection)
names = User.query.filter(active=True, values=("name", "email"))

See Also:

  • popoto.models.base.Model - The base class that exposes Model.query
  • popoto.fields.key_field_mixin.KeyFieldMixin - Key field filtering logic
  • popoto.fields.sorted_field_mixin.SortedFieldMixin - Range query logic

QueryException

Bases: Exception

Raised when a query is malformed or produces an unexpected result.

Common causes include:

  • Using unknown filter parameters not supported by any field
  • Using get() when multiple objects match the criteria
  • Specifying order_by without including that field in values
  • Missing required partition fields for sorted field queries
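
The get()-versus-filter() distinction behind the multiple-match cause can be illustrated with a minimal in-memory analogue; the MultipleMatches class below is a hypothetical stand-in for QueryException, not popoto's actual error handling:

```python
class MultipleMatches(Exception):
    """Hypothetical stand-in for popoto's QueryException."""

records = [
    {"username": "alice", "status": "active"},
    {"username": "bob", "status": "active"},
]

def get(**criteria):
    # get() must identify exactly one object; ambiguity is an error,
    # whereas filter() would simply return every match.
    matches = [r for r in records
               if all(r.get(k) == v for k, v in criteria.items())]
    if len(matches) > 1:
        raise MultipleMatches(f"get() matched {len(matches)} objects")
    return matches[0] if matches else None

assert get(username="alice")["status"] == "active"
try:
    get(status="active")  # matches both records
except MultipleMatches as e:
    print(e)  # get() matched 2 objects
```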

Source code in src/popoto/models/query.py
class QueryException(Exception):
    """Raised when a query is malformed or produces an unexpected result.

    Common causes include:
    - Using unknown filter parameters not supported by any field
    - Using `get()` when multiple objects match the criteria
    - Specifying `order_by` without including that field in `values`
    - Missing required partition fields for sorted field queries
    """

    pass

QueryBuilder

Chainable query builder that accumulates query state.

This class provides a fluent interface for building queries incrementally. Each method returns self (or a new QueryBuilder) to enable method chaining.

The QueryBuilder is returned by Query.filter() and accumulates filter parameters, ordering, and limits until execution methods like all() are called.

Example

# Chainable query construction
results = Model.query.filter(status="active").order_by("name").limit(10).all()

# Multiple filter chaining
results = Model.query.filter(status="active").filter(type="premium").all()

Note

QueryBuilder also acts as a list-like object for backward compatibility. Iterating over a QueryBuilder or accessing len() will execute the query.
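
The chaining behavior described above can be sketched with a minimal stand-in class. This is an illustrative simplification, not the real QueryBuilder: it shows why filter() returning a new builder lets a base query be reused safely, while self-returning methods like limit() still chain:

```python
class MiniBuilder:
    """Minimal sketch of a chainable builder in the QueryBuilder style."""

    def __init__(self, filters=None):
        self._filters = dict(filters or {})
        self._limit = None

    def filter(self, **kwargs):
        # Return a *new* builder with merged criteria, leaving the
        # original untouched so base queries can be branched.
        return MiniBuilder({**self._filters, **kwargs})

    def limit(self, n):
        # In-place state plus `return self` also supports chaining.
        self._limit = n
        return self

active = MiniBuilder().filter(status="active")
premium = active.filter(type="premium").limit(10)

assert active._filters == {"status": "active"}  # base query unchanged
assert premium._filters == {"status": "active", "type": "premium"}
assert premium._limit == 10
```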

Source code in src/popoto/models/query.py
class QueryBuilder:
    """Chainable query builder that accumulates query state.

    This class provides a fluent interface for building queries incrementally.
    Each method returns self (or a new QueryBuilder) to enable method chaining.

    The QueryBuilder is returned by Query.filter() and accumulates filter
    parameters, ordering, and limits until execution methods like all() are called.

    Example:
        # Chainable query construction
        results = Model.query.filter(status="active").order_by("name").limit(10).all()

        # Multiple filter chaining
        results = Model.query.filter(status="active").filter(type="premium").all()

    Note:
        QueryBuilder also acts as a list-like object for backward compatibility.
        Iterating over a QueryBuilder or accessing len() will execute the query.
    """

    def __init__(self, query: "Query", filters: dict = None, q_objects: list = None):
        """Initialize a QueryBuilder with a reference to the parent Query.

        Args:
            query: The Query instance this builder operates on
            filters: Initial filter parameters (optional)
            q_objects: List of Q objects for complex query logic (optional)
        """
        self._query = query
        self._filters = filters.copy() if filters else {}
        self._q_objects = list(q_objects) if q_objects else []
        self._limit_value = None
        self._order_by_value = None
        self._values_tuple = None
        self._computed_sort_fn = None
        self._computed_sort_reverse = False
        self._no_track = False

    def filter(self, *args, **kwargs) -> "QueryBuilder":
        """Add filter criteria and return a new QueryBuilder.

        Creates a new QueryBuilder with merged filter parameters, allowing
        multiple filter() calls to be chained. Supports Q objects and Expression
        objects for complex query logic with OR/AND/NOT operators.

        Args:
            *args: Q objects or Expression objects for complex query expressions
            **kwargs: Filter parameters to add to the query

        Returns:
            A new QueryBuilder with the combined filters

        Example:
            query = Model.query.filter(status="active").filter(type="premium")
            query = Model.query.filter(Q(status="active") | Q(type="premium"))
            query = Model.query.filter(Model.rating > 4.0)
        """
        from .q import Q
        from .expressions import Expression, CombinedExpression

        # Create a new QueryBuilder with merged filters and Q objects
        new_builder = QueryBuilder(self._query, self._filters, self._q_objects)
        new_builder._filters.update(kwargs)

        # Process args - can be Q objects or Expression objects
        for arg in args:
            if isinstance(arg, Q):
                new_builder._q_objects.append(arg)
            elif isinstance(arg, (Expression, CombinedExpression)):
                # Convert Expression to Q object
                new_builder._q_objects.append(arg.to_q())

        new_builder._limit_value = self._limit_value
        new_builder._order_by_value = self._order_by_value
        new_builder._values_tuple = self._values_tuple
        new_builder._computed_sort_fn = self._computed_sort_fn
        new_builder._computed_sort_reverse = self._computed_sort_reverse
        new_builder._no_track = self._no_track
        return new_builder

    def limit(self, n: int) -> "QueryBuilder":
        """Set the maximum number of results to return.

        Args:
            n: Maximum number of results

        Returns:
            Self for method chaining

        Example:
            results = Model.query.filter(status="active").limit(10).all()
        """
        self._limit_value = n
        return self

    def order_by(self, field: str) -> "QueryBuilder":
        """Set the field to order results by.

        Args:
            field: Field name to sort by. Prefix with "-" for descending order.

        Returns:
            Self for method chaining

        Example:
            results = Model.query.filter(status="active").order_by("-created_at").all()
        """
        self._order_by_value = field
        return self

    def values(self, *fields) -> "QueryBuilder":
        """Specify fields to return as dicts instead of model instances.

        Args:
            *fields: Field names to include in the result dicts

        Returns:
            Self for method chaining

        Example:
            results = Model.query.filter(status="active").values("name", "email").all()
        """
        self._values_tuple = fields
        return self

    def computed_sort(self, fn, reverse: bool = False) -> "QueryBuilder":
        """Sort results using a caller-provided key function.

        Applies a Python-side sort after fetching results from Redis, before
        applying limit(). This enables sorting by computed/derived values that
        are not stored as indexed fields.

        When both computed_sort() and order_by() are set, computed_sort() takes
        precedence and order_by() is ignored.

        Performance note: This is O(N log N) on the full result set before
        limiting. For large result sets (>10K records), consider using
        SortedField indexes instead.

        Args:
            fn: A callable that takes a model instance (or dict if values()
                is used) and returns a sort key. Must not be None.
            reverse: If True, sort in descending order. Default is False.

        Returns:
            Self for method chaining.

        Raises:
            TypeError: If fn is None.

        Example:
            # Sort by a computed activation score
            results = (
                Model.query.filter(status="active")
                .computed_sort(lambda x: x.priority * 0.5 + x.score * 0.5,
                               reverse=True)
                .limit(10)
                .all()
            )
        """
        if fn is None:
            raise TypeError("computed_sort() requires a callable, got None")
        self._computed_sort_fn = fn
        self._computed_sort_reverse = reverse
        return self

    def no_track(self) -> "QueryBuilder":
        """Suppress on_read() tracking for this query.

        Use for internal operations (reindex, migration) that shouldn't
        count as reads for AccessTrackerMixin models.

        Returns:
            Self for method chaining

        Example:
            results = Model.query.filter(status="active").no_track().all()
        """
        self._no_track = True
        return self

    def top_by_decay(
        self, field_name=None, n=10, decay_rate=None, base_score_field=None
    ):
        """Return top-N instances ranked by time-decayed score.

        Executes a Lua script server-side that computes:
            decayed_score = base_score * elapsed_days ^ (-decay_rate)

        Args:
            field_name: Name of a DecayingSortedField on the model. Optional
                when the model has exactly one DecayingSortedField (or subclass).
            n: Maximum number of results to return. Default 10.
            decay_rate: Override the field's decay_rate for this query.
            base_score_field: Override the field's base_score_field for this query.

        Returns:
            List of model instances in decayed-score order.

        Raises:
            QueryException: If field is not a DecayingSortedField or
                required partition_by filter is missing.
        """
        from ..fields.decaying_sorted_field import DecayingSortedField, DECAY_SCORE_LUA
        from .encoding import decode_popoto_model_hashmap

        model_class = self._query.model_class

        if field_name is None:
            dsf_names = [
                name
                for name, f in model_class._meta.fields.items()
                if isinstance(f, DecayingSortedField)
            ]
            if len(dsf_names) == 1:
                field_name = dsf_names[0]
            elif len(dsf_names) == 0:
                raise QueryException(
                    f"'{model_class.__name__}' has no DecayingSortedField"
                )
            else:
                raise QueryException(
                    f"Multiple DecayingSortedFields on '{model_class.__name__}': "
                    f"{dsf_names}. Specify field_name explicitly."
                )
        elif field_name not in model_class._meta.fields:
            raise QueryException(
                f"'{model_class.__name__}' has no field '{field_name}'"
            )

        field = model_class._meta.fields[field_name]
        if not isinstance(field, DecayingSortedField):
            raise QueryException(
                f"top_by_decay() requires a DecayingSortedField. "
                f"'{field_name}' is {type(field).__name__}"
            )

        # Use field defaults unless overridden
        effective_decay_rate = (
            decay_rate if decay_rate is not None else field.decay_rate
        )
        effective_base_score_field = (
            base_score_field
            if base_score_field is not None
            else (field.base_score_field or "")
        )

        if n <= 0:
            return []

        # Build the sorted set key respecting partition_by
        try:
            partition_values = [str(self._filters[pf]) for pf in field.partition_by]
        except KeyError:
            missing = [pf for pf in field.partition_by if pf not in self._filters]
            raise QueryException(
                f"top_by_decay() on '{field_name}' requires partition filter(s): "
                f"{', '.join(missing)}"
            )

        # Use actual field class for key generation (CyclicDecayField has
        # its own field_class_key prefix, distinct from DecayingSortedField)
        sortedset_db_key = field.__class__.get_sortedset_db_key(
            model_class, field_name, *partition_values
        )

        import time

        now = time.time()

        # Use extended Lua script for CyclicDecayField, plain script otherwise
        from ..fields.cyclic_decay_field import CyclicDecayField, CYCLIC_DECAY_LUA

        if isinstance(field, CyclicDecayField):
            # Build companion hash keys from partition values
            cycles_hash_key = CyclicDecayField.get_cycles_hash_key_from_parts(
                model_class, field_name, *partition_values
            )
            pressure_hash_key = CyclicDecayField.get_pressure_hash_key_from_parts(
                model_class, field_name, *partition_values
            )

            result = POPOTO_REDIS_DB.eval(
                CYCLIC_DECAY_LUA,
                3,  # number of KEYS
                sortedset_db_key.redis_key,
                cycles_hash_key,
                pressure_hash_key,
                str(now),
                str(effective_decay_rate),
                str(n),
                effective_base_score_field,
            )
        else:
            result = POPOTO_REDIS_DB.eval(
                DECAY_SCORE_LUA,
                1,  # number of KEYS
                sortedset_db_key.redis_key,
                str(now),
                str(effective_decay_rate),
                str(n),
                effective_base_score_field,
            )

        if not result:
            return []

        # Parse result: [key1, score1, key2, score2, ...]
        redis_keys = []
        for i in range(0, len(result), 2):
            key = result[i]
            if isinstance(key, bytes):
                key = key.decode()
            redis_keys.append(key)

        if not redis_keys:
            return []

        # Fetch model instances via pipeline
        pipe = POPOTO_REDIS_DB.pipeline()
        for key in redis_keys:
            pipe.hgetall(key)
        raw_results = pipe.execute()

        instances = []
        for key, data in zip(redis_keys, raw_results):
            if data:
                instance = decode_popoto_model_hashmap(model_class, data)
                instances.append(instance)

        if not self._no_track:
            _fire_on_read(model_class, instances)

        return instances

    def composite_score(
        self,
        indexes: dict,
        limit: int = 10,
        aggregate: str = "SUM",
        min_score: float = None,
        post_filter: Optional[Callable[[str, float], bool]] = None,
        co_occurrence_boost: dict = None,
        similarity_boost: dict = None,
        temperature: float = 1.0,
    ) -> list:
        """Return top-K instances ranked by a weighted composite of multiple indexes.

        Combines N sorted set indexes with configurable weights via Redis
        ZUNIONSTORE and returns model instances ranked by composite score.

        Each index name maps to a field on the model. Supported field types:
            - DecayingSortedField / CyclicDecayField: Materializes decay-computed
              scores into a temp ZSET via the existing Lua decay script.
            - SortedFieldMixin fields: Uses the sorted set directly.
            - WriteFilter priority: Resolves ``$WF:{Class}:priority`` directly.
            - ConfidenceField: Materializes confidence values from companion hash.
            - AccessTracker: Materializes access_count from meta hashes.

        Args:
            indexes: Mapping of field names to weights, e.g.
                ``{"relevance": 0.4, "confidence": 0.3}``. Weights are arbitrary
                positive floats; relative ratios matter, not absolute values.
            limit: Maximum results to return. Default 10.
            aggregate: Aggregation mode for ZUNIONSTORE: "SUM", "MIN", or "MAX".
                Default "SUM".
            min_score: Optional minimum composite score threshold. Results below
                this score are excluded. Note: ``min_score`` is applied to raw
                composite scores **before** temperature scaling, while
                ``post_filter`` receives temperature-scaled scores.
            post_filter: Optional callable ``(redis_key, score) -> bool``. Applied
                after ZREVRANGE but before hydration. Return True to keep.
            co_occurrence_boost: Optional dict ``{redis_key: weight}`` from
                ``CoOccurrenceField.propagate()``. Injected as an additional
                index in the composite.
            similarity_boost: Optional dict ``{redis_key: score}`` from
                ``semantic_search()``. Injected as an additional index
                in the composite, identical mechanism to co_occurrence_boost.
            temperature: Scales composite scores by dividing each score by this
                value. Low temperature (0.02-0.1) sharpens discrimination so top
                scores dominate. Default 1.0 preserves current behavior. High
                temperature (2.0+) flattens scores toward uniform. Must be > 0.

        Returns:
            List of model instances ranked by composite score (descending).

        Raises:
            QueryException: If indexes is empty, contains invalid field names,
                references fields without sorted set indexes, or temperature <= 0.

        Example:
            results = Memory.query.filter(agent_id="agent-1").composite_score(
                indexes={"relevance": 0.4, "confidence": 0.3, "access_score": 0.2},
                limit=10,
                temperature=0.1,  # sharp retrieval -- top result dominates
            )
        """
        import uuid

        from .encoding import decode_popoto_model_hashmap

        model_class = self._query.model_class

        # --- Validate inputs ---
        if not indexes:
            raise QueryException("composite_score() requires a non-empty indexes dict")

        if limit <= 0:
            return []

        aggregate = aggregate.upper()
        if aggregate not in ("SUM", "MIN", "MAX"):
            raise QueryException(
                f"aggregate must be 'SUM', 'MIN', or 'MAX' (got '{aggregate}')"
            )

        if temperature <= 0:
            raise QueryException(f"temperature must be > 0 (got {temperature})")

        # --- Resolve each index to a Redis sorted set key ---
        resolved_keys = {}  # {redis_zset_key: weight}
        temp_keys = []  # keys to clean up after
        uid = uuid.uuid4().hex[:8]
        model_name = model_class.__name__

        for field_name, weight in indexes.items():
            resolved_key = self._resolve_index(
                model_class, field_name, weight, uid, temp_keys
            )
            if resolved_key:
                resolved_keys[resolved_key] = weight

        # --- Handle co_occurrence_boost ---
        if co_occurrence_boost:
            co_key = f"$CSQ:{model_name}:co_occurrence:{uid}"
            POPOTO_REDIS_DB.zadd(
                co_key,
                {str(k): float(v) for k, v in co_occurrence_boost.items()},
            )
            POPOTO_REDIS_DB.expire(co_key, 5)
            temp_keys.append(co_key)
            resolved_keys[co_key] = 1.0  # weight already in the scores

        # --- Handle similarity_boost ---
        if similarity_boost:
            sim_key = f"$CSQ:{model_name}:similarity:{uid}"
            POPOTO_REDIS_DB.zadd(
                sim_key,
                {str(k): float(v) for k, v in similarity_boost.items()},
            )
            POPOTO_REDIS_DB.expire(sim_key, 5)
            temp_keys.append(sim_key)
            resolved_keys[sim_key] = 1.0  # weight already in the scores

        if not resolved_keys:
            self._cleanup_temp_keys(temp_keys)
            return []

        # --- ZUNIONSTORE ---
        composite_key = f"$CSQ:{model_name}:{uid}"
        temp_keys.append(composite_key)

        try:
            POPOTO_REDIS_DB.zunionstore(
                composite_key,
                resolved_keys,
                aggregate=aggregate,
            )
            POPOTO_REDIS_DB.expire(composite_key, 5)

            # --- ZREVRANGE top-K ---
            if min_score is not None:
                raw_results = POPOTO_REDIS_DB.zrevrangebyscore(
                    composite_key,
                    "+inf",
                    str(min_score),
                    start=0,
                    num=limit,
                    withscores=True,
                )
            else:
                raw_results = POPOTO_REDIS_DB.zrevrange(
                    composite_key, 0, limit - 1, withscores=True
                )

            if not raw_results:
                return []

            # --- Temperature scaling ---
            if temperature != 1.0:
                raw_results = [
                    (member, score / temperature) for member, score in raw_results
                ]

            # --- Post-filter ---
            pks = []
            for member, score in raw_results:
                if isinstance(member, bytes):
                    member = member.decode()
                if post_filter is not None and not post_filter(member, score):
                    continue
                pks.append(member)

            if not pks:
                return []

            # --- Hydrate models ---
            pipe = POPOTO_REDIS_DB.pipeline()
            for key in pks:
                pipe.hgetall(key)
            hashes = pipe.execute()

            instances = []
            for key, data in zip(pks, hashes):
                if data:
                    instance = decode_popoto_model_hashmap(model_class, data)
                    instances.append(instance)

            if not self._no_track:
                _fire_on_read(model_class, instances)

            return instances

        finally:
            self._cleanup_temp_keys(temp_keys)

    def semantic_search(
        self,
        query_text: str,
        indexes: dict = None,
        limit: int = 10,
        aggregate: str = "SUM",
        min_score: float = None,
        post_filter: Optional[Callable[[str, float], bool]] = None,
        co_occurrence_boost: dict = None,
        temperature: float = 1.0,
    ) -> list:
        """Return top-K instances ranked by semantic similarity combined with memory signals.

        Embeds the query text via the configured provider, computes cosine
        similarity against all stored embeddings, and injects similarity
        scores into composite_score() via the similarity_boost parameter.

        Args:
            query_text: The text to search for semantically.
            indexes: Optional mapping of field names to weights for
                composite scoring alongside similarity. If None, returns
                results ranked by similarity alone.
            limit: Maximum results to return. Default 10.
            aggregate: Aggregation mode for ZUNIONSTORE. Default "SUM".
            min_score: Optional minimum composite score threshold.
            post_filter: Optional callable (redis_key, score) -> bool.
            co_occurrence_boost: Optional {redis_key: weight} dict.
            temperature: Score scaling factor. Default 1.0.

        Returns:
            List of model instances ranked by combined score (descending).
            Returns empty list if query_text is empty, no provider is
            configured, or no embeddings exist.

        Example:
            results = Memory.query.semantic_search(
                "revenue trends",
                indexes={"relevance": 0.4, "confidence": 0.3},
                limit=10,
            )
        """
        if not query_text or not query_text.strip():
            return []

        model_class = self._query.model_class

        # Find the EmbeddingField on the model
        from ..fields.embedding_field import EmbeddingField

        embedding_field = None
        for field in model_class._meta.fields.values():
            if isinstance(field, EmbeddingField):
                embedding_field = field
                break

        if embedding_field is None:
            raise QueryException(
                f"{model_class.__name__} has no EmbeddingField for semantic_search()"
            )

        provider = embedding_field.provider
        if provider is None:
            return []

        # Embed the query text
        try:
            query_vectors = provider.embed([query_text], input_type="query")
            if not query_vectors or not query_vectors[0]:
                return []
        except Exception as e:
            logger.error(f"semantic_search embedding failed: {e}")
            return []

        # Load cached embeddings for this model class
        try:
            import numpy as np
        except ImportError:
            raise QueryException(
                "numpy is required for semantic_search(). "
                "Install with: pip install popoto[embeddings]"
            )

        matrix, keys = EmbeddingField.load_embeddings(model_class)
        if matrix is None or len(keys) == 0:
            return []

        # Compute cosine similarity (matrix is pre-normalized)
        query_vec = np.array(query_vectors[0], dtype=np.float32)
        query_norm = np.linalg.norm(query_vec)
        if query_norm == 0:
            return []
        query_vec = query_vec / query_norm

        similarities = matrix @ query_vec  # dot product on unit vectors

        # Build similarity_boost dict: {redis_key: similarity_score}
        similarity_boost = {}
        for i, score in enumerate(similarities):
            if score > 0:  # Only include positive similarities
                similarity_boost[keys[i]] = float(score)

        if not similarity_boost:
            return []

        # If no additional indexes provided, use similarity-only mode
        if not indexes:
            # Direct similarity ranking without composite_score overhead
            return self._similarity_only_search(
                model_class, similarity_boost, limit, temperature, post_filter
            )

        # Delegate to composite_score with similarity_boost
        return self.composite_score(
            indexes=indexes,
            limit=limit,
            aggregate=aggregate,
            min_score=min_score,
            post_filter=post_filter,
            co_occurrence_boost=co_occurrence_boost,
            similarity_boost=similarity_boost,
            temperature=temperature,
        )
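The similarity step above relies on the stored matrix being row-normalized, so cosine similarity collapses to a single matrix-vector product. A minimal standalone sketch of that step (illustrative vectors, numpy only):

```python
import numpy as np

# Rows of `matrix` stand in for pre-normalized stored embeddings.
matrix = np.array([[1.0, 0.0], [0.6, 0.8]], dtype=np.float32)

# Normalize the query vector, as semantic_search() does before comparing.
query = np.array([3.0, 4.0], dtype=np.float32)
query = query / np.linalg.norm(query)  # -> [0.6, 0.8]

# Dot product on unit vectors == cosine similarity per stored row.
similarities = matrix @ query
# similarities is approximately [0.6, 1.0]; only positive scores
# would be kept as similarity boosts.
```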

    def _similarity_only_search(
        self, model_class, similarity_boost, limit, temperature, post_filter
    ):
        """Rank and return instances using only similarity scores.

        Used when semantic_search() is called without additional indexes.
        Creates a temp ZSET from similarity scores and hydrates top-K.
        """
        import uuid
        from .encoding import decode_popoto_model_hashmap

        uid = uuid.uuid4().hex[:8]
        model_name = model_class.__name__
        sim_key = f"$CSQ:{model_name}:sim_only:{uid}"

        try:
            POPOTO_REDIS_DB.zadd(
                sim_key,
                {str(k): float(v) for k, v in similarity_boost.items()},
            )
            POPOTO_REDIS_DB.expire(sim_key, 5)

            raw_results = POPOTO_REDIS_DB.zrevrange(
                sim_key, 0, limit - 1, withscores=True
            )

            if not raw_results:
                return []

            # Temperature scaling
            if temperature != 1.0:
                raw_results = [
                    (member, score / temperature) for member, score in raw_results
                ]

            # Post-filter
            pks = []
            for member, score in raw_results:
                if isinstance(member, bytes):
                    member = member.decode()
                if post_filter is not None and not post_filter(member, score):
                    continue
                pks.append(member)

            if not pks:
                return []

            # Hydrate
            pipe = POPOTO_REDIS_DB.pipeline()
            for key in pks:
                pipe.hgetall(key)
            hashes = pipe.execute()

            instances = []
            for key, data in zip(pks, hashes):
                if data:
                    instance = decode_popoto_model_hashmap(model_class, data)
                    instances.append(instance)

            if not self._no_track:
                _fire_on_read(model_class, instances)

            return instances
        finally:
            POPOTO_REDIS_DB.delete(sim_key)

    def keyword_search(
        self,
        query_text: str,
        field: Optional[str] = None,
        limit: int = 10,
    ) -> list:
        """Return instances ranked by BM25 keyword relevance.

        Delegates to BM25Field.search() and hydrates model instances.
        The BM25 score is attached to each instance as ``_bm25_score``.

        Args:
            query_text: The search query string.
            field: Name of the BM25Field to search. Optional when the model
                has exactly one BM25Field.
            limit: Maximum results to return. Default 10.

        Returns:
            List of model instances ranked by BM25 score (descending).

        Raises:
            QueryException: If the model has no BM25Field or field name
                is invalid.
        """
        from .encoding import decode_popoto_model_hashmap
        from ..fields.bm25_field import BM25Field

        model_class = self._query.model_class

        # Auto-detect BM25Field if not specified
        if field is None:
            bm25_fields = [
                name
                for name, f in model_class._meta.fields.items()
                if isinstance(f, BM25Field)
            ]
            if len(bm25_fields) == 1:
                field = bm25_fields[0]
            elif len(bm25_fields) == 0:
                raise QueryException(
                    f"'{model_class.__name__}' has no BM25Field for keyword_search()"
                )
            else:
                raise QueryException(
                    f"Multiple BM25Fields on '{model_class.__name__}': "
                    f"{bm25_fields}. Specify field= explicitly."
                )

        # Get raw scored results
        scored = BM25Field.search(model_class, field, query_text, limit=limit)
        if not scored:
            return []

        # Hydrate model instances
        pipe = POPOTO_REDIS_DB.pipeline()
        for key, _score in scored:
            pipe.hgetall(key)
        hashes = pipe.execute()

        instances = []
        for (key, score), data in zip(scored, hashes):
            if data:
                instance = decode_popoto_model_hashmap(model_class, data)
                instance._bm25_score = score
                instances.append(instance)

        if not self._no_track:
            _fire_on_read(model_class, instances)

        return instances

    def fuse(
        self,
        k: int = 60,
        limit: int = 10,
        post_filter: Optional[Callable] = None,
        **ranked_lists,
    ) -> list:
        """Reciprocal Rank Fusion across heterogeneous ranked lists.

        Combines multiple ranked lists from different retrieval signals
        (keyword search, semantic search, graph propagation, etc.) using
        the RRF formula: ``score(d) = sum(1 / (k + rank_i(d)))``

        Each ranked list is a sequence of ``(redis_key, score)`` tuples.
        The scores are used only for ordering within each list -- RRF uses
        ranks, not raw scores.

        Args:
            k: RRF constant (default 60). Higher values reduce the influence
                of high-ranking items. Standard value from Cormack et al.
            limit: Maximum results to return. Default 10.
            post_filter: Optional ``(redis_key, rrf_score) -> bool`` callback.
                Return True to keep the result.
            **ranked_lists: Named ranked lists. Each value is a list of
                ``(redis_key, score)`` tuples sorted by score descending.

        Returns:
            List of model instances ranked by RRF score (descending).

        Raises:
            QueryException: If no ranked lists are provided.

        Example:
            results = Memory.query.fuse(
                keyword=BM25Field.search(Memory, "content", "redis", limit=50),
                semantic=[(key, sim) for key, sim in similarity_results],
                k=60,
                limit=10,
            )
        """
        from .encoding import decode_popoto_model_hashmap

        model_class = self._query.model_class

        if not ranked_lists:
            raise QueryException(
                "fuse() requires at least one ranked list as a keyword argument"
            )

        # Compute RRF scores
        rrf_scores = {}  # doc_key -> accumulated RRF score

        for list_name, ranked_list in ranked_lists.items():
            if not ranked_list:
                continue
            for rank_idx, (doc_key, _score) in enumerate(ranked_list):
                doc_key = str(doc_key)
                rrf_scores[doc_key] = rrf_scores.get(doc_key, 0.0) + (
                    1.0 / (k + rank_idx + 1)
                )

        if not rrf_scores:
            return []

        # Sort by RRF score descending
        sorted_results = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)

        # Apply post_filter
        if post_filter is not None:
            sorted_results = [
                (key, score) for key, score in sorted_results if post_filter(key, score)
            ]

        # Take top-K
        sorted_results = sorted_results[:limit]

        if not sorted_results:
            return []

        # Hydrate model instances
        pipe = POPOTO_REDIS_DB.pipeline()
        for key, _score in sorted_results:
            pipe.hgetall(key)
        hashes = pipe.execute()

        instances = []
        for (key, score), data in zip(sorted_results, hashes):
            if data:
                instance = decode_popoto_model_hashmap(model_class, data)
                instance._rrf_score = score
                instances.append(instance)

        if not self._no_track:
            _fire_on_read(model_class, instances)

        return instances
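The RRF accumulation used above can be mirrored outside the ORM. This standalone sketch (illustrative keys and scores) shows why a document appearing in two ranked lists outranks documents appearing in only one:

```python
def rrf(k: int = 60, **ranked_lists) -> list:
    # score(d) = sum over lists of 1 / (k + rank_of_d_in_that_list)
    scores = {}
    for ranked in ranked_lists.values():
        for rank_idx, (doc_key, _score) in enumerate(ranked):
            scores[doc_key] = scores.get(doc_key, 0.0) + 1.0 / (k + rank_idx + 1)
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)

fused = rrf(
    keyword=[("doc:a", 9.1), ("doc:b", 7.2)],     # BM25-style scores
    semantic=[("doc:b", 0.93), ("doc:c", 0.88)],  # cosine similarities
)
# "doc:b" ranks first: 1/62 + 1/61 beats the single-list 1/61 of "doc:a".
```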

    def _resolve_index(self, model_class, field_name, weight, uid, temp_keys):
        """Resolve a field name to a Redis sorted set key for composite scoring.

        Handles different field types by either returning existing ZSET keys
        directly or materializing data into temporary ZSETs.

        Args:
            model_class: The Model class.
            field_name: Name of the field or special index.
            weight: The weight for this index (used for naming temp keys).
            uid: Unique identifier for temp key namespacing.
            temp_keys: List to append any created temp keys to.

        Returns:
            str: Redis key of the sorted set to use, or None if unresolvable.

        Raises:
            QueryException: If field_name is invalid or unsupported.
        """
        from ..fields.access_tracker import AccessTrackerMixin
        from ..fields.confidence_field import ConfidenceField
        from ..fields.decaying_sorted_field import DecayingSortedField
        from ..fields.sorted_field_mixin import SortedFieldMixin
        from ..fields.write_filter import WriteFilterMixin

        model_name = model_class.__name__

        # --- Special case: "priority" for WriteFilter ---
        if field_name == "priority":
            if not issubclass(model_class, WriteFilterMixin):
                raise QueryException(
                    f"'{model_name}' does not use WriteFilterMixin; "
                    f"cannot resolve 'priority' index"
                )
            return f"$WF:{model_name}:priority"

        # --- Special case: "access_count" / "access_score" for AccessTracker ---
        if field_name in ("access_count", "access_score"):
            if not issubclass(model_class, AccessTrackerMixin):
                raise QueryException(
                    f"'{model_name}' does not use AccessTrackerMixin; "
                    f"cannot resolve '{field_name}' index"
                )
            return self._materialize_access_tracker(model_class, uid, temp_keys)

        # --- Look up the field on the model ---
        if field_name not in model_class._meta.fields:
            raise QueryException(
                f"'{model_name}' has no field '{field_name}'. "
                f"Valid fields: {list(model_class._meta.fields.keys())}"
            )

        field = model_class._meta.fields[field_name]

        # --- DecayingSortedField: materialize decay scores ---
        if isinstance(field, DecayingSortedField):
            return self._materialize_decay_field(
                model_class, field, field_name, uid, temp_keys
            )

        # --- ConfidenceField: materialize from companion hash ---
        if isinstance(field, ConfidenceField):
            return self._materialize_confidence_field(
                model_class, field, field_name, uid, temp_keys
            )

        # --- SortedFieldMixin: use existing sorted set directly ---
        if isinstance(field, SortedFieldMixin):
            try:
                partition_values = [str(self._filters[pf]) for pf in field.partition_by]
            except KeyError:
                missing = [pf for pf in field.partition_by if pf not in self._filters]
                raise QueryException(
                    f"composite_score() on '{field_name}' requires "
                    f"partition filter(s): {', '.join(missing)}"
                )
            sortedset_db_key = field.__class__.get_sortedset_db_key(
                model_class, field_name, *partition_values
            )
            return sortedset_db_key.redis_key

        # --- Unsupported field type ---
        raise QueryException(
            f"Field '{field_name}' ({type(field).__name__}) does not have "
            f"a sorted set index and cannot be used in composite_score()"
        )

    def _materialize_decay_field(self, model_class, field, field_name, uid, temp_keys):
        """Materialize a DecayingSortedField's decay-computed scores into a temp ZSET.

        Uses the existing Lua decay script to compute scores, then writes
        them to a temporary sorted set.

        Args:
            model_class: The Model class.
            field: The DecayingSortedField instance.
            field_name: Name of the field.
            uid: Unique identifier for temp key.
            temp_keys: List to append temp key to.

        Returns:
            str: Redis key of the temp sorted set.
        """
        import time

        from ..fields.decaying_sorted_field import DECAY_SCORE_LUA
        from ..fields.cyclic_decay_field import CyclicDecayField, CYCLIC_DECAY_LUA

        model_name = model_class.__name__

        try:
            partition_values = [str(self._filters[pf]) for pf in field.partition_by]
        except KeyError:
            missing = [pf for pf in field.partition_by if pf not in self._filters]
            raise QueryException(
                f"composite_score() on '{field_name}' requires "
                f"partition filter(s): {', '.join(missing)}"
            )

        sortedset_db_key = field.__class__.get_sortedset_db_key(
            model_class, field_name, *partition_values
        )

        now = time.time()
        base_score_field = field.base_score_field or ""

        # Get all decay scores via Lua
        if isinstance(field, CyclicDecayField):
            cycles_hash_key = CyclicDecayField.get_cycles_hash_key_from_parts(
                model_class, field_name, *partition_values
            )
            pressure_hash_key = CyclicDecayField.get_pressure_hash_key_from_parts(
                model_class, field_name, *partition_values
            )
            result = POPOTO_REDIS_DB.eval(
                CYCLIC_DECAY_LUA,
                3,
                sortedset_db_key.redis_key,
                cycles_hash_key,
                pressure_hash_key,
                str(now),
                str(field.decay_rate),
                str(999999),  # get all members
                base_score_field,
            )
        else:
            result = POPOTO_REDIS_DB.eval(
                DECAY_SCORE_LUA,
                1,
                sortedset_db_key.redis_key,
                str(now),
                str(field.decay_rate),
                str(999999),  # get all members
                base_score_field,
            )

        temp_key = f"$CSQ:{model_name}:decay:{field_name}:{uid}"
        temp_keys.append(temp_key)

        if result:
            # Parse [key1, score1, key2, score2, ...] and write to temp ZSET
            zadd_mapping = {}
            for i in range(0, len(result), 2):
                member = result[i]
                if isinstance(member, bytes):
                    member = member.decode()
                score = float(result[i + 1])
                zadd_mapping[member] = score

            if zadd_mapping:
                POPOTO_REDIS_DB.zadd(temp_key, zadd_mapping)
                POPOTO_REDIS_DB.expire(temp_key, 5)

        return temp_key

    def _materialize_confidence_field(
        self, model_class, field, field_name, uid, temp_keys
    ):
        """Materialize a ConfidenceField's confidence values into a temp ZSET.

        Reads the companion hash and extracts confidence values for each member.
        When the field has partition_by and the query includes partition field
        values, reads only the partition-scoped hash. When partition fields are
        missing from a partitioned query, raises QueryException.

        Args:
            model_class: The Model class.
            field: The ConfidenceField instance.
            field_name: Name of the field.
            uid: Unique identifier for temp key.
            temp_keys: List to append temp key to.

        Returns:
            str: Redis key of the temp sorted set.

        Raises:
            QueryException: If a partitioned field is queried without providing
                values for all partition fields.
        """
        import msgpack

        model_name = model_class.__name__

        # Build companion hash key — partition-aware
        if field.partition_by:
            # Extract partition values from query filters
            missing = [pf for pf in field.partition_by if pf not in self._filters]
            if missing:
                raise QueryException(
                    f"ConfidenceField '{field_name}' is partitioned by "
                    f"{', '.join(field.partition_by)}. "
                    f"Query must include filter(s) for: {', '.join(missing)}"
                )
            partition_values = {pf: self._filters[pf] for pf in field.partition_by}
            data_hash_key = field.get_data_hash_key_from_values(
                model_class, field_name, **partition_values
            )
        else:
            # Unpartitioned: single global hash
            data_hash_key = field.get_data_hash_key_from_values(
                model_class, field_name
            )

        # Read all entries from companion hash
        all_data = POPOTO_REDIS_DB.hgetall(data_hash_key)

        temp_key = f"$CSQ:{model_name}:confidence:{field_name}:{uid}"
        temp_keys.append(temp_key)

        if all_data:
            zadd_mapping = {}
            for member_key, raw_value in all_data.items():
                if isinstance(member_key, bytes):
                    member_key = member_key.decode()
                try:
                    data = msgpack.unpackb(raw_value, raw=False)
                    confidence = data.get("confidence", field.initial_confidence)
                except Exception:
                    logger.warning(
                        "Failed to unpack confidence data for %s", member_key
                    )
                    confidence = field.initial_confidence
                zadd_mapping[member_key] = float(confidence)

            if zadd_mapping:
                POPOTO_REDIS_DB.zadd(temp_key, zadd_mapping)
                POPOTO_REDIS_DB.expire(temp_key, 5)

        return temp_key

    def _materialize_access_tracker(self, model_class, uid, temp_keys):
        """Materialize AccessTracker access_count values into a temp ZSET.

        Iterates over all model instances and reads access_count from each
        instance's meta hash.

        .. note::
            This method uses ``SMEMBERS`` on the class set to discover all
            instances.  For models with very large instance counts (100K+),
            this scan can be expensive.  Consider using ``post_filter`` or
            partitioned queries to narrow the result set when working at
            that scale.

        Args:
            model_class: The Model class.
            uid: Unique identifier for temp key.
            temp_keys: List to append temp key to.

        Returns:
            str: Redis key of the temp sorted set.
        """
        model_name = model_class.__name__
        temp_key = f"$CSQ:{model_name}:access:{uid}"
        temp_keys.append(temp_key)

        # Get all instance keys
        all_keys = POPOTO_REDIS_DB.smembers(
            model_class._meta.db_class_set_key.redis_key
        )

        if all_keys:
            zadd_mapping = {}
            pipe = POPOTO_REDIS_DB.pipeline()
            decoded_keys = []
            for key in all_keys:
                if isinstance(key, bytes):
                    key = key.decode()
                decoded_keys.append(key)
                meta_key = f"$AT:{model_name}:meta:{key}"
                pipe.hget(meta_key, "access_count")

            results = pipe.execute()

            for key, count_raw in zip(decoded_keys, results):
                count = int(count_raw) if count_raw else 0
                if count > 0:
                    zadd_mapping[key] = float(count)

            if zadd_mapping:
                POPOTO_REDIS_DB.zadd(temp_key, zadd_mapping)
                POPOTO_REDIS_DB.expire(temp_key, 5)

        return temp_key

    @staticmethod
    def _cleanup_temp_keys(temp_keys):
        """Delete temporary Redis keys created during composite scoring.

        Args:
            temp_keys: List of Redis key strings to delete.
        """
        if temp_keys:
            POPOTO_REDIS_DB.delete(*temp_keys)

    def all(self) -> list:
        """Execute the query and return all matching results.

        Combines all accumulated filters, ordering, and limits into a single
        query execution. When computed_sort() is set, it takes precedence over
        order_by() and applies after fetch but before limit.

        Returns:
            List of Model instances, or list of dicts if values() was called.
        """
        kwargs = self._filters.copy()

        if self._computed_sort_fn is not None:
            # When computed_sort is active:
            # 1. Remove order_by (computed_sort takes precedence)
            # 2. Remove limit (we need all results to sort, then slice)
            if self._order_by_value is not None:
                logger.warning(
                    "Both computed_sort() and order_by() are set; "
                    "computed_sort() takes precedence, order_by() is ignored."
                )
            if self._values_tuple is not None:
                kwargs["values"] = self._values_tuple
            results = self._query._execute_filter(
                q_objects=self._q_objects, _no_track=self._no_track, **kwargs
            )
            # Apply computed sort (O(N log N) on full result set)
            results = sorted(
                results,
                key=self._computed_sort_fn,
                reverse=self._computed_sort_reverse,
            )
            # Apply limit after sorting
            if self._limit_value is not None:
                results = results[: self._limit_value]
            return results

        # Standard path: no computed_sort
        if self._limit_value is not None:
            kwargs["limit"] = self._limit_value
        if self._order_by_value is not None:
            kwargs["order_by"] = self._order_by_value
        if self._values_tuple is not None:
            kwargs["values"] = self._values_tuple
        return self._query._execute_filter(
            q_objects=self._q_objects, _no_track=self._no_track, **kwargs
        )

    def count(self) -> int:
        """Count matching results without loading objects.

        Returns:
            Integer count of matching instances
        """
        # For Q objects, we need to execute the full query and count
        if self._q_objects:
            return len(self.all())
        return self._query.count(**self._filters)

    def first(self) -> "Model":
        """Return the first matching result, or None if no matches.

        Returns:
            First Model instance or None
        """
        results = self.limit(1).all()
        return results[0] if results else None

    def last(self) -> "Model":
        """Return the last matching result, or None if no matches.

        Note: For efficient access to the last item in sorted order, use
        order_by("-field").first() instead. This method fetches all results.

        Returns:
            Last Model instance or None
        """
        results = self.all()
        return results[-1] if results else None

    # List-like behavior for backward compatibility
    def __iter__(self):
        """Iterate over query results (executes query)."""
        return iter(self.all())

    def __len__(self):
        """Return the number of results (executes query)."""
        return len(self.all())

    def __getitem__(self, index):
        """Access results by index (executes query)."""
        return self.all()[index]

    def __bool__(self):
        """Check if any results exist (executes query)."""
        return len(self.all()) > 0

    def __eq__(self, other):
        """Compare with another object (executes query for comparison).

        Supports comparison with lists and other QueryBuilders for backward
        compatibility with code like: `assert Model.query.filter(...) == []`
        """
        if isinstance(other, list):
            return self.all() == other
        if isinstance(other, QueryBuilder):
            return self.all() == other.all()
        return NotImplemented

    def __contains__(self, item):
        """Check if item is in query results (executes query)."""
        return item in self.all()

    def __repr__(self):
        return f"<QueryBuilder for {self._query.model_class.__name__} filters={self._filters}>"

filter(*args, **kwargs)

Add filter criteria and return a new QueryBuilder.

Creates a new QueryBuilder with merged filter parameters, allowing multiple filter() calls to be chained. Supports Q objects and Expression objects for complex query logic with OR/AND/NOT operators.

Parameters:

    *args: Q objects or Expression objects for complex query expressions (default: ())
    **kwargs: Filter parameters to add to the query (default: {})

Returns:

    QueryBuilder: A new QueryBuilder with the combined filters

Example

    query = Model.query.filter(status="active").filter(type="premium")
    query = Model.query.filter(Q(status="active") | Q(type="premium"))
    query = Model.query.filter(Model.rating > 4.0)

Source code in src/popoto/models/query.py
def filter(self, *args, **kwargs) -> "QueryBuilder":
    """Add filter criteria and return a new QueryBuilder.

    Creates a new QueryBuilder with merged filter parameters, allowing
    multiple filter() calls to be chained. Supports Q objects and Expression
    objects for complex query logic with OR/AND/NOT operators.

    Args:
        *args: Q objects or Expression objects for complex query expressions
        **kwargs: Filter parameters to add to the query

    Returns:
        A new QueryBuilder with the combined filters

    Example:
        query = Model.query.filter(status="active").filter(type="premium")
        query = Model.query.filter(Q(status="active") | Q(type="premium"))
        query = Model.query.filter(Model.rating > 4.0)
    """
    from .q import Q
    from .expressions import Expression, CombinedExpression

    # Create a new QueryBuilder with merged filters and Q objects
    new_builder = QueryBuilder(self._query, self._filters, self._q_objects)
    new_builder._filters.update(kwargs)

    # Process args - can be Q objects or Expression objects
    for arg in args:
        if isinstance(arg, Q):
            new_builder._q_objects.append(arg)
        elif isinstance(arg, (Expression, CombinedExpression)):
            # Convert Expression to Q object
            new_builder._q_objects.append(arg.to_q())

    new_builder._limit_value = self._limit_value
    new_builder._order_by_value = self._order_by_value
    new_builder._values_tuple = self._values_tuple
    new_builder._computed_sort_fn = self._computed_sort_fn
    new_builder._computed_sort_reverse = self._computed_sort_reverse
    new_builder._no_track = self._no_track
    return new_builder
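Because each filter() call copies the builder state before merging, a base query can branch without the branches affecting one another. A minimal sketch of that merge semantics (plain dicts stand in for builder state; the real class also carries Q objects, limit, and ordering):

```python
def chain_filters(existing: dict, **kwargs) -> dict:
    # Mirror of QueryBuilder.filter(): copy first, then merge new criteria.
    merged = dict(existing)
    merged.update(kwargs)
    return merged

base = chain_filters({}, status="active")
cheap = chain_filters(base, price__lte=100.0)
premium = chain_filters(base, price__gte=500.0)
# `base` still holds only {"status": "active"}; the branches diverge safely.
```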

limit(n)

Set the maximum number of results to return.

Parameters:

    n (int): Maximum number of results (required)

Returns:

    QueryBuilder: Self for method chaining

Example

results = Model.query.filter(status="active").limit(10).all()

Source code in src/popoto/models/query.py
def limit(self, n: int) -> "QueryBuilder":
    """Set the maximum number of results to return.

    Args:
        n: Maximum number of results

    Returns:
        Self for method chaining

    Example:
        results = Model.query.filter(status="active").limit(10).all()
    """
    self._limit_value = n
    return self

order_by(field)

Set the field to order results by.

Parameters:

    field (str): Field name to sort by. Prefix with "-" for descending order. (required)

Returns:

    QueryBuilder: Self for method chaining

Example

results = Model.query.filter(status="active").order_by("-created_at").all()

Source code in src/popoto/models/query.py
def order_by(self, field: str) -> "QueryBuilder":
    """Set the field to order results by.

    Args:
        field: Field name to sort by. Prefix with "-" for descending order.

    Returns:
        Self for method chaining

    Example:
        results = Model.query.filter(status="active").order_by("-created_at").all()
    """
    self._order_by_value = field
    return self

values(*fields)

Specify fields to return as dicts instead of model instances.

Parameters:

    *fields: Field names to include in the result dicts (default: ())

Returns:

    QueryBuilder: Self for method chaining

Example

results = Model.query.filter(status="active").values("name", "email").all()

Source code in src/popoto/models/query.py
def values(self, *fields) -> "QueryBuilder":
    """Specify fields to return as dicts instead of model instances.

    Args:
        *fields: Field names to include in the result dicts

    Returns:
        Self for method chaining

    Example:
        results = Model.query.filter(status="active").values("name", "email").all()
    """
    self._values_tuple = fields
    return self

computed_sort(fn, reverse=False)

Sort results using a caller-provided key function.

Applies a Python-side sort after fetching results from Redis, before applying limit(). This enables sorting by computed/derived values that are not stored as indexed fields.

When both computed_sort() and order_by() are set, computed_sort() takes precedence and order_by() is ignored.

Performance note: This is O(N log N) on the full result set before limiting. For large result sets (>10K records), consider using SortedField indexes instead.

Parameters:

- fn (required): A callable that takes a model instance (or dict if values() is used) and returns a sort key. Must not be None.
- reverse (bool, default False): If True, sort in descending order.

Returns:

- QueryBuilder: Self for method chaining.

Raises:

- TypeError: If fn is None.

Example

    # Sort by a computed activation score
    results = (
        Model.query.filter(status="active")
        .computed_sort(lambda x: x.priority * 0.5 + x.score * 0.5, reverse=True)
        .limit(10)
        .all()
    )

Source code in src/popoto/models/query.py
def computed_sort(self, fn, reverse: bool = False) -> "QueryBuilder":
    """Sort results using a caller-provided key function.

    Applies a Python-side sort after fetching results from Redis, before
    applying limit(). This enables sorting by computed/derived values that
    are not stored as indexed fields.

    When both computed_sort() and order_by() are set, computed_sort() takes
    precedence and order_by() is ignored.

    Performance note: This is O(N log N) on the full result set before
    limiting. For large result sets (>10K records), consider using
    SortedField indexes instead.

    Args:
        fn: A callable that takes a model instance (or dict if values()
            is used) and returns a sort key. Must not be None.
        reverse: If True, sort in descending order. Default is False.

    Returns:
        Self for method chaining.

    Raises:
        TypeError: If fn is None.

    Example:
        # Sort by a computed activation score
        results = (
            Model.query.filter(status="active")
            .computed_sort(lambda x: x.priority * 0.5 + x.score * 0.5,
                           reverse=True)
            .limit(10)
            .all()
        )
    """
    if fn is None:
        raise TypeError("computed_sort() requires a callable, got None")
    self._computed_sort_fn = fn
    self._computed_sort_reverse = reverse
    return self
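
The fetch-sort-slice behavior described above can be sketched in plain Python, independent of Redis; the rows and helper name below are illustrative, not part of popoto's API:

```python
# Standalone sketch of computed_sort()'s client-side behavior:
# sort the full fetched result set by a derived key (O(N log N)),
# then apply the limit. Plain dicts stand in for model instances.
def computed_sort_then_limit(rows, key_fn, reverse=False, limit=None):
    ordered = sorted(rows, key=key_fn, reverse=reverse)
    return ordered if limit is None else ordered[:limit]

rows = [
    {"name": "a", "priority": 1, "score": 2},
    {"name": "b", "priority": 9, "score": 3},
    {"name": "c", "priority": 5, "score": 5},
]
top = computed_sort_then_limit(
    rows,
    key_fn=lambda r: r["priority"] * 0.5 + r["score"] * 0.5,
    reverse=True,
    limit=2,
)
print([r["name"] for r in top])  # -> ['b', 'c']
```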

no_track()

Suppress on_read() tracking for this query.

Use for internal operations (reindex, migration) that shouldn't count as reads for AccessTrackerMixin models.

Returns:

- QueryBuilder: Self for method chaining

Example

results = Model.query.filter(status="active").no_track().all()

Source code in src/popoto/models/query.py
def no_track(self) -> "QueryBuilder":
    """Suppress on_read() tracking for this query.

    Use for internal operations (reindex, migration) that shouldn't
    count as reads for AccessTrackerMixin models.

    Returns:
        Self for method chaining

    Example:
        results = Model.query.filter(status="active").no_track().all()
    """
    self._no_track = True
    return self

top_by_decay(field_name=None, n=10, decay_rate=None, base_score_field=None)

Return top-N instances ranked by time-decayed score.

Executes a Lua script server-side that computes

decayed_score = base_score * elapsed_days ^ (-decay_rate)

Parameters:

- field_name (default None): Name of a DecayingSortedField on the model. Optional when the model has exactly one DecayingSortedField (or subclass).
- n (default 10): Maximum number of results to return.
- decay_rate (default None): Override the field's decay_rate for this query.
- base_score_field (default None): Override the field's base_score_field for this query.

Returns:

- List of model instances in decayed-score order.

Raises:

- QueryException: If the field is not a DecayingSortedField or a required partition_by filter is missing.

Source code in src/popoto/models/query.py
def top_by_decay(
    self, field_name=None, n=10, decay_rate=None, base_score_field=None
):
    """Return top-N instances ranked by time-decayed score.

    Executes a Lua script server-side that computes:
        decayed_score = base_score * elapsed_days ^ (-decay_rate)

    Args:
        field_name: Name of a DecayingSortedField on the model. Optional
            when the model has exactly one DecayingSortedField (or subclass).
        n: Maximum number of results to return. Default 10.
        decay_rate: Override the field's decay_rate for this query.
        base_score_field: Override the field's base_score_field for this query.

    Returns:
        List of model instances in decayed-score order.

    Raises:
        QueryException: If field is not a DecayingSortedField or
            required partition_by filter is missing.
    """
    from ..fields.decaying_sorted_field import DecayingSortedField, DECAY_SCORE_LUA
    from .encoding import decode_popoto_model_hashmap

    model_class = self._query.model_class

    if field_name is None:
        dsf_names = [
            name
            for name, f in model_class._meta.fields.items()
            if isinstance(f, DecayingSortedField)
        ]
        if len(dsf_names) == 1:
            field_name = dsf_names[0]
        elif len(dsf_names) == 0:
            raise QueryException(
                f"'{model_class.__name__}' has no DecayingSortedField"
            )
        else:
            raise QueryException(
                f"Multiple DecayingSortedFields on '{model_class.__name__}': "
                f"{dsf_names}. Specify field_name explicitly."
            )
    elif field_name not in model_class._meta.fields:
        raise QueryException(
            f"'{model_class.__name__}' has no field '{field_name}'"
        )

    field = model_class._meta.fields[field_name]
    if not isinstance(field, DecayingSortedField):
        raise QueryException(
            f"top_by_decay() requires a DecayingSortedField. "
            f"'{field_name}' is {type(field).__name__}"
        )

    # Use field defaults unless overridden
    effective_decay_rate = (
        decay_rate if decay_rate is not None else field.decay_rate
    )
    effective_base_score_field = (
        base_score_field
        if base_score_field is not None
        else (field.base_score_field or "")
    )

    if n <= 0:
        return []

    # Build the sorted set key respecting partition_by
    try:
        partition_values = [str(self._filters[pf]) for pf in field.partition_by]
    except KeyError:
        missing = [pf for pf in field.partition_by if pf not in self._filters]
        raise QueryException(
            f"top_by_decay() on '{field_name}' requires partition filter(s): "
            f"{', '.join(missing)}"
        )

    # Use actual field class for key generation (CyclicDecayField has
    # its own field_class_key prefix, distinct from DecayingSortedField)
    sortedset_db_key = field.__class__.get_sortedset_db_key(
        model_class, field_name, *partition_values
    )

    import time

    now = time.time()

    # Use extended Lua script for CyclicDecayField, plain script otherwise
    from ..fields.cyclic_decay_field import CyclicDecayField, CYCLIC_DECAY_LUA

    if isinstance(field, CyclicDecayField):
        # Build companion hash keys from partition values
        cycles_hash_key = CyclicDecayField.get_cycles_hash_key_from_parts(
            model_class, field_name, *partition_values
        )
        pressure_hash_key = CyclicDecayField.get_pressure_hash_key_from_parts(
            model_class, field_name, *partition_values
        )

        result = POPOTO_REDIS_DB.eval(
            CYCLIC_DECAY_LUA,
            3,  # number of KEYS
            sortedset_db_key.redis_key,
            cycles_hash_key,
            pressure_hash_key,
            str(now),
            str(effective_decay_rate),
            str(n),
            effective_base_score_field,
        )
    else:
        result = POPOTO_REDIS_DB.eval(
            DECAY_SCORE_LUA,
            1,  # number of KEYS
            sortedset_db_key.redis_key,
            str(now),
            str(effective_decay_rate),
            str(n),
            effective_base_score_field,
        )

    if not result:
        return []

    # Parse result: [key1, score1, key2, score2, ...]
    redis_keys = []
    for i in range(0, len(result), 2):
        key = result[i]
        if isinstance(key, bytes):
            key = key.decode()
        redis_keys.append(key)

    if not redis_keys:
        return []

    # Fetch model instances via pipeline
    pipe = POPOTO_REDIS_DB.pipeline()
    for key in redis_keys:
        pipe.hgetall(key)
    raw_results = pipe.execute()

    instances = []
    for key, data in zip(redis_keys, raw_results):
        if data:
            instance = decode_popoto_model_hashmap(model_class, data)
            instances.append(instance)

    if not self._no_track:
        _fire_on_read(model_class, instances)

    return instances
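
The decay formula the Lua script evaluates server-side can be reproduced in plain Python for intuition; the keys, timestamps, and decay_rate below are made-up values, not anything stored by popoto:

```python
import time

# decayed_score = base_score * elapsed_days ** (-decay_rate)
def decayed_score(base_score, stored_at, decay_rate, now=None):
    now = time.time() if now is None else now
    # Clamp elapsed time to avoid 0 ** negative for items stored "now".
    elapsed_days = max((now - stored_at) / 86400.0, 1e-9)
    return base_score * elapsed_days ** (-decay_rate)

now = 1_700_000_000.0
items = {
    "fresh": (10.0, now - 1 * 86400),     # small base score, 1 day old
    "stale": (100.0, now - 100 * 86400),  # large base score, 100 days old
}
ranked = sorted(
    items,
    key=lambda k: decayed_score(items[k][0], items[k][1], decay_rate=0.8, now=now),
    reverse=True,
)
print(ranked)  # -> ['fresh', 'stale']: recency beats raw score here
```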

composite_score(indexes, limit=10, aggregate='SUM', min_score=None, post_filter=None, co_occurrence_boost=None, similarity_boost=None, temperature=1.0)

Return top-K instances ranked by a weighted composite of multiple indexes.

Combines N sorted set indexes with configurable weights via Redis ZUNIONSTORE and returns model instances ranked by composite score.

Each index name maps to a field on the model. Supported field types:

- DecayingSortedField / CyclicDecayField: Materializes decay-computed scores into a temp ZSET via the existing Lua decay script.
- SortedFieldMixin fields: Uses the sorted set directly.
- WriteFilter priority: Resolves $WF:{Class}:priority directly.
- ConfidenceField: Materializes confidence values from companion hash.
- AccessTracker: Materializes access_count from meta hashes.

Parameters:

- indexes (dict, required): Mapping of field names to weights, e.g. {"relevance": 0.4, "confidence": 0.3}. Weights are arbitrary positive floats; relative ratios matter, not absolute values.
- limit (int, default 10): Maximum results to return.
- aggregate (str, default 'SUM'): Aggregation mode for ZUNIONSTORE: "SUM", "MIN", or "MAX".
- min_score (float, default None): Optional minimum composite score threshold. Results below this score are excluded. Note: min_score is applied to raw composite scores before temperature scaling, while post_filter receives temperature-scaled scores.
- post_filter (Optional[Callable[[str, float], bool]], default None): Optional callable (redis_key, score) -> bool. Applied after ZREVRANGE but before hydration. Return True to keep.
- co_occurrence_boost (dict, default None): Optional dict {redis_key: weight} from CoOccurrenceField.propagate(). Injected as an additional index in the composite.
- similarity_boost (dict, default None): Optional dict {redis_key: score} from semantic_search(). Injected as an additional index in the composite, by the same mechanism as co_occurrence_boost.
- temperature (float, default 1.0): Scales composite scores by dividing each score by this value. Low temperature (0.02-0.1) sharpens discrimination so top scores dominate; high temperature (2.0+) flattens scores toward uniform. Must be > 0. Default 1.0 preserves current behavior.

Returns:

- list: List of model instances ranked by composite score (descending).

Raises:

- QueryException: If indexes is empty, contains invalid field names, references fields without sorted set indexes, or temperature <= 0.

Example

    results = Memory.query.filter(agent_id="agent-1").composite_score(
        indexes={"relevance": 0.4, "confidence": 0.3, "access_score": 0.2},
        limit=10,
        temperature=0.1,  # sharp retrieval -- top result dominates
    )

Source code in src/popoto/models/query.py
def composite_score(
    self,
    indexes: dict,
    limit: int = 10,
    aggregate: str = "SUM",
    min_score: float = None,
    post_filter: Optional[Callable[[str, float], bool]] = None,
    co_occurrence_boost: dict = None,
    similarity_boost: dict = None,
    temperature: float = 1.0,
) -> list:
    """Return top-K instances ranked by a weighted composite of multiple indexes.

    Combines N sorted set indexes with configurable weights via Redis
    ZUNIONSTORE and returns model instances ranked by composite score.

    Each index name maps to a field on the model. Supported field types:
        - DecayingSortedField / CyclicDecayField: Materializes decay-computed
          scores into a temp ZSET via the existing Lua decay script.
        - SortedFieldMixin fields: Uses the sorted set directly.
        - WriteFilter priority: Resolves ``$WF:{Class}:priority`` directly.
        - ConfidenceField: Materializes confidence values from companion hash.
        - AccessTracker: Materializes access_count from meta hashes.

    Args:
        indexes: Mapping of field names to weights, e.g.
            ``{"relevance": 0.4, "confidence": 0.3}``. Weights are arbitrary
            positive floats; relative ratios matter, not absolute values.
        limit: Maximum results to return. Default 10.
        aggregate: Aggregation mode for ZUNIONSTORE: "SUM", "MIN", or "MAX".
            Default "SUM".
        min_score: Optional minimum composite score threshold. Results below
            this score are excluded. Note: ``min_score`` is applied to raw
            composite scores **before** temperature scaling, while
            ``post_filter`` receives temperature-scaled scores.
        post_filter: Optional callable ``(redis_key, score) -> bool``. Applied
            after ZREVRANGE but before hydration. Return True to keep.
        co_occurrence_boost: Optional dict ``{redis_key: weight}`` from
            ``CoOccurrenceField.propagate()``. Injected as an additional
            index in the composite.
        similarity_boost: Optional dict ``{redis_key: score}`` from
            ``semantic_search()``. Injected as an additional index
            in the composite, identical mechanism to co_occurrence_boost.
        temperature: Scales composite scores by dividing each score by this
            value. Low temperature (0.02-0.1) sharpens discrimination so top
            scores dominate. Default 1.0 preserves current behavior. High
            temperature (2.0+) flattens scores toward uniform. Must be > 0.

    Returns:
        List of model instances ranked by composite score (descending).

    Raises:
        QueryException: If indexes is empty, contains invalid field names,
            references fields without sorted set indexes, or temperature <= 0.

    Example:
        results = Memory.query.filter(agent_id="agent-1").composite_score(
            indexes={"relevance": 0.4, "confidence": 0.3, "access_score": 0.2},
            limit=10,
            temperature=0.1,  # sharp retrieval -- top result dominates
        )
    """
    import uuid

    from .encoding import decode_popoto_model_hashmap

    model_class = self._query.model_class

    # --- Validate inputs ---
    if not indexes:
        raise QueryException("composite_score() requires a non-empty indexes dict")

    if limit <= 0:
        return []

    aggregate = aggregate.upper()
    if aggregate not in ("SUM", "MIN", "MAX"):
        raise QueryException(
            f"aggregate must be 'SUM', 'MIN', or 'MAX' (got '{aggregate}')"
        )

    if temperature <= 0:
        raise QueryException(f"temperature must be > 0 (got {temperature})")

    # --- Resolve each index to a Redis sorted set key ---
    resolved_keys = {}  # {redis_zset_key: weight}
    temp_keys = []  # keys to clean up after
    uid = uuid.uuid4().hex[:8]
    model_name = model_class.__name__

    for field_name, weight in indexes.items():
        resolved_key = self._resolve_index(
            model_class, field_name, weight, uid, temp_keys
        )
        if resolved_key:
            resolved_keys[resolved_key] = weight

    # --- Handle co_occurrence_boost ---
    if co_occurrence_boost:
        co_key = f"$CSQ:{model_name}:co_occurrence:{uid}"
        POPOTO_REDIS_DB.zadd(
            co_key,
            {str(k): float(v) for k, v in co_occurrence_boost.items()},
        )
        POPOTO_REDIS_DB.expire(co_key, 5)
        temp_keys.append(co_key)
        resolved_keys[co_key] = 1.0  # weight already in the scores

    # --- Handle similarity_boost ---
    if similarity_boost:
        sim_key = f"$CSQ:{model_name}:similarity:{uid}"
        POPOTO_REDIS_DB.zadd(
            sim_key,
            {str(k): float(v) for k, v in similarity_boost.items()},
        )
        POPOTO_REDIS_DB.expire(sim_key, 5)
        temp_keys.append(sim_key)
        resolved_keys[sim_key] = 1.0  # weight already in the scores

    if not resolved_keys:
        self._cleanup_temp_keys(temp_keys)
        return []

    # --- ZUNIONSTORE ---
    composite_key = f"$CSQ:{model_name}:{uid}"
    temp_keys.append(composite_key)

    try:
        POPOTO_REDIS_DB.zunionstore(
            composite_key,
            resolved_keys,
            aggregate=aggregate,
        )
        POPOTO_REDIS_DB.expire(composite_key, 5)

        # --- ZREVRANGE top-K ---
        if min_score is not None:
            raw_results = POPOTO_REDIS_DB.zrevrangebyscore(
                composite_key,
                "+inf",
                str(min_score),
                start=0,
                num=limit,
                withscores=True,
            )
        else:
            raw_results = POPOTO_REDIS_DB.zrevrange(
                composite_key, 0, limit - 1, withscores=True
            )

        if not raw_results:
            return []

        # --- Temperature scaling ---
        if temperature != 1.0:
            raw_results = [
                (member, score / temperature) for member, score in raw_results
            ]

        # --- Post-filter ---
        pks = []
        for member, score in raw_results:
            if isinstance(member, bytes):
                member = member.decode()
            if post_filter is not None and not post_filter(member, score):
                continue
            pks.append(member)

        if not pks:
            return []

        # --- Hydrate models ---
        pipe = POPOTO_REDIS_DB.pipeline()
        for key in pks:
            pipe.hgetall(key)
        hashes = pipe.execute()

        instances = []
        for key, data in zip(pks, hashes):
            if data:
                instance = decode_popoto_model_hashmap(model_class, data)
                instances.append(instance)

        if not self._no_track:
            _fire_on_read(model_class, instances)

        return instances

    finally:
        self._cleanup_temp_keys(temp_keys)
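
The weighted union plus temperature scaling can be sketched without Redis. The dicts below mirror what ZUNIONSTORE with SUM aggregation and per-key weights computes; all member keys and weights are illustrative:

```python
# In-memory equivalent of ZUNIONSTORE (SUM) with weights,
# followed by temperature scaling and a top-K slice.
def composite(indexes, weights, temperature=1.0, limit=10):
    combined = {}
    for name, zset in indexes.items():
        w = weights[name]
        for member, score in zset.items():
            combined[member] = combined.get(member, 0.0) + w * score
    scaled = {m: s / temperature for m, s in combined.items()}
    return sorted(scaled.items(), key=lambda kv: kv[1], reverse=True)[:limit]

indexes = {
    "relevance":  {"mem:1": 0.9, "mem:2": 0.2},
    "confidence": {"mem:1": 0.5, "mem:2": 0.95, "mem:3": 0.8},
}
top = composite(indexes, weights={"relevance": 0.4, "confidence": 0.3}, limit=2)
print([m for m, _ in top])  # -> ['mem:1', 'mem:2']
```

Note that dividing every score by the same temperature does not change the ordering; it only changes score magnitudes, which matters to downstream consumers such as post_filter.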

semantic_search(query_text, indexes=None, limit=10, aggregate='SUM', min_score=None, post_filter=None, co_occurrence_boost=None, temperature=1.0)

Return top-K instances ranked by semantic similarity combined with memory signals.

Embeds the query text via the configured provider, computes cosine similarity against all stored embeddings, and injects similarity scores into composite_score() via the similarity_boost parameter.

Parameters:

- query_text (str, required): The text to search for semantically.
- indexes (dict, default None): Optional mapping of field names to weights for composite scoring alongside similarity. If None, returns results ranked by similarity alone.
- limit (int, default 10): Maximum results to return.
- aggregate (str, default 'SUM'): Aggregation mode for ZUNIONSTORE.
- min_score (float, default None): Optional minimum composite score threshold.
- post_filter (Optional[Callable[[str, float], bool]], default None): Optional callable (redis_key, score) -> bool.
- co_occurrence_boost (dict, default None): Optional {redis_key: weight} dict.
- temperature (float, default 1.0): Score scaling factor.

Returns:

- list: List of model instances ranked by combined score (descending). Returns an empty list if query_text is empty, no provider is configured, or no embeddings exist.

Example

    results = Memory.query.semantic_search(
        "revenue trends",
        indexes={"relevance": 0.4, "confidence": 0.3},
        limit=10,
    )

Source code in src/popoto/models/query.py
def semantic_search(
    self,
    query_text: str,
    indexes: dict = None,
    limit: int = 10,
    aggregate: str = "SUM",
    min_score: float = None,
    post_filter: Optional[Callable[[str, float], bool]] = None,
    co_occurrence_boost: dict = None,
    temperature: float = 1.0,
) -> list:
    """Return top-K instances ranked by semantic similarity combined with memory signals.

    Embeds the query text via the configured provider, computes cosine
    similarity against all stored embeddings, and injects similarity
    scores into composite_score() via the similarity_boost parameter.

    Args:
        query_text: The text to search for semantically.
        indexes: Optional mapping of field names to weights for
            composite scoring alongside similarity. If None, returns
            results ranked by similarity alone.
        limit: Maximum results to return. Default 10.
        aggregate: Aggregation mode for ZUNIONSTORE. Default "SUM".
        min_score: Optional minimum composite score threshold.
        post_filter: Optional callable (redis_key, score) -> bool.
        co_occurrence_boost: Optional {redis_key: weight} dict.
        temperature: Score scaling factor. Default 1.0.

    Returns:
        List of model instances ranked by combined score (descending).
        Returns empty list if query_text is empty, no provider is
        configured, or no embeddings exist.

    Example:
        results = Memory.query.semantic_search(
            "revenue trends",
            indexes={"relevance": 0.4, "confidence": 0.3},
            limit=10,
        )
    """
    if not query_text or not query_text.strip():
        return []

    model_class = self._query.model_class

    # Find the EmbeddingField on the model
    from ..fields.embedding_field import EmbeddingField

    embedding_field = None
    for fname, field in model_class._meta.fields.items():
        if isinstance(field, EmbeddingField):
            embedding_field = field
            break

    if embedding_field is None:
        raise QueryException(
            f"{model_class.__name__} has no EmbeddingField for semantic_search()"
        )

    provider = embedding_field.provider
    if provider is None:
        return []

    # Embed the query text
    try:
        query_vectors = provider.embed([query_text], input_type="query")
        if not query_vectors or not query_vectors[0]:
            return []
    except Exception as e:
        logger.error(f"semantic_search embedding failed: {e}")
        return []

    # Load cached embeddings for this model class
    try:
        import numpy as np
    except ImportError:
        raise QueryException(
            "numpy is required for semantic_search(). "
            "Install with: pip install popoto[embeddings]"
        )

    matrix, keys = EmbeddingField.load_embeddings(model_class)
    if matrix is None or len(keys) == 0:
        return []

    # Compute cosine similarity (matrix is pre-normalized)
    query_vec = np.array(query_vectors[0], dtype=np.float32)
    query_norm = np.linalg.norm(query_vec)
    if query_norm == 0:
        return []
    query_vec = query_vec / query_norm

    similarities = matrix @ query_vec  # dot product on unit vectors

    # Build similarity_boost dict: {redis_key: similarity_score}
    similarity_boost = {}
    for i, score in enumerate(similarities):
        if score > 0:  # Only include positive similarities
            similarity_boost[keys[i]] = float(score)

    if not similarity_boost:
        return []

    # If no additional indexes provided, use similarity-only mode
    if not indexes:
        # Direct similarity ranking without composite_score overhead
        return self._similarity_only_search(
            model_class, similarity_boost, limit, temperature, post_filter
        )

    # Delegate to composite_score with similarity_boost
    return self.composite_score(
        indexes=indexes,
        limit=limit,
        aggregate=aggregate,
        min_score=min_score,
        post_filter=post_filter,
        co_occurrence_boost=co_occurrence_boost,
        similarity_boost=similarity_boost,
        temperature=temperature,
    )

keyword_search(query_text, field=None, limit=10)

Return instances ranked by BM25 keyword relevance.

Delegates to BM25Field.search() and hydrates model instances. The BM25 score is attached to each instance as _bm25_score.

Parameters:

- query_text (str, required): The search query string.
- field (str, default None): Name of the BM25Field to search. Optional when the model has exactly one BM25Field.
- limit (int, default 10): Maximum results to return.

Returns:

- list: List of model instances ranked by BM25 score (descending).

Raises:

- QueryException: If the model has no BM25Field or the field name is invalid.

Source code in src/popoto/models/query.py
def keyword_search(
    self,
    query_text: str,
    field: str = None,
    limit: int = 10,
) -> list:
    """Return instances ranked by BM25 keyword relevance.

    Delegates to BM25Field.search() and hydrates model instances.
    The BM25 score is attached to each instance as ``_bm25_score``.

    Args:
        query_text: The search query string.
        field: Name of the BM25Field to search. Optional when the model
            has exactly one BM25Field.
        limit: Maximum results to return. Default 10.

    Returns:
        List of model instances ranked by BM25 score (descending).

    Raises:
        QueryException: If the model has no BM25Field or field name
            is invalid.
    """
    from .encoding import decode_popoto_model_hashmap
    from ..fields.bm25_field import BM25Field

    model_class = self._query.model_class

    # Auto-detect BM25Field if not specified
    if field is None:
        bm25_fields = [
            name
            for name, f in model_class._meta.fields.items()
            if isinstance(f, BM25Field)
        ]
        if len(bm25_fields) == 1:
            field = bm25_fields[0]
        elif len(bm25_fields) == 0:
            raise QueryException(
                f"'{model_class.__name__}' has no BM25Field for keyword_search()"
            )
        else:
            raise QueryException(
                f"Multiple BM25Fields on '{model_class.__name__}': "
                f"{bm25_fields}. Specify field= explicitly."
            )

    # Get raw scored results
    scored = BM25Field.search(model_class, field, query_text, limit=limit)
    if not scored:
        return []

    # Hydrate model instances
    pipe = POPOTO_REDIS_DB.pipeline()
    for key, _score in scored:
        pipe.hgetall(key)
    hashes = pipe.execute()

    instances = []
    for (key, score), data in zip(scored, hashes):
        if data:
            instance = decode_popoto_model_hashmap(model_class, data)
            instance._bm25_score = score
            instances.append(instance)

    if not self._no_track:
        _fire_on_read(model_class, instances)

    return instances
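
BM25Field.search()'s internal scoring is not reproduced on this page. For intuition only, a generic Okapi BM25 scorer with the usual k1/b defaults and naive whitespace tokenization (both assumptions, not necessarily what BM25Field uses) looks roughly like:

```python
import math

# Generic Okapi BM25 sketch: per-term IDF times a saturated,
# length-normalized term frequency, summed over query terms.
def bm25_scores(query, docs, k1=1.5, b=0.75):
    tokenized = {key: text.lower().split() for key, text in docs.items()}
    n = len(docs)
    avgdl = sum(len(toks) for toks in tokenized.values()) / n
    scores = {key: 0.0 for key in docs}
    for term in query.lower().split():
        df = sum(term in toks for toks in tokenized.values())
        if df == 0:
            continue
        idf = math.log(1 + (n - df + 0.5) / (df + 0.5))
        for key, toks in tokenized.items():
            tf = toks.count(term)
            denom = tf + k1 * (1 - b + b * len(toks) / avgdl)
            scores[key] += idf * tf * (k1 + 1) / denom
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

docs = {
    "doc:1": "redis sorted sets power ranked queries",
    "doc:2": "python dictionaries are hash maps",
}
print(bm25_scores("redis queries", docs)[0][0])  # -> doc:1
```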

fuse(k=60, limit=10, post_filter=None, **ranked_lists)

Reciprocal Rank Fusion across heterogeneous ranked lists.

Combines multiple ranked lists from different retrieval signals (keyword search, semantic search, graph propagation, etc.) using the RRF formula: score(d) = sum(1 / (k + rank_i(d)))

Each ranked list is a sequence of (redis_key, score) tuples. The scores are used only for ordering within each list -- RRF uses ranks, not raw scores.

Parameters:

- k (int, default 60): RRF constant. Higher values reduce the influence of high-ranking items. Standard value from Cormack et al.
- limit (int, default 10): Maximum results to return.
- post_filter (Optional[Callable], default None): Optional (redis_key, rrf_score) -> bool callback. Return True to keep the result.
- **ranked_lists: Named ranked lists. Each value is a list of (redis_key, score) tuples sorted by score descending.

Returns:

- list: List of model instances ranked by RRF score (descending).

Raises:

- QueryException: If no ranked lists are provided.

Example

    results = Memory.query.fuse(
        keyword=BM25Field.search(Memory, "content", "redis", limit=50),
        semantic=[(key, sim) for key, sim in similarity_results],
        k=60,
        limit=10,
    )

Source code in src/popoto/models/query.py
def fuse(
    self,
    k: int = 60,
    limit: int = 10,
    post_filter: Optional[Callable] = None,
    **ranked_lists,
) -> list:
    """Reciprocal Rank Fusion across heterogeneous ranked lists.

    Combines multiple ranked lists from different retrieval signals
    (keyword search, semantic search, graph propagation, etc.) using
    the RRF formula: ``score(d) = sum(1 / (k + rank_i(d)))``

    Each ranked list is a sequence of ``(redis_key, score)`` tuples.
    The scores are used only for ordering within each list -- RRF uses
    ranks, not raw scores.

    Args:
        k: RRF constant (default 60). Higher values reduce the influence
            of high-ranking items. Standard value from Cormack et al.
        limit: Maximum results to return. Default 10.
        post_filter: Optional ``(redis_key, rrf_score) -> bool`` callback.
            Return True to keep the result.
        **ranked_lists: Named ranked lists. Each value is a list of
            ``(redis_key, score)`` tuples sorted by score descending.

    Returns:
        List of model instances ranked by RRF score (descending).

    Raises:
        QueryException: If no ranked lists are provided.

    Example:
        results = Memory.query.fuse(
            keyword=BM25Field.search(Memory, "content", "redis", limit=50),
            semantic=[(key, sim) for key, sim in similarity_results],
            k=60,
            limit=10,
        )
    """
    from .encoding import decode_popoto_model_hashmap

    model_class = self._query.model_class

    if not ranked_lists:
        raise QueryException(
            "fuse() requires at least one ranked list as a keyword argument"
        )

    # Compute RRF scores
    rrf_scores = {}  # doc_key -> accumulated RRF score

    for list_name, ranked_list in ranked_lists.items():
        if not ranked_list:
            continue
        for rank_idx, (doc_key, _score) in enumerate(ranked_list):
            doc_key = str(doc_key)
            rrf_scores[doc_key] = rrf_scores.get(doc_key, 0.0) + (
                1.0 / (k + rank_idx + 1)
            )

    if not rrf_scores:
        return []

    # Sort by RRF score descending
    sorted_results = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)

    # Apply post_filter
    if post_filter is not None:
        sorted_results = [
            (key, score) for key, score in sorted_results if post_filter(key, score)
        ]

    # Take top-K
    sorted_results = sorted_results[:limit]

    if not sorted_results:
        return []

    # Hydrate model instances
    pipe = POPOTO_REDIS_DB.pipeline()
    for key, _score in sorted_results:
        pipe.hgetall(key)
    hashes = pipe.execute()

    instances = []
    for (key, score), data in zip(sorted_results, hashes):
        if data:
            instance = decode_popoto_model_hashmap(model_class, data)
            instance._rrf_score = score
            instances.append(instance)

    if not self._no_track:
        _fire_on_read(model_class, instances)

    return instances
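
The RRF accumulation matches the source above and can be run standalone; the two ranked lists below are made-up, and raw scores are ignored in favor of 1-based ranks:

```python
# score(d) = sum over lists of 1 / (k + rank(d)), with 1-based ranks.
# Documents appearing in multiple lists accumulate contributions.
def rrf(k=60, **ranked_lists):
    scores = {}
    for ranked in ranked_lists.values():
        for rank_idx, (doc_key, _score) in enumerate(ranked):
            scores[doc_key] = scores.get(doc_key, 0.0) + 1.0 / (k + rank_idx + 1)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

fused = rrf(
    k=60,
    keyword=[("mem:1", 9.1), ("mem:2", 4.3)],
    semantic=[("mem:2", 0.92), ("mem:3", 0.88)],
)
print(fused[0][0])  # -> mem:2 (present in both lists, so it wins)
```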

all()

Execute the query and return all matching results.

Combines all accumulated filters, ordering, and limits into a single query execution. When computed_sort() is set, it takes precedence over order_by() and applies after fetch but before limit.

Returns:

- list: List of Model instances, or list of dicts if values() was called.

Source code in src/popoto/models/query.py
def all(self) -> list:
    """Execute the query and return all matching results.

    Combines all accumulated filters, ordering, and limits into a single
    query execution. When computed_sort() is set, it takes precedence over
    order_by() and applies after fetch but before limit.

    Returns:
        List of Model instances, or list of dicts if values() was called.
    """
    kwargs = self._filters.copy()

    if self._computed_sort_fn is not None:
        # When computed_sort is active:
        # 1. Remove order_by (computed_sort takes precedence)
        # 2. Remove limit (we need all results to sort, then slice)
        if self._order_by_value is not None:
            logger.warning(
                "Both computed_sort() and order_by() are set; "
                "computed_sort() takes precedence, order_by() is ignored."
            )
        if self._values_tuple is not None:
            kwargs["values"] = self._values_tuple
        results = self._query._execute_filter(
            q_objects=self._q_objects, _no_track=self._no_track, **kwargs
        )
        # Apply computed sort (O(N log N) on full result set)
        results = sorted(
            results,
            key=self._computed_sort_fn,
            reverse=self._computed_sort_reverse,
        )
        # Apply limit after sorting
        if self._limit_value is not None:
            results = results[: self._limit_value]
        return results

    # Standard path: no computed_sort
    if self._limit_value is not None:
        kwargs["limit"] = self._limit_value
    if self._order_by_value is not None:
        kwargs["order_by"] = self._order_by_value
    if self._values_tuple is not None:
        kwargs["values"] = self._values_tuple
    return self._query._execute_filter(
        q_objects=self._q_objects, _no_track=self._no_track, **kwargs
    )

count()

Count matching results without loading objects.

Returns:

    int: Integer count of matching instances.

Source code in src/popoto/models/query.py
def count(self) -> int:
    """Count matching results without loading objects.

    Returns:
        Integer count of matching instances
    """
    # For Q objects, we need to execute the full query and count
    if self._q_objects:
        return len(self.all())
    return self._query.count(**self._filters)

first()

Return the first matching result, or None if no matches.

Returns:

    Model: First Model instance, or None.

Source code in src/popoto/models/query.py
def first(self) -> "Model":
    """Return the first matching result, or None if no matches.

    Returns:
        First Model instance or None
    """
    results = self.limit(1).all()
    return results[0] if results else None

last()

Return the last matching result, or None if no matches.

Note: For efficient access to the last item in sorted order, use order_by("-field").first() instead. This method fetches all results.

Returns:

    Model: Last Model instance, or None.

Source code in src/popoto/models/query.py
def last(self) -> "Model":
    """Return the last matching result, or None if no matches.

    Note: For efficient access to the last item in sorted order, use
    order_by("-field").first() instead. This method fetches all results.

    Returns:
        Last Model instance or None
    """
    results = self.all()
    return results[-1] if results else None
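The efficiency note above rests on a simple equivalence: the last element of an ascending sort is the first element of the descending sort, which is what `order_by("-field").first()` exploits to fetch a single object instead of all of them. A plain-data illustration, with numbers standing in for Model instances ordered on a field:

```python
# Illustration of the note above: last-of-ascending == first-of-descending.
rows = [3, 1, 2]  # stand-ins for Model instances

last_asc = sorted(rows)[-1]                 # what last() returns after a full fetch
first_desc = sorted(rows, reverse=True)[0]  # what order_by("-field").first() returns
```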

Query

Query interface for a Popoto Model.

Accessed via Model.query. Provides get, filter, all, count, and keys methods, plus async variants of each.

Every Model class automatically receives a Query instance as both Model.query and Model.objects (for Django compatibility). This class coordinates filtering, retrieval, and result preparation across different field types.

Architecture:

Query acts as an orchestrator rather than implementing filter logic directly. Each field type knows how to filter itself via its filter_query() class method. Query's job is to:

  1. Route filter parameters to the appropriate fields
  2. Combine results from multiple field filters via set intersection
  3. Batch-load matching objects using Redis pipelines
  4. Apply post-query sorting and limiting

This delegation pattern allows new field types to add query capabilities without modifying the Query class.

Chainable Query API:

In addition to the original kwargs-based API, Query now supports a fluent chainable interface via QueryBuilder:

# Original API (still fully supported)
results = Model.query.filter(status="active", limit=10, order_by="name")

# Chainable API
results = Model.query.filter(status="active").order_by("name").limit(10).all()

# Chain multiple filters
results = Model.query.filter(status="active").filter(type="premium").all()

The filter() method returns a QueryBuilder when no limit/order_by/values kwargs are provided, enabling chaining. The QueryBuilder is also iterable for backward compatibility with code that iterates over filter() results directly.

Attributes:

    model_class (Model): The Model subclass this Query operates on
    options (ModelOptions): The ModelOptions metadata for the model (fields, key names, etc.)

Example:

    class Product(Model):
        sku = KeyField(type=str)
        price = SortedField(type=float)
        category = KeyField(type=str)

    # Query is automatically available
    cheap_electronics = Product.query.filter(
        category="electronics",
        price__lte=50.0,
        order_by="price",
        limit=10,
    )

Source code in src/popoto/models/query.py
class Query:
    """Query interface for a Popoto Model.

    Accessed via ``Model.query``. Provides ``get``, ``filter``, ``all``,
    ``count``, and ``keys`` methods, plus async variants of each.

    Every Model class automatically receives a Query instance as both `Model.query`
    and `Model.objects` (for Django compatibility). This class coordinates filtering,
    retrieval, and result preparation across different field types.

    Architecture:
    ------------
    Query acts as an orchestrator rather than implementing filter logic directly.
    Each field type knows how to filter itself via its `filter_query()` class method.
    Query's job is to:

    1. Route filter parameters to the appropriate fields
    2. Combine results from multiple field filters via set intersection
    3. Batch-load matching objects using Redis pipelines
    4. Apply post-query sorting and limiting

    This delegation pattern allows new field types to add query capabilities
    without modifying the Query class.

    Chainable Query API:
    -------------------
    In addition to the original kwargs-based API, Query now supports a fluent
    chainable interface via QueryBuilder:

        # Original API (still fully supported)
        results = Model.query.filter(status="active", limit=10, order_by="name")

        # Chainable API
        results = Model.query.filter(status="active").order_by("name").limit(10).all()

        # Chain multiple filters
        results = Model.query.filter(status="active").filter(type="premium").all()

    The filter() method returns a QueryBuilder when no limit/order_by/values kwargs
    are provided, enabling chaining. The QueryBuilder is also iterable for backward
    compatibility with code that iterates over filter() results directly.

    Attributes:
        model_class: The Model subclass this Query operates on
        options: The ModelOptions metadata for the model (fields, key names, etc.)

    Example:
        class Product(Model):
            sku = KeyField(type=str)
            price = SortedField(type=float)
            category = KeyField(type=str)

        # Query is automatically available
        cheap_electronics = Product.query.filter(
            category="electronics",
            price__lte=50.0,
            order_by="price",
            limit=10
        )
    """

    model_class: "Model"
    options: "ModelOptions"

    def __init__(self, model_class: "Model"):
        """
        Initialize a Query instance bound to a specific Model class.

        This is called automatically by the ModelBase metaclass when a Model
        subclass is defined. Users should not need to instantiate Query directly.

        Args:
            model_class: The Model subclass this Query will operate on
        """
        self.model_class = model_class
        self.options = model_class._meta
        self._geo_distances = {}  # {redis_key: distance}
        self._geo_distance_unit = None  # unit for distance values

    def get(
        self, db_key: DB_key = None, redis_key: str = None, **kwargs
    ) -> Optional["Model"]:
        """Retrieve a single model instance.

        Look up by *db_key*, *redis_key*, or keyword field values. Raises
        :class:`QueryException` if more than one match is found. Returns
        ``None`` when no match exists.

        This method provides multiple retrieval strategies, optimized for different
        use cases:

        1. **Direct key lookup** (fastest): If all KeyField values are provided,
           the Redis key can be computed directly without any search.

        2. **Raw Redis key**: If you already have the Redis key string (e.g., from
           a previous query or external source), pass it directly.

        3. **Filter fallback**: If non-key fields are provided, falls back to
           `filter()` but raises an exception if multiple objects match.

        Args:
            db_key: A DB_key instance pointing to the object
            redis_key: The raw Redis key string (e.g., "User:alice:123")
            **kwargs: Field values to identify the object. If all KeyFields are
                      provided, enables direct lookup.

        Returns:
            The matching Model instance, or None if not found.

        Raises:
            QueryException: If the filter matches more than one object. This
                           indicates get() was used incorrectly; use filter() instead.

        Example:
            # Direct lookup when all keys are known (single Redis command)
            user = User.query.get(username="alice", tenant_id="acme")

            # Positional redis_key string (e.g. from a previous query or external source)
            user = User.query.get("User:alice:acme")

            # Fallback to filter when using non-key fields
            user = User.query.get(email="alice@example.com")  # May be slower
        """
        if isinstance(db_key, str) and not redis_key:
            redis_key = db_key
            db_key = None

        if (
            not db_key
            and not redis_key
            and all([key in kwargs for key in self.options.key_field_names])
        ):
            db_key = self.model_class(**kwargs).db_key

        if db_key and not redis_key:
            redis_key = db_key.redis_key

        if redis_key:
            from ..models.encoding import decode_popoto_model_hashmap

            hashmap = POPOTO_REDIS_DB.hgetall(redis_key)
            if not hashmap:
                return None
            instance = decode_popoto_model_hashmap(self.model_class, hashmap)
            _fire_on_read(self.model_class, [instance])

        else:
            instances = self.filter(**kwargs)
            if len(instances) > 1:
                raise QueryException(
                    f"{self.model_class.__name__} found more than one unique instance. Use `query.filter()`"
                )
            instance = instances[0] if len(instances) == 1 else None

        # or not hasattr(instance, 'db_key')
        return instance or None

    def get_many(self, redis_keys: list, skip_none: bool = False) -> list:
        """Retrieve multiple model instances by their Redis keys in a single pipeline.

        Uses a Redis pipeline to batch HGETALL calls, reducing N sequential
        round-trips to a single pipelined round-trip. Input order is preserved:
        each position in the returned list corresponds to the same position in
        ``redis_keys``.

        Unlike the internal ``get_many_objects()`` static method (which takes a
        set of bytes keys and silently drops missing entries), this public method
        takes a list of string keys, preserves order, and returns ``None`` for
        missing keys.

        Args:
            redis_keys: List of Redis key strings to look up (e.g.
                ``["User:alice:acme", "User:bob:acme"]``).
            skip_none: If True, filter out ``None`` entries from the result so
                that only successfully hydrated instances are returned. When
                False (default), the returned list has the same length as
                ``redis_keys`` with ``None`` at positions where the key was
                missing.

        Returns:
            List of Model instances (and/or ``None`` when *skip_none* is False).

        Example:
            # Bulk hydration after a set-based query
            keys = ["Product:widget:001", "Product:widget:002", "Product:widget:003"]
            products = Product.query.get_many(redis_keys=keys)
            # [<Product>, None, <Product>]  -- second key was missing

            # Skip missing entries
            products = Product.query.get_many(redis_keys=keys, skip_none=True)
            # [<Product>, <Product>]
        """
        if not redis_keys:
            return []

        from ..models.encoding import decode_popoto_model_hashmap

        pipeline = POPOTO_REDIS_DB.pipeline()
        for key in redis_keys:
            pipeline.hgetall(key)
        hashes_list = pipeline.execute()

        results = []
        live_instances = []
        for hashmap in hashes_list:
            if hashmap:
                instance = decode_popoto_model_hashmap(self.model_class, hashmap)
                results.append(instance)
                live_instances.append(instance)
            else:
                results.append(None)

        if live_instances:
            _fire_on_read(self.model_class, live_instances)

        if skip_none:
            return [r for r in results if r is not None]
        return results

    def keys(self, catchall=False, clean=False, **kwargs) -> list:
        """Return a list of Redis key bytes for all instances of this model.

        By default, returns keys from the Model's class set (a Redis SET that
        tracks all instances). This is O(N) where N is the number of instances.

        Args:
            catchall: Debug flag. If True, uses Redis KEYS command with wildcard
                     pattern matching. This scans ALL keys in Redis and should
                     NEVER be used in production. Useful for finding orphaned keys
                     that aren't in the class set.
            clean: Debug flag. If True, removes dangling references from index sets.
                  This repairs inconsistencies where the class set or field indexes
                  reference objects that no longer exist. Run this if you see
                  "missing objects" errors in query results.
            **kwargs: Reserved for future filtering capabilities.

        Returns:
            List of Redis key strings (bytes) for all Model instances.

        Warning:
            Both `catchall` and `clean` use Redis KEYS command, which blocks the
            server and scans all keys. Never use these in production environments.

        Example:
            # Normal usage - get all keys from class set
            all_keys = Product.query.keys()

            # Debug - find any keys matching the model name
            orphaned = Product.query.keys(catchall=True)

            # Repair - clean up dangling references
            Product.query.keys(clean=True)
        """
        if clean:
            logger.warning(
                "Query.keys(clean=True) is deprecated. Use Model.clean_indexes() for production-safe orphan cleanup."
            )
            pipeline = POPOTO_REDIS_DB.pipeline()
            from ..fields.key_field_mixin import KeyFieldMixin
            from ..fields.relationship import Relationship

            for db_key in list(
                POPOTO_REDIS_DB.smembers(
                    self.model_class._meta.db_class_set_key.redis_key
                )
            ):
                hash = POPOTO_REDIS_DB.hgetall(db_key)
                if not len(hash):
                    pipeline = pipeline.srem(
                        self.model_class._meta.db_class_set_key.redis_key, db_key
                    )

            # find
            for field_name, field in self.model_class._meta.fields.items():  # 3
                if not isinstance(field, (KeyFieldMixin, Relationship)):
                    continue
                field_key_prefix = field.get_special_use_field_db_key(
                    self.model_class, field_name
                )
                for field_key in POPOTO_REDIS_DB.keys(f"{field_key_prefix}:*"):
                    for object_key in POPOTO_REDIS_DB.smembers(field_key):
                        hash = POPOTO_REDIS_DB.hgetall(object_key)
                        if not len(hash):
                            pipeline = pipeline.srem(field_key, object_key)

            pipeline.execute()

        if catchall:
            logger.warning(
                "keys(catchall=True) is for debugging purposes only. "
                "Not for use in production environments."
            )
            return list(POPOTO_REDIS_DB.keys(f"*{self.model_class.__name__}*"))
        else:
            return list(
                POPOTO_REDIS_DB.smembers(
                    self.model_class._meta.db_class_set_key.redis_key
                )
            )

    def all(self, **kwargs) -> list:
        """Return all instances, with optional ``order_by``, ``limit``, and ``values``.

        Fetches every object tracked in the Model's class set. For large datasets,
        consider using `filter()` with appropriate constraints or `count()` to
        first assess the result size.

        Args:
            **kwargs: Supports the same result modifiers as `filter()`:
                - values: tuple of field names to return as dicts instead of objects
                - order_by: field name to sort by (prefix with "-" for descending)
                - limit: maximum number of results to return

        Returns:
            List of Model instances, or list of dicts if `values` is specified.

        Example:
            # Get all users
            users = User.query.all()

            # Get all users, sorted by creation date, newest first
            users = User.query.all(order_by="-created_at", limit=100)

            # Get only names and emails (more efficient for large objects)
            user_data = User.query.all(values=("name", "email"))
        """
        redis_db_keys_list = self.keys()

        # Apply default order_by from Meta if not explicitly provided
        if "order_by" not in kwargs and self.model_class._meta.order_by:
            kwargs["order_by"] = self.model_class._meta.order_by

        return self.prepare_results(
            Query.get_many_objects(
                self.model_class,
                set(redis_db_keys_list),
                order_by_attr_name=kwargs.get("order_by", None),
                values=kwargs.get("values", None),
            ),
            **kwargs,
        )

    def filter_for_keys_set(self, **kwargs) -> set:
        """
        Execute filter logic and return matching Redis keys (without loading objects).

        This is the core filtering engine. It routes filter parameters to the
        appropriate field types and combines their results via set intersection.
        Separated from `filter()` to support `count()` without the overhead of
        object instantiation.

        Processing Order:
        ----------------
        1. **Sorted fields first**: SortedFields use Redis sorted sets with range
           queries (ZRANGEBYSCORE), which are often more selective than key lookups.
           Additionally, sorted fields may have partition dependencies (`partition_by`)
           that consume other filter parameters.

        2. **Remaining fields**: KeyFields and other field types are processed
           after sorted fields, using whatever parameters remain unconsumed.

        3. **Set intersection**: Each field's filter returns a set of matching keys.
           The final result is the intersection of all sets, implementing AND logic.

        Args:
            **kwargs: Filter parameters. Each parameter is matched to a field that
                     supports it. Reserved params (limit, order_by, values) are
                     excluded from field matching.

        Returns:
            Set of Redis key strings (bytes) matching ALL filter criteria.
            Returns empty set if no filters provided or no matches found.

        Raises:
            QueryException: If any kwargs don't match a known filter parameter.
                          This prevents silent failures from typos.

        Note:
            This method does not apply ordering or limits - it only identifies
            matching keys. Use `filter()` for the complete query pipeline.
        """
        db_keys_sets = []
        self._sorted_field_order = None
        self._sorted_field_name = None
        self._pending_client_filters = {}
        yet_employed_kwargs_set = set(kwargs.keys()).difference(
            {"limit", "order_by", "values"}
        )
        if not len(yet_employed_kwargs_set):
            # No filter criteria - return all keys (same as all())
            return set(self.keys())

        # todo: use redis.SINTER for keyfield exact match filters

        # do sorted_fields first - because they can obviate some keyfield filters
        for field_name in self.options.sorted_field_names:
            field = self.options.fields[field_name]
            if not len(
                yet_employed_kwargs_set
                & self.options.filter_query_params_by_field[field_name]
            ):
                continue  # this field cannot use any of the available filter params
            logger.debug(
                f"query on {field_name} with {self.options.filter_query_params_by_field[field_name]}"
            )
            logger.debug(
                {
                    k: kwargs[k]
                    for k in self.options.filter_query_params_by_field[field_name]
                    if k in kwargs
                }
            )
            result = field.__class__.filter_query(
                self.model_class, field_name, **kwargs
            )
            # Handle tuple return from GeoField with distances
            if isinstance(result, tuple) and len(result) == 3:
                keys_set, distances, unit = result
                self._geo_distances.update(distances)
                self._geo_distance_unit = unit
                db_keys_sets.append(keys_set)
            else:
                # result is now a list (preserving ZRANGEBYSCORE order)
                if self._sorted_field_order is None:
                    self._sorted_field_order = result
                    self._sorted_field_name = field_name
                db_keys_sets.append(set(result))  # convert to set for intersection
            yet_employed_kwargs_set = yet_employed_kwargs_set.difference(
                self.options.filter_query_params_by_field[field_name]
            ).difference(
                set(field.partition_by)
            )  # also remove the required partition_by field names

        for field_name in self.options.filter_query_params_by_field:
            if field_name in self.options.sorted_field_names:
                continue  # already handled
            params_for_field = yet_employed_kwargs_set & set(
                self.options.filter_query_params_by_field[field_name]
            )
            if not params_for_field:
                continue  # this field cannot use any of the available filter params

            field = self.options.fields[field_name]
            logger.debug(f"query on {field_name} with {params_for_field}")
            logger.debug({k: kwargs[k] for k in params_for_field})
            result = field.__class__.filter_query(
                self.model_class, field_name, **{k: kwargs[k] for k in params_for_field}
            )
            # Handle tuple return from GeoField with distances
            if isinstance(result, tuple) and len(result) == 3:
                keys_set, distances, unit = result
                self._geo_distances.update(distances)
                self._geo_distance_unit = unit
                db_keys_sets.append(keys_set)
            else:
                db_keys_sets.append(result)
            yet_employed_kwargs_set = yet_employed_kwargs_set.difference(
                params_for_field
            )

        # Separate plain field params (client-side filter) from truly unknown params
        if yet_employed_kwargs_set:
            plain_field_filters = {}
            unknown_params = set()
            for param in yet_employed_kwargs_set:
                if param in self.options.fields:
                    plain_field_filters[param] = kwargs[param]
                    logger.debug(
                        f"Client-side filter on unindexed field '{param}' "
                        f"— consider using SortedField for better performance"
                    )
                else:
                    unknown_params.add(param)
            if unknown_params:
                raise QueryException(
                    f"Invalid filter parameters: {','.join(unknown_params)}"
                )
            self._pending_client_filters = plain_field_filters

        logger.debug(db_keys_sets)
        if not db_keys_sets:
            if self._pending_client_filters:
                # Only plain field filters — load all keys for client-side filtering
                return set(self.keys())
            return set()
        # return intersection of all the db keys sets, effectively &&-ing all filters
        intersection = set.intersection(*db_keys_sets)
        if self._sorted_field_order is not None:
            matched_keys = intersection
            self._sorted_field_order = [
                k for k in self._sorted_field_order if k in matched_keys
            ]
        return intersection
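The intersection step above can be sketched in isolation. The key sets and sorted-field ordering below are made-up stand-ins for what each field's `filter_query()` returns, not Popoto's real index layout:

```python
# Minimal sketch of the multi-filter intersection strategy above.
# Key sets and ordering are illustrative, not Popoto's real index layout.

# One candidate key set per indexed filter
keys_by_filter = {
    "category=electronics": {b"Product:a", b"Product:b", b"Product:c"},
    "price__lte=100": {b"Product:b", b"Product:c", b"Product:d"},
}

# A sorted field returns an ordered list (e.g. from ZRANGEBYSCORE);
# keep it aside so its order can be restored after intersecting.
sorted_field_order = [b"Product:d", b"Product:c", b"Product:b"]

# AND all filters together via set intersection
matched = set.intersection(*keys_by_filter.values())

# Restore sorted-field order, dropping keys eliminated by other filters
ordered_matches = [k for k in sorted_field_order if k in matched]
```

This mirrors why sorted fields are evaluated first: their ordered result survives as `_sorted_field_order` while the plain sets only participate in the intersection.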

    def _evaluate_filter_args(self, q_objects: list, kwargs: dict) -> set:
        """Evaluate filter arguments including Q objects and return matching keys.

        This method handles both traditional kwargs filtering and Q object
        expressions. When Q objects are present, they are combined with any
        kwargs using AND logic.

        Args:
            q_objects: List of Q objects for complex query expressions
            kwargs: Dict of filter parameters and result modifiers

        Returns:
            Set of Redis keys matching all filter criteria.

        Processing Logic:
        ----------------
        1. If no Q objects, delegate to filter_for_keys_set()
        2. If Q objects present:
           a. Evaluate each Q object to get its result set
           b. If kwargs filters exist, evaluate them too
           c. Intersect all result sets (AND logic between args)
        """
        from .q import evaluate_q

        # Strip result modifiers, keeping only the actual filter parameters
        filter_kwargs = {
            k: v for k, v in kwargs.items() if k not in {"limit", "order_by", "values"}
        }

        if not q_objects:
            # No Q objects - use traditional filtering
            return self.filter_for_keys_set(**kwargs)

        # Evaluate Q objects
        result_sets = []
        all_keys = None  # Lazy-loaded for negation operations

        for q_obj in q_objects:
            result_sets.append(evaluate_q(self, q_obj, all_keys))

        # If there are also plain kwargs filters, include them (modifiers already stripped)
        if filter_kwargs:
            kwargs_result = self.filter_for_keys_set(**filter_kwargs)
            result_sets.append(kwargs_result)

        # Intersect all result sets (AND logic between multiple Q args and kwargs)
        if not result_sets:
            return set()
        return set.intersection(*result_sets)
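The set algebra behind Q-object evaluation can be sketched without Redis. The sets below stand in for what `evaluate_q` would return for each expression; note that `~Q(...)` needs the full key universe to take a complement:

```python
# Q-object logic as set algebra over Redis key sets. The sets stand in
# for what evaluate_q returns for each expression.
all_keys = {b"u:1", b"u:2", b"u:3", b"u:4"}  # full key universe
q_active = {b"u:1", b"u:2"}                  # Q(status="active")
q_premium = {b"u:2", b"u:3"}                 # Q(type="premium")

or_result = q_active | q_premium     # Q(...) | Q(...) -> union
and_result = q_active & q_premium    # Q(...) & Q(...) -> intersection
not_result = all_keys - q_active     # ~Q(...) -> complement, needs all_keys

# Multiple filter() arguments are AND-ed, as in _evaluate_filter_args
combined = set.intersection(or_result, not_result)
```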

    def filter(self, *args, **kwargs) -> "QueryBuilder":
        """
        Query for Model instances matching the specified criteria.

        This is the primary query method for Popoto, providing Django-like filtering
        syntax with Redis-optimized execution. All filter parameters are AND-ed
        together by default. Use Q objects for OR logic and complex combinations.

        Returns a QueryBuilder that supports method chaining. The QueryBuilder also
        behaves like a list for backward compatibility - you can iterate over it or
        pass it to len() and it will execute the query automatically.

        Filter Parameters:
        -----------------
        Available filters depend on the field types in your Model:

        **KeyField filters:**
        - `field=value` - Exact match
        - `field__in=[v1, v2]` - Match any value in list
        - `field__contains="x"` - Substring match (uses Redis KEYS, slow)
        - `field__startswith="x"` - Prefix match
        - `field__endswith="x"` - Suffix match
        - `field__isnull=True/False` - Null check

        **SortedField filters:**
        - `field=value` - Exact match
        - `field__gt=value` - Greater than
        - `field__gte=value` - Greater than or equal
        - `field__lt=value` - Less than
        - `field__lte=value` - Less than or equal

        **Q Objects (for complex logic):**
        - `Q(field=value)` - Basic Q object (equivalent to kwargs)
        - `Q(...) | Q(...)` - OR logic (union of results)
        - `Q(...) & Q(...)` - AND logic (intersection of results)
        - `~Q(...)` - NOT logic (exclusion)

        Result Modifiers (kwargs API):
        -----------------------------
        - `order_by="field"` - Sort ascending by field
        - `order_by="-field"` - Sort descending by field
        - `limit=N` - Return at most N results
        - `values=("field1", "field2")` - Return dicts with only specified fields
          instead of full Model instances (projection query, more efficient)

        Chainable Methods:
        -----------------
        - `.filter(**kwargs)` - Add more filter criteria
        - `.order_by("field")` - Sort results (prefix with "-" for descending)
        - `.limit(n)` - Limit number of results
        - `.values("field1", "field2")` - Return dicts instead of objects
        - `.all()` - Execute and return results
        - `.first()` - Execute and return first result or None
        - `.count()` - Count matching results without loading objects

        Args:
            *args: Q objects for complex query expressions
            **kwargs: Filter parameters and result modifiers as described above.

        Returns:
            QueryBuilder that can be chained or iterated directly.

        Raises:
            QueryException: If unknown filter parameters are provided.

        Example:
            # Original kwargs API (still fully supported)
            users = User.query.filter(
                status="active",
                tier="premium",
                created_at__gte=datetime(2024, 1, 1),
                order_by="-created_at",
                limit=50
            )

            # Chainable API
            users = User.query.filter(status="active").order_by("-created_at").limit(50).all()

            # Chain multiple filters
            users = User.query.filter(status="active").filter(tier="premium").all()

            # Efficient projection - only load specific fields
            emails = User.query.filter(status="active").values("email", "name").all()

            # OR logic with Q objects
            users = User.query.filter(Q(status="active") | Q(type="premium"))

            # Complex combinations
            users = User.query.filter(
                (Q(status="active") | Q(type="premium")) & Q(rating__gt=3.0)
            )
        """
        from .q import Q
        from .expressions import Expression, CombinedExpression

        # Process args - can be Q objects or Expression objects
        q_objects = []
        for arg in args:
            if isinstance(arg, Q):
                q_objects.append(arg)
            elif isinstance(arg, (Expression, CombinedExpression)):
                # Convert Expression to Q object
                q_objects.append(arg.to_q())

        # Strip result modifiers; pass only filter parameters to the QueryBuilder
        filters = {
            k: v for k, v in kwargs.items() if k not in {"limit", "order_by", "values"}
        }
        builder = QueryBuilder(self, filters, q_objects)

        # Apply result modifiers if provided in kwargs (for backward compatibility)
        if "limit" in kwargs:
            builder._limit_value = kwargs["limit"]
        if "order_by" in kwargs:
            builder._order_by_value = kwargs["order_by"]
        if "values" in kwargs:
            builder._values_tuple = kwargs["values"]

        return builder

    def top_by_decay(
        self, field_name=None, n=10, decay_rate=None, base_score_field=None
    ):
        """Return top-N instances ranked by time-decayed score.

        Convenience method that creates a QueryBuilder and delegates.
        For partitioned fields, use
        ``query.filter(partition=value).top_by_decay()``.

        Args:
            field_name: Name of a DecayingSortedField on the model. Optional
                when the model has exactly one DecayingSortedField (or subclass).
            n: Maximum number of results to return. Default 10.
            decay_rate: Override the field's decay_rate for this query.
            base_score_field: Override the field's base_score_field for this query.

        Returns:
            List of model instances in decayed-score order.
        """
        builder = QueryBuilder(self)
        return builder.top_by_decay(
            field_name, n=n, decay_rate=decay_rate, base_score_field=base_score_field
        )
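As a rough sketch of what time-decayed ranking means, assuming a simple exponential decay formula (the real formula is defined by `DecayingSortedField`, so treat this as illustrative only):

```python
import math
import time


def decayed_score(base_score, created_at, decay_rate, now=None):
    """Exponential time decay: score shrinks as the item ages.
    Illustrative only; the real formula lives in DecayingSortedField."""
    now = time.time() if now is None else now
    age_seconds = now - created_at
    return base_score * math.exp(-decay_rate * age_seconds)


# same base score, different ages: the fresher item ranks higher
now = 1_000_000.0
fresh = decayed_score(10.0, created_at=now - 60, decay_rate=0.001, now=now)
stale = decayed_score(10.0, created_at=now - 3600, decay_rate=0.001, now=now)
```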

    def composite_score(
        self,
        indexes: dict,
        limit: int = 10,
        aggregate: str = "SUM",
        min_score: float = None,
        post_filter: Optional[Callable[[str, float], bool]] = None,
        co_occurrence_boost: dict = None,
        similarity_boost: dict = None,
        temperature: float = 1.0,
    ) -> list:
        """Return top-K instances ranked by weighted composite score.

        Convenience method that creates a QueryBuilder and delegates.
        For partitioned fields, use
        ``query.filter(partition=value).composite_score(...)``.

        Args:
            indexes: Mapping of field names to weights.
            limit: Maximum results to return. Default 10.
            aggregate: Aggregation mode: "SUM", "MIN", or "MAX".
            min_score: Optional minimum composite score threshold.
            post_filter: Optional callable (redis_key, score) -> bool.
            co_occurrence_boost: Optional {redis_key: weight} dict.
            similarity_boost: Optional {redis_key: score} dict from
                semantic_search().
            temperature: Score scaling factor. Default 1.0 (no scaling).
                Must be > 0.

        Returns:
            List of model instances ranked by composite score.
        """
        builder = QueryBuilder(self)
        return builder.composite_score(
            indexes=indexes,
            limit=limit,
            aggregate=aggregate,
            min_score=min_score,
            post_filter=post_filter,
            co_occurrence_boost=co_occurrence_boost,
            similarity_boost=similarity_boost,
            temperature=temperature,
        )
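A pure-Python sketch of weighted composite scoring with `SUM` aggregation. The per-index scores, the weights, and the divide-by-temperature scaling are illustrative assumptions, not the exact server-side computation:

```python
# Weighted composite scoring with SUM aggregation, in pure Python.
# Per-index scores, weights, and the temperature scaling are made up.
scores_by_index = {
    "popularity": {b"item:a": 5.0, b"item:b": 2.0},
    "recency": {b"item:b": 4.0, b"item:c": 1.0},
}
weights = {"popularity": 0.7, "recency": 0.3}
temperature = 1.0  # assumed semantics: divides scores; 1.0 means no scaling

composite = {}
for index_name, member_scores in scores_by_index.items():
    w = weights[index_name]
    for key, score in member_scores.items():
        composite[key] = composite.get(key, 0.0) + w * (score / temperature)

# rank descending and keep the top `limit`
top = sorted(composite.items(), key=lambda kv: kv[1], reverse=True)[:10]
```

Members missing from an index simply contribute nothing under `SUM`; `MIN`/`MAX` aggregation would replace the accumulation line.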

    def semantic_search(
        self,
        query_text: str,
        indexes: dict = None,
        limit: int = 10,
        aggregate: str = "SUM",
        min_score: float = None,
        post_filter: Optional[Callable[[str, float], bool]] = None,
        co_occurrence_boost: dict = None,
        temperature: float = 1.0,
    ) -> list:
        """Return top-K instances ranked by semantic similarity.

        Convenience method that creates a QueryBuilder and delegates.

        Args:
            query_text: The text to search for semantically.
            indexes: Optional field-weight mapping for composite scoring.
            limit: Maximum results to return. Default 10.
            aggregate: Aggregation mode. Default "SUM".
            min_score: Optional minimum score threshold.
            post_filter: Optional callable (redis_key, score) -> bool.
            co_occurrence_boost: Optional {redis_key: weight} dict.
            temperature: Score scaling factor. Default 1.0.

        Returns:
            List of model instances ranked by semantic similarity.
        """
        builder = QueryBuilder(self)
        return builder.semantic_search(
            query_text=query_text,
            indexes=indexes,
            limit=limit,
            aggregate=aggregate,
            min_score=min_score,
            post_filter=post_filter,
            co_occurrence_boost=co_occurrence_boost,
            temperature=temperature,
        )

    def keyword_search(
        self,
        query_text: str,
        field: str = None,
        limit: int = 10,
    ) -> list:
        """Return instances ranked by BM25 keyword relevance.

        Convenience method that creates a QueryBuilder and delegates.

        Args:
            query_text: The search query string.
            field: Name of the BM25Field to search. Optional when the model
                has exactly one BM25Field.
            limit: Maximum results to return. Default 10.

        Returns:
            List of model instances ranked by BM25 score (descending).
        """
        builder = QueryBuilder(self)
        return builder.keyword_search(
            query_text=query_text,
            field=field,
            limit=limit,
        )
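For intuition, the classic BM25 term-scoring formula looks like the following. The parameter defaults and the IDF variant are assumptions shown for illustration, not necessarily `BM25Field`'s exact implementation:

```python
import math


def bm25_term_score(tf, doc_len, avg_doc_len, n_docs, df, k1=1.5, b=0.75):
    """Classic BM25 score for one term in one document.
    Defaults and IDF variant are assumptions, shown for intuition."""
    idf = math.log(1 + (n_docs - df + 0.5) / (df + 0.5))
    length_norm = k1 * (1 - b + b * doc_len / avg_doc_len)
    return idf * (tf * (k1 + 1)) / (tf + length_norm)


# a rare term (df=5) outranks a common one (df=900) at equal term frequency
rare = bm25_term_score(tf=3, doc_len=100, avg_doc_len=120, n_docs=1000, df=5)
common = bm25_term_score(tf=3, doc_len=100, avg_doc_len=120, n_docs=1000, df=900)
```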

    def fuse(
        self,
        k: int = 60,
        limit: int = 10,
        post_filter: Optional[Callable] = None,
        **ranked_lists,
    ) -> list:
        """Reciprocal Rank Fusion across heterogeneous ranked lists.

        Convenience method that creates a QueryBuilder and delegates.

        Args:
            k: RRF constant (default 60).
            limit: Maximum results to return. Default 10.
            post_filter: Optional (redis_key, rrf_score) -> bool callback.
            **ranked_lists: Named ranked lists of (redis_key, score) tuples.

        Returns:
            List of model instances ranked by RRF score (descending).
        """
        builder = QueryBuilder(self)
        return builder.fuse(
            k=k,
            limit=limit,
            post_filter=post_filter,
            **ranked_lists,
        )
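Reciprocal Rank Fusion itself is simple enough to sketch in a few lines: each list contributes `1 / (k + rank)` per item, so items that appear near the top of several lists win. The ranked lists below are made up:

```python
# Reciprocal Rank Fusion: each list contributes 1 / (k + rank) per item,
# so items ranked highly in several lists rise to the top.
def rrf(k=60, **ranked_lists):
    fused = {}
    for _name, ranked in ranked_lists.items():
        for rank, (redis_key, _score) in enumerate(ranked, start=1):
            fused[redis_key] = fused.get(redis_key, 0.0) + 1.0 / (k + rank)
    # original scores only determine rank; RRF ignores their magnitudes
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)


# made-up ranked lists, e.g. from semantic_search() and keyword_search()
result = rrf(
    semantic=[(b"doc:a", 0.92), (b"doc:b", 0.85)],
    keyword=[(b"doc:b", 7.1), (b"doc:c", 4.0)],
)
```

Because the raw scores are discarded, RRF fuses lists with incomparable score scales (cosine similarity vs. BM25) without any normalization step; larger `k` flattens the influence of rank differences.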

    def _execute_filter(
        self, q_objects: list = None, _no_track: bool = False, **kwargs
    ) -> list:
        """Internal method to execute filter logic and return results.

        This is the actual filter execution, called by QueryBuilder.all() and
        the backward-compatible list operations on QueryBuilder.

        Args:
            q_objects: List of Q objects for complex query expressions
            _no_track: If True, suppress on_read() for AccessTrackerMixin models
            **kwargs: Filter parameters and result modifiers

        Returns:
            List of Model instances or dicts
        """
        # Reset geo distances for this query
        self._geo_distances = {}
        self._geo_distance_unit = None

        # Use _evaluate_filter_args if Q objects present, otherwise filter_for_keys_set
        if q_objects:
            db_keys_set = self._evaluate_filter_args(q_objects, kwargs)
            # Q objects combine results from multiple filter_for_keys_set calls,
            # so _sorted_field_order is unreliable — clear it
            self._sorted_field_order = None
            self._sorted_field_name = None
        else:
            db_keys_set = self.filter_for_keys_set(**kwargs)
        if not len(db_keys_set):
            return []

        # Apply default order_by from Meta if not explicitly provided
        # but not when sorted field ordering is active (it's a smarter default)
        if (
            "order_by" not in kwargs
            and self.model_class._meta.order_by
            and not getattr(self, "_sorted_field_order", None)
        ):
            kwargs["order_by"] = self.model_class._meta.order_by

        # Use sorted field order if available and no explicit order_by
        sorted_field_order = getattr(self, "_sorted_field_order", None)
        explicit_order_by = kwargs.get("order_by", None)
        # Meta.order_by is a default - sorted field order takes precedence over it
        if sorted_field_order and not explicit_order_by:
            db_keys_set = sorted_field_order  # Use ordered list instead of set

        objects = Query.get_many_objects(
            self.model_class,
            db_keys_set,
            order_by_attr_name=kwargs.get("order_by", None),
            limit=kwargs.get("limit", None),
            values=kwargs.get("values", None),
        )

        # Apply client-side filters for plain (unindexed) fields
        client_filters = getattr(self, "_pending_client_filters", {})
        if client_filters:
            filtered = []
            for obj in objects:
                match = True
                for field_name, expected_value in client_filters.items():
                    if isinstance(obj, dict):
                        actual = obj.get(field_name)
                    else:
                        actual = getattr(obj, field_name, None)
                    if actual != expected_value:
                        match = False
                        break
                if match:
                    filtered.append(obj)
            objects = filtered

        # Attach geo distances to objects if available
        if self._geo_distances:
            # Normalize distance dict keys to strings for consistent lookup
            normalized_distances = {}
            for key, dist in self._geo_distances.items():
                if isinstance(key, bytes):
                    normalized_distances[key.decode()] = dist
                else:
                    normalized_distances[key] = dist

            for obj in objects:
                if isinstance(obj, dict):
                    # When values= is used, obj is a dict - skip distance attachment
                    continue
                redis_key = obj.db_key.redis_key
                if isinstance(redis_key, bytes):
                    redis_key = redis_key.decode()
                distance = normalized_distances.get(redis_key)
                if distance is not None:
                    obj._geo_distance = distance
                    obj._geo_distance_unit = self._geo_distance_unit

            # Sort by distance (ascending) to preserve geo-sorted order
            # Only sort model objects, not dicts
            model_objects = [o for o in objects if not isinstance(o, dict)]
            dict_objects = [o for o in objects if isinstance(o, dict)]
            model_objects.sort(key=lambda o: getattr(o, "_geo_distance", float("inf")))
            objects = model_objects + dict_objects

        results = self.prepare_results(objects, **kwargs)

        # Fire on_read for AccessTrackerMixin models (skip for value projections)
        if not _no_track and not kwargs.get("values"):
            model_results = [r for r in results if not isinstance(r, dict)]
            if model_results:
                _fire_on_read(self.model_class, model_results)

        return results

    def prepare_results(
        self,
        objects,
        order_by: str = "",
        values: tuple = (),
        limit: int = None,
        **kwargs,
    ):
        """Apply sorting and limiting to query results.

        This is a post-processing step that operates on already-loaded objects.
        For large result sets, sorting happens in Python rather than Redis,
        which may have performance implications.

        Design Note:
        -----------
        Sorting is applied after fetching because Redis doesn't support cross-key
        sorting natively. SortedFields maintain their own sorted sets for range
        queries, but general-purpose sorting across arbitrary fields requires
        loading objects first.

        For KeyFields, `get_many_objects()` can optimize sorting when the sort
        field is part of the key (extracting sort values from key strings without
        loading full objects). This method handles the remaining cases.

        Args:
            objects: List of Model instances or dicts (if values projection was used)
            order_by: Field name to sort by. Prefix with "-" for descending order.
            values: Tuple of field names if projection was used (affects sort behavior)
            limit: Maximum number of results to return after sorting
            **kwargs: Unused, accepts extra params for forward compatibility

        Returns:
            Sorted and limited list of objects or dicts.

        Raises:
            QueryException: If order_by field doesn't exist or isn't included in
                           values tuple when using projection.

        Note:
            Null values in the sort field are handled by substituting the field's
            default type value (e.g., "" for str, 0 for int), ensuring consistent
            ordering.
        """
        # Apply default order_by from Meta if not explicitly provided
        if not order_by and self.model_class._meta.order_by:
            order_by = self.model_class._meta.order_by

        reverse_order = False
        if order_by and order_by.startswith("-"):
            reverse_order = True
            order_by = order_by[1:]
        if order_by:
            order_by_attr_name = order_by
            if (
                not isinstance(order_by_attr_name, str)
            ) or order_by_attr_name not in self.model_class._meta.fields:
                raise QueryException(
                    f"order_by={order_by_attr_name} must be a field name (str)"
                )
            attr_type = self.model_class._meta.fields[order_by_attr_name].type
            if values and order_by_attr_name not in values:
                raise QueryException(
                    "the order_by field must be included in values=(fieldnames,) "
                    "when using projection"
                )
            elif values:
                # substitute the field type's default for missing values (see Note)
                objects.sort(
                    key=lambda item: item.get(order_by_attr_name) or attr_type()
                )
            else:
                objects.sort(
                    key=lambda item: getattr(item, order_by_attr_name) or attr_type()
                )
            objects = (
                list(reversed(objects))[:limit] if reverse_order else objects[:limit]
            )

        if limit and len(objects) > limit:
            objects = objects[:limit]

        return objects
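The null-handling described in the Note above can be shown with plain dicts standing in for loaded objects:

```python
# Null-safe ordering: substitute the field type's default ("" for str,
# 0 for int) so None never reaches the comparison. Plain dicts stand in
# for loaded objects here.
rows = [{"name": "bo"}, {"name": None}, {"name": "al"}]
attr_type = str  # the field's declared type; str() == ""

rows.sort(key=lambda item: item.get("name") or attr_type())
# None sorts first (as ""), ahead of "al" and "bo"
```

Without the substitution, Python 3 would raise `TypeError` when comparing `None` to a string.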

    def count(self, **kwargs) -> int:
        """Count instances matching the given filters (or all if no filters).

        More efficient than `len(filter(...))` because it avoids object
        instantiation. For unfiltered counts, uses Redis SCARD which is O(1).

        Args:
            **kwargs: Same filter parameters as `filter()`. If empty, counts
                     all instances of the Model.

        Returns:
            Integer count of matching instances.

        Performance:
        -----------
        - No filters: O(1) using Redis SCARD on the class set
        - With filters: O(N) where N is the result of filter intersection,
          but still avoids the overhead of HGETALL and object instantiation

        Example:
            # O(1) - count all products
            total = Product.query.count()

            # Filtered count - still more efficient than len(filter(...))
            active = Product.query.count(status="active", category="electronics")

        Note:
            Future optimization: Could use Redis SINTERCARD (Redis 7.0+) for
            filtered counts to avoid materializing the full key set.
        """
        if not kwargs:
            return int(
                POPOTO_REDIS_DB.scard(self.model_class._meta.db_class_set_key.redis_key)
                or 0
            )
        db_keys = self.filter_for_keys_set(**kwargs)
        client_filters = getattr(self, "_pending_client_filters", {})
        if client_filters:
            # Must load objects to apply client-side filters
            objects = Query.get_many_objects(self.model_class, db_keys)
            return sum(
                1
                for obj in objects
                if all(
                    getattr(obj, fname, None) == fval
                    for fname, fval in client_filters.items()
                )
            )
        return len(db_keys)

    @classmethod
    def get_many_objects(
        cls,
        model: "Model",
        db_keys: set,
        order_by_attr_name: str = None,
        limit: int = None,
        values: tuple = None,
        lazy: bool = True,
    ) -> list:
        """
        Batch-load multiple Model instances from Redis using pipelined commands.

        This is the core bulk retrieval method, optimized for performance through
        several strategies:

        1. **Redis Pipelines**: All HGETALL/HMGET commands are batched into a single
           pipeline, reducing network round trips from O(N) to O(1).

        2. **KeyField Sorting Optimization**: If sorting by a KeyField, sort values
           can be extracted directly from key strings without loading objects,
           and limit can be applied BEFORE loading (reducing Redis commands).

        3. **Projection Optimization**: With `values` tuple:
           - If ALL requested fields are KeyFields, data is extracted from key
             strings without ANY Redis commands.
           - Otherwise, uses HMGET instead of HGETALL for partial field retrieval.

        Args:
            model: The Model class to instantiate
            db_keys: Set of Redis keys to load
            order_by_attr_name: Field to sort by (prefix with "-" for descending).
                               If this is a KeyField, sorting is optimized.
            limit: Maximum objects to load. Applied early if sorting by KeyField.
            values: Tuple of field names for projection. If specified, returns
                   dicts instead of Model instances.

        Returns:
            List of Model instances, or list of dicts if values is specified.
            Objects for missing/deleted keys are silently excluded (an error is
            logged).

        Raises:
            QueryException: If values is not a tuple.

        Performance Notes:
        -----------------
        - N objects without projection: 1 pipeline with N HGETALL commands
        - N objects with projection: 1 pipeline with N HMGET commands
        - Projection with only KeyFields: 0 Redis commands (parsed from keys)
        - Sorting by KeyField with limit: Only `limit` objects loaded

        Example:
            # Internal usage - typically called by filter() or all()
            objects = Query.get_many_objects(
                User,
                {b"User:alice", b"User:bob"},
                order_by_attr_name="username",
                limit=10
            )
        """
        from .encoding import decode_popoto_model_hashmap

        pipeline = POPOTO_REDIS_DB.pipeline()
        reverse_order = False
        # order the hashes list or objects before applying limit
        if order_by_attr_name and order_by_attr_name.startswith("-"):
            order_by_attr_name = order_by_attr_name[1:]
            reverse_order = True

        if order_by_attr_name and order_by_attr_name in model._meta.key_field_names:
            field_position = model._meta.get_db_key_index_position(order_by_attr_name)
            db_keys = list(db_keys)
            db_keys.sort(key=lambda key: key.split(b":")[field_position])
            db_keys = (
                list(reversed(db_keys))[:limit] if reverse_order else db_keys[:limit]
            )

        if values:
            if not isinstance(values, tuple):
                raise QueryException(
                    "values takes a tuple, e.g. query.filter(values=('name',))"
                )
            elif set(values).issubset(model._meta.key_field_names):
                db_keys = [DB_key.from_redis_key(db_key) for db_key in db_keys]
                return [
                    {
                        field_name: (
                            model._meta.fields[field_name].type(
                                db_key[
                                    model._meta.get_db_key_index_position(field_name)
                                ]
                            )
                            if db_key[model._meta.get_db_key_index_position(field_name)]
                            else None
                        )
                        for field_name in values
                    }
                    for db_key in db_keys
                ]
            else:
                for db_key in db_keys:
                    pipeline.hmget(db_key, values)
                value_lists = pipeline.execute()
                hashes_list = [
                    {field_name: result[i] for i, field_name in enumerate(values)}
                    for result in value_lists
                ]

        else:
            for db_key in db_keys:
                pipeline.hgetall(db_key)
            hashes_list = pipeline.execute()

        if {} in hashes_list:
            logger.error(
                "one or more redis keys point to missing objects. Debug with Model.query.keys(clean=True)"
            )

        return [
            decode_popoto_model_hashmap(
                model, redis_hash, fields_only=bool(values), lazy=lazy and not values
            )
            for redis_hash in hashes_list
            if redis_hash
        ]
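The zero-command projection path (strategy 3 above) can be sketched with plain string parsing. The key layout and field positions here are illustrative:

```python
# Projection from key strings alone: when every requested field is a
# KeyField, values can be parsed out of the colon-delimited Redis key
# without issuing any Redis commands. Layout below is illustrative.
db_keys = [b"User:alice:us", b"User:bob:de"]
key_field_positions = {"username": 1, "country": 2}  # assumed key positions

projected = [
    {
        field: (key.split(b":")[pos].decode() or None)
        for field, pos in key_field_positions.items()
    }
    for key in db_keys
]
```

Empty key segments decode to `""`, which the `or None` turns back into `None`, matching how the real method treats absent key-field values.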

    # Async methods using native redis.asyncio

    async def async_get(
        self, db_key: DB_key = None, redis_key: str = None, **kwargs
    ) -> "Model":
        """Async version of get() using native async Redis.

        Retrieves a single model instance from Redis using non-blocking I/O.
        Uses redis.asyncio for true async operations without thread pool overhead.

        Args:
            db_key: Optional DB_key object
            redis_key: Optional Redis key string
            **kwargs: Field values to construct query

        Returns:
            Model instance or None if not found

        Raises:
            QueryException: If the filter matches more than one object.
        """
        from ..models.encoding import decode_popoto_model_hashmap

        if (
            not db_key
            and not redis_key
            and all(key in kwargs for key in self.options.key_field_names)
        ):
            db_key = self.model_class(**kwargs).db_key

        if db_key and not redis_key:
            redis_key = db_key.redis_key

        if redis_key:
            async_redis = await get_async_redis_db()
            hashmap = await async_redis.hgetall(redis_key)
            if not hashmap:
                return None
            instance = decode_popoto_model_hashmap(self.model_class, hashmap)
            await to_thread(_fire_on_read, self.model_class, [instance])
        else:
            instances = await self.async_filter(**kwargs)
            if len(instances) > 1:
                raise QueryException(
                    f"{self.model_class.__name__} found more than one unique instance. Use `query.filter()`"
                )
            instance = instances[0] if len(instances) == 1 else None

        return instance or None

    async def async_get_many(self, redis_keys: list, skip_none: bool = False) -> list:
        """Async version of get_many() using native async Redis.

        Retrieves multiple model instances by their Redis keys in a single
        async pipeline. See :meth:`Query.get_many` for full documentation.

        Args:
            redis_keys: List of Redis key strings to look up.
            skip_none: If True, filter out ``None`` entries from the result.

        Returns:
            List of Model instances (and/or ``None`` when *skip_none* is False).

        Example:
            keys = ["Product:widget:001", "Product:widget:002"]
            products = await Product.query.async_get_many(redis_keys=keys)
        """
        if not redis_keys:
            return []

        from ..models.encoding import decode_popoto_model_hashmap

        async_redis = await get_async_redis_db()
        pipeline = async_redis.pipeline()
        for key in redis_keys:
            pipeline.hgetall(key)
        hashes_list = await pipeline.execute()

        results = []
        live_instances = []
        for hashmap in hashes_list:
            if hashmap:
                instance = decode_popoto_model_hashmap(self.model_class, hashmap)
                results.append(instance)
                live_instances.append(instance)
            else:
                results.append(None)

        if live_instances:
            await to_thread(_fire_on_read, self.model_class, live_instances)

        if skip_none:
            return [r for r in results if r is not None]
        return results

    async def async_filter(self, **kwargs) -> list:
        """Async version of filter() using native async Redis.

        Filters model instances based on field values using non-blocking I/O.
        Currently uses to_thread() for complex filter operations that involve
        field-specific query logic, but uses native async for result fetching.

        Preserves sorted field ordering from ZRANGEBYSCORE when no explicit
        order_by is provided, matching the sync _execute_filter behavior.

        Precedence: explicit order_by > sorted field order > Meta.order_by

        Args:
            **kwargs: Filter parameters (field values, limit, order_by, values)

        Returns:
            List of model instances or dicts (if values= specified)

        Note:
            The filter_for_keys_set() operation currently uses sync Redis due to
            the complexity of field-specific filter_query() implementations.
            Object loading uses native async Redis for better performance on
            bulk data retrieval.
        """
        # Reset geo distances for this query
        self._geo_distances = {}
        self._geo_distance_unit = None

        # Get keys using sync method in thread pool (field query implementations are sync)
        db_keys_set = await to_thread(self.filter_for_keys_set, **kwargs)
        if not len(db_keys_set):
            return []

        # Apply default order_by from Meta if not explicitly provided,
        # but not when sorted field ordering is active (it's a smarter default)
        if (
            "order_by" not in kwargs
            and self.model_class._meta.order_by
            and not getattr(self, "_sorted_field_order", None)
        ):
            kwargs["order_by"] = self.model_class._meta.order_by

        # Use sorted field order if available and no explicit order_by
        sorted_field_order = getattr(self, "_sorted_field_order", None)
        explicit_order_by = kwargs.get("order_by", None)
        # Meta.order_by is a default - sorted field order takes precedence over it
        if sorted_field_order and not explicit_order_by:
            db_keys_set = sorted_field_order  # Use ordered list instead of set

        # Use native async for bulk object loading
        objects = await self._async_get_many_objects(
            self.model_class,
            db_keys_set,
            order_by_attr_name=kwargs.get("order_by", None),
            limit=kwargs.get("limit", None),
            values=kwargs.get("values", None),
        )

        # Apply client-side filters for plain (unindexed) fields
        client_filters = getattr(self, "_pending_client_filters", {})
        if client_filters:
            filtered = []
            for obj in objects:
                match = True
                for field_name, expected_value in client_filters.items():
                    if isinstance(obj, dict):
                        actual = obj.get(field_name)
                    else:
                        actual = getattr(obj, field_name, None)
                    if actual != expected_value:
                        match = False
                        break
                if match:
                    filtered.append(obj)
            objects = filtered

        # Attach geo distances to objects if available
        if self._geo_distances:
            normalized_distances = {}
            for key, dist in self._geo_distances.items():
                if isinstance(key, bytes):
                    normalized_distances[key.decode()] = dist
                else:
                    normalized_distances[key] = dist

            for obj in objects:
                if isinstance(obj, dict):
                    continue
                redis_key = obj.db_key.redis_key
                if isinstance(redis_key, bytes):
                    redis_key = redis_key.decode()
                distance = normalized_distances.get(redis_key)
                if distance is not None:
                    obj._geo_distance = distance
                    obj._geo_distance_unit = self._geo_distance_unit

            model_objects = [o for o in objects if not isinstance(o, dict)]
            dict_objects = [o for o in objects if isinstance(o, dict)]
            model_objects.sort(key=lambda o: getattr(o, "_geo_distance", float("inf")))
            objects = model_objects + dict_objects

        return self.prepare_results(objects, **kwargs)

    async def async_all(self, **kwargs) -> list:
        """Async version of all() using native async Redis.

        Retrieves all model instances using non-blocking I/O.

        Args:
            **kwargs: Optional order_by and values parameters

        Returns:
            List of all model instances or dicts (if values= specified)
        """
        async_redis = await get_async_redis_db()
        redis_db_keys_list = list(
            await async_redis.smembers(
                self.model_class._meta.db_class_set_key.redis_key
            )
        )

        # Apply default order_by from Meta if not explicitly provided
        if "order_by" not in kwargs and self.model_class._meta.order_by:
            kwargs["order_by"] = self.model_class._meta.order_by

        objects = await self._async_get_many_objects(
            self.model_class,
            set(redis_db_keys_list),
            order_by_attr_name=kwargs.get("order_by", None),
            values=kwargs.get("values", None),
        )

        return self.prepare_results(objects, **kwargs)

    async def async_count(self, **kwargs) -> int:
        """Async version of count() using native async Redis.

        Counts model instances matching filter criteria using non-blocking I/O.

        Args:
            **kwargs: Optional filter parameters

        Returns:
            Count of matching instances
        """
        async_redis = await get_async_redis_db()

        if not len(kwargs):
            count = await async_redis.scard(
                self.model_class._meta.db_class_set_key.redis_key
            )
            return int(count or 0)

        # Use sync filter_for_keys_set in thread pool for complex filter logic
        db_keys = await to_thread(self.filter_for_keys_set, **kwargs)
        client_filters = getattr(self, "_pending_client_filters", {})
        if client_filters:
            # Must load objects to apply client-side filters
            objects = await self._async_get_many_objects(self.model_class, db_keys)
            return sum(
                1
                for obj in objects
                if all(
                    getattr(obj, fname, None) == fval
                    for fname, fval in client_filters.items()
                )
            )
        return len(db_keys)

    async def async_keys(self, catchall=False, clean=False, **kwargs) -> list:
        """Async version of keys() using native async Redis.

        Retrieves Redis keys for model instances using non-blocking I/O.

        Args:
            catchall: If True, use KEYS pattern (debug only, not for production)
            clean: If True, clean up orphaned keys (debug only, not for production)
            **kwargs: Additional parameters

        Returns:
            List of Redis keys

        Note:
            The clean operation uses to_thread() as it involves complex pipeline
            operations. Regular key retrieval uses native async.
        """
        if clean:
            # Clean operation is complex with pipelines, use thread pool
            return await to_thread(self.keys, catchall=catchall, clean=clean, **kwargs)

        async_redis = await get_async_redis_db()

        if catchall:
            logger.warning(
                "{catchall} is for debugging purposes only. Not for use in production environment"
            )
            return list(await async_redis.keys(f"*{self.model_class.__name__}*"))
        else:
            return list(
                await async_redis.smembers(
                    self.model_class._meta.db_class_set_key.redis_key
                )
            )

    @classmethod
    async def _async_get_many_objects(
        cls,
        model: "Model",
        db_keys: set,
        order_by_attr_name: str = None,
        limit: int = None,
        values: tuple = None,
        lazy: bool = True,
    ) -> list:
        """Async version of get_many_objects using native async Redis.

        Batch-loads multiple Model instances from Redis using async pipelined
        commands for true non-blocking I/O.

        Args:
            model: The Model class to instantiate
            db_keys: Set of Redis keys to load
            order_by_attr_name: Field to sort by (prefix with "-" for descending)
            limit: Maximum objects to load
            values: Tuple of field names for projection

        Returns:
            List of Model instances, or list of dicts if values is specified.
        """
        from .encoding import decode_popoto_model_hashmap
        from .db_key import DB_key

        async_redis = await get_async_redis_db()
        pipeline = async_redis.pipeline()

        reverse_order = False
        if order_by_attr_name and order_by_attr_name.startswith("-"):
            order_by_attr_name = order_by_attr_name[1:]
            reverse_order = True

        if order_by_attr_name and order_by_attr_name in model._meta.key_field_names:
            field_position = model._meta.get_db_key_index_position(order_by_attr_name)
            db_keys = list(db_keys)
            db_keys.sort(key=lambda key: key.split(b":")[field_position])
            db_keys = (
                list(reversed(db_keys))[:limit] if reverse_order else db_keys[:limit]
            )

        if values:
            if not isinstance(values, tuple):
                raise QueryException(
                    "values takes a tuple. eg. query.filter(values=('name',))"
                )
            elif set(values).issubset(model._meta.key_field_names):
                db_keys = [DB_key.from_redis_key(db_key) for db_key in db_keys]
                return [
                    {
                        field_name: (
                            model._meta.fields[field_name].type(
                                db_key[
                                    model._meta.get_db_key_index_position(field_name)
                                ]
                            )
                            if db_key[model._meta.get_db_key_index_position(field_name)]
                            else None
                        )
                        for field_name in values
                    }
                    for db_key in db_keys
                ]
            else:
                for db_key in db_keys:
                    pipeline.hmget(db_key, values)
                value_lists = await pipeline.execute()
                hashes_list = [
                    {field_name: result[i] for i, field_name in enumerate(values)}
                    for result in value_lists
                ]
        else:
            for db_key in db_keys:
                pipeline.hgetall(db_key)
            hashes_list = await pipeline.execute()

        if {} in hashes_list:
            logger.error(
                "one or more redis keys points to missing objects. Debug with Model.query.keys(clean=True)"
            )

        objects = [
            decode_popoto_model_hashmap(
                model, redis_hash, fields_only=bool(values), lazy=lazy and not values
            )
            for redis_hash in hashes_list
            if redis_hash
        ]
        if not values:
            await to_thread(_fire_on_read, model, objects)
        return objects

get(db_key=None, redis_key=None, **kwargs)

Retrieve a single model instance.

Look up by db_key, redis_key, or keyword field values. Raises QueryException if more than one match is found. Returns None when no match exists.

This method provides multiple retrieval strategies, optimized for different use cases:

  1. Direct key lookup (fastest): If all KeyField values are provided, the Redis key can be computed directly without any search.

  2. Raw Redis key: If you already have the Redis key string (e.g., from a previous query or external source), pass it directly.

  3. Filter fallback: If non-key fields are provided, falls back to filter() but raises an exception if multiple objects match.

Parameters:

    db_key (DB_key, default None): A DB_key instance pointing to the object.
    redis_key (str, default None): The raw Redis key string (e.g., "User:alice:123").
    **kwargs: Field values to identify the object. If all KeyFields are provided, enables direct lookup.

Returns:

    Optional[Model]: The matching Model instance, or None if not found.

Raises:

    QueryException: If the filter matches more than one object. This indicates get() was used incorrectly; use filter() instead.

Example:

    # Direct lookup when all keys are known (single Redis command)
    user = User.query.get(username="alice", tenant_id="acme")

    # Positional redis_key string (e.g. from a previous query or external source)
    user = User.query.get("User:alice:acme")

    # Fallback to filter when using non-key fields
    user = User.query.get(email="alice@example.com")  # May be slower

Source code in src/popoto/models/query.py
def get(
    self, db_key: DB_key = None, redis_key: str = None, **kwargs
) -> Optional["Model"]:
    """Retrieve a single model instance.

    Look up by *db_key*, *redis_key*, or keyword field values. Raises
    :class:`QueryException` if more than one match is found. Returns
    ``None`` when no match exists.

    This method provides multiple retrieval strategies, optimized for different
    use cases:

    1. **Direct key lookup** (fastest): If all KeyField values are provided,
       the Redis key can be computed directly without any search.

    2. **Raw Redis key**: If you already have the Redis key string (e.g., from
       a previous query or external source), pass it directly.

    3. **Filter fallback**: If non-key fields are provided, falls back to
       `filter()` but raises an exception if multiple objects match.

    Args:
        db_key: A DB_key instance pointing to the object
        redis_key: The raw Redis key string (e.g., "User:alice:123")
        **kwargs: Field values to identify the object. If all KeyFields are
                  provided, enables direct lookup.

    Returns:
        The matching Model instance, or None if not found.

    Raises:
        QueryException: If the filter matches more than one object. This
                       indicates get() was used incorrectly; use filter() instead.

    Example:
        # Direct lookup when all keys are known (single Redis command)
        user = User.query.get(username="alice", tenant_id="acme")

        # Positional redis_key string (e.g. from a previous query or external source)
        user = User.query.get("User:alice:acme")

        # Fallback to filter when using non-key fields
        user = User.query.get(email="alice@example.com")  # May be slower
    """
    if isinstance(db_key, str) and not redis_key:
        redis_key = db_key
        db_key = None

    if (
        not db_key
        and not redis_key
        and all([key in kwargs for key in self.options.key_field_names])
    ):
        db_key = self.model_class(**kwargs).db_key

    if db_key and not redis_key:
        redis_key = db_key.redis_key

    if redis_key:
        from ..models.encoding import decode_popoto_model_hashmap

        hashmap = POPOTO_REDIS_DB.hgetall(redis_key)
        if not hashmap:
            return None
        instance = decode_popoto_model_hashmap(self.model_class, hashmap)
        _fire_on_read(self.model_class, [instance])

    else:
        instances = self.filter(**kwargs)
        if len(instances) > 1:
            raise QueryException(
                f"{self.model_class.__name__} found more than one unique instance. Use `query.filter()`"
            )
        instance = instances[0] if len(instances) == 1 else None

    # or not hasattr(instance, 'db_key')
    return instance or None
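
The "direct key lookup" strategy works because a Model's full Redis key is derivable from its key-field values alone. A minimal plain-Python sketch of the idea, assuming a colon-joined "ModelName:value1:value2" key format (the `compute_redis_key` helper is illustrative, not popoto API):

```python
# Illustrative sketch only: derive a Redis key from key-field values,
# assuming keys are colon-joined as "ModelName:value1:value2".
def compute_redis_key(model_name: str, key_values: list) -> str:
    # Join the model name with key-field values in declaration order.
    return ":".join([model_name, *key_values])

key = compute_redis_key("User", ["alice", "acme"])  # "User:alice:acme"
# With the full key in hand, a single HGETALL fetches the object -- no search needed.
```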

get_many(redis_keys, skip_none=False)

Retrieve multiple model instances by their Redis keys in a single pipeline.

Uses a Redis pipeline to batch HGETALL calls, reducing N sequential round-trips to a single pipelined round-trip. Input order is preserved: each position in the returned list corresponds to the same position in redis_keys.

Unlike the internal get_many_objects() static method (which takes a set of bytes keys and silently drops missing entries), this public method takes a list of string keys, preserves order, and returns None for missing keys.

Parameters:

    redis_keys (list, required): List of Redis key strings to look up (e.g. ["User:alice:acme", "User:bob:acme"]).
    skip_none (bool, default False): If True, filter out None entries from the result so that only successfully hydrated instances are returned. When False (default), the returned list has the same length as redis_keys, with None at positions where the key was missing.

Returns:

    list: List of Model instances (and/or None when skip_none is False).

Example:

    # Bulk hydration after a set-based query
    keys = ["Product:widget:001", "Product:widget:002", "Product:widget:003"]
    products = Product.query.get_many(redis_keys=keys)
    # [<Product>, None, <Product>]  -- second key was missing

    # Skip missing entries
    products = Product.query.get_many(redis_keys=keys, skip_none=True)
    # [<Product>, <Product>]
Source code in src/popoto/models/query.py
def get_many(self, redis_keys: list, skip_none: bool = False) -> list:
    """Retrieve multiple model instances by their Redis keys in a single pipeline.

    Uses a Redis pipeline to batch HGETALL calls, reducing N sequential
    round-trips to a single pipelined round-trip. Input order is preserved:
    each position in the returned list corresponds to the same position in
    ``redis_keys``.

    Unlike the internal ``get_many_objects()`` static method (which takes a
    set of bytes keys and silently drops missing entries), this public method
    takes a list of string keys, preserves order, and returns ``None`` for
    missing keys.

    Args:
        redis_keys: List of Redis key strings to look up (e.g.
            ``["User:alice:acme", "User:bob:acme"]``).
        skip_none: If True, filter out ``None`` entries from the result so
            that only successfully hydrated instances are returned. When
            False (default), the returned list has the same length as
            ``redis_keys`` with ``None`` at positions where the key was
            missing.

    Returns:
        List of Model instances (and/or ``None`` when *skip_none* is False).

    Example:
        # Bulk hydration after a set-based query
        keys = ["Product:widget:001", "Product:widget:002", "Product:widget:003"]
        products = Product.query.get_many(redis_keys=keys)
        # [<Product>, None, <Product>]  -- second key was missing

        # Skip missing entries
        products = Product.query.get_many(redis_keys=keys, skip_none=True)
        # [<Product>, <Product>]
    """
    if not redis_keys:
        return []

    from ..models.encoding import decode_popoto_model_hashmap

    pipeline = POPOTO_REDIS_DB.pipeline()
    for key in redis_keys:
        pipeline.hgetall(key)
    hashes_list = pipeline.execute()

    results = []
    live_instances = []
    for hashmap in hashes_list:
        if hashmap:
            instance = decode_popoto_model_hashmap(self.model_class, hashmap)
            results.append(instance)
            live_instances.append(instance)
        else:
            results.append(None)

    if live_instances:
        _fire_on_read(self.model_class, live_instances)

    if skip_none:
        return [r for r in results if r is not None]
    return results
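
The order-preserving contract can be pictured without Redis; in this sketch plain dicts stand in for the pipelined HGETALL results (the store contents are invented):

```python
# Sketch: order-preserving hydration with None placeholders.
# `store` simulates Redis hashes keyed by redis_key; missing keys hydrate to None.
store = {
    "Product:widget:001": {"name": "widget one"},
    "Product:widget:003": {"name": "widget three"},
}
keys = ["Product:widget:001", "Product:widget:002", "Product:widget:003"]

results = [store.get(k) for k in keys]            # same length/order as `keys`
hydrated = [r for r in results if r is not None]  # the skip_none=True behavior
```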

keys(catchall=False, clean=False, **kwargs)

Return a list of Redis key bytes for all instances of this model.

By default, returns keys from the Model's class set (a Redis SET that tracks all instances). This is O(N) where N is the number of instances.

Parameters:

    catchall (default False): Debug flag. If True, uses the Redis KEYS command with wildcard pattern matching. This scans ALL keys in Redis and should NEVER be used in production. Useful for finding orphaned keys that aren't in the class set.
    clean (default False): Debug flag. If True, removes dangling references from index sets. This repairs inconsistencies where the class set or field indexes reference objects that no longer exist. Run this if you see "missing objects" errors in query results.
    **kwargs: Reserved for future filtering capabilities.

Returns:

    list: List of Redis keys (bytes) for all Model instances.

Warning

Both catchall and clean use Redis KEYS command, which blocks the server and scans all keys. Never use these in production environments.

Example:

    # Normal usage - get all keys from class set
    all_keys = Product.query.keys()

    # Debug - find any keys matching the model name
    orphaned = Product.query.keys(catchall=True)

    # Repair - clean up dangling references
    Product.query.keys(clean=True)

Source code in src/popoto/models/query.py
def keys(self, catchall=False, clean=False, **kwargs) -> list:
    """Return a list of Redis key bytes for all instances of this model.

    By default, returns keys from the Model's class set (a Redis SET that
    tracks all instances). This is O(N) where N is the number of instances.

    Args:
        catchall: Debug flag. If True, uses Redis KEYS command with wildcard
                 pattern matching. This scans ALL keys in Redis and should
                 NEVER be used in production. Useful for finding orphaned keys
                 that aren't in the class set.
        clean: Debug flag. If True, removes dangling references from index sets.
              This repairs inconsistencies where the class set or field indexes
              reference objects that no longer exist. Run this if you see
              "missing objects" errors in query results.
        **kwargs: Reserved for future filtering capabilities.

    Returns:
        List of Redis key strings (bytes) for all Model instances.

    Warning:
        Both `catchall` and `clean` use Redis KEYS command, which blocks the
        server and scans all keys. Never use these in production environments.

    Example:
        # Normal usage - get all keys from class set
        all_keys = Product.query.keys()

        # Debug - find any keys matching the model name
        orphaned = Product.query.keys(catchall=True)

        # Repair - clean up dangling references
        Product.query.keys(clean=True)
    """
    if clean:
        logger.warning(
            "Query.keys(clean=True) is deprecated. Use Model.clean_indexes() for production-safe orphan cleanup."
        )
        pipeline = POPOTO_REDIS_DB.pipeline()
        from ..fields.key_field_mixin import KeyFieldMixin
        from ..fields.relationship import Relationship

        for db_key in list(
            POPOTO_REDIS_DB.smembers(
                self.model_class._meta.db_class_set_key.redis_key
            )
        ):
            hash = POPOTO_REDIS_DB.hgetall(db_key)
            if not len(hash):
                pipeline = pipeline.srem(
                    self.model_class._meta.db_class_set_key.redis_key, db_key
                )

        # find
        for field_name, field in self.model_class._meta.fields.items():  # 3
            if not isinstance(field, (KeyFieldMixin, Relationship)):
                continue
            field_key_prefix = field.get_special_use_field_db_key(
                self.model_class, field_name
            )
            for field_key in POPOTO_REDIS_DB.keys(f"{field_key_prefix}:*"):
                for object_key in POPOTO_REDIS_DB.smembers(field_key):
                    hash = POPOTO_REDIS_DB.hgetall(object_key)
                    if not len(hash):
                        pipeline = pipeline.srem(field_key, object_key)

        pipeline.execute()

    if catchall:
        logger.warning(
            "{catchall} is for debugging purposes only. Not for use in production environment"
        )
        return list(POPOTO_REDIS_DB.keys(f"*{self.model_class.__name__}*"))
    else:
        return list(
            POPOTO_REDIS_DB.smembers(
                self.model_class._meta.db_class_set_key.redis_key
            )
        )
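
The warning above comes down to how KEYS works: the glob pattern is tested against every key in the database, not just this model's. A small simulation using `fnmatch` (the keyspace is invented):

```python
import fnmatch

# Simulated keyspace mixing several models. KEYS "*Product*" must examine
# every entry -- this full scan is what blocks a busy Redis server.
keyspace = ["Product:widget:001", "User:alice:acme", "Product:gadget:002", "Order:42"]
matches = [k for k in keyspace if fnmatch.fnmatch(k, "*Product*")]
```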

all(**kwargs)

Return all instances, with optional order_by, limit, and values.

Fetches every object tracked in the Model's class set. For large datasets, consider using filter() with appropriate constraints or count() to first assess the result size.

Parameters:

    **kwargs: Supports the same result modifiers as filter():
        - values: tuple of field names to return as dicts instead of objects
        - order_by: field name to sort by (prefix with "-" for descending)
        - limit: maximum number of results to return

Returns:

    list: List of Model instances, or list of dicts if values is specified.

Example:

    # Get all users
    users = User.query.all()

    # Get all users, sorted by creation date, newest first
    users = User.query.all(order_by="-created_at", limit=100)

    # Get only names and emails (more efficient for large objects)
    user_data = User.query.all(values=("name", "email"))

Source code in src/popoto/models/query.py
def all(self, **kwargs) -> list:
    """Return all instances, with optional ``order_by``, ``limit``, and ``values``.

    Fetches every object tracked in the Model's class set. For large datasets,
    consider using `filter()` with appropriate constraints or `count()` to
    first assess the result size.

    Args:
        **kwargs: Supports the same result modifiers as `filter()`:
            - values: tuple of field names to return as dicts instead of objects
            - order_by: field name to sort by (prefix with "-" for descending)
            - limit: maximum number of results to return

    Returns:
        List of Model instances, or list of dicts if `values` is specified.

    Example:
        # Get all users
        users = User.query.all()

        # Get all users, sorted by creation date, newest first
        users = User.query.all(order_by="-created_at", limit=100)

        # Get only names and emails (more efficient for large objects)
        user_data = User.query.all(values=("name", "email"))
    """
    redis_db_keys_list = self.keys()

    # Apply default order_by from Meta if not explicitly provided
    if "order_by" not in kwargs and self.model_class._meta.order_by:
        kwargs["order_by"] = self.model_class._meta.order_by

    return self.prepare_results(
        Query.get_many_objects(
            self.model_class,
            set(redis_db_keys_list),
            order_by_attr_name=kwargs.get("order_by", None),
            values=kwargs.get("values", None),
        ),
        **kwargs,
    )
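
The order_by convention encodes direction in the field name: a leading "-" means descending, mirroring Django. A sketch of the parsing performed inside the key-sorting step (the `parse_order_by` helper is illustrative, not popoto API):

```python
def parse_order_by(order_by: str):
    # Returns (field_name, reverse): "-created_at" means created_at, descending.
    if order_by.startswith("-"):
        return order_by[1:], True
    return order_by, False

assert parse_order_by("-created_at") == ("created_at", True)
assert parse_order_by("name") == ("name", False)
```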

filter_for_keys_set(**kwargs)

Execute filter logic and return matching Redis keys (without loading objects).

This is the core filtering engine. It routes filter parameters to the appropriate field types and combines their results via set intersection. Separated from filter() to support count() without the overhead of object instantiation.

Processing Order:
  1. Sorted fields first: SortedFields use Redis sorted sets with range queries (ZRANGEBYSCORE), which are often more selective than key lookups. Additionally, sorted fields may have partition dependencies (partition_by) that consume other filter parameters.

  2. Remaining fields: KeyFields and other field types are processed after sorted fields, using whatever parameters remain unconsumed.

  3. Set intersection: Each field's filter returns a set of matching keys. The final result is the intersection of all sets, implementing AND logic.
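
The set-intersection step can be pictured in plain Python; the per-filter key sets below are invented, where in popoto each would come from a field's filter_query():

```python
# Each filter yields the set of Redis keys it matches; AND-ing filters
# is just the intersection of those sets (SINTER, done client-side here).
keys_by_filter = [
    {b"Product:1", b"Product:2", b"Product:3"},  # e.g. category="electronics"
    {b"Product:2", b"Product:3", b"Product:4"},  # e.g. price__lte=100.0
]
matching = set.intersection(*keys_by_filter)     # {b"Product:2", b"Product:3"}
```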

Parameters:

    **kwargs: Filter parameters. Each parameter is matched to a field that supports it. Reserved params (limit, order_by, values) are excluded from field matching.

Returns:

    set: Set of Redis keys (bytes) matching ALL filter criteria. If no filter parameters are provided, returns the full set of keys (same as all()); if filters match nothing, returns an empty set.

Raises:

    QueryException: If any kwargs don't match a known filter parameter. This prevents silent failures from typos.

Note

This method does not apply ordering or limits - it only identifies matching keys. Use filter() for the complete query pipeline.

Source code in src/popoto/models/query.py
def filter_for_keys_set(self, **kwargs) -> set:
    """
    Execute filter logic and return matching Redis keys (without loading objects).

    This is the core filtering engine. It routes filter parameters to the
    appropriate field types and combines their results via set intersection.
    Separated from `filter()` to support `count()` without the overhead of
    object instantiation.

    Processing Order:
    ----------------
    1. **Sorted fields first**: SortedFields use Redis sorted sets with range
       queries (ZRANGEBYSCORE), which are often more selective than key lookups.
       Additionally, sorted fields may have partition dependencies (`partition_by`)
       that consume other filter parameters.

    2. **Remaining fields**: KeyFields and other field types are processed
       after sorted fields, using whatever parameters remain unconsumed.

    3. **Set intersection**: Each field's filter returns a set of matching keys.
       The final result is the intersection of all sets, implementing AND logic.

    Args:
        **kwargs: Filter parameters. Each parameter is matched to a field that
                 supports it. Reserved params (limit, order_by, values) are
                 excluded from field matching.

    Returns:
        Set of Redis keys (bytes) matching ALL filter criteria. If no
        filter params are given, returns all keys (same as `all()`);
        if filters match nothing, returns an empty set.

    Raises:
        QueryException: If any kwargs don't match a known filter parameter.
                      This prevents silent failures from typos.

    Note:
        This method does not apply ordering or limits - it only identifies
        matching keys. Use `filter()` for the complete query pipeline.
    """
    db_keys_sets = []
    self._sorted_field_order = None
    self._sorted_field_name = None
    self._pending_client_filters = {}
    yet_employed_kwargs_set = set(kwargs.keys()).difference(
        {"limit", "order_by", "values"}
    )
    if not len(yet_employed_kwargs_set):
        # No filter criteria - return all keys (same as all())
        return set(self.keys())

    # todo: use redis.SINTER for keyfield exact match filters

    # do sorted_fields first - because they can obviate some keyfield filters
    for field_name in self.options.sorted_field_names:
        field = self.options.fields[field_name]
        if not len(
            yet_employed_kwargs_set
            & self.options.filter_query_params_by_field[field_name]
        ):
            continue  # this field cannot use any of the available filter params
        logger.debug(
            f"query on {field_name} with {self.options.filter_query_params_by_field[field_name]}"
        )
        logger.debug(
            {
                k: kwargs[k]
                for k in self.options.filter_query_params_by_field[field_name]
                if k in kwargs
            }
        )
        result = field.__class__.filter_query(
            self.model_class, field_name, **kwargs
        )
        # Handle tuple return from GeoField with distances
        if isinstance(result, tuple) and len(result) == 3:
            keys_set, distances, unit = result
            self._geo_distances.update(distances)
            self._geo_distance_unit = unit
            db_keys_sets.append(keys_set)
        else:
            # result is now a list (preserving ZRANGEBYSCORE order)
            if self._sorted_field_order is None:
                self._sorted_field_order = result
                self._sorted_field_name = field_name
            db_keys_sets.append(set(result))  # convert to set for intersection
        yet_employed_kwargs_set = yet_employed_kwargs_set.difference(
            self.options.filter_query_params_by_field[field_name]
        ).difference(
            set(field.partition_by)
        )  # also remove the required partition_by field names

    for field_name in self.options.filter_query_params_by_field:
        if field_name in self.options.sorted_field_names:
            continue  # already handled
        params_for_field = yet_employed_kwargs_set & set(
            self.options.filter_query_params_by_field[field_name]
        )
        if not params_for_field:
            continue  # this field cannot use any of the available filter params

        field = self.options.fields[field_name]
        logger.debug(f"query on {field_name} with {params_for_field}")
        logger.debug({k: kwargs[k] for k in params_for_field})
        result = field.__class__.filter_query(
            self.model_class, field_name, **{k: kwargs[k] for k in params_for_field}
        )
        # Handle tuple return from GeoField with distances
        if isinstance(result, tuple) and len(result) == 3:
            keys_set, distances, unit = result
            self._geo_distances.update(distances)
            self._geo_distance_unit = unit
            db_keys_sets.append(keys_set)
        else:
            db_keys_sets.append(result)
        yet_employed_kwargs_set = yet_employed_kwargs_set.difference(
            params_for_field
        )

    # Separate plain field params (client-side filter) from truly unknown params
    if yet_employed_kwargs_set:
        plain_field_filters = {}
        unknown_params = set()
        for param in yet_employed_kwargs_set:
            if param in self.options.fields:
                plain_field_filters[param] = kwargs[param]
                logger.debug(
                    f"Client-side filter on unindexed field '{param}' "
                    f"— consider using SortedField for better performance"
                )
            else:
                unknown_params.add(param)
        if unknown_params:
            raise QueryException(
                f"Invalid filter parameters: {','.join(unknown_params)}"
            )
        self._pending_client_filters = plain_field_filters

    logger.debug(db_keys_sets)
    if not len(db_keys_sets):
        if self._pending_client_filters:
            # Only plain field filters — load all keys for client-side filtering
            return set(self.keys())
        return set()
    # return intersection of all the db keys sets, effectively &&-ing all filters
    intersection = set.intersection(*db_keys_sets)
    if self._sorted_field_order is not None:
        matched_keys = intersection
        self._sorted_field_order = [
            k for k in self._sorted_field_order if k in matched_keys
        ]
    return intersection
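
The set-intersection strategy implemented above can be illustrated without Redis. The key values below are hypothetical stand-ins for the per-filter key sets that each field's filter_query() would return:

```python
# Each filter resolves to the set of Redis keys it matches; AND-ing
# filters is the intersection of those sets (what Redis SINTER computes).
keys_by_filter = [
    {b"Product:p1", b"Product:p2", b"Product:p3"},  # e.g. category="electronics"
    {b"Product:p2", b"Product:p3", b"Product:p9"},  # e.g. price__lte=100.0
]
matches = set.intersection(*keys_by_filter)
assert matches == {b"Product:p2", b"Product:p3"}
```

An empty intersection simply means no object satisfies every filter, which is why the method returns an empty set rather than raising.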

filter(*args, **kwargs)

Query for Model instances matching the specified criteria.

This is the primary query method for Popoto, providing Django-like filtering syntax with Redis-optimized execution. All filter parameters are AND-ed together by default. Use Q objects for OR logic and complex combinations.

Returns a QueryBuilder that supports method chaining. The QueryBuilder also behaves like a list for backward compatibility - you can iterate over it or pass it to len() and it will execute the query automatically.

Filter Parameters:

Available filters depend on the field types in your Model:

KeyField filters:
  • field=value - Exact match
  • field__in=[v1, v2] - Match any value in list
  • field__contains="x" - Substring match (uses Redis KEYS, slow)
  • field__startswith="x" - Prefix match
  • field__endswith="x" - Suffix match
  • field__isnull=True/False - Null check

SortedField filters:
  • field=value - Exact match
  • field__gt=value - Greater than
  • field__gte=value - Greater than or equal
  • field__lt=value - Less than
  • field__lte=value - Less than or equal

Q Objects (for complex logic):
  • Q(field=value) - Basic Q object (equivalent to kwargs)
  • Q(...) | Q(...) - OR logic (union of results)
  • Q(...) & Q(...) - AND logic (intersection of results)
  • ~Q(...) - NOT logic (exclusion)

Result Modifiers (kwargs API):
  • order_by="field" - Sort ascending by field
  • order_by="-field" - Sort descending by field
  • limit=N - Return at most N results
  • values=("field1", "field2") - Return dicts with only specified fields instead of full Model instances (projection query, more efficient)
Chainable Methods:
  • .filter(**kwargs) - Add more filter criteria
  • .order_by("field") - Sort results (prefix with "-" for descending)
  • .limit(n) - Limit number of results
  • .values("field1", "field2") - Return dicts instead of objects
  • .all() - Execute and return results
  • .first() - Execute and return first result or None
  • .count() - Count matching results without loading objects

Parameters:
  • *args - Q objects for complex query expressions. Default: ()
  • **kwargs - Filter parameters and result modifiers as described above. Default: {}

Returns:
  • QueryBuilder - A QueryBuilder that can be chained or iterated directly.

Raises:
  • QueryException - If unknown filter parameters are provided.

Example:

  # Original kwargs API (still fully supported)
  users = User.query.filter(
      status="active",
      tier="premium",
      created_at__gte=datetime(2024, 1, 1),
      order_by="-created_at",
      limit=50,
  )

  # Chainable API
  users = User.query.filter(status="active").order_by("-created_at").limit(50).all()

  # Chain multiple filters
  users = User.query.filter(status="active").filter(tier="premium").all()

  # Efficient projection - only load specific fields
  emails = User.query.filter(status="active").values("email", "name").all()

  # OR logic with Q objects
  users = User.query.filter(Q(status="active") | Q(type="premium"))

  # Complex combinations
  users = User.query.filter(
      (Q(status="active") | Q(type="premium")) & Q(rating__gt=3.0)
  )

Source code in src/popoto/models/query.py
def filter(self, *args, **kwargs) -> "QueryBuilder":
    """
    Query for Model instances matching the specified criteria.

    This is the primary query method for Popoto, providing Django-like filtering
    syntax with Redis-optimized execution. All filter parameters are AND-ed
    together by default. Use Q objects for OR logic and complex combinations.

    Returns a QueryBuilder that supports method chaining. The QueryBuilder also
    behaves like a list for backward compatibility - you can iterate over it or
    pass it to len() and it will execute the query automatically.

    Filter Parameters:
    -----------------
    Available filters depend on the field types in your Model:

    **KeyField filters:**
    - `field=value` - Exact match
    - `field__in=[v1, v2]` - Match any value in list
    - `field__contains="x"` - Substring match (uses Redis KEYS, slow)
    - `field__startswith="x"` - Prefix match
    - `field__endswith="x"` - Suffix match
    - `field__isnull=True/False` - Null check

    **SortedField filters:**
    - `field=value` - Exact match
    - `field__gt=value` - Greater than
    - `field__gte=value` - Greater than or equal
    - `field__lt=value` - Less than
    - `field__lte=value` - Less than or equal

    **Q Objects (for complex logic):**
    - `Q(field=value)` - Basic Q object (equivalent to kwargs)
    - `Q(...) | Q(...)` - OR logic (union of results)
    - `Q(...) & Q(...)` - AND logic (intersection of results)
    - `~Q(...)` - NOT logic (exclusion)

    Result Modifiers (kwargs API):
    -----------------------------
    - `order_by="field"` - Sort ascending by field
    - `order_by="-field"` - Sort descending by field
    - `limit=N` - Return at most N results
    - `values=("field1", "field2")` - Return dicts with only specified fields
      instead of full Model instances (projection query, more efficient)

    Chainable Methods:
    -----------------
    - `.filter(**kwargs)` - Add more filter criteria
    - `.order_by("field")` - Sort results (prefix with "-" for descending)
    - `.limit(n)` - Limit number of results
    - `.values("field1", "field2")` - Return dicts instead of objects
    - `.all()` - Execute and return results
    - `.first()` - Execute and return first result or None
    - `.count()` - Count matching results without loading objects

    Args:
        *args: Q objects for complex query expressions
        **kwargs: Filter parameters and result modifiers as described above.

    Returns:
        QueryBuilder that can be chained or iterated directly.

    Raises:
        QueryException: If unknown filter parameters are provided.

    Example:
        # Original kwargs API (still fully supported)
        users = User.query.filter(
            status="active",
            tier="premium",
            created_at__gte=datetime(2024, 1, 1),
            order_by="-created_at",
            limit=50
        )

        # Chainable API
        users = User.query.filter(status="active").order_by("-created_at").limit(50).all()

        # Chain multiple filters
        users = User.query.filter(status="active").filter(tier="premium").all()

        # Efficient projection - only load specific fields
        emails = User.query.filter(status="active").values("email", "name").all()

        # OR logic with Q objects
        users = User.query.filter(Q(status="active") | Q(type="premium"))

        # Complex combinations
        users = User.query.filter(
            (Q(status="active") | Q(type="premium")) & Q(rating__gt=3.0)
        )
    """
    from .q import Q
    from .expressions import Expression, CombinedExpression

    # Process args - can be Q objects or Expression objects
    q_objects = []
    for arg in args:
        if isinstance(arg, Q):
            q_objects.append(arg)
        elif isinstance(arg, (Expression, CombinedExpression)):
            # Convert Expression to Q object
            q_objects.append(arg.to_q())

    # Extract result modifiers from kwargs for the QueryBuilder
    filters = {
        k: v for k, v in kwargs.items() if k not in {"limit", "order_by", "values"}
    }
    builder = QueryBuilder(self, filters, q_objects)

    # Apply result modifiers if provided in kwargs (for backward compatibility)
    if "limit" in kwargs:
        builder._limit_value = kwargs["limit"]
    if "order_by" in kwargs:
        builder._order_by_value = kwargs["order_by"]
    if "values" in kwargs:
        builder._values_tuple = kwargs["values"]

    return builder
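
As a mental model for the Q semantics listed in the docstring (a simplification, not the library's actual Q implementation), OR, AND, and NOT reduce to set algebra over matched key sets:

```python
# Simplified model: each Q resolves to a set of matching keys.
universe = {"u1", "u2", "u3", "u4"}     # all keys of the model class
active = {"u1", "u2"}                   # Q(status="active")
premium = {"u2", "u3"}                  # Q(type="premium")

or_result = active | premium            # Q(...) | Q(...): union
and_result = active & premium           # Q(...) & Q(...): intersection
not_result = universe - active          # ~Q(...): complement within the class

assert or_result == {"u1", "u2", "u3"}
assert and_result == {"u2"}
assert not_result == {"u3", "u4"}
```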

top_by_decay(field_name=None, n=10, decay_rate=None, base_score_field=None)

Return top-N instances ranked by time-decayed score.

Convenience method that creates a QueryBuilder and delegates. For partitioned fields, use query.filter(partition=value).top_by_decay().

Parameters:
  • field_name - Name of a DecayingSortedField on the model. Optional when the model has exactly one DecayingSortedField (or subclass). Default: None
  • n - Maximum number of results to return. Default: 10
  • decay_rate - Override the field's decay_rate for this query. Default: None
  • base_score_field - Override the field's base_score_field for this query. Default: None

Returns:
  • List of model instances in decayed-score order.

Source code in src/popoto/models/query.py
def top_by_decay(
    self, field_name=None, n=10, decay_rate=None, base_score_field=None
):
    """Return top-N instances ranked by time-decayed score.

    Convenience method that creates a QueryBuilder and delegates.
    For partitioned fields, use query.filter(partition=value).top_by_decay().

    Args:
        field_name: Name of a DecayingSortedField on the model. Optional
            when the model has exactly one DecayingSortedField (or subclass).
        n: Maximum number of results to return. Default 10.
        decay_rate: Override the field's decay_rate for this query.
        base_score_field: Override the field's base_score_field for this query.

    Returns:
        List of model instances in decayed-score order.
    """
    builder = QueryBuilder(self)
    return builder.top_by_decay(
        field_name, n=n, decay_rate=decay_rate, base_score_field=base_score_field
    )
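
The docstring leaves the decay curve to DecayingSortedField; a common choice is exponential decay, sketched here as an assumption (the field's actual formula and parameters may differ):

```python
import math
import time

def decayed_score(base_score, created_at, decay_rate, now=None):
    """Illustrative exponential decay: the score shrinks by a factor of e
    every 1/decay_rate seconds of age. Sketch only."""
    age = (now if now is not None else time.time()) - created_at
    return base_score * math.exp(-decay_rate * age)

# A fresh low-scoring item can outrank a stale high-scoring one:
now = 1_000_000.0
stale = decayed_score(100.0, now - 86_400, decay_rate=1e-4, now=now)  # 1 day old
fresh = decayed_score(40.0, now - 3_600, decay_rate=1e-4, now=now)    # 1 hour old
assert fresh > stale
```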

composite_score(indexes, limit=10, aggregate='SUM', min_score=None, post_filter=None, co_occurrence_boost=None, similarity_boost=None, temperature=1.0)

Return top-K instances ranked by weighted composite score.

Convenience method that creates a QueryBuilder and delegates. For partitioned fields, use query.filter(partition=value).composite_score(...).

Parameters:
  • indexes (dict) - Mapping of field names to weights. Required.
  • limit (int) - Maximum results to return. Default: 10
  • aggregate (str) - Aggregation mode: "SUM", "MIN", or "MAX". Default: 'SUM'
  • min_score (float) - Optional minimum composite score threshold. Default: None
  • post_filter (Optional[Callable[[str, float], bool]]) - Optional callable (redis_key, score) -> bool. Default: None
  • co_occurrence_boost (dict) - Optional {redis_key: weight} dict. Default: None
  • similarity_boost (dict) - Optional {redis_key: score} dict from semantic_search(). Default: None
  • temperature (float) - Score scaling factor. Must be > 0. Default: 1.0

Returns:
  • list - List of model instances ranked by composite score.

Source code in src/popoto/models/query.py
def composite_score(
    self,
    indexes: dict,
    limit: int = 10,
    aggregate: str = "SUM",
    min_score: float = None,
    post_filter: Optional[Callable[[str, float], bool]] = None,
    co_occurrence_boost: dict = None,
    similarity_boost: dict = None,
    temperature: float = 1.0,
) -> list:
    """Return top-K instances ranked by weighted composite score.

    Convenience method that creates a QueryBuilder and delegates.
    For partitioned fields, use
    ``query.filter(partition=value).composite_score(...)``.

    Args:
        indexes: Mapping of field names to weights.
        limit: Maximum results to return. Default 10.
        aggregate: Aggregation mode: "SUM", "MIN", or "MAX".
        min_score: Optional minimum composite score threshold.
        post_filter: Optional callable (redis_key, score) -> bool.
        co_occurrence_boost: Optional {redis_key: weight} dict.
        similarity_boost: Optional {redis_key: score} dict from
            semantic_search().
        temperature: Score scaling factor. Default 1.0 (no scaling).
            Must be > 0.

    Returns:
        List of model instances ranked by composite score.
    """
    builder = QueryBuilder(self)
    return builder.composite_score(
        indexes=indexes,
        limit=limit,
        aggregate=aggregate,
        min_score=min_score,
        post_filter=post_filter,
        co_occurrence_boost=co_occurrence_boost,
        similarity_boost=similarity_boost,
        temperature=temperature,
    )
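
The weighted SUM/MIN/MAX aggregation described above behaves much like Redis ZUNIONSTORE with WEIGHTS. A pure-Python sketch of the scoring rule, using hypothetical field names and scores:

```python
def composite(indexes, scores_by_field, aggregate="SUM"):
    """Weighted aggregation across per-field score maps, mirroring the
    SUM / MIN / MAX modes. scores_by_field maps field -> {key: score}.
    Sketch only; a key missing from a field simply contributes nothing."""
    out = {}
    for field, weight in indexes.items():
        for key, score in scores_by_field.get(field, {}).items():
            weighted = weight * score
            if key not in out:
                out[key] = weighted
            elif aggregate == "SUM":
                out[key] += weighted
            elif aggregate == "MIN":
                out[key] = min(out[key], weighted)
            elif aggregate == "MAX":
                out[key] = max(out[key], weighted)
    return out

scores = {"views": {"a": 10.0, "b": 5.0}, "likes": {"a": 1.0, "b": 4.0}}
ranked = composite({"views": 0.1, "likes": 1.0}, scores)
# a: 0.1*10 + 1.0*1 = 2.0;  b: 0.1*5 + 1.0*4 = 4.5, so "b" ranks first
```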

semantic_search(query_text, indexes=None, limit=10, aggregate='SUM', min_score=None, post_filter=None, co_occurrence_boost=None, temperature=1.0)

Return top-K instances ranked by semantic similarity.

Convenience method that creates a QueryBuilder and delegates.

Parameters:
  • query_text (str) - The text to search for semantically. Required.
  • indexes (dict) - Optional field-weight mapping for composite scoring. Default: None
  • limit (int) - Maximum results to return. Default: 10
  • aggregate (str) - Aggregation mode. Default: 'SUM'
  • min_score (float) - Optional minimum score threshold. Default: None
  • post_filter (Optional[Callable[[str, float], bool]]) - Optional callable (redis_key, score) -> bool. Default: None
  • co_occurrence_boost (dict) - Optional {redis_key: weight} dict. Default: None
  • temperature (float) - Score scaling factor. Default: 1.0

Returns:
  • list - List of model instances ranked by semantic similarity.

Source code in src/popoto/models/query.py
def semantic_search(
    self,
    query_text: str,
    indexes: dict = None,
    limit: int = 10,
    aggregate: str = "SUM",
    min_score: float = None,
    post_filter: Optional[Callable[[str, float], bool]] = None,
    co_occurrence_boost: dict = None,
    temperature: float = 1.0,
) -> list:
    """Return top-K instances ranked by semantic similarity.

    Convenience method that creates a QueryBuilder and delegates.

    Args:
        query_text: The text to search for semantically.
        indexes: Optional field-weight mapping for composite scoring.
        limit: Maximum results to return. Default 10.
        aggregate: Aggregation mode. Default "SUM".
        min_score: Optional minimum score threshold.
        post_filter: Optional callable (redis_key, score) -> bool.
        co_occurrence_boost: Optional {redis_key: weight} dict.
        temperature: Score scaling factor. Default 1.0.

    Returns:
        List of model instances ranked by semantic similarity.
    """
    builder = QueryBuilder(self)
    return builder.semantic_search(
        query_text=query_text,
        indexes=indexes,
        limit=limit,
        aggregate=aggregate,
        min_score=min_score,
        post_filter=post_filter,
        co_occurrence_boost=co_occurrence_boost,
        temperature=temperature,
    )

keyword_search(query_text, field=None, limit=10)

Return instances ranked by BM25 keyword relevance.

Convenience method that creates a QueryBuilder and delegates.

Parameters:
  • query_text (str) - The search query string. Required.
  • field (str) - Name of the BM25Field to search. Optional when the model has exactly one BM25Field. Default: None
  • limit (int) - Maximum results to return. Default: 10

Returns:
  • list - List of model instances ranked by BM25 score (descending).

Source code in src/popoto/models/query.py
def keyword_search(
    self,
    query_text: str,
    field: str = None,
    limit: int = 10,
) -> list:
    """Return instances ranked by BM25 keyword relevance.

    Convenience method that creates a QueryBuilder and delegates.

    Args:
        query_text: The search query string.
        field: Name of the BM25Field to search. Optional when the model
            has exactly one BM25Field.
        limit: Maximum results to return. Default 10.

    Returns:
        List of model instances ranked by BM25 score (descending).
    """
    builder = QueryBuilder(self)
    return builder.keyword_search(
        query_text=query_text,
        field=field,
        limit=limit,
    )
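
For orientation, the textbook Okapi BM25 term score that a BM25Field conceptually ranks by is sketched below; the parameter defaults and corpus statistics here are illustrative assumptions, not the field's stored values:

```python
import math

def bm25_term_score(tf, doc_len, avg_len, n_docs, doc_freq, k1=1.2, b=0.75):
    """One term's Okapi BM25 contribution. tf: term frequency in the
    document; doc_freq: number of documents containing the term."""
    idf = math.log(1 + (n_docs - doc_freq + 0.5) / (doc_freq + 0.5))
    norm = tf * (k1 + 1) / (tf + k1 * (1 - b + b * doc_len / avg_len))
    return idf * norm

# Rarer terms score higher at the same term frequency:
common = bm25_term_score(tf=3, doc_len=100, avg_len=120, n_docs=1000, doc_freq=500)
rare = bm25_term_score(tf=3, doc_len=100, avg_len=120, n_docs=1000, doc_freq=5)
assert rare > common
```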

fuse(k=60, limit=10, post_filter=None, **ranked_lists)

Reciprocal Rank Fusion across heterogeneous ranked lists.

Convenience method that creates a QueryBuilder and delegates.

Parameters:
  • k (int) - RRF constant. Default: 60
  • limit (int) - Maximum results to return. Default: 10
  • post_filter (Optional[Callable]) - Optional (redis_key, rrf_score) -> bool callback. Default: None
  • **ranked_lists - Named ranked lists of (redis_key, score) tuples. Default: {}

Returns:
  • list - List of model instances ranked by RRF score (descending).

Source code in src/popoto/models/query.py
def fuse(
    self,
    k: int = 60,
    limit: int = 10,
    post_filter: Optional[Callable] = None,
    **ranked_lists,
) -> list:
    """Reciprocal Rank Fusion across heterogeneous ranked lists.

    Convenience method that creates a QueryBuilder and delegates.

    Args:
        k: RRF constant (default 60).
        limit: Maximum results to return. Default 10.
        post_filter: Optional (redis_key, rrf_score) -> bool callback.
        **ranked_lists: Named ranked lists of (redis_key, score) tuples.

    Returns:
        List of model instances ranked by RRF score (descending).
    """
    builder = QueryBuilder(self)
    return builder.fuse(
        k=k,
        limit=limit,
        post_filter=post_filter,
        **ranked_lists,
    )
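
Reciprocal Rank Fusion itself is a standard, rank-only formula: each list contributes 1/(k + rank) per key, and original per-list scores are ignored. A self-contained sketch matching the fuse() calling convention, with hypothetical keys:

```python
def rrf(k=60, **ranked_lists):
    """Fuse named ranked lists of (key, score) tuples by reciprocal rank.
    Only each key's rank (starting at 1) matters, never its raw score."""
    fused = {}
    for _name, ranked in ranked_lists.items():
        for rank, (key, _score) in enumerate(ranked, start=1):
            fused[key] = fused.get(key, 0.0) + 1.0 / (k + rank)
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)

semantic = [("doc:a", 0.91), ("doc:b", 0.88)]
keyword = [("doc:b", 12.4), ("doc:c", 9.1)]
result = rrf(semantic=semantic, keyword=keyword)
assert result[0][0] == "doc:b"  # appears in both lists, so it fuses to the top
```

Because ranks rather than raw scores are combined, RRF needs no score normalization across heterogeneous sources such as semantic and BM25 results.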

prepare_results(objects, order_by='', values=(), limit=None, **kwargs)

Apply sorting and limiting to query results.

This is a post-processing step that operates on already-loaded objects. For large result sets, sorting happens in Python rather than Redis, which may have performance implications.

Design Note:

Sorting is applied after fetching because Redis doesn't support cross-key sorting natively. SortedFields maintain their own sorted sets for range queries, but general-purpose sorting across arbitrary fields requires loading objects first.

For KeyFields, get_many_objects() can optimize sorting when the sort field is part of the key (extracting sort values from key strings without loading full objects). This method handles the remaining cases.

Parameters:
  • objects - List of Model instances or dicts (if values projection was used). Required.
  • order_by (str) - Field name to sort by. Prefix with "-" for descending order. Default: ''
  • values (tuple) - Tuple of field names if projection was used (affects sort behavior). Default: ()
  • limit (int) - Maximum number of results to return after sorting. Default: None
  • **kwargs - Unused; accepts extra params for forward compatibility. Default: {}

Returns:
  • Sorted and limited list of objects or dicts.

Raises:
  • QueryException - If the order_by field doesn't exist or isn't included in the values tuple when using projection.

Note

Null values in the sort field are handled by substituting the field's default type value (e.g., "" for str, 0 for int), ensuring consistent ordering.

Source code in src/popoto/models/query.py
def prepare_results(
    self,
    objects,
    order_by: str = "",
    values: tuple = (),
    limit: int = None,
    **kwargs,
):
    """Apply sorting and limiting to query results.

    This is a post-processing step that operates on already-loaded objects.
    For large result sets, sorting happens in Python rather than Redis,
    which may have performance implications.

    Design Note:
    -----------
    Sorting is applied after fetching because Redis doesn't support cross-key
    sorting natively. SortedFields maintain their own sorted sets for range
    queries, but general-purpose sorting across arbitrary fields requires
    loading objects first.

    For KeyFields, `get_many_objects()` can optimize sorting when the sort
    field is part of the key (extracting sort values from key strings without
    loading full objects). This method handles the remaining cases.

    Args:
        objects: List of Model instances or dicts (if values projection was used)
        order_by: Field name to sort by. Prefix with "-" for descending order.
        values: Tuple of field names if projection was used (affects sort behavior)
        limit: Maximum number of results to return after sorting
        **kwargs: Unused, accepts extra params for forward compatibility

    Returns:
        Sorted and limited list of objects or dicts.

    Raises:
        QueryException: If order_by field doesn't exist or isn't included in
                       values tuple when using projection.

    Note:
        Null values in the sort field are handled by substituting the field's
        default type value (e.g., "" for str, 0 for int), ensuring consistent
        ordering.
    """
    # Apply default order_by from Meta if not explicitly provided
    if not order_by and self.model_class._meta.order_by:
        order_by = self.model_class._meta.order_by

    reverse_order = False
    if order_by and order_by.startswith("-"):
        reverse_order = True
        order_by = order_by[1:]
    if order_by:
        order_by_attr_name = order_by
        if (
            not isinstance(order_by_attr_name, str)
        ) or order_by_attr_name not in self.model_class._meta.fields:
            raise QueryException(
                f"order_by={order_by_attr_name} must be a field name (str)"
            )
        attr_type = self.model_class._meta.fields[order_by_attr_name].type
        if values and order_by_attr_name not in values:
            raise QueryException(
                "field must be included in values=(fieldnames) in order to use order_by"
            )
        elif values:
            objects.sort(key=lambda item: item.get(order_by_attr_name))
        else:
            objects.sort(
                key=lambda item: getattr(item, order_by_attr_name) or attr_type()
            )
        objects = (
            list(reversed(objects))[:limit] if reverse_order else objects[:limit]
        )

    if limit and len(objects) > limit:
        objects = objects[:limit]

    return objects

count(**kwargs)

Count instances matching the given filters (or all if no filters).

More efficient than len(filter(...)) because it avoids object instantiation. For unfiltered counts, uses Redis SCARD which is O(1).

Parameters:
  • **kwargs - Same filter parameters as filter(). If empty, counts all instances of the Model. Default: {}

Returns:
  • int - Integer count of matching instances.

Performance:
  • No filters: O(1) using Redis SCARD on the class set
  • With filters: O(N) where N is the result of filter intersection, but still avoids the overhead of HGETALL and object instantiation
Example:

  # O(1) - count all products
  total = Product.query.count()

  # Filtered count - still more efficient than len(filter(...))
  active = Product.query.count(status="active", category="electronics")

Note

Future optimization: Could use Redis SINTERCARD (Redis 7.0+) for filtered counts to avoid materializing the full key set.

Source code in src/popoto/models/query.py
def count(self, **kwargs) -> int:
    """Count instances matching the given filters (or all if no filters).

    More efficient than `len(filter(...))` because it avoids object
    instantiation. For unfiltered counts, uses Redis SCARD which is O(1).

    Args:
        **kwargs: Same filter parameters as `filter()`. If empty, counts
                 all instances of the Model.

    Returns:
        Integer count of matching instances.

    Performance:
    -----------
    - No filters: O(1) using Redis SCARD on the class set
    - With filters: O(N) where N is the result of filter intersection,
      but still avoids the overhead of HGETALL and object instantiation

    Example:
        # O(1) - count all products
        total = Product.query.count()

        # Filtered count - still more efficient than len(filter(...))
        active = Product.query.count(status="active", category="electronics")

    Note:
        Future optimization: Could use Redis SINTERCARD (Redis 7.0+) for
        filtered counts to avoid materializing the full key set.
    """
    if not len(kwargs):
        return int(
            POPOTO_REDIS_DB.scard(self.model_class._meta.db_class_set_key.redis_key)
            or 0
        )
    db_keys = self.filter_for_keys_set(**kwargs)
    client_filters = getattr(self, "_pending_client_filters", {})
    if client_filters:
        # Must load objects to apply client-side filters
        objects = Query.get_many_objects(self.model_class, db_keys)
        return sum(
            1
            for obj in objects
            if all(
                getattr(obj, fname, None) == fval
                for fname, fval in client_filters.items()
            )
        )
    return len(db_keys)

get_many_objects(model, db_keys, order_by_attr_name=None, limit=None, values=None, lazy=True) classmethod

Batch-load multiple Model instances from Redis using pipelined commands.

This is the core bulk retrieval method, optimized for performance through several strategies:

  1. Redis Pipelines: All HGETALL/HMGET commands are batched into a single pipeline, reducing network round trips from O(N) to O(1).

  2. KeyField Sorting Optimization: If sorting by a KeyField, sort values can be extracted directly from key strings without loading objects, and the limit can be applied BEFORE loading (reducing Redis commands).

  3. Projection Optimization: With a values tuple:
     • If ALL requested fields are KeyFields, data is extracted from key strings without ANY Redis commands.
     • Otherwise, HMGET is used instead of HGETALL for partial field retrieval.

Parameters:
  • model (Model) - The Model class to instantiate. Required.
  • db_keys (set) - Set of Redis keys to load. Required.
  • order_by_attr_name (str) - Field to sort by (prefix with "-" for descending). If this is a KeyField, sorting is optimized. Default: None
  • limit (int) - Maximum objects to load. Applied early if sorting by KeyField. Default: None
  • values (tuple) - Tuple of field names for projection. If specified, returns dicts instead of Model instances. Default: None
  • lazy (bool) - Lazy-decoding flag forwarded to the hashmap decoder; disabled whenever values is used. Default: True

Returns:
  • list - List of Model instances, or list of dicts if values is specified. Objects for missing/deleted keys are silently excluded (with a warning logged).

Raises:
  • QueryException - If values is not a tuple.

Performance Notes:
  • N objects without projection: 1 pipeline with N HGETALL commands
  • N objects with projection: 1 pipeline with N HMGET commands
  • Projection with only KeyFields: 0 Redis commands (parsed from keys)
  • Sorting by KeyField with limit: Only limit objects loaded
Example:

  # Internal usage - typically called by filter() or all()
  objects = Query.get_many_objects(
      User,
      {b"User:alice", b"User:bob"},
      order_by_attr_name="username",
      limit=10,
  )

Source code in src/popoto/models/query.py
@classmethod
def get_many_objects(
    cls,
    model: "Model",
    db_keys: set,
    order_by_attr_name: str = None,
    limit: int = None,
    values: tuple = None,
    lazy: bool = True,
) -> list:
    """
    Batch-load multiple Model instances from Redis using pipelined commands.

    This is the core bulk retrieval method, optimized for performance through
    several strategies:

    1. **Redis Pipelines**: All HGETALL/HMGET commands are batched into a single
       pipeline, reducing network round trips from O(N) to O(1).

    2. **KeyField Sorting Optimization**: If sorting by a KeyField, sort values
       can be extracted directly from key strings without loading objects,
       and limit can be applied BEFORE loading (reducing Redis commands).

    3. **Projection Optimization**: With `values` tuple:
       - If ALL requested fields are KeyFields, data is extracted from key
         strings without ANY Redis commands.
       - Otherwise, uses HMGET instead of HGETALL for partial field retrieval.

    Args:
        model: The Model class to instantiate
        db_keys: Set of Redis keys to load
        order_by_attr_name: Field to sort by (prefix with "-" for descending).
                           If this is a KeyField, sorting is optimized.
        limit: Maximum objects to load. Applied early if sorting by KeyField.
        values: Tuple of field names for projection. If specified, returns
               dicts instead of Model instances.

    Returns:
        List of Model instances, or list of dicts if values is specified.
        Objects for missing/deleted keys are silently excluded (with a
        warning logged).

    Raises:
        QueryException: If values is not a tuple.

    Performance Notes:
    -----------------
    - N objects without projection: 1 pipeline with N HGETALL commands
    - N objects with projection: 1 pipeline with N HMGET commands
    - Projection with only KeyFields: 0 Redis commands (parsed from keys)
    - Sorting by KeyField with limit: Only `limit` objects loaded

    Example:
        # Internal usage - typically called by filter() or all()
        objects = Query.get_many_objects(
            User,
            {b"User:alice", b"User:bob"},
            order_by_attr_name="username",
            limit=10
        )
    """
    from .encoding import decode_popoto_model_hashmap

    pipeline = POPOTO_REDIS_DB.pipeline()
    reverse_order = False
    # order the hashes list or objects before applying limit
    if order_by_attr_name and order_by_attr_name.startswith("-"):
        order_by_attr_name = order_by_attr_name[1:]
        reverse_order = True

    if order_by_attr_name and order_by_attr_name in model._meta.key_field_names:
        field_position = model._meta.get_db_key_index_position(order_by_attr_name)
        db_keys = list(db_keys)
        db_keys.sort(key=lambda key: key.split(b":")[field_position])
        db_keys = (
            list(reversed(db_keys))[:limit] if reverse_order else db_keys[:limit]
        )

    if values:
        if not isinstance(values, tuple):
            raise QueryException(
                "values takes a tuple. eg. query.filter(values=('name',))"
            )
        elif set(values).issubset(model._meta.key_field_names):
            db_keys = [DB_key.from_redis_key(db_key) for db_key in db_keys]
            return [
                {
                    field_name: (
                        model._meta.fields[field_name].type(
                            db_key[
                                model._meta.get_db_key_index_position(field_name)
                            ]
                        )
                        if db_key[model._meta.get_db_key_index_position(field_name)]
                        else None
                    )
                    for field_name in values
                }
                for db_key in db_keys
            ]
        else:
            for db_key in db_keys:
                pipeline.hmget(db_key, values)
            value_lists = pipeline.execute()
            hashes_list = [
                {field_name: result[i] for i, field_name in enumerate(values)}
                for result in value_lists
            ]

    else:
        for db_key in db_keys:
            pipeline.hgetall(db_key)
        hashes_list = pipeline.execute()

    if {} in hashes_list:
        logger.error(
            "One or more Redis keys point to missing objects. Debug with Model.query.keys(clean=True)"
        )

    return [
        decode_popoto_model_hashmap(
            model, redis_hash, fields_only=bool(values), lazy=lazy and not values
        )
        for redis_hash in hashes_list
        if redis_hash
    ]
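
The KeyField-sorting optimization above (splitting raw key bytes on `b":"` and applying the limit before anything is loaded) can be sketched in isolation. This is a minimal, self-contained illustration; `sort_keys_by_field` and the key layout below are hypothetical, not part of popoto's API:

```python
def sort_keys_by_field(db_keys, field_position, reverse=False, limit=None):
    """Order raw b"Model:field1:field2" keys by one colon-separated segment,
    then slice to `limit` BEFORE any Redis HGETALL is issued."""
    ordered = sorted(db_keys, key=lambda key: key.split(b":")[field_position])
    if reverse:
        ordered = list(reversed(ordered))
    return ordered[:limit]

keys = {b"User:carol:3", b"User:alice:1", b"User:bob:2"}
# Sort by the username segment (position 1) and keep only the first two:
print(sort_keys_by_field(keys, 1, limit=2))  # [b'User:alice:1', b'User:bob:2']
```

Because the sort values live inside the key strings themselves, only `limit` objects ever need to be fetched, which is where the "Applied early if sorting by KeyField" note above comes from.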

async_get(db_key=None, redis_key=None, **kwargs) async

Async version of get() using native async Redis.

Retrieves a single model instance from Redis using non-blocking I/O. Uses redis.asyncio for true async operations without thread pool overhead.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `db_key` | `DB_key` | Optional DB_key object | `None` |
| `redis_key` | `str` | Optional Redis key string | `None` |
| `**kwargs` |  | Field values to construct the query | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Model` | Model instance, or `None` if not found |

Raises:

| Type | Description |
| --- | --- |
| `QueryException` | If the filter matches more than one object. |

Source code in src/popoto/models/query.py
async def async_get(
    self, db_key: DB_key = None, redis_key: str = None, **kwargs
) -> "Model":
    """Async version of get() using native async Redis.

    Retrieves a single model instance from Redis using non-blocking I/O.
    Uses redis.asyncio for true async operations without thread pool overhead.

    Args:
        db_key: Optional DB_key object
        redis_key: Optional Redis key string
        **kwargs: Field values to construct query

    Returns:
        Model instance or None if not found

    Raises:
        QueryException: If the filter matches more than one object.
    """
    from ..models.encoding import decode_popoto_model_hashmap

    if (
        not db_key
        and not redis_key
        and all([key in kwargs for key in self.options.key_field_names])
    ):
        db_key = self.model_class(**kwargs).db_key

    if db_key and not redis_key:
        redis_key = db_key.redis_key

    if redis_key:
        async_redis = await get_async_redis_db()
        hashmap = await async_redis.hgetall(redis_key)
        if not hashmap:
            return None
        instance = decode_popoto_model_hashmap(self.model_class, hashmap)
        await to_thread(_fire_on_read, self.model_class, [instance])
    else:
        instances = await self.async_filter(**kwargs)
        if len(instances) > 1:
            raise QueryException(
                f"{self.model_class.__name__} found more than one unique instance. Use `query.filter()`"
            )
        instance = instances[0] if len(instances) == 1 else None

    return instance or None
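
The branching at the top of `async_get` decides between a direct O(1) key lookup and a fallback filter scan: if every key field is present in `kwargs`, the Redis key can be constructed without searching. A small sketch of that decision, with a hypothetical helper name (not popoto's API):

```python
def resolve_lookup(kwargs, key_field_names):
    """Decide how a get() call will be served, mirroring async_get's
    branching: all key fields supplied -> construct the db_key directly
    (single HGETALL); otherwise -> filter scan plus a uniqueness check."""
    if all(name in kwargs for name in key_field_names):
        return "direct"
    return "filter"

print(resolve_lookup({"username": "alice"}, ["username"]))   # direct
print(resolve_lookup({"email": "a@x.io"}, ["username"]))     # filter
```

The "filter" path is why `async_get` can raise `QueryException`: a non-key filter may legitimately match several objects, and `get()` refuses to pick one arbitrarily.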

async_get_many(redis_keys, skip_none=False) async

Async version of get_many() using native async Redis.

Retrieves multiple model instances by their Redis keys in a single async pipeline. See :meth:Query.get_many for full documentation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `redis_keys` | `list` | List of Redis key strings to look up. | *required* |
| `skip_none` | `bool` | If True, filter out `None` entries from the result. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of Model instances (and/or `None` when `skip_none` is False). |

Example:

    keys = ["Product:widget:001", "Product:widget:002"]
    products = await Product.query.async_get_many(redis_keys=keys)

Source code in src/popoto/models/query.py
async def async_get_many(self, redis_keys: list, skip_none: bool = False) -> list:
    """Async version of get_many() using native async Redis.

    Retrieves multiple model instances by their Redis keys in a single
    async pipeline. See :meth:`Query.get_many` for full documentation.

    Args:
        redis_keys: List of Redis key strings to look up.
        skip_none: If True, filter out ``None`` entries from the result.

    Returns:
        List of Model instances (and/or ``None`` when *skip_none* is False).

    Example:
        keys = ["Product:widget:001", "Product:widget:002"]
        products = await Product.query.async_get_many(redis_keys=keys)
    """
    if not redis_keys:
        return []

    from ..models.encoding import decode_popoto_model_hashmap

    async_redis = await get_async_redis_db()
    pipeline = async_redis.pipeline()
    for key in redis_keys:
        pipeline.hgetall(key)
    hashes_list = await pipeline.execute()

    results = []
    live_instances = []
    for hashmap in hashes_list:
        if hashmap:
            instance = decode_popoto_model_hashmap(self.model_class, hashmap)
            results.append(instance)
            live_instances.append(instance)
        else:
            results.append(None)

    if live_instances:
        await to_thread(_fire_on_read, self.model_class, live_instances)

    if skip_none:
        return [r for r in results if r is not None]
    return results
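
The default (`skip_none=False`) behaviour keeps the result list positionally aligned with the requested keys, so callers can zip keys and results; a missing hash becomes `None` in the same slot. A minimal sketch of that alignment, with illustrative names and a stand-in decoder instead of `decode_popoto_model_hashmap`:

```python
def align_results(hashes, decode):
    """Keep results aligned with the requested keys: an empty hash
    (HGETALL on a missing key returns {}) becomes None in the same slot."""
    return [decode(h) if h else None for h in hashes]

hashes = [{"name": b"widget"}, {}, {"name": b"gadget"}]
results = align_results(hashes, lambda h: h["name"].decode())
print(results)                                # ['widget', None, 'gadget']
print([r for r in results if r is not None])  # skip_none=True behaviour
```

Pass `skip_none=True` only when you do not need to know *which* keys were missing.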

async_filter(**kwargs) async

Async version of filter() using native async Redis.

Filters model instances based on field values using non-blocking I/O. Currently uses to_thread() for complex filter operations that involve field-specific query logic, but uses native async for result fetching.

Preserves sorted field ordering from ZRANGEBYSCORE when no explicit order_by is provided, matching the sync _execute_filter behavior.

Precedence: explicit order_by > sorted field order > Meta.order_by

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` |  | Filter parameters (field values, limit, order_by, values) | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of model instances, or dicts if `values=` is specified |

Note

The filter_for_keys_set() operation currently uses sync Redis due to the complexity of field-specific filter_query() implementations. Object loading uses native async Redis for better performance on bulk data retrieval.

Source code in src/popoto/models/query.py
async def async_filter(self, **kwargs) -> list:
    """Async version of filter() using native async Redis.

    Filters model instances based on field values using non-blocking I/O.
    Currently uses to_thread() for complex filter operations that involve
    field-specific query logic, but uses native async for result fetching.

    Preserves sorted field ordering from ZRANGEBYSCORE when no explicit
    order_by is provided, matching the sync _execute_filter behavior.

    Precedence: explicit order_by > sorted field order > Meta.order_by

    Args:
        **kwargs: Filter parameters (field values, limit, order_by, values)

    Returns:
        List of model instances or dicts (if values= specified)

    Note:
        The filter_for_keys_set() operation currently uses sync Redis due to
        the complexity of field-specific filter_query() implementations.
        Object loading uses native async Redis for better performance on
        bulk data retrieval.
    """
    # Reset geo distances for this query
    self._geo_distances = {}
    self._geo_distance_unit = None

    # Get keys using sync method in thread pool (field query implementations are sync)
    db_keys_set = await to_thread(self.filter_for_keys_set, **kwargs)
    if not len(db_keys_set):
        return []

    # Apply default order_by from Meta if not explicitly provided,
    # but not when sorted field ordering is active (it's a smarter default)
    if (
        "order_by" not in kwargs
        and self.model_class._meta.order_by
        and not getattr(self, "_sorted_field_order", None)
    ):
        kwargs["order_by"] = self.model_class._meta.order_by

    # Use sorted field order if available and no explicit order_by
    sorted_field_order = getattr(self, "_sorted_field_order", None)
    explicit_order_by = kwargs.get("order_by", None)
    # Meta.order_by is a default - sorted field order takes precedence over it
    if sorted_field_order and not explicit_order_by:
        db_keys_set = sorted_field_order  # Use ordered list instead of set

    # Use native async for bulk object loading
    objects = await self._async_get_many_objects(
        self.model_class,
        db_keys_set,
        order_by_attr_name=kwargs.get("order_by", None),
        limit=kwargs.get("limit", None),
        values=kwargs.get("values", None),
    )

    # Apply client-side filters for plain (unindexed) fields
    client_filters = getattr(self, "_pending_client_filters", {})
    if client_filters:
        filtered = []
        for obj in objects:
            match = True
            for field_name, expected_value in client_filters.items():
                if isinstance(obj, dict):
                    actual = obj.get(field_name)
                else:
                    actual = getattr(obj, field_name, None)
                if actual != expected_value:
                    match = False
                    break
            if match:
                filtered.append(obj)
        objects = filtered

    # Attach geo distances to objects if available
    if self._geo_distances:
        normalized_distances = {}
        for key, dist in self._geo_distances.items():
            if isinstance(key, bytes):
                normalized_distances[key.decode()] = dist
            else:
                normalized_distances[key] = dist

        for obj in objects:
            if isinstance(obj, dict):
                continue
            redis_key = obj.db_key.redis_key
            if isinstance(redis_key, bytes):
                redis_key = redis_key.decode()
            distance = normalized_distances.get(redis_key)
            if distance is not None:
                obj._geo_distance = distance
                obj._geo_distance_unit = self._geo_distance_unit

        model_objects = [o for o in objects if not isinstance(o, dict)]
        dict_objects = [o for o in objects if isinstance(o, dict)]
        model_objects.sort(key=lambda o: getattr(o, "_geo_distance", float("inf")))
        objects = model_objects + dict_objects

    return self.prepare_results(objects, **kwargs)
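
The client-side filtering step above compensates for plain (unindexed) fields: Redis cannot pre-filter them, so matching happens in Python after the objects are loaded, for both Model instances and projection dicts. A self-contained sketch of that pass (the helper name is illustrative, not popoto's API):

```python
def apply_client_filters(objects, client_filters):
    """Post-filter loaded results on plain (unindexed) fields.
    Handles both projection dicts and attribute-bearing objects,
    mirroring async_filter's client-side loop."""
    def matches(obj):
        for field_name, expected in client_filters.items():
            actual = (obj.get(field_name) if isinstance(obj, dict)
                      else getattr(obj, field_name, None))
            if actual != expected:
                return False
        return True
    return [obj for obj in objects if matches(obj)]

rows = [{"status": "pending", "qty": 1}, {"status": "shipped", "qty": 2}]
print(apply_client_filters(rows, {"status": "pending"}))
# [{'status': 'pending', 'qty': 1}]
```

Note the cost implication: filters on unindexed fields force every candidate object to be loaded first, so indexed fields should carry as much of the filter as possible.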

async_all(**kwargs) async

Async version of all() using native async Redis.

Retrieves all model instances using non-blocking I/O.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` |  | Optional order_by and values parameters | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of all model instances, or dicts if `values=` is specified |

Source code in src/popoto/models/query.py
async def async_all(self, **kwargs) -> list:
    """Async version of all() using native async Redis.

    Retrieves all model instances using non-blocking I/O.

    Args:
        **kwargs: Optional order_by and values parameters

    Returns:
        List of all model instances or dicts (if values= specified)
    """
    async_redis = await get_async_redis_db()
    redis_db_keys_list = list(
        await async_redis.smembers(
            self.model_class._meta.db_class_set_key.redis_key
        )
    )

    # Apply default order_by from Meta if not explicitly provided
    if "order_by" not in kwargs and self.model_class._meta.order_by:
        kwargs["order_by"] = self.model_class._meta.order_by

    objects = await self._async_get_many_objects(
        self.model_class,
        set(redis_db_keys_list),
        order_by_attr_name=kwargs.get("order_by", None),
        values=kwargs.get("values", None),
    )

    return self.prepare_results(objects, **kwargs)

async_count(**kwargs) async

Async version of count() using native async Redis.

Counts model instances matching filter criteria using non-blocking I/O.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` |  | Optional filter parameters | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `int` | Count of matching instances |

Source code in src/popoto/models/query.py
async def async_count(self, **kwargs) -> int:
    """Async version of count() using native async Redis.

    Counts model instances matching filter criteria using non-blocking I/O.

    Args:
        **kwargs: Optional filter parameters

    Returns:
        Count of matching instances
    """
    async_redis = await get_async_redis_db()

    if not len(kwargs):
        count = await async_redis.scard(
            self.model_class._meta.db_class_set_key.redis_key
        )
        return int(count or 0)

    # Use sync filter_for_keys_set in thread pool for complex filter logic
    db_keys = await to_thread(self.filter_for_keys_set, **kwargs)
    client_filters = getattr(self, "_pending_client_filters", {})
    if client_filters:
        # Must load objects to apply client-side filters
        objects = await self._async_get_many_objects(self.model_class, db_keys)
        return sum(
            1
            for obj in objects
            if all(
                getattr(obj, fname, None) == fval
                for fname, fval in client_filters.items()
            )
        )
    return len(db_keys)
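
`async_count` has three cost tiers: no filters is a single SCARD; indexed filters cost only the key-set intersection (`len(db_keys)`, no objects loaded); filters on unindexed fields force a full load plus a Python-side count. The last tier's counting logic can be sketched in isolation (`count_matches` is an illustrative name, not popoto's API):

```python
from types import SimpleNamespace

def count_matches(objects, client_filters):
    """Count loaded objects satisfying every client-side filter,
    mirroring async_count's fallback for unindexed fields."""
    return sum(
        1
        for obj in objects
        if all(getattr(obj, name, None) == value
               for name, value in client_filters.items())
    )

orders = [SimpleNamespace(status="pending"), SimpleNamespace(status="done")]
print(count_matches(orders, {"status": "pending"}))  # 1
```

If counts on a plain field are needed frequently, promoting that field to an indexed type keeps the query in the cheap `len(db_keys)` tier.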

async_keys(catchall=False, clean=False, **kwargs) async

Async version of keys() using native async Redis.

Retrieves Redis keys for model instances using non-blocking I/O.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `catchall` |  | If True, use KEYS pattern (debug only, not for production) | `False` |
| `clean` |  | If True, clean up orphaned keys (debug only, not for production) | `False` |
| `**kwargs` |  | Additional parameters | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of Redis keys |

Note

The clean operation uses to_thread() as it involves complex pipeline operations. Regular key retrieval uses native async.

Source code in src/popoto/models/query.py
async def async_keys(self, catchall=False, clean=False, **kwargs) -> list:
    """Async version of keys() using native async Redis.

    Retrieves Redis keys for model instances using non-blocking I/O.

    Args:
        catchall: If True, use KEYS pattern (debug only, not for production)
        clean: If True, clean up orphaned keys (debug only, not for production)
        **kwargs: Additional parameters

    Returns:
        List of Redis keys

    Note:
        The clean operation uses to_thread() as it involves complex pipeline
        operations. Regular key retrieval uses native async.
    """
    if clean:
        # Clean operation is complex with pipelines, use thread pool
        return await to_thread(self.keys, catchall=catchall, clean=clean, **kwargs)

    async_redis = await get_async_redis_db()

    if catchall:
        logger.warning(
            "catchall is for debugging purposes only. Not for use in a production environment"
        )
        return list(await async_redis.keys(f"*{self.model_class.__name__}*"))
    else:
        return list(
            await async_redis.smembers(
                self.model_class._meta.db_class_set_key.redis_key
            )
        )