Flow API Design Spec
Actor-native agent orchestration with categorical concurrency primitives.
Problem
LangGraph's orchestration model has fundamental design flaws: - Shared mutable state dict between nodes (no encapsulation) - Stringly-typed conditional routing (no exhaustive checking) - No lifecycle management or supervision for nodes - Weak composability (SubGraph is awkward) - No type safety (IDE can't help, refactoring is risky)
Solution
A Flow[I, O] ADT — composable workflow combinators derived from category theory, built as data (free construction), interpreted into actor topology at runtime. LangChain provides LLM primitives; the existing actor runtime provides execution.
Architecture
Layer Diagram
everything_is_an_actor/
core/ ← actor runtime / supervision / ComposableFuture (existing)
agents/ ← AgentActor / Task / TaskResult (existing)
moa/ ← MOA pattern (existing)
flow/ ← Flow ADT + combinators + interpreter (new)
integrations/ ← LLM adapter layer (new)
langchain/
openai/
anthropic/
Dependency Direction
flow/only knowsAgentActor[I, O]— zero external dependencies- Each integration provides a convenience base class inheriting
AgentActor - LangChain (and others) are optional deps:
pip install everything-is-an-actor[langchain]
Flow ADT
Flow is a syntax tree in a free category. All combinators build ADT nodes without executing.
Variants (Sum Type)
Flow[I, O] =
| Agent(cls: type[AgentActor[I, O]])
| Pure(f: Callable[[I], O])
| FlatMap(first: Flow[I, M], next: Flow[M, O])
| Zip(left: Flow[A, B], right: Flow[C, D]) # → Flow[tuple[A,C], tuple[B,D]]
| Map(source: Flow[I, M], f: Callable[[M], O])
| Branch(mapping: dict[type, Flow])
| Race(flows: list[Flow[I, O]])
| Recover(source: Flow[I, O], handler: Callable[[Exception], O])
| RecoverWith(source: Flow[I, O], handler: Flow[Exception, O])
| FallbackTo(source: Flow[I, O], fallback: Flow[I, O])
| DivertTo(source: Flow[I, O], side: Flow[O, Any], when: Callable[[O], bool])
| Loop(body: Flow[I, Continue[I] | Done[O]], max_iter: int)
| LoopWithState(body: Flow[tuple[I, S], Continue[I] | Done[O]], init_state: S, max_iter: int)
| AndThen(source: Flow[I, O], callback: Callable[[O], None])
| Filter(source: Flow[I, O], predicate: Callable[[O], bool])
Control Types
@dataclass(frozen=True)
class Continue(Generic[A]):
"""Loop continues — feed value back as next iteration's input."""
value: A
@dataclass(frozen=True)
class Done(Generic[B]):
"""Loop terminates — produce final result."""
value: B
Concurrency Primitives
Eight primitives derived from symmetric monoidal category + coproduct + trace:
| Primitive | Method | Constructor | Category |
|---|---|---|---|
| Sequential | flat_map(flow) |
— | Monad bind / Kleisli composition |
| Parallel | zip(flow) |
— | Tensor product |
| Transform | map(f) |
pure(f) |
Functor / arr |
| Conditional | branch({T: flow}) |
— | Coproduct dispatch |
| Race | — | race(*flows) |
First completed |
| Recovery | recover(h) / recover_with(flow) / fallback_to(flow) |
— | Supervision |
| Divert | divert_to(flow, when) |
— | Side-channel (Akka divertTo) |
| Loop | — | loop(body) / loop_with_state(body, init) |
tailRecM / trace |
Plus two utilities:
- and_then(callback) — tap for side effects (logging, metrics)
- filter(predicate) — guard / assertion
User API
Method Chain (Scala Future Style)
class Flow(Generic[I, O]):
def map(self, f: Callable[[O], O2]) -> Flow[I, O2]
def flat_map(self, next: Flow[O, O2]) -> Flow[I, O2]
def zip(self, other: Flow[I2, O2]) -> Flow[tuple[I, I2], tuple[O, O2]]
def branch(self, mapping: dict[type, Flow]) -> Flow[I, O2]
def recover(self, handler: Callable[[Exception], O]) -> Flow[I, O]
def recover_with(self, handler: Flow[Exception, O]) -> Flow[I, O]
def fallback_to(self, other: Flow[I, O]) -> Flow[I, O]
def divert_to(self, side: Flow[O, Any], when: Callable[[O], bool]) -> Flow[I, O]
def and_then(self, callback: Callable[[O], None]) -> Flow[I, O]
def filter(self, predicate: Callable[[O], bool]) -> Flow[I, O]
Constructors
def agent(cls: type[AgentActor[I, O]]) -> Flow[I, O]
def pure(f: Callable[[I], O]) -> Flow[I, O]
def race(*flows: Flow[I, O]) -> Flow[I, O]
def loop(body: Flow[I, Continue[I] | Done[O]], max_iter: int = 10) -> Flow[I, O]
def loop_with_state(body: Flow[tuple[I, S], Continue[I] | Done[O]], init_state: S, max_iter: int = 10) -> Flow[I, O]
Branch Semantics
Condition = output type. branch does isinstance dispatch:
# Upstream produces tagged union → branch dispatches by type
agent(Classifier).branch({
SimpleQ: agent(QuickAnswer),
ComplexQ: agent(DeepResearch),
})
# Convenience for binary predicate
agent(Scorer).branch_on(
lambda s: s.score > 0.8,
then=agent(Premium),
otherwise=agent(Basic),
)
Loop Semantics (tailRecM)
Body returns Continue[A] (keep going) or Done[B] (exit):
refine = loop(
agent(Writer).flat_map(agent(Critic)), # str → Continue[str] | Done[str]
max_iter=3,
)
# max_iter exhausted → raise exception (let-it-crash, recoverable via recover)
loop_with_state for explicit feedback state (trace):
refine = loop_with_state(
agent(WriterWithHistory), # (draft, history) → Continue | Done
init_state=list,
max_iter=5,
)
Actor Interpreter
Recursive interpret over the Flow ADT, using the existing actor infrastructure:
| Variant | Actor Strategy |
|---|---|
Agent(cls) |
context.ask(cls, Task(input=...)) |
Pure(f) |
f(input) — no actor |
FlatMap(f, g) |
interpret(g, interpret(f, input)) |
Zip(f, g) |
ComposableFuture.sequence([...]) — parallel |
Map(f, fn) |
fn(interpret(f, input)) |
Branch(m) |
interpret(upstream); interpret(m[type(result)], result) |
Race(flows) |
asyncio.wait(FIRST_COMPLETED) + cancel remaining |
Recover(f, h) |
try interpret(f) except → h(e) |
DivertTo(f, s, p) |
interpret f; if p(result): fire-and-forget s; return result |
Loop(body) |
while True: match interpret(body): Continue → again; Done → return |
The interpreter itself is an AgentActor with a context, so it can spawn child actors and use ComposableFuture.
Entry Points
system = AgentSystem()
result = await system.run(flow, input="query")
async for event in system.run_stream(flow, input="query"):
print(event)
Integration Layer
Each integration provides a declarative AgentActor subclass with LLM-specific convenience:
# langchain adapter
class LangChainAgent(AgentActor[I, O], Generic[I, O]):
model: BaseChatModel
tools: list[BaseTool] = []
system_prompt: str = ""
output_parser: BaseOutputParser | None = None
async def execute(self, input: I) -> O:
# auto: messages → bind tools → invoke → parse
...
# Users can override execute() for full control
Design constraints:
- flow/ never imports any integration — it only knows AgentActor
- Integrations don't wrap LangChain abstractions — model IS BaseChatModel
- Override execute() as escape hatch for full control
Serialization
Flow is data → serialization is natural:
flow.to_dict() → dict # JSON-compatible
flow.to_yaml() → str
flow.to_json() → str
Flow.from_dict(d, registry) → Flow
Flow.from_yaml(s, registry) → Flow
registry: dict[str, type[AgentActor]] — agent classes stored as string names in serialized form, resolved via registry on deserialization.
Visualization
Flow ADT → mermaid diagram:
File Structure
everything_is_an_actor/
flow/
__init__.py # public API re-exports
flow.py # Flow ADT definition (all variant dataclasses)
combinators.py # agent(), pure(), race(), loop(), loop_with_state()
interpreter.py # actor interpreter (recursive interpret over ADT)
serialize.py # to_dict / from_dict / to_yaml / from_yaml
visualize.py # mermaid generation
integrations/
__init__.py
langchain/
__init__.py
agent.py # LangChainAgent base class
openai/
__init__.py
agent.py # OpenAIAgent base class
anthropic/
__init__.py
agent.py # AnthropicAgent base class
Usage Example
from everything_is_an_actor.flow import agent, pure, race, loop, Flow, Continue, Done
from everything_is_an_actor.integrations.langchain import LangChainAgent
from everything_is_an_actor.agents import AgentSystem
from langchain_openai import ChatOpenAI
# Define agents
class Researcher(LangChainAgent[str, str]):
model = ChatOpenAI(model="gpt-4o")
system_prompt = "Search and summarize."
class Analyst(LangChainAgent[str, str]):
model = ChatOpenAI(model="gpt-4o")
system_prompt = "Analyze documents."
class Writer(LangChainAgent[str, str]):
model = ChatOpenAI(model="gpt-4o-mini")
system_prompt = "Write a report."
class Critic(LangChainAgent[str, Continue[str] | Done[str]]):
model = ChatOpenAI(model="gpt-4o")
system_prompt = "Accept or request revision."
class Fallback(LangChainAgent[str, str]):
model = ChatOpenAI(model="gpt-4o-mini")
system_prompt = "Best-effort answer."
# Compose
pipeline = (
agent(Researcher)
.zip(agent(Analyst))
.map(lambda pair: f"Web:\n{pair[0]}\n\nDocs:\n{pair[1]}")
.flat_map(loop(
agent(Writer).flat_map(agent(Critic)),
max_iter=3,
))
.recover_with(agent(Fallback))
.and_then(lambda r: print(f"[done] {len(r)} chars"))
)
# Run
async def main():
system = AgentSystem()
result = await system.run(pipeline, input="What is quantum computing?")
print(result)
# Or stream
async for event in system.run_stream(pipeline, input="Compare RLHF vs DPO"):
print(event)
# Inspect
print(pipeline.visualize())
print(pipeline.type_signature)
Non-Goals
- Not replacing LangChain (use its LLM/Tool/Callback layer)
- Not reimplementing MOA (MOA is a separate pattern, already exists)
- Not providing LangGraph migration path
- Not supporting non-actor backends (Flow interprets to actors only)