Output Channels
The non-interactive output contract for induscode: a frozen JSON-RPC 2.0 / NDJSON wire surface plus two drivers, oneshot (the print / -p / --json path) and the long-lived bidirectional link server/client, that emit a Conductor's results per CLI mode. Reached as the oneshot and link boot runners, or imported directly with from induscode.channels import ....
The channels subpackage is how induscode talks to a SessionConductor from outside the interactive console. It defines one declarative contract — a JSON-RPC 2.0 envelope, NDJSON framing, a declarative operation registry, ask/tell dialog primitives, and a flat wire snapshot — and two channels written against it. The oneshot channel runs prompts to settlement and writes either clean final text or a streamed NDJSON event log; the link channel is a long-lived JSON-RPC server plus a generated client that drives a child process over NDJSON. The protocol is a single op registry consumed by both the server (data-driven dispatch) and the driver (a generated client whose method set is exactly the registry keys), with transport, framer, conductor, and dialog all injected so tests run over in-memory pipes with no real stdio.
Table of Contents
- Reaching the Channels
- The Oneshot Channel (print mode)
- The Link Channel (NDJSON wire)
- The Wire Contract
- The Operation Registry
- Session Ops and the Snapshot
- NDJSON Framing
- The Dialog Bridge
- Public API
- Design Notes
- Source Files
Reaching the Channels
The two channels back two of the launcher's runner modes. Boot resolves a RunnerMode from your flags and dispatches to the matching runner:
| CLI invocation | Resolved mode | Runner | Channel |
|---|---|---|---|
| pindus -p "…" | oneshot (text) | oneshot_runner | oneshot, shape="text" |
| pindus -p --json "…" | oneshot (NDJSON) | oneshot_runner | oneshot, shape="ndjson" |
| pindus routed to link | link | link_runner | link server over stdin/stdout |
The oneshot runner builds a ChannelContext over a stdout WritableLine + ndjson_framer + inert_dialog and calls run_oneshot. The output shape is NDJSON when the invocation carries a json flag (-p --json), otherwise clean text. The link runner serves SESSION_OPS over a stdin/stdout LinkServerIo via create_link_server and awaits server.done.
channels owns no session or agent logic of its own — every operation delegates to the conductor on the context, and the two channels just emit a conductor's results. See Conductor for the session loop and Sessions for persistence. Signals and usage ultimately originate from the underlying framework agent loop, but a channel only ever switches on the conductor's own typed signal vocabulary, never a raw framework event.
The Oneshot Channel (print mode)
run_oneshot(ctx, OneshotRequest) runs one non-interactive request to settlement and returns a process exit code (0 clean / 1 faulted). The output shape is not a branch through the runner body — it is a strategy object selected by lookup. Each OneshotStrategy supplies an optional on_start, an optional on_signal, and a mandatory finish(state, ctx) -> exit_code. The runner is shape-agnostic: it subscribes the strategy to conductor.subscribe(...), runs on_start, submits each prompt in order via conductor.submit(...), hands the final ConductorState to finish, and always tears the subscription down.
| Shape | on_start | on_signal | finish writes |
|---|---|---|---|
| text | - | buffers each text delta | one clean final line (the accumulated answer) |
| ndjson | a {"type":"signal","name":"start","body":{}} frame | frames every signal via signal_to_wire | a {"type":"signal","name":"end","body":{phase,usage}} frame |
The text shape emits the answer once, clean — so a caller capturing stdout gets the final answer rather than a token-by-token dribble. Thinking deltas, tool frames, and bookkeeping signals are ignored. The ndjson shape streams every conductor signal as one framed line; its end frame omits fault when None, matching how JSON.stringify dropped undefined fields.
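The strategy-by-lookup arrangement described above can be sketched in a few lines. This is a minimal, synchronous illustration and not the induscode implementation: the Strategy dataclass, the dict-shaped signals and state, the list-backed sink, the "idle" phase value, and the run_shape helper are all hypothetical stand-ins chosen to mirror the table.

```python
import json
from dataclasses import dataclass
from typing import Callable, Optional

Sink = list  # stand-in for a WritableLine: each written line is appended


@dataclass(frozen=True)
class Strategy:
    """Hypothetical mirror of OneshotStrategy; field names follow the prose."""
    finish: Callable[[dict, Sink], int]                      # mandatory
    on_start: Optional[Callable[[Sink], None]] = None        # optional
    on_signal: Optional[Callable[[dict, Sink], None]] = None  # optional


def frame(name: str, body: dict) -> str:
    return json.dumps({"type": "signal", "name": name, "body": body}, separators=(",", ":"))


def exit_code(state: dict) -> int:
    return 1 if state["phase"] == "faulted" else 0


def text_strategy() -> Strategy:
    deltas: list[str] = []

    def on_signal(signal: dict, out: Sink) -> None:
        if signal["kind"] == "text":      # thinking/tool/bookkeeping are ignored
            deltas.append(signal["delta"])

    def finish(state: dict, out: Sink) -> int:
        out.append("".join(deltas))       # the answer is emitted once, at the end
        return exit_code(state)

    return Strategy(finish=finish, on_signal=on_signal)


def ndjson_strategy() -> Strategy:
    def finish(state: dict, out: Sink) -> int:
        body = {"phase": state["phase"], "usage": state["usage"]}
        if state.get("fault") is not None:  # omit the key instead of writing null
            body["fault"] = state["fault"]
        out.append(frame("end", body))
        return exit_code(state)

    return Strategy(
        finish=finish,
        on_start=lambda out: out.append(frame("start", {})),
        on_signal=lambda signal, out: out.append(frame(signal["kind"], signal)),
    )


STRATEGIES = {"text": text_strategy, "ndjson": ndjson_strategy}


def run_shape(shape: str, signals: list[dict], final_state: dict) -> tuple[int, Sink]:
    out: Sink = []
    strategy = STRATEGIES[shape]()        # selection is a lookup, not a branch
    if strategy.on_start:
        strategy.on_start(out)
    for signal in signals:
        if strategy.on_signal:
            strategy.on_signal(signal, out)
    return strategy.finish(final_state, out), out
```

The runner body never mentions a shape by name, so adding a third shape in this sketch would mean adding one entry to STRATEGIES.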
# clean final text (text shape)
pindus -p "summarize this repo"
# streamed NDJSON event log (ndjson shape: the json flag selects it)
pindus -p --json "summarize this repo"
# emits {"type":"signal","name":"start","body":{}}
# {"type":"signal","name":"text","body":{"kind":"text","delta":"…"}}
# … {"type":"signal","name":"end","body":{"phase":"…","usage":{…}}}
Driven directly from Python:
from induscode.channels import (
    ChannelContext, OneshotRequest, ndjson_framer, inert_dialog, run_oneshot,
)

ctx = ChannelContext(
    conductor=my_conductor,
    out=my_writable_line,
    framer=ndjson_framer,
    dialog=inert_dialog,
)
exit_code = await run_oneshot(ctx, OneshotRequest(shape="text", prompts=("fix the failing test",)))
# 0 on a clean settlement, 1 if state.phase == "faulted"
inert_dialog is the shared DialogBridge for headless runs: every ask() resolves immediately with the caller's fallback and every tell() is dropped, so the agent never wedges on a prompt with no driver attached. An OneshotRequest carries shape, an ordered tuple of prompts, and optional images (framework indusagi.ai.ImageContent values) riding the first prompt.
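The behaviour of a headless bridge is small enough to sketch in full. The class below is a hypothetical stand-in written from the description above, not the real inert_dialog; in particular, whether tell is synchronous in induscode is an assumption.

```python
import asyncio


class InertDialog:
    """Hypothetical headless bridge: never blocks, never emits."""

    async def ask(self, kind, payload, fallback):
        # No driver is attached, so resolve at once with the caller's fallback.
        return fallback

    def tell(self, kind, payload):
        # One-way notices have nowhere to go in a headless run; drop them.
        return None


async def demo():
    dialog = InertDialog()
    confirmed = await dialog.ask("confirm", {"prompt": "Delete the file?"}, False)
    dialog.tell("notify", {"text": "ignored"})
    return confirmed
```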
The Link Channel (NDJSON wire)
The link channel is a long-lived, bidirectional JSON-RPC 2.0 conversation. The link_runner serves SESSION_OPS over stdin/stdout; a driving parent process writes framed requests and reads framed replies:
-> {"jsonrpc":"2.0","id":"lnk-…","method":"submit","params":{"input":"hi"}}
<- {"jsonrpc":"2.0","id":"lnk-…","result":{"model":"…","streaming":false,"sessionId":"…","usage":{…}}}
Server
create_link_server(registry, conductor, io) starts a data-driven server. It owns a single async for reader over io.in_ (via decode_lines) and classifies each frame:
- an answer frame goes to dialog.deliver(...), settling a suspended ask;
- a request frame (with a method) is dispatched as a concurrent asyncio task; a request carrying an id gets exactly one correlated reply (result or error), while a notification (no id) is dispatched for effect and never replied to;
- any other frame is a forward-compatible no-op.
Requests are dispatched as concurrent tasks (tracked in a set, gathered at stream end) rather than awaited in line. This is critical: a handler may itself suspend on a dialog ask whose answer arrives as a later frame on this same reader stream — awaiting in line would deadlock the reader. The returned LinkServer exposes done (the pump task, settles at stream end) and the LinkedDialogBridge it feeds. LinkServerIo is the injectable transport pair: in_ (framed inbound, stdin in prod) and out (framed outbound sink, stdout).
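The deadlock argument is easier to see in a toy pump. The sketch below is not the link server: the frame fields (id, kind, value), the fixed ask-1 id, and the serve/inbound helpers are hypothetical. It only shows why a handler that waits on a later inbound frame has to run as its own task.

```python
import asyncio


async def serve(frames, out):
    """Toy reader loop: one consumer of the inbound stream, handlers run as tasks."""
    pending_asks: dict[str, asyncio.Future] = {}
    tasks: set[asyncio.Task] = set()

    async def handle_request(frame):
        # The handler suspends on an ask whose answer is a LATER inbound frame.
        answer_future = asyncio.get_running_loop().create_future()
        pending_asks["ask-1"] = answer_future
        out.append({"type": "ask", "id": "ask-1", "kind": "confirm"})
        confirmed = await answer_future
        out.append({"jsonrpc": "2.0", "id": frame["id"], "result": {"confirmed": confirmed}})

    async for frame in frames:
        if frame.get("type") == "answer":
            future = pending_asks.pop(frame["id"], None)
            if future is not None and not future.done():
                future.set_result(frame["value"])
        elif "method" in frame:
            # Awaiting handle_request(frame) here would block this loop, so the
            # answer frame behind it in the stream could never be read.
            task = asyncio.create_task(handle_request(frame))
            tasks.add(task)
            task.add_done_callback(tasks.discard)
        # Any other frame is ignored (forward-compatible no-op).
    await asyncio.gather(*tasks)


async def inbound():
    yield {"jsonrpc": "2.0", "id": "lnk-1", "method": "deleteAll"}
    await asyncio.sleep(0)  # real I/O yields to the event loop between frames
    yield {"type": "answer", "id": "ask-1", "value": True}


async def demo():
    out = []
    await serve(inbound(), out)
    return out
```

Replacing the create_task call with a direct await would leave the handler parked on answer_future forever, because the loop that reads the answer is the same loop that is waiting for the handler to return.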
Driver
create_link_driver(io, ...) is the mirror: a generated client. LinkClient.__getattr__ yields a request thunk (params=None) -> awaitable result for any attribute, so the client mirrors whatever ops the server dispatches with a single body; there are no hand-written per-op methods, and underscore-prefixed names raise AttributeError. Each thunk mints an id carrying the lnk- prefix, frames {jsonrpc,id,method,params?}, and awaits a future settled by the reply reader. The reader routes inbound frames three ways: replies settle pending requests, signals fan out to on_signal, and asks are handed to on_ask (which posts an answer back). The driver must run on a live event loop.
from induscode.channels.link import create_link_driver, LinkDriverIo

handle = create_link_driver(
    LinkDriverIo(out=child_stdin, in_=child_stdout),
    on_signal=lambda s: print(s.name),
)
snap = await handle.client.submit({"input": "hello"})  # any attribute -> a request thunk
models = await handle.client.listModels()  # mirrors SESSION_OPS keys for free
handle.close()  # rejects every still-pending request
LinkDriverHandle carries client, a done task (resolves at stream end), and close(reason) (rejects every pending request and stops accepting replies). A failed reply is raised as a LinkRequestError carrying the wire code and structured data. When on_ask is omitted the driver dismisses every ask (posts None), so a non-interactive driver never wedges the server.
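A generated client of this kind can be approximated with a single __getattr__ body. The ToyLinkClient below is a hypothetical sketch, not LinkClient itself: it uses a simplified lnk-<n> id, raises a plain RuntimeError where the real driver raises LinkRequestError, and is fed replies by a loopback echo instead of a reader task.

```python
import asyncio
import itertools


class ToyLinkClient:
    def __init__(self, send):
        self._send = send            # callable that writes one request frame
        self._pending = {}           # request id -> future awaiting the reply
        self._seq = itertools.count(1)

    def __getattr__(self, method):
        # Only reached when normal attribute lookup fails, i.e. for op names.
        if method.startswith("_"):
            raise AttributeError(method)

        async def thunk(params=None):
            request_id = f"lnk-{next(self._seq)}"
            frame = {"jsonrpc": "2.0", "id": request_id, "method": method}
            if params is not None:   # params is optional on the wire
                frame["params"] = params
            future = asyncio.get_running_loop().create_future()
            self._pending[request_id] = future
            self._send(frame)
            return await future

        return thunk

    def _settle(self, reply):
        future = self._pending.pop(reply["id"], None)
        if future is None or future.done():
            return                   # unknown or duplicate reply: ignore
        if "result" in reply:
            future.set_result(reply["result"])
        else:
            future.set_exception(RuntimeError(reply["error"]["message"]))


async def demo():
    sent = []

    def send(frame):
        sent.append(frame)
        # Loopback echo: settle on the next loop tick, as a reader task would.
        reply = {"jsonrpc": "2.0", "id": frame["id"], "result": {"echo": frame["method"]}}
        asyncio.get_running_loop().call_soon(client._settle, reply)

    client = ToyLinkClient(send)
    result = await client.listModels()
    return sent, result
```

Because the thunk is built from the attribute name, the client in this sketch accepts any method and leaves validation to the server, which answers an unregistered name with an error reply.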
The Wire Contract
contract.py is the frozen type surface — only shapes and inert pure helpers, no I/O. The envelope is JSON-RPC 2.0: a request is {jsonrpc, id?, method, params?}, a reply is {jsonrpc, id, result} or {jsonrpc, id, error}. Decoded frames are plain dicts; is_reply_ok(reply) discriminates the arms (True iff result is present). Every envelope is stamped with PROTOCOL_VERSION = "2.0" and the driver stamps outgoing ids with REQUEST_ID_PREFIX = "lnk-".
OP_ERROR pins the closed JSON-RPC error code set:
| Key | Code | Meaning |
|---|---|---|
| parse | -32700 | the framed line was not valid JSON |
| invalidRequest | -32600 | not a well-formed request |
| unknownOp | -32601 | no operation registered under method |
| invalidParams | -32602 | params failed the operation schema |
| handlerFailed | -32000 | the operation handler threw |
Typed frozen frames cross Python API boundaries: Ask (type="ask", blocking agent→driver request), AskAnswer (type="answer", correlated reply), Tell (type="tell", one-way notice), and Signal (type="signal", uncorrelated server→driver event). An OpError(code, message, data?) frames as {code, message, data?}. mint_wire_id(prefix, now_ms, seq) produces a process-unique <prefix><now36>-<seq36> correlation id. ChannelTimings carries millisecond timeouts — startupMs=1500, shutdownMs=1200, requestMs=45000, dialogMs=90000 — and resolve_timings(overrides) merges a partial override onto the frozen DEFAULT_CHANNEL_TIMINGS (asyncio call sites divide by 1000).
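The pure helpers named above are simple enough to sketch. The versions below are written from the descriptions in this section and are assumptions, not the contract.py source: in particular, the lowercase base-36 alphabet and the use of dataclasses.replace for the merge are guesses at reasonable behaviour.

```python
from dataclasses import dataclass, replace

_DIGITS = "0123456789abcdefghijklmnopqrstuvwxyz"  # assumed lowercase base-36 alphabet


def to_base36(n: int) -> str:
    if n == 0:
        return "0"
    out = ""
    while n:
        n, remainder = divmod(n, 36)
        out = _DIGITS[remainder] + out
    return out


def mint_wire_id(prefix: str, now_ms: int, seq: int) -> str:
    # <prefix><now36>-<seq36>, as described above
    return f"{prefix}{to_base36(now_ms)}-{to_base36(seq)}"


def is_reply_ok(reply: dict) -> bool:
    # True iff the reply carries the result arm
    return "result" in reply


@dataclass(frozen=True)
class ChannelTimings:
    startupMs: int = 1500
    shutdownMs: int = 1200
    requestMs: int = 45000
    dialogMs: int = 90000


DEFAULT_CHANNEL_TIMINGS = ChannelTimings()


def resolve_timings(overrides=None) -> ChannelTimings:
    # Merge a partial override onto the frozen defaults without mutating them.
    return replace(DEFAULT_CHANNEL_TIMINGS, **(overrides or {}))


timings = resolve_timings({"dialogMs": 500})
dialog_timeout_seconds = timings.dialogMs / 1000  # asyncio APIs take seconds
```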
WritableLine (a Protocol with one write(chunk: str) method) and ReadableChunks (AsyncIterable[str | bytes]) are the minimal injectable transport surfaces. ChannelContext bundles conductor, out, framer, and dialog, and is handed to every Op.handle and shared by both channels — fully injected so tests use a fake conductor and a list-backed sink.
The Operation Registry
The protocol is one declarative operation registry, not a hand-written dispatch ladder mirrored by hand-written client methods. An Op binds a wire method name to a typed async handle(params, ctx) -> result. An OpRegistry is Mapping[str, Op] whose keys equal each op's method, so server dispatch is one lookup and the driver's method set is exactly the key set.
- define_ops(record) freezes a record into a read-only OpRegistry (MappingProxyType); it is the only sanctioned way to mint a registry.
- build_ops(record) returns an OpRegistryHandle: the frozen registry, its sorted methods tuple, and a bound dispatch closure. The server-side dispatch and the client-side method set can never drift.
- dispatch(registry, method, params, ctx) is the lone server dispatch path: a map lookup, then await op.handle(...). It raises UnknownOpError (carrying -32601 and the offending method) when the method is unregistered, and propagates handler exceptions.
- has_op(registry, method) and methods_of(registry) round out the helpers.
Adding a row to a registry adds both a server-handled method and a callable driver method for free:
from induscode.channels import Op, build_ops, create_link_server
from induscode.channels.link import LinkServerIo

async def ping(params, ctx):
    return {"pong": True}

registry = build_ops({"ping": Op(method="ping", handle=ping)}).registry
server = create_link_server(registry, my_conductor, LinkServerIo(in_=stdin_chunks, out=stdout))
await server.done  # resolves when the inbound stream ends
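For readers who want to see what sits behind those helpers, the registry mechanics can be approximated as follows. This is a self-contained sketch under stated assumptions and not the ops.py source: the key-equals-method check inside define_ops and the exception's attribute names are illustrative guesses.

```python
import asyncio
from dataclasses import dataclass
from types import MappingProxyType
from typing import Any, Awaitable, Callable, Mapping


@dataclass(frozen=True)
class Op:
    method: str
    handle: Callable[[Any, Any], Awaitable[Any]]


class UnknownOpError(Exception):
    code = -32601

    def __init__(self, method: str):
        super().__init__(f"unknown op: {method}")
        self.method = method


def define_ops(record: Mapping[str, Op]) -> Mapping[str, Op]:
    # Assumed invariant: every key equals its op's wire method name.
    for key, op in record.items():
        if key != op.method:
            raise ValueError(f"registry key {key!r} != op.method {op.method!r}")
    return MappingProxyType(dict(record))  # read-only view over a private copy


async def dispatch(registry: Mapping[str, Op], method: str, params: Any, ctx: Any) -> Any:
    op = registry.get(method)
    if op is None:
        raise UnknownOpError(method)
    return await op.handle(params, ctx)    # handler exceptions propagate


def methods_of(registry: Mapping[str, Op]) -> tuple:
    return tuple(sorted(registry))


async def ping(params, ctx):
    return {"pong": True}


registry = define_ops({"ping": Op(method="ping", handle=ping)})
```

Since the server looks methods up in the mapping and a client derives its method set from the same keys, there is no second list to keep in sync.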
Session Ops and the Snapshot
SESSION_OPS is the concrete frozen registry the link serves, each op delegating to ctx.conductor:
| Method | Conductor call | Reply |
|---|---|---|
| submit | await conductor.submit(params["input"]) | LinkSnapshot |
| abort | conductor.abort() | LinkSnapshot |
| snapshot | conductor.snapshot() | LinkSnapshot |
| resume | await conductor.resume(params["sessionId"]) | LinkSnapshot |
| listModels | reads snapshot().modelId | [{"id", "active": True}] |
| cycleModel | probes getattr(conductor, "cycle_model") | LinkSnapshot |
Method names and snapshot field casing are kept verbatim (submit, listModels, cycleModel, sessionId, autoCondense, …) for cross-host compatibility — a foreign driver must speak to a Python server and vice versa. listModels currently returns only the single active model entry, and cycleModel probes for the conductor's optional cycle_model method; both are minimal projections of what the conductor exposes.
project_snapshot(state, ...) reshapes a ConductorState into the flat wire LinkSnapshot dict whose keys are exactly LINK_SNAPSHOT_FIELDS:
("model", "thinking", "streaming", "condensing", "faulted",
"sessionId", "sessionFile", "autoCondense", "messageCount", "queuedCount", "usage")
Busy flags are derived from the coarse phase (streaming is phase == "streaming" or "tooling", condensing and faulted from their phases). thinking defaults to DEFAULT_THINKING = "off" when the snapshot carries no reasoning effort. usage rides through the single app-wide codec. sessionFile is omitted (not nulled) when absent, exactly as JSON.stringify dropped the undefined field. The snapshot is the link's own vocabulary — a remote driver mirrors session state without holding a live conductor.
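The projection rules can be illustrated with a plain-dict sketch. The input keys used here (modelId, thinking, phase, and so on) are hypothetical, since the real ConductorState shape is not shown in this document, and the usage value is passed through untouched instead of going through the app-wide codec.

```python
LINK_SNAPSHOT_FIELDS = (
    "model", "thinking", "streaming", "condensing", "faulted",
    "sessionId", "sessionFile", "autoCondense", "messageCount", "queuedCount", "usage",
)
DEFAULT_THINKING = "off"


def project_snapshot(state: dict) -> dict:
    """Flatten a (hypothetical) dict-shaped conductor state into the wire snapshot."""
    phase = state["phase"]
    candidate = {
        "model": state["modelId"],
        "thinking": state.get("thinking") or DEFAULT_THINKING,
        "streaming": phase in ("streaming", "tooling"),  # busy flags come from the phase
        "condensing": phase == "condensing",
        "faulted": phase == "faulted",
        "sessionId": state["sessionId"],
        "sessionFile": state.get("sessionFile"),
        "autoCondense": state["autoCondense"],
        "messageCount": state["messageCount"],
        "queuedCount": state["queuedCount"],
        "usage": state["usage"],
    }
    # Emit keys in pinned order; drop sessionFile entirely when it is absent.
    return {
        key: candidate[key]
        for key in LINK_SNAPSHOT_FIELDS
        if not (key == "sessionFile" and candidate[key] is None)
    }


example_state = {
    "phase": "tooling", "modelId": "m1", "sessionId": "s1",
    "autoCondense": True, "messageCount": 3, "queuedCount": 0, "usage": {"tokens": 10},
}
snapshot = project_snapshot(example_state)
```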
NDJSON Framing
Framing is one JSON value per line. framer.py implements the NdjsonFramer shape — a frozen (encode_line, decode_lines) pair bundled as the canonical ndjson_framer.
encode_line(value) serializes with json.dumps (compact (",", ":") separators, ensure_ascii=False), escapes U+2028 (LINE SEPARATOR) and U+2029 (PARAGRAPH SEPARATOR) to their \u… forms, and appends exactly one \n. Those two code points are legal inside a JSON string but break a naive line splitter, and json.dumps leaves them raw — so the explicit escape guarantees any value round-trips through the decoder intact.
decode_lines(stream) is a pull-model async generator: it splits strictly on \n, decodes byte chunks through a fresh per-call incremental UTF-8 decoder (so a rune split across chunk boundaries is reassembled), skips empty/whitespace lines, and emits a final unterminated frame at end-of-stream if it carries content. The server, the driver, and the oneshot NDJSON shape all share this one implementation; an embedder may inject an alternate framer via the driver/server options.
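The two framing rules can be sketched with the standard library alone. The functions below follow the description above but are an illustration, not the framer.py source; the collect helper exists only to drive the async generator in a test.

```python
import asyncio
import codecs
import json


def encode_line(value) -> str:
    text = json.dumps(value, separators=(",", ":"), ensure_ascii=False)
    # json.dumps leaves U+2028/U+2029 raw when ensure_ascii=False; escape them
    # so no line splitter can mistake them for a frame boundary.
    text = text.replace("\u2028", "\\u2028").replace("\u2029", "\\u2029")
    return text + "\n"


async def decode_lines(stream):
    # A fresh incremental decoder per call keeps partial runes between chunks.
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    async for chunk in stream:
        buffer += decoder.decode(chunk) if isinstance(chunk, (bytes, bytearray)) else chunk
        # str.split("\n") splits strictly on LF, unlike str.splitlines().
        *complete, buffer = buffer.split("\n")
        for line in complete:
            if line.strip():
                yield json.loads(line)
    buffer += decoder.decode(b"", final=True)
    if buffer.strip():               # a final unterminated frame still counts
        yield json.loads(buffer)


async def collect(chunks):
    async def source():
        for chunk in chunks:
            yield chunk
    return [value async for value in decode_lines(source())]
```

Feeding the decoder a byte stream cut in the middle of a multi-byte character, with no trailing newline on the last frame, exercises both edge cases at once.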
The Dialog Bridge
The whole dialog surface the agent needs — multiple-choice, confirm, free-text, editor sessions, notifications, status, titles — reduces to two generic primitives on the DialogBridge protocol: ask(kind, payload, fallback) (a round-trip that suspends for a correlated answer or resolves the fallback after dialogMs) and tell(kind, payload) (fire-and-forget). Concrete methods are rows in DIALOG_TABLE, not bespoke bodies:
| Name | Mode | Fallback |
|---|---|---|
| select | ask | None |
| confirm | ask | False |
| input | ask | None |
| editor | ask | None |
| notify | tell | - |
| status | tell | - |
| title | tell | - |
make_dialog_methods(bridge) closes the table over one bridge into named convenience thunks (generated from the table, no per-method choreography). On the link, create_dialog_bridge(deps) builds a LinkedDialogBridge whose pending answers are fed externally — the server owns the single reader and routes inbound answers to deliver(). ask() emits an ask--prefixed frame and arms a dialogMs fallback timer; deliver() settles the matching future (a no-op for late/duplicate answers so a misbehaving driver cannot throw into the read loop); tell() fires and returns; drain() settles all pending with None on shutdown. This is the seam that drives the interactive Dialogs when a console is attached.
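The ask/deliver/tell mechanics and the table-generated methods can be sketched together. ToyBridge is a hypothetical stand-in for LinkedDialogBridge: the frame fields (id, kind, payload, value), the sequential ask-<n> ids, the tuple-shaped table rows, and the list-backed sink are all assumptions made for illustration.

```python
import asyncio

# (name, mode, fallback) rows mirroring the table above; tell rows ignore fallback.
DIALOG_TABLE = (
    ("select", "ask", None), ("confirm", "ask", False),
    ("input", "ask", None), ("editor", "ask", None),
    ("notify", "tell", None), ("status", "tell", None), ("title", "tell", None),
)


class ToyBridge:
    def __init__(self, out, dialog_ms):
        self._out = out
        self._timeout = dialog_ms / 1000   # asyncio timeouts are in seconds
        self._pending = {}
        self._seq = 0

    async def ask(self, kind, payload, fallback):
        self._seq += 1
        ask_id = f"ask-{self._seq}"
        future = asyncio.get_running_loop().create_future()
        self._pending[ask_id] = future
        self._out.append({"type": "ask", "id": ask_id, "kind": kind, "payload": payload})
        try:
            return await asyncio.wait_for(future, self._timeout)
        except asyncio.TimeoutError:
            return fallback                # nobody answered in time
        finally:
            self._pending.pop(ask_id, None)

    def deliver(self, answer):
        future = self._pending.get(answer["id"])
        if future is not None and not future.done():
            future.set_result(answer["value"])
        # Late or duplicate answers fall through silently.

    def tell(self, kind, payload):
        self._out.append({"type": "tell", "kind": kind, "payload": payload})


def make_dialog_methods(bridge):
    methods = {}
    for name, mode, fallback in DIALOG_TABLE:
        if mode == "ask":
            methods[name] = lambda payload, n=name, f=fallback: bridge.ask(n, payload, f)
        else:
            methods[name] = lambda payload, n=name: bridge.tell(n, payload)
    return methods


async def demo():
    out = []
    bridge = ToyBridge(out, dialog_ms=20)
    dialog = make_dialog_methods(bridge)
    timed_out = await dialog["confirm"]({"prompt": "Delete?"})  # no answer arrives
    task = asyncio.create_task(dialog["input"]({"prompt": "Name?"}))
    await asyncio.sleep(0)                                      # let the ask be emitted
    bridge.deliver({"id": "ask-2", "value": "Ada"})
    bridge.deliver({"id": "ask-2", "value": "duplicate"})       # ignored
    answered = await task
    dialog["notify"]({"text": "done"})
    return timed_out, answered, out
```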
Public API
All names are importable from induscode.channels; the link half is also under induscode.channels.link.
| Name | Kind | Source | Purpose |
|---|---|---|---|
| run_oneshot | async fn | oneshot.py | run one request to settlement; returns the exit code |
| inert_dialog | const | oneshot.py | headless DialogBridge: ask→fallback, tell dropped |
| OneshotRequest | dataclass | contract.py | shape, ordered prompts, optional images |
| OneshotStrategy | dataclass | contract.py | per-shape finish/on_start/on_signal |
| OneshotShape | type | contract.py | Literal["text","ndjson"] |
| encode_line | fn | framer.py | serialize one value to a separator-safe NDJSON line |
| decode_lines | async gen | framer.py | parse a chunk stream into JSON values, split on \n |
| ndjson_framer | const | framer.py | the canonical frozen NdjsonFramer |
| NdjsonFramer | dataclass | contract.py | frozen (encode_line, decode_lines) pair |
| ChannelContext | dataclass | contract.py | conductor + out + framer + dialog |
| Op / OpRegistry | dataclass / type | contract.py | a named op; the frozen Mapping[str, Op] |
| define_ops | fn | contract.py | freeze a record into an OpRegistry |
| build_ops | fn | ops.py | build an OpRegistryHandle (registry + methods + dispatch) |
| dispatch | async fn | ops.py | the lone server dispatch path |
| has_op / methods_of | fn | ops.py | membership test / sorted method tuple |
| OpRegistryHandle | dataclass | ops.py | a built registry plus its runtime face |
| UnknownOpError | class | ops.py | raised on an unregistered method (-32601) |
| SESSION_OPS | const | session_ops.py | the concrete link op registry |
| project_snapshot | fn | session_ops.py | ConductorState → flat LinkSnapshot |
| DEFAULT_THINKING | const | session_ops.py | "off" |
| LinkSnapshot / LINK_SNAPSHOT_FIELDS | type / const | contract.py | the flat wire dict / its pinned field tuple |
| create_link_server | fn | link/server.py | start a data-driven JSON-RPC server |
| LinkServer / LinkServerIo | dataclass | link/server.py | running server / injectable transport pair |
| create_link_driver | fn | link/driver.py | build a generated client over an injected transport |
| LinkClient | class | link/driver.py | the __getattr__-generated client |
| LinkDriverHandle / LinkDriverIo | class / dataclass | link/driver.py | live driver / injectable transport pair |
| LinkRequestError | class | link/driver.py | a failed reply raised with wire code + data |
| LinkedDialogBridge | class | link/dialog.py | externally-fed ask/tell bridge |
| create_dialog_bridge | fn | link/dialog.py | construct a LinkedDialogBridge |
| make_dialog_methods | fn | link/dialog.py | close DIALOG_TABLE over one bridge |
| DIALOG_TABLE / DialogSpec | const / dataclass | link/dialog.py | the dialog table; one row as data |
| DialogBridgeDeps | dataclass | link/dialog.py | out + framer + timings for the bridge |
| DialogBridge | class | contract.py | the ask/tell protocol |
| Ask / AskAnswer / Tell / Signal | dataclass | contract.py | typed wire frames |
| OpError / OP_ERROR | dataclass / const | contract.py | a typed error; the frozen code set |
| is_reply_ok | fn | contract.py | discriminate a reply frame's arm |
| PROTOCOL_VERSION / REQUEST_ID_PREFIX | const | contract.py | "2.0" / "lnk-" |
| mint_wire_id | fn | contract.py | mint a process-unique correlation id |
| ChannelTimings / DEFAULT_CHANNEL_TIMINGS / resolve_timings | dataclass / const / fn | contract.py | millisecond timeouts + merge helper |
| signal_to_wire | fn | contract.py | project a SessionSignal to its wire dict |
| WritableLine / ReadableChunks | type | contract.py | the minimal injectable transport surfaces |
Design Notes
- Single declarative registry. SESSION_OPS drives both server dispatch (a map lookup, no switch) and the driver's method set (LinkClient is generated via __getattr__, never hand-written). An unregistered name simply rejects with unknownOp (-32601).
- Concurrency. The server dispatches requests as concurrent asyncio tasks, not awaited in line, because a handler can suspend on a dialog ask whose answer arrives as a later frame on the same reader stream; awaiting in line would deadlock. Answer frames are always delivered immediately.
- Framer correctness. U+2028/U+2029 are escaped because json.dumps leaves them raw yet they break line splitters; each decode_lines call uses a fresh incremental UTF-8 decoder so a rune split across byte chunks is reassembled.
- Resilience. inert_dialog and the driver's missing on_ask both dismiss every ask (return fallback / post None), so a headless run or disconnected driver never wedges the agent. LinkedDialogBridge.drain() and the driver's reject-all settle/reject every pending entry on shutdown; deliver() is a no-op for late/duplicate answers.
- One serializer. signal_to_wire is the single Python-specific divergence: it re-attaches the kind ClassVar discriminant that the app-wide message_to_dict codec does not emit. No second serializer is introduced; all field payloads still route through the one codec.
- Verbatim casing & dropped fields. Wire field names/casing are pinned for cross-host compatibility; project_snapshot omits sessionFile when absent and the NDJSON end frame omits fault when None, matching how JSON.stringify dropped undefined fields.
Source Files
| File | Holds |
|---|---|
| channels/contract.py | the frozen type surface: envelope, framer shape, frames, ChannelContext, op registry, snapshot vocabulary, timings, id minting |
| channels/framer.py | the concrete NDJSON framer (encode_line / decode_lines / ndjson_framer) |
| channels/ops.py | the runtime face: build_ops, dispatch, OpRegistryHandle, UnknownOpError |
| channels/session_ops.py | the concrete SESSION_OPS registry + project_snapshot |
| channels/oneshot.py | the oneshot channel: run_oneshot, the two strategies, inert_dialog |
| channels/link/server.py | the data-driven JSON-RPC server + concurrent dispatch loop |
| channels/link/driver.py | the generated __getattr__ client + reply/signal/ask reader |
| channels/link/dialog.py | the ask/tell bridge + data-driven DIALOG_TABLE |
The runners that wire these into CLI modes live in boot/runners/oneshot_runner.py and boot/runners/link_runner.py — see Boot. For the full export map see Package Exports.
