Tracing & Observability

indusagi::tracing is the framework's homegrown, dependency-light observability core — an OpenTelemetry-free way to trace agent runs, model calls, and tool actions. A trace is an immutable Segment value driven by a functional SegmentHandle (note/child/fail/close); each transition emits exactly one TraceSignal onto a SignalChannel, and a Sink drains that channel to a console, an NDJSON file, or any async writer. Admission control (SampleGate), a named-recorder registry (TelemetryHub), credential redaction (SecretScrubber), and an optional RunEvent adapter complete the surface. The vocabulary is deliberately fresh — no span/exporter/tracer-provider naming, and this is not the tracing crate.

What it is

indusagi::tracing is a self-contained tracer with its own vocabulary. It replaces the OTel "tracer provider / tracer / span / exporter" lineage with four flat concepts:

  • A Segment — an immutable value object recording one unit of work (Run / Inference / Action / Recall / Custom). The OTel-free analogue of a span, but never mutated in place.
  • A SegmentHandle — the behavioural face: a small functional handle with note / child / fail / close. Each op derives the next frozen value and emits a signal.
  • A TraceSignal — the Open / Update / Close event a handle emits, carried over a SignalChannel (a Mutex<VecDeque> + Notify, not an mpsc channel).
  • A Sink — the terminal consumer that pulls signals off the channel and acts only on Close.

Around that core sit Recorder (the tracer instance), SampleGate (admission control), TelemetryHub (a named registry), SecretScrubber (attribute redaction), and an optional adapter that projects the runtime's RunEvent stream onto a recorder. Everything lives in the single indusagi crate under src/tracing/, exposed as the indusagi::tracing module.

This mirrors the TypeScript edition's tracing subsystem one-for-one (same lifecycle, same FNV-1a sampler, same guillemet redaction token), but with Rust types: snake_case modules, CamelCase types, and a Stream-based reader instead of an async iterable.

Module layout

The module barrel (src/tracing/mod.rs) re-exports its layers bottom-up. The declared submodules:

| Module | File(s) | Holds |
| --- | --- | --- |
| signal | signal/mod.rs, signal/segment.rs, signal/handle.rs | The immutable Segment / TraceRecord data model + value constructors, id minting, and the functional SegmentHandle with open_handle / SegmentHandle::noop |
| channel | channel.rs | The TraceSignal union and the SignalChannel transport + its SignalChannelReader stream |
| redaction | redaction.rs | SecretScrubber, SecretPattern, default_secret_patterns, REDACTION_TOKEN |
| sampling | sampling.rs | SampleGate, SampleStrategy, the FNV-1a unit_hash |
| recorder | recorder.rs | Recorder, OpenOptions, the service.name stamp |
| hub | hub.rs | TelemetryHub named registry + the opt-in process default |
| sinks | sinks/mod.rs, sinks/console.rs, sinks/file.rs, sinks/stream.rs | The Sink trait + ConsoleSink, FileSink, StreamSink |
| adapter | adapter.rs | The optional trace_agent_run bridge + the RunEvent mirror types |

The dependency graph is acyclic: handle imports channel::TraceSignal and signal::segment; channel references segment for its payload types; recorder composes channel, handle, and sampling; hub wraps recorder; sinks consume the channel reader; adapter reads only the public Recorder / SegmentHandle surface.

Public surface

pub use from src/tracing/mod.rs (the barrel that mirrors the TS src/tracing/index.ts):

| Name | Kind | Source | Purpose |
| --- | --- | --- | --- |
| Segment | struct | signal/segment.rs | Immutable trace segment: id, trace_id, parent_id, kind, name, started_at, ended_at, status, attributes, error. #[serde(rename_all = "camelCase")] |
| SegmentKind | enum | signal/segment.rs | Run / Inference / Action / Recall / Custom (#[default] Custom); serde lowercase |
| SegmentStatus | enum | signal/segment.rs | Open / Ok / Error; serde lowercase |
| TerminalStatus | enum | signal/segment.rs | A narrowed Ok / Error; From<TerminalStatus> for SegmentStatus |
| SegmentError | struct | signal/segment.rs | A serializable failure: one message: String; from_display / From<&str> / From<String> |
| TraceRecord | struct | signal/segment.rs | #[serde(transparent)] newtype over a guaranteed-terminal Segment; what a Close carries; Deref<Target = Segment> |
| Attributes | type | signal/segment.rs | serde_json::Map<String, Value>, an insertion-ordered attribute bag |
| OpenSegmentInput | struct | signal/segment.rs | Inputs for open_segment; missing ids/clock are generated |
| CloseOptions | struct | signal/segment.rs | ended_at / error overrides for close_segment |
| open_segment | fn | signal/segment.rs | Construct a fresh Open Segment |
| with_attributes | fn | signal/segment.rs | Derive a new segment with a patch merged over attributes (last-writer-wins) |
| close_segment | fn | signal/segment.rs | Derive the terminal TraceRecord, stamping ended_at, final status, and (on error) the SegmentError |
| fresh_id | fn | signal/segment.rs | byte_length random bytes → lowercase hex via getrandom + hex::encode |
| fresh_trace_id | fn | signal/segment.rs | A new 128-bit (16-byte) hex trace id |
| fresh_segment_id | fn | signal/segment.rs | A new 64-bit (8-byte) hex segment id |
| SegmentHandle | struct | signal/handle.rs | The functional handle: trace_id()/id()/active(); note/child/fail/close; SegmentHandle::noop() |
| OpenHandleInput | struct | signal/handle.rs | Inputs for opening a real, sampled-in handle |
| open_handle | fn | signal/handle.rs | Open a live segment, emit an Open signal onto the sink, return its SegmentHandle |
| SignalSink | type | signal/handle.rs | Arc<dyn Fn(TraceSignal) + Send + Sync>, a consumer of trace signals |
| TraceSignal | enum | channel.rs | Open { segment } / Update { id, attributes } / Close { record }; #[serde(tag = "type")] |
| SignalChannel | struct | channel.rs | Single-stream, multi-emit, async-pullable transport; emit()/close() sync, reader() async |
| SignalChannelOptions | struct | channel.rs | bound: Option<usize> (drop-oldest cap) |
| SignalChannelReader | struct | channel.rs | A futures::Stream<Item = TraceSignal> over the channel |
| SecretPattern | struct | redaction.rs | A rule: label + optional key / value Regex (unanchored is_match) |
| REDACTION_TOKEN | const | redaction.rs | The inert replacement "‹redacted›" (guillemets, deliberately not [REDACTED]) |
| default_secret_patterns | fn | redaction.rs | &'static [SecretPattern], the built-in rules (lazy OnceLock) |
| SecretScrubber | struct | redaction.rs | Stateless attribute processor; scrub() returns a deep copy with secrets replaced |
| SecretScrubberOptions | struct | redaction.rs | Override patterns / token |
| SampleGate | struct | sampling.rs | Trace admission control; decide(trace_id) -> bool. Copy + stateless |
| SampleStrategy | enum | sampling.rs | Always (#[default]) / Never / Ratio(f64) |
| Recorder | struct | recorder.rs | The tracer instance: service_name, a SampleGate, a SignalChannel |
| OpenOptions | struct | recorder.rs | Per-open options: trace_id / parent_id / attributes |
| TelemetryHub | struct | hub.rs | Named registry of Recorders with a designated default |
| get_default_hub | fn | hub.rs | Lazily create/return the process-wide default hub (opt-in) |
| set_default_hub | fn | hub.rs | Replace the process-wide default hub (e.g. for tests) |
| Sink | trait | sinks/mod.rs | Terminal channel consumer: async drain(reader); optional flush()/close() |
| ConsoleSink | struct | sinks/console.rs | Prints one tidy line per closed segment through an injected LogFn |
| ConsoleSinkOptions | struct | sinks/console.rs | log: Option<LogFn> |
| LogFn | type | sinks/console.rs | Box<dyn Fn(&str) + Send>, the console line writer |
| FileSink | struct | sinks/file.rs | Appends each closed segment as an NDJSON line; batches per flush_every |
| FileSinkOptions | struct | sinks/file.rs | flush_every: Option<usize> (default 1) |
| StreamSink | struct | sinks/stream.rs | Writes each closed record as a JSON line to a tokio::io::AsyncWrite; honours backpressure |
| StreamSinkOptions | struct | sinks/stream.rs | end_on_close: bool (default false) |
| trace_agent_run | fn | adapter.rs | Project a runtime RunEvent stream onto a Recorder as nested segments; returns a Disposer |
| RunEvent / RunSnapshot / ToolOutcome / RunEventSubscribe | enum/struct/type | adapter.rs | The local mirror of the runtime event vocabulary the adapter reads |

The segment data model

A Segment is a plain value object — never mutated in place. The lifecycle open → note → close is expressed by producing new segment values via open_segment, with_attributes, and close_segment. Because every value is immutable and Clone, a closed TraceRecord is trivially serializable. There is no BaseSpan / DefaultSpan / NoopSpan inheritance shape.

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Segment {
    pub id: String,
    pub trace_id: String,
    pub parent_id: Option<String>,
    pub kind: SegmentKind,
    pub name: String,
    pub started_at: u64,            // epoch ms
    pub ended_at: Option<u64>,      // None while open
    pub status: SegmentStatus,
    pub attributes: Attributes,     // serde_json::Map<String, Value>
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub error: Option<SegmentError>,
}

#[serde(rename_all = "camelCase")] makes startedAt / endedAt / traceId / parentId serialize byte-identically to the TS NDJSON, so the Rust file/stream sinks produce the same wire bytes as the original. Attributes is a serde_json::Map backed by preserve_order (an IndexMap), so key order follows insertion — parity with TS Object.keys.

TraceRecord is a #[serde(transparent)] newtype over a Segment guaranteed terminal (ended_at present, status no longer Open). It Derefs to Segment and offers segment(), ended_at(), and status() accessors.

The value constructors:

pub fn open_segment(input: OpenSegmentInput) -> Segment;
pub fn with_attributes(segment: &Segment, patch: &Attributes) -> Segment;
pub fn close_segment(segment: &Segment, status: TerminalStatus, options: CloseOptions) -> TraceRecord;

open_segment mints missing ids (fresh_segment_id / fresh_trace_id) and defaults the clock to now_ms; with everything supplied it is deterministic. with_attributes clones the segment and merges the patch (last-writer-wins) without touching the original. close_segment stamps ended_at, sets the final status, and on Error attaches the error — a passed-in CloseOptions.error wins over any error already on the segment (options.error.or_else(|| segment.error.clone())), exactly reproducing the TS error = options.error ?? segment.error. Ids are minted from the OS CSPRNG (getrandom), 16 bytes for trace ids (128-bit, W3C-Trace-Context sizing) and 8 bytes for segment ids (64-bit), hex-encoded.
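The derive-don't-mutate lifecycle described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's code: `Seg`, `Status`, `open_seg`, `with_attrs`, and `close_seg` are hypothetical stand-ins, attributes are simplified to ordered string pairs instead of a serde_json map, and dropping the error on an Ok close is an assumption.

```rust
// Hypothetical, simplified stand-ins for Segment / SegmentStatus.
#[derive(Debug, Clone, PartialEq)]
pub enum Status { Open, Ok, Error }

#[derive(Debug, Clone, PartialEq)]
pub struct Seg {
    pub name: String,
    pub started_at: u64,
    pub ended_at: Option<u64>,
    pub status: Status,
    pub attributes: Vec<(String, String)>, // insertion-ordered
    pub error: Option<String>,
}

/// Build a fresh open value; everything is supplied, so this is deterministic.
pub fn open_seg(name: &str, started_at: u64) -> Seg {
    Seg {
        name: name.to_string(),
        started_at,
        ended_at: None,
        status: Status::Open,
        attributes: Vec::new(),
        error: None,
    }
}

/// Derive a new value with `patch` merged over the attributes.
/// An existing key keeps its position and takes the patched value.
pub fn with_attrs(seg: &Seg, patch: &[(&str, &str)]) -> Seg {
    let mut next = seg.clone();
    for (k, v) in patch {
        match next.attributes.iter_mut().find(|entry| entry.0 == *k) {
            Some(slot) => slot.1 = v.to_string(),
            None => next.attributes.push((k.to_string(), v.to_string())),
        }
    }
    next
}

/// Derive the terminal value. An explicit `error` wins over one already on
/// the segment; on Ok the error is dropped (an assumption of this sketch).
pub fn close_seg(seg: &Seg, status: Status, ended_at: u64, error: Option<String>) -> Seg {
    let mut next = seg.clone();
    next.ended_at = Some(ended_at);
    next.error = if status == Status::Error {
        error.or_else(|| seg.error.clone())
    } else {
        None
    };
    next.status = status;
    next
}
```

Because every step returns a new value, the input segment can still be inspected (or closed a different way) after any call.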

The functional handle

Callers rarely touch the value constructors directly. The behavioural face is SegmentHandle:

impl SegmentHandle {
    pub fn trace_id(&self) -> &str;
    pub fn id(&self) -> &str;
    pub fn active(&self) -> bool;                       // false for the noop handle
    pub fn noop() -> SegmentHandle;                     // the single shared sampled-out value
    pub fn note(&self, attributes: Attributes);        // emits Update
    pub fn child(&self, kind: SegmentKind, name: &str) -> SegmentHandle;  // opens a child
    pub fn fail(&self, error: impl Into<SegmentError>); // records error; does NOT close
    pub fn close(&self, status: Option<TerminalStatus>);// emits Close
}

There is intentionally no class hierarchy. A sampled-in handle holds shared interior-mutable state behind a parking_lot::Mutex (the only mutable state in the module; it never escapes — callers only see immutable Segment / TraceRecord values via the emitted signals). A sampled-out handle is the single value returned by SegmentHandle::noop(): its state field is None, all methods are early-returns, and child() returns itself, so an entire sampled-out subtree costs nothing. The handle is cheaply Clone (an Arc plus two small strings); clones share the same underlying segment state.

open_handle creates a live handle:

pub fn open_handle(input: OpenHandleInput, sink: SignalSink) -> SegmentHandle;

It builds the open segment, emits an Open signal through the injected sink before stashing the state (mirroring the TS ordering), then returns the handle.

Lifecycle guarantees, all preserved from the TS handle:

  • note, child, and fail are no-ops once a handle is closed (s.done).
  • close is idempotent — the first call wins (if s.done { return }; s.done = true), so a double-close in a teardown path is safe.
  • fail(error) records a pending_error and does not close; the eventual close carries it.
  • close(None) resolves status from pending_error (Error if set, else Ok); close(Some(status)) forces a status.

Each transition emits exactly one TraceSignal: note → Update, close → Close, and the constructor → Open. The sink is an Arc<dyn Fn(TraceSignal) + Send + Sync> (the SignalSink type), typically a closure over a SignalChannel::emit.
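The lifecycle guarantees listed above (no-ops after close, first close wins, fail defers to close) can be illustrated with a small std-only sketch. The names `Handle`, `Signal`, and `Sink` here are hypothetical simplifications; the sketch uses std::sync::Mutex where the text describes parking_lot, and it omits ids, attributes, and child handles.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical, simplified signal vocabulary for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Signal {
    Open(String),
    Update(String),
    Close { name: String, ok: bool, error: Option<String> },
}

pub type Sink = Arc<dyn Fn(Signal) + Send + Sync>;

struct State { name: String, done: bool, pending_error: Option<String> }

#[derive(Clone)]
pub struct Handle { state: Option<Arc<Mutex<State>>>, sink: Option<Sink> }

impl Handle {
    /// The sampled-out handle: no state, every method returns early.
    pub fn noop() -> Handle { Handle { state: None, sink: None } }

    /// Emit Open first, then stash the state.
    pub fn open(name: &str, sink: Sink) -> Handle {
        sink(Signal::Open(name.to_string()));
        let state = State { name: name.to_string(), done: false, pending_error: None };
        Handle { state: Some(Arc::new(Mutex::new(state))), sink: Some(sink) }
    }

    pub fn active(&self) -> bool { self.state.is_some() }

    pub fn note(&self, text: &str) {
        if let (Some(state), Some(sink)) = (&self.state, &self.sink) {
            if state.lock().unwrap().done { return; } // closed: no-op
            sink(Signal::Update(text.to_string()));
        }
    }

    /// Record the error; the eventual close carries it.
    pub fn fail(&self, error: &str) {
        if let Some(state) = &self.state {
            let mut s = state.lock().unwrap();
            if !s.done { s.pending_error = Some(error.to_string()); }
        }
    }

    /// Idempotent: only the first call emits a Close.
    pub fn close(&self, forced_ok: Option<bool>) {
        if let (Some(state), Some(sink)) = (&self.state, &self.sink) {
            let signal = {
                let mut s = state.lock().unwrap();
                if s.done { return; }
                s.done = true;
                let ok = forced_ok.unwrap_or(s.pending_error.is_none());
                Signal::Close { name: s.name.clone(), ok, error: s.pending_error.clone() }
            };
            sink(signal); // emitted after the lock is released
        }
    }
}
```

Emitting outside the lock is a design choice of this sketch: it keeps a sink that calls back into the handle from deadlocking.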

The signal channel

TraceSignal is the tagged union every channel consumer branches on:

#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum TraceSignal {
    Open { segment: Segment },
    Update { id: String, attributes: Attributes },
    Close { record: TraceRecord },
}

#[serde(tag = "type")] reproduces the TS { type: "open", … } shape.

SignalChannel is the transport — a single-stream, multi-emit, async-pullable queue. Critically, it is not an mpsc channel: it is a Mutex<VecDeque<TraceSignal>> plus a tokio::sync::Notify, owned directly so the TS drop-oldest backpressure semantics can be reproduced exactly (a bounded mpsc channel blocks or errors on full instead of evicting):

impl SignalChannel {
    pub fn new() -> Self;                                 // unbounded
    pub fn with_options(options: SignalChannelOptions) -> Self;
    pub fn emit(&self, signal: TraceSignal);              // sync; callable from handle closures
    pub fn close(&self);                                  // idempotent; ends the stream
    pub fn is_closed(&self) -> bool;
    pub fn pending(&self) -> usize;                       // buffered-but-unread count
    pub fn reader(&self) -> SignalChannelReader;          // the Stream entrypoint
}

emit is synchronous so handle closures can call it from non-async code; it pushes onto the deque, drops the oldest entry when the buffer exceeds bound (newest wins), and wakes a parked consumer via notify_one. A None or zero bound leaves the channel unbounded. emit is a no-op once closed.

Consumption is a futures::Stream:

impl Stream for SignalChannelReader {
    type Item = TraceSignal;
    // FIFO: drains the buffer front first (even after close), then parks on Notify.
}

The reader drains the buffer front-first (FIFO), even after close, then parks on the next Notify. Empty-and-closed yields Poll::Ready(None) — drain-then-done — so a consumer always sees every queued signal before the stream ends. The channel is single-consumer (a second reader() shares the same queue; signals are not fanned out). Clone on SignalChannel is cheap (an Arc<Inner>), so a recorder, its handles, and a sink all share one stream.
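The drop-oldest and drain-then-done behaviour can be sketched without an async runtime. This is a hedged, synchronous approximation: `DropOldestQueue` and `Pull` are hypothetical names, and `try_pull` stands in for one poll of the stream (`Pending` is where the real reader would park on Notify).

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

struct Inner<T> { buf: VecDeque<T>, closed: bool }

pub struct DropOldestQueue<T> {
    inner: Mutex<Inner<T>>,
    bound: Option<usize>,
}

/// What one pull observes.
#[derive(Debug, PartialEq)]
pub enum Pull<T> { Item(T), Pending, Done }

impl<T> DropOldestQueue<T> {
    /// `None` or `Some(0)` means unbounded.
    pub fn new(bound: Option<usize>) -> Self {
        let bound = bound.filter(|b| *b > 0);
        Self { inner: Mutex::new(Inner { buf: VecDeque::new(), closed: false }), bound }
    }

    /// Synchronous push; evicts from the front when over the bound.
    pub fn emit(&self, item: T) {
        let mut g = self.inner.lock().unwrap();
        if g.closed { return; } // no-op once closed
        g.buf.push_back(item);
        if let Some(bound) = self.bound {
            while g.buf.len() > bound { g.buf.pop_front(); } // newest wins
        }
    }

    pub fn close(&self) { self.inner.lock().unwrap().closed = true; }

    pub fn pending(&self) -> usize { self.inner.lock().unwrap().buf.len() }

    /// FIFO: buffered items are yielded even after close; Done only when
    /// the buffer is empty and the queue is closed.
    pub fn try_pull(&self) -> Pull<T> {
        let mut g = self.inner.lock().unwrap();
        match g.buf.pop_front() {
            Some(item) => Pull::Item(item),
            None if g.closed => Pull::Done,
            None => Pull::Pending,
        }
    }
}
```

A bounded mpsc channel cannot express the eviction in `emit`, which is the stated reason for owning the deque directly.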

Recorders and the service stamp

A Recorder is the flat tracer instance — it replaces the OTel tracer-provider / tracer pair with one object. It owns a service_name, a SampleGate, and a SignalChannel:

impl Recorder {
    pub fn new(service_name: impl Into<String>) -> Self;            // Always sampling, fresh channel
    pub fn with_sampling(self, strategy: SampleStrategy) -> Self;   // builder
    pub fn with_gate(self, gate: SampleGate) -> Self;               // share a prebuilt gate
    pub fn with_channel(self, channel: SignalChannel) -> Self;      // emit onto an existing channel
    pub fn service_name(&self) -> &str;
    pub fn open(&self, kind: SegmentKind, name: &str, opts: Option<OpenOptions>) -> SegmentHandle;
    pub fn channel(&self) -> SignalChannel;                         // what sinks drain
}

open is the sole entry point. It resolves the trace id — a supplied opts.trace_id joins an existing trace, otherwise a fresh one is minted — then consults the SampleGate. Sampled out → SegmentHandle::noop() (nothing is emitted). Sampled in → open_handle bound to a closure that calls self.channel().emit(...).

A root open (no trace_id) folds service.name into the segment's attributes first, so a caller attribute of the same key wins (last-writer-wins, matching the TS spread order). channel() exposes the stream sinks drain — the same underlying SignalChannel every opened handle emits onto.

Sampling and admission control

SampleGate decides, once per open, whether a new trace is recorded at all:

#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum SampleStrategy {
    #[default]
    Always,
    Never,
    Ratio(f64),   // admit a deterministic fraction in [0, 1]
}

impl SampleGate {
    pub fn new(strategy: SampleStrategy) -> Self;     // ratio thresholds pre-clamped
    pub fn decide(&self, trace_id: &str) -> bool;
}

Always and Never are constant verdicts. Ratio(r) is deterministic ratio sampling, not a random draw: the verdict is unit_hash(trace_id) < threshold, where unit_hash is a dependency-free FNV-1a over the trace id's bytes folded to [0, 1):

const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
const FNV_PRIME: u32 = 0x0100_0193;
const TWO_POW_32: f64 = 4_294_967_296.0;

fn unit_hash(trace_id: &str) -> f64 {
    let mut hash: u32 = FNV_OFFSET_BASIS;
    for b in trace_id.bytes() {
        hash ^= b as u32;
        hash = hash.wrapping_mul(FNV_PRIME);   // == TS Math.imul + implicit >>> 0
    }
    (hash as f64) / TWO_POW_32
}

wrapping_mul reproduces the TS Math.imul plus the implicit >>> 0 uint32 wrap-around exactly, so the same trace id always yields the same verdict, across processes and replays, and children opened on an admitted trace id are themselves admitted. The ratio is clamped into [0, 1] at construction (NaN or ≤ 0 → 0, ≥ 1 → 1); ratio 0 admits nothing, ratio 1 admits everything regardless of the hash. SampleGate is Copy and holds no mutable state, so one instance is safely shared across recorders and tasks.
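Putting the hash and the clamp together, a gate along these lines would behave as described. This is a sketch under assumptions: `Gate` and `Strategy` are hypothetical stand-ins for SampleGate and SampleStrategy, and the clamp is written from the prose above rather than from the source.

```rust
const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
const FNV_PRIME: u32 = 0x0100_0193;

/// FNV-1a over the id's bytes, folded to [0, 1).
pub fn unit_hash(trace_id: &str) -> f64 {
    let mut hash = FNV_OFFSET_BASIS;
    for b in trace_id.bytes() {
        hash ^= u32::from(b);
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    f64::from(hash) / 4_294_967_296.0
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Strategy { Always, Never, Ratio(f64) }

#[derive(Debug, Clone, Copy)]
pub struct Gate { strategy: Strategy }

impl Gate {
    /// Clamp the ratio once, at construction: NaN or <= 0 becomes 0, >= 1 becomes 1.
    pub fn new(strategy: Strategy) -> Self {
        let strategy = match strategy {
            Strategy::Ratio(r) if r.is_nan() || r <= 0.0 => Strategy::Ratio(0.0),
            Strategy::Ratio(r) if r >= 1.0 => Strategy::Ratio(1.0),
            other => other,
        };
        Gate { strategy }
    }

    /// Pure function of the trace id: no RNG, no state.
    pub fn decide(&self, trace_id: &str) -> bool {
        match self.strategy {
            Strategy::Always => true,
            Strategy::Never => false,
            // The hash lies in [0, 1), so threshold 0 admits nothing and
            // threshold 1 admits everything.
            Strategy::Ratio(threshold) => unit_hash(trace_id) < threshold,
        }
    }
}
```

For example, the one-byte id "a" hashes to 0xe40c292c, roughly 0.89 of the unit interval, so a 0.5 gate rejects it and a 0.9 gate admits it on every run.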

The telemetry hub

TelemetryHub is a named registry of Recorders — one per agent, subsystem, or tenant — addressable by name without threading instances through every call site:

impl TelemetryHub {
    pub fn new() -> Self;
    pub fn register(&mut self, name: &str, recorder: Recorder) -> Recorder;
    pub fn recorder(&self, name: Option<&str>) -> Result<Recorder, String>;
    pub fn get_default(&self) -> Option<Recorder>;
    pub fn has(&self, name: &str) -> bool;
}

register adds (or replaces) a recorder and returns it. The first registration becomes the default unless a later register uses the reserved "default" name, which always re-designates it. recorder(None) falls back to the default; recorder(Some(name)) looks one up. A missing requested-or-default recorder is an Err, not a silent drop — so dropped traces fail loudly. The error string is parity-critical: TelemetryHub: no recorder registered as "<which>" (with (default) substituted when no name was given).
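The default-designation and loud-failure rules can be sketched as follows. `Hub` and `Rec` are hypothetical stand-ins, and the exact rendering of the error text (including the quoted "(default)") is reconstructed from the description above, so treat it as an assumption.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for Recorder.
#[derive(Debug, Clone, PartialEq)]
pub struct Rec { pub service_name: String }

#[derive(Default)]
pub struct Hub {
    recorders: HashMap<String, Rec>,
    default_name: Option<String>,
}

impl Hub {
    pub fn new() -> Self { Self::default() }

    /// Add or replace. The first registration becomes the default; the
    /// reserved name "default" always re-designates it.
    pub fn register(&mut self, name: &str, recorder: Rec) -> Rec {
        self.recorders.insert(name.to_string(), recorder.clone());
        if self.default_name.is_none() || name == "default" {
            self.default_name = Some(name.to_string());
        }
        recorder
    }

    /// `None` falls back to the default; a miss is an Err, never a silent drop.
    pub fn recorder(&self, name: Option<&str>) -> Result<Rec, String> {
        let key = name.or(self.default_name.as_deref());
        key.and_then(|k| self.recorders.get(k))
            .cloned()
            .ok_or_else(|| {
                format!(
                    "TelemetryHub: no recorder registered as \"{}\"",
                    name.unwrap_or("(default)")
                )
            })
    }
}
```

Returning `Result` forces each call site to decide what a missing recorder means, instead of quietly tracing into nothing.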

A process-wide default hub is offered for code that has no hub to hand, but it is strictly opt-in — nothing in the subsystem reads it implicitly:

pub fn get_default_hub() -> Arc<Mutex<TelemetryHub>>;  // lazily created on first call
pub fn set_default_hub(hub: TelemetryHub);             // replace it (e.g. for tests)

get_default_hub returns a shared Arc<Mutex<TelemetryHub>> so all callers see the same registry.

Draining to sinks

A Sink is the terminal end of the transport — the OTel-free analogue of an exporter, but a plain trait with one mandatory drain pull-loop plus optional flush / close lifecycle hooks:

#[allow(async_fn_in_trait)]
pub trait Sink {
    async fn drain(&mut self, reader: SignalChannelReader);
    async fn flush(&mut self) -> io::Result<()> { Ok(()) }   // default no-op
    async fn close(&mut self) -> io::Result<()> { Ok(()) }   // default no-op
}

There is no subscriber registration and no push: a sink owns its own pull loop and runs until the channel is closed and drained. drain takes the concrete SignalChannelReader (there is exactly one channel type), so the subsystem avoids trait-object boxing entirely. Every concrete sink iterates the reader and acts only on TraceSignal::Close, ignoring Open / Update chatter. Three ship:

| Sink | Construct | Acts on close by |
| --- | --- | --- |
| ConsoleSink | ConsoleSink::new() / with_options(ConsoleSinkOptions { log }) | Printing one tidy line per closed segment through an injected LogFn (default stdout) |
| FileSink | FileSink::new(path) / with_options(path, FileSinkOptions { flush_every }) | Appending the record as an NDJSON line; batches per flush_every (default 1), flushes on threshold / end-of-drain / close |
| StreamSink<W> | StreamSink::new(writer) / with_options(writer, StreamSinkOptions { end_on_close }) | Writing the record as a JSON line to any tokio::io::AsyncWrite, honouring backpressure via write_all |

ConsoleSink is stateless and resource-free, so it needs neither a real flush nor close. Its line carries a status glyph (✓ ok, ✗ error, ? unknown), the padded kind and name, the wall-clock duration, and a [traceHead/segmentId] tag (the first 8 chars of the trace id), e.g.:

✓ inference  chat.completion        842ms  [a1b2c3d4/9f8e7d6c]
✗ action     write_file              12ms  [a1b2c3d4/0011aabb] — EACCES: permission denied

FileSink owns an in-memory pending batch (hence a meaningful flush) but no live OS handle — each append opens with create(true).append(true), writes, and closes. It flushes whatever remains when drain ends, and close is a final flush. StreamSink<W> is the most general: it owns no buffer (bytes go straight to the writer), flush flushes the writer, and close shuts the writer down only when constructed with end_on_close: true (so it never ends a writer it did not create, e.g. stdout). into_inner returns the underlying writer, useful in tests.

FileSink and StreamSink serialize through serde_json::to_string(&record), which — via the Segment's camelCase serde — emits traceId / parentId / startedAt / endedAt keys, byte-identical to the TS NDJSON.
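The Close-only filter and the flush_every batching described for FileSink can be sketched synchronously. Everything here is a hypothetical simplification: `BatchSink` records each flush in memory instead of appending to a file, `Sig::Close` carries an already-serialized line, and `drain` consumes a plain iterator rather than the async reader.

```rust
// Hypothetical signal shape: only Close carries a payload in this sketch.
pub enum Sig { Open, Update, Close(String) }

pub struct BatchSink {
    flush_every: usize,
    pending: Vec<String>,
    /// Each inner Vec is one "append" that would hit the file.
    pub writes: Vec<Vec<String>>,
}

impl BatchSink {
    pub fn new(flush_every: usize) -> Self {
        // Treat 0 as 1 so every record is eventually written (an assumption).
        Self { flush_every: flush_every.max(1), pending: Vec::new(), writes: Vec::new() }
    }

    /// Write out whatever is pending; a no-op when the batch is empty.
    pub fn flush(&mut self) {
        if !self.pending.is_empty() {
            let batch = std::mem::take(&mut self.pending);
            self.writes.push(batch);
        }
    }

    /// Pull loop: ignore Open / Update, batch Close, flush on threshold
    /// and once more when the stream ends.
    pub fn drain(&mut self, signals: impl IntoIterator<Item = Sig>) {
        for signal in signals {
            if let Sig::Close(line) = signal {
                self.pending.push(line);
                if self.pending.len() >= self.flush_every {
                    self.flush();
                }
            }
        }
        self.flush();
    }
}
```

With `flush_every` at its documented default of 1, every closed record becomes its own write; larger values trade durability for fewer appends.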

Redaction

SecretScrubber is a stateless attribute processor. scrub(&attributes) returns a deep copy with secret-looking keys and values replaced by REDACTION_TOKEN — the guillemet string "‹redacted›" (U+2039 / U+203A), deliberately not [REDACTED]:

impl SecretScrubber {
    pub fn new(options: SecretScrubberOptions) -> Self;   // default patterns + token
    pub fn scrub(&self, attributes: &Map<String, Value>) -> Map<String, Value>;
}

Two independent signals trigger a redaction:

  • Key shape — the path key matches a SecretPattern.key (e.g. api_key, authorization). The value is redacted regardless of its content.
  • Value shape — a string leaf matches a SecretPattern.value (e.g. a Bearer … token, sk-… key, JWT, PEM body) even under an innocuous key.

Rules come from default_secret_patterns(), a lazily-built &'static [SecretPattern] (via OnceLock) with three key-name rules ((?i) case-insensitive) and four value-shape rules. One value rule, vendor-key-prefix-value (sk-…/xoxb-…/ghp_…/AKIA…), is deliberately case-sensitive to match the TS source. Each pattern's regexes are matched unanchored with Regex::is_match (the TS RegExp.test equivalent).

The walk is a single deep copy that never mutates its input and produces a fresh structure each time; output key order follows input iteration order. When a key name looks secret it short-circuits descent and collapses the whole subtree to the token — so a credential nested inside (auth: { token: … }) cannot leak. The scrubber is exported so callers can sanitize attributes before persisting, but the bundled ConsoleSink / FileSink / StreamSink do not call it automatically — a caller must apply it explicitly (e.g. inside the note they pass to a handle).

use indusagi::tracing::SecretScrubber;
use serde_json::json;

let scrubber = SecretScrubber::default();   // default patterns + REDACTION_TOKEN
let attrs = json!({ "authorization": "Bearer abcd1234efgh", "model": "opus" })
    .as_object().unwrap().clone();
let safe = scrubber.scrub(&attrs);
// safe["authorization"] == "‹redacted›", safe["model"] == "opus"
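The walk itself (copy, short-circuit on a secret-looking key, test string leaves) can be sketched without serde_json or regex. This is illustrative only: `Val` is a hypothetical miniature of a JSON value, and the substring and prefix checks are crude placeholders for the real SecretPattern regexes.

```rust
// Hypothetical miniature JSON value; objects keep insertion order.
#[derive(Debug, Clone, PartialEq)]
pub enum Val {
    Str(String),
    Num(i64),
    Obj(Vec<(String, Val)>),
    List(Vec<Val>),
}

pub const TOKEN: &str = "‹redacted›";

// Placeholder key rule: case-insensitive substring match (over-broad on purpose).
fn key_looks_secret(key: &str) -> bool {
    let k = key.to_ascii_lowercase();
    ["api_key", "auth", "password"].iter().any(|needle| k.contains(*needle))
}

// Placeholder value rule: two case-sensitive prefixes.
fn value_looks_secret(s: &str) -> bool {
    s.starts_with("Bearer ") || s.starts_with("sk-")
}

/// Deep copy with secrets replaced; the input is never mutated.
pub fn scrub(value: &Val) -> Val {
    match value {
        Val::Str(s) if value_looks_secret(s) => Val::Str(TOKEN.to_string()),
        Val::Obj(entries) => Val::Obj(
            entries
                .iter()
                .map(|(k, v)| {
                    if key_looks_secret(k) {
                        // Short-circuit: the whole subtree collapses to the token.
                        (k.clone(), Val::Str(TOKEN.to_string()))
                    } else {
                        (k.clone(), scrub(v))
                    }
                })
                .collect(),
        ),
        Val::List(items) => Val::List(items.iter().map(scrub).collect()),
        other => other.clone(),
    }
}
```

Collapsing at the key, rather than descending and scrubbing leaf by leaf, is what keeps a nested credential with an innocuous-looking value from slipping through.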

Bridging the runtime

trace_agent_run is the optional, decoupled bridge that projects the runtime's flat RunEvent stream into nested Run / Inference / Action segments on a Recorder:

pub fn trace_agent_run(recorder: Recorder, subscribe: RunEventSubscribe) -> Disposer;

pub type RunEventSubscribe =
    Box<dyn FnOnce(Box<dyn FnMut(RunEvent) + Send>) -> Box<dyn FnOnce() + Send>>;
pub type Disposer = Box<dyn FnOnce() + Send>;

trace_agent_run, RunEvent, RunSnapshot, ToolOutcome, and RunEventSubscribe are re-exported from the indusagi::tracing barrel; Disposer is pub but lives only in the adapter submodule, so callers that need to name the return type reach it as indusagi::tracing::adapter::Disposer (most code just calls the returned value).

subscribe matches the shape of Agent.subscribe without importing the Agent type. The adapter reads only the public RunEvent shape — a faithful local mirror of the TS RunEvent union (Snapshot / TextDelta / ThinkingDelta / ToolStarted / ToolFinished / Settled / Faulted) plus RunSnapshot { run_id, phase } and ToolOutcome { id, output, is_error }. (The module docstring notes these mirror the runtime's contract; the code carries no #[cfg] gate, so the adapter is always compiled.)

The projection runs through a RunTracer held behind an Arc<Mutex<…>>:

  • The run segment is opened lazily on the first activity (ensure_run), stamping run.id when known, and closed on Settled (status Ok) / Faulted (status Error, with the error folded via run.fail(...)).
  • The inference segment is a run child opened on the invoking phase and closed on dispatching / compacting / idle. A duplicate phase is ignored (last_phase dedup); a new invoking closes any stale inference first.
  • Streamed TextDelta / ThinkingDelta events bump counters that are folded onto the open inference as stream.text_deltas / stream.thinking_deltas attributes at close.
  • Each action segment is a run child keyed by tool id (ToolStarted → actions.insert), annotated with tool.id / tool.name, then closed on the matching ToolFinished with status Error iff outcome.is_error and a tool.is_error attribute.

The returned Disposer detaches the subscription and tears down anything still open (teardown closes dangling actions, the inference, and the run as Ok). The mapping is best-effort and never panics on malformed input — out-of-order, missing, or repeated events just close what's open and ignore the rest.
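The projection rules above amount to a small state machine. The sketch below is a loose, hypothetical reduction: `Tracer` and `Event` are invented names, segments are replaced by log lines, and details the text leaves open (for instance, closing a still-open inference when the run settles) are assumptions.

```rust
use std::collections::HashMap;

// Hypothetical, flattened event vocabulary.
pub enum Event {
    Phase(&'static str), // "invoking" | "dispatching" | "compacting" | "idle"
    TextDelta,
    ToolStarted { id: &'static str, name: &'static str },
    ToolFinished { id: &'static str, is_error: bool },
    Settled,
    Faulted(&'static str),
}

#[derive(Default)]
pub struct Tracer {
    pub log: Vec<String>,
    run_open: bool,
    inference_open: bool,
    text_deltas: u32,
    last_phase: Option<&'static str>,
    actions: HashMap<&'static str, &'static str>, // tool id -> tool name
}

impl Tracer {
    fn ensure_run(&mut self) {
        if !self.run_open { self.run_open = true; self.log.push("open run".to_string()); }
    }
    fn close_inference(&mut self) {
        if self.inference_open {
            self.inference_open = false;
            self.log.push(format!("close inference text_deltas={}", self.text_deltas));
            self.text_deltas = 0;
        }
    }
    fn close_run(&mut self, status: String) {
        self.close_inference(); // assumption: a settling run closes its inference
        if self.run_open { self.run_open = false; self.log.push(format!("close run {}", status)); }
    }
    pub fn on_event(&mut self, event: Event) {
        match event {
            Event::Phase(p) => {
                if self.last_phase == Some(p) { return; } // duplicate phase ignored
                self.last_phase = Some(p);
                self.ensure_run();
                match p {
                    "invoking" => {
                        self.close_inference(); // close any stale inference first
                        self.inference_open = true;
                        self.log.push("open inference".to_string());
                    }
                    "dispatching" | "compacting" | "idle" => self.close_inference(),
                    _ => {}
                }
            }
            Event::TextDelta => { if self.inference_open { self.text_deltas += 1; } }
            Event::ToolStarted { id, name } => {
                self.ensure_run();
                self.actions.insert(id, name);
                self.log.push(format!("open action {}", name));
            }
            Event::ToolFinished { id, is_error } => {
                // Best-effort: an unknown id is simply ignored.
                if let Some(name) = self.actions.remove(id) {
                    let status = if is_error { "error" } else { "ok" };
                    self.log.push(format!("close action {} {}", name, status));
                }
            }
            Event::Settled => self.close_run("ok".to_string()),
            Event::Faulted(msg) => self.close_run(format!("error: {}", msg)),
        }
    }
    /// Teardown: close dangling actions, the inference, and the run as Ok.
    pub fn dispose(&mut self) {
        let mut dangling: Vec<&'static str> = self.actions.drain().map(|(_, n)| n).collect();
        dangling.sort();
        for name in dangling { self.log.push(format!("close action {} ok", name)); }
        self.close_run("ok".to_string());
    }
}
```

Every branch either closes something that is open or does nothing, which is how the mapping stays panic-free on out-of-order or repeated events.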

use indusagi::tracing::{Recorder, trace_agent_run};

let recorder = Recorder::new("agent");
// `agent.subscribe(handler) -> unsubscribe` matches RunEventSubscribe's shape.
let dispose = trace_agent_run(recorder, subscribe);
// ... run the agent; Run/Inference/Action segments flow onto recorder.channel() ...
dispose();   // detaches and closes any still-open segments

End-to-end example

Wire a recorder to a ConsoleSink, drive a handful of segments, then close the stream so the drain task finishes:

use indusagi::tracing::{ConsoleSink, Recorder, SegmentKind, Sink};

#[tokio::main]
async fn main() {
    let rec = Recorder::new("my-agent");
    let reader = rec.channel().reader();

    // Spawn the sink's pull loop.
    let drain = tokio::spawn(async move {
        let mut sink = ConsoleSink::new();
        sink.drain(reader).await;
    });

    let run = rec.open(SegmentKind::Run, "answer-question", None);
    let inf = run.child(SegmentKind::Inference, "chat.completion");
    inf.note(serde_json::json!({ "model": "opus", "tokens.in": 1200 })
        .as_object().unwrap().clone());
    inf.close(None);                              // resolves to Ok

    let act = run.child(SegmentKind::Action, "write_file");
    act.fail("EACCES: permission denied");        // sets the closing status to Error
    act.close(None);                              // resolves to Error
    run.close(None);

    rec.channel().close();                        // end the stream so drain() returns
    drain.await.unwrap();
}

For ratio sampling, share a stateless gate or pass a strategy:

use indusagi::tracing::{Recorder, SampleGate, SampleStrategy};

let rec = Recorder::new("svc").with_sampling(SampleStrategy::Ratio(0.25));

let gate = SampleGate::new(SampleStrategy::Ratio(0.25));      // Copy + shareable
let admitted = gate.decide("a1b2c3d4e5f60718a1b2c3d4e5f60718"); // deterministic per id

let h = rec.open(indusagi::tracing::SegmentKind::Run, "task", None);
// h.active() is true when the trace was admitted, false (the noop handle) when sampled out.
println!("recording: {}", h.active());

Relationship to neighbors

The tracing core (signal, channel, recorder, hub, redaction, sinks) is entirely runtime-agnostic and sits below or beside the rest of the framework. Ids draw randomness from getrandom (the sampler is a deterministic hash and needs none); segment clocks delegate to crate::core::now_ms; the transport rides on tokio's Notify and futures::Stream. The only coupling to the runtime is the optional adapter, which reads just the public RunEvent shape via its own local mirror types; it never touches runtime internals, and nothing in the runtime depends on it.

For the model conversations being traced, see the LLM Gateway; for how all the layers fit together, see the Architecture. This subsystem is the Rust counterpart of the TypeScript tracing subsystem (same lifecycle, same deterministic FNV-1a sampler, same guillemet redaction token, same camelCase NDJSON wire), re-expressed with Rust's type system: an immutable Segment value, a Stream reader instead of an async iterable, and a drop-oldest VecDeque transport in place of the TS deque.