Skip to content

Engine

The main entry point for the kiwi-db library.

LSMEngine

LSMEngine()

Bases: StorageEngine

Central entry point for the kiwidb key-value store.

Use the `open()` classmethod to create an instance — never call `__init__` directly.

Source code in app/engine/lsm_engine.py
def __init__(self) -> None:
    """Declare the engine's private state.

    Private — use open() instead.  Only the attributes that have a
    default are assigned here; the rest are annotation-only
    declarations and hold no value until something assigns them.
    """
    # Attributes with a concrete default.
    self._closed: bool = False
    self._compaction: CompactionManager | None = None
    self._pipeline_task: asyncio.Task[None] | None = None
    self._log_server: LogBroadcastServer | None = None

    # Declared here for type checkers only; no value is assigned yet.
    self._data_root: Path
    self._config: LSMConfig
    self._seq: SeqGenerator
    self._wal: WALManager
    self._mem: MemTableManager
    self._sst: SSTableManager
    self._pipeline: FlushPipeline

data_root property

Return the root data directory path.

log_port property

Return the TCP log broadcast port, or 0 if disabled.

config property

Return the live config instance.

open(data_root='./data', config_path=None) async classmethod

Open or create an LSM engine rooted at data_root.

Parameters

data_root: Root data directory for WAL, logs, and SSTables.

config_path: Optional override for the config file location. Defaults to <data_root>/config.json.

Source code in app/engine/lsm_engine.py
@classmethod
async def open(
    cls,
    data_root: str | Path = "./data",
    config_path: Path | None = None,
) -> LSMEngine:
    """Open or create an LSM engine rooted at *data_root*.

    Components are brought up in a fixed order: logging, config, WAL,
    sequence generator, memtable manager, SSTable manager, recovery,
    then the compaction manager and flush pipeline.  The flush
    pipeline is started as a background task on the running event
    loop, so this must be awaited from inside one.

    Parameters
    ----------
    data_root:
        Root data directory for WAL, logs, and SSTables.
    config_path:
        Optional override for config file location.
        Defaults to ``<data_root>/config.json``.

    Returns
    -------
    LSMEngine
        The opened engine, with its flush pipeline task running.
    """
    engine = cls()
    engine._data_root = Path(data_root)

    # 1. Logging — configured BEFORE any log call
    engine._log_server = configure_logging(
        data_root=engine._data_root,
    )
    log_port = engine._log_server.port if engine._log_server else 0
    logger.info(
        "Startup started",
        data_root=str(engine._data_root),
        log_file=str(engine._data_root / "logs" / "kiwidb.log"),
        log_port=log_port,
    )

    # 2. Config — loaded early so all components can use it
    engine._config = LSMConfig.load(
        engine._data_root, config_path,
    )
    logger.info("Config loaded", config=engine._config.to_dict())

    # 3. WAL
    wal_path = engine._data_root / "wal" / "wal.log"
    engine._wal = WALManager.open(wal_path)
    logger.info("WAL opened", path=str(wal_path))

    # 4. SeqGenerator
    engine._seq = SeqGenerator()

    # 5. MemTableManager — uses config for thresholds
    engine._mem = MemTableManager(engine._config)
    logger.info(
        "MemTableManager created",
        table_id=engine._mem.active_metadata.table_id,
    )

    # 6. SSTableManager — load existing SSTables from disk
    cache = BlockCache(
        data_maxsize=int(engine._config.cache_data_entry_limit),
        index_maxsize=int(engine._config.cache_index_entry_limit),
        bloom_maxsize=int(engine._config.cache_bloom_entry_limit),
    )
    engine._sst = await SSTableManager.load(
        engine._data_root, cache, engine._config,
    )

    # 7. Recovery — use max seq from SSTables + WAL
    engine._recover()

    # 8. CompactionManager + FlushPipeline — start as daemon task
    engine._compaction = CompactionManager(
        sst=engine._sst,
        config=engine._config,
        data_root=engine._data_root,
    )
    max_workers = int(engine._config.flush_max_workers)
    engine._pipeline = FlushPipeline(
        mem=engine._mem,
        sst=engine._sst,
        wal=engine._wal,
        max_workers=max_workers,
        compaction=engine._compaction,
    )
    engine._pipeline_task = asyncio.create_task(engine._pipeline.run())
    engine._pipeline_task.add_done_callback(engine._on_pipeline_done)

    # 9. Startup compaction check (L0 may already be at threshold)
    if engine._sst.l0_count >= int(
        engine._config.l0_compaction_threshold,
    ):
        # Keep a strong reference to the task: the event loop only
        # holds weak references, so a task nobody else references can
        # be garbage-collected before it finishes.
        engine._startup_compaction_task = asyncio.create_task(
            engine._compaction.check_and_compact(),
        )

    # 10. Ready
    logger.info(
        "Startup complete",
        data_root=str(engine._data_root),
        live_keys=engine._mem.active_metadata.entry_count,
        seq=engine._seq.current,
        l0_sstables=engine._sst.l0_count,
        log_port=log_port,
    )
    return engine

