# Agent Layer

The `everything_is_an_actor.agents` module provides higher-level abstractions specifically for AI agent systems, built on top of the core actor primitives.
## The progressive API

The agent layer has 5 levels. Start at the lowest level you need — upgrading later requires no rewrites.

1. Zero actor knowledge required. Just implement `execute()`.
2. Works with `AgentSystem`.
3. Add initialization and cleanup.
4. Configure mailbox size and restart limits without inheriting from `AgentActor`.
5. Full power: strong typing, supervision strategy, `emit_progress()`, access to the actor context.
```python
import aiohttp

from everything_is_an_actor.agents import AgentActor

class SearchAgent(AgentActor[str, str]):
    def supervisor_strategy(self):
        return OneForOneStrategy(max_restarts=5)

    async def on_started(self):
        self.client = aiohttp.ClientSession()

    async def execute(self, input: str) -> str:
        await self.emit_progress("searching...")
        async with self.client.get(input) as resp:
            body = await resp.text()  # return str, as the annotation promises
        await self.emit_progress("done")
        return body

    async def on_stopped(self):
        await self.client.close()
```
## Task lifecycle

Every message to an `AgentActor` is a `Task`. The framework manages the lifecycle automatically.

```python
from everything_is_an_actor.agents import Task, TaskResult, TaskStatus

task: Task[str] = Task(input="what is the actor model?")
# task.id is auto-generated (uuid hex)

result: TaskResult[str] = await ref.ask(task)
print(result.output)   # the value execute() returned
print(result.status)   # TaskStatus.COMPLETED
print(result.task_id)  # same as task.id
```
### Status transitions

```mermaid
graph LR
    P(PENDING) --> R(RUNNING)
    R --> C(COMPLETED)
    R --> F(FAILED)
    style P fill:#d9d4b8,stroke:#b5b07a,color:#2c3e50
    style R fill:#b8c9d9,stroke:#7a9bb5,color:#2c3e50
    style C fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
    style F fill:#d9a3a3,stroke:#b57a7a,color:#2c3e50
```
| Status | When |
|---|---|
| `COMPLETED` | `execute()` returned normally |
| `FAILED` | `execute()` raised an exception |
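The transitions above can be captured in a small state table. This is an illustrative stand-in (member values included) for the real `TaskStatus` enum exported from `everything_is_an_actor.agents`:

```python
from enum import Enum

# Illustrative stand-in; the real enum ships with the library.
class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

# Legal transitions from the diagram: PENDING -> RUNNING -> COMPLETED | FAILED
_NEXT = {
    TaskStatus.PENDING: {TaskStatus.RUNNING},
    TaskStatus.RUNNING: {TaskStatus.COMPLETED, TaskStatus.FAILED},
    TaskStatus.COMPLETED: set(),  # terminal
    TaskStatus.FAILED: set(),     # terminal
}

def can_transition(src: TaskStatus, dst: TaskStatus) -> bool:
    return dst in _NEXT[src]
```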
## Events

`AgentActor` emits `TaskEvent` objects automatically at each lifecycle stage:

| Event type | When emitted |
|---|---|
| `task_started` | `execute()` begins |
| `task_progress` | You call `emit_progress(data)` |
| `task_completed` | `execute()` returns successfully |
| `task_failed` | `execute()` raises an exception (`data` contains the error message) |
```python
from dataclasses import dataclass
from typing import Any

@dataclass
class TaskEvent:
    type: str        # one of the four types above
    task_id: str     # matches the Task.id
    agent_path: str  # e.g. "/app/summarizer"
    data: Any        # the progress data or final output
```

Events flow to a `RunStream` when using `AgentSystem.run()` or `ActorRef.ask_stream()`. With a plain `ActorSystem`, events are silently dropped.
### Span linking fields

`TaskEvent` also carries two fields for distributed trace reconstruction:

| Field | Description |
|---|---|
| `parent_task_id` | `task_id` of the calling agent's task. `None` for the root agent. |
| `parent_agent_path` | Actor path of the parent agent. `None` for the root agent. |
These let you reconstruct the full call tree from a flat event stream (OpenTelemetry-style spans).
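A minimal reconstruction pass over a flat event list might look like the following sketch. It uses a stand-in dataclass mirroring the fields documented above (the real class lives in the library):

```python
from dataclasses import dataclass
from typing import Any, Optional

# Stand-in mirroring the documented TaskEvent fields.
@dataclass
class TaskEvent:
    type: str
    task_id: str
    agent_path: str
    data: Any = None
    parent_task_id: Optional[str] = None
    parent_agent_path: Optional[str] = None

def build_call_tree(events):
    """Group a flat event stream into roots plus a parent -> children map.
    Roots are tasks whose parent_task_id is None."""
    roots, children, seen = [], {}, set()
    for e in events:
        if e.task_id in seen:
            continue  # only the first event per task defines its position
        seen.add(e.task_id)
        if e.parent_task_id is None:
            roots.append(e.task_id)
        else:
            children.setdefault(e.parent_task_id, []).append(e.task_id)
    return roots, children
```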
## Streaming output from execute()

`execute()` supports two output modes.

**Single value (`return`).** The agent computes one value and returns it. Use when the consumer only needs the final result.

```python
class GeoAgent(AgentActor[str, dict]):
    async def execute(self, input: str) -> dict:
        return await self.client.get(f"/geocode?q={input}")
```

Result: `TaskResult.output` is a `dict`.

**Streaming (`yield`).** The agent is an async generator. Each `yield` emits a `task_chunk` event immediately. Use when the consumer should process values as they arrive (LLM tokens, file chunks, etc.).

```python
class LLMAgent(AgentActor[str, list]):
    async def execute(self, prompt: str):
        async for token in openai.stream(prompt):
            yield token  # → task_chunk event, data=token
```

Result: `TaskResult.output` is `list[token]` (all yielded values collected).
When to use which:

| | `return await` | `yield` |
|---|---|---|
| Consumer needs | Final result only | Values as they arrive |
| `TaskResult.output` | The return value | `list` of all yielded values |
| Event emitted | `task_completed` | `task_chunk` × N, then `task_completed` |
| Example | DB query, API call | LLM tokens, file download, pipeline |
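The streaming mode can be sketched in plain asyncio. This is an illustration of the documented behavior, not the library's actual code: every yielded value corresponds to one `task_chunk` event, and the collected list becomes `TaskResult.output`.

```python
import asyncio

async def consume_streaming(gen):
    """Drain an async generator the way the docs describe:
    each yield -> one task_chunk; the list of chunks becomes the output."""
    chunks = []
    async for chunk in gen:
        # the framework would emit a task_chunk event with data=chunk here
        chunks.append(chunk)
    return chunks  # becomes TaskResult.output

async def fake_llm(prompt: str):
    """Toy async generator standing in for a streaming agent."""
    for token in prompt.split():
        yield token

print(asyncio.run(consume_streaming(fake_llm("hello actor world"))))
# ['hello', 'actor', 'world']
```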
### emit_progress

Use `emit_progress()` for status/progress updates — not for streaming output content.

```python
class SearchAgent(AgentActor[str, list]):
    async def execute(self, input: str) -> list:
        await self.emit_progress("searching...")  # status update
        results = await self.search(input)
        await self.emit_progress(f"found {len(results)} results")
        return results
```

> **Note**
> `emit_progress()` emits `task_progress` events — semantically "how is the task going", not "here is output content". For streaming output use `yield` instead. It is a no-op if called outside of `execute()` or without an event sink attached.
## Concurrency primitives

`self.context` exposes six concurrency primitives for orchestrating child agents: `ask`, `sequence`, `traverse`, `race`, `zip`, and `stream`. All ephemeral actors are automatically stopped and cleaned up after each call.

### ask — single dispatch

Dispatches one task to a single ephemeral child agent and awaits its `TaskResult`.

### sequence — parallel wait-all (fail-fast)

Runs all tasks concurrently and returns results in the original order. On the first failure, the remaining siblings are cancelled immediately.
```python
results = await self.context.sequence([
    (AgentA, Task(input="x")),
    (AgentB, Task(input="y")),
    (AgentC, Task(input="z")),
])
return [r.output for r in results]
```
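The fail-fast semantics can be illustrated with plain asyncio. This is a sketch of the documented behavior, not the library's implementation:

```python
import asyncio

async def fail_fast_gather(coros):
    """Run coroutines concurrently; on the first failure, cancel the
    remaining siblings and re-raise. Success results keep input order."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    failed = next((t for t in done if t.exception() is not None), None)
    if failed is not None:
        for t in pending:
            t.cancel()
        # let cancellations settle before propagating the original error
        await asyncio.gather(*pending, return_exceptions=True)
        raise failed.exception()
    return [t.result() for t in tasks]  # original submission order
```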
### traverse — map a list through one agent

```python
results = await self.context.traverse(["a", "b", "c"], UpperAgent)
return [r.output for r in results]  # ["A", "B", "C"]
```
### race — first-wins, cancel the rest

Returns the result of whichever task finishes first (success or failure). All losers are cancelled.

```python
r: TaskResult[str] = await self.context.race([
    (FastAgent, Task(input=query)),
    (SlowAgent, Task(input=query)),
])
return r.output
```
### zip — two tasks, typed pair

```python
a, b = await self.context.zip(
    (SearchAgent, Task(input=query)),
    (FactCheckAgent, Task(input=query)),
)
return (a.output, b.output)
```
### stream — streaming dispatch

Use inside a streaming `execute()` to transparently forward child chunks:

```python
class OrchestratorAgent(AgentActor[str, list]):
    async def execute(self, input: str):
        async for item in self.context.stream(LLMAgent, Task(input=input)):
            match item:
                case StreamEvent(event=e) if e.type == "task_chunk":
                    yield e.data  # re-yield → becomes task_chunk for caller
                case StreamResult(result=r):
                    pass  # final result available here
```
| | `ask` | `stream` |
|---|---|---|
| Child output | Single `TaskResult` | `StreamItem` sequence |
| Ephemeral actor | Stopped after `await` | Stopped after generator exhausted |
| Use when child | Returns one result | Streams chunks |
## Failure handling

`AgentActor` follows the actor model's let-it-crash philosophy:

1. `execute()` raises → the framework emits `task_failed` and re-raises for supervision
2. The parent's `supervisor_strategy()` decides: restart / stop / escalate
3. The `ask()` caller receives the exception

```python
class FlakyAgent(AgentActor[str, str]):
    async def execute(self, input: str) -> str:
        if random.random() < 0.3:
            raise TransientError("try again")
        return process(input)
```
Use the `plugins.retry` helper for automatic retries:

```python
from everything_is_an_actor.plugins.retry import ask_with_retry

result = await ask_with_retry(
    ref, Task(input="data"),
    max_attempts=3,
    base_backoff_s=0.1,
)
```
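The pattern behind such a helper is exponential backoff around the call. An illustrative reimplementation (not the plugin's actual code) looks like this:

```python
import asyncio

async def retry_with_backoff(op, max_attempts=3, base_backoff_s=0.1):
    """Call the zero-arg coroutine factory `op` until it succeeds,
    sleeping base_backoff_s * 2**attempt between failed attempts."""
    for attempt in range(max_attempts):
        try:
            return await op()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_backoff_s * (2 ** attempt))
```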
## AgentSystem

`AgentSystem` extends `ActorSystem` with event streaming. It is a drop-in replacement — all existing APIs work unchanged.

```python
from everything_is_an_actor import ActorSystem
from everything_is_an_actor.agents import AgentSystem

system = AgentSystem(ActorSystem("app"))
```
### run() — spawn and stream

Spawns a fresh root agent for each call and streams all `TaskEvent`s from the entire actor tree.

```python
async for event in system.run(ResearchOrchestrator, user_query):
    if event.type == "task_progress":
        print(event.data)
```

Child agents spawned via `ask()` automatically route their events to the same stream.
### ask_stream() — stream from existing ref

Reuses an already-spawned agent. Returns a stream of `StreamItem` objects — events first, then the final result.

```python
ref = await system.spawn(SummaryAgent, "summarizer")

async for item in ref.ask_stream(Task(input="long document...")):
    match item:
        case StreamEvent(event=e):
            print(e.type, e.data)  # intermediate events
        case StreamResult(result=r):
            print(r.output)  # final output (last item)
```

`ask_stream` is symmetric with `ref.ask()`. Use it when the agent is long-lived and handles multiple requests.
### Comparison

| | `run()` | `ask_stream()` |
|---|---|---|
| Agent lifecycle | Fresh spawn per call | Reuse existing ref |
| Where to call | On the system | On the ref |
| Input | Raw value | `Task` |
| Output | `TaskEvent` stream | `StreamItem` stream (`StreamEvent` \| `StreamResult`) |
### Stream types

`ask_stream()` yields a sealed `StreamItem` ADT:

```python
from everything_is_an_actor.agents.task import StreamEvent, StreamResult

async for item in ref.ask_stream(Task(input="...")):
    match item:
        case StreamEvent(event=e):    # TaskEvent wrapper
            ...
        case StreamResult(result=r):  # TaskResult wrapper — always last
            ...
```
| Type | Field | Description |
|---|---|---|
| `StreamEvent` | `event: TaskEvent` | One lifecycle event |
| `StreamResult` | `result: TaskResult` | Final outcome, always the last item |
## Guard: don't override on_receive

If you accidentally override `on_receive()` in an `AgentActor` subclass, the framework emits a `UserWarning` at class definition time:
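A definition-time check like this is typically built on `__init_subclass__`. The following is a sketch of that pattern under that assumption; the real check ships inside `AgentActor`:

```python
import warnings

class GuardedBase:
    """Illustrative base class: warn at class-definition time when a
    subclass overrides on_receive() instead of implementing execute()."""
    async def on_receive(self, msg): ...

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if "on_receive" in cls.__dict__:
            warnings.warn(
                f"{cls.__name__} overrides on_receive(); "
                "implement execute() instead.",
                UserWarning,
                stacklevel=2,
            )
```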