Runtime
indusagi::runtime is the agent engine: it drives one model conversation from a prompt to settlement. It pairs a pure finite-state reducer (cadence), which is I/O-free, await-free, and never mutates its inputs, with a single stateful conductor (Agent/create_agent). The conductor interprets the reducer's Effects, folds the resulting Signals back in, and turns the loop until the run reaches a settled or faulted terminal phase. Beneath them sit single-purpose drivers: the tool scheduler, the turn driver, the condensation engine, the content-addressed session store, the event ledger, and the wire projector.
The runtime is split in two. The pure core is an immutable RunSnapshot advanced by
a side-effect-free reducer (cadence) that returns Effects
and never touches I/O. The thin stateful conductor (Agent) interprets
those effects — reach the model, run tools, condense history, persist, publish — and folds
the resulting Signals back into the reducer until the run settles or faults. The whole
agent loop lives here: streamed emissions fold into an in-flight assistant turn, requested
tools are dispatched with bounded concurrency, history is condensed when it nears the model's
window, settled history is persisted to a content-addressed session DAG, and progress is
published as RunEvents.
This is the Rust edition of the runtime documented for Python;
the two share the same architecture (pure reducer + impure conductor, the same effect/signal
vocabulary), but the Rust port adds a compile-time-exhaustive signal match, Arc-based
structural sharing of history, and a tokio/FuturesUnordered quiescence loop.
Table of Contents
- Quickstart
- Module map
- The frozen vocabulary
- The pure reducer cadence
- Folding emissions into a turn
- The conductor
- The quiescence loop
- Tool dispatch
- The turn driver
- Compaction
- Sessions and persistence
- Ledger and accumulator
- Wire projection
- Cancellation and the turn budget
- Notable behavior
- Relationship to neighbors
Quickstart
create_agent builds a runnable Agent from a static AgentConfig plus optional
AgentDeps. The model entrypoint is injectable through AgentDeps::invoke_model, so a test
can script the model deterministically without a network. The simplest production path is
agent_with_capabilities, which wires the gateway connector to a capabilities tool box.
```rust
use std::sync::Arc;
use indusagi::runtime::{
    create_agent, AgentConfig, AgentDeps, ModelInvoker, RunPhase,
};
use indusagi::llmgateway::contract::{
    Channel, Conversation, StreamOptions, Emission, Reply, Block,
    AssistantRole, StopReason, Usage,
};
use indusagi::core::channel::BoxStream;

// A scripted model: one text emission then a terminal `done`.
struct Scripted;

impl ModelInvoker for Scripted {
    fn invoke(&self, _c: Conversation, _o: StreamOptions) -> Channel {
        Channel::of(|| {
            let reply = Reply {
                role: AssistantRole::Assistant,
                model: "m".into(),
                blocks: vec![Block::Text { text: "Hello, world!".into() }],
                usage: Usage::EMPTY,
                stop: StopReason::Complete,
            };
            Box::pin(futures::stream::iter(vec![
                Emission::Text { delta: "Hello, world!".into() },
                Emission::Done { reply },
            ])) as BoxStream<Emission>
        })
    }
}

#[tokio::main]
async fn main() {
    let agent = create_agent(
        AgentConfig::new("some-model"),
        AgentDeps { invoke_model: Some(Arc::new(Scripted)), ..Default::default() },
    );
    let final_snapshot = agent.submit_prompt("hi").await; // string sugar -> one user turn
    assert_eq!(final_snapshot.phase, RunPhase::Settled);
    println!("{:?}", final_snapshot.messages.last()); // the folded assistant turn
}
```
AgentConfig::new(model) binds only the model id; system, tools, max_output_tokens,
thinking, compaction, and max_turns are optional. submit takes a Vec<Turn> and
returns the terminal RunSnapshot; submit_prompt is sugar that wraps a bare string into a
single user text turn via as_turns.
Module map
Internal layering is strict: contract (types) ← cadence (the pure reducer) ←
turn / dispatch / memory / store / ledger / wire (single-purpose drivers) ←
conductor (the only orchestrator that touches all of them).
| Module | Holds |
|---|---|
| contract/ | The frozen type vocabulary every module imports: run_state, signal, effect, tools, errors, events, session, config |
| cadence/ | The pure FSM: reducer.rs (cadence, Transition, StepFn, initial_snapshot) and fold.rs (fold_blocks/fold_turn/seal_tool_args/tool_calls_of/add_usage/parse_tool_args) |
| conductor/ | agent.rs, the sole stateful layer: Agent, create_agent, AgentDeps, the quiescence drive loop, the turn-budget guard, the compaction gate, and all effect handlers |
| dispatch/ | scheduler.rs: Scheduler/create_scheduler with bounded concurrency, per-call cancellation, and Signal::ToolSettled values in completion order |
| turn/ | driver.rs: drive_turn, the stateless Channel → Signal adapter for one model invocation |
| memory/ | compactor.rs (should_compact/find_cut_point/summarize/compact, DISTILL_INSTRUCTION, DEFAULT_POLICY) and estimate.rs (estimate_context_tokens) |
| store/ | Content-addressed sessions: dag.rs (SessionGraph/Clock), persist.rs (SessionStore), hash.rs (hash_node + HASH_WIDTH) |
| ledger/ | bus.rs (RunLedger fan-out hub, RunEventHandler, Unsubscribe) and accumulator.rs (SnapshotAccumulator) |
| wire/ | projectors.rs: build_conversation + Projector/ProjectionContext/TurnRole, the table-driven (currently passthrough) message → Conversation assembly |
| capabilities_bridge.rs | CapabilitiesToolBox + agent_with_capabilities; adapts a capabilities::ToolBox to the runtime ToolBox trait |
The crate barrel indusagi::runtime re-exports the host-facing surface:
```rust
pub use conductor::{Agent, AgentDeps, AgentEventHandler, create_agent};
pub use cadence::{StepFn, Transition, cadence, cadence as step, initial_snapshot};
pub use dispatch::{DEFAULT_CONCURRENCY, Scheduler, create_scheduler};
pub use memory::{compact, estimate_context_tokens, find_cut_point, should_compact, summarize};
pub use store::{Clock, SessionGraph, SessionStore, hash_node};
pub use turn::drive_turn;
pub use ledger::{RunEventHandler, RunLedger, SnapshotAccumulator, Unsubscribe};
pub use wire::{ProjectionContext, Projector, TurnRole, build_conversation};
pub use capabilities_bridge::{CapabilitiesToolBox, agent_with_capabilities};
```
The frozen vocabulary
contract/ freezes the counts the whole runtime agrees on: 7 phases, 7 signals,
5 effects, 7 events. Every variant carries a kind() accessor returning the exact
snake_case discriminant the TS source serialized on the wire.
Phases (`RunPhase`)
The discrete macro-states a run occupies. serde renders each snake_case (idle, invoking,
…), matching the original TS string literals.
| Variant | Meaning |
|---|---|
| Idle | Created but not yet invoking the model. |
| Invoking | A model request has been issued but no stream has started yet. |
| Streaming | Folding incremental emissions from the model. |
| Dispatching | Running the tools the model asked for. |
| Compacting | Reserved for condensing history (see Notable behavior; the reducer never sets it). |
| Settled | Finished cleanly; no further work. |
| Faulted | Finished with an unrecoverable RunError. |
Signals (`Signal`)
A Signal is the only thing that can move a run forward. The reducer takes the current
RunSnapshot plus one signal and returns the next snapshot together with side-effects.
```rust
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Signal {
    Submit { input: Vec<Turn> },
    Emission { emission: Emission },
    StreamEnd,
    ToolSettled { id: String, result: ToolOutcome },
    Compacted { summary: Turn },
    Abort,
    Fault { error: RunError },
}
```
The TS AbortSignalInput variant (renamed to avoid clashing with the DOM AbortSignal) is
the Abort variant here.
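The no-wildcard discipline can be illustrated outside the crate. The sketch below uses a hypothetical, payload-free SignalKind enum in place of the real Signal. It assumes each discriminant is serde's snake_case rendering of the variant name. Because the match in kind() has no `_` arm, adding an eighth variant stops compilation until that variant is given a discriminant.

```rust
// Hypothetical, payload-free stand-in for the runtime's `Signal`; the real
// variants carry data (turns, emissions, tool outcomes, errors).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SignalKind {
    Submit,
    Emission,
    StreamEnd,
    ToolSettled,
    Compacted,
    Abort,
    Fault,
}

impl SignalKind {
    /// The snake_case wire discriminant. There is deliberately no `_` arm:
    /// a new variant fails the build here until it is handled.
    pub fn kind(self) -> &'static str {
        match self {
            SignalKind::Submit => "submit",
            SignalKind::Emission => "emission",
            SignalKind::StreamEnd => "stream_end",
            SignalKind::ToolSettled => "tool_settled",
            SignalKind::Compacted => "compacted",
            SignalKind::Abort => "abort",
            SignalKind::Fault => "fault",
        }
    }
}
```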
Effects (`Effect`)
The reducer performs no I/O; it returns Effects describing what the conductor should do
next. Effect is Serialize-only (effects are interpreted in-process, never deserialized);
InvokeModel's StreamOptions::cancel is #[serde(skip)] because a live
CancellationToken is not data.
```rust
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Effect {
    InvokeModel { conversation: Conversation, options: StreamOptions },
    RunTool { call: ToolCall },
    Persist { snapshot: RunSnapshot },
    Compact { messages: Arc<Vec<Turn>> },
    Publish { event: RunEvent },
}
```
Events (`RunEvent`)
The outward-facing notifications a host subscribes to. RunEvent derives
Clone, Debug, Serialize — Serialize-only, like Effect (events are dispatched in-process,
never deserialized off a wire), tagged on kind (snake_case). Snapshot, Settled, and
Faulted carry a RunSnapshot (exposed by RunEvent::snapshot() -> Option<&RunSnapshot>,
mirroring the accumulator's snapshotOf); the delta and tool-lifecycle events carry none.
RunEvent::kind() -> &'static str returns the snake_case discriminant.
| Variant | Payload |
|---|---|
| Snapshot | { snapshot }: a fresh immutable snapshot was produced. |
| TextDelta | { delta }: incremental assistant prose. |
| ThinkingDelta | { delta }: incremental model reasoning. |
| ToolStarted | { id, name }: a requested tool began executing. |
| ToolFinished | { id, name, outcome }: a tool finished, successfully or not. |
| Settled | { snapshot }: clean terminal state. |
| Faulted | { error, snapshot }: faulted terminal state. |
Run state, errors, tools, sessions, config
| Type | Source | Purpose |
|---|---|---|
| RunSnapshot | contract/run_state.rs | Frozen event-sourced state: run_id, session_id, phase, messages: Arc<Vec<Turn>>, pending, usage_total, model, optional error. Wire keys are camelCase (runId/sessionId/usageTotal); error is omitted unless faulted. |
| PendingTool | contract/run_state.rs | One outstanding tool: id, name, stage (ToolStage). |
| ToolStage | contract/run_state.rs | Queued → Running → Done. The reducer stamps a tool Running on dispatch; the scheduler tracks queued → running internally. |
| RunError / RunErrorKind | contract/errors.rs | kind (one of 6 variants: ModelFailed/ToolFailed/Aborted/CompactionFailed/TurnBudget/InvalidState), message, and an optional cause: serde_json::Value (omitted from the wire when absent). Constructors: RunError::new(kind, message), RunError::with_cause(kind, message, cause), and the free function run_error(kind, message, cause: Option<Value>) (the TS runError helper; None omits the field). |
| ToolCall / ToolOutcome | contract/tools.rs | A parsed call (id, name, input: Value) and its result (id, output: Value, is_error; wire key isError). |
| ToolRunner / ToolBox | contract/tools.rs | The async-trait executor seam and the bundle of descriptors() + runner(). |
| SessionNode / SessionHead / Migrator | contract/session.rs | One immutable DAG node (id, parent, turn, createdAt), the session tip pointer, and the schema-upgrade trait. |
| AgentConfig / CompactionPolicy | contract/config.rs | Immutable run setup and the condensation threshold policy. |
| ModelInvoker | contract/events.rs | The single injectable model seam (a trait, object-safe behind Arc<dyn ModelInvoker>). |
AgentConfig holds tools: Option<Arc<dyn ToolBox>>, so it is Clone (cheap — the box is
shared) but not PartialEq/Serialize as a whole; the reducer reads the box through
descriptors(), never by value comparison.
```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct CompactionPolicy {
    pub trigger_ratio: f64, // fraction of the window (0..1) at which condensation triggers
    pub keep_recent: usize, // trailing turns kept untouched
}
```
The pure reducer cadence
cadence(config) binds a static AgentConfig and returns a StepFn — a pure
(&RunSnapshot, Signal) -> Transition you can call, thread signals through, and interpret
the returned effects of. It performs no I/O and never awaits; every transition derives a
fresh immutable snapshot (the input is treated as frozen) and a flat Vec<Effect>.
```rust
pub type StepFn = Arc<dyn Fn(&RunSnapshot, Signal) -> Transition + Send + Sync>;

#[derive(Clone, Debug)]
pub struct Transition {
    pub state: RunSnapshot,
    pub effects: Vec<Effect>,
}

pub fn cadence(config: AgentConfig) -> StepFn;

pub fn initial_snapshot(
    session_id: impl Into<String>,
    model: impl Into<String>,
    run_id: Option<String>,
) -> RunSnapshot;
```
The agent loop is encoded as a transition table keyed on (phase, signal), not nested
control flow:
```text
submit                              → invoke_model                      · phase invoking
emission (text/thinking)            → fold + publish delta              · phase streaming
emission (tool start)               → fold + tool_started               · phase streaming
emission (usage/stop)               → accumulate/refine                 · phase streaming (no effect)
emission (error)                    → faulted                           · phase faulted
stream_end · has tool calls         → run_tool × N                      · phase dispatching
stream_end · no tool calls          → persist + settled                 · phase settled
tool_settled · more pending         → mark done, keep waiting           · phase dispatching
tool_settled · all done             → append tool turn, invoke          · phase invoking
compacted                           → splice summary, invoke            · phase invoking
abort                               → faulted (aborted)                 · phase faulted
fault                               → faulted                           · phase faulted
```
Driving the FSM directly (bypassing the conductor) is supported — cadence is re-exported
as step for advanced hosts:
```rust
use indusagi::runtime::{cadence, initial_snapshot, AgentConfig, RunPhase};
use indusagi::runtime::contract::Signal;
use indusagi::llmgateway::contract::{Turn, Block};

fn main() {
    let step = cadence(AgentConfig::new("some-model")); // also exported as `step`
    let state = initial_snapshot("s1", "some-model", None); // phase Idle
    let signal = Signal::Submit {
        input: vec![Turn::User { blocks: vec![Block::Text { text: "hi".into() }] }],
    };
    let t = step(&state, signal); // pure: `state` is left untouched
    assert_eq!(t.state.phase, RunPhase::Invoking);
    for effect in &t.effects {
        println!("{}", effect.kind()); // invoke_model, publish
    }
}
```
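To see the table-driven shape without the crate, here is a toy pure reducer covering a subset of the rows above. Phase, Sig, Eff, and Snap are hypothetical stand-ins: string ids replace real turns, and there is no usage or error data. The sketch shows that step reads its input by reference, clones it into a fresh snapshot, and returns effects as data instead of performing them.

```rust
// Toy model of the (phase, signal) transition table. Phase, Sig, Eff, and
// Snap are hypothetical stand-ins for RunPhase, Signal, Effect, RunSnapshot.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Phase { Idle, Invoking, Streaming, Dispatching, Settled, Faulted }

#[derive(Clone, Debug, PartialEq)]
pub enum Sig { Submit, TextDelta(String), ToolStart(String), StreamEnd, ToolSettled(String), Abort }

#[derive(Clone, Debug, PartialEq)]
pub enum Eff { InvokeModel, PublishDelta(String), RunTool(String), Persist }

#[derive(Clone, Debug, PartialEq)]
pub struct Snap {
    pub phase: Phase,
    pub text: String,            // in-flight assistant prose
    pub tool_calls: Vec<String>, // tool-call ids requested this turn
    pub pending: Vec<String>,    // tool-call ids still running
}

/// Pure step: reads `s` by reference and returns a fresh snapshot plus effects.
pub fn step(s: &Snap, sig: Sig) -> (Snap, Vec<Eff>) {
    let mut next = s.clone(); // the input snapshot is never mutated
    let effects: Vec<Eff> = match sig {
        Sig::Submit => {
            next.phase = Phase::Invoking;
            vec![Eff::InvokeModel]
        }
        Sig::TextDelta(d) => {
            next.phase = Phase::Streaming;
            next.text.push_str(&d);
            vec![Eff::PublishDelta(d)]
        }
        Sig::ToolStart(id) => {
            // Fold only; the real reducer also publishes tool_started here.
            next.phase = Phase::Streaming;
            next.tool_calls.push(id);
            vec![]
        }
        Sig::StreamEnd if !next.tool_calls.is_empty() => {
            next.phase = Phase::Dispatching;
            next.pending = std::mem::take(&mut next.tool_calls);
            next.pending.iter().cloned().map(Eff::RunTool).collect()
        }
        Sig::StreamEnd => {
            // The real reducer also publishes a settled event here.
            next.phase = Phase::Settled;
            vec![Eff::Persist]
        }
        Sig::ToolSettled(id) => {
            next.pending.retain(|p| *p != id);
            if next.pending.is_empty() {
                next.phase = Phase::Invoking; // all done: re-invoke (tool turn append omitted)
                vec![Eff::InvokeModel]
            } else {
                vec![] // more pending: keep waiting in Dispatching
            }
        }
        Sig::Abort => {
            next.phase = Phase::Faulted;
            vec![]
        }
    };
    (next, effects)
}
```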
Exhaustiveness — no `_` arm
The dispatch match in the reducer enumerates all 7 Signal variants explicitly with NO
wildcard arm. This is load-bearing: adding a signal to the contract fails the build at this
match until the new transition is handled, so a signal can never be silently dropped. The
terminal-phase pre-check above the match likewise names the three signals it acts on
(Submit/Compacted/Fault) explicitly. Only its "stray late signal" fallthrough uses a
catch-all returning Transition { state, effects: vec![] }, which is the intended "ignore"
branch, not a missing case. A dedicated test (exhaustive_dispatch_covers_all_signals) is
the compile-time witness.
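The terminal pre-check can be sketched the same way. Everything here is a hypothetical simplification, including the string error and the three-phase enum. The function returns None when the run is not closed, so the caller falls through to normal dispatch. For a closed run, it reopens on Submit or Compacted, re-faults on Fault, and ignores everything else through its single catch-all.

```rust
// Hypothetical stand-ins; the real pre-check sits above the reducer's match.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Phase { Invoking, Settled, Faulted }

#[derive(Clone, Debug, PartialEq)]
pub enum Sig { Submit, Compacted, Fault(String), StreamEnd, Abort }

#[derive(Clone, Debug, PartialEq)]
pub struct Snap { pub phase: Phase, pub error: Option<String> }

/// `Some(next)` when the pre-check decides the outcome for a closed run,
/// `None` when the signal should fall through to the normal dispatch match.
pub fn terminal_precheck(s: &Snap, sig: &Sig) -> Option<Snap> {
    if !matches!(s.phase, Phase::Settled | Phase::Faulted) {
        return None;
    }
    Some(match sig {
        // Reopen: back to invoking, with any prior terminal error stripped.
        Sig::Submit | Sig::Compacted => Snap { phase: Phase::Invoking, error: None },
        // Re-fault: stay faulted, now carrying the new error.
        Sig::Fault(e) => Snap { phase: Phase::Faulted, error: Some(e.clone()) },
        // The intended "ignore" catch-all: unchanged snapshot, no effects.
        _ => s.clone(),
    })
}
```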
Terminal states (Settled/Faulted) absorb nothing except the three named signals the
pre-check enumerates: a Submit or Compacted arriving against a closed run reopens it
(both route through invoke(), which sets phase = invoking and strips any prior terminal
error so a reopened run starts clean), while a Fault simply re-faults (it stays in
Faulted with the new error). Every other signal against a closed run hits the explicit
"ignore" catch-all and returns the snapshot unchanged with no effects.
Folding emissions into a turn
cadence/fold.rs grows the in-flight assistant Turn one Emission at a time, purely —
each helper returns a brand-new block list rather than mutating its argument.
| Function | Behavior |
|---|---|
| fold_blocks(&[Block], &Emission) -> Vec<Block> | Text/thinking deltas extend a trailing block of the same kind or start a new one; ToolCallStart opens a new tool-call block whose input begins as Value::String(""); each ToolCallDelta concatenates onto that raw buffer; a terminal Done replaces the whole list with reply.blocks. |
| fold_turn(&Turn, &Emission) -> Turn | Thin wrapper preserving the assistant role. |
| seal_tool_args(&[Block]) -> Vec<Block> | At stream end, every ToolCall whose input is still a Value::String is re-parsed (the TS typeof b.input === "string" gate). |
| parse_tool_args(&str) -> Value | Empty/whitespace → {}; invalid JSON → { "__unparsed": raw } (the untrimmed raw); valid JSON → the parsed value. |
| tool_calls_of(&[Block]) -> Vec<(String, String, Value)> | Collects the tool-call blocks (id, name, input) in order. |
| add_usage(&Usage, &Usage) -> Usage | Field-by-field sum; cache tiers are kept only when their sum is > 0 (a pair of absent/zero tiers stays absent on the wire). ZERO_USAGE = Usage::EMPTY. |
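The subtle part of the add_usage row is the rule that a pair of absent or zero cache tiers stays absent. A std-only sketch, assuming a simplified Usage whose field names (input, output, cache_read, cache_write) are hypothetical and not the gateway's:

```rust
// Hypothetical simplified Usage; the field names are assumptions.
#[derive(Clone, Copy, Debug, PartialEq, Default)]
pub struct Usage {
    pub input: u64,
    pub output: u64,
    pub cache_read: Option<u64>,
    pub cache_write: Option<u64>,
}

/// Sum one optional cache tier: absent counts as 0, and a zero sum stays absent.
fn add_tier(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    let sum = a.unwrap_or(0) + b.unwrap_or(0);
    if sum > 0 { Some(sum) } else { None }
}

/// Field-by-field sum of two usage records.
pub fn add_usage(a: &Usage, b: &Usage) -> Usage {
    Usage {
        input: a.input + b.input,
        output: a.output + b.output,
        cache_read: add_tier(a.cache_read, b.cache_read),
        cache_write: add_tier(a.cache_write, b.cache_write),
    }
}
```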
The streaming-args representation is the key parity detail: the Rust gateway's
Block::ToolCall.input is a serde_json::Value, so the still-growing raw JSON string is
carried as Value::String(raw) during streaming and replaced by the parsed value on seal.
affects_blocks is exhaustive over the 8-variant Emission union with no wildcard arm.
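The fold over text deltas and streamed tool-call arguments can be sketched with std types only. Here a plain `raw: String` stands in for the Value::String(raw) buffer. Block and Emission are hypothetical reductions of the gateway enums: there is no Done variant and no sealing step, since parsing JSON would require serde_json. Like the real helper, the sketch returns a new list instead of mutating its argument.

```rust
// Hypothetical reductions of the gateway types; `raw` plays the role of the
// Value::String(raw) buffer that is later sealed into parsed JSON.
#[derive(Clone, Debug, PartialEq)]
pub enum Block {
    Text(String),
    Thinking(String),
    ToolCall { id: String, name: String, raw: String },
}

#[derive(Clone, Debug)]
pub enum Emission {
    Text(String),
    Thinking(String),
    ToolCallStart { id: String, name: String },
    ToolCallDelta(String),
}

/// Pure fold: returns a new block list and leaves `blocks` untouched.
pub fn fold_blocks(blocks: &[Block], e: &Emission) -> Vec<Block> {
    let mut out = blocks.to_vec();
    match e {
        // Extend a trailing block of the same kind, or start a new one.
        Emission::Text(d) => match out.last_mut() {
            Some(Block::Text(t)) => t.push_str(d),
            _ => out.push(Block::Text(d.clone())),
        },
        Emission::Thinking(d) => match out.last_mut() {
            Some(Block::Thinking(t)) => t.push_str(d),
            _ => out.push(Block::Thinking(d.clone())),
        },
        // A new tool call starts with an empty raw argument buffer.
        Emission::ToolCallStart { id, name } => out.push(Block::ToolCall {
            id: id.clone(),
            name: name.clone(),
            raw: String::new(),
        }),
        // Concatenate the chunk onto the most recent tool call's raw buffer.
        Emission::ToolCallDelta(chunk) => {
            if let Some(Block::ToolCall { raw, .. }) =
                out.iter_mut().rev().find(|b| matches!(b, Block::ToolCall { .. }))
            {
                raw.push_str(chunk);
            }
        }
    }
    out
}
```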
The conductor
The conductor (conductor/agent.rs) is the only stateful, side-effecting layer. It threads
a serial stream of Signals through the reducer, performs each emitted Effect, and folds
the resulting signals back in until a terminal phase. create_agent is the single entry
point.
```rust
pub fn create_agent(config: AgentConfig, deps: AgentDeps) -> Agent;

#[derive(Default)]
pub struct AgentDeps {
    pub invoke_model: Option<Arc<dyn ModelInvoker>>, // default: gateway, bound to config.model
    pub store: Option<SessionStore>,                 // where settled history is written
    pub ledger: Option<RunLedger>,                   // where events are published
}
```
The Agent handle owns one logical session and exposes:
| Method | Signature | Purpose |
|---|---|---|
| submit | async fn submit(&self, input: Vec<Turn>) -> RunSnapshot | Drive a prompt to settlement; returns the terminal snapshot. |
| submit_prompt | async fn submit_prompt(&self, prompt: impl Into<String>) -> RunSnapshot | String sugar over submit. |
| subscribe | fn subscribe<F>(&self, handler: F) -> Unsubscribe | Tap the live RunEvent stream; returns a disposer. |
| abort | async fn abort(&self) | Cancel the active token, cancel the scheduler's rounds, and steer an Abort signal in. |
| snapshot | async fn snapshot(&self) -> RunSnapshot | Read the latest snapshot as a point-in-time copy; it does not drive the run. |
| resume | async fn resume(&self, target_session: &str) -> Result<(), RunError> | Rehydrate messages from a persisted session (phase returns to Idle). |
| session_id | fn session_id(&self) -> &str | The session this agent threads runs through. |
When config.tools is present, create_agent builds a Scheduler over tools.runner().
The default invoke_model is a GatewayInvoker binding llmgateway::gateway::stream to
config.model; an unknown model id (an unsupported GatewayError) is surfaced as a
single-error-emission channel so the turn driver forwards it and the reducer faults the run
(parity with the TS sync throw → fault).
A concurrent submit arriving while a run is in flight is folded in as steering: the
Submit signal is pushed onto the live queue rather than starting a competing drive, and the
caller chains on the in-flight drive by polling the shared driving flag.
The quiescence loop
drive(seed) is the heartbeat: pull the next signal, step the reducer, swap in the new
snapshot, fire its effects. Streaming effects (the model stream, the tool dispatch round,
condensation) push their own signals back, so the loop keeps turning. The serial signal pipe
is a tokio::sync::mpsc::unbounded_channel, and the in-flight async effects are tracked in a
FuturesUnordered<JoinHandle<()>>.
A run is genuinely finished only when it is in a terminal phase, no effect is still in
flight, AND no signal is still buffered — so a steering submit/compacted/fault
that lands while a trailing persist/publish is still draining can legitimately reopen the
run. The loop races draining in-flight work against re-checking quiescence (using try_recv
to peek the buffer and tokio::select! with biased to prefer a buffered signal over a
draining effect), mirroring the TS
while (isTerminal && !hasPending) { if (inflight.size === 0) return; await Promise.race(inflight); }.
perform_all collects all run_tool effects of one transition and dispatches them together
so a single stream-end round runs as one batch under one cancellation scope; publish is
synchronous (inline ledger.publish), while invoke_model, compact, and persist are
spawned and tracked in inflight.
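The exit condition is easiest to see in a synchronous model. In this hypothetical sketch, a VecDeque plays the signal pipe and boxed closures play in-flight effects that may enqueue follow-up signals. A two-rule toy reducer (submit reopens, stream_end settles) replaces cadence. This is not the conductor's code; it only models the rule that a run ends when it is terminal, nothing is in flight, and the queue is empty.

```rust
use std::collections::VecDeque;

// Hypothetical synchronous model of the drive loop's exit condition. The real
// conductor tracks JoinHandles in a FuturesUnordered; here each in-flight
// effect is a closure that may push follow-up signals when it completes.
pub struct Loop {
    pub terminal: bool,
    pub queue: VecDeque<&'static str>,
    pub inflight: VecDeque<Box<dyn FnOnce(&mut VecDeque<&'static str>)>>,
    pub log: Vec<&'static str>,
}

impl Loop {
    /// Finished only when terminal, nothing in flight, and nothing buffered.
    pub fn quiescent(&self) -> bool {
        self.terminal && self.inflight.is_empty() && self.queue.is_empty()
    }

    pub fn drive(&mut self) {
        while !self.quiescent() {
            // Prefer a buffered signal over draining an effect (the biased select).
            if let Some(sig) = self.queue.pop_front() {
                self.log.push(sig);
                match sig {
                    "submit" => self.terminal = false,    // toy rule: reopen
                    "stream_end" => self.terminal = true, // toy rule: settle
                    _ => {}
                }
            } else if let Some(effect) = self.inflight.pop_front() {
                effect(&mut self.queue); // a draining effect may steer signals in
            } else {
                break; // non-terminal with no work left: a stall in this toy
            }
        }
    }
}
```

A loop that returned as soon as the phase was terminal would exit before the draining effect had a chance to deliver its steering submit. Checking the in-flight set and the buffer as well is what lets that late submit reopen the run.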
Tool dispatch
dispatch/scheduler.rs bridges the reducer's run_tool effects to real ToolRunner
execution for one dispatch round. Scheduler::run(calls, outer) returns a Stream<Item = Signal>
that yields a Signal::ToolSettled per finished call — in completion order, not request
order.
```rust
pub const DEFAULT_CONCURRENCY: usize = 8;

pub fn create_scheduler(runner: Arc<dyn ToolRunner>) -> Scheduler;

impl Scheduler {
    pub fn new(runner: Arc<dyn ToolRunner>) -> Self;
    pub fn with_concurrency(runner: Arc<dyn ToolRunner>, concurrency: usize) -> Self; // never below 1
    pub fn run(&self, calls: Vec<ToolCall>, outer: CancellationToken)
        -> impl Stream<Item = Signal> + Send + 'static;
    pub fn cancel(&self); // fires the round token of every started round
}
```
Three properties define its behavior, ported faithfully from the TS:
- Per-call cancellation. Every call runs under its own CancellationToken child, chained to the round token (round = round_token(&outer), child = child_token(&round)). The round token in turn descends from the caller's outer token and is fired by Scheduler::cancel. Down-propagation (cancel the parent ⇒ cancel every descendant) is exactly the round teardown.
- Bounded concurrency. At most DEFAULT_CONCURRENCY (8) calls run at once, gated by a tokio::sync::Semaphore; the rest wait in request order. A queued call that is reached after the round is cancelled is skipped rather than started (no settled signal).
- Completion-order emission. Settled signals are delivered as workers finish.
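Bounded concurrency with completion-order delivery can be demonstrated with std threads. This sketch substitutes a small Mutex/Condvar counting semaphore for tokio::sync::Semaphore and an mpsc channel for the Signal stream. Cancellation and skipping are deliberately left out, and run_round is a hypothetical name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// A tiny counting semaphore (std has none); stands in for tokio's Semaphore.
struct Semaphore { permits: Mutex<usize>, cv: Condvar }

impl Semaphore {
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 { p = self.cv.wait(p).unwrap(); }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

/// Runs `jobs` (id, simulated duration in ms) with at most `limit` at once.
/// Returns the ids in completion order and the peak concurrency observed.
pub fn run_round(jobs: Vec<(u32, u64)>, limit: usize) -> (Vec<u32>, usize) {
    let sem = Arc::new(Semaphore { permits: Mutex::new(limit.max(1)), cv: Condvar::new() });
    let active = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for (id, ms) in jobs {
        sem.acquire(); // jobs start in request order once a permit frees up
        let (sem, active, peak, tx) = (sem.clone(), active.clone(), peak.clone(), tx.clone());
        handles.push(thread::spawn(move || {
            let now = active.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(ms)); // the "tool" body
            active.fetch_sub(1, Ordering::SeqCst);
            tx.send(id).unwrap(); // the settled signal, in completion order
            sem.release();
        }));
    }
    drop(tx);
    for h in handles { h.join().unwrap(); }
    let order: Vec<u32> = rx.iter().collect();
    (order, peak.load(Ordering::SeqCst))
}
```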
Per the cooperative-cancel contract, a cancelled ToolRunner::run returns a
ToolOutcome { is_error: true } — never an Err, never a panic — so an aborted round drains
to typed errors instead of deadlocking. If a tool is requested but no scheduler is configured
(no tools in the config), the round faults with RunErrorKind::ToolFailed rather than
hanging forever. Round tokens are tracked by a monotonic id (a CancellationToken has no
PartialEq), and a round self-removes from the tracked set when it finishes.
The ToolRunner trait is the injected side that executes tools:
```rust
#[async_trait::async_trait]
pub trait ToolRunner: Send + Sync {
    async fn run(&self, call: ToolCall, cancel: CancellationToken) -> ToolOutcome;
}
```
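The cooperative-cancel contract can be sketched synchronously. Cancel (an Arc<AtomicBool>) is a hypothetical stand-in for CancellationToken, and the trait is not async here. The point is that a runner that observes cancellation returns an is_error outcome, never an Err or a panic.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-ins: `Cancel` plays CancellationToken, and the runner is
// synchronous for brevity (the real trait is an async_trait).
#[derive(Clone, Default)]
pub struct Cancel(Arc<AtomicBool>);

impl Cancel {
    pub fn cancel(&self) { self.0.store(true, Ordering::SeqCst); }
    pub fn is_cancelled(&self) -> bool { self.0.load(Ordering::SeqCst) }
}

#[derive(Debug, PartialEq)]
pub struct ToolOutcome { pub id: String, pub output: String, pub is_error: bool }

pub trait ToolRunner {
    fn run(&self, id: &str, steps: u32, cancel: &Cancel) -> ToolOutcome;
}

/// A hypothetical tool that does `steps` units of work.
pub struct Counter;

impl ToolRunner for Counter {
    fn run(&self, id: &str, steps: u32, cancel: &Cancel) -> ToolOutcome {
        for i in 0..steps {
            // Poll the token between units of work; on cancel, return a typed
            // error outcome so the round drains instead of deadlocking.
            if cancel.is_cancelled() {
                return ToolOutcome { id: id.into(), output: format!("cancelled after {i} steps"), is_error: true };
            }
        }
        ToolOutcome { id: id.into(), output: format!("done {steps}"), is_error: false }
    }
}
```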
The turn driver
turn/driver.rs is the stateless seam between "bytes off the wire" and "inputs the reducer
understands". drive_turn(channel) adapts one model invocation's gateway Channel into the
Signal vocabulary:
```rust
pub fn drive_turn(channel: Channel) -> impl Stream<Item = Signal> + Send;
```
Each Emission pulled from the channel is forwarded as a Signal::Emission in arrival
order; once the channel is exhausted, a single terminal Signal::StreamEnd is yielded
(built with StreamExt::map over the channel followed by a futures::stream::once chained
on). It holds no run state and performs no folding — interpretation is left to the reducer.
A transport/protocol failure surfaces as a terminal Emission::Error, which the driver
forwards as an emission signal (the reducer's on_emission faults the run on it); a
terminal StreamEnd still follows for TS parity even after an error envelope.
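The driver's shape maps directly onto iterator combinators: every emission is forwarded in order, then exactly one StreamEnd follows. In this sketch a std iterator stands in for the async Channel, and the enums are hypothetical reductions of Emission and Signal. The real driver builds the same structure with StreamExt::map and futures::stream::once.

```rust
// Hypothetical reductions of the gateway Emission and the runtime Signal.
#[derive(Clone, Debug, PartialEq)]
pub enum Emission { Text(String), Error(String) }

#[derive(Clone, Debug, PartialEq)]
pub enum Signal { Emission(Emission), StreamEnd }

/// Forward every emission in arrival order, then exactly one StreamEnd,
/// even when the last emission is an error envelope.
pub fn drive_turn<I>(channel: I) -> impl Iterator<Item = Signal>
where
    I: IntoIterator<Item = Emission>,
{
    channel
        .into_iter()
        .map(Signal::Emission)
        .chain(std::iter::once(Signal::StreamEnd))
}
```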
Compaction
memory/ condenses history when its estimated footprint approaches the model's window. The
oldest stretch is replaced by a single distilled summary turn while a verbatim tail of recent
turns is preserved. Condensation is not a reducer transition — it happens lazily at the
conductor's invocation boundary.
| Function | Behavior |
|---|---|
| estimate_context_tokens(&[Turn]) -> usize | chars/4 (CHARS_PER_TOKEN) ceiling-divided, plus 4 (TURN_OVERHEAD_TOKENS) per turn. Counts Unicode scalar values for prose and canonical_json(value).chars().count() for serialized values. |
| should_compact(&[Turn], &ModelCard, Option<&CompactionPolicy>) -> bool | estimate >= context_window * trigger_ratio; false for a non-positive window. |
| find_cut_point(&[Turn], keep_recent) -> usize | The boundary between the prefix to condense and the tail to keep, nudged forward while the first surviving turn carries tool results, so a tool_call/tool_result pair is never split. Returns the index of the first turn kept verbatim. |
| summarize(&[Turn], &dyn ModelInvoker) -> Result<Turn, String> | Renders the prefix into a role-prefixed transcript, invokes the model with DISTILL_INSTRUCTION as system, and folds the response into a user-role turn marked with SUMMARY_HEADING ([condensed earlier context]). |
| compact(&[Turn], &ModelCard, Option<&CompactionPolicy>, &dyn ModelInvoker) -> Result<Vec<Turn>, String> | [summary, ...tail]; a cut point of 0 returns history unchanged, so compact is safe to call unconditionally. |
DEFAULT_POLICY = CompactionPolicy { trigger_ratio: 0.8, keep_recent: 8 } is applied when no
policy is configured. The summary is filed under the user role (not assistant) so it
stays valid input for any provider.
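The three sizing helpers are small enough to sketch with std only. The Turn below is a hypothetical reduction (prose plus a has_tool_results flag), and the window is passed as an i64 instead of a ModelCard. The sketch assumes the ceiling division is applied per turn. Summarization and the model call are out of scope.

```rust
// Hypothetical simplified Turn: prose plus a flag marking tool-result turns.
pub struct Turn { pub text: String, pub has_tool_results: bool }

pub const CHARS_PER_TOKEN: usize = 4;
pub const TURN_OVERHEAD_TOKENS: usize = 4;

/// ceil(chars / 4) + 4 per turn, counting Unicode scalar values
/// (assumes the ceiling is taken per turn).
pub fn estimate_context_tokens(turns: &[Turn]) -> usize {
    turns
        .iter()
        .map(|t| {
            let chars = t.text.chars().count();
            (chars + CHARS_PER_TOKEN - 1) / CHARS_PER_TOKEN + TURN_OVERHEAD_TOKENS
        })
        .sum()
}

/// Trips once the estimate reaches context_window * trigger_ratio.
pub fn should_compact(turns: &[Turn], context_window: i64, trigger_ratio: f64) -> bool {
    if context_window <= 0 {
        return false; // non-positive window: never compact
    }
    estimate_context_tokens(turns) as f64 >= context_window as f64 * trigger_ratio
}

/// Index of the first turn kept verbatim, nudged forward past tool-result
/// turns so a tool_call/tool_result pair is never split.
pub fn find_cut_point(turns: &[Turn], keep_recent: usize) -> usize {
    let mut cut = turns.len().saturating_sub(keep_recent);
    while cut < turns.len() && turns[cut].has_tool_results {
        cut += 1;
    }
    cut
}
```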
The conductor's gate (spawn_invoke_model → condense_if_shrinks) reads the live
self.snapshot.messages, not the conversation captured in the effect, so it always sizes the
freshest history. Before opening a stream, if should_compact() trips, the older prefix is
condensed and a Signal::Compacted is fed back instead — the reducer's on_compacted
re-issues a fresh invoke_model over the shrunken history ([summary, ...keep_recent tail]),
and the gate then passes on the second pass. A pre-check on find_cut_point(...) <= 1 avoids
spending a model call when history is already as small as the policy allows (and stops the
gate re-triggering forever). A condensation error faults the run with
RunErrorKind::CompactionFailed; DEFAULT_KEEP_RECENT_POLICY (the conductor's own fallback,
also keep_recent: 8) keeps the conductor and the compactor agreeing on where the tail
begins.
Sessions and persistence
When AgentDeps::store is injected, settled history is appended to a content-addressed
session DAG. store/ keeps history as an immutable DAG: each SessionNode carries one turn
and points at its parent by hash, so the linear context for a model call is the path from a
leaf back to the root, reversed.
```rust
impl SessionGraph {
    pub fn new(session_id: impl Into<String>) -> Self;                     // production wall clock
    pub fn with_clock(session_id: impl Into<String>, clock: Clock) -> Self; // injected clock (tests)
    pub fn hydrate(session_id, nodes: impl IntoIterator<Item = SessionNode>, leaf: Option<String>) -> Self;
    pub fn append(&mut self, turn: Turn) -> SessionNode;                     // chain + advance head
    pub fn branch_from(&mut self, node_id: &str) -> Result<SessionNode, String>;
    pub fn path_to(&self, leaf: &str) -> Result<Vec<Turn>, String>;         // root -> leaf
    pub fn resume(&mut self, leaf: &str) -> Result<Vec<Turn>, String>;      // branch_from + path_to
    pub fn leaf(&self) -> Option<&str>;                  // the current head, or None when empty
    pub fn all(&self) -> Vec<SessionNode>;               // insertion order, an IndexMap underneath
    pub fn get(&self, node_id: &str) -> Option<&SessionNode>; // lookup by content hash
    pub fn size(&self) -> usize;                         // node count across every branch
    pub fn session_id(&self) -> &str;
}
```
`Clock = Arc<dyn Fn() -> i64 + Send + Sync>` is the injectable epoch-milliseconds
source (the TS `Clock = () => number`, `Date.now` in production), so a test can
pin timestamps and therefore the content hashes a session produces.
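A root-to-leaf walk over parent pointers is the core of path_to. The hypothetical Graph below uses caller-chosen string ids instead of hash_node's truncated sha256 (std has no SHA-256), and it stores turns as strings. It still shows append advancing the head, branch_from forking, and path_to reversing the leaf-to-root walk.

```rust
use std::collections::HashMap;

// Hypothetical minimal session DAG; the real SessionGraph derives ids with
// hash_node and stores full Turn values.
pub struct Node { pub id: String, pub parent: Option<String>, pub turn: String }

#[derive(Default)]
pub struct Graph { nodes: HashMap<String, Node>, head: Option<String> }

impl Graph {
    /// Chain a turn onto the current head and advance the head.
    pub fn append(&mut self, id: &str, turn: &str) {
        let node = Node { id: id.into(), parent: self.head.clone(), turn: turn.into() };
        self.nodes.insert(id.into(), node);
        self.head = Some(id.into());
    }

    /// Move the head to an existing node, so the next append forks a branch.
    pub fn branch_from(&mut self, id: &str) -> Result<(), String> {
        if !self.nodes.contains_key(id) {
            return Err(format!("unknown node {id}"));
        }
        self.head = Some(id.into());
        Ok(())
    }

    /// Walk leaf -> root via parent links, then reverse to root -> leaf.
    pub fn path_to(&self, leaf: &str) -> Result<Vec<String>, String> {
        let mut out = Vec::new();
        let mut cur = Some(leaf.to_string());
        while let Some(id) = cur {
            let node = self.nodes.get(&id).ok_or_else(|| format!("unknown node {id}"))?;
            out.push(node.turn.clone());
            cur = node.parent.clone();
        }
        out.reverse();
        Ok(out)
    }
}
```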
hash_node(parent, turn, created_at) -> String derives a node's id as
sha256(JSON.stringify({ parent: parent ?? null, turn, createdAt }))[..32] via the
framework's core::content_hash (HASH_WIDTH = 32). Re-appending an identical turn from the
same head at the same time is a no-op head advance — the content hash collides, so an
unchanged prefix re-persists nothing. A cross-language golden test pins the hash byte-for-byte
against a Node-generated fixture (7d759dabaf36f47466002755371dcf9d).
SessionStore is the filesystem JSONL backing: each session is an append-only .jsonl file
under a root, every line a node record or a head marker.
```rust
impl SessionStore {
    pub fn new(root: impl Into<PathBuf>) -> Self;
    pub fn with_migrators(root: impl Into<PathBuf>, migrators: Vec<Arc<dyn Migrator>>) -> Self;
    pub async fn append_node(&self, session_id: &str, node: &SessionNode) -> std::io::Result<()>;
    pub async fn load_session(&self, session_id: &str) -> std::io::Result<SessionGraph>;
    pub async fn list_sessions(&self) -> std::io::Result<Vec<String>>;
}
```
load_session replays the file, runs each raw record through the Migrator pipeline
(composed in sequence, each upgrading one schema version), and reconstructs the graph; a
missing file yields an empty graph and a corrupt line is skipped. The conductor's
spawn_persist loads the session, appends every turn not yet on the persisted leaf, and only
writes a genuinely new leaf node to disk. Persistence is advisory — a store error is
swallowed so an otherwise-complete run still settles. agent.resume(session_id) re-seats the
in-memory snapshot on the rehydrated root→leaf path and returns the phase to Idle, so the
next submit continues the conversation; it errors with RunErrorKind::InvalidState when no
store is configured or the session fails to load. On disk, each session is its own
append-only `<session_id>.jsonl` file in which every line is either a {"type":"node", ...} or a
{"type":"head","leaf":...} record; append_node writes the node record followed by a fresh
head marker, so the last head line always names the live leaf.
Ledger and accumulator
ledger/ is the seam between the pure loop and any host that renders or persists progress.
RunLedger is a synchronous fan-out hub: new(), subscribe(handler) -> Unsubscribe,
publish(&RunEvent), clear(), size(). It is Clone + Default and holds its registry
behind an Arc<Mutex<…>>, so a cloned ledger publishes to the same subscriber set. Delivery
is sequential over a snapshot of the handler set taken at publish time (the Arcs are cloned
out under the lock, then the lock is released before any handler runs), so subscribing or
unsubscribing from inside a handler is safe (it takes effect on the next publish). A panicking
handler is isolated with catch_unwind(AssertUnwindSafe(...)) so siblings still receive the
event. Handlers are filed under a fresh monotonic id (Rust closures have no stable identity) in
an IndexMap, preserving registration order for delivery; the TS "same function subscribed
twice is one entry" quirk is therefore not reproduced (every subscribe is a distinct
registration), but the observable fan-out contract is identical. Unsubscribe removes the
handler on dispose() or Drop (idempotent).
```rust
pub type RunEventHandler = Arc<dyn Fn(&RunEvent) + Send + Sync>;
```
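The publish discipline can be sketched with std only: snapshot the handlers under the lock, release the lock, then call each handler behind catch_unwind. In this sketch events are plain strings, a BTreeMap keyed by a monotonic id stands in for the IndexMap (ascending ids preserve registration order), and subscribe returns a raw id instead of an Unsubscribe guard. All of these are simplifications.

```rust
use std::collections::BTreeMap;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Mutex};

// Hypothetical simplified ledger; events are strings instead of RunEvent.
pub type Handler = Arc<dyn Fn(&str) + Send + Sync>;

#[derive(Clone, Default)]
pub struct Ledger {
    // (next id, handlers by id); clones share the same registry.
    inner: Arc<Mutex<(u64, BTreeMap<u64, Handler>)>>,
}

impl Ledger {
    /// Register a handler under a fresh monotonic id.
    pub fn subscribe(&self, h: impl Fn(&str) + Send + Sync + 'static) -> u64 {
        let mut g = self.inner.lock().unwrap();
        g.0 += 1;
        let id = g.0;
        g.1.insert(id, Arc::new(h));
        id
    }

    /// Remove a handler; removing twice is a no-op.
    pub fn unsubscribe(&self, id: u64) {
        self.inner.lock().unwrap().1.remove(&id);
    }

    pub fn publish(&self, event: &str) {
        // Clone the handler Arcs out under the lock; the guard drops at the end
        // of this statement, so handlers may (un)subscribe without deadlock.
        let handlers: Vec<Handler> = self.inner.lock().unwrap().1.values().cloned().collect();
        for h in handlers {
            // Isolate a panicking handler so later handlers still receive the event.
            let _ = catch_unwind(AssertUnwindSafe(|| h(event)));
        }
    }

    pub fn size(&self) -> usize {
        self.inner.lock().unwrap().1.len()
    }
}
```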
SnapshotAccumulator folds the snapshot-bearing events (snapshot/settled/faulted) into
the single most recent RunSnapshot: observe(&RunEvent), snapshot() -> Option<RunSnapshot>,
has_snapshot(), reset(), and attach(&RunLedger) -> Unsubscribe to wire it to a ledger
automatically. Delta and tool-lifecycle events carry no snapshot and are ignored. The simplest
path skips it entirely: agent.submit(...) already resolves the terminal snapshot, and
agent.snapshot() reads the latest state at any time.
Wire projection
wire/projectors.rs assembles a runtime message list plus its AgentConfig into the
gateway's provider-neutral Conversation:
```rust
pub fn build_conversation(messages: &[Turn], cfg: &AgentConfig) -> Conversation;
```
The assembly is table-driven: a HashMap<TurnRole, Projector> dispatches each message to its
projector (Projector = fn(&Turn, &ProjectionContext) -> Vec<Turn>); returning a list keeps
the contract flexible (a role may expand into several wire turns or collapse to none). Today
every role passes through verbatim — the runtime already stores history in the gateway's turn
shape — so projection is the identity; the seam is kept explicit so role-specific shaping
(redaction, block filtering) would change only one entry, not dead code. system and tools
are sourced from cfg and omitted (None) when absent/empty.
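The table-driven projection reduces to a map from role to function pointer plus a flat_map. In this hypothetical sketch, TurnRole and Turn are simplified and the ProjectionContext argument is dropped from the Projector signature. The usage in the test swaps one entry to show that role-specific shaping touches only that entry.

```rust
use std::collections::HashMap;

// Hypothetical simplified types; the real Projector is
// fn(&Turn, &ProjectionContext) -> Vec<Turn>.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TurnRole { User, Assistant, Tool }

#[derive(Clone, Debug, PartialEq)]
pub struct Turn { pub role: TurnRole, pub text: String }

pub type Projector = fn(&Turn) -> Vec<Turn>;

/// Identity projection: every role currently passes through verbatim.
fn passthrough(t: &Turn) -> Vec<Turn> {
    vec![t.clone()]
}

pub fn projectors() -> HashMap<TurnRole, Projector> {
    let mut table: HashMap<TurnRole, Projector> = HashMap::new();
    for role in [TurnRole::User, TurnRole::Assistant, TurnRole::Tool] {
        table.insert(role, passthrough);
    }
    table
}

/// Dispatch each message to its role's projector and flatten the results, so
/// a projector may expand one turn into several or drop it entirely.
pub fn project(messages: &[Turn], table: &HashMap<TurnRole, Projector>) -> Vec<Turn> {
    messages.iter().flat_map(|m| (table[&m.role])(m)).collect()
}
```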
Cancellation and the turn budget
abort() cancels the run's root CancellationToken (cutting the model stream and tool
dispatch at the source), cancels the scheduler's rounds, and steers an Abort signal into
the reducer so the loop unwinds to a faulted terminal state. The model stream is opened with
options.cancel = Some(active), and spawn_invoke_model drops any stream signal after the
token is cancelled, because a connector parked on a network read may not poll the token until
its next chunk.
The drive loop counts the InvokeModel effects each transition asks for and force-faults
with RunErrorKind::TurnBudget once they would exceed max_turns (config.max_turns, falling back to
DEFAULT_MAX_TURNS = 64 when unset), guarding against a model that loops forever. Over-budget
invoke_model effects are dropped while every other effect of that transition is still
honoured.
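The budget guard can be sketched as a filter over one transition's effects. Effect, apply_budget, and the running turns_used counter are hypothetical. The sketch keeps every non-InvokeModel effect and admits InvokeModel effects while the counter is under the limit. It also reports when an InvokeModel effect had to be dropped, so the caller can force a TurnBudget fault.

```rust
// Hypothetical effect type; only the InvokeModel/other distinction matters.
#[derive(Clone, Debug, PartialEq)]
pub enum Effect { InvokeModel, RunTool(String), Persist, Publish(String) }

pub const DEFAULT_MAX_TURNS: usize = 64;

/// Filters one transition's effects against the turn budget. Returns the kept
/// effects and whether an over-budget InvokeModel was dropped (fault needed).
pub fn apply_budget(
    effects: Vec<Effect>,
    turns_used: &mut usize,
    max_turns: Option<usize>,
) -> (Vec<Effect>, bool) {
    let limit = max_turns.unwrap_or(DEFAULT_MAX_TURNS);
    let mut over = false;
    let kept: Vec<Effect> = effects
        .into_iter()
        .filter(|e| {
            if *e != Effect::InvokeModel {
                return true; // every other effect is honoured
            }
            if *turns_used < limit {
                *turns_used += 1;
                true
            } else {
                over = true; // over budget: drop it and flag the fault
                false
            }
        })
        .collect();
    (kept, over)
}
```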
Notable behavior
- RunPhase::Compacting is reserved but unused. The reducer never sets it; condensation is handled at the conductor's invocation boundary via Compacted signals.
- History is Arc-shared. RunSnapshot.messages is Arc<Vec<Turn>>, so a transition that does not touch history shares it structurally instead of deep-cloning. The pure layer treats every field as read-only and produces new owned values; the input snapshot is never mutated (the never_mutates_the_input_snapshot test asserts byte-for-byte equality of the input before and after a step).
- The invalid helper is dead by construction but kept for parity. In TS it guarded a signal/phase mismatch; in Rust, step destructures the payload out of the variant before dispatch, so a handler can never receive a mismatched signal. The helper still formats the exact "signal '<kind>' is not valid in phase '<phase>'" message.
- Wire fields stay camelCase across the seam (runId/sessionId/usageTotal, isError, createdAt), so a Rust RunSnapshot/SessionNode/ToolOutcome round-trips to the same JSON keys the TS/Python editions emit.
- RunError.cause is a free-form serde_json::Value (rather than Box<dyn Error>) to preserve Clone + PartialEq + Serialize, which the snapshot equality/wire tests rely on.
- The capabilities bridge converts nominally distinct but structurally identical types. CapabilitiesToolBox copies ToolDescriptor/ToolCall/ToolOutcome field for field across the capabilities ↔ runtime seam (no behavior change), and agent_with_capabilities is the production wiring behind create_agent(model, tools).
Relationship to neighbors
The runtime depends on the LLM gateway for its provider-neutral vocabulary (Turn, Block,
Emission, Conversation, StreamOptions, Channel, Usage, Reply, ModelCard,
ToolDescriptor, ThinkingLevel) and for the gateway::stream function plus
catalog::get_card used by the default GatewayInvoker and the compaction window sizing. It
deliberately does not re-export those gateway types — they have a single source of truth
in indusagi::llmgateway::contract.
Downstream, the capabilities tool kernel and the shell/CLI build a ToolBox/ToolRunner,
then call create_agent(...) (or agent_with_capabilities) and submit/subscribe. The
cancellation currency (core::CancellationToken, root_token/round_token/child_token)
and content hashing (core::content_hash, canonical_json) come from the crate's core
module.
For the same engine in other editions, see the Python runtime.
