Actor Lifecycle and Stop Policy
Overview
Actors are persistent by default. They run until explicitly stopped. The stop_policy mechanism provides declarative auto-stop behavior for ephemeral actors.
Stop Policy ADT
from everything_is_an_actor import StopMode, AfterMessage, AfterIdle, StopPolicy

class StopMode(Enum):
    NEVER = auto()     # Never auto-stop (default)
    ONE_TIME = auto()  # Stop after processing one message

@dataclass
class AfterMessage:
    message: Any  # Stop after receiving this message

@dataclass
class AfterIdle:
    seconds: float  # Stop after idle for N seconds
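To make the three variants concrete, here is a minimal sketch of how a policy might be evaluated once a message has been processed. It redefines the types locally so it runs on its own. The `StopPolicy` union and the `should_stop_after` helper are assumptions made for illustration, not the library's actual implementation.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Union


class StopMode(Enum):
    NEVER = auto()
    ONE_TIME = auto()


@dataclass
class AfterMessage:
    message: Any


@dataclass
class AfterIdle:
    seconds: float


# Assumption: StopPolicy is the union of the three variants above.
StopPolicy = Union[StopMode, AfterMessage, AfterIdle]


def should_stop_after(policy: StopPolicy, message: Any) -> bool:
    """Hypothetical helper: should the actor stop now that `message` was processed?"""
    if policy is StopMode.ONE_TIME:
        return True
    if isinstance(policy, AfterMessage):
        return message == policy.message
    # StopMode.NEVER never stops; AfterIdle is driven by a timer, not by messages.
    return False
```

In this sketch `AfterIdle` always returns `False`, because an idle timeout would be triggered by the absence of messages rather than by any particular one.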
Usage
One-Time Actor
Process one message, then stop automatically, by returning StopMode.ONE_TIME from stop_policy (see the One-Time Worker example under Examples).
After Message
Stop when receiving a specific message:
class StoppableActor(Actor):
    def stop_policy(self) -> StopPolicy:
        return AfterMessage(message="shutdown")
After Idle
Stop after being idle for N seconds by returning AfterIdle(seconds=N) from stop_policy (see the Idle Timeout example under Examples).
API Signatures
tell(Actor, msg) — Fire and Forget
Spawns a temporary actor and sends it the message; the actor then stops itself according to its stop_policy:
await self.tell(EchoActor, "hello")
# EchoActor processes message, then stops based on its stop_policy
Type constraint: tell() requires an actor whose stop_policy is not StopMode.NEVER; otherwise it raises TypeError.
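The constraint above could be enforced with a guard along the following lines. This is a self-contained sketch: the `Actor` stand-in, `EchoActor`, and the `check_tell_target` helper are hypothetical, and the real library may inspect the policy differently (for example without instantiating the class).

```python
from enum import Enum, auto


class StopMode(Enum):
    NEVER = auto()
    ONE_TIME = auto()


class Actor:
    """Stand-in base class: persistent by default."""

    def stop_policy(self):
        return StopMode.NEVER


class EchoActor(Actor):
    def stop_policy(self):
        return StopMode.ONE_TIME


def check_tell_target(actor_cls: type) -> None:
    """Hypothetical guard: reject actors that would never stop on their own."""
    if actor_cls().stop_policy() is StopMode.NEVER:
        raise TypeError(
            f"tell() needs an auto-stopping actor, got {actor_cls.__name__}"
        )
```

Failing early matters here because a fire-and-forget actor with a NEVER policy would have no owner left to stop it.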
ask(Actor, msg) — Request/Response
Spawns a temporary actor, sends it the message, waits for the reply, and then stops the actor manually.
ask() uses a manual stop and does not rely on stop_policy.
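The spawn, send, wait, stop sequence can be sketched with plain asyncio. Everything here (`UpperActor`, the module-level `stopped` list, and this `ask` function) is a stand-in written for illustration; it only mirrors the flow described above, not the library's internals.

```python
import asyncio

stopped = []  # records which stand-in actors were stopped, for illustration


class UpperActor:
    """Stand-in actor that replies with the upper-cased message."""

    async def on_receive(self, message: str) -> str:
        return message.upper()

    async def stop(self) -> None:
        stopped.append(type(self).__name__)


async def ask(actor_cls, message):
    """Hypothetical ask(): spawn, send, await the reply, then stop manually."""
    actor = actor_cls()  # spawn a temporary actor
    try:
        return await actor.on_receive(message)  # send and wait for the reply
    finally:
        await actor.stop()  # manual stop, even if on_receive raised


reply = asyncio.run(ask(UpperActor, "hello"))
```

Putting the stop in a `finally` block is one way to make sure the temporary actor is cleaned up regardless of its stop_policy or of errors in the handler.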
spawn(Actor, name) — Persistent Child
Spawns a persistent child actor under the parent's supervision.
Child actors are supervised and are stopped when the parent stops.
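As a rough illustration of parent-driven shutdown, the toy model below has each node track its children and stop them before stopping itself. The `Node` class and its methods are hypothetical; the ordering (children first) is an assumption of this sketch, not a documented guarantee.

```python
import asyncio


class Node:
    """Stand-in actor that tracks its children (not the library's Actor class)."""

    def __init__(self, name: str):
        self.name = name
        self.children = []
        self.running = True

    def spawn(self, node_cls, name: str) -> "Node":
        child = node_cls(name)
        self.children.append(child)  # the parent keeps a reference to supervise it
        return child

    async def stop(self) -> None:
        # Stop children first so that no child outlives its parent.
        for child in self.children:
            await child.stop()
        self.running = False


async def demo():
    parent = Node("parent")
    worker = parent.spawn(Node, "worker")
    helper = worker.spawn(Node, "helper")
    await parent.stop()
    return parent.running, worker.running, helper.running


states = asyncio.run(demo())
```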
Stop Priority
Manual stop and auto stop_policy work together:
- Manual stop (ref.stop()): puts _Stop in the mailbox
- Auto stop_policy: checked after each message is processed
Both result in graceful shutdown. There's no conflict.
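One way to picture why the two mechanisms do not conflict is a message loop with a single exit path for each case. The sketch below is a toy model: `_STOP` stands in for the `_Stop` marker, and `run_actor` is a hypothetical loop, not the library's scheduler.

```python
import asyncio
from enum import Enum, auto


class StopMode(Enum):
    NEVER = auto()
    ONE_TIME = auto()


_STOP = object()  # stand-in for the _Stop marker that ref.stop() enqueues


async def run_actor(mailbox: asyncio.Queue, policy: StopMode, handled: list) -> str:
    """Toy message loop: returns the reason the actor shut down."""
    while True:
        message = await mailbox.get()
        if message is _STOP:
            return "manual"  # a manual stop arrives through the mailbox
        handled.append(message)
        if policy is StopMode.ONE_TIME:
            return "policy"  # the policy is checked after each processed message


async def demo():
    # Persistent actor: handles everything queued ahead of the stop marker.
    box, seen = asyncio.Queue(), []
    for item in ("a", "b", _STOP):
        box.put_nowait(item)
    first = await run_actor(box, StopMode.NEVER, seen)

    # One-time actor: the policy fires first, so the queued marker is never read.
    box2, seen2 = asyncio.Queue(), []
    for item in ("a", _STOP):
        box2.put_nowait(item)
    second = await run_actor(box2, StopMode.ONE_TIME, seen2)
    return first, seen, second, seen2


results = asyncio.run(demo())
```

Whichever condition is met first ends the loop, and the other is simply never reached.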
Examples
One-Time Worker
class OneTimeWorker(Actor):
    def stop_policy(self) -> StopPolicy:
        return StopMode.ONE_TIME

    async def on_receive(self, task):
        result = await process(task)
        # The actor stops after this message is processed
        return result

# Usage (from inside another actor)
await self.tell(OneTimeWorker, heavy_task)
Idle Timeout
class CacheActor(Actor):
    def stop_policy(self) -> StopPolicy:
        return AfterIdle(seconds=300.0)  # 5 minutes

    async def on_receive(self, message):
        # Each received message resets the idle timer
        # (self.cache is assumed to be initialized elsewhere)
        return self.cache.get(message)
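The timer-reset behavior can be sketched with `asyncio.wait_for`: each wait on the mailbox starts a fresh timeout, so every message effectively resets the idle clock. The `run_with_idle_timeout` loop below is a hypothetical model with a short timeout for demonstration, not the library's implementation.

```python
import asyncio


async def run_with_idle_timeout(
    mailbox: asyncio.Queue, idle_seconds: float, handled: list
) -> None:
    """Toy loop: stop once no message arrives within idle_seconds."""
    while True:
        try:
            # Each call starts a fresh wait, so every message resets the idle timer.
            message = await asyncio.wait_for(mailbox.get(), timeout=idle_seconds)
        except asyncio.TimeoutError:
            return  # idle for too long: shut down
        handled.append(message)


async def demo() -> list:
    box, seen = asyncio.Queue(), []
    box.put_nowait("k1")
    box.put_nowait("k2")
    await run_with_idle_timeout(box, 0.05, seen)
    return seen


handled = asyncio.run(demo())
```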