
Swarm

indusagi.swarm is the multi-agent crew layer: a Crew coordinator drives teammate agents over a shared ticket board, a cursor-based mailbox, and an activity log, all built on a tiny frozen kernel of lockable JSON files and append-only JSONL logs. Import it with "from indusagi import swarm" or "import indusagi.swarm".

A crew coordinates entirely through files on disk — no broker, no daemon, no shared memory. Multiple processes can safely share the same crew directory because every mutation goes through the kernel's cooperative-lock + atomic-publish + schema-validate machinery. The Crew owns the policy (match ready tickets to idle members, spawn one agent per pair, fold the result back onto the board and channel, narrate every step); four thin domain services (roster, workboard, postbox, telemetry) plus an optional git-worktree isolation seam own the persisted state. Agent spawning is injectable, so a round can be driven by real LLM-backed agents in production or scripted fakes in tests with no model or network.

Public exports

From indusagi/swarm/__init__.py. The coordinator surface:

| Name | Kind | Source | Purpose |
| --- | --- | --- | --- |
| create_crew | async function | coordinator.py | Host-facing factory: stand up a Crew from CrewOptions |
| Crew | class | coordinator.py | The coordinator. Verbs: add_member, post_task, run_round, status; classmethod open |
| CrewOptions | dataclass | coordinator.py | Knobs: dir, base_repo, spawn_agent override, injected now clock |
| CrewStatus | dataclass | coordinator.py | Point-in-time aggregate (crew_id, members, tickets, ready, counts, activity) |
| RoundOutcome | dataclass | coordinator.py | Result of one run_round: completed and failed ticket tuples |
| TaskDraft | dataclass | coordinator.py | Ticket draft for Crew.post_task (title, optional body, optional deps) |
| SpawnAgent | TypeAlias | coordinator.py | Callable[[CrewMember], Agent], the injectable spawner seam |
| default_spawn_agent | function | coordinator.py | Default spawner: a real create_agent bound to the member's model + tool_box |

The domain services and kernel primitives:

| Name | Kind | Source | Purpose |
| --- | --- | --- | --- |
| CrewManifest | class | roster/manifest.py | Concurrency-safe roster service over JsonCell |
| ModelPolicy | class | roster/manifest.py | Role→model lookup table; of, for_, model_for_role, roles |
| CrewMember | model | roster/manifest.py | One teammate (id, role, optional model, optional tool_collection) |
| CrewRecord | model | roster/manifest.py | Persisted crew doc (crew_id, members, created_at) |
| MemberDraft | dataclass | roster/manifest.py | Inputs to enroll a member (role required) |
| CrewToolCollection | TypeAlias | roster/manifest.py | Literal['read-only','coding','all'] |
| TicketBoard | class | workboard/board.py | Shared dependency-aware task store over JsonCell |
| Ticket | model | workboard/board.py | One unit of work (lifecycle, deps, assignee, result, …) |
| TicketStatus | TypeAlias | workboard/board.py | Literal['open','claimed','blocked','done','failed'] |
| TicketDraft | dataclass | workboard/board.py | Fields for TicketBoard.add |
| ticket_schema | const | workboard/board.py | TypeAdapter[Ticket] validator |
| ticket_statuses | const | workboard/board.py | Tuple of the five lifecycle states, in order |
| DepGraph | class | workboard/dep_graph.py | Pure directed dependency graph; from_deps, first_cycle |
| Channel | class | postbox/channel.py | Cursor-based mailbox over one shared transcript + per-reader cursors |
| Envelope | TypeAlias | postbox/codecs.py | Discriminated union over type: Task / Result / Note / Control |
| EnvelopeType | TypeAlias | postbox/codecs.py | Literal['task','result','note','control'] |
| encode / decode | function | postbox/codecs.py | Validate+serialize / parse+validate one wire line |
| payload_of | function | postbox/codecs.py | Return the payload model class for an EnvelopeType |
| ENVELOPE_SCHEMA | const | postbox/codecs.py | TypeAdapter[Envelope] discriminated-union validator |
| ENVELOPE_TYPES | const | postbox/codecs.py | Tuple of the four message-class tags |
| Workspace | class | isolation/worktree.py | Git-worktree provisioner over an injected ShellRunner |
| WorktreeRef / WorktreeEntry | dataclass | isolation/worktree.py | Worktree coordinates / a parsed git worktree list entry |
| CreateOptions / RemoveOptions | dataclass | isolation/worktree.py | Options for Workspace.create / Workspace.remove |
| ShellRunner | Protocol | isolation/runner.py | async exec(command, cwd) -> RunResult; must not raise on non-zero exit |
| RunResult | dataclass | isolation/runner.py | One shelled command's outcome (stdout, stderr, code) |
| local_runner | const | isolation/runner.py | Production runner: shell-less asyncio.create_subprocess_exec |
| ActivityLog | class | telemetry/activity.py | Append-only coordination transcript over JsonlLog |
| CrewEvent | TypeAlias | telemetry/activity.py | Discriminated union over kind (six event kinds) |
| CrewEventKind / CrewEventInput | TypeAlias | telemetry/activity.py | The six kind literals / the Mapping record() accepts |
| CREW_EVENT_KINDS | const | telemetry/activity.py | Tuple of the six coordination-event kinds |
| JsonCell | class | kernel/json_cell.py | Lockable, schema-validated, atomically-published JSON file |
| GuardOptions | dataclass | kernel/json_cell.py | Lock-acquisition tunables (timeout_ms, stale_ms, backoff) |
| JsonlLog | class | kernel/jsonl_log.py | Append-only schema-validated JSONL log; append, read_all, thread_of |
| TreeNode | Protocol | kernel/jsonl_log.py | (id, parent_id) an entry must expose to use thread_of |
| new_id | function | kernel/ids.py | Mint a sortable ULID-backed id, optionally prefixed |
| SwarmFault | class | kernel/faults.py | Typed exception with discriminable .kind; is_(value, kind) guard |
| swarm_fault | function | kernel/faults.py | Ergonomic constructor for SwarmFault(kind, message, cause) |
| SwarmFaultKind | TypeAlias | kernel/faults.py | Literal['lock_timeout','validation','not_found','conflict','isolation','spawn'] |

Sub-directories

| Directory | Holds |
| --- | --- |
| kernel/ | The frozen foundation: JsonCell, JsonlLog, new_id, SwarmFault/swarm_fault, GuardOptions, TreeNode |
| roster/ | Team config: CrewManifest, ModelPolicy; CrewMember/CrewRecord/MemberDraft/CrewToolCollection |
| workboard/ | Shared task store: TicketBoard, DepGraph; Ticket/TicketStatus/TicketDraft, ticket_schema, ticket_statuses |
| postbox/ | Mailbox: Channel and codecs (table-driven Envelope union, encode/decode/payload_of) |
| isolation/ | Git-worktree isolation: Workspace over an injectable ShellRunner; local_runner, RunResult, option/ref dataclasses |
| telemetry/ | Observer: ActivityLog over JsonlLog and the CrewEvent discriminated union (six kinds) |

The coordinator

Crew.open(opts) roots manifest.json, board.json, channel/, and activity.jsonl under opts.dir (and a Workspace when base_repo is set), then eagerly creates the crew record (tolerating a conflict on re-open). The agent spawner is injectable: omit spawn_agent to get default_spawn_agent (a live agent bound to the member's model and tool_box(collection), defaulting to read-only / mock-1), or inject a scripted fake whose submit returns a canned snapshot for deterministic tests.

run_round() is the heartbeat. It reads the board's ready() tickets and the idle members (members not holding a claimed or blocked ticket), pairs them one-to-one in board order (min(ready, idle)), and for each pair: claims the ticket, records ticket_claimed, spawns the member's agent, runs it over the ticket brief, sends a result Envelope back to the coordinator reader, records message_sent, completes the ticket, and records ticket_done. Any Exception (but not BaseException/CancelledError) instead fails the ticket and records ticket_failed, so one bad teammate cannot wedge the round.

import asyncio
from indusagi.swarm import CrewMember, CrewOptions, MemberDraft, TaskDraft, create_crew


async def main() -> None:
    # In real use omit spawn_agent to get default_spawn_agent (a live agent).
    # Here we inject a fake whose .submit/.snapshot satisfy the coordinator:
    # the coordinator only reads the last assistant turn's text blocks.
    class FakeSnapshot:
        def __init__(self, text: str) -> None:
            block = type("Block", (), {"kind": "text", "text": text})()
            turn = type("Turn", (), {"role": "assistant", "blocks": [block]})()
            self.messages = [turn]

    class FakeAgent:
        def __init__(self, role: str) -> None:
            self.role = role

        async def submit(self, brief: str) -> FakeSnapshot:
            return FakeSnapshot(f"completed by {self.role}")

        def snapshot(self): ...

    def spawn(member: CrewMember) -> FakeAgent:
        return FakeAgent(member.role)

    crew = await create_crew(CrewOptions(dir="/tmp/crew1", spawn_agent=spawn))
    await crew.add_member(MemberDraft(role="coder", tool_collection="coding"))
    await crew.add_member(MemberDraft(role="reviewer", tool_collection="read-only"))

    t1 = await crew.post_task(TaskDraft(title="implement feature"))
    t2 = await crew.post_task(TaskDraft(title="review feature", deps=[t1.id]))

    outcome = await crew.run_round()        # only t1 is ready (t2 depends on t1)
    print([t.id for t in outcome.completed])

    status = await crew.status()
    print(status.counts.done, status.counts.open)
    print([e.kind for e in status.activity])


asyncio.run(main())
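
The pairing and failure-folding steps of a round can also be sketched without the library. This is a minimal illustration under stated assumptions: run_round_sketch, the ok/boom worker callables, and the plain-string ticket ids are hypothetical stand-ins, not the coordinator's actual code. The pairs run sequentially here for simplicity; the text above does not say whether the real coordinator runs them concurrently.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-in for "spawn the member's agent and run it over the brief".
Worker = Callable[[str], Awaitable[str]]


async def run_round_sketch(
    ready: list[str], idle: dict[str, Worker]
) -> tuple[list[str], list[str]]:
    """Pair ready tickets with idle members one-to-one, in order."""
    completed: list[str] = []
    failed: list[str] = []
    # zip stops at the shorter side, giving min(len(ready), len(idle)) pairs.
    for ticket, (_member, work) in zip(ready, idle.items()):
        try:
            await work(ticket)
            completed.append(ticket)
        except Exception:
            # An ordinary failure marks this ticket failed and the loop moves on.
            # asyncio.CancelledError is a BaseException, so it is not caught here.
            failed.append(ticket)
    return completed, failed


async def ok(ticket: str) -> str:
    return f"done {ticket}"


async def boom(ticket: str) -> str:
    raise RuntimeError("teammate crashed")


completed, failed = asyncio.run(
    run_round_sketch(["t1", "t2", "t3"], {"coder": ok, "reviewer": boom})
)
print(completed, failed)  # ['t1'] ['t2']  (t3 has no idle member this round)
```

With three ready tickets and two idle members only two pairs form, and the crashing teammate fails its own ticket without affecting the other.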

The workboard

TicketBoard is a dependency-aware task store: one JSON file mediated by JsonCell, so every write is a single guarded read-modify-write. The lifecycle is open → claimed → done | failed plus a blocked side-state. deps form a directed graph; add() rebuilds a DepGraph including the newcomer and rejects the insert with a conflict fault if first_cycle() finds a loop, and rejects a dangling dep with not_found — so neither a cycle nor a phantom dependency ever reaches disk. ready() returns the open tickets whose every dep id names a ticket that reached done; a failed dep never satisfies a dependent.

import asyncio
from indusagi.swarm import TicketBoard, TicketDraft, SwarmFault


async def main() -> None:
    board = await TicketBoard.open("/tmp/board.json")
    a = await board.add(TicketDraft(title="build"))
    b = await board.add(TicketDraft(title="test", deps=[a.id]))

    print([t.id for t in await board.ready()])   # [a] only: b waits on a
    await board.claim(a.id, member="mbr_1")       # atomic: two racers can't both win
    await board.complete(a.id, result="built ok")
    print([t.id for t in await board.ready()])   # now [b]

    # A dangling dep raises not_found; a cycle raises conflict.
    try:
        await board.add(TicketDraft(title="bad", deps=["tkt_missing"]))
    except SwarmFault as e:
        print(e.kind)   # 'not_found'


asyncio.run(main())
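
The readiness rule and the cycle check can be illustrated with plain dictionaries. The sketch below is an assumption-laden stand-in, not TicketBoard or DepGraph themselves: the ready and first_cycle functions here are hypothetical re-implementations, tickets are bare dicts, and the cycle search is an ordinary depth-first traversal.

```python
from typing import Optional


def ready(tickets: dict[str, dict]) -> list[str]:
    """Open tickets whose every dep names a ticket that reached 'done'."""
    return [
        tid
        for tid, t in tickets.items()
        if t["status"] == "open"
        and all(tickets[d]["status"] == "done" for d in t["deps"])
    ]


def first_cycle(deps: dict[str, list[str]]) -> Optional[list[str]]:
    """Return one dependency cycle as a list of ids, or None if acyclic."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited / on the current path / finished
    color = dict.fromkeys(deps, WHITE)
    path: list[str] = []

    def visit(node: str) -> Optional[list[str]]:
        color[node] = GREY
        path.append(node)
        for nxt in deps.get(node, []):
            state = color.get(nxt, WHITE)
            if state == GREY:  # back edge: nxt is already on the current path
                return path[path.index(nxt):] + [nxt]
            if state == WHITE:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for node in deps:
        if color[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None


tickets = {
    "a": {"status": "done", "deps": []},
    "b": {"status": "open", "deps": ["a"]},    # ready: its only dep is done
    "c": {"status": "failed", "deps": []},
    "d": {"status": "open", "deps": ["c"]},    # never ready: a failed dep does not count
}
ready_ids = ready(tickets)
cycle = first_cycle({"a": ["b"], "b": ["c"], "c": ["a"]})
no_cycle = first_cycle({"a": [], "b": ["a"]})
print(ready_ids, cycle, no_cycle)  # ['b'] ['a', 'b', 'c', 'a'] None
```

Running the cycle check on the graph that includes the newcomer, before anything is written, is what keeps a loop from ever reaching disk.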

The postbox

A Channel is one shared JsonlLog transcript (no file-per-recipient fan-out) plus a private JsonCell cursor per reader. send is one validated append. poll(reader) reads everything past the reader's cursor, returns the slice addressed to that reader, and advances the cursor to the end of what it scanned — so messages addressed to other readers are skipped permanently for this reader, keeping polls O(new lines). peek(reader) does the same without advancing. Delivery is a pull, not a push.

Every message is an Envelope — a discriminated union over type (task / result / note / control) with a type-specific payload. encode/decode dispatch through ENVELOPE_SCHEMA (a pydantic discriminated union); the codec table's exhaustiveness over ENVELOPE_TYPES is proven at import time, so a drifted branch raises AssertionError on import.

import asyncio
from indusagi.swarm import Channel, new_id
from indusagi.swarm.postbox.codecs import NoteEnvelope, NotePayload


async def main() -> None:
    ch = await Channel.open("/tmp/chan")
    env = NoteEnvelope(
        id=new_id("env"), from_="coder", to="reviewer",
        ts=0, type="note", payload=NotePayload(text="PR is up"),
    )
    await ch.send(env)

    peeked = await ch.peek("reviewer")    # look without consuming
    print(len(peeked))                    # 1
    polled = await ch.poll("reviewer")    # consume + advance cursor
    print(polled[0].payload.text)         # 'PR is up'
    print(len(await ch.poll("reviewer"))) # 0 — cursor advanced past it


asyncio.run(main())

Note that from_ carries the wire alias from (a Python keyword), so on disk the field is serialized as "from".
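
The cursor semantics described above can be modelled in memory. This is a minimal sketch under stated assumptions: MailboxSketch is a hypothetical class, the transcript is a Python list instead of a JSONL file, and cursors live in a dict instead of one JsonCell per reader.

```python
from dataclasses import dataclass, field


@dataclass
class MailboxSketch:
    # One shared transcript of (recipient, text) pairs; no per-recipient files.
    transcript: list[tuple[str, str]] = field(default_factory=list)
    # One private offset per reader into the shared transcript.
    cursors: dict[str, int] = field(default_factory=dict)

    def send(self, to: str, text: str) -> None:
        self.transcript.append((to, text))

    def peek(self, reader: str) -> list[str]:
        """Return new messages addressed to reader without moving the cursor."""
        start = self.cursors.get(reader, 0)
        return [text for to, text in self.transcript[start:] if to == reader]

    def poll(self, reader: str) -> list[str]:
        """Return new messages for reader and advance past everything scanned."""
        mine = self.peek(reader)
        self.cursors[reader] = len(self.transcript)
        return mine


box = MailboxSketch()
box.send("reviewer", "PR is up")
box.send("coder", "please rebase")
box.send("reviewer", "tests pass")

first = box.poll("reviewer")    # ['PR is up', 'tests pass']
second = box.poll("reviewer")   # []: the cursor already sits at the end
coder_view = box.peek("coder")  # ['please rebase']: the coder's cursor is untouched
print(first, second, coder_view)
```

Because the reviewer's cursor jumps past the coder's message as well, a later poll only has to scan lines appended since the previous one.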

The kernel

The kernel is the frozen foundation every service is a thin wrapper over. JsonCell owns one JSON file plus a pydantic TypeAdapter: read() is lock-free (atomic publish guarantees a reader sees a whole previous-or-next value, never torn), and mutate(fn) is the one transactional primitive — acquire an exclusive guard, read+validate, apply the pure transform fn, re-validate, publish atomically (temp file → os.replace), release in a finally. The mutex is an atomic mkdir of a <file>.lockdir directory, with jittered exponential backoff to a deadline and stale-lock reclamation. Because every domain mutator hands a pure transform to cell.mutate, cross-process edits to the same file serialize with zero lost updates.

JsonlLog is append-only NDJSON: append() validates one line and writes it, read_all() validates every line, and thread_of() walks parent_id pointers root→leaf with cycle detection. new_id() mints time-sortable ULIDs. SwarmFault is the single typed error every kernel op raises, branchable by .kind.

import asyncio
from pydantic import TypeAdapter, BaseModel
from indusagi.swarm import JsonCell, new_id


class Counter(BaseModel):
    n: int = 0


async def main() -> None:
    cell = await JsonCell.open("/tmp/counter.json", TypeAdapter(Counter), Counter())
    after = await cell.mutate(lambda c: Counter(n=c.n + 1))  # guarded RMW
    print(after.n)                 # 1
    print((await cell.read()).n)   # 1, lock-free read
    print(new_id("tkt"))           # e.g. 'tkt_01J...'


asyncio.run(main())
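
For intuition, the guarded read-modify-write can be approximated with the standard library alone. The following is a simplified sketch, not JsonCell's implementation: the mutate function, its timeout_s parameter, and the backoff constants are assumptions, it is synchronous, and it omits schema validation and stale-lock reclamation.

```python
import json
import os
import random
import tempfile
import time
from pathlib import Path
from typing import Callable


def mutate(path: Path, fn: Callable[[dict], dict], timeout_s: float = 2.0) -> dict:
    """Lock, read, transform, publish atomically, unlock."""
    lock = Path(str(path) + ".lockdir")
    deadline = time.monotonic() + timeout_s
    delay = 0.005
    while True:
        try:
            os.mkdir(lock)  # atomic: exactly one contender succeeds
            break
        except FileExistsError:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"could not lock {path}")
            time.sleep(delay * random.uniform(0.5, 1.5))  # jittered backoff
            delay = min(delay * 2, 0.1)                   # exponential, capped
    try:
        current = json.loads(path.read_text()) if path.exists() else {}
        updated = fn(current)
        # Write to a temp file in the same directory, then swap it into place.
        fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
        with os.fdopen(fd, "w") as fh:
            json.dump(updated, fh)
        os.replace(tmp, path)  # readers see the old or the new file, never a torn one
        return updated
    finally:
        os.rmdir(lock)  # always release, even if fn raised


workdir = Path(tempfile.mkdtemp())
cell = workdir / "counter.json"
for _ in range(3):
    result = mutate(cell, lambda c: {"n": c.get("n", 0) + 1})
print(result)  # {'n': 3}
```

The temp file is created next to the target so that os.replace stays within one filesystem, which is what makes the swap atomic.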

Roster, telemetry, and isolation

CrewManifest persists a CrewRecord, which starts out unborn (with an empty crew_id) until create() runs; add_member enrolls a CrewMember from a MemberDraft. ModelPolicy is a role→model lookup table built with ModelPolicy.of(table, fallback).

ActivityLog records a CrewEvent discriminated union over kind (member_spawned, ticket_posted, ticket_claimed, ticket_done, ticket_failed, message_sent); record(event) auto-stamps id/ts when absent, and timeline() returns the transcript oldest-first.

Workspace provisions git worktrees over an injectable ShellRunner. The production local_runner splits the command string on whitespace and spawns with no shell (asyncio.create_subprocess_exec, never /bin/sh -c), removing an injection surface; any non-zero exit or spawn failure maps to an isolation SwarmFault.
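
A shell-less runner of the kind described can be sketched as follows. This is an illustration under assumptions, not local_runner itself: run_no_shell and RunResultSketch are hypothetical names, spawn failures are not handled, and the demo assumes the interpreter path in sys.executable contains no spaces (a whitespace split would break it otherwise).

```python
import asyncio
import sys
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RunResultSketch:
    stdout: str
    stderr: str
    code: int


async def run_no_shell(command: str, cwd: Optional[str] = None) -> RunResultSketch:
    """Run a command without a shell and report its exit code instead of raising."""
    argv = command.split()  # plain whitespace split: no quoting, no shell expansion
    proc = await asyncio.create_subprocess_exec(
        *argv,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    return RunResultSketch(out.decode(), err.decode(), proc.returncode)


ok = asyncio.run(run_no_shell(f"{sys.executable} -c print(6*7)"))
failed = asyncio.run(run_no_shell(f"{sys.executable} -c raise(SystemExit(3))"))
print(ok.stdout.strip(), ok.code, failed.code)  # 42 0 3
```

A non-zero exit comes back as data in code, leaving the caller to decide how to report it; characters such as ; or | are passed through as literal arguments because no shell ever parses the string.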

Key concepts

| Term | Definition |
| --- | --- |
| JsonCell.mutate (guarded RMW) | The one transactional primitive: take the mkdir-lockdir guard, read+validate, apply a pure transform, re-validate, atomically publish via temp-file + os.replace, release in finally. Cross-process edits serialize with zero lost updates. |
| Unborn crew sentinel | A manifest cell that has never been written reads back as CrewRecord(crew_id="", members=[], created_at=0). create() replaces it (and raises conflict if already born); other mutators refuse until create() has run, so a typo'd path can't grow a phantom crew. |
| Readiness from the dep graph | ready() returns open tickets whose every dep reached done. A failed dep never satisfies a dependent; add() rejects a cyclic or dangling dep before it reaches disk. |
| Cursor-based delivery | One shared transcript, no file-per-recipient; each reader keeps a private JsonCell offset. poll returns new-since-cursor envelopes addressed to the reader and advances the cursor past everything it scanned; peek does the same without advancing. |
| Injectable agent spawner | SpawnAgent = Callable[[CrewMember], Agent]. The default builds a real create_agent; tests inject a fake whose submit returns a canned snapshot, so a whole round runs with no model or network. |
| Typed faults over kind | Every failure is a SwarmFault carrying one of six kinds (lock_timeout, validation, not_found, conflict, isolation, spawn). Callers branch on SwarmFault.is_(err, "conflict") instead of matching message text; the trigger is preserved on .cause. |
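
Branching on a fault's kind instead of its message can be illustrated with a small stand-in exception. The sketch below is hypothetical: FaultSketch, create_crew_record, and open_or_reuse are invented for illustration and only mirror the shape described above (a kind, a message, an optional cause, and an is_ guard).

```python
from typing import Literal, Optional

FaultKind = Literal[
    "lock_timeout", "validation", "not_found", "conflict", "isolation", "spawn"
]


class FaultSketch(Exception):
    """Hypothetical stand-in for a typed fault with a discriminable kind."""

    def __init__(
        self, kind: FaultKind, message: str, cause: Optional[BaseException] = None
    ) -> None:
        super().__init__(message)
        self.kind = kind
        self.cause = cause  # the original trigger, if any

    @staticmethod
    def is_(value: object, kind: FaultKind) -> bool:
        return isinstance(value, FaultSketch) and value.kind == kind


def create_crew_record(already_born: bool) -> str:
    if already_born:
        raise FaultSketch("conflict", "crew already exists")
    return "created"


def open_or_reuse(already_born: bool) -> str:
    """Tolerate a conflict on re-open; let every other fault propagate."""
    try:
        return create_crew_record(already_born)
    except FaultSketch as err:
        if FaultSketch.is_(err, "conflict"):
            return "reused"
        raise


print(open_or_reuse(False), open_or_reuse(True))  # created reused
```

Matching on kind keeps callers stable when message wording changes, which is the point of the guard.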

Relationship to neighbors

Swarm is a self-contained subsystem with only two upward dependencies, both in the coordinator: it imports the Runtime (Agent, AgentConfig, RunSnapshot, create_agent) to spawn teammate agents, and the Capabilities layer (ToolCollection, tool_box) to bind each member's tool surface. The coordinator only touches an Agent's submit/snapshot surface, so a fake satisfies it. The domain services and kernel have no upward dependencies (pure pydantic + stdlib + the ulid package), making the kernel and each service independently testable. Internally everything funnels through swarm/kernel. The root indusagi package re-exports the subpackage as swarm. See the Architecture overview for where the layer sits.

Parity notes

This package is an explicit Python port; a handful of choices are load-bearing and worth knowing about:

  • os.replace, not os.rename for atomic publish — os.rename fails over an existing file on Windows.
  • mkdir-lockdir, not O_EXCL, with stale reclamation after stale_ms (default 30s): a lock left behind by a crashed holder is forcibly reclaimed, so a wedged process won't deadlock the crew forever.
  • DepGraph uses dict[str, None] as an insertion-ordered set because a Python set does not preserve insertion order (and the iteration order of string members can change between runs); this keeps first_cycle() and neighbour ordering stable.
  • Persisted models use strict=True (no string→number coercion), strip extra keys, and drop None-valued fields on serialize, so optional keys are simply absent on disk. Wire names stay camelCase via aliases (crewId, createdAt, ticketId, memberId, envelopeId, envelopeType), and the Python field from_ is aliased to the wire name from.
  • Codec exhaustiveness is proven at import time by _assert_codec_table_exhaustive() — importing codecs.py raises if a branch, the table, and ENVELOPE_TYPES drift apart.
  • CrewRecord.crew_id is deliberately not min-length-1: the empty string is the unborn sentinel; born-vs-unborn is enforced by create(), not the schema.
  • run_round only folds Exception into a failed ticket; asyncio.CancelledError is a BaseException and propagates, so cancellation is never silently swallowed.
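
The last point can be checked with a few lines of asyncio. This is a generic demonstration of the language behaviour, not code from the package; guarded, crash, and hang are hypothetical helpers.

```python
import asyncio
from typing import Awaitable, Callable


async def guarded(work: Callable[[], Awaitable[None]]) -> str:
    """Fold ordinary exceptions into a 'failed' result, as a round would."""
    try:
        await work()
        return "done"
    except Exception:
        return "failed"


async def crash() -> None:
    raise ValueError("bad teammate")


async def hang() -> None:
    await asyncio.sleep(60)


async def main() -> tuple[str, bool]:
    folded = await guarded(crash)  # ValueError is an Exception: folded
    task = asyncio.create_task(guarded(hang))
    await asyncio.sleep(0)         # let the task start and park on its sleep
    task.cancel()
    try:
        await task
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True           # cancellation passed through "except Exception"
    return folded, cancelled


folded, cancelled = asyncio.run(main())
print(folded, cancelled)  # failed True
```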

For a full account of intentional divergences across the framework, see the Parity reference. No part of the swarm package is a stub — all modules are fully implemented and exercised by tests; see the Testing guide.