Skip to content

SSTable

Sorted String Tables — on-disk persistent storage.

SSTableWriter

SSTableWriter(directory, file_id, snapshot_id, level, block_size=BLOCK_SIZE_DEFAULT, block_entries=0, bloom_n=1000000, bloom_fpr=0.01)

Write-once SSTable builder.

Records must be added in ascending key order via put(). Call finish() (async) or finish_sync() when done.

Initialize an SSTable writer and open the data file for writing.

The output directory is created if it does not exist.

Parameters:

Name Type Description Default
directory Path

Target directory for the SSTable files.

required
file_id FileID

Unique identifier for this SSTable.

required
snapshot_id SnapshotID

ID of the memtable snapshot being flushed.

required
level Level

Compaction level (0 for flush, 1+ for compaction output).

required
block_size int

Target data block size in bytes. Used when block_entries is 0.

BLOCK_SIZE_DEFAULT
block_entries int

If non-zero, flush a block after this many records instead of using byte-size threshold.

0
bloom_n int

Expected element count for the bloom filter. The flush path passes len(snapshot) (exact count) and the compaction path passes the total input record count.

1000000
bloom_fpr float

Target false positive rate for the bloom filter. Read from config.bloom_fpr which selects between bloom_fpr_dev (0.05) and bloom_fpr_prod (0.01) based on the current environment.

0.01
Source code in app/sstable/writer.py
def __init__(
    self,
    directory: Path,
    file_id: FileID,
    snapshot_id: SnapshotID,
    level: Level,
    block_size: int = BLOCK_SIZE_DEFAULT,
    block_entries: int = 0,
    bloom_n: int = 1_000_000,
    bloom_fpr: float = 0.01,
) -> None:
    """Create a writer and open ``data.bin`` for output.

    The target directory (including parents) is created when missing.

    Args:
        directory: Destination directory for all SSTable files.
        file_id: Unique identifier of the table being written.
        snapshot_id: Memtable snapshot that is being flushed.
        level: Compaction level (0 = flush output, 1+ = compaction).
        block_size: Byte threshold for cutting a data block; only
            consulted when ``block_entries`` is 0.
        block_entries: When non-zero, cut a block after this many
            records instead of using the byte-size threshold.
        bloom_n: Expected number of keys, used to size the bloom filter.
        bloom_fpr: Target false positive rate for the bloom filter.
    """
    # Identity and configuration.
    self._dir = directory
    self._file_id = file_id
    self._snapshot_id = snapshot_id
    self._level = level
    self._block_size = block_size
    self._block_entries = block_entries
    self._state = _State.OPEN

    # Component file paths inside the table directory.
    self._data_path = directory / "data.bin"
    self._index_path = directory / "index.bin"
    self._filter_path = directory / "filter.bin"
    self._meta_path = directory / "meta.json"

    # Probabilistic filter and sparse index, built incrementally.
    self._bloom = BloomFilter(n=bloom_n, fpr=bloom_fpr)
    self._bloom_fpr = bloom_fpr
    self._index = SparseIndex()

    # In-memory buffer for the data block currently being assembled.
    self._block_buf: list[bytes] = []
    self._block_buf_size: int = 0
    self._block_buf_count: int = 0
    self._block_first_key: Key | None = None

    # Ensure the directory exists, then open the data file for append.
    directory.mkdir(parents=True, exist_ok=True)
    self._data_fd = open(self._data_path, "wb")  # noqa: SIM115
    self._data_offset: int = 0

    # Running statistics folded into the final metadata record.
    self._record_count: int = 0
    self._block_count: int = 0
    self._min_key: Key | None = None
    self._max_key: Key | None = None
    self._seq_min: SeqNum | None = None
    self._seq_max: SeqNum | None = None
    self._last_key: Key | None = None

put(key, seq, timestamp_ms, value)

Add a record. Keys must be in ascending order.

Source code in app/sstable/writer.py
def put(
    self,
    key: Key,
    seq: SeqNum,
    timestamp_ms: int,
    value: Value,
) -> None:
    """Append one record; keys must arrive in strictly ascending order."""
    if self._state is not _State.OPEN:
        raise SSTableWriteError("Writer is not in OPEN state")

    # Reject out-of-order (or duplicate) keys.
    prev = self._last_key
    if prev is not None and key <= prev:
        raise SSTableWriteError(
            f"Keys must be ascending: {key!r} <= {prev!r}"
        )

    encoded = encode_record(key, seq, timestamp_ms, value)

    # First record of a fresh block: remember the key and add a
    # sparse-index entry pointing at the current file offset.
    if self._block_first_key is None:
        self._block_first_key = key
        self._index.add(key, self._data_offset)

    # Buffer the encoded record.
    self._block_buf.append(encoded)
    self._block_buf_size += len(encoded)
    self._block_buf_count += 1

    self._bloom.add(key)

    # Update running statistics.
    self._record_count += 1
    if self._min_key is None:
        self._min_key = key
    self._max_key = key
    self._last_key = key
    if self._seq_min is None or seq < self._seq_min:
        self._seq_min = seq
    if self._seq_max is None or seq > self._seq_max:
        self._seq_max = seq

    # Cut the block once full: by entry count when configured,
    # otherwise by accumulated byte size.
    full = (
        self._block_buf_count >= self._block_entries
        if self._block_entries > 0
        else self._block_buf_size >= self._block_size
    )
    if full:
        self._flush_block()

