Runtime
`indusagi.runtime` is the engine that drives one model conversation from a prompt to settlement. Imported as `import indusagi.runtime` (or `from indusagi import runtime`), it pairs a pure finite-state reducer with a single stateful conductor that carries out the side effects the reducer asks for.
The runtime is split in two. The pure core is an immutable RunSnapshot advanced by a
side-effect-free reducer (cadence) that returns Effects and never touches I/O. The thin
stateful conductor (Agent / create_agent) interprets those effects (reaching the model,
running tools, condensing history, persisting, publishing) and folds the resulting Signals back into
the reducer until the run settles or faults. It owns the full agent loop: streamed emissions
fold into an assistant turn, requested tools are dispatched with bounded concurrency, history
is condensed when it nears the model's window, settled history is persisted to a
content-addressed session DAG, and progress is published as RunEvents.
Table of Contents
- Quickstart
- Public API
- Sub-directories
- Effect / Signal loop
- Driving the pure FSM directly
- Events and snapshots
- Tool dispatch
- Compaction
- Sessions and persistence
- Key concepts
- Notable behavior
- Relationship to neighbors
Quickstart
create_agent builds a runnable Agent from a static AgentConfig. submit drives one
prompt to settlement; the model entrypoint is injectable, so a test can script it without a
network. A Channel is just an async iterable of Emissions.
```python
import asyncio

from indusagi.runtime import create_agent, AgentConfig, AgentDeps
from indusagi.llmgateway.contract import (
    Conversation, StreamOptions, TextEmission, DoneEmission,
    AssistantTurn, TextBlock, Usage,
)


async def scripted(conversation: Conversation, options: StreamOptions):
    # A fake model stream: two text deltas, then the finished reply plus usage.
    yield TextEmission(delta="Hello, ")
    yield TextEmission(delta="world!")
    reply = AssistantTurn(blocks=(TextBlock(text="Hello, world!"),))
    yield DoneEmission(reply=reply, usage=Usage(input_tokens=10, output_tokens=3))


def invoke(conversation, options):
    # The injectable model seam: return a Channel (any async iterable of Emissions).
    return scripted(conversation, options)


async def main() -> None:
    agent = create_agent(
        AgentConfig(model="some-model"),
        AgentDeps(invoke_model=invoke),
    )
    final = await agent.submit("hi")  # string sugar -> a single user text turn
    print(final.phase)                # 'settled'
    print(final.messages[-1])         # the folded AssistantTurn


asyncio.run(main())
```
create_agent(config, deps=None) accepts an AgentConfig — model (a catalog id) is
required; system, tools, max_output_tokens, thinking, compaction, and max_turns
are optional. submit takes a bare string (sugar for one user turn) or a sequence of Turns
and returns the terminal RunSnapshot.
Public API
The package barrel (indusagi.runtime) exposes the host-facing factory, its handle, the
type vocabulary callers branch on, and the pure transition machine for advanced hosts.
| Name | Kind | Source | Purpose |
|---|---|---|---|
| create_agent | function | conductor/agent.py | The single entry point: build a runnable Agent from an AgentConfig (+ optional AgentDeps), wiring the pure reducer to real effect handlers |
| Agent | class | conductor/agent.py | The driveable handle: await submit(input), subscribe(handler), abort(), snapshot(), await resume(session_id) |
| AgentDeps | dataclass | conductor/agent.py | Frozen, optional injectable collaborators: invoke_model (defaults to the gateway), store, ledger |
| AgentEventHandler | type | conductor/agent.py | Alias of RunEventHandler; the Callable[[RunEvent], None] shape registered with Agent.subscribe |
| cadence | function | cadence/reducer.py | The pure FSM factory: cadence(config) binds the static AgentConfig and returns a StepFn |
| step | function | runtime/__init__.py | Re-export alias of cadence, for hosts that drive the FSM themselves |
| Transition | dataclass | cadence/reducer.py | The result of one step: state: RunSnapshot and effects: tuple[Effect, ...] |
| StepFn | type | cadence/reducer.py | Callable[[RunSnapshot, Signal], Transition]; a reducer bound to its config |
| initial_snapshot | function | cadence/reducer.py | Build a clean starting RunSnapshot (phase 'idle') for a session id + model |
| RunSnapshot | dataclass | contract/run_state.py | Frozen, event-sourced state: run_id, session_id, phase, messages, pending, usage_total, model, optional error |
| RunPhase | type | contract/run_state.py | Literal macro-states: 'idle', 'invoking', 'streaming', 'dispatching', 'compacting', 'settled', 'faulted' |
| PendingTool | dataclass | contract/run_state.py | One outstanding tool (id, name, stage), with stage moving 'queued' → 'running' → 'done' |
| AgentConfig | dataclass | contract/config.py | Immutable run setup: model, system, tools (ToolBox), max_output_tokens, thinking, compaction, max_turns (default 64) |
| CompactionPolicy | dataclass | contract/config.py | Threshold policy: trigger_ratio (fraction of the context window) and keep_recent (trailing turns kept verbatim) |
| ToolBox | class | contract/tools.py | Protocol bundling the model-visible catalog (descriptors()) with the runner that fulfils calls |
| ToolRunner | class | contract/tools.py | Protocol: run(call, cancel) -> Awaitable[ToolOutcome]; the injectable side that executes tools |
| ToolCall | dataclass | contract/tools.py | A parsed request to invoke a named tool: id, name, input |
| ToolOutcome | dataclass | contract/tools.py | Result of one ToolCall: id, output, is_error |
| RunError | dataclass | contract/errors.py | Normalized failure: kind (RunErrorKind), message, optional cause; becomes the error of a faulted snapshot |
| RunErrorKind | type | contract/errors.py | Literal classes: 'model_failed', 'tool_failed', 'aborted', 'compaction_failed', 'turn_budget', 'invalid_state' |
| run_error | function | contract/errors.py | Convenience constructor for RunError(kind, message, cause) |
| RunEvent | type | contract/events.py | Tagged union of notifications: SnapshotEvent, TextDeltaEvent, ThinkingDeltaEvent, ToolStartedEvent, ToolFinishedEvent, SettledEvent, FaultedEvent |
| ModelInvoker | type | contract/events.py | Callable[[Conversation, StreamOptions], Channel]; the single injectable seam to a model |
Effects, signals, and the lower-layer drivers live in the sub-packages below.
| Name | Kind | Source | Purpose |
|---|---|---|---|
| Effect | type | contract/effect.py | Tagged union of requested side effects: InvokeModelEffect, RunToolEffect, PersistEffect, CompactEffect, PublishEffect |
| Signal | type | contract/signal.py | Tagged union driving the machine: SubmitSignal, EmissionSignal, StreamEndSignal, ToolSettledSignal, CompactedSignal, AbortSignalInput, FaultSignal |
| RunLedger | class | ledger/bus.py | Synchronous fan-out hub: subscribe/publish/clear/size; isolates throwing subscribers; snapshots the handler set per publish |
| SnapshotAccumulator | class | ledger/accumulator.py | Folds snapshot-bearing events into the latest RunSnapshot: observe/get_snapshot/reset/attach(ledger) |
| Scheduler | class | dispatch/scheduler.py | Tool scheduler over a ToolRunner: run(calls, cancel) async-yields ToolSettledSignals in completion order with bounded concurrency (DEFAULT_CONCURRENCY=8); cancel() aborts |
| create_scheduler | function | dispatch/scheduler.py | Build a Scheduler over a ToolRunner (optional concurrency) |
| drive_turn | async function | turn/driver.py | Adapter turning a Channel into a Signal stream: emission → EmissionSignal, clean exhaustion → StreamEndSignal, raised error → FaultSignal (CancelledError re-raised) |
| compact | async function | memory/compactor.py | Condense a history: replace the older prefix with one distilled summary turn, keep the recent tail verbatim |
| should_compact | function | memory/compactor.py | Decide whether estimated tokens >= trigger_ratio * context_window (False on a non-positive window) |
| find_cut_point | function | memory/compactor.py | Locate the prefix/tail boundary, nudged forward so a tool_call/tool_result pair is never split |
| summarize | async function | memory/compactor.py | Distill a stretch of turns into one synthetic user-role summary via the model and DISTILL_INSTRUCTION |
| estimate_context_tokens | function | memory/estimate.py | Heuristic token count: chars/4 (CHARS_PER_TOKEN) plus 4 (TURN_OVERHEAD_TOKENS) per turn |
| SessionGraph | class | store/dag.py | In-memory content-addressed DAG: append/branch_from/path_to/resume/hydrate; tracks a moving leaf head |
| SessionStore | class | store/persist.py | Filesystem JSONL store of session DAGs: append_node, load_session (replays + migrator pipeline), list_sessions |
| hash_node | function | store/hash.py | SHA-256 content hash (truncated to 32 hex chars) over parent id + canonical-JSON turn + created_at |
| build_conversation | function | wire/projectors.py | Pure assembly of a message list + AgentConfig into a gateway Conversation via per-role PROJECTORS |
| SessionNode | dataclass | contract/session.py | One immutable DAG node: id (content hash), parent, turn, created_at (epoch ms) |
| Migrator | class | contract/session.py | Protocol for one schema-upgrade step: id + upgrade(raw), composed in sequence on load |
Sub-directories
Internal layering is strict: contract (types) ← cadence (pure reducer) ← turn / dispatch / memory / store / ledger / wire (single-purpose drivers) ← conductor (the only orchestrator that touches all of them).
| Directory | Holds |
|---|---|
| contract/ | The frozen type vocabulary every module imports: run_state, signal, effect, tools, errors, events, session, config |
| cadence/ | The pure FSM: reducer.py (cadence, Transition, StepFn, initial_snapshot) and fold.py (fold_blocks/fold_turn/seal_tool_args/tool_calls_of/add_usage) |
| conductor/ | agent.py, the sole stateful layer: Agent, create_agent, AgentDeps, the _SignalQueue, the _drive loop, and all effect handlers |
| dispatch/ | scheduler.py: Scheduler/create_scheduler with bounded concurrency, per-call cancellation, completion-order ToolSettledSignals |
| turn/ | driver.py: drive_turn, the stateless Channel→Signal adapter for one model invocation |
| memory/ | compactor.py (should_compact/find_cut_point/summarize/compact, DISTILL_INSTRUCTION, DEFAULT_POLICY) and estimate.py |
| store/ | Content-addressed sessions: dag.py (SessionGraph/Clock), persist.py (SessionStore), hash.py (hash_node + canonical_json) |
| ledger/ | bus.py (RunLedger fan-out hub, RunEventHandler, Unsubscribe) and accumulator.py (SnapshotAccumulator) |
| wire/ | projectors.py: build_conversation + PROJECTORS/Projector/ProjectionContext, the table-driven (currently passthrough) message→Conversation assembly |
Effect / Signal loop
The reducer requests work as effects; the conductor performs each and feeds results back
as signals, closing the loop until a terminal phase. The agent loop is encoded as a
transition table keyed on (phase, signal.kind), not nested control flow:
| Signal (condition) | Reducer action / effects | Next phase |
|---|---|---|
| submit | invoke_model | invoking |
| emission (text/thinking) | fold + publish delta | streaming |
| emission (tool start/delta) | fold + tool_started | streaming |
| emission (usage) | accumulate usage | unchanged |
| stream_end, has tool calls | run_tool × N | dispatching |
| stream_end, no tool calls | persist + settled | settled |
| tool_settled, more pending | mark done, keep waiting | dispatching |
| tool_settled, all done | append tool turn, invoke_model (next turn) | invoking |
| compacted | splice summary, invoke | invoking |
| abort / fault / error | faulted | faulted |
A submit seeds a blank AssistantTurn and emits an InvokeModelEffect (phase → invoking).
Streamed EmissionSignals fold into the open assistant turn (text/thinking deltas merge,
tool-call blocks open, raw JSON argument fragments concatenate; usage accumulates) and publish
TextDeltaEvent/ThinkingDeltaEvent/ToolStartedEvent. StreamEndSignal seals the buffered
tool args, then either settles (no tool calls → PersistEffect + SettledEvent) or
dispatches (one RunToolEffect per call, phase → dispatching). ToolSettledSignals arrive
in any order; once every pending tool is 'done' the reducer re-invokes the model.
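To make the table concrete, here is a minimal, self-contained sketch of a reducer keyed on (phase, signal.kind). Everything in it is hypothetical: Snap, Sig, Eff, step and the on_* handlers are toy stand-ins, not the runtime's RunSnapshot, Signal, Effect or cadence. Pending tools are simplified to a tuple of ids instead of PendingTool stages, and treating unlisted (phase, kind) pairs as faults is a choice of this sketch.

```python
from dataclasses import dataclass, replace
from typing import Callable, Dict, Tuple


# Toy stand-ins (hypothetical) for RunSnapshot, Signal and Effect.
@dataclass(frozen=True)
class Snap:
    phase: str = "idle"
    text: str = ""
    pending: Tuple[str, ...] = ()


@dataclass(frozen=True)
class Sig:
    kind: str
    payload: object = None


@dataclass(frozen=True)
class Eff:
    kind: str
    payload: object = None


Result = Tuple[Snap, Tuple[Eff, ...]]


def on_submit(s: Snap, sig: Sig) -> Result:
    return replace(s, phase="invoking", text=""), (Eff("invoke_model"),)


def on_emission(s: Snap, sig: Sig) -> Result:
    # Fold the text delta into the open turn and ask for it to be published.
    return replace(s, phase="streaming", text=s.text + sig.payload), (Eff("publish", sig.payload),)


def on_stream_end(s: Snap, sig: Sig) -> Result:
    calls = tuple(sig.payload or ())
    if calls:  # one run_tool effect per requested call
        return replace(s, phase="dispatching", pending=calls), tuple(Eff("run_tool", c) for c in calls)
    return replace(s, phase="settled"), (Eff("persist"), Eff("publish", "settled"))


def on_tool_settled(s: Snap, sig: Sig) -> Result:
    remaining = tuple(c for c in s.pending if c != sig.payload)
    if remaining:  # more pending: keep waiting, request nothing
        return replace(s, pending=remaining), ()
    return replace(s, phase="invoking", pending=()), (Eff("invoke_model"),)


TABLE: Dict[Tuple[str, str], Callable[[Snap, Sig], Result]] = {
    ("idle", "submit"): on_submit,
    ("settled", "submit"): on_submit,
    ("invoking", "emission"): on_emission,
    ("streaming", "emission"): on_emission,
    ("invoking", "stream_end"): on_stream_end,
    ("streaming", "stream_end"): on_stream_end,
    ("dispatching", "tool_settled"): on_tool_settled,
}


def step(state: Snap, signal: Sig) -> Result:
    """Pure: returns a new snapshot plus the requested effects; never does I/O."""
    if signal.kind in ("abort", "fault"):
        return replace(state, phase="faulted"), (Eff("publish", "faulted"),)
    handler = TABLE.get((state.phase, signal.kind))
    if handler is None:  # this toy treats unlisted (phase, kind) pairs as faults
        return replace(state, phase="faulted"), (Eff("publish", "faulted"),)
    return handler(state, signal)


s, effects = step(Snap(), Sig("submit"))
s, effects = step(s, Sig("emission", "Hi"))
s, effects = step(s, Sig("stream_end", ["t1", "t2"]))
print(s.phase, [e.kind for e in effects])  # dispatching ['run_tool', 'run_tool']
```

Because every handler returns a new frozen snapshot, the caller's previous state is never touched, which is what lets a host replay or test the machine one signal at a time.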
The conductor's _drive() is the heartbeat: pull a signal from a future-backed _SignalQueue,
step the reducer, swap in the new snapshot, and perform the effects. A run is genuinely
finished only when it is terminal AND no effect is in flight AND no signal is buffered — so
a steering submit/compacted/fault that lands while a trailing persist/publish is still
draining can legitimately reopen the run.
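The termination rule can be sketched with plain asyncio. This is a toy, not the conductor: it swaps the future-backed _SignalQueue for an asyncio.Queue, uses strings for signals and effects, and step, perform, drive and DONE are hypothetical names. Each effect runs as a task that enqueues its follow-up signals and then a DONE marker, so the loop always knows how many effects are still in flight.

```python
import asyncio

TERMINAL = {"settled", "faulted"}
DONE = object()  # marker enqueued when an effect stops being in flight


def step(phase, signal):
    """Toy reducer: returns (next_phase, effects)."""
    if signal == "submit":
        return "invoking", ["invoke"]
    if signal == "reply" and phase == "invoking":
        return "settled", ["persist"]
    return "faulted", []


async def perform(effect):
    """Toy effect handler: returns the follow-up signals for the reducer."""
    if effect == "invoke":
        await asyncio.sleep(0)  # stands in for streaming a model reply
        return ["reply"]
    await asyncio.sleep(0.2)    # a slow trailing persist
    return []


async def drive(queue, log):
    phase, in_flight, tasks = "idle", 0, set()

    async def run(effect):
        try:
            for signal in await perform(effect):
                queue.put_nowait(signal)
        finally:
            queue.put_nowait(DONE)  # enqueued after its signals, so those are seen first

    while True:
        # Finished only when terminal AND no effect in flight AND no signal buffered.
        if phase in TERMINAL and in_flight == 0 and queue.empty():
            return phase
        item = await queue.get()
        if item is DONE:
            in_flight -= 1
            continue
        phase, effects = step(phase, item)
        log.append(phase)
        for effect in effects:
            in_flight += 1
            task = asyncio.create_task(run(effect))
            tasks.add(task)  # keep a strong reference until the task finishes
            task.add_done_callback(tasks.discard)


async def main():
    queue, log = asyncio.Queue(), []
    queue.put_nowait("submit")
    driver = asyncio.create_task(drive(queue, log))
    await asyncio.sleep(0.05)   # round one has settled, but its persist is still running
    queue.put_nowait("submit")  # a steering submit reopens the run
    return await driver, log


phase, log = asyncio.run(main())
print(phase, log)  # settled ['invoking', 'settled', 'invoking', 'settled']
```

The second submit lands while the first round's persist is still draining, so the loop sees a non-zero in-flight count, keeps running, and reopens the run instead of returning after the first settle.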
Driving the pure FSM directly
Advanced hosts can bypass the conductor and drive the reducer themselves. cadence(config)
returns a step(state, signal) -> Transition; it performs no I/O and never awaits.
```python
from indusagi.runtime import cadence, initial_snapshot, AgentConfig
from indusagi.runtime.contract import SubmitSignal
from indusagi.llmgateway.contract import UserTurn, TextBlock

step = cadence(AgentConfig(model="some-model"))  # also exported as `step`
state = initial_snapshot(session_id="s1", model="some-model")  # phase 'idle'
signal = SubmitSignal(input=(UserTurn(blocks=(TextBlock(text="hi"),)),))

transition = step(state, signal)  # pure: no I/O, no awaiting
print(transition.state.phase)     # 'invoking'
for effect in transition.effects:
    print(type(effect).__name__)  # InvokeModelEffect, PublishEffect
```
Events and snapshots
subscribe taps the live RunEvent stream and returns a disposer. A SnapshotAccumulator
folds the snapshot-bearing events into the single latest RunSnapshot; attach(ledger)
wires it to the agent's ledger and returns its own disposer.
```python
from indusagi.runtime import create_agent, AgentConfig
from indusagi.runtime.contract import TextDeltaEvent
from indusagi.runtime.ledger import SnapshotAccumulator

agent = create_agent(AgentConfig(model="some-model"))


def on_event(event):
    # Stream text deltas to stdout as they arrive.
    if isinstance(event, TextDeltaEvent):
        print(event.delta, end="")


unsubscribe = agent.subscribe(on_event)

acc = SnapshotAccumulator()
# _ledger is a private attribute; alternatively feed acc.observe(event) from on_event.
acc_dispose = acc.attach(agent._ledger)

# ... await agent.submit(...) elsewhere ...
latest = acc.get_snapshot()  # most recent RunSnapshot, or None

unsubscribe()
acc_dispose()
```
The simplest path is to skip the accumulator entirely: agent.submit(...) already resolves the
terminal RunSnapshot, and agent.snapshot() reads the latest state synchronously at any time.
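The fan-out behavior described for RunLedger and SnapshotAccumulator can be sketched in a few lines. This is a hypothetical illustration, not the library's classes: ToyLedger, ToyAccumulator and the Event shape (a kind plus an optional snapshot field) are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass(frozen=True)
class Event:  # hypothetical event shape
    kind: str
    delta: str = ""
    snapshot: Optional[Any] = None  # set only on snapshot-bearing events


class ToyLedger:
    """Hypothetical synchronous fan-out hub in the spirit of RunLedger."""

    def __init__(self) -> None:
        self._handlers: Dict[object, Callable[[Event], None]] = {}

    def subscribe(self, handler: Callable[[Event], None]) -> Callable[[], None]:
        key = object()  # unique token, so the same handler can be added twice
        self._handlers[key] = handler

        def unsubscribe() -> None:
            self._handlers.pop(key, None)

        return unsubscribe

    def publish(self, event: Event) -> None:
        # Snapshot the handler set so (un)subscribing during delivery is safe.
        for handler in tuple(self._handlers.values()):
            try:
                handler(event)
            except Exception:
                # Isolate a throwing subscriber. asyncio.CancelledError derives
                # from BaseException (Python 3.8+), so it is not swallowed here.
                pass


class ToyAccumulator:
    """Keeps only the latest snapshot seen on snapshot-bearing events."""

    def __init__(self) -> None:
        self._latest: Optional[Any] = None

    def observe(self, event: Event) -> None:
        if event.snapshot is not None:
            self._latest = event.snapshot

    def get_snapshot(self) -> Optional[Any]:
        return self._latest

    def attach(self, ledger: ToyLedger) -> Callable[[], None]:
        return ledger.subscribe(self.observe)


def broken(event: Event) -> None:
    raise RuntimeError("subscriber bug")


ledger, seen = ToyLedger(), []
ledger.subscribe(broken)
unsubscribe = ledger.subscribe(lambda e: seen.append(e.kind))
acc = ToyAccumulator()
detach = acc.attach(ledger)
ledger.publish(Event("text_delta", delta="Hi"))
ledger.publish(Event("settled", snapshot={"phase": "settled"}))
unsubscribe()
detach()
ledger.publish(Event("snapshot", snapshot={"phase": "idle"}))
print(seen, acc.get_snapshot())  # ['text_delta', 'settled'] {'phase': 'settled'}
```

The broken subscriber raises on every publish, yet the other handlers still receive each event, and after the disposers run the late snapshot event no longer reaches the accumulator.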
Tool dispatch
When AgentConfig.tools is present, the conductor builds a Scheduler over tools.runner.
One stream-end round's tool calls run as a single batch through the scheduler at most
DEFAULT_CONCURRENCY (8) at a time, each under a child CancelToken chained to the run's
active token. ToolSettledSignals are emitted in completion order, not request order, and each
appends a ToolResultBlock to the trailing ToolTurn. If a tool is requested but no runner is
configured, the round faults with 'tool_failed' rather than hanging.
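A minimal sketch of one dispatch round, assuming plain asyncio: asyncio.Semaphore bounds concurrency and asyncio.as_completed yields results in completion order. Task cancellation stands in for the child CancelToken, and run_batch, Settled and fake_runner are hypothetical names. Converting a raised exception into an is_error outcome is this sketch's choice, not documented scheduler behavior.

```python
import asyncio
from dataclasses import dataclass

DEFAULT_CONCURRENCY = 8  # the default stated above


@dataclass(frozen=True)
class Settled:  # hypothetical stand-in for a ToolSettledSignal
    call_id: str
    output: str
    is_error: bool = False


async def run_batch(calls, runner, concurrency=DEFAULT_CONCURRENCY):
    """Async-yield one Settled per call, in completion order, at most `concurrency` at once."""
    gate = asyncio.Semaphore(concurrency)

    async def one(call_id, arg):
        async with gate:
            try:
                return Settled(call_id, await runner(call_id, arg))
            except Exception as exc:  # a failing tool becomes an error outcome
                return Settled(call_id, repr(exc), is_error=True)

    tasks = [asyncio.create_task(one(call_id, arg)) for call_id, arg in calls]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        for task in tasks:  # if the consumer stops early, cancel whatever is left
            task.cancel()


active = peak = 0


async def fake_runner(call_id, delay):
    global active, peak
    active += 1
    peak = max(peak, active)
    try:
        await asyncio.sleep(delay)
        return f"{call_id} ok"
    finally:
        active -= 1


async def main():
    calls = [("slow", 0.2), ("fast", 0.01), ("mid", 0.05)]
    return [s.call_id async for s in run_batch(calls, fake_runner, concurrency=2)]


order = asyncio.run(main())
print(order, peak)  # ['fast', 'mid', 'slow'] 2
```

With a limit of 2, "mid" waits until "fast" frees a slot, and the results come back in the order the calls finish rather than the order they were requested.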
Compaction
Condensation is not a reducer transition — it happens lazily at the conductor's invocation
boundary. Before opening a stream, if should_compact() trips on the live snapshot's history,
the older prefix is distilled into one summary turn and a compacted signal redirects the
reducer to re-invoke over the shrunken history. The functions are usable standalone:
```python
from indusagi.runtime.memory import estimate_context_tokens, should_compact, find_cut_point
from indusagi.runtime.contract import CompactionPolicy
from indusagi.llmgateway.catalog import get_card
from indusagi.llmgateway.contract import UserTurn, TextBlock

messages = (UserTurn(blocks=(TextBlock(text="x" * 4000),)),)
print(estimate_context_tokens(messages))  # ~ len/4 + 4 per turn

card = get_card("some-model")
if card is not None:
    policy = CompactionPolicy(trigger_ratio=0.8, keep_recent=8)
    print(should_compact(messages, card, policy))
    print(find_cut_point(messages, policy.keep_recent))  # tool-safe boundary index
```
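The gate's arithmetic is easy to check by hand. The sketch below re-implements it standalone under stated assumptions: integer division per turn (the documented heuristic only says chars/4), and estimate_tokens and should_compact_sketch are hypothetical names that take plain strings and a numeric window instead of turns and a ModelCard. A single 4000-character turn costs 4000 // 4 + 4 = 1004 tokens, which trips a 0.8 ratio on a 1250-token window (threshold 1000) but not on a 2000-token window (threshold 1600).

```python
CHARS_PER_TOKEN = 4        # constant names and values from the API table
TURN_OVERHEAD_TOKENS = 4


def estimate_tokens(turn_texts):
    """Heuristic: chars/4 per turn (integer division assumed) plus a fixed per-turn overhead."""
    return sum(len(text) // CHARS_PER_TOKEN + TURN_OVERHEAD_TOKENS for text in turn_texts)


def should_compact_sketch(tokens, context_window, trigger_ratio):
    if context_window <= 0:  # no usable window: never trip the gate
        return False
    return tokens >= trigger_ratio * context_window


tokens = estimate_tokens(["x" * 4000])
print(tokens)                                    # 1004
print(should_compact_sketch(tokens, 1250, 0.8))  # True
print(should_compact_sketch(tokens, 2000, 0.8))  # False
print(should_compact_sketch(tokens, 0, 0.8))     # False
```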
The summary is filed under the user role and marked with SUMMARY_HEADING
("[condensed earlier context]"), so it stays valid input for any provider. A pre-check skips
compaction when find_cut_point(...) <= 1: that avoids spending a model call when history is
already as small as the policy allows, and stops the invocation gate from re-triggering forever.
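A hypothetical sketch of the tool-safe boundary and the pre-check, using a toy Turn with a role of "user", "assistant" or "tool". The real summary comes from a model call via summarize; here a stub string stands in for it, and maybe_compact is an illustrative name. Starting from len(turns) - keep_recent, the cut moves forward past any tool-result turn, so a result is always summarized together with the call that produced it.

```python
from dataclasses import dataclass
from typing import List

SUMMARY_HEADING = "[condensed earlier context]"  # heading text quoted above


@dataclass(frozen=True)
class Turn:  # hypothetical minimal turn
    role: str
    text: str


def find_cut_point(turns: List[Turn], keep_recent: int) -> int:
    """Index where the verbatim tail starts; turns[:cut] would be summarized."""
    cut = max(0, len(turns) - keep_recent)
    # The tail must not open on a tool result whose call sits in the prefix,
    # so nudge the boundary forward past any leading tool turns.
    while cut < len(turns) and turns[cut].role == "tool":
        cut += 1
    return cut


def maybe_compact(turns: List[Turn], keep_recent: int) -> List[Turn]:
    cut = find_cut_point(turns, keep_recent)
    if cut <= 1:
        return turns  # nothing worth a model call; also stops the gate re-firing
    # Stub summary; a real one would be distilled by the model.
    summary = Turn("user", f"{SUMMARY_HEADING} {cut} earlier turns")
    return [summary, *turns[cut:]]


history = [
    Turn("user", "list files"),
    Turn("assistant", "tool_call: ls"),
    Turn("tool", "tool_result: a.txt"),
    Turn("assistant", "There is one file."),
    Turn("user", "thanks"),
]
print(find_cut_point(history, keep_recent=3))      # 3 (nudged past the tool result at index 2)
once = maybe_compact(history, keep_recent=3)
print([t.role for t in once])                      # ['user', 'assistant', 'user']
print(maybe_compact(once, keep_recent=3) is once)  # True: cut point 0 <= 1, no-op
```

Running the gate a second time on the already-compacted history yields a cut point of 0, so it returns the history untouched instead of summarizing the summary.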
Sessions and persistence
When an AgentDeps.store is injected, settled history is appended to a content-addressed
SessionGraph. Each SessionNode's id is a SHA-256 hash of (parent, turn, created_at);
branching is two nodes sharing a parent, and the model's linear context is the root→leaf path.
Re-appending an identical turn is a no-op head advance, so re-persisting an unchanged prefix
writes nothing new. Persistence is advisory — a store error is swallowed so an otherwise
complete run still settles.
```python
import asyncio

from indusagi.runtime.store import SessionStore, SessionGraph
from indusagi.llmgateway.contract import UserTurn, TextBlock


async def main() -> None:
    store = SessionStore("/tmp/indus-sessions")
    graph = SessionGraph("sess-1")
    node = graph.append(UserTurn(blocks=(TextBlock(text="remember this"),)))
    await store.append_node("sess-1", node)

    reloaded = await store.load_session("sess-1")
    leaf = reloaded.leaf
    if leaf is not None:
        print(reloaded.path_to(leaf))   # root->leaf turns
    print(await store.list_sessions())  # ['sess-1']


asyncio.run(main())
```
An Agent with a store can await agent.resume(session_id) to re-seat its in-memory snapshot
on the rehydrated history; the phase returns to 'idle' so the next submit continues the
conversation from where it left off.
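The content addressing can be illustrated with the standard library alone. This is a hypothetical sketch, not store/hash.py or dag.py: the newline-joined hash input is an assumption (only the three inputs and the 32-hex-character truncation come from the description), turns are plain dicts, and created_at is passed explicitly rather than read from a Clock. ToyGraph, Node and the method names mirror the documented vocabulary loosely and are illustrative only.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Dict, List, Optional


def canonical_json(value) -> str:
    # Sorted keys, no extra whitespace: equal turns always serialize identically.
    return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def hash_node(parent: Optional[str], turn: dict, created_at: int) -> str:
    # The joining format is an assumption made for this sketch.
    material = "\n".join([parent or "", canonical_json(turn), str(created_at)])
    return hashlib.sha256(material.encode("utf-8")).hexdigest()[:32]


@dataclass(frozen=True)
class Node:
    id: str
    parent: Optional[str]
    turn: dict
    created_at: int  # epoch ms


class ToyGraph:
    """Hypothetical in-memory DAG with a moving head."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Node] = {}
        self.head: Optional[str] = None

    def append(self, turn: dict, created_at: int) -> Node:
        node = Node(hash_node(self.head, turn, created_at), self.head, turn, created_at)
        self.nodes.setdefault(node.id, node)  # same content, same id: nothing new stored
        self.head = node.id
        return node

    def branch_from(self, node_id: str) -> None:
        self.head = node_id  # the next append becomes a sibling branch

    def path_to(self, node_id: Optional[str]) -> List[dict]:
        turns = []
        while node_id is not None:  # walk leaf -> root, then reverse
            node = self.nodes[node_id]
            turns.append(node.turn)
            node_id = node.parent
        return turns[::-1]


g = ToyGraph()
a = g.append({"role": "user", "text": "hi"}, created_at=1)
b = g.append({"role": "assistant", "text": "hello"}, created_at=2)
g.branch_from(a.id)
c = g.append({"role": "assistant", "text": "hey"}, created_at=3)
print([t["text"] for t in g.path_to(b.id)])  # ['hi', 'hello']
print([t["text"] for t in g.path_to(c.id)])  # ['hi', 'hey']
```

Nodes b and c share parent a, which is all a branch is; each leaf's root-to-leaf walk gives the linear context a model would see.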
Key concepts
- Pure reducer / cadence: cadence(config) returns a side-effect-free step(state, signal) -> Transition. It derives a fresh immutable RunSnapshot and a flat tuple of Effects per signal; it never awaits or does I/O.
- Effect / Signal loop: the reducer requests work (invoke_model, run_tool, persist, compact, publish); the conductor performs each and feeds results back as signals (emission, stream_end, tool_settled, compacted, abort, fault).
- RunSnapshot (event-sourced state): the single immutable source of truth, never mutated in place; every transition produces a new snapshot via dataclasses.replace.
- Content-addressed session DAG: history is an immutable DAG keyed by content hash; the model's context is the root→leaf path.
- Lazy compaction gate: condensation fires at the conductor's invocation boundary, not as a reducer transition.
- Steering submit: a submit() arriving mid-run is folded into the live signal queue rather than starting a competing loop; the caller awaits the shared driving task.
- Turn budget: the drive loop counts InvokeModelEffects and force-faults with 'turn_budget' once they would exceed max_turns (default 64), guarding against a model that loops forever.
- Bounded-concurrency tool dispatch: one round's calls run as a single batch, at most DEFAULT_CONCURRENCY (8) at a time, each under a child CancelToken.
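Two of these ideas, immutable snapshots and the turn budget, fit in one small sketch. Snapshot is a hypothetical trimmed-down stand-in for RunSnapshot, and TurnBudget with its admit method is an illustrative name, not part of the runtime; only the "would exceed max_turns" rule and the 'turn_budget' error kind come from the description above.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class Snapshot:  # hypothetical, trimmed-down RunSnapshot
    phase: str = "idle"
    messages: Tuple[str, ...] = ()
    error: Optional[str] = None


class TurnBudget:
    """Counts model invocations; faults once another one would exceed max_turns."""

    def __init__(self, max_turns: int = 64) -> None:
        self.max_turns = max_turns
        self.invocations = 0

    def admit(self, snapshot: Snapshot) -> Snapshot:
        if self.invocations + 1 > self.max_turns:
            return replace(snapshot, phase="faulted", error="turn_budget")
        self.invocations += 1
        return snapshot


start = Snapshot(messages=("hi",))
try:
    start.phase = "settled"  # frozen: in-place mutation is rejected
except FrozenInstanceError:
    print("immutable")

budget = TurnBudget(max_turns=2)
results = [budget.admit(start).phase for _ in range(3)]
print(results)      # ['idle', 'idle', 'faulted']
print(start.phase)  # idle: the faulted snapshot is a new object
```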
Notable behavior
- The compaction gate in _on_invoke_model reads the live self._snapshot.messages, not the conversation payload captured in the effect, so it always sizes the freshest history.
- AbortSignalInput is named that way (not AbortSignal) to avoid shadowing the DOM global.
- RunPhase includes 'compacting', but the reducer never sets it: compaction is handled at the conductor's invocation boundary via compacted signals, so the phase is effectively unused.
- abort() hard-cancels the model-stream tasks in addition to firing the cooperative token, because a connector parked on a network read may not poll the token until its next chunk.
- CancelledError is consistently re-raised; it is never folded into a fault or swallowed by ledger fan-out.
- The wire/projectors.py projectors are identity passthroughs today (history is already stored in gateway turn shape). This is a deliberate seam, not dead code.
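Why abort() needs a hard cancel, and why CancelledError must pass through untouched, can be shown with a hypothetical stream that only checks its cooperative token between chunks. An asyncio.Event stands in for the cancel token here, and connector and guarded are illustrative names, not runtime functions.

```python
import asyncio


async def connector(token: asyncio.Event, chunks: list) -> None:
    """Hypothetical stream that only checks the cooperative token between chunks."""
    while not token.is_set():
        await asyncio.sleep(10)  # parked on a slow network read
        chunks.append("chunk")


async def guarded(stream):
    try:
        await stream
        return "settled"
    except asyncio.CancelledError:
        raise  # re-raised, never folded into a fault
    except Exception as exc:
        return f"faulted: {exc!r}"


async def main():
    token, chunks = asyncio.Event(), []
    stream = asyncio.create_task(connector(token, chunks))
    await asyncio.sleep(0)  # let the stream park on its read
    token.set()             # cooperative abort alone would wait ~10 s for the read
    stream.cancel()         # hard cancel interrupts the parked read immediately
    try:
        await guarded(stream)
    except asyncio.CancelledError:
        return "aborted", chunks
    return "finished", chunks


print(asyncio.run(main()))  # ('aborted', [])
```

Without stream.cancel(), the run would sit in the ten-second sleep before noticing the token; with it, the cancellation propagates straight through guarded to the caller, and no chunk is ever appended.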
Relationship to neighbors
The runtime depends on LLM Gateway for its provider-neutral
vocabulary (Turn, Block, Emission, Conversation, StreamOptions, Channel, Usage,
ModelCard, ToolDescriptor, ThinkingLevel) and for the stream function plus
catalog.get_card used by the default ModelInvoker and the compaction window sizing. It
deliberately does not re-export those gateway types — they have a single source of truth in
the gateway, imported straight from indusagi.llmgateway or indusagi.llmgateway.contract.
Downstream, the Capabilities tool kernel and the
Shell App build a ToolBox/ToolRunner, then call
create_agent(...) and submit/subscribe. The high-level agent facade
is the ergonomic front door over this engine, and the ai facade wraps the
gateway it talks to.
