Skip to content

Write-Ahead Log

Append-only, msgpack-framed WAL for crash recovery.

WALEntry

WALEntry(seq, timestamp_ms, op, key, value) dataclass

A single write-ahead log entry.

Fields are ordered to match the msgpack encoding: packb((seq, timestamp_ms, op, key, value), use_bin_type=True)

Attributes:

Name Type Description
seq SeqNum

Monotonically increasing sequence number.

timestamp_ms int

Wall-clock timestamp in milliseconds since epoch.

op OpType

Operation type (PUT or DELETE).

key Key

The raw byte key.

value Value

The raw byte value (or TOMBSTONE for deletions).

is_tombstone property

Return True if this entry represents a deletion.

WALWriter

WALWriter(path)

Append-only WAL file writer with msgpack-framed entries.

Owns the file handle for a single wal.log file. All methods are synchronous — async wrapping is done by :class:WALManager.

Open or create a WAL file at path in append mode.

Parent directories are created automatically if they do not exist.

Parameters:

Name Type Description Default
path Path

Filesystem path for the WAL file (e.g. data/wal.log).

required

Raises:

Type Description
OSError

If the file cannot be opened or created.

Source code in app/wal/writer.py
def __init__(self, path: Path) -> None:
    """Open or create a WAL file at *path* in append mode.

    Parent directories are created automatically if they do not exist.

    Args:
        path: Filesystem path for the WAL file (e.g. ``data/wal.log``).

    Raises:
        OSError: If the file cannot be opened or created.
    """
    self._path = path
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        self._fd = open(path, "ab")  # noqa: SIM115
    except OSError as exc:
        logger.error("WAL file open failed", path=str(path), error=str(exc))
        raise
    logger.debug("WALWriter opened", path=str(path))

path property

Return the WAL file path.

append(entry)

Encode entry as msgpack, write to WAL, and fsync.

Source code in app/wal/writer.py
def append(self, entry: WALEntry) -> None:
    """Encode *entry* as msgpack, write to WAL, and fsync."""
    try:
        self._fd.write(_encode_entry(entry))
        self._fd.flush()
        os.fsync(self._fd.fileno())
    except OSError as exc:
        logger.error(
            "WAL append failed", seq=entry.seq, error=str(exc),
        )
        raise
    logger.debug(
        "WAL append",
        seq=entry.seq,
        op=entry.op.name,
        key_len=len(entry.key),
    )

replay()

Read the WAL file and return all entries sorted by seq.

Returns an empty list when the file does not exist or is empty. Raises :class:WALCorruptError if decoding or CRC check fails.

The framed format is: [4B length][payload][4B CRC].

Source code in app/wal/writer.py
def replay(self) -> list[WALEntry]:
    """Read the WAL file and return all entries sorted by seq.

    Returns an empty list when the file does not exist or is empty.
    Raises :class:`WALCorruptError` if decoding or CRC check fails.

    The framed format is: ``[4B length][payload][4B CRC]``.
    """
    logger.debug("WAL replay start", path=str(self._path))

    if not self._path.exists() or self._path.stat().st_size == 0:
        logger.debug("WAL replay skipped (empty or missing)")
        return []

    entries: list[WALEntry] = []
    try:
        with open(self._path, "rb") as fd:
            data = fd.read()
        pos = 0
        while pos < len(data):
            # Read 4-byte length prefix
            if pos + 4 > len(data):
                raise WALCorruptError(
                    f"Truncated length at offset {pos}"
                )
            payload_len = int.from_bytes(
                data[pos : pos + 4], "big",
            )
            pos += 4

            # Read payload
            if pos + payload_len > len(data):
                raise WALCorruptError(
                    f"Truncated payload at offset {pos - 4}"
                )
            payload = data[pos : pos + payload_len]
            pos += payload_len

            # Read and verify CRC
            if pos + crc.CRC_SIZE > len(data):
                raise WALCorruptError(
                    f"Truncated CRC at offset {pos}"
                )
            stored = crc.unpack(data, pos)
            if not crc.verify(payload, stored):
                raise WALCorruptError(
                    f"CRC mismatch at offset {pos - payload_len - 4}"
                )
            pos += crc.CRC_SIZE

            # Decode msgpack payload
            item = msgpack.unpackb(payload, raw=True)  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
            raw_seq, raw_ts, raw_op, raw_key, raw_value = item  # pyright: ignore[reportUnknownVariableType]
            entries.append(
                WALEntry(
                    seq=int(raw_seq),  # pyright: ignore[reportUnknownArgumentType]
                    timestamp_ms=int(raw_ts),  # pyright: ignore[reportUnknownArgumentType]
                    op=OpType(int(raw_op)),  # pyright: ignore[reportUnknownArgumentType]
                    key=bytes(raw_key),  # pyright: ignore[reportUnknownArgumentType]
                    value=bytes(raw_value),  # pyright: ignore[reportUnknownArgumentType]
                )
            )
    except WALCorruptError:
        raise
    except (msgpack.UnpackException, ValueError, TypeError) as exc:
        raise WALCorruptError(
            f"Failed to decode WAL at {self._path}: {exc}"
        ) from exc

    entries.sort(key=lambda e: e.seq)
    logger.info("WAL replay complete", entry_count=len(entries))
    return entries

