Swarm
`indusagi::swarm` is the multi-agent crew layer: a `Crew` coordinator drives teammate agents over a shared, dependency-aware ticket board, a cursor-based mailbox, and an append-only activity transcript, all built on a tiny frozen kernel of lockable JSON cells (`JsonCell`) and append-only JSONL logs (`JsonlLog`). Reach it through `use indusagi::swarm::{create_crew, Crew, ...}`.
A crew coordinates entirely through files on disk — no broker, no daemon, no
shared memory. Multiple processes can safely share one crew directory because
every mutation funnels through the kernel's cooperative mkdir-lock +
atomic-rename + serde-validate machinery. The Crew owns the policy (match
ready tickets to idle members, spawn one agent per pair, fold the result back
onto the board and channel, narrate every step to the activity log); four thin
domain services (roster, workboard, postbox, telemetry) plus an optional
git-worktree isolation seam own the persisted state. Agent spawning is
injectable (SpawnAgent), so a round can be driven by real
LLM-backed agents in production or scripted, network-free fakes in tests.
This is the Rust edition of the swarm documented for
Python; the two share the same architecture (one
frozen kernel, four domain services, an injectable spawner, dep-gated readiness,
cursor-exact delivery) but the Rust port leans on serde for "validated on
every crossing", tokio for the async file I/O and backoff, an RAII
LockGuard for lock release, an internally-tagged enum for each
discriminated union, and IndexMap/IndexSet for deterministic cycle
detection.
Table of Contents
- Public exports
- Module map
- The kernel
- The roster
- The workboard
- The postbox
- The isolation seam
- The telemetry log
- The coordinator
- The coordination model
- Faults
- Parity notes
- Relationship to neighbors
Public exports
Most names below are re-exported from swarm/mod.rs (so use indusagi::swarm::{…}
reaches them directly); the few that are reachable only through a submodule path
(e.g. swarm::postbox::EnvelopePayload) are flagged in their row. The coordinator
surface:
| Name | Kind | Source | Purpose |
|---|---|---|---|
| `create_crew` | async fn | coordinator.rs | Host-facing factory; stands up a Crew from CrewOptions (delegates to Crew::open) |
| `Crew` | struct | coordinator.rs | The coordinator. Verbs: open, add_member, post_task, run_round, status |
| `CrewOptions` | struct | coordinator.rs | Knobs: dir, base_repo, spawn_agent override, injected now clock |
| `CrewStatus` | struct | coordinator.rs | Point-in-time aggregate (crew_id, members, tickets, ready, counts, activity) |
| `StatusCounts` | struct | coordinator.rs | Per-state ticket tally (open/claimed/blocked/done/failed: usize); the type of CrewStatus.counts. Reachable via swarm::coordinator::, not top-level re-exported |
| `RoundOutcome` | struct | coordinator.rs | Result of one run_round: completed and failed ticket vecs |
| `TaskDraft` | struct | coordinator.rs | Ticket draft for Crew::post_task (title, optional body, optional deps) |
| `SpawnAgent` | type alias | coordinator.rs | `Arc<dyn Fn(&CrewMember) -> Agent + Send + Sync>`: the injectable spawner seam |
| `default_spawn_agent` | fn | coordinator.rs | Default spawner: a real agent_with_capabilities bound to the member's model + tool_box |
The domain services and kernel primitives:
| Name | Kind | Source | Purpose |
|---|---|---|---|
| `CrewManifest` | struct | roster/manifest.rs | Concurrency-safe roster service over `JsonCell<CrewRecord>` |
| `ModelPolicy` | struct | roster/manifest.rs | Role→model lookup table; of, for_member, model_for_role, roles |
| `CrewMember` | struct | roster/manifest.rs | One teammate (id, role, optional model, optional tool_collection) |
| `CrewRecord` | struct | roster/manifest.rs | Persisted crew doc (crew_id, members, created_at) |
| `MemberDraft` | struct | roster/manifest.rs | Inputs to enroll a member (id?, role, model?, tool_collection?) |
| `CrewToolCollection` | enum | roster/manifest.rs | ReadOnly / Coding / All (wire: read-only / coding / all) |
| `TicketBoard` | struct | workboard/board.rs | Dependency-aware shared task store over `JsonCell<BoardState>` |
| `Ticket` | struct | workboard/board.rs | One unit of work (lifecycle, deps, assignee, result, …) |
| `TicketStatus` | enum | workboard/board.rs | Open / Claimed / Blocked / Done / Failed |
| `TicketDraft` | struct | workboard/board.rs | Fields for TicketBoard::add (title, body?, deps?) |
| `ticket_statuses` | fn | workboard/board.rs | Returns `[TicketStatus; 5]` in lifecycle order |
| `DepGraph` | struct | workboard/dep_graph.rs | Pure directed dependency graph; from_deps, first_cycle, dependencies_of, dependents_of |
| `Channel` | struct | postbox/channel.rs | Cursor-based mailbox over one shared transcript + per-reader cursors |
| `Envelope` | struct | postbox/codecs.rs | Message header (id/from/to/ts) + flattened EnvelopePayload; envelope_type() helper |
| `EnvelopePayload` | enum | postbox/codecs.rs | The `#[serde(tag = "type")]` union: Task / Result / Note / Control; envelope_type() helper. Reachable via swarm::postbox::, not top-level re-exported |
| `EnvelopeType` | enum | postbox/codecs.rs | Task / Result / Note / Control (the class tag; as_str() → wire spelling) |
| `TaskPriority` / `ResultStatus` / `ControlSignal` | enum | postbox/codecs.rs | Payload sub-enums: Low/Normal(default)/High; Ok/Error/Skipped; Pause/Resume/Drain/Shutdown. Reachable via swarm::postbox::, not top-level re-exported |
| `encode` / `decode` | fn | postbox/codecs.rs | Validate+serialize / parse+validate one wire line |
| `ENVELOPE_TYPES` | const | postbox/codecs.rs | `[EnvelopeType; 4]` of the four message-class tags |
| `Workspace` | struct | isolation/worktree.rs | Git-worktree provisioner over an injected ShellRunner |
| `WorktreeRef` / `WorktreeEntry` | struct | isolation/worktree.rs | Worktree coordinates / a parsed git worktree list entry |
| `CreateOptions` / `RemoveOptions` | struct | isolation/worktree.rs | Options for Workspace::create / Workspace::remove |
| `ShellRunner` | trait | isolation/runner.rs | `exec(command, cwd) -> ExecFuture<'a>` (a boxed `Future<Output = Result<RunResult, io::Error>>`); must not err on non-zero exit |
| `RunResult` | struct | isolation/runner.rs | One shelled command's outcome (stdout, stderr, `code: Option<i32>`) |
| `NodeRunner` / `ExecFuture` | struct / type alias | isolation/runner.rs | The production runner type / the boxed-future return of ShellRunner::exec (object-safe, no async-trait); reachable via swarm::isolation::runner::, not top-level re-exported |
| `node_runner` | fn | isolation/runner.rs | Production runner factory: returns a NodeRunner over shell-less tokio::process::Command |
| `ActivityLog` | struct | telemetry/activity.rs | Append-only coordination transcript over `JsonlLog<CrewEvent>` |
| `CrewEvent` | struct | telemetry/activity.rs | The header (id, ts) + flattened CrewEventBody |
| `CrewEventKind` / `CrewEventInput` | enum / struct | telemetry/activity.rs | The six kind discriminants / the body+overrides record() accepts |
| `CREW_EVENT_KINDS` | const | telemetry/activity.rs | `[CrewEventKind; 6]` of the coordination-event kinds |
| `JsonCell` | struct | kernel/json_cell.rs | Lockable, validated, atomically-published JSON file |
| `GuardOptions` | struct | kernel/json_cell.rs | Lock-acquisition tunables (timeout_ms, stale_ms, base_delay_ms, max_delay_ms) |
| `JsonlLog` | struct | kernel/jsonl_log.rs | Append-only validated JSONL log; append, read_all, thread_of |
| `TreeNode` | trait | kernel/jsonl_log.rs | (id, parent_id) an entry must expose to use thread_of |
| `new_id` | fn | kernel/ids.rs | Mint a sortable, monotonic ULID-backed id, optionally prefixed |
| `SwarmFault` | struct | kernel/faults.rs | Typed error with a discriminable .kind(); is(kind) guard |
| `swarm_fault` | fn | kernel/faults.rs | Ergonomic constructor for SwarmFault::new(kind, message) |
| `SwarmFaultKind` | enum | kernel/faults.rs | LockTimeout / Validation / NotFound / Conflict / Isolation / Spawn |
Module map
| Module | Holds |
|---|---|
| `kernel/` | The frozen foundation: JsonCell, JsonlLog/TreeNode, new_id, SwarmFault/swarm_fault/SwarmFaultKind, GuardOptions |
| `roster/` | Team config: CrewManifest, ModelPolicy; CrewMember/CrewRecord/MemberDraft/CrewToolCollection |
| `workboard/` | Shared task store: TicketBoard, DepGraph; Ticket/TicketStatus/TicketDraft, ticket_statuses |
| `postbox/` | Mailbox: Channel and codecs (the Envelope/EnvelopePayload union, encode/decode) |
| `isolation/` | Git-worktree isolation: Workspace over an injectable ShellRunner; node_runner, RunResult, option/ref structs |
| `telemetry/` | Observer: ActivityLog over JsonlLog and the CrewEvent/CrewEventBody union (six kinds) |
| `coordinator.rs` | Crew: the policy layer that wires the four services + isolation into one driveable object |
The kernel
The kernel (swarm/kernel/) is the frozen, load-bearing foundation that every domain
service thinly wraps: four deliberately minimal primitives.
`JsonCell` — the one transactional primitive
A cell owns exactly one JSON file plus a serde-(de)serializable T (the Rust
stand-in for the TS zod schema). Two surfaces:
```rust
impl<T> JsonCell<T>
where
    T: Serialize + DeserializeOwned + Clone,
{
    pub async fn open(
        path: impl AsRef<Path>,
        initial: T,
        guard: Option<GuardOptions>,
    ) -> Result<JsonCell<T>, SwarmFault>;
    pub async fn read(&self) -> Result<T, SwarmFault>;
    pub async fn mutate<F>(&self, f: F) -> Result<T, SwarmFault>
    where
        F: FnOnce(T) -> Result<T, SwarmFault>;
}
```
read() is lock-free — atomic publish guarantees a reader sees a whole
previous-or-next value, never torn; a missing file returns the cell's initial,
and malformed JSON or a wrong shape both raise a Validation fault (decode is
two-stage: prove it is JSON, then prove it fits T). mutate(f) is the only
transactional primitive: acquire the exclusive guard, read+validate, apply the
pure transform f, re-validate, publish atomically (write a unique temp sibling
<path>.tmp.<pid>.<ts>.<ulid> then tokio::fs::rename over the target), and
release the guard in a finally-equivalent. Because every domain mutator hands a
pure transform to cell.mutate, cross-process edits to the same file serialize
with zero lost updates.
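The publish step of mutate can be sketched with the standard library alone. This is a minimal, synchronous sketch, assuming std::fs in place of tokio::fs; the helper names publish_atomically and temp_sibling are hypothetical, and a caller-supplied suffix stands in for the ULID in the temp name.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process;
use std::time::{SystemTime, UNIX_EPOCH};

/// Build a unique temp sibling next to `target`, following the
/// `<path>.tmp.<pid>.<ts>.<suffix>` pattern. The hypothetical `suffix`
/// stands in for the ULID so the sketch stays std-only.
fn temp_sibling(target: &Path, suffix: &str) -> PathBuf {
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0);
    let mut name = target.as_os_str().to_owned();
    name.push(format!(".tmp.{}.{}.{}", process::id(), ts, suffix));
    PathBuf::from(name)
}

/// Publish `contents` to `target`: write the whole value to the temp sibling,
/// then rename it over the target so readers never observe a partial file.
fn publish_atomically(target: &Path, contents: &str, suffix: &str) -> io::Result<()> {
    let tmp = temp_sibling(target, suffix);
    {
        let mut f = fs::File::create(&tmp)?;
        f.write_all(contents.as_bytes())?;
    } // file handle closed here, before the rename
    if let Err(e) = fs::rename(&tmp, target) {
        // Do not leave an orphaned temp file behind on failure.
        let _ = fs::remove_file(&tmp);
        return Err(e);
    }
    Ok(())
}
```

Because the rename replaces the target in one step on the same filesystem, a concurrent reader opens either the previous file or the new one. The temp file is a sibling of the target precisely so the rename never crosses a filesystem boundary.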
The mutex is an atomic tokio::fs::create_dir of a <file>.lockdir directory
(AlreadyExists means "held"), with a { pid, takenAt, cell } owner.json
marker dropped inside for stale detection. Acquisition retries on a jittered
exponential backoff bounded by a deadline; a lock dir older than stale_ms
(read from the marker's takenAt, falling back to the dir's mtime) is forcibly
reclaimed via remove_dir_all, so a crashed holder never deadlocks the crew
forever. The default GuardOptions (GuardOptions::resolved) are timeout_ms
10_000, stale_ms 30_000, base_delay_ms 12, max_delay_ms 250. Release is an
RAII LockGuard whose Drop removes the lock dir — the Rust analogue of the TS
finally { release() } (drop is synchronous, so it uses the blocking
std::fs::remove_dir_all, best-effort, never panics).
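A std-only sketch of the mkdir-lock acquisition loop and its RAII release follows. It is illustrative, not the kernel's code: acquire, LockDirGuard, and backoff_ms are hypothetical names. Jitter is left out so the delays are deterministic, staleness is read from the directory mtime only (the owner.json marker is skipped), and blocking thread::sleep stands in for tokio's timer. The 12 ms / 250 ms backoff bounds reuse the defaults quoted above.

```rust
use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::{Duration, Instant, SystemTime};

/// Hypothetical analogue of the kernel's RAII LockGuard.
struct LockDirGuard {
    dir: PathBuf,
}

impl Drop for LockDirGuard {
    fn drop(&mut self) {
        // Synchronous, best-effort release that never panics.
        let _ = fs::remove_dir_all(&self.dir);
    }
}

/// Exponential backoff capped at `max_ms` (jitter omitted in this sketch).
fn backoff_ms(attempt: u32, base_ms: u64, max_ms: u64) -> u64 {
    base_ms.saturating_mul(1u64 << attempt.min(16)).min(max_ms)
}

/// Age of a lock dir from its mtime (the fallback path described above).
fn lock_age(dir: &Path) -> Option<Duration> {
    let modified = fs::metadata(dir).ok()?.modified().ok()?;
    SystemTime::now().duration_since(modified).ok()
}

fn acquire(target: &Path, timeout: Duration, stale: Duration) -> Result<LockDirGuard, String> {
    let mut name = target.as_os_str().to_owned();
    name.push(".lockdir");
    let dir = PathBuf::from(name);
    let deadline = Instant::now() + timeout;
    let mut attempt = 0;
    loop {
        match fs::create_dir(&dir) {
            // create_dir is atomic: exactly one contender can succeed.
            Ok(()) => return Ok(LockDirGuard { dir }),
            Err(e) if e.kind() == ErrorKind::AlreadyExists => {
                // A holder that died long ago must not block everyone forever:
                // reclaim its dir, then retry after the wait below.
                if lock_age(&dir).map_or(false, |age| age > stale) {
                    let _ = fs::remove_dir_all(&dir);
                }
            }
            Err(e) => return Err(format!("cannot create lock dir: {e}")),
        }
        if Instant::now() >= deadline {
            return Err("lock_timeout".to_string());
        }
        thread::sleep(Duration::from_millis(backoff_ms(attempt, 12, 250)));
        attempt += 1;
    }
}
```

Holding the returned guard keeps the lock; letting it go out of scope removes the directory, which is what makes early returns and `?` inside a transform safe.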
```rust
use indusagi::swarm::JsonCell;
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
struct Counter { n: i64 }

// Inside an async fn returning Result<_, SwarmFault>, on a fresh path:
let cell = JsonCell::open("/tmp/counter.json", Counter { n: 0 }, None).await?;
let after = cell.mutate(|c| Ok(Counter { n: c.n + 1 })).await?; // guarded read-modify-write
assert_eq!(after.n, 1);
assert_eq!(cell.read().await?.n, 1); // lock-free read
```
`JsonlLog` — the append-only log
Where a cell owns a single mutable value, a log owns an immutable, ever-growing NDJSON/JSONL sequence: one JSON object per line.
```rust
impl<T: Serialize + DeserializeOwned> JsonlLog<T> {
    pub async fn open(path: impl AsRef<Path>) -> Result<JsonlLog<T>, SwarmFault>;
    pub async fn append(&self, entry: T) -> Result<T, SwarmFault>;
    pub async fn read_all(&self) -> Result<Vec<T>, SwarmFault>;
    pub async fn thread_of(&self, id: &str) -> Result<Vec<T>, SwarmFault>
    where T: TreeNode + Clone;
}
```
append serialize-validates the entry (a Validation fault before anything
touches disk), then writes a single write_all on an O_APPEND file descriptor
(via spawn_blocking over std::fs::OpenOptions::new().create(true).append(true))
so two concurrent appends never tear a line in half — correctness comes from
never mutating existing lines, not from locking. read_all validates every line,
skips blanks, and a malformed line raises a Validation fault naming the 1-based
line number. thread_of is a parent-pointer tree helper: when entries implement
the TreeNode trait (fn id(&self) -> &str, fn parent_id(&self) -> Option<&str>)
it walks parent_id pointers from a leaf to its root and returns the lineage
root→…→leaf — NotFound if the id is absent, Conflict on a cycle, treating a
parent_id that points at a missing id as a root.
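The parent-pointer walk behind thread_of can be sketched over an in-memory slice. The Entry and ThreadError types are hypothetical simplifications (a real log entry would expose id and parent_id through TreeNode). The sketch keeps the three rules stated above: an unknown leaf is NotFound, a revisited node is a cycle (Conflict), and a parent_id naming a missing entry ends the walk as a root.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, PartialEq)]
struct Entry {
    id: String,
    parent_id: Option<String>,
}

#[derive(Debug, PartialEq)]
enum ThreadError {
    NotFound,
    Conflict,
}

/// Walk parent pointers from `leaf_id` up to its root, then return root→…→leaf.
fn thread_of(entries: &[Entry], leaf_id: &str) -> Result<Vec<Entry>, ThreadError> {
    let by_id: HashMap<&str, &Entry> = entries.iter().map(|e| (e.id.as_str(), e)).collect();
    let mut current = *by_id.get(leaf_id).ok_or(ThreadError::NotFound)?;
    let mut seen: HashSet<&str> = HashSet::new();
    let mut lineage = Vec::new();
    loop {
        if !seen.insert(current.id.as_str()) {
            return Err(ThreadError::Conflict); // revisited a node: the pointers form a cycle
        }
        lineage.push(current.clone());
        match current.parent_id.as_deref().and_then(|p| by_id.get(p)) {
            Some(parent) => current = *parent,
            None => break, // no parent, or a dangling parent id: treat as the root
        }
    }
    lineage.reverse(); // collected leaf→root; callers want root→leaf
    Ok(lineage)
}
```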
`new_id` and `SwarmFault`
```rust
pub fn new_id(prefix: Option<&str>) -> String;
```
new_id(None) returns a bare 26-char ULID; new_id(Some("tkt")) returns
tkt_<ULID>, joined by the swarm-local PREFIX_SEPARATOR `_` (note this differs
from crate::core::ids::prefixed_id, which joins with `-`). Ids are minted
through one process-wide monotonic ulid::Generator behind a LazyLock<Mutex>,
so ids minted later in call order sort lexicographically after earlier ones,
even inside the same millisecond. That lets the workboard treat a plain string
sort as creation order without a separate sequence file.
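The ordering guarantee new_id relies on can be illustrated without the ulid crate. This sketch is a simplified stand-in, not a ULID: it encodes a millisecond timestamp plus a per-millisecond counter as fixed-width hex behind a process-wide Mutex, and it omits a real ULID's random component. That is enough to show why a later id sorts after an earlier one even within the same millisecond.

```rust
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical process-wide state: (last millisecond used, counter within it).
static LAST: Mutex<(u128, u64)> = Mutex::new((0, 0));

fn new_id(prefix: Option<&str>) -> String {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0);
    let mut last = LAST.lock().unwrap();
    // Same (or earlier, if the clock stepped back) millisecond: bump the counter
    // instead of reusing a timestamp, so ids stay strictly increasing.
    let (ms, seq) = if now > last.0 { (now, 0) } else { (last.0, last.1 + 1) };
    *last = (ms, seq);
    // Fixed-width, zero-padded fields make string order equal numeric order.
    let body = format!("{:016x}{:010x}", ms, seq);
    match prefix {
        Some(p) => format!("{p}_{body}"),
        None => body,
    }
}
```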
SwarmFault is the single typed error every kernel op raises — a real
std::error::Error (with Display + source()) tagged with a discriminable
SwarmFaultKind; see Faults.
The roster
roster/manifest.rs persists the team configuration and nothing else — a thin
wrapper over JsonCell<CrewRecord>. The persisted document is one CrewRecord:
```rust
#[derive(Clone, Serialize, Deserialize)]
pub struct CrewRecord {
    #[serde(rename = "crewId")] pub crew_id: String,
    pub members: Vec<CrewMember>,
    #[serde(rename = "createdAt")] pub created_at: u64,
}

#[derive(Clone, Serialize, Deserialize)]
pub struct CrewMember {
    pub id: String,
    pub role: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(default, rename = "toolCollection", skip_serializing_if = "Option::is_none")]
    pub tool_collection: Option<CrewToolCollection>,
}
```
CrewManifest::open opens the cell in an unborn state: a fresh path reads back as
CrewRecord { crew_id: "", members: [], created_at: 0 }. create(crew_id)
mints a crew_<ULID> id (or honors an explicit one), stamps created_at, and
refuses to overwrite an existing crew (Conflict). The empty string is the
legal unborn sentinel, which is why the schema deliberately does not require
crew_id to be non-empty. The born-vs-unborn invariant is enforced instead by
create and the private assert_born guard, which makes the other mutators
refuse to operate on an unborn crew (Conflict), so a typo'd path can never
silently grow a phantom crew.
```rust
impl CrewManifest {
    pub async fn open(path: impl AsRef<Path>) -> Result<CrewManifest, SwarmFault>;
    pub async fn create(&self, crew_id: Option<String>) -> Result<CrewRecord, SwarmFault>;
    pub async fn add_member(&self, draft: MemberDraft) -> Result<CrewMember, SwarmFault>;
    pub async fn remove_member(&self, id: &str) -> Result<Vec<CrewMember>, SwarmFault>;
    pub async fn get(&self) -> Result<CrewRecord, SwarmFault>;
    pub async fn members(&self) -> Result<Vec<CrewMember>, SwarmFault>;
}
```
add_member mints mbr_<ULID> (or uses the draft's id), appends to the end
(preserving enrollment order), and rejects a duplicate id with Conflict;
remove_member raises NotFound for an unknown id. Every mutator is a pure
transform fed to cell.mutate, so concurrent roster edits serialize through the
kernel guard.
ModelPolicy is a small per-role model-selection table with total precedence:
(1) an explicit member.model, (2) otherwise the table entry for
member.role, (3) otherwise the configured fallback. Build it with
ModelPolicy::of(table, fallback); resolve with for_member(member),
model_for_role(role), or list roles().
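ModelPolicy's three-step precedence can be expressed as a chain of fallbacks. Member and Policy are hypothetical, trimmed-down types for illustration; only the of / model_for_role / for_member names mirror the API described above, and their exact signatures in the crate may differ.

```rust
use std::collections::HashMap;

/// Hypothetical member shape: only the fields the lookup needs.
struct Member {
    role: String,
    model: Option<String>,
}

struct Policy {
    table: HashMap<String, String>, // role -> model
    fallback: String,
}

impl Policy {
    fn of(table: HashMap<String, String>, fallback: &str) -> Self {
        Policy { table, fallback: fallback.to_string() }
    }

    /// Steps (2) and (3): the role's table entry, else the fallback.
    fn model_for_role(&self, role: &str) -> &str {
        self.table.get(role).map(String::as_str).unwrap_or(self.fallback.as_str())
    }

    /// Step (1) first: an explicit member.model always wins.
    fn for_member<'a>(&'a self, member: &'a Member) -> &'a str {
        member.model.as_deref().unwrap_or_else(|| self.model_for_role(&member.role))
    }
}
```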
CrewToolCollection mirrors the named collections the capabilities
layer publishes (ReadOnly / Coding / All, serialized kebab as
read-only / coding / all); it is kept local to the roster rather than
imported, so the roster shape stays self-contained.
The workboard
workboard/board.rs is the swarm's shared, durable task store: one Ticket
lifecycle plus a first-class dependency relation, all in one JSON file mediated
by JsonCell<BoardState>, so every mutation is a single guarded
read-modify-write.
```rust
#[derive(Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TicketStatus { Open, Claimed, Blocked, Done, Failed }

#[derive(Clone, Serialize, Deserialize)]
pub struct Ticket {
    pub id: String, // tkt_<ULID>
    pub title: String,
    pub body: String,
    pub status: TicketStatus,
    #[serde(default, skip_serializing_if = "Option::is_none")] pub assignee: Option<String>,
    pub deps: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")] pub result: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")] pub error: Option<String>,
    #[serde(default, rename = "blockReason", skip_serializing_if = "Option::is_none")]
    pub block_reason: Option<String>,
    #[serde(rename = "createdAt")] pub created_at: u64,
    #[serde(rename = "updatedAt")] pub updated_at: u64,
}
```
The lifecycle is open → claimed → done | failed plus a blocked side-state.
The writes:
```rust
impl TicketBoard {
    pub async fn open(path: impl AsRef<Path>, now: Option<Clock>) -> Result<TicketBoard, SwarmFault>;
    pub async fn add(&self, draft: TicketDraft) -> Result<Ticket, SwarmFault>;
    pub async fn claim(&self, id: &str, member: &str) -> Result<Ticket, SwarmFault>;
    pub async fn complete(&self, id: &str, result: &str) -> Result<Ticket, SwarmFault>;
    pub async fn fail(&self, id: &str, err: &str) -> Result<Ticket, SwarmFault>;
    pub async fn block(&self, id: &str, reason: Option<&str>) -> Result<Ticket, SwarmFault>;
    pub async fn unblock(&self, id: &str) -> Result<Ticket, SwarmFault>;
    // reads:
    pub async fn get(&self, id: &str) -> Result<Option<Ticket>, SwarmFault>;
    pub async fn list(&self, status: Option<TicketStatus>) -> Result<Vec<Ticket>, SwarmFault>;
    pub async fn ready(&self) -> Result<Vec<Ticket>, SwarmFault>;
}
```
add mints a tkt_<ULID> id, de-duplicates deps (first-seen wins), and
admits the ticket open at the injected clock value (Clock is
Arc<dyn Fn() -> u64 + Send + Sync>, defaulting to wall-clock epoch ms). Every
declared dep must already name a board ticket (dangling → NotFound); the graph
is rebuilt including the newcomer and the insert is rejected with Conflict if
first_cycle() finds a loop — so neither a phantom dependency nor a cycle ever
reaches disk. claim is atomic: the status check and the write live in one
guarded transform, so two members racing for the same ticket cannot both win
(the loser gets Conflict). complete/fail require a claimed ticket;
block preserves the assignee; unblock returns a blocked ticket to open,
clearing its assignee and block reason. Every transition stamps a fresh
updated_at.
ready() returns the open tickets whose every dep id names a ticket that
reached done (a failed dep never satisfies a dependent), in insertion order —
exactly the tickets a member may claim. list is stable insertion order via the
BoardState.order vec, optionally filtered by status; BoardState.tickets is a
HashMap keyed by id for O(1) lookup.
```rust
use indusagi::swarm::{SwarmFaultKind, TicketBoard, TicketDraft};

// Inside an async fn returning Result<_, SwarmFault>, on a fresh board path:
let board = TicketBoard::open("/tmp/board.json", None).await?;
let a = board.add(TicketDraft { title: "build".into(), ..Default::default() }).await?;
let b = board.add(TicketDraft {
    title: "test".into(),
    deps: Some(vec![a.id.clone()]),
    ..Default::default()
}).await?;
assert_eq!(board.ready().await?.iter().map(|t| &t.id).collect::<Vec<_>>(), vec![&a.id]);
board.claim(&a.id, "mbr_1").await?; // atomic: a second claimer loses
board.complete(&a.id, "built ok").await?;
assert_eq!(board.ready().await?.iter().map(|t| &t.id).collect::<Vec<_>>(), vec![&b.id]);
// A dangling dep is NotFound; a cycle is Conflict.
let err = board.add(TicketDraft {
    title: "bad".into(),
    deps: Some(vec!["tkt_missing".into()]),
    ..Default::default()
}).await.err().expect("a dangling dep must be rejected");
assert_eq!(err.kind(), SwarmFaultKind::NotFound);
```
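The readiness rule behind ready() reduces to a filter over the insertion-ordered tickets. This sketch uses hypothetical, simplified Ticket and Status types with &'static str ids rather than the crate's BoardState. Only Done satisfies a dependency, so a Failed dependency keeps its dependent out of the ready set.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum Status { Open, Claimed, Blocked, Done, Failed }

/// Hypothetical, trimmed-down ticket.
struct Ticket {
    id: &'static str,
    status: Status,
    deps: Vec<&'static str>,
}

/// Open tickets whose every dep reached Done, in insertion order.
fn ready(order: &[Ticket]) -> Vec<&'static str> {
    let status_of: HashMap<&str, Status> = order.iter().map(|t| (t.id, t.status)).collect();
    order
        .iter()
        .filter(|t| t.status == Status::Open)
        // A missing or non-Done dep (including Failed) blocks readiness.
        .filter(|t| t.deps.iter().all(|d| status_of.get(d) == Some(&Status::Done)))
        .map(|t| t.id)
        .collect()
}
```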
`DepGraph` — the dependency relation
workboard/dep_graph.rs is the pure, in-memory dependency view, rebuilt from the
ticket store whenever the board reasons about readiness or admits a ticket. An
edge dep → ticket records "dep must reach done before ticket is workable".
Build with DepGraph::from_deps(entries) — each (id, deps) pair contributes the
edges dep → id. (The struct's fields — a forward adjacency IndexMap, a
backward reverse map, and a nodes IndexSet — are private; from_deps is the
only constructor, so every graph goes through the de-duplicating edge-insertion
path, and a self-edge is admitted as an ordinary edge so it surfaces as the
trivial one-node cycle.) first_cycle() returns the first cycle as the node path
that closes the loop ([a, …, a]), or None when acyclic, via an iterative
white/grey/black DFS. Two read accessors expose the relation directly:
dependencies_of(id) lists a node's direct in-neighbours (the ids it depends on)
and dependents_of(id) lists its direct out-neighbours (the ids it unblocks);
both return an empty Vec for an unknown node rather than panicking. Crucially, the
adjacency uses IndexMap/IndexSet (not HashMap/HashSet) on purpose:
their stable insertion order makes first_cycle return the same cycle on every
run for the same input — a hash-ordered graph would surface a different-but-valid
cycle each process, breaking determinism.
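The iterative white/grey/black DFS behind first_cycle can be sketched with std types only. Here a Vec of nodes plus a HashMap index stands in for IndexMap/IndexSet. Iteration always follows the Vec, so insertion order, not hash order, decides which cycle is reported. Graph and its methods are hypothetical names for this illustration.

```rust
use std::collections::HashMap;

struct Graph {
    nodes: Vec<String>,            // insertion order drives every traversal
    index: HashMap<String, usize>, // id -> position in `nodes` (lookup only)
    out: Vec<Vec<usize>>,          // forward edges dep -> dependent, in insertion order
}

impl Graph {
    fn node(&mut self, id: &str) -> usize {
        if let Some(&i) = self.index.get(id) {
            return i;
        }
        let i = self.nodes.len();
        self.nodes.push(id.to_string());
        self.index.insert(id.to_string(), i);
        self.out.push(Vec::new());
        i
    }

    /// Each (id, deps) pair contributes de-duplicated edges dep -> id.
    /// A self-dependency becomes an ordinary self-edge.
    fn from_deps(entries: &[(&str, Vec<&str>)]) -> Graph {
        let mut g = Graph { nodes: Vec::new(), index: HashMap::new(), out: Vec::new() };
        for (id, deps) in entries {
            let to = g.node(id);
            for dep in deps {
                let from = g.node(dep);
                if !g.out[from].contains(&to) {
                    g.out[from].push(to);
                }
            }
        }
        g
    }

    /// Returns the first cycle as [a, ..., a], or None when acyclic.
    fn first_cycle(&self) -> Option<Vec<String>> {
        #[derive(Clone, Copy, PartialEq)]
        enum Color { White, Grey, Black }
        let mut color = vec![Color::White; self.nodes.len()];
        for start in 0..self.nodes.len() {
            if color[start] != Color::White {
                continue;
            }
            // Explicit stack of (node, next out-edge index) replaces recursion.
            let mut stack: Vec<(usize, usize)> = vec![(start, 0)];
            color[start] = Color::Grey;
            while let Some(top) = stack.last_mut() {
                let node = top.0;
                if top.1 < self.out[node].len() {
                    let succ = self.out[node][top.1];
                    top.1 += 1;
                    match color[succ] {
                        Color::White => {
                            color[succ] = Color::Grey;
                            stack.push((succ, 0));
                        }
                        Color::Grey => {
                            // succ is on the current path: that slice is the loop.
                            let pos = stack.iter().position(|&(n, _)| n == succ).unwrap();
                            let mut cycle: Vec<String> =
                                stack[pos..].iter().map(|&(n, _)| self.nodes[n].clone()).collect();
                            cycle.push(self.nodes[succ].clone());
                            return Some(cycle);
                        }
                        Color::Black => {} // fully explored, no cycle through it
                    }
                } else {
                    color[node] = Color::Black;
                    stack.pop();
                }
            }
        }
        None
    }
}
```

A grey node reached again is still on the current DFS path, so the stack slice from that node onward is exactly the loop; appending the node once more yields the closed [a, …, a] shape, and a self-edge yields [a, a].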
The postbox
postbox/channel.rs plus postbox/codecs.rs are the crew's mailbox. A Channel
is one shared JsonlLog<Envelope> transcript (no file-per-recipient
fan-out) plus a private JsonCell<Cursor> offset per reader, rooted at a
directory:
```text
<root>/
  transcript.jsonl          ← the shared, append-only envelope log
  cursors/<readerId>.json   ← one offset cell per reader
```

```rust
impl Channel {
    pub async fn open(root: impl AsRef<Path>, guard: Option<GuardOptions>) -> Result<Channel, SwarmFault>;
    pub async fn send(&self, env: Envelope) -> Result<Envelope, SwarmFault>;
    pub async fn poll(&self, reader_id: &str) -> Result<Vec<Envelope>, SwarmFault>;
    pub async fn peek(&self, reader_id: &str) -> Result<Vec<Envelope>, SwarmFault>;
}
```
send is one validated JsonlLog::append and touches no cursor — delivery is a
pull. poll(reader) reads everything past the reader's cursor, returns the slice
addressed to that reader, and advances the cursor to the end of the
transcript-as-of-this-read; messages addressed to other readers are scanned
but skipped permanently for this reader, keeping polls O(new lines). The
delivered set is computed inside the cursor's JsonCell::mutate, so the read
and the cursor write are one atomic transaction and concurrent polls for the same
reader never double-deliver. peek does the same without advancing — a
non-destructive look-ahead. A reader id is percent-encoded (exactly the JS
encodeURIComponent escaping) for its cursor filename, so the same reader maps
to the same file across the TS and Rust ports; clamp_cursor defends against a
cursor sitting past the end of a truncated transcript.
Every message is an Envelope: the shared header plus a #[serde(flatten)]-ed
EnvelopePayload, an internally-tagged enum over type
(task / result / note / control):
```rust
#[derive(Clone, Serialize, Deserialize)]
pub struct Envelope {
    pub id: String,
    pub from: String,
    pub to: String,
    pub ts: u64,
    #[serde(flatten)] pub payload: EnvelopePayload,
}

#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum EnvelopePayload {
    Task { title: String, brief: String,
        #[serde(default, rename = "ticketId", skip_serializing_if = "Option::is_none")] ticket_id: Option<String>,
        #[serde(default)] priority: TaskPriority },
    Result { #[serde(rename = "taskId")] task_id: String, status: ResultStatus, summary: String,
        #[serde(default, skip_serializing_if = "Option::is_none")] artifacts: Option<serde_json::Value> },
    Note { text: String },
    Control { signal: ControlSignal,
        #[serde(default, skip_serializing_if = "Option::is_none")] reason: Option<String> },
}
```
#[serde(flatten)] splices the payload's type + fields onto the same JSON
object as the header, reproducing the flat wire shape
{ id, from, to, ts, type, ...payload } — there is no nested payload object.
The discriminated union is the codec table: encode(env) and decode(line)
dispatch through serde's internally-tagged enum rather than a hand-written
if/match ladder, so an unknown or mismatched type, or malformed JSON, raises a
Validation fault. TaskPriority (Low/Normal/High, default Normal),
ResultStatus (Ok/Error/Skipped), and ControlSignal
(Pause/Resume/Drain/Shutdown) are the payload sub-enums.
```rust
use indusagi::swarm::{new_id, Channel, Envelope};
use indusagi::swarm::postbox::EnvelopePayload;

// Inside an async fn returning Result<_, SwarmFault>, on a fresh channel root:
let ch = Channel::open("/tmp/chan", None).await?;
ch.send(Envelope {
    id: new_id(Some("env")),
    from: "coder".into(),
    to: "reviewer".into(),
    ts: 0,
    payload: EnvelopePayload::Note { text: "PR is up".into() },
}).await?;
assert_eq!(ch.peek("reviewer").await?.len(), 1); // look without consuming
assert_eq!(ch.poll("reviewer").await?.len(), 1); // consume and advance the cursor
assert!(ch.poll("reviewer").await?.is_empty()); // cursor already advanced past it
```
The isolation seam
isolation/ provisions a separate git worktree per teammate so each can mutate
its own working tree, branch, and index without colliding with its peers.
Workspace (isolation/worktree.rs) owns no process spawning of its own: every
git invocation flows through an injected ShellRunner (isolation/runner.rs).
```rust
pub trait ShellRunner: Send + Sync {
    fn exec<'a>(&'a self, command: &'a str, cwd: Option<&'a str>) -> ExecFuture<'a>;
}

pub struct RunResult { pub stdout: String, pub stderr: String, pub code: Option<i32> }
```
A run never errors on a non-zero exit — a failed git command is an
Ok(RunResult) with a non-zero code; Err is reserved for the runner being
unable to spawn anything at all (binary missing/unrunnable). The production
node_runner() (NodeRunner) splits the command string on whitespace into an
argv vector and spawns with no shell via tokio::process::Command — there is
no /bin/sh -c, so command substitution, globbing, and quoting do not apply,
removing an injection surface (every arg this layer passes git is a literal token).
A process killed by a signal reports code: None (the TS null).
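The shell-less spawn can be sketched with std::process::Command (the crate uses tokio::process::Command, whose builder methods mirror std's). build_command is a hypothetical helper; it only builds the command, so the argv can be inspected without running git.

```rust
use std::process::Command;

/// Split on whitespace into argv and build a Command with no shell in between.
/// Quotes, globs, and `$(...)` reach the program as literal text.
fn build_command(command: &str, cwd: Option<&str>) -> Option<Command> {
    let mut argv = command.split_whitespace();
    let program = argv.next()?; // an all-whitespace command has nothing to run
    let mut cmd = Command::new(program);
    cmd.args(argv);
    if let Some(dir) = cwd {
        cmd.current_dir(dir);
    }
    Some(cmd)
}
```

Because no shell parses the string, a token such as `$(rm` is handed to the program verbatim. The trade-off is that an argument containing spaces cannot be expressed, which is acceptable when every token is a literal git argument.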
```rust
impl Workspace {
    pub fn new() -> Workspace; // backed by node_runner
    pub fn with_runner(runner: Arc<dyn ShellRunner>) -> Workspace; // test seam
    pub async fn create(&self, base_repo: &str, branch: &str, opts: CreateOptions) -> Result<WorktreeRef, SwarmFault>;
    pub async fn list(&self, base_repo: &str) -> Result<Vec<WorktreeEntry>, SwarmFault>;
    pub async fn remove(&self, path: &str, base_repo: Option<&str>, opts: RemoveOptions) -> Result<(), SwarmFault>;
}
```
create runs git worktree add -b <branch> <path> [<start-point>] (defaulting
<path> to a sanitized <base_repo>/.worktrees/<branch> sibling). list runs
git worktree list --porcelain and parses the blank-line-delimited records into
WorktreeEntry structs (path, short branch via stripping refs/heads/, head,
and the detached/bare/locked flags; CRLF and unknown keys tolerated).
remove runs git worktree remove [--force] <path> then git worktree prune.
Every git failure — a non-zero exit (git's stderr is preserved in the message) or
a spawn Err (threaded onto the fault's source()) — maps onto a single
Isolation SwarmFault.
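A parser for git worktree list --porcelain output can be sketched as follows. The record format (a worktree line opening each record, followed by HEAD, branch, and bare/detached/locked lines, with blank lines between records) is git's documented porcelain format. The Entry struct and the parse_porcelain name are hypothetical stand-ins for WorktreeEntry and the crate's parser.

```rust
#[derive(Debug, Default, PartialEq)]
struct Entry {
    path: String,
    branch: Option<String>, // short name, refs/heads/ stripped
    head: Option<String>,
    detached: bool,
    bare: bool,
    locked: bool,
}

fn parse_porcelain(out: &str) -> Vec<Entry> {
    let mut entries = Vec::new();
    let mut cur: Option<Entry> = None;
    for raw in out.lines() {
        let line = raw.trim_end_matches('\r'); // tolerate CRLF output
        if line.is_empty() {
            // A blank line closes the current record.
            if let Some(e) = cur.take() {
                entries.push(e);
            }
            continue;
        }
        let (key, value) = line.split_once(' ').unwrap_or((line, ""));
        if key == "worktree" {
            if let Some(e) = cur.take() {
                entries.push(e);
            }
            cur = Some(Entry { path: value.to_string(), ..Default::default() });
            continue;
        }
        let Some(e) = cur.as_mut() else { continue };
        match key {
            "HEAD" => e.head = Some(value.to_string()),
            "branch" => {
                e.branch = Some(value.strip_prefix("refs/heads/").unwrap_or(value).to_string())
            }
            "detached" => e.detached = true,
            "bare" => e.bare = true,
            "locked" => e.locked = true, // an optional reason may follow
            _ => {}                      // unknown keys (e.g. prunable) are ignored
        }
    }
    if let Some(e) = cur.take() {
        entries.push(e);
    }
    entries
}
```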
The telemetry log
telemetry/activity.rs is the third leg alongside the board (current state of
the work) and the channel (messages between teammates): ActivityLog is an
immutable, ordered record of the coordination events themselves — a pure
observer over JsonlLog<CrewEvent>; nothing in the crew depends on it for
correctness.
```rust
#[derive(Clone, Serialize, Deserialize)]
pub struct CrewEvent {
    pub id: String, // act_<ULID>
    pub ts: u64,
    #[serde(flatten)] pub body: CrewEventBody,
}

#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CrewEventBody {
    MemberSpawned { #[serde(rename = "memberId")] member_id: String, role: String },
    TicketPosted { #[serde(rename = "ticketId")] ticket_id: String, title: String },
    TicketClaimed { #[serde(rename = "ticketId")] ticket_id: String, #[serde(rename = "memberId")] member_id: String },
    TicketDone { #[serde(rename = "ticketId")] ticket_id: String, #[serde(rename = "memberId")] member_id: String, summary: String },
    TicketFailed { #[serde(rename = "ticketId")] ticket_id: String, #[serde(rename = "memberId")] member_id: String, error: String },
    MessageSent { #[serde(rename = "envelopeId")] envelope_id: String, from: String, to: String, #[serde(rename = "envelopeType")] envelope_type: String },
}
```
record(input) validates and appends one event, minting an act_<ULID> id and
stamping ts from the injected clock when the caller leaves them off (a
caller-supplied id/ts in CrewEventInput wins — and From<CrewEventBody>
makes the common "no overrides" case ergonomic). timeline() reads back every
event in append (chronological) order; a missing log is an empty timeline, and a
corrupt line surfaces as a Validation fault on read-back. The six kinds are
frozen in CREW_EVENT_KINDS, internally tagged on kind (snake_case) and
flattened onto the header so the on-disk shape is flat camelCase with a kind
discriminant.
The coordinator
coordinator.rs is the conductor that wires the four services (and the optional
isolation seam) into one driveable Crew. Crew::open(opts) (via create_crew)
roots manifest.json, board.json, channel/, and activity.jsonl under
opts.dir, eagerly creates the crew record (tolerating a Conflict on re-open),
and constructs a Workspace when base_repo is set. The board's Clock and the
activity log's Clock are the same Arc, so one injected now threads both.
```rust
pub struct CrewOptions {
    pub dir: String,
    pub base_repo: Option<String>,
    pub spawn_agent: Option<SpawnAgent>,
    pub now: Option<crate::swarm::workboard::board::Clock>,
}

pub type SpawnAgent = Arc<dyn Fn(&CrewMember) -> Agent + Send + Sync>;

impl Crew {
    pub async fn open(opts: CrewOptions) -> Result<Crew, SwarmFault>;
    pub async fn add_member(&self, draft: MemberDraft) -> Result<CrewMember, SwarmFault>;
    pub async fn post_task(&self, draft: TaskDraft) -> Result<Ticket, SwarmFault>;
    pub async fn run_round(&self) -> Result<RoundOutcome, SwarmFault>;
    pub async fn status(&self) -> Result<CrewStatus, SwarmFault>;
}
```
The agent spawner is injectable. Omit spawn_agent to get default_spawn_agent,
a real agent_with_capabilities(model, Some(tool_box(collection, None))) bound
to the member's model (or DEFAULT_MEMBER_MODEL, "mock-1"), where an unset
collection defaults to ReadOnly, the safe floor. Alternatively, inject a
scripted fake so a whole round runs with no model or network. add_member enrolls a
teammate and narrates member_spawned; post_task admits a ticket and narrates
ticket_posted. status() is a pure read across the three services: it returns
the crew id, the full roster, every ticket, the ready ids, a StatusCounts
tally, and the activity timeline.
The coordination model
run_round() is the heartbeat. It reads the board's ready() tickets and the
idle members (members not in busy_member_ids() — i.e. not holding a
claimed or blocked ticket), pairs them one-to-one in board order via
pair_up (which stops at min(ready, idle) — one ticket per member per round),
and for each pair calls work_one:
- claim the ticket (`board.claim`) and record `ticket_claimed`;
- spawn the member's agent (`(self.spawn_agent)(&member)`) and run it over the ticket brief (`run_agent`);
- on a clean settlement: extract the teammate's final answer (`final_text`, the concatenated text of the last non-empty assistant turn, read out of the snapshot's serialized JSON), send a `result` Envelope addressed back to the `coordinator` reader (`channel.send`), record `message_sent`, complete the ticket (`board.complete`), and record `ticket_done` with a `clamp_summary`-trimmed (280-char, whitespace-collapsed) summary;
- on failure, meaning the run settled with `snapshot.error` set (the Rust structural analogue of the TS `submit` throwing): fail the ticket (`board.fail`) and record `ticket_failed`, so one bad teammate cannot wedge the round.
run_round returns a RoundOutcome of the completed and failed tickets.
Because the board gates readiness on the dependency graph and skips busy members,
a multi-step plan unfolds across rounds: a dependent ticket stays out of ready
until its dependency completes, and a member that completed its ticket is idle
again next round to pick up the now-unblocked work.
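A toy, in-memory model of the heartbeat makes the multi-round unfolding concrete. It assumes a hypothetical Board holding a Vec of tickets in board order and a work step that completes every pair synchronously (no agents, no files, no failures). It encodes the three rules described here: readiness is gated on every dep being done, a claimed or blocked ticket makes its owner busy, and pair_up's zip stops at min(ready, idle).

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Status {
    Open,
    Claimed,
    Blocked,
    Done,
}

/// Toy ticket: `owner` is the member that claimed it, if any.
struct Ticket {
    id: &'static str,
    deps: Vec<&'static str>,
    status: Status,
    owner: Option<&'static str>,
}

/// Toy board; `tickets` is kept in board (insertion) order.
struct Board {
    tickets: Vec<Ticket>,
}

impl Board {
    /// Open tickets whose deps are all Done, in board order.
    fn ready(&self) -> Vec<&'static str> {
        let done: HashSet<&str> = self.tickets.iter()
            .filter(|t| t.status == Status::Done)
            .map(|t| t.id)
            .collect();
        self.tickets.iter()
            .filter(|t| t.status == Status::Open && t.deps.iter().all(|d| done.contains(d)))
            .map(|t| t.id)
            .collect()
    }

    /// Members holding a claimed or blocked ticket.
    fn busy_member_ids(&self) -> HashSet<&'static str> {
        self.tickets.iter()
            .filter(|t| matches!(t.status, Status::Claimed | Status::Blocked))
            .filter_map(|t| t.owner)
            .collect()
    }

    fn set(&mut self, id: &str, status: Status, owner: &'static str) {
        let t = self.tickets.iter_mut().find(|t| t.id == id).expect("known ticket");
        t.status = status;
        t.owner = Some(owner);
    }
}

/// One ticket per member per round: `zip` stops at min(ready, idle).
fn pair_up(ready: &[&'static str], idle: &[&'static str]) -> Vec<(&'static str, &'static str)> {
    ready.iter().copied().zip(idle.iter().copied()).collect()
}

/// One heartbeat. In this toy every claimed ticket completes immediately.
fn run_round(board: &mut Board, members: &[&'static str]) -> Vec<&'static str> {
    let busy = board.busy_member_ids();
    let idle: Vec<&'static str> = members.iter().copied().filter(|m| !busy.contains(m)).collect();
    let mut completed = Vec::new();
    for (ticket, member) in pair_up(&board.ready(), &idle) {
        board.set(ticket, Status::Claimed, member); // stands in for board.claim
        // ...the member's agent would run over the ticket brief here...
        board.set(ticket, Status::Done, member); // stands in for board.complete
        completed.push(ticket);
    }
    completed
}
```

With an implement ticket and a review ticket that depends on it, the first call completes only implement and the second picks up review; a third call finds nothing ready.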
```rust
use std::sync::Arc;
use indusagi::swarm::{create_crew, CrewOptions, CrewToolCollection, MemberDraft, TaskDraft};
use indusagi::runtime::agent_with_capabilities;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let crew = create_crew(CrewOptions {
        dir: "/tmp/crew1".into(),
        base_repo: None,
        // Inject a deterministic, tool-free agent (omit for default_spawn_agent).
        spawn_agent: Some(Arc::new(|_m| agent_with_capabilities("mock-1", None))),
        now: None,
    })
    .await?;

    crew.add_member(MemberDraft { role: "coder".into(),
        tool_collection: Some(CrewToolCollection::Coding), ..Default::default() }).await?;
    crew.add_member(MemberDraft { role: "reviewer".into(),
        tool_collection: Some(CrewToolCollection::ReadOnly), ..Default::default() }).await?;

    let t1 = crew.post_task(TaskDraft { title: "implement feature".into(), ..Default::default() }).await?;
    let _ = crew.post_task(TaskDraft { title: "review feature".into(),
        deps: Some(vec![t1.id.clone()]), ..Default::default() }).await?; // gated on t1

    let outcome = crew.run_round().await?; // only t1 is ready this round
    println!("{:?}", outcome.completed.iter().map(|t| &t.id).collect::<Vec<_>>());

    let status = crew.status().await?;
    println!("done={} open={}", status.counts.done, status.counts.open);
    Ok(())
}
```
Faults
Every failure in the swarm is a SwarmFault — a real std::error::Error tagged
with a closed, exhaustive SwarmFaultKind. Callers branch on fault.kind() (or
the fault.is(kind) guard) instead of matching message text; the originating
trigger, when present, is threaded onto source().
| SwarmFaultKind | Wire spelling | Raised when |
|---|---|---|
| LockTimeout | lock_timeout | An exclusive JsonCell guard could not be acquired before its deadline. Transient: retry or skip this poll. |
| Validation | validation | A value failed its serde shape on read or write (a cell, a log line, an envelope, an event). |
| NotFound | not_found | A named cell, ticket, member, or log entry does not exist (including a dangling ticket dep). |
| Conflict | conflict | A guarded mutation lost a race or violated an invariant (claiming an already-claimed ticket, re-creating a crew, a dep cycle, a duplicate member). |
| Isolation | isolation | A git worktree provisioning step failed (non-zero exit or a spawn error). |
| Spawn | spawn | A teammate agent could not be brought up. |
Construct with swarm_fault(kind, message) (the ergonomic front door),
SwarmFault::new(kind, message), or SwarmFault::with_cause(kind, message, cause) to thread a trigger onto the source chain.
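A self-contained sketch of the fault shape described here. The kind/message/cause field layout, the "wire_spelling: message" Display format, and the with_retry helper are hypothetical; the real SwarmFault's fields and formatting may differ. It shows the pieces this section names: a closed kind enum with its wire spelling, kind() and is() for branching, the three constructors, and a source() that exposes the threaded cause.

```rust
use std::error::Error;
use std::fmt;

/// Closed, exhaustive set of fault kinds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SwarmFaultKind {
    LockTimeout,
    Validation,
    NotFound,
    Conflict,
    Isolation,
    Spawn,
}

impl SwarmFaultKind {
    /// Snake_case wire spelling.
    pub fn as_str(self) -> &'static str {
        match self {
            SwarmFaultKind::LockTimeout => "lock_timeout",
            SwarmFaultKind::Validation => "validation",
            SwarmFaultKind::NotFound => "not_found",
            SwarmFaultKind::Conflict => "conflict",
            SwarmFaultKind::Isolation => "isolation",
            SwarmFaultKind::Spawn => "spawn",
        }
    }
}

#[derive(Debug)]
pub struct SwarmFault {
    kind: SwarmFaultKind,
    message: String,
    cause: Option<Box<dyn Error + Send + Sync + 'static>>,
}

impl SwarmFault {
    pub fn new(kind: SwarmFaultKind, message: impl Into<String>) -> Self {
        SwarmFault { kind, message: message.into(), cause: None }
    }

    /// Thread the originating trigger onto the source() chain.
    pub fn with_cause(
        kind: SwarmFaultKind,
        message: impl Into<String>,
        cause: impl Error + Send + Sync + 'static,
    ) -> Self {
        SwarmFault { kind, message: message.into(), cause: Some(Box::new(cause)) }
    }

    pub fn kind(&self) -> SwarmFaultKind {
        self.kind
    }

    pub fn is(&self, kind: SwarmFaultKind) -> bool {
        self.kind == kind
    }
}

/// Ergonomic front door.
pub fn swarm_fault(kind: SwarmFaultKind, message: impl Into<String>) -> SwarmFault {
    SwarmFault::new(kind, message)
}

impl fmt::Display for SwarmFault {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.kind.as_str(), self.message)
    }
}

impl Error for SwarmFault {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.cause.as_deref().map(|c| c as &(dyn Error + 'static))
    }
}

/// Hypothetical helper: retry only transient lock timeouts; any other fault,
/// or a success, returns immediately.
pub fn with_retry<T>(
    attempts: u32,
    mut op: impl FnMut() -> Result<T, SwarmFault>,
) -> Result<T, SwarmFault> {
    let mut last = None;
    for _ in 0..attempts {
        match op() {
            Err(fault) if fault.is(SwarmFaultKind::LockTimeout) => last = Some(fault),
            other => return other,
        }
    }
    Err(last.unwrap_or_else(|| swarm_fault(SwarmFaultKind::LockTimeout, "no attempts made")))
}
```

with_retry illustrates branching on the kind rather than on message text: only LockTimeout, the one kind marked transient, is retried.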
Parity notes
This subsystem is a faithful 100%-Rust port of the clean-room TypeScript swarm/
(indus-rebuild/src/swarm); a handful of choices are load-bearing:
- serde is "validated on every crossing". The zod schemas of the TS port become serde (de)serialization plus a two-stage decode (prove JSON, then prove the shape). JsonCell::open round-trips the initial value at open time, so a bad default fails loudly.
- mkdir-lock, not O_EXCL, with stale reclamation after stale_ms: a crashed holder is forcibly reclaimed, so a wedged process cannot deadlock the crew forever. Release is an RAII LockGuard (the TS finally { release() }).
- tokio::fs::rename for atomic publish over a unique temp sibling.
- DepGraph uses IndexMap/IndexSet, so first_cycle() returns the same cycle on every run for the same input (a HashMap-backed graph would flip between valid cycles by hash seed).
- Discriminated unions are internally-tagged enums. EnvelopePayload (#[serde(tag = "type")]) and CrewEventBody (#[serde(tag = "kind")]) are flattened onto their headers, reproducing the flat TS wire shape; the enum itself is the codec table, so a drifted or unknown tag is a Validation fault.
- Wire names stay camelCase via #[serde(rename = ...)] (crewId, createdAt, ticketId, memberId, envelopeId, envelopeType, taskId, blockReason, toolCollection), and the enums keep their kebab/snake/lowercase spellings; absent Option fields are dropped on serialize.
- CrewRecord.crew_id is deliberately not min-length-1: the empty string is the unborn sentinel, and born-vs-unborn is enforced by create() and assert_born, not by the schema.
- run_round folds a faulted run into a failed ticket: a run that settles with snapshot.error set is the Rust structural analogue of the TS submit throwing, so one bad teammate fails its ticket instead of wedging the round.
- new_id joins with _ (tkt_<ULID>) to preserve the TS on-disk spelling, diverging from the - separator used by crate::core::ids::prefixed_id.
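The kernel's lock-and-publish cycle can be sketched with synchronous std::fs in place of tokio::fs, with the serde validation step left out. The lock-directory and temp-file naming (<cell>.lock, <cell>.tmp-<pid>-<nanos>), the 30 s stale threshold, the 5 s timeout, and the fixed 10 ms poll are all assumptions for illustration, not the kernel's actual values.

```rust
use std::ffi::OsString;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::thread::sleep;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Sibling path `<cell>.<suffix>`, e.g. crew.json -> crew.json.lock.
fn sibling(path: &Path, suffix: &str) -> PathBuf {
    let mut name: OsString = path.as_os_str().to_owned();
    name.push(".");
    name.push(suffix);
    PathBuf::from(name)
}

fn lock_path(path: &Path) -> PathBuf {
    sibling(path, "lock")
}

/// RAII guard: dropping it removes the lock directory (the TS finally { release() }).
#[derive(Debug)]
struct LockGuard {
    dir: PathBuf,
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        let _ = fs::remove_dir(&self.dir);
    }
}

/// mkdir-lock: create_dir either creates the directory or fails with
/// AlreadyExists, so exactly one contender wins. A lock older than `stale`
/// is treated as abandoned by a crashed holder and reclaimed.
fn acquire(path: &Path, stale: Duration, timeout: Duration) -> io::Result<LockGuard> {
    let dir = lock_path(path);
    let deadline = Instant::now() + timeout;
    loop {
        match fs::create_dir(&dir) {
            Ok(()) => return Ok(LockGuard { dir }),
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
                let age = fs::metadata(&dir)
                    .and_then(|m| m.modified())
                    .ok()
                    .and_then(|t| SystemTime::now().duration_since(t).ok());
                if age.map_or(false, |a| a > stale) {
                    let _ = fs::remove_dir(&dir); // stale reclamation
                    continue;
                }
                if Instant::now() >= deadline {
                    return Err(io::Error::new(io::ErrorKind::TimedOut, "lock_timeout"));
                }
                sleep(Duration::from_millis(10)); // fixed poll; no backoff in this sketch
            }
            Err(e) => return Err(e),
        }
    }
}

/// Atomic publish: write a unique temp sibling, then rename it over the target,
/// so a reader sees the old contents or the new ones, never a torn write.
fn publish(path: &Path, contents: &str) -> io::Result<()> {
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
    let tmp = sibling(path, &format!("tmp-{}-{}", std::process::id(), nanos));
    fs::write(&tmp, contents)?;
    fs::rename(&tmp, path)
}

/// Read-modify-write a cell while holding the lock; the guard drops on return.
/// A missing (or unreadable) cell reads as empty in this sketch.
fn update(path: &Path, f: impl FnOnce(&str) -> String) -> io::Result<String> {
    let _guard = acquire(path, Duration::from_secs(30), Duration::from_secs(5))?;
    let current = fs::read_to_string(path).unwrap_or_default();
    let next = f(&current);
    publish(path, &next)?;
    Ok(next)
}
```

A directory works as the lock because create_dir is an atomic create-or-fail on a local filesystem, and its modification time gives a cheap age for stale reclamation; renaming a same-directory temp sibling over the target is what makes the publish atomic on POSIX filesystems.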
Every public type and function in this subsystem is fully implemented and unit-tested (no stubs). See the Architecture overview for where the layer sits.
Relationship to neighbors
Swarm is a self-contained subsystem with only two upward dependencies, both in
the coordinator: it imports the Runtime (Agent,
RunSnapshot, agent_with_capabilities) to spawn teammate agents, and the
capabilities surface (ToolCollection, tool_box) to bind each member's tool
slice. The coordinator only touches an Agent's submit_prompt surface, so a
scripted fake satisfies it. The domain services and kernel have no upward
dependencies (pure serde + tokio + ulid + indexmap + regex), making
the kernel and each service independently testable; everything internally
funnels through swarm/kernel. The final assistant text is read out of the run
snapshot's serialized JSON rather than naming the gateway Turn/Block types,
which live in a crate the swarm does not directly depend on. The root indusagi
crate re-exports the module as swarm. See the Core
subsystem for now_ms and the shared id helpers the swarm delegates to.
