LLM Gateway
`indusagi.llmgateway` is the framework's unified, multi-provider door to every LLM. It is a zero-SDK gateway: you hand it a model id and a provider-neutral `Conversation`, and it routes to one of eleven `httpx`-backed wire connectors and streams normalized `Emission`s back. Import everything from the barrel: `from indusagi.llmgateway import stream, complete, Conversation, ...`.
The gateway resolves a model id to a ModelCard, selects the Connector that
speaks that card's wire dialect, talks raw HTTP via httpx, decodes the
SSE/NDJSON stream, and folds the deltas into a terminal Reply. No vendor SDKs
are imported anywhere — only httpx for transport. Unknown ids fail fast with an
unsupported GatewayError.
Table of Contents
- Public exports
- Sub-directories
- Dispatch
- The type contract
- The model catalog
- Connectors and credentials
- Streaming and conversion
- Key concepts
- Examples
- Relationship to neighbors
Public exports
Everything callers need is re-exported from indusagi/llmgateway/__init__.py.
| Name | Kind | Source | Purpose |
|---|---|---|---|
| `stream` | function | `gateway.py` | Resolve a model id, route to its connector, return a streaming `Channel`. Sync: I/O happens on iteration. Raises an `unsupported` `GatewayError` for unknown ids |
| `complete` | async function | `gateway.py` | One-shot: resolve id, run the connector, return the assembled `Reply`. Raises `unsupported` for unknown ids |
| `stream_with_card` | function | `gateway.py` | Like `stream()` but takes a pre-resolved `ModelCard`, bypassing catalog lookup |
| `complete_with_card` | async function | `gateway.py` | One-shot variant taking a pre-resolved `ModelCard` |
| `MODEL_CARDS` | const | `catalog/cards.py` | `tuple[ModelCard, ...]`: the hand-curated catalog |
| `models` | function | `catalog/query.py` | Open a fluent `CardSelection`: curated cards first, then the full-catalog fallback |
| `get_card` | function | `catalog/query.py` | Direct id lookup: curated first, then full-catalog fallback; `None` if unknown |
| `curated_card` | function | `catalog/query.py` | Id lookup restricted to `MODEL_CARDS` only (no fallback) |
| `CardSelection` | dataclass | `catalog/query.py` | Lazy, immutable, chainable query; build via `CardSelection.over(pool)` |
| `estimate_cost` | function | `catalog/cost.py` | Turn a card's per-MTok `CostSheet` + a `Usage` into a USD float |
| `CONNECTOR_REGISTRY` | const | `connectors/__init__.py` | `dict[ApiKind, Connector]`: the total routing table |
| `connector_for_api` | function | `connectors/__init__.py` | Return the `Connector` that speaks a given `ApiKind` |
The barrel also re-exports the entire contract package — the frozen type
vocabulary plus the GatewayError class and its gateway_error() factory.
Sub-directories
| Directory | Holds |
|---|---|
| `contract/` | The frozen type contract: `model_card.py`, `conversation.py`, `reply.py`, `emission.py`, `options.py`, `errors.py`, `connector.py` |
| `connectors/` | One `Connector` + `create_*_connector` factory per `ApiKind` (`anthropic`, `openai_chat`, `openai_responses`, `google`, `google_vertex`, `bedrock`, `azure_openai`, `nvidia`, `kimi`, `ollama`, `mock`) plus the registry `__init__.py` |
| `conversion/` | Pure wire translation: `mappers.py` (`Conversation` → native bodies), `reduce.py` (`fold_reply`), `openai_compatible.py` (shared OpenAI chat streamer) |
| `catalog/` | `cards.py` (`MODEL_CARDS`), `query.py` (`models`/`get_card`/`CardSelection`), `cost.py` (`estimate_cost`), `full.py` (the 841-model full catalog), `fallback.py` (on-the-fly cards), `bridge.py` (vocabulary bridge) |
| `credentials/` | `secrets.py` (`SECRET_TABLE` + `resolve_secret`), `pkce.py` (RFC 7636 S256), `oauth.py` (RFC 6749 authorization-code + refresh) |
| `streaming/` | `channel.py` (`channel_of`/`collect_reply`), `sse.py` (`sse_events`), `ndjson.py` (`ndjson_lines`): wire-framing decoders over `httpx` byte streams |
Dispatch
stream and complete take the same shape — a model id, a Conversation, and
optional StreamOptions. The dispatch path in gateway.py is:
1. `get_card(model_id)` resolves a `ModelCard`: curated `MODEL_CARDS` first, then the full-catalog fallback. An unknown id raises an `unsupported` `GatewayError`.
2. `connector_for_api(card.api)` indexes the total `CONNECTOR_REGISTRY`.
3. The connector builds a re-iterable `Channel`. `stream()` itself stays sync and does no I/O; every `async for` re-issues the HTTP request.
4. On iteration the connector resolves its key, maps the `Conversation` to the provider's wire body, `POST`s with raw `httpx`, and decodes the byte stream into neutral `Emission`s.
5. The emissions are folded with `fold_reply` into a `Reply`, emitted as the terminal `DoneEmission`. `complete()` drains the same channel and returns that `Reply` (or raises the `GatewayError`).
```python
import asyncio

from indusagi.llmgateway import stream, complete, Conversation, UserTurn, TextBlock

convo = Conversation(turns=(UserTurn(blocks=(TextBlock(text="hi"),)),))


async def main():
    # Streaming: iterate the Channel of Emissions (I/O happens on iteration).
    channel = stream("claude-sonnet-4", convo)
    async for emission in channel:
        if emission.kind == "text":
            print(emission.delta, end="")

    # One-shot: await the assembled Reply.
    reply = await complete("claude-sonnet-4", convo)


asyncio.run(main())
```
Note the asymmetry: a streaming failure surfaces as a terminal ErrorEmission
on the channel, while complete() raises a GatewayError. An
asyncio.CancelledError is always re-raised, never folded into the reply.
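
The asymmetry is easier to see in a small, self-contained sketch. The class and function names below echo this document's vocabulary, but every body is a stand-in written for illustration: the `error` field on `ErrorEmission`, the string-valued `reply`, and the `Channel` class are assumptions, not the gateway's actual definitions.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable


@dataclass(frozen=True)
class TextEmission:
    delta: str


@dataclass(frozen=True)
class DoneEmission:
    reply: str  # stand-in: the real Reply is a richer dataclass


@dataclass(frozen=True)
class ErrorEmission:
    error: Exception  # hypothetical field name


class Channel:
    """Re-iterable: every `async for` calls the factory again."""

    def __init__(self, factory: Callable[[], AsyncIterator[object]]):
        self._factory = factory

    def __aiter__(self) -> AsyncIterator[object]:
        return self._factory()


async def collect_reply(channel: Channel) -> str:
    """Drain a channel; return the reply or raise the terminal error."""
    async for emission in channel:
        if isinstance(emission, DoneEmission):
            return emission.reply
        if isinstance(emission, ErrorEmission):
            raise emission.error
    raise RuntimeError("channel ended without a terminal emission")


async def ok_source():
    yield TextEmission("hel")
    yield TextEmission("lo")
    yield DoneEmission(reply="hello")


async def failing_source():
    yield TextEmission("hel")
    yield ErrorEmission(RuntimeError("rate_limit"))


async def demo():
    ok = Channel(ok_source)
    first = await collect_reply(ok)
    second = await collect_reply(ok)  # re-iteration restarts the source
    try:
        await collect_reply(Channel(failing_source))
        raised = None
    except RuntimeError as exc:
        raised = str(exc)
    return first, second, raised


result = asyncio.run(demo())
```

A streaming consumer would see the `ErrorEmission` as an ordinary item on the channel, while the one-shot path converts the same item into a raised exception.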
The type contract
The contract/ package is the frozen, provider-neutral vocabulary every layer
shares. Each type is a @dataclass(frozen=True, slots=True).
| Name | Kind | Source | Purpose |
|---|---|---|---|
| `ModelCard` | dataclass | `contract/model_card.py` | Static descriptor: `id`, `provider`, `api`, `display_name`, `base_url`, `context_window`, `max_output_tokens`, `modalities`, `reasoning`, `cost`, `wire_id` (defaults to `id`) |
| `Conversation` | dataclass | `contract/conversation.py` | Provider-neutral request: `turns` tuple, optional `system` preamble and `tools` |
| `Block` | union | `contract/conversation.py` | `TextBlock`, `ThinkingBlock`, `ToolCallBlock`, `ToolResultBlock`, `ImageBlock`, `CommandBlock` |
| `Turn` | union | `contract/conversation.py` | `UserTurn` / `AssistantTurn` / `ToolTurn` |
| `ToolDescriptor` | dataclass | `contract/conversation.py` | Declares a callable tool with a `JsonSchema` parameter spec |
| `Reply` | dataclass | `contract/reply.py` | Terminal materialized response: `model`, `blocks`, `usage`, `stop` |
| `StreamOptions` | dataclass | `contract/options.py` | Per-invocation knobs: `cancel`, `thinking`, `max_output_tokens`, `temperature`, `top_p`, `tool_choice`, `api_key` |
| `Emission` | union | `contract/emission.py` | `TextEmission`, `ThinkingEmission`, `ToolCallStartEmission`, `ToolCallDeltaEmission`, `UsageEmission`, `StopEmission`, `DoneEmission`, `ErrorEmission` |
| `Channel` | type | `contract/emission.py` | `AsyncIterable[Emission]` |
| `Connector` | Protocol | `contract/connector.py` | `runtime_checkable` Protocol with `id`, `api`, `stream()`, `complete()` |
| `CredentialResolver` | Protocol | `contract/connector.py` | `async resolve(provider, override) -> str |
| `GatewayError` | class | `contract/errors.py` | Normalized exception with `kind`, `status`, `provider`, `cause`; `gateway_error()` is the factory |
A bare StreamOptions() is a valid "use the model's defaults" request, so the
options argument may be omitted entirely.
The model catalog
MODEL_CARDS is the static, hand-curated registry (24 cards: claude-opus-4,
claude-sonnet-4, gpt-4o, o3, gemini-2.5-pro, kimi-k2, mock-1, ...).
Behind it sits the 841-model full catalog generated into catalog/full.py from
models_data.json (24 providers).
- `get_card(id)` resolves curated first, then falls back into the full catalog so any catalog id resolves; it returns `None` only when neither layer knows the id.
- `curated_card(id)` stays curated-only, with no fallback.
- `models()` opens a fluent `CardSelection` over the curated cards first, followed by every usable full-catalog record. `.by_provider()`, `.by_api()`, and `.reasoning()` narrow immutably; `.all()` and `.find(id)` materialize.
- `estimate_cost(card, usage)` prices a `Usage` against the card's per-MTok `CostSheet`; cache tiers are billed additively when present.
```python
from indusagi.llmgateway import models, get_card, estimate_cost, Usage

# Reasoning-capable Anthropic cards, curated-first.
cards = models().by_provider("anthropic").reasoning(True).all()

card = get_card("claude-sonnet-4")
cost = estimate_cost(card, Usage(input_tokens=1000, output_tokens=500))  # USD float
```
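
To make the curated-first ordering, the immutable narrowing, and the per-MTok arithmetic concrete, here is a stand-alone sketch. It does not import the gateway: `Card`, `Selection`, the `CostSheet` field names, the vendor ids, and the prices are all invented for illustration, and the selection is eager where the real `CardSelection` is described as lazy. Cache tiers are left out.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class CostSheet:
    input_per_mtok: float   # USD per million input tokens (hypothetical field)
    output_per_mtok: float  # USD per million output tokens (hypothetical field)


@dataclass(frozen=True)
class Card:
    id: str
    provider: str
    reasoning: bool
    cost: CostSheet


@dataclass(frozen=True)
class Selection:
    """Immutable query: each narrowing step returns a new Selection."""

    pool: Tuple[Card, ...]

    def by_provider(self, provider: str) -> "Selection":
        return Selection(tuple(c for c in self.pool if c.provider == provider))

    def reasoning(self, flag: bool) -> "Selection":
        return Selection(tuple(c for c in self.pool if c.reasoning is flag))

    def all(self) -> Tuple[Card, ...]:
        return self.pool

    def find(self, card_id: str) -> Optional[Card]:
        # First match wins, so curated cards shadow full-catalog duplicates.
        return next((c for c in self.pool if c.id == card_id), None)


def estimate_cost(card: Card, input_tokens: int, output_tokens: int) -> float:
    """Price token counts against per-MTok rates, in USD."""
    sheet = card.cost
    total = input_tokens * sheet.input_per_mtok + output_tokens * sheet.output_per_mtok
    return total / 1_000_000


# Made-up cards and prices, curated pool listed first.
CURATED = (Card("alpha-1", "vendor-a", True, CostSheet(2.0, 10.0)),)
FULL = (
    Card("alpha-1", "vendor-a", False, CostSheet(9.0, 9.0)),  # shadowed duplicate
    Card("beta-1", "vendor-b", True, CostSheet(1.0, 4.0)),
)

everything = Selection(CURATED + FULL)
narrowed = everything.by_provider("vendor-a").reasoning(True)
ids = [c.id for c in narrowed.all()]
card = everything.find("alpha-1")
cost = estimate_cost(card, 1000, 500)  # (1000 * 2.0 + 500 * 10.0) / 1e6
```

Because every narrowing call builds a fresh `Selection`, `everything` still holds all three cards after `narrowed` is derived from it.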
Connectors and credentials
CONNECTOR_REGISTRY is a dict[ApiKind, Connector] that is total over ApiKind,
so connector_for_api(card.api) never misses a known dialect. The registry is
built once with every factory fed a deps bag wrapping resolve_secret. The wire
dialects (ApiKind) are:
`anthropic-messages`, `openai-completions`, `openai-responses`,
`google-generative`, `google-vertex`, `amazon-bedrock`, `azure-openai`,
`nvidia-openai-compatible`, `kimi-openai-compatible`, `ollama`, `mock`
ProviderId is the vendor identity; ApiKind is the wire dialect a connector
implements. Several providers share the OpenAI-compatible dialect, so the
registry is keyed by ApiKind, not provider.
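
A toy version of the routing table shows why keying by dialect works. The enum below lists only three of the eleven dialects, and the `Connector` dataclass, the connector ids, and the vendor names are placeholders invented for this sketch.

```python
from dataclasses import dataclass
from enum import Enum


class ApiKind(Enum):
    ANTHROPIC_MESSAGES = "anthropic-messages"
    OPENAI_COMPLETIONS = "openai-completions"
    MOCK = "mock"


@dataclass(frozen=True)
class Connector:
    id: str
    api: ApiKind


# One connector per dialect, built from the enum itself so no member is missed.
REGISTRY = {kind: Connector(id=f"{kind.value}-connector", api=kind) for kind in ApiKind}


def connector_for_api(api: ApiKind) -> Connector:
    # A plain index is safe because the registry is total over ApiKind.
    return REGISTRY[api]


# Hypothetical (provider, dialect) pairs: two vendors share one dialect.
cards = [
    ("vendor-a", ApiKind.OPENAI_COMPLETIONS),
    ("vendor-b", ApiKind.OPENAI_COMPLETIONS),
    ("vendor-c", ApiKind.ANTHROPIC_MESSAGES),
]
routed = {provider: connector_for_api(api).id for provider, api in cards}
is_total = set(REGISTRY) == set(ApiKind)
```

Both `vendor-a` and `vendor-b` land on the same connector, which a provider-keyed table would have to duplicate.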
Credential sourcing lives in credentials/. resolve_secret(provider, override)
honors a caller override first, then walks the ordered env_vars for that
provider in the declarative SECRET_TABLE and yields the first non-empty value.
Standard keys include ANTHROPIC_API_KEY, OPENAI_API_KEY, GEMINI_API_KEY
(then GOOGLE_API_KEY), AZURE_OPENAI_API_KEY, NVIDIA_API_KEY, and
MOONSHOT_API_KEY (then KIMI_API_KEY); Bedrock and Vertex use a cloud-IAM
scheme, and Ollama and mock need no secret. environment_credential_resolver
is a ready-made CredentialResolver bound to resolve_secret.
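
The lookup order described above can be sketched with the standard library alone. The environment variable names come from this page; the provider keys in the table (`"google"`, `"kimi"`, and so on) and the extra `env` parameter, added here so the example can run against a fake environment, are assumptions.

```python
import asyncio
import os
from typing import Mapping, Optional, Tuple

# Ordered env var names per provider; an empty tuple means "no secret needed".
SECRET_TABLE: Mapping[str, Tuple[str, ...]] = {
    "openai": ("OPENAI_API_KEY",),
    "google": ("GEMINI_API_KEY", "GOOGLE_API_KEY"),
    "kimi": ("MOONSHOT_API_KEY", "KIMI_API_KEY"),
    "ollama": (),
}


async def resolve_secret(
    provider: str,
    override: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,  # hypothetical test hook
) -> Optional[str]:
    """Caller override first, then the first non-empty env var in table order."""
    if override:
        return override
    source = os.environ if env is None else env
    for name in SECRET_TABLE.get(provider, ()):
        value = source.get(name, "")
        if value:
            return value
    return None


async def demo():
    fake_env = {"GEMINI_API_KEY": "", "GOOGLE_API_KEY": "g-key", "KIMI_API_KEY": "k-key"}
    return {
        "google": await resolve_secret("google", env=fake_env),  # empty first var is skipped
        "kimi": await resolve_secret("kimi", env=fake_env),
        "openai": await resolve_secret("openai", env=fake_env),
        "override": await resolve_secret("openai", "sk-override", env=fake_env),
        "ollama": await resolve_secret("ollama", env=fake_env),
    }


resolved = asyncio.run(demo())
```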
The directory also ships pkce.py (create_pkce_pair, RFC 7636 S256) and
oauth.py (build_auth_url / exchange_code / refresh_token, RFC 6749). The
OAUTH_PROVIDERS client ids are empty placeholders that a deployment must
supply.
```python
import asyncio

from indusagi.llmgateway.credentials import resolve_secret, create_pkce_pair


async def main():
    key = await resolve_secret("openai")  # reads OPENAI_API_KEY, or None
    pair = create_pkce_pair()             # RFC 7636 S256 verifier + challenge


asyncio.run(main())
```
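
For readers unfamiliar with PKCE, the S256 transform from RFC 7636 is short enough to write out. This is a generic sketch of the RFC's method, not the contents of `pkce.py`; the `PkcePair` field names `verifier` and `challenge` and the helper `challenge_for` are assumptions.

```python
import base64
import hashlib
import secrets
from dataclasses import dataclass


@dataclass(frozen=True)
class PkcePair:
    verifier: str   # hypothetical field name
    challenge: str  # hypothetical field name
    method: str = "S256"


def _b64url(data: bytes) -> str:
    """Base64url without padding, as RFC 7636 requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def challenge_for(verifier: str) -> str:
    """S256: BASE64URL(SHA256(ASCII(verifier)))."""
    return _b64url(hashlib.sha256(verifier.encode("ascii")).digest())


def create_pkce_pair() -> PkcePair:
    # 32 random bytes encode to a 43-character verifier, the RFC's minimum length.
    verifier = _b64url(secrets.token_bytes(32))
    return PkcePair(verifier=verifier, challenge=challenge_for(verifier))


pair = create_pkce_pair()

# Test vector from RFC 7636, Appendix B.
rfc_challenge = challenge_for("dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk")
```

The client sends the challenge with the authorization request and reveals the verifier only at token exchange, so an intercepted authorization code is useless on its own.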
Streaming and conversion
The wire work splits across conversion/ (pure translation) and streaming/
(byte-stream framing).
| Name | Kind | Source | Purpose |
|---|---|---|---|
| `fold_reply` | function | `conversion/reduce.py` | Pure reducer folding an `Emission` sequence into a `Reply` (thinking, then text, then tool calls in first-seen order) |
| `to_anthropic_request` | function | `conversion/mappers.py` | Map a `Conversation` to the Anthropic Messages wire body |
| `to_openai_chat_messages` | function | `conversion/mappers.py` | Map a `Conversation` to OpenAI Chat-Completions messages |
| `to_google_contents` | function | `conversion/mappers.py` | Map a `Conversation` to Google generative contents |
| `to_responses_input` | function | `conversion/mappers.py` | Map a `Conversation` to the OpenAI Responses input shape |
| `stream_openai_compatible_chat` | function | `conversion/openai_compatible.py` | Shared OpenAI Chat-Completions streamer reused by the openai/nvidia/kimi/azure/ollama connectors |
| `sse_events` | async function | `streaming/sse.py` | Decode an `httpx` byte stream into `ServerEvent`s (SSE) with cancel-token support |
| `ndjson_lines` | async function | `streaming/ndjson.py` | Decode an `httpx` byte stream into parsed JSON values (NDJSON, e.g. Ollama) |
| `channel_of` | function | `streaming/channel.py` | Wrap a generator factory as a re-iterable `Channel` |
| `collect_reply` | function | `streaming/channel.py` | Drain a `Channel` to its `Reply`, raising on a terminal error |
OpenAI-dialect connectors (openai_chat, nvidia, kimi, azure_openai,
ollama) delegate the wire loop to stream_openai_compatible_chat; anthropic,
google, google_vertex, bedrock, and openai_responses own bespoke decode
loops. JSON bodies are emitted with compact json.dumps separators so the bytes
match a JS JSON.stringify exactly. Cancellation threads from
StreamOptions.cancel into the SSE/NDJSON readers, which race reads against the
token and raise an aborted GatewayError.
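
The following sketch shows one way an SSE reader can race each read against a cancel token. It is not the code in `streaming/sse.py`: an `asyncio.Event` stands in for the cancel token, the `Aborted` exception stands in for an `aborted` `GatewayError`, events are plain `(event, data)` tuples, and only `\n` line endings are handled.

```python
import asyncio
import codecs

_END = object()  # sentinel marking the end of the byte stream


class Aborted(Exception):
    """Stand-in for a GatewayError whose kind is 'aborted'."""


async def _read(iterator):
    """Pull the next chunk, mapping exhaustion to the _END sentinel."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _END


async def next_or_abort(iterator, cancel: asyncio.Event):
    """Race one read against the cancel token."""
    if cancel.is_set():
        raise Aborted("cancelled before read")
    read = asyncio.create_task(_read(iterator))
    stop = asyncio.create_task(cancel.wait())
    done, pending = await asyncio.wait({read, stop}, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    if read not in done:
        raise Aborted("cancelled while reading")
    return read.result()


async def sse_events(chunks, cancel: asyncio.Event):
    """Yield (event, data) pairs from an async iterator of byte chunks."""
    decoder = codecs.getincrementaldecoder("utf-8")()  # safe across split code points
    iterator = chunks.__aiter__()
    buffer = ""
    while True:
        chunk = await next_or_abort(iterator, cancel)
        if chunk is _END:
            return
        buffer += decoder.decode(chunk)
        while "\n\n" in buffer:  # a blank line terminates one event
            frame, buffer = buffer.split("\n\n", 1)
            event, data = "message", []
            for line in frame.split("\n"):
                if line.startswith("event:"):
                    event = line[len("event:"):].strip()
                elif line.startswith("data:"):
                    value = line[len("data:"):]
                    data.append(value[1:] if value.startswith(" ") else value)
            if data:
                yield event, "\n".join(data)


async def byte_source():
    # Frames deliberately split across chunk boundaries.
    for chunk in (b"event: delta\ndata: hel", b"lo\n\ndata: wor", b"ld\n\n"):
        yield chunk


async def demo():
    cancel = asyncio.Event()
    events = [e async for e in sse_events(byte_source(), cancel)]
    cancel.set()
    try:
        [e async for e in sse_events(byte_source(), cancel)]
        aborted = False
    except Aborted:
        aborted = True
    return events, aborted


events, aborted = asyncio.run(demo())
```

Racing per read keeps cancellation latency bounded by a single chunk wait rather than by the length of the whole response.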
Key concepts
- `ModelCard.id` vs `wire_id`: `id` is the catalog/user-facing identity (what you pass to `-m` or the gateway); `wire_id` is what's actually sent to the provider (request `model` field or URL segment), defaulting to `id`. They diverge when a friendly catalog id is rejected by the live API.
- `ApiKind` vs `ProviderId`: `ProviderId` is the vendor; `ApiKind` is the wire dialect. The connector registry is keyed by `ApiKind`.
- `Channel` / `Emission`: a `Channel` is a re-iterable `AsyncIterable` of incremental deltas terminated by exactly one `done` (success) or `error` (failure) emission. Re-iterating re-POSTs.
- `fold_reply`: the pure reducer that collapses an emission sequence into a `Reply`. It accumulates thinking and text, reassembles tool-call argument JSON fragments, tracks the latest usage/stop, and emits blocks in thinking → text → tool-call order.
- Curated vs full catalog: `MODEL_CARDS` (24 cards) is checked first; `catalog/full` (841 records) is consulted as fallback so any catalog id resolves. `curated_card` stays curated-only.
- Vocabulary bridge: `catalog/bridge.py` is the single place mapping the full-catalog/facade `api` + `provider` tags onto core `ApiKind`/`ProviderId`, keeping the facade and gateway from drifting.
- `GatewayError.kind`: a discriminator (`auth`, `rate_limit`, `http`, `transport`, `parse`, `unsupported`, `aborted`) so callers branch on failure class without string-matching messages.
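
The reducer idea behind `fold_reply` can be sketched in a few dozen lines. The emission classes here are simplified stand-ins with assumed field names (`call_id`, `fragment`, `reason`), usage tracking is omitted, and the result is a plain dict rather than a `Reply`.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Text:
    delta: str


@dataclass(frozen=True)
class Thinking:
    delta: str


@dataclass(frozen=True)
class ToolCallStart:
    call_id: str
    name: str


@dataclass(frozen=True)
class ToolCallDelta:
    call_id: str
    fragment: str  # a piece of the argument JSON


@dataclass(frozen=True)
class Stop:
    reason: str


def fold_reply(emissions):
    """Pure reducer: same emissions in, same reply out, no I/O."""
    thinking, text = [], []
    calls = {}  # call_id -> state; dicts preserve first-seen order
    stop = None
    for e in emissions:
        if isinstance(e, Thinking):
            thinking.append(e.delta)
        elif isinstance(e, Text):
            text.append(e.delta)
        elif isinstance(e, ToolCallStart):
            calls.setdefault(e.call_id, {"name": e.name, "parts": []})
        elif isinstance(e, ToolCallDelta):
            calls[e.call_id]["parts"].append(e.fragment)
        elif isinstance(e, Stop):
            stop = e.reason  # the latest stop wins
    blocks = []
    if thinking:
        blocks.append(("thinking", "".join(thinking)))
    if text:
        blocks.append(("text", "".join(text)))
    for call in calls.values():
        # Fragments only form valid JSON once joined.
        args = json.loads("".join(call["parts"]) or "{}")
        blocks.append(("tool_call", call["name"], args))
    return {"blocks": tuple(blocks), "stop": stop}


# Interleaved on the wire, ordered in the reply.
reply = fold_reply([
    Text("Let me "),
    ToolCallStart("c1", "search"),
    Thinking("plan"),
    ToolCallDelta("c1", '{"q": '),
    Text("check."),
    ToolCallDelta("c1", '"httpx"}'),
    Stop("tool_use"),
])
```

Keeping the reducer pure means the same function can serve both the streaming path, which emits the folded result as its terminal item, and the one-shot path.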
Examples
Stream from the curated mock card and consume Emissions:
```python
import asyncio

from indusagi.llmgateway import (
    stream, Conversation, UserTurn, TextBlock, TextEmission, DoneEmission,
)


async def main():
    convo = Conversation(
        turns=(UserTurn(blocks=(TextBlock(text="hello"),)),),
        system="Be terse.",
    )
    channel = stream("mock-1", convo)   # sync; no I/O yet
    async for emission in channel:      # I/O happens here
        if isinstance(emission, TextEmission):
            print(emission.delta, end="")
        elif isinstance(emission, DoneEmission):
            print("\nstop:", emission.reply.stop, "usage:", emission.reply.usage)


asyncio.run(main())
```
One-shot complete with options and cost estimation:
```python
import asyncio

from indusagi.llmgateway import (
    complete, get_card, estimate_cost,
    Conversation, UserTurn, TextBlock, StreamOptions,
)


async def main():
    convo = Conversation(turns=(UserTurn(blocks=(TextBlock(text="summarize X"),)),))
    opts = StreamOptions(temperature=0.2, max_output_tokens=512, thinking="low")
    reply = await complete("claude-sonnet-4", convo, opts)
    card = get_card("claude-sonnet-4")
    print(reply.stop, estimate_cost(card, reply.usage), "USD")


asyncio.run(main())
```
Fluent catalog query and credential resolution:
```python
import asyncio

from indusagi.llmgateway import models
from indusagi.llmgateway.credentials import resolve_secret, create_pkce_pair

# Reasoning-capable Anthropic cards, curated-first.
cards = models().by_provider("anthropic").reasoning(True).all()
print([c.id for c in cards])

found = models().by_api("openai-completions").find("gpt-4o")
print(found.wire_id if found else None)


async def keys():
    key = await resolve_secret("openai")  # reads OPENAI_API_KEY (or None)
    pair = create_pkce_pair()             # RFC 7636 S256 verifier + challenge
    print(bool(key), pair.method)


asyncio.run(keys())
```
Relationship to neighbors
The gateway is the lower wire layer beneath the facade. The
ai facade and the Runtime
agent loop import stream/complete and the contract types straight from this
barrel so there is one source of truth. The facade's ModelRegistry consumes the
full catalog (catalog/full), with MODEL_CARDS as the curated supplement;
catalog/bridge.py is the single place that bridges this gateway's core
ApiKind/ProviderId vocabulary to the older facade vocabulary. The gateway also
shares the JsonSchema / ToolDescriptor contract with the
Capabilities tool kernel. No vendor SDKs are
depended on — only httpx for transport.