finish() async

Finalize the SSTable (async). Bloom + index written concurrently.

Source code in app/sstable/writer.py
async def finish(self) -> SSTableMeta:
    """Finalize the table (async); bloom and index are written in parallel."""
    if self._state is not _State.OPEN:
        raise SSTableWriteError("Writer is not in OPEN state")
    self._state = _State.FINISHING
    self._flush_block_and_close()

    # Serialize both sidecar structures up front, then push the two
    # disk writes onto worker threads and wait for both.
    writes = (
        (self._filter_path, self._bloom.to_bytes()),
        (self._index_path, self._index.to_bytes()),
    )
    await asyncio.gather(*(
        asyncio.to_thread(self._write_file, path, payload)
        for path, payload in writes
    ))

    return self._finalize()

finish_sync()

Finalize the SSTable (sync). For L1+ subprocess use.

Source code in app/sstable/writer.py
def finish_sync(self) -> SSTableMeta:
    """Finalize the table synchronously (for L1+ compaction subprocesses)."""
    if self._state is not _State.OPEN:
        raise SSTableWriteError("Writer is not in OPEN state")
    self._state = _State.FINISHING
    self._flush_block_and_close()

    # Sequential sidecar writes: bloom filter first, then index.
    for path, payload in (
        (self._filter_path, self._bloom.to_bytes()),
        (self._index_path, self._index.to_bytes()),
    ):
        self._write_file(path, payload)

    return self._finalize()

SSTableReader

SSTableReader(directory, file_id, meta, index, bloom, cache, mm, fd)

Read-only access to one SSTable.

Bloom filter and sparse index are loaded lazily on the first call to get(). Once loaded, they are cached in the shared BlockCache so a future reader for the same file (e.g. after engine restart) can skip the disk read.

Construct an SSTableReader (prefer the open() factory).

Parameters:

Name Type Description Default
directory Path

Path to the SSTable directory on disk.

required
file_id FileID

Unique identifier for this SSTable.

required
meta SSTableMeta

Parsed metadata from meta.json.

required
index SparseIndex | None

Pre-loaded sparse index, or None for lazy loading.

required
bloom BloomFilter | None

Pre-loaded bloom filter, or None for lazy loading.

required
cache BlockCache | None

Shared block cache for cross-reader reuse, or None.

required
mm mmap | None

Memory-mapped data.bin file, or None if empty.

required
fd int

Raw file descriptor for the mmap (kept open until close).

required
Source code in app/sstable/reader.py
def __init__(
    self,
    directory: Path,
    file_id: FileID,
    meta: SSTableMeta,
    index: SparseIndex | None,
    bloom: BloomFilter | None,
    cache: BlockCache | None,
    mm: mmap.mmap | None,
    fd: int,
) -> None:
    """Construct an SSTableReader (prefer the :meth:`open` factory).

    Args:
        directory: On-disk directory of this SSTable.
        file_id: Unique identifier for this SSTable.
        meta: Metadata parsed from ``meta.json``.
        index: Sparse index, or ``None`` to load lazily.
        bloom: Bloom filter, or ``None`` to load lazily.
        cache: Shared block cache for cross-reader reuse, or ``None``.
        mm: Memory-mapped ``data.bin``, or ``None`` for an empty table.
        fd: File descriptor backing the mmap; held open until close.
    """
    self._dir = directory
    self._file_id = file_id
    self._meta = meta
    self._cache = cache
    self._mm = mm
    self._fd = fd
    # Lazily-loadable structures; when both are supplied there is
    # nothing left to fetch on the first get().
    self._index = index
    self._bloom = bloom
    self._loaded = not (index is None or bloom is None)

meta property

Return the SSTable metadata.

file_id property

Return the file ID.

open(directory, file_id, cache=None, level=0) async classmethod

Open an SSTable for reading.

Bloom and index are NOT loaded here — they are deferred to the first get() call (lazy loading). Only meta.json is parsed and data.bin is memory-mapped.

Source code in app/sstable/reader.py
@classmethod
async def open(
    cls,
    directory: Path,
    file_id: FileID,
    cache: BlockCache | None = None,
    level: Level = 0,
) -> SSTableReader:
    """Open an SSTable for reading.

    Only ``meta.json`` is parsed and ``data.bin`` memory-mapped here;
    bloom and index loading is deferred to the first ``get()`` call.
    """
    meta = SSTableMeta.from_json(
        (directory / "meta.json").read_text(encoding="utf-8"),
    )

    # mmap the data file; an empty file cannot be mapped, so keep None.
    fd = os.open(str(directory / meta.data_file), os.O_RDONLY)
    mm: mmap.mmap | None = None
    try:
        if os.fstat(fd).st_size > 0:
            mm = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
    except Exception:
        # Don't leak the descriptor if stat/mmap fails.
        os.close(fd)
        raise

    reader = cls(
        directory=directory,
        file_id=file_id,
        meta=meta,
        index=None,   # lazy
        bloom=None,   # lazy
        cache=cache,
        mm=mm,
        fd=fd,
    )

    logger.info(
        "SSTable opened (lazy)",
        file_id=file_id,
        records=meta.record_count,
        level=level,
    )
    return reader

