Subsystemssubsystems/channels

Output Channels

The non-interactive output contract for induscode: a frozen JSON-RPC 2.0 / NDJSON wire surface plus two drivers — oneshot (the print/-p/--json path) and the long-lived bidirectional link server/client — that emit a Conductor's results per CLI mode. Reached as the oneshot and link boot runners, or directly via from induscode.channels import ....

The channels subpackage is how induscode talks to a SessionConductor from outside the interactive console. It defines one declarative contract — a JSON-RPC 2.0 envelope, NDJSON framing, a declarative operation registry, ask/tell dialog primitives, and a flat wire snapshot — and two channels written against it. The oneshot channel runs prompts to settlement and writes either clean final text or a streamed NDJSON event log; the link channel is a long-lived JSON-RPC server plus a generated client that drives a child process over NDJSON. The protocol is a single op registry consumed by both the server (data-driven dispatch) and the driver (a generated client whose method set is exactly the registry keys), with transport, framer, conductor, and dialog all injected so tests run over in-memory pipes with no real stdio.

Table of Contents

Reaching the Channels

The two channels back two of the launcher's runner modes. Boot resolves a RunnerMode from your flags and dispatches to the matching runner:

CLI invocation Resolved mode Runner Channel
pindus -p "…" oneshot (text) oneshot_runner oneshot, shape="text"
pindus -p --json "…" oneshot (NDJSON) oneshot_runner oneshot, shape="ndjson"
pindus routed to link link link_runner link server over stdin/stdout

The oneshot runner builds a ChannelContext over a stdout WritableLine + ndjson_framer + inert_dialog and calls run_oneshot. The output shape is NDJSON when the invocation carries a json flag (-p --json), otherwise clean text. The link runner serves SESSION_OPS over a stdin/stdout LinkServerIo via create_link_server and awaits server.done.

channels owns no session or agent logic of its own — every operation delegates to the conductor on the context, and the two channels just emit a conductor's results. See Conductor for the session loop and Sessions for persistence. Signals and usage ultimately originate from the underlying framework agent loop, but a channel only ever switches on the conductor's own typed signal vocabulary, never a raw framework event.

The Oneshot Channel (print mode)

run_oneshot(ctx, OneshotRequest) runs one non-interactive request to settlement and returns a process exit code (0 clean / 1 faulted). The output shape is not a branch through the runner body — it is a strategy object selected by lookup. Each OneshotStrategy supplies an optional on_start, an optional on_signal, and a mandatory finish(state, ctx) -> exit_code. The runner is shape-agnostic: it subscribes the strategy to conductor.subscribe(...), runs on_start, submits each prompt in order via conductor.submit(...), hands the final ConductorState to finish, and always tears the subscription down.

Shape on_start on_signal finish writes
text buffers each text delta one clean final line (the accumulated answer)
ndjson a {"type":"signal","name":"start","body":{}} frame frames every signal via signal_to_wire a {"type":"signal","name":"end","body":{phase,usage}} frame

The text shape emits the answer once, clean — so a caller capturing stdout gets the final answer rather than a token-by-token dribble. Thinking deltas, tool frames, and bookkeeping signals are ignored. The ndjson shape streams every conductor signal as one framed line; its end frame omits fault when None, matching how JSON.stringify dropped undefined fields.

# clean final text (text shape)
pindus -p "summarize this repo"

# streamed NDJSON event log (ndjson shape: the json flag selects it)
pindus -p --json "summarize this repo"
# emits {"type":"signal","name":"start","body":{}}
#       {"type":"signal","name":"text","body":{"kind":"text","delta":"…"}}
#       … {"type":"signal","name":"end","body":{"phase":"…","usage":{…}}}

Driven directly from Python:

from induscode.channels import (
    ChannelContext, OneshotRequest, ndjson_framer, inert_dialog, run_oneshot,
)

