Skip to content

MemTable

In-memory sorted tables — the write-side data path.

ActiveMemTable

ActiveMemTable()

Bases: MemTable

Public interface to the live mutable memtable.

Wraps a :class:SkipList and adds: - table_id for the audit chain - freeze() to produce an immutable snapshot - Metadata tracking for observability

Create a new empty active memtable with a unique table ID.

Initializes the underlying skip list and sets up metadata tracking for entry count and sequence number range.

Source code in app/memtable/active.py
def __init__(self) -> None:
    """Create a new empty active memtable with a unique table ID.

    Initializes the underlying skip list and sets up metadata tracking
    for entry count and sequence number range.
    """
    self.table_id: TableID = uuid.uuid4().hex
    self._skiplist = SkipList()
    self._created_at = time.time_ns()
    self._entry_count = 0
    self._seq_first: SeqNum = 0
    self._seq_last: SeqNum = 0

    logger.info("ActiveMemTable created", table_id=self.table_id)

size_bytes property

Estimated data size in bytes.

metadata property

Return a frozen metadata snapshot — safe to pass across threads.

put(key, seq, timestamp_ms, value)

Insert or update key in the underlying SkipList.

Source code in app/memtable/active.py
def put(self, key: Key, seq: SeqNum, timestamp_ms: int, value: Value) -> None:
    """Insert or update *key* in the underlying SkipList."""
    try:
        self._skiplist.put(key, seq, timestamp_ms, value)
    except Exception as exc:
        logger.error(
            "ActiveMemTable put failed",
            table_id=self.table_id,
            key=key,
            seq=seq,
            error=str(exc),
        )
        raise
    self._entry_count += 1
    if self._seq_first == 0:
        self._seq_first = seq
    self._seq_last = seq

get(key)

Look up key. Returns (seq, value) or None.

Source code in app/memtable/active.py
def get(self, key: Key) -> tuple[SeqNum, Value] | None:
    """Look up *key*. Returns ``(seq, value)`` or ``None``."""
    return self._skiplist.get(key)

items()

Yield (key, seq, timestamp_ms, value) in sorted key order.

Source code in app/memtable/active.py
def items(self) -> Iterator[tuple[Key, SeqNum, int, Value]]:
    """Yield ``(key, seq, timestamp_ms, value)`` in sorted key order."""
    return iter(self._skiplist)

delete(key, seq, timestamp_ms)

Logically delete key. Returns whether the key existed.

Source code in app/memtable/active.py
def delete(self, key: Key, seq: SeqNum, timestamp_ms: int) -> bool:
    """Logically delete *key*. Returns whether the key existed."""
    try:
        result = self._skiplist.delete(key, seq, timestamp_ms)
    except Exception as exc:
        logger.error(
            "ActiveMemTable delete failed",
            table_id=self.table_id,
            key=key,
            seq=seq,
            error=str(exc),
        )
        raise
    self._entry_count += 1  # tombstone is an entry
    if self._seq_first == 0:
        self._seq_first = seq
    self._seq_last = seq
    return result

freeze()

Return a sorted snapshot of all visible entries.

The snapshot_id is assigned by the caller (MemTableManager), not by this method.

Raises :class:SnapshotEmptyError if the table has no entries.

Source code in app/memtable/active.py
def freeze(self) -> list[tuple[Key, SeqNum, int, Value]]:
    """Return a sorted snapshot of all visible entries.

    The snapshot_id is assigned by the caller (MemTableManager),
    not by this method.

    Raises :class:`SnapshotEmptyError` if the table has no entries.
    """
    logger.info(
        "ActiveMemTable freeze start",
        table_id=self.table_id,
        size_bytes=self.size_bytes,
    )
    data = self._skiplist.snapshot()
    if not data:
        raise SnapshotEmptyError(
            f"Cannot freeze empty ActiveMemTable {self.table_id}"
        )
    logger.info(
        "ActiveMemTable freeze done",
        table_id=self.table_id,
        entry_count=len(data),
    )
    return data

ActiveMemTableMeta

