AgentActor API Reference
Task
A unit of work sent to an AgentActor.
Generic type parameter InputT constrains the input type.
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| input | InputT | required | The input data for the agent |
| id | str | auto (uuid hex) | Unique task identifier |
Example
task: Task[str] = Task(input="summarize this document")
task: Task[dict] = Task(input={"query": "actor model", "limit": 5}, id="my-task-001")
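As a rough illustration of the two fields above, the following sketch shows one way a generic task type with an auto-generated ID could be modelled. TaskSketch is a hypothetical stand-in, not the library's Task class, and using uuid.uuid4() for the default is an assumption; the table only says the default is a UUID hex string.

```python
import uuid
from dataclasses import dataclass, field
from typing import Generic, TypeVar

InputT = TypeVar("InputT")


@dataclass
class TaskSketch(Generic[InputT]):
    """Hypothetical stand-in for Task; not the library's definition."""

    input: InputT  # required: no default value
    # Assumption: the default ID is a random UUID rendered as 32 hex characters.
    id: str = field(default_factory=lambda: uuid.uuid4().hex)


auto_task = TaskSketch(input="summarize this document")
named_task = TaskSketch(input={"query": "actor model", "limit": 5}, id="my-task-001")
```

Using default_factory rather than a plain default means every task gets its own ID instead of sharing one value computed at class-definition time.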
TaskResult
The outcome returned by AgentActor.on_receive() after execute() completes.
Fields
| Field | Type | Description |
|---|---|---|
| task_id | str | Matches the originating Task.id |
| output | OutputT \| None | The value returned by execute() |
| error | str \| None | Error message if status is FAILED |
| status | TaskStatus | COMPLETED or FAILED |
TaskStatus
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
TaskEvent
An event emitted during task execution.
Fields
| Field | Type | Description |
|---|---|---|
| type | str | task_started, task_progress, task_chunk, task_completed, task_failed |
| task_id | str | The associated task ID |
| agent_path | str | Actor path (e.g. /app/summarizer) |
| data | Any | Progress data or final output |
| parent_task_id | str \| None | task_id of the calling agent's task. None for the root agent. |
| parent_agent_path | str \| None | Actor path of the parent agent. None for the root agent. |
parent_task_id and parent_agent_path enable OpenTelemetry-style trace reconstruction from a flat event stream.
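To make the trace-reconstruction idea concrete, here is a minimal sketch that rebuilds a task tree from a flat list of events using only task_id and parent_task_id. EventSketch, build_trace, render, and the sample agent paths are all hypothetical names introduced for illustration; they are not part of the library.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class EventSketch:
    """Hypothetical stand-in carrying the TaskEvent fields listed above."""

    type: str
    task_id: str
    agent_path: str
    data: Any = None
    parent_task_id: Optional[str] = None
    parent_agent_path: Optional[str] = None


def build_trace(events):
    """Group a flat event list into root task IDs and a parent -> children map."""
    paths = {}                    # task_id -> agent_path
    children = defaultdict(list)  # parent task_id -> child task_ids
    roots = []
    for event in events:
        if event.task_id in paths:
            continue  # later events for a known task add no new structure
        paths[event.task_id] = event.agent_path
        if event.parent_task_id is None:
            roots.append(event.task_id)
        else:
            children[event.parent_task_id].append(event.task_id)
    return roots, dict(children), paths


def render(task_id, children, paths, depth=0):
    """Return indented lines describing the subtree rooted at task_id."""
    lines = ["  " * depth + f"{paths[task_id]} ({task_id})"]
    for child_id in children.get(task_id, []):
        lines.extend(render(child_id, children, paths, depth + 1))
    return lines


events = [
    EventSketch("task_started", "t1", "/app/orchestrator"),
    EventSketch("task_started", "t2", "/app/orchestrator/search", None, "t1", "/app/orchestrator"),
    EventSketch("task_completed", "t2", "/app/orchestrator/search", ["hit"], "t1", "/app/orchestrator"),
    EventSketch("task_completed", "t1", "/app/orchestrator", "done"),
]
roots, children, paths = build_trace(events)
trace_lines = render(roots[0], children, paths)
```

Because each event carries its parent's task ID, the consumer never needs the events to arrive grouped by agent; a single pass over the stream is enough.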
StreamItem
Sealed ADT yielded by ActorRef.ask_stream(). Use match/case for exhaustive handling.
StreamEvent
Wraps a TaskEvent emitted during execution.
| Field | Type | Description |
|---|---|---|
| event | TaskEvent | The wrapped lifecycle event |
StreamResult
Wraps the final TaskResult. Always the last item in the stream.
| Field | Type | Description |
|---|---|---|
| result | TaskResult[OutputT] | The wrapped task outcome |
Example
async for item in ref.ask_stream(Task(input="...")):
    match item:
        case StreamEvent(event=e):
            print(e.type, e.data)
        case StreamResult(result=r):
            print(r.output)
ActorConfig
Optional actor-level configuration for Level 1-3 plain agent classes.
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| mailbox_size | int | 256 | Max queued messages |
| max_restarts | int | 3 | Max restarts within within_seconds |
| within_seconds | float | 60.0 | Restart rate window |
Usage
class MyAgent:
    __actor__ = ActorConfig(mailbox_size=128, max_restarts=5)

    async def execute(self, input): ...
Note: ActorConfig is used by AgentSystem (M3). It is ignored when using a plain ActorSystem.
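The max_restarts and within_seconds fields together describe a rate limit on restarts. One common way to enforce such a limit is a sliding window of timestamps. The sketch below assumes that approach; RestartWindow is a hypothetical helper, and the framework's actual bookkeeping may differ.

```python
from collections import deque


class RestartWindow:
    """Hypothetical sliding-window restart limiter (not the framework's code)."""

    def __init__(self, max_restarts: int = 3, within_seconds: float = 60.0):
        self.max_restarts = max_restarts
        self.within_seconds = within_seconds
        self._stamps = deque()  # times of restarts still inside the window

    def allow(self, now: float) -> bool:
        """Record a restart at time `now` if the limit permits it."""
        # Forget restarts that have aged out of the window.
        while self._stamps and now - self._stamps[0] > self.within_seconds:
            self._stamps.popleft()
        if len(self._stamps) >= self.max_restarts:
            return False  # too many recent restarts
        self._stamps.append(now)
        return True


window = RestartWindow(max_restarts=3, within_seconds=60.0)
decisions = [window.allow(t) for t in (0.0, 1.0, 2.0, 3.0, 61.5)]
```

With the defaults, three quick restarts are allowed, a fourth inside the same minute is refused, and restarts become possible again once the earliest ones fall out of the window.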
AgentActor
Base class for AI agents (Level 4). Inherits from Actor. Generic over input type InputT and output type OutputT.
Type parameters flow end-to-end: Task[InputT] → execute(input: InputT) -> OutputT → TaskResult[OutputT].
Methods to override
execute(input)
Implement your agent logic here. Supports two output modes:
Single result — return a value. Becomes TaskResult.output.
class SummaryAgent(AgentActor[str, str]):
    async def execute(self, input: str) -> str:
        return await llm.summarize(input)
Streaming — yield values (async generator). Each yield emits a task_chunk event immediately. TaskResult.output is the collected list of all yielded values.
class LLMAgent(AgentActor[str, list]):
    async def execute(self, prompt: str):
        async for token in openai.stream(prompt):
            yield token  # → task_chunk event, data=token
Raise any exception to signal failure. The framework emits task_failed and supervision handles the restart.
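The two output modes described above can be told apart at runtime because calling an async generator function returns an async generator rather than a coroutine. The sketch below shows one way a dispatcher could normalise both modes; run_execute and its on_chunk callback are hypothetical and only stand in for whatever the framework does internally.

```python
import asyncio
import inspect


async def run_execute(execute, value, on_chunk):
    """Call execute(value) and normalise both output modes to one return value."""
    outcome = execute(value)
    if inspect.isasyncgen(outcome):
        collected = []
        async for chunk in outcome:
            on_chunk(chunk)       # stands in for emitting a task_chunk event
            collected.append(chunk)
        return collected          # streaming mode: list of yielded values
    return await outcome          # single-result mode: the returned value


async def summarize(text: str) -> str:
    return text.upper()


async def tokens(text: str):
    for word in text.split():
        yield word


chunks = []
single = asyncio.run(run_execute(summarize, "hello world", chunks.append))
streamed = asyncio.run(run_execute(tokens, "hello world", chunks.append))
```

Here single is the plain return value, while streamed is the list of yielded words and chunks records each word as it was produced.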
emit_progress(data)
Emit a task_progress event for status/progress updates during execute(). No-op if called outside an active task or without an event sink attached.
Use yield (streaming mode) for output content; use emit_progress() for "how is the task going" messages.
async def execute(self, input):
    await self.emit_progress("searching...")
    results = await self.search(input)
    await self.emit_progress(f"found {len(results)} results")
    return results
on_started()
Called once after the actor is spawned, before any messages are processed.
on_stopped()
Called on graceful shutdown. Release resources here.
on_restart(error)
Called on the new instance after a supervision-triggered restart.
supervisor_strategy()
Override to customize child supervision. Default: OneForOneStrategy(max_restarts=3, within_seconds=60).
Do not override
on_receive(message)
Managed by the framework. Handles Task wrapping, event emission, and error propagation.
Accidentally overriding this method emits a UserWarning at class definition time.
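A warning at class definition time can be produced with Python's __init_subclass__ hook. Whether AgentActor uses this mechanism is an assumption; BaseSketch and the warning text below are hypothetical and only illustrate the behaviour described above.

```python
import warnings


class BaseSketch:
    """Hypothetical base class standing in for AgentActor."""

    def on_receive(self, message):
        return f"framework handled {message!r}"

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # cls.__dict__ holds only names defined directly on this subclass.
        if "on_receive" in cls.__dict__:
            warnings.warn(
                f"{cls.__name__} overrides on_receive(); implement execute() instead",
                UserWarning,
                stacklevel=2,
            )


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")

    class GoodAgent(BaseSketch):
        async def execute(self, input):
            return input

    class BadAgent(BaseSketch):
        def on_receive(self, message):  # triggers the warning
            return message

warning_messages = [str(w.message) for w in caught]
```

Only BadAgent produces a warning, and it does so as soon as the class body is evaluated, before any instance exists.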
ActorContext
Injected as self.context before on_started. Available inside all lifecycle hooks and execute().
All ephemeral actors are automatically stopped and cleaned up after each call.
ask(target, message, *, timeout, name)
Spawn an ephemeral child actor (or send to an existing ref), await the result, stop the child.
result: TaskResult[str] = await self.context.ask(SearchAgent, Task(input=query))
return result.output
sequence(tasks, *, timeout)
Run tasks concurrently, return results in original order. On first failure, cancels remaining siblings immediately.
results = await self.context.sequence([
    (AgentA, Task(input="x")),
    (AgentB, Task(input="y")),
    (AgentC, Task(input="z")),
])
return [r.output for r in results]
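The semantics of sequence (concurrent execution, ordered results, fail-fast cancellation) can be approximated with plain asyncio. The sketch below is not the library's implementation: sequence_sketch, worker, and demo are hypothetical, and plain coroutines take the place of (agent, Task) pairs.

```python
import asyncio


async def sequence_sketch(coros):
    """Run coroutines concurrently; return results in input order or fail fast."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        for task in done:
            if task.exception() is not None:
                raise task.exception()
        return [task.result() for task in tasks]  # input order, not finish order
    finally:
        # On failure, cancel whatever is still running and wait for it to unwind.
        leftovers = [task for task in tasks if not task.done()]
        for task in leftovers:
            task.cancel()
        await asyncio.gather(*leftovers, return_exceptions=True)


async def worker(value, delay, log, fail=False):
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        log.append(f"{value} cancelled")
        raise
    if fail:
        raise RuntimeError(f"{value} failed")
    return value


async def demo():
    log = []
    ordered = await sequence_sketch(
        [worker("x", 0.03, log), worker("y", 0.01, log), worker("z", 0.02, log)]
    )
    try:
        await sequence_sketch([worker("slow", 5.0, log), worker("bad", 0.01, log, fail=True)])
    except RuntimeError as exc:
        log.append(str(exc))
    return ordered, log


ordered, log = asyncio.run(demo())
```

In the first call the results come back as x, y, z even though y finishes first. In the second call the failing worker causes the slow sibling to be cancelled long before its five-second sleep ends.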
traverse(inputs, target, *, timeout)
Map a list of inputs through the same agent concurrently. Results preserve input order.
results = await self.context.traverse(["a", "b", "c"], UpperAgent)
return [r.output for r in results] # ["A", "B", "C"]
race(tasks, *, timeout)
Run tasks concurrently, return the first to complete. All losers are cancelled.
r: TaskResult[str] = await self.context.race([
    (FastAgent, Task(input=query)),
    (SlowAgent, Task(input=query)),
])
return r.output
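The first-to-complete behaviour of race can likewise be sketched with asyncio.wait. This is an approximation under stated assumptions: race_sketch, agent, and demo are hypothetical names, plain coroutines stand in for agents, and the choice to re-raise when the first finisher failed is an assumption the reference above does not confirm.

```python
import asyncio


async def race_sketch(coros):
    """Return the result of the first coroutine to finish; cancel the rest."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        winner = next(task for task in tasks if task in done)
        return winner.result()  # assumption: re-raises if the winner failed
    finally:
        losers = [task for task in tasks if not task.done()]
        for task in losers:
            task.cancel()
        await asyncio.gather(*losers, return_exceptions=True)


async def agent(name, delay, cancelled):
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        cancelled.append(name)  # record that this loser was cancelled
        raise
    return name


async def demo():
    cancelled = []
    winner = await race_sketch([agent("fast", 0.01, cancelled), agent("slow", 5.0, cancelled)])
    return winner, cancelled


winner, cancelled = asyncio.run(demo())
```

Awaiting the cancelled losers in the finally block ensures they have actually stopped before the caller continues, which mirrors the cleanup guarantee described for ephemeral actors.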
zip(task_a, task_b, *, timeout)
Run two tasks concurrently, return both results as a typed pair. If either fails, the other is cancelled.
a, b = await self.context.zip(
    (SearchAgent, Task(input=query)),
    (FactCheckAgent, Task(input=query)),
)
return (a.output, b.output)
stream(target, message, *, timeout, name)
Spawn an ephemeral child actor (or reuse a ref) and stream its events. Yields StreamItem objects — events first, then the final StreamResult. Ephemeral children are stopped after the stream is exhausted or the caller breaks early.
Use inside a streaming execute() to transparently forward child chunks:
class OrchestratorAgent(AgentActor[str, list]):
    async def execute(self, input: str):
        async for item in self.context.stream(LLMAgent, Task(input=input)):
            match item:
                case StreamEvent(event=e) if e.type == "task_chunk":
                    yield e.data  # re-yield → becomes task_chunk for caller
                case StreamResult(result=r):
                    pass  # final result available here
| | ask | stream |
|---|---|---|
| Child output | Single TaskResult | StreamItem sequence |
| Ephemeral actor | Stopped after await | Stopped after generator exhausted |
| Use when child | Returns one result | Streams chunks |
Usage with ActorSystem
AgentActor works with the standard ActorSystem. Messages must be wrapped in Task.
from everything_is_an_actor import ActorSystem
# TaskResult is assumed to be exported from the same module as Task.
from everything_is_an_actor.agents import AgentActor, Task, TaskResult
system = ActorSystem("app")
ref = await system.spawn(SummaryAgent, "summarizer")
result: TaskResult[str] = await ref.ask(Task(input="..."))
print(result.output) # str
print(result.status) # TaskStatus.COMPLETED
await system.shutdown()
ActorRef.ask_stream
Stream TaskEvents from an already-spawned AgentActor ref, then yield the final TaskResult.
- The agent is not re-spawned. Reuse the same ref across multiple calls.
- Child agents spawned via ask() inside execute() inherit the event sink automatically.
- Raises the agent's exception after the stream is exhausted (if execute() raised).
from everything_is_an_actor import ActorSystem
from everything_is_an_actor.agents import Task
from everything_is_an_actor.agents.system import AgentSystem
from everything_is_an_actor.agents.task import StreamEvent, StreamResult
system = AgentSystem(ActorSystem())
ref = await system.spawn(SummaryAgent, "summarizer")
# First call
async for item in ref.ask_stream(Task(input="doc 1")):
    match item:
        case StreamEvent(event=e):
            print(e.type, e.data)
        case StreamResult(result=r):
            print(r.output)
# Reuse the same ref; the agent is not re-spawned
async for item in ref.ask_stream(Task(input="doc 2")):
    ...
AgentSystem
Drop-in replacement for ActorSystem with event-streaming support.
run(agent_cls, input, *, run_id, timeout)
Spawns a fresh root agent and streams all TaskEvents from the entire actor tree.
async for event in system.run(ResearchOrchestrator, user_query, timeout=120.0):
    if event.type == "task_progress":
        yield event.data
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_cls | type[AgentActor] | required | Root agent class to instantiate |
| input | Any | required | Passed to root agent as Task.input |
| run_id | str \| None | auto | Stable ID for log correlation |
| timeout | float | 600.0 | Max seconds for root agent to complete |
abort(run_id)
Cancel a running agent tree by run_id. No-op if already finished.
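Cancelling by run_id implies some mapping from run IDs to in-flight work. The sketch below shows one plausible shape for that bookkeeping using asyncio tasks. RunRegistry and demo are hypothetical, the sketch cancels a single task rather than a whole actor tree, and the boolean return value is an illustrative addition, not part of the documented signature.

```python
import asyncio


class RunRegistry:
    """Hypothetical run_id -> asyncio.Task bookkeeping; not AgentSystem's code."""

    def __init__(self):
        self._runs = {}

    def start(self, run_id, coro):
        task = asyncio.ensure_future(coro)
        self._runs[run_id] = task
        # Forget the run once it finishes so a late abort() finds nothing.
        task.add_done_callback(lambda _: self._runs.pop(run_id, None))
        return task

    def abort(self, run_id) -> bool:
        task = self._runs.get(run_id)
        if task is None or task.done():
            return False  # unknown or already finished: no-op
        task.cancel()
        return True


async def demo():
    registry = RunRegistry()
    run = registry.start("run-1", asyncio.sleep(5.0))
    first = registry.abort("run-1")    # cancels the running task
    await asyncio.gather(run, return_exceptions=True)
    second = registry.abort("run-1")   # run already finished: no-op
    return first, second, run.cancelled()


first, second, was_cancelled = asyncio.run(demo())
```

The second abort call does nothing because the run has already ended, which matches the no-op behaviour described above.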