
Tracing

indusagi.tracing is the framework's homegrown, dependency-light observability subsystem: an OpenTelemetry-free way to trace agent runs, model calls, and tool actions. Import it with import indusagi.tracing, or reach it through the lazy tracing submodule of the top-level indusagi package.

A trace is modeled as immutable Segment values driven by a functional SegmentHandle (note/child/fail/close). Each state transition emits exactly one TraceSignal onto an async-iterable SignalChannel, and terminal Sinks drain that channel to a console, an NDJSON file, or an arbitrary stream. Admission control (SampleGate), a named-recorder registry (TelemetryHub), credential redaction (SecretScrubber), and an optional RunEvent adapter round out the surface. The vocabulary is deliberately fresh — there is no span/exporter/tracer-provider naming.


Public exports

The package barrel re-exports its layers in a bottom-up order (signal, channel, redaction, sinks, recorder, registry, adapter):

Name Kind Source Purpose
Segment dataclass signal/segment.py Immutable, frozen trace segment (id, trace_id, parent_id, kind, name, started_at, ended_at, status, attributes, error). Never mutated in place
SegmentKind type signal/segment.py Literal['run','inference','action','recall','custom'] — the kind of work a segment records
SegmentStatus type signal/segment.py Literal['open','ok','error'] — lifecycle status
SegmentError dataclass signal/segment.py Minimal serializable captured failure: a single message: str field
TraceRecord type signal/segment.py Alias for Segment; the closed/serializable form a close signal carries
OpenSegmentInput dataclass signal/segment.py Inputs for open_segment; missing ids/clock are generated
open_segment function signal/segment.py Construct a fresh frozen open Segment; pure aside from id/clock defaulting
with_attributes function signal/segment.py Derive a new frozen segment with a patch merged over attributes (last-writer-wins)
close_segment function signal/segment.py Derive the terminal TraceRecord, stamping ended_at, final status, and (on error) a SegmentError
fresh_id function signal/segment.py Generate a hex id of N random bytes via secrets.token_hex
fresh_trace_id function signal/segment.py A new 128-bit (16-byte) hex trace id
fresh_segment_id function signal/segment.py A new 64-bit (8-byte) hex segment id
SegmentHandle protocol signal/handle.py The functional handle over an in-flight segment (trace_id, id, active; note/child/fail/close)
OpenHandleInput dataclass signal/handle.py Inputs for opening a real, sampled-in handle
open_handle function signal/handle.py Open a live segment, emit an OpenSignal onto the sink, return its SegmentHandle
NOOP_HANDLE const signal/handle.py The single shared sampled-out SegmentHandle; emits/allocates nothing, child() returns itself
SignalSink type signal/handle.py Callable[[TraceSignal], None] — a consumer of trace signals (typically SignalChannel.emit)
OpenSignal dataclass channel/signal.py Emitted once when a segment opens; carries the open Segment. type='open'
UpdateSignal dataclass channel/signal.py Emitted when an open segment's attributes are amended; carries id + attributes. type='update'
CloseSignal dataclass channel/signal.py Emitted once when a segment reaches a terminal state; carries the TraceRecord. type='close'
TraceSignal type channel/signal.py Union OpenSignal | UpdateSignal | CloseSignal — what every channel consumer branches on
SignalChannel class channel/signal.py Single-stream, multi-emit, async-iterable transport. emit()/close() sync; consumed via async for
SecretPattern dataclass redaction/secret_scrubber.py A redaction rule: label + optional key/value compiled regexes (re.search, unanchored)
REDACTION_TOKEN const redaction/secret_scrubber.py The inert replacement string '‹redacted›' (guillemet, deliberately not '[REDACTED]')
DEFAULT_SECRET_PATTERNS const redaction/secret_scrubber.py Built-in tuple of rules: credential key names, auth headers, bearer/vendor-key/JWT/PEM value shapes
SecretScrubber class redaction/secret_scrubber.py Stateless attribute processor; scrub() returns a deep copy with secret-looking keys/values replaced
Sink ABC sinks/base.py Terminal channel-consumer contract: abstract async drain(channel); optional flush()/close() no-ops
ConsoleSink class sinks/console.py Prints one tidy line per closed segment through an injected log callable (default print)
ConsoleSinkOptions dataclass sinks/console.py Options for ConsoleSink: log callable
FileSink class sinks/file.py Appends each closed segment as an NDJSON line; buffers and flushes via asyncio.to_thread
FileSinkOptions dataclass sinks/file.py Options for FileSink: flush_every (default 1)
StreamSink class sinks/stream.py Writes each closed record as a JSON line to an asyncio.StreamWriter or any text writer; honours backpressure
StreamSinkOptions dataclass sinks/stream.py Options for StreamSink: end_on_close (default False)
SampleGate class recorder/sampling.py Trace admission control; decide(trace_id) -> bool. Constant for always/never, deterministic for ratio
SampleStrategy type recorder/sampling.py Literal['always','never'] | RatioStrategy
RatioStrategy dataclass recorder/sampling.py Deterministic ratio strategy: admit fraction ratio ∈ [0,1] keyed by trace id
Recorder class recorder/recorder.py The tracer instance. Holds service_name, a SampleGate, and a SignalChannel
RecorderOptions dataclass recorder/recorder.py Construction options: service_name, sampling, channel
OpenOptions dataclass recorder/recorder.py Per-open options: trace_id, parent_id, attributes (all optional)
TelemetryHub class registry/hub.py Named registry of Recorders with a designated default
get_default_hub function registry/hub.py Lazily create/return the process-wide default TelemetryHub (opt-in)
set_default_hub function registry/hub.py Replace the process-wide default TelemetryHub (e.g. for tests)
trace_agent_run function adapter/runtime_trace.py Project a runtime RunEvent stream onto a Recorder as nested segments; returns a disposer
RunEventSubscribe type adapter/runtime_trace.py Callable[[Callable[[RunEvent], None]], Callable[[], None]]; the shape of Agent.subscribe

Sub-directories

Directory Holds
signal/ segment.py (immutable Segment, SegmentKind/Status/Error, TraceRecord, open/with_attributes/close, fresh ids) and handle.py (SegmentHandle protocol, _LiveHandle via open_handle, NOOP_HANDLE, SignalSink)
channel/ signal.py — the TraceSignal union (Open/Update/CloseSignal) and the single-consumer async-iterable SignalChannel
redaction/ secret_scrubber.py (SecretScrubber, SecretPattern, DEFAULT_SECRET_PATTERNS, REDACTION_TOKEN)
recorder/ recorder.py (Recorder, RecorderOptions, OpenOptions) and sampling.py (SampleGate, SampleStrategy, RatioStrategy, FNV-1a unit hash)
registry/ hub.py — the TelemetryHub named-recorder registry plus the opt-in process default
sinks/ base.py (Sink ABC + _record_to_json), console.py, file.py (NDJSON), stream.py (to a writer)
adapter/ runtime_trace.py — the optional trace_agent_run bridge + RunEventSubscribe; the one runtime-aware module

The segment lifecycle

A Segment is a frozen dataclass — the OTel-free analogue of a span. It is never mutated in place: the lifecycle open → note → close is modeled by producing new frozen values through open_segment, with_attributes, and close_segment. Because every value is immutable, a closed TraceRecord is trivially serializable.
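The three pure transitions can be sketched with dataclasses.replace. This is a simplified, hypothetical re-implementation, not the shipped signal/segment.py: it assumes float timestamps from time.time(), takes keyword arguments instead of an OpenSegmentInput, and stores attributes as a plain dict (so the dict itself is not read-only here).

```python
import secrets
import time
from dataclasses import dataclass, field, replace
from typing import Any, Dict, Literal, Mapping, Optional

SegmentKind = Literal["run", "inference", "action", "recall", "custom"]
SegmentStatus = Literal["open", "ok", "error"]


@dataclass(frozen=True)
class SegmentError:
    message: str


@dataclass(frozen=True)
class Segment:
    id: str
    trace_id: str
    parent_id: Optional[str]
    kind: SegmentKind
    name: str
    started_at: float
    ended_at: Optional[float] = None
    status: SegmentStatus = "open"
    attributes: Dict[str, Any] = field(default_factory=dict)
    error: Optional[SegmentError] = None


def open_segment(kind: SegmentKind, name: str, *, trace_id: Optional[str] = None,
                 parent_id: Optional[str] = None, now: Optional[float] = None) -> Segment:
    # Missing ids and clock are generated (hypothetical keyword-style signature).
    return Segment(
        id=secrets.token_hex(8),                      # 64-bit segment id
        trace_id=trace_id or secrets.token_hex(16),   # 128-bit trace id
        parent_id=parent_id,
        kind=kind,
        name=name,
        started_at=time.time() if now is None else now,
    )


def with_attributes(seg: Segment, patch: Mapping[str, Any]) -> Segment:
    # Returns a new value; keys in `patch` win over existing ones (last writer wins).
    return replace(seg, attributes={**seg.attributes, **patch})


def close_segment(seg: Segment, status: SegmentStatus = "ok",
                  error: Optional[SegmentError] = None,
                  now: Optional[float] = None) -> Segment:
    # Only terminal fields change; a pre-existing error survives an explicit "ok".
    return replace(
        seg,
        ended_at=time.time() if now is None else now,
        status=status,
        error=error if error is not None else seg.error,
    )
```

Because each step returns a new value, an older reference keeps describing the segment exactly as it was when that reference was taken.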

Callers rarely touch those functions directly. The behavioural face is SegmentHandle, a Protocol with note, child, fail, and close. There is intentionally no class hierarchy:

  • A sampled-in handle is the plain _LiveHandle returned by open_handle. It holds the segment's current frozen value, emits an OpenSignal on construction, and on each note/fail/close derives the next frozen value and emits an UpdateSignal/CloseSignal through an injected SignalSink callable.
  • A sampled-out handle is the single shared NOOP_HANDLE — its methods do nothing and its child() returns itself, so an entire sampled-out subtree costs nothing.

note, child, and fail are no-ops once a handle is closed, and close is idempotent (the first call wins), so a double-close in a finally block is safe. fail(error) records a SegmentError and sets the closing status to error without closing — the actual close carries it. close_segment preserves a pre-existing error even on an explicit close("ok"): only terminal fields are replaced.
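A minimal sketch of the two handle flavours, assuming signals are plain ("open" | "update" | "close", payload) tuples rather than the real OpenSignal/UpdateSignal/CloseSignal dataclasses. LiveHandle and NoopHandle are hypothetical stand-ins for _LiveHandle and NOOP_HANDLE; returning the no-op handle from child() on a closed handle, and keeping fail() silent until close, are assumptions of this sketch.

```python
import itertools
from typing import Any, Callable, Dict, Optional, Tuple, Union

Signal = Tuple[str, Dict[str, Any]]  # hypothetical: ("open" | "update" | "close", payload)
_ids = itertools.count(1)            # deterministic ids, just for the sketch


class NoopHandle:
    """Sampled-out handle: every method is inert and child() returns the same object."""
    trace_id = ""
    id = ""
    active = False

    def note(self, patch: Dict[str, Any]) -> None:
        pass

    def child(self, kind: str, name: str) -> "NoopHandle":
        return self                                  # a whole subtree is this one object

    def fail(self, error: str) -> None:
        pass

    def close(self, status: Optional[str] = None) -> None:
        pass


NOOP_HANDLE = NoopHandle()


class LiveHandle:
    """Sampled-in handle: holds the current segment state and emits through `emit`."""

    def __init__(self, emit: Callable[[Signal], None], kind: str, name: str,
                 trace_id: str = "trace-1", parent_id: Optional[str] = None) -> None:
        self._emit = emit
        self.trace_id = trace_id
        self.id = f"seg-{next(_ids)}"
        self.active = True
        self._closing_status = "ok"
        self._error: Optional[str] = None
        self._seg: Dict[str, Any] = {"id": self.id, "parent_id": parent_id,
                                     "kind": kind, "name": name, "attributes": {}}
        emit(("open", dict(self._seg)))              # one open signal on construction

    def note(self, patch: Dict[str, Any]) -> None:
        if not self.active:
            return                                   # no-op once closed
        self._seg = {**self._seg, "attributes": {**self._seg["attributes"], **patch}}
        self._emit(("update", {"id": self.id, "attributes": dict(patch)}))

    def child(self, kind: str, name: str) -> Union["LiveHandle", NoopHandle]:
        if not self.active:
            return NOOP_HANDLE                       # assumption: closed parents spawn nothing
        return LiveHandle(self._emit, kind, name, self.trace_id, self.id)

    def fail(self, error: str) -> None:
        if self.active:                              # record only; close() carries it
            self._error, self._closing_status = error, "error"

    def close(self, status: Optional[str] = None) -> None:
        if not self.active:
            return                                   # idempotent: the first close wins
        self.active = False
        record = {**self._seg, "status": status or self._closing_status,
                  "error": self._error}
        self._emit(("close", record))
```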

Each handle transition emits exactly one TraceSignal. These flow onto a SignalChannel: a single-consumer async iterable backed by a deque plus an asyncio future waiter. emit() appends and wakes a parked consumer; async for pulls; close() ends iteration once the buffer has drained. The channel is lossy under backpressure: once the configured bound is reached, each further emit drops the oldest unread signal (newest wins), and a None or non-positive bound means the buffer is unbounded. emit/close are synchronous and must run on the consumer's event-loop thread; producers on other threads must marshal through loop.call_soon_threadsafe.

import asyncio
from indusagi.tracing import Recorder, RecorderOptions, ConsoleSink


async def main() -> None:
    rec = Recorder(RecorderOptions(service_name="my-agent"))
    sink = ConsoleSink()
    drain_task = asyncio.create_task(sink.drain(rec.channel()))

    run = rec.open("run", "answer-question")
    inf = run.child("inference", "chat.completion")
    inf.note({"model": "opus", "tokens.in": 1200})
    inf.close("ok")
    act = run.child("action", "write_file")
    act.fail("EACCES: permission denied")  # sets the closing status to error
    act.close()
    run.close("ok")

    rec.channel().close()   # end the stream so drain() returns
    await drain_task


asyncio.run(main())
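The channel mechanics described above (a deque buffer, one parked future, drop-oldest when bounded) can be sketched as follows. SketchChannel is hypothetical; it leans on deque(maxlen=...), which discards from the opposite end when full, to get the newest-wins behaviour, and silently ignoring emits after close() is an assumption.

```python
import asyncio
from collections import deque
from typing import Any, Deque, Optional


class SketchChannel:
    """Single-consumer async iterable: a deque buffer plus one parked future."""

    def __init__(self, bound: Optional[int] = None) -> None:
        # None or a non-positive bound means unbounded; otherwise a full deque
        # discards from the left (the oldest unread signal) on each append.
        maxlen = bound if bound is not None and bound > 0 else None
        self._buf: Deque[Any] = deque(maxlen=maxlen)
        self._waiter: Optional[asyncio.Future] = None
        self._closed = False

    def _wake(self) -> None:
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)
        self._waiter = None

    def emit(self, signal: Any) -> None:
        # Synchronous; must run on the consumer's event-loop thread.
        if self._closed:
            return                           # assumption: late emits are dropped
        self._buf.append(signal)
        self._wake()

    def close(self) -> None:
        self._closed = True
        self._wake()

    def __aiter__(self) -> "SketchChannel":
        return self

    async def __anext__(self) -> Any:
        while not self._buf:
            if self._closed:
                raise StopAsyncIteration     # closed and fully drained
            self._waiter = asyncio.get_running_loop().create_future()
            await self._waiter               # parked until emit() or close()
        return self._buf.popleft()
```

With bound=2, emitting "a", "b", "c" before anyone reads leaves only "b" and "c" for the consumer.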

Recorders and sampling

A Recorder is the flat tracer instance — it replaces the OTel tracer-provider / tracer pair with one object. It holds a service_name, a SampleGate, and a SignalChannel. open(kind, name, opts) resolves a trace id (a supplied opts.trace_id joins an existing trace, otherwise a fresh one is minted), consults the gate, and either returns NOOP_HANDLE (sampled out — nothing is emitted) or calls open_handle bound to the channel's emit. A root open folds service.name into the segment's attributes. channel() exposes the stream sinks drain.
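A compact sketch of that open() flow, with hypothetical stand-ins: decide plays the SampleGate, emitted plays channel.emit, and _Live/NOOP play the real handles. Treating a call without parent_id as the root open, and letting explicitly passed attributes override service.name, are assumptions.

```python
import secrets
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple


class _Noop:
    active = False                                # stand-in for NOOP_HANDLE


NOOP = _Noop()


@dataclass
class _Live:
    trace_id: str
    parent_id: Optional[str]
    kind: str
    name: str
    attributes: Dict[str, Any]
    active: bool = True


@dataclass
class SketchRecorder:
    service_name: str
    decide: Callable[[str], bool]                 # plays SampleGate.decide
    emitted: List[Tuple[str, _Live]] = field(default_factory=list)  # plays channel.emit

    def open(self, kind: str, name: str, trace_id: Optional[str] = None,
             parent_id: Optional[str] = None,
             attributes: Optional[Dict[str, Any]] = None):
        tid = trace_id or secrets.token_hex(16)   # join a trace, or mint a new one
        if not self.decide(tid):
            return NOOP                           # sampled out: nothing is emitted
        attrs = dict(attributes or {})
        if parent_id is None:                     # root open: fold in service.name
            attrs = {"service.name": self.service_name, **attrs}
        handle = _Live(tid, parent_id, kind, name, attrs)
        self.emitted.append(("open", handle))
        return handle
```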

SampleGate.decide(trace_id) -> bool is admission control. "always" and "never" are constant verdicts; RatioStrategy(ratio=...) is deterministic ratio sampling, not a random draw. The decision is _unit_hash(trace_id) < ratio, where _unit_hash is a dependency-free FNV-1a folded to [0, 1). The hash masks with & 0xFFFFFFFF after every multiply to reproduce a uint32 wrap-around exactly, so the same trace id always yields the same verdict — across processes and replays — and children opened on an admitted trace id are themselves admitted. A gate holds no mutable state, so one instance is safely shared across recorders and tasks.

from indusagi.tracing import Recorder, RecorderOptions, RatioStrategy, SampleGate

# Either pass a strategy directly...
rec = Recorder(RecorderOptions(service_name="svc", sampling=RatioStrategy(ratio=0.25)))

# ...or a prebuilt gate (shareable, stateless):
gate = SampleGate(RatioStrategy(ratio=0.25))
admitted = gate.decide("a1b2c3d4e5f60718a1b2c3d4e5f60718")  # deterministic per trace id

h = rec.open("run", "task")
print(h.active)  # False when sampled out -> this is the shared NOOP_HANDLE
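The _unit_hash decision described above can be sketched with the standard 32-bit FNV-1a constants (offset basis 0x811C9DC5, prime 0x01000193). Hashing the UTF-8 bytes of the id and dividing by 2**32 are assumptions about the real helper; for the hex trace ids the recorder mints, bytes and characters coincide anyway. unit_hash, Ratio, and decide are hypothetical names.

```python
from dataclasses import dataclass
from typing import Literal, Union

FNV_OFFSET_BASIS = 0x811C9DC5
FNV_PRIME = 0x01000193


def unit_hash(key: str) -> float:
    """32-bit FNV-1a, folded into [0, 1)."""
    h = FNV_OFFSET_BASIS
    for byte in key.encode("utf-8"):
        h ^= byte                          # FNV-1a: xor first...
        h = (h * FNV_PRIME) & 0xFFFFFFFF   # ...then multiply, masking to emulate uint32 wrap
    return h / 2**32                       # max is (2**32 - 1) / 2**32, which is < 1


@dataclass(frozen=True)
class Ratio:
    ratio: float                           # admitted fraction in [0, 1]


def decide(trace_id: str, strategy: Union[Literal["always", "never"], Ratio]) -> bool:
    if strategy == "always":
        return True
    if strategy == "never":
        return False
    return unit_hash(trace_id) < strategy.ratio
```

Because the verdict is a pure function of the trace id, any process that sees the same id reaches the same admit/drop decision without coordination.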

Draining to sinks

A Sink is the terminal end of the transport — the OTel-free analogue of an exporter, but a plain ABC with one mandatory async drain(channel) pull-loop plus optional flush()/close() lifecycle hooks (both default to no-ops). Each sink owns its own async for … in channel loop and acts only on CloseSignal, ignoring open/update chatter. Three concrete sinks ship:

  • ConsoleSink prints one tidy line per closed segment — glyph, kind, name, duration, and a short trace/segment id tag — through an injected log callable (default print). Stateless and resource-free, so it needs neither flush nor close.
  • FileSink(path, FileSinkOptions(flush_every=...)) appends each closed record as an NDJSON line, buffering and flushing via asyncio.to_thread on the flush_every threshold, at the end of drain, and on close.
  • StreamSink is the most general: it writes each closed record as a JSON line to an asyncio.StreamWriter (or any text writer), honours backpressure via the writer's drain(), and optionally ends the stream on close (end_on_close).
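The Sink contract can be sketched as an ABC whose only abstract member is the async pull loop. SketchSink and LineSink are hypothetical; signals are modeled as dicts with a "type" key, and the flush()/close() hooks are assumed to be coroutines (the FileSink example below awaits close()).

```python
import abc
from typing import Any, AsyncIterable, Callable, Dict


class SketchSink(abc.ABC):
    """Terminal consumer: one abstract pull loop, optional lifecycle hooks."""

    @abc.abstractmethod
    async def drain(self, channel: AsyncIterable[Dict[str, Any]]) -> None:
        """Consume the channel until it ends."""

    async def flush(self) -> None:
        """Optional hook; a no-op unless a sink buffers output."""

    async def close(self) -> None:
        """Optional hook; a no-op unless a sink holds resources."""


class LineSink(SketchSink):
    """Console-style sink: one line per closed segment via an injected log callable."""

    def __init__(self, log: Callable[[str], None] = print) -> None:
        self._log = log

    async def drain(self, channel: AsyncIterable[Dict[str, Any]]) -> None:
        async for signal in channel:
            if signal.get("type") != "close":
                continue                   # ignore open/update chatter
            rec = signal["record"]
            self._log(f"{rec['status']} {rec['kind']} {rec['name']}")
```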

FileSink and StreamSink serialize through the shared base._record_to_json, which emits camelCase keys (traceId, parentId, startedAt, endedAt) with compact separators and default=str so a non-serializable attribute value won't crash the sink.
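A sketch of the shared serializer's documented behaviour, assuming records arrive as plain dicts (the real helper receives a TraceRecord). record_to_json is a hypothetical name, and the camelCase mapping covers only the four keys the text lists.

```python
import json
from typing import Any, Dict

_CAMEL_KEYS = {"trace_id": "traceId", "parent_id": "parentId",
               "started_at": "startedAt", "ended_at": "endedAt"}


def record_to_json(record: Dict[str, Any]) -> str:
    payload = {_CAMEL_KEYS.get(key, key): value for key, value in record.items()}
    # Compact separators keep each NDJSON line small; default=str turns any
    # non-serializable attribute value into its str() instead of raising.
    return json.dumps(payload, separators=(",", ":"), default=str)
```

An NDJSON writer would then append record_to_json(record) + "\n" for each closed segment.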

Redaction

SecretScrubber is a stateless attribute processor. scrub(attributes) returns a deep copy with secret-looking keys and values replaced by REDACTION_TOKEN, the guillemet string '‹redacted›' (deliberately not '[REDACTED]'). Rules come from DEFAULT_SECRET_PATTERNS (credential key names, auth headers, bearer/vendor-key/JWT/PEM value shapes), each a SecretPattern with optional key/value regexes matched unanchored via re.search. The walk is iterative with an explicit stack (no recursion) and never mutates its input; when a key name looks secret, it does not descend into that value and instead collapses the whole subtree to the token.

The scrubber is exported so sinks can sanitize attributes before persisting, but the bundled ConsoleSink/FileSink/StreamSink do not call it automatically — a caller must apply it explicitly.

import asyncio
from indusagi.tracing import (
    Recorder, RecorderOptions, FileSink, FileSinkOptions, SecretScrubber,
)

scrubber = SecretScrubber()  # uses DEFAULT_SECRET_PATTERNS + REDACTION_TOKEN
safe = scrubber.scrub({"authorization": "Bearer abcd1234efgh", "model": "opus"})
# -> {'authorization': '‹redacted›', 'model': 'opus'}


async def main() -> None:
    rec = Recorder(RecorderOptions(service_name="svc"))
    sink = FileSink("/tmp/trace.ndjson", FileSinkOptions(flush_every=10))
    task = asyncio.create_task(sink.drain(rec.channel()))

    step = rec.open("custom", "step")
    step.note(safe)
    step.close("ok")

    rec.channel().close()
    await task
    await sink.close()  # final flush


asyncio.run(main())
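The iterative, non-mutating walk can be sketched like this. The two rules are illustrative only and are not DEFAULT_SECRET_PATTERNS; Rule, RULES, TOKEN, and scrub are hypothetical names, and deep-copying first and then rewriting the copy is one way (assumed here) to guarantee the input is never touched.

```python
import copy
import re
from dataclasses import dataclass
from typing import Any, List, Optional, Pattern

TOKEN = "‹redacted›"


@dataclass(frozen=True)
class Rule:
    label: str
    key: Optional[Pattern[str]] = None      # matched against key names
    value: Optional[Pattern[str]] = None    # matched against string values


RULES = (
    Rule("credential-key", key=re.compile(r"(?i)password|secret|api[_-]?key|authorization")),
    Rule("bearer-value", value=re.compile(r"(?i)\bbearer\s+\S+")),
)


def _key_is_secret(key: Any) -> bool:
    return isinstance(key, str) and any(r.key is not None and r.key.search(key) for r in RULES)


def _value_is_secret(value: Any) -> bool:
    return isinstance(value, str) and any(
        r.value is not None and r.value.search(value) for r in RULES)


def scrub(attributes: dict) -> dict:
    out = copy.deepcopy(attributes)        # the caller's input is never touched
    stack: List[Any] = [out]               # explicit stack instead of recursion
    while stack:
        node = stack.pop()
        pairs = list(node.items()) if isinstance(node, dict) else list(enumerate(node))
        for k, v in pairs:
            if _key_is_secret(k):
                node[k] = TOKEN            # secret-looking key: collapse the subtree, no descent
            elif isinstance(v, (dict, list)):
                stack.append(v)
            elif _value_is_secret(v):
                node[k] = TOKEN
    return out
```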

The telemetry hub

TelemetryHub is a named registry of Recorders. register(name, options) adds (or replaces) a recorder and returns it. recorder(name=None) returns the named recorder, or the default one when no name is given, and raises LookupError if that recorder is missing, so dropped traces fail loudly instead of vanishing. get_default() returns the default recorder or None, and has(name) checks membership. The first registration becomes the default, unless a later register uses the reserved "default" name, which always re-designates it.

get_default_hub() lazily creates and returns a process-wide default hub, and set_default_hub(hub) replaces it — both strictly opt-in; nothing in the subsystem reads the process default implicitly.

from indusagi.tracing import TelemetryHub, RecorderOptions, get_default_hub

hub = TelemetryHub()
hub.register("agent-a", RecorderOptions(service_name="agent-a"))  # becomes default
hub.register("agent-b", RecorderOptions(service_name="agent-b"))

rec = hub.recorder("agent-b")
default_rec = hub.recorder()        # the default (agent-a)
print(hub.has("agent-b"))           # True

shared = get_default_hub()          # opt-in process-wide hub
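The hub's registration and lookup rules can be sketched as a small class. SketchHub is hypothetical and, to stay self-contained, its register() takes a ready-made recorder object instead of RecorderOptions; the module-level default hub mirrors the opt-in get_default_hub/set_default_hub pair.

```python
from typing import Any, Dict, Optional


class SketchHub:
    """Hypothetical registry following TelemetryHub's documented rules."""

    def __init__(self) -> None:
        self._recorders: Dict[str, Any] = {}
        self._default: Optional[str] = None

    def register(self, name: str, recorder: Any) -> Any:
        self._recorders[name] = recorder               # add or replace
        if self._default is None or name == "default":
            self._default = name                       # first wins; "default" re-designates
        return recorder

    def recorder(self, name: Optional[str] = None) -> Any:
        key = name if name is not None else self._default
        if key is None or key not in self._recorders:
            raise LookupError(f"no recorder registered as {key!r}")  # fail loudly
        return self._recorders[key]

    def get_default(self) -> Optional[Any]:
        return self._recorders.get(self._default) if self._default is not None else None

    def has(self, name: str) -> bool:
        return name in self._recorders


_default_hub: Optional[SketchHub] = None


def get_default_hub() -> SketchHub:
    global _default_hub
    if _default_hub is None:
        _default_hub = SketchHub()                     # created lazily on first request
    return _default_hub


def set_default_hub(hub: SketchHub) -> None:
    global _default_hub
    _default_hub = hub
```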

Bridging the runtime

trace_agent_run is the optional, decoupled bridge that projects the runtime's flat RunEvent stream into nested run/inference/action segments on a Recorder. It subscribes to an event stream whose shape matches Agent.subscribe (captured as RunEventSubscribe) and maintains the run/inference/action handles in a closure: snapshot phases drive the inference (an invoking phase opens one; dispatching/compacting/idle close it), tool_started/tool_finished map to actions, and settled/faulted close the run. It also folds streamed text_delta/thinking_delta counts onto the open inference as stream.text_deltas/stream.thinking_deltas attributes. The call returns a disposer that detaches and closes anything still open. It is best-effort and never raises on malformed input — out-of-order, missing, or repeated events just close what's open and ignore the rest.

from indusagi.tracing import Recorder, RecorderOptions, trace_agent_run

rec = Recorder(RecorderOptions(service_name="agent"))

# `agent` is an existing Agent; `agent.subscribe(handler) -> unsubscribe` matches RunEventSubscribe's shape.
dispose = trace_agent_run(rec, agent.subscribe)
try:
    ...  # run the agent; run/inference/action segments flow onto rec.channel()
finally:
    dispose()  # detaches and closes any still-open segments
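The projection can be sketched as a closure over a few slots (run, inference, delta counts, actions). Everything here is hypothetical: events are any objects carrying only the fields the text names (read with getattr), the run opens lazily on the first snapshot, the inference segment is named "invoke", and H is a tiny recording handle. The real adapter's segment names and ordering rules may differ.

```python
from typing import Any, Callable, Dict, List, Optional


class H:
    """Tiny recording handle standing in for SegmentHandle."""

    def __init__(self, log: List[tuple], kind: str, name: str) -> None:
        self.log, self.kind, self.name = log, kind, name
        self.attrs: Dict[str, Any] = {}
        self.children: List["H"] = []
        self.status, self.active = "ok", True
        log.append(("open", kind, name))

    def child(self, kind: str, name: str) -> "H":
        self.children.append(H(self.log, kind, name))
        return self.children[-1]

    def note(self, patch: Dict[str, Any]) -> None:
        self.attrs.update(patch)

    def fail(self, error: Any) -> None:
        self.status = "error"

    def close(self, status: Optional[str] = None) -> None:
        if self.active:
            self.active = False
            self.log.append(("close", self.kind, self.name, status or self.status))


def trace_run(open_root: Callable[[str], H],
              subscribe: Callable[[Callable[[Any], None]], Callable[[], None]]
              ) -> Callable[[], None]:
    state: Dict[str, Any] = {"run": None, "inf": None, "counts": {}, "actions": {}}
    delta_attr = {"text_delta": "stream.text_deltas", "thinking_delta": "stream.thinking_deltas"}

    def close_inference() -> None:
        if state["inf"] is not None:
            state["inf"].close()
            state["inf"] = None

    def close_all(status: Optional[str] = None) -> None:
        close_inference()
        for action in state["actions"].values():
            action.close()
        state["actions"].clear()
        if state["run"] is not None:
            state["run"].close(status)
            state["run"] = None

    def on_event(event: Any) -> None:
        kind = getattr(event, "kind", None)        # read only the public shape
        snap = getattr(event, "snapshot", None)
        if state["run"] is None and snap is not None:
            state["run"] = open_root(str(getattr(snap, "run_id", "run")))
        run = state["run"]
        if run is None:
            return                                 # nothing open yet: ignore
        if kind == "snapshot":
            phase = getattr(snap, "phase", None)
            if phase == "invoking" and state["inf"] is None:
                state["inf"], state["counts"] = run.child("inference", "invoke"), {}
            elif phase in ("dispatching", "compacting", "idle"):
                close_inference()
        elif kind in delta_attr and state["inf"] is not None:
            attr = delta_attr[kind]
            state["counts"][attr] = state["counts"].get(attr, 0) + 1
            state["inf"].note({attr: state["counts"][attr]})
        elif kind == "tool_started":
            name = str(getattr(event, "name", "tool"))
            state["actions"][getattr(event, "id", None)] = run.child("action", name)
        elif kind == "tool_finished":
            action = state["actions"].pop(getattr(event, "id", None), None)
            if action is not None:                 # unknown or repeated ids are ignored
                if getattr(getattr(event, "outcome", None), "is_error", False):
                    action.fail(getattr(event, "error", None))
                action.close()
        elif kind == "settled":
            close_all("ok")
        elif kind == "faulted":
            run.fail(getattr(event, "error", None))
            close_all()
        # Any other kind falls through without raising.

    unsubscribe = subscribe(on_event)

    def dispose() -> None:
        unsubscribe()
        close_all()                                # close anything still open

    return dispose
```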

Relationship to neighbors

The tracing core (signal, channel, recorder, registry, redaction, sinks) is entirely runtime-agnostic and sits below or beside the rest of the framework. The only coupling to the Runtime is the optional adapter layer: adapter/runtime_trace.py imports RunEvent under TYPE_CHECKING only and reads its public shape (event.kind, snapshot.run_id/phase, event.id/name, event.outcome.is_error, event.error) via getattr/match — it never touches runtime internals, and nothing in the runtime depends on it.

Internally the layers form an acyclic stack: handle imports channel.signal and signal.segment; channel.signal references segment only under TYPE_CHECKING to stay acyclic; recorder composes channel, handle, and sampling; registry wraps recorder; sinks consume the channel and share base._record_to_json. The top-level package exposes tracing as a lazy submodule. For the model conversations being traced, see the LLM Gateway; for the broader picture of how subsystems fit together, see the Architecture.
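The TYPE_CHECKING pattern that keeps the stack acyclic can be sketched in a few lines. CloseSignalSketch is hypothetical, and the import path indusagi.tracing.signal.segment is inferred from the directory layout; because the guarded import never runs, the snippet executes even where indusagi is not installed.

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Evaluated only by static type checkers such as mypy; at runtime this
    # import never executes, so it adds no import edge and cannot form a cycle.
    from indusagi.tracing.signal.segment import Segment


class CloseSignalSketch:
    """Hypothetical signal type that mentions Segment only in a type annotation."""

    type = "close"

    def __init__(self, record: "Optional[Segment]") -> None:
        # The quoted annotation is stored as a string and never resolved at runtime.
        self.record = record
```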