Mixture of Agents (MOA)
Composable multi-agent orchestration pattern — parallel proposers → quorum validation → aggregation, chained in layers.
Built on the Flow API as a pattern library. Dependency direction: moa/ → flow/ → agents/ → core/.
Quick Start
import asyncio
from everything_is_an_actor.agents import AgentActor
from everything_is_an_actor.moa import MoASystem, moa_layer, moa_tree, LayerOutput
from everything_is_an_actor.flow.quorum import QuorumResult
# Proposers
class Researcher(AgentActor[str, str]):
async def execute(self, input: str) -> str:
return f"Research on: {input}"
class Critic(AgentActor[str, str]):
async def execute(self, input: str) -> str:
return f"Critique of: {input}"
# Aggregator
class Synthesizer(AgentActor[QuorumResult[str], str]):
async def execute(self, results: QuorumResult[str]) -> str:
return "\n".join(results.succeeded)
# Build and run
system = MoASystem()
result = await system.run(
moa_tree([
moa_layer(
proposers=[Researcher, Critic],
aggregator=Synthesizer,
min_success=1,
),
]),
input="What is the actor model?",
)
await system.shutdown()
Architecture
graph LR
moa["moa/"]
p["patterns.py<br/><i>moa_layer(), moa_tree()</i>"]
l["layer_output.py<br/><i>LayerOutput directive carrier</i>"]
s["system.py<br/><i>MoASystem entry point</i>"]
u["utils.py<br/><i>format_references()</i>"]
moa --- p
moa --- l
moa --- s
moa --- u
style moa fill:#c4b8d9,stroke:#9b7ab5,color:#2c3e50
style p fill:#d9d4b8,stroke:#b5b07a,color:#2c3e50
style l fill:#d9d4b8,stroke:#b5b07a,color:#2c3e50
style s fill:#d9d4b8,stroke:#b5b07a,color:#2c3e50
style u fill:#d9d4b8,stroke:#b5b07a,color:#2c3e50
MOA is purely compositional — it uses existing Flow primitives (at_least, agent, pure, flat_map) without modifying core or agents.
Core Components
moa_layer() — Single Layer
A single MOA layer: parallel proposers → quorum → aggregator.
from everything_is_an_actor.moa import moa_layer
layer = moa_layer(
proposers=[Agent1, Agent2, Agent3],
aggregator=SynthesisAgg,
min_success=2, # at least 2 must succeed
timeout=15.0, # per-proposer timeout
)
Internally:
1. Injects directive into proposer input (if present from previous layer).
2. Runs proposers via at_least(min_success, ...) — Validated semantics.
3. Feeds QuorumResult to aggregator.
4. Extracts directive from LayerOutput (if returned).
moa_tree() — Multi-Layer Pipeline
Chains layers via flat_map with directive passing between layers.
from everything_is_an_actor.moa import moa_tree
pipeline = moa_tree([
moa_layer(proposers=[R1, R2, R3], aggregator=Synth, min_success=2),
moa_layer(proposers=[Critic], aggregator=Refiner),
])
- Wraps input as
(input, None)for the first layer. - Each layer outputs
(result, directive). - Final layer result is unwrapped automatically.
MoASystem — High-Level Entry Point
Owns the full ActorSystem → AgentSystem lifecycle. For users who don't need low-level control.
from everything_is_an_actor.moa import MoASystem
system = MoASystem()
result = await system.run(pipeline, "query")
# Or stream events
async for event in system.run_stream(pipeline, "query"):
print(event.type, event.data)
await system.shutdown()
Validated Fault-Tolerance
MOA uses the at_least combinator from the Flow layer for quorum validation.
- All proposers run in parallel.
- Domain exceptions are recovered into
QuorumResult.failedlist. - System exceptions (
MemoryError,SystemExit) propagate immediately. - If
>= min_successproposers succeeded, the pipeline continues. - If
< min_success, aRuntimeErroris raised.
class SmartAgg(AgentActor[QuorumResult[str], str]):
async def execute(self, results: QuorumResult[str]) -> str:
# Inspect failures
for err in results.failed:
print(f"Proposer failed: {err}")
# Use successes
return "\n".join(results.succeeded)
LayerOutput Directive
Aggregators can pass directives to the next layer's proposers by returning LayerOutput:
from everything_is_an_actor.moa import LayerOutput
class DirectiveAgg(AgentActor[QuorumResult, LayerOutput[str]]):
async def execute(self, results: QuorumResult) -> LayerOutput[str]:
conflicts = find_conflicts(results.succeeded)
return LayerOutput(
result=summarize(results.succeeded),
directive="focus on disagreements" if conflicts else None,
)
When directive is set, next layer's proposers receive {"input": result, "directive": directive}. When directive is None, proposers receive the raw result.
Proposer Timeout
Each moa_layer has a timeout parameter (default 30s). When a proposer exceeds this:
TimeoutErroris raised.- The ephemeral actor is interrupted (forced cancellation).
recover()converts it to a failed entry inQuorumResult.- Pipeline continues if
min_successis still met.
format_references Utility
Convenience function for LLM-based aggregators:
from everything_is_an_actor.moa import format_references
text = format_references(results)
# 1. [Researcher] quantum computing overview
# 2. [Critic] challenges in quantum computing
Three Progressive API Levels
| Level | Entry Point | You Need to Understand |
|---|---|---|
| Beginner | MoASystem.run(moa_tree([...]), input) |
Just fill in parameters |
| Intermediate | AgentSystem.run_flow(flow, input) |
Flow composition |
| Advanced | AgentSystem + ActorSystem |
Actor model |
Intermediate: Direct Flow Composition
from everything_is_an_actor import ActorSystem
from everything_is_an_actor.agents import AgentSystem
from everything_is_an_actor.flow import agent, at_least
pipeline = (
at_least(2, agent(R1), agent(R2), agent(R3))
.flat_map(agent(Synthesizer))
.flat_map(
at_least(1, agent(Critic))
.flat_map(agent(Refiner))
)
)
system = AgentSystem(ActorSystem())
result = await system.run_flow(pipeline, "input")
await system.shutdown()