Subsystemssubsystems/interop

Interop (MCP Bridge)

indusagi.interop is the bidirectional Model Context Protocol bridge, built on the official mcp Python SDK. As a client it connects to external MCP servers and grafts their tools into the kernel; as a provider host it publishes the agent's own tools to outside MCP clients. Imported as import indusagi.interop (one barrel re-exporting the whole protocol_bridge sub-package).

The interop layer is indusagi's seam to the wider MCP ecosystem, and it works in both directions over the same ToolRegistry/ToolBox abstractions the Capabilities kernel uses for local tools. As a client, it dials external MCP servers (stdio subprocess or SSE/HTTP), enumerates their tools, normalizes their JSON schemas, and registers each remote tool as a kernel tool the agent can call like any built-in. As a provider host, it stands up an SDK server that answers tools/list and tools/call against a Runtime ToolBox. Conceptually it is the inverse of the kernel: capabilities defines local tools; interop grafts remote tools in and publishes local tools out.

Table of Contents

Public exports

From indusagi/interop/__init__.py, which re-exports the entire protocol_bridge barrel:

Name Kind Source Purpose
mount_protocol_bridge async function protocol_bridge/bridge.py The join point: start a fleet, list each ready endpoint's tools, wrap each as a kernel tool, return a MountedProtocolBridge (ToolBox + live fleet)
MountedProtocolBridge dataclass protocol_bridge/bridge.py The mount return value: frozen box (ToolBox) plus fleet (ServerFleet)
create_server_endpoint function protocol_bridge/endpoint.py Factory for a ServerEndpointImpl over one ServerConfig; accepts an optional session_factory test seam
ServerEndpointImpl class protocol_bridge/endpoint.py The one concrete client endpoint: a single live connection over the SDK ClientSession, driving the phase machine
start_server_fleet async function protocol_bridge/fleet.py Construct and start() a ServerFleetImpl from a BridgeConfig, returning it after every connection has settled
ServerFleetImpl class protocol_bridge/fleet.py The one concrete fleet: mints one endpoint per ServerConfig, connects them in parallel with per-server failure isolation
create_provider_host function protocol_bridge/host.py Provider-host factory: stand up an SDK low-level Server over a ToolBox, eagerly registering tools/list and tools/call
ProviderHost type protocol_bridge/host.py Protocol for the agent-as-MCP-server: a single connect(transport) that binds and serves
ProviderHostInfo dataclass protocol_bridge/host.py Frozen identity (name, version) the host advertises to connecting clients
normalize_schema function protocol_bridge/schema.py Reshape an untrusted remote inputSchema into a draft-agnostic subset every provider accepts
ProtocolFault class protocol_bridge/contract.py The normalized, discriminable exception raised everywhere in the bridge; class on .kind, cause on __cause__
protocol_fault function protocol_bridge/contract.py Terse constructor protocol_fault(kind, message, cause)
is_protocol_fault function protocol_bridge/contract.py TypeGuard narrowing a value to ProtocolFault
is_usable_phase function protocol_bridge/contract.py True only when EndpointPhase == READY (can accept tool calls)
is_terminal_phase function protocol_bridge/contract.py True when phase is CLOSED or FAULTED
qualify_tool_name function protocol_bridge/contract.py Build the agent-facing "<server>__<tool>" from a RemoteToolRef
QUALIFIER const protocol_bridge/contract.py The "__" delimiter joining server id and tool name
ServerEndpoint type protocol_bridge/contract.py Protocol for a live single-server connection: config/phase plus open/list_tools/invoke/status/close
ServerFleet type protocol_bridge/contract.py Protocol for a name-keyed endpoint collection: spin_up/tear_down/endpoint/endpoints/status
EndpointPhase enum protocol_bridge/contract.py StrEnum lifecycle: IDLE, CONNECTING, READY, CLOSING, CLOSED, FAULTED
FaultKind type protocol_bridge/contract.py Literal union: transport, protocol, timeout, tool_error, not_connected, spawn_failed
StdioServerConfig dataclass protocol_bridge/contract.py Frozen stdio config: id, command, args, env; kind ClassVar "stdio"
SseServerConfig dataclass protocol_bridge/contract.py Frozen SSE/HTTP config: id, url, headers; kind ClassVar "sse"
ServerConfig type protocol_bridge/contract.py Discriminated union StdioServerConfig | SseServerConfig
TransportKind type protocol_bridge/contract.py Literal["stdio", "sse"]
BridgeConfig dataclass protocol_bridge/contract.py Frozen wrapper holding the tuple of ServerConfigs to connect on start
RemoteToolRef dataclass protocol_bridge/contract.py Frozen (server, name) pair identifying one tool on one server
RemoteTool dataclass protocol_bridge/contract.py Frozen pairing of a RemoteToolRef with the model-facing ToolDescriptor
RemoteCallResult dataclass protocol_bridge/contract.py Frozen raw invocation result: content (MCP blocks) plus is_error
EndpointStatus dataclass protocol_bridge/contract.py Frozen point-in-time health: server, phase, tool_count, optional fault
FleetStatus type protocol_bridge/contract.py Mapping[str, EndpointStatus] keyed by server id
MountedBridge dataclass protocol_bridge/contract.py The contract's mount outcome: fleet, tool_box, tool_count

