
LLM Gateway

indusagi.llmgateway is the framework's unified, multi-provider entry point for LLM calls. It is a zero-SDK gateway: you hand it a model id and a provider-neutral Conversation, and it routes the request to one of eleven httpx-backed wire connectors and streams normalized Emissions back. Import everything from the barrel, for example: from indusagi.llmgateway import stream, complete, Conversation, and so on.

The gateway resolves a model id to a ModelCard, selects the Connector that speaks that card's wire dialect, talks raw HTTP via httpx, decodes the SSE/NDJSON stream, and folds the deltas into a terminal Reply. No vendor SDKs are imported anywhere; httpx is the only transport dependency. Unknown ids fail fast with a GatewayError of kind unsupported.


Public exports

Everything callers need is re-exported from indusagi/llmgateway/__init__.py.

| Name | Kind | Source | Purpose |
|---|---|---|---|
| stream | function | `gateway.py` | Resolve a model id, route to its connector, and return a streaming Channel. Sync: I/O happens on iteration. Raises an unsupported GatewayError for unknown ids |
| complete | async function | `gateway.py` | One-shot: resolve the id, run the connector, and return the assembled Reply. Raises unsupported for unknown ids |
| stream_with_card | function | `gateway.py` | Like stream() but takes a pre-resolved ModelCard, bypassing catalog lookup |
| complete_with_card | async function | `gateway.py` | One-shot variant taking a pre-resolved ModelCard |
| MODEL_CARDS | const | `catalog/cards.py` | tuple[ModelCard, ...]: the hand-curated catalog |
| models | function | `catalog/query.py` | Open a fluent CardSelection: curated cards first, then the full-catalog fallback |
| get_card | function | `catalog/query.py` | Direct id lookup: curated first, then full-catalog fallback; None if unknown |
| curated_card | function | `catalog/query.py` | Id lookup restricted to MODEL_CARDS only (no fallback) |
| CardSelection | dataclass | `catalog/query.py` | Lazy, immutable, chainable query; build via CardSelection.over(pool) |
| estimate_cost | function | `catalog/cost.py` | Turn a card's per-MTok CostSheet plus a Usage into a USD float |
| CONNECTOR_REGISTRY | const | `connectors/__init__.py` | dict[ApiKind, Connector]: the total routing table |
| connector_for_api | function | `connectors/__init__.py` | Return the Connector that speaks a given ApiKind |

The barrel also re-exports the entire contract package — the frozen type vocabulary plus the GatewayError class and its gateway_error() factory.

Sub-directories

| Directory | Holds |
|---|---|
| `contract/` | The frozen type contract: model_card.py, conversation.py, reply.py, emission.py, options.py, errors.py, connector.py |
| `connectors/` | One Connector plus create_*_connector factory per wire connector (anthropic, openai_chat, openai_responses, google, google_vertex, bedrock, azure_openai, nvidia, kimi, ollama, mock), plus the registry `__init__.py` |
| `conversion/` | Pure wire translation: mappers.py (Conversation → native bodies), reduce.py (fold_reply), openai_compatible.py (shared OpenAI chat streamer) |
| `catalog/` | cards.py (MODEL_CARDS), query.py (models/get_card/CardSelection), cost.py (estimate_cost), full.py (the 841-model full catalog), fallback.py (on-the-fly cards), bridge.py (vocabulary bridge) |
| `credentials/` | secrets.py (SECRET_TABLE + resolve_secret), pkce.py (RFC 7636 S256), oauth.py (RFC 6749 authorization-code + refresh) |
| `streaming/` | channel.py (channel_of/collect_reply); sse.py (sse_events) and ndjson.py (ndjson_lines), the wire-framing decoders over httpx byte streams |

Dispatch

stream and complete take the same shape — a model id, a Conversation, and optional StreamOptions. The dispatch path in gateway.py is:

  1. get_card(model_id) resolves a ModelCard — curated MODEL_CARDS first, then the full-catalog fallback. Unknown id raises an unsupported GatewayError.
  2. connector_for_api(card.api) indexes the total CONNECTOR_REGISTRY.
  3. The connector builds a re-iterable Channel. stream() itself stays sync and does no I/O; every async for loop over the channel re-issues the HTTP request.
  4. On iteration the connector resolves its key, maps the Conversation to the provider's wire body, POSTs with raw httpx, and decodes the byte stream into neutral Emissions.
  5. The emissions are folded with fold_reply into a Reply, emitted as the terminal DoneEmission. complete() drains the same channel and returns that Reply (or raises the GatewayError).
```python
import asyncio

from indusagi.llmgateway import stream, complete, Conversation, UserTurn, TextBlock

convo = Conversation(turns=(UserTurn(blocks=(TextBlock(text="hi"),)),))

async def main():
    # Streaming: iterate the Channel of Emissions (I/O happens on iteration).
    channel = stream("claude-sonnet-4", convo)
    async for emission in channel:
        if emission.kind == "text":
            print(emission.delta, end="")

    # One-shot: await the assembled Reply.
    reply = await complete("claude-sonnet-4", convo)

asyncio.run(main())
```

Note the asymmetry: a streaming failure surfaces as a terminal ErrorEmission on the channel, while complete() raises a GatewayError. An asyncio.CancelledError is always re-raised, never folded into the reply.

The type contract

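The contract/ package is the frozen, provider-neutral vocabulary every layer shares. Each record type is a @dataclass(frozen=True, slots=True); the unions, Protocols, and the GatewayError exception class listed below are not dataclasses themselves.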

| Name | Kind | Source | Purpose |
|---|---|---|---|
| ModelCard | dataclass | `contract/model_card.py` | Static descriptor: id, provider, api, display_name, base_url, context_window, max_output_tokens, modalities, reasoning, cost, wire_id (defaults to id) |
| Conversation | dataclass | `contract/conversation.py` | Provider-neutral request: a tuple of turns, plus an optional system preamble and tools |
| Block | union | `contract/conversation.py` | TextBlock, ThinkingBlock, ToolCallBlock, ToolResultBlock, ImageBlock, CommandBlock |
| Turn | union | `contract/conversation.py` | UserTurn / AssistantTurn / ToolTurn |
| ToolDescriptor | dataclass | `contract/conversation.py` | Declares a callable tool with a JsonSchema parameter spec |
| Reply | dataclass | `contract/reply.py` | Terminal materialized response: model, blocks, usage, stop |
| StreamOptions | dataclass | `contract/options.py` | Per-invocation knobs: cancel, thinking, max_output_tokens, temperature, top_p, tool_choice, api_key |
| Emission | union | `contract/emission.py` | TextEmission, ThinkingEmission, ToolCallStartEmission, ToolCallDeltaEmission, UsageEmission, StopEmission, DoneEmission, ErrorEmission |
| Channel | type | `contract/emission.py` | AsyncIterable[Emission] |
| Connector | Protocol | `contract/connector.py` | runtime_checkable Protocol with id, api, stream(), complete() |
| CredentialResolver | Protocol | `contract/connector.py` | `async resolve(provider, override) -> str \| None` |
| GatewayError | class | `contract/errors.py` | Normalized exception with kind, status, provider, cause; gateway_error() is the factory |

A bare StreamOptions() is a valid "use the model's defaults" request, so the options argument may be omitted entirely.

The model catalog

MODEL_CARDS is the static, hand-curated registry (24 cards: claude-opus-4, claude-sonnet-4, gpt-4o, o3, gemini-2.5-pro, kimi-k2, mock-1, ...). Behind it sits the 841-model full catalog generated into catalog/full.py from models_data.json (24 providers).

  • get_card(id) resolves curated first, then falls back into the full catalog so any catalog id resolves; returns None only when neither layer knows the id.
  • curated_card(id) stays curated-only — no fallback.
  • models() opens a fluent CardSelection over the curated cards first followed by every usable full-catalog record. .by_provider(), .by_api(), and .reasoning() narrow immutably; .all() and .find(id) materialize.
  • estimate_cost(card, usage) prices a Usage against the card's per-MTok CostSheet; cache tiers are billed additively when present.
```python
from indusagi.llmgateway import models, get_card, estimate_cost, Usage

# Reasoning-capable Anthropic cards, curated-first.
cards = models().by_provider("anthropic").reasoning(True).all()

card = get_card("claude-sonnet-4")
assert card is not None  # get_card returns None for unknown ids
cost = estimate_cost(card, Usage(input_tokens=1000, output_tokens=500))  # USD float
```

Connectors and credentials

CONNECTOR_REGISTRY is a dict[ApiKind, Connector] that is total over ApiKind (it holds an entry for every dialect), so connector_for_api(card.api) never misses a known dialect. The registry is built once, with each connector factory receiving a dependency bag that wraps resolve_secret. The wire dialects (ApiKind) are:

anthropic-messages, openai-completions, openai-responses,
google-generative, google-vertex, amazon-bedrock, azure-openai,
nvidia-openai-compatible, kimi-openai-compatible, ollama, mock

ProviderId is the vendor identity; ApiKind is the wire dialect a connector implements. Several providers share the OpenAI-compatible dialect, so the registry is keyed by ApiKind, not provider.
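Totality over ApiKind can be checked when the registry is built rather than discovered on the first request. In this sketch ApiKind is modeled as a typing.Literal of the eleven dialects listed above; FakeConnector, API_KINDS, check_total, REGISTRY, and connector_for are hypothetical names, not the gateway's.

```python
from typing import Literal, Mapping, get_args

# The eleven wire dialects, as a closed set.
ApiKind = Literal[
    "anthropic-messages", "openai-completions", "openai-responses",
    "google-generative", "google-vertex", "amazon-bedrock", "azure-openai",
    "nvidia-openai-compatible", "kimi-openai-compatible", "ollama", "mock",
]
API_KINDS = frozenset(get_args(ApiKind))


class FakeConnector:
    """Hypothetical stand-in for a Connector; it only remembers its dialect."""

    def __init__(self, api: str) -> None:
        self.api = api


def check_total(registry: Mapping[str, object]) -> None:
    # Total over ApiKind: one entry per dialect, none missing, none extra.
    keys = set(registry)
    missing = API_KINDS - keys
    extra = keys - API_KINDS
    if missing or extra:
        raise ValueError(f"missing={sorted(missing)} extra={sorted(extra)}")


REGISTRY = {api: FakeConnector(api) for api in sorted(API_KINDS)}
check_total(REGISTRY)  # fail at build time, not on the first request


def connector_for(api: str) -> FakeConnector:
    # Keyed by dialect: every vendor whose card declares this api lands on the same connector.
    return REGISTRY[api]
```

Keying on the dialect keeps the table at eleven entries even though the full catalog spans many more providers.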

Credential sourcing lives in credentials/. resolve_secret(provider, override) honors a caller override first, then walks the ordered env_vars for that provider in the declarative SECRET_TABLE and yields the first non-empty value. Standard keys include ANTHROPIC_API_KEY, OPENAI_API_KEY, GEMINI_API_KEY (then GOOGLE_API_KEY), AZURE_OPENAI_API_KEY, NVIDIA_API_KEY, and MOONSHOT_API_KEY (then KIMI_API_KEY); Bedrock and Vertex use a cloud-IAM scheme, and Ollama and mock need no secret. environment_credential_resolver is a ready-made CredentialResolver bound to resolve_secret.
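The override-then-environment lookup order reads naturally as a short async function. This is a sketch under assumptions: the SECRET_TABLE subset, the provider keys ("anthropic", "google", "kimi", "ollama"), and the rule that an empty override counts as absent are illustrative, and resolve_secret_sketch is a hypothetical name rather than the real resolve_secret.

```python
import os
from typing import Optional

# Hypothetical subset of SECRET_TABLE: provider -> env vars, checked in order.
SECRET_TABLE: dict[str, tuple[str, ...]] = {
    "anthropic": ("ANTHROPIC_API_KEY",),
    "google": ("GEMINI_API_KEY", "GOOGLE_API_KEY"),
    "kimi": ("MOONSHOT_API_KEY", "KIMI_API_KEY"),
    "ollama": (),  # local runtime: no secret needed
}


async def resolve_secret_sketch(provider: str, override: Optional[str] = None) -> Optional[str]:
    # 1. A non-empty caller override always wins (assumption: "" is treated as absent).
    if override:
        return override
    # 2. Otherwise walk the provider's env vars in order; the first non-empty value wins.
    for name in SECRET_TABLE.get(provider, ()):
        value = os.environ.get(name)
        if value:
            return value
    # 3. Nothing found: report absence rather than raising.
    return None
```

Keeping the table declarative means adding a provider or an alias variable is a data change, not a code change.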

The directory also ships pkce.py (create_pkce_pair, RFC 7636 S256) and oauth.py (build_auth_url / exchange_code / refresh_token, RFC 6749). The OAUTH_PROVIDERS client ids are empty placeholders that a deployment must supply.

```python
import asyncio

from indusagi.llmgateway.credentials import resolve_secret, create_pkce_pair

async def main():
    key = await resolve_secret("openai")  # reads OPENAI_API_KEY, or None
    pair = create_pkce_pair()             # RFC 7636 S256 verifier + challenge

asyncio.run(main())
```
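The S256 transform itself is standard-library work. This sketch follows RFC 7636 section 4.2, code_challenge = BASE64URL(SHA256(ASCII(code_verifier))) with the '=' padding removed; make_pkce_pair and PkcePair are hypothetical names, and the real create_pkce_pair's return fields may differ.

```python
import base64
import hashlib
import secrets
from dataclasses import dataclass


@dataclass(frozen=True)
class PkcePair:
    verifier: str
    challenge: str
    method: str = "S256"


def s256_challenge(verifier: str) -> str:
    # BASE64URL(SHA256(ASCII(code_verifier))) with trailing '=' padding stripped.
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def make_pkce_pair() -> PkcePair:
    # 32 random bytes -> 43 URL-safe characters, within RFC 7636's 43..128 range.
    verifier = secrets.token_urlsafe(32)
    return PkcePair(verifier, s256_challenge(verifier))
```

The verifier stays with the client; only the challenge goes into the authorization URL, and the verifier is revealed later in the code exchange.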

Streaming and conversion

The wire work splits across conversion/ (pure translation) and streaming/ (byte-stream framing).

| Name | Kind | Source | Purpose |
|---|---|---|---|
| fold_reply | function | `conversion/reduce.py` | Pure reducer folding an Emission sequence into a Reply (thinking, then text, then tool calls in first-seen order) |
| to_anthropic_request | function | `conversion/mappers.py` | Map a Conversation to the Anthropic Messages wire body |
| to_openai_chat_messages | function | `conversion/mappers.py` | Map a Conversation to OpenAI Chat Completions messages |
| to_google_contents | function | `conversion/mappers.py` | Map a Conversation to Google generative contents |
| to_responses_input | function | `conversion/mappers.py` | Map a Conversation to the OpenAI Responses input shape |
| stream_openai_compatible_chat | function | `conversion/openai_compatible.py` | Shared OpenAI Chat Completions streamer reused by the openai/nvidia/kimi/azure/ollama connectors |
| sse_events | async function | `streaming/sse.py` | Decode an httpx byte stream into ServerEvents (SSE), with cancel-token support |
| ndjson_lines | async function | `streaming/ndjson.py` | Decode an httpx byte stream into parsed JSON values (NDJSON, e.g. Ollama) |
| channel_of | function | `streaming/channel.py` | Wrap a generator factory as a re-iterable Channel |
| collect_reply | function | `streaming/channel.py` | Drain a Channel to its Reply, raising on a terminal error |
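A stripped-down SSE framer over an async stream of byte chunks shows what sse_events has to handle: lines split across chunks, CRLF endings, comment lines, and multi-line data fields joined with newlines, with an event dispatched on each blank line. sse_events_sketch and ServerEventSketch are hypothetical; the sketch omits the id and retry fields and the cancel-token race the real reader performs, and it only treats \n and \r\n (not a bare \r) as line endings.

```python
from dataclasses import dataclass
from typing import AsyncIterable, AsyncIterator, Optional


@dataclass(frozen=True)
class ServerEventSketch:
    event: Optional[str]
    data: str


async def sse_events_sketch(chunks: AsyncIterable[bytes]) -> AsyncIterator[ServerEventSketch]:
    buffer = b""
    event: Optional[str] = None
    data_lines: list[str] = []
    async for chunk in chunks:
        buffer += chunk  # a chunk may end mid-line, so keep the remainder buffered
        while b"\n" in buffer:
            raw, buffer = buffer.split(b"\n", 1)
            # Decoding whole lines only means multi-byte UTF-8 is never split.
            line = raw.rstrip(b"\r").decode("utf-8")
            if line == "":
                # Blank line: dispatch the accumulated event if it carried data.
                if data_lines:
                    yield ServerEventSketch(event, "\n".join(data_lines))
                event, data_lines = None, []
            elif line.startswith(":"):
                continue  # comment / keep-alive line
            else:
                field, _, value = line.partition(":")
                value = value[1:] if value.startswith(" ") else value
                if field == "event":
                    event = value
                elif field == "data":
                    data_lines.append(value)
    # An unterminated trailing event is dropped, as the SSE rules require.
```

An NDJSON reader is the simpler sibling: split on newlines the same way and json.loads each non-empty line.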

OpenAI-dialect connectors (openai_chat, nvidia, kimi, azure_openai, ollama) delegate the wire loop to stream_openai_compatible_chat; anthropic, google, google_vertex, bedrock, and openai_responses own bespoke decode loops. JSON bodies are emitted with compact json.dumps separators so the bytes match a JS JSON.stringify exactly. Cancellation threads from StreamOptions.cancel into the SSE/NDJSON readers, which race reads against the token and raise an aborted GatewayError.
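The compact-separators point is easy to check with the standard library. This sketch contrasts json.dumps defaults with separators=(",", ":"). One flagged assumption: matching JavaScript's JSON.stringify byte for byte also needs ensure_ascii=False, because JSON.stringify does not escape non-ASCII characters; whether the gateway passes that flag is not stated here, and numbers such as 1.0 still serialize differently in the two languages.

```python
import json

body = {"model": "gpt-4o", "messages": [{"role": "user", "content": "héllo"}], "stream": True}

# Python's default: ", " and ": " separators, and non-ASCII escaped as \uXXXX.
default = json.dumps(body)

# Compact separators plus ensure_ascii=False: the JSON.stringify-like byte layout.
compact = json.dumps(body, separators=(",", ":"), ensure_ascii=False)
```

Byte-identical bodies matter mostly for request signing, caching, and golden-file tests shared with a JavaScript implementation.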

Key concepts

  • ModelCard.id vs wire_id: id is the catalog/user-facing identity (what you pass to -m or the gateway); wire_id is what's actually sent to the provider (the request's model field or a URL segment) and defaults to id. They diverge when a friendly catalog id is rejected by the live API.
  • ApiKind vs ProviderId: ProviderId is the vendor; ApiKind is the wire dialect. The connector registry is keyed by ApiKind.
  • Channel / Emission: a Channel is a re-iterable AsyncIterable of incremental deltas terminated by exactly one done (success) or error (failure) emission. Re-iterating re-POSTs the request.
  • fold_reply: the pure reducer that collapses an emission sequence into a Reply. It accumulates thinking and text, reassembles tool-call argument JSON fragments, tracks the latest usage/stop, and emits blocks in thinking → text → tool-call order.
  • Curated vs full catalog: MODEL_CARDS (24 cards) is checked first; catalog/full (841 records) is consulted as a fallback so any catalog id resolves. curated_card stays curated-only.
  • Vocabulary bridge: catalog/bridge.py is the single place that maps the full-catalog/facade api and provider tags onto the core ApiKind/ProviderId, keeping the facade and gateway from drifting apart.
  • GatewayError.kind: a discriminator (auth, rate_limit, http, transport, parse, unsupported, aborted) so callers can branch on the failure class without string-matching messages.
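The fold described in the fold_reply bullet can be sketched as a single pass. The emission classes, fold, and the tuple-shaped blocks are hypothetical simplifications (the real reducer produces a Reply with typed blocks), and the sketch assumes each tool call's start emission arrives before its argument deltas.

```python
import json
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class Thinking:
    delta: str

@dataclass(frozen=True)
class Text:
    delta: str

@dataclass(frozen=True)
class ToolStart:
    call_id: str
    name: str

@dataclass(frozen=True)
class ToolDelta:
    call_id: str
    args_fragment: str  # a partial JSON string, not yet parseable on its own

@dataclass(frozen=True)
class UsageE:
    input_tokens: int
    output_tokens: int

@dataclass(frozen=True)
class Stop:
    reason: str

Event = Union[Thinking, Text, ToolStart, ToolDelta, UsageE, Stop]


def fold(events: list[Event]) -> dict:
    thinking: list[str] = []
    text: list[str] = []
    calls: dict[str, dict] = {}  # dict insertion order == first-seen order
    usage: Optional[UsageE] = None
    stop: Optional[str] = None
    for ev in events:
        if isinstance(ev, Thinking):
            thinking.append(ev.delta)
        elif isinstance(ev, Text):
            text.append(ev.delta)
        elif isinstance(ev, ToolStart):
            calls.setdefault(ev.call_id, {"name": ev.name, "args": []})
        elif isinstance(ev, ToolDelta):
            calls[ev.call_id]["args"].append(ev.args_fragment)
        elif isinstance(ev, UsageE):
            usage = ev  # latest report wins
        elif isinstance(ev, Stop):
            stop = ev.reason  # latest reason wins
    # Emit blocks in thinking -> text -> tool-call order.
    blocks: list[tuple] = []
    if thinking:
        blocks.append(("thinking", "".join(thinking)))
    if text:
        blocks.append(("text", "".join(text)))
    for call_id, call in calls.items():
        # Argument JSON is only parsed once all fragments are concatenated.
        blocks.append(("tool_call", call_id, call["name"], json.loads("".join(call["args"]) or "{}")))
    return {"blocks": blocks, "usage": usage, "stop": stop}
```

Because the reducer is pure, the same function can serve both the terminal DoneEmission on a stream and the Reply returned by complete().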

Examples

Stream from the curated mock card and consume Emissions:

```python
import asyncio
from indusagi.llmgateway import (
    stream, Conversation, UserTurn, TextBlock, TextEmission, DoneEmission,
)

async def main():
    convo = Conversation(
        turns=(UserTurn(blocks=(TextBlock(text="hello"),)),),
        system="Be terse.",
    )
    channel = stream("mock-1", convo)   # sync; no I/O yet
    async for emission in channel:      # I/O happens here
        if isinstance(emission, TextEmission):
            print(emission.delta, end="")
        elif isinstance(emission, DoneEmission):
            print("\nstop:", emission.reply.stop, "usage:", emission.reply.usage)

asyncio.run(main())
```

One-shot complete with options and cost estimation:

```python
import asyncio
from indusagi.llmgateway import (
    complete, get_card, estimate_cost,
    Conversation, UserTurn, TextBlock, StreamOptions,
)

async def main():
    convo = Conversation(turns=(UserTurn(blocks=(TextBlock(text="summarize X"),)),))
    opts = StreamOptions(temperature=0.2, max_output_tokens=512, thinking="low")
    reply = await complete("claude-sonnet-4", convo, opts)
    card = get_card("claude-sonnet-4")
    print(reply.stop, estimate_cost(card, reply.usage), "USD")

asyncio.run(main())
```
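The per-MTok arithmetic behind estimate_cost is simple enough to check by hand. The sketch below uses hypothetical Rates/Tokens types and estimate_cost_sketch, with illustrative rates of 3 USD/MTok input, 15 USD/MTok output, and 0.30 USD/MTok cache reads that are not taken from any card in the catalog.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rates:
    # USD per million tokens, mirroring the per-MTok CostSheet idea.
    input: float
    output: float
    cache_read: float = 0.0
    cache_write: float = 0.0


@dataclass(frozen=True)
class Tokens:
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0


def estimate_cost_sketch(rates: Rates, usage: Tokens) -> float:
    mtok = 1_000_000
    cost = usage.input_tokens * rates.input / mtok
    cost += usage.output_tokens * rates.output / mtok
    # Cache tiers are billed additively when present.
    cost += usage.cache_read_tokens * rates.cache_read / mtok
    cost += usage.cache_write_tokens * rates.cache_write / mtok
    return cost
```

With those rates, 1000 input and 500 output tokens cost 1000 × 3 / 10^6 + 500 × 15 / 10^6 = 0.003 + 0.0075 = 0.0105 USD; adding 2000 cache-read tokens contributes 2000 × 0.30 / 10^6 = 0.0006, for 0.0111 USD.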

Fluent catalog query and credential resolution:

```python
import asyncio
from indusagi.llmgateway import models
from indusagi.llmgateway.credentials import resolve_secret, create_pkce_pair

# Reasoning-capable Anthropic cards, curated-first.
cards = models().by_provider("anthropic").reasoning(True).all()
print([c.id for c in cards])

found = models().by_api("openai-completions").find("gpt-4o")
print(found.wire_id if found else None)

async def keys():
    key = await resolve_secret("openai")  # reads OPENAI_API_KEY (or None)
    pair = create_pkce_pair()             # RFC 7636 S256 verifier + challenge
    print(bool(key), pair.method)

asyncio.run(keys())
```

Relationship to neighbors

The gateway is the lower wire layer beneath the facade. The ai facade and the Runtime agent loop import stream/complete and the contract types straight from this barrel so there is one source of truth. The facade's ModelRegistry consumes the full catalog (catalog/full), with MODEL_CARDS as the curated supplement; catalog/bridge.py is the single place that bridges this gateway's core ApiKind/ProviderId vocabulary to the older facade vocabulary. The gateway also shares the JsonSchema / ToolDescriptor contract with the Capabilities tool kernel. No vendor SDKs are depended on — only httpx for transport.
