Conductor
The conductor is
induscode's product-level session-orchestration layer — it wraps exactly one frameworkAgentand drives a prompt to settlement, re-emits a stable product signal stream, persists a branchable transcript, retries transient faults, and supports abort. Reach it withfrom induscode.conductor import create_session_conductor; it is the runtime core everypindusrun mode (interactive console,-pprint,--jsonlink) drives.
The indusagi framework Agent is a raw LLM conversation loop: it streams a fine-grained AgentEvent surface and owns a flat message list. The conductor turns one such agent into a coding-agent session. It drives turns to settlement, projects the framework loop events into a distinct, stable SessionSignal stream, persists every produced message into an on-disk transcript tree, retries transient model faults with backoff, auto-condenses when the transcript nears the context window, surfaces typed discriminated faults (never string sentinels), and maintains an immutable ConductorState snapshot through a pure reducer.
Table of Contents
- What the Conductor Owns
- The Public Surface
- Assembling a Conductor
- Driving a Turn to Settlement
- The Signal Stream
- State and the Pure Reducer
- Typed Faults
- Retry and Abort
- The Queue: Re-entrant Input
- Persistence and Branching
- Condense Seam
- Model Catalog and Matcher
- In-session Shell
- Source Layout
What the Conductor Owns
The conductor is the typed seam between the induscode product (its UI/print/RPC channels) and the framework runtime. It does not re-declare framework shapes — it composes Agent, AgentEvent, AgentTool, ThinkingLevel from indusagi.agent and AgentMessage, Model, Usage, KnownProvider from indusagi.ai. Crucially, the framework's AgentEvent loop stream is deliberately not the same union as the product SessionSignal stream: the conductor consumes the former internally and re-emits the latter, so the app surface can evolve independently of the framework loop.
Within one session the conductor owns:
- The turn loop — drive a prompt through retry, project events to signals, persist the per-turn tail, condense, settle.
- The signal bus (
SignalHub) — a synchronous fan-out overSessionSignal. - The transcript (
TranscriptStore) — an append-only, branchable on-disk tree. - The model catalog/matcher — a normalized, validated view over the framework model registry plus a scored resolver.
- The immutable state snapshot (
ConductorState) maintained by the purereduce_statereducer. - The pending-input queue, fork/navigate branch ops, resume/new-session lifecycle, in-session shell, and skill-block parsing.
The Public Surface
SessionConductor (a Protocol in contract.py) is the full product surface all three run modes drive. The concrete orchestrator is SessionConductorImpl; it is constructed only through the factory.
| Name | Kind | Source | Purpose |
|---|---|---|---|
SessionConductor |
Protocol | contract.py |
The full product surface: submit/enqueue, subscribe, abort, snapshot, resume, new_session, fork/navigate_tree, condense, execute_bash, stats, model select/cycle, thinking-level cycle, session naming, pending-queue ops. |
SessionConductorImpl |
class | conductor.py |
The concrete turn-loop orchestrator. Only orchestrates — built via the factory. |
create_session_conductor |
function | conductor.py |
Factory that resolves or accepts every collaborator (agent/store/hub/matcher/condense/retry) and returns a SessionConductor. |
ConductorDeps |
dataclass | conductor.py |
Frozen bundle of all-optional injectable collaborators (agent, store, hub, matcher, condense, retry, compactAt, sleep) so tests run with no network and no disk. |
SessionConductorOptions |
dataclass | contract.py |
Assembly-time config: modelId (required) plus system/tools/thinking/workspace/sessionsDir/autoCompact/getApiKey. |
ConductorState |
dataclass | contract.py |
Immutable observable snapshot: phase, head, cumulative usage, contextTokens, modelId, optional fault. |
reduce_state |
function | contract.py |
Pure transition over ConductorState; always returns a fresh object. Raises ValueError on an unknown action. |
ConductorFault / conductor_fault |
dataclass / function | contract.py |
Typed discriminated failure value plus its single sanctioned constructor; emitted on fault signals, never raised. |
SessionSignal |
type | contract.py |
The product signal union (see The Signal Stream). |
SignalHub |
class | signal_hub.py |
Synchronous insertion-ordered fan-out bus over SessionSignal. |
translate_agent_event / TRANSLATOR_TABLE |
function / const | signal_hub.py |
Pure projection of one AgentEvent into zero-or-more SessionSignals, backed by a tag→rule dispatch table. |
The full surface (including transcript, serialization, catalog, and matcher symbols) is re-exported from induscode.conductor.__all__ — import from the package root, never from individual files.
Assembling a Conductor
create_session_conductor(options, deps=None) resolves every collaborator and returns the orchestrator. Only SessionConductorOptions.modelId is required; the factory defaults a live LazyAgent, a filesystem-or-memory TranscriptStore, and a ModelMatcher over ModelCatalog.
import asyncio
from induscode.conductor import (
create_session_conductor,
SessionConductorOptions,
TextSignal,
)
async def main():
conductor = create_session_conductor(
SessionConductorOptions(
modelId="anthropic/claude-sonnet-4",
system="You are a coding assistant.",
sessionsDir="~/.pindusagi/sessions", # persist + resumable
autoCompact=True,
)
)
# Subscribe before submitting; subscribe() returns an unsubscribe thunk.
def on_signal(sig):
if isinstance(sig, TextSignal):
print(sig.delta, end="")
unsubscribe = conductor.subscribe(on_signal)
state = await conductor.submit("explain this repository")
print("\nsettled phase:", state.phase)
unsubscribe()
asyncio.run(main())
SessionConductorOptions fields:
| Field | Type | Purpose |
|---|---|---|
modelId |
str |
Required. Canonical id (provider/modelId) to bind the session to. |
system |
str | None |
Initial system prompt seeding the conversation. |
tools |
Sequence[AgentTool] | None |
Tools made available to the agent. The flat capability list is threaded verbatim. |
thinking |
ThinkingLevel | None |
Initial reasoning effort for models that support it. |
workspace |
str | None |
Working directory the session is scoped to (defaults to process cwd). |
sessionsDir |
str | None |
When set, the transcript store uses a filesystem backend (one <sessionId>.ndjson per session); None keeps an in-memory store. |
autoCompact |
bool | None |
Auto-condense when the transcript nears the window. |
getApiKey |
Callable |
Per-call credential resolver threaded to the framework Agent (sync or async; None falls back to env lookup). |
ConductorDeps injects any collaborator for tests — pass a scripted-fake agent to run with no network, an in-memory store to run with no disk, or a sleep to make retry backoff deterministic. LazyAgent is a thin AgentLike wrapper that constructs the real framework Agent only on first use, so a live-path conductor stays cheap until the first turn.
Driving a Turn to Settlement
submit(input) is the single entry point that drives a prompt to settlement. The orchestration, inside SessionConductorImpl:
- Busy guard. If a turn is already in flight, the input is enqueued (never dropped) and the current snapshot returns immediately. Otherwise the busy flag is set and
_run_turnruns. - Skill unwrap.
parse_skill_invocationunwraps a leading<skill name=... location=...>body</skill>block to its body text (routing the named skill is a higher layer's job). - Baseline + phase. Record the message baseline (
len(agent.state.messages)), dispatch aPhaseAction(streaming). - Drive with retry.
_run_with_retry(prompt)subscribes_on_agent_eventto the framework agent's raw stream and awaitsagent.prompt(prompt), classifying transient faults (see Retry and Abort). - Project events. During the call,
_on_agent_eventkeeps an ordered in-flight-tool list (tool_execution_startappends and flips phase→tooling;tool_execution_endremoves by id and flips phase→streamingwhen empty), runstranslate_agent_event, and emits each resultingSessionSignalon the hub.turn_endfolds usage and records the latest-turncontextTokens; a streamingfaultdispatches aFaultAction. - Persist tail. On a clean turn,
_persist_tail(baseline)appends every message beyond the baseline to the transcript (onepersistedsignal per node). - Condense.
_maybe_condenseruns the pluggableCondenseFnwhen auto-compact is on and the branch length exceedscompactAt(default 200). - Settle. If still not faulted, dispatch
SettledAction+HeadActionand emitidle. - Drain. The finished turn drains the queue (
steerentries first), each as its own turn.
A faulted turn keeps its typed fault and does not persist a tail, condense, or settle on top of it — it stays faulted and the caller reads fault off the returned state.
The Signal Stream
The conductor re-emits a distinct, stable SessionSignal union — the surface the app renders. Subscribe via subscribe(handler), which returns an idempotent unsubscribe thunk. SignalHub fans out synchronously in insertion order over a snapshot of the handler list, isolating a throwing handler via on_handler_error.
| Signal | Discriminant kind |
Carries | Meaning |
|---|---|---|---|
PromptSignal |
prompt |
text |
The user's turn was committed (emitted before the model replies). |
TextSignal |
text |
delta |
A chunk of assistant answer text. |
ThinkingSignal |
thinking |
delta |
A chunk of reasoning/thinking text. |
ToolStartSignal |
tool_start |
id, name |
A tool invocation began (correlate by id). |
ToolEndSignal |
tool_end |
id, ok |
A tool invocation finished. |
TurnEndSignal |
turn_end |
usage |
The assistant turn settled; reports token spend. |
PersistedSignal |
persisted |
entryId |
The latest node was committed to the transcript. |
CompactedSignal |
compacted |
— | The transcript was condensed to fit the window. |
FaultSignal |
fault |
fault |
A typed ConductorFault occurred. |
QueueSignal |
queue |
count |
The pending-input queue changed; new depth. |
IdleSignal |
idle |
— | No in-flight work; ready for input. |
translate_agent_event is a pure projection backed by TRANSLATOR_TABLE (one rule per AgentEvent member). The key mappings: message_end(user) → prompt; message_end(assistant errored) → fault; tool_execution_start/end → tool_start/tool_end; turn_end → turn_end(usage); and message_update's inner AssistantMessageEvent (a best-effort dict) projects text_delta → text, thinking_delta → thinking, and error → a model fault.
State and the Pure Reducer
ConductorState is an immutable snapshot, never a live view. Read it with snapshot(); every transition runs through reduce_state, which always returns a brand-new object.
@dataclass(frozen=True, slots=True)
class ConductorState:
phase: ConductorPhase # idle | streaming | tooling | condensing | faulted
head: SessionHead # session id + active leaf
usage: Usage # cumulative token/cost spend
contextTokens: int # latest-turn window occupancy (NOT cumulative)
modelId: str
fault: ConductorFault | None = None
A subtle correctness point: contextTokens is the latest turn's window occupancy — it replaces rather than accumulates — so the footer ctx:% divides by the context window correctly. It is distinct from cumulative usage.totalTokens, which grows unbounded across turns and would inflate the ratio.
reduce_state(prev, action) handles the typed StateAction union — PhaseAction, HeadAction, UsageAction, ModelAction, FaultAction, SettledAction. settled clears any prior fault and returns to idle; fault records the fault and flips to faulted. An action outside the union fails loud with ValueError (the TS switch was compiler-exhaustive; the port fails at runtime).
Typed Faults
Faults are typed discriminated values, not raised exceptions and never string sentinels. A consumer switches on fault.kind, not on substring matching.
@dataclass(frozen=True, slots=True)
class ConductorFault:
kind: FaultKind # model | tool | persistence | aborted | overflow
message: str
cause: object | None = None
kind |
Recovery story |
|---|---|
model |
The LLM call itself failed (transport, provider, decode). |
tool |
A tool invocation threw or returned a hard error. |
persistence |
Writing/reading the on-disk transcript failed. |
aborted |
The caller cancelled the in-flight turn via abort(). |
overflow |
The context window was exceeded and not condensable. |
Mint one only with conductor_fault(kind, message, cause=None). A fault is dispatched into state (flipping phase to faulted) and emitted as a FaultSignal.
Retry and Abort
_run_with_retry retries transient model faults with exponential backoff. On a raised Exception, _is_transient classifies it by substring markers (429, rate limit, overloaded, timeout, 5xx, unavailable, …). A transient error retries up to RetryPolicy.maxAttempts with a delay of baseDelayMs * 2 ** (attempt - 1); the default DEFAULT_RETRY is maxAttempts=2, baseDelayMs=250. Exhausting the budget or hitting a non-transient raise dispatches a typed model FaultAction and ends the loop.
asyncio.CancelledError always propagates — it is never retried and never converted to a fault (it derives from BaseException on 3.11+, and every defensive except re-raises it explicitly).
abort() cancels the in-flight turn on the agent, dispatches an aborted FaultAction, emits a FaultSignal, and pulses idle:
state = conductor.snapshot()
if state.phase in ("streaming", "tooling"):
conductor.abort() # -> aborted fault, then idle
The Queue: Re-entrant Input
submit while busy does not drop input and does not fault — it hands the input to enqueue (mode followUp by default) and returns the current snapshot. The in-flight turn drains the queue once it settles, each entry running as its own turn. steer-mode entries are taken ahead of plain follow-ups so an interrupt is honored first. Each queue change emits a queue signal.
| Method | Purpose |
|---|---|
enqueue(input, mode="followUp") |
File an input for a later turn (steer or followUp). |
pending_count() / pending_inputs() |
Depth and a read-only view (oldest first). |
clear_queue() |
Discard every queued input. |
dequeue_last() |
Pop the most-recently queued text back into a prompt. |
Persistence and Branching
TranscriptStore is an append-only, branchable tree bound to one session id, over a pluggable TranscriptBackend (fs_backend(dir) writes <sessionId>.ndjson via asyncio.to_thread; memory_backend is the default). Derived state is rebuilt by the pure replay reducer (node table + head from an ordered entry list and a leaf id).
| Primitive | Behaviour |
|---|---|
append(message, ...) |
Mint a ULID node parented at the current leaf, encode via serialize.encode_entry, advance the head. |
branch_at(entryId) |
Repoint the leaf and rewrite the whole file. |
path_to() |
Walk parent links leaf→root, reversed (the active branch). |
load(sessionId) / resume |
Read the backend, parse via parse_session_text, rebuild via replay, rebind the agent to the active branch. |
The conductor exposes branch operations on top of this: fork(entryId) opens a new branch parented at a prior node and rebinds the agent's message list; navigate_tree(nodeId) walks between existing branches without forking. resume(sessionId) restores a persisted session; new_session() opens a fresh session id while leaving the prior transcript file intact on disk (this is what /clear and /new drive — see Slash Commands).
serialize.message_to_dict / message_from_dict is the single app-wide message⇄dict codec, reused by channels and transcript export. The on-disk vocabulary (TranscriptEntry, SessionHead) is the conductor's own — node ids are ULIDs, session ids are s_<epoch36>_<rand8>, and the schema is the namespaced indus/transcript@1.
Condense Seam
CondenseFn is the pluggable condense hook: given the active branch's messages, it returns the (smaller) list to replace them with. The conductor module itself does no real compaction — noop_condense (an identity hook) is the default. Real compaction arrives only when the production window-budget engine is wired in as a non-default CondenseFn (it structurally satisfies the protocol). The framework's own runtime.memory compactor is deliberately never wired into this seam — the two compaction engines stay distinct. See the framework runtime for the framework-side compactor.
Model Catalog and Matcher
ModelCatalog is a normalized, validated view over the framework model list (get_models / get_providers), keyed by canonical provider/modelId. It runs a validate → normalize → key pipeline; the mock provider is filtered out of the live catalog (it must never auto-select or appear in the picker), though tests can inject it via CatalogSource. See Models for the catalog surface.
ModelMatcher is a scored-candidate resolver over the catalog — resolve / resolve_card / resolve_all return best-first ModelCardRef(s) using a prioritized scoring pipeline:
| Match | Score |
|---|---|
| Pin | 1000 |
| Exact canonical id | 900 |
| Exact model id | 800 |
| Provider | 600 |
| Glob | 400 |
| Substring | 200 |
select_model(id) and cycle_model(id) both route through _bind_model: resolve the card, bind the full framework Model onto the agent, re-apply the tracked reasoning effort, and dispatch a ModelAction. A miss surfaces a typed model fault and leaves the current selection untouched. available_models() lists picker entries best-first (or [] when no matcher is wired).
In-session Shell
execute_bash(command, opts=None) runs a shell command in the session workspace and returns a BashOutcome (output, exitCode). It folds stderr into stdout and never raises — a spawn/exec failure resolves to exitCode=1. Unless ExecuteBashOptions.excludeFromContext is set, the output is recorded as a transcript note so it re-enters the agent's context on the next turn.
outcome = await conductor.execute_bash("git status --porcelain")
print(outcome.exitCode, outcome.output)
stats() returns a point-in-time SessionStats tally (message counts by role, tool calls/results, the cumulative TokenTally, and total cost) computed from the live message list plus the running usage on ConductorState.
Source Layout
The conductor is a flat 9-file package (no subdirectories). contract.py is documented as a frozen type seam carrying only shapes plus the two inert helpers conductor_fault and reduce_state.
| File | Holds |
|---|---|
contract.py |
Frozen type seam: signals, faults, transcript schema, catalog refs/queries, ConductorState + reduce_state, queue/stats/bash shapes, and the SessionConductor / AgentLike / CondenseFn / RetryPolicy Protocols. |
conductor.py |
SessionConductorImpl turn loop + create_session_conductor factory, ConductorDeps, LazyAgent, noop_condense, DEFAULT_RETRY. |
signal_hub.py |
SignalHub fan-out bus + translate_agent_event / TRANSLATOR_TABLE. |
transcript_store.py |
TranscriptStore, TranscriptBackend / fs_backend / memory_backend, the pure replay reducer. |
serialize.py |
message_to_dict / message_from_dict, NDJSON envelope codec, branch projection, legacy import, role_for_message. |
catalog.py |
ModelCatalog, CatalogCard, CatalogSource, canonical_id, to_card_ref. |
matcher.py |
ModelMatcher + ResolveInput scored resolver. |
skill_parse.py |
parse_skill_invocation — the <skill …>body</skill> block scanner. |
__init__.py |
The public barrel; re-exports the full surface. |
The conductor is assembled at startup by boot (boot/runners/session.py) and consumed by the interactive console, the print/JSON channels, and the sessions library. For how a session is wired together end-to-end, see the architecture overview.
