
Runtime

indusagi.runtime is the engine that drives one model conversation from a prompt to settlement. Imported as import indusagi.runtime (or from indusagi import runtime), it pairs a pure finite-state reducer with a single stateful conductor that carries out the side effects the reducer asks for.

The runtime is split in two. The pure core is an immutable RunSnapshot advanced by a side-effect-free reducer (cadence) that returns Effects and never touches I/O. The thin stateful conductor (Agent / create_agent) interprets those effects — reach the model, run tools, condense history, persist, publish — and folds the resulting Signals back into the reducer until the run settles or faults. It owns the full agent loop: streamed emissions fold into an assistant turn, requested tools are dispatched with bounded concurrency, history is condensed when it nears the model's window, settled history is persisted to a content-addressed session DAG, and progress is published as RunEvents.
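A toy model can make the split concrete. The sketch below is not the indusagi API. Snapshot, Effect, Signal, step, and conduct are hypothetical stand-ins with only two signal kinds, and the "model" is a plain callable. It shows the shape of the idea: step is pure and only describes work, while conduct performs that work and feeds the results back as signals.

```python
from dataclasses import dataclass, replace
from typing import Callable, List, Tuple

# Hypothetical stand-ins for RunSnapshot, Effect, and Signal.
@dataclass(frozen=True)
class Snapshot:
    phase: str = "idle"
    messages: Tuple[Tuple[str, str], ...] = ()

@dataclass(frozen=True)
class Effect:
    kind: str                # "invoke_model" or "persist"
    payload: object = None

@dataclass(frozen=True)
class Signal:
    kind: str                # "submit" or "reply"
    payload: object = None

def step(state: Snapshot, signal: Signal) -> Tuple[Snapshot, Tuple[Effect, ...]]:
    """Pure: derive a new snapshot plus requested effects; never performs I/O."""
    if signal.kind == "submit":
        new = replace(state, phase="invoking",
                      messages=state.messages + (("user", signal.payload),))
        return new, (Effect("invoke_model", new.messages),)
    if signal.kind == "reply":
        new = replace(state, phase="settled",
                      messages=state.messages + (("assistant", signal.payload),))
        return new, (Effect("persist", new.messages),)
    return replace(state, phase="faulted"), ()

def conduct(prompt: str, model: Callable[[tuple], str], saved: List[tuple]) -> Snapshot:
    """Stateful shell: perform each effect and feed results back as signals."""
    state = Snapshot()
    signals = [Signal("submit", prompt)]
    while signals:
        state, effects = step(state, signals.pop(0))
        for effect in effects:
            if effect.kind == "invoke_model":
                signals.append(Signal("reply", model(effect.payload)))
            elif effect.kind == "persist":
                saved.append(effect.payload)
    return state

saved: List[tuple] = []
final = conduct("hi", lambda history: "hello", saved)
print(final.phase)      # settled
```

Because step never touches I/O, it can be unit-tested by feeding it signals directly, which is the same property the real reducer is built around.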


Quickstart

create_agent builds a runnable Agent from a static AgentConfig. submit drives one prompt to settlement; the model entrypoint is injectable, so a test can script it without a network. A Channel is just an async iterable of Emissions.

import asyncio
from indusagi.runtime import create_agent, AgentConfig, AgentDeps
from indusagi.llmgateway.contract import (
    Conversation, StreamOptions, TextEmission, DoneEmission,
    AssistantTurn, TextBlock, Usage,
)


async def scripted(conversation: Conversation, options: StreamOptions):
    yield TextEmission(delta="Hello, ")
    yield TextEmission(delta="world!")
    reply = AssistantTurn(blocks=(TextBlock(text="Hello, world!"),))
    yield DoneEmission(reply=reply, usage=Usage(input_tokens=10, output_tokens=3))


def invoke(conversation, options):
    return scripted(conversation, options)


async def main() -> None:
    agent = create_agent(
        AgentConfig(model="some-model"),
        AgentDeps(invoke_model=invoke),
    )
    final = await agent.submit("hi")   # string sugar -> a single user text turn
    print(final.phase)                 # 'settled'
    print(final.messages[-1])          # the folded AssistantTurn


asyncio.run(main())

create_agent(config, deps=None) accepts an AgentConfig, in which model (a catalog id) is required and system, tools, max_output_tokens, thinking, compaction, and max_turns are optional. submit takes a bare string (sugar for one user turn) or a sequence of Turns and returns the terminal RunSnapshot.
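As a rough illustration of the string sugar, the sketch below normalizes submit's input. TextBlock and UserTurn here are local stand-ins for the gateway types of the same names, and normalize_input is a hypothetical helper, not part of the runtime.

```python
from dataclasses import dataclass
from typing import Sequence, Tuple, Union

# Local stand-ins for the gateway's TextBlock and UserTurn (illustration only).
@dataclass(frozen=True)
class TextBlock:
    text: str

@dataclass(frozen=True)
class UserTurn:
    blocks: Tuple[TextBlock, ...]

def normalize_input(value: Union[str, Sequence[UserTurn]]) -> Tuple[UserTurn, ...]:
    """Hypothetical helper: a bare string becomes exactly one user text turn."""
    if isinstance(value, str):
        return (UserTurn(blocks=(TextBlock(text=value),)),)
    return tuple(value)       # an explicit sequence of turns passes through unchanged

turns = normalize_input("hi")
```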

Public API

The package barrel (indusagi.runtime) exposes the host-facing factory, its handle, the type vocabulary callers branch on, and the pure transition machine for advanced hosts.

Name Kind Source Purpose
create_agent function conductor/agent.py The single entry point: build a runnable Agent from an AgentConfig (+ optional AgentDeps), wiring the pure reducer to real effect handlers
Agent class conductor/agent.py The driveable handle: await submit(input), subscribe(handler), abort(), snapshot(), await resume(session_id)
AgentDeps dataclass conductor/agent.py Frozen, optional injectable collaborators: invoke_model (defaults to the gateway), store, ledger
AgentEventHandler type conductor/agent.py Alias of RunEventHandler — the Callable[[RunEvent], None] shape registered with Agent.subscribe
cadence function cadence/reducer.py The pure FSM factory: cadence(config) binds the static AgentConfig and returns a StepFn
step function runtime/__init__.py Re-export alias of cadence, for hosts that drive the FSM themselves
Transition dataclass cadence/reducer.py The result of one step: state: RunSnapshot and effects: tuple[Effect, ...]
StepFn type cadence/reducer.py Callable[[RunSnapshot, Signal], Transition] — a reducer bound to its config
initial_snapshot function cadence/reducer.py Build a clean starting RunSnapshot (phase 'idle') for a session id + model
RunSnapshot dataclass contract/run_state.py Frozen, event-sourced state: run_id, session_id, phase, messages, pending, usage_total, model, optional error
RunPhase type contract/run_state.py Literal macro-states: 'idle', 'invoking', 'streaming', 'dispatching', 'compacting', 'settled', 'faulted'
PendingTool dataclass contract/run_state.py One outstanding tool (id, name, stage), with stage progressing 'queued' → 'running' → 'done'
AgentConfig dataclass contract/config.py Immutable run setup: model, system, tools (ToolBox), max_output_tokens, thinking, compaction, max_turns (default 64)
CompactionPolicy dataclass contract/config.py Threshold policy: trigger_ratio (fraction of window) and keep_recent (trailing turns kept verbatim)
ToolBox class contract/tools.py Protocol bundling the model-visible catalog (descriptors()) with the runner that fulfils calls
ToolRunner class contract/tools.py Protocol: run(call, cancel) -> Awaitable[ToolOutcome] — the injectable side that executes tools
ToolCall dataclass contract/tools.py A parsed request to invoke a named tool: id, name, input
ToolOutcome dataclass contract/tools.py Result of one ToolCall: id, output, is_error
RunError dataclass contract/errors.py Normalized failure: kind (RunErrorKind), message, optional cause; becomes the error of a faulted snapshot
RunErrorKind type contract/errors.py Literal classes: 'model_failed', 'tool_failed', 'aborted', 'compaction_failed', 'turn_budget', 'invalid_state'
run_error function contract/errors.py Convenience constructor for RunError(kind, message, cause)
RunEvent type contract/events.py Tagged union of notifications: SnapshotEvent, TextDeltaEvent, ThinkingDeltaEvent, ToolStartedEvent, ToolFinishedEvent, SettledEvent, FaultedEvent
ModelInvoker type contract/events.py Callable[[Conversation, StreamOptions], Channel] — the single injectable seam to a model

Effects, signals, and the lower-layer drivers live on the sub-packages below.

Name Kind Source Purpose
Effect type contract/effect.py Tagged union of requested side-effects: InvokeModelEffect, RunToolEffect, PersistEffect, CompactEffect, PublishEffect
Signal type contract/signal.py Tagged union driving the machine: SubmitSignal, EmissionSignal, StreamEndSignal, ToolSettledSignal, CompactedSignal, AbortSignalInput, FaultSignal
RunLedger class ledger/bus.py Synchronous fan-out hub: subscribe/publish/clear/size; isolates throwing subscribers; snapshots the handler set per publish
SnapshotAccumulator class ledger/accumulator.py Folds snapshot-bearing events into the latest RunSnapshot: observe/get_snapshot/reset/attach(ledger)
Scheduler class dispatch/scheduler.py Tool scheduler over a ToolRunner: run(calls, cancel) async-yields ToolSettledSignals in completion order with bounded concurrency (DEFAULT_CONCURRENCY=8); cancel() aborts
create_scheduler function dispatch/scheduler.py Build a Scheduler over a ToolRunner (optional concurrency)
drive_turn async function turn/driver.py Adapter turning a Channel into a Signal stream: emission → EmissionSignal, clean exhaustion → StreamEndSignal, raised error → FaultSignal (CancelledError re-raised)
compact async function memory/compactor.py Condense a history: replace the older prefix with one distilled summary turn, keep the recent tail verbatim
should_compact function memory/compactor.py Decide if estimated tokens >= trigger_ratio * context_window (False on non-positive window)
find_cut_point function memory/compactor.py Locate the prefix/tail boundary, nudged forward so a tool_call/tool_result pair is never split
summarize async function memory/compactor.py Distill a stretch of turns into one synthetic user-role summary via the model and DISTILL_INSTRUCTION
estimate_context_tokens function memory/estimate.py Heuristic token count: chars/4 (CHARS_PER_TOKEN) plus 4 (TURN_OVERHEAD_TOKENS) per turn
SessionGraph class store/dag.py In-memory content-addressed DAG: append/branch_from/path_to/resume/hydrate; tracks a moving leaf head
SessionStore class store/persist.py Filesystem JSONL store of session DAGs: append_node, load_session (replays + migrator pipeline), list_sessions
hash_node function store/hash.py SHA-256 content hash (truncated to 32 hex chars) over parent id + canonical-JSON turn + created_at
build_conversation function wire/projectors.py Pure assembly of a message list + AgentConfig into a gateway Conversation via per-role PROJECTORS
SessionNode dataclass contract/session.py One immutable DAG node: id (content hash), parent, turn, created_at (epoch ms)
Migrator class contract/session.py Protocol for one schema-upgrade step: id + upgrade(raw), composed in sequence on load

Sub-directories

Internal layering is strict: contract (types) ← cadence (pure reducer) ← turn / dispatch / memory / store / ledger / wire (single-purpose drivers) ← conductor (the only orchestrator that touches all of them).

Directory Holds
contract/ The frozen type vocabulary every module imports: run_state, signal, effect, tools, errors, events, session, config
cadence/ The pure FSM: reducer.py (cadence, Transition, StepFn, initial_snapshot) and fold.py (fold_blocks/fold_turn/seal_tool_args/tool_calls_of/add_usage)
conductor/ agent.py — the sole stateful layer: Agent, create_agent, AgentDeps, the _SignalQueue, the _drive loop, and all effect handlers
dispatch/ scheduler.py: Scheduler/create_scheduler with bounded concurrency, per-call cancellation, completion-order ToolSettledSignals
turn/ driver.py: drive_turn, the stateless Channel→Signal adapter for one model invocation
memory/ compactor.py (should_compact/find_cut_point/summarize/compact, DISTILL_INSTRUCTION, DEFAULT_POLICY) and estimate.py
store/ Content-addressed sessions: dag.py (SessionGraph/Clock), persist.py (SessionStore), hash.py (hash_node + canonical_json)
ledger/ bus.py (RunLedger fan-out hub, RunEventHandler, Unsubscribe) and accumulator.py (SnapshotAccumulator)
wire/ projectors.py: build_conversation + PROJECTORS/Projector/ProjectionContext, the table-driven (currently passthrough) message→Conversation assembly

Effect / Signal loop

The reducer requests work as effects; the conductor performs each and feeds results back as signals, closing the loop until a terminal phase. The agent loop is encoded as a transition table keyed on (phase, signal.kind), not nested control flow:

submit                         → invoke_model              · phase invoking
emission (text/thinking)       → fold + publish delta       · phase streaming
emission (tool start/delta)    → fold + tool_started        · phase streaming
emission (usage)               → accumulate usage           · (phase kept)
stream_end · has tool calls    → run_tool × N               · phase dispatching
stream_end · no tool calls     → persist + settled          · phase settled
tool_settled · more pending    → mark done, keep waiting    · phase dispatching
tool_settled · all done        → append tool turn,          · phase invoking
                                  invoke_model (next turn)
compacted                      → splice summary, invoke     · phase invoking
abort / fault / error          → faulted                    · phase faulted
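A table like this maps naturally onto a dictionary keyed on (phase, signal kind). The following is a hypothetical subset: effects are reduced to kind strings, the stream_end guard is expressed as a context value, and TABLE, step, and on_stream_end are illustrative names. It shows the dispatch shape, not the real reducer.

```python
from typing import Callable, Dict, Optional, Tuple

Outcome = Tuple[str, Tuple[str, ...]]          # (next phase, effect kinds)
Handler = Callable[[dict], Outcome]

def on_stream_end(ctx: dict) -> Outcome:
    # Guarded row: one run_tool per requested call, otherwise settle.
    calls = ctx.get("tool_calls", 0)
    if calls:
        return "dispatching", ("run_tool",) * calls
    return "settled", ("persist", "publish")

# Hypothetical subset of the table above, keyed on (phase, signal kind).
TABLE: Dict[Tuple[str, str], Handler] = {
    ("idle", "submit"): lambda ctx: ("invoking", ("invoke_model",)),
    ("invoking", "emission"): lambda ctx: ("streaming", ("publish",)),
    ("streaming", "emission"): lambda ctx: ("streaming", ("publish",)),
    ("invoking", "stream_end"): on_stream_end,
    ("streaming", "stream_end"): on_stream_end,
}

def step(phase: str, kind: str, ctx: Optional[dict] = None) -> Outcome:
    if kind in ("abort", "fault"):
        return "faulted", ("publish",)         # any phase can fault
    handler = TABLE.get((phase, kind))
    if handler is None:
        return "faulted", ("publish",)         # unexpected pair is an invalid state
    return handler(ctx or {})
```

Adding a transition means adding a row, rather than threading a new branch through nested conditionals.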

A submit seeds a blank AssistantTurn and emits an InvokeModelEffect (phase → invoking). Streamed EmissionSignals fold into the open assistant turn (text/thinking deltas merge, tool-call blocks open, raw JSON argument fragments concatenate; usage accumulates) and publish TextDeltaEvent/ThinkingDeltaEvent/ToolStartedEvent. StreamEndSignal seals the buffered tool args, then either settles (no tool calls → PersistEffect + SettledEvent) or dispatches (one RunToolEffect per call, phase → dispatching). ToolSettledSignals arrive in any order; once every pending tool is 'done' the reducer re-invokes the model.
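The folding rules can be sketched with a mutable scratch object. This is only an illustration under stated assumptions: emissions are plain dicts with hypothetical kind names, a tool delta always extends the most recently opened call, and arguments are parsed with json.loads once at stream end. The real reducer folds into frozen data instead.

```python
import json
from dataclasses import dataclass, field
from typing import List

@dataclass
class OpenTurn:
    """Hypothetical scratch model of the open assistant turn."""
    text: str = ""
    tool_calls: List[dict] = field(default_factory=list)
    input_tokens: int = 0
    output_tokens: int = 0

def fold(turn: OpenTurn, emission: dict) -> None:
    kind = emission["kind"]
    if kind == "text":
        turn.text += emission["delta"]                       # text deltas merge
    elif kind == "tool_start":
        turn.tool_calls.append({"id": emission["id"], "name": emission["name"], "raw": ""})
    elif kind == "tool_delta":
        turn.tool_calls[-1]["raw"] += emission["delta"]      # raw JSON fragments concatenate
    elif kind == "usage":
        turn.input_tokens += emission["input"]               # usage accumulates
        turn.output_tokens += emission["output"]

def seal(turn: OpenTurn) -> List[dict]:
    # At stream end, parse each buffered argument string exactly once.
    return [{"id": c["id"], "name": c["name"], "input": json.loads(c["raw"] or "{}")}
            for c in turn.tool_calls]

turn = OpenTurn()
for e in [
    {"kind": "text", "delta": "Let me "},
    {"kind": "text", "delta": "check."},
    {"kind": "tool_start", "id": "t1", "name": "read_file"},
    {"kind": "tool_delta", "delta": '{"path": '},
    {"kind": "tool_delta", "delta": '"a.txt"}'},
    {"kind": "usage", "input": 12, "output": 5},
]:
    fold(turn, e)
calls = seal(turn)
```

Deferring the parse to stream end matters because no individual fragment (for example '{"path": ') is valid JSON on its own.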

The conductor's _drive() is the heartbeat: pull a signal from a future-backed _SignalQueue, step the reducer, swap in the new snapshot, and perform the effects. A run is genuinely finished only when it is terminal AND no effect is in flight AND no signal is buffered — so a steering submit/compacted/fault that lands while a trailing persist/publish is still draining can legitimately reopen the run.
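A minimal sketch of such a heartbeat, assuming an asyncio.Queue in place of the future-backed _SignalQueue and string-valued phases, signals, and effects. MiniConductor, perform, and toy_step are hypothetical. Note how the loop keeps running after the phase turns 'settled' until the trailing persist task has drained.

```python
import asyncio
from typing import Callable, Set, Tuple

TERMINAL = {"settled", "faulted"}
StepFn = Callable[[str, str], Tuple[str, Tuple[str, ...]]]

class MiniConductor:
    """Hypothetical heartbeat: signal queue -> pure step -> effects as tasks."""

    def __init__(self, step: StepFn) -> None:
        self.step = step
        self.phase = "idle"
        self.signals: "asyncio.Queue[str]" = asyncio.Queue()
        self.in_flight: Set["asyncio.Future"] = set()

    def finished(self) -> bool:
        # Terminal AND nothing in flight AND nothing buffered.
        return self.phase in TERMINAL and not self.in_flight and self.signals.empty()

    async def perform(self, effect: str) -> None:
        await asyncio.sleep(0)                 # stands in for real I/O
        if effect == "invoke_model":
            await self.signals.put("stream_end")

    async def drive(self) -> str:
        while True:
            self.in_flight = {t for t in self.in_flight if not t.done()}
            if self.finished():
                return self.phase
            if self.signals.empty():
                if not self.in_flight:
                    raise RuntimeError("stalled: non-terminal phase with no pending work")
                await asyncio.wait(self.in_flight, return_when=asyncio.FIRST_COMPLETED)
                continue
            self.phase, effects = self.step(self.phase, self.signals.get_nowait())
            for effect in effects:
                self.in_flight.add(asyncio.ensure_future(self.perform(effect)))

def toy_step(phase: str, signal: str) -> Tuple[str, Tuple[str, ...]]:
    if signal == "submit":
        return "invoking", ("invoke_model",)
    if signal == "stream_end":
        return "settled", ("persist",)         # trailing persist still drains
    return "faulted", ()

async def main() -> str:
    conductor = MiniConductor(toy_step)
    await conductor.signals.put("submit")
    return await conductor.drive()

result = asyncio.run(main())
```

If a new submit were enqueued while that persist was still in flight, the loop would step it and leave 'settled', which is the reopening behavior described above.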

Driving the pure FSM directly

Advanced hosts can bypass the conductor and drive the reducer themselves. cadence(config) returns a step(state, signal) -> Transition; it performs no I/O and never awaits.

from indusagi.runtime import cadence, initial_snapshot, AgentConfig
from indusagi.runtime.contract import SubmitSignal
from indusagi.llmgateway.contract import UserTurn, TextBlock

step = cadence(AgentConfig(model="some-model"))            # also exported as `step`
state = initial_snapshot(session_id="s1", model="some-model")  # phase 'idle'

signal = SubmitSignal(input=(UserTurn(blocks=(TextBlock(text="hi"),)),))
transition = step(state, signal)
print(transition.state.phase)        # 'invoking'
for effect in transition.effects:
    print(type(effect).__name__)     # InvokeModelEffect, PublishEffect

Events and snapshots

subscribe taps the live RunEvent stream and returns a disposer. A SnapshotAccumulator folds the snapshot-bearing events into the single latest RunSnapshot; attach(ledger) wires it to the agent's ledger and returns its own disposer.

from indusagi.runtime import create_agent, AgentConfig
from indusagi.runtime.contract import TextDeltaEvent
from indusagi.runtime.ledger import SnapshotAccumulator

agent = create_agent(AgentConfig(model="some-model"))


def on_event(event):
    if isinstance(event, TextDeltaEvent):
        print(event.delta, end="")


unsubscribe = agent.subscribe(on_event)

acc = SnapshotAccumulator()
acc_dispose = acc.attach(agent._ledger)   # or feed acc.observe(event) from on_event

# ... await agent.submit(...) elsewhere ...
latest = acc.get_snapshot()               # most recent RunSnapshot or None
unsubscribe()
acc_dispose()

The simplest path is to skip the accumulator entirely: agent.submit(...) already resolves the terminal RunSnapshot, and agent.snapshot() reads the latest state synchronously at any time.
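The fan-out and accumulation behavior described in the Public API table (isolating throwing subscribers, snapshotting the handler set per publish, keeping only the latest snapshot) can be sketched in a few lines. MiniLedger and MiniAccumulator are hypothetical simplifications: events are plain dicts, and RunLedger's clear/size are omitted.

```python
from typing import Callable, List, Optional

class MiniLedger:
    """Hypothetical synchronous fan-out hub in the spirit of RunLedger."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[dict], None]] = []

    def subscribe(self, handler: Callable[[dict], None]) -> Callable[[], None]:
        self._handlers.append(handler)
        def unsubscribe() -> None:
            if handler in self._handlers:
                self._handlers.remove(handler)
        return unsubscribe

    def publish(self, event: dict) -> None:
        # Iterate over a copy so handlers may (un)subscribe during delivery.
        for handler in tuple(self._handlers):
            try:
                handler(event)
            except Exception:
                # One throwing subscriber must not starve the others. CancelledError
                # derives from BaseException (Python 3.8+), so it still propagates.
                pass

class MiniAccumulator:
    """Keeps only the latest snapshot carried by snapshot-bearing events."""

    def __init__(self) -> None:
        self._latest: Optional[dict] = None

    def observe(self, event: dict) -> None:
        if "snapshot" in event:
            self._latest = event["snapshot"]

    def get_snapshot(self) -> Optional[dict]:
        return self._latest

    def attach(self, ledger: MiniLedger) -> Callable[[], None]:
        return ledger.subscribe(self.observe)

ledger = MiniLedger()
acc = MiniAccumulator()
dispose = acc.attach(ledger)
ledger.subscribe(lambda event: 1 / 0)          # a faulty subscriber
ledger.publish({"type": "text_delta", "delta": "hi"})
ledger.publish({"type": "settled", "snapshot": {"phase": "settled"}})
dispose()
ledger.publish({"type": "snapshot", "snapshot": {"phase": "idle"}})   # no longer observed
latest = acc.get_snapshot()
```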

Tool dispatch

When AgentConfig.tools is present, the conductor builds a Scheduler over tools.runner. One stream-end round's tool calls run as a single batch through the scheduler at most DEFAULT_CONCURRENCY (8) at a time, each under a child CancelToken chained to the run's active token. ToolSettledSignals are emitted in completion order, not request order, and each appends a ToolResultBlock to the trailing ToolTurn. If a tool is requested but no runner is configured, the round faults with 'tool_failed' rather than hanging.
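Bounded concurrency with completion-order results is a standard asyncio pattern. The sketch below assumes a runner that maps a call id to an output string; run_batch is a hypothetical name, and it cancels via Task.cancel rather than modeling the child CancelToken chain.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Sequence, Tuple

async def run_batch(
    calls: Sequence[str],
    runner: Callable[[str], Awaitable[str]],
    concurrency: int = 8,
) -> AsyncIterator[Tuple[str, str]]:
    """Yield (call_id, output) in completion order, at most `concurrency` at once."""
    gate = asyncio.Semaphore(concurrency)

    async def one(call_id: str) -> Tuple[str, str]:
        async with gate:                       # bounds how many runner calls overlap
            return call_id, await runner(call_id)

    tasks = [asyncio.ensure_future(one(c)) for c in calls]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        for t in tasks:                        # on early exit, stop any stragglers
            t.cancel()

async def fake_tool(call_id: str) -> str:
    await asyncio.sleep({"slow": 0.1, "fast": 0.01}[call_id])
    return f"{call_id} done"

async def main() -> list:
    return [cid async for cid, _ in run_batch(["slow", "fast"], fake_tool)]

order = asyncio.run(main())                    # the fast call is reported first
```

Consumers that need request order must re-key results by call id, which is why each settled signal carries the id of the call it answers.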

Compaction

Condensation is not a reducer transition — it happens lazily at the conductor's invocation boundary. Before opening a stream, if should_compact() trips on the live snapshot's history, the older prefix is distilled into one summary turn and a compacted signal redirects the reducer to re-invoke over the shrunken history. The functions are usable standalone:

from indusagi.runtime.memory import estimate_context_tokens, should_compact, find_cut_point
from indusagi.runtime.contract import CompactionPolicy
from indusagi.llmgateway.catalog import get_card
from indusagi.llmgateway.contract import UserTurn, TextBlock

messages = (UserTurn(blocks=(TextBlock(text="x" * 4000),)),)
print(estimate_context_tokens(messages))   # ~ len/4 + 4 per turn

card = get_card("some-model")
if card is not None:
    policy = CompactionPolicy(trigger_ratio=0.8, keep_recent=8)
    print(should_compact(messages, card, policy))
    print(find_cut_point(messages, policy.keep_recent))   # tool-safe boundary index

The summary is filed under the user role and marked with SUMMARY_HEADING (the text [condensed earlier context]) so it stays valid input for any provider. A pre-check that skips compaction when find_cut_point(...) <= 1 avoids spending a model call when history is already as small as the policy allows, and keeps the invocation gate from re-triggering forever.
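The sizing and boundary rules can be sketched standalone. This is a hypothetical simplification, not memory/compactor.py: turns are (role, text) pairs, the chars/4 division is assumed to round down, and a tool result is assumed to immediately follow the turn that requested it, so a tail starting on a "tool" turn would orphan its result.

```python
from typing import List, Tuple

Turn = Tuple[str, str]                         # hypothetical (role, text) pair
CHARS_PER_TOKEN = 4
TURN_OVERHEAD_TOKENS = 4

def estimate_tokens(turns: List[Turn]) -> int:
    chars = sum(len(text) for _, text in turns)
    return chars // CHARS_PER_TOKEN + TURN_OVERHEAD_TOKENS * len(turns)

def should_compact(turns: List[Turn], window: int, trigger_ratio: float) -> bool:
    if window <= 0:
        return False                           # unknown window: never trip
    return estimate_tokens(turns) >= trigger_ratio * window

def find_cut_point(turns: List[Turn], keep_recent: int) -> int:
    cut = max(0, len(turns) - keep_recent)
    # Nudge forward so the kept tail never starts on an orphaned tool result.
    while cut < len(turns) and turns[cut][0] == "tool":
        cut += 1
    return cut

history = [("user", "x" * 4000), ("assistant", "call read"), ("tool", "file body"),
           ("assistant", "done"), ("user", "next")]
cut = find_cut_point(history, keep_recent=3)   # 2 is a tool result, so nudged to 3
worth_it = cut > 1                             # the pre-check from the text above
```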

Sessions and persistence

When an AgentDeps.store is injected, settled history is appended to a content-addressed SessionGraph. Each SessionNode's id is a SHA-256 hash of (parent, turn, created_at); branching is two nodes sharing a parent, and the model's linear context is the root→leaf path. Re-appending an identical turn is a no-op head advance, so re-persisting an unchanged prefix writes nothing new. Persistence is advisory — a store error is swallowed so an otherwise complete run still settles.

import asyncio
from indusagi.runtime.store import SessionStore, SessionGraph
from indusagi.llmgateway.contract import UserTurn, TextBlock


async def main() -> None:
    store = SessionStore("/tmp/indus-sessions")
    graph = SessionGraph("sess-1")
    node = graph.append(UserTurn(blocks=(TextBlock(text="remember this"),)))
    await store.append_node("sess-1", node)

    reloaded = await store.load_session("sess-1")
    leaf = reloaded.leaf
    if leaf is not None:
        print(reloaded.path_to(leaf))         # root->leaf turns
    print(await store.list_sessions())        # ['sess-1']


asyncio.run(main())

An Agent with a store can await agent.resume(session_id) to re-seat its in-memory snapshot on the rehydrated history; the phase returns to 'idle' so the next submit continues the conversation from where it left off.
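The content-addressing rules from the start of this section (a SHA-256 id over parent, turn, and created_at truncated to 32 hex characters; identical content reusing its node) can be sketched as follows. MiniGraph and move_head are hypothetical, and both the canonical JSON form and the layout of the hashed material are assumptions; the real hash_node may frame its inputs differently.

```python
import hashlib
import json
from typing import Dict, List, Optional

def canonical_json(value: object) -> str:
    # Assumed canonical form: sorted keys and no insignificant whitespace.
    return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False)

def hash_node(parent: Optional[str], turn: dict, created_at: int) -> str:
    # Assumed layout of the hashed material; the real field framing may differ.
    material = canonical_json({"parent": parent, "turn": turn, "created_at": created_at})
    return hashlib.sha256(material.encode("utf-8")).hexdigest()[:32]

class MiniGraph:
    """Hypothetical content-addressed DAG with a moving leaf head."""

    def __init__(self) -> None:
        self.nodes: Dict[str, dict] = {}
        self.leaf: Optional[str] = None

    def move_head(self, node_id: Optional[str]) -> None:
        self.leaf = node_id                    # the next append branches from here

    def append(self, turn: dict, created_at: int) -> str:
        node_id = hash_node(self.leaf, turn, created_at)
        if node_id not in self.nodes:          # identical content: nothing new to store
            self.nodes[node_id] = {"parent": self.leaf, "turn": turn}
        self.leaf = node_id
        return node_id

    def path_to(self, node_id: Optional[str]) -> List[dict]:
        turns: List[dict] = []
        while node_id is not None:
            node = self.nodes[node_id]
            turns.append(node["turn"])
            node_id = node["parent"]
        return turns[::-1]                     # root -> leaf

graph = MiniGraph()
hi = {"role": "user", "text": "hi"}
a = graph.append(hi, created_at=1)
b = graph.append({"role": "assistant", "text": "hello"}, created_at=2)
graph.move_head(None)                          # replay the same prefix from the root
a_again = graph.append(hi, created_at=1)       # same parent, turn, and time: same id
graph.move_head(a)
c = graph.append({"role": "assistant", "text": "hey"}, created_at=3)   # a branch
```

Because a node's id covers its parent's id, two nodes with the same turn but different ancestry never collide, and a branch is simply a second child of an existing node.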

Key concepts

  • Pure reducer (cadence): cadence(config) returns a side-effect-free step(state, signal) -> Transition. It derives a fresh immutable RunSnapshot and a flat tuple of Effects per signal; it never awaits or does I/O.
  • Effect / Signal loop — the reducer requests work (invoke_model, run_tool, persist, compact, publish); the conductor performs each and feeds results back as signals (emission, stream_end, tool_settled, compacted, abort, fault).
  • RunSnapshot (event-sourced state) — the single immutable source of truth, never mutated in place; every transition produces a new snapshot via dataclasses.replace.
  • Content-addressed session DAG — history is an immutable DAG keyed by content hash; the model's context is the root→leaf path.
  • Lazy compaction gate — condensation fires at the conductor's invocation boundary, not as a reducer transition.
  • Steering submit — a submit() arriving mid-run is folded into the live signal queue rather than starting a competing loop; the caller awaits the shared driving task.
  • Turn budget — the drive loop counts InvokeModelEffects and force-faults with 'turn_budget' once they would exceed max_turns (default 64), guarding against a model that loops forever.
  • Bounded-concurrency tool dispatch — one round's calls run as a single batch at most DEFAULT_CONCURRENCY (8) at a time, each under a child CancelToken.
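The turn budget reduces to a counter checked before effects are performed. In this sketch, TurnCounter and TurnBudgetExceeded are hypothetical stand-ins (the runtime faults the snapshot with kind 'turn_budget' rather than raising), and effects are plain kind strings.

```python
from typing import Iterable

class TurnBudgetExceeded(Exception):
    """Hypothetical stand-in for faulting the run with kind 'turn_budget'."""

class TurnCounter:
    def __init__(self, max_turns: int = 64) -> None:
        self.max_turns = max_turns
        self.invocations = 0

    def admit(self, effects: Iterable[str]) -> None:
        # Count model invocations in this step's effects before performing any.
        pending = sum(1 for e in effects if e == "invoke_model")
        if self.invocations + pending > self.max_turns:
            raise TurnBudgetExceeded(f"more than {self.max_turns} model turns")
        self.invocations += pending

counter = TurnCounter(max_turns=3)
turns_run = 0
try:
    while True:                                # a model that requests a tool every round
        counter.admit(("invoke_model",))
        turns_run += 1
except TurnBudgetExceeded:
    pass
```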

Notable behavior

  • The compaction gate in _on_invoke_model reads the live self._snapshot.messages, not the conversation payload captured in the effect, so it always sizes the freshest history.
  • AbortSignalInput is named that way (not AbortSignal) to avoid shadowing the DOM global.
  • RunPhase includes 'compacting', but the reducer never sets it — compaction is handled at the conductor's invocation boundary via compacted signals, so the phase is effectively unused.
  • abort() hard-cancels the model-stream tasks in addition to firing the cooperative token, because a connector parked on a network read may not poll the token until its next chunk.
  • CancelledError is consistently re-raised and never folded into a fault or swallowed by ledger fan-out.
  • The wire/projectors.py projectors are identity passthroughs today (history is already stored in gateway turn shape) — a deliberate seam, not dead code.
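Why abort() needs both mechanisms can be shown with a stream parked on a long await. CancelToken and parked_stream below are hypothetical stand-ins (the long sleep simulates a stalled network read). The token alone would not be seen until the read returns; Task.cancel interrupts the await immediately.

```python
import asyncio

class CancelToken:
    """Hypothetical cooperative token: work polls `cancelled` between chunks."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    @property
    def cancelled(self) -> bool:
        return self._event.is_set()

    def cancel(self) -> None:
        self._event.set()

async def parked_stream(token: CancelToken, chunks: list) -> None:
    while not token.cancelled:
        await asyncio.sleep(3600)              # stands in for a stalled network read
        chunks.append("chunk")

async def main() -> str:
    token = CancelToken()
    chunks: list = []
    task = asyncio.ensure_future(parked_stream(token, chunks))
    await asyncio.sleep(0)                     # let the stream park on its read
    token.cancel()                             # cooperative: unseen until the read returns
    task.cancel()                              # hard cancel: interrupts the parked await now
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"                     # the awaiting caller observes the cancellation
    return "finished"

outcome = asyncio.run(main())
```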

Relationship to neighbors

The runtime depends on LLM Gateway for its provider-neutral vocabulary (Turn, Block, Emission, Conversation, StreamOptions, Channel, Usage, ModelCard, ToolDescriptor, ThinkingLevel) and for the stream function plus catalog.get_card used by the default ModelInvoker and the compaction window sizing. It deliberately does not re-export those gateway types — they have a single source of truth in the gateway, imported straight from indusagi.llmgateway or indusagi.llmgateway.contract.

Downstream, the Capabilities tool kernel and the Shell App build a ToolBox/ToolRunner, then call create_agent(...) and submit/subscribe. The high-level agent facade is the ergonomic front door over this engine, and the ai facade wraps the gateway it talks to.
