Skip to content

Compaction

Level-based compaction with subprocess merging.

CompactionManager

CompactionManager(sst, config, data_root)

Flush-triggered compaction with level-reservation scheduling.

Initialize the compaction manager.

Parameters:

Name Type Description Default
sst SSTableManager

SSTable manager providing level state and commit API.

required
config LSMConfig

Live engine configuration for thresholds and modes.

required
data_root Path

Root data directory, used for compaction log and output SSTable paths.

required
Source code in app/engine/compaction_manager.py
def __init__(
    self,
    sst: SSTableManager,
    config: LSMConfig,
    data_root: Path,
) -> None:
    """Set up compaction state for one engine instance.

    Args:
        sst: SSTable manager providing level state and commit API.
        config: Live engine configuration for thresholds and modes.
        data_root: Root data directory, used for compaction log and
            output SSTable paths.
    """
    # Collaborators handed in by the engine.
    self._sst = sst
    self._config = config
    self._data_root = data_root
    # Scheduling state: guards reservations, tracks reserved levels,
    # and maps (src, dst) level pairs to their running tasks.
    self._reservation_lock = asyncio.Lock()
    self._active_levels: set[Level] = set()
    self._active_jobs: dict[tuple[Level, Level], asyncio.Task[None]] = {}

active_jobs property

Currently running compaction jobs.

check_and_compact() async

Entry point. Called after every flush commit and after each completed compaction job.

Source code in app/engine/compaction_manager.py
async def check_and_compact(self) -> None:
    """Scan for eligible compaction work and launch what can be reserved.

    Invoked after every flush commit and again after each compaction
    job completes.
    """
    logger.debug(
        "Compaction check triggered",
        l0_count=self._sst.l0_count,
        threshold=int(self._config.l0_compaction_threshold),
        active_jobs=len(self._active_jobs),
    )

    eligible = self._find_eligible_jobs()
    if not eligible:
        logger.debug(
            "No eligible compaction jobs",
            l0_count=self._sst.l0_count,
        )
        return

    for src_level, dst_level in eligible:
        # Skip any pair whose levels could not be reserved; another
        # job already holds one of them.
        if not await self._try_reserve(src_level, dst_level):
            continue
        logger.info(
            "Launching compaction job",
            src_level=src_level,
            dst_level=dst_level,
        )
        job = asyncio.create_task(self._run_job(src_level, dst_level))
        self._active_jobs[(src_level, dst_level)] = job

CompactionTask

CompactionTask(task_id, input_file_ids, input_dirs, output_file_id, output_dir, output_level, seq_cutoff, bloom_fpr=0.01) dataclass

Immutable description of one compaction job.

All fields are primitive types so the task can be safely passed across the ProcessPoolExecutor subprocess boundary.

Attributes:

Name Type Description
task_id str

Unique identifier for this compaction run.

input_file_ids list[FileID]

SSTable file IDs to merge (includes source and destination level files).

input_dirs dict[FileID, str]

Mapping from file ID to its on-disk directory path.

output_file_id FileID

File ID for the newly created merged SSTable.

output_dir str

Directory where the output SSTable will be written.

output_level Level

Target compaction level for the output.

seq_cutoff SeqNum

Tombstones with seq < seq_cutoff are garbage-collected during the merge.

bloom_fpr float

Target false positive rate for the output bloom filter.

run_compaction

run_compaction(task)

Merge all input SSTables into one output SSTable.

Called in a subprocess — no event loop, no registry, no cache. Opens its own file handles via _open_reader_sync(). Uses finish_sync() because blocking is fine in a subprocess.

Parameters:

Name Type Description Default
task CompactionTask

Immutable description of the compaction job, including input file IDs, directories, output path, and GC cutoff.

required

Returns:

Type Description
SSTableMeta

Metadata of the newly created merged SSTable.

Source code in app/compaction/worker.py
def run_compaction(task: CompactionTask) -> SSTableMeta:
    """Merge all input SSTables into one output SSTable.

    Called in a subprocess — no event loop, no registry, no cache.
    Opens its own file handles via ``_open_reader_sync()``.
    Uses ``finish_sync()`` because blocking is fine in a subprocess.

    Args:
        task: Immutable description of the compaction job, including
            input file IDs, directories, output path, and GC cutoff.

    Returns:
        Metadata of the newly created merged SSTable.
    """
    t0 = time.monotonic()
    total_input_records = 0

    logger.info(
        "Subprocess merge starting",
        task_id=task.task_id,
        input_count=len(task.input_file_ids),
        output_level=task.output_level,
        seq_cutoff=task.seq_cutoff,
    )

    readers: list[SSTableReader] = []
    try:
        for file_id in task.input_file_ids:
            sst_dir = Path(task.input_dirs[file_id])
            reader = _open_reader_sync(sst_dir, file_id)
            readers.append(reader)
            total_input_records += reader.meta.record_count
            logger.debug(
                "Input reader opened",
                file_id=file_id,
                records=reader.meta.record_count,
                size_bytes=reader.meta.size_bytes,
            )

        logger.info(
            "All inputs opened",
            task_id=task.task_id,
            total_input_records=total_input_records,
            reader_count=len(readers),
        )

        # Keep tombstones in the output: per CompactionTask.seq_cutoff,
        # only tombstones with seq < seq_cutoff are garbage-collected
        # (handled inside the merge iterator), the rest must survive.
        merge_iter = KWayMergeIterator(
            iterators=[r.iter_sorted() for r in readers],
            seq_cutoff=task.seq_cutoff,
            skip_tombstones=False,
            deduplicate=True,
        )

        output_dir = Path(task.output_dir)
        writer = SSTableWriter(
            directory=output_dir,
            file_id=task.output_file_id,
            snapshot_id=task.task_id,
            level=task.output_level,
            bloom_n=max(1, total_input_records),  # worst case: no dedup; max() avoids a zero-sized filter
            bloom_fpr=task.bloom_fpr,
        )

        records_written = 0
        for key, seq, ts, value in merge_iter:
            writer.put(key, seq, ts, value)
            records_written += 1

        meta = writer.finish_sync()

        elapsed_ms = (time.monotonic() - t0) * 1000
        dedup_dropped = total_input_records - records_written
        logger.info(
            "Subprocess merge complete",
            task_id=task.task_id,
            input_records=total_input_records,
            output_records=records_written,
            dedup_dropped=dedup_dropped,
            output_bytes=meta.size_bytes,
            output_blocks=meta.block_count,
            elapsed_ms=round(elapsed_ms, 1),
        )
        return meta

    finally:
        # Close every reader even if one close() raises: a bare
        # reader.close() loop would abort on the first failure, leaking
        # the remaining handles and masking any in-flight merge error.
        for reader in readers:
            try:
                reader.close()
            except Exception:
                logger.warning(
                    "Failed to close input reader",
                    task_id=task.task_id,
                )