ctx = ChannelContext(
    conductor=my_conductor,
    out=my_writable_line,
    framer=ndjson_framer,
    dialog=inert_dialog,
)
exit_code = await run_oneshot(ctx, OneshotRequest(shape="text", prompts=("fix the failing test",)))
# 0 on a clean settlement, 1 if state.phase == "faulted"

inert_dialog is the shared DialogBridge for headless runs: every ask() resolves immediately with the caller's fallback and every tell() is dropped, so the agent never wedges on a prompt with no driver attached. An OneshotRequest carries shape, an ordered tuple of prompts, and optional images (framework indusagi.ai.ImageContent values) riding the first prompt.

The link channel is a long-lived, bidirectional JSON-RPC 2.0 conversation. The link_runner serves SESSION_OPS over stdin/stdout; a driving parent process writes framed requests and reads framed replies:

-> {"jsonrpc":"2.0","id":"lnk-…","method":"submit","params":{"input":"hi"}}
<- {"jsonrpc":"2.0","id":"lnk-…","result":{"model":"…","streaming":false,"sessionId":"…","usage":{…}}}

Server

create_link_server(registry, conductor, io) starts a data-driven server. It owns a single async for reader over io.in_ (via decode_lines) and classifies each frame:

  • an answer frame goes to dialog.deliver(...), settling a suspended ask;
  • a request frame (with a method) is dispatched as a concurrent asyncio task; a request carrying an id gets exactly one correlated reply (result or error), a notification (no id) is dispatched for effect and never replied to;
  • any other frame is a forward-compatible no-op.

Requests are dispatched as concurrent tasks (tracked in a set, gathered at stream end) rather than awaited in line. This is critical: a handler may itself suspend on a dialog ask whose answer arrives as a later frame on this same reader stream — awaiting in line would deadlock the reader. The returned LinkServer exposes done (the pump task, settles at stream end) and the LinkedDialogBridge it feeds. LinkServerIo is the injectable transport pair: in_ (framed inbound, stdin in prod) and out (framed outbound sink, stdout).

Driver

create_link_driver(io, ...) is the mirror — a generated client. LinkClient.__getattr__ yields a request thunk (params=None) -> awaitable result for any attribute, so the client mirrors whatever ops the server dispatches with a single body (no hand-written per-op methods; underscore names raise AttributeError). Each thunk mints an lnk--prefixed id, frames {jsonrpc,id,method,params?}, and awaits a future settled by the reply reader. The reader routes inbound frames three ways: replies settle pending requests, signals fan to on_signal, asks are handed to on_ask (which posts an answer back). Must run on a live event loop.

from induscode.channels.link import create_link_driver, LinkDriverIo

handle = create_link_driver(
    LinkDriverIo(out=child_stdin, in_=child_stdout),
    on_signal=lambda s: print(s.name),
)
snap = await handle.client.submit({"input": "hello"})   # any attribute -> a request thunk
models = await handle.client.listModels()               # mirrors SESSION_OPS keys for free
handle.close()                                          # rejects every still-pending request

LinkDriverHandle carries client, a done task (resolves at stream end), and close(reason) (rejects every pending request and stops accepting replies). A failed reply is raised as a LinkRequestError carrying the wire code and structured data. When on_ask is omitted the driver dismisses every ask (posts None), so a non-interactive driver never wedges the server.

The Wire Contract

contract.py is the frozen type surface — only shapes and inert pure helpers, no I/O. The envelope is JSON-RPC 2.0: a request is {jsonrpc, id?, method, params?}, a reply is {jsonrpc, id, result} or {jsonrpc, id, error}. Decoded frames are plain dicts; is_reply_ok(reply) discriminates the arms (True iff result is present). Every envelope is stamped with PROTOCOL_VERSION = "2.0" and the driver stamps outgoing ids with REQUEST_ID_PREFIX = "lnk-".

OP_ERROR pins the closed JSON-RPC error code set:

Key Code Meaning
parse -32700 the framed line was not valid JSON
invalidRequest -32600 not a well-formed request
unknownOp -32601 no operation registered under method
invalidParams -32602 params failed the operation schema
handlerFailed -32000 the operation handler threw

