Tracing & Observability
indusagi::tracing is the framework's homegrown, dependency-light observability core: an OpenTelemetry-free way to trace agent runs, model calls, and tool actions. A trace is an immutable Segment value driven by a functional SegmentHandle (note / child / fail / close); each transition emits exactly one TraceSignal onto a SignalChannel, and a Sink drains that channel to a console, an NDJSON file, or any async writer. Admission control (SampleGate), a named-recorder registry (TelemetryHub), credential redaction (SecretScrubber), and an optional RunEvent adapter complete the surface. The vocabulary is deliberately fresh: no span/exporter/tracer-provider naming, and this is not the tracing crate.
Table of Contents
- What it is
- Module layout
- Public surface
- The segment data model
- The functional handle
- The signal channel
- Recorders and the service stamp
- Sampling and admission control
- The telemetry hub
- Draining to sinks
- Redaction
- Bridging the runtime
- End-to-end example
- Relationship to neighbors
What it is
indusagi::tracing is a self-contained tracer with its own vocabulary. It replaces
the OTel "tracer provider / tracer / span / exporter" lineage with four flat
concepts:
- A Segment: an immutable value object recording one unit of work (Run / Inference / Action / Recall / Custom). The OTel-free analogue of a span, but never mutated in place.
- A SegmentHandle: the behavioural face, a small functional handle with note / child / fail / close. Each op derives the next frozen value and emits a signal.
- A TraceSignal: the Open / Update / Close event a handle emits, carried over a SignalChannel (a Mutex<VecDeque> + Notify, not an mpsc channel).
- A Sink: the terminal consumer that pulls signals off the channel and acts only on Close.
Around that core sit Recorder (the tracer instance), SampleGate (admission
control), TelemetryHub (a named registry), SecretScrubber (attribute
redaction), and an optional adapter that projects the runtime's
RunEvent stream onto a recorder. Everything lives in the single indusagi crate
under src/tracing/, exposed as the indusagi::tracing module.
This mirrors the Python edition indusagi.tracing
subsystem one-for-one — same lifecycle, same FNV-1a sampler, same guillemet
redaction token — but with Rust types: snake_case modules, CamelCase types, and a
Stream-based reader instead of an async for iterable.
Module layout
The module barrel (src/tracing/mod.rs) re-exports its layers bottom-up. The
declared submodules:
| Module | File(s) | Holds |
|---|---|---|
| signal | signal/mod.rs, signal/segment.rs, signal/handle.rs | The immutable Segment / TraceRecord data model + value constructors, id minting, and the functional SegmentHandle with open_handle / SegmentHandle::noop |
| channel | channel.rs | The TraceSignal union and the SignalChannel transport + its SignalChannelReader stream |
| redaction | redaction.rs | SecretScrubber, SecretPattern, default_secret_patterns, REDACTION_TOKEN |
| sampling | sampling.rs | SampleGate, SampleStrategy, the FNV-1a unit_hash |
| recorder | recorder.rs | Recorder, OpenOptions, the service.name stamp |
| hub | hub.rs | TelemetryHub named registry + the opt-in process default |
| sinks | sinks/mod.rs, sinks/console.rs, sinks/file.rs, sinks/stream.rs | The Sink trait + ConsoleSink, FileSink, StreamSink |
| adapter | adapter.rs | The optional trace_agent_run bridge + the RunEvent mirror types |
The dependency graph is acyclic: handle imports channel::TraceSignal and
signal::segment; channel references segment for its payload types; recorder
composes channel, handle, and sampling; hub wraps recorder; sinks consume
the channel reader; adapter reads only the public Recorder / SegmentHandle
surface.
Public surface
pub use from src/tracing/mod.rs (the barrel that mirrors the TS
src/tracing/index.ts):
| Name | Kind | Source | Purpose |
|---|---|---|---|
| Segment | struct | signal/segment.rs | Immutable trace segment: id, trace_id, parent_id, kind, name, started_at, ended_at, status, attributes, error. #[serde(rename_all = "camelCase")] |
| SegmentKind | enum | signal/segment.rs | Run / Inference / Action / Recall / Custom (#[default] Custom); serde lowercase |
| SegmentStatus | enum | signal/segment.rs | Open / Ok / Error; serde lowercase |
| TerminalStatus | enum | signal/segment.rs | A narrowed Ok / Error; From<TerminalStatus> for SegmentStatus |
| SegmentError | struct | signal/segment.rs | A serializable failure: one message: String; from_display / From<&str> / From<String> |
| TraceRecord | struct | signal/segment.rs | #[serde(transparent)] newtype over a guaranteed-terminal Segment; what a Close carries; Deref<Target = Segment> |
| Attributes | type | signal/segment.rs | serde_json::Map<String, Value>, an insertion-ordered attribute bag |
| OpenSegmentInput | struct | signal/segment.rs | Inputs for open_segment; missing ids/clock are generated |
| CloseOptions | struct | signal/segment.rs | ended_at / error overrides for close_segment |
| open_segment | fn | signal/segment.rs | Construct a fresh Open Segment |
| with_attributes | fn | signal/segment.rs | Derive a new segment with a patch merged over attributes (last-writer-wins) |
| close_segment | fn | signal/segment.rs | Derive the terminal TraceRecord, stamping ended_at, final status, and (on error) the SegmentError |
| fresh_id | fn | signal/segment.rs | byte_length random bytes → lowercase hex via getrandom + hex::encode |
| fresh_trace_id | fn | signal/segment.rs | A new 128-bit (16-byte) hex trace id |
| fresh_segment_id | fn | signal/segment.rs | A new 64-bit (8-byte) hex segment id |
| SegmentHandle | struct | signal/handle.rs | The functional handle: trace_id()/id()/active(); note/child/fail/close; SegmentHandle::noop() |
| OpenHandleInput | struct | signal/handle.rs | Inputs for opening a real, sampled-in handle |
| open_handle | fn | signal/handle.rs | Open a live segment, emit an Open signal onto the sink, return its SegmentHandle |
| SignalSink | type | signal/handle.rs | Arc<dyn Fn(TraceSignal) + Send + Sync>, a consumer of trace signals |
| TraceSignal | enum | channel.rs | Open { segment } / Update { id, attributes } / Close { record }; #[serde(tag = "type")] |
| SignalChannel | struct | channel.rs | Single-stream, multi-emit, async-pullable transport; emit()/close() sync, reader() async |
| SignalChannelOptions | struct | channel.rs | bound: Option<usize> (drop-oldest cap) |
| SignalChannelReader | struct | channel.rs | A futures::Stream<Item = TraceSignal> over the channel |
| SecretPattern | struct | redaction.rs | A rule: label + optional key / value Regex (unanchored is_match) |
| REDACTION_TOKEN | const | redaction.rs | The inert replacement "‹redacted›" (guillemets, deliberately not [REDACTED]) |
| default_secret_patterns | fn | redaction.rs | &'static [SecretPattern], the built-in rules (lazy OnceLock) |
| SecretScrubber | struct | redaction.rs | Stateless attribute processor; scrub() returns a deep copy with secrets replaced |
| SecretScrubberOptions | struct | redaction.rs | Override patterns / token |
| SampleGate | struct | sampling.rs | Trace admission control; decide(trace_id) -> bool. Copy + stateless |
| SampleStrategy | enum | sampling.rs | Always (#[default]) / Never / Ratio(f64) |
| Recorder | struct | recorder.rs | The tracer instance: service_name, a SampleGate, a SignalChannel |
| OpenOptions | struct | recorder.rs | Per-open options: trace_id / parent_id / attributes |
| TelemetryHub | struct | hub.rs | Named registry of Recorders with a designated default |
| get_default_hub | fn | hub.rs | Lazily create/return the process-wide default hub (opt-in) |
| set_default_hub | fn | hub.rs | Replace the process-wide default hub (e.g. for tests) |
| Sink | trait | sinks/mod.rs | Terminal channel consumer: async drain(reader); optional flush()/close() |
| ConsoleSink | struct | sinks/console.rs | Prints one tidy line per closed segment through an injected LogFn |
| ConsoleSinkOptions | struct | sinks/console.rs | log: Option<LogFn> |
| LogFn | type | sinks/console.rs | Box<dyn Fn(&str) + Send>, the console line writer |
| FileSink | struct | sinks/file.rs | Appends each closed segment as an NDJSON line; batches per flush_every |
| FileSinkOptions | struct | sinks/file.rs | flush_every: Option<usize> (default 1) |
| StreamSink | struct | sinks/stream.rs | Writes each closed record as a JSON line to a tokio::io::AsyncWrite; honours backpressure |
| StreamSinkOptions | struct | sinks/stream.rs | end_on_close: bool (default false) |
| trace_agent_run | fn | adapter.rs | Project a runtime RunEvent stream onto a Recorder as nested segments; returns a Disposer |
| RunEvent / RunSnapshot / ToolOutcome / RunEventSubscribe | enum/struct/type | adapter.rs | The local mirror of the runtime event vocabulary the adapter reads |
The segment data model
A Segment is a plain value object — never mutated in place. The lifecycle
open → note → close is expressed by producing new segment values via
open_segment, with_attributes, and close_segment. Because every value is
immutable and Clone, a closed TraceRecord is trivially serializable. There is no
BaseSpan / DefaultSpan / NoopSpan inheritance shape.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Segment {
    pub id: String,
    pub trace_id: String,
    pub parent_id: Option<String>,
    pub kind: SegmentKind,
    pub name: String,
    pub started_at: u64,        // epoch ms
    pub ended_at: Option<u64>,  // None while open
    pub status: SegmentStatus,
    pub attributes: Attributes, // serde_json::Map<String, Value>
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub error: Option<SegmentError>,
}
#[serde(rename_all = "camelCase")] makes startedAt / endedAt / traceId /
parentId serialize byte-identically to the TS NDJSON, so the Rust file/stream sinks
produce the same wire bytes as the original. Attributes is a serde_json::Map
backed by preserve_order (an IndexMap), so key order follows insertion — parity
with TS Object.keys.
TraceRecord is a #[serde(transparent)] newtype over a Segment guaranteed
terminal (ended_at present, status no longer Open). It Derefs to Segment
and offers segment(), ended_at(), and status() accessors.
The value constructors:
pub fn open_segment(input: OpenSegmentInput) -> Segment;
pub fn with_attributes(segment: &Segment, patch: &Attributes) -> Segment;
pub fn close_segment(segment: &Segment, status: TerminalStatus, options: CloseOptions) -> TraceRecord;
open_segment mints missing ids (fresh_segment_id / fresh_trace_id) and defaults
the clock to now_ms; with everything supplied it is deterministic.
with_attributes clones the segment and merges the patch (last-writer-wins) without
touching the original. close_segment stamps ended_at, sets the final status, and
on Error attaches the error — a passed-in CloseOptions.error wins over any error
already on the segment (options.error.or_else(|| segment.error.clone())), exactly
reproducing the TS error = options.error ?? segment.error. Ids are minted from the
OS CSPRNG (getrandom), 16 bytes for trace ids (128-bit, W3C-Trace-Context sizing)
and 8 bytes for segment ids (64-bit), hex-encoded.
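The derive-don't-mutate lifecycle described above can be illustrated with a minimal, std-only sketch. Everything here is a hypothetical stand-in rather than the crate's code: MiniSegment trims the field set, string pairs replace the JSON attribute map, and the constructors take plain arguments instead of OpenSegmentInput / CloseOptions.

```rust
// Hypothetical, std-only stand-ins for the documented types (not the crate's code).
#[derive(Debug, Clone, PartialEq)]
enum Status { Open, Ok, Error }

#[derive(Debug, Clone, Copy, PartialEq)]
enum Terminal { Ok, Error }

#[derive(Debug, Clone, PartialEq)]
struct MiniSegment {
    id: String,
    started_at: u64,
    ended_at: Option<u64>,
    status: Status,
    // Ordered (key, value) pairs stand in for the insertion-ordered Attributes map.
    attributes: Vec<(String, String)>,
    error: Option<String>,
}

fn open_segment(id: &str, started_at: u64) -> MiniSegment {
    MiniSegment {
        id: id.to_string(),
        started_at,
        ended_at: None,
        status: Status::Open,
        attributes: Vec::new(),
        error: None,
    }
}

// Derive a new value; the input is only borrowed and never changed.
fn with_attributes(segment: &MiniSegment, patch: &[(&str, &str)]) -> MiniSegment {
    let mut next = segment.clone();
    for (key, value) in patch {
        match next.attributes.iter_mut().find(|(k, _)| k.as_str() == *key) {
            Some(slot) => slot.1 = value.to_string(), // last writer wins
            None => next.attributes.push((key.to_string(), value.to_string())),
        }
    }
    next
}

// Derive the terminal value: stamp the end time and the final status.
fn close_segment(
    segment: &MiniSegment,
    status: Terminal,
    ended_at: u64,
    error: Option<String>,
) -> MiniSegment {
    let mut record = segment.clone();
    record.ended_at = Some(ended_at);
    match status {
        Terminal::Ok => record.status = Status::Ok,
        Terminal::Error => {
            record.status = Status::Error;
            // A passed-in error wins over one already on the segment.
            record.error = error.or_else(|| segment.error.clone());
        }
    }
    record
}
```

Because each step returns a fresh value, the earlier values stay valid and unchanged, which is what makes the closed record safe to hand to a serializer.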
The functional handle
Callers rarely touch the value constructors directly. The behavioural face is
SegmentHandle:
impl SegmentHandle {
    pub fn trace_id(&self) -> &str;
    pub fn id(&self) -> &str;
    pub fn active(&self) -> bool;                        // false for the noop handle
    pub fn noop() -> SegmentHandle;                      // the single shared sampled-out value
    pub fn note(&self, attributes: Attributes);          // emits Update
    pub fn child(&self, kind: SegmentKind, name: &str) -> SegmentHandle; // opens a child
    pub fn fail(&self, error: impl Into<SegmentError>);  // records error; does NOT close
    pub fn close(&self, status: Option<TerminalStatus>); // emits Close
}
There is intentionally no class hierarchy. A sampled-in handle holds shared
interior-mutable state behind a parking_lot::Mutex (the only mutable state in the
module; it never escapes — callers only see immutable Segment / TraceRecord
values via the emitted signals). A sampled-out handle is the single value returned by
SegmentHandle::noop(): its state field is None, all methods are early-returns,
and child() returns itself, so an entire sampled-out subtree costs nothing. The
handle is cheaply Clone (an Arc plus two small strings); clones share the same
underlying segment state.
open_handle creates a live handle:
pub fn open_handle(input: OpenHandleInput, sink: SignalSink) -> SegmentHandle;
It builds the open segment, emits an Open signal through the injected sink before
stashing the state (mirroring the TS ordering), then returns the handle.
Lifecycle guarantees, all preserved from the TS handle:
- note, child, and fail are no-ops once a handle is closed (s.done).
- close is idempotent: the first call wins (if s.done { return }; s.done = true), so a double-close in a teardown path is safe.
- fail(error) records a pending_error and does not close; the eventual close carries it.
- close(None) resolves status from pending_error (Error if set, else Ok); close(Some(status)) forces a status.
Each transition emits exactly one TraceSignal: note → Update, close → Close,
and the constructor → Open. The sink is an Arc<dyn Fn(TraceSignal) + Send + Sync>
(the SignalSink type) — typically a closure over a SignalChannel::emit.
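The lifecycle guarantees above (idempotent close, fail without close, a free noop subtree) can be sketched with std only. MiniHandle, MiniSignal, and collecting_sink are hypothetical names; std::sync::Mutex stands in for parking_lot, signals carry plain strings, and returning a noop from child() on a closed handle is an assumption, not something the source states.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins: signals carry plain strings instead of segments.
#[derive(Debug, Clone, PartialEq)]
enum MiniSignal {
    Open(String),
    Update(String),
    Close { name: String, ok: bool, error: Option<String> },
}

type MiniSink = Arc<dyn Fn(MiniSignal) + Send + Sync>;

struct State { name: String, pending_error: Option<String>, done: bool }
struct Inner { sink: MiniSink, state: Mutex<State> }

// None models the sampled-out noop handle.
#[derive(Clone)]
struct MiniHandle(Option<Arc<Inner>>);

impl MiniHandle {
    fn noop() -> Self { MiniHandle(None) }

    fn open(name: &str, sink: MiniSink) -> Self {
        sink(MiniSignal::Open(name.to_string())); // Open goes out before state is stored
        let state = Mutex::new(State { name: name.to_string(), pending_error: None, done: false });
        MiniHandle(Some(Arc::new(Inner { sink, state })))
    }

    fn active(&self) -> bool { self.0.is_some() }

    fn note(&self, text: &str) {
        let Some(inner) = &self.0 else { return };
        if inner.state.lock().unwrap().done { return; }
        (inner.sink)(MiniSignal::Update(text.to_string()));
    }

    fn child(&self, name: &str) -> MiniHandle {
        let Some(inner) = &self.0 else { return MiniHandle::noop() };
        // Assumption: a closed parent hands back a noop child.
        if inner.state.lock().unwrap().done { return MiniHandle::noop(); }
        MiniHandle::open(name, Arc::clone(&inner.sink))
    }

    fn fail(&self, error: &str) {
        let Some(inner) = &self.0 else { return };
        let mut s = inner.state.lock().unwrap();
        if !s.done { s.pending_error = Some(error.to_string()); } // recorded, not closed
    }

    fn close(&self, forced_ok: Option<bool>) {
        let Some(inner) = &self.0 else { return };
        let signal = {
            let mut s = inner.state.lock().unwrap();
            if s.done { return; } // first close wins
            s.done = true;
            let ok = forced_ok.unwrap_or(s.pending_error.is_none());
            MiniSignal::Close { name: s.name.clone(), ok, error: s.pending_error.clone() }
        };
        (inner.sink)(signal); // emitted after the lock is released
    }
}

// Test helper: a sink that appends every signal to a shared log.
fn collecting_sink() -> (MiniSink, Arc<Mutex<Vec<MiniSignal>>>) {
    let log: Arc<Mutex<Vec<MiniSignal>>> = Arc::new(Mutex::new(Vec::new()));
    let writer = Arc::clone(&log);
    let sink: MiniSink = Arc::new(move |signal: MiniSignal| writer.lock().unwrap().push(signal));
    (sink, log)
}
```

Opening a handle, calling fail, then close(None) twice leaves exactly two signals in the log: one Open and one Close with ok == false.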
The signal channel
TraceSignal is the tagged union every channel consumer branches on:
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum TraceSignal {
    Open { segment: Segment },
    Update { id: String, attributes: Attributes },
    Close { record: TraceRecord },
}
#[serde(tag = "type")] reproduces the TS { type: "open", … } shape.
SignalChannel is the transport — a single-stream, multi-emit, async-pullable queue.
Critically, it is not an mpsc channel: it is a Mutex<VecDeque<TraceSignal>> plus
a tokio::sync::Notify, owned directly so the TS drop-oldest backpressure semantics
can be reproduced exactly (a bounded mpsc channel blocks or errors on full instead of
evicting):
impl SignalChannel {
    pub fn new() -> Self;                                       // unbounded
    pub fn with_options(options: SignalChannelOptions) -> Self;
    pub fn emit(&self, signal: TraceSignal);                    // sync; callable from handle closures
    pub fn close(&self);                                        // idempotent; ends the stream
    pub fn is_closed(&self) -> bool;
    pub fn pending(&self) -> usize;                             // buffered-but-unread count
    pub fn reader(&self) -> SignalChannelReader;                // the Stream entrypoint
}
emit is synchronous so handle closures can call it from non-async code; it
pushes onto the deque, drops the oldest entry when the buffer exceeds bound
(newest wins), and wakes a parked consumer via notify_one. A None or zero bound
(bound is a usize, so it cannot be negative) leaves the channel unbounded. emit
is a no-op once closed.
Consumption is a futures::Stream:
impl Stream for SignalChannelReader {
    type Item = TraceSignal;
    // FIFO: drains the buffer front first (even after close), then parks on Notify.
}
The reader drains the buffer front-first (FIFO), even after close, then parks on the
next Notify. Empty-and-closed yields Poll::Ready(None) — drain-then-done — so a
consumer always sees every queued signal before the stream ends. The channel is
single-consumer (a second reader() shares the same queue; signals are not fanned
out). Clone on SignalChannel is cheap (an Arc<Inner>), so a recorder, its
handles, and a sink all share one stream.
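The queue policy described above (drop-oldest when bounded, no-op emit after close, drain-then-done) can be sketched without tokio. MiniChannel and try_next are hypothetical names; the sketch models only the buffer policy and leaves out the Notify parking and the Stream implementation.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

struct Buffer<T> { items: VecDeque<T>, closed: bool }

// Hypothetical stand-in for the channel's buffer policy (no async parking).
struct MiniChannel<T> { buffer: Mutex<Buffer<T>>, bound: Option<usize> }

impl<T> MiniChannel<T> {
    fn new(bound: Option<usize>) -> Self {
        MiniChannel {
            buffer: Mutex::new(Buffer { items: VecDeque::new(), closed: false }),
            bound: bound.filter(|&b| b > 0), // Some(0) is treated like None: unbounded
        }
    }

    fn emit(&self, item: T) {
        let mut buffer = self.buffer.lock().unwrap();
        if buffer.closed { return; } // no-op once closed
        buffer.items.push_back(item);
        if let Some(bound) = self.bound {
            // Evict from the front so the newest entries survive.
            while buffer.items.len() > bound { buffer.items.pop_front(); }
        }
    }

    fn close(&self) { self.buffer.lock().unwrap().closed = true; }

    fn pending(&self) -> usize { self.buffer.lock().unwrap().items.len() }

    // Non-blocking pull: buffered items stay readable after close.
    fn try_next(&self) -> Option<T> { self.buffer.lock().unwrap().items.pop_front() }
}
```

With a bound of 2, emitting 1, 2, 3 leaves 2 and 3 buffered; closing afterwards still lets a reader pull both before the queue reports empty.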
Recorders and the service stamp
A Recorder is the flat tracer instance — it replaces the OTel tracer-provider /
tracer pair with one object. It owns a service_name, a SampleGate, and a
SignalChannel:
impl Recorder {
    pub fn new(service_name: impl Into<String>) -> Self;          // Always sampling, fresh channel
    pub fn with_sampling(self, strategy: SampleStrategy) -> Self; // builder
    pub fn with_gate(self, gate: SampleGate) -> Self;             // share a prebuilt gate
    pub fn with_channel(self, channel: SignalChannel) -> Self;    // emit onto an existing channel
    pub fn service_name(&self) -> &str;
    pub fn open(&self, kind: SegmentKind, name: &str, opts: Option<OpenOptions>) -> SegmentHandle;
    pub fn channel(&self) -> SignalChannel;                       // what sinks drain
}
open is the sole entry point. It resolves the trace id — a supplied
opts.trace_id joins an existing trace, otherwise a fresh one is minted — then
consults the SampleGate. Sampled out → SegmentHandle::noop() (nothing is emitted).
Sampled in → open_handle bound to a closure that calls self.channel().emit(...).
A root open (no trace_id) folds service.name into the segment's attributes
first, so a caller attribute of the same key wins (last-writer-wins, matching the TS
spread order). channel() exposes the stream sinks drain — the same underlying
SignalChannel every opened handle emits onto.
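The service.name fold is easy to misread, so here is a small sketch of the merge order, assuming ordered string pairs in place of the JSON attribute map. The names Attrs, put, and root_attributes are hypothetical.

```rust
// Hypothetical stand-in: ordered (key, value) pairs model the Attributes map.
type Attrs = Vec<(String, String)>;

// Insert or overwrite one key, keeping the position of an existing key.
fn put(attrs: &mut Attrs, key: &str, value: &str) {
    match attrs.iter_mut().find(|(k, _)| k.as_str() == key) {
        Some(slot) => slot.1 = value.to_string(),
        None => attrs.push((key.to_string(), value.to_string())),
    }
}

// Stamp service.name first, then lay the caller's attributes over it.
fn root_attributes(service_name: &str, caller: &[(&str, &str)]) -> Attrs {
    let mut attrs = Attrs::new();
    put(&mut attrs, "service.name", service_name);
    for (key, value) in caller {
        put(&mut attrs, key, value); // caller entries land second, so they win
    }
    attrs
}
```

Because the stamp is written first, a caller that passes its own service.name replaces the recorder's value, while every other caller key is simply appended after it.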
Sampling and admission control
SampleGate decides, once per open, whether a new trace is recorded at all:
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum SampleStrategy {
    #[default]
    Always,
    Never,
    Ratio(f64), // admit a deterministic fraction in [0, 1]
}

impl SampleGate {
    pub fn new(strategy: SampleStrategy) -> Self; // ratio thresholds pre-clamped
    pub fn decide(&self, trace_id: &str) -> bool;
}
Always and Never are constant verdicts. Ratio(r) is deterministic ratio
sampling, not a random draw: the verdict is unit_hash(trace_id) < threshold, where
unit_hash is a dependency-free FNV-1a over the trace id's bytes folded to [0, 1):
const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
const FNV_PRIME: u32 = 0x0100_0193;
const TWO_POW_32: f64 = 4_294_967_296.0;

fn unit_hash(trace_id: &str) -> f64 {
    let mut hash: u32 = FNV_OFFSET_BASIS;
    for b in trace_id.bytes() {
        hash ^= b as u32;
        hash = hash.wrapping_mul(FNV_PRIME); // == TS Math.imul + implicit >>> 0
    }
    (hash as f64) / TWO_POW_32
}
wrapping_mul reproduces the TS Math.imul plus the implicit >>> 0 uint32
wrap-around exactly, so the same trace id always yields the same verdict — across
processes and replays — and children opened on an admitted trace id are themselves
admitted. The ratio is clamped into [0, 1] at construction (NaN/≤ 0 → 0,
≥ 1 → 1); ratio 0 admits nothing, ratio 1 admits everything regardless of the
hash. SampleGate is Copy and holds no mutable state, so one instance is safely
shared across recorders and tasks.
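Putting the hash and the clamp together, a gate can be sketched as follows. unit_hash repeats the function shown earlier; MiniGate and Strategy are hypothetical stand-ins for SampleGate and SampleStrategy, and the clamp is written from the prose description rather than copied from the crate.

```rust
const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
const FNV_PRIME: u32 = 0x0100_0193;

// 32-bit FNV-1a over the id's bytes, folded to [0, 1).
fn unit_hash(trace_id: &str) -> f64 {
    let mut hash = FNV_OFFSET_BASIS;
    for b in trace_id.bytes() {
        hash ^= b as u32;
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    hash as f64 / 4_294_967_296.0
}

// Hypothetical stand-ins for SampleStrategy / SampleGate.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Strategy { Always, Never, Ratio(f64) }

#[derive(Debug, Clone, Copy)]
struct MiniGate { strategy: Strategy }

impl MiniGate {
    fn new(strategy: Strategy) -> Self {
        // Clamp once at construction: NaN and values <= 0 become 0, values >= 1 become 1.
        let strategy = match strategy {
            Strategy::Ratio(r) if r.is_nan() || r <= 0.0 => Strategy::Ratio(0.0),
            Strategy::Ratio(r) if r >= 1.0 => Strategy::Ratio(1.0),
            other => other,
        };
        MiniGate { strategy }
    }

    fn decide(&self, trace_id: &str) -> bool {
        match self.strategy {
            Strategy::Always => true,
            Strategy::Never => false,
            // The hash lies in [0, 1): threshold 0 admits nothing, threshold 1 everything.
            Strategy::Ratio(threshold) => unit_hash(trace_id) < threshold,
        }
    }
}
```

As a worked check, the id "a" hashes to 0xe40c292c (the standard FNV-1a test vector), which folds to roughly 0.89, so a 0.5 gate rejects it and a 0.9 gate admits it on every run.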
The telemetry hub
TelemetryHub is a named registry of Recorders — one per agent, subsystem, or
tenant — addressable by name without threading instances through every call site:
impl TelemetryHub {
    pub fn new() -> Self;
    pub fn register(&mut self, name: &str, recorder: Recorder) -> Recorder;
    pub fn recorder(&self, name: Option<&str>) -> Result<Recorder, String>;
    pub fn get_default(&self) -> Option<Recorder>;
    pub fn has(&self, name: &str) -> bool;
}
register adds (or replaces) a recorder and returns it. The first registration
becomes the default unless a later register uses the reserved "default" name,
which always re-designates it. recorder(None) falls back to the default;
recorder(Some(name)) looks one up. A missing requested-or-default recorder is an
Err, not a silent drop — so dropped traces fail loudly. The error string is
parity-critical: TelemetryHub: no recorder registered as "<which>" (with (default)
substituted when no name was given).
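The default-designation rule and the error string can be sketched as below. MiniHub and MiniRecorder are hypothetical stand-ins, and the exact placement of (default) inside the quotes is my reading of the description above, not a verified copy of the crate's message.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for Recorder / TelemetryHub.
#[derive(Debug, Clone, PartialEq)]
struct MiniRecorder { service_name: String }

#[derive(Default)]
struct MiniHub {
    recorders: HashMap<String, MiniRecorder>,
    default_name: Option<String>,
}

impl MiniHub {
    // Add or replace a recorder and hand it back.
    fn register(&mut self, name: &str, recorder: MiniRecorder) -> MiniRecorder {
        self.recorders.insert(name.to_string(), recorder.clone());
        // The first registration becomes the default; the reserved name always takes over.
        if self.default_name.is_none() || name == "default" {
            self.default_name = Some(name.to_string());
        }
        recorder
    }

    // None falls back to the default; a miss is an Err, never a silent drop.
    fn recorder(&self, name: Option<&str>) -> Result<MiniRecorder, String> {
        name.or(self.default_name.as_deref())
            .and_then(|key| self.recorders.get(key))
            .cloned()
            .ok_or_else(|| {
                format!(
                    "TelemetryHub: no recorder registered as \"{}\"",
                    name.unwrap_or("(default)")
                )
            })
    }
}
```

Returning a Result here means a misconfigured call site surfaces as an error at lookup time instead of quietly tracing into nothing.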
A process-wide default hub is offered for code that has no hub to hand, but it is strictly opt-in — nothing in the subsystem reads it implicitly:
pub fn get_default_hub() -> Arc<Mutex<TelemetryHub>>; // lazily created on first call
pub fn set_default_hub(hub: TelemetryHub); // replace it (e.g. for tests)
get_default_hub returns a shared Arc<Mutex<TelemetryHub>> so all callers see the
same registry.
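One way such a lazily created, replaceable process default can be built with std alone is sketched here. MiniHub is a hypothetical stand-in, and swapping the contents behind the shared mutex is an assumption about how set_default_hub behaves; the crate may replace the Arc itself instead.

```rust
use std::sync::{Arc, Mutex, OnceLock};

// Hypothetical stand-in for TelemetryHub; it only tracks registered names.
#[derive(Debug, Default)]
struct MiniHub { names: Vec<String> }

static DEFAULT_HUB: OnceLock<Arc<Mutex<MiniHub>>> = OnceLock::new();

// Created on first call; every later call returns a clone of the same Arc.
fn get_default_hub() -> Arc<Mutex<MiniHub>> {
    Arc::clone(DEFAULT_HUB.get_or_init(|| Arc::new(Mutex::new(MiniHub::default()))))
}

// Assumption: replacing means swapping the contents behind the shared mutex,
// so handles obtained earlier observe the new hub as well.
fn set_default_hub(hub: MiniHub) {
    *get_default_hub().lock().unwrap() = hub;
}
```

Nothing touches the static until a caller asks for it, which matches the opt-in framing: code that threads its own hub never initializes the global.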
Draining to sinks
A Sink is the terminal end of the transport — the OTel-free analogue of an
exporter, but a plain trait with one mandatory drain pull-loop plus optional
flush / close lifecycle hooks:
#[allow(async_fn_in_trait)]
pub trait Sink {
    async fn drain(&mut self, reader: SignalChannelReader);
    async fn flush(&mut self) -> io::Result<()> { Ok(()) } // default no-op
    async fn close(&mut self) -> io::Result<()> { Ok(()) } // default no-op
}
There is no subscriber registration and no push: a sink owns its own pull loop and
runs until the channel is closed and drained. drain takes the concrete
SignalChannelReader (there is exactly one channel type), so the subsystem avoids
trait-object boxing entirely. Every concrete sink iterates the reader and acts only
on TraceSignal::Close, ignoring Open / Update chatter. Three ship:
| Sink | Construct | Acts on close by |
|---|---|---|
| ConsoleSink | ConsoleSink::new() / with_options(ConsoleSinkOptions { log }) | Printing one tidy line per closed segment through an injected LogFn (default stdout) |
| FileSink | FileSink::new(path) / with_options(path, FileSinkOptions { flush_every }) | Appending the record as an NDJSON line; batches per flush_every (default 1), flushes on threshold / end-of-drain / close |
| StreamSink<W> | StreamSink::new(writer) / with_options(writer, StreamSinkOptions { end_on_close }) | Writing the record as a JSON line to any tokio::io::AsyncWrite, honouring backpressure via write_all |
ConsoleSink is stateless and resource-free, so it needs neither a real flush nor
close. Its line carries a status glyph (✓ ok, ✗ error, ? unknown), the
padded kind and name, the wall-clock duration, and a [traceHead/segmentId] tag (the
first 8 chars of the trace id), e.g.:
✓ inference chat.completion 842ms [a1b2c3d4/9f8e7d6c]
✗ action write_file 12ms [a1b2c3d4/0011aabb] — EACCES: permission denied
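A line of that shape could be assembled roughly as follows. This is a sketch, not the sink's formatter: console_line is a hypothetical helper, the column widths (9 and 16) are guesses, and the error suffix shown in the second sample line is left out.

```rust
// Hypothetical formatter for the per-segment console line; widths are assumptions.
fn console_line(
    ok: Option<bool>, // Some(true) = ok, Some(false) = error, None = unknown
    kind: &str,
    name: &str,
    started_at: u64,
    ended_at: u64,
    trace_id: &str,
    segment_id: &str,
) -> String {
    let glyph = match ok {
        Some(true) => '✓',
        Some(false) => '✗',
        None => '?',
    };
    // Only the first 8 characters of the trace id appear in the tag.
    let trace_head: String = trace_id.chars().take(8).collect();
    // saturating_sub guards against an end time that precedes the start.
    let duration = ended_at.saturating_sub(started_at);
    format!("{glyph} {kind:<9} {name:<16} {duration}ms [{trace_head}/{segment_id}]")
}
```

Left-padding the kind and name keeps the duration and tag columns aligned when many segments print in sequence.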
FileSink owns an in-memory pending batch (hence a meaningful flush) but no live
OS handle — each append opens with create(true).append(true), writes, and closes.
It flushes whatever remains when drain ends, and close is a final flush.
StreamSink<W> is the most general: it owns no buffer (bytes go straight to the
writer), flush flushes the writer, and close shuts the writer down only when
constructed with end_on_close: true (so it never ends a writer it did not create,
e.g. stdout). into_inner returns the underlying writer, useful in tests.
FileSink and StreamSink serialize through serde_json::to_string(&record), which
— via the Segment's camelCase serde — emits traceId / parentId / startedAt
/ endedAt keys, byte-identical to the TS NDJSON.
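The flush_every batching that FileSink performs can be sketched in memory. BatchingSink is a hypothetical stand-in: a Vec plays the role of the NDJSON file, lines arrive pre-serialized, and clamping a zero threshold to 1 is an assumption.

```rust
// Hypothetical in-memory model of per-threshold batching.
struct BatchingSink {
    flush_every: usize,
    pending: Vec<String>, // closed records waiting to be written
    written: Vec<String>, // stands in for the NDJSON file
    flushes: usize,
}

impl BatchingSink {
    fn new(flush_every: usize) -> Self {
        BatchingSink {
            flush_every: flush_every.max(1), // assumption: 0 behaves like 1
            pending: Vec::new(),
            written: Vec::new(),
            flushes: 0,
        }
    }

    // Called once per Close signal; Open and Update never reach this point.
    fn on_close(&mut self, ndjson_line: &str) {
        self.pending.push(ndjson_line.to_string());
        if self.pending.len() >= self.flush_every {
            self.flush();
        }
    }

    // Also what end-of-drain and close() would call to write the remainder.
    fn flush(&mut self) {
        if self.pending.is_empty() {
            return;
        }
        self.written.append(&mut self.pending); // moves every line, leaving pending empty
        self.flushes += 1;
    }
}
```

With flush_every set to 2, three closed records produce one automatic flush of two lines and leave one line pending until the final flush.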
Redaction
SecretScrubber is a stateless attribute processor. scrub(&attributes) returns a
deep copy with secret-looking keys and values replaced by REDACTION_TOKEN — the
guillemet string "‹redacted›" (U+2039 / U+203A), deliberately not [REDACTED]:
impl SecretScrubber {
    pub fn new(options: SecretScrubberOptions) -> Self; // default patterns + token
    pub fn scrub(&self, attributes: &Map<String, Value>) -> Map<String, Value>;
}
Two independent signals trigger a redaction:
- Key shape: the path key matches a SecretPattern.key (e.g. api_key, authorization). The value is redacted regardless of its content.
- Value shape: a string leaf matches a SecretPattern.value (e.g. a Bearer … token, sk-… key, JWT, PEM body) even under an innocuous key.
Rules come from default_secret_patterns() — a lazily-built &'static [SecretPattern]
(via OnceLock) with three key-name rules ((?i) case-insensitive) and four
value-shape rules. One value rule, vendor-key-prefix-value (sk-…/xoxb-…/ghp_…/
AKIA…), is deliberately case-sensitive to match the TS source. Each pattern's
regexes are matched unanchored with Regex::is_match (the TS RegExp.test
equivalent).
The walk is a single deep copy that never mutates its input and produces a fresh
structure each time; output key order follows input iteration order. When a key name
looks secret it short-circuits descent and collapses the whole subtree to the
token — so a credential nested inside (auth: { token: … }) cannot leak. The scrubber
is exported so callers can sanitize attributes before persisting, but the bundled
ConsoleSink / FileSink / StreamSink do not call it automatically — a caller
must apply it explicitly (e.g. inside the note they pass to a handle).
use indusagi::tracing::SecretScrubber;
use serde_json::json;
let scrubber = SecretScrubber::default(); // default patterns + REDACTION_TOKEN
let attrs = json!({ "authorization": "Bearer abcd1234efgh", "model": "opus" })
.as_object().unwrap().clone();
let safe = scrubber.scrub(&attrs);
// safe["authorization"] == "‹redacted›", safe["model"] == "opus"
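To show how the walk short-circuits, here is a std-only sketch of the same recursion. Node is a hypothetical two-variant tree in place of serde_json::Value, and the substring and prefix checks are simplified stand-ins for the regex rules, not the built-in pattern list.

```rust
// Hypothetical miniature of a JSON tree: string leaves and ordered objects.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Text(String),
    Object(Vec<(String, Node)>),
}

const TOKEN: &str = "‹redacted›";

// Stand-in key rule: substring checks replace the case-insensitive regexes.
fn key_looks_secret(key: &str) -> bool {
    let lower = key.to_lowercase();
    ["api_key", "auth", "secret"].iter().any(|needle| lower.contains(needle))
}

// Stand-in value rule: prefix checks replace the value-shape regexes.
fn value_looks_secret(text: &str) -> bool {
    text.starts_with("Bearer ") || text.starts_with("sk-")
}

// Build a fresh tree; the input is only borrowed and never mutated.
fn scrub(node: &Node) -> Node {
    match node {
        Node::Text(text) if value_looks_secret(text) => Node::Text(TOKEN.to_string()),
        Node::Text(text) => Node::Text(text.clone()),
        Node::Object(entries) => Node::Object(
            entries
                .iter()
                .map(|(key, child)| {
                    if key_looks_secret(key) {
                        // Secret-looking key: collapse the whole subtree, no descent.
                        (key.clone(), Node::Text(TOKEN.to_string()))
                    } else {
                        (key.clone(), scrub(child))
                    }
                })
                .collect(),
        ),
    }
}
```

An object stored under an auth key comes back as a single token leaf, so nothing nested beneath it is ever inspected or copied.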
Bridging the runtime
trace_agent_run is the optional, decoupled bridge that projects the
runtime's flat RunEvent stream into nested
Run / Inference / Action segments on a Recorder:
pub fn trace_agent_run(recorder: Recorder, subscribe: RunEventSubscribe) -> Disposer;
pub type RunEventSubscribe =
Box<dyn FnOnce(Box<dyn FnMut(RunEvent) + Send>) -> Box<dyn FnOnce() + Send>>;
pub type Disposer = Box<dyn FnOnce() + Send>;
trace_agent_run, RunEvent, RunSnapshot, ToolOutcome, and RunEventSubscribe
are re-exported from the indusagi::tracing barrel; Disposer is pub but lives only
in the adapter submodule, so callers that need to name the return type reach it as
indusagi::tracing::adapter::Disposer (most code just calls the returned value).
subscribe matches the shape of Agent.subscribe without importing the Agent
type. The adapter reads only the public RunEvent shape — a faithful local mirror of
the TS RunEvent union (Snapshot / TextDelta / ThinkingDelta / ToolStarted /
ToolFinished / Settled / Faulted) plus RunSnapshot { run_id, phase } and
ToolOutcome { id, output, is_error }. (The module docstring notes these mirror the
runtime's contract; the code carries no #[cfg] gate, so the adapter is always
compiled.)
The projection runs through a RunTracer held behind an Arc<Mutex<…>>:
- The run segment is opened lazily on the first activity (ensure_run), stamping run.id when known, and closed on Settled (status Ok) / Faulted (status Error, with the error folded via run.fail(...)).
- The inference segment is a run child opened on the invoking phase and closed on dispatching / compacting / idle. A duplicate phase is ignored (last_phase dedup); a new invoking closes any stale inference first.
- Streamed TextDelta / ThinkingDelta events bump counters that are folded onto the open inference as stream.text_deltas / stream.thinking_deltas attributes at close.
- Each action segment is a run child keyed by tool id (ToolStarted → actions.insert), annotated with tool.id / tool.name, then closed on the matching ToolFinished with status Error iff outcome.is_error and a tool.is_error attribute.
The returned Disposer detaches the subscription and tears down anything still open
(teardown closes dangling actions, the inference, and the run as Ok). The mapping
is best-effort and never panics on malformed input — out-of-order, missing, or
repeated events just close what's open and ignore the rest.
use indusagi::tracing::{Recorder, trace_agent_run};
let recorder = Recorder::new("agent");
// `agent.subscribe(handler) -> unsubscribe` matches RunEventSubscribe's shape.
let dispose = trace_agent_run(recorder, subscribe);
// ... run the agent; Run/Inference/Action segments flow onto recorder.channel() ...
dispose(); // detaches and closes any still-open segments
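The phase-driven part of that projection can be modelled as a small state machine. MiniTracer is a hypothetical reduction: it logs strings instead of opening segments, handles only phases, text deltas, and Settled, and assumes that Settled closes a still-open inference and that deltas count only while an inference is open.

```rust
// Hypothetical reduction of the adapter's run/inference bookkeeping.
#[derive(Default)]
struct MiniTracer {
    run_open: bool,
    inference_open: bool,
    last_phase: Option<String>,
    text_deltas: u32,
    log: Vec<String>, // stands in for segments opened and closed on a recorder
}

impl MiniTracer {
    // The run segment is opened lazily, on first activity.
    fn ensure_run(&mut self) {
        if !self.run_open {
            self.run_open = true;
            self.log.push("open run".to_string());
        }
    }

    // Fold the stream counter onto the inference as it closes.
    fn close_inference(&mut self) {
        if self.inference_open {
            self.inference_open = false;
            self.log.push(format!("close inference stream.text_deltas={}", self.text_deltas));
            self.text_deltas = 0;
        }
    }

    fn on_phase(&mut self, phase: &str) {
        if self.last_phase.as_deref() == Some(phase) {
            return; // duplicate phase ignored
        }
        self.last_phase = Some(phase.to_string());
        self.ensure_run();
        match phase {
            "invoking" => {
                self.close_inference(); // a stale inference is closed first
                self.inference_open = true;
                self.log.push("open inference".to_string());
            }
            "dispatching" | "compacting" | "idle" => self.close_inference(),
            _ => {} // unknown phases are tolerated
        }
    }

    fn on_text_delta(&mut self) {
        if self.inference_open {
            self.text_deltas += 1;
        }
    }

    fn on_settled(&mut self) {
        self.close_inference();
        if self.run_open {
            self.run_open = false;
            self.log.push("close run ok".to_string());
        }
    }
}
```

Feeding it invoking, invoking, two text deltas, dispatching, and Settled yields four log entries: the run opens, one inference opens, the inference closes with a count of 2, and the run closes.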
End-to-end example
Wire a recorder to a ConsoleSink, drive a handful of segments, then close the
stream so the drain task finishes:
use indusagi::tracing::{ConsoleSink, Recorder, SegmentKind, Sink};

#[tokio::main]
async fn main() {
    let rec = Recorder::new("my-agent");
    let reader = rec.channel().reader();

    // Spawn the sink's pull loop.
    let drain = tokio::spawn(async move {
        let mut sink = ConsoleSink::new();
        sink.drain(reader).await;
    });

    let run = rec.open(SegmentKind::Run, "answer-question", None);

    let inf = run.child(SegmentKind::Inference, "chat.completion");
    inf.note(serde_json::json!({ "model": "opus", "tokens.in": 1200 })
        .as_object().unwrap().clone());
    inf.close(None); // resolves to Ok

    let act = run.child(SegmentKind::Action, "write_file");
    act.fail("EACCES: permission denied"); // sets the closing status to Error
    act.close(None); // resolves to Error

    run.close(None);
    rec.channel().close(); // end the stream so drain() returns
    drain.await.unwrap();
}
For ratio sampling, share a stateless gate or pass a strategy:
use indusagi::tracing::{Recorder, SampleGate, SampleStrategy};
let rec = Recorder::new("svc").with_sampling(SampleStrategy::Ratio(0.25));
let gate = SampleGate::new(SampleStrategy::Ratio(0.25)); // Copy + shareable
let admitted = gate.decide("a1b2c3d4e5f60718a1b2c3d4e5f60718"); // deterministic per id
let h = rec.open(indusagi::tracing::SegmentKind::Run, "task", None);
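// `h` was opened on a freshly minted trace id, so its verdict is independent of `admitted`.
// active() is true when the trace was admitted and false for the noop handle when sampled out.
println!("admitted = {admitted}, active = {}", h.active());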
Relationship to neighbors
The tracing core (signal, channel, recorder, hub, redaction, sinks) is entirely
runtime-agnostic and sits below or beside the rest of the framework. Ids draw
randomness from getrandom, while the sampler is a deterministic FNV-1a hash with
no random source; segment clocks delegate to crate::core::now_ms; the transport
rides on tokio's Notify and futures::Stream. The only coupling to the
runtime is the optional adapter, which reads just the
public RunEvent shape via its own local mirror types — it never touches runtime
internals, and nothing in the runtime depends on it.
For the model conversations being traced, see the
LLM Gateway; for how all the layers fit together, see
the Architecture. This subsystem is the Rust counterpart of the
Python indusagi.tracing — same lifecycle, same
deterministic FNV-1a sampler, same guillemet redaction token, same camelCase NDJSON
wire — re-expressed with Rust's type system: an immutable Segment value, a Stream
reader instead of an async iterable, and a drop-oldest VecDeque transport in place
of the TS deque.