truncate_before(seq)

Remove all entries with entry.seq <= seq.

Rewrites the WAL to a temp file, then atomically replaces the original via :func:os.replace.

Source code in app/wal/writer.py
def truncate_before(self, seq: SeqNum) -> None:
    """Remove all entries with ``entry.seq <= seq``.

    Rewrites the WAL to a temp file, then atomically replaces the
    original via :func:`os.replace`.
    """
    logger.info("WAL truncate start", cutoff_seq=seq)
    try:
        all_entries = self.replay()
        kept = [e for e in all_entries if e.seq > seq]

        # Write kept entries to a temp file in the same directory
        tmp_fd = tempfile.NamedTemporaryFile(  # noqa: SIM115
            dir=self._path.parent, delete=False, mode="wb"
        )
        tmp_path = Path(tmp_fd.name)
        try:
            for entry in kept:
                tmp_fd.write(_encode_entry(entry))
            os.fsync(tmp_fd.fileno())
            tmp_fd.close()

            # Atomic swap
            os.replace(tmp_path, self._path)
        except BaseException:
            tmp_fd.close()
            tmp_path.unlink(missing_ok=True)
            raise

        # Reopen the fd on the new file
        self._fd.close()
        self._fd = open(self._path, "ab")  # noqa: SIM115
    except OSError as exc:
        raise WALTruncateError(
            f"WAL truncation failed at {self._path}: {exc}"
        ) from exc

    logger.info(
        "WAL truncated", cutoff_seq=seq, remaining=len(kept)
    )

close()

Fsync and close the WAL file handle.

Cleanup never raises — errors are logged as warnings.

Source code in app/wal/writer.py
def close(self) -> None:
    """Fsync and close the WAL file handle.

    Cleanup never raises — errors are logged as warnings.
    """
    logger.debug("WAL close start", path=str(self._path))
    try:
        os.fsync(self._fd.fileno())
        self._fd.close()
        logger.debug("WAL close done", path=str(self._path))
    except Exception as exc:
        logger.warning(
            "WAL close error (non-fatal)",
            path=str(self._path),
            error=str(exc),
        )

WALManager

WALManager(wal)

Thread-safe, async-aware wrapper around :class:WALWriter.

Wrap an existing WALWriter with thread-safe locking.

Parameters:

Name Type Description Default
wal WALWriter

The underlying synchronous WAL writer to manage.

required
Source code in app/engine/wal_manager.py
def __init__(self, wal: WALWriter) -> None:
    """Wrap an existing ``WALWriter`` with thread-safe locking.

    Args:
        wal: The underlying synchronous WAL writer to manage.
    """
    self._wal = wal
    self._wal_lock = threading.Lock()

open(path) classmethod

Create a :class:WALWriter at path and wrap it.

Source code in app/engine/wal_manager.py
@classmethod
def open(cls, path: Path) -> WALManager:
    """Create a :class:`WALWriter` at *path* and wrap it."""
    logger.info("WALManager opening", path=str(path))
    try:
        mgr = cls(WALWriter(path))
    except OSError as exc:
        logger.error(
            "WALManager open failed",
            path=str(path),
            error=str(exc),
        )
        raise
    logger.info("WALManager opened", path=str(path))
    return mgr

sync_append(entry)

Synchronous append for use under external lock.

Source code in app/engine/wal_manager.py
def sync_append(self, entry: WALEntry) -> None:
    """Synchronous append for use under external lock."""
    self._sync_append(entry)

append(entry) async

Write + fsync in a thread so the loop stays free.

Source code in app/engine/wal_manager.py
async def append(self, entry: WALEntry) -> None:
    """Write + fsync in a thread so the loop stays free."""
    await asyncio.to_thread(self._sync_append, entry)

replay()

Replay the WAL — synchronous, called before workers.

Source code in app/engine/wal_manager.py
def replay(self) -> list[WALEntry]:
    """Replay the WAL — synchronous, called before workers."""
    logger.info("WALManager replay start")
    entries = self._wal.replay()
    logger.info("WALManager replay done", entry_count=len(entries))
    return entries

truncate_before(seq) async

Remove entries with seq <= seq in a background thread.

Source code in app/engine/wal_manager.py
async def truncate_before(self, seq: SeqNum) -> None:
    """Remove entries with ``seq <= seq`` in a background thread."""
    await asyncio.to_thread(self._sync_truncate, seq)

close() async

Fsync and close the WAL file handle.

Source code in app/engine/wal_manager.py
async def close(self) -> None:
    """Fsync and close the WAL file handle."""
    await asyncio.to_thread(self._sync_close)