ActiveMemTableMeta(table_id, size_bytes, entry_count, created_at, seq_first, seq_last) dataclass

Point-in-time metadata snapshot for an active memtable.

Attributes:

Name Type Description
table_id TableID

UUID hex identifying this memtable instance.

size_bytes int

Estimated data size in bytes (sum of key + value lengths).

entry_count int

Total number of put/delete operations applied.

created_at int

Creation timestamp in nanoseconds since epoch.

seq_first SeqNum

Sequence number of the first write to this table.

seq_last SeqNum

Sequence number of the most recent write.

ImmutableMemTable

ImmutableMemTable(snapshot_id, data)

Read-only, sorted snapshot of an :class:ActiveMemTable.

Supports O(1) point lookups via an internal _index dict and sorted iteration via items() for the flush path.

Create a frozen, read-only memtable snapshot.

After construction, the instance is sealed and any attribute assignment will raise ImmutableTableAccessError.

Parameters:

Name Type Description Default
snapshot_id SnapshotID

Unique identifier for this snapshot, typically the table_id of the ActiveMemTable that was frozen.

required
data list[tuple[Key, SeqNum, int, Value]]

Sorted list of (key, seq, timestamp_ms, value) tuples. Must be in ascending key order.

required
Source code in app/memtable/immutable.py
def __init__(
    self,
    snapshot_id: SnapshotID,
    data: list[tuple[Key, SeqNum, int, Value]],
) -> None:
    """Create a frozen, read-only memtable snapshot.

    After construction, the instance is sealed and any attribute
    assignment will raise ``ImmutableTableAccessError``.

    Args:
        snapshot_id: Unique identifier for this snapshot, typically the
            ``table_id`` of the ``ActiveMemTable`` that was frozen.
        data: Sorted list of ``(key, seq, timestamp_ms, value)`` tuples.
            Must be in ascending key order.
    """
    self.snapshot_id = snapshot_id
    self._data = data  # must be sorted by key ascending
    self._index: dict[Key, int] = {entry[0]: i for i, entry in enumerate(data)}
    self._frozen_at = time.time_ns()
    self._tombstone_count = sum(1 for _, _, _, v in data if v == TOMBSTONE)
    self._sealed = True  # must be last

    logger.info(
        "ImmutableMemTable created",
        snapshot_id=snapshot_id,
        entry_count=len(data),
        tombstones=self._tombstone_count,
    )

size_bytes property

Total key + value bytes in this snapshot.

seq_min property

Smallest sequence number in this snapshot.

seq_max property

Largest sequence number in this snapshot.

metadata property

Return a frozen metadata snapshot.

get(key)

O(1) point lookup. Returns (seq, value) or None.

Does not return timestamp_ms — only seq is needed for MVCC ordering.

Source code in app/memtable/immutable.py
def get(self, key: Key) -> tuple[SeqNum, Value] | None:
    """O(1) point lookup. Returns ``(seq, value)`` or ``None``.

    Does **not** return ``timestamp_ms`` — only ``seq`` is needed
    for MVCC ordering.
    """
    idx = self._index.get(key)
    if idx is None:
        return None
    _, seq, _, value = self._data[idx]
    return (seq, value)

items()

Yield (key, seq, timestamp_ms, value) in sorted key order.

Used by the flush path to iterate entries for SSTable writing.

Source code in app/memtable/immutable.py
def items(self) -> Iterator[tuple[Key, SeqNum, int, Value]]:
    """Yield ``(key, seq, timestamp_ms, value)`` in sorted key order.

    Used by the flush path to iterate entries for SSTable writing.
    """
    return iter(self._data)

__len__()

Return the number of entries in this snapshot.

Returns:

Type Description
int

Total entry count including tombstones.

Source code in app/memtable/immutable.py
def __len__(self) -> int:
    """Return the number of entries in this snapshot.

    Returns:
        Total entry count including tombstones.
    """
    return len(self._data)

__setattr__(name, value)

Enforce immutability after construction.

Parameters:

Name Type Description Default
name str

Attribute name being set.

required
value object

Value to assign.

