Skip to content

Agent Layer

The everything_is_an_actor.agents module provides higher-level abstractions specifically for AI agent systems, built on top of the core actor primitives.


The progressive API

The agent layer has 5 levels. Start at the lowest level you need — upgrading later requires no rewrites.

Zero actor knowledge required. Just implement execute().

class SearchAgent:
    async def execute(self, input: str) -> str:
        return await web_search(input)

Works with AgentSystem.

Add initialization and cleanup.

class SearchAgent:
    async def on_started(self):
        self.client = aiohttp.ClientSession()

    async def execute(self, input: str) -> str:
        return await self.client.get(input)

    async def on_stopped(self):
        await self.client.close()

Configure mailbox size and restart limits without inheriting from AgentActor.

from everything_is_an_actor.agents import ActorConfig

class SearchAgent:
    __actor__ = ActorConfig(mailbox_size=64, max_restarts=5)

    async def execute(self, input: str) -> str:
        return await web_search(input)

Full power: strong typing, supervision strategy, emit_progress(), access to actor context.

from everything_is_an_actor.agents import AgentActor

class SearchAgent(AgentActor[str, str]):
    def supervisor_strategy(self):
        return OneForOneStrategy(max_restarts=5)

    async def on_started(self):
        self.client = aiohttp.ClientSession()

    async def execute(self, input: str) -> str:
        await self.emit_progress("searching...")
        result = await self.client.get(input)
        await self.emit_progress("done")
        return result

    async def on_stopped(self):
        await self.client.close()

For infrastructure components (routers, caches, rate limiters) that don't follow the Task protocol.

from everything_is_an_actor import Actor

class RateLimiterActor(Actor):
    async def on_receive(self, message):
        await self.throttle()
        return await self.forward(message)

Task lifecycle

Every message to an AgentActor is a Task. The framework manages the lifecycle automatically.

from everything_is_an_actor.agents import Task, TaskResult, TaskStatus

task: Task[str] = Task(input="what is the actor model?")
# task.id is auto-generated (uuid hex)

result: TaskResult[str] = await ref.ask(task)
print(result.output)        # the value execute() returned
print(result.status)        # TaskStatus.COMPLETED
print(result.task_id)       # same as task.id

Status transitions

graph LR
    P(PENDING) --> R(RUNNING)
    R --> C(COMPLETED)
    R --> F(FAILED)

    style P fill:#d9d4b8,stroke:#b5b07a,color:#2c3e50
    style R fill:#b8c9d9,stroke:#7a9bb5,color:#2c3e50
    style C fill:#b8d9c4,stroke:#7ab59b,color:#2c3e50
    style F fill:#d9a3a3,stroke:#b57a7a,color:#2c3e50
Status When
COMPLETED execute() returned normally
FAILED execute() raised an exception

Events

AgentActor emits TaskEvent objects automatically at each lifecycle stage:

Event type When emitted
task_started execute() begins
task_progress You call emit_progress(data)
task_completed execute() returns successfully
task_failed execute() raises an exception (data contains the error message)
@dataclass
class TaskEvent:
    type: str         # one of the four types above
    task_id: str      # matches the Task.id
    agent_path: str   # e.g. "/app/summarizer"
    data: Any         # the progress data or final output

Events flow to a RunStream when using AgentSystem.run() or ActorRef.ask_stream(). With plain ActorSystem, events are silently dropped.

Span linking fields

TaskEvent also carries two fields for distributed trace reconstruction:

Field Description
parent_task_id task_id of the calling agent's task. None for the root agent.
parent_agent_path Actor path of the parent agent. None for the root agent.

These let you reconstruct the full call tree from a flat event stream (OpenTelemetry-style spans).


Streaming output from execute()

execute() supports two output modes:

The agent computes one value and returns it. Use when the consumer only needs the final result.

class GeoAgent(AgentActor[str, dict]):
    async def execute(self, input: str) -> dict:
        return await self.client.get(f"/geocode?q={input}")

Result: TaskResult.output is a dict.

The agent is an async generator. Each yield emits a task_chunk event immediately. Use when the consumer should process values as they arrive (LLM tokens, file chunks, etc.).

class LLMAgent(AgentActor[str, list]):
    async def execute(self, prompt: str):
        async for token in openai.stream(prompt):
            yield token          # → task_chunk event, data=token

Result: TaskResult.output is list[token] (all yielded values collected).

When to use which:

return await yield
Consumer needs Final result only Values as they arrive
TaskResult.output The return value list of all yielded values
Event emitted task_completed task_chunk × N, then task_completed
Example DB query, API call LLM tokens, file download, pipeline

emit_progress

Use emit_progress() for status/progress updates — not for streaming output content.

class SearchAgent(AgentActor[str, list]):
    async def execute(self, input: str) -> list:
        await self.emit_progress("searching...")   # status update
        results = await self.search(input)
        await self.emit_progress(f"found {len(results)} results")
        return results