Typed frozen frames cross Python API boundaries: Ask (type="ask", blocking agent→driver request), AskAnswer (type="answer", correlated reply), Tell (type="tell", one-way notice), and Signal (type="signal", uncorrelated server→driver event). An OpError(code, message, data?) frames as {code, message, data?}. mint_wire_id(prefix, now_ms, seq) produces a process-unique <prefix><now36>-<seq36> correlation id. ChannelTimings carries millisecond timeouts — startupMs=1500, shutdownMs=1200, requestMs=45000, dialogMs=90000 — and resolve_timings(overrides) merges a partial override onto the frozen DEFAULT_CHANNEL_TIMINGS (asyncio call sites divide by 1000).

WritableLine (a Protocol with one write(chunk: str) method) and ReadableChunks (AsyncIterable[str | bytes]) are the minimal injectable transport surfaces. ChannelContext bundles conductor, out, framer, and dialog, and is handed to every Op.handle and shared by both channels — fully injected so tests use a fake conductor and a list-backed sink.

The Operation Registry

The protocol is one declarative operation registry, not a hand-written dispatch ladder mirrored by hand-written client methods. An Op binds a wire method name to a typed async handle(params, ctx) -> result. An OpRegistry is Mapping[str, Op] whose keys equal each op's method, so server dispatch is one lookup and the driver's method set is exactly the key set.

  • define_ops(record) freezes a record into a read-only OpRegistry (MappingProxyType) — the only sanctioned way to mint a registry.
  • build_ops(record) returns an OpRegistryHandle: the frozen registry, its sorted methods tuple, and a bound dispatch closure. The server-side dispatch and the client-side method set can never drift.
  • dispatch(registry, method, params, ctx) is the lone server dispatch path — a map lookup then await op.handle(...); raises UnknownOpError (carrying -32601 and the offending method) when unregistered, and propagates handler exceptions.
  • has_op(registry, method) and methods_of(registry) round out the helpers.

Adding a row to a registry adds both a server-handled method and a callable driver method for free:

from induscode.channels import Op, build_ops, create_link_server
from induscode.channels.link import LinkServerIo

async def ping(params, ctx):
    return {"pong": True}

registry = build_ops({"ping": Op(method="ping", handle=ping)}).registry
server = create_link_server(registry, my_conductor, LinkServerIo(in_=stdin_chunks, out=stdout))
await server.done   # resolves when the inbound stream ends

Session Ops and the Snapshot

SESSION_OPS is the concrete frozen registry the link serves, each op delegating to ctx.conductor:

Method Conductor call Reply
submit await conductor.submit(params["input"]) LinkSnapshot
abort conductor.abort() LinkSnapshot
snapshot conductor.snapshot() LinkSnapshot
resume await conductor.resume(params["sessionId"]) LinkSnapshot
listModels reads snapshot().modelId [{"id", "active": True}]
cycleModel probes getattr(conductor, "cycle_model") LinkSnapshot

Method names and snapshot field casing are kept verbatim (submit, listModels, cycleModel, sessionId, autoCondense, …) for cross-host compatibility — a foreign driver must speak to a Python server and vice versa. listModels currently returns only the single active model entry, and cycleModel probes for the conductor's optional cycle_model method; both are minimal projections of what the conductor exposes.

project_snapshot(state, ...) reshapes a ConductorState into the flat wire LinkSnapshot dict whose keys are exactly LINK_SNAPSHOT_FIELDS:

("model", "thinking", "streaming", "condensing", "faulted",
 "sessionId", "sessionFile", "autoCondense", "messageCount", "queuedCount", "usage")

Busy flags are derived from the coarse phase (streaming is phase == "streaming" or "tooling", condensing and faulted from their phases). thinking defaults to DEFAULT_THINKING = "off" when the snapshot carries no reasoning effort. usage rides through the single app-wide codec. sessionFile is omitted (not nulled) when absent, exactly as JSON.stringify dropped the undefined field. The snapshot is the link's own vocabulary — a remote driver mirrors session state without holding a live conductor.