required

Raises:

Type Description
ImmutableTableAccessError

If the instance has been sealed (construction complete) and the attribute is not the internal _sealed flag.

Source code in app/memtable/immutable.py
def __setattr__(self, name: str, value: object) -> None:
    """Enforce immutability after construction.

    Args:
        name: Attribute name being set.
        value: Value to assign.

    Raises:
        ImmutableTableAccessError: If the instance has been sealed
            (construction complete) and the attribute is not the
            internal ``_sealed`` flag.
    """
    if self._sealed and name != "_sealed":
        raise ImmutableTableAccessError(
            f"Cannot set '{name}' on ImmutableMemTable {self.snapshot_id}"
        )
    super().__setattr__(name, value)

ImmutableMemTableMeta

ImmutableMemTableMeta(snapshot_id, source_table_id, size_bytes, entry_count, tombstone_count, frozen_at, seq_min, seq_max) dataclass

Metadata for a frozen memtable snapshot.

Attributes:

Name Type Description
snapshot_id SnapshotID

UUID hex identifying this snapshot.

source_table_id TableID

Table ID of the active memtable that was frozen.

size_bytes int

Total key + value bytes in this snapshot.

entry_count int

Number of entries (including tombstones).

tombstone_count int

Number of deletion tombstones.

frozen_at int

Freeze timestamp in nanoseconds since epoch.

seq_min SeqNum

Smallest sequence number in this snapshot.

seq_max SeqNum

Largest sequence number in this snapshot.

SkipList

SkipList()

Thread-safe sorted map from :type:Key to (SeqNum, ts, Value).

Concurrent writers lock only the predecessor nodes at the insertion boundary. Readers acquire no locks — boolean flag reads are atomic under CPython's GIL.

Initialize an empty skip list with a sentinel head node.

The head node spans all levels and is never visible to iterators.

Source code in app/memtable/skiplist.py
def __init__(self) -> None:
    """Initialize an empty skip list with a sentinel head node.

    The head node spans all levels and is never visible to iterators.
    """
    self._head = _Node(key=b"", seq=0, timestamp_ms=0, value=b"", level=MAX_LEVEL)
    self._head.fully_linked = True
    self._level = 0
    self._size = 0
    self._size_lock = threading.Lock()
    self._count = 0
    self._count_lock = threading.Lock()

size_bytes property

Estimated size in bytes (sum of key + value lengths).

count property

Number of visible (non-deleted) entries.

put(key, seq, timestamp_ms, value)

Insert or update key in the skip list.

If the key already exists (fully_linked=True, marked=False), updates seq, timestamp_ms, and value in place. Otherwise inserts a new node.

Raises :class:SkipListKeyError if key is empty. Raises :class:SkipListInsertError if retries are exhausted.

Source code in app/memtable/skiplist.py
def put(self, key: Key, seq: SeqNum, timestamp_ms: int, value: Value) -> None:
    """Insert or update *key* in the skip list.

    If the key already exists (``fully_linked=True``, ``marked=False``),
    updates seq, timestamp_ms, and value in place.  Otherwise inserts
    a new node.

    Raises :class:`SkipListKeyError` if *key* is empty.
    Raises :class:`SkipListInsertError` if retries are exhausted.
    """
    if not key:
        raise SkipListKeyError("Key must not be empty")

    top_level = self._random_level()

    for _attempt in range(_MAX_RETRIES):
        preds, succs = self._find(key)

        # ── update path: key already exists ───────────────────────
        for lv in range(MAX_LEVEL + 1):
            node = succs[lv]
            if (
                node is not None
                and node.key == key
                and node.fully_linked
                and not node.marked
            ):
                with node.lock:
                    if node.marked:
                        break  # concurrently deleted — retry
                    old_value_len = len(node.value)
                    node.seq = seq
                    node.timestamp_ms = timestamp_ms
                    node.value = value
                    with self._size_lock:
                        self._size += len(value) - old_value_len
                return
            if node is None or node.key != key:
                break

        # ── insert path ───────────────────────────────────────────
        pred_nodes = [preds[lv] for lv in range(top_level + 1)]

        with self._lock_nodes(pred_nodes):
            # Validate — predecessors still point to expected successors
            valid = True
            for lv in range(top_level + 1):
                if preds[lv].marked or preds[lv].forward[lv] is not succs[lv]:
                    valid = False
                    break
            if not valid:
                continue  # retry from scratch

            new_node = _Node(key, seq, timestamp_ms, value, top_level)

            for lv in range(top_level + 1):
                new_node.forward[lv] = succs[lv]
                preds[lv].forward[lv] = new_node

            # Last step — now visible to readers
            new_node.fully_linked = True

            if top_level > self._level:
                self._level = top_level

            with self._size_lock:
                self._size += len(key) + len(value)
            with self._count_lock:
                self._count += 1

        return

    raise SkipListInsertError(
        f"put() failed after {_MAX_RETRIES} retries for key={key!r}"
    )