Note

emit_progress() emits task_progress events — semantically "how is the task going", not "here is output content". For streaming output use yield instead.

It is a no-op if called outside of execute() or without an event sink attached.


Concurrency primitives

self.context exposes six concurrency primitives for orchestrating child agents. All ephemeral actors are automatically stopped and cleaned up after each call.

ask — single dispatch

r: TaskResult[str] = await self.context.ask(SearchAgent, Task(input=query))
return r.output

sequence — parallel wait-all (fail-fast)

Runs all tasks concurrently, returns results in original order. On first failure, cancels remaining siblings immediately.

results = await self.context.sequence([
    (AgentA, Task(input="x")),
    (AgentB, Task(input="y")),
    (AgentC, Task(input="z")),
])
return [r.output for r in results]

traverse — map a list through one agent

results = await self.context.traverse(["a", "b", "c"], UpperAgent)
return [r.output for r in results]   # ["A", "B", "C"]

race — first-wins, cancel the rest

Returns the result of whichever task finishes first (success or failure). All losers are cancelled.

r: TaskResult[str] = await self.context.race([
    (FastAgent, Task(input=query)),
    (SlowAgent, Task(input=query)),
])
return r.output

zip — two tasks, typed pair

a, b = await self.context.zip(
    (SearchAgent, Task(input=query)),
    (FactCheckAgent, Task(input=query)),
)
return (a.output, b.output)

stream — streaming dispatch

Use inside a streaming execute() to transparently forward child chunks:

class OrchestratorAgent(AgentActor[str, list]):
    async def execute(self, input: str):
        async for item in self.context.stream(LLMAgent, Task(input=input)):
            match item:
                case StreamEvent(event=e) if e.type == "task_chunk":
                    yield e.data          # re-yield → becomes task_chunk for caller
                case StreamResult(result=r):
                    pass                  # final result available here
ask stream
Child output Single TaskResult StreamItem sequence
Ephemeral actor Stopped after await Stopped after generator exhausted
Use when child Returns one result Streams chunks

Failure handling

AgentActor follows the actor model's let-it-crash philosophy:

  • execute() raises → framework emits task_failed, re-raises for supervision
  • The parent's supervisor_strategy() decides: restart / stop / escalate
  • The ask() caller receives the exception
class FlakyAgent(AgentActor[str, str]):
    async def execute(self, input: str) -> str:
        if random.random() < 0.3:
            raise TransientError("try again")
        return process(input)

# Use plugins.retry for automatic retries
from everything_is_an_actor.plugins.retry import ask_with_retry

result = await ask_with_retry(
    ref, Task(input="data"),
    max_attempts=3,
    base_backoff_s=0.1,
)

AgentSystem

AgentSystem extends ActorSystem with event streaming. It is a drop-in replacement — all existing APIs work unchanged.

from everything_is_an_actor import ActorSystem
from everything_is_an_actor.agents import AgentSystem

system = AgentSystem(ActorSystem("app"))

run() — spawn and stream

Spawns a fresh root agent for each call, streams all TaskEvents from the entire actor tree.

async for event in system.run(ResearchOrchestrator, user_query):
    if event.type == "task_progress":
        print(event.data)

Child agents spawned via ask() automatically route their events to the same stream.

ask_stream() — stream from existing ref

Reuses an already-spawned agent. Returns a stream of StreamItem objects — events first, then the final result.

ref = await system.spawn(SummaryAgent, "summarizer")

async for item in ref.ask_stream(Task(input="long document...")):
    match item:
        case StreamEvent(event=e):
            print(e.type, e.data)    # intermediate events
        case StreamResult(result=r):
            print(r.output)          # final output (last item)

ask_stream is symmetric with ref.ask(). Use it when the agent is long-lived and handles multiple requests.

Comparison

run() ask_stream()
Agent lifecycle Fresh spawn per call Reuse existing ref
Where to call On the system On the ref
Input raw value Task
Output TaskEvent stream StreamItem stream (StreamEvent \| StreamResult)

Stream types

ask_stream() yields a sealed StreamItem ADT:

from everything_is_an_actor.agents.task import StreamEvent, StreamResult

async for item in ref.ask_stream(Task(input="...")):
    match item:
        case StreamEvent(event=e):   # TaskEvent wrapper
            ...
        case StreamResult(result=r): # TaskResult wrapper — always last
            ...
Type Field Description
StreamEvent event: TaskEvent One lifecycle event
StreamResult result: TaskResult Final outcome, always the last item

Guard: don't override on_receive

If you accidentally override on_receive() in an AgentActor subclass, the framework emits a UserWarning at class definition time:

class MyAgent(AgentActor[str, str]):
    async def on_receive(self, message):  # ← UserWarning
        ...
UserWarning: MyAgent: do not override on_receive() in AgentActor subclasses.
Implement execute() instead — the framework manages on_receive().