get(key)

Look up key. Returns (seq, timestamp_ms, value) or None.

Flow: bloom check → sparse index bisect → block scan. Bloom + index loaded lazily on first call.

Source code in app/sstable/reader.py
def get(self, key: Key) -> tuple[SeqNum, int, Value] | None:
    """Look up *key*. Returns ``(seq, timestamp_ms, value)`` or None.

    Flow: bloom check → sparse index bisect → block scan.
    Bloom + index are loaded lazily on the first call.
    """
    # An empty table has no mmap and can never contain the key.
    if self._mm is None:
        return None

    self._ensure_loaded()
    assert self._bloom is not None  # noqa: S101
    assert self._index is not None  # noqa: S101

    # 1. Bloom filter: cheap definite-miss check.
    if not self._bloom.may_contain(key):
        return None

    # 2. Sparse index: locate the candidate block's start offset.
    start = self._index.floor_offset(key)
    if start is None:
        return None

    # 3. Block ends at the next index entry, or at EOF.
    end = self._find_block_end(start)

    # 4. Prefer a cached copy of the block; fall back to the mmap
    #    and populate the cache on a miss.
    cached = (
        self._cache.get(self._file_id, start)
        if self._cache is not None
        else None
    )
    if cached is not None:
        view = memoryview(cached)
        lo, hi = 0, len(cached)
    else:
        view = memoryview(self._mm)
        lo, hi = start, end
        if self._cache is not None:
            self._cache.put(
                self._file_id,
                start,
                bytes(self._mm[start:end]),
            )

    # 5. Scan the block, keeping the highest-seq match.
    best: tuple[SeqNum, int, Value] | None = None
    for rec in iter_block(view, lo, hi):
        if rec.key == key:
            if best is None or rec.seq > best[0]:
                best = (rec.seq, rec.timestamp_ms, rec.value)
        elif rec.key > key:
            # Records are key-sorted; past the key means not present.
            break

    return best

scan_all()

Return all records in this SSTable as a sorted list.

Used by the disk command to display SSTable contents.

Source code in app/sstable/reader.py
def scan_all(self) -> list[tuple[Key, SeqNum, int, Value]]:
    """Return every record in this SSTable as a sorted list.

    Used by the ``disk`` command to display SSTable contents.
    """
    if self._mm is None or self._meta.size_bytes == 0:
        return []
    records: list[tuple[Key, SeqNum, int, Value]] = []
    view = memoryview(self._mm)
    for rec in iter_block(view, 0, self._meta.size_bytes):
        records.append((rec.key, rec.seq, rec.timestamp_ms, rec.value))
    return records

iter_sorted()

Yield all records in ascending key order without materialising.

Used as input to KWayMergeIterator during compaction. More memory-efficient than scan_all() for large SSTables.

Source code in app/sstable/reader.py
def iter_sorted(self) -> Iterator[tuple[Key, SeqNum, int, Value]]:
    """Yield every record in ascending key order, lazily.

    Feeds the KWayMergeIterator during compaction; unlike
    ``scan_all()`` it never materialises the whole table.
    """
    if self._mm is None or self._meta.size_bytes == 0:
        return
    view = memoryview(self._mm)
    yield from (
        (rec.key, rec.seq, rec.timestamp_ms, rec.value)
        for rec in iter_block(view, 0, self._meta.size_bytes)
    )

close()

Release mmap and file descriptor. Never raises.

Source code in app/sstable/reader.py
def close(self) -> None:
    """Best-effort release of the mmap and file descriptor; never raises."""
    with contextlib.suppress(Exception):
        if self._mm is not None:
            self._mm.close()
    with contextlib.suppress(Exception):
        os.close(self._fd)

    logger.debug("SSTable closed", file_id=self._file_id)

SSTableMeta

SSTableMeta(file_id, snapshot_id, level, size_bytes, record_count, block_count, min_key, max_key, seq_min, seq_max, bloom_fpr, created_at, data_file, index_file, filter_file) dataclass

Immutable metadata for one SSTable.

Serialized to meta.json inside the SSTable directory. Its presence on disk is the completeness signal — if missing, the SSTable is considered incomplete and ignored on recovery.

Attributes:

Name Type Description
file_id FileID

Unique identifier (UUIDv7 hex) for this SSTable.

snapshot_id SnapshotID

ID of the memtable snapshot that produced this table.

level Level

Compaction level (0 for flush output, 1+ for compaction output).

size_bytes int

Total size of data.bin in bytes.

record_count int

Number of key-value records stored.

block_count int

Number of data blocks in data.bin.

min_key Key

Lexicographically smallest key in this table.

max_key Key

Lexicographically largest key in this table.

seq_min SeqNum

Smallest sequence number across all records.

seq_max SeqNum

Largest sequence number across all records.

bloom_fpr float

Configured false positive rate of the bloom filter.

created_at str

ISO-8601 timestamp of when this table was written.

data_file str

Filename of the data file (always data.bin).

index_file str

Filename of the sparse index (always index.bin).

filter_file str

Filename of the bloom filter (always filter.bin).

to_json()

Serialize to JSON with base64-encoded keys.

