Swarm
indusagi.swarm is the multi-agent crew layer: a Crew coordinator drives teammate agents over a shared ticket board, a cursor-based mailbox, and an activity log, all built on a tiny frozen kernel of lockable JSON files and append-only JSONL logs. Import it with `from indusagi import swarm` or `import indusagi.swarm`.
A crew coordinates entirely through files on disk — no broker, no daemon, no
shared memory. Multiple processes can safely share the same crew directory
because every mutation goes through the kernel's cooperative-lock +
atomic-publish + schema-validate machinery. The Crew owns the policy (match
ready tickets to idle members, spawn one agent per pair, fold the result back
onto the board and channel, narrate every step); four thin domain services
(roster, workboard, postbox, telemetry) plus an optional git-worktree isolation
seam own the persisted state. Agent spawning is injectable, so a round can be
driven by real LLM-backed agents in production or scripted fakes in tests with
no model or network.
Table of Contents
- Public exports
- Sub-directories
- The coordinator
- The workboard
- The postbox
- The kernel
- Roster, telemetry, and isolation
- Key concepts
- Relationship to neighbors
- Parity notes
Public exports
Everything below is exported from indusagi/swarm/__init__.py. The coordinator surface:
| Name | Kind | Source | Purpose |
|---|---|---|---|
| create_crew | async function | coordinator.py | Host-facing factory: stand up a Crew from CrewOptions |
| Crew | class | coordinator.py | The coordinator. Verbs: add_member, post_task, run_round, status; classmethod open |
| CrewOptions | dataclass | coordinator.py | Knobs: dir, base_repo, spawn_agent override, injected now clock |
| CrewStatus | dataclass | coordinator.py | Point-in-time aggregate (crew_id, members, tickets, ready, counts, activity) |
| RoundOutcome | dataclass | coordinator.py | Result of one run_round: completed and failed ticket tuples |
| TaskDraft | dataclass | coordinator.py | Ticket draft for Crew.post_task (title, optional body, optional deps) |
| SpawnAgent | TypeAlias | coordinator.py | Callable[[CrewMember], Agent]: the injectable spawner seam |
| default_spawn_agent | function | coordinator.py | Default spawner: a real create_agent bound to the member's model + tool_box |
The domain services and kernel primitives:
| Name | Kind | Source | Purpose |
|---|---|---|---|
| CrewManifest | class | roster/manifest.py | Concurrency-safe roster service over JsonCell |
| ModelPolicy | class | roster/manifest.py | Role→model lookup table; of, for_, model_for_role, roles |
| CrewMember | model | roster/manifest.py | One teammate (id, role, optional model, optional tool_collection) |
| CrewRecord | model | roster/manifest.py | Persisted crew doc (crew_id, members, created_at) |
| MemberDraft | dataclass | roster/manifest.py | Inputs to enroll a member (role required) |
| CrewToolCollection | TypeAlias | roster/manifest.py | Literal['read-only','coding','all'] |
| TicketBoard | class | workboard/board.py | Shared dependency-aware task store over JsonCell |
| Ticket | model | workboard/board.py | One unit of work (lifecycle, deps, assignee, result, …) |
| TicketStatus | TypeAlias | workboard/board.py | Literal['open','claimed','blocked','done','failed'] |
| TicketDraft | dataclass | workboard/board.py | Fields for TicketBoard.add |
| ticket_schema | const | workboard/board.py | TypeAdapter[Ticket] validator |
| ticket_statuses | const | workboard/board.py | Tuple of the five lifecycle states, in order |
| DepGraph | class | workboard/dep_graph.py | Pure directed dependency graph; from_deps, first_cycle |
| Channel | class | postbox/channel.py | Cursor-based mailbox over one shared transcript + per-reader cursors |
| Envelope | TypeAlias | postbox/codecs.py | Discriminated union over type: Task / Result / Note / Control |
| EnvelopeType | TypeAlias | postbox/codecs.py | Literal['task','result','note','control'] |
| encode / decode | function | postbox/codecs.py | Validate+serialize / parse+validate one wire line |
| payload_of | function | postbox/codecs.py | Return the payload model class for an EnvelopeType |
| ENVELOPE_SCHEMA | const | postbox/codecs.py | TypeAdapter[Envelope] discriminated-union validator |
| ENVELOPE_TYPES | const | postbox/codecs.py | Tuple of the four message-class tags |
| Workspace | class | isolation/worktree.py | Git-worktree provisioner over an injected ShellRunner |
| WorktreeRef / WorktreeEntry | dataclass | isolation/worktree.py | Worktree coordinates / a parsed git worktree list entry |
| CreateOptions / RemoveOptions | dataclass | isolation/worktree.py | Options for Workspace.create / Workspace.remove |
| ShellRunner | Protocol | isolation/runner.py | async exec(command, cwd) -> RunResult; must not raise on non-zero exit |
| RunResult | dataclass | isolation/runner.py | One shelled command's outcome (stdout, stderr, code) |
| local_runner | const | isolation/runner.py | Production runner: shell-less asyncio.create_subprocess_exec |
| ActivityLog | class | telemetry/activity.py | Append-only coordination transcript over JsonlLog |
| CrewEvent | TypeAlias | telemetry/activity.py | Discriminated union over kind (six event kinds) |
| CrewEventKind / CrewEventInput | TypeAlias | telemetry/activity.py | The six kind literals / the Mapping record() accepts |
| CREW_EVENT_KINDS | const | telemetry/activity.py | Tuple of the six coordination-event kinds |
| JsonCell | class | kernel/json_cell.py | Lockable, schema-validated, atomically-published JSON file |
| GuardOptions | dataclass | kernel/json_cell.py | Lock-acquisition tunables (timeout_ms, stale_ms, backoff) |
| JsonlLog | class | kernel/jsonl_log.py | Append-only schema-validated JSONL log; append, read_all, thread_of |
| TreeNode | Protocol | kernel/jsonl_log.py | (id, parent_id) an entry must expose to use thread_of |
| new_id | function | kernel/ids.py | Mint a sortable ULID-backed id, optionally prefixed |
| SwarmFault | class | kernel/faults.py | Typed exception with discriminable .kind; is_(value, kind) guard |
| swarm_fault | function | kernel/faults.py | Ergonomic constructor for SwarmFault(kind, message, cause) |
| SwarmFaultKind | TypeAlias | kernel/faults.py | Literal['lock_timeout','validation','not_found','conflict','isolation','spawn'] |
Sub-directories
| Directory | Holds |
|---|---|
| kernel/ | The frozen foundation: JsonCell, JsonlLog, new_id, SwarmFault/swarm_fault, GuardOptions, TreeNode |
| roster/ | Team config: CrewManifest, ModelPolicy; CrewMember/CrewRecord/MemberDraft/CrewToolCollection |
| workboard/ | Shared task store: TicketBoard, DepGraph; Ticket/TicketStatus/TicketDraft, ticket_schema, ticket_statuses |
| postbox/ | Mailbox: Channel and codecs (table-driven Envelope union, encode/decode/payload_of) |
| isolation/ | Git-worktree isolation: Workspace over an injectable ShellRunner; local_runner, RunResult, option/ref dataclasses |
| telemetry/ | Observer: ActivityLog over JsonlLog and the CrewEvent discriminated union (six kinds) |
The coordinator
Crew.open(opts) roots manifest.json, board.json, channel/, and
activity.jsonl under opts.dir (and a Workspace when base_repo is set),
then eagerly creates the crew record (tolerating a conflict on re-open). The
agent spawner is injectable. Omit spawn_agent to get default_spawn_agent, a live agent bound to the member's model and tool_box(collection) that falls back to the read-only tool collection and the mock-1 model when the member leaves them unset. Alternatively, inject a scripted fake whose submit returns a canned snapshot for deterministic tests.
run_round() is the heartbeat. It reads the board's ready() tickets and the
idle members (members not holding a claimed or blocked ticket), then pairs them
one-to-one in board order, so at most min(len(ready), len(idle)) tickets run per
round. For each pair it claims the ticket, records ticket_claimed, spawns the
member's agent, runs it over the ticket brief, sends a result Envelope back to
the coordinator reader, records message_sent, completes the ticket, and records
ticket_done. Any Exception instead fails the ticket and records ticket_failed,
so one bad teammate cannot wedge the round. Exceptions that derive only from
BaseException, such as asyncio.CancelledError, are not caught.
```python
import asyncio

from indusagi.swarm import CrewMember, CrewOptions, MemberDraft, TaskDraft, create_crew


async def main() -> None:
    # In real use, omit spawn_agent to get default_spawn_agent (a live agent).
    # Here we inject a fake whose .submit/.snapshot satisfy the coordinator,
    # which only reads the text blocks of the last assistant turn.
    class FakeSnapshot:
        def __init__(self, text: str) -> None:
            block = type("Block", (), {"kind": "text", "text": text})()
            turn = type("Turn", (), {"role": "assistant", "blocks": [block]})()
            self.messages = [turn]

    class FakeAgent:
        def __init__(self, role: str) -> None:
            self.role = role
            self._last = FakeSnapshot("")

        async def submit(self, brief: str) -> FakeSnapshot:
            self._last = FakeSnapshot(f"completed by {self.role}")
            return self._last

        def snapshot(self) -> FakeSnapshot:
            # Return the most recent canned snapshot rather than None.
            return self._last

    def spawn(member: CrewMember) -> FakeAgent:
        return FakeAgent(member.role)

    crew = await create_crew(CrewOptions(dir="/tmp/crew1", spawn_agent=spawn))
    await crew.add_member(MemberDraft(role="coder", tool_collection="coding"))
    await crew.add_member(MemberDraft(role="reviewer", tool_collection="read-only"))
    t1 = await crew.post_task(TaskDraft(title="implement feature"))
    t2 = await crew.post_task(TaskDraft(title="review feature", deps=[t1.id]))
    outcome = await crew.run_round()  # only t1 is ready: t2 depends on t1
    print([t.id for t in outcome.completed])
    status = await crew.status()
    print(status.counts.done, status.counts.open)
    print([e.kind for e in status.activity])


asyncio.run(main())
```
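The pairing and failure-folding rules of run_round() can be approximated with plain in-memory objects. This is a minimal sketch and not the Crew implementation. Ticket, ready, idle, run_round and fake_agent are hypothetical stand-ins, claims and results are kept in Python lists instead of board.json and activity.jsonl, and the step that sends a result Envelope and records message_sent is omitted.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class Ticket:
    """Hypothetical in-memory ticket; the real one lives in board.json."""
    id: str
    status: str = "open"  # open | claimed | blocked | done | failed
    deps: list[str] = field(default_factory=list)
    assignee: str | None = None
    result: str | None = None


def ready(tickets: list[Ticket]) -> list[Ticket]:
    # Open tickets whose every dep reached "done"; a failed dep never counts.
    done = {t.id for t in tickets if t.status == "done"}
    return [t for t in tickets if t.status == "open" and all(d in done for d in t.deps)]


def idle(members: list[str], tickets: list[Ticket]) -> list[str]:
    # A member is busy while it holds a claimed or blocked ticket.
    busy = {t.assignee for t in tickets if t.status in ("claimed", "blocked")}
    return [m for m in members if m not in busy]


async def run_round(members, tickets, run_agent, log):
    completed, failed = [], []
    # zip() stops at the shorter list: exactly min(len(ready), len(idle)) pairs.
    for ticket, member in zip(ready(tickets), idle(members, tickets)):
        ticket.status, ticket.assignee = "claimed", member
        log.append(("ticket_claimed", ticket.id))
        try:
            ticket.result = await run_agent(member, ticket)
        except Exception as exc:  # CancelledError is a BaseException: not caught here
            ticket.status, ticket.result = "failed", str(exc)
            log.append(("ticket_failed", ticket.id))
            failed.append(ticket.id)
            continue
        ticket.status = "done"
        log.append(("ticket_done", ticket.id))
        completed.append(ticket.id)
    return completed, failed


async def fake_agent(member: str, ticket: Ticket) -> str:
    if ticket.id == "t3":
        raise RuntimeError("agent crashed")
    return f"{ticket.id} by {member}"


tickets = [Ticket("t1"), Ticket("t2", deps=["t1"]), Ticket("t3")]
log: list[tuple[str, str]] = []
completed, failed = asyncio.run(run_round(["coder", "reviewer"], tickets, fake_agent, log))
print(completed, failed)  # ['t1'] ['t3']
```

In this sketch, t2 stays open because the set of ready tickets is fixed when the round starts, and the crash in t3 is folded into a failed ticket instead of aborting the loop.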
The workboard
TicketBoard is a dependency-aware task store: one JSON file mediated by
JsonCell, so every write is a single guarded read-modify-write. The lifecycle
is open → claimed → done | failed plus a blocked side-state. deps form a
directed graph; add() rebuilds a DepGraph including the newcomer and rejects
the insert with a conflict fault if first_cycle() finds a loop, and rejects
a dangling dep with not_found — so neither a cycle nor a phantom dependency
ever reaches disk. ready() returns the open tickets whose every dep id names
a ticket that reached done; a failed dep never satisfies a dependent.
```python
import asyncio

from indusagi.swarm import SwarmFault, TicketBoard, TicketDraft


async def main() -> None:
    board = await TicketBoard.open("/tmp/board.json")
    a = await board.add(TicketDraft(title="build"))
    b = await board.add(TicketDraft(title="test", deps=[a.id]))
    print([t.id for t in await board.ready()])  # [a] only: b waits on a
    await board.claim(a.id, member="mbr_1")  # atomic: two racers can't both win
    await board.complete(a.id, result="built ok")
    print([t.id for t in await board.ready()])  # now [b]
    # A dangling dep raises not_found; a cycle raises conflict.
    try:
        await board.add(TicketDraft(title="bad", deps=["tkt_missing"]))
    except SwarmFault as e:
        print(e.kind)  # 'not_found'


asyncio.run(main())
```
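The cycle check and the readiness rule can be sketched separately, assuming the dependency data is a plain map from ticket id to dep ids. DepGraph below is a hypothetical stand-in that borrows the from_deps/first_cycle names. Its first_cycle is an ordinary depth-first search for a back edge and may not match the package's algorithm. ready() takes a hypothetical id → (status, deps) map.

```python
from __future__ import annotations


class DepGraph:
    """Hypothetical sketch of a directed dep graph; edges point ticket -> dep."""

    def __init__(self) -> None:
        # dict[str, None] doubles as an insertion-ordered set, so traversal
        # order (and therefore which cycle is reported) is stable across runs.
        self.edges: dict[str, dict[str, None]] = {}

    @classmethod
    def from_deps(cls, deps: dict[str, list[str]]) -> DepGraph:
        g = cls()
        for node, targets in deps.items():
            g.edges.setdefault(node, {})
            for t in targets:
                g.edges[node][t] = None
                g.edges.setdefault(t, {})
        return g

    def first_cycle(self) -> list[str] | None:
        """Return the first cycle found by DFS as [a, b, ..., a], else None."""
        WHITE, GREY, BLACK = 0, 1, 2
        color = {n: WHITE for n in self.edges}
        stack: list[str] = []

        def visit(n: str) -> list[str] | None:
            color[n] = GREY
            stack.append(n)
            for m in self.edges[n]:
                if color[m] == GREY:  # back edge: m is on the current path
                    return stack[stack.index(m):] + [m]
                if color[m] == WHITE:
                    found = visit(m)
                    if found:
                        return found
            stack.pop()
            color[n] = BLACK
            return None

        for n in self.edges:
            if color[n] == WHITE:
                found = visit(n)
                if found:
                    return found
        return None


def ready(tickets: dict[str, tuple[str, list[str]]]) -> list[str]:
    # add() already rejected dangling deps, so every dep id is present.
    return [tid for tid, (status, deps) in tickets.items()
            if status == "open" and all(tickets[d][0] == "done" for d in deps)]


print(DepGraph.from_deps({"a": [], "b": ["a"]}).first_cycle())  # None
print(DepGraph.from_deps({"a": ["b"], "b": ["a"]}).first_cycle())  # ['a', 'b', 'a']
print(ready({"a": ("failed", []), "b": ("open", ["a"]), "c": ("open", [])}))  # ['c']
```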
The postbox
A Channel is one shared JsonlLog transcript (no file-per-recipient
fan-out) plus a private JsonCell cursor per reader. send is one validated
append. poll(reader) reads everything past the reader's cursor, returns the
slice addressed to that reader, and advances the cursor to the end of what it
scanned — so messages addressed to other readers are skipped permanently for
this reader, keeping polls O(new lines). peek(reader) does the same without
advancing. Delivery is a pull, not a push.
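The cursor mechanics can be sketched with one NDJSON file plus a small cursor file per reader. ChannelSketch is hypothetical. It stores a byte offset as the cursor, skips envelope validation, and takes no locks. The real Channel keeps each cursor in a JsonCell and validates every line through the codecs.

```python
from __future__ import annotations

import json
import os
import tempfile


class ChannelSketch:
    """Hypothetical sketch: one shared NDJSON transcript plus a byte-offset cursor per reader."""

    def __init__(self, root: str) -> None:
        os.makedirs(root, exist_ok=True)
        self.root = root
        self.log_path = os.path.join(root, "transcript.jsonl")

    def send(self, envelope: dict) -> None:
        # One append to the shared transcript; no per-recipient files.
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(envelope) + "\n")

    def peek(self, reader: str) -> list[dict]:
        return self._scan(reader)[0]

    def poll(self, reader: str) -> list[dict]:
        mine, end = self._scan(reader)
        # Advance past everything scanned, including lines addressed to others.
        tmp = self._cursor_path(reader) + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump({"offset": end}, f)
        os.replace(tmp, self._cursor_path(reader))
        return mine

    def _cursor_path(self, reader: str) -> str:
        return os.path.join(self.root, f"cursor.{reader}.json")

    def _scan(self, reader: str) -> tuple[list[dict], int]:
        try:
            with open(self._cursor_path(reader), encoding="utf-8") as f:
                start = json.load(f)["offset"]
        except FileNotFoundError:
            start = 0  # a new reader starts at the top of the transcript
        try:
            with open(self.log_path, "rb") as f:
                f.seek(start)
                data = f.read()
        except FileNotFoundError:
            return [], start
        # Consume only complete lines; a half-written tail waits for the next poll.
        complete = data[: data.rfind(b"\n") + 1]
        envelopes = [json.loads(line) for line in complete.splitlines() if line]
        return [e for e in envelopes if e.get("to") == reader], start + len(complete)


ch = ChannelSketch(tempfile.mkdtemp())
ch.send({"type": "note", "from": "coder", "to": "reviewer", "text": "PR is up"})
ch.send({"type": "note", "from": "coder", "to": "lead", "text": "FYI"})
peeked = ch.peek("reviewer")  # look without consuming
first = ch.poll("reviewer")   # consume and advance the cursor
second = ch.poll("reviewer")  # nothing new past the cursor
lead = ch.poll("lead")        # independent cursor, sees its own message
print(len(peeked), first[0]["text"], len(second), len(lead))  # 1 PR is up 0 1
```

Because each poll starts at the stored offset, its cost depends only on the lines appended since the last poll, however long the transcript grows.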
Every message is an Envelope — a discriminated union over type
(task / result / note / control) with a type-specific payload.
encode/decode dispatch through ENVELOPE_SCHEMA (a pydantic discriminated
union); the codec table's exhaustiveness over ENVELOPE_TYPES is proven at
import time, so a drifted branch raises AssertionError on import.
```python
import asyncio

from indusagi.swarm import Channel, new_id
from indusagi.swarm.postbox.codecs import NoteEnvelope, NotePayload


async def main() -> None:
    ch = await Channel.open("/tmp/chan")
    env = NoteEnvelope(
        id=new_id("env"), from_="coder", to="reviewer",
        ts=0, type="note", payload=NotePayload(text="PR is up"),
    )
    await ch.send(env)
    peeked = await ch.peek("reviewer")  # look without consuming
    print(len(peeked))  # 1
    polled = await ch.poll("reviewer")  # consume + advance cursor
    print(polled[0].payload.text)  # 'PR is up'
    print(len(await ch.poll("reviewer")))  # 0: cursor advanced past it


asyncio.run(main())
```
Note that from_ carries the wire alias from (a Python keyword), so on disk
the field is serialized as "from".
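A table-driven codec with an import-time exhaustiveness check can be sketched with dataclasses instead of pydantic. Several details here are assumptions for illustration. The payload field names (other than the note's text) are invented, and PAYLOADS and _assert_table_exhaustive are hypothetical. The real module also dispatches through the ENVELOPE_SCHEMA TypeAdapter rather than a dict lookup.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from typing import Literal, get_args

EnvelopeType = Literal["task", "result", "note", "control"]
ENVELOPE_TYPES = get_args(EnvelopeType)  # ('task', 'result', 'note', 'control')


@dataclass
class TaskPayload:
    ticket_id: str


@dataclass
class ResultPayload:
    ticket_id: str
    text: str


@dataclass
class NotePayload:
    text: str


@dataclass
class ControlPayload:
    action: str


# Hypothetical codec table: one payload class per message-class tag.
PAYLOADS: dict[str, type] = {
    "task": TaskPayload,
    "result": ResultPayload,
    "note": NotePayload,
    "control": ControlPayload,
}


def _assert_table_exhaustive() -> None:
    missing = set(ENVELOPE_TYPES) - set(PAYLOADS)
    extra = set(PAYLOADS) - set(ENVELOPE_TYPES)
    if missing or extra:
        # An explicit raise survives `python -O`, which strips assert statements.
        raise AssertionError(f"codec drift: missing={sorted(missing)} extra={sorted(extra)}")


_assert_table_exhaustive()  # runs at import time, so drift fails fast


def encode(env_type: str, payload: object) -> str:
    expected = PAYLOADS[env_type]
    if not isinstance(payload, expected):
        raise TypeError(f"{env_type!r} envelope needs {expected.__name__}")
    return json.dumps({"type": env_type, "payload": asdict(payload)})


def decode(line: str) -> tuple[str, object]:
    raw = json.loads(line)
    payload_cls = PAYLOADS[raw["type"]]  # dispatch on the discriminator
    return raw["type"], payload_cls(**raw["payload"])


kind, payload = decode(encode("note", NotePayload(text="PR is up")))
print(kind, payload)  # note NotePayload(text='PR is up')
```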
The kernel
The kernel is the frozen foundation every service is a thin wrapper over.
JsonCell owns one JSON file plus a pydantic TypeAdapter: read() is
lock-free (atomic publish guarantees a reader sees a whole previous-or-next
value, never torn), and mutate(fn) is the one transactional primitive —
acquire an exclusive guard, read+validate, apply the pure transform fn,
re-validate, publish atomically (temp file → os.replace), release in a
finally. The mutex is an atomic mkdir of a <file>.lockdir directory, with
jittered exponential backoff to a deadline and stale-lock reclamation. Because
every domain mutator hands a pure transform to cell.mutate, cross-process
edits to the same file serialize with zero lost updates.
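The guarded read-modify-write can be sketched synchronously with the standard library. This is an approximation under stated assumptions and not JsonCell itself. mutate and LockTimeout are hypothetical names, the document is a plain dict rather than a pydantic model, validate stands in for the TypeAdapter, and staleness is judged from the lockdir's mtime. os.mkdir either creates the directory or fails atomically, so the same guard works across processes as well as threads. The usage runs four threads of increments against one file.

```python
from __future__ import annotations

import json
import os
import random
import tempfile
import threading
import time
from typing import Callable


class LockTimeout(Exception):
    """Hypothetical stand-in for a SwarmFault of kind 'lock_timeout'."""


def mutate(path: str, fn: Callable[[dict], dict],
           validate: Callable[[dict], None] = lambda doc: None, *,
           timeout_s: float = 5.0, stale_s: float = 30.0) -> dict:
    lockdir = path + ".lockdir"
    deadline = time.monotonic() + timeout_s
    delay = 0.002
    while True:
        try:
            os.mkdir(lockdir)  # atomic: at most one caller creates it
            break
        except FileExistsError:
            pass
        try:
            if time.time() - os.stat(lockdir).st_mtime > stale_s:
                os.rmdir(lockdir)  # reclaim a lock abandoned by a crashed holder
                continue
        except FileNotFoundError:
            continue  # released (or reclaimed) since our mkdir attempt: retry now
        if time.monotonic() >= deadline:
            raise LockTimeout(f"could not lock {path} within {timeout_s}s")
        time.sleep(delay * random.uniform(0.5, 1.5))  # jittered exponential backoff
        delay = min(delay * 2, 0.1)
    try:
        try:
            with open(path, encoding="utf-8") as f:
                current = json.load(f)
        except FileNotFoundError:
            current = {}
        validate(current)
        updated = fn(current)  # pure transform: no I/O, no side effects
        validate(updated)
        # Publish: temp file in the same directory, then os.replace over the
        # target, so lock-free readers see the old or the new file, never a torn one.
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".", suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(updated, f)
            os.replace(tmp, path)
        except BaseException:
            if os.path.exists(tmp):
                os.unlink(tmp)
            raise
        return updated
    finally:
        os.rmdir(lockdir)  # release even if fn or validate raised


path = os.path.join(tempfile.mkdtemp(), "counter.json")


def bump(doc: dict) -> dict:
    return {"n": doc.get("n", 0) + 1}


def worker() -> None:
    for _ in range(10):
        mutate(path, bump)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
with open(path, encoding="utf-8") as f:
    final = json.load(f)
print(final)  # {'n': 40}: no lost updates
```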
JsonlLog is append-only NDJSON: append() validates one line and writes it,
read_all() validates every line, and thread_of() walks parent_id
pointers root→leaf with cycle detection. new_id() mints time-sortable ULIDs.
SwarmFault is the single typed error every kernel op raises, branchable by
.kind.
```python
import asyncio

from pydantic import BaseModel, TypeAdapter

from indusagi.swarm import JsonCell, new_id


class Counter(BaseModel):
    n: int = 0


async def main() -> None:
    cell = await JsonCell.open("/tmp/counter.json", TypeAdapter(Counter), Counter())
    after = await cell.mutate(lambda c: Counter(n=c.n + 1))  # guarded RMW
    print(after.n)  # 1 on a fresh file
    print((await cell.read()).n)  # same value, via a lock-free read
    print(new_id("tkt"))  # e.g. 'tkt_01J...'


asyncio.run(main())
```
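The parent_id walk behind thread_of() can be sketched over raw JSON lines. Entry, read_all and thread_of below are hypothetical. The real JsonlLog validates each line against its schema, and starting the walk from a leaf id is an assumption about the call shape.

```python
from __future__ import annotations

import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    """Hypothetical log entry exposing the (id, parent_id) pair thread_of needs."""
    id: str
    parent_id: str | None
    text: str


def read_all(lines: list[str]) -> list[Entry]:
    # One JSON object per line; blank lines are skipped.
    return [Entry(**json.loads(line)) for line in lines if line.strip()]


def thread_of(entries: list[Entry], leaf_id: str) -> list[Entry]:
    """Follow parent_id pointers from leaf_id to the root; return root -> leaf."""
    by_id = {e.id: e for e in entries}
    chain: list[Entry] = []
    seen: set[str] = set()
    current = leaf_id
    while current is not None:
        if current in seen:
            raise ValueError(f"parent_id cycle through {current!r}")
        seen.add(current)
        if current not in by_id:
            raise KeyError(current)
        entry = by_id[current]
        chain.append(entry)
        current = entry.parent_id
    chain.reverse()
    return chain


log = [
    '{"id": "m1", "parent_id": null, "text": "plan"}',
    '{"id": "m2", "parent_id": "m1", "text": "draft"}',
    '{"id": "m3", "parent_id": "m2", "text": "review"}',
]
print([e.text for e in thread_of(read_all(log), "m3")])  # ['plan', 'draft', 'review']
```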
Roster, telemetry, and isolation
CrewManifest persists a CrewRecord. Until create() runs, the record reads back as the unborn sentinel with an empty crew_id. add_member enrolls a CrewMember from a MemberDraft.
ModelPolicy is a role→model lookup table built with ModelPolicy.of(table, fallback).
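The unborn-sentinel rule and a role→model lookup can be sketched with frozen dataclasses. ManifestSketch, ModelPolicySketch, Conflict and Unborn are hypothetical names. The sketch keeps the record in memory instead of a JsonCell. The rule that an unlisted role gets the fallback model is an assumption drawn from the of(table, fallback) signature.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, replace


class Conflict(Exception):
    """Hypothetical stand-in for a SwarmFault of kind 'conflict'."""


class Unborn(Exception):
    """Hypothetical error for mutating a crew before create() has run."""


@dataclass(frozen=True)
class CrewRecord:
    crew_id: str = ""  # the empty string is the unborn sentinel
    members: tuple[str, ...] = ()
    created_at: int = 0


class ManifestSketch:
    def __init__(self) -> None:
        self.record = CrewRecord()  # what a never-written cell reads back as

    def create(self, crew_id: str) -> CrewRecord:
        if self.record.crew_id:
            raise Conflict(f"crew {self.record.crew_id!r} already exists")
        # Epoch milliseconds is an assumed unit for created_at.
        self.record = CrewRecord(crew_id=crew_id, created_at=int(time.time() * 1000))
        return self.record

    def add_member(self, member_id: str) -> CrewRecord:
        if not self.record.crew_id:
            raise Unborn("call create() first")  # no phantom crews from a typo'd path
        self.record = replace(self.record, members=self.record.members + (member_id,))
        return self.record


class ModelPolicySketch:
    """Role -> model table with a fallback for unlisted roles (fallback rule assumed)."""

    def __init__(self, table: dict[str, str], fallback: str) -> None:
        self._table, self._fallback = dict(table), fallback

    @classmethod
    def of(cls, table: dict[str, str], fallback: str) -> ModelPolicySketch:
        return cls(table, fallback)

    def model_for_role(self, role: str) -> str:
        return self._table.get(role, self._fallback)


manifest = ManifestSketch()
try:
    manifest.add_member("mbr_1")
except Unborn as exc:
    print("refused:", exc)  # refused: call create() first
manifest.create("crew_1")
print(manifest.add_member("mbr_1").members)  # ('mbr_1',)
policy = ModelPolicySketch.of({"coder": "model-a"}, "mock-1")
print(policy.model_for_role("coder"), policy.model_for_role("reviewer"))  # model-a mock-1
```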
ActivityLog records a CrewEvent discriminated union over kind
(member_spawned, ticket_posted, ticket_claimed, ticket_done,
ticket_failed, message_sent); record(event) auto-stamps id/ts when
absent, and timeline() returns the transcript oldest-first.
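The record()/timeline() contract can be sketched as an append-only NDJSON file. ActivityLogSketch is hypothetical. It validates only the kind tag rather than a full discriminated union. It also stamps ids with uuid4 and timestamps in epoch milliseconds, both of which are stand-ins and may not match the package's choices.

```python
from __future__ import annotations

import json
import os
import tempfile
import time
import uuid
from typing import Any, Mapping

CREW_EVENT_KINDS = ("member_spawned", "ticket_posted", "ticket_claimed",
                    "ticket_done", "ticket_failed", "message_sent")


class ActivityLogSketch:
    """Hypothetical append-only NDJSON transcript of coordination events."""

    def __init__(self, path: str) -> None:
        self.path = path

    def record(self, event: Mapping[str, Any]) -> dict[str, Any]:
        if event.get("kind") not in CREW_EVENT_KINDS:
            raise ValueError(f"unknown event kind {event.get('kind')!r}")
        stamped = dict(event)
        # Stamp id/ts only when absent; uuid4 and epoch-ms are stand-ins.
        stamped.setdefault("id", f"evt_{uuid.uuid4().hex}")
        stamped.setdefault("ts", int(time.time() * 1000))
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(stamped) + "\n")
        return stamped

    def timeline(self) -> list[dict[str, Any]]:
        # Append order is chronological order, so this is oldest-first.
        if not os.path.exists(self.path):
            return []
        with open(self.path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]


activity = ActivityLogSketch(os.path.join(tempfile.mkdtemp(), "activity.jsonl"))
activity.record({"kind": "ticket_posted", "ticket_id": "tkt_1"})
activity.record({"kind": "ticket_done", "ticket_id": "tkt_1", "id": "evt_fixed", "ts": 5})
print([e["kind"] for e in activity.timeline()])  # ['ticket_posted', 'ticket_done']
```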
Workspace provisions git worktrees over an injectable ShellRunner. The
production local_runner splits the command string on whitespace and spawns
with no shell (asyncio.create_subprocess_exec, never /bin/sh -c),
removing an injection surface; any non-zero exit or spawn failure maps to an
isolation SwarmFault.
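The shell-less spawn and its mapping to a fault can be sketched with asyncio.create_subprocess_exec. local_exec, run_checked and IsolationError are hypothetical names. local_exec follows the ShellRunner contract listed in the exports: it returns a RunResult and never raises on a non-zero exit. run_checked plays the role the text assigns to Workspace, turning a spawn failure or a non-zero exit into an error.

```python
from __future__ import annotations

import asyncio
import sys
from dataclasses import dataclass


@dataclass
class RunResult:
    stdout: str
    stderr: str
    code: int


class IsolationError(Exception):
    """Hypothetical stand-in for a SwarmFault of kind 'isolation'."""


async def local_exec(command: str, cwd: str) -> RunResult:
    # Split on whitespace and exec the argv directly. No /bin/sh -c, so ';',
    # '$(...)', '|' and quotes in the string reach the program as literal text.
    argv = command.split()
    proc = await asyncio.create_subprocess_exec(
        *argv, cwd=cwd,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    # A non-zero exit is reported, never raised (the ShellRunner contract).
    return RunResult(out.decode(), err.decode(), proc.returncode)


async def run_checked(command: str, cwd: str = ".") -> RunResult:
    # Hypothetical Workspace-side check: spawn failure or non-zero exit -> fault.
    try:
        result = await local_exec(command, cwd)
    except OSError as exc:  # e.g. the executable does not exist
        raise IsolationError(f"could not spawn {command!r}") from exc
    if result.code != 0:
        raise IsolationError(f"{command!r} exited {result.code}")
    return result


async def demo() -> tuple[str, list[str]]:
    # Assumes sys.executable has no spaces, since the command is split on whitespace.
    ok = await run_checked(f"{sys.executable} -c print(42)")
    errors: list[str] = []
    for bad in (f"{sys.executable} -c __import__('sys').exit(3)", "no-such-binary-xyz"):
        try:
            await run_checked(bad)
        except IsolationError as exc:
            errors.append(str(exc))
    return ok.stdout, errors


stdout, errors = asyncio.run(demo())
print(stdout.strip(), len(errors))  # 42 2
```

The whitespace split is also the main limitation of this approach. An argument that legitimately contains a space, such as a path, cannot be expressed in the command string.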
Key concepts
| Term | Definition |
|---|---|
| JsonCell.mutate (guarded RMW) | The one transactional primitive: take the mkdir-lockdir guard, read+validate, apply a pure transform, re-validate, atomically publish via temp-file + os.replace, release in finally. Cross-process edits serialize with zero lost updates. |
| Unborn crew sentinel | A manifest cell that has never been written reads back as CrewRecord(crew_id="", members=[], created_at=0). create() replaces it (and raises conflict if already born); other mutators refuse until create() has run, so a typo'd path can't grow a phantom crew. |
| Readiness from the dep graph | ready() returns open tickets whose every dep reached done. A failed dep never satisfies a dependent; add() rejects a cyclic or dangling dep before it reaches disk. |
| Cursor-based delivery | One shared transcript, no file-per-recipient; each reader keeps a private JsonCell offset. poll returns new-since-cursor envelopes addressed to the reader and advances the cursor past everything it scanned; peek does the same without advancing. |
| Injectable agent spawner | SpawnAgent = Callable[[CrewMember], Agent]. The default builds a real create_agent; tests inject a fake whose submit returns a canned snapshot, so a whole round runs with no model or network. |
| Typed faults over kind | Every failure is a SwarmFault carrying one of six kinds (lock_timeout, validation, not_found, conflict, isolation, spawn). Callers branch on SwarmFault.is_(err, "conflict") instead of matching message text; the trigger is preserved on .cause. |
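Branching on a fault's kind rather than its message can be sketched with a small Exception subclass. FaultSketch and read_board are hypothetical. The kinds match SwarmFaultKind. The constructor check and the static is_ signature are assumptions modeled on the SwarmFault(kind, message, cause) and is_(value, kind) shapes listed in the exports.

```python
from __future__ import annotations

from typing import Literal, get_args

FaultKind = Literal["lock_timeout", "validation", "not_found", "conflict", "isolation", "spawn"]
FAULT_KINDS = get_args(FaultKind)


class FaultSketch(Exception):
    """Hypothetical typed fault: branch on .kind, keep the trigger on .cause."""

    def __init__(self, kind: FaultKind, message: str, cause: BaseException | None = None) -> None:
        if kind not in FAULT_KINDS:
            raise ValueError(f"unknown fault kind {kind!r}")
        super().__init__(message)
        self.kind = kind
        self.cause = cause

    @staticmethod
    def is_(value: object, kind: FaultKind) -> bool:
        # A type guard: true only for a fault of exactly this kind.
        return isinstance(value, FaultSketch) and value.kind == kind


def read_board(path: str) -> str:
    try:
        with open(path, encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError as exc:
        raise FaultSketch("not_found", f"no board at {path}", exc) from exc


caught = None
try:
    read_board("/definitely/missing/board.json")
except Exception as err:
    caught = err

if FaultSketch.is_(caught, "not_found"):
    print("missing board; cause:", type(caught.cause).__name__)  # missing board; cause: FileNotFoundError
```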
Relationship to neighbors
Swarm is a self-contained subsystem with only two upward dependencies, both in
the coordinator: it imports the Runtime (Agent,
AgentConfig, RunSnapshot, create_agent) to spawn teammate agents, and the
Capabilities layer (ToolCollection,
tool_box) to bind each member's tool surface. The coordinator only touches an
Agent's submit/snapshot surface, so a fake satisfies it. The domain
services and kernel have no upward dependencies (pure pydantic + stdlib +
the ulid package), making the kernel and each service independently testable.
Internally everything funnels through swarm/kernel. The root indusagi
package re-exports the subpackage as swarm. See the
Architecture overview for where the layer sits.
Parity notes
This package is an explicit Python port; a handful of choices are load-bearing and worth knowing about:
- os.replace, not os.rename, for atomic publish: os.rename fails over an existing file on Windows.
- mkdir-lockdir, not O_EXCL, with stale reclamation after stale_ms (default 30s): a crashed holder is forcibly reclaimed, so a wedged process won't deadlock the crew forever.
- DepGraph uses dict[str, None] as an insertion-ordered set. Python's set does not iterate in insertion order, and for strings its order can vary between runs under hash randomization. The dict keeps first_cycle() and neighbour ordering stable.
- Persisted models use strict=True (no string→number coercion), strip extra keys, and drop None-valued fields on serialize, so optional keys are simply absent on disk. Wire names stay camelCase via aliases (crewId, createdAt, ticketId, memberId, envelopeId, envelopeType), and the Python field from_ is aliased to the wire name from.
- Codec exhaustiveness is proven at import time by _assert_codec_table_exhaustive(): importing codecs.py raises if a branch, the table, and ENVELOPE_TYPES drift apart.
- CrewRecord.crew_id is deliberately not min-length-1. The empty string is the unborn sentinel, and born-vs-unborn is enforced by create(), not by the schema.
- run_round only folds Exception into a failed ticket. asyncio.CancelledError is a BaseException and propagates, so cancellation is never silently swallowed.
For a full account of intentional divergences across the framework, see the Parity reference. No part of the swarm package is a stub — all modules are fully implemented and exercised by tests; see the Testing guide.
