
LLM Gateway

indusagi::llmgateway is the Rust framework's unified, multi-provider entry point to LLMs. It is a zero-SDK gateway: you hand it a model id and a provider-neutral Conversation, and it resolves a ModelCard, routes to one of eleven wire connectors, and streams normalized Emissions back through a re-iterable Channel. No vendor SDK is imported anywhere; reqwest is the only HTTP dependency. Unknown model ids fail fast with an Unsupported GatewayError.

The gateway resolves a model id to a ModelCard, selects the Connector that speaks that card's ApiKind wire dialect, talks raw HTTP via an injected HttpTransport, decodes the SSE/NDJSON byte stream into neutral Emissions, and folds them into a terminal Reply with fold_reply. This module ports the TypeScript llmgateway/ tree module-for-module and mirrors the Python indusagi.llmgateway layer, with Rust-native Result-based error propagation in place of thrown exceptions.


Module layout

The barrel lives in crates/indusagi/src/llmgateway/mod.rs and re-exports the symbols the rest of the framework reaches for. The subtree is split into six sibling modules plus the top-level gateway dispatcher.

Module Path Holds
contract llmgateway/contract/ The frozen type vocabulary — model_card.rs, conversation.rs, reply.rs, emission.rs, options.rs, errors.rs, connector.rs
connectors llmgateway/connectors/ One Connector + create_*_connector factory per ApiKind (anthropic, openai_chat, openai_responses, google, google_vertex, bedrock, azure_openai, nvidia, kimi, ollama, mock), the HttpTransport seam (http.rs), and the registry mod.rs
conversion llmgateway/conversion/ Pure wire translation: mappers.rs (Conversation → native bodies), reduce.rs (fold_reply), openai_compatible.rs (shared OpenAI chat streamer)
catalog llmgateway/catalog/ cards.rs (the 13 curated ModelCards), query.rs (models/get_card/CardSelection), cost.rs (estimate_cost), generated.rs (the 708-model discovery catalog made routable)
credentials llmgateway/credentials/ secrets.rs (secret_table + resolve_secret), pkce.rs (RFC 7636 S256), oauth.rs (RFC 6749 authorization-code + refresh)
streaming llmgateway/streaming/ channel.rs (channel_of/collect_reply), sse.rs (sse_events), ndjson.rs (ndjson_lines), utf8.rs (incremental UTF-8 decode)
gateway llmgateway/gateway.rs The top-level stream/complete (+ *_with_card) entry points

Public exports

Everything callers need is re-exported from llmgateway/mod.rs.

Name Kind Source Purpose
stream fn gateway.rs Resolve a model id, route to its connector, return a streaming Channel. Sync — I/O happens on iteration. Err(Unsupported) for unknown ids
complete async fn gateway.rs One-shot: resolve id, drain the connector channel, return the assembled Reply
stream_with_card fn gateway.rs Like stream but takes a pre-resolved ModelCard, bypassing catalog lookup
complete_with_card async fn gateway.rs One-shot variant taking a pre-resolved ModelCard
model_cards fn catalog/cards.rs &'static [ModelCard] — the 13 hand-curated cards, built once via OnceLock
models fn catalog/query.rs Open a fluent CardSelection over model_cards()
get_card fn catalog/query.rs Id lookup: curated first, then synthesize from the generated catalog; None if unroutable
CardSelection struct catalog/query.rs Lazy, immutable, chainable query; build via CardSelection::over(pool)
estimate_cost fn catalog/cost.rs Turn a card's per-MTok CostSheet + a Usage into a USD f64
connector_for_api fn connectors/mod.rs Return the Arc<dyn Connector> that speaks a given ApiKind

The barrel also re-exports the entire contract vocabulary (ApiKind, AssistantRole, Block, Channel, Connector, Conversation, CostSheet, CredentialResolver, Emission, GatewayError, GatewayErrorExtra, GatewayErrorKind, JsonSchema, Modality, ModelCard, ProviderId, Reply, StopReason, StreamOptions, ThinkingLevel, ToolChoice, ToolDescriptor, Turn, Usage, gateway_error), the conversion mappers and OpenAI-compatible streamer (incl. to_google_*, to_responses_*, complete_openai_compatible_chat, OpenAICompatibleConfig), the credentials helpers (build_auth_url, exchange_code, refresh_token, oauth_config_for, AuthUrlInputs, OAuthConfig, OAuthTokens, PkcePair, create_pkce_pair, derive_code_challenge, …), the streaming helpers (channel_of, collect_reply, sse_events, ndjson_lines, ServerEvent), and the HttpRequest/HttpResponse/HttpTransport/ReqwestTransport/reqwest_transport seam plus every create_*_connector factory.

Dispatch

stream and complete take the same shape — a &str model id, a Conversation, and an optional StreamOptions. The dispatch path in gateway.rs is:

  1. get_card(model_id) resolves a ModelCard — the curated model_cards() first, then a synthesized card from the generated discovery catalog. An unknown id returns Err with an Unsupported GatewayError ("unknown model: <id>").
  2. connector_for_api(card.api) looks up the connector in CONNECTOR_REGISTRY, which has an entry for every ApiKind.
  3. The connector builds a re-iterable Channel. stream itself stays sync and does no I/O — every Channel::iter re-issues the HTTP request.
  4. On iteration the connector resolves its key, maps the Conversation to the provider's wire body, POSTs through the HttpTransport, and decodes the byte stream into neutral Emissions.
  5. The emissions are folded with fold_reply into a Reply, emitted as the terminal Emission::Done. complete drains the same channel via collect_reply and returns that Reply (or Err(GatewayError)).

pub fn stream(
    model_id: &str,
    conversation: Conversation,
    options: Option<StreamOptions>,
) -> Result<Channel, GatewayError>;

pub async fn complete(
    model_id: &str,
    conversation: Conversation,
    options: Option<StreamOptions>,
) -> Result<Reply, GatewayError>;

Note the asymmetry inherited from the TS source: a streaming failure surfaces as a terminal Emission::Error on the channel, while complete returns Err(GatewayError). stream returns Ok(channel) even when the dialect has no registered connector; the channel then yields exactly one Emission::Error ("no connector for dialect: <api>"), so a streaming caller always observes a terminal envelope and never an empty channel. complete_with_card returns the same error directly as Err. Because the registry is now total over ApiKind, this no-connector path is unreachable for any catalog card.
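Step 1 of the dispatch path, with its fail-fast error, can be sketched using only the standard library. This is a minimal model under stated assumptions, not the crate's code: Card, ErrorKind, curated(), synthesize_from_generated() and resolve() are hypothetical stand-ins. Only the curated-first lookup order and the "unknown model: <id>" message come from the description above.

```rust
// Hypothetical, std-only model of the get_card -> dispatch resolution step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApiKind {
    AnthropicMessages,
    Mock,
}

#[derive(Debug, Clone, PartialEq)]
struct Card {
    id: String,
    api: ApiKind,
}

#[derive(Debug, PartialEq)]
enum ErrorKind {
    Unsupported,
}

#[derive(Debug, PartialEq)]
struct GatewayError {
    kind: ErrorKind,
    message: String,
}

// Stand-in for the 13 curated cards.
fn curated() -> Vec<Card> {
    vec![
        Card { id: "claude-sonnet-4".into(), api: ApiKind::AnthropicMessages },
        Card { id: "mock-1".into(), api: ApiKind::Mock },
    ]
}

// Stand-in for generated_model + synthesize_card: only ids whose provider and
// wire protocol map onto the gateway enums yield a routable card.
fn synthesize_from_generated(id: &str) -> Option<Card> {
    match id {
        "claude-sonnet-4-5" => Some(Card { id: id.into(), api: ApiKind::AnthropicMessages }),
        _ => None, // e.g. ids on providers the gateway cannot reach
    }
}

// Curated cards first, then the generated fallback.
fn get_card(id: &str) -> Option<Card> {
    curated()
        .into_iter()
        .find(|c| c.id == id)
        .or_else(|| synthesize_from_generated(id))
}

// An unknown id fails here, before any connector is chosen or any I/O happens.
fn resolve(model_id: &str) -> Result<Card, GatewayError> {
    get_card(model_id).ok_or_else(|| GatewayError {
        kind: ErrorKind::Unsupported,
        message: format!("unknown model: {model_id}"),
    })
}
```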

The type contract

The contract/ module is the frozen, provider-neutral vocabulary every layer shares. Each type derives Serialize/Deserialize with serde tag/rename attributes chosen to match the TS wire shape byte-for-byte (snake_case discriminants on kind/role, camelCase field names like callId, argsDelta, inputTokens).

Name Kind Source Notes
ProviderId enum model_card.rs The vendor identity: Anthropic, Openai, Google, GoogleVertex, Amazon, Azure, Nvidia, Kimi, Minimax, Ollama, Mock (serde kebab-case)
ApiKind enum model_card.rs The wire dialect (11 variants); ApiKind::ALL is the hand-maintained iteration array
Modality enum model_card.rs Text / Image / Audio (serde lowercase)
CostSheet struct model_card.rs Per-MTok pricing: input_per_mtok, output_per_mtok, optional cache_read_per_mtok/cache_write_per_mtok
ModelCard struct model_card.rs Static descriptor: id, provider, api, display_name, optional base_url, context_window, max_output_tokens, modalities, reasoning, cost
Block enum conversation.rs Text / Thinking / ToolCall / ToolResult / Image / Command (internally tagged on kind)
Turn enum conversation.rs User / Assistant / Tool (tagged on role)
ToolDescriptor struct conversation.rs name, description, parameters: JsonSchema
JsonSchema type alias conversation.rs = serde_json::Value
Conversation struct conversation.rs Provider-neutral request: optional system, turns: Vec<Turn>, optional tools
Reply struct reply.rs Terminal response: role: AssistantRole, model, blocks, usage, stop
AssistantRole enum reply.rs One-variant enum (Assistant) so serde renders/parses exactly the literal "assistant" on Reply.role
Usage struct reply.rs input_tokens, output_tokens, optional cache tiers; Usage::EMPTY is the fold seed
StopReason enum reply.rs Complete / ToolCalls / MaxOutput / Refusal / Aborted / Error
StreamOptions struct options.rs Per-invocation knobs (see below)
ThinkingLevel enum options.rs Off / Low / Medium / High / Max
ToolChoice enum options.rs Auto / Required / None
Emission enum emission.rs The 8-variant streaming union (see Streaming)
Channel type alias emission.rs = crate::core::channel::Channel<Emission>
Connector trait connector.rs id, api, stream, async complete (object-safe via #[async_trait])
CredentialResolver trait connector.rs async fn resolve(provider, override) -> Option<String>
GatewayError struct errors.rs Normalized error with kind, message, optional status/provider/cause
GatewayErrorKind enum errors.rs Auth / RateLimit / Http / Transport / Parse / Unsupported / Aborted

StreamOptions is the per-invocation option bag. A bare StreamOptions::default() is a valid "use the model's defaults" request, so the options argument can be passed as None:

pub struct StreamOptions {
    #[serde(skip)]
    pub cancel: Option<CancellationToken>,   // replaces the TS AbortSignal
    pub thinking: Option<ThinkingLevel>,
    pub max_output_tokens: Option<u64>,       // wire: maxOutputTokens
    pub temperature: Option<f64>,
    pub top_p: Option<f64>,                    // wire: topP
    pub tool_choice: Option<ToolChoice>,       // wire: toolChoice
    pub api_key: Option<String>,               // wire: apiKey; overrides the env credential
}

The Connector trait is the seam every dialect implements:

#[async_trait]
pub trait Connector: Send + Sync {
    fn id(&self) -> &str;
    fn api(&self) -> ApiKind;
    fn stream(&self, model: ModelCard, conversation: Conversation, options: StreamOptions) -> Channel;
    async fn complete(&self, model: ModelCard, conversation: Conversation, options: StreamOptions)
        -> Result<Reply, GatewayError>;
}

GatewayError mirrors the TS class GatewayError extends Error. Because Rust carries it as a concrete typed value inside Result<_, GatewayError> (and inside Emission::Error), the TS instanceof-by-name re-raise check collapses to ordinary type matching. cause is a #[serde(skip)] Arc<dyn Error + Send + Sync>, so it is lossy across a serialization boundary, exactly like the TS cause. gateway_error(kind, message) is the free-function factory; the builder methods (.provider(..), .status(..), .cause(..)) and GatewayError::with_extra attach the optional context.
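The factory-plus-builder shape can be sketched std-only. Field and method names follow the prose above, but the struct layout, the Display format, and the source() wiring are assumptions of this sketch; the real type also derives serde traits and offers GatewayError::with_extra, both omitted here.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GatewayErrorKind { Auth, RateLimit, Http, Transport, Parse, Unsupported, Aborted }

// Hypothetical mirror of GatewayError; the Arc keeps the error cheaply Clone.
#[derive(Debug, Clone)]
pub struct GatewayError {
    pub kind: GatewayErrorKind,
    pub message: String,
    pub status: Option<u16>,
    pub provider: Option<String>,
    pub cause: Option<Arc<dyn Error + Send + Sync>>, // #[serde(skip)] in the real type
}

// Free-function factory: only kind and message are required.
pub fn gateway_error(kind: GatewayErrorKind, message: impl Into<String>) -> GatewayError {
    GatewayError { kind, message: message.into(), status: None, provider: None, cause: None }
}

impl GatewayError {
    // Each builder consumes and returns self, so calls chain.
    pub fn provider(mut self, provider: impl Into<String>) -> Self {
        self.provider = Some(provider.into());
        self
    }
    pub fn status(mut self, status: u16) -> Self {
        self.status = Some(status);
        self
    }
    pub fn cause(mut self, cause: impl Error + Send + Sync + 'static) -> Self {
        self.cause = Some(Arc::new(cause));
        self
    }
}

impl fmt::Display for GatewayError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}: {}", self.kind, self.message)
    }
}

impl Error for GatewayError {
    // Expose the wrapped cause through the standard error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.cause.as_deref().map(|e| e as &(dyn Error + 'static))
    }
}
```

Because callers branch on the kind field rather than on the message text, the Display format here is purely diagnostic.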

The model catalog

model_cards() is the static, hand-curated registry: exactly 13 cards, built once on first access via OnceLock (a Vec<ModelCard> rather than a const because the modalities lists are not const-constructible). The card ids, in order:

Id Provider ApiKind Reasoning Context
claude-opus-4 Anthropic AnthropicMessages yes 200k
claude-sonnet-4 Anthropic AnthropicMessages yes 200k
claude-haiku-3-5 Anthropic AnthropicMessages no 200k
gpt-4o Openai OpenaiCompletions no 128k
gpt-4o-mini Openai OpenaiCompletions no 128k
o3 Openai OpenaiResponses yes 200k
o4-mini Openai OpenaiResponses yes 200k
gemini-2.5-pro Google GoogleGenerative yes 1M
gemini-2.5-flash Google GoogleGenerative yes 1M
nvidia/llama-3.1-nemotron-70b-instruct Nvidia NvidiaOpenaiCompatible no 128k
kimi-k2 Kimi KimiOpenaiCompatible yes 256k
llama3.1:8b Ollama Ollama no 128k
mock-1 Mock Mock no 32k

Every number in the cost sheets is an APPROXIMATE, best-effort per-MTok figure for local estimation only — not transcribed from any vendor price list. Context-window and max-output figures are likewise approximate and rounded.

Behind the curated list sits the generated discovery catalog (catalog/generated.rs), embedded from models_generated.json via include_str!. The JSON is a nested { provider: { id: model } } map (24 providers, 708 records); it is parsed once via serde_json::from_str into a BTreeMap<String, BTreeMap<String, GeneratedModel>> and flattened into a &'static [GeneratedModel] by generated_models(). Each record deserializes into a GeneratedModel (with a nested GeneratedCost); both generated_model(id) (exact-id lookup) and generated_models() are exported from catalog::generated. The picker lets a user select any model from that larger set; get_card resolves it as follows:

  • get_card(id) returns the curated card if id is among the 13.
  • On a miss it calls generated_model(id) and then synthesize_card(g), which projects a GeneratedModel into a routable ModelCard only when both its provider (map_provider) and its wire protocol (map_api) map onto the gateway's enums. So claude-sonnet-4-5 or MiniMax-M2.7 resolve and route, while ids on providers the gateway can't reach (xai, groq, cerebras, openrouter, …) stay None: they are reported as non-routable rather than failing silently.
  • models() opens a CardSelection over model_cards(). .by_provider(..), .by_api(..), and .reasoning(bool) narrow the selection immutably: each returns a fresh selection that clones the accumulated Arc<dyn Fn> predicates, so narrowing a base selection never mutates it. .all() and .find(id) materialize the result.
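The immutable, predicate-accumulating builder can be sketched as follows. Card, Provider, and the Predicate alias are hypothetical simplifications (the real CardSelection also has by_api and runs over ModelCards). The point illustrated is that each narrowing call clones the Arc<dyn Fn> predicate list into a new selection, and filtering only happens in all() or find().

```rust
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Provider { Anthropic, Openai }

#[derive(Debug, Clone, PartialEq)]
pub struct Card { pub id: &'static str, pub provider: Provider, pub reasoning: bool }

type Predicate = Arc<dyn Fn(&Card) -> bool + Send + Sync>;

// Lazy selection: stores predicates, filters only when materialized.
#[derive(Clone)]
pub struct CardSelection {
    pool: Arc<Vec<Card>>,
    predicates: Vec<Predicate>,
}

impl CardSelection {
    pub fn over(pool: Vec<Card>) -> Self {
        CardSelection { pool: Arc::new(pool), predicates: Vec::new() }
    }

    // Clone-and-extend: `self` is left untouched.
    fn with(&self, p: Predicate) -> Self {
        let mut next = self.clone();
        next.predicates.push(p);
        next
    }

    pub fn by_provider(&self, provider: Provider) -> Self {
        self.with(Arc::new(move |c: &Card| c.provider == provider))
    }

    pub fn reasoning(&self, flag: bool) -> Self {
        self.with(Arc::new(move |c: &Card| c.reasoning == flag))
    }

    // A card survives only if every accumulated predicate accepts it.
    pub fn all(&self) -> Vec<Card> {
        self.pool
            .iter()
            .filter(|c| self.predicates.iter().all(|p| p(*c)))
            .cloned()
            .collect()
    }

    pub fn find(&self, id: &str) -> Option<Card> {
        self.all().into_iter().find(|c| c.id == id)
    }
}
```

Cloning an Arc-backed predicate list costs one reference-count bump per predicate, which is what keeps the clone-on-narrow design cheap.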

estimate_cost(card, usage) prices a Usage against the card's per-MTok CostSheet. Cache tiers are billed additively, and only when both the usage reports a non-zero count and the card prices the tier (mirroring the TS truthiness guard where 0 is falsy):

pub fn estimate_cost(card: &ModelCard, usage: &Usage) -> f64;
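The pricing rule can be written out directly. This sketch assumes "per-MTok" means per one million tokens, and it invents the Usage cache field names (cache_read_tokens, cache_write_tokens), which the text only calls "optional cache tiers". It also takes a CostSheet rather than a whole ModelCard for brevity.

```rust
// Hypothetical std-only mirrors of CostSheet and Usage.
pub struct CostSheet {
    pub input_per_mtok: f64,
    pub output_per_mtok: f64,
    pub cache_read_per_mtok: Option<f64>,
    pub cache_write_per_mtok: Option<f64>,
}

pub struct Usage {
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub cache_read_tokens: Option<u64>,  // assumed field name
    pub cache_write_tokens: Option<u64>, // assumed field name
}

const PER_MTOK: f64 = 1_000_000.0;

// A cache tier is billed only if usage reports a non-zero count AND the sheet
// prices the tier; zero behaves like "absent", mirroring the TS truthiness guard.
fn tier(tokens: Option<u64>, price: Option<f64>) -> f64 {
    match (tokens, price) {
        (Some(t), Some(p)) if t > 0 && p != 0.0 => t as f64 * p / PER_MTOK,
        _ => 0.0,
    }
}

// Input + output + cache tiers, billed additively, in USD.
pub fn estimate_cost(cost: &CostSheet, usage: &Usage) -> f64 {
    usage.input_tokens as f64 * cost.input_per_mtok / PER_MTOK
        + usage.output_tokens as f64 * cost.output_per_mtok / PER_MTOK
        + tier(usage.cache_read_tokens, cost.cache_read_per_mtok)
        + tier(usage.cache_write_tokens, cost.cache_write_per_mtok)
}
```

For example, 1M input tokens at 3.0, 500k output tokens at 15.0, and 200k cache-read tokens at 0.3 come to 3.0 + 7.5 + 0.06 = 10.56 USD. Cache-write tokens add nothing when the sheet leaves that tier unpriced.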

Providers and wire dialects

ProviderId is the vendor identity; ApiKind is the concrete request/response wire protocol a connector implements. They are distinct because several providers can share one dialect (a MiniMax card, for example, speaks AnthropicMessages), and the connector registry is keyed by ApiKind, not by provider. The 11 ApiKind variants, with their serde kebab-case wire literals, are:

ApiKind variant Wire literal Connector id() Decode loop
AnthropicMessages anthropic-messages anthropic bespoke SSE
OpenaiCompletions openai-completions openai-chat shared OpenAI-compatible
OpenaiResponses openai-responses openai-responses bespoke SSE (typed events)
GoogleGenerative google-generative google-generative bespoke SSE
GoogleVertex google-vertex google-vertex bespoke SSE
AmazonBedrock amazon-bedrock bedrock fail-fast (SigV4 TODO)
AzureOpenai azure-openai azure-openai shared OpenAI-compatible
NvidiaOpenaiCompatible nvidia-openai-compatible nvidia shared OpenAI-compatible
KimiOpenaiCompatible kimi-openai-compatible kimi shared OpenAI-compatible
Ollama ollama ollama NDJSON
Mock mock mock network-free script

The Anthropic Messages dialect is notably spoken by more than one vendor: the Anthropic connector resolves its credential from model.provider, so a MiniMax card (provider Minimax, api AnthropicMessages, base URL https://api.minimax.io/anthropic) reaches MINIMAX_API_KEY while an Anthropic card reaches ANTHROPIC_API_KEY.

Connectors and the registry

connectors/mod.rs holds the routing table, built once on first access via OnceLock and total over ApiKind (a registry_is_total_with_exactly_eleven_dialects test enforces all 11). connector_for_api(api) -> Option<Arc<dyn Connector>> indexes it; it stays an Option to match the call-site contract but never misses a known dialect. Each connector is built by its factory, all sharing one dependency — the environment-backed credential resolver:

pub fn connector_for_api(api: ApiKind) -> Option<Arc<dyn Connector>>;

// Per-dialect factories (all take the shared resolver):
pub fn create_anthropic_connector(resolver: Arc<dyn CredentialResolver>) -> Arc<dyn Connector>;
pub fn create_openai_chat_connector(resolver: Arc<dyn CredentialResolver>) -> Arc<dyn Connector>;
pub fn create_openai_responses_connector(resolver: Arc<dyn CredentialResolver>) -> Arc<dyn Connector>;
// ...google, google_vertex, bedrock, azure_openai, nvidia, kimi, ollama, mock
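A registry that is built once and is total over its key enum can be sketched with std::sync::OnceLock. The four-variant ApiKind, the Stub connector, and the registry() helper are hypothetical reductions: the real table has eleven dialects, and its factories receive the shared CredentialResolver. The loop in the check mirrors what a totality test such as registry_is_total_with_exactly_eleven_dialects verifies.

```rust
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApiKind { AnthropicMessages, OpenaiCompletions, Ollama, Mock }

impl ApiKind {
    // Hand-maintained iteration array, in the spirit of ApiKind::ALL.
    pub const ALL: [ApiKind; 4] =
        [ApiKind::AnthropicMessages, ApiKind::OpenaiCompletions, ApiKind::Ollama, ApiKind::Mock];
}

// Reduced Connector trait: identity only, no streaming.
pub trait Connector: Send + Sync {
    fn id(&self) -> &str;
    fn api(&self) -> ApiKind;
}

// Hypothetical placeholder connector.
struct Stub { id: &'static str, api: ApiKind }

impl Connector for Stub {
    fn id(&self) -> &str { self.id }
    fn api(&self) -> ApiKind { self.api }
}

static REGISTRY: OnceLock<HashMap<ApiKind, Arc<dyn Connector>>> = OnceLock::new();

// Built once, on first access; later calls return the same table.
fn registry() -> &'static HashMap<ApiKind, Arc<dyn Connector>> {
    REGISTRY.get_or_init(|| {
        let entries: [(&'static str, ApiKind); 4] = [
            ("anthropic", ApiKind::AnthropicMessages),
            ("openai-chat", ApiKind::OpenaiCompletions),
            ("ollama", ApiKind::Ollama),
            ("mock", ApiKind::Mock),
        ];
        entries
            .iter()
            .map(|&(id, api)| (api, Arc::new(Stub { id, api }) as Arc<dyn Connector>))
            .collect()
    })
}

// Still an Option, but it never misses while the table stays total.
pub fn connector_for_api(api: ApiKind) -> Option<Arc<dyn Connector>> {
    registry().get(&api).cloned()
}
```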

The HttpTransport seam

The TS connectors call ambient fetch directly. In Rust the network call is hidden behind a small HttpTransport trait (connectors/http.rs) so connectors are pure with respect to I/O and can be driven, network-free, against a local transcript server in tests. The production implementation ReqwestTransport wraps a shared reqwest::Client; reqwest_transport() returns the shared Arc<dyn HttpTransport>.

#[async_trait]
pub trait HttpTransport: Send + Sync {
    async fn post(
        &self,
        request: HttpRequest,
        provider: &str,
        transport_message: &str,
        aborted_message: &str,
    ) -> Result<HttpResponse, GatewayError>;
}

The transport does exactly what the TS fetch call did, and nothing more:

  • POST a URL with headers and a JSON body, honoring a CancellationToken. The request struct is HttpRequest { url, headers, body: Vec<u8>, cancel }, and the transport adds content-type: application/json when the connector did not supply one.
  • Classify a connect/transport failure as Transport, or as Aborted when the token was tripped. Cancellation is checked both before the send and during it, racing each send against the token via tokio::select!.
  • Return an HttpResponse exposing status, ok, has_body(), a streaming body (into_byte_stream(cancel)), and a best-effort text() for error diagnostics. The body is taken by exactly one of into_byte_stream or text.

Per-connector HTTP-status → GatewayErrorKind classification (401|403 → Auth, 429 → RateLimit, else Http) stays in each connector, matching the TS where each postX owns its own !response.ok branch.
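The status classification each connector owns, and the transport's content-type default, reduce to two small pure functions. Both function names are hypothetical, headers are modeled as a Vec<(String, String)>, and the case-insensitive header-name match is an assumption of this sketch rather than something the text specifies.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GatewayErrorKind { Auth, RateLimit, Http }

// A connector's !ok branch: 401|403 -> Auth, 429 -> RateLimit, anything else -> Http.
pub fn classify_status(status: u16) -> GatewayErrorKind {
    match status {
        401 | 403 => GatewayErrorKind::Auth,
        429 => GatewayErrorKind::RateLimit,
        _ => GatewayErrorKind::Http,
    }
}

// Transport-side default: add content-type only when the connector set none.
// Header names are compared case-insensitively (assumed here).
pub fn with_default_content_type(mut headers: Vec<(String, String)>) -> Vec<(String, String)> {
    let has_content_type = headers.iter().any(|(k, _)| k.eq_ignore_ascii_case("content-type"));
    if !has_content_type {
        headers.push(("content-type".to_string(), "application/json".to_string()));
    }
    headers
}
```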

Connector responsibilities

Each live connector: resolves its key lazily on first iteration (a missing key becomes a terminal Emission::Error { kind: Auth } naming the env var); assembles the wire body from the shared conversion mappers plus a stream flag; POSTs through the transport; decodes the byte stream (its own loop or the shared OpenAI streamer); folds the collected deltas with fold_reply; and emits a terminal Done or Error. The four OpenAI-dialect connectors (openai_chat, nvidia, kimi, azure_openai) delegate the entire wire loop to stream_openai_compatible_chat / complete_openai_compatible_chat; anthropic, google, google_vertex, and openai_responses own bespoke SSE decode loops; ollama uses the NDJSON framer.

Per-connector wire specifics worth knowing:

  • anthropic (id anthropic) — POSTs {baseUrl}/v1/messages (default https://api.anthropic.com), auth header x-api-key plus anthropic-version: 2023-06-01, resolves its key by model.provider (so a MiniMax card reaches MINIMAX_API_KEY), and decodes typed message_* / content_block_* SSE events. Stop-reason map: end_turn/stop_sequence → Complete, tool_use → ToolCalls, max_tokens → MaxOutput, refusal → Refusal.
  • openai_chat (id openai-chat) — {baseUrl}/chat/completions (default https://api.openai.com/v1), authorization: Bearer …, delegates to the shared streamer.
  • openai_responses (id openai-responses) — POST /v1/responses; maps ThinkingLevel onto reasoning.effort (low/medium/high); maps the completion status (completed/incomplete+max_output_tokens/failed/cancelled) onto a StopReason.
  • nvidia (id nvidia) — hosted NIM at https://integrate.api.nvidia.com/v1, bearer auth, and a reasoning-only body override chat_template_kwargs.thinking passed as body_extra.
  • azure_openai (id azure-openai) — deployment-scoped URL on the resource baseUrl (no public default; a missing endpoint is Unsupported), the deployment name taken from model.id, api-key header, and api-version=2024-08-01-preview.
  • kimi (id kimi) — Moonshot host https://api.moonshot.ai/v1, bearer auth, shared streamer.
  • ollama (id ollama) — local server (default http://localhost:11434), no credential, NDJSON body framing.
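The Anthropic stop-reason map from the first bullet above can be written as a pure function. The function name is hypothetical, and returning None for unrecognized values is an assumption: the text does not say how unknown stop_reason strings are handled.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason { Complete, ToolCalls, MaxOutput, Refusal, Aborted, Error }

// Anthropic wire stop_reason -> neutral StopReason.
pub fn map_anthropic_stop_reason(raw: &str) -> Option<StopReason> {
    match raw {
        "end_turn" | "stop_sequence" => Some(StopReason::Complete),
        "tool_use" => Some(StopReason::ToolCalls),
        "max_tokens" => Some(StopReason::MaxOutput),
        "refusal" => Some(StopReason::Refusal),
        _ => None, // unknown value: left to the caller (assumption)
    }
}
```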

bedrock is present and assembles its Converse request body, model addressing, and converse-stream event mapper (all production-ready and exported as build_converse_body / map_converse_stream_event / map_bedrock_stop_reason), but fails fast with an Unsupported GatewayError ("bedrock streaming requires SigV4 (todo)") at call time — no network request is issued — because SigV4 signing and the binary event-stream codec are deferred. mock is fully network-free: it replays one of two deterministic transcripts (a scripted echo tool-call path or a terminal prose answer) chosen from the Conversation, so the gateway smoke test never opens a socket or reads a credential.

Credentials

Credential sourcing lives in credentials/secrets.rs. secret_table() is the declarative, ordered table — one SecretDescriptor per ProviderId (total over all 11), built once via OnceLock. Each SecretDescriptor carries four fields: provider, scheme (a SecretScheme: ApiKey, CloudIam, or None), the ordered env_vars: &'static [&'static str] to probe, and a human-readable hint. resolve_secret(provider, override) honors a trimmed caller override first, then (unless the scheme is None, which never carries a key) walks the descriptor's env_vars (through crate::core::env::read_raw, the single env registry, which already trims and treats empty/whitespace as absent) and yields the first non-empty value. describe_secret(provider) returns the (always-present) descriptor for a provider.
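The lookup order of resolve_secret can be sketched with the environment read injected as a closure, so it can be tested without touching real environment variables. resolve_secret_with and the two-field SecretDescriptor are hypothetical, and treating a whitespace-only override as absent is an assumption. The override-first rule, the None-scheme short circuit, and the ordered env walk come from the paragraph above.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecretScheme { ApiKey, CloudIam, None }

// Reduced descriptor: provider and hint fields omitted.
pub struct SecretDescriptor {
    pub scheme: SecretScheme,
    pub env_vars: &'static [&'static str],
}

// `read_raw` stands in for crate::core::env::read_raw, which already trims
// and treats empty/whitespace values as absent.
pub fn resolve_secret_with(
    desc: &SecretDescriptor,
    override_value: Option<&str>,
    read_raw: impl Fn(&str) -> Option<String>,
) -> Option<String> {
    // 1. A trimmed, non-empty caller override always wins.
    if let Some(v) = override_value.map(str::trim).filter(|v| !v.is_empty()) {
        return Some(v.to_string());
    }
    // 2. A None scheme never carries a key.
    if desc.scheme == SecretScheme::None {
        return None;
    }
    // 3. The first env var, in declared order, that yields a value.
    desc.env_vars.iter().find_map(|name| read_raw(*name))
}
```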

The barrel (llmgateway/mod.rs) re-exports resolve_secret, SecretDescriptor, SecretScheme, environment_credential_resolver, and the OAuth/PKCE helpers, but NOT secret_table / describe_secret — those are reached on the credentials::secrets path (the credentials barrel itself does re-export them).

pub fn resolve_secret(provider: ProviderId, override_value: Option<&str>) -> Option<String>;
pub fn describe_secret(provider: ProviderId) -> &'static SecretDescriptor;
pub fn environment_credential_resolver() -> EnvironmentCredentialResolver;

Provider Scheme Env vars (in order)
Anthropic ApiKey ANTHROPIC_API_KEY
Openai ApiKey OPENAI_API_KEY
Google ApiKey GEMINI_API_KEY, GOOGLE_API_KEY
GoogleVertex CloudIam GOOGLE_APPLICATION_CREDENTIALS, GOOGLE_VERTEX_PROJECT, GOOGLE_CLOUD_PROJECT
Amazon CloudIam AWS_BEARER_TOKEN_BEDROCK, AWS_ACCESS_KEY_ID, AWS_PROFILE
Azure ApiKey AZURE_OPENAI_API_KEY, AZURE_API_KEY
Nvidia ApiKey NVIDIA_API_KEY
Kimi ApiKey MOONSHOT_API_KEY, KIMI_API_KEY
Minimax ApiKey MINIMAX_API_KEY
Ollama None (none)
Mock None (none)

EnvironmentCredentialResolver is the ready-made CredentialResolver implementation bound to resolve_secret; it is the shared dependency every connector factory receives.

The module also ships PKCE (pkce.rs, RFC 7636 S256) and OAuth 2.0 (oauth.rs, RFC 6749). create_pkce_pair(verifier_length) mints a CSPRNG verifier (clamped to 43–128 chars) and its S256 challenge; derive_code_challenge(verifier) is synchronous and byte-identical to the TS/Python output (RFC 7636 Appendix B vector verified). build_auth_url, exchange_code, and refresh_token drive the authorization-code-with-PKCE and refresh flows with hand-rolled reqwest form posts (no oauth2 crate, for parity-locked error strings). oauth_config_for returns an OAuthConfig only for Anthropic and Openai; the client_id fields are empty placeholders a deployment must supply.

pub fn create_pkce_pair(verifier_length: usize) -> PkcePair;   // PKCE_METHOD = "S256"
pub fn derive_code_challenge(verifier: &str) -> String;
pub fn oauth_config_for(provider: ProviderId) -> Option<OAuthConfig>;
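Per RFC 7636, the S256 challenge is BASE64URL(SHA-256(verifier)) with no padding. The sketch below shows that computation without any dependency, using a hand-written SHA-256 (FIPS 180-4) and an unpadded base64url encoder. The helper names sha256 and base64url_nopad are hypothetical, and a production build would normally rely on an audited hash implementation. The check reproduces the RFC 7636 Appendix B vector.

```rust
// Sketch of RFC 7636 S256: BASE64URL-ENCODE(SHA-256(ASCII(code_verifier))), no '='.
const K: [u32; 64] = [
    0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
    0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
    0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
    0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
    0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
    0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
    0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
    0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2,
];

pub fn sha256(data: &[u8]) -> [u8; 32] {
    let mut h: [u32; 8] = [
        0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19,
    ];
    // Padding: 0x80, zeros up to 56 mod 64, then the bit length as a big-endian u64.
    let mut msg = data.to_vec();
    let bit_len = (data.len() as u64).wrapping_mul(8);
    msg.push(0x80);
    while msg.len() % 64 != 56 {
        msg.push(0);
    }
    msg.extend_from_slice(&bit_len.to_be_bytes());

    for block in msg.chunks(64) {
        // Message schedule.
        let mut w = [0u32; 64];
        for i in 0..16 {
            w[i] = u32::from_be_bytes([block[4 * i], block[4 * i + 1], block[4 * i + 2], block[4 * i + 3]]);
        }
        for i in 16..64 {
            let s0 = w[i - 15].rotate_right(7) ^ w[i - 15].rotate_right(18) ^ (w[i - 15] >> 3);
            let s1 = w[i - 2].rotate_right(17) ^ w[i - 2].rotate_right(19) ^ (w[i - 2] >> 10);
            w[i] = w[i - 16].wrapping_add(s0).wrapping_add(w[i - 7]).wrapping_add(s1);
        }
        // 64 compression rounds.
        let [mut a, mut b, mut c, mut d, mut e, mut f, mut g, mut hh] = h;
        for i in 0..64 {
            let s1 = e.rotate_right(6) ^ e.rotate_right(11) ^ e.rotate_right(25);
            let ch = (e & f) ^ (!e & g);
            let t1 = hh.wrapping_add(s1).wrapping_add(ch).wrapping_add(K[i]).wrapping_add(w[i]);
            let s0 = a.rotate_right(2) ^ a.rotate_right(13) ^ a.rotate_right(22);
            let maj = (a & b) ^ (a & c) ^ (b & c);
            let t2 = s0.wrapping_add(maj);
            hh = g; g = f; f = e; e = d.wrapping_add(t1);
            d = c; c = b; b = a; a = t1.wrapping_add(t2);
        }
        for (x, y) in h.iter_mut().zip([a, b, c, d, e, f, g, hh].iter()) {
            *x = (*x).wrapping_add(*y);
        }
    }
    let mut out = [0u8; 32];
    for (i, word) in h.iter().enumerate() {
        out[4 * i..4 * i + 4].copy_from_slice(&word.to_be_bytes());
    }
    out
}

// URL-safe alphabet, no '=' padding.
pub fn base64url_nopad(bytes: &[u8]) -> String {
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
    let mut out = String::new();
    for chunk in bytes.chunks(3) {
        let b1 = u32::from(*chunk.get(1).unwrap_or(&0));
        let b2 = u32::from(*chunk.get(2).unwrap_or(&0));
        let n = (u32::from(chunk[0]) << 16) | (b1 << 8) | b2;
        // 1 input byte -> 2 chars, 2 -> 3, 3 -> 4.
        for i in 0..chunk.len() + 1 {
            out.push(ALPHABET[((n >> (18 - 6 * i)) & 0x3f) as usize] as char);
        }
    }
    out
}

pub fn derive_code_challenge(verifier: &str) -> String {
    base64url_nopad(&sha256(verifier.as_bytes()))
}
```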

Conversion

The conversion/ module is pure (no network, no mutation). It splits into the emission folder, the per-dialect request mappers, and the shared OpenAI-compatible streamer.

Name Kind Source Purpose
fold_reply fn reduce.rs Pure reducer folding an &Emission sequence into a Reply
to_openai_chat_messages fn mappers.rs Conversation → OpenAI Chat-Completions messages
to_openai_tools / to_openai_tool_choice fn mappers.rs OpenAI tools array / tool_choice string
to_anthropic_request fn mappers.rs Conversation → the full Anthropic /v1/messages body
to_responses_input / to_responses_tools fn mappers.rs OpenAI Responses input items / flat function tools
to_google_contents / to_google_system_instruction / to_google_tools fn mappers.rs Google contents, systemInstruction, functionDeclarations
stream_openai_compatible_chat fn openai_compatible.rs Shared OpenAI Chat streamer reused by openai/nvidia/kimi/azure
complete_openai_compatible_chat async fn openai_compatible.rs One-shot view over the shared streamer
OpenAICompatibleConfig struct openai_compatible.rs Connector-supplied url, headers, optional body_extra

fold_reply is parity-locked: it accumulates thinking and text, reassembles split tool-call argument-JSON fragments (preserving the raw string if malformed), tracks the latest usage and stop reason, and emits blocks in thinking → text → tool-call order (tool calls in first-seen id order). A model that produced tool calls but no explicit stop reason is promoted to StopReason::ToolCalls.

pub fn fold_reply<'a, I>(model: &str, emissions: I) -> Reply
where I: IntoIterator<Item = &'a Emission>;
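A reduced fold over a simplified emission enum shows the ordering and reassembly rules. This is not the parity-locked implementation: it omits usage tracking and keeps tool-call arguments as the raw concatenated string instead of parsing JSON. Emission, Block, and fold() are hypothetical simplifications. The stop-reason promotion follows the sentence above: tool calls with no explicit stop become ToolCalls.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum StopReason { Complete, ToolCalls, MaxOutput }

pub enum Emission {
    Text(String),
    Thinking(String),
    ToolCallStart { id: String, name: String },
    ToolCallDelta { id: String, args_delta: String },
    Stop(StopReason),
}

#[derive(Debug, PartialEq)]
pub enum Block {
    Thinking(String),
    Text(String),
    ToolCall { id: String, name: String, args: String },
}

pub fn fold(emissions: &[Emission]) -> (Vec<Block>, Option<StopReason>) {
    let (mut thinking, mut text) = (String::new(), String::new());
    let mut order: Vec<String> = Vec::new(); // tool-call ids in first-seen order
    let mut calls: HashMap<String, (String, String)> = HashMap::new(); // id -> (name, args)
    let mut stop = None;
    for e in emissions {
        match e {
            Emission::Text(d) => text.push_str(d),
            Emission::Thinking(d) => thinking.push_str(d),
            Emission::ToolCallStart { id, name } => {
                if !calls.contains_key(id) { order.push(id.clone()); }
                calls.entry(id.clone()).or_default().0 = name.clone();
            }
            Emission::ToolCallDelta { id, args_delta } => {
                if !calls.contains_key(id) { order.push(id.clone()); }
                // Split argument fragments are concatenated in arrival order.
                calls.entry(id.clone()).or_default().1.push_str(args_delta);
            }
            Emission::Stop(s) => stop = Some(s.clone()), // latest explicit stop wins
        }
    }
    // Block order: thinking, then text, then tool calls.
    let mut blocks = Vec::new();
    if !thinking.is_empty() { blocks.push(Block::Thinking(thinking)); }
    if !text.is_empty() { blocks.push(Block::Text(text)); }
    for id in &order {
        let (name, args) = calls.remove(id).unwrap_or_default();
        blocks.push(Block::ToolCall { id: id.clone(), name, args });
    }
    // Tool calls without an explicit stop reason are promoted to ToolCalls.
    if stop.is_none() && !order.is_empty() { stop = Some(StopReason::ToolCalls); }
    (blocks, stop)
}
```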

The Anthropic mapper hoists system to a top-level field, maps known-stale catalog ids to current wire model strings (e.g. claude-opus-4 → claude-opus-4-8), and derives a thinking budget for reasoning cards. JSON bodies are serialized with serde_json::to_vec, and the shared OpenAI streamer asks compatible servers for a final usage chunk via stream_options.include_usage.

Streaming

The wire framing lives in streaming/, over byte-chunk streams produced by the transport.

Name Kind Source Purpose
channel_of fn channel.rs Wrap a BoxStream<Emission> factory as a re-iterable Channel
collect_reply async fn channel.rs Drain a Channel to its Reply, returning Err on a terminal error
sse_events fn sse.rs Decode a byte stream into ServerEvents (WHATWG SSE), with cancel support
ndjson_lines fn ndjson.rs Decode a byte stream into parsed serde_json::Values (NDJSON, Ollama)
ServerEvent struct sse.rs One dispatched SSE event: event type + accumulated data
Utf8Decoder struct (internal) utf8.rs Incremental, lossy UTF-8 decoder (carries split code points). The utf8 module is private (mod utf8;), so this is an implementation detail shared by the two framers, not re-exported from the barrel
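The framing rules behind sse_events can be illustrated on a complete body. This parse_sse function is a hypothetical, non-incremental sketch: the real decoder works chunk by chunk over a byte stream and handles cancellation. It covers the WHATWG basics of line endings, comments, the optional space after the colon, multi-line data, and dispatch on a blank line. Defaulting the event type to "message" follows the WHATWG spec; whether ServerEvent does the same is an assumption.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct ServerEvent {
    pub event: String,
    pub data: String,
}

pub fn parse_sse(body: &str) -> Vec<ServerEvent> {
    // CRLF, LF, and CR all terminate a line.
    let normalized = body.replace("\r\n", "\n").replace('\r', "\n");
    let mut events = Vec::new();
    let (mut event_type, mut data) = (String::new(), String::new());
    // split_terminator drops the unterminated tail, so a final event without
    // a blank line after it is never dispatched.
    for line in normalized.split_terminator('\n') {
        if line.is_empty() {
            // Blank line: dispatch if any data accumulated, then reset buffers.
            if !data.is_empty() {
                data.pop(); // remove the trailing '\n'
                let event = if event_type.is_empty() { "message".to_string() } else { event_type.clone() };
                events.push(ServerEvent { event, data: std::mem::take(&mut data) });
            }
            event_type.clear();
            data.clear();
            continue;
        }
        if line.starts_with(':') {
            continue; // comment, e.g. a keep-alive
        }
        let (field, value) = match line.find(':') {
            Some(i) => {
                let v = &line[i + 1..];
                (&line[..i], v.strip_prefix(' ').unwrap_or(v))
            }
            None => (line, ""),
        };
        match field {
            "event" => event_type = value.to_string(),
            "data" => {
                data.push_str(value);
                data.push('\n');
            }
            _ => {} // id, retry, and unknown fields are ignored in this sketch
        }
    }
    events
}
```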

The streaming model centers on two ideas. First, the Emission union — the 8 variants a Channel yields:

#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Emission {
    Text { delta: String },
    Thinking { delta: String },
    ToolCallStart { id: String, name: String },
    ToolCallDelta { id: String, args_delta: String },   // wire: argsDelta
    Usage { usage: Usage },
    Stop { stop: StopReason },
    Done { reply: Reply },     // terminal success
    Error { error: GatewayError },  // terminal failure
}

A connector never panics out of the stream: a failure becomes a terminal Emission::Error (error-as-terminal-emission). Emission::is_terminal() is true for exactly Done/Error; folding treats both as no-ops and a channel ends on the first one.

Second, the re-iterable Channel (= crate::core::channel::Channel<Emission>). channel_of(factory) wraps a stream factory so each Channel::iter re-runs the factory and re-issues the work — re-iterating re-POSTs. collect_reply drains a channel: a terminal Done returns its reply verbatim; a terminal Error becomes Err; otherwise the buffered deltas are folded with the placeholder model "unknown"; an empty channel is a Transport error ("channel closed without emitting any data").
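The re-iterable contract and the collect_reply drain rules can be modeled synchronously. Here a closure returning a Vec stands in for the BoxStream factory, and String stands in for both Reply and GatewayError. channel_of, Channel, and collect_reply echo the names in the text, but these signatures are hypothetical, and the fallback branch returns the concatenated text rather than running fold_reply with the placeholder model "unknown".

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Emission {
    Text(String),
    Done(String),  // stand-in for Done { reply }
    Error(String), // stand-in for Error { error }
}

// Holds a factory; every iter() re-runs it (the real channel re-POSTs).
pub struct Channel<F: Fn() -> Vec<Emission>> {
    factory: F,
}

impl<F: Fn() -> Vec<Emission>> Channel<F> {
    pub fn iter(&self) -> std::vec::IntoIter<Emission> {
        (self.factory)().into_iter()
    }
}

// Building a channel does no work; the factory runs only on iteration.
pub fn channel_of<F: Fn() -> Vec<Emission>>(factory: F) -> Channel<F> {
    Channel { factory }
}

// Done -> its reply; Error -> Err; no terminal -> the buffered deltas;
// nothing at all -> the "closed without data" transport error.
pub fn collect_reply<F: Fn() -> Vec<Emission>>(channel: &Channel<F>) -> Result<String, String> {
    let mut buffered = String::new();
    let mut saw_any = false;
    for emission in channel.iter() {
        saw_any = true;
        match emission {
            Emission::Text(delta) => buffered.push_str(&delta),
            Emission::Done(reply) => return Ok(reply),
            Emission::Error(err) => return Err(err),
        }
    }
    if saw_any {
        Ok(buffered)
    } else {
        Err("channel closed without emitting any data".to_string())
    }
}
```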

Cancellation threads from StreamOptions.cancel (a CancellationToken) into the transport and into both framers. The SSE and NDJSON decoders check a pre-tripped token before reading and race each chunk read against token.cancelled() via tokio::select! { biased; … }, yielding an Aborted GatewayError with distinct "before" vs "while" messages. Both framers share Utf8Decoder, which carries an incomplete trailing multibyte sequence across chunk boundaries so a clean split emits no spurious U+FFFD (matching TextDecoder({stream:true}) byte-for-byte) — genuinely invalid bytes still become U+FFFD.
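Carrying a split code point across chunks can be done with std::str::from_utf8, whose Utf8Error distinguishes an incomplete tail (error_len() is None) from genuinely invalid bytes (error_len() is Some). This Utf8Decoder is a hypothetical sketch of the described behavior, not the private utf8.rs module, and its finish() handling of a leftover partial sequence is an assumption modeled on TextDecoder's end-of-stream flush.

```rust
#[derive(Default)]
pub struct Utf8Decoder {
    pending: Vec<u8>, // incomplete trailing sequence from the previous chunk
}

impl Utf8Decoder {
    pub fn decode(&mut self, chunk: &[u8]) -> String {
        let mut bytes = std::mem::take(&mut self.pending);
        bytes.extend_from_slice(chunk);
        let mut out = String::new();
        let mut rest: &[u8] = &bytes;
        loop {
            match std::str::from_utf8(rest) {
                Ok(s) => {
                    out.push_str(s);
                    break;
                }
                Err(e) => {
                    let valid = e.valid_up_to();
                    // The prefix up to `valid` is guaranteed to be valid UTF-8.
                    out.push_str(std::str::from_utf8(&rest[..valid]).unwrap());
                    match e.error_len() {
                        // Invalid sequence: one replacement char, then skip it.
                        Some(n) => {
                            out.push('\u{FFFD}');
                            rest = &rest[valid + n..];
                        }
                        // Chunk ended mid-sequence: carry the tail forward.
                        None => {
                            self.pending = rest[valid..].to_vec();
                            break;
                        }
                    }
                }
            }
        }
        out
    }

    // End of stream: a leftover partial sequence can never complete.
    pub fn finish(&mut self) -> String {
        if self.pending.is_empty() {
            String::new()
        } else {
            self.pending.clear();
            "\u{FFFD}".to_string()
        }
    }
}
```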

Key concepts

  • ApiKind vs ProviderId — ProviderId is the vendor; ApiKind is the wire dialect. The connector registry is keyed by ApiKind, because several providers share one dialect.
  • Curated vs generated catalog — model_cards() (13 cards) is checked first; generated.rs (708 records) is consulted as a fallback, synthesizing a routable ModelCard only when both the provider and api map onto the gateway enums.
  • Channel / Emission — a Channel is a re-iterable async stream of incremental deltas, terminated by exactly one Done (success) or Error (failure). Re-iterating re-POSTs; stream does no I/O until iteration.
  • fold_reply — the pure reducer that collapses an emission sequence into a Reply: thinking → text → tool-call block order, first-seen id order, split-JSON-arg reassembly, latest-usage/explicit-stop wins, and a Complete-with-calls → ToolCalls promotion.
  • GatewayError.kind — a GatewayErrorKind discriminator (Auth, RateLimit, Http, Transport, Parse, Unsupported, Aborted) so callers branch on failure class without string-matching messages. Aborted is the one class connectors propagate verbatim (cancellation is never folded into a generic transport error).
  • HttpTransport seam — the network call lives behind a trait, so connectors are pure I/O-wise and testable against a transcript server; production uses ReqwestTransport.
  • Zero SDK — no vendor SDK and no aws-sdk/oauth2 crate; every wire shape is re-derived from public HTTP docs and assembled/parsed by hand over reqwest.

Examples

Stream from the deterministic mock card and consume Emissions:

use futures::StreamExt;
use indusagi::llmgateway::{stream, Block, Conversation, Emission, GatewayError, Turn};

// Assumes a tokio runtime (macros + rt features).
#[tokio::main]
async fn main() -> Result<(), GatewayError> {
    let convo = Conversation {
        system: Some("Be terse.".into()),
        turns: vec![Turn::User { blocks: vec![Block::Text { text: "hello".into() }] }],
        tools: None,
    };

    let channel = stream("mock-1", convo, None)?;   // sync; no I/O yet
    let mut it = channel.iter();                      // I/O happens on iteration
    while let Some(emission) = it.next().await {
        match emission {
            Emission::Text { delta } => print!("{delta}"),
            Emission::Done { reply } => println!("\nstop: {:?} usage: {:?}", reply.stop, reply.usage),
            // Failures arrive as a terminal emission, not a panic.
            Emission::Error { error } => eprintln!("\nerror: {:?}", error.kind),
            _ => {}
        }
    }
    Ok(())
}

One-shot complete with options and cost estimation:

use indusagi::llmgateway::{complete, estimate_cost, get_card,
    Block, Conversation, GatewayError, StreamOptions, ThinkingLevel, Turn};

#[tokio::main]
async fn main() -> Result<(), GatewayError> {
    let convo = Conversation {
        system: None,
        turns: vec![Turn::User { blocks: vec![Block::Text { text: "summarize X".into() }] }],
        tools: None,
    };
    let opts = StreamOptions {
        temperature: Some(0.2),
        max_output_tokens: Some(512),
        thinking: Some(ThinkingLevel::Low),
        ..Default::default()
    };
    // Needs ANTHROPIC_API_KEY; a missing key comes back as an Auth GatewayError.
    let reply = complete("claude-sonnet-4", convo, Some(opts)).await?;
    if let Some(card) = get_card("claude-sonnet-4") {
        println!("{:?} {} USD", reply.stop, estimate_cost(&card, &reply.usage));
    }
    Ok(())
}

Fluent catalog query and credential resolution:

use indusagi::llmgateway::{models, ApiKind, ProviderId};
use indusagi::llmgateway::credentials::{create_pkce_pair, resolve_secret};

fn main() {
    // Reasoning-capable Anthropic cards (models() covers the curated cards only).
    let cards = models().by_provider(ProviderId::Anthropic).reasoning(true).all();
    println!("{} reasoning Anthropic cards", cards.len());

    let found = models().by_api(ApiKind::OpenaiResponses).find("o3");
    println!("{:?}", found.map(|c| c.id.clone()));

    let key = resolve_secret(ProviderId::Openai, None);   // reads OPENAI_API_KEY (or None)
    let pkce = create_pkce_pair(64);                        // RFC 7636 S256 verifier + challenge
    println!("{} {}", key.is_some(), pkce.method);
}

Relationship to neighbors

The gateway is the lower wire layer beneath the rest of the framework. The Runtime agent loop drives stream/complete and the contract types straight from this barrel, so there is one source of truth for the Conversation/Reply/Emission vocabulary and the JsonSchema/ToolDescriptor tool contract it shares with the tool kernel. The Channel/Emission plumbing sits on crate::core::channel, and credential/env reads route through crate::core::env. The CancellationToken is the framework's single cancellation currency. This Rust edition is a module-for-module port of the TypeScript llmgateway/ tree and parallels the Python LLM Gateway; the chief divergence is Rust's Result-based error propagation (no thrown exceptions) and the explicit HttpTransport seam in place of ambient fetch.
