Tracing
indusagi.tracing is the framework's homegrown, dependency-light observability subsystem: an OpenTelemetry-free way to trace agent runs, model calls, and tool actions. It is imported as import indusagi.tracing, or reached through the lazy tracing submodule on the top-level indusagi package.
A trace is modeled as immutable Segment values driven by a functional
SegmentHandle (note/child/fail/close). Each state transition emits exactly
one TraceSignal onto an async-iterable SignalChannel, and terminal Sinks drain
that channel to a console, an NDJSON file, or an arbitrary stream. Admission control
(SampleGate), a named-recorder registry (TelemetryHub), credential redaction
(SecretScrubber), and an optional RunEvent adapter round out the surface. The
vocabulary is deliberately fresh — there is no span/exporter/tracer-provider naming.
Table of Contents
- Public exports
- Sub-directories
- The segment lifecycle
- Recorders and sampling
- Draining to sinks
- Redaction
- The telemetry hub
- Bridging the runtime
- Relationship to neighbors
Public exports
The package barrel re-exports its layers in a bottom-up order (signal, channel,
redaction, sinks, recorder, registry, adapter):
| Name | Kind | Source | Purpose |
|---|---|---|---|
| Segment | dataclass | signal/segment.py | Immutable, frozen trace segment (id, trace_id, parent_id, kind, name, started_at, ended_at, status, attributes, error). Never mutated in place |
| SegmentKind | type | signal/segment.py | Literal['run','inference','action','recall','custom']: the kind of work a segment records |
| SegmentStatus | type | signal/segment.py | Literal['open','ok','error']: lifecycle status |
| SegmentError | dataclass | signal/segment.py | Minimal serializable captured failure: a single message: str field |
| TraceRecord | type | signal/segment.py | Alias for Segment; the closed/serializable form a close signal carries |
| OpenSegmentInput | dataclass | signal/segment.py | Inputs for open_segment; missing ids/clock are generated |
| open_segment | function | signal/segment.py | Construct a fresh frozen open Segment; pure aside from id/clock defaulting |
| with_attributes | function | signal/segment.py | Derive a new frozen segment with a patch merged over attributes (last-writer-wins) |
| close_segment | function | signal/segment.py | Derive the terminal TraceRecord, stamping ended_at, final status, and (on error) a SegmentError |
| fresh_id | function | signal/segment.py | Generate a hex id of N random bytes via secrets.token_hex |
| fresh_trace_id | function | signal/segment.py | A new 128-bit (16-byte) hex trace id |
| fresh_segment_id | function | signal/segment.py | A new 64-bit (8-byte) hex segment id |
| SegmentHandle | protocol | signal/handle.py | The functional handle over an in-flight segment (trace_id, id, active; note/child/fail/close) |
| OpenHandleInput | dataclass | signal/handle.py | Inputs for opening a real, sampled-in handle |
| open_handle | function | signal/handle.py | Open a live segment, emit an OpenSignal onto the sink, return its SegmentHandle |
| NOOP_HANDLE | const | signal/handle.py | The single shared sampled-out SegmentHandle; emits/allocates nothing, child() returns itself |
| SignalSink | type | signal/handle.py | Callable[[TraceSignal], None]: a consumer of trace signals (typically SignalChannel.emit) |
| OpenSignal | dataclass | channel/signal.py | Emitted once when a segment opens; carries the open Segment. type='open' |
| UpdateSignal | dataclass | channel/signal.py | Emitted when an open segment's attributes are amended; carries id + attributes. type='update' |
| CloseSignal | dataclass | channel/signal.py | Emitted once when a segment reaches a terminal state; carries the TraceRecord. type='close' |
| TraceSignal | type | channel/signal.py | Union OpenSignal \| UpdateSignal \| CloseSignal: what every channel consumer branches on |
| SignalChannel | class | channel/signal.py | Single-stream, multi-emit, async-iterable transport. emit()/close() sync; consumed via async for |
| SecretPattern | dataclass | redaction/secret_scrubber.py | A redaction rule: label + optional key/value compiled regexes (re.search, unanchored) |
| REDACTION_TOKEN | const | redaction/secret_scrubber.py | The inert replacement string '‹redacted›' (guillemet, deliberately not '[REDACTED]') |
| DEFAULT_SECRET_PATTERNS | const | redaction/secret_scrubber.py | Built-in tuple of rules: credential key names, auth headers, bearer/vendor-key/JWT/PEM value shapes |
| SecretScrubber | class | redaction/secret_scrubber.py | Stateless attribute processor; scrub() returns a deep copy with secret-looking keys/values replaced |
| Sink | ABC | sinks/base.py | Terminal channel-consumer contract: abstract async drain(channel); optional flush()/close() no-ops |
| ConsoleSink | class | sinks/console.py | Prints one tidy line per closed segment through an injected log callable (default print) |
| ConsoleSinkOptions | dataclass | sinks/console.py | Options for ConsoleSink: log callable |
| FileSink | class | sinks/file.py | Appends each closed segment as an NDJSON line; buffers and flushes via asyncio.to_thread |
| FileSinkOptions | dataclass | sinks/file.py | Options for FileSink: flush_every (default 1) |
| StreamSink | class | sinks/stream.py | Writes each closed record as a JSON line to an asyncio.StreamWriter or any text writer; honours backpressure |
| StreamSinkOptions | dataclass | sinks/stream.py | Options for StreamSink: end_on_close (default False) |
| SampleGate | class | recorder/sampling.py | Trace admission control; decide(trace_id) -> bool. Constant for always/never, deterministic for ratio |
| SampleStrategy | type | recorder/sampling.py | Literal['always','never'] \| RatioStrategy |
| RatioStrategy | dataclass | recorder/sampling.py | Deterministic ratio strategy: admit fraction ratio ∈ [0,1] keyed by trace id |
| Recorder | class | recorder/recorder.py | The tracer instance. Holds service_name, a SampleGate, and a SignalChannel |
| RecorderOptions | dataclass | recorder/recorder.py | Construction options: service_name, sampling, channel |
| OpenOptions | dataclass | recorder/recorder.py | Per-open options: trace_id, parent_id, attributes (all optional) |
| TelemetryHub | class | registry/hub.py | Named registry of Recorders with a designated default |
| get_default_hub | function | registry/hub.py | Lazily create/return the process-wide default TelemetryHub (opt-in) |
| set_default_hub | function | registry/hub.py | Replace the process-wide default TelemetryHub (e.g. for tests) |
| trace_agent_run | function | adapter/runtime_trace.py | Project a runtime RunEvent stream onto a Recorder as nested segments; returns a disposer |
| RunEventSubscribe | type | adapter/runtime_trace.py | Callable[[Callable[[RunEvent], None]], Callable[[], None]]: Agent.subscribe's shape |
Sub-directories
| Directory | Holds |
|---|---|
| signal/ | segment.py (immutable Segment, SegmentKind/Status/Error, TraceRecord, open/with_attributes/close, fresh ids) and handle.py (SegmentHandle protocol, _LiveHandle via open_handle, NOOP_HANDLE, SignalSink) |
| channel/ | signal.py: the TraceSignal union (Open/Update/CloseSignal) and the single-consumer async-iterable SignalChannel |
| redaction/ | secret_scrubber.py: SecretScrubber, SecretPattern, DEFAULT_SECRET_PATTERNS, REDACTION_TOKEN |
| recorder/ | recorder.py (Recorder, RecorderOptions, OpenOptions) and sampling.py (SampleGate, SampleStrategy, RatioStrategy, FNV-1a unit hash) |
| registry/ | hub.py: the TelemetryHub named-recorder registry plus the opt-in process default |
| sinks/ | base.py (Sink ABC + _record_to_json), console.py, file.py (NDJSON), stream.py (to a writer) |
| adapter/ | runtime_trace.py: the optional trace_agent_run bridge + RunEventSubscribe; the one runtime-aware module |
The segment lifecycle
A Segment is a frozen dataclass — the OTel-free analogue of a span. It is never
mutated in place: the lifecycle open → note → close is modeled by producing new
frozen values through open_segment, with_attributes, and close_segment. Because
every value is immutable, a closed TraceRecord is trivially serializable.
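The open → note → close progression can be sketched with a frozen dataclass and dataclasses.replace. This is a minimal illustration under stated assumptions, not the package's code: Seg, SegError, open_seg, with_attrs, and close_seg are hypothetical stand-ins for Segment, SegmentError, open_segment, with_attributes, and close_segment, and the field set is trimmed. Only the id sizes follow the documented ones (16 random bytes for a trace id, 8 for a segment id).

```python
import secrets
import time
from dataclasses import dataclass, field, replace
from typing import Any, Dict, Literal, Optional

Status = Literal["open", "ok", "error"]


@dataclass(frozen=True)
class SegError:  # hypothetical stand-in for SegmentError
    message: str


@dataclass(frozen=True)
class Seg:  # hypothetical, trimmed stand-in for Segment
    id: str
    trace_id: str
    parent_id: Optional[str]
    kind: str
    name: str
    started_at: float
    ended_at: Optional[float] = None
    status: Status = "open"
    # frozen blocks field reassignment; the dict is treated as read-only by convention
    attributes: Dict[str, Any] = field(default_factory=dict)
    error: Optional[SegError] = None


def open_seg(kind: str, name: str, trace_id: Optional[str] = None,
             parent_id: Optional[str] = None) -> Seg:
    return Seg(
        id=secrets.token_hex(8),                     # 64-bit segment id
        trace_id=trace_id or secrets.token_hex(16),  # 128-bit trace id
        parent_id=parent_id,
        kind=kind,
        name=name,
        started_at=time.time(),
    )


def with_attrs(seg: Seg, patch: Dict[str, Any]) -> Seg:
    # Merge into a NEW dict; on key clashes the patch wins (last writer wins).
    return replace(seg, attributes={**seg.attributes, **patch})


def close_seg(seg: Seg, status: Status, error: Optional[str] = None) -> Seg:
    # Only terminal fields change; an error captured earlier is preserved.
    err = SegError(error) if error is not None else seg.error
    return replace(seg, ended_at=time.time(), status=status, error=err)
```

Every step returns a new value and leaves its predecessor untouched, so an earlier snapshot of a segment stays valid after later notes or the close.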
Callers rarely touch those functions directly. The behavioural face is
SegmentHandle, a Protocol with note, child, fail, and close. There is
intentionally no class hierarchy:
- A sampled-in handle is the plain _LiveHandle returned by open_handle. It holds the segment's current frozen value, emits an OpenSignal on construction, and on each note/fail/close derives the next frozen value and emits an UpdateSignal/CloseSignal through an injected SignalSink callable.
- A sampled-out handle is the single shared NOOP_HANDLE: its methods do nothing and its child() returns itself, so an entire sampled-out subtree costs nothing.
note, child, and fail are no-ops once a handle is closed, and close is
idempotent (the first call wins), so a double-close in a finally block is safe.
fail(error) records a SegmentError and sets the closing status to error
without closing — the actual close carries it. close_segment preserves a
pre-existing error even on an explicit close("ok"): only terminal fields are
replaced.
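The live/no-op split and the closing rules can be illustrated with two small classes. This is a toy model under stated assumptions: LiveHandle, NoopHandle, NOOP, and the tuple signals are hypothetical; ids are readable strings rather than hex; this fail() emits nothing and only stashes the error for the eventual close, which may differ from how the real handle signals; and a recorded failure is assumed to force the final status to error.

```python
from typing import Any, Callable, Dict, Optional, Tuple, Union

Signal = Tuple[Any, ...]         # hypothetical stand-in for TraceSignal
Emit = Callable[[Signal], None]  # hypothetical stand-in for SignalSink


class NoopHandle:
    """Sampled-out handle: every method is inert and child() returns self."""
    active = False

    def note(self, attrs: Dict[str, Any]) -> None:
        pass

    def fail(self, error: str) -> None:
        pass

    def close(self, status: str = "ok") -> None:
        pass

    def child(self, kind: str, name: str) -> "NoopHandle":
        return self  # a whole sampled-out subtree is this one object


NOOP = NoopHandle()


class LiveHandle:
    """Sampled-in handle that reports every transition through `emit`."""
    active = True

    def __init__(self, emit: Emit, kind: str, name: str,
                 parent_id: Optional[str] = None) -> None:
        self._emit = emit
        self.id = f"{kind}:{name}"  # readable toy id, not a hex id
        self._attrs: Dict[str, Any] = {}
        self._error: Optional[str] = None
        self._closed = False
        emit(("open", self.id, parent_id))

    def note(self, attrs: Dict[str, Any]) -> None:
        if self._closed:
            return  # no-op after close
        self._attrs = {**self._attrs, **attrs}
        self._emit(("update", self.id, dict(attrs)))

    def fail(self, error: str) -> None:
        if not self._closed:
            self._error = error  # carried by the eventual close

    def child(self, kind: str, name: str) -> Union["LiveHandle", NoopHandle]:
        if self._closed:
            return NOOP  # assumption: a closed parent hands out no-ops
        return LiveHandle(self._emit, kind, name, parent_id=self.id)

    def close(self, status: str = "ok") -> None:
        if self._closed:
            return  # idempotent: the first close wins
        self._closed = True
        final = "error" if self._error is not None else status
        self._emit(("close", self.id, final, self._error))
```

A repeated close() or a late note() produces no extra signals, which is what makes a defensive close in a finally block safe.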
Each handle transition emits exactly one TraceSignal. These flow onto a
SignalChannel: a single-consumer async iterable backed by a deque plus an
asyncio future waiter. emit() appends and wakes a parked consumer; async for
pulls; close() ends iteration after draining. The channel is lossy under
backpressure: once it holds bound unread signals, each further emit drops the oldest
unread signal (newest wins); a None or non-positive bound means the channel is
unbounded. emit/close are synchronous and must run on
the consumer's event-loop thread; off-loop producers must marshal via
loop.call_soon_threadsafe.
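The buffering policy maps neatly onto collections.deque(maxlen=...), which silently discards from the left (the oldest item) when an append would overflow it. MiniChannel below is a hypothetical single-consumer sketch of the deque-plus-future design described above, not the package's SignalChannel.

```python
import asyncio
from collections import deque
from typing import Any, Deque, Optional


class MiniChannel:
    """Hypothetical single-consumer channel: deque buffer + one future waiter."""

    def __init__(self, bound: Optional[int] = None) -> None:
        # None or a non-positive bound means unbounded (deque maxlen=None).
        maxlen = bound if bound is not None and bound > 0 else None
        # A full bounded deque drops its oldest item on append: newest wins.
        self._buf: Deque[Any] = deque(maxlen=maxlen)
        self._waiter: Optional[asyncio.Future] = None
        self._closed = False

    def _wake(self) -> None:
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)

    def emit(self, item: Any) -> None:
        # Synchronous; must be called on the consumer's event-loop thread.
        if self._closed:
            return
        self._buf.append(item)
        self._wake()

    def close(self) -> None:
        self._closed = True
        self._wake()

    def __aiter__(self) -> "MiniChannel":
        return self

    async def __anext__(self) -> Any:
        while True:
            if self._buf:
                return self._buf.popleft()
            if self._closed:
                raise StopAsyncIteration  # drained and closed
            # Park until emit() or close() resolves the waiter.
            self._waiter = asyncio.get_running_loop().create_future()
            await self._waiter
            self._waiter = None
```

Emitting five items into a channel bounded at 2 and then closing it yields only the last two, and a consumer that parks before any emit is woken by the first one.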
```python
import asyncio

from indusagi.tracing import Recorder, RecorderOptions, ConsoleSink


async def main() -> None:
    rec = Recorder(RecorderOptions(service_name="my-agent"))
    sink = ConsoleSink()
    drain_task = asyncio.create_task(sink.drain(rec.channel()))

    run = rec.open("run", "answer-question")
    inf = run.child("inference", "chat.completion")
    inf.note({"model": "opus", "tokens.in": 1200})
    inf.close("ok")

    act = run.child("action", "write_file")
    act.fail("EACCES: permission denied")  # sets the closing status to error
    act.close()                             # this close carries the error
    run.close("ok")

    rec.channel().close()  # end the stream so drain() returns
    await drain_task


asyncio.run(main())
```
Recorders and sampling
A Recorder is the flat tracer instance — it replaces the OTel tracer-provider /
tracer pair with one object. It holds a service_name, a SampleGate, and a
SignalChannel. open(kind, name, opts) resolves a trace id (a supplied
opts.trace_id joins an existing trace, otherwise a fresh one is minted), consults
the gate, and either returns NOOP_HANDLE (sampled out — nothing is emitted) or calls
open_handle bound to the channel's emit. A root open folds service.name into
the segment's attributes. channel() exposes the stream sinks drain.
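The branch inside open() can be sketched as follows. MiniRecorder, OpenedHandle, and SAMPLED_OUT are hypothetical stand-ins: decide plays the role of SampleGate.decide, keyword arguments replace OpenOptions, and emitted is a plain list in place of the channel. That service.name overwrites a caller-supplied value of the same key is an assumption.

```python
import secrets
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class OpenedHandle:  # hypothetical stand-in for a live SegmentHandle
    kind: str
    name: str
    trace_id: str
    parent_id: Optional[str]
    attributes: Dict[str, Any]
    active: bool = True


# One shared sampled-out handle, in the spirit of NOOP_HANDLE.
SAMPLED_OUT = OpenedHandle("", "", "", None, {}, active=False)


class MiniRecorder:
    def __init__(self, service_name: str, decide: Callable[[str], bool]) -> None:
        self.service_name = service_name
        self._decide = decide                  # stand-in for SampleGate.decide
        self.emitted: List[OpenedHandle] = []  # stand-in for the channel

    def open(self, kind: str, name: str, trace_id: Optional[str] = None,
             parent_id: Optional[str] = None,
             attributes: Optional[Dict[str, Any]] = None) -> OpenedHandle:
        tid = trace_id or secrets.token_hex(16)  # join a trace or mint one
        if not self._decide(tid):
            return SAMPLED_OUT                   # sampled out: emit nothing
        attrs = dict(attributes or {})
        if parent_id is None:                    # root segment
            attrs["service.name"] = self.service_name
        handle = OpenedHandle(kind, name, tid, parent_id, attrs)
        self.emitted.append(handle)              # the "open" signal
        return handle
```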
SampleGate.decide(trace_id) -> bool is admission control. "always" and "never"
are constant verdicts; RatioStrategy(ratio=...) is deterministic ratio sampling,
not a random draw. The decision is _unit_hash(trace_id) < ratio, where _unit_hash
is a dependency-free FNV-1a folded to [0, 1). The hash masks with & 0xFFFFFFFF
after every multiply to reproduce a uint32 wrap-around exactly, so the same trace
id always yields the same verdict — across processes and replays — and children
opened on an admitted trace id are themselves admitted. A gate holds no mutable state,
so one instance is safely shared across recorders and tasks.
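The admission rule _unit_hash(trace_id) < ratio can be reproduced with a standard 32-bit FNV-1a. The offset basis and prime below are the published FNV-1a constants; hashing the id's UTF-8 bytes and folding by dividing by 2**32 are assumptions about details the text does not spell out, so exact verdicts may differ from the package's. fnv1a_32, unit_hash, and decide are hypothetical names.

```python
FNV_OFFSET = 0x811C9DC5  # 32-bit FNV offset basis
FNV_PRIME = 0x01000193   # 32-bit FNV prime (16777619)


def fnv1a_32(text: str) -> int:
    h = FNV_OFFSET
    for byte in text.encode("utf-8"):     # assumption: hash the UTF-8 bytes
        h ^= byte                         # FNV-1a: xor first...
        h = (h * FNV_PRIME) & 0xFFFFFFFF  # ...then multiply, wrapping like a uint32
    return h


def unit_hash(trace_id: str) -> float:
    # Fold to [0, 1); assumption: divide by 2**32.
    return fnv1a_32(trace_id) / 2**32


def decide(trace_id: str, ratio: float) -> bool:
    # Deterministic: the same id always gets the same verdict.
    return unit_hash(trace_id) < ratio
```

Because the verdict is a pure function of the id, repeated calls, other processes, and replays agree; ratio=0.0 admits nothing and ratio=1.0 admits everything, since the folded value never reaches 1.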
```python
from indusagi.tracing import Recorder, RecorderOptions, RatioStrategy, SampleGate

# Pass a strategy through RecorderOptions.sampling...
rec = Recorder(RecorderOptions(service_name="svc", sampling=RatioStrategy(ratio=0.25)))

# ...or build a SampleGate yourself (stateless, so one instance can be shared)
# and query it directly:
gate = SampleGate(RatioStrategy(ratio=0.25))
admitted = gate.decide("a1b2c3d4e5f60718a1b2c3d4e5f60718")  # deterministic per trace id

h = rec.open("run", "task")
print(h.active)  # False when sampled out: h is then the shared NOOP_HANDLE
```
Draining to sinks
A Sink is the terminal end of the transport — the OTel-free analogue of an
exporter, but a plain ABC with one mandatory async drain(channel) pull-loop plus
optional flush()/close() lifecycle hooks (both default to no-ops). Each sink owns
its own async for … in channel loop and acts only on CloseSignal, ignoring
open/update chatter. Three concrete sinks ship:
- ConsoleSink prints one tidy line per closed segment (glyph, kind, name, duration, and a short trace/segment id tag) through an injected log callable (default print). It is stateless and resource-free, so it needs neither flush nor close.
- FileSink(path, FileSinkOptions(flush_every=...)) appends each closed record as an NDJSON line, buffering and flushing via asyncio.to_thread when the flush_every threshold is reached, at the end of drain, and on close.
- StreamSink is the most general: it writes each closed record as a JSON line to an asyncio.StreamWriter (or any text writer), honours backpressure via the writer's drain(), and optionally ends the stream on close (end_on_close).
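The drain contract can be sketched with abc and an async for loop. BaseSink, ListSink, and Sig are hypothetical; they mirror only the documented shape: one mandatory async drain(), optional flush()/close() hooks that default to no-ops, and a loop that keeps close signals and skips the rest.

```python
import abc
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterable, List


@dataclass(frozen=True)
class Sig:  # hypothetical stand-in for a TraceSignal
    type: str  # 'open' | 'update' | 'close'
    payload: Any


class BaseSink(abc.ABC):
    @abc.abstractmethod
    async def drain(self, channel: AsyncIterable[Sig]) -> None:
        """Pull signals until the channel ends."""

    async def flush(self) -> None:  # optional hook, no-op by default
        return None

    async def close(self) -> None:  # optional hook, no-op by default
        return None


class ListSink(BaseSink):
    """Collects closed records in memory; handy for tests."""

    def __init__(self) -> None:
        self.records: List[Any] = []

    async def drain(self, channel: AsyncIterable[Sig]) -> None:
        async for sig in channel:
            if sig.type != "close":
                continue  # ignore open/update chatter
            self.records.append(sig.payload)
```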
FileSink and StreamSink serialize through the shared base._record_to_json, which
emits camelCase keys (traceId, parentId, startedAt, endedAt) with compact
separators and default=str so a non-serializable attribute value won't crash the
sink.
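These serialization rules map directly onto json.dumps. record_to_json is a hypothetical stand-in for base._record_to_json that takes a plain dict rather than a TraceRecord; the four renamed keys are the ones listed above, and leaving every other key unchanged is an assumption.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

_CAMEL = {
    "trace_id": "traceId",
    "parent_id": "parentId",
    "started_at": "startedAt",
    "ended_at": "endedAt",
}


def record_to_json(record: Dict[str, Any]) -> str:
    # Rename the snake_case fields to camelCase; other keys pass through.
    renamed = {_CAMEL.get(k, k): v for k, v in record.items()}
    # Compact separators (no spaces); default=str stringifies values JSON
    # cannot encode natively instead of raising TypeError.
    return json.dumps(renamed, separators=(",", ":"), default=str)
```

For example, a datetime attribute is written as its str() form rather than crashing the sink.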
Redaction
SecretScrubber is a stateless attribute processor. scrub(attributes) returns a
deep copy with secret-looking keys and values replaced by REDACTION_TOKEN — the
guillemet string '‹redacted›', deliberately not '[REDACTED]'. Rules come from
DEFAULT_SECRET_PATTERNS (credential key names, auth headers, bearer/vendor-key/JWT/
PEM value shapes), each a SecretPattern with optional key/value regexes matched
unanchored via re.search. The walk is iterative with an explicit stack (no
recursion) and never mutates its input; when a key name looks secret it
short-circuits descent and collapses the whole subtree to the token.
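The explicit-stack walk can be sketched in a few lines. scrub, KEY_RE, VALUE_RE, and TOKEN are hypothetical, and the two regexes are a deliberately tiny rule set, far narrower than DEFAULT_SECRET_PATTERNS. The sketch builds fresh dicts and lists as it goes, so the input is never mutated and no recursion is needed.

```python
import re
from typing import Any, Dict, List, Tuple

TOKEN = "‹redacted›"
# Hypothetical, intentionally small rules; matched unanchored with re.search.
KEY_RE = re.compile(r"password|secret|api[_-]?key|authorization|access[_-]?token", re.I)
VALUE_RE = re.compile(r"Bearer\s+\S+|sk-[A-Za-z0-9]{8,}")


def scrub(attributes: Dict[str, Any]) -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    # Each entry pairs a source container with the fresh copy being filled.
    stack: List[Tuple[Any, Any]] = [(attributes, out)]
    while stack:
        src, dst = stack.pop()
        is_map = isinstance(src, dict)
        for key, value in (src.items() if is_map else enumerate(src)):
            if is_map and KEY_RE.search(str(key)):
                clean: Any = TOKEN            # secret key: collapse the subtree
            elif isinstance(value, dict):
                clean = {}
                stack.append((value, clean))  # descend later, no recursion
            elif isinstance(value, list):
                clean = [None] * len(value)
                stack.append((value, clean))
            elif isinstance(value, str) and VALUE_RE.search(value):
                clean = TOKEN                 # secret-looking value
            else:
                clean = value                 # other leaves are kept as-is
            dst[key] = clean                  # dict key or list index
    return out
```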
The scrubber is exported so sinks can sanitize attributes before persisting, but the
bundled ConsoleSink/FileSink/StreamSink do not call it automatically — a
caller must apply it explicitly.
```python
import asyncio

from indusagi.tracing import (
    Recorder, RecorderOptions, FileSink, FileSinkOptions, SecretScrubber,
)

scrubber = SecretScrubber()  # uses DEFAULT_SECRET_PATTERNS + REDACTION_TOKEN
safe = scrubber.scrub({"authorization": "Bearer abcd1234efgh", "model": "opus"})
# -> {'authorization': '‹redacted›', 'model': 'opus'}


async def main() -> None:
    rec = Recorder(RecorderOptions(service_name="svc"))
    sink = FileSink("/tmp/trace.ndjson", FileSinkOptions(flush_every=10))
    task = asyncio.create_task(sink.drain(rec.channel()))

    step = rec.open("custom", "step")
    step.note(safe)  # only the scrubbed attributes reach the sink
    step.close("ok")

    rec.channel().close()
    await task
    await sink.close()  # final flush


asyncio.run(main())
```
The telemetry hub
TelemetryHub is a named registry of Recorders. register(name, options) adds (or
replaces) a recorder and returns it; recorder(name=None) looks one up, falling back
to the default and raising LookupError if the requested or default recorder is
missing (so a bad lookup fails loudly instead of silently dropping traces). get_default() returns the default recorder
or None, and has(name) checks membership. The first registration becomes the
default unless a later register uses the reserved "default" name, which always
re-designates it.
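The registration and default-selection rules can be captured in a small class. MiniHub is hypothetical: unlike TelemetryHub.register, which takes RecorderOptions and returns the resulting Recorder, its register() accepts any ready-made object so the sketch stays self-contained.

```python
from typing import Dict, Optional


class MiniHub:
    """Hypothetical registry mirroring the default-selection rules above."""

    def __init__(self) -> None:
        self._recorders: Dict[str, object] = {}
        self._default: Optional[str] = None

    def register(self, name: str, recorder: object) -> object:
        self._recorders[name] = recorder  # add or replace
        # The first registration becomes the default; "default" always re-designates.
        if self._default is None or name == "default":
            self._default = name
        return recorder

    def recorder(self, name: Optional[str] = None) -> object:
        key = name if name is not None else self._default
        if key is None or key not in self._recorders:
            raise LookupError(f"no recorder registered as {key!r}")
        return self._recorders[key]

    def get_default(self) -> Optional[object]:
        if self._default is None:
            return None
        return self._recorders.get(self._default)

    def has(self, name: str) -> bool:
        return name in self._recorders
```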
get_default_hub() lazily creates and returns a process-wide default hub, and
set_default_hub(hub) replaces it — both strictly opt-in; nothing in the subsystem
reads the process default implicitly.
```python
from indusagi.tracing import TelemetryHub, RecorderOptions, get_default_hub

hub = TelemetryHub()
hub.register("agent-a", RecorderOptions(service_name="agent-a"))  # becomes default
hub.register("agent-b", RecorderOptions(service_name="agent-b"))

rec = hub.recorder("agent-b")
default_rec = hub.recorder()  # the default (agent-a)
print(hub.has("agent-b"))     # True

shared = get_default_hub()    # opt-in process-wide hub
```
Bridging the runtime
trace_agent_run is the optional, decoupled bridge that projects the runtime's flat
RunEvent stream into nested run/inference/action segments on a Recorder. It
subscribes to an event stream whose shape matches Agent.subscribe (captured as
RunEventSubscribe) and maintains the run/inference/action handles in a closure:
snapshot phases drive the inference (an invoking phase opens one;
dispatching/compacting/idle close it), tool_started/tool_finished map to
actions, and settled/faulted close the run. It also folds streamed
text_delta/thinking_delta counts onto the open inference as
stream.text_deltas/stream.thinking_deltas attributes. The call returns a disposer
that detaches and closes anything still open. It is best-effort and never raises on
malformed input — out-of-order, missing, or repeated events just close what's open and
ignore the rest.
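The projection can be sketched as a closure over a little state plus a subscribe/unsubscribe pair. Everything here is hypothetical: Ev flattens RunEvent into kind/phase/id fields, the log strings stand in for open/close calls on a Recorder, and two behaviours are assumptions (the run opens lazily on the first event, and the disposer closes leftovers with status ok). Events after settled/faulted, unknown kinds, and unmatched tool ids are ignored, in line with the best-effort contract.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Set


@dataclass
class Ev:  # hypothetical, flattened stand-in for a RunEvent
    kind: str                    # snapshot, tool_started, text_delta, settled, ...
    phase: Optional[str] = None  # only meaningful for "snapshot"
    id: Optional[str] = None     # tool call id for tool events


Handler = Callable[[Ev], None]
Subscribe = Callable[[Handler], Callable[[], None]]  # like RunEventSubscribe


@dataclass
class _State:
    run: bool = False
    done: bool = False
    inference: bool = False
    text: int = 0
    thinking: int = 0


def trace_events(log: List[str], subscribe: Subscribe) -> Callable[[], None]:
    st = _State()
    actions: Set[str] = set()

    def close_inference() -> None:
        if st.inference:
            log.append(f"close inference text={st.text} thinking={st.thinking}")
            st.inference, st.text, st.thinking = False, 0, 0

    def close_all(status: str) -> None:
        close_inference()
        for tool_id in sorted(actions):
            log.append(f"close action {tool_id}")
        actions.clear()
        if st.run:
            log.append(f"close run {status}")
            st.run = False

    def on_event(ev: Ev) -> None:
        if st.done:
            return  # ignore anything after the run has ended
        if not st.run and ev.kind not in ("settled", "faulted"):
            log.append("open run")  # assumption: the run opens lazily
            st.run = True
        if ev.kind == "snapshot" and ev.phase == "invoking" and not st.inference:
            log.append("open inference")
            st.inference = True
        elif ev.kind == "snapshot" and ev.phase in ("dispatching", "compacting", "idle"):
            close_inference()
        elif ev.kind == "text_delta" and st.inference:
            st.text += 1
        elif ev.kind == "thinking_delta" and st.inference:
            st.thinking += 1
        elif ev.kind == "tool_started" and ev.id and ev.id not in actions:
            log.append(f"open action {ev.id}")
            actions.add(ev.id)
        elif ev.kind == "tool_finished" and ev.id in actions:
            log.append(f"close action {ev.id}")
            actions.discard(ev.id)
        elif ev.kind in ("settled", "faulted"):
            close_all("ok" if ev.kind == "settled" else "error")
            st.done = True
        # unknown, repeated, or out-of-order events fall through untouched

    unsubscribe = subscribe(on_event)

    def dispose() -> None:
        unsubscribe()
        close_all("ok")  # assumption: leftovers are closed as ok

    return dispose
```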
```python
from indusagi.tracing import Recorder, RecorderOptions, trace_agent_run

rec = Recorder(RecorderOptions(service_name="agent"))
# `agent` is an existing Agent instance created elsewhere;
# `agent.subscribe(handler) -> unsubscribe` matches RunEventSubscribe's shape.
dispose = trace_agent_run(rec, agent.subscribe)
try:
    ...  # run the agent; run/inference/action segments flow onto rec.channel()
finally:
    dispose()  # detaches and closes any still-open segments
```
Relationship to neighbors
The tracing core (signal, channel, recorder, registry, redaction, sinks) is entirely
runtime-agnostic and sits below or beside the rest of the framework. The only coupling
to the Runtime is the optional adapter layer:
adapter/runtime_trace.py imports RunEvent under TYPE_CHECKING only and reads its
public shape (event.kind, snapshot.run_id/phase, event.id/name,
event.outcome.is_error, event.error) via getattr/match — it never touches
runtime internals, and nothing in the runtime depends on it.
Internally the layers form an acyclic stack: handle imports channel.signal and
signal.segment; channel.signal references segment only under TYPE_CHECKING to
stay acyclic; recorder composes channel, handle, and sampling; registry wraps
recorder; sinks consume the channel and share base._record_to_json. The top-level
package exposes tracing as a lazy submodule. For the model conversations being
traced, see the LLM Gateway; for the broader picture
of how subsystems fit together, see the Architecture.
