Runtime Bridge

induscode.runtime_bridge is the per-turn routing seam that decides where an assistant message is actually produced: by the indusagi framework's own network stream, or by an external child coding-agent (a spawned claude / codex CLI, or a peer agent over JSON-RPC). Reach it via from induscode.runtime_bridge import create_runtime_broker. It is a complete, fully-tested library layer not yet wired into the boot path — every consumer goes through the package barrel.

The framework runs a normal HTTP provider via indusagi.ai.stream_simple. The runtime bridge lets a model instead be annotated with an ExternalRuntimeSpec saying "this model is backed by a spawned CLI / a peer agent — here is how to reach and authenticate it". A RuntimeBroker resolves that annotation and either drives the owning RuntimeBridge over an injectable ChildTransport, or falls through to the framework stream. Every external wire dialect is collapsed into one provider-neutral NormalizedEvent union, which a single sink translates into the framework's AssistantMessageEventStream push shape — so the two paths are indistinguishable to the caller.

Where It Sits

The runtime bridge builds directly on the indusagi framework. It consumes framework vocabulary verbatim from indusagi.ai (Model, Context, AssistantMessage, AssistantMessageEventStream, Api, StopReason, ToolCall, SimpleStreamOptions, KnownProvider, plus the factory values stream_simple, create_assistant_message_event_stream, and create_zero_usage) and never re-derives event shapes. It writes through the framework push surface and honours the framework CancelToken (from indusagi._internal.cancel) via ExchangeOptions.signal.

Within induscode it has exactly one cross-subsystem import: bridges/builtins.py imports CatalogCard from induscode.conductor.catalog — the model catalog owns the model list, and the runtime bridge only annotates, never re-catalogues. The resume-token persistence boundary (RuntimeLinkStore) is meant to be bound by the conductor's transcript store at assembly time. The subsystem is exposed from the top-level induscode package as a lazy namespace re-export (_SUBSYSTEMS maps 'runtime_bridge').

The Routing Decision

The product never calls a bridge or the framework stream directly. It asks a RuntimeBrokerRuntime (from create_runtime_broker) to exchange(model, context, opts), and the broker picks the path. Routing is data, not control flow:

def route(self, model, context, opts) -> RuntimeRoute:
    spec = self.resolve_spec(model)          # decode bridge:<adapter> baseUrl
    if spec is None:
        return FrameworkRoute()              # plain HTTP model
    bridge = self._bridges.get(spec.adapter)
    if bridge is None:
        return FrameworkRoute()              # no bridge registered for this adapter
    return ExternalRoute(bridge=bridge, spec=spec)

resolve_spec(model) calls spec_from_model(), which returns a shipped ExternalRuntimeSpec only when the model's baseUrl starts with the synthetic bridge: scheme and names a built-in adapter — otherwise None. The two RuntimeRoute outcomes are:

| Route | target | Carries | Behaviour |
| --- | --- | --- | --- |
| ExternalRoute | "external" | bridge + spec | The broker drives the bridge's run_exchange over a freshly-built ChildTransport. |
| FrameworkRoute | "framework" | | The turn falls through to the framework stream_simple unchanged. |

exchange() then acts on the decision. A framework route — or any route when no transportFactory is wired — calls the framework stream (stream_simple, or the injected frameworkStream override). An external route resolves a persisted resume token, builds the transport, attaches a best-effort resume tap, and calls bridge.run_exchange(model, context, opts, transport). Both branches return the same AssistantMessageEventStream synchronously, populated asynchronously as the turn streams.
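
The branch described above can be sketched in isolation. This is a minimal illustration rather than the broker's actual code: the two route classes are stand-ins that mirror the table (only the target, bridge, and spec fields come from it), and pick_path is a hypothetical helper that names the path instead of running it.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Union


@dataclass(frozen=True)
class FrameworkRoute:
    """Stand-in for the framework outcome; carries nothing but its target."""
    target: str = "framework"


@dataclass(frozen=True)
class ExternalRoute:
    """Stand-in for the external outcome; carries the bridge and its spec."""
    bridge: Any
    spec: Any
    target: str = "external"


RuntimeRoute = Union[ExternalRoute, FrameworkRoute]


def pick_path(route: RuntimeRoute,
              transport_factory: Optional[Callable[..., Any]]) -> str:
    """Hypothetical helper: name the path exchange() would take for a route."""
    if route.target == "external" and transport_factory is not None:
        return "bridge.run_exchange"
    # A framework route, or an external route with no transport factory wired.
    return "framework stream"
```

Because the decision is a plain value, a test can assert on the route without driving a stream at all.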

The bridge: Synthetic Endpoint

A runtime-backed model is recognized purely from its address. Annotating a model rewrites its baseUrl to the synthetic bridge:<adapter> endpoint, so a routing consumer can spot a runtime model with baseUrl.startswith(RUNTIME_ENDPOINT_SCHEME) and recover the full spec from the shipped table — no schema change to the catalog card, which is frozen.

| Symbol | Value / Shape | Role |
| --- | --- | --- |
| RUNTIME_ENDPOINT_SCHEME | Final = "bridge:" | The synthetic non-HTTP scheme stamped onto the model's baseUrl. |
| runtime_endpoint(adapter) | f"bridge:{adapter}" | The single sanctioned way to mint the convention. |
| with_runtime_endpoint(model, spec) | fresh Model | Rewrites baseUrl via dataclasses.replace (input never mutated). |
| spec_from_model(model) | ExternalRuntimeSpec \| None | The inverse decode the broker uses to recognize a runtime model. |
| annotate_card(card, adapter, spec?) | RuntimeAnnotatedModel \| None | Annotates a CatalogCard's model; None when no spec is known. |

annotated = annotate_card(card, "claude-cli")          # RuntimeAnnotatedModel | None
# annotated.model.baseUrl == runtime_endpoint("claude-cli") == "bridge:claude-cli"
assert spec_from_model(annotated.model) == annotated.spec
assert spec_from_model(plain_http_model) is None        # real URL -> not a runtime model
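
To make the round trip concrete without importing the package, here is a self-contained sketch of the same convention. The Model and Spec dataclasses and the one-entry SPECS table are simplified stand-ins (assumptions, not the real framework or catalog types); only the "bridge:" scheme, the function names, and the use of dataclasses.replace come from the text above.

```python
from dataclasses import dataclass, replace
from typing import Dict, Optional

RUNTIME_ENDPOINT_SCHEME = "bridge:"


@dataclass(frozen=True)
class Model:
    """Stand-in for the framework Model; only the fields the sketch needs."""
    id: str
    baseUrl: str


@dataclass(frozen=True)
class Spec:
    """Stand-in for ExternalRuntimeSpec."""
    adapter: str
    authMode: str = "external-cli"


# Stand-in for the shipped spec table, keyed by adapter id.
SPECS: Dict[str, Spec] = {"claude-cli": Spec("claude-cli")}


def runtime_endpoint(adapter: str) -> str:
    """Mint the synthetic endpoint for an adapter."""
    return f"{RUNTIME_ENDPOINT_SCHEME}{adapter}"


def with_runtime_endpoint(model: Model, spec: Spec) -> Model:
    """Return a fresh Model with the synthetic baseUrl; the input is untouched."""
    return replace(model, baseUrl=runtime_endpoint(spec.adapter))


def spec_from_model(model: Model) -> Optional[Spec]:
    """Inverse decode: a spec only for a bridge: URL naming a known adapter."""
    if not model.baseUrl.startswith(RUNTIME_ENDPOINT_SCHEME):
        return None
    adapter = model.baseUrl[len(RUNTIME_ENDPOINT_SCHEME):]
    return SPECS.get(adapter)
```

Keeping the annotation in the address means the frozen catalog card needs no extra field, at the cost of the decode depending on a lookup table.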

Normalized Events and the Sink

Every external runtime speaks a different wire dialect, but the broker should not care. Each bridge is reduced to a parser that maps its vocabulary onto a small, closed, provider-neutral union, and one BridgeEventSink translates those events into the framework push shape. This is the seam that removes per-bridge stream.push* duplication.

NormalizedEvent is the union TextEvent | ThinkingEvent | ToolCallEvent | ResumeEvent | FinishEvent | FailedEvent. Each variant is a frozen dataclass with a ClassVar kind discriminant. NORMALIZED_EVENT_KINDS is the frozen tuple ("text", "thinking", "tool_call", "resume", "finish", "failed"); the sink's dispatch table and a coverage test pin against it — the Python replacement for TS mapped-type exhaustiveness.
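
The pattern of a ClassVar discriminant plus a pinned kinds tuple can be sketched as follows. Only two variants are shown, and their payload field names (delta, token) are assumptions for illustration; the tuple of kinds is the one quoted above. The check_exhaustive helper is hypothetical and stands in for whatever assertion the sink and the coverage test actually use.

```python
from dataclasses import dataclass
from typing import ClassVar, Iterable

NORMALIZED_EVENT_KINDS = (
    "text", "thinking", "tool_call", "resume", "finish", "failed",
)


@dataclass(frozen=True)
class TextEvent:
    # ClassVar keeps the discriminant off the instance and out of __init__.
    kind: ClassVar[str] = "text"
    delta: str  # assumed field name


@dataclass(frozen=True)
class ResumeEvent:
    kind: ClassVar[str] = "resume"
    token: str  # assumed field name


def check_exhaustive(dispatch_keys: Iterable[str]) -> None:
    """Raise if a dispatch table has drifted from the closed set of kinds."""
    keys = set(dispatch_keys)
    missing = set(NORMALIZED_EVENT_KINDS) - keys
    extra = keys - set(NORMALIZED_EVENT_KINDS)
    if missing or extra:
        raise AssertionError(
            f"dispatch drift: missing={sorted(missing)} extra={sorted(extra)}"
        )
```

Running check_exhaustive at import time turns a forgotten variant into an immediate failure rather than a silently dropped event.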

BridgeEventSink (Protocol) is the single place the imperative start → push* → finish idiom lives. Bridges stay pure parsers that call its convenience methods:

| Method | Effect |
| --- | --- |
| start() | Push the framework start event (idempotent; implicit on first emission). |
| text(delta) | Append answer text (opens a text block on first call). |
| thinking(delta) | Append reasoning text (opens a thinking block on first call). |
| tool_call(call) | Emit one ToolCall as a self-contained block (start → delta(json args) → end), closing any open text/thinking block first. |
| emit(event) | Map one raw NormalizedEvent onto the matching method by kind. |
| finish_success(reason="stop") | Close any open block and push done. Terminal. |
| finish_error(error) | Push error; BridgeFailure.aborted selects aborted vs error. Terminal. |
| .stream | The AssistantMessageEventStream the broker hands back. |

create_bridge_sink(seed) builds the concrete sink from a BridgeMessageSeed (the api / provider / model triple stamped on every partial message, so a CLI turn looks like a network turn). The sink keeps a mutable builder list of ["text", buf] / ["thinking", buf] / ["toolCall", ToolCall] parts (a block's list index == its content index) and materializes a fresh frozen AssistantMessage snapshot per push — because the framework's AssistantMessage and content are frozen. It emits a zeroed usage (create_zero_usage) because a CLI owns its own token metering.
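
The builder-plus-snapshot idea can be illustrated with a much smaller sink. This is a sketch under stated assumptions, not the real BridgeSink: Snapshot is a hypothetical stand-in for a frozen AssistantMessage, tool calls and usage are omitted, and opening a new block whenever the kind changes is an assumption made for the example.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Snapshot:
    """Stand-in for a frozen AssistantMessage with tuple content."""
    content: Tuple[Tuple[str, str], ...]


class BuilderSink:
    """Keeps mutable [kind, buffer] parts; pushes a frozen copy per delta."""

    def __init__(self) -> None:
        self._parts: List[list] = []       # list index == content index
        self.pushed: List[Snapshot] = []   # stands in for the push stream

    def _append(self, kind: str, delta: str) -> None:
        # Open a new block when none is open or the kind changes (assumption).
        if not self._parts or self._parts[-1][0] != kind:
            self._parts.append([kind, ""])
        self._parts[-1][1] += delta
        # Materialize a fresh immutable snapshot instead of mutating in place.
        frozen = tuple((k, buf) for k, buf in self._parts)
        self.pushed.append(Snapshot(content=frozen))

    def text(self, delta: str) -> None:
        self._append("text", delta)

    def thinking(self, delta: str) -> None:
        self._append("thinking", delta)
```

Earlier snapshots stay valid after later pushes, which is the property a consumer of a frozen message relies on.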

Bridges and the Shared Driver

Three RuntimeBridge singletons ship, each a RuntimeBridge Protocol implementation (.adapter id, run_exchange(...), requires_credential(spec)):

| Bridge | Adapter | Default binary / args | Wire dialect |
| --- | --- | --- | --- |
| claude_cli_bridge | claude-cli | claude --output-format stream-json --verbose | Anthropic stream-json NDJSON content blocks |
| codex_cli_bridge | codex-cli | codex --json | OpenAI --json turn/item events |
| indusagi_cli_bridge | indusagi-cli | indusagi --rpc | Peer JSON-RPC notifications + runExchange response |

A bridge is meant to be only a per-dialect parser. The lifecycle wiring common to all three is factored into drive_exchange() in bridges/_drive.py, which: derives the sink seed (seed_from_model), subscribes a ChildParser to transport.on_message, sends the opening ChildRequest, and settles + tears down on a terminal parser result, transport closure, a CancelToken (opts.signal) abort, or a thrown parser error.

A parser is Callable[[ChildMessage, BridgeEventSink], ParseStep]. ParseStep has two frozen singletons: CONTINUE (events emitted, stream stays open) and DONE (the payload ended the exchange — the driver finishes successfully if the sink has not already settled). The dialect parsers narrow opaque payloads with the shared helpers as_record / str_field and format a single-line error with error_text.
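
The CONTINUE / DONE contract is easier to see against a toy driver. The sketch below is synchronous and deliberately simplified: the done field on ParseStep, the RecordingSink, and the drive loop are all assumptions for illustration, and the parser receives the raw payload directly rather than a ChildMessage. It shows only two of the settle paths named above (a terminal parser result and a thrown parser error).

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass(frozen=True)
class ParseStep:
    done: bool  # assumed field; the text only states two frozen singletons


CONTINUE = ParseStep(done=False)
DONE = ParseStep(done=True)


class RecordingSink:
    """Toy sink that records calls and ignores a second terminal call."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, str]] = []
        self.settled = False

    def text(self, delta: str) -> None:
        self.events.append(("text", delta))

    def finish_success(self, reason: str = "stop") -> None:
        if not self.settled:
            self.settled = True
            self.events.append(("done", reason))

    def finish_error(self, message: str) -> None:
        if not self.settled:
            self.settled = True
            self.events.append(("error", message))


def drive(payloads: Iterable[Any],
          parser: Callable[[Any, RecordingSink], ParseStep],
          sink: RecordingSink) -> None:
    """Feed payloads to the parser until it ends the exchange or raises."""
    for payload in payloads:
        try:
            step = parser(payload, sink)
        except Exception as exc:        # a thrown parser error settles as error
            sink.finish_error(str(exc))
            return
        if step is DONE:                # terminal result: finish if not settled
            sink.finish_success()
            return


def demo_parser(payload: dict, sink: RecordingSink) -> ParseStep:
    if payload.get("type") == "text":
        sink.text(payload["delta"])
        return CONTINUE
    if payload.get("type") == "done":
        return DONE
    if payload.get("type") == "boom":
        raise ValueError("bad frame")
    return CONTINUE
```

Payloads that arrive after the terminal step are never parsed, so a dialect parser does not need its own "already finished" guard.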

Dialect specifics worth knowing: claude_cli intentionally drops input_json_delta chunks (it emits the tool call from the terminal content_block_start snapshot); codex_cli prefers item-level *.delta text but falls back to whole-item text, and settles turn.completed as toolUse only if it saw a tool call; indusagi_cli's parser is stateless and treats a bare JSON-RPC result response as a terminal DONE if streaming notifications did not already finish. make_indusagi_cli_bridge(spec) builds a peer bridge bound to a spec so its delegate is forwarded in the opening runExchange call — the broker makes one per routed spec.

The Injectable Child Transport

The bridge ↔ child boundary is an injectable ChildTransport Protocol — never a hard-coded spawn. No subprocess module is imported anywhere in the package, and tests pass a fake, so no real claude / codex binary is launched.

| Member | Signature | Role |
| --- | --- | --- |
| send | (ChildRequest) -> Awaitable[None] | Relay one request to the child (write a prompt to stdin, issue a JSON-RPC call). |
| on_message | (listener) -> disposer | Register a listener for inbound ChildMessages; returns an unsubscribe. |
| close | () -> Awaitable[None] | Terminate the child; idempotent. |

ChildMessage and ChildRequest carry an opaque payload / body — whatever the underlying protocol relays (a parsed JSON line, an RPC frame, a text chunk). The broker reaches a transport only through a ChildTransportFactory = Callable[[TransportContext], ChildTransport], fed a TransportContext (the resolved spec, model, opts, and resolved resume token) so a production factory can launch and wire the child without the broker touching a process.
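
A test double for this boundary can be very small. The following in-memory transport is a sketch that satisfies the three members in the table; it is not the package's own test fake, and the feed helper is a hypothetical addition for pushing inbound messages from a test.

```python
import asyncio
from typing import Any, Callable, List


class InMemoryTransport:
    """Hypothetical fake: records requests and replays messages to listeners."""

    def __init__(self) -> None:
        self.sent: List[Any] = []
        self.closed = False
        self._listeners: List[Callable[[Any], None]] = []

    async def send(self, request: Any) -> None:
        # A real transport would write to stdin or issue an RPC call here.
        self.sent.append(request)

    def on_message(self, listener: Callable[[Any], None]) -> Callable[[], None]:
        self._listeners.append(listener)

        def dispose() -> None:
            # Safe to call more than once.
            if listener in self._listeners:
                self._listeners.remove(listener)

        return dispose

    async def close(self) -> None:
        self.closed = True  # idempotent: repeated calls change nothing

    def feed(self, message: Any) -> None:
        """Test helper (not part of the Protocol): deliver one inbound message."""
        for listener in list(self._listeners):
            listener(message)
```

A factory for it is just a function of the context, for example `lambda ctx: InMemoryTransport()`, which is what lets the broker stay free of process handling.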

Auth as Data

Authentication policy is data, not control flow. RuntimeAuthMode is Literal["external-cli", "api-key"], and requires_credential(spec) answers the single question "does this model need a key on disk before being offered?":

  • "external-cli" → False. The spawned child owns its own login (its own keychain), so the model is offered as available with an empty credential vault. All three shipped specs are external-cli.
  • "api-key" → True. The runtime still resolves a key from the credential vault, same as a normal HTTP provider; only the transport differs.

The broker delegates requires_credential to the owning bridge when one is registered, and falls back to spec.authMode == "api-key" when no bridge is present — so the predicate answers strictly the runtime credential question. See auth for the disk-backed vault this policy gates.
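
The delegation rule reads naturally as a few lines of code. This is a sketch only: Spec is a stand-in for ExternalRuntimeSpec, the bridges mapping stands in for the broker's registry, and AlwaysKeyBridge is a hypothetical bridge used to show that a registered bridge's answer wins over the authMode fallback.

```python
from dataclasses import dataclass
from typing import Any, Literal, Mapping

RuntimeAuthMode = Literal["external-cli", "api-key"]


@dataclass(frozen=True)
class Spec:
    """Stand-in for ExternalRuntimeSpec; only the fields the policy reads."""
    adapter: str
    authMode: RuntimeAuthMode


def requires_credential(spec: Spec, bridges: Mapping[str, Any]) -> bool:
    """Ask the owning bridge if one is registered, else fall back to authMode."""
    bridge = bridges.get(spec.adapter)
    if bridge is not None:
        return bridge.requires_credential(spec)
    return spec.authMode == "api-key"


class AlwaysKeyBridge:
    """Hypothetical bridge that insists on a stored key regardless of mode."""
    adapter = "my-cli"

    def requires_credential(self, spec: Spec) -> bool:
        return True
```

With no bridges registered, an external-cli spec needs no key and an api-key spec does; registering AlwaysKeyBridge under "my-cli" overrides the fallback for that adapter only.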

An underlying CLI session can be reattached across exchanges. When a bridge surfaces a resume token (a CLI session id / thread id), the broker persists it as a renamed custom transcript entry through an injected RuntimeLinkStore:

  • RUNTIME_LINK_ENTRY = "external-runtime-link" — the transcript tag the record is logged under.
  • RuntimeLink = {source, bridge, resumeToken, at} — the serialized payload (source is the model's provider slug, bridge is the adapter id, at is the ISO instant).
  • runtime_source_key(source, model_id, bridge) composes the "source|model_id|bridge" reuse key a stored link is matched against on a later exchange.

The resume tap (_make_resume_tap / _resume_token_of) is independent of the bridge's own parsing; both read the same messages. The tap recognizes the cross-dialect token shapes: a session_id (Anthropic system/init), a thread_id (OpenAI thread.started), and an explicit resumeToken (peer session/resume). It is permissive (a stale token just yields a fresh session) and best-effort (a persist failure never breaks a turn). The conductor's transcript store is meant to bind a real RuntimeLinkStore at assembly time; omit it and tokens simply are not persisted.
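
A rough sketch of the two pieces, the reuse key and the token recognizer, is shown below. The key format follows the "source|model_id|bridge" shape given above. The recognizer is a simplification and an assumption: it looks only at the three field names and does not check the surrounding message type, which the real tap may well do.

```python
from typing import Any, Optional

# Field names the three dialects use for a reattachable session handle.
_TOKEN_FIELDS = ("session_id", "thread_id", "resumeToken")


def runtime_source_key(source: str, model_id: str, bridge: str) -> str:
    """Compose the reuse key a stored link is matched against."""
    return f"{source}|{model_id}|{bridge}"


def resume_token_of(payload: Any) -> Optional[str]:
    """Return the first non-empty string token found, else None."""
    if not isinstance(payload, dict):
        return None
    for field in _TOKEN_FIELDS:
        value = payload.get(field)
        if isinstance(value, str) and value:
            return value
    return None
```

Returning None for anything unrecognized keeps the tap permissive: an opaque or malformed payload simply produces no link.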

Module Map

| Module | Holds |
| --- | --- |
| contract.py | The frozen type seam: ExternalRuntimeSpec, the NormalizedEvent union + variants, BridgeEventSink / ChildTransport Protocols, ExchangeOptions, RuntimeBridge / RuntimeBroker Protocols, RuntimeRoute, RuntimeLink / RuntimeLinkStore, TransportContext, ChildTransportFactory, FrameworkStream. |
| broker.py | _Broker (private) + create_runtime_broker, RuntimeBrokerRuntime, RuntimeBrokerDeps, runtime_source_key, and the internal resume tap + best-effort async-settle plumbing. |
| sink.py | The one BridgeEventSink impl (BridgeSink, private) + create_bridge_sink, BridgeMessageSeed, and _EMIT_DISPATCH (with the import-time exhaustiveness assert). |
| bridges/_drive.py | The shared driver drive_exchange + parser surface (ChildParser, ParseStep, CONTINUE, DONE) and the payload/seed helpers (seed_from_model, as_record, str_field, error_text). |
| bridges/builtins.py | The shipped runtime catalog (BUILTIN_RUNTIME_SPECS, BUILTIN_ADAPTERS, builtin_runtime_spec) + the annotation helpers (annotate_card, with_runtime_endpoint, spec_from_model, RuntimeAnnotatedModel). The only cross-subsystem import: induscode.conductor.catalog.CatalogCard. |
| bridges/claude_cli.py | claude_cli_bridge: parses Anthropic stream-json content blocks. |
| bridges/codex_cli.py | codex_cli_bridge: parses OpenAI --json turn/item events. |
| bridges/indusagi_cli.py | indusagi_cli_bridge + make_indusagi_cli_bridge: parses peer JSON-RPC and forwards spec.delegate. |

Public Surface

The whole subsystem is consumed through the induscode.runtime_bridge barrel.

| Name | Kind | Source | Purpose |
| --- | --- | --- | --- |
| create_runtime_broker | function | broker.py | Sole constructor of a RuntimeBrokerRuntime from optional RuntimeBrokerDeps. |
| RuntimeBrokerRuntime | Protocol | broker.py | The broker the product drives: RuntimeBroker + exchange(model, context, opts). |
| RuntimeBroker | Protocol | contract.py | The routing surface: register, route, resolve_spec, requires_credential. |
| RuntimeBrokerDeps | dataclass | broker.py | Construction deps (all optional): transportFactory, linkStore, frameworkStream, bridges. |
| runtime_source_key | function | broker.py | Composes the source\|model_id\|bridge resume reuse key. |
| RuntimeBridge | Protocol | contract.py | One external-runtime adapter: .adapter, run_exchange(...), requires_credential(spec). |
| ExternalRuntimeSpec | dataclass | contract.py | Frozen annotation: adapter, authMode, binaryPath?, args?, env?, delegate?. |
| RuntimeAdapterId | TypeAlias | contract.py | str (open, so extensions can register more than the three shipped). |
| RuntimeAuthMode | TypeAlias | contract.py | Literal["external-cli", "api-key"]. |
| RUNTIME_ENDPOINT_SCHEME / runtime_endpoint | const / function | contract.py | The bridge: scheme and the bridge:<adapter> minter. |
| NormalizedEvent | union | contract.py | TextEvent \| ThinkingEvent \| ToolCallEvent \| ResumeEvent \| FinishEvent \| FailedEvent. |
| NORMALIZED_EVENT_KINDS | const | contract.py | Frozen tuple of the six kind literals; pinned by the sink dispatch + a test. |
| BridgeFailure | dataclass | contract.py | Typed fault: message, aborted, cause. |
| FinishReason | TypeAlias | contract.py | Literal["stop", "length", "toolUse"]. |
| BridgeEventSink | Protocol | contract.py | The single push-stream helper bridges write through. |
| create_bridge_sink / BridgeMessageSeed | function / dataclass | sink.py | Build the concrete sink; the api/provider/model identity it stamps. |
| ChildTransport / ChildMessage / ChildRequest | Protocol / dataclass | contract.py | The injectable child boundary and its opaque-payload envelopes. |
| ChildTransportFactory / TransportContext | TypeAlias / dataclass | contract.py | The single seam to the outside world and the context fed to it. |
| ExchangeOptions | dataclass | contract.py | Extends SimpleStreamOptions with cwd and resume (kw-only). |
| ExternalRoute / FrameworkRoute | dataclass | contract.py | The two RuntimeRoute outcomes. |
| RuntimeLink / RuntimeLinkStore / RUNTIME_LINK_ENTRY | dataclass / Protocol / const | contract.py | Resume-token persistence under the external-runtime-link tag. |
| FrameworkStream | TypeAlias | contract.py | Callable[[Model, Context, ExchangeOptions], AssistantMessageEventStream]. |
| claude_cli_bridge / codex_cli_bridge / indusagi_cli_bridge | const | bridges/*.py | The three shipped bridge singletons. |
| make_indusagi_cli_bridge | function | bridges/indusagi_cli.py | Builds a peer bridge bound to a spec (forwards delegate). |
| drive_exchange | function | bridges/_drive.py | The shared exchange-lifecycle driver. |
| ChildParser / ParseStep / CONTINUE / DONE | TypeAlias / dataclass / const | bridges/_drive.py | The parser surface. |
| seed_from_model / as_record / str_field / error_text | function | bridges/_drive.py | Shared parser helpers. |
| BUILTIN_RUNTIME_SPECS / BUILTIN_ADAPTERS / builtin_runtime_spec | const / function | bridges/builtins.py | The shipped runtime catalog + by-id lookup. |
| annotate_card / with_runtime_endpoint / spec_from_model / RuntimeAnnotatedModel | function / dataclass | bridges/builtins.py | Annotate a card with a runtime and the inverse decode. |

See Package Exports for the full barrel.

Examples

Assemble a broker with the three shipped bridges and route a turn:

from induscode.runtime_bridge import (
    create_runtime_broker, RuntimeBrokerDeps, ExchangeOptions,
    claude_cli_bridge, codex_cli_bridge, indusagi_cli_bridge,
)

broker = create_runtime_broker(RuntimeBrokerDeps(
    bridges=(claude_cli_bridge, codex_cli_bridge, indusagi_cli_bridge),
    transportFactory=my_factory,   # (TransportContext) -> ChildTransport
    linkStore=my_link_store,       # optional RuntimeLinkStore
))

# The two paths are indistinguishable to the caller:
stream = broker.exchange(model, context, ExchangeOptions(cwd="/repo"))
# stream is an indusagi.ai AssistantMessageEventStream, populated async

Write a custom bridge as a pure parser over the shared driver:

from induscode.runtime_bridge import (
    drive_exchange, ChildRequest, CONTINUE, DONE, ParseStep,
)

def parse(message, sink) -> ParseStep:
    p = message.payload
    if p.get("type") == "text":
        sink.text(p["delta"])            # stream one answer-text delta
        return CONTINUE
    if p.get("type") == "done":
        sink.finish_success("stop")      # settle the stream, then end the exchange
        return DONE
    return CONTINUE                      # ignore anything unrecognized

class MyBridge:
    adapter = "my-cli"
    def run_exchange(self, model, context, opts, transport):
        return drive_exchange(model, opts, transport,
                              ChildRequest(body={"type": "turn"}), parse)
    def requires_credential(self, spec): return spec.authMode == "api-key"
# drive_exchange needs a running asyncio event loop at call time

RuntimeAdapterId is an open str, so register(MyBridge()) on a broker (or passing it in RuntimeBrokerDeps.bridges) lets a model annotated with baseUrl == "bridge:my-cli" route to it — provided a matching spec is resolvable.

Parity Notes

This is a complete, fully-tested library layer. Its tests live in tests/runtime_bridge/test_runtime_bridge.py (18 tests) using a FakeTransport and a fake framework stream — no real binary is spawned anywhere. Several deliberate behaviours are worth flagging:

  • Sync fast-path resume. _Broker.exchange only reattaches a synchronously-available resume token. RuntimeLinkStore.find may return an awaitable, but a coroutine result is .close()'d and never awaited — so a disk-backed store that wants reattach must answer find() synchronously.
  • Frozen-snapshot sink. The framework's AssistantMessage is frozen with a tuple content, so the sink keeps a mutable builder and materializes a fresh frozen snapshot per push (mirroring the framework's own accumulator), rather than mutating one shared message in place.
  • A running event loop is required. drive_exchange calls asyncio.get_running_loop(), uses loop.call_soon for the off-stack pre-aborted settle, and tracks fire-and-forget send/close/save tasks in module-level sets (_PENDING_TASKS / _PENDING_SAVES) so they are not GC'd mid-flight.
  • Exhaustiveness via assert. _EMIT_DISPATCH has a module-import-time assert that its keys exactly equal NORMALIZED_EVENT_KINDS. A resume event is a stream no-op (persisted out of band by the broker tap).
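
The sync fast-path in the first bullet can be sketched as below. This is an illustration under assumptions, not the broker's code: sync_resume_token is a hypothetical helper, the stored link is modelled as a plain dict with a resumeToken key, and the two store classes are toy stand-ins for a RuntimeLinkStore.

```python
import inspect
from typing import Any, Dict, Optional


def sync_resume_token(store: Any, key: str) -> Optional[str]:
    """Return a token only if find() answered synchronously."""
    result = store.find(key)
    if inspect.iscoroutine(result):
        # Discard without awaiting; close() avoids a "never awaited" warning.
        result.close()
        return None
    if inspect.isawaitable(result):
        return None  # other awaitables (futures, tasks) are also skipped
    if result is None:
        return None
    return result["resumeToken"]


class SyncStore:
    """Toy store whose find() answers immediately."""

    def __init__(self, links: Dict[str, dict]) -> None:
        self._links = links

    def find(self, key: str) -> Optional[dict]:
        return self._links.get(key)


class AsyncStore(SyncStore):
    """Toy store whose find() is a coroutine function."""

    async def find(self, key: str) -> Optional[dict]:  # type: ignore[override]
        return self._links.get(key)
```

With the same stored link, the synchronous store yields the token while the async store yields None, which is why a disk-backed store that wants reattach has to answer find() synchronously.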

For the broader TS-to-Python translation record, see parity; for the test harness see testing.

On the Framework

Everything below the routing seam belongs to the indusagi framework. The network fall-through is the framework runtime, reached through the LLM gateway via stream_simple; the message/event types, the push stream, and the cancel token are all framework surfaces consumed verbatim from indusagi.ai. The runtime bridge adds exactly one thing on top: a typed choice, per turn, between that network path and an external child runtime — with both paths returning the same AssistantMessageEventStream.