# everything-is-an-actor

Asyncio-native actor framework for Python agent systems. Inspired by Erlang/Akka; built for AI agent orchestration.

## Why everything-is-an-actor?
Multi-agent AI systems have a concurrency problem. When a lead agent delegates to multiple workers, you need:
- Structured concurrency — workers run in parallel without manual thread management
- Fault isolation — a failed worker shouldn't crash the orchestrator
- Task lifecycle — every unit of work has a status: pending → running → completed / failed
- Event streaming — consumers subscribe to what agents produce, in real time
The actor model solves all of these. everything-is-an-actor brings it to Python asyncio in five layers:

| Layer | What it provides |
|---|---|
| Core (`everything_is_an_actor.core`) | Generic actor primitives: mailbox, supervision, middleware |
| Agents (`everything_is_an_actor.agents`) | AI-specific abstractions: `Task`, `AgentActor`, streaming events |
| Flow (`everything_is_an_actor.flow`) | Composable orchestration ADT: categorical combinators, serialization, visualization |
| MOA (`everything_is_an_actor.moa`) | Mixture-of-Agents pattern: parallel proposers → quorum → aggregation |
| Integrations (`everything_is_an_actor.integrations`) | External framework adapters (LangChain) |
The project also introduces an original Flow model for agent orchestration: a typed `Flow[I, O]` semantic core that can be represented equivalently in Python, YAML, and JSON, while the graph remains a derived view for visualization and execution inspection. See the Flow API guide for usage and the Flow DSL vs. Graph analysis for the semantic rationale.
## Install

```shell
pip install everything-is-an-actor

# With Redis mailbox support
pip install "everything-is-an-actor[redis]"
```
## 30-second example

```python
import asyncio

from everything_is_an_actor import ActorSystem
from everything_is_an_actor.agents import AgentSystem, AgentActor, Task


class ResearchAgent(AgentActor[str, str]):
    async def execute(self, input: str) -> str:
        await self.emit_progress("searching...")
        # SummaryAgent: another AgentActor subclass, defined elsewhere
        r = await self.context.ask(SummaryAgent, Task(input=input))
        return r.output


async def main():
    system = AgentSystem(ActorSystem("app"))
    # Stream every event from the entire agent tree
    async for event in system.run(ResearchAgent, "actor model"):
        print(event.type, event.agent_path, event.data)


asyncio.run(main())
```
## Key features

### Actor core

- `tell` (fire-and-forget) + `ask` (request-reply) messaging
- `MemoryMailbox` / `FastMailbox` — configurable message queue backend
- `RedisMailbox` — persistent, survives process restarts
- `OneForOneStrategy` / `AllForOneStrategy` supervision
- Middleware pipeline for all lifecycle events
- Virtual actors — `VirtualActorRegistry` activates an actor on its first message and deactivates it on idle; supports `ask`/`tell`/`ask_stream`, manual `deactivate`, and a pluggable `RegistryStore`
- Stop Policy ADT — declarative lifecycle: `StopMode.NEVER` / `StopMode.ONE_TIME` / `AfterMessage(msg)` / `AfterIdle(seconds)`
- Path-based lookup — address actors by path: `system.get_actor("/app/workers/collector")`
- Free Monad API — composable workflows: `ref.free_ask()` / `ref.free_tell()` / `ref.free_stop()`
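Under the hood, the tell/ask split is a mailbox queue plus an optional reply future. The mechanic can be sketched in plain asyncio; everything here (`MiniActor`, the string-uppercasing handler) is an illustrative stand-in, not this package's API:

```python
import asyncio


class MiniActor:
    """Toy mailbox actor: one queue, one consumer task (illustrative only)."""

    def __init__(self):
        self.mailbox: asyncio.Queue = asyncio.Queue()
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        while True:
            msg, reply = await self.mailbox.get()
            result = msg.upper()      # stand-in for real message handling
            if reply is not None:     # ask: a caller is waiting on this future
                reply.set_result(result)

    def tell(self, msg):
        # Fire-and-forget: enqueue and return immediately, no reply channel.
        self.mailbox.put_nowait((msg, None))

    async def ask(self, msg):
        # Request-reply: attach a future and await its resolution.
        reply = asyncio.get_running_loop().create_future()
        await self.mailbox.put((msg, reply))
        return await reply


async def demo():
    actor = MiniActor()
    actor.tell("ignored result")      # processed, but nobody waits
    out = await actor.ask("ping")     # blocks until the actor replies
    actor._task.cancel()
    return out

# asyncio.run(demo()) -> "PING"
```

The single consumer task is what gives actors their one-message-at-a-time guarantee: no locks are needed because state is only ever touched from `_run`.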
### Agent layer

- `Task` / `TaskResult` / `TaskEvent` — first-class task lifecycle
- `AgentActor` — implement `execute()`, not `on_receive()`
- `emit_progress()` — status/progress updates during execution
- Streaming `execute()` — `yield` tokens/chunks as an async generator; emits `task_chunk` events
- Progressive API: plain classes → full actor control (5 levels)
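A streaming `execute()` is just an async generator whose chunks the framework wraps in `task_chunk` events. A minimal stand-in (hypothetical names, no library code) showing the generator shape a consumer iterates:

```python
import asyncio
from typing import AsyncIterator


async def streaming_execute(prompt: str) -> AsyncIterator[str]:
    # Stand-in for a streaming execute(): yield chunks as they are produced.
    for token in prompt.split():
        await asyncio.sleep(0)  # simulate awaiting the next model token
        yield token


async def consume() -> list[str]:
    # A real consumer would wrap each chunk in a task_chunk event;
    # here we just collect the chunks in order.
    return [chunk async for chunk in streaming_execute("actors all the way down")]

# asyncio.run(consume()) -> ['actors', 'all', 'the', 'way', 'down']
```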
### Orchestration

- `ask(AgentCls, message)` — spawn an ephemeral child, send once, await the result
- `sequence([(A, msg), (B, msg)])` — fan-out with fail-fast sibling cancellation; results in order
- `traverse(inputs, AgentCls)` — map a list through one agent concurrently
- `race([(A, msg), (B, msg)])` — first-wins, cancel the rest
- `zip((A, msg), (B, msg))` — two tasks, typed pair
- `stream(AgentCls, message)` — streaming counterpart; forwards child chunks upstream
### Flow API — composable agent orchestration

- `Flow[I, O]` ADT — a syntax tree for workflows: data, not execution
- Categorical combinators: `map`, `flat_map`, `zip`, `race`, `branch`, `recover`, `fallback_to`, `divert_to`, `loop`
- `at_least(k, *flows)` — quorum parallelism ("N-way, at least K succeed")
- `to_dict` / `from_dict` — Flow serialization for persistence and transfer
- `to_mermaid` — automatic Mermaid diagram generation
- `FlowSystem` / `AgentSystem.run_flow()` — interpreter execution
### MOA — Mixture-of-Agents pattern

- `moa_layer(proposers, aggregator, min_success)` — single MOA layer
- `moa_tree([layers])` — multi-layer pipeline with directive passing
- `MoASystem` — high-level entry point, zero boilerplate
- `LayerOutput` — inter-layer directive communication
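The `min_success` quorum idea ("N proposers, at least K must succeed") can be sketched with `asyncio.wait`; `at_least` and `proposer` here are illustrative stand-ins, not the library's implementation:

```python
import asyncio


async def at_least(k: int, coros):
    # Run every proposer concurrently and resolve as soon as k of them
    # succeed, cancelling the stragglers; fail if k becomes unreachable.
    pending = {asyncio.ensure_future(c) for c in coros}
    results, failures = [], 0
    while len(results) < k:
        if len(pending) + len(results) < k:
            for t in pending:
                t.cancel()
            raise RuntimeError(f"quorum of {k} unreachable after {failures} failures")
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            if t.exception() is None:
                results.append(t.result())
            else:
                failures += 1
    for t in pending:
        t.cancel()
    return results[:k]


async def proposer(name, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(name)
    return name


async def demo():
    # Three proposers, quorum of two: "b" fails but the quorum still forms.
    return await at_least(2, [
        proposer("a", 0.0),
        proposer("b", 0.01, fail=True),
        proposer("c", 0.02),
    ])

# asyncio.run(demo()) -> ['a', 'c']
```

An aggregator step would then consume the quorum's results; the key property is that one failed proposer does not sink the layer as long as `min_success` is still reachable.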
### Event streaming

- `AgentSystem.run(AgentCls, input)` — spawn a root agent, stream all `TaskEvent`s from the entire actor tree
- `ref.ask_stream(Task(...))` — stream events from an existing ref; the `StreamItem` ADT (`StreamEvent | StreamResult`) supports `match`/`case`
- `TaskEvent.parent_task_id` + `parent_agent_path` — OpenTelemetry-style span linking; reconstruct the full call tree from the flat event stream
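Because every event carries `parent_task_id`, the flat stream can be folded back into a call tree, the same way a tracing backend rebuilds spans. A sketch over plain dicts (the keys mirror the field names above; the exact event shape is an assumption):

```python
from collections import defaultdict


def build_call_tree(events):
    # Group tasks under their parents via parent_task_id, then render
    # the tree as indented lines, depth-first.
    children = defaultdict(list)
    roots, seen = [], set()
    for e in events:
        tid = e["task_id"]
        if tid in seen:
            continue                 # one node per task; first event wins
        seen.add(tid)
        if e["parent_task_id"] is None:
            roots.append(tid)
        else:
            children[e["parent_task_id"]].append(tid)

    def render(tid, depth=0):
        lines = ["  " * depth + tid]
        for c in children[tid]:
            lines.extend(render(c, depth + 1))
        return lines

    return [line for r in roots for line in render(r)]


events = [
    {"task_id": "root", "parent_task_id": None},
    {"task_id": "search", "parent_task_id": "root"},
    {"task_id": "summarize", "parent_task_id": "root"},
    {"task_id": "fetch", "parent_task_id": "search"},
]
# build_call_tree(events) -> ['root', '  search', '    fetch', '  summarize']
```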
## Benchmarks

Apple M-series, Python 3.12, asyncio:

### Actor core

| Metric | Value |
|---|---|
| `tell` throughput | 945K msg/s |
| `ask` throughput | 29K msg/s |
| `ask` latency p50 | 32 µs |
| `ask` latency p99 | 46 µs |
| 1000 actors × 100 msgs | 879K msg/s, 0 loss |
| Spawn 5000 actors | 27 ms |
### Agent layer

| Metric | Value |
|---|---|
| `AgentActor` `ask` throughput | 27K tasks/s |
| `AgentActor` `ask` latency p50 | 36 µs |
| `sequence(50)` fan-out | 32K child tasks/s |
| `ask_stream` chunk throughput | 227K chunks/s |
| `AgentSystem.run()` latency p50 | 0.2 ms |