NDJSON Framing

Framing is one JSON value per line. framer.py implements the NdjsonFramer shape — a frozen (encode_line, decode_lines) pair bundled as the canonical ndjson_framer.

encode_line(value) serializes with json.dumps (compact (",", ":") separators, ensure_ascii=False), escapes U+2028 (LINE SEPARATOR) and U+2029 (PARAGRAPH SEPARATOR) to their \u… forms, and appends exactly one \n. Those two code points are legal inside a JSON string but break a naive line splitter, and json.dumps leaves them raw — so the explicit escape guarantees any value round-trips through the decoder intact.

decode_lines(stream) is a pull-model async generator: it splits strictly on \n, decodes byte chunks through a fresh per-call incremental UTF-8 decoder (so a rune split across chunk boundaries is reassembled), skips empty/whitespace lines, and emits a final unterminated frame at end-of-stream if it carries content. The server, the driver, and the oneshot NDJSON shape all share this one implementation; an embedder may inject an alternate framer via the driver/server options.

The Dialog Bridge

The whole dialog surface the agent needs — multiple-choice, confirm, free-text, editor sessions, notifications, status, titles — reduces to two generic primitives on the DialogBridge protocol: ask(kind, payload, fallback) (a round-trip that suspends for a correlated answer or resolves the fallback after dialogMs) and tell(kind, payload) (fire-and-forget). Concrete methods are rows in DIALOG_TABLE, not bespoke bodies:

Name Mode Fallback
select ask None
confirm ask False
input ask None
editor ask None
notify tell
status tell
title tell

make_dialog_methods(bridge) closes the table over one bridge into named convenience thunks (generated from the table, no per-method choreography). On the link, create_dialog_bridge(deps) builds a LinkedDialogBridge whose pending answers are fed externally — the server owns the single reader and routes inbound answers to deliver(). ask() emits an ask--prefixed frame and arms a dialogMs fallback timer; deliver() settles the matching future (a no-op for late/duplicate answers so a misbehaving driver cannot throw into the read loop); tell() fires and returns; drain() settles all pending with None on shutdown. This is the seam that drives the interactive Dialogs when a console is attached.

Public API

All names are importable from induscode.channels; the link half is also under induscode.channels.link.

Name Kind Source Purpose
run_oneshot async fn oneshot.py run one request to settlement; returns the exit code
inert_dialog const oneshot.py headless DialogBridge: ask→fallback, tell dropped
OneshotRequest dataclass contract.py shape, ordered prompts, optional images
OneshotStrategy dataclass contract.py per-shape finish/on_start/on_signal
OneshotShape type contract.py Literal["text","ndjson"]
encode_line fn framer.py serialize one value to a separator-safe NDJSON line
decode_lines async gen framer.py parse a chunk stream into JSON values, split on \n
ndjson_framer const framer.py the canonical frozen NdjsonFramer
NdjsonFramer dataclass contract.py frozen (encode_line, decode_lines) pair
ChannelContext dataclass contract.py conductor + out + framer + dialog
Op / OpRegistry dataclass / type contract.py a named op; the frozen Mapping[str, Op]
define_ops fn contract.py freeze a record into an OpRegistry
build_ops fn ops.py build an OpRegistryHandle (registry + methods + dispatch)
dispatch async fn ops.py the lone server dispatch path
has_op / methods_of fn ops.py membership test / sorted method tuple
OpRegistryHandle dataclass ops.py a built registry plus its runtime face
UnknownOpError class ops.py raised on an unregistered method (-32601)
SESSION_OPS const session_ops.py the concrete link op registry
project_snapshot fn session_ops.py ConductorState → flat LinkSnapshot
DEFAULT_THINKING const session_ops.py "off"
LinkSnapshot / LINK_SNAPSHOT_FIELDS type / const contract.py the flat wire dict / its pinned field tuple
create_link_server fn link/server.py start a data-driven JSON-RPC server
LinkServer / LinkServerIo dataclass link/server.py running server / injectable transport pair
create_link_driver fn link/driver.py build a generated client over an injected transport
LinkClient class link/driver.py the __getattr__-generated client
LinkDriverHandle / LinkDriverIo class / dataclass link/driver.py live driver / injectable transport pair
LinkRequestError class link/driver.py a failed reply raised with wire code + data
LinkedDialogBridge class link/dialog.py externally-fed ask/tell bridge
create_dialog_bridge fn link/dialog.py construct a LinkedDialogBridge
make_dialog_methods fn link/dialog.py close DIALOG_TABLE over one bridge
DIALOG_TABLE / DialogSpec const / dataclass link/dialog.py the dialog table; one row as data
DialogBridgeDeps dataclass link/dialog.py out + framer + timings for the bridge
DialogBridge class contract.py the ask/tell protocol
Ask / AskAnswer / Tell / Signal dataclass contract.py typed wire frames
OpError / OP_ERROR dataclass / const contract.py a typed error; the frozen code set
is_reply_ok fn contract.py discriminate a reply frame's arm
PROTOCOL_VERSION / REQUEST_ID_PREFIX const contract.py "2.0" / "lnk-"
mint_wire_id fn contract.py mint a process-unique correlation id
ChannelTimings / DEFAULT_CHANNEL_TIMINGS / resolve_timings dataclass / const / fn contract.py millisecond timeouts + merge helper
signal_to_wire fn contract.py project a SessionSignal to its wire dict
WritableLine / ReadableChunks type contract.py the minimal injectable transport surfaces

