Connectors (Composio)

indusagi.connectors is a hexagonal SaaS-connector bridge: it turns a vendor catalog (Composio) into native agent tools through one SaasBackend port, a stateful SaasGateway façade, and four control tools. Everything is imported from a single barrel that a host wires into an Agent session: from indusagi.connectors import create_composio_gateway, SaasBackend, ...

A coding agent should be able to discover, authorize, and call third-party products (GitHub, Gmail, Slack, …) as if they were built-in tools. This package delivers that in ports-and-adapters style: a dependency-inverted core (the SaasBackend Protocol plus a frozen data vocabulary) holds all the logic, while exactly one adapter, the Composio backend, translates the vendor SDK into that vocabulary. The SaasGateway façade assembles the pieces (a live ToolRegistry, a content-hash enable cache, an immutable ScopePlanner, and a rule-table connection driver) and hands the agent the four control tools it starts with.

Note: this SaaS connectors package is unrelated to indusagi.llmgateway.connectors, the LLM-provider connector registry — same word, different subsystem. See LLM Gateway.

Public exports

From indusagi/connectors/__init__.py — the one import site for the whole bridge. The render and control internals stay private behind the gateway; they are reachable only via the subpackage barrels (.core, .control, .render).

Name | Kind | Source | Purpose
create_saas_gateway | function | gateway.py | Assemble a SaasGateway over any SaasBackend; takes optional SaasGatewayOptions
create_composio_gateway | function | gateway.py | Convenience: create_saas_gateway wired to the real Composio adapter from a keyword api_key= (lazy vendor import)
SaasGateway | class | gateway.py | Protocol for the stateful façade: the control_tool_box/tool_box/enable_toolkit assembly surface plus the enable/execute/connect/status verbs
SaasGatewayOptions | dataclass | gateway.py | Tuning knobs: account_id pin and connect (AwaitOptions pacing)
EnableReport | dataclass | gateway.py | Outcome of hydrating a toolkit: toolkit, hydrated names, tools (DefinedTool tuple), cached flag
ConnectReport | dataclass | gateway.py | Terminal outcome of a connect flow: action, request_id, auth_url, account_id, reason
StatusReport | dataclass | gateway.py | Snapshot of accounts (ConnectedAccount tuple) and enabled_tools (in-scope remote tool names)
SaasBackend | class | core/port.py | The dependency-inversion PORT: an async Protocol with list_toolkits/list_tools/execute/list_connected_accounts/initiate_connection/check_connection
ToolkitInfo | dataclass | core/port.py | A connectable product: slug, name, description, optional connected
RemoteTool | dataclass | core/port.py | One callable catalog operation: name, toolkit, description, input_schema (JsonSchema)
RemoteResult | dataclass | core/port.py | Outcome of running a RemoteTool: ok, data, error, log_id
ConnectedAccount | dataclass | core/port.py | A credential link: id, toolkit, normalized status, updated_at
ConnectionRequest | dataclass | core/port.py | A freshly opened connection attempt: id, toolkit, status, auth_url
ConnectionState | dataclass | core/port.py | A polled snapshot: id, status, account_id (once active), auth_url
ConnectionStatus | type | core/port.py | Normalized Literal["active","pending","expired","failed"] the adapter folds the vendor enum into
ExecuteOptions | dataclass | core/port.py | Per-call execute knob: optional account_id
InitiateOptions | dataclass | core/port.py | Connection-open inputs: auth_config_id, callback_url
create_composio_backend | function | adapter/composio_backend.py | Build the lone Composio-backed SaasBackend from a keyword api_key=; raises ComposioBackendError if the key is missing or the SDK absent
ComposioBackendError | class | adapter/composio_backend.py | Exception carrying the offending port method name when a request cannot map onto the port
ComposioBackendOptions | dataclass | adapter/composio_backend.py | Adapter config: api_key and optional user_id (defaults to "default")
ScopePlanner | class | core/scope_planner.py | Fluent immutable builder: create() then include(*toolkits)/only(*tools), resolved by plan() into a ResolvedScope
ResolvedScope | type | core/scope_planner.py | Union ByTools | ByToolkits | Empty discriminated by .kind
ScopeKind | type | core/scope_planner.py | Literal["byTools","byToolkits","empty"] tag

The three resolved-scope shapes (ByTools / ByToolkits / Empty), hash_key / HashCache, build_remote_tool, the CONNECTOR_TOOLS passthrough table, and the whole render/control layers are deliberately not on the top barrel — they live on the core / control / render subpackage barrels.

Sub-directories

Directory | Holds
core/ | The dependency-inverted core: port.py (SaasBackend + data vocabulary), scope_planner.py (immutable ScopePlanner), cache.py (hash_key + HashCache), builder.py (build_remote_tool + the CONNECTOR_TOOLS passthrough table). No vendor symbol crosses this barrel.
adapter/ | Exactly one module: composio_backend.py, the single place the composio SDK leaks in. Exports create_composio_backend, ComposioBackendError, ComposioBackendOptions.
control/ | The control surface: connect.py (the CONNECT_RULES state machine + initiate_and_await driver) and tools.py (the four gateway-bound control tools).
render/ | Pure presentation: summarizers.py (SUMMARIZERS registry + summarize_result) and format.py (format_* text-block renderers). No I/O, no vendor SDK.

The backend port

SaasBackend (core/port.py) is the dependency-inversion seam — the only thing core logic depends on. It is an async Protocol of six methods grouped into the three jobs the bridge does:

  • Discovery: list_toolkits() and list_tools(toolkit)
  • Execution: execute(tool_name, args, opts)
  • Linking / auth: list_connected_accounts(), initiate_connection(toolkit, opts), check_connection(id)

Every shape it traffics in is a frozen, slotted dataclass owned by the bridge (ToolkitInfo, RemoteTool, RemoteResult, ConnectedAccount, ConnectionRequest, ConnectionState). TS optional fields become ... | None; absence is modelled as None throughout. Because the port is a structural Protocol, a test backend is a few dataclass literals away — see the fake-backend example below.

ConnectionStatus is a normalized Literal["active", "pending", "expired", "failed"]. The adapter folds the catalog's richer enum down to these four buckets so the core only ever branches on a closed set it owns.
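The folding step can be pictured as a small lookup with a conservative fallback. The sketch below is illustrative only: the vendor state names are assumptions (the real catalog enum may differ), and normalize_status is a hypothetical stand-in rather than the adapter's actual function. The only behaviour taken from this page is that INACTIVE and REVOKED land in "failed" and that anything unrecognised lands in "pending".

```python
from typing import Literal

ConnectionStatus = Literal["active", "pending", "expired", "failed"]

# Assumed vendor state names, for illustration only.
_STATUS_BUCKETS: dict[str, ConnectionStatus] = {
    "ACTIVE": "active",
    "INITIATED": "pending",
    "EXPIRED": "expired",
    "FAILED": "failed",
    "INACTIVE": "failed",
    "REVOKED": "failed",
}


def normalize_status(raw: object) -> ConnectionStatus:
    """Fold a vendor status into one of four buckets; unknown values become 'pending'."""
    key = str(raw).strip().upper()
    return _STATUS_BUCKETS.get(key, "pending")


print(normalize_status("active"))
print(normalize_status("REVOKED"))
print(normalize_status("some-new-vendor-state"))
```

Because the fallback is "pending" rather than "active", a vendor state the table has never seen cannot be mistaken for a usable credential.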

The single vendor adapter

ComposioBackend (adapter/composio_backend.py) is the lone implementation of the port and the only place the composio SDK is imported. The import is lazy, inside the constructor, so the package imports cleanly even when the SDK is absent; ComposioBackendError is raised on first use if the SDK is missing. The Any type is permitted only inside this file.

The adapter is user-scoped: nearly every Composio call wants a user_id, so a fixed default ("default", configurable via ComposioBackendOptions) is threaded through. Method delegation:

Port method | Composio call
list_toolkits | toolkits.get()
list_tools | tools.get_raw_composio_tools(toolkits=[slug])
execute | tools.execute(slug, user_id=…, arguments=…, connected_account_id=…)
list_connected_accounts | connected_accounts.list(user_ids=[…])
initiate_connection | toolkits.authorize(user_id, slug, auth_config_id?)
check_connection | connected_accounts.get(id)

The synchronous SDK runs through asyncio.to_thread; pydantic models are flattened with model_dump(); duck-typed _to_* projectors re-narrow vendor shapes back to the frozen port vocabulary, accepting both camelCase (TS SDK) and snake_case (Python SDK) keys before anything untyped escapes.
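A minimal sketch of that pattern, under stated assumptions: Account, pick, to_account, and blocking_fetch are hypothetical names invented here, and the raw key names (toolkit_slug / toolkitSlug, updated_at / updatedAt) are guesses at what a vendor payload might look like. It shows a blocking call pushed onto a worker thread with asyncio.to_thread and a projector that accepts either key spelling before building a frozen value.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class Account:  # hypothetical stand-in for ConnectedAccount
    id: str
    toolkit: str
    updated_at: Optional[str] = None


def pick(raw: Mapping[str, Any], snake: str, camel: str) -> Any:
    """Read a field under either spelling, preferring snake_case."""
    return raw[snake] if snake in raw else raw.get(camel)


def to_account(raw: Mapping[str, Any]) -> Account:
    """Re-narrow an untyped vendor row into the frozen vocabulary."""
    return Account(
        id=str(raw["id"]),
        toolkit=str(pick(raw, "toolkit_slug", "toolkitSlug")),
        updated_at=pick(raw, "updated_at", "updatedAt"),
    )


def blocking_fetch() -> list:
    # Stand-in for a synchronous SDK call that would block the event loop.
    return [
        {"id": "a1", "toolkitSlug": "github", "updatedAt": "2024-01-01"},
        {"id": "a2", "toolkit_slug": "slack"},
    ]


async def list_accounts() -> list:
    rows = await asyncio.to_thread(blocking_fetch)  # run off the event loop
    return [to_account(row) for row in rows]


accounts = asyncio.run(list_accounts())
print(accounts)
```

Keeping the projector as the only place that touches raw mappings means nothing untyped leaves the adapter, whichever SDK flavour produced the payload.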

import indusagi.connectors as connectors

# Build the adapter directly (keyword-only api_key):
backend = connectors.create_composio_backend(api_key="comp_...")

The stateful gateway

create_saas_gateway(backend, opts=None) returns a LiveSaasGateway — the concrete façade that owns the state the stateless core deliberately lacks:

  • a live ToolRegistry (from Capabilities) that hydrated remote tools register into,
  • the four control tools, built once and registered up front so control_tool_box() works before any toolkit is hydrated,
  • a dict of hydrated remote tools keyed by name,
  • a HashCache of enable results.

It exposes three assembly methods and four delegate verbs:

Method | Returns | Purpose
control_tool_box() | ToolBox | Just the four control tools, the surface an agent starts with
tool_box() | ToolBox | The control tools plus every hydrated remote tool
enable_toolkit(name, only=None) | list[DefinedTool] | Hydrate a toolkit into the live registry; content-hash cached
enable(toolkit, only=None) | EnableReport | The verb behind saas_enable
execute(tool, args, account_id=None) | ToolResult | The verb behind saas_execute: run one operation by slug
connect(toolkit, *, callback_url=None, auth_config_id=None) | ConnectReport | The verb behind saas_connect: drive auth to a terminal state
status() | StatusReport | The verb behind saas_status

The four control tools (CONTROL_SPECS in control/tools.py) are a declarative table: one row per tool — id, description, JSON-Schema parameters, and a run(gateway, input) thunk — mapped by a single build_control_tools, never a factory family. The tools delegate to the verbs above; the gateway holds the logic.
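The table-plus-one-builder idea can be sketched as follows. This is not the package's code: ControlSpec, StubGateway, and the two rows are hypothetical, and the stub verbs are synchronous for brevity whereas the real gateway verbs are awaited. The point is only that each tool is a data row and a single function maps every row to a callable bound to one gateway.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ControlSpec:  # hypothetical row shape: id, description, parameters, run
    id: str
    description: str
    parameters: dict
    run: Callable[[Any, dict], Any]


class StubGateway:
    """Synchronous stand-in for the gateway verbs (assumption for brevity)."""

    def enable(self, toolkit: str) -> str:
        return f"enabled {toolkit}"

    def status(self) -> str:
        return "no accounts"


SPECS = (
    ControlSpec(
        id="saas_enable",
        description="Hydrate a toolkit",
        parameters={
            "type": "object",
            "properties": {"toolkit": {"type": "string"}},
            "required": ["toolkit"],
        },
        run=lambda gw, inp: gw.enable(inp["toolkit"]),
    ),
    ControlSpec(
        id="saas_status",
        description="Report accounts and enabled tools",
        parameters={"type": "object", "properties": {}},
        run=lambda gw, inp: gw.status(),
    ),
)


def build_control_tools(gateway: Any, specs=SPECS) -> dict:
    """Map every row to a callable bound to one gateway."""
    # spec=spec pins the row per iteration and avoids late-binding closures.
    return {spec.id: (lambda inp, spec=spec: spec.run(gateway, inp)) for spec in specs}


tools = build_control_tools(StubGateway())
print(tools["saas_enable"]({"toolkit": "github"}))
print(tools["saas_status"]({}))
```

Adding a fifth tool in this shape means adding a row, not another factory function.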

Tool dispatch runs each call through an inert ToolContext whose fs/shell are a _Forbidden guard that raises on any attribute access — the SaaS tools route entirely through the port and never touch the filesystem, so a future tool that does fails loudly instead of silently mis-operating.
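A guard of this kind takes only a few lines. The sketch below is an assumption about the general shape, not the package's _Forbidden class: Forbidden and its error message are invented here, and because it relies on __getattr__ it only intercepts names that normal lookup cannot resolve (its own _label and dunder attributes still work).

```python
class Forbidden:
    """Placeholder capability that fails loudly when a tool tries to use it."""

    def __init__(self, label: str) -> None:
        self._label = label

    def __getattr__(self, name: str):
        # Called only when normal lookup fails, so self._label still resolves.
        raise RuntimeError(f"{self._label}.{name} is not available to SaaS tools")


fs = Forbidden("fs")
try:
    fs.read_text("/etc/hosts")
except RuntimeError as exc:
    message = str(exc)

print(message)
```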

create_composio_gateway(api_key=...) is the one convenience that pairs the gateway with the live adapter; it imports create_composio_backend lazily, so the vendor SDK stays optional.

Hydration and the content-hash cache

Hydration is the act of turning a toolkit's remote operations into native tools. enable_toolkit lists the toolkit's operations, mints a DefinedTool per op via build_remote_tool, registers them, and content-hash-caches the result. Hydrated tools are named ext:<toolkit>.<slug> by _coin_remote_name so they never collide with built-ins and their origin is obvious in a tool list.

The cache is HashCache (core/cache.py), keyed on hash_key — a canonical JSON SHA-256 hex digest that is independent of key order. It stores in-flight asyncio.Tasks (not values), shields awaiters, and evicts a slot on task failure or cancel so failures are retryable. Because a repeat enable of the same shape resolves to the same hash, it returns the identical DefinedTool objects, making re-registration idempotent — EnableReport.cached is then True.
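The sketch below illustrates the same three ideas with the standard library: an order-independent key, a cache of in-flight tasks shared through asyncio.shield, and eviction when a task fails or is cancelled. TaskCache and get_or_run are hypothetical names, and the real HashCache may differ in detail.

```python
import asyncio
import hashlib
import json
from typing import Any


def hash_key(value: Any) -> str:
    """SHA-256 hex digest of canonical JSON, so key order does not matter."""
    canonical = json.dumps(value, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class TaskCache:  # hypothetical stand-in for HashCache
    def __init__(self) -> None:
        self._slots: dict = {}  # key -> in-flight or finished asyncio.Task

    async def get_or_run(self, key: str, factory) -> Any:
        task = self._slots.get(key)
        if task is None:
            task = asyncio.ensure_future(factory())
            self._slots[key] = task
            task.add_done_callback(lambda done: self._evict_if_failed(key, done))
        # shield: cancelling one awaiter does not cancel the shared task
        return await asyncio.shield(task)

    def _evict_if_failed(self, key: str, done) -> None:
        failed = done.cancelled() or done.exception() is not None
        if failed and self._slots.get(key) is done:
            del self._slots[key]  # the next caller gets a fresh attempt


async def demo():
    cache = TaskCache()
    calls = 0

    async def enable():
        nonlocal calls
        calls += 1
        return ["ext:github.GITHUB_CREATE_ISSUE"]

    async def boom():
        raise ValueError("vendor down")

    key_a = hash_key({"toolkit": "github", "only": ["GITHUB_CREATE_ISSUE"]})
    key_b = hash_key({"only": ["GITHUB_CREATE_ISSUE"], "toolkit": "github"})
    first = await cache.get_or_run(key_a, enable)
    second = await cache.get_or_run(key_b, enable)  # same hash, cached task

    try:
        await cache.get_or_run("flaky", boom)
    except ValueError:
        pass
    await asyncio.sleep(0)  # give the eviction callback a turn
    retried = await cache.get_or_run("flaky", enable)
    return first is second, calls, retried


same_object, call_count, retried = asyncio.run(demo())
print(same_object, call_count, retried)
```

Caching the task rather than its value means two concurrent requests for the same shape share one vendor round trip instead of racing.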

The connection state machine

control/connect.py expresses the OAuth-style link flow as policy-as-data. CONNECT_RULES is a total mapping from ConnectionStatus to a function of type Callable[[ConnectionState], ConnectAction]: one row per normalized status, each a thunk returning the driver's next move.

plan_connect(state) is pure — state in, ConnectAction out — and an unknown status maps to Failed rather than a silent no-op. The action union has five frozen variants, each with a ClassVar kind tag: AwaitAuth, Poll, Done, Expired, Failed. is_terminal returns True for the last three.
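As a rough illustration of policy-as-data, the sketch below builds a rule table over the four statuses and a pure planner on top of it. Everything here is a simplified stand-in: State mimics ConnectionState, the kind strings other than "await-auth" and "done" are guesses, and the pending rule (AwaitAuth when an auth_url is present, otherwise Poll) is an assumption about how the two non-terminal actions might be chosen.

```python
from dataclasses import dataclass
from typing import ClassVar, Optional


@dataclass(frozen=True)
class State:  # hypothetical stand-in for ConnectionState
    status: str
    account_id: Optional[str] = None
    auth_url: Optional[str] = None


@dataclass(frozen=True)
class AwaitAuth:
    kind: ClassVar[str] = "await-auth"
    auth_url: str


@dataclass(frozen=True)
class Poll:
    kind: ClassVar[str] = "poll"


@dataclass(frozen=True)
class Done:
    kind: ClassVar[str] = "done"
    account_id: Optional[str]


@dataclass(frozen=True)
class Expired:
    kind: ClassVar[str] = "expired"


@dataclass(frozen=True)
class Failed:
    kind: ClassVar[str] = "failed"
    reason: str


# One row per normalized status; each row is a thunk from state to action.
RULES = {
    "active": lambda s: Done(s.account_id),
    "pending": lambda s: AwaitAuth(s.auth_url) if s.auth_url else Poll(),  # assumed rule
    "expired": lambda s: Expired(),
    "failed": lambda s: Failed("connection failed"),
}


def plan_connect(state: State):
    """Pure: state in, action out. Unknown statuses fail instead of no-op."""
    rule = RULES.get(state.status)
    return rule(state) if rule else Failed(f"unknown status: {state.status}")


def is_terminal(action) -> bool:
    return isinstance(action, (Done, Expired, Failed))


print(plan_connect(State("active", account_id="a1")))
print(plan_connect(State("mystery")))
```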

initiate_and_await(backend, toolkit, options) is the one impure driver. It opens a connection once, then loops up to max_polls: consult plan_connect, return on a terminal action (checked before the sleep), else sleep(poll_interval_ms / 1000) and check_connection again. Exhausting the budget yields Failed rather than hanging. It does not re-initiate on expired — that decision is left to the caller, who may want to surface the lapse first.

AwaitOptions carries auth_config_id, callback_url, poll_interval_ms (default 1500), max_polls (default 40), and an injectable sleep (in seconds — the ms→s conversion happens at the call site) so tests drive the loop without real time passing.
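The loop and the injectable sleep can be sketched together. This is a simplified model, not the package's driver: Pacing stands in for the pacing fields of AwaitOptions, ScriptedBackend is a fake that turns active on its third check, statuses are plain strings instead of actions, and the request id "r1" is made up.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

TERMINAL = {"active", "expired", "failed"}


@dataclass(frozen=True)
class Pacing:  # hypothetical stand-in for the pacing fields of AwaitOptions
    poll_interval_ms: int = 1500
    max_polls: int = 40
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep  # takes seconds


class ScriptedBackend:
    """Fake backend whose connection turns active on the third check."""

    def __init__(self) -> None:
        self.checks = 0

    async def initiate_connection(self, toolkit: str) -> str:
        return "pending"

    async def check_connection(self, request_id: str) -> str:
        self.checks += 1
        return "active" if self.checks >= 3 else "pending"


async def initiate_and_await(backend, toolkit: str, pacing: Pacing) -> str:
    status = await backend.initiate_connection(toolkit)  # opened exactly once
    for _ in range(pacing.max_polls):
        if status in TERMINAL:  # terminal check happens before the sleep
            return status
        await pacing.sleep(pacing.poll_interval_ms / 1000)  # ms to s at the call site
        status = await backend.check_connection("r1")
    # Budget exhausted: report failure instead of hanging.
    return status if status in TERMINAL else "failed"


slept = []


async def fake_sleep(seconds: float) -> None:
    slept.append(seconds)  # record the request instead of waiting


backend = ScriptedBackend()
outcome = asyncio.run(initiate_and_await(backend, "github", Pacing(sleep=fake_sleep)))
print(outcome, slept)
```

Passing fake_sleep lets a test assert on the exact pacing the driver asked for while the whole run completes instantly.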

Scope planning

ScopePlanner (core/scope_planner.py) is a fluent, immutable builder — every mutator returns a new instance. Start with ScopePlanner.create(), narrow with include(*toolkits) and only(*tools), and resolve once with plan(). The result is a ResolvedScope, discriminated by .kind:

  • ByTools(tools, toolkits) — pinned tool slugs win (the most specific intent)
  • ByToolkits(toolkits) — one or more whole toolkits in scope
  • Empty() — nothing declared

There is no nested if-cascade; resolution is a single ordered check over accumulated, normalized (trimmed, deduped, sorted) intent. The gateway uses the planner to normalize an enable request before content-hashing it.
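A compact way to picture the builder is a frozen dataclass whose mutators return copies. The sketch below is an approximation: Planner stands in for ScopePlanner, and plan() returns plain tuples tagged with the ScopeKind strings rather than the real ByTools / ByToolkits / Empty classes.

```python
from dataclasses import dataclass, replace


def _norm(names: tuple) -> tuple:
    """Trim, drop blanks, dedupe, and sort."""
    return tuple(sorted({n.strip() for n in names if n.strip()}))


@dataclass(frozen=True)
class Planner:  # hypothetical stand-in for ScopePlanner
    toolkits: tuple = ()
    tools: tuple = ()

    def include(self, *toolkits: str) -> "Planner":
        return replace(self, toolkits=_norm(self.toolkits + toolkits))

    def only(self, *tools: str) -> "Planner":
        return replace(self, tools=_norm(self.tools + tools))

    def plan(self) -> tuple:
        # Single ordered check: pinned tools win, then toolkits, then empty.
        if self.tools:
            return ("byTools", self.tools, self.toolkits)
        if self.toolkits:
            return ("byToolkits", self.toolkits)
        return ("empty",)


base = Planner()
narrowed = base.include(" slack", "github", "github")
pinned = narrowed.only("GH_LIST", "GH_CREATE")

print(base.plan())
print(narrowed.plan())
print(pinned.plan())
```

Because the accumulated intent is normalized on every step, two requests that differ only in order or whitespace resolve to the same scope and therefore the same cache key.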

The render layer

render/ is pure presentation — no I/O, no vendor SDK.

SUMMARIZERS (render/summarizers.py) is a mutable registry mapping a toolkit slug or a prefix:* glob to a Summarizer (Callable[[RemoteResult], str]), seeded with branded rows for github, gmail, and slack. summarize_result selects most-specific-first: exact slug, then longest matching glob, then default_summarizer. toolkit_of extracts the lower-cased toolkit slug implied by a <toolkit>_<op> or <toolkit>.<op> name.
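The most-specific-first lookup can be sketched like this. The registry rows, the dict-shaped result, and the trailing "*" glob convention are assumptions made for the example; the real registry's glob syntax and Summarizer signature may differ.

```python
from typing import Callable

Summarizer = Callable[[dict], str]


def default_summarizer(result: dict) -> str:
    return "ok" if result.get("ok") else f"error: {result.get('error')}"


# Hypothetical rows; a key ending in "*" is treated as a prefix glob here.
REGISTRY = {
    "github": lambda r: "github result",
    "google*": lambda r: "google family result",
    "googledrive*": lambda r: "drive family result",
}


def toolkit_of(tool_name: str) -> str:
    """Lower-cased text before the first '_' or '.' in the tool name."""
    lowered = tool_name.lower()
    cuts = [i for i in (lowered.find("_"), lowered.find(".")) if i != -1]
    return lowered[: min(cuts)] if cuts else lowered


def select(tool_name: str) -> Summarizer:
    slug = toolkit_of(tool_name)
    if slug in REGISTRY:  # 1. exact slug
        return REGISTRY[slug]
    globs = [k for k in REGISTRY if k.endswith("*") and slug.startswith(k[:-1])]
    if globs:  # 2. longest matching prefix glob
        return REGISTRY[max(globs, key=len)]
    return default_summarizer  # 3. fallback


print(select("GITHUB_LIST_ISSUES")({}))
print(select("googledrive.upload")({}))
print(select("googlesheets_read")({}))
print(select("slack_post")({"ok": True}))
```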

default_summarizer renders a failure as its error line, and a success as a dedupe-aware payload preview: it walks the payload iteratively (an explicit deque, never recursion, so a deep or cyclic response cannot blow the stack), content-hashes every container, emits =#i back-references for repeats, and caps at 24 nodes / 200 characters.

The format_* functions in render/format.py (format_toolkit_list, format_tool_list, format_connected_accounts, format_connection_request, format_connection_state) are pure data-in / string-out renderers for the discovery roster, operation catalog, accounts, and link views.

Examples

Wire a gateway to Composio and hand its control tools to an agent

import asyncio

from indusagi.connectors import create_composio_gateway


async def main() -> None:
    gateway = create_composio_gateway(api_key="comp_...")

    # Before any toolkit is hydrated, the agent only sees the four control tools:
    box = gateway.control_tool_box()  # saas_enable / saas_execute / saas_connect / saas_status

    # Drive an OAuth flow to a terminal state:
    report = await gateway.connect("github")
    if report.action == "await-auth":
        print("Authorize here:", report.auth_url)
    elif report.action == "done":
        print("linked account:", report.account_id)


# connect() is a coroutine, so it needs a running event loop.
asyncio.run(main())

Hydrate a toolkit into native tools (content-hash cached)

import asyncio

from indusagi.connectors import create_composio_gateway


async def main() -> None:
    gateway = create_composio_gateway(api_key="comp_...")

    enable = await gateway.enable("github", only=["GITHUB_CREATE_ISSUE"])
    print(enable.hydrated)  # ('ext:github.GITHUB_CREATE_ISSUE',)
    print(enable.cached)    # False the first time, True on an identical repeat

    # tool_box() now carries the control tools PLUS the hydrated remote tools:
    full_box = gateway.tool_box()

    # Or run an operation directly without hydrating it first:
    result = await gateway.execute("GITHUB_CREATE_ISSUE", {"repo": "o/r", "title": "hi"})


# enable() and execute() are coroutines, so they need a running event loop.
asyncio.run(main())

Implement a fake `SaasBackend` for tests (no vendor SDK)

Any object structurally matching the Protocol satisfies it — no subclassing, no SDK install:

import asyncio

from indusagi.connectors import (
    ToolkitInfo, RemoteTool, RemoteResult,
    ConnectedAccount, ConnectionRequest, ConnectionState,
    ExecuteOptions, InitiateOptions, create_saas_gateway,
)


class FakeBackend:
    # Discovery
    async def list_toolkits(self):
        return [ToolkitInfo(slug="gh", name="GitHub", description="git")]

    async def list_tools(self, toolkit):
        return [RemoteTool(name="GH_X", toolkit="gh", description="do x",
                           input_schema={"type": "object", "properties": {}})]

    # Execution: echo the arguments back as a successful result
    async def execute(self, tool_name, args, opts: ExecuteOptions):
        return RemoteResult(ok=True, data={"echo": args})

    # Linking / auth
    async def list_connected_accounts(self):
        return [ConnectedAccount(id="a1", toolkit="gh", status="active")]

    async def initiate_connection(self, toolkit, opts: InitiateOptions | None = None):
        return ConnectionRequest(id="r1", toolkit=toolkit, status="pending",
                                 auth_url="https://x")

    async def check_connection(self, id):
        return ConnectionState(id=id, status="active", account_id="a1")


async def main() -> None:
    gw = create_saas_gateway(FakeBackend())
    status = await gw.status()  # StatusReport(accounts=(...), enabled_tools=())
    print(status)


asyncio.run(main())

Plan scope, summarize a result, and format a roster (pure helpers)

These live on the subpackage barrels, not the top barrel:

from indusagi.connectors.core import ScopePlanner, ByTools, RemoteResult, ToolkitInfo
from indusagi.connectors.render import summarize_result, format_toolkit_list

scope = ScopePlanner.create().include("github").only("GH_CREATE", "GH_LIST").plan()
assert isinstance(scope, ByTools) and scope.tools == ("GH_CREATE", "GH_LIST")

line = summarize_result("github_list_issues",
                        RemoteResult(ok=True, data={"issues": [1, 2, 3]}))
print(line)  # 'github ok: 3 issues'

print(format_toolkit_list(
    [ToolkitInfo(slug="gh", name="GitHub", description="git host", connected=True)]
))

Notable behaviors

  • Two functions named build_control_tools. core/builder.py's takes a SaasBackend and builds low-level passthrough tools (saas.attach / saas.run / saas.link / saas.status, dotted, from CONNECTOR_TOOLS). control/tools.py's takes a SaasGateway and builds the four agent-facing control tools (saas_enable / saas_execute / saas_connect / saas_status, underscored, from CONTROL_SPECS). The gateway uses the control one; the dotted core set is a separate passthrough surface. Neither is re-exported from the top barrel.
  • Lazy, optional SDK. create_composio_backend and create_composio_gateway are keyword-only (api_key=). The composio import is lazy, so the package imports fine without it; ComposioBackendError is raised on first use if it is absent.
  • Pacing-only stored connect options. SaasGatewayOptions.connect honors only the pacing fields (poll_interval_ms / max_polls / sleep); auth_config_id / callback_url on stored connect opts are ignored (those come from the per-call connect(...) arguments).
  • Adapter identity quirk. The Composio adapter treats the connection-request id and the connected-account id as the same handle (check_connection polls connected_accounts.get(id)). callback_url is accepted by initiate_connection / saas_connect but silently unused, since toolkits.authorize has no callback override.
  • Failures surface as flagged results, not exceptions. execute swallows all non-CancelledError exceptions into RemoteResult(ok=False), so the model boundary sees a flagged ToolResult rather than a raised error.
  • Status normalization is conservative. _normalize_status maps INACTIVE/REVOKED to "failed" and any unknown vendor state to "pending" — never silently "active".
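The "flagged results, not exceptions" behaviour in the list above can be sketched as a thin wrapper. Result and safe_execute are hypothetical names used for illustration; the error string format is also an assumption. The sketch relies on asyncio.CancelledError deriving from BaseException (Python 3.8+), and re-raises it explicitly to make the intent visible.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass(frozen=True)
class Result:  # hypothetical stand-in for RemoteResult
    ok: bool
    data: Any = None
    error: Optional[str] = None


async def safe_execute(call: Callable[[], Awaitable[Any]]) -> Result:
    """Run a vendor call; turn ordinary failures into a flagged result."""
    try:
        return Result(ok=True, data=await call())
    except asyncio.CancelledError:
        raise  # cancellation must propagate to the caller
    except Exception as exc:
        return Result(ok=False, error=f"{type(exc).__name__}: {exc}")


async def boom() -> Any:
    raise TimeoutError("vendor timed out")


async def fine() -> Any:
    return {"id": 1}


flagged = asyncio.run(safe_execute(boom))
good = asyncio.run(safe_execute(fine))
print(flagged)
print(good)
```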

Relationship to neighbors

The bridge builds on Capabilities for the tool kernel (define_tool, ToolSpec, DefinedTool, ToolRegistry, ToolContext, ToolResult, TextContentBlock / JsonContentBlock, OutputBudget, Fs, Shell), on LLM Gateway for JsonSchema, on the Runtime ToolBox (under TYPE_CHECKING only), and on indusagi._internal.cancel.CancelToken. The agent tool shim explicitly excludes the Composio family, deferring it to this package.

The render and control layers are private behind the gateway; the top-level connectors/__init__.py barrel is the intended import site, and the gateway's control_tool_box() / tool_box() are how its tools reach an Agent session. For where this fits in the layer stack, see the Architecture overview.