Flush Pipeline
Parallel flush with ordered commits.
FlushPipeline
FlushPipeline(mem, sst, wal, max_workers=2, compaction=None)
Daemon that drains the immutable queue to SSTables.
Writes run in parallel (up to max_workers), but commits
are serialized: each slot waits for the previous slot's commit
before committing its own result.
Wire up the flush pipeline to its dependent managers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mem | MemTableManager | Memtable manager supplying immutable snapshots to flush. | required |
| sst | SSTableManager | SSTable manager for writing and committing SSTables. | required |
| wal | WALManager | WAL manager for truncating entries after successful flush. | required |
| max_workers | int | Maximum number of concurrent SSTable writes. | 2 |
| compaction | CompactionManager \| None | Optional compaction manager to trigger after each flush commit. | None |
Source code in app/engine/flush_pipeline.py
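The parallel-write, serialized-commit behaviour described above can be illustrated with a minimal asyncio sketch. This is not the code in app/engine/flush_pipeline.py: the names flush_one and flush_batch are hypothetical, the sleep stands in for an SSTable write, and bounding concurrency with a semaphore is an assumption about how max_workers might be enforced.

```python
import asyncio
import random


async def flush_one(index, prev_committed, my_committed, sem, commit_log):
    # Write phase: may overlap with other slots, bounded by the semaphore.
    async with sem:
        await asyncio.sleep(random.uniform(0, 0.01))  # stand-in for the SSTable write
    # Commit phase: wait for the previous slot's commit before committing.
    await prev_committed.wait()
    commit_log.append(index)
    my_committed.set()


async def flush_batch(n, max_workers=2):
    sem = asyncio.Semaphore(max_workers)  # assumption: caps concurrent writes
    commit_log = []
    prev = asyncio.Event()
    prev.set()  # the first slot has no predecessor to wait for
    tasks = []
    for i in range(n):
        mine = asyncio.Event()
        tasks.append(asyncio.create_task(flush_one(i, prev, mine, sem, commit_log)))
        prev = mine  # the next slot waits on this slot's commit
    await asyncio.gather(*tasks)
    return commit_log
```

Even though the simulated writes finish in a random order, the commit log comes back in slot order, because each commit is gated on the event set by its predecessor.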
running
property
Whether the pipeline loop is active.
run()
async
Main daemon loop; runs until stop() is called.
Source code in app/engine/flush_pipeline.py
FlushSlot
FlushSlot(snapshot, file_id, prev_committed, my_committed=asyncio.Event(), batch_abort=asyncio.Event(), position=0)
dataclass
Tracks one in-flight flush operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| snapshot | ImmutableMemTable | The immutable memtable being flushed. |
| file_id | FileID | Target SSTable file ID for this flush. |
| prev_committed | Event | Event set when the previous slot's commit completes. |
| my_committed | Event | Event set when this slot's commit completes. |
| batch_abort | Event | Shared event, set if any slot in the batch fails. |
| position | int | Zero-based index of this slot within the batch. |
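How the three events might fit together within one batch can be sketched as follows. SlotSketch, build_batch and commit_in_order are hypothetical names, the snapshot and file_id fields are left out, and the rule that a slot gives up as soon as batch_abort is set is an assumption, not documented behaviour.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class SlotSketch:
    position: int
    prev_committed: asyncio.Event
    batch_abort: asyncio.Event
    # default_factory gives every slot its own event object.
    my_committed: asyncio.Event = field(default_factory=asyncio.Event)


def build_batch(n):
    abort = asyncio.Event()  # shared by every slot in the batch
    prev = asyncio.Event()
    prev.set()  # slot 0 has no predecessor
    slots = []
    for i in range(n):
        slot = SlotSketch(position=i, prev_committed=prev, batch_abort=abort)
        slots.append(slot)
        prev = slot.my_committed  # chain: slot i+1 waits on slot i
    return slots


async def commit_in_order(slot, fail=False):
    # Wake up on whichever comes first: the predecessor's commit or an abort.
    prev = asyncio.ensure_future(slot.prev_committed.wait())
    abort = asyncio.ensure_future(slot.batch_abort.wait())
    _, pending = await asyncio.wait({prev, abort}, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    if slot.batch_abort.is_set():
        return False  # an earlier slot failed; skip this commit
    if fail:
        slot.batch_abort.set()  # tell the rest of the batch to stop
        return False
    slot.my_committed.set()
    return True


async def demo():
    slots = build_batch(3)
    return await asyncio.gather(
        commit_in_order(slots[0]),
        commit_in_order(slots[1], fail=True),
        commit_in_order(slots[2]),
    )
```

In this sketch slot 0 commits, slot 1 fails and sets the shared abort event, and slot 2 wakes on the abort instead of waiting forever for a predecessor commit that will never arrive.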
MemTableManager
MemTableManager(config)
Single point of coordination for all in-memory write-side state.
All thresholds are read from config at the moment they're needed, so runtime config changes take effect on the next operation.
Initialize the memtable manager with an empty active table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | LSMConfig | Live engine configuration. Thresholds (memtable size, queue length, backpressure timeout) are read from config at the point of use so runtime changes take effect immediately. | required |
Source code in app/engine/memtable_manager.py
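Reading thresholds at the point of use, rather than copying them in the constructor, is what lets a runtime config change apply to the next operation. A minimal sketch of the idea, with hypothetical ConfigSketch and ManagerSketch classes and a made-up max_memtable_bytes field (the real LSMConfig field names are not shown on this page):

```python
from dataclasses import dataclass


@dataclass
class ConfigSketch:
    max_memtable_bytes: int = 1024  # hypothetical field name


class ManagerSketch:
    def __init__(self, config):
        # Keep a reference to the live config; do not copy the threshold.
        self._config = config
        self.size_bytes = 0

    def over_threshold(self):
        # The threshold is looked up on every call, so edits are picked up.
        return self.size_bytes >= self._config.max_memtable_bytes
```

With this shape, lowering max_memtable_bytes on the shared config object changes the result of the very next over_threshold() call without rebuilding the manager.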
flush_event
property
The event signalled when a new snapshot is added to the queue.
write_lock
property
The write lock for the atomic write path.
size_bytes
property
Size of the active memtable in bytes.
active_metadata
property
Metadata for the current active memtable.
immutable_metadata
property
Metadata for all snapshots (newest first).
put(key, seq, timestamp_ms, value)
Write key into the active memtable.
Called by the engine while holding _write_lock.
Source code in app/engine/memtable_manager.py
get(key)
Look up key in the active memtable, then in the immutable queue.
Scans from newest to oldest. Returns the raw value, which may be TOMBSTONE; the engine resolves it. Takes a snapshot copy of the immutable queue to avoid concurrent-modification issues (BUG-02).
Source code in app/engine/memtable_manager.py
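The lookup order can be sketched with plain dicts standing in for memtables. The lookup function and the TOMBSTONE sentinel defined here are illustrative assumptions; the real tombstone marker and table types may differ.

```python
TOMBSTONE = object()  # hypothetical sentinel for a deleted key


def lookup(key, active, immutable_newest_first):
    # Copy the queue first so a concurrent pop cannot change it mid-scan.
    snapshots = list(immutable_newest_first)
    # The active table is the newest data, followed by snapshots newest first.
    for table in (active, *snapshots):
        if key in table:
            return table[key]  # may be TOMBSTONE; the caller resolves it
    return None
```

Returning the tombstone instead of None matters: a tombstone in a newer table must stop the search so that an older value for the same key is not resurrected.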
maybe_freeze()
Freeze the active memtable if it exceeds the size or entry-count threshold.
Called under _write_lock by the engine. Applies backpressure
via _queue_not_full.wait() if the immutable queue is full.
Source code in app/engine/memtable_manager.py
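The backpressure step can be sketched with a threading.Condition. FreezeSketch is a hypothetical class: it checks only a byte threshold, omits the write lock, and raising TimeoutError when the queue stays full is an assumption about what happens when the backpressure timeout expires.

```python
import threading
from collections import deque


class FreezeSketch:
    def __init__(self, max_bytes, max_queue):
        self.max_bytes = max_bytes
        self.max_queue = max_queue
        self.active = {}
        self.active_bytes = 0
        self.queue = deque()  # newest snapshot on the left, oldest on the right
        self._queue_not_full = threading.Condition()

    def put(self, key, value):
        self.active[key] = value
        self.active_bytes += len(key) + len(value)

    def maybe_freeze(self, timeout=1.0):
        with self._queue_not_full:
            if self.active_bytes < self.max_bytes:
                return None  # below threshold, nothing to do
            # Backpressure: block until the flusher makes room or we time out.
            has_room = self._queue_not_full.wait_for(
                lambda: len(self.queue) < self.max_queue, timeout
            )
            if not has_room:
                raise TimeoutError("immutable queue still full")
            snapshot = dict(self.active)
            self.queue.appendleft(snapshot)
            self.active, self.active_bytes = {}, 0
            return snapshot

    def pop_oldest(self):
        with self._queue_not_full:
            snapshot = self.queue.pop()
            self._queue_not_full.notify_all()  # unblock a stalled freeze
            return snapshot
```

The pop_oldest side of the sketch mirrors the documented behaviour of removing the oldest snapshot and unblocking any stalled freeze.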
force_freeze()
Freeze the active memtable unconditionally.
Bypasses threshold checks. Returns None only if the active
memtable is empty (nothing to freeze). Called under
_write_lock by the engine's flush() command.
Source code in app/engine/memtable_manager.py
peek_oldest()
peek_at_depth(depth)
Return the snapshot at depth (0 = oldest, 1 = second oldest, etc.).
Protected by _write_lock to prevent TOCTOU race (BUG-03).
Source code in app/engine/memtable_manager.py
snapshot_queue()
Return a snapshot of the immutable queue (oldest first).
Used by the flush pipeline to safely iterate all pending snapshots without holding a lock during the flush.
Source code in app/engine/memtable_manager.py
pop_oldest()
Remove the oldest snapshot and unblock any stalled freeze.
Source code in app/engine/memtable_manager.py
queue_len()
restore(entries)
Replay WAL entries into the active memtable.
Called by engine._recover() only. Recovery is single-threaded,
so no lock is needed.
Source code in app/engine/memtable_manager.py
set_flush_notify(callback)
Register (or clear) a callback invoked after each freeze.
Used by FlushPipeline to bridge the sync flush_event to an
asyncio.Event via loop.call_soon_threadsafe (BUG-14).
Source code in app/engine/memtable_manager.py
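The bridge from a synchronous writer thread to an asyncio.Event can be sketched as below. The notify function and the demo harness are hypothetical; the only calls relied on are the standard asyncio.get_running_loop and loop.call_soon_threadsafe.

```python
import asyncio
import threading


async def demo():
    loop = asyncio.get_running_loop()
    wake = asyncio.Event()

    def notify():
        # Runs on the writer thread. asyncio.Event is not thread-safe,
        # so the set() call is handed to the event loop's own thread.
        loop.call_soon_threadsafe(wake.set)

    writer = threading.Thread(target=notify)  # stands in for a freeze on a sync thread
    writer.start()
    await asyncio.wait_for(wake.wait(), timeout=1.0)
    writer.join()
    return wake.is_set()
```

Calling wake.set() directly from the writer thread would not reliably wake a coroutine waiting on the event, which is why the callback goes through call_soon_threadsafe.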
show_mem(table_id=None)
Inspect memtable contents.
Parameters
table_id:
    Optional. The table ID (active) or snapshot ID (immutable) to inspect. When provided, returns the full entry list for that table. When None (the default), returns a summary listing all active and immutable memtables without their entries.
Returns
A dict with the structure::

    # When table_id is None (listing mode):
    {
        "active": { "table_id": ..., "entry_count": ..., ... },
        "immutable": [ { "snapshot_id": ..., ... }, ... ],
    }

    # When table_id matches a table (detail mode):
    {
        "type": "active" | "immutable",
        "table_id": ...,
        "entry_count": ...,
        "entries": [ {"key": ..., "seq": ..., "ts": ..., "value": ...}, ... ],
    }
Source code in app/engine/memtable_manager.py
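A caller can tell the two result shapes apart by checking for the "entries" key. The describe helper below is a hypothetical consumer, not part of the API, and it reads only the keys named in the structure above.

```python
def describe(result):
    """Summarize a show_mem-style dict in either listing or detail mode."""
    if "entries" in result:
        # Detail mode: one table with its full entry list.
        keys = [entry["key"] for entry in result["entries"]]
        return (
            f'{result["type"]} table {result["table_id"]}: '
            f'{result["entry_count"]} entries, keys={keys}'
        )
    # Listing mode: the active table plus immutable snapshot summaries.
    snapshot_ids = [snap["snapshot_id"] for snap in result["immutable"]]
    return f'active={result["active"]["table_id"]}, immutable={snapshot_ids}'
```

The IDs and values passed to such a helper depend on the engine's state; the function makes no assumption about their types beyond being printable.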