# Design
Architecture overview and key design decisions for everything-is-an-actor.
## Architecture
The framework has five layers with a strict downward dependency direction:
```mermaid
graph TB
    int["<b>Integrations</b><br/>LangChain adapter"]
    moa["<b>MOA</b><br/>moa_layer · moa_tree · MoASystem · LayerOutput"]
    flow["<b>Flow ADT</b><br/>Flow · combinators · interpreter · serialize · viz"]
    agent["<b>Agent layer</b><br/>Task · AgentActor · AgentSystem · streaming"]
    core["<b>Core layer</b><br/>Actor · ActorRef · ActorSystem · Mailbox"]
    int --> moa --> flow --> agent --> core
    style int fill:#b8c9d9,stroke:#7a9bb5,color:#2c3e50
    style moa fill:#c4b8d9,stroke:#9b7ab5,color:#2c3e50
    style flow fill:#d9c4b8,stroke:#b59b7a,color:#2c3e50
    style agent fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
    style core fill:#d9d4b8,stroke:#b5b07a,color:#2c3e50
```
Dependency direction: integrations/ → flow/ → agents/ → core/ (and moa/ → flow/ → agents/ → core/).
- Core — general-purpose actor runtime, usable independently
- Agents — AI-specific abstractions (Task protocol, streaming events) built on core
- Flow — composable orchestration ADT with categorical combinators, interpreted over agents
- MOA — Mixture-of-Agents pattern library built on Flow primitives
- Integrations — adapters for external frameworks (LangChain)
## Core layer

### Actor cell

Each actor lives in an `_ActorCell` that the user never sees directly. The cell owns:
- mailbox — `asyncio.Queue` (or pluggable backend)
- task — a single `asyncio.Task` running the actor loop
- children — `dict[name, _ActorCell]`
- ref — the public `ActorRef` handle
The actor loop processes one message at a time. There is no shared state between actors — all communication goes through the mailbox.
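The single-consumer loop can be sketched in a few lines (hypothetical names; the real cell also handles stop sentinels, restarts, and supervision):

```python
import asyncio

async def actor_loop(mailbox: asyncio.Queue, handler) -> None:
    # Process exactly one message at a time; actor state is only ever
    # touched from this single task, so no locks are needed.
    while True:
        message = await mailbox.get()
        if message is None:  # stop sentinel (stand-in for the real one)
            break
        await handler(message)
```

Because the handler is awaited before the next `get()`, messages to one actor are strictly serialized even though many actors run concurrently on the same loop.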
### Message passing

Two primitives:

| | tell | ask |
|---|---|---|
| Blocks caller | No | Yes |
| Returns | None | Actor's return value |
| Mechanism | Enqueue envelope | Correlation ID + Future in ReplyRegistry |
ask does not put a Future in the mailbox. Instead it registers a correlation ID in ReplyRegistry (on the system), enqueues the message, and awaits the future. When the actor replies, the cell resolves the future directly. This makes ask work across any mailbox backend including Redis.
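A minimal sketch of this mechanism, with hypothetical names — the real ReplyRegistry lives on the system and also handles timeouts and cleanup:

```python
import asyncio
import uuid

class ReplyRegistry:
    """Maps correlation IDs to pending futures (simplified sketch)."""
    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    def register(self) -> tuple[str, asyncio.Future]:
        corr_id = uuid.uuid4().hex
        fut = asyncio.get_running_loop().create_future()
        self._pending[corr_id] = fut
        return corr_id, fut

    def resolve(self, corr_id: str, value) -> None:
        self._pending.pop(corr_id).set_result(value)

async def ask(registry: ReplyRegistry, mailbox: asyncio.Queue, payload):
    # Only (corr_id, payload) crosses the mailbox -- no Future travels
    # through it, which is why any serializable backend (e.g. Redis) works.
    corr_id, fut = registry.register()
    await mailbox.put((corr_id, payload))
    return await fut
```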
### Supervision tree
When a child actor raises, its parent's supervisor_strategy() is called with the exception. The default is OneForOneStrategy (restart only the failing child, up to 3 times per 60 s).
```mermaid
graph TB
    system((system))
    parent((parent))
    other((other))
    childA((child_A))
    childB((child_B))
    system --- parent
    system --- other
    parent --- childA
    parent --- childB
    style childB fill:#d9a3a3,stroke:#b57a7a,color:#2c3e50
    style system fill:#b8c9d9,stroke:#7a9bb5,color:#2c3e50
    style parent fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
    style other fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
    style childA fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
```
With OneForOneStrategy: only child_B is restarted. With AllForOneStrategy: both child_A and child_B are restarted.
Directives: restart → resume → stop → escalate (propagates to grandparent).
### ActorRef

`ActorRef` is an immutable handle. It holds a reference to `_ActorCell` but exposes no cell internals to users. Equality is identity (`is`), not value. This matches the Erlang PID model.
join() waits for the actor task to complete using asyncio.shield — if the caller is cancelled, the wait is abandoned but the actor continues its own shutdown. Only CancelledError is suppressed; other task failures propagate to make teardown problems visible.
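The shielding behavior can be sketched as follows (hypothetical `join` over a raw `asyncio.Task`; the real method wraps the cell's task):

```python
import asyncio

async def join(actor_task: asyncio.Task) -> None:
    # Shield the wait: if the *caller* is cancelled, the actor task
    # keeps running its own shutdown. If the *actor* was cancelled,
    # that CancelledError is swallowed; real failures propagate.
    try:
        await asyncio.shield(actor_task)
    except asyncio.CancelledError:
        if not actor_task.cancelled():
            raise  # the caller was cancelled, not the actor
```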
### Stop Policy ADT
Actors are persistent by default (StopMode.NEVER). The stop_policy() method returns a policy that controls automatic lifecycle management:
| Policy | Behavior |
|---|---|
| `StopMode.NEVER` | Never auto-stop (default) |
| `StopMode.ONE_TIME` | Stop after processing one message |
| `AfterMessage(msg)` | Stop after receiving the specific message |
| `AfterIdle(seconds)` | Stop after being idle for N seconds |
The policy is checked after each message is processed. tell() (spawning temporary actors) requires a non-NEVER policy to prevent actor leaks.
### Virtual Actor Registry
Virtual actors exist conceptually forever but only consume resources when active. The VirtualActorRegistry manages their lifecycle.
Two kinds of actor coexist:
| Type | Lifecycle | Recovery on restart | Use case |
|---|---|---|---|
| Declarative | Code-defined, lives with the process | Code recreates on startup | Infrastructure (Gateway, Router, RateLimiter) |
| Virtual | On-demand activation, idle deactivation | No recovery — messages trigger reactivation | Business entities (ChatSession, AgentTask) |
```python
system = ActorSystem("app")

# Declarative: always running
gateway = await system.spawn(Gateway, "gateway")

# Virtual: only activated when needed
registry = VirtualActorRegistry(system)
reply = await registry.ask(ChatAgent, "session_123", "hello")
```
Key design decisions (Orleans model):

- Flat structure: virtual actors have no parent-child relationships. They address each other by ID through the registry, not through `context.spawn()`.
- No dynamic topology recovery: process restart does not rebuild previous actor trees. Virtual actors reactivate on demand; declarative actors are recreated by code.
- State is the business's responsibility: the runtime guarantees `on_started` (activation) and `on_stopped` (deactivation) are called. What gets loaded/saved in those hooks is up to the business.
- Supervision is per-activation only: an `on_receive` exception → supervision strategy decides restart/stop. This does not persist across process restarts.
context.spawn() is for ephemeral child actors (run and discard, no cross-restart recovery). Long-lived actors go through VirtualActorRegistry.
Registry store is pluggable: default is in-memory. Replace with a persistent backend (Redis, DB) for:
- Querying "which virtual actors existed before" after restart (known_ids())
- Push scenarios: scheduled tasks, broadcasts that need to know all actor IDs
```python
class RedisRegistryStore(RegistryStore):
    # `redis` is assumed to be an async client, e.g. redis.asyncio.Redis()
    async def put(self, key):
        await redis.sadd("actors", key)

    async def delete(self, key):
        await redis.srem("actors", key)

    async def list_all(self):
        return list(await redis.smembers("actors"))

registry = VirtualActorRegistry(system, store=RedisRegistryStore())
```
### Lifecycle guarantees
The runtime guarantees:
- on_started: always called before the actor processes any message. If it fails, the actor is not created.
- on_stopped: always called when the actor stops (idle timeout, manual deactivation, system shutdown). Protected by asyncio.shield — external cancellation cannot skip it. Has a timeout (10s) to prevent deadlocks. On CancelledError, retried once.
The only case where on_stopped does not run: process killed by OS (kill -9, OOM). This is an OS-level limitation no user-space code can handle. For critical state, business code should save after each message, not rely solely on on_stopped.
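A sketch of the documented `on_stopped` protection — shield, 10 s timeout, single retry (hypothetical helper, not the framework's internal code):

```python
import asyncio

HOOK_TIMEOUT = 10.0  # mirrors the documented 10 s bound

async def run_on_stopped(hook) -> None:
    # Shield: external cancellation cannot skip the hook.
    # wait_for: a stuck hook cannot deadlock shutdown.
    # One retry on CancelledError, per the documented guarantee.
    for attempt in range(2):
        try:
            await asyncio.wait_for(asyncio.shield(hook()), HOOK_TIMEOUT)
            return
        except asyncio.CancelledError:
            if attempt == 1:
                return  # already retried once
```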
### Free Monad API

For composable actor workflows, the framework provides a `Free[ActorF, A]` monad:
```python
from everything_is_an_actor import ActorSystem, ActorRef

# Build a workflow using ref.free_xxx()
def workflow(ref: ActorRef):
    return (
        ref.free_ask("hello")
        .flatMap(lambda r: ref.free_tell(r.upper()))
        .flatMap(lambda _: ref.free_stop())
    )

# Execute against the live system
result = await system.run_free(workflow(worker_ref))
```
The Free monad separates workflow description from interpretation, enabling testable mock interpreters.
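The description/interpretation split can be illustrated with a deliberately tiny free-monad sketch — plain tuples stand in for the library's `ActorF` instructions, and a mock interpreter replaces live actors with canned replies:

```python
class Pure:
    """A finished computation carrying its result."""
    def __init__(self, value):
        self.value = value
    def flatMap(self, f):
        return f(self.value)

class Free:
    """One instruction plus a continuation (sketch, not the library's type)."""
    def __init__(self, instr, cont=None):
        self.instr = instr
        self.cont = cont if cont is not None else Pure
    def flatMap(self, f):
        return Free(self.instr, lambda r: self.cont(r).flatMap(f))

def run(program, interpret):
    # Trampoline: interpret each instruction, feed the result onward.
    while isinstance(program, Free):
        program = program.cont(interpret(program.instr))
    return program.value

# Mock interpreter: no actors involved, just a log and echoed payloads.
log = []
def mock_interpreter(instr):
    log.append(instr)
    op, payload = instr
    return payload

prog = Free(("ask", "hello")).flatMap(lambda r: Free(("tell", r.upper())))
result = run(prog, mock_interpreter)
```

The same `prog` value could be handed to a live interpreter; the workflow description never changes.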
## Agent layer

### The Task protocol
Every message to an AgentActor is a Task. This is the contract between orchestrators and workers:
```mermaid
graph LR
    T["Task(input, id, event_sink_ref)"]
    R["TaskResult(task_id, output, status)"]
    T -->|on_receive| R
    style T fill:#b8c9d9,stroke:#7a9bb5,color:#2c3e50
    style R fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
```
The `id` field is a UUID hex string generated at construction time. It becomes the `task_id` in all emitted events, enabling event-stream consumers to correlate events across a distributed call tree.
### AgentActor
AgentActor wraps the actor primitive with the Task protocol. The user overrides execute() and never touches on_receive(). The framework:
- Emits `task_started`
- Calls `execute(input)`
- If `execute` returns a coroutine: awaits it, emits `task_completed`
- If `execute` returns an async generator: drives it with `async for`, emits `task_chunk` per yield, then `task_completed`
- On exception: emits `task_failed`, re-raises for supervision
Detection of async generator vs coroutine uses inspect.isasyncgen() after calling self.execute(input) — the call itself is synchronous; it only produces the generator or coroutine object.
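A sketch of the dispatch (hypothetical `run_execute` helper; the real framework also emits the lifecycle events around each path):

```python
import asyncio
import inspect

async def run_execute(execute, input):
    # The call itself is synchronous: it only *creates* the coroutine
    # or async-generator object. Inspecting the result picks the path.
    result = execute(input)
    if inspect.isasyncgen(result):
        # Streaming path: one task_chunk per yield in the real framework
        return [chunk async for chunk in result]
    # Coroutine path: single awaited result
    return await result
```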
### Progressive API
Five levels of increasing power — pick the lowest level you need:
| Level | Pattern | Use when |
|---|---|---|
| 1 | Plain class with `execute()` | Stateless workers |
| 2 | + `on_started` / `on_stopped` | Resource management |
| 3 | + `__actor__ = ActorConfig(...)` | Tune mailbox/restarts without subclassing |
| 4 | Subclass `AgentActor` | Full power: typing, `emit_progress`, supervision |
| 5 | Subclass `Actor` | Infrastructure actors (routers, caches, rate limiters) |
## Streaming architecture

### Problem
AI agents produce intermediate output (LLM tokens, progress updates, child agent events). Callers need this data in real time — not just the final TaskResult.
### Solution: event sink routing
Every TaskEvent is routed to a sink — an _EventCollectorActor that feeds a RunStream. The routing path:
```mermaid
graph LR
    A["AgentActor._emit_event()"]
    B["_active_sink.tell(TaskEvent)"]
    C["_EventCollectorActor.on_receive()"]
    D["RunStream.put()"]
    E["caller's async for loop"]
    A --> B --> C --> D --> E
    style A fill:#b8c9d9,stroke:#7a9bb5,color:#2c3e50
    style E fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
```
### Sink lifecycle
There are two ways to attach a sink:

1. `AgentSystem.run()` — sets the `_run_event_sink` ContextVar before spawning the root agent. All AgentActor instances created within that asyncio task context automatically inherit the sink via the `asyncio.create_task()` context copy.
2. `ActorRef.ask_stream()` — injects `event_sink_ref` into the Task itself. `AgentActor.on_receive()` picks it up as `message.event_sink_ref` and sets it as `_active_sink` for the duration of that call. It also sets the `_run_event_sink` ContextVar so child actors spawned via `ask()` during `execute()` inherit the same sink.
The per-ask sink (event_sink_ref) takes precedence over the run-level sink (_run_event_sink). This lets ask_stream() work on existing agents that were already spawned inside a run() session.
### ContextVar propagation
asyncio.create_task() copies the current contextvars.Context into the new task. This is the mechanism by which the event sink propagates to child actors without any explicit wiring:
```mermaid
graph TB
    run["run() sets _run_event_sink = collector"]
    drive["create_task(_drive)<br/>context copied"]
    init["AgentActor.__init__<br/>reads _run_event_sink"]
    ask["ask() → create_task<br/>context copied again"]
    child["child AgentActor.__init__<br/>gets same sink"]
    run --> drive --> init --> ask --> child
    style run fill:#b8c9d9,stroke:#7a9bb5,color:#2c3e50
    style child fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
```
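This propagation is observable with plain contextvars, no framework required (variable name borrowed from the text; the sink is a string stand-in):

```python
import asyncio
import contextvars

_run_event_sink: contextvars.ContextVar = contextvars.ContextVar(
    "_run_event_sink", default=None
)

async def child_init():
    # Runs inside the Context snapshot that create_task() took.
    return _run_event_sink.get()

async def run_session():
    _run_event_sink.set("collector")  # run() attaches the sink
    # create_task copies the current Context, sink included.
    return await asyncio.create_task(child_init())
```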
### RunStream
RunStream is a thin wrapper over asyncio.Queue[TaskEvent | None]. The sentinel None signals end-of-stream. It is closed by the _drive background task when the root agent finishes or fails.
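A sketch of such a wrapper (simplified; the real RunStream is closed by the `_drive` task and also surfaces failures):

```python
import asyncio

class RunStream:
    """Sketch: a queue of events terminated by a None sentinel."""
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def put(self, event) -> None:
        self._queue.put_nowait(event)

    def close(self) -> None:
        self._queue.put_nowait(None)  # end-of-stream sentinel

    def __aiter__(self):
        return self

    async def __anext__(self):
        event = await self._queue.get()
        if event is None:
            raise StopAsyncIteration
        return event
```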
### _EventCollectorActor
A minimal Actor that receives TaskEvent messages and puts them on the RunStream. One instance per run / per ask_stream call.
Instantiated via make_collector_cls(stream) — a factory that returns a subclass with the stream captured in __init__. This satisfies the framework's no-arg constructor contract without post-spawn private attribute injection.
## Orchestration

### ask()
Spawn a child, send one message, await result, stop child. The finally block guarantees cleanup even if the caller raises or is cancelled:
```python
ref = await self.spawn(target, name)
try:
    return await ref.ask(message, timeout=timeout)
finally:
    ref.stop()
    if cancelling() > 0:
        ref.interrupt()
    await ref.join()
```
interrupt() is called when the wrapper task is itself being cancelled (e.g. as a sibling in sequence()). It cancels the actor's asyncio task directly, so join() returns immediately instead of waiting for the actor's current sleep/IO to finish.
### sequence()
Fan-out to N agents using asyncio.wait(FIRST_EXCEPTION). When the first task fails, siblings are cancelled immediately — their ask() finally blocks still run on CancelledError, so no ephemeral children are left running.
The key decision: fail-fast (stop expensive side effects ASAP) over guaranteed synchronous cleanup (wait for all tasks). Cleanup is still guaranteed — cancelled children process the _Stop sentinel from ref.stop() and shut down cleanly.
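The fan-out mechanics can be sketched with a hypothetical fail-fast helper built on the same `asyncio.wait` call:

```python
import asyncio

async def fan_out(*coros):
    # Fail fast: when the first task raises, cancel the siblings
    # immediately instead of waiting for them to finish.
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION
    )
    for task in pending:
        task.cancel()   # cleanup still runs in their CancelledError path
    for task in done:
        task.result()   # re-raises the first failure, if any
```

Note the deliberate absence of a wait on `pending` after cancellation — that is the documented tradeoff of fail-fast over synchronous cleanup.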
### stream()
Streaming counterpart of ask(). An async generator that:
- Spawns an ephemeral child (if given a class target)
- Iterates `ref.ask_stream(message)` and yields each `StreamItem`
- Stops the child in `finally` — even if the caller breaks early
This enables transparent chunk forwarding in orchestrators:
```python
async def execute(self, input: str):
    async for item in self.context.stream(LLMAgent, Task(input=input)):
        match item:
            case StreamEvent(event=e) if e.type == "task_chunk":
                yield e.data  # becomes task_chunk for the orchestrator's caller
```
### Span linking
Every TaskEvent carries:
- `task_id` — the current task's ID
- `parent_task_id` — the calling agent's `task_id` (None for root)
- `parent_agent_path` — the calling agent's actor path (None for root)
parent_task_id is read from _current_task_id_var ContextVar at the start of on_receive(), before the current task_id is set. Child actors spawned via ask() inherit the parent's task_id through the context copy, so the full call tree is reconstructable from a flat event stream — equivalent to OpenTelemetry parent span injection.
## Key tradeoffs

### Sequential actor processing
Actors process one message at a time. This eliminates data races on actor state without locks. The tradeoff: a slow execute() blocks subsequent messages to the same actor. Solution: use ephemeral actors via ask() — each call gets a fresh actor with no queuing contention.
### ask_stream uses a sidecar collector actor
ask_stream spawns a separate _EventCollectorActor as a peer (not a child) of the target actor. The alternative — having the target actor send events directly to the caller — would require passing a callback or queue into the actor, coupling the actor to the streaming API. The sidecar keeps the actor oblivious: it just calls _emit_event(), which routes to whatever sink is attached.
### Async generator detection at call time
inspect.isasyncgen(self.execute(input)) is called after invoking execute(). The function is called once; the result determines the execution path. This means the same execute() method cannot switch modes per call, which is intentional — execute() should be consistently either a coroutine or an async generator.
### No streaming in lifecycle hooks
on_started, on_stopped, on_restart are plain coroutines — no yield. Streaming only makes sense in response to a Task (there's a caller to stream to). Lifecycle hooks have no caller context.