put(key, value) async

Write a key-value pair.

Atomic under write_lock: seq generation + WAL append + memtable put + maybe_freeze.

Source code in app/engine/lsm_engine.py
async def put(self, key: Key, value: Value) -> None:
    """Store *value* under *key*.

    While holding ``write_lock`` the engine takes the next sequence
    number, appends the record to the WAL, applies it to the memtable
    and finally calls ``maybe_freeze``.  Any exception raised along
    the way is logged and re-raised unchanged.
    """
    self._check_closed()
    logger.debug("Engine PUT start", key=key)

    try:
        with self._mem.write_lock:
            seqno = self._seq.next()
            now_ms = time.time_ns() // 1_000_000

            # The WAL append comes first so the write is logged before
            # it becomes readable through the memtable.
            self._wal.sync_append(
                WALEntry(
                    seq=seqno,
                    timestamp_ms=now_ms,
                    op=OpType.PUT,
                    key=key,
                    value=value,
                ),
            )
            self._mem.put(key, seqno, now_ms, value)

            # Let the memtable manager decide whether to freeze.
            self._mem.maybe_freeze()
    except Exception as err:
        logger.error("Engine PUT failed", key=key, error=str(err))
        raise
    else:
        logger.debug("Engine PUT done", key=key, seq=seqno)

delete(key) async

Delete a key by writing a tombstone.

Source code in app/engine/lsm_engine.py
async def delete(self, key: Key) -> None:
    """Remove *key* by recording a tombstone for it.

    Under ``write_lock`` a ``DELETE`` entry carrying ``TOMBSTONE`` is
    appended to the WAL, the same tombstone is written to the memtable,
    and ``maybe_freeze`` is called.  Any exception is logged and
    re-raised unchanged.
    """
    self._check_closed()
    logger.debug("Engine DELETE start", key=key)

    try:
        with self._mem.write_lock:
            seqno = self._seq.next()
            now_ms = time.time_ns() // 1_000_000

            # WAL append first, then the memtable.
            self._wal.sync_append(
                WALEntry(
                    seq=seqno,
                    timestamp_ms=now_ms,
                    op=OpType.DELETE,
                    key=key,
                    value=TOMBSTONE,
                ),
            )
            self._mem.put(key, seqno, now_ms, TOMBSTONE)
            self._mem.maybe_freeze()
    except Exception as err:
        logger.error("Engine DELETE failed", key=key, error=str(err))
        raise
    else:
        logger.debug("Engine DELETE done", key=key, seq=seqno)

flush() async

Force-flush the active memtable to an SSTable.

Freezes the current memtable regardless of size/entry thresholds and queues it for the flush pipeline. Returns True if a snapshot was created, False if the memtable was empty.

Source code in app/engine/lsm_engine.py
async def flush(self) -> bool:
    """Freeze the active memtable on demand.

    Calls ``force_freeze`` on the memtable manager while holding
    ``write_lock``.  Returns ``True`` when that produced a snapshot and
    ``False`` when it returned ``None`` (logged as an empty memtable).
    """
    self._check_closed()
    logger.info("Engine FLUSH (manual) requested")

    with self._mem.write_lock:
        frozen = self._mem.force_freeze()

    if frozen is not None:
        logger.info(
            "Engine FLUSH queued",
            snapshot_id=frozen.snapshot_id,
            entry_count=len(frozen),
            size_bytes=frozen.size_bytes,
        )
        return True

    logger.info("Engine FLUSH skipped (empty memtable)")
    return False