Source code in app/sstable/meta.py
def to_json(self) -> str:
    """Serialize to JSON; the binary keys are base64-encoded."""
    b64 = base64.b64encode
    payload = {
        "file_id": self.file_id,
        "snapshot_id": self.snapshot_id,
        "level": self.level,
        "size_bytes": self.size_bytes,
        "record_count": self.record_count,
        "block_count": self.block_count,
        # bytes keys cannot go into JSON directly — base64 them.
        "min_key": b64(self.min_key).decode("ascii"),
        "max_key": b64(self.max_key).decode("ascii"),
        "seq_min": self.seq_min,
        "seq_max": self.seq_max,
        "bloom_fpr": self.bloom_fpr,
        "created_at": self.created_at,
        "data_file": self.data_file,
        "index_file": self.index_file,
        "filter_file": self.filter_file,
    }
    return json.dumps(payload, indent=2)

from_json(data) classmethod

Deserialize from JSON.

Source code in app/sstable/meta.py
@classmethod
def from_json(cls, data: str) -> SSTableMeta:
    """Deserialize metadata produced by :meth:`to_json`."""
    raw = json.loads(data)
    # Pass-through scalar fields.
    kwargs = {
        name: raw[name]
        for name in (
            "file_id", "snapshot_id", "level", "size_bytes",
            "record_count", "block_count", "seq_min", "seq_max",
            "bloom_fpr", "created_at", "data_file", "index_file",
            "filter_file",
        )
    }
    # Keys were base64-encoded for JSON transport — decode back to bytes.
    kwargs["min_key"] = base64.b64decode(raw["min_key"])
    kwargs["max_key"] = base64.b64decode(raw["max_key"])
    return cls(**kwargs)

SSTableRegistry

SSTableRegistry()

Thread-safe registry of open SSTable readers with ref counting.

Initialize an empty reader registry with no registered readers.

Source code in app/sstable/registry.py
def __init__(self) -> None:
    """Start with no readers, no references, and nothing marked."""
    self._lock = threading.Lock()
    # file_id → open reader instance
    self._readers: dict[FileID, SSTableReader] = {}
    # file_id → count of in-flight open_reader() handles
    self._refcounts: dict[FileID, int] = {}
    # file_ids awaiting cleanup once their refcount reaches zero
    self._marked: set[FileID] = set()

register(file_id, reader)

Register an open reader.

Source code in app/sstable/registry.py
def register(self, file_id: FileID, reader: SSTableReader) -> None:
    """Add *reader* under *file_id* with an initial refcount of zero."""
    with self._lock:
        self._refcounts[file_id] = 0
        self._readers[file_id] = reader
    logger.debug("Reader registered", file_id=file_id)

open_reader(file_id)

Acquire a ref-counted handle to a reader.

Source code in app/sstable/registry.py
@contextmanager
def open_reader(self, file_id: FileID) -> Iterator[SSTableReader]:
    """Yield a reader while holding a reference on it.

    On exit the refcount is dropped; a reader previously marked for
    deletion is cleaned up once its last reference is released.
    """
    with self._lock:
        reader = self._readers.get(file_id)
        if reader is None:
            raise KeyError(f"No reader for file_id={file_id}")
        self._refcounts[file_id] += 1

    try:
        yield reader
    finally:
        with self._lock:
            self._refcounts[file_id] -= 1
            # Deferred deletion: last handle out turns off the lights.
            if self._refcounts[file_id] == 0 and file_id in self._marked:
                self._cleanup(file_id)

mark_for_deletion(file_id)

Mark a reader for deletion. Cleaned up when refcount hits 0.

Source code in app/sstable/registry.py
def mark_for_deletion(self, file_id: FileID) -> None:
    """Flag *file_id* for removal; clean up now if no refs are held."""
    with self._lock:
        self._marked.add(file_id)
        # Unknown or idle readers can be reclaimed immediately.
        if not self._refcounts.get(file_id, 0):
            self._cleanup(file_id)

close_all()

Close idle readers and mark in-use readers for deferred cleanup.

Source code in app/sstable/registry.py
def close_all(self) -> None:
    """Close idle readers; in-use ones are deferred via the marked set."""
    with self._lock:
        for file_id in list(self._readers):
            refs = self._refcounts.get(file_id, 0)
            if refs:
                # Still referenced — defer cleanup to the last handle.
                self._marked.add(file_id)
                logger.warning(
                    "Reader deferred (in use)",
                    file_id=file_id,
                    refcount=refs,
                )
            else:
                self._cleanup(file_id)

SSTableManager

SSTableManager(data_root, cache, registry, l0_order, l0_dirs, manifest, config=None)

Manages all on-disk SSTable state (L0 + L1 + L2 + L3).

Construct the SSTable manager (prefer the load() factory).

Parameters:

Name Type Description Default
data_root Path

Root data directory for the engine.

required
cache BlockCache

Shared block cache for data blocks, indexes, and blooms.

required
registry SSTableRegistry

Ref-counted registry of open SSTable readers.

required
l0_order list[FileID]

L0 file IDs in newest-first order.

required
l0_dirs dict[FileID, Path]

Mapping from L0 file ID to its on-disk directory.

required
manifest Manifest

Persistent manifest for SSTable ordering.

required
config LSMConfig | None

Live engine configuration, or None for defaults.

