Runtime Bridge
induscode.runtime_bridge is the per-turn routing seam that decides where an assistant message is actually produced: by the indusagi framework's own network stream, or by an external child coding-agent (a spawned claude / codex CLI, or a peer agent over JSON-RPC). Reach it via from induscode.runtime_bridge import create_runtime_broker. It is a complete, fully-tested library layer that is not yet wired into the boot path, and every consumer goes through the package barrel.
The framework runs a normal HTTP provider via indusagi.ai.stream_simple. The runtime bridge lets a model instead be annotated with an ExternalRuntimeSpec saying "this model is backed by a spawned CLI / a peer agent — here is how to reach and authenticate it". A RuntimeBroker resolves that annotation and either drives the owning RuntimeBridge over an injectable ChildTransport, or falls through to the framework stream. Every external wire dialect is collapsed into one provider-neutral NormalizedEvent union, which a single sink translates into the framework's AssistantMessageEventStream push shape — so the two paths are indistinguishable to the caller.
Table of Contents
- Where It Sits
- The Routing Decision
- The bridge: Synthetic Endpoint
- Normalized Events and the Sink
- Bridges and the Shared Driver
- The Injectable Child Transport
- Auth as Data
- Resume Links
- Module Map
- Public Surface
- Examples
- Parity Notes
- On the Framework
Where It Sits
The runtime bridge builds directly on the indusagi framework. It consumes framework vocabulary verbatim from indusagi.ai — Model, Context, AssistantMessage, AssistantMessageEventStream, Api, StopReason, ToolCall, SimpleStreamOptions, KnownProvider, plus the factory values stream_simple, create_assistant_message_event_stream, and create_zero_usage — and never re-derives event shapes. It writes through the framework push surface and honours the framework CancelToken (from indusagi._internal.cancel) via ExchangeOptions.signal.
Within induscode it has exactly one cross-subsystem import: bridges/builtins.py imports CatalogCard from induscode.conductor.catalog — the model catalog owns the model list, and the runtime bridge only annotates, never re-catalogues. The resume-token persistence boundary (RuntimeLinkStore) is meant to be bound by the conductor's transcript store at assembly time. The subsystem is exposed from the top-level induscode package as a lazy namespace re-export (_SUBSYSTEMS maps 'runtime_bridge').
The Routing Decision
The product never calls a bridge or the framework stream directly. It asks a RuntimeBrokerRuntime (from create_runtime_broker) to exchange(model, context, opts), and the broker picks the path. Routing is data, not control flow:
def route(self, model, context, opts) -> RuntimeRoute:
    spec = self.resolve_spec(model)  # decode bridge:<adapter> baseUrl
    if spec is None:
        return FrameworkRoute()  # plain HTTP model
    bridge = self._bridges.get(spec.adapter)
    if bridge is None:
        return FrameworkRoute()  # no bridge registered for this adapter
    return ExternalRoute(bridge=bridge, spec=spec)
resolve_spec(model) calls spec_from_model(), which returns a shipped ExternalRuntimeSpec only when the model's baseUrl starts with the synthetic bridge: scheme and names a built-in adapter — otherwise None. The two RuntimeRoute outcomes are:
| Route | target | Carries | Behaviour |
|---|---|---|---|
| ExternalRoute | "external" | bridge + spec | The broker drives the bridge's run_exchange over a freshly-built ChildTransport. |
| FrameworkRoute | "framework" | (none) | The turn falls through to the framework stream_simple unchanged. |
exchange() then acts on the decision. A framework route — or any route when no transportFactory is wired — calls the framework stream (stream_simple, or the injected frameworkStream override). An external route resolves a persisted resume token, builds the transport, attaches a best-effort resume tap, and calls bridge.run_exchange(model, context, opts, transport). Both branches return the same AssistantMessageEventStream synchronously, populated asynchronously as the turn streams.
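The fall-through rule in the paragraph above can be sketched in isolation. This is a minimal model, not the shipped broker: the two route classes are simplified stand-ins (the real ExternalRoute carries a bridge and a spec, not just an adapter id), streams are represented by plain strings, and the parameter names framework_stream, transport_factory and run_bridge are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Union


@dataclass(frozen=True)
class FrameworkRoute:  # simplified stand-in
    target: str = "framework"


@dataclass(frozen=True)
class ExternalRoute:  # simplified stand-in: carries only the adapter id
    adapter: str
    target: str = "external"


RuntimeRoute = Union[FrameworkRoute, ExternalRoute]


def exchange(
    route: RuntimeRoute,
    framework_stream: Callable[[], str],
    transport_factory: Optional[Callable[[str], object]] = None,
    run_bridge: Optional[Callable[[str, object], str]] = None,
) -> str:
    """Act on a routing decision; strings stand in for event streams."""
    # A framework route, or any route without a wired transport factory,
    # falls through to the framework stream.
    if isinstance(route, FrameworkRoute) or transport_factory is None or run_bridge is None:
        return framework_stream()
    # External route: build a fresh transport, then hand it to the bridge.
    transport = transport_factory(route.adapter)
    return run_bridge(route.adapter, transport)
```

Whichever branch runs, the caller gets back the same kind of value, which is the property the broker relies on.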
The bridge: Synthetic Endpoint
A runtime-backed model is recognized purely from its address. Annotating a model rewrites its baseUrl to the synthetic bridge:<adapter> endpoint, so a routing consumer can spot a runtime model with baseUrl.startswith(RUNTIME_ENDPOINT_SCHEME) and recover the full spec from the shipped table — no schema change to the catalog card, which is frozen.
| Symbol | Value / Shape | Role |
|---|---|---|
| RUNTIME_ENDPOINT_SCHEME | Final = "bridge:" | The synthetic non-HTTP scheme stamped onto the model's baseUrl. |
| runtime_endpoint(adapter) | f"bridge:{adapter}" | The single sanctioned way to mint the convention. |
| with_runtime_endpoint(model, spec) | fresh Model | Rewrites baseUrl via dataclasses.replace (input never mutated). |
| spec_from_model(model) | ExternalRuntimeSpec \| None | The inverse decode the broker uses to recognize a runtime model. |
| annotate_card(card, adapter, spec?) | RuntimeAnnotatedModel \| None | Annotates a CatalogCard's model; None when no spec is known. |
annotated = annotate_card(card, "claude-cli") # RuntimeAnnotatedModel | None
# annotated.model.baseUrl == runtime_endpoint("claude-cli") == "bridge:claude-cli"
assert spec_from_model(annotated.model) == annotated.spec
assert spec_from_model(plain_http_model) is None # real URL -> not a runtime model
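The same round trip can be modelled without the package installed. The sketch below is illustrative only: Model and Spec are hypothetical stand-ins for the framework Model and ExternalRuntimeSpec, and SPECS is a one-entry table standing in for the shipped catalog.

```python
from dataclasses import dataclass, replace
from typing import Final, Optional

RUNTIME_ENDPOINT_SCHEME: Final = "bridge:"


@dataclass(frozen=True)
class Model:  # hypothetical stand-in for the framework Model
    id: str
    baseUrl: str


@dataclass(frozen=True)
class Spec:  # hypothetical stand-in for ExternalRuntimeSpec
    adapter: str
    authMode: str


# Illustrative table; the real catalog lives in bridges/builtins.py.
SPECS = {"claude-cli": Spec(adapter="claude-cli", authMode="external-cli")}


def runtime_endpoint(adapter: str) -> str:
    """Mint the synthetic bridge:<adapter> address."""
    return f"{RUNTIME_ENDPOINT_SCHEME}{adapter}"


def with_runtime_endpoint(model: Model, spec: Spec) -> Model:
    # dataclasses.replace returns a new instance; the input is untouched.
    return replace(model, baseUrl=runtime_endpoint(spec.adapter))


def spec_from_model(model: Model) -> Optional[Spec]:
    """Inverse decode: None for real URLs and for unknown adapters."""
    if not model.baseUrl.startswith(RUNTIME_ENDPOINT_SCHEME):
        return None
    adapter = model.baseUrl[len(RUNTIME_ENDPOINT_SCHEME):]
    return SPECS.get(adapter)
```

Because the address alone carries the annotation, the frozen catalog card needs no extra field.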
Normalized Events and the Sink
Every external runtime speaks a different wire dialect, but the broker should not care. Each bridge is reduced to a parser that maps its vocabulary onto a small, closed, provider-neutral union, and one BridgeEventSink translates those events into the framework push shape. This is the seam that removes per-bridge stream.push* duplication.
NormalizedEvent is the union TextEvent | ThinkingEvent | ToolCallEvent | ResumeEvent | FinishEvent | FailedEvent. Each variant is a frozen dataclass with a ClassVar kind discriminant. NORMALIZED_EVENT_KINDS is the frozen tuple ("text", "thinking", "tool_call", "resume", "finish", "failed"); the sink's dispatch table and a coverage test pin against it — the Python replacement for TS mapped-type exhaustiveness.
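The pinning idea can be shown at small scale. The sketch below assumes only two of the six variants and uses hypothetical handler names; it is meant to illustrate the pattern (frozen dataclasses with a ClassVar discriminant, plus an import-time assert tying the dispatch table to the kinds tuple), not to reproduce the shipped sink.

```python
from dataclasses import dataclass
from typing import Callable, ClassVar, Dict, List, Tuple


@dataclass(frozen=True)
class TextEvent:
    kind: ClassVar[str] = "text"  # ClassVar: a discriminant, not an instance field
    delta: str


@dataclass(frozen=True)
class FinishEvent:
    kind: ClassVar[str] = "finish"
    reason: str = "stop"


# Two kinds for brevity; the real tuple lists six.
EVENT_KINDS: Tuple[str, ...] = ("text", "finish")


def _on_text(log: List[tuple], event: TextEvent) -> None:
    log.append(("text", event.delta))


def _on_finish(log: List[tuple], event: FinishEvent) -> None:
    log.append(("done", event.reason))


DISPATCH: Dict[str, Callable] = {"text": _on_text, "finish": _on_finish}

# Runs at import time: adding a kind without a handler (or the reverse)
# fails immediately instead of at the first unhandled event.
assert set(DISPATCH) == set(EVENT_KINDS), "dispatch table out of sync with kinds"


def emit(log: List[tuple], event) -> None:
    """Route one event to its handler by its kind discriminant."""
    DISPATCH[event.kind](log, event)
```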
BridgeEventSink (Protocol) is the single place the imperative start → push* → finish idiom lives. Bridges stay pure parsers that call its convenience methods:
| Method | Effect |
|---|---|
| start() | Push the framework start event (idempotent; implicit on first emission). |
| text(delta) | Append answer text (opens a text block on first call). |
| thinking(delta) | Append reasoning text (opens a thinking block on first call). |
| tool_call(call) | Emit one ToolCall as a self-contained block (start → delta(json args) → end), closing any open text/thinking block first. |
| emit(event) | Map one raw NormalizedEvent onto the matching method by kind. |
| finish_success(reason="stop") | Close any open block and push done. Terminal. |
| finish_error(error) | Push error; BridgeFailure.aborted selects aborted vs error. Terminal. |
| .stream | The AssistantMessageEventStream the broker hands back. |
create_bridge_sink(seed) builds the concrete sink from a BridgeMessageSeed (the api / provider / model triple stamped on every partial message, so a CLI turn looks like a network turn). The sink keeps a mutable builder list of ["text", buf] / ["thinking", buf] / ["toolCall", ToolCall] parts (a block's list index == its content index) and materializes a fresh frozen AssistantMessage snapshot per push — because the framework's AssistantMessage and content are frozen. It emits a zeroed usage (create_zero_usage) because a CLI owns its own token metering.
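The builder-plus-snapshot idea described above can be sketched as follows. Snapshot and SnapshotBuilder are hypothetical names, tool calls are left out, and the rule for opening a new block (whenever the kind changes) is an assumption made for the example rather than a statement about the shipped sink.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Snapshot:  # hypothetical stand-in for a frozen AssistantMessage
    content: Tuple[Tuple[str, str], ...]


class SnapshotBuilder:
    """Keeps mutable [kind, buffer] parts; emits a frozen copy per push."""

    def __init__(self) -> None:
        self._parts: List[List[str]] = []  # list index == content index
        self.pushed: List[Snapshot] = []

    def _append(self, kind: str, delta: str) -> None:
        # Assumption: a new block opens whenever the kind changes.
        if not self._parts or self._parts[-1][0] != kind:
            self._parts.append([kind, ""])
        self._parts[-1][1] += delta
        self.pushed.append(self._snapshot())

    def text(self, delta: str) -> None:
        self._append("text", delta)

    def thinking(self, delta: str) -> None:
        self._append("thinking", delta)

    def _snapshot(self) -> Snapshot:
        # Copy into tuples so earlier snapshots never see later mutation.
        return Snapshot(content=tuple((kind, buf) for kind, buf in self._parts))
```

Each pushed snapshot is independent of the builder, so a consumer holding an earlier partial message never observes it changing.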
Bridges and the Shared Driver
Three RuntimeBridge singletons ship, each a RuntimeBridge Protocol implementation (.adapter id, run_exchange(...), requires_credential(spec)):
| Bridge | Adapter | Default binary / args | Wire dialect |
|---|---|---|---|
| claude_cli_bridge | claude-cli | claude --output-format stream-json --verbose | Anthropic stream-json NDJSON content blocks |
| codex_cli_bridge | codex-cli | codex --json | OpenAI --json turn/item events |
| indusagi_cli_bridge | indusagi-cli | indusagi --rpc | Peer JSON-RPC notifications + runExchange response |
A bridge is meant to be only a per-dialect parser. The lifecycle wiring common to all three is factored into drive_exchange() in bridges/_drive.py, which: derives the sink seed (seed_from_model), subscribes a ChildParser to transport.on_message, sends the opening ChildRequest, and settles + tears down on a terminal parser result, transport closure, a CancelToken (opts.signal) abort, or a thrown parser error.
A parser is Callable[[ChildMessage, BridgeEventSink], ParseStep]. ParseStep has two frozen singletons: CONTINUE (events emitted, stream stays open) and DONE (the payload ended the exchange — the driver finishes successfully if the sink has not already settled). The dialect parsers narrow opaque payloads with the shared helpers as_record / str_field and format a single-line error with error_text.
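The CONTINUE / DONE contract can be illustrated with a synchronous toy loop. Everything here is a stand-in: RecordingSink, as_mapping and drive are hypothetical names, the payload shape ({"type": ..., "delta": ...}) is invented for the example, and the real driver is asynchronous and also handles aborts, closure and parser errors.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List


@dataclass(frozen=True)
class ParseStep:
    done: bool


CONTINUE = ParseStep(done=False)  # events emitted, stream stays open
DONE = ParseStep(done=True)       # the payload ended the exchange


class RecordingSink:  # hypothetical minimal sink
    def __init__(self) -> None:
        self.chunks: List[str] = []
        self.settled = False

    def text(self, delta: str) -> None:
        self.chunks.append(delta)

    def finish_success(self, reason: str = "stop") -> None:
        self.settled = True


def as_mapping(payload: Any) -> dict:
    """Narrow an opaque payload; non-dicts become an empty record."""
    return payload if isinstance(payload, dict) else {}


def parse(payload: Any, sink: RecordingSink) -> ParseStep:
    record = as_mapping(payload)
    if record.get("type") == "text":
        sink.text(str(record.get("delta", "")))
        return CONTINUE
    if record.get("type") == "done":
        return DONE
    return CONTINUE  # unknown payloads are ignored


def drive(payloads: Iterable[Any], sink: RecordingSink) -> None:
    for payload in payloads:
        if parse(payload, sink) is DONE:
            # Finish only if the parser has not already settled the sink.
            if not sink.settled:
                sink.finish_success("stop")
            break
```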
Dialect specifics worth knowing: claude_cli intentionally drops input_json_delta chunks (it emits the tool call from the terminal content_block_start snapshot); codex_cli prefers item-level *.delta text but falls back to whole-item text, and settles turn.completed as toolUse only if it saw a tool call; indusagi_cli's parser is stateless and treats a bare JSON-RPC result response as a terminal DONE if streaming notifications did not already finish. make_indusagi_cli_bridge(spec) builds a peer bridge bound to a spec so its delegate is forwarded in the opening runExchange call — the broker makes one per routed spec.
The Injectable Child Transport
The bridge ↔ child boundary is an injectable ChildTransport Protocol — never a hard-coded spawn. No subprocess module is imported anywhere in the package, and tests pass a fake, so no real claude / codex binary is launched.
| Member | Signature | Role |
|---|---|---|
| send | (ChildRequest) -> Awaitable[None] | Relay one request to the child (write a prompt to stdin, issue a JSON-RPC call). |
| on_message | (listener) -> disposer | Register a listener for inbound ChildMessages; returns an unsubscribe. |
| close | () -> Awaitable[None] | Terminate the child; idempotent. |
ChildMessage and ChildRequest carry an opaque payload / body — whatever the underlying protocol relays (a parsed JSON line, an RPC frame, a text chunk). The broker reaches a transport only through a ChildTransportFactory = Callable[[TransportContext], ChildTransport], fed a TransportContext (the resolved spec, model, opts, and resolved resume token) so a production factory can launch and wire the child without the broker touching a process.
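A test double for this boundary might look like the sketch below. It is not the FakeTransport used by the package's own tests; the class, its scripted-replay behaviour and the demo coroutine are assumptions for illustration. Only the three members (send, on_message, close) and the payload / body field names come from the description above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass(frozen=True)
class ChildMessage:
    payload: Any  # opaque: whatever the protocol relays


@dataclass(frozen=True)
class ChildRequest:
    body: Any


class FakeTransport:
    """Replays scripted payloads to listeners whenever a request is sent."""

    def __init__(self, scripted: List[Any]) -> None:
        self._scripted = list(scripted)
        self._listeners: List[Callable[[ChildMessage], None]] = []
        self.sent: List[ChildRequest] = []
        self.closed = False

    async def send(self, request: ChildRequest) -> None:
        self.sent.append(request)
        for payload in self._scripted:
            for listener in list(self._listeners):
                listener(ChildMessage(payload=payload))

    def on_message(self, listener: Callable[[ChildMessage], None]) -> Callable[[], None]:
        self._listeners.append(listener)

        def dispose() -> None:
            if listener in self._listeners:
                self._listeners.remove(listener)

        return dispose

    async def close(self) -> None:
        self.closed = True  # safe to call more than once


async def demo():
    transport = FakeTransport([{"type": "text", "delta": "hi"}])
    seen: List[Any] = []
    dispose = transport.on_message(lambda m: seen.append(m.payload))
    await transport.send(ChildRequest(body={"type": "turn"}))
    dispose()  # after unsubscribing, further messages are not delivered
    await transport.send(ChildRequest(body={"type": "turn"}))
    await transport.close()
    await transport.close()
    return seen, len(transport.sent), transport.closed
```

No process is started at any point, which is what lets a bridge be exercised end to end in a unit test.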
Auth as Data
Authentication policy is data, not control flow. RuntimeAuthMode is Literal["external-cli", "api-key"], and requires_credential(spec) answers the single question "does this model need a key on disk before being offered?":
- "external-cli" → False. The spawned child owns its own login (its own keychain), so the model is offered as available with an empty credential vault. All three shipped specs are external-cli.
- "api-key" → True. The runtime still resolves a key from the credential vault, same as a normal HTTP provider; only the transport differs.
The broker delegates requires_credential to the owning bridge when one is registered, and falls back to spec.authMode == "api-key" when no bridge is present — so the predicate answers strictly the runtime credential question. See auth for the disk-backed vault this policy gates.
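The delegation-with-fallback rule reduces to a few lines. The sketch below uses hypothetical stand-ins (Spec, Bridge, and a plain dict as the bridge registry); only the two auth-mode literals and the fallback comparison are taken from the text above.

```python
from dataclasses import dataclass
from typing import Dict, Literal, Protocol

RuntimeAuthMode = Literal["external-cli", "api-key"]


@dataclass(frozen=True)
class Spec:  # hypothetical stand-in for ExternalRuntimeSpec
    adapter: str
    authMode: RuntimeAuthMode


class Bridge(Protocol):
    def requires_credential(self, spec: Spec) -> bool: ...


def requires_credential(spec: Spec, bridges: Dict[str, Bridge]) -> bool:
    """Ask the owning bridge when registered; otherwise read the spec."""
    bridge = bridges.get(spec.adapter)
    if bridge is not None:
        return bridge.requires_credential(spec)
    # No bridge registered: the auth mode alone decides.
    return spec.authMode == "api-key"
```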
Resume Links
An underlying CLI session can be reattached across exchanges. When a bridge surfaces a resume token (a CLI session id / thread id), the broker persists it as a renamed custom transcript entry through an injected RuntimeLinkStore:
- RUNTIME_LINK_ENTRY = "external-runtime-link": the transcript tag the record is logged under.
- RuntimeLink = {source, bridge, resumeToken, at}: the serialized payload (source is the model's provider slug, bridge is the adapter id, at is the ISO instant).
- runtime_source_key(source, model_id, bridge) composes the "source|model_id|bridge" reuse key a stored link is matched against on a later exchange.
The resume tap (_make_resume_tap → _resume_token_of) is independent of the bridge's own parsing — both read the same messages. The tap recognizes the cross-dialect token shapes: a session_id (Anthropic system/init), a thread_id (OpenAI thread.started), and an explicit resumeToken (peer session/resume). It is permissive (a stale token just yields a fresh session) and best-effort (a persist failure never breaks a turn). The conductor's transcript store is meant to bind a real RuntimeLinkStore at assembly time; omit it and tokens simply are not persisted.
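A permissive, best-effort tap along these lines could be sketched as below. The key format and the three field names come from the text above; the assumption that each token sits at the top level of a dict payload, and the names resume_token_of and tap, are illustrative and may not match the shipped helpers.

```python
from typing import Any, Callable, Optional


def runtime_source_key(source: str, model_id: str, bridge: str) -> str:
    """Compose the reuse key a stored link is matched against."""
    return f"{source}|{model_id}|{bridge}"


# Assumption: each dialect exposes its token as a top-level string field.
_TOKEN_FIELDS = ("session_id", "thread_id", "resumeToken")


def resume_token_of(payload: Any) -> Optional[str]:
    """Return the first non-empty token found, or None."""
    if not isinstance(payload, dict):
        return None
    for field in _TOKEN_FIELDS:
        value = payload.get(field)
        if isinstance(value, str) and value:
            return value
    return None


def tap(payload: Any, save: Callable[[str], None]) -> None:
    """Persist a token if one is present; never let a failure escape."""
    token = resume_token_of(payload)
    if token is None:
        return
    try:
        save(token)
    except Exception:
        pass  # best-effort: a persist failure must not break the turn
```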
Module Map
| Module | Holds |
|---|---|
| contract.py | The frozen type seam: ExternalRuntimeSpec, the NormalizedEvent union + variants, BridgeEventSink / ChildTransport Protocols, ExchangeOptions, RuntimeBridge / RuntimeBroker Protocols, RuntimeRoute, RuntimeLink / RuntimeLinkStore, TransportContext, ChildTransportFactory, FrameworkStream. |
| broker.py | _Broker (private) + create_runtime_broker, RuntimeBrokerRuntime, RuntimeBrokerDeps, runtime_source_key, and the internal resume tap + best-effort async-settle plumbing. |
| sink.py | The one BridgeEventSink impl (BridgeSink, private) + create_bridge_sink, BridgeMessageSeed, and _EMIT_DISPATCH (with the import-time exhaustiveness assert). |
| bridges/_drive.py | The shared driver drive_exchange + parser surface (ChildParser, ParseStep, CONTINUE, DONE) and the payload/seed helpers (seed_from_model, as_record, str_field, error_text). |
| bridges/builtins.py | The shipped runtime catalog (BUILTIN_RUNTIME_SPECS, BUILTIN_ADAPTERS, builtin_runtime_spec) + the annotation helpers (annotate_card, with_runtime_endpoint, spec_from_model, RuntimeAnnotatedModel). The only cross-subsystem import: induscode.conductor.catalog.CatalogCard. |
| bridges/claude_cli.py | claude_cli_bridge: parses Anthropic stream-json content blocks. |
| bridges/codex_cli.py | codex_cli_bridge: parses OpenAI --json turn/item events. |
| bridges/indusagi_cli.py | indusagi_cli_bridge + make_indusagi_cli_bridge: parses peer JSON-RPC and forwards spec.delegate. |
Public Surface
The whole subsystem is consumed through the induscode.runtime_bridge barrel.
| Name | Kind | Source | Purpose |
|---|---|---|---|
| create_runtime_broker | function | broker.py | Sole constructor of a RuntimeBrokerRuntime from optional RuntimeBrokerDeps. |
| RuntimeBrokerRuntime | Protocol | broker.py | The broker the product drives: RuntimeBroker + exchange(model, context, opts). |
| RuntimeBroker | Protocol | contract.py | The routing surface: register, route, resolve_spec, requires_credential. |
| RuntimeBrokerDeps | dataclass | broker.py | Construction deps (all optional): transportFactory, linkStore, frameworkStream, bridges. |
| runtime_source_key | function | broker.py | Composes the source\|model_id\|bridge resume reuse key. |
| RuntimeBridge | Protocol | contract.py | One external-runtime adapter: .adapter, run_exchange(...), requires_credential(spec). |
| ExternalRuntimeSpec | dataclass | contract.py | Frozen annotation: adapter, authMode, binaryPath?, args?, env?, delegate?. |
| RuntimeAdapterId | TypeAlias | contract.py | str (open, so extensions can register more than the three shipped). |
| RuntimeAuthMode | TypeAlias | contract.py | Literal["external-cli", "api-key"]. |
| RUNTIME_ENDPOINT_SCHEME / runtime_endpoint | const / function | contract.py | The bridge: scheme and the bridge:<adapter> minter. |
| NormalizedEvent | union | contract.py | TextEvent \| ThinkingEvent \| ToolCallEvent \| ResumeEvent \| FinishEvent \| FailedEvent. |
| NORMALIZED_EVENT_KINDS | const | contract.py | Frozen tuple of the six kind literals; pinned by the sink dispatch + a test. |
| BridgeFailure | dataclass | contract.py | Typed fault: message, aborted, cause. |
| FinishReason | TypeAlias | contract.py | Literal["stop", "length", "toolUse"]. |
| BridgeEventSink | Protocol | contract.py | The single push-stream helper bridges write through. |
| create_bridge_sink / BridgeMessageSeed | function / dataclass | sink.py | Build the concrete sink; the api/provider/model identity it stamps. |
| ChildTransport / ChildMessage / ChildRequest | Protocol / dataclass | contract.py | The injectable child boundary and its opaque-payload envelopes. |
| ChildTransportFactory / TransportContext | TypeAlias / dataclass | contract.py | The single seam to the outside world and the context fed to it. |
| ExchangeOptions | dataclass | contract.py | Extends SimpleStreamOptions with cwd and resume (kw-only). |
| ExternalRoute / FrameworkRoute | dataclass | contract.py | The two RuntimeRoute outcomes. |
| RuntimeLink / RuntimeLinkStore / RUNTIME_LINK_ENTRY | dataclass / Protocol / const | contract.py | Resume-token persistence under the external-runtime-link tag. |
| FrameworkStream | TypeAlias | contract.py | Callable[[Model, Context, ExchangeOptions], AssistantMessageEventStream]. |
| claude_cli_bridge / codex_cli_bridge / indusagi_cli_bridge | const | bridges/*.py | The three shipped bridge singletons. |
| make_indusagi_cli_bridge | function | bridges/indusagi_cli.py | Builds a peer bridge bound to a spec (forwards delegate). |
| drive_exchange | function | bridges/_drive.py | The shared exchange-lifecycle driver. |
| ChildParser / ParseStep / CONTINUE / DONE | TypeAlias / dataclass / const | bridges/_drive.py | The parser surface. |
| seed_from_model / as_record / str_field / error_text | function | bridges/_drive.py | Shared parser helpers. |
| BUILTIN_RUNTIME_SPECS / BUILTIN_ADAPTERS / builtin_runtime_spec | const / function | bridges/builtins.py | The shipped runtime catalog + by-id lookup. |
| annotate_card / with_runtime_endpoint / spec_from_model / RuntimeAnnotatedModel | function / dataclass | bridges/builtins.py | Annotate a card with a runtime and the inverse decode. |
See Package Exports for the full barrel.
Examples
Assemble a broker with the three shipped bridges and route a turn:
from induscode.runtime_bridge import (
    create_runtime_broker, RuntimeBrokerDeps, ExchangeOptions,
    claude_cli_bridge, codex_cli_bridge, indusagi_cli_bridge,
)

broker = create_runtime_broker(RuntimeBrokerDeps(
    bridges=(claude_cli_bridge, codex_cli_bridge, indusagi_cli_bridge),
    transportFactory=my_factory,  # (TransportContext) -> ChildTransport
    linkStore=my_link_store,      # optional RuntimeLinkStore
))

# The two paths are indistinguishable to the caller:
stream = broker.exchange(model, context, ExchangeOptions(cwd="/repo"))
# stream is an indusagi.ai AssistantMessageEventStream, populated async
Write a custom bridge as a pure parser over the shared driver:
from induscode.runtime_bridge import (
    drive_exchange, ChildRequest, CONTINUE, DONE, ParseStep,
)

def parse(message, sink) -> ParseStep:
    p = message.payload
    if p.get("type") == "text":
        sink.text(p["delta"])
        return CONTINUE
    if p.get("type") == "done":
        sink.finish_success("stop")
        return DONE
    return CONTINUE

class MyBridge:
    adapter = "my-cli"

    def run_exchange(self, model, context, opts, transport):
        # drive_exchange needs a running asyncio event loop at call time
        return drive_exchange(model, opts, transport,
                              ChildRequest(body={"type": "turn"}), parse)

    def requires_credential(self, spec):
        return spec.authMode == "api-key"
RuntimeAdapterId is an open str, so register(MyBridge()) on a broker (or passing it in RuntimeBrokerDeps.bridges) lets a model annotated with baseUrl == "bridge:my-cli" route to it — provided a matching spec is resolvable.
Parity Notes
This is a complete, fully-tested library layer. Its tests live in tests/runtime_bridge/test_runtime_bridge.py (18 tests) using a FakeTransport and a fake framework stream — no real binary is spawned anywhere. Several deliberate behaviours are worth flagging:
- Sync fast-path resume. _Broker.exchange only reattaches a synchronously-available resume token. RuntimeLinkStore.find may return an awaitable, but a coroutine result is closed via .close() and never awaited, so a disk-backed store that wants reattach must answer find() synchronously.
- Frozen-snapshot sink. The framework's AssistantMessage is frozen with a tuple content, so the sink keeps a mutable builder and materializes a fresh frozen snapshot per push (mirroring the framework's own accumulator), rather than mutating one shared message in place.
- A running event loop is required. drive_exchange calls asyncio.get_running_loop(), uses loop.call_soon for the off-stack pre-aborted settle, and tracks fire-and-forget send/close/save tasks in module-level sets (_PENDING_TASKS / _PENDING_SAVES) so they are not GC'd mid-flight.
- Exhaustiveness via assert. _EMIT_DISPATCH has a module-import-time assert that its keys exactly equal NORMALIZED_EVENT_KINDS. A resume event is a stream no-op (persisted out of band by the broker tap).
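The sync fast-path behaviour from the first note can be illustrated with a small helper. find_sync is a hypothetical name and the string-only token check is an assumption; the point is only the pattern of closing, rather than awaiting, a coroutine that arrives where a synchronous answer was needed.

```python
import inspect
from typing import Any, Callable, Optional


def find_sync(store_find: Callable[[str], Any], key: str) -> Optional[str]:
    """Return a resume token only if the store answered synchronously."""
    result = store_find(key)
    if inspect.iscoroutine(result):
        # Closing an unstarted coroutine avoids the "never awaited"
        # warning; the turn simply proceeds without a resume token.
        result.close()
        return None
    # Assumption for this sketch: a usable token is a non-empty string.
    return result if isinstance(result, str) and result else None
```

A store whose find is declared with async def therefore never reattaches in this model, which matches the constraint stated in the note.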
For the broader TS-to-Python translation record, see parity; for the test harness see testing.
On the Framework
Everything below the routing seam belongs to the indusagi framework. The network fall-through is the framework runtime, reached through the LLM gateway via stream_simple; the message/event types, the push stream, and the cancel token are all framework surfaces consumed verbatim from indusagi.ai. The runtime bridge adds exactly one thing on top: a typed choice, per turn, between that network path and an external child runtime — with both paths returning the same AssistantMessageEventStream.