get(key) async

Read a value by key. Returns None if not found or deleted.

Checks memtable first, then SSTables.

Source code in app/engine/lsm_engine.py
async def get(self, key: Key) -> Value | None:
    """Look up *key* and return its value, or ``None``.

    The memtable manager is consulted first; the SSTable manager is
    only queried when the memtable has no record for the key.  A record
    whose value equals ``TOMBSTONE`` is reported as ``None``.
    """
    self._check_closed()

    mem_hit = self._mem.get(key)
    if mem_hit is not None:
        # Memtable record found: (<ignored>, value).
        _, found = mem_hit
        source = "mem"
    else:
        disk_hit = await self._sst.get(key)
        if disk_hit is None:
            logger.debug("Engine GET", key=key, hit=False, source="miss")
            return None
        # SSTable record found: (<ignored>, <ignored>, value).
        _, _, found = disk_hit
        source = "sst"

    # A tombstone means the key was deleted, so it counts as a miss.
    live = found != TOMBSTONE
    logger.debug("Engine GET", key=key, hit=live, source=source)
    return found if live else None

update_config(key, value)

Update a config field, persist to disk, return (old, new).

Source code in app/engine/lsm_engine.py
def update_config(
    self, key: str, value: int | float | str,
) -> tuple[int | float | str, int | float | str]:
    """Set config field *key* to *value* and return ``(old, new)``.

    The closed-engine check runs first; the change itself is delegated
    to the config object's ``set`` and its result is returned as-is.
    """
    self._check_closed()
    outcome = self._config.set(key, value)
    return outcome

stats()

Return a snapshot of engine statistics.

Source code in app/engine/lsm_engine.py
def stats(self) -> EngineStats:
    """Collect the engine's current counters into an ``EngineStats``.

    NOTE(review): the WAL entry count comes from ``len()`` of a full
    ``replay()`` call — presumably that cost grows with the WAL; confirm
    it is acceptable for callers that poll stats.
    NOTE(review): this method does not call ``_check_closed()`` —
    confirm that is intentional.
    """
    active = self._mem.active_metadata
    frozen = self._mem.immutable_metadata
    wal_len = len(self._wal.replay())

    return EngineStats(
        key_count=active.entry_count,
        seq=self._seq.current,
        wal_entry_count=wal_len,
        data_root=str(self._data_root),
        active_table_id=active.table_id,
        active_size_bytes=active.size_bytes,
        immutable_queue_len=len(frozen),
        immutable_snapshots=[meta.snapshot_id for meta in frozen],
        l0_sstable_count=self._sst.l0_count,
    )

show_mem(table_id=None)

Show memtable contents.

Parameters

table_id: Optional. Pass a table ID (for the active memtable) or a snapshot ID (for an immutable memtable) to see its full entry list. When omitted or None, returns a summary listing all active and immutable memtables without entries.

Returns

dict: In listing mode (no table_id):

    {
        "active": {"table_id": ..., "entry_count": ..., ...},
        "immutable": [{"snapshot_id": ..., ...}, ...],
    }

In detail mode (with table_id):

    {
        "type": "active" | "immutable",
        "table_id": ...,
        "entries": [{"key": ..., "seq": ..., ...}, ...],
    }
Source code in app/engine/lsm_engine.py
def show_mem(self, table_id: str | None = None) -> dict[str, object]:
    """Describe the memtables, or dump one of them.

    Parameters
    ----------
    table_id:
        **Optional.**  The table ID of the active memtable or the
        snapshot ID of an immutable memtable whose entries should be
        listed in full.  Leave it as ``None`` to get a summary of the
        active and immutable memtables without their entries.

    Returns
    -------
    dict
        Summary form (no *table_id*)::

            {
                "active": {"table_id": ..., "entry_count": ..., ...},
                "immutable": [{"snapshot_id": ..., ...}, ...],
            }

        Detail form (with *table_id*)::

            {
                "type": "active" | "immutable",
                "table_id": ...,
                "entries": [{"key": ..., "seq": ..., ...}, ...],
            }
    """
    self._check_closed()
    # The memtable manager builds the report; it is returned untouched.
    report = self._mem.show_mem(table_id)
    return report

