Runtime

indusagi::runtime is the agent engine: it drives one model conversation from a prompt to settlement. It pairs a pure finite-state reducer (cadence) — I/O-free, await-free, never mutating its inputs — with a single stateful conductor (Agent / create_agent) that interprets the reducer's Effects, folds the resulting Signals back in, and turns the loop until the run reaches a settled or faulted terminal phase. Beneath them sit single-purpose drivers: the tool scheduler, the turn driver, the condensation engine, the content-addressed session store, the event ledger, and the wire projector.

The runtime is split in two. The pure core is an immutable RunSnapshot advanced by a side-effect-free reducer (cadence) that returns Effects and never touches I/O. The thin stateful conductor (Agent) interprets those effects — reach the model, run tools, condense history, persist, publish — and folds the resulting Signals back into the reducer until the run settles or faults. The whole agent loop lives here: streamed emissions fold into an in-flight assistant turn, requested tools are dispatched with bounded concurrency, history is condensed when it nears the model's window, settled history is persisted to a content-addressed session DAG, and progress is published as RunEvents.
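The split described above can be sketched in miniature with only the standard library. This is an illustrative toy, not the crate's code: the two-signal vocabulary, the `Snapshot` fields, and the synchronous `model` closure are all assumptions chosen to keep the example short.

```rust
use std::collections::VecDeque;

#[derive(Clone, Debug, PartialEq)]
enum Phase { Idle, Invoking, Settled }

#[derive(Clone, Debug, PartialEq)]
struct Snapshot { phase: Phase, messages: Vec<String> }

// Hypothetical two-signal vocabulary (the real contract has seven signals).
#[derive(Debug)]
enum Signal { Submit(String), StreamEnd(String) }

#[derive(Debug, PartialEq)]
enum Effect { InvokeModel(Vec<String>), Persist }

/// Pure core: no I/O and no mutation of `state`; returns the next snapshot plus effects.
fn step(state: &Snapshot, signal: Signal) -> (Snapshot, Vec<Effect>) {
    match signal {
        Signal::Submit(text) => {
            let mut messages = state.messages.clone();
            messages.push(format!("user: {}", text));
            let next = Snapshot { phase: Phase::Invoking, messages: messages.clone() };
            (next, vec![Effect::InvokeModel(messages)])
        }
        Signal::StreamEnd(reply) => {
            let mut messages = state.messages.clone();
            messages.push(format!("assistant: {}", reply));
            (Snapshot { phase: Phase::Settled, messages }, vec![Effect::Persist])
        }
    }
}

/// Impure shell: interprets each effect and folds the resulting signal back in.
fn drive(seed: Signal, model: impl Fn(&[String]) -> String) -> Snapshot {
    let mut state = Snapshot { phase: Phase::Idle, messages: Vec::new() };
    let mut queue = VecDeque::from(vec![seed]);
    while let Some(signal) = queue.pop_front() {
        let (next, effects) = step(&state, signal);
        state = next;
        for effect in effects {
            match effect {
                // "Reaching the model" is a plain function call in this sketch.
                Effect::InvokeModel(conversation) => {
                    queue.push_back(Signal::StreamEnd(model(conversation.as_slice())));
                }
                // Storage is out of scope here, so persist is a no-op.
                Effect::Persist => {}
            }
        }
    }
    state
}
```

Because `step` never performs I/O, it can be tested by feeding signals directly, while `drive` is the only place a real model, store, or ledger would be touched.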

This is the Rust edition of the runtime documented for Python; the two share the same architecture (pure reducer + impure conductor, the same effect/signal vocabulary) but the Rust port adds a compile-time-exhaustive signal match, Arc-shared structural sharing of history, and a tokio/FuturesUnordered quiescence loop.

Quickstart

create_agent builds a runnable Agent from a static AgentConfig plus optional AgentDeps. The model entrypoint is injectable through AgentDeps::invoke_model, so a test can script the model deterministically without a network. The simplest production path is agent_with_capabilities, which wires the gateway connector to a capabilities tool box.

use std::sync::Arc;
use indusagi::runtime::{
    create_agent, AgentConfig, AgentDeps, ModelInvoker, RunPhase,
};
use indusagi::llmgateway::contract::{
    Channel, Conversation, StreamOptions, Emission, Reply, Block,
    AssistantRole, StopReason, Usage,
};
use indusagi::core::channel::BoxStream;

// A scripted model: one text emission then a terminal `done`.
struct Scripted;
impl ModelInvoker for Scripted {
    fn invoke(&self, _c: Conversation, _o: StreamOptions) -> Channel {
        Channel::of(|| {
            let reply = Reply {
                role: AssistantRole::Assistant,
                model: "m".into(),
                blocks: vec![Block::Text { text: "Hello, world!".into() }],
                usage: Usage::EMPTY,
                stop: StopReason::Complete,
            };
            Box::pin(futures::stream::iter(vec![
                Emission::Text { delta: "Hello, world!".into() },
                Emission::Done { reply },
            ])) as BoxStream<Emission>
        })
    }
}

#[tokio::main]
async fn main() {
    let agent = create_agent(
        AgentConfig::new("some-model"),
        AgentDeps { invoke_model: Some(Arc::new(Scripted)), ..Default::default() },
    );
    let final_snapshot = agent.submit_prompt("hi").await; // string sugar -> one user turn
    assert_eq!(final_snapshot.phase, RunPhase::Settled);
    println!("{:?}", final_snapshot.messages.last()); // the folded assistant turn
}

AgentConfig::new(model) binds only the model id; system, tools, max_output_tokens, thinking, compaction, and max_turns are optional. submit takes a Vec<Turn> and returns the terminal RunSnapshot; submit_prompt is sugar that wraps a bare string into a single user text turn via as_turns.

Module map

Internal layering is strict: contract (types) ← cadence (the pure reducer) ← turn / dispatch / memory / store / ledger / wire (single-purpose drivers) ← conductor (the only orchestrator that touches all of them).

| Module | Holds |
| --- | --- |
| contract/ | The frozen type vocabulary every module imports: run_state, signal, effect, tools, errors, events, session, config |
| cadence/ | The pure FSM: reducer.rs (cadence, Transition, StepFn, initial_snapshot) and fold.rs (fold_blocks/fold_turn/seal_tool_args/tool_calls_of/add_usage/parse_tool_args) |
| conductor/ | agent.rs, the sole stateful layer: Agent, create_agent, AgentDeps, the quiescence drive loop, the turn-budget guard, the compaction gate, and all effect handlers |
| dispatch/ | scheduler.rs: Scheduler/create_scheduler with bounded concurrency, per-call cancellation, completion-order Signal::ToolSettled signals |
| turn/ | driver.rs: drive_turn, the stateless Channel → Signal adapter for one model invocation |
| memory/ | compactor.rs (should_compact/find_cut_point/summarize/compact, DISTILL_INSTRUCTION, DEFAULT_POLICY) and estimate.rs (estimate_context_tokens) |
| store/ | Content-addressed sessions: dag.rs (SessionGraph/Clock), persist.rs (SessionStore), hash.rs (hash_node + HASH_WIDTH) |
| ledger/ | bus.rs (RunLedger fan-out hub, RunEventHandler, Unsubscribe) and accumulator.rs (SnapshotAccumulator) |
| wire/ | projectors.rs: build_conversation + Projector/ProjectionContext/TurnRole, the table-driven (currently passthrough) message → Conversation assembly |
| capabilities_bridge.rs | CapabilitiesToolBox + agent_with_capabilities; adapts a capabilities::ToolBox to the runtime ToolBox trait |

The crate barrel indusagi::runtime re-exports the host-facing surface:

pub use conductor::{Agent, AgentDeps, AgentEventHandler, create_agent};
pub use cadence::{StepFn, Transition, cadence, cadence as step, initial_snapshot};
pub use dispatch::{DEFAULT_CONCURRENCY, Scheduler, create_scheduler};
pub use memory::{compact, estimate_context_tokens, find_cut_point, should_compact, summarize};
pub use store::{Clock, SessionGraph, SessionStore, hash_node};
pub use turn::drive_turn;
pub use ledger::{RunEventHandler, RunLedger, SnapshotAccumulator, Unsubscribe};
pub use wire::{ProjectionContext, Projector, TurnRole, build_conversation};
pub use capabilities_bridge::{CapabilitiesToolBox, agent_with_capabilities};

The frozen vocabulary

contract/ freezes the counts the whole runtime agrees on: 7 phases, 7 signals, 5 effects, 7 events. Every variant carries a kind() accessor returning the exact snake_case discriminant the TS source serialized on the wire.

Phases (RunPhase)

The discrete macro-states a run occupies. serde renders each snake_case (idle, invoking, …), matching the original TS string literals.

| Variant | Meaning |
| --- | --- |
| Idle | Created but not yet invoking the model. |
| Invoking | A model request has been requested but no stream has started. |
| Streaming | Folding incremental emissions from the model. |
| Dispatching | Running the tools the model asked for. |
| Compacting | Reserved for condensing history (see Notable behavior: the reducer never sets it). |
| Settled | Finished cleanly; no further work. |
| Faulted | Finished with an unrecoverable RunError. |

Signals (Signal)

A Signal is the only thing that can move a run forward. The reducer takes the current RunSnapshot plus one signal and returns the next snapshot together with side-effects.

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Signal {
    Submit { input: Vec<Turn> },
    Emission { emission: Emission },
    StreamEnd,
    ToolSettled { id: String, result: ToolOutcome },
    Compacted { summary: Turn },
    Abort,
    Fault { error: RunError },
}

The TS AbortSignalInput variant (renamed to avoid clashing with the DOM AbortSignal) is the Abort variant here.

Effects (Effect)

The reducer performs no I/O; it returns Effects describing what the conductor should do next. Effect is Serialize-only (effects are interpreted in-process, never deserialized); InvokeModel's StreamOptions::cancel is #[serde(skip)] because a live CancellationToken is not data.

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Effect {
    InvokeModel { conversation: Conversation, options: StreamOptions },
    RunTool { call: ToolCall },
    Persist { snapshot: RunSnapshot },
    Compact { messages: Arc<Vec<Turn>> },
    Publish { event: RunEvent },
}

Events (RunEvent)

The outward-facing notifications a host subscribes to. RunEvent derives Clone, Debug, Serialize; it is Serialize-only, like Effect (events are dispatched in-process, never deserialized off a wire), and is tagged on kind (snake_case). Snapshot, Settled, and Faulted carry a RunSnapshot (exposed by RunEvent::snapshot() -> Option<&RunSnapshot>, mirroring the accumulator's snapshotOf); the delta and tool-lifecycle events carry none. RunEvent::kind() -> &'static str returns the snake_case discriminant.

| Variant | Payload | Meaning |
| --- | --- | --- |
| Snapshot | { snapshot } | A fresh immutable snapshot was produced. |
| TextDelta | { delta } | Incremental assistant prose. |
| ThinkingDelta | { delta } | Incremental model reasoning. |
| ToolStarted | { id, name } | A requested tool began executing. |
| ToolFinished | { id, name, outcome } | A tool finished, success or not. |
| Settled | { snapshot } | Clean terminal state. |
| Faulted | { error, snapshot } | Faulted terminal state. |

Run state, errors, tools, sessions, config

| Type | Source | Purpose |
| --- | --- | --- |
| RunSnapshot | contract/run_state.rs | Frozen event-sourced state: run_id, session_id, phase, messages: Arc<Vec<Turn>>, pending, usage_total, model, optional error. Wire keys are camelCase (runId/sessionId/usageTotal); error is omitted unless faulted. |
| PendingTool | contract/run_state.rs | One outstanding tool: id, name, stage (ToolStage). |
| ToolStage | contract/run_state.rs | Queued → Running → Done. The reducer stamps a tool Running on dispatch; the scheduler tracks queued → running internally. |
| RunError / RunErrorKind | contract/errors.rs | kind (the 6 variants ModelFailed/ToolFailed/Aborted/CompactionFailed/TurnBudget/InvalidState), message, optional cause: serde_json::Value (omitted from the wire when absent). Constructors: RunError::new(kind, message), RunError::with_cause(kind, message, cause), and the free function run_error(kind, message, cause: Option<Value>) (the TS runError helper; None omits the field). |
| ToolCall / ToolOutcome | contract/tools.rs | A parsed call (id, name, input: Value) and its result (id, output: Value, is_error; wire key isError). |
| ToolRunner / ToolBox | contract/tools.rs | The async-trait executor seam and the bundle of descriptors() + runner(). |
| SessionNode / SessionHead / Migrator | contract/session.rs | One immutable DAG node (id, parent, turn, createdAt), the session tip pointer, and the schema-upgrade trait. |
| AgentConfig / CompactionPolicy | contract/config.rs | Immutable run setup and the condensation threshold policy. |
| ModelInvoker | contract/events.rs | The single injectable model seam (a trait, object-safe behind Arc<dyn ModelInvoker>). |

AgentConfig holds tools: Option<Arc<dyn ToolBox>>, so it is Clone (cheap — the box is shared) but not PartialEq/Serialize as a whole; the reducer reads the box through descriptors(), never by value comparison.

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct CompactionPolicy {
    pub trigger_ratio: f64,  // fraction of the window (0..1) at which condensation triggers
    pub keep_recent: usize,  // trailing turns kept untouched
}

The pure reducer cadence

cadence(config) binds a static AgentConfig and returns a StepFn — a pure (&RunSnapshot, Signal) -> Transition you can call, thread signals through, and interpret the returned effects of. It performs no I/O and never awaits; every transition derives a fresh immutable snapshot (the input is treated as frozen) and a flat Vec<Effect>.

pub type StepFn = Arc<dyn Fn(&RunSnapshot, Signal) -> Transition + Send + Sync>;

#[derive(Clone, Debug)]
pub struct Transition {
    pub state: RunSnapshot,
    pub effects: Vec<Effect>,
}

pub fn cadence(config: AgentConfig) -> StepFn;
pub fn initial_snapshot(
    session_id: impl Into<String>,
    model: impl Into<String>,
    run_id: Option<String>,
) -> RunSnapshot;

The agent loop is encoded as a transition table keyed on (phase, signal), not nested control flow:

submit                         → invoke_model              · phase invoking
emission (text/thinking)       → fold + publish delta      · phase streaming
emission (tool start)          → fold + tool_started       · phase streaming
emission (usage/stop)          → accumulate/refine         · phase streaming (no effect)
emission (error)               → faulted                   · phase faulted
stream_end · has tool calls    → run_tool × N              · phase dispatching
stream_end · no tool calls     → persist + settled         · phase settled
tool_settled · more pending    → mark done, keep waiting   · phase dispatching
tool_settled · all done        → append tool turn, invoke  · phase invoking
compacted                      → splice summary, invoke    · phase invoking
abort                          → faulted (aborted)         · phase faulted
fault                          → faulted                   · phase faulted

Driving the FSM directly (bypassing the conductor) is supported — cadence is re-exported as step for advanced hosts:

use indusagi::runtime::{cadence, initial_snapshot, AgentConfig};
use indusagi::runtime::contract::Signal;
use indusagi::llmgateway::contract::{Turn, Block};

let step = cadence(AgentConfig::new("some-model")); // also exported as `step`
let state = initial_snapshot("s1", "some-model", None); // phase Idle

let signal = Signal::Submit {
    input: vec![Turn::User { blocks: vec![Block::Text { text: "hi".into() }] }],
};
let t = step(&state, signal);
assert_eq!(t.state.phase, indusagi::runtime::RunPhase::Invoking);
for effect in &t.effects { println!("{}", effect.kind()); } // invoke_model, publish

Exhaustiveness — no `_` arm

The dispatch match in the reducer enumerates all 7 Signal variants explicitly with NO wildcard arm. This is load-bearing: adding a signal to the contract fails the build at this match until the new transition is handled, so a signal can never be silently dropped. The terminal-phase pre-check above the match likewise names the three reopening signals (Submit/Compacted/Fault) explicitly; only the already-enumerated "stray late signal" fallthrough uses a catch-all Transition { state, effects: [] } — the intended "ignore" branch, not a missing case. A dedicated test (exhaustive_dispatch_covers_all_signals) is the compile-time witness.

Terminal states (Settled/Faulted) absorb nothing except the three named signals the pre-check enumerates: a Submit or Compacted arriving against a closed run reopens it (both route through invoke(), which sets phase = invoking and strips any prior terminal error so a reopened run starts clean), while a Fault simply re-faults (it stays in Faulted with the new error). Every other signal against a closed run hits the explicit "ignore" catch-all and returns the snapshot unchanged with no effects.
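A minimal sketch of the terminal pre-check and the wildcard-free dispatch, using only the standard library. The payload-free signals, the `String` error, and the simplified live-phase transitions are assumptions for illustration; only the shape (named reopening signals, one intended ignore branch, an exhaustive match below) follows the description above.

```rust
#[derive(Clone, Debug, PartialEq)]
enum Phase { Idle, Invoking, Streaming, Settled, Faulted }

#[derive(Clone, Debug, PartialEq)]
struct Snapshot { phase: Phase, error: Option<String> }

// Seven variants, payloads trimmed for brevity.
#[derive(Clone, Debug)]
enum Signal { Submit, Emission, StreamEnd, ToolSettled, Compacted, Abort, Fault(String) }

fn is_terminal(phase: &Phase) -> bool {
    matches!(phase, Phase::Settled | Phase::Faulted)
}

/// Reopening a run sets phase = Invoking and strips any prior terminal error.
fn invoke(_state: &Snapshot) -> Snapshot {
    Snapshot { phase: Phase::Invoking, error: None }
}

fn fault(message: String) -> Snapshot {
    Snapshot { phase: Phase::Faulted, error: Some(message) }
}

fn step(state: &Snapshot, signal: Signal) -> Snapshot {
    // Terminal pre-check: three named signals get through, everything else is ignored.
    if is_terminal(&state.phase) {
        return match signal {
            Signal::Submit | Signal::Compacted => invoke(state),
            Signal::Fault(message) => fault(message),
            _ => state.clone(), // the intended "stray late signal" branch
        };
    }
    // Live dispatch: every variant is named, so a new variant breaks the build here.
    match signal {
        Signal::Submit | Signal::Compacted | Signal::ToolSettled => invoke(state),
        Signal::Emission => Snapshot { phase: Phase::Streaming, error: None },
        Signal::StreamEnd => Snapshot { phase: Phase::Settled, error: None },
        Signal::Abort => fault("aborted".to_string()),
        Signal::Fault(message) => fault(message),
    }
}
```

Adding an eighth variant to `Signal` in this sketch makes the second `match` fail to compile, which is the property the section relies on; the first `match` keeps compiling because its catch-all is deliberate.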

Folding emissions into a turn

cadence/fold.rs grows the in-flight assistant Turn one Emission at a time, purely — each helper returns a brand-new block list rather than mutating its argument.

| Function | Behavior |
| --- | --- |
| fold_blocks(&[Block], &Emission) -> Vec<Block> | Text/thinking deltas extend a trailing block of the same kind or start a new one; ToolCallStart opens a new tool-call block whose input begins as Value::String(""); each ToolCallDelta concatenates onto that raw buffer; a terminal Done replaces the whole list with reply.blocks. |
| fold_turn(&Turn, &Emission) -> Turn | Thin wrapper preserving the assistant role. |
| seal_tool_args(&[Block]) -> Vec<Block> | At stream end, every ToolCall whose input is still a Value::String is re-parsed (the TS typeof b.input === "string" gate). |
| parse_tool_args(&str) -> Value | Empty/whitespace → {}; invalid JSON → { "__unparsed": raw } (the untrimmed raw); valid JSON → the parsed value. |
| tool_calls_of(&[Block]) -> Vec<(String, String, Value)> | Collects the tool-call blocks (id, name, input) in order. |
| add_usage(&Usage, &Usage) -> Usage | Field-by-field sum; cache tiers are kept only when their sum is > 0 (a pair of absent/zero tiers stays absent on the wire). ZERO_USAGE = Usage::EMPTY. |

The streaming-args representation is the key parity detail: the Rust gateway's Block::ToolCall.input is a serde_json::Value, so the still-growing raw JSON string is carried as Value::String(raw) during streaming and replaced by the parsed value on seal. affects_blocks is exhaustive over the 8-variant Emission union with no wildcard arm.
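The folding rules can be illustrated with a simplified, std-only model. The `Block` and `Emission` shapes below are stand-ins (the real types live in the gateway contract and carry a serde_json::Value input), and attaching a ToolCallDelta to the trailing tool-call block is an assumption of this sketch.

```rust
#[derive(Clone, Debug, PartialEq)]
enum Block {
    Text(String),
    Thinking(String),
    // `raw_args` plays the role of Value::String(raw) while the call is still streaming.
    ToolCall { id: String, raw_args: String },
}

#[derive(Clone, Debug)]
enum Emission {
    Text(String),
    Thinking(String),
    ToolCallStart { id: String },
    ToolCallDelta(String),
}

/// Pure fold: copies the block list, applies one emission, and returns the copy.
fn fold_blocks(blocks: &[Block], emission: &Emission) -> Vec<Block> {
    let mut next = blocks.to_vec();
    match emission {
        // A delta extends a trailing block of the same kind, otherwise starts a new one.
        Emission::Text(delta) => match next.last_mut() {
            Some(Block::Text(text)) => text.push_str(delta),
            _ => next.push(Block::Text(delta.clone())),
        },
        Emission::Thinking(delta) => match next.last_mut() {
            Some(Block::Thinking(text)) => text.push_str(delta),
            _ => next.push(Block::Thinking(delta.clone())),
        },
        // A tool call opens with an empty raw-argument buffer.
        Emission::ToolCallStart { id } => {
            next.push(Block::ToolCall { id: id.clone(), raw_args: String::new() });
        }
        // Argument fragments concatenate onto the open call's buffer.
        Emission::ToolCallDelta(delta) => {
            if let Some(Block::ToolCall { raw_args, .. }) = next.last_mut() {
                raw_args.push_str(delta);
            }
        }
    }
    next
}
```

The raw buffer stays an unparsed string until the stream ends, which is why sealing is a separate step: partial JSON such as `{"a":` is not parseable on its own.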

The conductor

The conductor (conductor/agent.rs) is the only stateful, side-effecting layer. It threads a serial stream of Signals through the reducer, performs each emitted Effect, and folds the resulting signals back in until a terminal phase. create_agent is the single entry point.

pub fn create_agent(config: AgentConfig, deps: AgentDeps) -> Agent;

#[derive(Default)]
pub struct AgentDeps {
    pub invoke_model: Option<Arc<dyn ModelInvoker>>, // default: gateway, bound to config.model
    pub store: Option<SessionStore>,                 // where settled history is written
    pub ledger: Option<RunLedger>,                   // where events are published
}

The Agent handle owns one logical session and exposes:

| Method | Signature | Purpose |
| --- | --- | --- |
| submit | async fn submit(&self, input: Vec<Turn>) -> RunSnapshot | Drive a prompt to settlement; returns the terminal snapshot. |
| submit_prompt | async fn submit_prompt(&self, prompt: impl Into<String>) -> RunSnapshot | String sugar over submit. |
| subscribe | fn subscribe<F>(&self, handler: F) -> Unsubscribe | Tap the live RunEvent stream; returns a disposer. |
| abort | async fn abort(&self) | Cancel the active token, cancel the scheduler's rounds, and steer an Abort signal in. |
| snapshot | async fn snapshot(&self) -> RunSnapshot | Read the latest snapshot. |
| resume | async fn resume(&self, target_session: &str) -> Result<(), RunError> | Rehydrate messages from a persisted session (phase returns to Idle). |
| session_id | fn session_id(&self) -> &str | The session this agent threads runs through. |

When config.tools is present, create_agent builds a Scheduler over tools.runner(). The default invoke_model is a GatewayInvoker binding llmgateway::gateway::stream to config.model; an unknown model id (an unsupported GatewayError) is surfaced as a single-error-emission channel so the turn driver forwards it and the reducer faults the run (parity with the TS sync throw → fault).

A concurrent submit arriving while a run is in flight is folded in as steering: the Submit signal is pushed onto the live queue rather than starting a competing drive, and the caller chains on the in-flight drive by polling the shared driving flag.

The quiescence loop

drive(seed) is the heartbeat: pull the next signal, step the reducer, swap in the new snapshot, fire its effects. Streaming effects (the model stream, the tool dispatch round, condensation) push their own signals back, so the loop keeps turning. The serial signal pipe is a tokio::sync::mpsc::unbounded_channel, and the in-flight async effects are tracked in a FuturesUnordered<JoinHandle<()>>.

A run is genuinely finished only when it is in a terminal phase, no effect is still in flight, AND no signal is still buffered — so a steering submit/compacted/fault that lands while a trailing persist/publish is still draining can legitimately reopen the run. The loop races draining in-flight work against re-checking quiescence (using try_recv to peek the buffer and tokio::select! with biased to prefer a buffered signal over a draining effect), mirroring the TS while (isTerminal && !hasPending) { if (inflight.size === 0) return; await Promise.race(inflight); }.
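The three-part finish condition can be written as a small decision function. This is a sketch of the reasoning only: the `Next` enum and the counters are hypothetical names, and the real loop uses channels and tokio::select! rather than plain integers.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Phase { Streaming, Settled, Faulted }

// Hypothetical names for what the drive loop does on its next turn.
#[derive(Debug, PartialEq)]
enum Next { TakeSignal, AwaitSignal, DrainInflight, Finished }

/// Decides the next move from the phase, the in-flight effect count, and the buffered signal count.
fn next_action(phase: Phase, inflight: usize, buffered: usize) -> Next {
    let terminal = matches!(phase, Phase::Settled | Phase::Faulted);
    if buffered > 0 {
        // A buffered signal always wins, even against a closed run: it may reopen it.
        Next::TakeSignal
    } else if !terminal {
        // A live run waits for its effects to push the next signal.
        Next::AwaitSignal
    } else if inflight > 0 {
        // Terminal but still draining (for example a trailing persist): not finished yet.
        Next::DrainInflight
    } else {
        // Terminal, nothing in flight, nothing buffered.
        Next::Finished
    }
}
```

Checking the buffer first corresponds to the biased preference for a buffered signal over a draining effect.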

perform_all collects all run_tool effects of one transition and dispatches them together so a single stream-end round runs as one batch under one cancellation scope; publish is synchronous (inline ledger.publish), while invoke_model, compact, and persist are spawned and tracked in inflight.

Tool dispatch

dispatch/scheduler.rs bridges the reducer's run_tool effects to real ToolRunner execution for one dispatch round. Scheduler::run(calls, outer) returns a Stream<Item = Signal> that yields a Signal::ToolSettled per finished call — in completion order, not request order.

pub const DEFAULT_CONCURRENCY: usize = 8;

pub fn create_scheduler(runner: Arc<dyn ToolRunner>) -> Scheduler;

impl Scheduler {
    pub fn new(runner: Arc<dyn ToolRunner>) -> Self;
    pub fn with_concurrency(runner: Arc<dyn ToolRunner>, concurrency: usize) -> Self; // never below 1
    pub fn run(&self, calls: Vec<ToolCall>, outer: CancellationToken)
        -> impl Stream<Item = Signal> + Send + 'static;
    pub fn cancel(&self); // fires the round token of every started round
}

Three properties define its behavior, ported faithfully from the TS:

  • Per-call cancellation. Every call runs under its own CancellationToken child, chained to the round token (round = round_token(&outer), child = child_token(&round)), which in turn descends from the caller's outer token and Scheduler::cancel. Down-propagation (cancel the parent ⇒ cancel every descendant) is exactly the round teardown.
  • Bounded concurrency. At most DEFAULT_CONCURRENCY (8) calls run at once, gated by a tokio::sync::Semaphore; the rest wait in request order. A queued call reached after the round is cancelled is skipped rather than started (no settled signal).
  • Completion-order emission. Settled signals are delivered as workers finish.

Per the cooperative-cancel contract, a cancelled ToolRunner::run returns a ToolOutcome { is_error: true } — never an Err, never a panic — so an aborted round drains to typed errors instead of deadlocking. If a tool is requested but no scheduler is configured (no tools in the config), the round faults with RunErrorKind::ToolFailed rather than hanging forever. Round tokens are tracked by a monotonic id (a CancellationToken has no PartialEq), and a round self-removes from the tracked set when it finishes.
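The bounded-concurrency and completion-order properties can be demonstrated without tokio. The sketch below swaps in a different technique, a fixed pool of OS threads pulling from a shared queue, in place of the Semaphore-gated async tasks described above; `run_round`, `work_ms`, and the shared `AtomicBool` cancel flag are hypothetical stand-ins.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Clone, Debug)]
struct ToolCall { id: String, work_ms: u64 }

#[derive(Debug, PartialEq)]
struct ToolSettled { id: String, is_error: bool }

/// Runs one round with at most `concurrency` calls in flight.
/// Returns the settled results in completion order plus the observed peak concurrency.
fn run_round(
    calls: Vec<ToolCall>,
    concurrency: usize,
    cancelled: Arc<AtomicBool>,
) -> (Vec<ToolSettled>, usize) {
    let queue = Arc::new(Mutex::new(VecDeque::from(calls)));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = mpsc::channel();
    let mut workers = Vec::new();
    for _ in 0..concurrency.max(1) {
        let (queue, running, peak) = (queue.clone(), running.clone(), peak.clone());
        let (tx, cancelled) = (tx.clone(), cancelled.clone());
        workers.push(thread::spawn(move || loop {
            // Calls are taken in request order; the lock is released right after the pop.
            let next = queue.lock().unwrap().pop_front();
            let call = match next { Some(call) => call, None => break };
            // A queued call reached after cancellation is skipped: no settled signal.
            if cancelled.load(Ordering::SeqCst) { continue; }
            let now = running.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(call.work_ms)); // stand-in for real tool work
            running.fetch_sub(1, Ordering::SeqCst);
            // A call cancelled mid-flight settles as a typed error, never a panic.
            let is_error = cancelled.load(Ordering::SeqCst);
            tx.send(ToolSettled { id: call.id, is_error }).unwrap();
        }));
    }
    drop(tx); // the receiver ends once every worker has dropped its sender
    let settled: Vec<ToolSettled> = rx.iter().collect();
    for worker in workers { worker.join().unwrap(); }
    (settled, peak.load(Ordering::SeqCst))
}
```

Results arrive on the channel as each worker finishes, so a short call queued behind a long one can settle first whenever the bound allows both to run.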

The ToolRunner trait is the injected side that executes tools:

#[async_trait::async_trait]
pub trait ToolRunner: Send + Sync {
    async fn run(&self, call: ToolCall, cancel: CancellationToken) -> ToolOutcome;
}

The turn driver

turn/driver.rs is the stateless seam between "bytes off the wire" and "inputs the reducer understands". drive_turn(channel) adapts one model invocation's gateway Channel into the Signal vocabulary:

pub fn drive_turn(channel: Channel) -> impl Stream<Item = Signal> + Send;

Each Emission pulled from the channel is forwarded as a Signal::Emission in arrival order; once the channel is exhausted, a single terminal Signal::StreamEnd is yielded (built with StreamExt::map over the channel followed by a futures::stream::once chained on). It holds no run state and performs no folding — interpretation is left to the reducer. A transport/protocol failure surfaces as a terminal Emission::Error, which the driver forwards as an emission signal (the reducer's on_emission faults the run on it); a terminal StreamEnd still follows for TS parity even after an error envelope.
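The map-then-chain shape has a direct synchronous analogue in std iterators. This sketch uses `Iterator` in place of the async `Stream`, and the two-variant `Emission` is a stand-in for the gateway's type.

```rust
#[derive(Clone, Debug, PartialEq)]
enum Emission { Text(String), Error(String) }

#[derive(Clone, Debug, PartialEq)]
enum Signal { Emission(Emission), StreamEnd }

/// Forwards every emission in arrival order, then yields exactly one StreamEnd.
fn drive_turn<I>(channel: I) -> impl Iterator<Item = Signal>
where
    I: IntoIterator<Item = Emission>,
{
    channel
        .into_iter()
        .map(Signal::Emission) // no interpretation: wrap and forward
        .chain(std::iter::once(Signal::StreamEnd))
}
```

An error emission is forwarded like any other, and the terminal StreamEnd still follows it; deciding that the run faults is left to the reducer.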

Compaction

memory/ condenses history when its estimated footprint approaches the model's window. The oldest stretch is replaced by a single distilled summary turn while a verbatim tail of recent turns is preserved. Condensation is not a reducer transition — it happens lazily at the conductor's invocation boundary.

| Function | Behavior |
| --- | --- |
| estimate_context_tokens(&[Turn]) -> usize | chars/4 (CHARS_PER_TOKEN) ceiling-divided, plus 4 (TURN_OVERHEAD_TOKENS) per turn. Counts Unicode scalar values for prose and canonical_json(value).chars().count() for serialized values. |
| should_compact(&[Turn], &ModelCard, Option<&CompactionPolicy>) -> bool | estimate >= context_window * trigger_ratio; false for a non-positive window. |
| find_cut_point(&[Turn], keep_recent) -> usize | The boundary between the prefix to condense and the tail to keep, nudged forward while the first surviving turn carries tool results, so a tool_call/tool_result pair is never split. Returns the index of the first turn kept verbatim. |
| summarize(&[Turn], &dyn ModelInvoker) -> Result<Turn, String> | Renders the prefix into a role-prefixed transcript, invokes the model with DISTILL_INSTRUCTION as system, and folds the response into a user-role turn marked with SUMMARY_HEADING ([condensed earlier context]). |
| compact(&[Turn], &ModelCard, Option<&CompactionPolicy>, &dyn ModelInvoker) -> Result<Vec<Turn>, String> | [summary, ...tail]; a cut point of 0 returns history unchanged, so compact is safe to call unconditionally. |

DEFAULT_POLICY = CompactionPolicy { trigger_ratio: 0.8, keep_recent: 8 } is applied when no policy is configured. The summary is filed under the user role (not assistant) so it stays valid input for any provider.
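The sizing arithmetic and the cut-point nudge can be worked through on a simplified turn type. Several details here are assumptions: the flat `Turn { text, is_tool_result }` shape, applying the ceiling division per turn, and passing the window and ratio as plain numbers instead of a ModelCard and policy.

```rust
const CHARS_PER_TOKEN: usize = 4;
const TURN_OVERHEAD_TOKENS: usize = 4;

// Hypothetical flattened turn; the real Turn holds typed blocks.
#[derive(Clone, Debug)]
struct Turn { text: String, is_tool_result: bool }

/// ceil(chars / 4) + 4 per turn, counting Unicode scalar values.
fn estimate_context_tokens(turns: &[Turn]) -> usize {
    turns
        .iter()
        .map(|turn| {
            let chars = turn.text.chars().count();
            (chars + CHARS_PER_TOKEN - 1) / CHARS_PER_TOKEN + TURN_OVERHEAD_TOKENS
        })
        .sum()
}

/// Trips once the estimate reaches window * ratio; a non-positive window never trips.
fn should_compact(turns: &[Turn], context_window: i64, trigger_ratio: f64) -> bool {
    if context_window <= 0 {
        return false;
    }
    estimate_context_tokens(turns) as f64 >= context_window as f64 * trigger_ratio
}

/// Index of the first turn kept verbatim, nudged forward past leading tool results
/// so that a tool_call/tool_result pair is never split across the cut.
fn find_cut_point(turns: &[Turn], keep_recent: usize) -> usize {
    let mut cut = turns.len().saturating_sub(keep_recent);
    while cut < turns.len() && turns[cut].is_tool_result {
        cut += 1;
    }
    cut
}
```

For example, a 10-character turn costs ceil(10/4) + 4 = 7 tokens, so two such turns estimate to 14; against a 16-token window at ratio 0.8 the threshold is 12.8 and the gate trips, while a 20-token window (threshold 16) does not.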

The conductor's gate (spawn_invoke_model → condense_if_shrinks) reads the live self.snapshot.messages, not the conversation captured in the effect, so it always sizes the freshest history. Before opening a stream, if should_compact() trips, the older prefix is condensed and a Signal::Compacted is fed back instead; the reducer's on_compacted re-issues a fresh invoke_model over the shrunken history ([summary, ...keep_recent tail]), and the gate then passes on the second pass. A pre-check on find_cut_point(...) <= 1 avoids spending a model call when history is already as small as the policy allows (and stops the gate re-triggering forever). A condensation error faults the run with RunErrorKind::CompactionFailed; DEFAULT_KEEP_RECENT_POLICY (the conductor's own fallback, also keep_recent: 8) keeps the conductor and the compactor agreeing on where the tail begins.

Sessions and persistence

When AgentDeps::store is injected, settled history is appended to a content-addressed session DAG. store/ keeps history as an immutable DAG: each SessionNode carries one turn and points at its parent by hash, so the linear context for a model call is the path from a leaf back to the root, reversed.

impl SessionGraph {
    pub fn new(session_id: impl Into<String>) -> Self;        // production wall clock
    pub fn with_clock(session_id: impl Into<String>, clock: Clock) -> Self; // injected clock (tests)
    pub fn hydrate(session_id, nodes: impl IntoIterator<Item = SessionNode>, leaf: Option<String>) -> Self;
    pub fn append(&mut self, turn: Turn) -> SessionNode;       // chain + advance head
    pub fn branch_from(&mut self, node_id: &str) -> Result<SessionNode, String>;
    pub fn path_to(&self, leaf: &str) -> Result<Vec<Turn>, String>;   // root -> leaf
    pub fn resume(&mut self, leaf: &str) -> Result<Vec<Turn>, String>; // branch_from + path_to
    pub fn leaf(&self) -> Option<&str>;       // the current head, or None when empty
    pub fn all(&self) -> Vec<SessionNode>;    // insertion order, an IndexMap underneath
    pub fn get(&self, node_id: &str) -> Option<&SessionNode>;  // lookup by content hash
    pub fn size(&self) -> usize;              // node count across every branch
    pub fn session_id(&self) -> &str;
}

Clock = Arc<dyn Fn() -> i64 + Send + Sync> is the injectable epoch-milliseconds source (the TS Clock = () => number, Date.now in production), so a test can pin timestamps and therefore the content hashes a session produces.

hash_node(parent, turn, created_at) -> String derives a node's id as sha256(JSON.stringify({ parent: parent ?? null, turn, createdAt }))[..32] via the framework's core::content_hash (HASH_WIDTH = 32). Re-appending an identical turn from the same head at the same time is a no-op head advance — the content hash collides, so an unchanged prefix re-persists nothing. A cross-language golden test pins the hash byte-for-byte against a Node-generated fixture (7d759dabaf36f47466002755371dcf9d).
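Why an identical re-append is a no-op follows from the id being a pure function of (parent, turn, created_at). The sketch below shows that property on a toy graph; it deliberately swaps in std's DefaultHasher for the SHA-256 content hash, so the ids are not the real ones, and `Graph`, `Node`, and the string-typed turn are illustrative stand-ins.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

#[derive(Clone, Debug, PartialEq)]
struct Node { id: String, parent: Option<String>, turn: String, created_at: i64 }

/// Stand-in for the real content hash: same inputs always give the same id.
fn hash_node(parent: Option<&str>, turn: &str, created_at: i64) -> String {
    let mut hasher = DefaultHasher::new();
    (parent, turn, created_at).hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}

struct Graph { nodes: HashMap<String, Node>, head: Option<String> }

impl Graph {
    fn new() -> Self {
        Graph { nodes: HashMap::new(), head: None }
    }

    /// Chains a turn onto the head; an id that already exists only advances the head.
    fn append(&mut self, turn: &str, created_at: i64) -> Node {
        let parent = self.head.clone();
        let id = hash_node(parent.as_deref(), turn, created_at);
        let node = self
            .nodes
            .entry(id.clone())
            .or_insert_with(|| Node { id: id.clone(), parent, turn: turn.to_string(), created_at })
            .clone();
        self.head = Some(id);
        node
    }

    /// Moves the head to an existing node; returns false when the id is unknown.
    fn branch_from(&mut self, node_id: &str) -> bool {
        let known = self.nodes.contains_key(node_id);
        if known {
            self.head = Some(node_id.to_string());
        }
        known
    }

    /// Walks parent pointers from the leaf, then reverses to get root -> leaf order.
    fn path_to(&self, leaf: &str) -> Vec<String> {
        let mut turns = Vec::new();
        let mut cursor = self.nodes.get(leaf);
        while let Some(node) = cursor {
            turns.push(node.turn.clone());
            cursor = node.parent.as_deref().and_then(|p| self.nodes.get(p));
        }
        turns.reverse();
        turns
    }
}
```

Branching back to an earlier node and appending the same turn with the same timestamp lands on the existing id, so the node count does not grow; this is also why pinning the clock in tests pins the ids.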

SessionStore is the filesystem JSONL backing: each session is an append-only .jsonl file under a root, every line a node record or a head marker.

impl SessionStore {
    pub fn new(root: impl Into<PathBuf>) -> Self;
    pub fn with_migrators(root: impl Into<PathBuf>, migrators: Vec<Arc<dyn Migrator>>) -> Self;
    pub async fn append_node(&self, session_id: &str, node: &SessionNode) -> std::io::Result<()>;
    pub async fn load_session(&self, session_id: &str) -> std::io::Result<SessionGraph>;
    pub async fn list_sessions(&self) -> std::io::Result<Vec<String>>;
}

load_session replays the file, runs each raw record through the Migrator pipeline (composed in sequence, each upgrading one schema version), and reconstructs the graph; a missing file yields an empty graph and a corrupt line is skipped. On load each session is read from its own append-only <session_id>.jsonl file, where each line is a {"type":"node", ...} or {"type":"head","leaf":...} record; append_node writes the node record followed by a fresh head marker, so the last head line always names the live leaf.

The conductor's spawn_persist loads the session, appends every turn not yet on the persisted leaf, and only writes a genuinely new leaf node to disk. Persistence is advisory: a store error is swallowed so an otherwise-complete run still settles.

agent.resume(session_id) re-seats the in-memory snapshot on the rehydrated root→leaf path and returns the phase to Idle, so the next submit continues the conversation; it errors with RunErrorKind::InvalidState when no store is configured or the session fails to load.

Ledger and accumulator

ledger/ is the seam between the pure loop and any host that renders or persists progress.

RunLedger is a synchronous fan-out hub: new(), subscribe(handler) -> Unsubscribe, publish(&RunEvent), clear(), size(). It is Clone + Default and holds its registry behind an Arc<Mutex<…>>, so a cloned ledger publishes to the same subscriber set. Delivery is sequential over a snapshot of the handler set taken at publish time (the Arcs are cloned out under the lock, then the lock is released before any handler runs), so subscribing or unsubscribing from inside a handler is safe (it takes effect on the next publish). A panicking handler is isolated with catch_unwind(AssertUnwindSafe(...)) so siblings still receive the event. Handlers are filed under a fresh monotonic id (Rust closures have no stable identity) in an IndexMap, preserving registration order for delivery; the TS "same function subscribed twice is one entry" quirk is therefore not reproduced (every subscribe is a distinct registration), but the observable fan-out contract is identical. Unsubscribe removes the handler on dispose() or Drop (idempotent).
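The snapshot-then-deliver pattern and the panic isolation can be sketched with std alone. This is a reduced model under stated assumptions: events are plain `&str`, handlers live in a `Vec` instead of an IndexMap, and `unsubscribe` takes the id directly instead of returning a disposer.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Mutex};

type Handler = Arc<dyn Fn(&str) + Send + Sync>;

#[derive(Default)]
struct Registry { next_id: u64, handlers: Vec<(u64, Handler)> }

/// Clones share one registry, so any clone publishes to the same subscriber set.
#[derive(Clone, Default)]
struct Ledger { inner: Arc<Mutex<Registry>> }

impl Ledger {
    /// Files the handler under a fresh monotonic id, preserving registration order.
    fn subscribe(&self, handler: Handler) -> u64 {
        let mut registry = self.inner.lock().unwrap();
        let id = registry.next_id;
        registry.next_id += 1;
        registry.handlers.push((id, handler));
        id
    }

    fn unsubscribe(&self, id: u64) {
        self.inner.lock().unwrap().handlers.retain(|(handler_id, _)| *handler_id != id);
    }

    fn publish(&self, event: &str) {
        // Clone the handler Arcs out under the lock; the guard is dropped at the end
        // of this statement, before any handler runs.
        let snapshot: Vec<Handler> =
            self.inner.lock().unwrap().handlers.iter().map(|(_, h)| h.clone()).collect();
        for handler in snapshot {
            // A panicking handler is contained so its siblings still see the event.
            let _ = catch_unwind(AssertUnwindSafe(|| handler(event)));
        }
    }
}
```

Because the lock is not held during delivery, a handler may call `subscribe` or `unsubscribe` on the same ledger without deadlocking; the change is visible from the next publish.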

pub type RunEventHandler = Arc<dyn Fn(&RunEvent) + Send + Sync>;

SnapshotAccumulator folds the snapshot-bearing events (snapshot/settled/faulted) into the single most recent RunSnapshot: observe(&RunEvent), snapshot() -> Option<RunSnapshot>, has_snapshot(), reset(), and attach(&RunLedger) -> Unsubscribe to wire it to a ledger automatically. Delta and tool-lifecycle events carry no snapshot and are ignored. The simplest path skips it entirely: agent.submit(...) already resolves the terminal snapshot, and agent.snapshot() reads the latest state at any time.

Wire projection

wire/projectors.rs assembles a runtime message list plus its AgentConfig into the gateway's provider-neutral Conversation:

pub fn build_conversation(messages: &[Turn], cfg: &AgentConfig) -> Conversation;

The assembly is table-driven: a HashMap<TurnRole, Projector> dispatches each message to its projector (Projector = fn(&Turn, &ProjectionContext) -> Vec<Turn>); returning a list keeps the contract flexible (a role may expand into several wire turns or collapse to none). Today every role passes through verbatim — the runtime already stores history in the gateway's turn shape — so projection is the identity; the seam is kept explicit so role-specific shaping (redaction, block filtering) would change only one entry, not dead code. system and tools are sourced from cfg and omitted (None) when absent/empty.
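A table-driven projection of this kind might look like the following. The `Turn` shape, the three roles, and the `redact_tool_output` flag are hypothetical; the flag exists only to show how role-specific shaping would touch a single table entry.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum TurnRole { User, Assistant, Tool }

#[derive(Clone, Debug, PartialEq)]
struct Turn { role: TurnRole, text: String }

// Hypothetical context field, for illustration only.
struct ProjectionContext { redact_tool_output: bool }

// Returning a list lets one message expand into several wire turns or collapse to none.
type Projector = fn(&Turn, &ProjectionContext) -> Vec<Turn>;

/// Identity projection: the stored turn is already in wire shape.
fn passthrough(turn: &Turn, _ctx: &ProjectionContext) -> Vec<Turn> {
    vec![turn.clone()]
}

/// Example of role-specific shaping: drop tool turns when redaction is requested.
fn project_tool(turn: &Turn, ctx: &ProjectionContext) -> Vec<Turn> {
    if ctx.redact_tool_output { Vec::new() } else { vec![turn.clone()] }
}

fn build_conversation(messages: &[Turn], ctx: &ProjectionContext) -> Vec<Turn> {
    let mut table: HashMap<TurnRole, Projector> = HashMap::new();
    table.insert(TurnRole::User, passthrough);
    table.insert(TurnRole::Assistant, passthrough);
    table.insert(TurnRole::Tool, project_tool); // the only entry that differs
    messages
        .iter()
        .flat_map(|turn| {
            let project = table[&turn.role];
            project(turn, ctx)
        })
        .collect()
}
```

With redaction off the function is the identity on the message list, matching the passthrough behavior described above.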

Cancellation and the turn budget

abort() cancels the run's root CancellationToken (cutting the model stream and tool dispatch at the source), cancels the scheduler's rounds, and steers an Abort signal into the reducer so the loop unwinds to a faulted terminal state. The model stream is opened with options.cancel = Some(active), and spawn_invoke_model drops any stream signal after the token is cancelled, because a connector parked on a network read may not poll the token until its next chunk.

The drive loop counts the InvokeModel effects each transition asks for and force-faults with RunErrorKind::TurnBudget once they would exceed max_turns (config.max_turns ?? 64, i.e. DEFAULT_MAX_TURNS = 64), guarding against a model that loops forever. Over-budget invoke_model effects are dropped while every other effect of that transition is still honoured.
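The budget guard amounts to a counter applied to each transition's effect list. The sketch below is one way to express it; `Budget`, `admit`, and the payload-free `Effect` are hypothetical names, and the caller is assumed to raise the turn-budget fault when the returned flag is set.

```rust
const DEFAULT_MAX_TURNS: usize = 64;

#[derive(Clone, Debug, PartialEq)]
enum Effect { InvokeModel, Persist, Publish(String) }

struct Budget { used: usize, max_turns: usize }

impl Budget {
    /// `None` falls back to the default of 64 model invocations.
    fn new(max_turns: Option<usize>) -> Self {
        Budget { used: 0, max_turns: max_turns.unwrap_or(DEFAULT_MAX_TURNS) }
    }

    /// Returns the effects to honour and whether the run must fault for exceeding the budget.
    fn admit(&mut self, effects: Vec<Effect>) -> (Vec<Effect>, bool) {
        let mut over_budget = false;
        let mut kept = Vec::new();
        for effect in effects {
            if effect == Effect::InvokeModel {
                if self.used >= self.max_turns {
                    // Drop only the over-budget model call; siblings still run.
                    over_budget = true;
                    continue;
                }
                self.used += 1;
            }
            kept.push(effect);
        }
        (kept, over_budget)
    }
}
```

Keeping the non-model effects means a final publish or persist from the same transition still goes out even as the run is force-faulted.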

Notable behavior

  • RunPhase::Compacting is reserved but unused — the reducer never sets it; condensation is handled at the conductor's invocation boundary via Compacted signals.
  • History is Arc-shared. RunSnapshot.messages is Arc<Vec<Turn>>, so a transition that does not touch history shares it structurally instead of deep-cloning. The pure layer treats every field as read-only and produces new owned values; the input snapshot is never mutated (the never_mutates_the_input_snapshot test asserts byte-for-byte equality of the input before/after a step).
  • The invalid helper is dead by construction but kept for parity. In TS it guarded a signal/phase mismatch; in Rust step destructures the payload out of the variant before dispatch, so a handler can never receive a mismatched signal. The helper still formats the exact "signal '<kind>' is not valid in phase '<phase>'" message.
  • Wire fields stay camelCase across the seam (runId/sessionId/usageTotal, isError, createdAt), so a Rust RunSnapshot/SessionNode/ToolOutcome round-trips to the same JSON keys the TS/Python editions emit.
  • RunError.cause is a free-form serde_json::Value (rather than Box<dyn Error>) to preserve Clone + PartialEq + Serialize, which the snapshot equality/wire tests rely on.
  • The capabilities bridge converts nominally distinct but structurally identical types. CapabilitiesToolBox field-for-field copies ToolDescriptor/ToolCall/ToolOutcome across the capabilities ↔ runtime seam (no behavior change), and agent_with_capabilities is the production wiring that builds an agent from a model and a capabilities tool box.

Relationship to neighbors

The runtime depends on the LLM gateway for its provider-neutral vocabulary (Turn, Block, Emission, Conversation, StreamOptions, Channel, Usage, Reply, ModelCard, ToolDescriptor, ThinkingLevel) and for the gateway::stream function plus catalog::get_card used by the default GatewayInvoker and the compaction window sizing. It deliberately does not re-export those gateway types — they have a single source of truth in indusagi::llmgateway::contract.

Downstream, the capabilities tool kernel and the shell/CLI build a ToolBox/ToolRunner, then call create_agent(...) (or agent_with_capabilities) and submit/subscribe. The cancellation currency (core::CancellationToken, root_token/round_token/child_token) and content hashing (core::content_hash, canonical_json) come from the crate's core module.

For the same engine in other editions, see the Python runtime.