None
Source code in app/engine/sstable_manager.py
def __init__(
    self,
    data_root: Path,
    cache: BlockCache,
    registry: SSTableRegistry,
    l0_order: list[FileID],
    l0_dirs: dict[FileID, Path],
    manifest: Manifest,
    config: LSMConfig | None = None,
) -> None:
    """Construct the SSTable manager (prefer the :meth:`load` factory).

    Args:
        data_root: Root data directory for the engine.
        cache: Shared block cache for data blocks, indexes, and blooms.
        registry: Ref-counted registry of open SSTable readers.
        l0_order: L0 file IDs, newest first.
        l0_dirs: Mapping from L0 file ID to its on-disk directory.
        manifest: Persistent manifest recording SSTable ordering.
        config: Live engine configuration, or ``None`` for defaults.
    """
    # Injected collaborators.
    self._data_root = data_root
    self._cache = cache
    self._registry = registry
    self._manifest = manifest
    self._config = config

    # L0 bookkeeping (newest-first ordering).
    self._l0_order = l0_order
    self._l0_dirs = l0_dirs

    # Mutable derived state.
    self._max_seq: SeqNum = 0
    self._state_lock = threading.Lock()
    # L1+ hold a single file per level: level → (file_id, directory)
    self._level_files: dict[Level, tuple[FileID, Path]] = {}
    self._level_rwlocks: dict[Level, AsyncRWLock] = {}

max_level property

Maximum level depth (from config or default).

cache property

Return the block cache instance.

l0_count property

Number of L0 SSTables.

load(data_root, cache=None, config=None) async classmethod

Load SSTables from disk using the manifest for ordering.

Source code in app/engine/sstable_manager.py
@classmethod
async def load(
    cls,
    data_root: Path,
    cache: BlockCache | None = None,
    config: LSMConfig | None = None,
) -> SSTableManager:
    """Load SSTables from disk using the manifest for ordering.

    Recovery sequence:
      1. Discover complete L0 tables (those with a ``meta.json``).
      2. Reconcile disk contents against the manifest's ``l0_order``;
         orphans (on disk but absent from the manifest) are appended.
      3. Open an ``SSTableReader`` for each surviving L0 table.
      4. Compute the maximum sequence number across L0.
      5. Load one file per L1+ level from the manifest, then adopt any
         orphan level directory found on disk.
      6. Persist the reconciled layout and remove stale level dirs.

    Args:
        data_root: Root data directory for the engine.
        cache: Shared block cache; a fresh one is created when ``None``.
        config: Live engine configuration, or ``None`` for defaults.

    Returns:
        A fully initialised ``SSTableManager``.
    """
    if cache is None:
        cache = BlockCache()

    registry = SSTableRegistry()
    l0_dirs: dict[FileID, Path] = {}

    l0_dir = data_root / "sstable" / "L0"
    manifest_path = data_root / "sstable" / "manifest.json"
    manifest = Manifest(manifest_path)

    # Discover all complete L0 SSTables on disk.  A missing meta.json
    # marks an interrupted flush, so the directory is skipped.
    on_disk: dict[FileID, Path] = {}
    if l0_dir.exists():
        for child in l0_dir.iterdir():
            if not child.is_dir():
                continue
            if not (child / "meta.json").exists():
                logger.warning(
                    "Incomplete SSTable skipped", path=str(child),
                )
                continue
            on_disk[child.name] = child

    # Load manifest (layout keys: "l0_order" and "levels").
    layout = manifest.read()
    manifest_l0: list[str] = layout.get("l0_order", [])  # type: ignore[assignment]
    manifest_levels: dict[str, str] = layout.get("levels", {})  # type: ignore[assignment]

    # Reconcile L0: manifest order first, then orphans appended.
    ordered: list[FileID] = []
    seen: set[FileID] = set()
    for fid in manifest_l0:
        if fid in on_disk:
            ordered.append(fid)
            seen.add(fid)
        else:
            logger.warning(
                "Manifest references missing SSTable", file_id=fid,
            )

    # Orphans sorted reverse-lexicographically by file ID — presumably
    # newest first given time-ordered IDs; TODO confirm FileID ordering.
    orphans = sorted(
        (fid for fid in on_disk if fid not in seen),
        reverse=True,
    )
    if orphans:
        logger.info("Orphan L0 SSTables found", count=len(orphans))
        ordered.extend(orphans)

    # Open L0 readers.  An unreadable table is logged and dropped
    # rather than aborting recovery.
    l0_order: list[FileID] = []
    for fid in ordered:
        sst_dir = on_disk[fid]
        try:
            reader = await SSTableReader.open(
                sst_dir, fid, cache, level=0,
            )
        except Exception:
            logger.exception(
                "Failed to open SSTable", path=str(sst_dir),
            )
            continue
        registry.register(fid, reader)
        l0_order.append(fid)
        l0_dirs[fid] = sst_dir

    mgr = cls(
        data_root, cache, registry, l0_order, l0_dirs, manifest, config,
    )

    # Compute max seq from L0 metadata.
    for fid in l0_order:
        with registry.open_reader(fid) as reader:
            if reader.meta.seq_max > mgr._max_seq:
                mgr._max_seq = reader.meta.seq_max

    # Load L1+ files listed in the manifest.
    for level_str, fid in manifest_levels.items():
        level = int(level_str)
        await mgr._load_level_file(level, fid, cache, registry)

    # Check for orphan level files on disk (not in manifest).
    for level in range(1, mgr.max_level + 1):
        if level in mgr._level_files:
            continue  # already loaded from manifest
        level_root = data_root / "sstable" / f"L{level}"
        if not level_root.exists():
            continue
        for child in level_root.iterdir():
            if child.is_dir() and (child / "meta.json").exists():
                logger.info(
                    "Orphan SSTable found",
                    level=level,
                    file_id=child.name,
                )
                await mgr._load_level_file(
                    level, child.name, cache, registry,
                )
                # Each L1+ level holds one file — take the first match.
                break

    # Persist reconciled layout so the manifest reflects reality.
    mgr._persist_manifest()

    # Clean up stale level directories left behind by old compactions.
    for level in range(1, mgr.max_level + 1):
        mgr._cleanup_stale_level_dirs(level)

    logger.info(
        "SSTableManager loaded",
        l0_count=len(l0_order),
        levels={
            lv: fid for lv, (fid, _) in mgr._level_files.items()
        },
        max_seq=mgr._max_seq,
    )
    return mgr