Design Notes

  • Single declarative registry. SESSION_OPS drives both server dispatch (a map lookup, no switch) and the driver's method set (LinkClient is generated via __getattr__, never hand-written). An unregistered name simply rejects with unknownOp (-32601).
  • Concurrency. The server dispatches requests as concurrent asyncio tasks, not awaited in line, because a handler can suspend on a dialog ask whose answer arrives as a later frame on the same reader stream; awaiting in line would deadlock. Answer frames are always delivered immediately.
  • Framer correctness. U+2028/U+2029 are escaped because json.dumps leaves them raw yet they break line splitters; each decode_lines call uses a fresh incremental UTF-8 decoder so a rune split across byte chunks is reassembled.
  • Resilience. inert_dialog and the driver's missing on_ask both dismiss every ask (return fallback / post None), so a headless run or disconnected driver never wedges the agent. LinkedDialogBridge.drain() and the driver's reject-all settle/reject every pending entry on shutdown; deliver() is a no-op for late/duplicate answers.
  • One serializer. signal_to_wire is the single Python-specific divergence: it re-attaches the kind ClassVar discriminant that the app-wide message_to_dict codec does not emit. No second serializer is introduced; all field payloads still route through the one codec.
  • Verbatim casing & dropped fields. Wire field names/casing are pinned for cross-host compatibility; project_snapshot omits sessionFile when absent and the NDJSON end frame omits fault when None, matching how JSON.stringify dropped undefined fields.

Source Files

File Holds
channels/contract.py the frozen type surface: envelope, framer shape, frames, ChannelContext, op registry, snapshot vocabulary, timings, id minting
channels/framer.py the concrete NDJSON framer (encode_line / decode_lines / ndjson_framer)
channels/ops.py the runtime face: build_ops, dispatch, OpRegistryHandle, UnknownOpError
channels/session_ops.py the concrete SESSION_OPS registry + project_snapshot
channels/oneshot.py the oneshot channel: run_oneshot, the two strategies, inert_dialog
channels/link/server.py the data-driven JSON-RPC server + concurrent dispatch loop
channels/link/driver.py the generated __getattr__ client + reply/signal/ask reader
channels/link/dialog.py the ask/tell bridge + data-driven DIALOG_TABLE

The runners that wire these into CLI modes live in boot/runners/oneshot_runner.py and boot/runners/link_runner.py — see Boot. For the full export map see Package Exports.