Skip to content

popoto.fields.access_tracker

popoto.fields.access_tracker

AccessTrackerMixin — read pattern tracking with staged vs confirmed reads.

This module provides a mixin class that adds read access tracking to any Model. It uses a staging pattern: reads are first recorded to a staging list, then atomically promoted to the confirmed access log via a Lua script.

Design
  • on_read() appends a timestamp to a staging list (cheap, fire-and-forget)
  • confirm_access() atomically promotes staged timestamps to the confirmed log
  • discard_staged_access() discards staged reads without affecting confirmed data
  • Access log is capped at max_access_log entries (default 100)
Redis Key Patterns
  • $AT:{ClassName}:staged:{redis_key} — staging list (RPUSH timestamps)
  • $AT:{ClassName}:access_log:{redis_key} — confirmed access timestamps
  • $AT:{ClassName}:meta:{redis_key} — hash with access_count and last_accessed
Example

class Memory(AccessTrackerMixin, Model): key = UniqueKeyField() content = StringField()

memory = Memory.query.get(key="important") # auto-stages on_read memory.confirm_access() # promote staged reads to confirmed log print(memory.access_count) # total confirmed read count print(memory.last_accessed) # timestamp of most recent confirmed read

AccessTrackerMixin

Mixin that adds read access tracking to any Model.

Add this as a base class alongside Model to enable read tracking:

class MyModel(AccessTrackerMixin, Model):
    ...
Class Attributes

_max_access_log: Maximum number of timestamps to keep in the access log. Older entries are trimmed on confirm. Default 100. _track_reads: Whether to automatically track reads from queries. Default True.

Note: Attributes are prefixed with underscore to avoid conflict with Popoto's ModelBase metaclass, which requires public attributes to be Fields.

Source code in src/popoto/fields/access_tracker.py
class AccessTrackerMixin:
    """Mixin that adds read access tracking to any Model.

    Add this as a base class alongside Model to enable read tracking:

        class MyModel(AccessTrackerMixin, Model):
            ...

    Class Attributes:
        _max_access_log: Maximum number of timestamps to keep in the access log.
            Older entries are trimmed on confirm. Default 100.
        _track_reads: Whether to automatically track reads from queries.
            Default True.

    Note: Attributes are prefixed with underscore to avoid conflict with
    Popoto's ModelBase metaclass, which requires public attributes to be Fields.
    """

    _max_access_log = 100
    _track_reads = True

    def _at_key(self, kind):
        """Build an access tracker Redis key.

        Args:
            kind: One of 'staged', 'access_log', 'meta'

        Returns:
            str: Redis key like '$AT:ClassName:kind:redis_key'
        """
        class_name = type(self).__name__
        redis_key = self._redis_key or self.db_key.redis_key
        return f"$AT:{class_name}:{kind}:{redis_key}"

    def on_read(self, pipeline=None):
        """Record a read access by staging a timestamp.

        Appends the current timestamp to the staging list. This is a cheap
        operation suitable for fire-and-forget use from query hooks.

        Args:
            pipeline: Optional Redis pipeline for batch operations.
        """
        ts = str(time.time())
        staged_key = self._at_key("staged")
        if pipeline:
            pipeline.rpush(staged_key, ts)
        else:
            POPOTO_REDIS_DB.rpush(staged_key, ts)

    def confirm_access(self, pipeline=None):
        """Atomically promote staged reads to the confirmed access log.

        Uses a Lua script to:
        1. Move all staged timestamps to the access log
        2. Trim the log to max_access_log entries
        3. Update access_count and last_accessed in the meta hash
        4. Delete the staging list

        Args:
            pipeline: Optional Redis pipeline (not used for Lua eval,
                reserved for future use).

        Returns:
            int: Number of staged reads that were promoted.

        Raises:
            TypeError: If the model instance has not been saved to Redis.
        """
        if not hasattr(self, "_redis_key") and not hasattr(self, "db_key"):
            raise TypeError("confirm_access() requires a saved model instance")
        try:
            redis_key = self._redis_key or self.db_key.redis_key
        except Exception:
            raise TypeError("confirm_access() requires a saved model instance")

        # Check if the model has been saved (has a valid redis key in the DB)
        if not POPOTO_REDIS_DB.exists(redis_key):
            raise TypeError("confirm_access() requires a saved model instance")

        staged_key = self._at_key("staged")
        log_key = self._at_key("access_log")
        meta_key = self._at_key("meta")

        count = POPOTO_REDIS_DB.eval(
            CONFIRM_ACCESS_LUA,
            3,  # number of KEYS
            staged_key,
            log_key,
            meta_key,
            str(self._max_access_log),
        )
        return int(count)

    def discard_staged_access(self, pipeline=None):
        """Discard all staged reads without affecting confirmed data.

        Args:
            pipeline: Optional Redis pipeline for batch operations.
        """
        staged_key = self._at_key("staged")
        if pipeline:
            pipeline.delete(staged_key)
        else:
            POPOTO_REDIS_DB.delete(staged_key)

    @property
    def access_count(self):
        """Total number of confirmed read accesses.

        Returns:
            int: The cumulative access count, or 0 if never confirmed.
        """
        meta_key = self._at_key("meta")
        raw = POPOTO_REDIS_DB.hget(meta_key, "access_count")
        if raw is None:
            return 0
        return int(raw)

    @property
    def last_accessed(self):
        """Timestamp of the most recent confirmed read access.

        Returns:
            float or None: Unix timestamp, or None if never confirmed.
        """
        meta_key = self._at_key("meta")
        raw = POPOTO_REDIS_DB.hget(meta_key, "last_accessed")
        if raw is None:
            return None
        return float(raw)

    def _delete_access_tracker_keys(self, pipeline=None):
        """Remove all access tracker Redis keys for this instance.

        Called during model deletion to clean up tracking data.

        Args:
            pipeline: Optional Redis pipeline for batch operations.
        """
        keys = [
            self._at_key("staged"),
            self._at_key("access_log"),
            self._at_key("meta"),
        ]
        if pipeline:
            for key in keys:
                pipeline.delete(key)
        else:
            POPOTO_REDIS_DB.delete(*keys)

