Connectors (Composio)
indusagi.connectors is a hexagonal SaaS-connector bridge: it turns a vendor catalog (Composio) into native agent tools through one SaasBackend port, a stateful SaasGateway façade, and four control tools. It is imported as `from indusagi.connectors import create_composio_gateway, SaasBackend, ...`, a single barrel a host wires into an Agent session.
A coding agent should be able to discover, authorize, and call third-party
products (GitHub, Gmail, Slack, …) as if they were built-in tools. This package
delivers that in ports-and-adapters style: a dependency-inverted core (the
SaasBackend Protocol plus a frozen data vocabulary) holds all the logic, while
exactly one adapter — the Composio backend — translates the vendor SDK into that
vocabulary. The SaasGateway façade assembles the pieces (a live
ToolRegistry, a content-hash enable cache, an immutable ScopePlanner, and a
rule-table connection driver) and hands the agent four control tools it starts
with.
Note: this SaaS connectors package is unrelated to
indusagi.llmgateway.connectors, the LLM-provider connector registry — same word, different subsystem. See LLM Gateway.
Table of Contents
- Public exports
- Sub-directories
- The backend port
- The single vendor adapter
- The stateful gateway
- Hydration and the content-hash cache
- The connection state machine
- Scope planning
- The render layer
- Examples
- Notable behaviors
- Relationship to neighbors
Public exports
From indusagi/connectors/__init__.py — the one import site for the whole
bridge. The render and control internals stay private behind the gateway; they
are reachable only via the subpackage barrels (.core, .control, .render).
| Name | Kind | Source | Purpose |
|---|---|---|---|
| create_saas_gateway | function | gateway.py | Assemble a SaasGateway over any SaasBackend; takes optional SaasGatewayOptions |
| create_composio_gateway | function | gateway.py | Convenience: create_saas_gateway wired to the real Composio adapter from a keyword api_key= (lazy vendor import) |
| SaasGateway | class | gateway.py | Protocol for the stateful façade: the control_tool_box/tool_box/enable_toolkit assembly surface plus the enable/execute/connect/status verbs |
| SaasGatewayOptions | dataclass | gateway.py | Tuning knobs: account_id pin and connect (AwaitOptions pacing) |
| EnableReport | dataclass | gateway.py | Outcome of hydrating a toolkit: toolkit, hydrated names, tools (DefinedTool tuple), cached flag |
| ConnectReport | dataclass | gateway.py | Terminal outcome of a connect flow: action, request_id, auth_url, account_id, reason |
| StatusReport | dataclass | gateway.py | Snapshot of accounts (ConnectedAccount tuple) and enabled_tools (in-scope remote tool names) |
| SaasBackend | class | core/port.py | The dependency-inversion PORT: an async Protocol with list_toolkits/list_tools/execute/list_connected_accounts/initiate_connection/check_connection |
| ToolkitInfo | dataclass | core/port.py | A connectable product: slug, name, description, optional connected |
| RemoteTool | dataclass | core/port.py | One callable catalog operation: name, toolkit, description, input_schema (JsonSchema) |
| RemoteResult | dataclass | core/port.py | Outcome of running a RemoteTool: ok, data, error, log_id |
| ConnectedAccount | dataclass | core/port.py | A credential link: id, toolkit, normalized status, updated_at |
| ConnectionRequest | dataclass | core/port.py | A freshly opened connection attempt: id, toolkit, status, auth_url |
| ConnectionState | dataclass | core/port.py | A polled snapshot: id, status, account_id (once active), auth_url |
| ConnectionStatus | type | core/port.py | Normalized Literal["active","pending","expired","failed"] the adapter folds the vendor enum into |
| ExecuteOptions | dataclass | core/port.py | Per-call execute knob: optional account_id |
| InitiateOptions | dataclass | core/port.py | Connection-open inputs: auth_config_id, callback_url |
| create_composio_backend | function | adapter/composio_backend.py | Build the lone Composio-backed SaasBackend from a keyword api_key=; raises ComposioBackendError if the key is missing or the SDK absent |
| ComposioBackendError | class | adapter/composio_backend.py | Exception carrying the offending port method name when a request cannot map onto the port |
| ComposioBackendOptions | dataclass | adapter/composio_backend.py | Adapter config: api_key and optional user_id (defaults to "default") |
| ScopePlanner | class | core/scope_planner.py | Fluent immutable builder: create() then include(*toolkits)/only(*tools), resolved by plan() into a ResolvedScope |
| ResolvedScope | type | core/scope_planner.py | Union ByTools \| ByToolkits \| Empty discriminated by .kind |
| ScopeKind | type | core/scope_planner.py | Literal["byTools","byToolkits","empty"] tag |

The three resolved-scope shapes (ByTools / ByToolkits / Empty),
hash_key / HashCache, build_remote_tool, the CONNECTOR_TOOLS passthrough
table, and the whole render/control layers are deliberately not on the
top barrel — they live on the core / control / render subpackage barrels.
Sub-directories
| Directory | Holds |
|---|---|
| core/ | The dependency-inverted core: port.py (SaasBackend + data vocabulary), scope_planner.py (immutable ScopePlanner), cache.py (hash_key + HashCache), builder.py (build_remote_tool + the CONNECTOR_TOOLS passthrough table). No vendor symbol crosses this barrel. |
| adapter/ | Exactly one module: composio_backend.py, the single place the composio SDK leaks in. Exports create_composio_backend, ComposioBackendError, ComposioBackendOptions. |
| control/ | The control surface: connect.py (the CONNECT_RULES state machine + initiate_and_await driver) and tools.py (the four gateway-bound control tools). |
| render/ | Pure presentation: summarizers.py (SUMMARIZERS registry + summarize_result) and format.py (format_* text-block renderers). No I/O, no vendor SDK. |

The backend port
SaasBackend (core/port.py) is the dependency-inversion seam — the only thing
core logic depends on. It is an async Protocol of six methods grouped into the
three jobs the bridge does:
- Discovery: list_toolkits() and list_tools(toolkit)
- Execution: execute(tool_name, args, opts)
- Linking / auth: list_connected_accounts(), initiate_connection(toolkit, opts), check_connection(id)

Every shape it traffics in is a frozen, slotted dataclass owned by the bridge
(ToolkitInfo, RemoteTool, RemoteResult, ConnectedAccount,
ConnectionRequest, ConnectionState). TS optional fields become ... | None;
absence is modelled as None throughout. Because the port is a structural
Protocol, a test backend is a few dataclass literals away — see the fake-backend
example below.
ConnectionStatus is a normalized Literal["active", "pending", "expired", "failed"]. The adapter folds the catalog's richer enum down to these four
buckets so the core only ever branches on a closed set it owns.
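
The folding step can be pictured as a small lookup table. The sketch below is illustrative only: the vendor state names and the helper `normalize_status` are assumptions rather than the adapter's actual table. Only the four target buckets, and the conservative treatment of `INACTIVE` / `REVOKED` and unknown states described under Notable behaviors, come from this page.

```python
from typing import Literal

ConnectionStatus = Literal["active", "pending", "expired", "failed"]

# Hypothetical vendor states; the real catalog enum may differ.
_VENDOR_TO_PORT: dict[str, ConnectionStatus] = {
    "ACTIVE": "active",
    "INITIATED": "pending",
    "INITIALIZING": "pending",
    "EXPIRED": "expired",
    "FAILED": "failed",
    "INACTIVE": "failed",
    "REVOKED": "failed",
}


def normalize_status(raw: object) -> ConnectionStatus:
    """Fold a vendor status into the closed four-value set.

    Unknown or missing states fall back to "pending", never "active".
    """
    key = str(raw or "").strip().upper()
    return _VENDOR_TO_PORT.get(key, "pending")
```

Because the fallback is `"pending"`, a vendor state the table has never seen cannot be mistaken for a usable credential.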
The single vendor adapter
ComposioBackend (adapter/composio_backend.py) is the lone implementation
of the port and the only place the composio SDK is imported — lazily, inside
the constructor, so the package imports cleanly even when the SDK is absent.
ComposioBackendError is raised on first use if it is missing. Any is
permitted only inside this file.
The adapter is user-scoped: nearly every Composio call wants a user_id, so a
fixed default ("default", configurable via ComposioBackendOptions) is
threaded through. Method delegation:
| Port method | Composio call |
|---|---|
| list_toolkits | toolkits.get() |
| list_tools | tools.get_raw_composio_tools(toolkits=[slug]) |
| execute | tools.execute(slug, user_id=…, arguments=…, connected_account_id=…) |
| list_connected_accounts | connected_accounts.list(user_ids=[…]) |
| initiate_connection | toolkits.authorize(user_id, slug, auth_config_id?) |
| check_connection | connected_accounts.get(id) |

The synchronous SDK runs through asyncio.to_thread; pydantic models are
flattened with model_dump(); duck-typed _to_* projectors re-narrow vendor
shapes back to the frozen port vocabulary, accepting both camelCase (TS SDK) and
snake_case (Python SDK) keys before anything untyped escapes.
```python
import indusagi.connectors as connectors

# Build the adapter directly (keyword-only api_key):
backend = connectors.create_composio_backend(api_key="comp_...")
```
The stateful gateway
create_saas_gateway(backend, opts=None) returns a LiveSaasGateway — the
concrete façade that owns the state the stateless core deliberately lacks:
- a live ToolRegistry (from Capabilities) that hydrated remote tools register into,
- the four control tools, built once and registered up front so control_tool_box() works before any toolkit is hydrated,
- a dict of hydrated remote tools keyed by name,
- a HashCache of enable results.

It exposes three assembly methods and four delegate verbs:
| Method | Returns | Purpose |
|---|---|---|
| control_tool_box() | ToolBox | Just the four control tools, the surface an agent starts with |
| tool_box() | ToolBox | The control tools plus every hydrated remote tool |
| enable_toolkit(name, only=None) | list[DefinedTool] | Hydrate a toolkit into the live registry; content-hash cached |
| enable(toolkit, only=None) | EnableReport | The verb behind saas_enable |
| execute(tool, args, account_id=None) | ToolResult | The verb behind saas_execute: run one operation by slug |
| connect(toolkit, *, callback_url=None, auth_config_id=None) | ConnectReport | The verb behind saas_connect: drive auth to a terminal state |
| status() | StatusReport | The verb behind saas_status |

The four control tools (CONTROL_SPECS in control/tools.py) are a declarative
table: one row per tool — id, description, JSON-Schema parameters, and a
run(gateway, input) thunk — mapped by a single build_control_tools, never a
factory family. The tools delegate to the verbs above; the gateway holds the
logic.
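
A declarative spec table bound by a single mapper could be sketched as below. The `ControlSpec` class, the two example rows, their descriptions and schemas, and the dict-of-callables return type are all assumptions made for illustration; the real `build_control_tools` produces tool objects from the Capabilities kernel, which this sketch does not model.

```python
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

Run = Callable[[Any, dict[str, Any]], Awaitable[Any]]


@dataclass(frozen=True)
class ControlSpec:
    """One table row: id, description, JSON-Schema parameters, and a run thunk."""
    id: str
    description: str
    parameters: dict[str, Any]
    run: Run


async def _enable(gateway: Any, args: dict[str, Any]) -> Any:
    return await gateway.enable(args["toolkit"], only=args.get("only"))


async def _status(gateway: Any, args: dict[str, Any]) -> Any:
    return await gateway.status()


# Two of the four rows, with placeholder descriptions and schemas.
CONTROL_SPECS: tuple[ControlSpec, ...] = (
    ControlSpec(
        id="saas_enable",
        description="Hydrate a toolkit into native tools.",
        parameters={
            "type": "object",
            "properties": {
                "toolkit": {"type": "string"},
                "only": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["toolkit"],
        },
        run=_enable,
    ),
    ControlSpec(
        id="saas_status",
        description="Report connected accounts and enabled tools.",
        parameters={"type": "object", "properties": {}},
        run=_status,
    ),
)


def build_control_tools(gateway: Any) -> dict[str, Callable[[dict[str, Any]], Awaitable[Any]]]:
    """Bind every row to one gateway with a single mapper (no per-tool factory)."""

    def bind(spec: ControlSpec) -> Callable[[dict[str, Any]], Awaitable[Any]]:
        async def call(args: dict[str, Any]) -> Any:
            return await spec.run(gateway, args)

        return call

    return {spec.id: bind(spec) for spec in CONTROL_SPECS}
```

Adding a tool in this style means adding a row, not a new builder function.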
Tool dispatch runs each call through an inert ToolContext whose fs/shell
are a _Forbidden guard that raises on any attribute access — the SaaS tools
route entirely through the port and never touch the filesystem, so a future tool
that does fails loudly instead of silently mis-operating.
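
A guard that raises on any attribute access can be written with `__getattr__`. The sketch below is a guess at the shape, not the package's `_Forbidden`: the class names, the `RuntimeError` type, and the message text are assumptions.

```python
from dataclasses import dataclass, field


class Forbidden:
    """Guard object: touching any attribute raises instead of doing I/O."""

    def __init__(self, label: str) -> None:
        self._label = label

    def __getattr__(self, name: str):
        # Called only when normal lookup fails, i.e. for everything but _label.
        raise RuntimeError(f"{self._label}.{name} is off-limits for SaaS connector tools")


@dataclass(frozen=True)
class InertContext:
    """Hypothetical stand-in for the context handed to each tool call."""
    fs: Forbidden = field(default_factory=lambda: Forbidden("fs"))
    shell: Forbidden = field(default_factory=lambda: Forbidden("shell"))
```

With this in place, a call such as `InertContext().fs.read_text("x")` fails at the attribute lookup, before any filesystem work could start.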
create_composio_gateway(api_key=...) is the one convenience that pairs the
gateway with the live adapter; it imports create_composio_backend lazily, so
the vendor SDK stays optional.
Hydration and the content-hash cache
Hydration is the act of turning a toolkit's remote operations into native
tools. enable_toolkit lists the toolkit's operations, mints a DefinedTool
per op via build_remote_tool, registers them, and content-hash-caches the
result. Hydrated tools are named ext:<toolkit>.<slug> by _coin_remote_name
so they never collide with built-ins and their origin is obvious in a tool list.
The cache is HashCache (core/cache.py), keyed on hash_key — a canonical
JSON SHA-256 hex digest that is independent of key order. It stores in-flight
asyncio.Tasks (not values), shields awaiters, and evicts a slot on task
failure or cancel so failures are retryable. Because a repeat enable of the same
shape resolves to the same hash, it returns the identical DefinedTool
objects, making re-registration idempotent — EnableReport.cached is then
True.
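
The caching behavior described above can be approximated in a few lines. This is a minimal sketch, assuming a `get_or_run(key, factory)` method and a `(value, cached)` return pair, neither of which is confirmed by this page; only the canonical-JSON SHA-256 key, the task-valued slots, the shielding, and the evict-on-failure rule are taken from the description.

```python
import asyncio
import hashlib
import json
from typing import Any, Awaitable, Callable


def hash_key(value: Any) -> str:
    """SHA-256 hex digest of canonical JSON; key order does not matter."""
    canonical = json.dumps(value, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class HashCache:
    """Stores in-flight tasks so concurrent callers share one computation."""

    def __init__(self) -> None:
        self._slots: dict[str, asyncio.Task[Any]] = {}

    async def get_or_run(
        self, key: str, factory: Callable[[], Awaitable[Any]]
    ) -> tuple[Any, bool]:
        """Return (value, cached); cached is True when the slot already existed."""
        task = self._slots.get(key)
        cached = task is not None
        if task is None:
            task = asyncio.ensure_future(factory())
            self._slots[key] = task
            task.add_done_callback(lambda t, k=key: self._evict_if_failed(k, t))
        # shield: cancelling one awaiter must not cancel the shared task.
        value = await asyncio.shield(task)
        return value, cached

    def _evict_if_failed(self, key: str, task: "asyncio.Task[Any]") -> None:
        # Drop the slot on cancel or error so the next call can retry.
        if task.cancelled() or task.exception() is not None:
            if self._slots.get(key) is task:
                del self._slots[key]
```

Because the slot holds the task rather than its value, a second caller that arrives while the first is still running awaits the same task and receives the same result object.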
The connection state machine
control/connect.py expresses the OAuth-style link flow as policy-as-data.
CONNECT_RULES is a total dict[ConnectionStatus, ConnectionState -> ConnectAction]: one row per normalized status, each a thunk returning the
driver's next move.
plan_connect(state) is pure — state in, ConnectAction out — and an unknown
status maps to Failed rather than a silent no-op. The action union has five
frozen variants, each with a ClassVar kind tag: AwaitAuth, Poll, Done,
Expired, Failed. is_terminal returns True for the last three.
initiate_and_await(backend, toolkit, options) is the one impure driver. It
opens a connection once, then loops up to max_polls: consult plan_connect,
return on a terminal action (checked before the sleep), else
sleep(poll_interval_ms / 1000) and check_connection again. Exhausting the
budget yields Failed rather than hanging. It does not re-initiate on
expired — that decision is left to the caller, who may want to surface the
lapse first.
AwaitOptions carries auth_config_id, callback_url, poll_interval_ms
(default 1500), max_polls (default 40), and an injectable sleep (in
seconds — the ms→s conversion happens at the call site) so tests drive the
loop without real time passing.
Scope planning
ScopePlanner (core/scope_planner.py) is a fluent, immutable builder —
every mutator returns a new instance. Start with ScopePlanner.create(), narrow
with include(*toolkits) and only(*tools), and resolve once with plan(). The
result is a ResolvedScope, discriminated by .kind:
- ByTools(tools, toolkits): pinned tool slugs win (the most specific intent)
- ByToolkits(toolkits): one or more whole toolkits in scope
- Empty(): nothing declared

There is no nested if-cascade; resolution is a single ordered check over
accumulated, normalized (trimmed, deduped, sorted) intent. The gateway uses the
planner to normalize an enable request before content-hashing it.
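
An immutable fluent builder of this kind can be sketched with frozen dataclasses. The private field names and the `_norm` helper are assumptions; the public method names, the three result shapes, their `kind` tags, and the trimmed/deduped/sorted normalization follow this page.

```python
from dataclasses import dataclass
from typing import ClassVar, Iterable


def _norm(values: Iterable[str]) -> tuple[str, ...]:
    """Trim, drop blanks, dedupe, and sort."""
    return tuple(sorted({v.strip() for v in values if v.strip()}))


@dataclass(frozen=True)
class ByTools:
    kind: ClassVar[str] = "byTools"
    tools: tuple[str, ...]
    toolkits: tuple[str, ...]


@dataclass(frozen=True)
class ByToolkits:
    kind: ClassVar[str] = "byToolkits"
    toolkits: tuple[str, ...]


@dataclass(frozen=True)
class Empty:
    kind: ClassVar[str] = "empty"


@dataclass(frozen=True)
class ScopePlanner:
    _toolkits: frozenset[str] = frozenset()
    _tools: frozenset[str] = frozenset()

    @classmethod
    def create(cls) -> "ScopePlanner":
        return cls()

    def include(self, *toolkits: str) -> "ScopePlanner":
        # Returns a new planner; the receiver is never mutated.
        return ScopePlanner(self._toolkits | set(toolkits), self._tools)

    def only(self, *tools: str) -> "ScopePlanner":
        return ScopePlanner(self._toolkits, self._tools | set(tools))

    def plan(self) -> "ByTools | ByToolkits | Empty":
        tools, toolkits = _norm(self._tools), _norm(self._toolkits)
        # Single ordered check: pinned tools win, then toolkits, then nothing.
        if tools:
            return ByTools(tools, toolkits)
        if toolkits:
            return ByToolkits(toolkits)
        return Empty()
```

Since every step returns a fresh instance, a partially built planner can be shared and extended in different directions without one caller affecting another.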
The render layer
render/ is pure presentation — no I/O, no vendor SDK.
SUMMARIZERS (render/summarizers.py) is a mutable registry mapping a toolkit
slug or a prefix:* glob to a Summarizer (Callable[[RemoteResult], str]),
seeded with branded rows for github, gmail, and slack. summarize_result
selects most-specific-first: exact slug, then longest matching glob, then
default_summarizer. toolkit_of extracts the lower-cased toolkit slug implied
by a <toolkit>_<op> or <toolkit>.<op> name.
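
The most-specific-first lookup can be illustrated with a plain dict. In this sketch the glob syntax (a key ending in `*` treated as a prefix match), the helper name `select_summarizer`, and the sample registry rows are assumptions; the selection order and the behavior of `toolkit_of` follow the paragraph above.

```python
import re
from typing import Any, Callable

Summarizer = Callable[[Any], str]


def toolkit_of(tool_name: str) -> str:
    """Lower-cased text before the first '_' or '.' in a tool name."""
    return re.split(r"[._]", tool_name, maxsplit=1)[0].lower()


def select_summarizer(
    registry: dict[str, Summarizer], tool_name: str, default: Summarizer
) -> Summarizer:
    slug = toolkit_of(tool_name)
    # 1. Exact slug wins.
    if slug in registry:
        return registry[slug]
    # 2. Otherwise the longest matching prefix glob.
    globs = [k for k in registry if k.endswith("*") and slug.startswith(k[:-1])]
    if globs:
        return registry[max(globs, key=len)]
    # 3. Fall back to the default.
    return default


# Hypothetical registry rows for demonstration.
REGISTRY: dict[str, Summarizer] = {
    "github": lambda result: "github row",
    "goo*": lambda result: "short glob row",
    "google*": lambda result: "long glob row",
}


def fallback(result: Any) -> str:
    return "default row"
```

For example, `select_summarizer(REGISTRY, "googledrive.list", fallback)` picks the `google*` row over `goo*` because it is the longer match.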
default_summarizer renders a failure as its error line, and a success as a
dedupe-aware payload preview: it walks the payload iteratively (an explicit
deque, never recursion, so a deep or cyclic response cannot blow the stack),
content-hashes every container, emits =#i back-references for repeats, and caps
at 24 nodes / 200 characters.
The format_* functions in render/format.py
(format_toolkit_list, format_tool_list, format_connected_accounts,
format_connection_request, format_connection_state) are pure data-in /
string-out renderers for the discovery roster, operation catalog, accounts, and
link views.
Examples
Wire a gateway to Composio and hand its control tools to an agent
```python
import asyncio

from indusagi.connectors import create_composio_gateway


async def main() -> None:
    gateway = create_composio_gateway(api_key="comp_...")

    # Before any toolkit is hydrated, the agent only sees the four control tools:
    box = gateway.control_tool_box()  # saas_enable / saas_execute / saas_connect / saas_status

    # Drive an OAuth flow to a terminal state:
    report = await gateway.connect("github")
    if report.action == "await-auth":
        print("Authorize here:", report.auth_url)
    elif report.action == "done":
        print("linked account:", report.account_id)


asyncio.run(main())
```
Hydrate a toolkit into native tools (content-hash cached)
```python
import asyncio

from indusagi.connectors import create_composio_gateway


async def main() -> None:
    gateway = create_composio_gateway(api_key="comp_...")

    enable = await gateway.enable("github", only=["GITHUB_CREATE_ISSUE"])
    print(enable.hydrated)  # ('ext:github.GITHUB_CREATE_ISSUE',)
    print(enable.cached)    # False the first time, True on an identical repeat

    # tool_box() now carries the control tools PLUS the hydrated remote tools:
    full_box = gateway.tool_box()

    # Or run an operation directly without hydrating it first:
    result = await gateway.execute("GITHUB_CREATE_ISSUE", {"repo": "o/r", "title": "hi"})


asyncio.run(main())
```
Implement a fake `SaasBackend` for tests (no vendor SDK)
Any object structurally matching the Protocol satisfies it — no subclassing, no SDK install:
```python
import asyncio

from indusagi.connectors import (
    ToolkitInfo, RemoteTool, RemoteResult,
    ConnectedAccount, ConnectionRequest, ConnectionState,
    ExecuteOptions, InitiateOptions, create_saas_gateway,
)


class FakeBackend:
    async def list_toolkits(self):
        return [ToolkitInfo(slug="gh", name="GitHub", description="git")]

    async def list_tools(self, toolkit):
        return [RemoteTool(name="GH_X", toolkit="gh", description="do x",
                           input_schema={"type": "object", "properties": {}})]

    async def execute(self, tool_name, args, opts: ExecuteOptions):
        return RemoteResult(ok=True, data={"echo": args})

    async def list_connected_accounts(self):
        return [ConnectedAccount(id="a1", toolkit="gh", status="active")]

    async def initiate_connection(self, toolkit, opts: InitiateOptions | None = None):
        return ConnectionRequest(id="r1", toolkit=toolkit, status="pending",
                                 auth_url="https://x")

    async def check_connection(self, id):
        return ConnectionState(id=id, status="active", account_id="a1")


async def main() -> None:
    gw = create_saas_gateway(FakeBackend())
    status = await gw.status()  # StatusReport(accounts=(...), enabled_tools=())


asyncio.run(main())
```
Plan scope, summarize a result, and format a roster (pure helpers)
These live on the subpackage barrels, not the top barrel:
```python
from indusagi.connectors.core import ScopePlanner, ByTools, RemoteResult, ToolkitInfo
from indusagi.connectors.render import summarize_result, format_toolkit_list

scope = ScopePlanner.create().include("github").only("GH_CREATE", "GH_LIST").plan()
assert isinstance(scope, ByTools) and scope.tools == ("GH_CREATE", "GH_LIST")

line = summarize_result("github_list_issues",
                        RemoteResult(ok=True, data={"issues": [1, 2, 3]}))
print(line)  # 'github ok: 3 issues'

print(format_toolkit_list(
    [ToolkitInfo(slug="gh", name="GitHub", description="git host", connected=True)]
))
```
Notable behaviors
- Two functions named build_control_tools. core/builder.py's takes a SaasBackend and builds low-level passthrough tools (saas.attach/saas.run/saas.link/saas.status, dotted, from CONNECTOR_TOOLS). control/tools.py's takes a SaasGateway and builds the four agent-facing control tools (saas_enable/saas_execute/saas_connect/saas_status, underscored, from CONTROL_SPECS). The gateway uses the control one; the dotted core set is a separate passthrough surface. Neither is re-exported from the top barrel.
- Lazy, optional SDK. create_composio_backend and create_composio_gateway are keyword-only (api_key=). The composio import is lazy, so the package imports fine without it; ComposioBackendError is raised on first use if it is absent.
- Pacing-only stored connect options. SaasGatewayOptions.connect honors only the pacing fields (poll_interval_ms/max_polls/sleep); auth_config_id/callback_url on stored connect opts are ignored (those come from the per-call connect(...) arguments).
- Adapter identity quirk. The Composio adapter treats the connection-request id and the connected-account id as the same handle (check_connection polls connected_accounts.get(id)). callback_url is accepted by initiate_connection/saas_connect but silently unused, since toolkits.authorize has no callback override.
- Failures surface as flagged results, not exceptions. execute swallows all non-CancelledError exceptions into RemoteResult(ok=False), so the model boundary sees a flagged ToolResult rather than a raised error.
- Status normalization is conservative. _normalize_status maps INACTIVE/REVOKED to "failed" and any unknown vendor state to "pending", never silently "active".

Relationship to neighbors
The bridge builds on Capabilities for the
tool kernel (define_tool, ToolSpec, DefinedTool, ToolRegistry,
ToolContext, ToolResult, TextContentBlock / JsonContentBlock,
OutputBudget, Fs, Shell), on LLM Gateway
for JsonSchema, on the Runtime ToolBox (under
TYPE_CHECKING only), and on indusagi._internal.cancel.CancelToken. The agent
tool shim explicitly excludes the Composio family, deferring it to this package.
The render and control layers are private behind the gateway; the top-level
connectors/__init__.py barrel is the intended import site, and the gateway's
control_tool_box() / tool_box() are how its tools reach an
Agent session. For where this fits in the layer stack,
see the Architecture overview.