Note on MountedProtocolBridge. The name appears twice. In contract.py it is a TypeAlias for MountedBridge (fleet/tool_box/tool_count). The barrel re-exports the bridge.py dataclass — a separate frozen type with fields box and fleet — and that is what mount_protocol_bridge actually returns.

Sub-directories

Directory Holds
protocol_bridge/ The entire bridge: contract.py (frozen vocabulary), endpoint.py (single-server client connection), fleet.py (parallel multi-server lifecycle), bridge.py (graft remote tools into the kernel), host.py (agent-as-MCP-server provider), schema.py (JSON-Schema normalizer), and the re-export barrel

contract.py is the frozen vocabulary and declares no behavior; every behavioral module imports its nouns from there, giving a single source of truth. The behavioral modules are endpoint.py, fleet.py, host.py, bridge.py, and the standalone schema.py. Both __init__.py barrels only re-export.

Key concepts

Qualified tool name. Remote tool names are unique only within their own server, so each is grafted under "<server_id>__<tool_name>" (the QUALIFIER is "__", kept a legal identifier across providers). The wrapper advertises the qualified name to the model but routes invocations back under the unqualified server-side name (RemoteToolRef.name).

EndpointPhase machine. An endpoint advertises an explicit StrEnum lifecycle — idle → connecting → ready → closing → closed, with any failure short-circuiting to faulted — instead of scattered boolean flags. is_usable_phase (== READY) gates tool calls; is_terminal_phase is CLOSED | FAULTED.

Discriminated fault model. Every bridge error is a single ProtocolFault exception discriminated by .kind (a FaultKind literal), with the original triggering value on the native __cause__ chain. Callers branch on a cause class rather than parsing message text. asyncio.CancelledError is never wrapped and never swallowed.

Failure isolation. The fleet connects all endpoints in parallel via asyncio.gather(..., return_exceptions=True); a server that fails to spawn or handshake just stays faulted and contributes no tools, while healthy peers serve. mount_protocol_bridge skips non-ready endpoints.

Owner-task pattern. The SDK's anyio transports (stdio_client/sse_client) and ClientSession are async context managers whose cancel scopes must be entered and exited by the same task. Each endpoint therefore runs that async with chain inside a dedicated background owner task that reports readiness through an asyncio.Future and parks on a close Event. Concurrent open() callers share one single-flight attempt, shielded so one caller's cancellation cannot kill it.

Session-factory seam. create_server_endpoint accepts an optional zero-arg session_factory producing an async context manager that yields an already connected ClientSession. In-process callers and tests use it to bypass real process spawning (for example, with mcp.shared.memory.create_connected_server_and_client_session); when present, the config-derived transport is bypassed entirely.

Mounting external servers

mount_protocol_bridge is the high-level entry point. It starts a ServerFleetImpl from a BridgeConfig, iterates the endpoints, skips any whose phase fails is_usable_phase, lists the rest's tools, wraps each as a kernel DefinedTool (routing run() back to the owning endpoint under the unqualified name), registers them in a fresh ToolRegistry, and returns a MountedProtocolBridge carrying the boxed tools and the live fleet.

import asyncio
from indusagi.interop import (
    BridgeConfig,
    StdioServerConfig,
    SseServerConfig,
    mount_protocol_bridge,
)


async def main() -> None:
    config = BridgeConfig(
        servers=(
            StdioServerConfig(
                id="fs",
                command="npx",
                args=("-y", "@modelcontextprotocol/server-filesystem", "/tmp"),
            ),
            SseServerConfig(
                id="remote",
                url="https://example.com/mcp",
                headers={"authorization": "Bearer ..."},
            ),
        )
    )
    mounted = await mount_protocol_bridge(config)

    # `mounted` is the bridge.py dataclass: `.box` (ToolBox) and `.fleet`.
    for descriptor in mounted.box.descriptors():
        print(descriptor.name)  # e.g. 'fs__read_file', 'remote__search'

    # Inspect which servers came up ready vs. faulted.
    for server_id, status in mounted.fleet.status().items():
        print(server_id, status.phase, status.tool_count, status.fault)

    await mounted.fleet.tear_down()


asyncio.run(main())

Each remote tool's inputSchema is run through normalize_schema before it is shown to the model, and each RemoteCallResult is projected onto a kernel ToolResult — text blocks become TextContentBlock, everything else becomes JsonContentBlock, and is_error is carried straight through. Because remote tools do no local I/O, the registry is boxed against an inert ToolContext whose filesystem and shell backends raise only if a tool incorrectly reaches for them.

Driving a single endpoint

For finer control, drive a ServerEndpoint directly. It owns one transport and advances through the phase machine; invoke takes the unqualified server-side tool name and returns a RemoteCallResult.

import asyncio
from indusagi.interop import (
    create_server_endpoint,
    StdioServerConfig,
    EndpointPhase,
    is_usable_phase,
    is_protocol_fault,
)