access_count property

Total number of confirmed read accesses.

Returns:

Name Type Description
int

The cumulative access count, or 0 if never confirmed.

last_accessed property

Timestamp of the most recent confirmed read access.

Returns:

Type Description

float or None: Unix timestamp, or None if never confirmed.

on_read(pipeline=None)

Record a read access by staging a timestamp.

Appends the current timestamp to the staging list. This is a cheap operation suitable for fire-and-forget use from query hooks.

Parameters:

Name Type Description Default
pipeline

Optional Redis pipeline for batch operations.

None
Source code in src/popoto/fields/access_tracker.py
def on_read(self, pipeline=None):
    """Record a read access by staging a timestamp.

    Appends the current timestamp to the staging list. This is a cheap
    operation suitable for fire-and-forget use from query hooks.

    Args:
        pipeline: Optional Redis pipeline for batch operations.
    """
    ts = str(time.time())
    staged_key = self._at_key("staged")
    if pipeline:
        pipeline.rpush(staged_key, ts)
    else:
        POPOTO_REDIS_DB.rpush(staged_key, ts)

confirm_access(pipeline=None)

Atomically promote staged reads to the confirmed access log.

Uses a Lua script to: 1. Move all staged timestamps to the access log 2. Trim the log to max_access_log entries 3. Update access_count and last_accessed in the meta hash 4. Delete the staging list

Parameters:

Name Type Description Default
pipeline

Optional Redis pipeline (not used for Lua eval, reserved for future use).

None

Returns:

Name Type Description
int

Number of staged reads that were promoted.

Raises:

Type Description
TypeError

If the model instance has not been saved to Redis.

Source code in src/popoto/fields/access_tracker.py
def confirm_access(self, pipeline=None):
    """Atomically promote staged reads to the confirmed access log.

    Uses a Lua script to:
    1. Move all staged timestamps to the access log
    2. Trim the log to max_access_log entries
    3. Update access_count and last_accessed in the meta hash
    4. Delete the staging list

    Args:
        pipeline: Optional Redis pipeline (not used for Lua eval,
            reserved for future use).

    Returns:
        int: Number of staged reads that were promoted.

    Raises:
        TypeError: If the model instance has not been saved to Redis.
    """
    if not hasattr(self, "_redis_key") and not hasattr(self, "db_key"):
        raise TypeError("confirm_access() requires a saved model instance")
    try:
        redis_key = self._redis_key or self.db_key.redis_key
    except Exception:
        raise TypeError("confirm_access() requires a saved model instance")

    # Check if the model has been saved (has a valid redis key in the DB)
    if not POPOTO_REDIS_DB.exists(redis_key):
        raise TypeError("confirm_access() requires a saved model instance")

    staged_key = self._at_key("staged")
    log_key = self._at_key("access_log")
    meta_key = self._at_key("meta")

    count = POPOTO_REDIS_DB.eval(
        CONFIRM_ACCESS_LUA,
        3,  # number of KEYS
        staged_key,
        log_key,
        meta_key,
        str(self._max_access_log),
    )
    return int(count)

discard_staged_access(pipeline=None)

Discard all staged reads without affecting confirmed data.

Parameters:

Name Type Description Default
pipeline

Optional Redis pipeline for batch operations.

None
Source code in src/popoto/fields/access_tracker.py
def discard_staged_access(self, pipeline=None):
    """Discard all staged reads without affecting confirmed data.

    Args:
        pipeline: Optional Redis pipeline for batch operations.
    """
    staged_key = self._at_key("staged")
    if pipeline:
        pipeline.delete(staged_key)
    else:
        POPOTO_REDIS_DB.delete(staged_key)