get(key)

Look up key without acquiring any lock.

Returns (seq, value) or None. Does not return timestamp_ms — only seq is needed for MVCC ordering.

Source code in app/memtable/skiplist.py
def get(self, key: Key) -> tuple[SeqNum, Value] | None:
    """Look up *key* without acquiring any lock.

    Returns ``(seq, value)`` or ``None``.  Does **not** return
    ``timestamp_ms`` — only ``seq`` is needed for MVCC ordering.
    """
    _, succs = self._find(key)
    node = succs[0]
    if (
        node is not None
        and node.key == key
        and node.fully_linked
        and not node.marked
    ):
        return (node.seq, node.value)
    return None

delete(key, seq, timestamp_ms)

Logically delete key by writing a TOMBSTONE.

Returns True if the key existed and was marked, False if the key was not found (a tombstone is still inserted so the delete propagates to SSTables).

Source code in app/memtable/skiplist.py
def delete(self, key: Key, seq: SeqNum, timestamp_ms: int) -> bool:
    """Logically delete *key* by writing a ``TOMBSTONE``.

    Returns ``True`` if the key existed and was marked, ``False`` if
    the key was not found (a tombstone is still inserted so the
    delete propagates to SSTables).
    """
    if not key:
        raise SkipListKeyError("Key must not be empty")

    _, succs = self._find(key)
    node = succs[0]

    if node is None or node.key != key or not node.fully_linked:
        # Key not found — insert a tombstone so delete propagates to SSTables
        self.put(key, seq, timestamp_ms, TOMBSTONE)
        return False

    with node.lock:
        if node.marked:
            return False
        node.marked = True
        node.seq = seq
        node.timestamp_ms = timestamp_ms
        node.value = TOMBSTONE

    with self._count_lock:
        self._count -= 1

    return True

__iter__()

Yield (key, seq, timestamp_ms, value) in sorted key order.

Lock-free. Skips nodes that are not fully_linked or are marked. Safe to call concurrently with put() and delete() — boolean flag reads are atomic under the GIL.

Source code in app/memtable/skiplist.py
def __iter__(self) -> Iterator[tuple[Key, SeqNum, int, Value]]:
    """Yield ``(key, seq, timestamp_ms, value)`` in sorted key order.

    Lock-free.  Skips nodes that are not ``fully_linked`` or are
    ``marked``.  Safe to call concurrently with ``put()`` and
    ``delete()`` — boolean flag reads are atomic under the GIL.
    """
    curr = self._head.forward[0]
    while curr is not None:
        if curr.fully_linked and not curr.marked:
            yield curr.key, curr.seq, curr.timestamp_ms, curr.value
        curr = curr.forward[0]

snapshot()

Materialise the iterator as a sorted list for freezing.

Used by :meth:ActiveMemTable.freeze to produce a stable point-in-time copy.

Source code in app/memtable/skiplist.py
def snapshot(self) -> list[tuple[Key, SeqNum, int, Value]]:
    """Materialise the iterator as a sorted list for freezing.

    Used by :meth:`ActiveMemTable.freeze` to produce a stable
    point-in-time copy.
    """
    data = list(self)
    return data