LLM Gateway
indusagi::llmgateway is the Rust framework's unified, multi-provider entry point to LLMs. It is a zero-SDK gateway: you hand it a model id and a provider-neutral Conversation, and it resolves a ModelCard, routes to one of eleven wire connectors, and streams normalized Emissions back through a re-iterable Channel. No vendor SDK is imported anywhere; HTTP transport goes through reqwest alone. Unknown model ids fail fast with an Unsupported GatewayError.
The gateway resolves a model id to a ModelCard, selects the Connector that
speaks that card's ApiKind wire dialect, talks raw HTTP via an injected
HttpTransport, decodes the SSE/NDJSON byte stream into neutral Emissions, and
folds them into a terminal Reply with fold_reply. This module ports the
TypeScript llmgateway/ tree module-for-module and mirrors the Python
indusagi.llmgateway layer, with Rust-native
Result-based error propagation in place of thrown exceptions.
Table of Contents
- Module layout
- Public exports
- Dispatch
- The type contract
- The model catalog
- Providers and wire dialects
- Connectors and the registry
- Credentials
- Conversion
- Streaming
- Key concepts
- Examples
- Relationship to neighbors
Module layout
The barrel lives in crates/indusagi/src/llmgateway/mod.rs and re-exports the
symbols the rest of the framework reaches for. The subtree is split into six
sibling modules plus the top-level gateway dispatcher.
| Module | Path | Holds |
|---|---|---|
| contract | llmgateway/contract/ | The frozen type vocabulary: model_card.rs, conversation.rs, reply.rs, emission.rs, options.rs, errors.rs, connector.rs |
| connectors | llmgateway/connectors/ | One Connector + create_*_connector factory per ApiKind (anthropic, openai_chat, openai_responses, google, google_vertex, bedrock, azure_openai, nvidia, kimi, ollama, mock), the HttpTransport seam (http.rs), and the registry mod.rs |
| conversion | llmgateway/conversion/ | Pure wire translation: mappers.rs (Conversation → native bodies), reduce.rs (fold_reply), openai_compatible.rs (shared OpenAI chat streamer) |
| catalog | llmgateway/catalog/ | cards.rs (the 13 curated ModelCards), query.rs (models/get_card/CardSelection), cost.rs (estimate_cost), generated.rs (the 708-model discovery catalog made routable) |
| credentials | llmgateway/credentials/ | secrets.rs (secret_table + resolve_secret), pkce.rs (RFC 7636 S256), oauth.rs (RFC 6749 authorization-code + refresh) |
| streaming | llmgateway/streaming/ | channel.rs (channel_of/collect_reply), sse.rs (sse_events), ndjson.rs (ndjson_lines), utf8.rs (incremental UTF-8 decode) |
| gateway | llmgateway/gateway.rs | The top-level stream/complete (+ *_with_card) entry points |
Public exports
Everything callers need is re-exported from llmgateway/mod.rs.
| Name | Kind | Source | Purpose |
|---|---|---|---|
| stream | fn | gateway.rs | Resolve a model id, route to its connector, return a streaming Channel. Sync; I/O happens on iteration. Err(Unsupported) for unknown ids |
| complete | async fn | gateway.rs | One-shot: resolve id, drain the connector channel, return the assembled Reply |
| stream_with_card | fn | gateway.rs | Like stream but takes a pre-resolved ModelCard, bypassing catalog lookup |
| complete_with_card | async fn | gateway.rs | One-shot variant taking a pre-resolved ModelCard |
| model_cards | fn | catalog/cards.rs | &'static [ModelCard]: the 13 hand-curated cards, built once via OnceLock |
| models | fn | catalog/query.rs | Open a fluent CardSelection over model_cards() |
| get_card | fn | catalog/query.rs | Id lookup: curated first, then synthesize from the generated catalog; None if unroutable |
| CardSelection | struct | catalog/query.rs | Lazy, immutable, chainable query; build via CardSelection::over(pool) |
| estimate_cost | fn | catalog/cost.rs | Turn a card's per-MTok CostSheet + a Usage into a USD f64 |
| connector_for_api | fn | connectors/mod.rs | Return the Arc<dyn Connector> that speaks a given ApiKind |
The barrel also re-exports the entire contract vocabulary (ApiKind,
AssistantRole, Block, Channel, Connector, Conversation, CostSheet,
CredentialResolver, Emission, GatewayError, GatewayErrorExtra,
GatewayErrorKind, JsonSchema, Modality, ModelCard, ProviderId, Reply,
StopReason, StreamOptions, ThinkingLevel, ToolChoice, ToolDescriptor,
Turn, Usage, gateway_error), the conversion mappers and OpenAI-compatible
streamer (incl. to_google_*, to_responses_*, complete_openai_compatible_chat,
OpenAICompatibleConfig), the credentials helpers (build_auth_url,
exchange_code, refresh_token, oauth_config_for, AuthUrlInputs,
OAuthConfig, OAuthTokens, PkcePair, create_pkce_pair,
derive_code_challenge, …), the streaming helpers (channel_of,
collect_reply, sse_events, ndjson_lines, ServerEvent), and the
HttpRequest/HttpResponse/HttpTransport/ReqwestTransport/reqwest_transport
seam plus every create_*_connector factory.
Dispatch
stream and complete take the same shape — a &str model id, a Conversation,
and an optional StreamOptions. The dispatch path in gateway.rs is:
- get_card(model_id) resolves a ModelCard: the curated model_cards() first, then a synthesized card from the generated discovery catalog. An unknown id returns Err with an Unsupported GatewayError ("unknown model: <id>").
- connector_for_api(card.api) indexes the total CONNECTOR_REGISTRY.
- The connector builds a re-iterable Channel. stream itself stays sync and does no I/O; every Channel::iter re-issues the HTTP request.
- On iteration the connector resolves its key, maps the Conversation to the provider's wire body, POSTs through the HttpTransport, and decodes the byte stream into neutral Emissions.
- The emissions are folded with fold_reply into a Reply, emitted as the terminal Emission::Done. complete drains the same channel via collect_reply and returns that Reply (or Err(GatewayError)).
pub fn stream(
model_id: &str,
conversation: Conversation,
options: Option<StreamOptions>,
) -> Result<Channel, GatewayError>;
pub async fn complete(
model_id: &str,
conversation: Conversation,
options: Option<StreamOptions>,
) -> Result<Reply, GatewayError>;
Note the asymmetry that survives from the TS source: a streaming failure surfaces
as a terminal Emission::Error on the channel, while complete returns
Err(GatewayError). stream returns Ok(channel) even when the dialect has no
registered connector — the channel then yields exactly one Emission::Error
("no connector for dialect: <api>") so a streaming caller always observes a
terminal envelope, never an empty channel. complete_with_card returns the same
error directly as Err. Because the registry is now total over ApiKind, that
path is unreachable for any catalog card.
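The dispatch path and its error asymmetry can be modelled in a few lines of plain Rust. This is a minimal, std-only sketch under stated assumptions: Kind, GatewayError, Emission, the CATALOG table, and the function-pointer registry are simplified stand-ins, not the crate's real types, and a Vec<Emission> stands in for the lazy Channel. The orphan-1 entry exists only to exercise the no-connector branch, which the real, total registry never reaches for catalog cards.

```rust
/// Simplified stand-in for GatewayErrorKind: only the variant this sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub enum Kind {
    Unsupported,
}

#[derive(Debug, Clone, PartialEq)]
pub struct GatewayError {
    pub kind: Kind,
    pub message: String,
}

/// Simplified stand-in for Emission: one delta variant plus the two terminals.
#[derive(Debug, Clone, PartialEq)]
pub enum Emission {
    Text(String),
    Done(String),
    Error(GatewayError),
}

/// Hypothetical catalog of (model id, dialect). "orphan-1" deliberately has no connector.
const CATALOG: &[(&str, &str)] = &[("mock-1", "mock"), ("orphan-1", "no-such-dialect")];

fn mock_script() -> Vec<Emission> {
    vec![Emission::Text("hi".into()), Emission::Done("hi".into())]
}

/// Hypothetical registry lookup; the real one returns Option<Arc<dyn Connector>>.
fn connector_for_api(api: &str) -> Option<fn() -> Vec<Emission>> {
    match api {
        "mock" => Some(mock_script as fn() -> Vec<Emission>),
        _ => None,
    }
}

/// Mirrors `stream`: unknown id -> Err; known id without a connector -> Ok(one Error).
pub fn stream(model_id: &str) -> Result<Vec<Emission>, GatewayError> {
    let api = CATALOG
        .iter()
        .find(|(id, _)| *id == model_id)
        .map(|(_, api)| *api)
        .ok_or_else(|| GatewayError {
            kind: Kind::Unsupported,
            message: format!("unknown model: {model_id}"),
        })?;
    Ok(match connector_for_api(api) {
        Some(run) => run(), // the real Channel defers this work until iteration
        None => vec![Emission::Error(GatewayError {
            kind: Kind::Unsupported,
            message: format!("no connector for dialect: {api}"),
        })],
    })
}

/// Mirrors `complete`: the same terminal Error now surfaces as Err.
pub fn complete(model_id: &str) -> Result<String, GatewayError> {
    let terminal = stream(model_id)?
        .into_iter()
        .find(|e| !matches!(e, Emission::Text(_)));
    match terminal {
        Some(Emission::Done(reply)) => Ok(reply),
        Some(Emission::Error(err)) => Err(err),
        _ => unreachable!("every channel in this sketch ends with Done or Error"),
    }
}
```

Here stream("orphan-1") succeeds with a one-element channel holding the error, while complete("orphan-1") returns that same error as Err.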
The type contract
The contract/ module is the frozen, provider-neutral vocabulary every layer
shares. Each type derives Serialize/Deserialize with serde tag/rename
attributes chosen to match the TS wire shape byte-for-byte (snake_case
discriminants on kind/role, camelCase field names like callId, argsDelta,
inputTokens).
| Name | Kind | Source | Notes |
|---|---|---|---|
ProviderId |
enum | model_card.rs |
The vendor identity: Anthropic, Openai, Google, GoogleVertex, Amazon, Azure, Nvidia, Kimi, Minimax, Ollama, Mock (serde kebab-case) |
ApiKind |
enum | model_card.rs |
The wire dialect (11 variants); ApiKind::ALL is the hand-maintained iteration array |
Modality |
enum | model_card.rs |
Text / Image / Audio (serde lowercase) |
CostSheet |
struct | model_card.rs |
Per-MTok pricing: input_per_mtok, output_per_mtok, optional cache_read_per_mtok/cache_write_per_mtok |
ModelCard |
struct | model_card.rs |
Static descriptor: id, provider, api, display_name, optional base_url, context_window, max_output_tokens, modalities, reasoning, cost |
Block |
enum | conversation.rs |
Text / Thinking / ToolCall / ToolResult / Image / Command (internally tagged on kind) |
Turn |
enum | conversation.rs |
User / Assistant / Tool (tagged on role) |
ToolDescriptor |
struct | conversation.rs |
name, description, parameters: JsonSchema |
JsonSchema |
type alias | conversation.rs |
= serde_json::Value |
Conversation |
struct | conversation.rs |
Provider-neutral request: optional system, turns: Vec<Turn>, optional tools |
Reply |
struct | reply.rs |
Terminal response: role: AssistantRole, model, blocks, usage, stop |
AssistantRole |
enum | reply.rs |
One-variant enum (Assistant) so serde renders/parses exactly the literal "assistant" on Reply.role |
Usage |
struct | reply.rs |
input_tokens, output_tokens, optional cache tiers; Usage::EMPTY is the fold seed |
StopReason |
enum | reply.rs |
Complete / ToolCalls / MaxOutput / Refusal / Aborted / Error |
StreamOptions |
struct | options.rs |
Per-invocation knobs (see below) |
ThinkingLevel |
enum | options.rs |
Off / Low / Medium / High / Max |
ToolChoice |
enum | options.rs |
Auto / Required / None |
Emission |
enum | emission.rs |
The 8-variant streaming union (see Streaming) |
Channel |
type alias | emission.rs |
= crate::core::channel::Channel<Emission> |
Connector |
trait | connector.rs |
id, api, stream, async complete (object-safe via #[async_trait]) |
CredentialResolver |
trait | connector.rs |
async fn resolve(provider, override) -> Option<String> |
GatewayError |
struct | errors.rs |
Normalized error with kind, message, optional status/provider/cause |
GatewayErrorKind |
enum | errors.rs |
Auth / RateLimit / Http / Transport / Parse / Unsupported / Aborted |
StreamOptions is the per-invocation option bag. A bare StreamOptions::default()
is a valid "use the model's defaults" request, so the options argument can be
passed as None:
pub struct StreamOptions {
#[serde(skip)]
pub cancel: Option<CancellationToken>, // replaces the TS AbortSignal
pub thinking: Option<ThinkingLevel>,
pub max_output_tokens: Option<u64>, // wire: maxOutputTokens
pub temperature: Option<f64>,
pub top_p: Option<f64>, // wire: topP
pub tool_choice: Option<ToolChoice>, // wire: toolChoice
pub api_key: Option<String>, // wire: apiKey; overrides the env credential
}
The Connector trait is the seam every dialect implements:
#[async_trait]
pub trait Connector: Send + Sync {
fn id(&self) -> &str;
fn api(&self) -> ApiKind;
fn stream(&self, model: ModelCard, conversation: Conversation, options: StreamOptions) -> Channel;
async fn complete(&self, model: ModelCard, conversation: Conversation, options: StreamOptions)
-> Result<Reply, GatewayError>;
}
GatewayError mirrors the TS class GatewayError extends Error. Because Rust
carries it as a concrete typed value inside Result<_, GatewayError> (and inside
Emission::Error), the TS instanceof-by-name re-raise check collapses to
ordinary type matching. cause is an optional, #[serde(skip)] Arc<dyn Error + Send + Sync>,
so it is lost across a serialization boundary, exactly like the TS cause.
gateway_error(kind, message) is the free-function factory; builder methods
(.provider(..), .status(..), .cause(..)) and GatewayError::with_extra attach the
optional context.
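To illustrate the factory-plus-builder shape, here is a std-only sketch. The field set follows the description above, but the method signatures, the Display format, and the hypothetical is_retryable policy are assumptions for illustration; the real type also carries serde attributes and GatewayError::with_extra, which are omitted.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GatewayErrorKind { Auth, RateLimit, Http, Transport, Parse, Unsupported, Aborted }

#[derive(Debug, Clone)]
pub struct GatewayError {
    pub kind: GatewayErrorKind,
    pub message: String,
    pub status: Option<u16>,
    pub provider: Option<String>,
    // In the real type this field is #[serde(skip)]; this sketch has no serde at all.
    pub cause: Option<Arc<dyn Error + Send + Sync>>,
}

/// Free-function factory: kind + message, every optional field empty.
pub fn gateway_error(kind: GatewayErrorKind, message: impl Into<String>) -> GatewayError {
    GatewayError { kind, message: message.into(), status: None, provider: None, cause: None }
}

impl GatewayError {
    // Builder methods take `self` by value and return it, so calls chain.
    pub fn provider(mut self, provider: impl Into<String>) -> Self {
        self.provider = Some(provider.into());
        self
    }
    pub fn status(mut self, status: u16) -> Self {
        self.status = Some(status);
        self
    }
    pub fn cause(mut self, cause: impl Error + Send + Sync + 'static) -> Self {
        self.cause = Some(Arc::new(cause));
        self
    }
}

impl fmt::Display for GatewayError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}: {}", self.kind, self.message)
    }
}

impl Error for GatewayError {
    // Expose the shared cause through the standard error-chain API.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.cause.as_deref().map(|e| e as &(dyn Error + 'static))
    }
}

/// Hypothetical caller policy: branch on the kind, never on the message text.
pub fn is_retryable(err: &GatewayError) -> bool {
    matches!(err.kind, GatewayErrorKind::RateLimit | GatewayErrorKind::Transport)
}
```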
The model catalog
model_cards() is the static, hand-curated registry: exactly 13 cards, built
once on first access via OnceLock (a Vec<ModelCard> rather than a const
because the modalities lists are not const-constructible). The card ids, in
order:
| Id | Provider | ApiKind | Reasoning | Context |
|---|---|---|---|---|
| claude-opus-4 | Anthropic | AnthropicMessages | yes | 200k |
| claude-sonnet-4 | Anthropic | AnthropicMessages | yes | 200k |
| claude-haiku-3-5 | Anthropic | AnthropicMessages | no | 200k |
| gpt-4o | Openai | OpenaiCompletions | no | 128k |
| gpt-4o-mini | Openai | OpenaiCompletions | no | 128k |
| o3 | Openai | OpenaiResponses | yes | 200k |
| o4-mini | Openai | OpenaiResponses | yes | 200k |
| gemini-2.5-pro | Google | GoogleGenerative | yes | 1M |
| gemini-2.5-flash | Google | GoogleGenerative | yes | 1M |
| nvidia/llama-3.1-nemotron-70b-instruct | Nvidia | NvidiaOpenaiCompatible | no | 128k |
| kimi-k2 | Kimi | KimiOpenaiCompatible | yes | 256k |
| llama3.1:8b | Ollama | Ollama | no | 128k |
| mock-1 | Mock | Mock | no | 32k |
Every figure in the CostSheets is approximate: a best-effort per-MTok number for local estimation only, not transcribed from any vendor price list. Context-window and max-output figures are likewise approximate and rounded.
Behind the curated list sits the generated discovery catalog
(catalog/generated.rs), embedded from models_generated.json via
include_str!. The JSON is a nested { provider: { id: model } } map (24
providers, 708 records); it is parsed once via serde_json::from_str into a
BTreeMap<String, BTreeMap<String, GeneratedModel>> and flattened into a
&'static [GeneratedModel] by generated_models(). Each record deserializes
into a GeneratedModel (with a nested GeneratedCost); both generated_model(id)
(exact-id lookup) and generated_models() are exported from
catalog::generated. The picker lets a user select any model from that larger
set; get_card resolves it as follows:
- get_card(id) returns the curated card if id is among the 13.
- On a miss it calls generated_model(id) then synthesize_card(g), which projects a GeneratedModel into a routable ModelCard only when both its provider (map_provider) and wire protocol (map_api) map onto the gateway's enums. So claude-sonnet-4-5 or MiniMax-M2.7 resolve and route, while ids on providers the gateway can't reach (xai, groq, cerebras, openrouter, …) stay None: explicitly non-routable rather than a silent failure.
- models() opens a CardSelection over model_cards(). .by_provider(..), .by_api(..), and .reasoning(bool) narrow immutably (each returns a fresh selection cloning the accumulated Arc<dyn Fn> predicates); .all() and .find(id) materialize. The builder is immutable: narrowing a base selection never mutates it.
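The immutable-narrowing behaviour of CardSelection can be sketched with shared Arc predicates. This is a simplified stand-in: Card and Provider are hypothetical reductions of ModelCard and ProviderId, the pool is passed in explicitly, and by_api is left out because it follows the same pattern as by_provider.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Provider { Anthropic, Openai }

/// Hypothetical reduction of ModelCard to the fields the filters use.
#[derive(Debug, Clone, PartialEq)]
pub struct Card { pub id: &'static str, pub provider: Provider, pub reasoning: bool }

type Predicate = Arc<dyn Fn(&Card) -> bool + Send + Sync>;

#[derive(Clone)]
pub struct CardSelection {
    pool: Arc<Vec<Card>>,      // shared, never copied when narrowing
    predicates: Vec<Predicate>, // accumulated filters, AND-ed together
}

impl CardSelection {
    pub fn over(pool: Vec<Card>) -> Self {
        Self { pool: Arc::new(pool), predicates: Vec::new() }
    }

    /// Returns a fresh selection; `self` is only borrowed, never mutated.
    fn with(&self, p: Predicate) -> Self {
        let mut predicates = self.predicates.clone(); // clones the Arcs, not the closures
        predicates.push(p);
        Self { pool: Arc::clone(&self.pool), predicates }
    }

    pub fn by_provider(&self, provider: Provider) -> Self {
        self.with(Arc::new(move |c: &Card| c.provider == provider))
    }

    pub fn reasoning(&self, wanted: bool) -> Self {
        self.with(Arc::new(move |c: &Card| c.reasoning == wanted))
    }

    fn matches(&self, card: &Card) -> bool {
        self.predicates.iter().all(|p| p(card))
    }

    /// Materialize: filtering happens only here (lazy until now).
    pub fn all(&self) -> Vec<Card> {
        self.pool.iter().filter(|c| self.matches(c)).cloned().collect()
    }

    pub fn find(&self, id: &str) -> Option<Card> {
        self.all().into_iter().find(|c| c.id == id)
    }
}
```

Because with borrows self and clones only the predicate list, a base selection and every selection derived from it remain independent views over one shared pool.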
estimate_cost(card, usage) prices a Usage against the card's per-MTok
CostSheet. Cache tiers are billed additively, and only when both the usage
reports a non-zero count and the card prices the tier (mirroring the TS
truthiness guard where 0 is falsy):
pub fn estimate_cost(card: &ModelCard, usage: &Usage) -> f64;
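The pricing rule reduces to tokens / 1,000,000 × per-MTok price for each tier, summed. The sketch below assumes hypothetical field names for the cache counts on Usage and takes the CostSheet directly rather than a &ModelCard; only the additive, guarded cache billing is taken from the description above.

```rust
#[derive(Debug, Clone, Copy, Default)]
pub struct CostSheet {
    pub input_per_mtok: f64,
    pub output_per_mtok: f64,
    pub cache_read_per_mtok: Option<f64>,
    pub cache_write_per_mtok: Option<f64>,
}

/// Cache-count field names are assumptions for this sketch.
#[derive(Debug, Clone, Copy, Default)]
pub struct Usage {
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub cache_read_tokens: Option<u64>,
    pub cache_write_tokens: Option<u64>,
}

const MTOK: f64 = 1_000_000.0;

/// Bill a cache tier only when the usage reports a non-zero count AND the sheet prices it
/// (the Rust analogue of the TS truthiness guard).
fn tier(tokens: Option<u64>, per_mtok: Option<f64>) -> f64 {
    match (tokens, per_mtok) {
        (Some(n), Some(price)) if n > 0 => n as f64 / MTOK * price,
        _ => 0.0,
    }
}

/// Input and output are always billed; cache tiers are added on top.
pub fn estimate_cost(cost: &CostSheet, usage: &Usage) -> f64 {
    usage.input_tokens as f64 / MTOK * cost.input_per_mtok
        + usage.output_tokens as f64 / MTOK * cost.output_per_mtok
        + tier(usage.cache_read_tokens, cost.cache_read_per_mtok)
        + tier(usage.cache_write_tokens, cost.cache_write_per_mtok)
}
```

With illustrative prices of 3.0 (input) and 15.0 (output) per MTok, 1,000,000 input and 500,000 output tokens cost 3.0 + 7.5 = 10.5 USD; 200,000 cache-read tokens at 0.3 per MTok add about 0.06.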
Providers and wire dialects
ProviderId is the vendor identity; ApiKind is the concrete request/response
wire protocol a connector implements. They are distinct because several providers
share a dialect (everything OpenAI-compatible speaks OpenaiCompletions), and the
connector registry is keyed by ApiKind, not by provider. The 11 ApiKind
variants, with their serde kebab-case wire literals, are:
| ApiKind variant | Wire literal | Connector id() | Decode loop |
|---|---|---|---|
| AnthropicMessages | anthropic-messages | anthropic | bespoke SSE |
| OpenaiCompletions | openai-completions | openai-chat | shared OpenAI-compatible |
| OpenaiResponses | openai-responses | openai-responses | bespoke SSE (typed events) |
| GoogleGenerative | google-generative | google-generative | bespoke SSE |
| GoogleVertex | google-vertex | google-vertex | bespoke SSE |
| AmazonBedrock | amazon-bedrock | bedrock | fail-fast (SigV4 TODO) |
| AzureOpenai | azure-openai | azure-openai | shared OpenAI-compatible |
| NvidiaOpenaiCompatible | nvidia-openai-compatible | nvidia | shared OpenAI-compatible |
| KimiOpenaiCompatible | kimi-openai-compatible | kimi | shared OpenAI-compatible |
| Ollama | ollama | ollama | NDJSON |
| Mock | mock | mock | network-free script |
The Anthropic Messages dialect is notably spoken by more than one vendor: the
Anthropic connector resolves its credential from model.provider, so a MiniMax
card (provider Minimax, api AnthropicMessages, base URL
https://api.minimax.io/anthropic) reaches MINIMAX_API_KEY while an Anthropic
card reaches ANTHROPIC_API_KEY.
Connectors and the registry
connectors/mod.rs holds the routing table, built once on first access via
OnceLock and total over ApiKind (a registry_is_total_with_exactly_eleven_dialects
test enforces all 11). connector_for_api(api) -> Option<Arc<dyn Connector>>
indexes it; it stays an Option to match the call-site contract but never misses
a known dialect. Each connector is built by its factory, all sharing one
dependency — the environment-backed credential resolver:
pub fn connector_for_api(api: ApiKind) -> Option<Arc<dyn Connector>>;
// Per-dialect factories (all take the shared resolver):
pub fn create_anthropic_connector(resolver: Arc<dyn CredentialResolver>) -> Arc<dyn Connector>;
pub fn create_openai_chat_connector(resolver: Arc<dyn CredentialResolver>) -> Arc<dyn Connector>;
pub fn create_openai_responses_connector(resolver: Arc<dyn CredentialResolver>) -> Arc<dyn Connector>;
// ...google, google_vertex, bedrock, azure_openai, nvidia, kimi, ollama, mock
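A total, lazily built registry can be expressed with std::sync::OnceLock. The sketch is a reduction under stated assumptions: ApiKind has only three variants here, Stub is a hypothetical connector with no stream/complete methods, and no credential resolver is threaded through. The loop in the checks below plays the role of the registry_is_total test.

```rust
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApiKind { AnthropicMessages, OpenaiCompletions, Mock }

impl ApiKind {
    /// Hand-maintained iteration array (3 of the real 11 variants).
    pub const ALL: [ApiKind; 3] = [ApiKind::AnthropicMessages, ApiKind::OpenaiCompletions, ApiKind::Mock];
}

pub trait Connector: Send + Sync {
    fn id(&self) -> &str;
    fn api(&self) -> ApiKind;
}

/// Hypothetical connector: identity only, no wire behaviour.
struct Stub { id: &'static str, api: ApiKind }

impl Connector for Stub {
    fn id(&self) -> &str { self.id }
    fn api(&self) -> ApiKind { self.api }
}

/// Built exactly once, on first access, then shared for the process lifetime.
fn registry() -> &'static HashMap<ApiKind, Arc<dyn Connector>> {
    static REGISTRY: OnceLock<HashMap<ApiKind, Arc<dyn Connector>>> = OnceLock::new();
    REGISTRY.get_or_init(|| {
        let stubs = [
            Stub { id: "anthropic", api: ApiKind::AnthropicMessages },
            Stub { id: "openai-chat", api: ApiKind::OpenaiCompletions },
            Stub { id: "mock", api: ApiKind::Mock },
        ];
        stubs.into_iter().map(|s| (s.api, Arc::new(s) as Arc<dyn Connector>)).collect()
    })
}

/// Still an Option at the call site, but a total registry never returns None.
pub fn connector_for_api(api: ApiKind) -> Option<Arc<dyn Connector>> {
    registry().get(&api).cloned()
}
```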
The HttpTransport seam
The TS connectors call ambient fetch directly. In Rust the network call is
hidden behind a small HttpTransport trait (connectors/http.rs) so connectors
are pure with respect to I/O and can be driven, network-free, against a local
transcript server in tests. The production implementation ReqwestTransport
wraps a shared reqwest::Client; reqwest_transport() returns the shared
Arc<dyn HttpTransport>.
#[async_trait]
pub trait HttpTransport: Send + Sync {
async fn post(
&self,
request: HttpRequest,
provider: &str,
transport_message: &str,
aborted_message: &str,
) -> Result<HttpResponse, GatewayError>;
}
The transport does what the TS fetch call did and nothing more: it POSTs a URL
with headers and a JSON body while honoring a CancellationToken; it classifies a
connect/transport failure as Transport (or Aborted when the token was tripped,
checked both before the send and during it by racing each send against
cancellation via tokio::select!); and it returns an HttpResponse exposing status,
ok, has_body(), a streaming body (into_byte_stream(cancel)), and a best-effort
text() for error diagnostics (the body is consumed by exactly one of
into_byte_stream/text). The request struct is
HttpRequest { url, headers, body: Vec<u8>, cancel }; the transport adds
content-type: application/json when the connector did not supply one.
Per-connector HTTP-status → GatewayErrorKind classification
(401|403 → Auth, 429 → RateLimit, else Http) stays in each connector,
matching the TS where each postX owns its own !response.ok branch.
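The per-connector status classification, and the reason the transport seam makes it testable, fit in a small synchronous sketch. Transport, Canned, and send are hypothetical stand-ins for the async HttpTransport trait, a transcript server, and a connector's !ok branch; only the 401|403 / 429 / other mapping is taken from the text.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GatewayErrorKind { Auth, RateLimit, Http, Transport }

/// 401|403 -> Auth, 429 -> RateLimit, any other non-2xx -> Http.
pub fn classify_status(status: u16) -> GatewayErrorKind {
    match status {
        401 | 403 => GatewayErrorKind::Auth,
        429 => GatewayErrorKind::RateLimit,
        _ => GatewayErrorKind::Http,
    }
}

pub struct HttpResponse { pub status: u16, pub body: String }

impl HttpResponse {
    pub fn ok(&self) -> bool { (200..300).contains(&self.status) }
}

/// Synchronous stand-in for the async HttpTransport trait.
pub trait Transport {
    fn post(&self, url: &str, body: &[u8]) -> Result<HttpResponse, GatewayErrorKind>;
}

/// Hypothetical canned transport: returns a fixed status and body, no network.
pub struct Canned(pub u16, pub &'static str);

impl Transport for Canned {
    fn post(&self, _url: &str, _body: &[u8]) -> Result<HttpResponse, GatewayErrorKind> {
        Ok(HttpResponse { status: self.0, body: self.1.to_string() })
    }
}

/// The connector owns its own !ok branch, like each TS postX.
pub fn send(t: &dyn Transport, url: &str, body: &[u8]) -> Result<String, (GatewayErrorKind, String)> {
    let resp = t.post(url, body).map_err(|k| (k, "transport failure".to_string()))?;
    if !resp.ok() {
        return Err((classify_status(resp.status), format!("HTTP {}: {}", resp.status, resp.body)));
    }
    Ok(resp.body)
}
```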
Connector responsibilities
Each live connector: resolves its key lazily on first iteration (a missing key
becomes a terminal Emission::Error { kind: Auth } naming the env var); assembles
the wire body from the shared conversion mappers plus a stream flag; POSTs
through the transport; decodes the byte stream (its own loop or the shared OpenAI
streamer); folds the collected deltas with fold_reply; and emits a terminal
Done or Error. The four OpenAI-dialect connectors (openai_chat, nvidia,
kimi, azure_openai) delegate the entire wire loop to
stream_openai_compatible_chat / complete_openai_compatible_chat; anthropic,
google, google_vertex, and openai_responses own bespoke SSE decode loops;
ollama uses the NDJSON framer.
Per-connector wire specifics worth knowing:
- anthropic (id anthropic): POSTs {baseUrl}/v1/messages (default https://api.anthropic.com), sends auth header x-api-key plus anthropic-version: 2023-06-01, resolves its key by model.provider (so a MiniMax card reaches MINIMAX_API_KEY), and decodes typed message_*/content_block_* SSE events. Stop-reason map: end_turn/stop_sequence → Complete, tool_use → ToolCalls, max_tokens → MaxOutput, refusal → Refusal.
- openai_chat (id openai-chat): {baseUrl}/chat/completions (default https://api.openai.com/v1), authorization: Bearer …, delegates to the shared streamer.
- openai_responses (id openai-responses): POST /v1/responses; maps ThinkingLevel onto reasoning.effort (low/medium/high); maps the completion status (completed / incomplete + max_output_tokens / failed / cancelled) onto a StopReason.
- nvidia (id nvidia): hosted NIM at https://integrate.api.nvidia.com/v1, bearer auth, and a reasoning-only body override, chat_template_kwargs.thinking, passed as body_extra.
- azure_openai (id azure-openai): deployment-scoped URL on the resource baseUrl (no public default; a missing endpoint is Unsupported), the deployment name taken from model.id, an api-key header, and api-version=2024-08-01-preview.
- kimi (id kimi): Moonshot host https://api.moonshot.ai/v1, bearer auth, shared streamer.
- ollama (id ollama): local server (default http://localhost:11434), no credential, NDJSON body framing.
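The Anthropic stop-reason table and request addressing translate directly into code. In this sketch, returning None for unlisted stop reasons and trimming a trailing slash from the base URL are assumptions, and anthropic_request is a hypothetical helper rather than part of the exported API.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason { Complete, ToolCalls, MaxOutput, Refusal, Aborted, Error }

/// Anthropic `stop_reason` -> neutral StopReason, per the mapping listed above.
/// Unlisted values yield None here; how the real connector treats them is not specified.
pub fn map_anthropic_stop(reason: &str) -> Option<StopReason> {
    match reason {
        "end_turn" | "stop_sequence" => Some(StopReason::Complete),
        "tool_use" => Some(StopReason::ToolCalls),
        "max_tokens" => Some(StopReason::MaxOutput),
        "refusal" => Some(StopReason::Refusal),
        _ => None,
    }
}

/// Hypothetical helper: the URL and headers the anthropic connector is described as sending.
/// A MiniMax card would pass its own base URL here.
pub fn anthropic_request(base_url: Option<&str>, api_key: &str) -> (String, Vec<(&'static str, String)>) {
    let base = base_url.unwrap_or("https://api.anthropic.com").trim_end_matches('/');
    (
        format!("{base}/v1/messages"),
        vec![
            ("x-api-key", api_key.to_string()),
            ("anthropic-version", "2023-06-01".to_string()),
        ],
    )
}
```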
bedrock is present and assembles its Converse request body, model addressing,
and converse-stream event mapper (all production-ready and exported as
build_converse_body / map_converse_stream_event / map_bedrock_stop_reason),
but fails fast with an Unsupported GatewayError ("bedrock streaming requires SigV4 (todo)") at call time — no network request is issued — because SigV4
signing and the binary event-stream codec are deferred. mock is fully
network-free: it replays one of two deterministic transcripts (a scripted echo
tool-call path or a terminal prose answer) chosen from the Conversation, so the
gateway smoke test never opens a socket or reads a credential.
Credentials
Credential sourcing lives in credentials/secrets.rs. secret_table() is the
declarative, ordered table — one SecretDescriptor per ProviderId (total over
all 11), built once via OnceLock. Each SecretDescriptor carries four fields:
provider, scheme (a SecretScheme: ApiKey, CloudIam, or None), the
ordered env_vars: &'static [&'static str] to probe, and a human-readable hint.
resolve_secret(provider, override) honors a trimmed caller override first, then
(unless the scheme is None, which never carries a key) walks the descriptor's
env_vars (through crate::core::env::read_raw, the single env registry, which
already trims and treats empty/whitespace as absent) and yields the first
non-empty value. describe_secret(provider) returns the (always-present)
descriptor for a provider.
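The resolution order can be sketched as a pure function by injecting the environment reader. Here read is a hypothetical stand-in for crate::core::env::read_raw, SecretDescriptor is trimmed to the two fields the algorithm needs, and treating an override that is blank after trimming as absent is an assumption.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecretScheme { ApiKey, CloudIam, None }

/// Simplified descriptor: the real one also carries `provider` and `hint`.
pub struct SecretDescriptor {
    pub scheme: SecretScheme,
    pub env_vars: &'static [&'static str],
}

/// Override first (trimmed), then the env vars in order unless the scheme is None.
/// `read` stands in for the env registry, which already trims and maps blank to None.
pub fn resolve_secret(
    desc: &SecretDescriptor,
    override_value: Option<&str>,
    read: impl Fn(&str) -> Option<String>,
) -> Option<String> {
    if let Some(v) = override_value.map(str::trim).filter(|v| !v.is_empty()) {
        return Some(v.to_string());
    }
    if desc.scheme == SecretScheme::None {
        return None; // keyless providers (Ollama, Mock) never probe the environment
    }
    desc.env_vars.iter().find_map(|&name| read(name))
}
```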
The barrel (llmgateway/mod.rs) re-exports resolve_secret, SecretDescriptor,
SecretScheme, environment_credential_resolver, and the OAuth/PKCE helpers, but
NOT secret_table / describe_secret — those are reached on the
credentials::secrets path (the credentials barrel itself does re-export them).
pub fn resolve_secret(provider: ProviderId, override_value: Option<&str>) -> Option<String>;
pub fn describe_secret(provider: ProviderId) -> &'static SecretDescriptor;
pub fn environment_credential_resolver() -> EnvironmentCredentialResolver;
| Provider | Scheme | Env vars (in order) |
|---|---|---|
| Anthropic | ApiKey | ANTHROPIC_API_KEY |
| Openai | ApiKey | OPENAI_API_KEY |
| Google | ApiKey | GEMINI_API_KEY, GOOGLE_API_KEY |
| GoogleVertex | CloudIam | GOOGLE_APPLICATION_CREDENTIALS, GOOGLE_VERTEX_PROJECT, GOOGLE_CLOUD_PROJECT |
| Amazon | CloudIam | AWS_BEARER_TOKEN_BEDROCK, AWS_ACCESS_KEY_ID, AWS_PROFILE |
| Azure | ApiKey | AZURE_OPENAI_API_KEY, AZURE_API_KEY |
| Nvidia | ApiKey | NVIDIA_API_KEY |
| Kimi | ApiKey | MOONSHOT_API_KEY, KIMI_API_KEY |
| Minimax | ApiKey | MINIMAX_API_KEY |
| Ollama | None | (none) |
| Mock | None | (none) |
EnvironmentCredentialResolver is the ready-made CredentialResolver
implementation bound to resolve_secret; it is the shared dependency every
connector factory receives.
The module also ships PKCE (pkce.rs, RFC 7636 S256) and OAuth 2.0 (oauth.rs,
RFC 6749). create_pkce_pair(verifier_length) mints a CSPRNG verifier (clamped to
43–128 chars) and its S256 challenge; derive_code_challenge(verifier) is
synchronous and byte-identical to the TS/Python output (RFC 7636 Appendix B vector
verified). build_auth_url, exchange_code, and refresh_token drive the
authorization-code-with-PKCE and refresh flows with hand-rolled reqwest form
posts (no oauth2 crate, for parity-locked error strings). oauth_config_for
returns an OAuthConfig only for Anthropic and Openai; the client_id
fields are empty placeholders a deployment must supply.
pub fn create_pkce_pair(verifier_length: usize) -> PkcePair; // PKCE_METHOD = "S256"
pub fn derive_code_challenge(verifier: &str) -> String;
pub fn oauth_config_for(provider: ProviderId) -> Option<OAuthConfig>;
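The S256 challenge is BASE64URL(SHA-256(verifier)) with padding removed; RFC 7636 Appendix A describes the unpadded encoding. Since std has no SHA-256, this sketch covers only the length clamp and the unpadded base64url step. Both functions are hypothetical illustrations, not the crate's pkce.rs. A 32-byte SHA-256 digest always encodes to 43 characters, which is also the RFC's minimum verifier length.

```rust
/// RFC 7636 bounds for the code_verifier length.
pub const MIN_VERIFIER: usize = 43;
pub const MAX_VERIFIER: usize = 128;

/// Clamp a requested verifier length into 43..=128, as create_pkce_pair is described as doing.
pub fn clamp_verifier_length(requested: usize) -> usize {
    requested.clamp(MIN_VERIFIER, MAX_VERIFIER)
}

/// BASE64URL without '=' padding; S256 applies this to the SHA-256 digest of the verifier.
pub fn base64url_nopad(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
    let mut out = String::with_capacity((bytes.len() * 4 + 2) / 3);
    for chunk in bytes.chunks(3) {
        // Pack up to three bytes into a 24-bit group (missing bytes are zero).
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let n = (u32::from(b[0]) << 16) | (u32::from(b[1]) << 8) | u32::from(b[2]);
        // 1 input byte -> 2 chars, 2 -> 3, 3 -> 4; padding is simply omitted.
        for i in 0..chunk.len() + 1 {
            let idx = (n >> (18 - 6 * i)) & 0x3f;
            out.push(ALPHABET[idx as usize] as char);
        }
    }
    out
}
```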
Conversion
The conversion/ module is pure (no network, no mutation). It splits into the
emission folder, the per-dialect request mappers, and the shared OpenAI-compatible
streamer.
| Name | Kind | Source | Purpose |
|---|---|---|---|
| fold_reply | fn | reduce.rs | Pure reducer folding an &Emission sequence into a Reply |
| to_openai_chat_messages | fn | mappers.rs | Conversation → OpenAI Chat-Completions messages |
| to_openai_tools / to_openai_tool_choice | fn | mappers.rs | OpenAI tools array / tool_choice string |
| to_anthropic_request | fn | mappers.rs | Conversation → the full Anthropic /v1/messages body |
| to_responses_input / to_responses_tools | fn | mappers.rs | OpenAI Responses input items / flat function tools |
| to_google_contents / to_google_system_instruction / to_google_tools | fn | mappers.rs | Google contents, systemInstruction, functionDeclarations |
| stream_openai_compatible_chat | fn | openai_compatible.rs | Shared OpenAI Chat streamer reused by openai/nvidia/kimi/azure |
| complete_openai_compatible_chat | async fn | openai_compatible.rs | One-shot view over the shared streamer |
| OpenAICompatibleConfig | struct | openai_compatible.rs | Connector-supplied url, headers, optional body_extra |
fold_reply is parity-locked: it accumulates thinking and text, reassembles
split tool-call argument-JSON fragments (preserving the raw string if malformed),
tracks the latest usage and stop reason, and emits blocks in thinking → text →
tool-call order (tool calls in first-seen id order). A model that produced tool
calls but no explicit stop reason is promoted to StopReason::ToolCalls.
pub fn fold_reply<'a, I>(model: &str, emissions: I) -> Reply
where I: IntoIterator<Item = &'a Emission>;
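The reducer's rules can be sketched with simplified types. Assumptions: tool-call arguments stay as the raw concatenated string (the real fold parses the JSON and keeps the raw string only when it is malformed), deltas for an id that was never started are ignored, and promotion applies whenever the resolved stop is Complete, which covers the no-explicit-stop case described above.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason { Complete, ToolCalls, MaxOutput }

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Usage { pub input_tokens: u64, pub output_tokens: u64 }

pub enum Emission {
    Text(String),
    Thinking(String),
    ToolCallStart { id: String, name: String },
    ToolCallDelta { id: String, args_delta: String },
    Usage(Usage),
    Stop(StopReason),
    Done,
    Error,
}

#[derive(Debug, PartialEq)]
pub enum Block {
    Thinking(String),
    Text(String),
    ToolCall { id: String, name: String, args: String }, // args kept raw in this sketch
}

#[derive(Debug, PartialEq)]
pub struct Reply { pub model: String, pub blocks: Vec<Block>, pub usage: Usage, pub stop: StopReason }

pub fn fold_reply<'a, I>(model: &str, emissions: I) -> Reply
where
    I: IntoIterator<Item = &'a Emission>,
{
    let (mut thinking, mut text) = (String::new(), String::new());
    let mut calls: Vec<(String, String, String)> = Vec::new(); // (id, name, args), first-seen order
    let mut usage = Usage::default();
    let mut stop: Option<StopReason> = None;

    for e in emissions {
        match e {
            Emission::Text(d) => text.push_str(d),
            Emission::Thinking(d) => thinking.push_str(d),
            Emission::ToolCallStart { id, name } => {
                if !calls.iter().any(|c| &c.0 == id) {
                    calls.push((id.clone(), name.clone(), String::new()));
                }
            }
            Emission::ToolCallDelta { id, args_delta } => {
                // Reassemble split argument-JSON fragments by id.
                if let Some(c) = calls.iter_mut().find(|c| &c.0 == id) {
                    c.2.push_str(args_delta);
                }
            }
            Emission::Usage(u) => usage = *u,     // latest usage wins
            Emission::Stop(s) => stop = Some(*s), // latest explicit stop wins
            Emission::Done | Emission::Error => {} // terminals are no-ops
        }
    }

    let mut stop = stop.unwrap_or(StopReason::Complete);
    if stop == StopReason::Complete && !calls.is_empty() {
        stop = StopReason::ToolCalls; // promotion
    }

    // Block order: thinking -> text -> tool calls.
    let mut blocks = Vec::new();
    if !thinking.is_empty() { blocks.push(Block::Thinking(thinking)); }
    if !text.is_empty() { blocks.push(Block::Text(text)); }
    blocks.extend(calls.into_iter().map(|(id, name, args)| Block::ToolCall { id, name, args }));
    Reply { model: model.to_string(), blocks, usage, stop }
}
```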
The Anthropic mapper hoists system to a top-level field, maps known-stale
catalog ids to current wire model strings (e.g. claude-opus-4 →
claude-opus-4-8), and derives a thinking budget for reasoning cards. JSON bodies
are serialized with serde_json::to_vec, and the shared OpenAI streamer asks
compatible servers for a final usage chunk via stream_options.include_usage.
Streaming
The wire framing lives in streaming/, over byte-chunk streams produced by the
transport.
| Name | Kind | Source | Purpose |
|---|---|---|---|
| channel_of | fn | channel.rs | Wrap a BoxStream<Emission> factory as a re-iterable Channel |
| collect_reply | async fn | channel.rs | Drain a Channel to its Reply, returning Err on a terminal error |
| sse_events | fn | sse.rs | Decode a byte stream into ServerEvents (WHATWG SSE), with cancel support |
| ndjson_lines | fn | ndjson.rs | Decode a byte stream into parsed serde_json::Values (NDJSON, Ollama) |
| ServerEvent | struct | sse.rs | One dispatched SSE event: event type + accumulated data |
| Utf8Decoder | struct (internal) | utf8.rs | Incremental, lossy UTF-8 decoder (carries split code points). The utf8 module is private (mod utf8;), so this is an implementation detail shared by the two framers, not re-exported from the barrel |
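The WHATWG framing that sse_events applies can be sketched over a complete text body. This is a simplification: the real decoder is incremental over byte chunks with cancellation, whereas parse_sse here is a hypothetical whole-string parser that ignores id and retry fields and reports the spec's default event type, message, when no event: line was seen.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct ServerEvent {
    pub event: String,
    pub data: String,
}

/// Parse a complete SSE body into events (simplified WHATWG rules).
pub fn parse_sse(body: &str) -> Vec<ServerEvent> {
    let mut events = Vec::new();
    let mut event = String::new();
    let mut data = String::new();
    // Normalize CRLF and lone CR line endings to LF.
    let normalized = body.replace("\r\n", "\n").replace('\r', "\n");
    for line in normalized.split('\n') {
        if line.is_empty() {
            // Blank line: dispatch if any data line was seen, then reset.
            if !data.is_empty() {
                data.pop(); // drop the trailing '\n' appended per data line
                events.push(ServerEvent {
                    event: if event.is_empty() { "message".into() } else { event.clone() },
                    data: std::mem::take(&mut data),
                });
            }
            event.clear();
            continue;
        }
        if line.starts_with(':') {
            continue; // comment / keep-alive line
        }
        // "field: value" (one leading space stripped) or a bare "field".
        let (field, value) = match line.split_once(':') {
            Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
            None => (line, ""),
        };
        match field {
            "event" => event = value.to_string(),
            "data" => {
                data.push_str(value);
                data.push('\n'); // multiple data lines join with '\n'
            }
            _ => {} // id, retry and unknown fields ignored in this sketch
        }
    }
    events // an unterminated trailing event is discarded, as the spec requires
}
```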
The streaming model centers on two ideas. First, the Emission union — the
8 variants a Channel yields:
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Emission {
Text { delta: String },
Thinking { delta: String },
ToolCallStart { id: String, name: String },
ToolCallDelta { id: String, args_delta: String }, // wire: argsDelta
Usage { usage: Usage },
Stop { stop: StopReason },
Done { reply: Reply }, // terminal success
Error { error: GatewayError }, // terminal failure
}
A connector never panics out of the stream: a failure becomes a terminal
Emission::Error (error-as-terminal-emission). Emission::is_terminal() is true
for exactly Done/Error; folding treats both as no-ops and a channel ends on
the first one.
Second, the re-iterable Channel (= crate::core::channel::Channel<Emission>).
channel_of(factory) wraps a stream factory so each Channel::iter re-runs the
factory and re-issues the work — re-iterating re-POSTs. collect_reply drains a
channel: a terminal Done returns its reply verbatim; a terminal Error becomes
Err; otherwise the buffered deltas are folded with the placeholder model
"unknown"; an empty channel is a Transport error
("channel closed without emitting any data").
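Re-iteration and the collect_reply rules can be modelled synchronously. In this sketch Channel holds a plain Fn() -> Vec<Emission> factory instead of a BoxStream factory, the reply is reduced to a (model, text) pair, and counting_channel is a hypothetical helper that counts how often the factory runs, standing in for repeated POSTs.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
pub enum Emission {
    Text(String),
    Done { model: String, text: String },
    Error(String),
}

/// Re-iterable channel: every `iter()` re-runs the factory (in the real crate, re-POSTs).
pub struct Channel {
    factory: Arc<dyn Fn() -> Vec<Emission> + Send + Sync>,
}

pub fn channel_of(factory: impl Fn() -> Vec<Emission> + Send + Sync + 'static) -> Channel {
    Channel { factory: Arc::new(factory) }
}

impl Channel {
    pub fn iter(&self) -> std::vec::IntoIter<Emission> {
        (self.factory)().into_iter()
    }
}

/// Hypothetical helper: a scripted channel plus a counter of factory runs.
pub fn counting_channel(script: Vec<Emission>) -> (Channel, Arc<AtomicUsize>) {
    let posts = Arc::new(AtomicUsize::new(0));
    let seen = Arc::clone(&posts);
    let ch = channel_of(move || {
        seen.fetch_add(1, Ordering::SeqCst);
        script.clone()
    });
    (ch, posts)
}

/// Done -> its reply; Error -> Err; no terminal -> fold deltas under "unknown";
/// nothing at all -> the empty-channel error.
pub fn collect_reply(channel: &Channel) -> Result<(String, String), String> {
    let mut text = String::new();
    let mut saw_any = false;
    for e in channel.iter() {
        saw_any = true;
        match e {
            Emission::Text(d) => text.push_str(&d),
            Emission::Done { model, text: reply_text } => return Ok((model, reply_text)),
            Emission::Error(msg) => return Err(msg),
        }
    }
    if !saw_any {
        return Err("channel closed without emitting any data".into());
    }
    Ok(("unknown".into(), text))
}
```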
Cancellation threads from StreamOptions.cancel (a CancellationToken) into the
transport and into both framers. The SSE and NDJSON decoders check a pre-tripped
token before reading and race each chunk read against token.cancelled() via
tokio::select! { biased; … }, yielding an Aborted GatewayError with distinct
"before" vs "while" messages. Both framers share Utf8Decoder, which carries an
incomplete trailing multibyte sequence across chunk boundaries so a clean split
emits no spurious U+FFFD (matching TextDecoder({stream:true}) byte-for-byte) —
genuinely invalid bytes still become U+FFFD.
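The carry-over behaviour of Utf8Decoder can be sketched with std::str::from_utf8 and Utf8Error. This is an illustrative reconstruction, not the crate's utf8.rs: error_len() returning None marks an incomplete trailing sequence to carry into the next chunk, while Some(n) marks n invalid bytes to replace with one U+FFFD. finish is a hypothetical end-of-stream flush.

```rust
/// Incremental, lossy UTF-8 decoder that carries an incomplete trailing sequence.
#[derive(Default)]
pub struct Utf8Decoder {
    pending: Vec<u8>,
}

impl Utf8Decoder {
    pub fn decode(&mut self, chunk: &[u8]) -> String {
        // Prepend whatever was carried over from the previous chunk.
        let mut bytes = std::mem::take(&mut self.pending);
        bytes.extend_from_slice(chunk);
        let mut out = String::new();
        let mut rest: &[u8] = &bytes;
        loop {
            match std::str::from_utf8(rest) {
                Ok(s) => {
                    out.push_str(s);
                    break;
                }
                Err(e) => {
                    let (valid, after) = rest.split_at(e.valid_up_to());
                    // from_utf8 has just validated this prefix.
                    out.push_str(std::str::from_utf8(valid).unwrap());
                    match e.error_len() {
                        // Genuinely invalid bytes: one U+FFFD, then keep going.
                        Some(n) => {
                            out.push('\u{FFFD}');
                            rest = &after[n..];
                        }
                        // Incomplete sequence at the end: carry it, emit nothing spurious.
                        None => {
                            self.pending = after.to_vec();
                            break;
                        }
                    }
                }
            }
        }
        out
    }

    /// End of stream: a still-incomplete sequence becomes a single U+FFFD.
    pub fn finish(&mut self) -> String {
        if self.pending.is_empty() {
            String::new()
        } else {
            self.pending.clear();
            "\u{FFFD}".to_string()
        }
    }
}
```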
Key concepts
- ApiKind vs ProviderId: ProviderId is the vendor; ApiKind is the wire dialect. The connector registry is keyed by ApiKind, because several providers share one dialect.
- Curated vs generated catalog: model_cards() (13 cards) is checked first; generated.rs (708 records) is consulted as a fallback, synthesizing a routable ModelCard only when both the provider and the api map onto the gateway enums.
- Channel/Emission: a Channel is a re-iterable async stream of incremental deltas terminated by exactly one Done (success) or Error (failure). Re-iterating re-POSTs; stream does no I/O until iteration.
- fold_reply: the pure reducer that collapses an emission sequence into a Reply, with thinking → text → tool-call block order, first-seen id order, split-JSON-arg reassembly, latest usage and explicit stop winning, and a Complete-with-calls → ToolCalls promotion.
- GatewayError.kind: a GatewayErrorKind discriminator (Auth, RateLimit, Http, Transport, Parse, Unsupported, Aborted) so callers branch on failure class without string-matching messages. Aborted is the one class connectors propagate verbatim (cancellation is never folded into a generic transport error).
- HttpTransport seam: the network call lives behind a trait, so connectors are pure with respect to I/O and testable against a transcript server; production uses ReqwestTransport.
- Zero SDK: no vendor SDK and no aws-sdk/oauth2 crate; every wire shape is re-derived from public HTTP docs and assembled/parsed by hand over reqwest.
Examples
Stream from the deterministic mock card and consume Emissions:
use futures::StreamExt;
use indusagi::llmgateway::{stream, Block, Conversation, Emission, GatewayError, Turn};

async fn stream_mock() -> Result<(), GatewayError> {
    let convo = Conversation {
        system: Some("Be terse.".into()),
        turns: vec![Turn::User { blocks: vec![Block::Text { text: "hello".into() }] }],
        tools: None,
    };
    let channel = stream("mock-1", convo, None)?; // sync; no I/O yet
    let mut it = channel.iter(); // I/O happens on iteration
    while let Some(emission) = it.next().await {
        match emission {
            Emission::Text { delta } => print!("{delta}"),
            Emission::Done { reply } => println!("\nstop: {:?} usage: {:?}", reply.stop, reply.usage),
            Emission::Error { error } => return Err(error), // streaming failures arrive in-band
            _ => {}
        }
    }
    Ok(())
}
One-shot complete with options and cost estimation:
use indusagi::llmgateway::{complete, estimate_cost, get_card,
    Block, Conversation, GatewayError, StreamOptions, ThinkingLevel, Turn};

async fn summarize() -> Result<(), GatewayError> {
    let convo = Conversation {
        system: None,
        turns: vec![Turn::User { blocks: vec![Block::Text { text: "summarize X".into() }] }],
        tools: None,
    };
    let opts = StreamOptions {
        temperature: Some(0.2),
        max_output_tokens: Some(512),
        thinking: Some(ThinkingLevel::Low),
        ..Default::default()
    };
    let reply = complete("claude-sonnet-4", convo, Some(opts)).await?; // failures arrive as Err
    if let Some(card) = get_card("claude-sonnet-4") {
        println!("{:?} {} USD", reply.stop, estimate_cost(&card, &reply.usage));
    }
    Ok(())
}
Fluent catalog query and credential resolution:
use indusagi::llmgateway::{models, ApiKind, ProviderId};
use indusagi::llmgateway::credentials::{create_pkce_pair, resolve_secret};

fn catalog_and_credentials() {
    // Reasoning-capable Anthropic cards from the curated set.
    let cards = models().by_provider(ProviderId::Anthropic).reasoning(true).all();
    println!("{} reasoning-capable Anthropic cards", cards.len());
    let found = models().by_api(ApiKind::OpenaiResponses).find("o3");
    println!("{:?}", found.map(|c| c.id.clone()));
    let key = resolve_secret(ProviderId::Openai, None); // reads OPENAI_API_KEY (or None)
    let pkce = create_pkce_pair(64); // RFC 7636 S256 verifier + challenge
    println!("{} {}", key.is_some(), pkce.method);
}
Relationship to neighbors
The gateway is the lower wire layer beneath the rest of the framework. The
Runtime agent loop drives stream/complete and the
contract types straight from this barrel, so there is one source of truth for the
Conversation/Reply/Emission vocabulary and the JsonSchema/ToolDescriptor
tool contract it shares with the tool kernel. The Channel/Emission plumbing
sits on crate::core::channel, and credential/env reads route through
crate::core::env. The CancellationToken is the framework's single cancellation
currency. This Rust edition is a module-for-module port of the TypeScript
llmgateway/ tree and parallels the
Python LLM Gateway; the chief divergence is
Rust's Result-based error propagation (no thrown exceptions) and the explicit
HttpTransport seam in place of ambient fetch.