async def main() -> None:
    endpoint = create_server_endpoint(
        StdioServerConfig(id="echo", command="python", args=("-m", "my_mcp_server"))
    )
    assert endpoint.phase == EndpointPhase.IDLE
    try:
        await endpoint.open()  # idle -> connecting -> ready
        if is_usable_phase(endpoint.phase):
            for tool in await endpoint.list_tools():
                print(tool.descriptor.name, "->", tool.ref.name)
            result = await endpoint.invoke("echo", {"text": "hi"})  # unqualified name
            print(result.is_error, result.content)
    except Exception as exc:
        if is_protocol_fault(exc):
            print("bridge fault:", exc.kind, exc, "cause=", exc.cause)
        raise
    finally:
        await endpoint.close()  # -> closing -> closed


asyncio.run(main())

A ServerFleetImpl is the multi-server form: start(config) (or the spin_up() vocabulary alias) mints one endpoint per ServerConfig and connects them concurrently; endpoints() and status() return fresh, immutable snapshots in configuration order; tear_down() closes everything in parallel.

The provider host

The host is the mirror image: it makes the agent an MCP server. create_provider_host builds an SDK low-level Server over a ToolBox and eagerly registers two handlers — tools/list maps box.descriptors() to MCP Tools (each descriptor's parameters becomes the client-facing inputSchema) and tools/call (with validate_input=False) routes a ToolCall into box.runner.run with a fresh, never-cancelling CancelToken, then renders the ToolOutcome as a CallToolResult.

connect(transport) takes the SDK (read_stream, write_stream) stream pair, spawns a background task running the blocking server.run, and yields once so the loop starts before the caller proceeds.

import asyncio
from mcp.shared.memory import create_client_server_memory_streams
from indusagi.interop import create_provider_host, ProviderHostInfo


async def main(box) -> None:  # box: a runtime ToolBox from the capabilities layer
    host = create_provider_host(box, ProviderHostInfo(name="my-agent", version="1.0"))
    async with create_client_server_memory_streams() as (client_streams, server_streams):
        await host.connect(server_streams)  # serves tools/list + tools/call in background
        # An MCP client on `client_streams` can now enumerate and call our tools.


# asyncio.run(main(box))

The transport the host accepts is the SDK's stream pair — exactly what mcp.server.stdio.stdio_server() or one side of create_client_server_memory_streams() yields. When info is omitted the host advertises a generic default identity.

Schema normalization

normalize_schema is a pure, table-driven, total function that reshapes an untrusted remote inputSchema into the small, draft-agnostic subset every provider accepts. It reads the declared type and dispatches per type (object/array/string/number/integer/boolean/null), falling back to a closed {"type": "object", "properties": {}} when the type is absent or unrecognized. It strips unsupported keywords ($schema, $ref, $defs, default, examples, and similar), recurses into properties/items/ additionalProperties, collapses tuple-form array items to the first element, and filters required to keys that actually exist.

from indusagi.interop import normalize_schema

raw = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "type": "object",
    "properties": {
        "path": {"type": "string", "default": "/tmp"},
        "tags": {"type": "array", "items": [{"type": "string"}, {"type": "number"}]},
    },
    "required": ["path", "missing"],
}
clean = normalize_schema(raw)
# `$schema` and `default` stripped; `required` filtered to declared keys (['path']);
# tuple-form `items` collapsed to the first element ({'type': 'string'}).
print(clean)

The fault model

Every failure surfaces as a ProtocolFault, and the fault's .kind is site-determined:

Site FaultKind
Opening a stdio transport / handshake spawn_failed
Opening an SSE transport / handshake transport
list_tools failure protocol
invoke failure tool_error
Operating on a non-READY endpoint not_connected

The timeout kind is reserved in the literal union. Branch on the kind, and read the underlying cause from the native exception chain:

from indusagi.interop import is_protocol_fault

try:
    ...
except Exception as exc:
    if is_protocol_fault(exc):
        if exc.kind == "spawn_failed":
            ...  # the subprocess could not be started
        elif exc.kind == "tool_error":
            ...  # the remote tool ran and reported its own failure
        raise exc from exc.__cause__
    raise

Relationship to neighbors

Interop depends inward on several sibling subsystems. It pulls JsonSchema and ToolDescriptor from the LLM Gateway contract; ToolBox, ToolCall, and ToolOutcome from the Runtime contract; and ToolRegistry, DefinedTool, define_tool, ToolSpec, ToolContext, ToolResult, the content-block types, Fs, Shell, and OutputBudget from the Capabilities kernel. Externally it sits on top of the official mcp SDK (ClientSession, StdioServerParameters, stdio_client, sse_client, mcp.types, mcp.server.lowlevel.Server) and anyio memory streams.

It is consumed by the MCP facade, whose MCPClient rides one interop ServerEndpoint per connection and whose MCPServer wraps the provider host, and by the Shell App boot wiring, which mounts the configured external MCP servers into the running agent. See the Architecture overview for where the layer sits.