Skip to content

Flush Pipeline

Parallel flush with ordered commits.

FlushPipeline

FlushPipeline(mem, sst, wal, max_workers=2, compaction=None)

Daemon that drains the immutable queue to SSTables.

Writes run in parallel (up to max_workers), but commits are serialized: each slot waits for the previous slot's commit before committing its own result.

Wire up the flush pipeline to its dependent managers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `mem` | `MemTableManager` | Memtable manager supplying immutable snapshots to flush. | required |
| `sst` | `SSTableManager` | SSTable manager for writing and committing SSTables. | required |
| `wal` | `WALManager` | WAL manager for truncating entries after successful flush. | required |
| `max_workers` | `int` | Maximum number of concurrent SSTable writes. | `2` |
| `compaction` | `CompactionManager \| None` | Optional compaction manager to trigger after each flush commit. | `None` |
Source code in app/engine/flush_pipeline.py
def __init__(
    self,
    mem: MemTableManager,
    sst: SSTableManager,
    wal: WALManager,
    max_workers: int = 2,
    compaction: CompactionManager | None = None,
) -> None:
    """Store the collaborating managers and create the pipeline's async state.

    Args:
        mem: Memtable manager supplying immutable snapshots to flush.
        sst: SSTable manager for writing and committing SSTables.
        wal: WAL manager for truncating entries after successful flush.
        max_workers: Maximum number of concurrent SSTable writes; also
            the initial value of the write semaphore.
        compaction: Optional compaction manager to trigger after each
            flush commit.
    """
    # Collaborators.
    self._mem, self._sst, self._wal = mem, sst, wal
    self._compaction = compaction

    # Write concurrency is bounded by a semaphore sized to max_workers.
    self._max_workers = max_workers
    self._semaphore = asyncio.Semaphore(max_workers)

    # Lifecycle state; the loop reference starts out unset.
    self._running = False
    self._stop_event = asyncio.Event()
    self._loop: asyncio.AbstractEventLoop | None = None

    # BUG-14: async event bridged from threading.Event via
    # loop.call_soon_threadsafe in the notify callback
    self._flush_async = asyncio.Event()

running property

Whether the pipeline loop is active.

run() async

Main daemon loop — runs until `stop()` is called.

Source code in app/engine/flush_pipeline.py
async def run(self) -> None:
    """Main daemon loop — runs until :meth:`stop` is called.

    Registers the flush-notify callback on the memtable manager, then
    repeatedly dispatches pending flush work.  When a pass dispatches
    nothing, the loop waits for the async flush signal for at most
    0.5 s before re-checking the stop event.  After a stop request one
    final dispatch pass drains whatever is still queued.

    The callback is deregistered and ``_running`` is reset in a
    ``finally`` block, so task cancellation or an exception raised by a
    dispatch pass cannot leave a stale callback registered or leave the
    pipeline flagged as running.
    """
    try:
        self._running = True
        self._loop = asyncio.get_running_loop()

        # BUG-14: register callback so threading.Event bridges to async
        self._mem.set_flush_notify(self._on_flush_notify)

        logger.info("FlushPipeline started", max_workers=self._max_workers)

        while not self._stop_event.is_set():
            dispatched = await self._dispatch_all()
            if dispatched:
                continue

            # No work — wait for async flush signal or stop.
            # asyncio.TimeoutError is the builtin TimeoutError on 3.11+
            # and a distinct class before that; naming the asyncio alias
            # suppresses the timeout on both.
            with contextlib.suppress(asyncio.TimeoutError):
                await asyncio.wait_for(
                    self._flush_async.wait(),
                    timeout=0.5,
                )
            self._flush_async.clear()

        # Final drain on shutdown
        await self._dispatch_all()
    finally:
        # Always undo the registration, even on cancellation/failure.
        self._mem.set_flush_notify(None)
        self._running = False
        logger.info("FlushPipeline stopped")

stop()

Signal the pipeline to stop after draining.

Source code in app/engine/flush_pipeline.py
def stop(self) -> None:
    """Signal the pipeline to stop after draining.

    Only sets the stop event and logs the request; it does not wait for
    the run loop to finish.
    """
    self._stop_event.set()
    logger.info("FlushPipeline stop requested")

FlushSlot

FlushSlot(snapshot, file_id, prev_committed, my_committed=asyncio.Event(), batch_abort=asyncio.Event(), position=0) dataclass

Tracks one in-flight flush operation.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `snapshot` | `ImmutableMemTable` | The immutable memtable being flushed. |
| `file_id` | `FileID` | Target SSTable file ID for this flush. |
| `prev_committed` | `Event` | Event set when the previous slot's commit completes. |
| `my_committed` | `Event` | Event set when this slot's commit completes. |
| `batch_abort` | `Event` | Shared event — set if any slot in the batch fails. |
| `position` | `int` | Zero-based index of this slot within the batch. |

MemTableManager

MemTableManager(config)

Single point of coordination for all in-memory write-side state.

All thresholds are read from config at the moment they're needed, so runtime config changes take effect on the next operation.

Initialize the memtable manager with an empty active table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `LSMConfig` | Live engine configuration. Thresholds (memtable size, queue length, backpressure timeout) are read from config at the point of use so runtime changes take effect immediately. | required |
Source code in app/engine/memtable_manager.py
def __init__(self, config: LSMConfig) -> None:
    """Create the manager with a fresh active table and an empty queue.

    Args:
        config: Live engine configuration.  Thresholds (memtable size,
            queue length, backpressure timeout) are read from *config*
            at the point of use so runtime changes take effect
            immediately.
    """
    self._config = config

    # In-memory tables: one mutable active table plus the frozen queue.
    self._active = ActiveMemTable()
    self._immutable_q: deque[ImmutableMemTable] = deque()

    # The backpressure condition is built on the re-entrant write lock.
    write_lock = threading.RLock()
    self._write_lock = write_lock
    self._queue_not_full = threading.Condition(write_lock)
    self._flush_event = threading.Event()

    # BUG-14: optional callback to bridge flush signal to asyncio
    self._flush_notify: Callable[[], None] | None = None

flush_event property

The event signalled when a new snapshot is added to the queue.

write_lock property

The write lock for the atomic write path.

size_bytes property

Size of the active memtable in bytes.

active_metadata property

Metadata for the current active memtable.

immutable_metadata property

Metadata for all snapshots (newest first).

put(key, seq, timestamp_ms, value)

Write key into the active memtable.

Called by the engine while holding _write_lock.

Source code in app/engine/memtable_manager.py
def put(self, key: Key, seq: SeqNum, timestamp_ms: int, value: Value) -> None:
    """Insert *key* into the active memtable.

    Any exception from the active table is logged and then re-raised
    unchanged.  Called by the engine while holding ``_write_lock``.
    """
    log_fields = {"key": key, "seq": seq}
    logger.debug("MemTableManager put", **log_fields)
    try:
        self._active.put(key, seq, timestamp_ms, value)
    except Exception as err:
        logger.error("MemTableManager put failed", **log_fields, error=str(err))
        raise

get(key)

Look up key in active memtable then immutable queue.

Scans newest → oldest. Returns raw value including TOMBSTONE — the engine resolves it. Takes a snapshot copy of the immutable queue to avoid concurrent-modification issues (BUG-02).

Source code in app/engine/memtable_manager.py
def get(self, key: Key) -> tuple[SeqNum, Value] | None:
    """Find *key* in the active memtable, then in the immutable queue.

    Tables are consulted newest → oldest and the first hit wins.  The
    raw value is returned as stored, TOMBSTONE included — resolving it
    is the engine's job.  The immutable queue is copied before it is
    walked so concurrent mutation cannot disturb the scan (BUG-02).
    """
    source = "active"
    hit = self._active.get(key)

    if hit is None:
        # BUG-02: iterate over a copy of the queue, never the live deque.
        # The copy is O(n) but n ≤ immutable_queue_max_len (default 4).
        for idx, frozen in enumerate(list(self._immutable_q)):
            hit = frozen.get(key)
            if hit is not None:
                source = f"immutable_{idx}"
                break
        else:
            source = "miss"

    logger.debug("MemTableManager get", key=key, source=source)
    return hit

maybe_freeze()

Freeze the active memtable if it exceeds size or entry threshold.

Called under _write_lock by the engine. Applies backpressure via _queue_not_full.wait() if the immutable queue is full.

Source code in app/engine/memtable_manager.py
def maybe_freeze(self) -> ImmutableMemTable | None:
    """Freeze the active memtable if it exceeds size or entry threshold.

    Called under ``_write_lock`` by the engine.  If the immutable queue
    is full, blocks on ``_queue_not_full`` until there is room or until
    ``backpressure_timeout`` seconds have elapsed in total.

    Waiting on the condition releases ``_write_lock``, so another writer
    may freeze the active memtable while this call is blocked.  The
    threshold is therefore re-checked after the wait; without that the
    freshly created replacement table would be frozen as well.

    Returns:
        The newly queued snapshot, or ``None`` if no freeze was needed.

    Raises:
        FreezeBackpressureTimeout: If the queue is still full once the
            backpressure timeout has expired.
    """
    if not self._should_freeze():
        return None

    meta = self._active.metadata
    logger.info(
        "Freeze triggered",
        size_bytes=meta.size_bytes,
        size_limit_bytes=self._config.max_memtable_bytes,
        entry_count=meta.entry_count,
        entry_limit=int(self._config.max_memtable_entries),
    )

    # Backpressure: wait if queue is full (with timeout)
    queue_max = int(self._config.immutable_queue_max_len)
    bp_timeout = float(self._config.backpressure_timeout)

    if len(self._immutable_q) >= queue_max:
        logger.warning(
            "Backpressure active",
            queue_len=len(self._immutable_q),
            max=queue_max,
        )
        # wait_for re-evaluates the predicate after every wakeup and
        # applies one overall deadline, so a wakeup that finds the queue
        # full again neither restarts the timeout nor counts as success.
        has_space = self._queue_not_full.wait_for(
            lambda: len(self._immutable_q) < queue_max,
            timeout=bp_timeout,
        )
        if not has_space:
            logger.error(
                "Backpressure timeout",
                queue_len=len(self._immutable_q),
                timeout=bp_timeout,
            )
            raise FreezeBackpressureTimeout(
                f"Waited {bp_timeout}s for queue space "
                f"(queue_len={len(self._immutable_q)})"
            )
        logger.info(
            "Backpressure released",
            queue_len=len(self._immutable_q),
        )

        # The lock was released while waiting: the active table may have
        # been frozen and replaced by another writer in the meantime.
        if not self._should_freeze():
            logger.info("Freeze skipped after backpressure")
            return None

    # Freeze the active table
    snapshot_id = self._active.table_id
    data = self._active.freeze()
    snapshot = ImmutableMemTable(snapshot_id, data)

    # Newest snapshot goes on the left; the oldest sits on the right.
    self._immutable_q.appendleft(snapshot)
    self._active = ActiveMemTable()
    self._flush_event.set()
    if self._flush_notify is not None:
        self._flush_notify()

    logger.info(
        "Freeze complete",
        snapshot_id=snapshot_id,
        queue_len=len(self._immutable_q),
    )
    return snapshot

force_freeze()

Freeze the active memtable unconditionally.

Bypasses threshold checks. Returns None only if the active memtable is empty (nothing to freeze). Called under _write_lock by the engine's flush() command.

Source code in app/engine/memtable_manager.py
def force_freeze(self) -> ImmutableMemTable | None:
    """Freeze the active memtable unconditionally.

    Bypasses threshold checks.  Returns ``None`` only if the active
    memtable is empty (nothing to freeze).  Called under
    ``_write_lock`` by the engine's ``flush()`` command.

    If the immutable queue is full, blocks on ``_queue_not_full`` until
    there is room or until ``backpressure_timeout`` seconds have elapsed
    in total.  Waiting releases ``_write_lock``, so the emptiness check
    is repeated afterwards in case another writer froze the table while
    this call was blocked.

    Returns:
        The newly queued snapshot, or ``None`` if the active memtable
        had no entries.

    Raises:
        FreezeBackpressureTimeout: If the queue is still full once the
            backpressure timeout has expired.
    """
    meta = self._active.metadata
    if meta.entry_count == 0:
        logger.info("Force freeze skipped (empty memtable)")
        return None

    logger.info(
        "Force freeze triggered",
        size_bytes=meta.size_bytes,
        entry_count=meta.entry_count,
    )

    # Backpressure: wait if queue is full (with timeout)
    queue_max = int(self._config.immutable_queue_max_len)
    bp_timeout = float(self._config.backpressure_timeout)

    if len(self._immutable_q) >= queue_max:
        logger.warning(
            "Backpressure active (force freeze)",
            queue_len=len(self._immutable_q),
            max=queue_max,
        )
        # wait_for re-evaluates the predicate after every wakeup and
        # applies one overall deadline instead of restarting the timeout.
        has_space = self._queue_not_full.wait_for(
            lambda: len(self._immutable_q) < queue_max,
            timeout=bp_timeout,
        )
        if not has_space:
            logger.error(
                "Backpressure timeout (force freeze)",
                queue_len=len(self._immutable_q),
                timeout=bp_timeout,
            )
            raise FreezeBackpressureTimeout(
                f"Waited {bp_timeout}s for queue space "
                f"(queue_len={len(self._immutable_q)})"
            )
        logger.info(
            "Backpressure released (force freeze)",
            queue_len=len(self._immutable_q),
        )

        # The lock was released while waiting: the table we meant to
        # freeze may already have been frozen and replaced.
        if self._active.metadata.entry_count == 0:
            logger.info("Force freeze skipped (empty memtable)")
            return None

    # Freeze the active table
    snapshot_id = self._active.table_id
    data = self._active.freeze()
    snapshot = ImmutableMemTable(snapshot_id, data)

    # Newest snapshot goes on the left; the oldest sits on the right.
    self._immutable_q.appendleft(snapshot)
    self._active = ActiveMemTable()
    self._flush_event.set()
    if self._flush_notify is not None:
        self._flush_notify()

    logger.info(
        "Force freeze complete",
        snapshot_id=snapshot_id,
        queue_len=len(self._immutable_q),
    )
    return snapshot

peek_oldest()

Return the oldest snapshot without removing it.

Source code in app/engine/memtable_manager.py
def peek_oldest(self) -> ImmutableMemTable | None:
    """Return the oldest snapshot without removing it.

    The oldest snapshot is the right-most element of the queue.  The
    lookup is a single indexed read instead of a separate emptiness
    check followed by an index, so a pop that empties the queue between
    the two steps cannot surface as ``IndexError``.

    Returns:
        The oldest snapshot, or ``None`` if the queue is empty.
    """
    try:
        return self._immutable_q[-1]
    except IndexError:
        return None

peek_at_depth(depth)

Return the snapshot at depth (0 = oldest, 1 = second oldest, etc.).

Protected by _write_lock to prevent TOCTOU race (BUG-03).

Source code in app/engine/memtable_manager.py
def peek_at_depth(self, depth: int) -> ImmutableMemTable | None:
    """Return the snapshot at *depth* (0 = oldest, 1 = second oldest, etc.).

    Protected by ``_write_lock`` to prevent TOCTOU race (BUG-03).

    Args:
        depth: Distance from the oldest snapshot.

    Returns:
        The snapshot at that depth, or ``None`` when *depth* is negative
        or not smaller than the current queue length.
    """
    # A negative depth would turn -(depth + 1) into a non-negative index
    # and silently address the queue from the newest end.
    if depth < 0:
        return None
    with self._write_lock:
        if depth >= len(self._immutable_q):
            return None
        return self._immutable_q[-(depth + 1)]

snapshot_queue()

Return a snapshot of the immutable queue (oldest first).

Used by the flush pipeline to safely iterate all pending snapshots without holding a lock during the flush.

Source code in app/engine/memtable_manager.py
def snapshot_queue(self) -> list[ImmutableMemTable]:
    """Copy the immutable queue into a new list ordered oldest first.

    The returned list is independent of the queue, so the flush
    pipeline can walk every pending snapshot without holding a lock
    for the duration of the flush.
    """
    # Copy in queue order first, then flip the copy in place.
    oldest_first = list(self._immutable_q)
    oldest_first.reverse()
    return oldest_first

pop_oldest()

Remove the oldest snapshot and unblock any stalled freeze.

Source code in app/engine/memtable_manager.py
def pop_oldest(self) -> None:
    """Remove the oldest snapshot and unblock any stalled freeze.

    The removal happens while holding the lock behind
    ``_queue_not_full`` (the write lock), so code that inspects the
    queue under that lock — ``peek_at_depth`` and the backpressure
    check in the freeze path — never sees the queue shrink between its
    length check and its indexed read.  Waiters are notified even when
    the queue was already empty.
    """
    removed = None
    with self._queue_not_full:
        if self._immutable_q:
            # The oldest snapshot lives at the right end of the deque.
            removed = self._immutable_q.pop()
        remaining = len(self._immutable_q)
        self._queue_not_full.notify_all()

    # Log after releasing the lock to keep the critical section short.
    if removed is not None:
        logger.info(
            "Snapshot popped",
            snapshot_id=removed.snapshot_id,
            queue_remaining=remaining,
        )

queue_len()

Return the number of snapshots in the immutable queue.

Source code in app/engine/memtable_manager.py
def queue_len(self) -> int:
    """Report how many snapshots are currently in the immutable queue."""
    pending = self._immutable_q
    return len(pending)

restore(entries)

Replay WAL entries into the active memtable.

Called by engine._recover() only — single-threaded, no lock needed.

Source code in app/engine/memtable_manager.py
def restore(self, entries: list[WALEntry]) -> None:
    """Replay WAL entries into the active memtable.

    Called by ``engine._recover()`` only — single-threaded, no lock
    needed.

    Args:
        entries: WAL entries to apply, in the order given.

    Raises:
        MemTableRestoreError: If applying an entry fails.  The message
            carries the sequence number of the entry being applied when
            the failure occurred, and the original exception is chained
            as the cause.
    """
    logger.info("Restore start", entry_count=len(entries))

    # Track the entry in flight so the error names the one that failed
    # rather than the last entry in the list.
    current = None
    try:
        for current in entries:
            self._active.put(
                current.key, current.seq, current.timestamp_ms, current.value,
            )
    except Exception as exc:
        # getattr covers both "no entry reached" and a malformed entry.
        failed_seq = getattr(current, "seq", "?")
        raise MemTableRestoreError(
            f"Restore failed at seq={failed_seq}: {exc}"
        ) from exc

    logger.info(
        "Restore done",
        live_keys=self._active.metadata.entry_count,
    )

set_flush_notify(callback)

Register (or clear) a callback invoked after each freeze.

Used by FlushPipeline to bridge the sync flush_event to an asyncio.Event via loop.call_soon_threadsafe (BUG-14).

Source code in app/engine/memtable_manager.py
def set_flush_notify(self, callback: Callable[[], None] | None) -> None:
    """Register (or clear) a callback invoked after each freeze.

    Used by FlushPipeline to bridge the sync flush_event to an
    asyncio.Event via ``loop.call_soon_threadsafe`` (BUG-14).

    Args:
        callback: Zero-argument callable to store, or ``None`` to clear
            the currently stored callback.
    """
    # Plain attribute swap; the freeze path reads this attribute and
    # calls it only when it is not None.
    self._flush_notify = callback

show_mem(table_id=None)

Inspect memtable contents.

Parameters

table_id: Optional. The table ID (active) or snapshot ID (immutable) to inspect. When provided, returns the full entry list for that table. When None (the default), returns a summary listing all active and immutable memtables without their entries.

Returns

A dict with the structure::

# When table_id is None (listing mode):
{
    "active": { "table_id": ..., "entry_count": ..., ... },
    "immutable": [ { "snapshot_id": ..., ... }, ... ],
}

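# When table_id matches a table (detail mode):
{
    "type": "active" | "immutable",
    "table_id": ...,
    "entry_count": ...,
    "size_bytes": ...,
    "entries": [ {"key": ..., "seq": ..., "timestamp_ms": ..., "value": ...}, ... ],
}

# When table_id matches no table:
{ "error": "No table found with id ..." }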
Source code in app/engine/memtable_manager.py
def show_mem(
    self, table_id: str | None = None,
) -> dict[str, object]:
    """Inspect memtable contents.

    Parameters
    ----------
    table_id:
        **Optional.** The table ID (active) or snapshot ID (immutable)
        to inspect.  When provided, returns the full entry list for
        that table.  When ``None`` (the default), returns a summary
        listing all active and immutable memtables without their
        entries.

    Returns
    -------
    A dict with the structure::

        # When table_id is None (listing mode):
        {
            "active": { "table_id": ..., "entry_count": ..., ... },
            "immutable": [ { "snapshot_id": ..., ... }, ... ],
        }

        # When table_id matches a table (detail mode):
        {
            "type": "active" | "immutable",
            "table_id": ...,
            "entry_count": ...,
            "entries": [ {"key": ..., "seq": ..., "ts": ..., "value": ...}, ... ],
        }
    """
    # ── Detail mode: return entries for a specific table ───────────
    if table_id is not None:
        # Check active memtable
        if self._active.table_id == table_id:
            entries = [
                {
                    "key": k,
                    "seq": seq,
                    "timestamp_ms": ts,
                    "value": v,
                }
                for k, seq, ts, v in self._active.items()
            ]
            meta = self._active.metadata
            return {
                "type": "active",
                "table_id": meta.table_id,
                "entry_count": meta.entry_count,
                "size_bytes": meta.size_bytes,
                "entries": entries,
            }

        # Check immutable queue
        for table in self._immutable_q:
            if table.snapshot_id == table_id:
                entries = [
                    {
                        "key": k,
                        "seq": seq,
                        "timestamp_ms": ts,
                        "value": v,
                    }
                    for k, seq, ts, v in table.items()
                ]
                imm_meta = table.metadata
                return {
                    "type": "immutable",
                    "table_id": imm_meta.snapshot_id,
                    "entry_count": imm_meta.entry_count,
                    "size_bytes": imm_meta.size_bytes,
                    "seq_min": imm_meta.seq_min,
                    "seq_max": imm_meta.seq_max,
                    "entries": entries,
                }

        return {"error": f"No table found with id {table_id!r}"}

    # ── Listing mode: summarise all tables ─────────────────────────
    active_meta = self._active.metadata
    immutable_metas = [t.metadata for t in self._immutable_q]

    return {
        "active": {
            "table_id": active_meta.table_id,
            "entry_count": active_meta.entry_count,
            "size_bytes": active_meta.size_bytes,
            "seq_first": active_meta.seq_first,
            "seq_last": active_meta.seq_last,
        },
        "immutable": [
            {
                "snapshot_id": m.snapshot_id,
                "entry_count": m.entry_count,
                "size_bytes": m.size_bytes,
                "seq_min": m.seq_min,
                "seq_max": m.seq_max,
                "tombstone_count": m.tombstone_count,
            }
            for m in immutable_metas
        ],
    }