
Conductor

The conductor is induscode's product-level session-orchestration layer. It wraps exactly one framework Agent, drives a prompt to settlement, re-emits a stable product signal stream, persists a branchable transcript, retries transient faults, and supports abort. Import the factory with from induscode.conductor import create_session_conductor; the conductor is the runtime core that every pindus run mode (interactive console, -p print, --json link) drives.

The indusagi framework Agent is a raw LLM conversation loop: it streams a fine-grained AgentEvent surface and owns a flat message list. The conductor turns one such agent into a coding-agent session. It drives turns to settlement, projects the framework loop events into a distinct, stable SessionSignal stream, persists every produced message into an on-disk transcript tree, retries transient model faults with backoff, auto-condenses when the transcript nears the context window, surfaces typed discriminated faults (never string sentinels), and maintains an immutable ConductorState snapshot through a pure reducer.

What the Conductor Owns

The conductor is the typed seam between the induscode product (its UI/print/RPC channels) and the framework runtime. It does not re-declare framework shapes — it composes Agent, AgentEvent, AgentTool, ThinkingLevel from indusagi.agent and AgentMessage, Model, Usage, KnownProvider from indusagi.ai. Crucially, the framework's AgentEvent loop stream is deliberately not the same union as the product SessionSignal stream: the conductor consumes the former internally and re-emits the latter, so the app surface can evolve independently of the framework loop.

Within one session the conductor owns:

  • The turn loop — drive a prompt through retry, project events to signals, persist the per-turn tail, condense, settle.
  • The signal bus (SignalHub) — a synchronous fan-out over SessionSignal.
  • The transcript (TranscriptStore) — an append-only, branchable on-disk tree.
  • The model catalog/matcher — a normalized, validated view over the framework model registry plus a scored resolver.
  • The immutable state snapshot (ConductorState) maintained by the pure reduce_state reducer.
  • The pending-input queue, fork/navigate branch ops, resume/new-session lifecycle, in-session shell, and skill-block parsing.

The Public Surface

SessionConductor (a Protocol in contract.py) is the full product surface all three run modes drive. The concrete orchestrator is SessionConductorImpl; it is constructed only through the factory.

| Name | Kind | Source | Purpose |
| --- | --- | --- | --- |
| SessionConductor | Protocol | contract.py | The full product surface: submit/enqueue, subscribe, abort, snapshot, resume, new_session, fork/navigate_tree, condense, execute_bash, stats, model select/cycle, thinking-level cycle, session naming, pending-queue ops. |
| SessionConductorImpl | class | conductor.py | The concrete turn-loop orchestrator. It only orchestrates and is built via the factory. |
| create_session_conductor | function | conductor.py | Factory that resolves or accepts every collaborator (agent/store/hub/matcher/condense/retry) and returns a SessionConductor. |
| ConductorDeps | dataclass | conductor.py | Frozen bundle of all-optional injectable collaborators (agent, store, hub, matcher, condense, retry, compactAt, sleep) so tests run with no network and no disk. |
| SessionConductorOptions | dataclass | contract.py | Assembly-time config: modelId (required) plus system/tools/thinking/workspace/sessionsDir/autoCompact/getApiKey. |
| ConductorState | dataclass | contract.py | Immutable observable snapshot: phase, head, cumulative usage, contextTokens, modelId, optional fault. |
| reduce_state | function | contract.py | Pure transition over ConductorState; always returns a fresh object. Raises ValueError on an unknown action. |
| ConductorFault / conductor_fault | dataclass / function | contract.py | Typed discriminated failure value plus its single sanctioned constructor; emitted on fault signals, never raised. |
| SessionSignal | type | contract.py | The product signal union (see The Signal Stream). |
| SignalHub | class | signal_hub.py | Synchronous insertion-ordered fan-out bus over SessionSignal. |
| translate_agent_event / TRANSLATOR_TABLE | function / const | signal_hub.py | Pure projection of one AgentEvent into zero-or-more SessionSignals, backed by a tag→rule dispatch table. |

The full surface (including transcript, serialization, catalog, and matcher symbols) is re-exported from induscode.conductor.__all__ — import from the package root, never from individual files.

Assembling a Conductor

create_session_conductor(options, deps=None) resolves every collaborator and returns the orchestrator. Only SessionConductorOptions.modelId is required; by default the factory builds a live LazyAgent, a TranscriptStore (filesystem-backed when sessionsDir is set, in-memory otherwise), and a ModelMatcher over ModelCatalog.

import asyncio
from induscode.conductor import (
    create_session_conductor,
    SessionConductorOptions,
    TextSignal,
)

async def main():
    conductor = create_session_conductor(
        SessionConductorOptions(
            modelId="anthropic/claude-sonnet-4",
            system="You are a coding assistant.",
            sessionsDir="~/.pindusagi/sessions",   # persist + resumable
            autoCompact=True,
        )
    )

    # Subscribe before submitting; subscribe() returns an unsubscribe thunk.
    def on_signal(sig):
        if isinstance(sig, TextSignal):
            print(sig.delta, end="")
    unsubscribe = conductor.subscribe(on_signal)

    state = await conductor.submit("explain this repository")
    print("\nsettled phase:", state.phase)
    unsubscribe()

asyncio.run(main())

SessionConductorOptions fields:

| Field | Type | Purpose |
| --- | --- | --- |
| modelId | str | Required. Canonical id (provider/modelId) to bind the session to. |
| system | str \| None | Initial system prompt seeding the conversation. |
| tools | Sequence[AgentTool] \| None | Tools made available to the agent. The flat capability list is threaded verbatim. |
| thinking | ThinkingLevel \| None | Initial reasoning effort for models that support it. |
| workspace | str \| None | Working directory the session is scoped to (defaults to process cwd). |
| sessionsDir | str \| None | When set, the transcript store uses a filesystem backend (one <sessionId>.ndjson per session); None keeps an in-memory store. |
| autoCompact | bool \| None | Auto-condense when the transcript nears the window. |
| getApiKey | Callable | Per-call credential resolver threaded to the framework Agent (sync or async; None falls back to env lookup). |

ConductorDeps injects any collaborator for tests — pass a scripted-fake agent to run with no network, an in-memory store to run with no disk, or a sleep to make retry backoff deterministic. LazyAgent is a thin AgentLike wrapper that constructs the real framework Agent only on first use, so a live-path conductor stays cheap until the first turn.
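The lazy-construction idea and the scripted-fake idea can be sketched without the real package. The names below (LazyAgentSketch, ScriptedAgent, and the single prompt method) are illustrative assumptions, not the induscode classes; the real AgentLike surface is wider.

```python
import asyncio
from typing import Any, Callable, List, Optional


class LazyAgentSketch:
    """Hypothetical stand-in: builds the wrapped agent on first use, then reuses it."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._agent: Optional[Any] = None

    def _resolve(self) -> Any:
        # Construct the underlying agent only once, the first time it is needed.
        if self._agent is None:
            self._agent = self._factory()
        return self._agent

    async def prompt(self, text: str) -> Any:
        return await self._resolve().prompt(text)


class ScriptedAgent:
    """Hypothetical no-network fake that replays canned replies in order."""

    def __init__(self, replies: List[str]) -> None:
        self._replies = list(replies)
        self.prompts: List[str] = []

    async def prompt(self, text: str) -> str:
        self.prompts.append(text)
        return self._replies.pop(0)
```

Constructing LazyAgentSketch does not call the factory; only the first prompt does. In a test, the same effect is presumably achieved by passing a scripted fake through ConductorDeps so no model call is ever made.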

Driving a Turn to Settlement

submit(input) is the single entry point that drives a prompt to settlement. The orchestration, inside SessionConductorImpl:

  1. Busy guard. If a turn is already in flight, the input is enqueued (never dropped) and the current snapshot returns immediately. Otherwise the busy flag is set and _run_turn runs.
  2. Skill unwrap. parse_skill_invocation unwraps a leading <skill name=... location=...>body</skill> block to its body text (routing the named skill is a higher layer's job).
  3. Baseline + phase. Record the message baseline (len(agent.state.messages)), dispatch a PhaseAction(streaming).
  4. Drive with retry. _run_with_retry(prompt) subscribes _on_agent_event to the framework agent's raw stream and awaits agent.prompt(prompt), classifying transient faults (see Retry and Abort).
  5. Project events. During the call, _on_agent_event keeps an ordered in-flight-tool list (tool_execution_start appends and flips phase→tooling; tool_execution_end removes by id and flips phase→streaming when empty), runs translate_agent_event, and emits each resulting SessionSignal on the hub. turn_end folds usage and records the latest-turn contextTokens; a streaming fault dispatches a FaultAction.
  6. Persist tail. On a clean turn, _persist_tail(baseline) appends every message beyond the baseline to the transcript (one persisted signal per node).
  7. Condense. _maybe_condense runs the pluggable CondenseFn when auto-compact is on and the branch length exceeds compactAt (default 200).
  8. Settle. If still not faulted, dispatch SettledAction + HeadAction and emit idle.
  9. Drain. The finished turn drains the queue (steer entries first), each as its own turn.

A faulted turn keeps its typed fault and does not persist a tail, condense, or settle on top of it — it stays faulted and the caller reads fault off the returned state.
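Step 5's in-flight tool bookkeeping is easy to get wrong with overlapping tool calls, so here is a minimal sketch of it. The event shape (a dict with "type" and "id" keys) and the class name are assumptions made for illustration; the real handler receives framework AgentEvent values.

```python
from typing import Dict, List


class InFlightToolsSketch:
    """Tracks running tool ids and derives the phase from them."""

    def __init__(self) -> None:
        self.ids: List[str] = []   # ordered list of tool calls still running
        self.phase = "streaming"

    def on_event(self, event: Dict[str, str]) -> None:
        kind = event.get("type")
        if kind == "tool_execution_start":
            # A tool began: remember it and flip to the tooling phase.
            self.ids.append(event["id"])
            self.phase = "tooling"
        elif kind == "tool_execution_end":
            # A tool finished: remove it by id, not by position.
            if event["id"] in self.ids:
                self.ids.remove(event["id"])
            # Return to streaming only once no tool is left running.
            if not self.ids:
                self.phase = "streaming"
```

With two overlapping calls, the phase stays at tooling after the first one ends and returns to streaming only after the second.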

The Signal Stream

The conductor re-emits a distinct, stable SessionSignal union — the surface the app renders. Subscribe via subscribe(handler), which returns an idempotent unsubscribe thunk. SignalHub fans out synchronously in insertion order over a snapshot of the handler list, isolating a throwing handler via on_handler_error.

| Signal | Discriminant (kind) | Carries | Meaning |
| --- | --- | --- | --- |
| PromptSignal | prompt | text | The user's turn was committed (emitted before the model replies). |
| TextSignal | text | delta | A chunk of assistant answer text. |
| ThinkingSignal | thinking | delta | A chunk of reasoning/thinking text. |
| ToolStartSignal | tool_start | id, name | A tool invocation began (correlate by id). |
| ToolEndSignal | tool_end | id, ok | A tool invocation finished. |
| TurnEndSignal | turn_end | usage | The assistant turn settled; reports token spend. |
| PersistedSignal | persisted | entryId | The latest node was committed to the transcript. |
| CompactedSignal | compacted | | The transcript was condensed to fit the window. |
| FaultSignal | fault | fault | A typed ConductorFault occurred. |
| QueueSignal | queue | count | The pending-input queue changed; new depth. |
| IdleSignal | idle | | No in-flight work; ready for input. |
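The fan-out semantics described above (synchronous delivery, insertion order, a snapshot of the handler list, isolation of a throwing handler, idempotent unsubscribe) can be sketched as follows. HubSketch and the two-argument on_handler_error callback are assumptions for illustration, not the SignalHub signature.

```python
from typing import Any, Callable, List, Optional

Handler = Callable[[Any], None]


class HubSketch:
    def __init__(
        self, on_handler_error: Optional[Callable[[Exception, Any], None]] = None
    ) -> None:
        self._handlers: List[Handler] = []
        self._on_handler_error = on_handler_error

    def subscribe(self, handler: Handler) -> Callable[[], None]:
        self._handlers.append(handler)
        active = True

        def unsubscribe() -> None:
            # Idempotent: a second call is a no-op.
            nonlocal active
            if active:
                active = False
                self._handlers.remove(handler)

        return unsubscribe

    def emit(self, signal: Any) -> None:
        # Iterate over a copy so handlers may subscribe/unsubscribe mid-emit.
        for handler in list(self._handlers):
            try:
                handler(signal)
            except Exception as exc:
                # One failing handler must not starve the ones after it.
                if self._on_handler_error is not None:
                    self._on_handler_error(exc, signal)
```

Iterating over a copy is what makes it safe for a handler to unsubscribe itself while a signal is being delivered.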

translate_agent_event is a pure projection backed by TRANSLATOR_TABLE (one rule per AgentEvent member). The key mappings: message_end(user) → prompt; message_end(assistant errored) → fault; tool_execution_start/end → tool_start/tool_end; turn_end → turn_end(usage); and message_update's inner AssistantMessageEvent (a best-effort dict) projects text_delta → text, thinking_delta → thinking, and error → a model fault.
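A tag-to-rule dispatch table of this kind might look like the sketch below. Everything about the event and signal shapes here is assumed for illustration (plain dicts, a "type" tag, an "inner" key for the nested assistant event); the real function works on framework AgentEvent values and typed SessionSignal dataclasses.

```python
from typing import Callable, Dict, List

Event = dict    # assumed shape: {"type": ..., ...}
Signal = dict   # assumed shape: {"kind": ..., ...}


def _message_end(event: Event) -> List[Signal]:
    message = event.get("message", {})
    if message.get("role") == "user":
        return [{"kind": "prompt", "text": message.get("text", "")}]
    if message.get("role") == "assistant" and message.get("error"):
        fault = {"kind": "model", "message": str(message["error"])}
        return [{"kind": "fault", "fault": fault}]
    return []  # in this sketch a clean assistant message_end emits nothing


def _message_update(event: Event) -> List[Signal]:
    inner = event.get("inner") or {}  # best-effort dict; may be absent
    tag = inner.get("type")
    if tag == "text_delta":
        return [{"kind": "text", "delta": inner.get("delta", "")}]
    if tag == "thinking_delta":
        return [{"kind": "thinking", "delta": inner.get("delta", "")}]
    if tag == "error":
        fault = {"kind": "model", "message": str(inner.get("message", ""))}
        return [{"kind": "fault", "fault": fault}]
    return []


TABLE_SKETCH: Dict[str, Callable[[Event], List[Signal]]] = {
    "message_end": _message_end,
    "message_update": _message_update,
    "tool_execution_start": lambda e: [
        {"kind": "tool_start", "id": e["id"], "name": e["name"]}
    ],
    "tool_execution_end": lambda e: [
        {"kind": "tool_end", "id": e["id"], "ok": e["ok"]}
    ],
    "turn_end": lambda e: [{"kind": "turn_end", "usage": e.get("usage")}],
}


def translate_sketch(event: Event) -> List[Signal]:
    # Pure: no state is read or written; unknown tags project to nothing.
    rule = TABLE_SKETCH.get(event.get("type"))
    return rule(event) if rule else []
```

Keeping the projection pure means it can be unit-tested event by event, without a hub or an agent.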

State and the Pure Reducer

ConductorState is an immutable snapshot, never a live view. Read it with snapshot(); every transition runs through reduce_state, which always returns a brand-new object.

@dataclass(frozen=True, slots=True)
class ConductorState:
    phase: ConductorPhase          # idle | streaming | tooling | condensing | faulted
    head: SessionHead              # session id + active leaf
    usage: Usage                   # cumulative token/cost spend
    contextTokens: int             # latest-turn window occupancy (NOT cumulative)
    modelId: str
    fault: ConductorFault | None = None

A subtle correctness point: contextTokens is the latest turn's window occupancy — it replaces rather than accumulates — so the footer ctx:% divides by the context window correctly. It is distinct from cumulative usage.totalTokens, which grows unbounded across turns and would inflate the ratio.
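The arithmetic behind that point, as a small sketch. The helper name and the token figures in the note below are made up for illustration; only the rule (divide the latest turn's occupancy, not the cumulative total, by the window) comes from the text above.

```python
def context_percent(context_tokens: int, context_window: int) -> float:
    """Share of the context window occupied by the latest turn, in percent."""
    if context_window <= 0:
        return 0.0  # avoid dividing by an unknown or zero window
    return 100.0 * context_tokens / context_window
```

With a hypothetical 200,000-token window and a latest turn occupying 45,000 tokens, the footer should read 22.5%. Feeding in a cumulative total of 87,000 tokens instead would report 43.5%, and the figure would keep climbing past 100% as the session went on.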

reduce_state(prev, action) handles the typed StateAction union — PhaseAction, HeadAction, UsageAction, ModelAction, FaultAction, SettledAction. settled clears any prior fault and returns to idle; fault records the fault and flips to faulted. An action outside the union fails loud with ValueError (the TS switch was compiler-exhaustive; the port fails at runtime).
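A reduced sketch of such a reducer, covering three of the six actions. The class and field names are simplified stand-ins (the fault is a plain string here); the point is the shape: frozen state, a fresh object per transition, and a loud failure on anything outside the union.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class StateSketch:
    phase: str = "idle"
    fault: Optional[str] = None


@dataclass(frozen=True)
class PhaseActionSketch:
    phase: str


@dataclass(frozen=True)
class FaultActionSketch:
    fault: str


@dataclass(frozen=True)
class SettledActionSketch:
    pass


def reduce_sketch(prev: StateSketch, action: object) -> StateSketch:
    # dataclasses.replace builds a new frozen instance; prev is never mutated.
    if isinstance(action, PhaseActionSketch):
        return replace(prev, phase=action.phase)
    if isinstance(action, FaultActionSketch):
        return replace(prev, phase="faulted", fault=action.fault)
    if isinstance(action, SettledActionSketch):
        return replace(prev, phase="idle", fault=None)
    # No compiler-checked exhaustiveness in Python, so fail at runtime.
    raise ValueError(f"unknown action: {action!r}")
```

Because every branch returns a new object, a caller holding an earlier snapshot never sees it change underneath them.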

Typed Faults

Faults are typed discriminated values, not raised exceptions and never string sentinels. A consumer switches on fault.kind, not on substring matching.

@dataclass(frozen=True, slots=True)
class ConductorFault:
    kind: FaultKind          # model | tool | persistence | aborted | overflow
    message: str
    cause: object | None = None

| kind | Meaning |
| --- | --- |
| model | The LLM call itself failed (transport, provider, decode). |
| tool | A tool invocation threw or returned a hard error. |
| persistence | Writing/reading the on-disk transcript failed. |
| aborted | The caller cancelled the in-flight turn via abort(). |
| overflow | The context window was exceeded and not condensable. |

Mint one only with conductor_fault(kind, message, cause=None). A fault is dispatched into state (flipping phase to faulted) and emitted as a FaultSignal.
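On the consumer side, switching on kind might look like the sketch below. FaultSketch and fault_sketch mirror the shapes shown above but are local stand-ins, and the hint strings are invented for the example; they are not messages the product emits.

```python
from dataclasses import dataclass
from typing import Literal, Optional, get_args

FaultKind = Literal["model", "tool", "persistence", "aborted", "overflow"]


@dataclass(frozen=True)
class FaultSketch:
    kind: FaultKind
    message: str
    cause: Optional[object] = None


def fault_sketch(kind: str, message: str, cause: Optional[object] = None) -> FaultSketch:
    # Single constructor: reject anything outside the discriminant set.
    if kind not in get_args(FaultKind):
        raise ValueError(f"unknown fault kind: {kind!r}")
    return FaultSketch(kind, message, cause)


# Hypothetical per-kind hints a UI might show.
HINTS = {
    "model": "retry or switch model",
    "tool": "inspect the tool output",
    "persistence": "check the sessions directory",
    "aborted": "nothing to do, the user cancelled",
    "overflow": "start a new session or condense",
}


def describe(fault: FaultSketch) -> str:
    # Dispatch on the discriminant, never on substrings of the message.
    return f"{fault.kind}: {fault.message} ({HINTS[fault.kind]})"
```

Routing construction through one function keeps the set of kinds closed, which is what makes a lookup like HINTS[fault.kind] safe.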

Retry and Abort

_run_with_retry retries transient model faults with exponential backoff. On a raised Exception, _is_transient classifies it by substring markers (429, rate limit, overloaded, timeout, 5xx, unavailable, …). A transient error retries up to RetryPolicy.maxAttempts with a delay of baseDelayMs * 2 ** (attempt - 1); the default DEFAULT_RETRY is maxAttempts=2, baseDelayMs=250. Exhausting the budget or hitting a non-transient raise dispatches a typed model FaultAction and ends the loop.
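The retry schedule can be sketched as below. Several things are assumptions: maxAttempts is read as the total number of attempts, the marker tuple is only the subset of markers named above, and the sketch re-raises on exhaustion where the real loop dispatches a typed model fault instead.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Subset of the markers listed in the text; the real list is longer.
TRANSIENT_MARKERS = ("429", "rate limit", "overloaded", "timeout", "unavailable")


def is_transient(exc: Exception) -> bool:
    text = str(exc).lower()
    return any(marker in text for marker in TRANSIENT_MARKERS)


async def run_with_retry(
    call: Callable[[], Awaitable[Any]],
    *,
    max_attempts: int = 2,
    base_delay_ms: int = 250,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    attempt = 1
    while True:
        try:
            return await call()
        except asyncio.CancelledError:
            raise  # cancellation is never retried
        except Exception as exc:
            if attempt >= max_attempts or not is_transient(exc):
                raise  # the real loop would dispatch a model fault here
            # Exponential backoff: base, 2x base, 4x base, ...
            await sleep(base_delay_ms * 2 ** (attempt - 1) / 1000)
            attempt += 1
```

Under these assumptions the defaults allow one retry after a 250 ms pause. Passing a recording coroutine as sleep makes the schedule observable in a test without waiting.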

asyncio.CancelledError always propagates: it is never retried and never converted to a fault. It has derived from BaseException rather than Exception since Python 3.8, so a plain except Exception does not catch it, and every defensive except re-raises it explicitly.

abort() cancels the in-flight turn on the agent, dispatches an aborted FaultAction, emits a FaultSignal, and pulses idle:

state = conductor.snapshot()
if state.phase in ("streaming", "tooling"):
    conductor.abort()   # -> aborted fault, then idle

The Queue: Re-entrant Input

submit while busy does not drop input and does not fault — it hands the input to enqueue (mode followUp by default) and returns the current snapshot. The in-flight turn drains the queue once it settles, each entry running as its own turn. steer-mode entries are taken ahead of plain follow-ups so an interrupt is honored first. Each queue change emits a queue signal.

| Method | Purpose |
| --- | --- |
| enqueue(input, mode="followUp") | File an input for a later turn (steer or followUp). |
| pending_count() / pending_inputs() | Depth and a read-only view (oldest first). |
| clear_queue() | Discard every queued input. |
| dequeue_last() | Pop the most-recently queued text back into a prompt. |
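The steer-first drain order can be sketched with a small in-memory queue. The class name and the take_next method are hypothetical (the real drain happens inside the conductor), and the sketch omits the queue signal emitted on each change.

```python
from typing import List, Optional, Tuple


class PendingQueueSketch:
    def __init__(self) -> None:
        self._items: List[Tuple[str, str]] = []  # (mode, text), oldest first

    def enqueue(self, text: str, mode: str = "followUp") -> None:
        if mode not in ("steer", "followUp"):
            raise ValueError(f"unknown queue mode: {mode!r}")
        self._items.append((mode, text))

    def pending_count(self) -> int:
        return len(self._items)

    def pending_inputs(self) -> Tuple[str, ...]:
        # A tuple gives callers a read-only view, oldest first.
        return tuple(text for _, text in self._items)

    def clear_queue(self) -> None:
        self._items.clear()

    def dequeue_last(self) -> Optional[str]:
        # Hand the newest entry back, e.g. to re-edit it in the prompt.
        return self._items.pop()[1] if self._items else None

    def take_next(self) -> Optional[str]:
        # Steer entries jump ahead of follow-ups; ties keep arrival order.
        for index, (mode, _) in enumerate(self._items):
            if mode == "steer":
                return self._items.pop(index)[1]
        return self._items.pop(0)[1] if self._items else None
```

A steer entry queued after a follow-up is still drained before it, which is the interrupt-first behaviour described above.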

Persistence and Branching

TranscriptStore is an append-only, branchable tree bound to one session id, over a pluggable TranscriptBackend (fs_backend(dir) writes <sessionId>.ndjson via asyncio.to_thread; memory_backend is the default). Derived state is rebuilt by the pure replay reducer (node table + head from an ordered entry list and a leaf id).

| Primitive | Behaviour |
| --- | --- |
| append(message, ...) | Mint a ULID node parented at the current leaf, encode via serialize.encode_entry, advance the head. |
| branch_at(entryId) | Repoint the leaf and rewrite the whole file. |
| path_to() | Walk parent links leaf→root, reversed (the active branch). |
| load(sessionId) / resume | Read the backend, parse via parse_session_text, rebuild via replay, rebind the agent to the active branch. |

The conductor exposes branch operations on top of this: fork(entryId) opens a new branch parented at a prior node and rebinds the agent's message list; navigate_tree(nodeId) walks between existing branches without forking. resume(sessionId) restores a persisted session; new_session() opens a fresh session id while leaving the prior transcript file intact on disk (this is what /clear and /new drive — see Slash Commands).
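The tree primitives can be illustrated with an in-memory sketch. It is deliberately simplified: ids come from a counter rather than ULIDs, messages are plain strings, and nothing is written to disk, so it shows only the parent-link structure and not the real TranscriptStore.

```python
from dataclasses import dataclass
from itertools import count
from typing import Dict, List, Optional


@dataclass(frozen=True)
class NodeSketch:
    id: str
    parent: Optional[str]
    message: str


class TreeSketch:
    def __init__(self) -> None:
        self._nodes: Dict[str, NodeSketch] = {}
        self._leaf: Optional[str] = None
        self._ids = count(1)

    def append(self, message: str) -> str:
        # New nodes always hang off the current leaf, then become the leaf.
        node = NodeSketch(f"n{next(self._ids)}", self._leaf, message)
        self._nodes[node.id] = node
        self._leaf = node.id
        return node.id

    def branch_at(self, entry_id: str) -> None:
        # Repoint the leaf; existing nodes are never deleted.
        if entry_id not in self._nodes:
            raise KeyError(entry_id)
        self._leaf = entry_id

    def path_to(self) -> List[str]:
        # Walk parent links leaf -> root, then reverse to get root -> leaf.
        path: List[str] = []
        cursor = self._leaf
        while cursor is not None:
            node = self._nodes[cursor]
            path.append(node.message)
            cursor = node.parent
        path.reverse()
        return path

    def node_count(self) -> int:
        return len(self._nodes)
```

After branching at an earlier node and appending, the abandoned branch is still in the node table; it is simply no longer on the active path.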

serialize.message_to_dict / message_from_dict is the single app-wide message⇄dict codec, reused by channels and transcript export. The on-disk vocabulary (TranscriptEntry, SessionHead) is the conductor's own — node ids are ULIDs, session ids are s_<epoch36>_<rand8>, and the schema is the namespaced indus/transcript@1.

Condense Seam

CondenseFn is the pluggable condense hook: given the active branch's messages, it returns the (smaller) list to replace them with. The conductor module itself does no real compaction — noop_condense (an identity hook) is the default. Real compaction arrives only when the production window-budget engine is wired in as a non-default CondenseFn (it structurally satisfies the protocol). The framework's own runtime.memory compactor is deliberately never wired into this seam — the two compaction engines stay distinct. See the framework runtime for the framework-side compactor.
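A sketch of how such a hook plugs in, under stated assumptions: the hook is modelled as a synchronous callable over a list of strings (the real CondenseFn protocol may be async and works on AgentMessage values), keep_last is an invented example hook, and the trigger mirrors the "auto-compact on and branch length exceeds compactAt" rule from the turn loop.

```python
from typing import Callable, List, Sequence

CondenseSketch = Callable[[Sequence[str]], List[str]]  # assumed shape


def noop_condense_sketch(messages: Sequence[str]) -> List[str]:
    # Identity hook: returns the branch unchanged.
    return list(messages)


def keep_last(n: int) -> CondenseSketch:
    # Hypothetical non-default hook: keep only the n most recent messages.
    def condense(messages: Sequence[str]) -> List[str]:
        return list(messages[-n:]) if n > 0 else []
    return condense


def maybe_condense(
    messages: Sequence[str],
    condense: CondenseSketch = noop_condense_sketch,
    *,
    auto_compact: bool = True,
    compact_at: int = 200,
) -> List[str]:
    # Only run the hook when auto-compact is on and the branch is too long.
    if not auto_compact or len(messages) <= compact_at:
        return list(messages)
    return condense(messages)
```

With the identity default, crossing the threshold changes nothing, which matches the statement that the conductor module itself performs no real compaction.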

Model Catalog and Matcher

ModelCatalog is a normalized, validated view over the framework model list (get_models / get_providers), keyed by canonical provider/modelId. It runs a validate → normalize → key pipeline; the mock provider is filtered out of the live catalog (it must never auto-select or appear in the picker), though tests can inject it via CatalogSource. See Models for the catalog surface.

ModelMatcher is a scored-candidate resolver over the catalog — resolve / resolve_card / resolve_all return best-first ModelCardRef(s) using a prioritized scoring pipeline:

| Match | Score |
| --- | --- |
| Pin | 1000 |
| Exact canonical id | 900 |
| Exact model id | 800 |
| Provider | 600 |
| Glob | 400 |
| Substring | 200 |
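The scoring ladder could be applied roughly as in the sketch below. The score values come from the table; everything else is assumed: what counts as a pin (here, a canonical id the caller passes explicitly), the use of fnmatch for globs, and the made-up catalog ids in the usage note.

```python
from fnmatch import fnmatchcase
from typing import List, Optional, Sequence, Tuple


def score(query: str, canonical_id: str, pin: Optional[str] = None) -> int:
    provider, _, model_id = canonical_id.partition("/")
    if pin is not None and canonical_id == pin:
        return 1000  # assumed meaning of "Pin"
    if query == canonical_id:
        return 900   # exact canonical id
    if query == model_id:
        return 800   # exact model id
    if query == provider:
        return 600   # provider
    if fnmatchcase(canonical_id, query):
        return 400   # glob
    if query in canonical_id:
        return 200   # substring
    return 0         # no match


def resolve_all_sketch(
    query: str, catalog: Sequence[str], pin: Optional[str] = None
) -> List[Tuple[str, int]]:
    scored = [(cid, score(query, cid, pin)) for cid in catalog]
    hits = [item for item in scored if item[1] > 0]
    # sorted() is stable, so equal scores keep catalog order.
    return sorted(hits, key=lambda item: -item[1])
```

For a hypothetical catalog of acme/fast-1, acme/deep-1, and other/fast-1, the query "fast-1" matches two cards at 800 in catalog order, while "*/deep-*" matches one card at 400.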

select_model(id) and cycle_model(id) both route through _bind_model: resolve the card, bind the full framework Model onto the agent, re-apply the tracked reasoning effort, and dispatch a ModelAction. A miss surfaces a typed model fault and leaves the current selection untouched. available_models() lists picker entries best-first (or [] when no matcher is wired).

In-session Shell

execute_bash(command, opts=None) runs a shell command in the session workspace and returns a BashOutcome (output, exitCode). It folds stderr into stdout and never raises — a spawn/exec failure resolves to exitCode=1. Unless ExecuteBashOptions.excludeFromContext is set, the output is recorded as a transcript note so it re-enters the agent's context on the next turn.

outcome = await conductor.execute_bash("git status --porcelain")
print(outcome.exitCode, outcome.output)
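The "fold stderr into stdout, never raise" contract can be approximated with the standard library alone. This is a sketch, not the conductor's implementation: BashOutcomeSketch and run_shell are local names, only OSError is treated as a spawn failure, and the transcript-note step is left out.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BashOutcomeSketch:
    output: str
    exitCode: int


async def run_shell(command: str, cwd: Optional[str] = None) -> BashOutcomeSketch:
    try:
        proc = await asyncio.create_subprocess_shell(
            command,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,  # merge stderr into stdout
        )
        raw, _ = await proc.communicate()
    except OSError as exc:
        # Spawn failure (e.g. missing working directory) becomes exit code 1.
        return BashOutcomeSketch(output=str(exc), exitCode=1)
    return BashOutcomeSketch(
        output=raw.decode(errors="replace"),
        exitCode=proc.returncode or 0,
    )
```

Returning a value instead of raising means callers branch on exitCode the same way for a failing command and for a command that could not be started at all.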

stats() returns a point-in-time SessionStats tally (message counts by role, tool calls/results, the cumulative TokenTally, and total cost) computed from the live message list plus the running usage on ConductorState.

Source Layout

The conductor is a flat 9-file package (no subdirectories). contract.py is documented as a frozen type seam carrying only shapes plus the two inert helpers conductor_fault and reduce_state.

| File | Holds |
| --- | --- |
| contract.py | Frozen type seam: signals, faults, transcript schema, catalog refs/queries, ConductorState + reduce_state, queue/stats/bash shapes, and the SessionConductor / AgentLike / CondenseFn / RetryPolicy Protocols. |
| conductor.py | SessionConductorImpl turn loop + create_session_conductor factory, ConductorDeps, LazyAgent, noop_condense, DEFAULT_RETRY. |
| signal_hub.py | SignalHub fan-out bus + translate_agent_event / TRANSLATOR_TABLE. |
| transcript_store.py | TranscriptStore, TranscriptBackend / fs_backend / memory_backend, the pure replay reducer. |
| serialize.py | message_to_dict / message_from_dict, NDJSON envelope codec, branch projection, legacy import, role_for_message. |
| catalog.py | ModelCatalog, CatalogCard, CatalogSource, canonical_id, to_card_ref. |
| matcher.py | ModelMatcher + ResolveInput scored resolver. |
| skill_parse.py | parse_skill_invocation: the <skill …>body</skill> block scanner. |
| __init__.py | The public barrel; re-exports the full surface. |

The conductor is assembled at startup by boot (boot/runners/session.py) and consumed by the interactive console, the print/JSON channels, and the sessions library. For how a session is wired together end-to-end, see the architecture overview.