flush(snapshot, file_id) async

Write snapshot to a new L0 SSTable, return (meta, reader).

The bloom filter is sized to len(snapshot) (exact entry count) with a false positive rate from config.bloom_fpr.

Source code in app/engine/sstable_manager.py
async def flush(
    self,
    snapshot: ImmutableMemTable,
    file_id: FileID,
) -> tuple[SSTableMeta, SSTableReader]:
    """Write *snapshot* to a new L0 SSTable, return (meta, reader).

    The bloom filter is sized to ``len(snapshot)`` (exact entry count)
    with a false positive rate from ``config.bloom_fpr``.
    """
    sst_dir = self._data_root / "sstable" / "L0" / file_id

    # Pick a block-cutting strategy based on the environment:
    # dev cuts by entry count, prod by bytes, fallback uses
    # config.block_size (or 4096 with no config at all).
    cfg = self._config
    block_size = 0
    block_entries = 0
    if cfg is not None and cfg.is_dev:
        block_entries = max(1, len(snapshot) // 8)
    elif cfg is not None and cfg.is_prod:
        block_size = max(1, cfg.max_memtable_bytes // 8)
    else:
        block_size = int(cfg.block_size) if cfg else 4096

    writer = SSTableWriter(
        directory=sst_dir,
        file_id=file_id,
        snapshot_id=snapshot.snapshot_id,
        level=0,
        block_size=block_size,
        block_entries=block_entries,
        bloom_n=max(1, len(snapshot)),
        bloom_fpr=cfg.bloom_fpr if cfg else 0.01,
    )

    # Snapshot iteration yields records in sorted order, matching
    # the writer's ascending-key requirement.
    for key, seq, ts, value in snapshot.items():
        writer.put(key, seq, ts, value)

    meta = await writer.finish()
    reader = await SSTableReader.open(
        sst_dir, file_id, self._cache, level=0,
    )
    return meta, reader

commit(file_id, reader, sst_dir)

Register a flushed L0 SSTable and persist manifest.

Source code in app/engine/sstable_manager.py
def commit(
    self, file_id: FileID, reader: SSTableReader, sst_dir: Path,
) -> None:
    """Register a flushed L0 SSTable and persist the manifest."""
    self._registry.register(file_id, reader)
    with self._state_lock:
        # Newest-first ordering: a fresh flush goes to the front.
        self._l0_order.insert(0, file_id)
        self._l0_dirs[file_id] = sst_dir
        self._max_seq = max(self._max_seq, reader.meta.seq_max)

    self._persist_manifest()

    logger.info(
        "SSTable committed",
        file_id=file_id,
        l0_count=len(self._l0_order),
    )

commit_compaction_async(task, new_meta, new_reader) async

Atomically commit a compaction result.

Acquires write locks on src and dst levels in ascending order.

Commit ordering (non-negotiable):

1. Register new reader (dst level becomes readable)
2. Write manifest (durable)
3. Mark old files for deletion (deferred by ref-count)
4. Update in-memory state
5. Evict stale cache blocks

Source code in app/engine/sstable_manager.py
async def commit_compaction_async(
    self,
    task: CompactionTask,
    new_meta: SSTableMeta,
    new_reader: SSTableReader,
) -> None:
    """Atomically commit a compaction result.

    Acquires write locks on src and dst levels in ascending order
    (consistent ordering avoids lock-ordering deadlocks with other
    commits).

    Commit ordering (non-negotiable):
      1. Register new reader   (dst level becomes readable)
      2. Write manifest        (durable)
      3. Mark old files for deletion (deferred by ref-count)
      4. Update in-memory state
      5. Evict stale cache blocks

    Args:
        task: The compaction task that produced the new table; its
            ``input_file_ids`` are the tables consumed.
        new_meta: Metadata of the newly written output table.
        new_reader: Open reader for the output table.
    """
    # Source level is always exactly one below the output level.
    src_level = task.output_level - 1
    dst_level = task.output_level

    logger.info(
        "Compaction commit starting",
        input_files=len(task.input_file_ids),
        new_file=new_meta.file_id,
        src_level=src_level,
        dst_level=dst_level,
    )

    # Both level locks held for the whole swap; state lock guards the
    # in-memory structures within.
    async with (  # noqa: SIM117
        self._level_lock(src_level).write_lock(),
        self._level_lock(dst_level).write_lock(),
    ):
            logger.debug("Write locks acquired")
            with self._state_lock:
                # Step 1: Register new reader
                self._registry.register(
                    new_meta.file_id, new_reader,
                )
                logger.debug("Step 1/5: Reader registered",
                             file_id=new_meta.file_id)

                # Step 2: Write manifest
                # Capture the file currently at dst (if any) before it
                # is replaced, so it can be marked for deletion below.
                input_set = set(task.input_file_ids)
                old_dst = self._level_files.get(dst_level)
                old_dst_fid = old_dst[0] if old_dst else None

                if src_level == 0:
                    new_l0 = [
                        f for f in self._l0_order
                        if f not in input_set
                    ]
                    self._l0_order = new_l0
                else:
                    # Src level file consumed → remove
                    self._level_files.pop(src_level, None)

                self._level_files[dst_level] = (
                    new_meta.file_id,
                    Path(task.output_dir),
                )
                self._persist_manifest()
                logger.debug("Step 2/5: Manifest written")

                # Step 3: Mark old files for deletion
                # Actual close/delete is deferred until each reader's
                # refcount drops to zero.
                for fid in task.input_file_ids:
                    self._registry.mark_for_deletion(fid)
                if old_dst_fid is not None:
                    self._registry.mark_for_deletion(old_dst_fid)
                logger.debug("Step 3/5: Old files marked",
                             count=len(task.input_file_ids),
                             old_dst=old_dst_fid)

                # Step 4: Update state
                if new_meta.seq_max > self._max_seq:
                    self._max_seq = new_meta.seq_max
                logger.debug("Step 4/5: State updated")

                # Step 5: Evict cache
                self._cache.invalidate_all(task.input_file_ids)
                logger.debug("Step 5/5: Cache invalidated")

    logger.info(
        "Compaction commit done",
        src_level=src_level,
        dst_level=dst_level,
        l0_remaining=len(self._l0_order),
        dst_file=new_meta.file_id,
    )

    # Step 6 (outside locks): delete old directories from disk
    self._delete_compacted_dirs(task.input_file_ids, old_dst_fid)

get(key) async

Look up key across L0 then L1, L2, L3.

Read locks prevent compaction commits from swapping level contents mid-scan.

Source code in app/engine/sstable_manager.py
async def get(self, key: "Key") -> "tuple[SeqNum, int, Value] | None":
    """Search every on-disk level for *key* and return the newest match.

    L0 tables may overlap in key range, so all of them are scanned and
    the record with the highest sequence number wins; L1+ hold at most
    one table per level.  Per-level read locks keep a concurrent
    compaction commit from swapping a level's contents mid-scan.

    Returns:
        ``(seq, timestamp, value)`` for the freshest version of *key*,
        or ``None`` when no level contains it.
    """
    # Snapshot the level layout under the state lock so iteration is
    # stable even if a flush/compaction mutates state afterwards.
    with self._state_lock:
        l0_files = list(self._l0_order)
        levels = dict(self._level_files)

    freshest: "tuple[SeqNum, int, Value] | None" = None

    # L0 first: overlapping tables, all scanned under one read lock.
    async with self._level_lock(0).read_lock():
        for fid in l0_files:
            try:
                with self._registry.open_reader(fid) as rdr:
                    hit = rdr.get(key)
            except KeyError:
                # Reader no longer registered — skip this table.
                continue
            if hit is not None and (freshest is None or hit[0] > freshest[0]):
                freshest = hit

    # L1 and deeper: a single table per level.
    for lvl in range(1, self.max_level + 1):
        pair = levels.get(lvl)
        if pair is None:
            continue
        fid, _unused_dir = pair
        async with self._level_lock(lvl).read_lock():
            try:
                with self._registry.open_reader(fid) as rdr:
                    hit = rdr.get(key)
            except KeyError:
                hit = None
            if hit is not None and (freshest is None or hit[0] > freshest[0]):
                freshest = hit

    return freshest

compaction_snapshot()

Return a snapshot of state needed by CompactionManager.

Source code in app/engine/sstable_manager.py
def compaction_snapshot(self) -> dict[str, object]:
    """Capture the level layout that CompactionManager plans against.

    Everything is copied — and paths stringified — while holding the
    state lock, so the caller receives a consistent, detached view.
    """
    with self._state_lock:
        # Level index and directory become strings for easy transport.
        stringified_levels: dict[str, tuple[str, str]] = {}
        for lvl, (fid, directory) in self._level_files.items():
            stringified_levels[str(lvl)] = (fid, str(directory))
        snapshot: dict[str, object] = {
            "l0_order": list(self._l0_order),
            "l0_dirs": {
                fid: str(directory)
                for fid, directory in self._l0_dirs.items()
            },
            "level_files": stringified_levels,
        }
    return snapshot

level_seq_min(level)

Return seq_min of the SSTable at level, or 0 if none.

Source code in app/engine/sstable_manager.py
def level_seq_min(self, level: "Level") -> "SeqNum":
    """Return the minimum sequence number stored at *level*, or 0 if empty.

    NOTE(review): ``_level_files`` is read here without ``_state_lock``,
    unlike ``get``/``compaction_snapshot`` — presumably benign for a
    single dict read, but worth confirming.
    """
    if (entry := self._level_files.get(level)) is None:
        return 0
    fid = entry[0]
    try:
        with self._registry.open_reader(fid) as reader:
            return reader.meta.seq_min
    except KeyError:
        # Reader no longer registered — treat as an empty level.
        return 0

level_size_bytes(level)

Return the size in bytes of the SSTable at level, or 0.

Source code in app/engine/sstable_manager.py
def level_size_bytes(self, level: "Level") -> int:
    """Return the on-disk byte size of the SSTable at *level*, or 0."""
    entry = self._level_files.get(level)
    if entry is not None:
        fid, _directory = entry
        try:
            with self._registry.open_reader(fid) as reader:
                return reader.meta.size_bytes
        except KeyError:
            # Stale file id — fall through to the empty answer.
            pass
    return 0

level_record_count(level)

Return the record count of the SSTable at level, or 0.

Source code in app/engine/sstable_manager.py
def level_record_count(self, level: "Level") -> int:
    """Number of records in the SSTable resident at *level* (0 if none)."""
    try:
        # KeyError covers both "no table at this level" and
        # "reader no longer registered" — the answer is 0 either way.
        fid, _directory = self._level_files[level]
        with self._registry.open_reader(fid) as reader:
            return reader.meta.record_count
    except KeyError:
        return 0

max_seq_seen()

Return the highest seq across all SSTables.

Source code in app/engine/sstable_manager.py
def max_seq_seen(self) -> "SeqNum":
    """Highest sequence number observed across every on-disk SSTable."""
    highest: "SeqNum" = self._max_seq
    return highest

sst_dir_for(file_id)

Return the directory for a given SSTable file ID.

Source code in app/engine/sstable_manager.py
def sst_dir_for(self, file_id: "FileID") -> "Path":
    """Return the on-disk directory for the SSTable *file_id*.

    NOTE(review): the level segment is hard-coded to "L0" despite the
    generic name — presumably this is only called when allocating new
    flush output directories; confirm it is never used for L1+ tables.
    """
    return self._data_root.joinpath("sstable", "L0", file_id)

new_file_id()

Generate a new time-ordered UUIDv7 file ID.

Source code in app/engine/sstable_manager.py
def new_file_id(self) -> "FileID":
    """Mint a fresh SSTable identifier.

    UUIDv7 values embed a timestamp, so ids sort in creation order.
    """
    fresh: "FileID" = uuid7_hex()
    return fresh

show_disk(file_id=None)

Inspect SSTable contents.

Source code in app/engine/sstable_manager.py
def show_disk(
    self, file_id: "FileID | None" = None,
) -> dict[str, object]:
    """Inspect SSTable contents.

    With *file_id* given: full metadata plus every record of that
    table, or an ``error`` payload when the id is unknown.  Without:
    a per-level listing of table metadata summaries.
    """
    def _describe(fids: "list[FileID]") -> list[dict[str, object]]:
        # Metadata summary for each id whose reader is still available.
        summaries: list[dict[str, object]] = []
        for candidate in fids:
            try:
                with self._registry.open_reader(candidate) as rdr:
                    m = rdr.meta
                    summaries.append({
                        "file_id": m.file_id,
                        "record_count": m.record_count,
                        "block_count": m.block_count,
                        "size_bytes": m.size_bytes,
                        "min_key": m.min_key,
                        "max_key": m.max_key,
                        "seq_min": m.seq_min,
                        "seq_max": m.seq_max,
                        "created_at": m.created_at,
                    })
            except KeyError:
                continue
        return summaries

    if file_id is not None:
        # Detail mode — the id may live in L0 or at any deeper level.
        known = dict(self._l0_dirs)
        for fid, directory in self._level_files.values():
            known[fid] = directory

        if file_id not in known:
            return {"error": f"No SSTable found with id {file_id!r}"}

        try:
            with self._registry.open_reader(file_id) as rdr:
                meta = rdr.meta
                rows = rdr.scan_all()
        except KeyError:
            return {"error": f"No SSTable found with id {file_id!r}"}

        return {
            "file_id": meta.file_id,
            "level": meta.level,
            "snapshot_id": meta.snapshot_id,
            "record_count": meta.record_count,
            "block_count": meta.block_count,
            "size_bytes": meta.size_bytes,
            "min_key": meta.min_key,
            "max_key": meta.max_key,
            "seq_min": meta.seq_min,
            "seq_max": meta.seq_max,
            "created_at": meta.created_at,
            "entries": [
                {"key": k, "seq": seq, "timestamp_ms": ts, "value": v}
                for k, seq, ts, v in rows
            ],
        }

    # Listing mode — L0 keeps many tables, L1+ at most one each.
    overview: dict[str, object] = {"L0": _describe(self._l0_order)}
    for lvl in range(1, self.max_level + 1):
        entry = self._level_files.get(lvl)
        overview[f"L{lvl}"] = _describe([entry[0]] if entry else [])
    return overview

close_all()

Close all open readers.

Source code in app/engine/sstable_manager.py
def close_all(self) -> None:
    """Close all open readers."""
    # Delegate cleanup of every cached SSTable reader to the registry,
    # then record the shutdown for observability.
    self._registry.close_all()
    logger.info("SSTableManager closed all readers")