Skip to content

Observability

Structured logging with file and TCP broadcast targets.

configure_logging

configure_logging(data_root=Path('./data'), log_port=None)

Set up structured logging with file and TCP targets.

Parameters

data_root: Root data directory. Log file is written to <data_root>/logs/kiwidb.log. log_port: TCP port for the log broadcast server. None reads from LSM_LOG_PORT env var (default 9009). Pass 0 to disable TCP.

Returns

The :class:LogBroadcastServer instance (or None if TCP is disabled).

Source code in app/observability/logging.py
def configure_logging(
    data_root: Path = Path("./data"),
    log_port: int | None = None,
) -> LogBroadcastServer | None:
    """Set up structured logging with file and TCP targets.

    Parameters
    ----------
    data_root:
        Root data directory. Log file is written to ``<data_root>/logs/kiwidb.log``.
    log_port:
        TCP port for the log broadcast server.  ``None`` reads from
        ``LSM_LOG_PORT`` env var (default 9009).  Pass ``0`` to disable TCP.

    Returns
    -------
    The :class:`LogBroadcastServer` instance (or None if TCP is disabled).
    """
    global _log_server, _configured  # noqa: PLW0603

    if _configured:
        return _log_server

    # ── Resolve port ──────────────────────────────────────────────────────

    if log_port is None:
        log_port = int(os.getenv("LSM_LOG_PORT", "9009"))

    # ── File handler ──────────────────────────────────────────────────────

    log_dir = data_root / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / "kiwidb.log"

    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(LOG_LEVEL)
    file_handler.setFormatter(_FileFormatter(datefmt="%Y-%m-%d %H:%M:%S"))

    # ── TCP broadcast handler ─────────────────────────────────────────────

    handlers: list[logging.Handler] = [file_handler]

    if log_port > 0:
        server = LogBroadcastServer(port=log_port)
        server.start()
        _log_server = server

        broadcast_handler = _BroadcastHandler(server)
        broadcast_handler.setLevel(LOG_LEVEL)
        broadcast_handler.setFormatter(_PrettyFormatter(datefmt="%H:%M:%S"))
        handlers.append(broadcast_handler)

    # ── Wire stdlib logging ───────────────────────────────────────────────

    root = logging.getLogger()
    root.setLevel(LOG_LEVEL)
    # Clear any existing handlers to avoid duplicates on re-configure
    root.handlers.clear()
    for h in handlers:
        root.addHandler(h)

    # ── Configure structlog to route through stdlib ───────────────────────

    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.StackInfoRenderer(),
            structlog.dev.set_exc_info,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    _configured = True
    return _log_server

get_logger

get_logger(name)

Return a structlog logger bound to name.

All application modules should use::

from app.observability import get_logger
logger = get_logger(__name__)
Source code in app/observability/logging.py
def get_logger(name: str) -> structlog.stdlib.BoundLogger:
    """Return a structlog logger bound to *name*.

    All application modules should use::

        from app.observability import get_logger
        logger = get_logger(__name__)
    """
    return structlog.get_logger(name)  # type: ignore[no-any-return]

LogBroadcastServer

LogBroadcastServer(host='127.0.0.1', port=DEFAULT_LOG_PORT)

TCP server that broadcasts log lines to all connected clients.

Initialize the log broadcast server (does not start listening).

Call :meth:start to bind the socket and begin accepting clients.

Parameters:

Name Type Description Default
host str

Network interface to bind to.

'127.0.0.1'
port int

TCP port number. Defaults to the LSM_LOG_PORT environment variable or 9009.

DEFAULT_LOG_PORT
Source code in app/observability/log_server.py
def __init__(self, host: str = "127.0.0.1", port: int = DEFAULT_LOG_PORT) -> None:
    """Initialize the log broadcast server (does not start listening).

    Call :meth:`start` to bind the socket and begin accepting clients.

    Args:
        host: Network interface to bind to.
        port: TCP port number. Defaults to the ``LSM_LOG_PORT``
            environment variable or ``9009``.
    """
    self._host = host
    self._port = port
    self._clients: list[socket.socket] = []
    self._clients_lock = threading.Lock()
    self._server_sock: socket.socket | None = None
    self._stop_event = threading.Event()
    self._thread: threading.Thread | None = None

port property

Return the port the server is listening on.

start()

Start the broadcast server in a daemon thread.

Source code in app/observability/log_server.py
def start(self) -> None:
    """Start the broadcast server in a daemon thread."""
    self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self._server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self._server_sock.bind((self._host, self._port))
    self._server_sock.listen(8)
    self._server_sock.setblocking(False)

    self._thread = threading.Thread(
        target=self._accept_loop, daemon=True, name="log-broadcast",
    )
    self._thread.start()
    _logger.debug(
        "Log broadcast server started", host=self._host, port=self._port,
    )

broadcast(data)

Send data to all connected clients. Drop disconnected ones.

Source code in app/observability/log_server.py
def broadcast(self, data: bytes) -> None:
    """Send *data* to all connected clients. Drop disconnected ones."""
    with self._clients_lock:
        alive: list[socket.socket] = []
        for client in self._clients:
            try:
                client.sendall(data)
                alive.append(client)
            except OSError:
                with contextlib.suppress(OSError):
                    client.close()
        self._clients = alive

stop()

Shut down the server and disconnect all clients.

Source code in app/observability/log_server.py
def stop(self) -> None:
    """Shut down the server and disconnect all clients."""
    _logger.debug("Log broadcast server stopping", port=self._port)
    self._stop_event.set()
    if self._server_sock:
        self._server_sock.close()
    with self._clients_lock:
        client_count = len(self._clients)
        for client in self._clients:
            with contextlib.suppress(OSError):
                client.close()
        self._clients.clear()
    if self._thread and self._thread.is_alive():
        self._thread.join(timeout=2.0)
        if self._thread.is_alive():
            _logger.warning(
                "Log broadcast thread did not stop in time",
                port=self._port,
            )
    _logger.info(
        "Log broadcast server stopped",
        port=self._port,
        clients_disconnected=client_count,
    )