show_disk(file_id=None)

Show SSTable contents on disk.

Parameters

file_id: Optional. Pass a file ID to see the full record list for that SSTable. When omitted or None, returns a summary listing all SSTables organised by level.

Returns

dict: In listing mode (no file_id):

    {"L0": [{"file_id": ..., "record_count": ..., ...}, ...]}

In detail mode (with file_id):

    {"file_id": ..., "entries": [...], ...}
Source code in app/engine/lsm_engine.py
def show_disk(self, file_id: str | None = None) -> dict[str, object]:
    """Describe the SSTables on disk, or dump one of them.

    Parameters
    ----------
    file_id:
        **Optional.**  The file ID of the SSTable whose records should
        be listed in full.  Leave it as ``None`` to get a summary of
        all SSTables organised by level.

    Returns
    -------
    dict
        Summary form (no *file_id*)::

            {"L0": [{"file_id": ..., "record_count": ..., ...}, ...]}

        Detail form (with *file_id*)::

            {"file_id": ..., "entries": [...], ...}
    """
    self._check_closed()
    # The SSTable manager builds the report; it is returned untouched.
    report = self._sst.show_disk(file_id)
    return report

close() async

Flush and close the engine. Idempotent.

Source code in app/engine/lsm_engine.py
async def close(self) -> None:
    """Flush and close the engine. Idempotent.

    The first call marks the engine closed and then shuts components
    down in order: flush pipeline, WAL, SSTable readers, log server.
    Later calls return immediately.

    Each release step runs in a ``finally`` block, so a failure while
    draining the pipeline or closing an earlier component no longer
    prevents the remaining components from being released.  The
    original exception still propagates to the caller once cleanup
    has been attempted.
    """
    if self._closed:
        return
    # Set before any await so a concurrent or repeated close() is a no-op.
    self._closed = True
    logger.info("Shutdown started", data_root=str(self._data_root))

    try:
        # 1. Stop the flush pipeline and wait for it to drain
        self._pipeline.stop()
        if self._pipeline_task is not None:
            try:
                await asyncio.wait_for(self._pipeline_task, timeout=30.0)
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is the builtin TimeoutError on
                # Python 3.11+ but a separate class before that, so
                # the asyncio name is the one that works on both.
                logger.warning("FlushPipeline drain timed out")
                self._pipeline_task.cancel()
            except asyncio.CancelledError:
                pass

        # 2. Log memtable state before closing
        active_meta = self._mem.active_metadata
        logger.info(
            "MemTable state at shutdown",
            active_entries=active_meta.entry_count,
            active_size_bytes=active_meta.size_bytes,
            immutable_queue_len=self._mem.queue_len(),
            seq=self._seq.current,
        )
    finally:
        try:
            # 3. WAL — fsync + close
            await self._wal.close()
            logger.info("WAL closed")
        finally:
            try:
                # 4. SSTable readers — close all
                self._sst.close_all()
                logger.info("SSTables closed")
            finally:
                # 5. Log server — stop last so shutdown logs are broadcast
                if self._log_server:
                    logger.info(
                        "Log server stopping", port=self._log_server.port,
                    )
                    self._log_server.stop()

    logger.info("Shutdown complete")

EngineStats

EngineStats(key_count, seq, wal_entry_count, data_root, active_table_id, active_size_bytes, immutable_queue_len, immutable_snapshots, l0_sstable_count) dataclass

Snapshot of engine statistics.

Attributes:

Name Type Description
key_count int

Number of keys in the active memtable.

seq SeqNum

Current sequence number (highest generated so far).

wal_entry_count int

Number of entries in the write-ahead log.

data_root str

Filesystem path to the engine's data directory.

active_table_id str

UUID hex of the current active memtable.

active_size_bytes int

Estimated byte size of the active memtable.

immutable_queue_len int

Number of frozen snapshots awaiting flush.

immutable_snapshots list[str]

Snapshot IDs of queued immutable memtables.

l0_sstable_count int

Number of Level-0 SSTables on disk.