Interop (MCP Bridge)
indusagi.interopis the bidirectional Model Context Protocol bridge, built on the officialmcpPython SDK. As a client it connects to external MCP servers and grafts their tools into the kernel; as a provider host it publishes the agent's own tools to outside MCP clients. Imported asimport indusagi.interop(one barrel re-exporting the wholeprotocol_bridgesub-package).
The interop layer is indusagi's seam to the wider MCP ecosystem, and it works in
both directions over the same ToolRegistry/ToolBox abstractions the
Capabilities kernel uses for local tools.
As a client, it dials external MCP servers (stdio subprocess or SSE/HTTP),
enumerates their tools, normalizes their JSON schemas, and registers each remote
tool as a kernel tool the agent can call like any built-in. As a provider host,
it stands up an SDK server that answers tools/list and tools/call against a
Runtime ToolBox. Conceptually it is the inverse
of the kernel: capabilities defines local tools; interop grafts remote tools in
and publishes local tools out.
Table of Contents
- Public exports
- Sub-directories
- Key concepts
- Mounting external servers
- Driving a single endpoint
- The provider host
- Schema normalization
- The fault model
- Relationship to neighbors
Public exports
From indusagi/interop/__init__.py, which re-exports the entire
protocol_bridge barrel:
| Name | Kind | Source | Purpose |
|---|---|---|---|
mount_protocol_bridge |
async function | protocol_bridge/bridge.py |
The join point: start a fleet, list each ready endpoint's tools, wrap each as a kernel tool, return a MountedProtocolBridge (ToolBox + live fleet) |
MountedProtocolBridge |
dataclass | protocol_bridge/bridge.py |
The mount return value: frozen box (ToolBox) plus fleet (ServerFleet) |
create_server_endpoint |
function | protocol_bridge/endpoint.py |
Factory for a ServerEndpointImpl over one ServerConfig; accepts an optional session_factory test seam |
ServerEndpointImpl |
class | protocol_bridge/endpoint.py |
The one concrete client endpoint: a single live connection over the SDK ClientSession, driving the phase machine |
start_server_fleet |
async function | protocol_bridge/fleet.py |
Construct and start() a ServerFleetImpl from a BridgeConfig, returning it after every connection has settled |
ServerFleetImpl |
class | protocol_bridge/fleet.py |
The one concrete fleet: mints one endpoint per ServerConfig, connects them in parallel with per-server failure isolation |
create_provider_host |
function | protocol_bridge/host.py |
Provider-host factory: stand up an SDK low-level Server over a ToolBox, eagerly registering tools/list and tools/call |
ProviderHost |
type | protocol_bridge/host.py |
Protocol for the agent-as-MCP-server: a single connect(transport) that binds and serves |
ProviderHostInfo |
dataclass | protocol_bridge/host.py |
Frozen identity (name, version) the host advertises to connecting clients |
normalize_schema |
function | protocol_bridge/schema.py |
Reshape an untrusted remote inputSchema into a draft-agnostic subset every provider accepts |
ProtocolFault |
class | protocol_bridge/contract.py |
The normalized, discriminable exception raised everywhere in the bridge; class on .kind, cause on __cause__ |
protocol_fault |
function | protocol_bridge/contract.py |
Terse constructor protocol_fault(kind, message, cause) |
is_protocol_fault |
function | protocol_bridge/contract.py |
TypeGuard narrowing a value to ProtocolFault |
is_usable_phase |
function | protocol_bridge/contract.py |
True only when EndpointPhase == READY (can accept tool calls) |
is_terminal_phase |
function | protocol_bridge/contract.py |
True when phase is CLOSED or FAULTED |
qualify_tool_name |
function | protocol_bridge/contract.py |
Build the agent-facing "<server>__<tool>" from a RemoteToolRef |
QUALIFIER |
const | protocol_bridge/contract.py |
The "__" delimiter joining server id and tool name |
ServerEndpoint |
type | protocol_bridge/contract.py |
Protocol for a live single-server connection: config/phase plus open/list_tools/invoke/status/close |
ServerFleet |
type | protocol_bridge/contract.py |
Protocol for a name-keyed endpoint collection: spin_up/tear_down/endpoint/endpoints/status |
EndpointPhase |
enum | protocol_bridge/contract.py |
StrEnum lifecycle: IDLE, CONNECTING, READY, CLOSING, CLOSED, FAULTED |
FaultKind |
type | protocol_bridge/contract.py |
Literal union: transport, protocol, timeout, tool_error, not_connected, spawn_failed |
StdioServerConfig |
dataclass | protocol_bridge/contract.py |
Frozen stdio config: id, command, args, env; kind ClassVar "stdio" |
SseServerConfig |
dataclass | protocol_bridge/contract.py |
Frozen SSE/HTTP config: id, url, headers; kind ClassVar "sse" |
ServerConfig |
type | protocol_bridge/contract.py |
Discriminated union StdioServerConfig | SseServerConfig |
TransportKind |
type | protocol_bridge/contract.py |
Literal["stdio", "sse"] |
BridgeConfig |
dataclass | protocol_bridge/contract.py |
Frozen wrapper holding the tuple of ServerConfigs to connect on start |
RemoteToolRef |
dataclass | protocol_bridge/contract.py |
Frozen (server, name) pair identifying one tool on one server |
RemoteTool |
dataclass | protocol_bridge/contract.py |
Frozen pairing of a RemoteToolRef with the model-facing ToolDescriptor |
RemoteCallResult |
dataclass | protocol_bridge/contract.py |
Frozen raw invocation result: content (MCP blocks) plus is_error |
EndpointStatus |
dataclass | protocol_bridge/contract.py |
Frozen point-in-time health: server, phase, tool_count, optional fault |
FleetStatus |
type | protocol_bridge/contract.py |
Mapping[str, EndpointStatus] keyed by server id |
MountedBridge |
dataclass | protocol_bridge/contract.py |
The contract's mount outcome: fleet, tool_box, tool_count |
Note on
MountedProtocolBridge. The name appears twice. Incontract.pyit is aTypeAliasforMountedBridge(fleet/tool_box/tool_count). The barrel re-exports thebridge.pydataclass — a separate frozen type with fieldsboxandfleet— and that is whatmount_protocol_bridgeactually returns.
Sub-directories
| Directory | Holds |
|---|---|
protocol_bridge/ |
The entire bridge: contract.py (frozen vocabulary), endpoint.py (single-server client connection), fleet.py (parallel multi-server lifecycle), bridge.py (graft remote tools into the kernel), host.py (agent-as-MCP-server provider), schema.py (JSON-Schema normalizer), and the re-export barrel |
contract.py is the frozen vocabulary and declares no behavior; every
behavioral module imports its nouns from there, giving a single source of truth.
The behavioral modules are endpoint.py, fleet.py, host.py, bridge.py,
and the standalone schema.py. Both __init__.py barrels only re-export.
Key concepts
Qualified tool name. Remote tool names are unique only within their own
server, so each is grafted under "<server_id>__<tool_name>" (the QUALIFIER is
"__", kept a legal identifier across providers). The wrapper advertises the
qualified name to the model but routes invocations back under the unqualified
server-side name (RemoteToolRef.name).
EndpointPhase machine. An endpoint advertises an explicit StrEnum
lifecycle — idle → connecting → ready → closing → closed, with any failure
short-circuiting to faulted — instead of scattered boolean flags.
is_usable_phase (== READY) gates tool calls; is_terminal_phase is
CLOSED | FAULTED.
Discriminated fault model. Every bridge error is a single ProtocolFault
exception discriminated by .kind (a FaultKind literal), with the original
triggering value on the native __cause__ chain. Callers branch on a cause
class rather than parsing message text. asyncio.CancelledError is never
wrapped and never swallowed.
Failure isolation. The fleet connects all endpoints in parallel via
asyncio.gather(..., return_exceptions=True); a server that fails to spawn or
handshake just stays faulted and contributes no tools, while healthy peers
serve. mount_protocol_bridge skips non-ready endpoints.
Owner-task pattern. The SDK's anyio transports (stdio_client/sse_client)
and ClientSession are async context managers whose cancel scopes must be
entered and exited by the same task. Each endpoint therefore runs that
async with chain inside a dedicated background owner task that reports
readiness through an asyncio.Future and parks on a close Event. Concurrent
open() callers share one single-flight attempt, shielded so one caller's
cancellation cannot kill it.
Session-factory seam. create_server_endpoint accepts an optional zero-arg
session_factory producing an async context manager that yields an already
connected ClientSession. In-process callers and tests use it to bypass real
process spawning (for example, with
mcp.shared.memory.create_connected_server_and_client_session); when present,
the config-derived transport is bypassed entirely.
Mounting external servers
mount_protocol_bridge is the high-level entry point. It starts a
ServerFleetImpl from a BridgeConfig, iterates the endpoints, skips any whose
phase fails is_usable_phase, lists the rest's tools, wraps each as a kernel
DefinedTool (routing run() back to the owning endpoint under the unqualified
name), registers them in a fresh ToolRegistry, and returns a
MountedProtocolBridge carrying the boxed tools and the live fleet.
import asyncio
from indusagi.interop import (
BridgeConfig,
StdioServerConfig,
SseServerConfig,
mount_protocol_bridge,
)
async def main() -> None:
config = BridgeConfig(
servers=(
StdioServerConfig(
id="fs",
command="npx",
args=("-y", "@modelcontextprotocol/server-filesystem", "/tmp"),
),
SseServerConfig(
id="remote",
url="https://example.com/mcp",
headers={"authorization": "Bearer ..."},
),
)
)
mounted = await mount_protocol_bridge(config)
# `mounted` is the bridge.py dataclass: `.box` (ToolBox) and `.fleet`.
for descriptor in mounted.box.descriptors():
print(descriptor.name) # e.g. 'fs__read_file', 'remote__search'
# Inspect which servers came up ready vs. faulted.
for server_id, status in mounted.fleet.status().items():
print(server_id, status.phase, status.tool_count, status.fault)
await mounted.fleet.tear_down()
asyncio.run(main())
Each remote tool's inputSchema is run through normalize_schema before it is
shown to the model, and each RemoteCallResult is projected onto a kernel
ToolResult — text blocks become TextContentBlock, everything else becomes
JsonContentBlock, and is_error is carried straight through. Because remote
tools do no local I/O, the registry is boxed against an inert ToolContext
whose filesystem and shell backends raise only if a tool incorrectly reaches for
them.
Driving a single endpoint
For finer control, drive a ServerEndpoint directly. It owns one transport and
advances through the phase machine; invoke takes the unqualified server-side
tool name and returns a RemoteCallResult.
import asyncio
from indusagi.interop import (
create_server_endpoint,
StdioServerConfig,
EndpointPhase,
is_usable_phase,
is_protocol_fault,
)
async def main() -> None:
endpoint = create_server_endpoint(
StdioServerConfig(id="echo", command="python", args=("-m", "my_mcp_server"))
)
assert endpoint.phase == EndpointPhase.IDLE
try:
await endpoint.open() # idle -> connecting -> ready
if is_usable_phase(endpoint.phase):
for tool in await endpoint.list_tools():
print(tool.descriptor.name, "->", tool.ref.name)
result = await endpoint.invoke("echo", {"text": "hi"}) # unqualified name
print(result.is_error, result.content)
except Exception as exc:
if is_protocol_fault(exc):
print("bridge fault:", exc.kind, exc, "cause=", exc.cause)
raise
finally:
await endpoint.close() # -> closing -> closed
asyncio.run(main())
A ServerFleetImpl is the multi-server form: start(config) (or the
spin_up() vocabulary alias) mints one endpoint per ServerConfig and connects
them concurrently; endpoints() and status() return fresh, immutable
snapshots in configuration order; tear_down() closes everything in parallel.
The provider host
The host is the mirror image: it makes the agent an MCP server.
create_provider_host builds an SDK low-level Server over a ToolBox and
eagerly registers two handlers — tools/list maps box.descriptors() to MCP
Tools (each descriptor's parameters becomes the client-facing inputSchema)
and tools/call (with validate_input=False) routes a ToolCall into
box.runner.run with a fresh, never-cancelling CancelToken, then renders the
ToolOutcome as a CallToolResult.
connect(transport) takes the SDK (read_stream, write_stream) stream pair,
spawns a background task running the blocking server.run, and yields once so
the loop starts before the caller proceeds.
import asyncio
from mcp.shared.memory import create_client_server_memory_streams
from indusagi.interop import create_provider_host, ProviderHostInfo
async def main(box) -> None: # box: a runtime ToolBox from the capabilities layer
host = create_provider_host(box, ProviderHostInfo(name="my-agent", version="1.0"))
async with create_client_server_memory_streams() as (client_streams, server_streams):
await host.connect(server_streams) # serves tools/list + tools/call in background
# An MCP client on `client_streams` can now enumerate and call our tools.
# asyncio.run(main(box))
The transport the host accepts is the SDK's stream pair — exactly what
mcp.server.stdio.stdio_server() or one side of
create_client_server_memory_streams() yields. When info is omitted the host
advertises a generic default identity.
Schema normalization
normalize_schema is a pure, table-driven, total function that reshapes an
untrusted remote inputSchema into the small, draft-agnostic subset every
provider accepts. It reads the declared type and dispatches per type
(object/array/string/number/integer/boolean/null), falling back to
a closed {"type": "object", "properties": {}} when the type is absent or
unrecognized. It strips unsupported keywords ($schema, $ref, $defs,
default, examples, and similar), recurses into properties/items/
additionalProperties, collapses tuple-form array items to the first element,
and filters required to keys that actually exist.
from indusagi.interop import normalize_schema
raw = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"properties": {
"path": {"type": "string", "default": "/tmp"},
"tags": {"type": "array", "items": [{"type": "string"}, {"type": "number"}]},
},
"required": ["path", "missing"],
}
clean = normalize_schema(raw)
# `$schema` and `default` stripped; `required` filtered to declared keys (['path']);
# tuple-form `items` collapsed to the first element ({'type': 'string'}).
print(clean)
The fault model
Every failure surfaces as a ProtocolFault, and the fault's .kind is
site-determined:
| Site | FaultKind |
|---|---|
| Opening a stdio transport / handshake | spawn_failed |
| Opening an SSE transport / handshake | transport |
list_tools failure |
protocol |
invoke failure |
tool_error |
Operating on a non-READY endpoint |
not_connected |
The timeout kind is reserved in the literal union. Branch on the kind, and
read the underlying cause from the native exception chain:
from indusagi.interop import is_protocol_fault
try:
...
except Exception as exc:
if is_protocol_fault(exc):
if exc.kind == "spawn_failed":
... # the subprocess could not be started
elif exc.kind == "tool_error":
... # the remote tool ran and reported its own failure
raise exc from exc.__cause__
raise
Relationship to neighbors
Interop depends inward on several sibling subsystems. It pulls JsonSchema and
ToolDescriptor from the LLM Gateway contract;
ToolBox, ToolCall, and ToolOutcome from the
Runtime contract; and ToolRegistry,
DefinedTool, define_tool, ToolSpec, ToolContext, ToolResult, the
content-block types, Fs, Shell, and OutputBudget from the
Capabilities kernel. Externally it sits on top
of the official mcp SDK (ClientSession, StdioServerParameters,
stdio_client, sse_client, mcp.types, mcp.server.lowlevel.Server) and
anyio memory streams.
It is consumed by the MCP facade, whose MCPClient rides
one interop ServerEndpoint per connection and whose MCPServer wraps the
provider host, and by the Shell App boot wiring,
which mounts the configured external MCP servers into the running agent. See the
Architecture overview for where the layer sits.
