
Swarm

indusagi::swarm is the multi-agent crew layer: a Crew coordinator drives teammate agents over a shared, dependency-aware ticket board, a cursor-based mailbox, and an append-only activity transcript — all built on a tiny frozen kernel of lockable JSON cells (JsonCell) and append-only JSONL logs (JsonlLog). Reach it through use indusagi::swarm::{create_crew, Crew, ...}.

A crew coordinates entirely through files on disk — no broker, no daemon, no shared memory. Multiple processes can safely share one crew directory because every mutation funnels through the kernel's cooperative mkdir-lock + atomic-rename + serde-validate machinery. The Crew owns the policy (match ready tickets to idle members, spawn one agent per pair, fold the result back onto the board and channel, narrate every step to the activity log); four thin domain services (roster, workboard, postbox, telemetry) plus an optional git-worktree isolation seam own the persisted state. Agent spawning is injectable (SpawnAgent), so a round can be driven by real LLM-backed agents in production or scripted, network-free fakes in tests.

This is the Rust edition of the swarm documented for Python. The two share the same architecture: one frozen kernel, four domain services, an injectable spawner, dep-gated readiness, and cursor-exact delivery. The Rust port differs in its building blocks. It leans on serde for "validated on every crossing", tokio for the async file I/O and backoff, an RAII LockGuard for lock release, an internally-tagged enum for each discriminated union, and IndexMap/IndexSet for deterministic cycle detection.


Public exports

Most names below are re-exported from swarm/mod.rs (so use indusagi::swarm::{…} reaches them directly); the few that are reachable only through a submodule path (e.g. swarm::postbox::EnvelopePayload) are flagged in their row. The coordinator surface:

| Name | Kind | Source | Purpose |
|---|---|---|---|
| create_crew | async fn | coordinator.rs | Host-facing factory; stands up a Crew from CrewOptions (delegates to Crew::open) |
| Crew | struct | coordinator.rs | The coordinator. Verbs: open, add_member, post_task, run_round, status |
| CrewOptions | struct | coordinator.rs | Knobs: dir, base_repo, spawn_agent override, injected now clock |
| CrewStatus | struct | coordinator.rs | Point-in-time aggregate (crew_id, members, tickets, ready, counts, activity) |
| StatusCounts | struct | coordinator.rs | Per-state ticket tally (open/claimed/blocked/done/failed: usize); the type of CrewStatus.counts. Reachable via swarm::coordinator::, not top-level re-exported |
| RoundOutcome | struct | coordinator.rs | Result of one run_round: completed and failed ticket vecs |
| TaskDraft | struct | coordinator.rs | Ticket draft for Crew::post_task (title, optional body, optional deps) |
| SpawnAgent | type alias | coordinator.rs | `Arc<dyn Fn(&CrewMember) -> Agent + Send + Sync>`: the injectable spawner seam |
| default_spawn_agent | fn | coordinator.rs | Default spawner: a real agent_with_capabilities bound to the member's model + tool_box |

The domain services and kernel primitives:

| Name | Kind | Source | Purpose |
|---|---|---|---|
| CrewManifest | struct | roster/manifest.rs | Concurrency-safe roster service over `JsonCell<CrewRecord>` |
| ModelPolicy | struct | roster/manifest.rs | Role→model lookup table; of, for_member, model_for_role, roles |
| CrewMember | struct | roster/manifest.rs | One teammate (id, role, optional model, optional tool_collection) |
| CrewRecord | struct | roster/manifest.rs | Persisted crew doc (crew_id, members, created_at) |
| MemberDraft | struct | roster/manifest.rs | Inputs to enroll a member (id?, role, model?, tool_collection?) |
| CrewToolCollection | enum | roster/manifest.rs | ReadOnly / Coding / All (wire: read-only / coding / all) |
| TicketBoard | struct | workboard/board.rs | Dependency-aware shared task store over `JsonCell<BoardState>` |
| Ticket | struct | workboard/board.rs | One unit of work (lifecycle, deps, assignee, result, …) |
| TicketStatus | enum | workboard/board.rs | Open / Claimed / Blocked / Done / Failed |
| TicketDraft | struct | workboard/board.rs | Fields for TicketBoard::add (title, body?, deps?) |
| ticket_statuses | fn | workboard/board.rs | Returns `[TicketStatus; 5]` in lifecycle order |
| DepGraph | struct | workboard/dep_graph.rs | Pure directed dependency graph; from_deps, first_cycle, dependencies_of, dependents_of |
| Channel | struct | postbox/channel.rs | Cursor-based mailbox over one shared transcript + per-reader cursors |
| Envelope | struct | postbox/codecs.rs | Message header (id/from/to/ts) + flattened EnvelopePayload; envelope_type() helper |
| EnvelopePayload | enum | postbox/codecs.rs | The `#[serde(tag = "type")]` union: Task / Result / Note / Control; envelope_type() helper. Reachable via swarm::postbox::, not top-level re-exported |
| EnvelopeType | enum | postbox/codecs.rs | Task / Result / Note / Control (the class tag; as_str() → wire spelling) |
| TaskPriority / ResultStatus / ControlSignal | enum | postbox/codecs.rs | Payload sub-enums: Low/Normal(default)/High; Ok/Error/Skipped; Pause/Resume/Drain/Shutdown. Reachable via swarm::postbox::, not top-level re-exported |
| encode / decode | fn | postbox/codecs.rs | Validate+serialize / parse+validate one wire line |
| ENVELOPE_TYPES | const | postbox/codecs.rs | `[EnvelopeType; 4]` of the four message-class tags |
| Workspace | struct | isolation/worktree.rs | Git-worktree provisioner over an injected ShellRunner |
| WorktreeRef / WorktreeEntry | struct | isolation/worktree.rs | Worktree coordinates / a parsed git worktree list entry |
| CreateOptions / RemoveOptions | struct | isolation/worktree.rs | Options for Workspace::create / Workspace::remove |
| ShellRunner | trait | isolation/runner.rs | `exec(command, cwd) -> ExecFuture<'a>` (a boxed `Future<Output = Result<RunResult, io::Error>>`); must not err on non-zero exit |
| RunResult | struct | isolation/runner.rs | One shelled command's outcome (stdout, stderr, `code: Option<i32>`) |
| NodeRunner / ExecFuture | struct / type alias | isolation/runner.rs | The production runner type / the boxed-future return of ShellRunner::exec (object-safe, no async-trait); reachable via swarm::isolation::runner::, not top-level re-exported |
| node_runner | fn | isolation/runner.rs | Production runner factory: returns a NodeRunner over shell-less tokio::process::Command |
| ActivityLog | struct | telemetry/activity.rs | Append-only coordination transcript over `JsonlLog<CrewEvent>` |
| CrewEvent | struct | telemetry/activity.rs | The header (id, ts) + flattened CrewEventBody |
| CrewEventKind / CrewEventInput | enum / struct | telemetry/activity.rs | The six kind discriminants / the body+overrides record() accepts |
| CREW_EVENT_KINDS | const | telemetry/activity.rs | `[CrewEventKind; 6]` of the coordination-event kinds |
| JsonCell | struct | kernel/json_cell.rs | Lockable, validated, atomically-published JSON file |
| GuardOptions | struct | kernel/json_cell.rs | Lock-acquisition tunables (timeout_ms, stale_ms, base_delay_ms, max_delay_ms) |
| JsonlLog | struct | kernel/jsonl_log.rs | Append-only validated JSONL log; append, read_all, thread_of |
| TreeNode | trait | kernel/jsonl_log.rs | (id, parent_id) an entry must expose to use thread_of |
| new_id | fn | kernel/ids.rs | Mint a sortable, monotonic ULID-backed id, optionally prefixed |
| SwarmFault | struct | kernel/faults.rs | Typed error with a discriminable .kind(); is(kind) guard |
| swarm_fault | fn | kernel/faults.rs | Ergonomic constructor for SwarmFault::new(kind, message) |
| SwarmFaultKind | enum | kernel/faults.rs | LockTimeout / Validation / NotFound / Conflict / Isolation / Spawn |

Module map

| Module | Holds |
|---|---|
| kernel/ | The frozen foundation: JsonCell, JsonlLog/TreeNode, new_id, SwarmFault/swarm_fault/SwarmFaultKind, GuardOptions |
| roster/ | Team config: CrewManifest, ModelPolicy; CrewMember/CrewRecord/MemberDraft/CrewToolCollection |
| workboard/ | Shared task store: TicketBoard, DepGraph; Ticket/TicketStatus/TicketDraft, ticket_statuses |
| postbox/ | Mailbox: Channel and codecs (the Envelope/EnvelopePayload union, encode/decode) |
| isolation/ | Git-worktree isolation: Workspace over an injectable ShellRunner; node_runner, RunResult, option/ref structs |
| telemetry/ | Observer: ActivityLog over JsonlLog and the CrewEvent/CrewEventBody union (six kinds) |
| coordinator.rs | Crew: the policy layer that wires the four services + isolation into one driveable object |

The kernel

The kernel (swarm/kernel/) is the frozen, load-bearing foundation every domain service is a thin wrapper over: four small primitives, deliberately minimal.

`JsonCell` — the one transactional primitive

A cell owns exactly one JSON file plus a serde-(de)serializable T (the Rust stand-in for the TS zod schema). Two surfaces:

```rust
impl<T> JsonCell<T>
where
    T: Serialize + DeserializeOwned + Clone,
{
    pub async fn open(
        path: impl AsRef<Path>,
        initial: T,
        guard: Option<GuardOptions>,
    ) -> Result<JsonCell<T>, SwarmFault>;

    pub async fn read(&self) -> Result<T, SwarmFault>;

    pub async fn mutate<F>(&self, f: F) -> Result<T, SwarmFault>
    where
        F: FnOnce(T) -> Result<T, SwarmFault>;
}
```

read() is lock-free — atomic publish guarantees a reader sees a whole previous-or-next value, never torn; a missing file returns the cell's initial, and malformed JSON or a wrong shape both raise a Validation fault (decode is two-stage: prove it is JSON, then prove it fits T). mutate(f) is the only transactional primitive: acquire the exclusive guard, read+validate, apply the pure transform f, re-validate, publish atomically (write a unique temp sibling <path>.tmp.<pid>.<ts>.<ulid> then tokio::fs::rename over the target), and release the guard in a finally-equivalent. Because every domain mutator hands a pure transform to cell.mutate, cross-process edits to the same file serialize with zero lost updates.
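The atomic publish step is what keeps lock-free reads safe. Here is a minimal std-only sketch of it, assuming synchronous std::fs in place of tokio::fs. The helper name atomic_publish is hypothetical, and a nanosecond timestamp stands in for the ULID suffix of the documented temp name.

```rust
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical helper: publish `contents` to `target` by writing a unique temp
/// sibling and renaming it over the target. A reader opening `target` sees either
/// the old file or the new one, never a partially written file.
pub fn atomic_publish(target: &Path, contents: &[u8]) -> std::io::Result<()> {
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    // Temp name shaped like <path>.tmp.<pid>.<ts>; the real kernel also appends a ULID.
    let mut tmp_name = target.as_os_str().to_owned();
    tmp_name.push(format!(".tmp.{}.{}", process::id(), ts));
    let tmp = PathBuf::from(tmp_name);

    let result = (|| -> std::io::Result<()> {
        let mut f = fs::File::create(&tmp)?;
        f.write_all(contents)?;
        f.sync_all()?; // make the bytes durable before the rename exposes them
        fs::rename(&tmp, target) // replaces the target in one step
    })();
    if result.is_err() {
        let _ = fs::remove_file(&tmp); // best-effort cleanup of an orphaned temp file
    }
    result
}
```

Because the rename is the only step that touches the target, a crash mid-write leaves at most a stray temp sibling, never a half-written cell.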

The mutex is an atomic tokio::fs::create_dir of a <file>.lockdir directory (AlreadyExists means "held"), with a { pid, takenAt, cell } owner.json marker dropped inside for stale detection. Acquisition retries on a jittered exponential backoff bounded by a deadline; a lock dir older than stale_ms (read from the marker's takenAt, falling back to the dir's mtime) is forcibly reclaimed via remove_dir_all, so a crashed holder never deadlocks the crew forever. The default GuardOptions (GuardOptions::resolved) are timeout_ms 10_000, stale_ms 30_000, base_delay_ms 12, max_delay_ms 250. Release is an RAII LockGuard whose Drop removes the lock dir — the Rust analogue of the TS finally { release() } (drop is synchronous, so it uses the blocking std::fs::remove_dir_all, best-effort, never panics).
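The lock protocol can be sketched synchronously with std alone. Everything in this sketch is hypothetical: the name acquire, the String error, and the plain doubling backoff, which has no jitter. The stale check reads only the directory mtime, which is the kernel's fallback behind owner.json's takenAt. What carries over is the shape: create_dir as the mutex, a deadline-bounded retry loop, stale reclaim, and a Drop that releases.

```rust
use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::{Duration, Instant, SystemTime};

/// RAII guard: dropping it removes the lock directory (best effort, never panics).
pub struct LockGuard {
    dir: PathBuf,
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        let _ = fs::remove_dir_all(&self.dir);
    }
}

/// Hypothetical sketch: `create_dir` is atomic, so exactly one caller wins and
/// everyone else sees AlreadyExists and backs off until the deadline.
pub fn acquire(target: &Path, timeout: Duration, stale: Duration) -> Result<LockGuard, String> {
    let mut dir_name = target.as_os_str().to_owned();
    dir_name.push(".lockdir");
    let dir = PathBuf::from(dir_name);
    let deadline = Instant::now() + timeout;
    let mut delay = Duration::from_millis(12); // mirrors the base_delay_ms default
    loop {
        match fs::create_dir(&dir) {
            Ok(()) => return Ok(LockGuard { dir }),
            Err(e) if e.kind() == ErrorKind::AlreadyExists => {
                // Age of the existing lock, from the directory's mtime.
                let age = fs::metadata(&dir)
                    .and_then(|m| m.modified())
                    .ok()
                    .and_then(|t| SystemTime::now().duration_since(t).ok());
                // Reclaim a crashed holder's lock, then retry immediately.
                if matches!(age, Some(a) if a > stale) && fs::remove_dir_all(&dir).is_ok() {
                    continue;
                }
            }
            Err(e) => return Err(e.to_string()),
        }
        if Instant::now() >= deadline {
            return Err(format!("lock_timeout: {}", dir.display()));
        }
        thread::sleep(delay);
        delay = (delay * 2).min(Duration::from_millis(250)); // capped like max_delay_ms
    }
}
```

Tying release to Drop means an early return or a `?` inside the guarded section still removes the lock directory.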

```rust
use indusagi::swarm::{JsonCell, SwarmFault};
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
struct Counter { n: i64 }

#[tokio::main]
async fn main() -> Result<(), SwarmFault> {
    // Assumes /tmp/counter.json does not exist yet, so the cell starts at its initial value.
    let cell = JsonCell::open("/tmp/counter.json", Counter { n: 0 }, None).await?;
    let after = cell.mutate(|c| Ok(Counter { n: c.n + 1 })).await?; // guarded read-modify-write
    assert_eq!(after.n, 1);
    assert_eq!(cell.read().await?.n, 1); // lock-free read
    Ok(())
}
```

`JsonlLog` — the append-only log

Where a cell owns a single mutable value, a log owns an immutable, ever-growing NDJSON/JSONL sequence: one JSON object per line.

```rust
impl<T: Serialize + DeserializeOwned> JsonlLog<T> {
    pub async fn open(path: impl AsRef<Path>) -> Result<JsonlLog<T>, SwarmFault>;
    pub async fn append(&self, entry: T) -> Result<T, SwarmFault>;
    pub async fn read_all(&self) -> Result<Vec<T>, SwarmFault>;
    pub async fn thread_of(&self, id: &str) -> Result<Vec<T>, SwarmFault>
    where T: TreeNode + Clone;
}
```

append serialize-validates the entry (a Validation fault before anything touches disk), then writes a single write_all on an O_APPEND file descriptor (via spawn_blocking over std::fs::OpenOptions::new().create(true).append(true)) so two concurrent appends never tear a line in half — correctness comes from never mutating existing lines, not from locking. read_all validates every line, skips blanks, and a malformed line raises a Validation fault naming the 1-based line number. thread_of is a parent-pointer tree helper: when entries implement the TreeNode trait (fn id(&self) -> &str, fn parent_id(&self) -> Option<&str>) it walks parent_id pointers from a leaf to its root and returns the lineage root→…→leaf — NotFound if the id is absent, Conflict on a cycle, treating a parent_id that points at a missing id as a root.
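The thread_of walk can be sketched over an in-memory slice instead of a log on disk. ThreadError and Entry are hypothetical stand-ins for SwarmFault and a real log entry. The TreeNode trait mirrors the documented method shapes.

```rust
use std::collections::{HashMap, HashSet};

/// Mirrors the documented TreeNode trait: an entry exposes its id and optional parent.
pub trait TreeNode {
    fn id(&self) -> &str;
    fn parent_id(&self) -> Option<&str>;
}

/// Hypothetical example entry type.
#[derive(Clone, Debug)]
pub struct Entry {
    pub id: String,
    pub parent_id: Option<String>,
}

impl TreeNode for Entry {
    fn id(&self) -> &str { &self.id }
    fn parent_id(&self) -> Option<&str> { self.parent_id.as_deref() }
}

#[derive(Debug, PartialEq)]
pub enum ThreadError { NotFound, Conflict }

/// Walk parent pointers from the leaf, then reverse so the result reads root -> ... -> leaf.
pub fn thread_of<T: TreeNode + Clone>(entries: &[T], id: &str) -> Result<Vec<T>, ThreadError> {
    let by_id: HashMap<&str, &T> = entries.iter().map(|e| (e.id(), e)).collect();
    let mut current = *by_id.get(id).ok_or(ThreadError::NotFound)?;
    let mut seen = HashSet::new();
    let mut lineage = Vec::new();
    loop {
        if !seen.insert(current.id()) {
            return Err(ThreadError::Conflict); // revisited a node: the parent chain loops
        }
        lineage.push(current.clone());
        match current.parent_id().and_then(|p| by_id.get(p)) {
            Some(parent) => current = *parent,
            // No parent, or a parent id that names no entry: treat this node as the root.
            None => break,
        }
    }
    lineage.reverse();
    Ok(lineage)
}
```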

`new_id` and `SwarmFault`

```rust
pub fn new_id(prefix: Option<&str>) -> String;
```

new_id(None) returns a bare 26-char ULID; new_id(Some("tkt")) returns tkt_<ULID> joined by the swarm-local _ PREFIX_SEPARATOR (note this differs from crate::core::ids::prefixed_id, which joins with -). Ids are minted through one process-wide monotonic ulid::Generator behind a LazyLock<Mutex>, so ids minted later in call order sort lexicographically after earlier ones even inside the same millisecond — which lets the workboard treat a plain string sort as creation order without a separate sequence file.
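The sketch below shows why a plain string sort can stand in for creation order. It is a std-only, ULID-shaped monotonic generator, not the ulid crate. std has no RNG, so RandomState's random keys stand in for one. When the random part is exhausted, the sketch borrows the next millisecond; that policy is an assumption and may differ from the crate's.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

const CROCKFORD: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";
const RANDOM_MASK: u128 = (1u128 << 80) - 1;

// Process-wide generator state: (last millisecond, last 80-bit random part).
static STATE: Mutex<(u64, u128)> = Mutex::new((0, 0));

fn random_80() -> u128 {
    // Each RandomState carries fresh keys, so these hashes act as cheap random bits.
    let mut hi = RandomState::new().build_hasher();
    hi.write_u8(1);
    let mut lo = RandomState::new().build_hasher();
    lo.write_u8(2);
    (((hi.finish() as u128) << 64) | lo.finish() as u128) & RANDOM_MASK
}

pub fn new_id(prefix: Option<&str>) -> String {
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);
    let mut state = STATE.lock().unwrap();
    let (ms, random) = if now_ms > state.0 {
        (now_ms, random_80())
    } else if state.1 < RANDOM_MASK {
        (state.0, state.1 + 1) // same (or earlier) millisecond: bump so ids keep rising
    } else {
        (state.0 + 1, 0) // random part exhausted: borrow the next millisecond
    };
    *state = (ms, random);
    drop(state);

    // 48-bit time then 80-bit random, encoded as 26 Crockford base32 chars (most significant first).
    let value = ((ms as u128) << 80) | random;
    let mut ulid = String::with_capacity(26);
    for i in 0..26 {
        let shift = 5 * (25 - i);
        ulid.push(CROCKFORD[((value >> shift) & 31) as usize] as char);
    }
    match prefix {
        Some(p) => format!("{}_{}", p, ulid),
        None => ulid,
    }
}
```

Fixed width plus an alphabet in ascending ASCII order is what makes lexicographic order equal numeric order.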

SwarmFault is the single typed error every kernel op raises — a real std::error::Error (with Display + source()) tagged with a discriminable SwarmFaultKind; see Faults.

The roster

roster/manifest.rs persists the team configuration and nothing else — a thin wrapper over JsonCell<CrewRecord>. The persisted document is one CrewRecord:

```rust
#[derive(Clone, Serialize, Deserialize)]
pub struct CrewRecord {
    #[serde(rename = "crewId")]    pub crew_id: String,
    pub members: Vec<CrewMember>,
    #[serde(rename = "createdAt")] pub created_at: u64,
}

#[derive(Clone, Serialize, Deserialize)]
pub struct CrewMember {
    pub id: String,
    pub role: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(default, rename = "toolCollection", skip_serializing_if = "Option::is_none")]
    pub tool_collection: Option<CrewToolCollection>,
}
```

CrewManifest::open opens a cell that starts out unborn: a fresh path reads back as CrewRecord { crew_id: "", members: [], created_at: 0 }. create(crew_id) mints a crew_<ULID> id (or honors an explicit one), stamps created_at, and refuses to overwrite an existing crew (Conflict). The empty string is the legal unborn sentinel, so the schema deliberately does not require crew_id to be non-empty. The born-versus-unborn invariant is enforced instead by create and by the private assert_born guard, which makes the other mutators refuse to operate on an unborn crew (Conflict). As a result, a typo'd path can never silently grow a phantom crew.

```rust
impl CrewManifest {
    pub async fn open(path: impl AsRef<Path>) -> Result<CrewManifest, SwarmFault>;
    pub async fn create(&self, crew_id: Option<String>) -> Result<CrewRecord, SwarmFault>;
    pub async fn add_member(&self, draft: MemberDraft) -> Result<CrewMember, SwarmFault>;
    pub async fn remove_member(&self, id: &str) -> Result<Vec<CrewMember>, SwarmFault>;
    pub async fn get(&self) -> Result<CrewRecord, SwarmFault>;
    pub async fn members(&self) -> Result<Vec<CrewMember>, SwarmFault>;
}
```

add_member mints mbr_<ULID> (or uses the draft's id), appends to the end (preserving enrollment order), and rejects a duplicate id with Conflict; remove_member raises NotFound for an unknown id. Every mutator is a pure transform fed to cell.mutate, so concurrent roster edits serialize through the kernel guard.
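The roster rules amount to a few pure transforms of the kind CrewManifest hands to cell.mutate. In this sketch the record types are simplified stand-ins, Fault is a hypothetical substitute for SwarmFault, and ids are passed in explicitly rather than minted.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct CrewMember { pub id: String, pub role: String, pub model: Option<String> }

#[derive(Clone, Debug, Default)]
pub struct CrewRecord { pub crew_id: String, pub members: Vec<CrewMember>, pub created_at: u64 }

#[derive(Debug, PartialEq)]
pub enum Fault { Conflict(String), NotFound(String) }

/// The unborn sentinel is an empty crew_id; mutators other than create refuse it.
fn assert_born(rec: &CrewRecord) -> Result<(), Fault> {
    if rec.crew_id.is_empty() {
        return Err(Fault::Conflict("crew has not been created".into()));
    }
    Ok(())
}

pub fn create(mut rec: CrewRecord, crew_id: String, now: u64) -> Result<CrewRecord, Fault> {
    if !rec.crew_id.is_empty() {
        return Err(Fault::Conflict(format!("crew {} already exists", rec.crew_id)));
    }
    rec.crew_id = crew_id;
    rec.created_at = now;
    Ok(rec)
}

pub fn add_member(mut rec: CrewRecord, member: CrewMember) -> Result<CrewRecord, Fault> {
    assert_born(&rec)?;
    if rec.members.iter().any(|m| m.id == member.id) {
        return Err(Fault::Conflict(format!("duplicate member {}", member.id)));
    }
    rec.members.push(member); // append, so enrollment order is preserved
    Ok(rec)
}

pub fn remove_member(mut rec: CrewRecord, id: &str) -> Result<CrewRecord, Fault> {
    assert_born(&rec)?;
    let before = rec.members.len();
    rec.members.retain(|m| m.id != id);
    if rec.members.len() == before {
        return Err(Fault::NotFound(id.to_string()));
    }
    Ok(rec)
}
```

Because each function takes the old record and returns the new one, running it inside a guarded read-modify-write is enough to make concurrent edits serialize.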

ModelPolicy is a small per-role model-selection table with a strict three-step precedence: (1) an explicit member.model, (2) otherwise the table entry for member.role, (3) otherwise the configured fallback. Build it with ModelPolicy::of(table, fallback); resolve with for_member(member) or model_for_role(role), or list the known roles with roles().
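The three-step precedence reduces to two chained fallbacks. This sketch is a simplified stand-in, not the crate's type. Member and the HashMap-backed table are hypothetical, and roles() is omitted.

```rust
use std::collections::HashMap;

pub struct Member { pub role: String, pub model: Option<String> }

/// Hypothetical stand-in for ModelPolicy: a role -> model table plus a fallback.
pub struct ModelPolicy { table: HashMap<String, String>, fallback: String }

impl ModelPolicy {
    pub fn of(table: HashMap<String, String>, fallback: impl Into<String>) -> Self {
        ModelPolicy { table, fallback: fallback.into() }
    }

    /// Table entry for the role, else the fallback.
    pub fn model_for_role(&self, role: &str) -> &str {
        self.table.get(role).map(String::as_str).unwrap_or(&self.fallback)
    }

    /// (1) explicit member model, (2) role table, (3) fallback: the first hit wins.
    pub fn for_member<'a>(&'a self, m: &'a Member) -> &'a str {
        m.model.as_deref().unwrap_or_else(|| self.model_for_role(&m.role))
    }
}
```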

CrewToolCollection mirrors the named collections the capabilities layer publishes (ReadOnly / Coding / All, serialized kebab as read-only / coding / all); it is kept local to the roster rather than imported, so the roster shape stays self-contained.

The workboard

workboard/board.rs is the swarm's shared, durable task store: one Ticket lifecycle plus a first-class dependency relation, all in one JSON file mediated by JsonCell<BoardState>, so every mutation is a single guarded read-modify-write.

```rust
#[derive(Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TicketStatus { Open, Claimed, Blocked, Done, Failed }

#[derive(Clone, Serialize, Deserialize)]
pub struct Ticket {
    pub id: String,            // tkt_<ULID>
    pub title: String,
    pub body: String,
    pub status: TicketStatus,
    #[serde(default, skip_serializing_if = "Option::is_none")] pub assignee: Option<String>,
    pub deps: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")] pub result: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")] pub error: Option<String>,
    #[serde(default, rename = "blockReason", skip_serializing_if = "Option::is_none")]
    pub block_reason: Option<String>,
    #[serde(rename = "createdAt")] pub created_at: u64,
    #[serde(rename = "updatedAt")] pub updated_at: u64,
}
```

The lifecycle is open → claimed → done | failed plus a blocked side-state. The writes:

```rust
impl TicketBoard {
    pub async fn open(path: impl AsRef<Path>, now: Option<Clock>) -> Result<TicketBoard, SwarmFault>;
    pub async fn add(&self, draft: TicketDraft) -> Result<Ticket, SwarmFault>;
    pub async fn claim(&self, id: &str, member: &str) -> Result<Ticket, SwarmFault>;
    pub async fn complete(&self, id: &str, result: &str) -> Result<Ticket, SwarmFault>;
    pub async fn fail(&self, id: &str, err: &str) -> Result<Ticket, SwarmFault>;
    pub async fn block(&self, id: &str, reason: Option<&str>) -> Result<Ticket, SwarmFault>;
    pub async fn unblock(&self, id: &str) -> Result<Ticket, SwarmFault>;
    // reads:
    pub async fn get(&self, id: &str) -> Result<Option<Ticket>, SwarmFault>;
    pub async fn list(&self, status: Option<TicketStatus>) -> Result<Vec<Ticket>, SwarmFault>;
    pub async fn ready(&self) -> Result<Vec<Ticket>, SwarmFault>;
}
```

add mints a tkt_<ULID> id, de-duplicates deps (first-seen wins), and admits the ticket open at the injected clock value (Clock is Arc<dyn Fn() -> u64 + Send + Sync>, defaulting to wall-clock epoch ms). Every declared dep must already name a board ticket (dangling → NotFound); the graph is rebuilt including the newcomer and the insert is rejected with Conflict if first_cycle() finds a loop — so neither a phantom dependency nor a cycle ever reaches disk. claim is atomic: the status check and the write live in one guarded transform, so two members racing for the same ticket cannot both win (the loser gets Conflict). complete/fail require a claimed ticket; block preserves the assignee; unblock returns a blocked ticket to open, clearing its assignee and block reason. Every transition stamps a fresh updated_at.
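The lifecycle rules can be written as pure per-ticket transforms, with the status check and the write in one function so that a single guarded mutate covers both. The types here are trimmed stand-ins, and Conflict is a hypothetical substitute for SwarmFault. Which states block() accepts is an assumption; the text above only says it keeps the assignee.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Status { Open, Claimed, Blocked, Done, Failed }

#[derive(Clone, Debug)]
pub struct Ticket {
    pub status: Status,
    pub assignee: Option<String>,
    pub result: Option<String>,
    pub error: Option<String>,
    pub block_reason: Option<String>,
    pub updated_at: u64,
}

#[derive(Debug, PartialEq)]
pub struct Conflict(pub String);

fn require(t: &Ticket, allowed: &[Status], verb: &str) -> Result<(), Conflict> {
    if allowed.contains(&t.status) {
        Ok(())
    } else {
        Err(Conflict(format!("cannot {} a {:?} ticket", verb, t.status)))
    }
}

pub fn claim(mut t: Ticket, member: &str, now: u64) -> Result<Ticket, Conflict> {
    require(&t, &[Status::Open], "claim")?; // a racing second claimer sees Claimed here
    t.status = Status::Claimed;
    t.assignee = Some(member.to_string());
    t.updated_at = now;
    Ok(t)
}

pub fn complete(mut t: Ticket, result: &str, now: u64) -> Result<Ticket, Conflict> {
    require(&t, &[Status::Claimed], "complete")?;
    t.status = Status::Done;
    t.result = Some(result.to_string());
    t.updated_at = now;
    Ok(t)
}

pub fn fail(mut t: Ticket, err: &str, now: u64) -> Result<Ticket, Conflict> {
    require(&t, &[Status::Claimed], "fail")?;
    t.status = Status::Failed;
    t.error = Some(err.to_string());
    t.updated_at = now;
    Ok(t)
}

pub fn block(mut t: Ticket, reason: Option<&str>, now: u64) -> Result<Ticket, Conflict> {
    // Assumption: only Open or Claimed tickets may be blocked; the assignee is kept.
    require(&t, &[Status::Open, Status::Claimed], "block")?;
    t.status = Status::Blocked;
    t.block_reason = reason.map(str::to_string);
    t.updated_at = now;
    Ok(t)
}

pub fn unblock(mut t: Ticket, now: u64) -> Result<Ticket, Conflict> {
    require(&t, &[Status::Blocked], "unblock")?;
    t.status = Status::Open;
    t.assignee = None; // back to open: clear assignee and reason
    t.block_reason = None;
    t.updated_at = now;
    Ok(t)
}
```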

ready() returns the open tickets whose every dep id names a ticket that has reached done (a failed dep never satisfies a dependent), in insertion order; these are exactly the tickets a member may claim. list returns tickets in stable insertion order, tracked by the BoardState.order vec, optionally filtered by status; BoardState.tickets is a HashMap keyed by id for O(1) lookup.
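The readiness rule is a filter over the order vec. This sketch uses trimmed, hypothetical stand-ins for Ticket and BoardState, keeping only the fields the rule needs.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Status { Open, Claimed, Blocked, Done, Failed }

pub struct Ticket { pub id: String, pub status: Status, pub deps: Vec<String> }

/// `order` keeps insertion order; `tickets` gives O(1) lookup by id.
pub struct BoardState { pub order: Vec<String>, pub tickets: HashMap<String, Ticket> }

impl BoardState {
    /// Open tickets whose every dep has reached Done, in insertion order.
    pub fn ready(&self) -> Vec<&Ticket> {
        // A missing or Failed dep never counts as satisfied.
        let is_done = |id: &String| matches!(self.tickets.get(id), Some(t) if t.status == Status::Done);
        self.order
            .iter()
            .filter_map(|id| self.tickets.get(id))
            .filter(|t| t.status == Status::Open && t.deps.iter().all(|d| is_done(d)))
            .collect()
    }
}
```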

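```rust
use indusagi::swarm::{SwarmFault, SwarmFaultKind, TicketBoard, TicketDraft};

#[tokio::main]
async fn main() -> Result<(), SwarmFault> {
    // Assumes /tmp/board.json does not exist yet (a fresh, empty board).
    let board = TicketBoard::open("/tmp/board.json", None).await?;
    let a = board.add(TicketDraft { title: "build".into(), ..Default::default() }).await?;
    let b = board.add(TicketDraft {
        title: "test".into(),
        deps: Some(vec![a.id.clone()]),
        ..Default::default()
    }).await?;

    // Only `a` is ready; `b` waits on it.
    let ready: Vec<String> = board.ready().await?.into_iter().map(|t| t.id).collect();
    assert_eq!(ready, vec![a.id.clone()]);
    board.claim(&a.id, "mbr_1").await?; // atomic: a second claimer gets Conflict
    board.complete(&a.id, "built ok").await?;
    let ready: Vec<String> = board.ready().await?.into_iter().map(|t| t.id).collect();
    assert_eq!(ready, vec![b.id.clone()]);

    // A dangling dep is rejected with NotFound (a cycle would be rejected with Conflict).
    // Matching instead of unwrap_err(), which would require Ticket: Debug.
    let err = match board.add(TicketDraft {
        title: "bad".into(),
        deps: Some(vec!["tkt_missing".into()]),
        ..Default::default()
    }).await {
        Ok(_) => panic!("a dangling dep was accepted"),
        Err(e) => e,
    };
    assert!(matches!(err.kind(), SwarmFaultKind::NotFound));
    Ok(())
}
```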

`DepGraph` — the dependency relation

workboard/dep_graph.rs is the pure, in-memory dependency view, rebuilt from the ticket store whenever the board reasons about readiness or admits a ticket. An edge dep → ticket records "dep must reach done before ticket is workable". Build with DepGraph::from_deps(entries): each (id, deps) pair contributes the edges dep → id. The struct's fields (a forward adjacency IndexMap, a backward reverse map, and a nodes IndexSet) are private, and from_deps is the only constructor, so every graph goes through the de-duplicating edge-insertion path. A self-edge is admitted as an ordinary edge, so it surfaces as the trivial one-node cycle.

first_cycle() returns the first cycle as the node path that closes the loop ([a, …, a]), or None when acyclic, via an iterative white/grey/black DFS. Two read accessors expose the relation directly: dependencies_of(id) lists a node's direct in-neighbours (the ids it depends on), and dependents_of(id) lists its direct out-neighbours (the ids it unblocks). Both return an empty Vec for an unknown node rather than panicking.

The adjacency deliberately uses IndexMap/IndexSet rather than HashMap/HashSet. Their stable insertion order makes first_cycle return the same cycle on every run for the same input; a hash-ordered graph would surface a different-but-valid cycle in each process, breaking determinism.
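The iterative white/grey/black DFS can be sketched with std alone. Here a Vec of node names plus index-based adjacency lists stands in for IndexMap/IndexSet, which is an assumption made to avoid the external crate; it preserves the insertion order that makes the result deterministic. The accessors are omitted, and the entries type is a simplification.

```rust
use std::collections::HashMap;

/// Ordered stand-in for the IndexMap/IndexSet graph: nodes keep first-seen order.
pub struct DepGraph {
    nodes: Vec<String>,
    index: HashMap<String, usize>,
    forward: Vec<Vec<usize>>, // dep -> dependents, in insertion order
}

impl DepGraph {
    fn node(&mut self, id: &str) -> usize {
        if let Some(&i) = self.index.get(id) {
            return i;
        }
        let i = self.nodes.len();
        self.nodes.push(id.to_string());
        self.index.insert(id.to_string(), i);
        self.forward.push(Vec::new());
        i
    }

    /// Each (id, deps) pair contributes the edges dep -> id, de-duplicated.
    pub fn from_deps(entries: &[(&str, Vec<&str>)]) -> DepGraph {
        let mut g = DepGraph { nodes: Vec::new(), index: HashMap::new(), forward: Vec::new() };
        for (id, deps) in entries {
            let t = g.node(id);
            for dep in deps {
                let d = g.node(dep);
                if !g.forward[d].contains(&t) {
                    g.forward[d].push(t);
                }
            }
        }
        g
    }

    /// First cycle as a closed path [a, ..., a], or None when acyclic.
    pub fn first_cycle(&self) -> Option<Vec<String>> {
        #[derive(Clone, Copy, PartialEq)]
        enum Color { White, Grey, Black }
        let mut color = vec![Color::White; self.nodes.len()];
        for start in 0..self.nodes.len() {
            if color[start] != Color::White {
                continue;
            }
            // Explicit stack of (node, index of the next out-edge to try); grey = on the stack.
            let mut stack: Vec<(usize, usize)> = vec![(start, 0)];
            color[start] = Color::Grey;
            while let Some(top) = stack.len().checked_sub(1) {
                let (node, next) = stack[top];
                if next < self.forward[node].len() {
                    stack[top].1 += 1;
                    let child = self.forward[node][next];
                    match color[child] {
                        Color::White => {
                            color[child] = Color::Grey;
                            stack.push((child, 0));
                        }
                        Color::Grey => {
                            // Back edge: the stack from `child` to `node`, closed by `child`.
                            let pos = stack.iter().position(|&(n, _)| n == child).unwrap();
                            let mut cycle: Vec<String> =
                                stack[pos..].iter().map(|&(n, _)| self.nodes[n].clone()).collect();
                            cycle.push(self.nodes[child].clone());
                            return Some(cycle);
                        }
                        Color::Black => {}
                    }
                } else {
                    color[node] = Color::Black; // all out-edges explored
                    stack.pop();
                }
            }
        }
        None
    }
}
```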

The postbox

postbox/channel.rs plus postbox/codecs.rs are the crew's mailbox. A Channel is one shared JsonlLog<Envelope> transcript (no file-per-recipient fan-out) plus a private JsonCell<Cursor> offset per reader, rooted at a directory:

```text
<root>/
  transcript.jsonl        ← the shared, append-only envelope log
  cursors/<readerId>.json ← one offset cell per reader
```

```rust
impl Channel {
    pub async fn open(root: impl AsRef<Path>, guard: Option<GuardOptions>) -> Result<Channel, SwarmFault>;
    pub async fn send(&self, env: Envelope) -> Result<Envelope, SwarmFault>;
    pub async fn poll(&self, reader_id: &str) -> Result<Vec<Envelope>, SwarmFault>;
    pub async fn peek(&self, reader_id: &str) -> Result<Vec<Envelope>, SwarmFault>;
}
```

send is one validated JsonlLog::append and touches no cursor — delivery is a pull. poll(reader) reads everything past the reader's cursor, returns the slice addressed to that reader, and advances the cursor to the end of the transcript-as-of-this-read; messages addressed to other readers are scanned but skipped permanently for this reader, keeping polls O(new lines). The delivered set is computed inside the cursor's JsonCell::mutate, so the read and the cursor write are one atomic transaction and concurrent polls for the same reader never double-deliver. peek does the same without advancing — a non-destructive look-ahead. A reader id is percent-encoded (exactly the JS encodeURIComponent escaping) for its cursor filename, so the same reader maps to the same file across the TS and Rust ports; clamp_cursor defends against a cursor sitting past the end of a truncated transcript.
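The cursor semantics of send, poll, and peek can be sketched in memory. In this sketch `&mut self` plays the role the cursor's JsonCell::mutate plays on disk, making the read and the cursor advance one step. Envelope is trimmed to the two fields that matter. Clamping a too-large cursor to the transcript length is an assumption about what clamp_cursor does.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
pub struct Envelope { pub to: String, pub text: String }

/// In-memory stand-in: one shared transcript plus one offset per reader.
#[derive(Default)]
pub struct Channel { transcript: Vec<Envelope>, cursors: HashMap<String, usize> }

impl Channel {
    /// Append only; no cursor is touched (delivery is a pull).
    pub fn send(&mut self, env: Envelope) {
        self.transcript.push(env);
    }

    /// Envelopes past the reader's cursor that are addressed to it, plus the end offset.
    fn pending(&self, reader: &str) -> (Vec<Envelope>, usize) {
        // Clamp: a cursor past the end of a truncated transcript is treated as the end.
        let start = self.cursors.get(reader).copied().unwrap_or(0).min(self.transcript.len());
        let mine = self.transcript[start..].iter().filter(|e| e.to == reader).cloned().collect();
        (mine, self.transcript.len())
    }

    /// Non-destructive look-ahead.
    pub fn peek(&self, reader: &str) -> Vec<Envelope> {
        self.pending(reader).0
    }

    /// Deliver, then advance past everything scanned, including other readers' envelopes.
    pub fn poll(&mut self, reader: &str) -> Vec<Envelope> {
        let (mine, end) = self.pending(reader);
        self.cursors.insert(reader.to_string(), end);
        mine
    }
}
```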

Every message is an Envelope: the shared header plus a #[serde(flatten)]-ed EnvelopePayload, an internally-tagged enum over type (task / result / note / control):

```rust
#[derive(Clone, Serialize, Deserialize)]
pub struct Envelope {
    pub id: String,
    pub from: String,
    pub to: String,
    pub ts: u64,
    #[serde(flatten)] pub payload: EnvelopePayload,
}

#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum EnvelopePayload {
    Task   { title: String, brief: String,
             #[serde(default, rename = "ticketId", skip_serializing_if = "Option::is_none")] ticket_id: Option<String>,
             #[serde(default)] priority: TaskPriority },
    Result { #[serde(rename = "taskId")] task_id: String, status: ResultStatus, summary: String,
             #[serde(default, skip_serializing_if = "Option::is_none")] artifacts: Option<serde_json::Value> },
    Note    { text: String },
    Control { signal: ControlSignal,
              #[serde(default, skip_serializing_if = "Option::is_none")] reason: Option<String> },
}
```

#[serde(flatten)] splices the payload's type and fields onto the same JSON object as the header, reproducing the flat wire shape { id, from, to, ts, type, ...payload }; there is no nested payload object. The discriminated union is the codec table: encode(env) and decode(line) dispatch through serde's internally-tagged enum rather than a hand-written if/match ladder, so an unknown or mismatched type, or malformed JSON, raises a Validation fault. TaskPriority (Low/Normal/High, default Normal), ResultStatus (Ok/Error/Skipped), and ControlSignal (Pause/Resume/Drain/Shutdown) are the payload sub-enums.

```rust
use indusagi::swarm::postbox::EnvelopePayload;
use indusagi::swarm::{new_id, Channel, Envelope, SwarmFault};

#[tokio::main]
async fn main() -> Result<(), SwarmFault> {
    let ch = Channel::open("/tmp/chan", None).await?;
    ch.send(Envelope {
        id: new_id(Some("env")),
        from: "coder".into(),
        to: "reviewer".into(),
        ts: 0,
        payload: EnvelopePayload::Note { text: "PR is up".into() },
    }).await?;

    // Assumes a fresh /tmp/chan, so the reviewer has exactly one pending envelope.
    assert_eq!(ch.peek("reviewer").await?.len(), 1); // look without consuming
    assert_eq!(ch.poll("reviewer").await?.len(), 1); // consume + advance cursor
    assert!(ch.poll("reviewer").await?.is_empty());  // cursor advanced past it
    Ok(())
}
```

The isolation seam

isolation/ provisions a separate git worktree per teammate so each can mutate its own working tree, branch, and index without colliding with its peers. Workspace (isolation/worktree.rs) owns no process spawning of its own: every git invocation flows through an injected ShellRunner (isolation/runner.rs).

```rust
pub trait ShellRunner: Send + Sync {
    fn exec<'a>(&'a self, command: &'a str, cwd: Option<&'a str>) -> ExecFuture<'a>;
}

pub struct RunResult { pub stdout: String, pub stderr: String, pub code: Option<i32> }
```

A run never errors on a non-zero exit — a failed git command is an Ok(RunResult) with a non-zero code; Err is reserved for the runner being unable to spawn anything at all (binary missing/unrunnable). The production node_runner() (NodeRunner) splits the command string on whitespace into an argv vector and spawns with no shell via tokio::process::Command — there is no /bin/sh -c, so command substitution, globbing, and quoting do not apply, removing an injection surface (every arg this layer passes git is a literal token). A process killed by a signal reports code: None (the TS null).
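A synchronous, std-only sketch of the shell-less exec contract follows. The real runner is async over tokio::process::Command, and this free function exec is a hypothetical stand-in for it. The sketch splits the command on whitespace, spawns argv[0] directly with no shell, and maps a non-zero exit to Ok.

```rust
use std::io;
use std::process::Command;

#[derive(Debug)]
pub struct RunResult { pub stdout: String, pub stderr: String, pub code: Option<i32> }

/// Hypothetical sync stand-in for ShellRunner::exec: no /bin/sh -c, so no globbing,
/// quoting, or command substitution. Every whitespace-separated token is a literal arg.
pub fn exec(command: &str, cwd: Option<&str>) -> io::Result<RunResult> {
    let mut argv = command.split_whitespace();
    let program = argv
        .next()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "empty command"))?;
    let mut cmd = Command::new(program);
    cmd.args(argv);
    if let Some(dir) = cwd {
        cmd.current_dir(dir);
    }
    let out = cmd.output()?; // Err only when the binary cannot be spawned at all
    Ok(RunResult {
        stdout: String::from_utf8_lossy(&out.stdout).into_owned(),
        stderr: String::from_utf8_lossy(&out.stderr).into_owned(),
        code: out.status.code(), // None when the process was killed by a signal (Unix)
    })
}
```

A caller such as Workspace then inspects `code` itself and turns a non-zero exit into its own Isolation fault, keeping stderr for the message.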

```rust
impl Workspace {
    pub fn new() -> Workspace;                                    // backed by node_runner
    pub fn with_runner(runner: Arc<dyn ShellRunner>) -> Workspace; // test seam
    pub async fn create(&self, base_repo: &str, branch: &str, opts: CreateOptions) -> Result<WorktreeRef, SwarmFault>;
    pub async fn list(&self, base_repo: &str) -> Result<Vec<WorktreeEntry>, SwarmFault>;
    pub async fn remove(&self, path: &str, base_repo: Option<&str>, opts: RemoveOptions) -> Result<(), SwarmFault>;
}
```

create runs git worktree add -b <branch> <path> [<start-point>] (defaulting <path> to a sanitized <base_repo>/.worktrees/<branch> sibling). list runs git worktree list --porcelain and parses the blank-line-delimited records into WorktreeEntry structs (path, short branch via stripping refs/heads/, head, and the detached/bare/locked flags; CRLF and unknown keys tolerated). remove runs git worktree remove [--force] <path> then git worktree prune. Every git failure — a non-zero exit (git's stderr is preserved in the message) or a spawn Err (threaded onto the fault's source()) — maps onto a single Isolation SwarmFault.
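The `git worktree list --porcelain` parsing step can be sketched as a pure function. The record layout follows git's documented porcelain format: a `worktree <path>` line opens each record, attribute lines such as `HEAD`, `branch`, `detached`, `bare`, and `locked` follow, and a blank line ends it. The field set of this WorktreeEntry is an assumption modeled on the description above.

```rust
#[derive(Debug, Default, PartialEq)]
pub struct WorktreeEntry {
    pub path: String,
    pub branch: Option<String>, // short name, refs/heads/ stripped
    pub head: Option<String>,
    pub detached: bool,
    pub bare: bool,
    pub locked: bool,
}

pub fn parse_porcelain(out: &str) -> Vec<WorktreeEntry> {
    let mut entries = Vec::new();
    let mut cur: Option<WorktreeEntry> = None;
    for raw in out.lines() {
        let line = raw.trim_end_matches('\r'); // tolerate CRLF output
        if line.is_empty() {
            if let Some(e) = cur.take() {
                entries.push(e); // a blank line ends a record
            }
            continue;
        }
        let (key, value) = line.split_once(' ').unwrap_or((line, ""));
        if key == "worktree" {
            if let Some(e) = cur.take() {
                entries.push(e);
            }
            cur = Some(WorktreeEntry { path: value.to_string(), ..Default::default() });
            continue;
        }
        let Some(e) = cur.as_mut() else { continue };
        match key {
            "HEAD" => e.head = Some(value.to_string()),
            "branch" => e.branch = Some(value.strip_prefix("refs/heads/").unwrap_or(value).to_string()),
            "detached" => e.detached = true,
            "bare" => e.bare = true,
            "locked" => e.locked = true,
            _ => {} // unknown keys (for example "prunable") are ignored
        }
    }
    if let Some(e) = cur {
        entries.push(e);
    }
    entries
}
```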

The telemetry log

telemetry/activity.rs is the third leg alongside the board (current state of the work) and the channel (messages between teammates): ActivityLog is an immutable, ordered record of the coordination events themselves — a pure observer over JsonlLog<CrewEvent>; nothing in the crew depends on it for correctness.

```rust
#[derive(Clone, Serialize, Deserialize)]
pub struct CrewEvent {
    pub id: String,             // act_<ULID>
    pub ts: u64,
    #[serde(flatten)] pub body: CrewEventBody,
}

#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CrewEventBody {
    MemberSpawned { #[serde(rename = "memberId")] member_id: String, role: String },
    TicketPosted  { #[serde(rename = "ticketId")] ticket_id: String, title: String },
    TicketClaimed { #[serde(rename = "ticketId")] ticket_id: String, #[serde(rename = "memberId")] member_id: String },
    TicketDone    { #[serde(rename = "ticketId")] ticket_id: String, #[serde(rename = "memberId")] member_id: String, summary: String },
    TicketFailed  { #[serde(rename = "ticketId")] ticket_id: String, #[serde(rename = "memberId")] member_id: String, error: String },
    MessageSent   { #[serde(rename = "envelopeId")] envelope_id: String, from: String, to: String, #[serde(rename = "envelopeType")] envelope_type: String },
}
```

record(input) validates and appends one event, minting an act_<ULID> id and stamping ts from the injected clock when the caller leaves them off (a caller-supplied id/ts in CrewEventInput wins — and From<CrewEventBody> makes the common "no overrides" case ergonomic). timeline() reads back every event in append (chronological) order; a missing log is an empty timeline, and a corrupt line surfaces as a Validation fault on read-back. The six kinds are frozen in CREW_EVENT_KINDS, internally tagged on kind (snake_case) and flattened onto the header so the on-disk shape is flat camelCase with a kind discriminant.
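The override rule in record() is small enough to sketch in memory. In this sketch EventInput, Event, and the counter-based id are hypothetical stand-ins for CrewEventInput, CrewEvent, and new_id(Some("act")). The injected clock is a generic closure.

```rust
/// Hypothetical stand-in for CrewEventInput: a kind tag plus optional overrides.
pub struct EventInput {
    pub kind: &'static str,
    pub id: Option<String>,
    pub ts: Option<u64>,
}

#[derive(Debug)]
pub struct Event {
    pub id: String,
    pub ts: u64,
    pub kind: &'static str,
}

/// In-memory log with an injected clock, standing in for ActivityLog over JsonlLog.
pub struct ActivityLog<C: Fn() -> u64> {
    now: C,
    minted: u64, // counter standing in for new_id(Some("act"))
    events: Vec<Event>,
}

impl<C: Fn() -> u64> ActivityLog<C> {
    pub fn new(now: C) -> Self {
        ActivityLog { now, minted: 0, events: Vec::new() }
    }

    /// Caller-supplied id/ts win; otherwise mint an id and read the injected clock.
    pub fn record(&mut self, input: EventInput) -> &Event {
        let id = match input.id {
            Some(id) => id,
            None => {
                self.minted += 1;
                format!("act_{:06}", self.minted)
            }
        };
        let ts = input.ts.unwrap_or_else(|| (self.now)());
        self.events.push(Event { id, ts, kind: input.kind });
        self.events.last().expect("just pushed")
    }

    /// Events in append (chronological) order.
    pub fn timeline(&self) -> &[Event] {
        &self.events
    }
}
```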

The coordinator

coordinator.rs is the conductor that wires the four services (and the optional isolation seam) into one driveable Crew. Crew::open(opts) (via create_crew) roots manifest.json, board.json, channel/, and activity.jsonl under opts.dir, eagerly creates the crew record (tolerating a Conflict on re-open), and constructs a Workspace when base_repo is set. The board's Clock and the activity log's Clock are the same Arc, so one injected now threads both.

```rust
pub struct CrewOptions {
    pub dir: String,
    pub base_repo: Option<String>,
    pub spawn_agent: Option<SpawnAgent>,
    pub now: Option<crate::swarm::workboard::board::Clock>,
}

pub type SpawnAgent = Arc<dyn Fn(&CrewMember) -> Agent + Send + Sync>;

impl Crew {
    pub async fn open(opts: CrewOptions) -> Result<Crew, SwarmFault>;
    pub async fn add_member(&self, draft: MemberDraft) -> Result<CrewMember, SwarmFault>;
    pub async fn post_task(&self, draft: TaskDraft) -> Result<Ticket, SwarmFault>;
    pub async fn run_round(&self) -> Result<RoundOutcome, SwarmFault>;
    pub async fn status(&self) -> Result<CrewStatus, SwarmFault>;
}
```

The agent spawner is injectable. Omit spawn_agent to get default_spawn_agent: a real agent_with_capabilities(model, Some(tool_box(collection, None))) bound to the member's model (or DEFAULT_MEMBER_MODEL, "mock-1"), where an unset collection defaults to ReadOnly, the safe floor. Alternatively, inject a scripted fake so a whole round runs with no model or network. add_member enrolls a teammate and narrates member_spawned; post_task admits a ticket and narrates ticket_posted. status() is a pure read across the three services: it returns the crew id, the full roster, every ticket, the ready ids, a StatusCounts tally, and the activity timeline.

The coordination model

run_round() is the heartbeat. It reads the board's ready() tickets and the idle members (members not in busy_member_ids() — i.e. not holding a claimed or blocked ticket), pairs them one-to-one in board order via pair_up (which stops at min(ready, idle) — one ticket per member per round), and for each pair calls work_one:

  1. claim the ticket (board.claim) and record ticket_claimed;
  2. spawn the member's agent ((self.spawn_agent)(&member)) and run it over the ticket brief (run_agent);
  3. on a clean settlement: extract the teammate's final answer (final_text, the concatenated text of the last non-empty assistant turn, read out of the snapshot's serialized JSON), send a result Envelope addressed back to the coordinator reader (channel.send), record message_sent, complete the ticket (board.complete), and record ticket_done with a clamp_summary-trimmed (280-char, whitespace-collapsed) summary;
  4. on failure — the run settled with snapshot.error set (the Rust structural analogue of the TS submit throwing) — fail the ticket (board.fail) and record ticket_failed, so one bad teammate cannot wedge the round.
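The pairing step and the summary clamp described above can be sketched as plain functions. All three names mirror helpers mentioned in the text, but their signatures here are hypothetical. The sketch also assumes clamp_summary simply cuts at 280 characters, since whether the real helper marks truncation (for example with an ellipsis) is not stated.

```rust
use std::collections::HashSet;

/// Idle = enrolled members that hold no claimed or blocked ticket.
pub fn idle_members(members: &[String], busy: &HashSet<String>) -> Vec<String> {
    members.iter().filter(|m| !busy.contains(*m)).cloned().collect()
}

/// Pair ready tickets with idle members in board order; zip stops at the shorter
/// side, giving min(ready, idle) pairs and at most one ticket per member per round.
pub fn pair_up<'a>(ready: &'a [String], idle: &'a [String]) -> Vec<(&'a str, &'a str)> {
    ready.iter().zip(idle.iter()).map(|(t, m)| (t.as_str(), m.as_str())).collect()
}

/// Collapse whitespace runs to single spaces, then cap at 280 characters.
pub fn clamp_summary(text: &str) -> String {
    let collapsed = text.split_whitespace().collect::<Vec<_>>().join(" ");
    collapsed.chars().take(280).collect()
}
```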

run_round returns a RoundOutcome of the completed and failed tickets. Because the board gates readiness on the dependency graph and skips busy members, a multi-step plan unfolds across rounds: a dependent ticket stays out of ready until its dependency completes, and a member that completed its ticket is idle again next round to pick up the now-unblocked work.
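To see the unfold concretely, here is a sketch of dep-gated readiness driven over several rounds. Ticket, Status, ready, and run_rounds are hypothetical, and every run is assumed to succeed within its round, so all members are idle again at the start of the next one.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status { Open, Done }

/// Hypothetical ticket: id, the ids it depends on, and its status.
#[derive(Debug, Clone)]
pub struct Ticket { pub id: String, pub deps: Vec<String>, pub status: Status }

/// Dep-gated readiness: an open ticket is ready only when every dep is done.
/// Board order (the slice order) is preserved.
pub fn ready(board: &[Ticket]) -> Vec<String> {
    let status: HashMap<&str, Status> = board.iter().map(|t| (t.id.as_str(), t.status)).collect();
    board
        .iter()
        .filter(|t| t.status == Status::Open)
        .filter(|t| t.deps.iter().all(|d| status.get(d.as_str()) == Some(&Status::Done)))
        .map(|t| t.id.clone())
        .collect()
}

/// Drive rounds with `members` always-successful workers; returns the ids
/// completed in each round and stops when a round completes nothing.
pub fn run_rounds(board: &mut [Ticket], members: usize) -> Vec<Vec<String>> {
    let mut rounds = Vec::new();
    loop {
        // One ticket per member per round: take at most `members` ready ids.
        let picked: Vec<String> = ready(board).into_iter().take(members).collect();
        if picked.is_empty() {
            break;
        }
        for t in board.iter_mut().filter(|t| picked.contains(&t.id)) {
            t.status = Status::Done;
        }
        rounds.push(picked);
    }
    rounds
}
```

With two members and a second ticket gated on the first, round 1 completes only the first ticket and round 2 picks up the second, mirroring the implement/review pair in the example that follows.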

use std::sync::Arc;
use indusagi::swarm::{create_crew, CrewOptions, MemberDraft, TaskDraft, CrewToolCollection};
use indusagi::runtime::agent_with_capabilities;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let crew = create_crew(CrewOptions {
        dir: "/tmp/crew1".into(),
        base_repo: None,
        // Inject a deterministic, tool-free agent (omit for default_spawn_agent).
        spawn_agent: Some(Arc::new(|_m| agent_with_capabilities("mock-1", None))),
        now: None,
    }).await?;

    crew.add_member(MemberDraft { role: "coder".into(),
        tool_collection: Some(CrewToolCollection::Coding), ..Default::default() }).await?;
    crew.add_member(MemberDraft { role: "reviewer".into(),
        tool_collection: Some(CrewToolCollection::ReadOnly), ..Default::default() }).await?;

    let t1 = crew.post_task(TaskDraft { title: "implement feature".into(), ..Default::default() }).await?;
    let _ = crew.post_task(TaskDraft { title: "review feature".into(),
        deps: Some(vec![t1.id.clone()]), ..Default::default() }).await?; // gated on t1

    // Round 1: only t1 is ready; the review ticket waits until t1 is done.
    let outcome = crew.run_round().await?;
    println!("{:?}", outcome.completed.iter().map(|t| &t.id).collect::<Vec<_>>());

    let status = crew.status().await?;
    println!("done={} open={}", status.counts.done, status.counts.open);
    Ok(())
}

Faults

Every failure in the swarm is a SwarmFault — a real std::error::Error tagged with a closed, exhaustive SwarmFaultKind. Callers branch on fault.kind() (or the fault.is(kind) guard) instead of matching message text; the originating trigger, when present, is threaded onto source().

| SwarmFaultKind | Wire spelling | Raised when |
|---|---|---|
| LockTimeout | lock_timeout | An exclusive JsonCell guard could not be acquired before its deadline. Transient: retry or skip this poll. |
| Validation | validation | A value failed its serde shape on read or write (a cell, a log line, an envelope, an event). |
| NotFound | not_found | A named cell, ticket, member, or log entry does not exist (incl. a dangling ticket dep). |
| Conflict | conflict | A guarded mutation lost a race or violated an invariant (claim an already-claimed ticket, re-create a crew, dep cycle, duplicate member). |
| Isolation | isolation | A git worktree provisioning step failed (non-zero exit or a spawn error). |
| Spawn | spawn | A teammate agent could not be brought up. |

Construct with swarm_fault(kind, message) (the ergonomic front door), SwarmFault::new(kind, message), or SwarmFault::with_cause(kind, message, cause) to thread a trigger onto the source chain.
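A std-only sketch of the fault shape described above: a closed kind enum with its wire spelling, kind()/is() guards, and a cause threaded onto Error::source. The constructor names follow the text, but the field layout, as_str, and the Send + Sync bound on the cause are assumptions, not the crate's definition.

```rust
use std::error::Error;
use std::fmt;

/// Mirrors the kinds in the table; `as_str` gives the wire spelling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SwarmFaultKind { LockTimeout, Validation, NotFound, Conflict, Isolation, Spawn }

impl SwarmFaultKind {
    pub fn as_str(self) -> &'static str {
        match self {
            Self::LockTimeout => "lock_timeout",
            Self::Validation => "validation",
            Self::NotFound => "not_found",
            Self::Conflict => "conflict",
            Self::Isolation => "isolation",
            Self::Spawn => "spawn",
        }
    }
}

#[derive(Debug)]
pub struct SwarmFault {
    kind: SwarmFaultKind,
    message: String,
    cause: Option<Box<dyn Error + Send + Sync + 'static>>,
}

impl SwarmFault {
    pub fn new(kind: SwarmFaultKind, message: impl Into<String>) -> Self {
        Self { kind, message: message.into(), cause: None }
    }

    /// Keep the triggering error so callers can walk the source chain.
    pub fn with_cause(
        kind: SwarmFaultKind,
        message: impl Into<String>,
        cause: impl Error + Send + Sync + 'static,
    ) -> Self {
        let cause: Box<dyn Error + Send + Sync + 'static> = Box::new(cause);
        Self { kind, message: message.into(), cause: Some(cause) }
    }

    pub fn kind(&self) -> SwarmFaultKind { self.kind }
    pub fn is(&self, kind: SwarmFaultKind) -> bool { self.kind == kind }
}

/// Ergonomic front door, as named in the text.
pub fn swarm_fault(kind: SwarmFaultKind, message: impl Into<String>) -> SwarmFault {
    SwarmFault::new(kind, message)
}

impl fmt::Display for SwarmFault {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.kind.as_str(), self.message)
    }
}

impl Error for SwarmFault {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.cause.as_deref().map(|e| e as &(dyn Error + 'static))
    }
}
```

Branching on kind() keeps callers stable even if message wording changes.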

Parity notes

This subsystem is a faithful 100%-Rust port of the clean-room TypeScript swarm/ (indus-rebuild/src/swarm); a handful of choices are load-bearing:

  • serde provides "validated on every crossing". The zod schemas of the TS port become serde (de)serialization plus a two-stage decode: first prove the input is JSON, then prove its shape. JsonCell::open round-trips the initial value at open time so a bad default fails loudly.
  • mkdir-lock, not O_EXCL, with stale reclamation after stale_ms: a lock left by a crashed holder is forcibly reclaimed, so a wedged process cannot deadlock the crew forever. Release is an RAII LockGuard (the counterpart of the TS finally { release() }).
  • tokio::fs::rename for atomic publish: each write lands in a unique temp sibling, which is then renamed over the target.
  • DepGraph uses IndexMap/IndexSet so first_cycle() returns the same cycle on every run for the same input (a HashMap-backed graph would flip between valid cycles by hash seed).
  • Discriminated unions are internally-tagged enums. EnvelopePayload (#[serde(tag = "type")]) and CrewEventBody (#[serde(tag = "kind")]) are flattened onto their headers, reproducing the flat TS wire shape; the enum itself is the codec table, so a drifted/unknown tag is a Validation fault.
  • Wire names stay camelCase via #[serde(rename = ...)] (crewId, createdAt, ticketId, memberId, envelopeId, envelopeType, taskId, blockReason, toolCollection) and the enums keep their kebab/snake/lowercase spellings; absent Option fields are dropped on serialize.
  • CrewRecord.crew_id is deliberately not min-length-1: the empty string is the unborn sentinel; born-vs-unborn is enforced by create() and assert_born, not the schema.
  • run_round folds a faulted run into a failed ticket — a run that settles with snapshot.error set is the Rust structural analogue of the TS submit throwing, so one bad teammate fails its ticket instead of wedging the round.
  • new_id joins prefix and ULID with _ (tkt_<ULID>) to preserve the TS on-disk spelling, diverging from crate::core::ids::prefixed_id, which joins with -.
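A synchronous std::fs sketch of the lock-then-publish pattern from the mkdir-lock and rename notes above. The real kernel uses tokio::fs, stale_ms, and SwarmFaultKind::LockTimeout; the acquire and write_atomic names, the .lock sibling, the 10 ms poll, and the fixed timeouts are assumptions. It also ignores the race between two processes reclaiming the same stale lock at once.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::thread::sleep;
use std::time::{Duration, Instant, SystemTime};

/// RAII guard: dropping it removes the lock directory.
pub struct LockGuard { dir: PathBuf }

impl Drop for LockGuard {
    fn drop(&mut self) {
        let _ = fs::remove_dir(&self.dir); // best-effort release
    }
}

/// Acquire `<target>.lock` via mkdir, which atomically creates or fails.
/// A lock older than `stale` is treated as a crashed holder's and reclaimed.
pub fn acquire(target: &Path, timeout: Duration, stale: Duration) -> io::Result<LockGuard> {
    let dir = target.with_extension("lock");
    let deadline = Instant::now() + timeout;
    loop {
        match fs::create_dir(&dir) {
            Ok(()) => return Ok(LockGuard { dir }),
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
                let age = fs::metadata(&dir)
                    .and_then(|m| m.modified())
                    .ok()
                    .and_then(|t| SystemTime::now().duration_since(t).ok());
                if age.map_or(false, |a| a > stale) {
                    let _ = fs::remove_dir(&dir); // reclaim, then retry at once
                    continue;
                }
                if Instant::now() >= deadline {
                    return Err(io::Error::new(io::ErrorKind::TimedOut, "lock_timeout"));
                }
                sleep(Duration::from_millis(10));
            }
            Err(e) => return Err(e),
        }
    }
}

/// Publish `bytes` atomically: write a unique temp sibling, then rename it
/// over the target while holding the lock.
pub fn write_atomic(target: &Path, bytes: &[u8]) -> io::Result<()> {
    let _guard = acquire(target, Duration::from_secs(2), Duration::from_secs(30))?;
    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let tmp = target.with_extension(format!("tmp-{}-{}", std::process::id(), nanos));
    fs::write(&tmp, bytes)?;
    fs::rename(&tmp, target)
}
```

Readers never see a half-written file: they observe either the old contents or the new ones, because rename replaces the target in one step on the same filesystem.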

Every public type and function in this subsystem is fully implemented and unit-tested (no stubs). See the Architecture overview for where the layer sits.
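The DepGraph note in the parity list relies on a stable traversal order. This sketch gets the same property from std alone by keeping an insertion-ordered Vec of node ids beside a HashMap used only for lookups, standing in for IndexMap/IndexSet. DepGraph, add, and the cycle format (closed by repeating its first node) are assumptions, not the crate's API.

```rust
use std::collections::HashMap;

/// Hypothetical insertion-ordered dep graph: `order` fixes traversal order,
/// `edges` is only ever looked up, never iterated.
#[derive(Default)]
pub struct DepGraph {
    order: Vec<String>,
    edges: HashMap<String, Vec<String>>,
}

impl DepGraph {
    pub fn add(&mut self, id: &str, deps: &[&str]) {
        if !self.edges.contains_key(id) {
            self.order.push(id.to_string());
        }
        self.edges.insert(id.to_string(), deps.iter().map(|d| d.to_string()).collect());
    }

    /// Depth-first search in insertion order; returns the first cycle found,
    /// closed by its starting node, or None for an acyclic graph.
    pub fn first_cycle(&self) -> Option<Vec<String>> {
        // 1 = on the current path, 2 = fully explored; absent = unvisited.
        let mut state: HashMap<&str, u8> = HashMap::new();
        let mut path: Vec<&str> = Vec::new();
        for start in &self.order {
            if let Some(cycle) = self.visit(start, &mut state, &mut path) {
                return Some(cycle);
            }
        }
        None
    }

    fn visit<'a>(
        &'a self,
        node: &'a str,
        state: &mut HashMap<&'a str, u8>,
        path: &mut Vec<&'a str>,
    ) -> Option<Vec<String>> {
        match state.get(node).copied().unwrap_or(0) {
            2 => return None,
            1 => {
                // Back edge: the cycle is the path suffix starting at `node`.
                let at = path.iter().position(|n| *n == node)?;
                let mut cycle: Vec<String> = path[at..].iter().map(|n| n.to_string()).collect();
                cycle.push(node.to_string());
                return Some(cycle);
            }
            _ => {}
        }
        state.insert(node, 1);
        path.push(node);
        for dep in self.edges.get(node).map(|v| v.as_slice()).unwrap_or(&[]) {
            if let Some(cycle) = self.visit(dep, state, path) {
                return Some(cycle);
            }
        }
        path.pop();
        state.insert(node, 2);
        None
    }
}
```

Iterating a HashMap here instead would let the hash seed choose which of several valid cycles is reported first.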

Relationship to neighbors

Swarm is a self-contained subsystem with only two upward dependencies, both in the coordinator: it imports the Runtime (Agent, RunSnapshot, agent_with_capabilities) to spawn teammate agents, and the capabilities surface (ToolCollection, tool_box) to bind each member's tool slice. The coordinator only touches an Agent's submit_prompt surface, so a scripted fake satisfies it. The domain services and kernel have no upward dependencies (pure serde + tokio + ulid + indexmap + regex), making the kernel and each service independently testable; everything internally funnels through swarm/kernel. The final assistant text is read out of the run snapshot's serialized JSON rather than naming the gateway Turn/Block types, which live in a crate the swarm does not directly depend on. The root indusagi crate re-exports the module as swarm. See the Core subsystem for now_ms and the shared id helpers the swarm delegates to.
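The final-answer rule from the work_one steps (last non-empty assistant turn, text concatenated) can be sketched over an already-decoded message list. Msg and Part are hypothetical, simplified stand-ins for what the coordinator actually reads out of the snapshot JSON, and treating a whitespace-only turn as empty is an assumption.

```rust
/// Hypothetical decoded content part of one message.
#[derive(Debug, Clone, PartialEq)]
pub enum Part { Text(String), ToolCall(String) }

/// Hypothetical decoded message: a role plus its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct Msg { pub role: String, pub parts: Vec<Part> }

/// Concatenated text of the last assistant message whose text is non-blank.
/// Tool-call parts are ignored; returns "" when no such message exists.
pub fn final_text(messages: &[Msg]) -> String {
    messages
        .iter()
        .rev()
        .filter(|m| m.role == "assistant")
        .map(|m| {
            m.parts
                .iter()
                .filter_map(|p| match p {
                    Part::Text(t) => Some(t.as_str()),
                    Part::ToolCall(_) => None,
                })
                .collect::<String>()
        })
        .find(|text| !text.trim().is_empty())
        .unwrap_or_default()
}
```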