Flow DSL Spec
Flow ADT 的三种等价表示。三者覆盖同一组可序列化 variant,无损互转。
YAML(人类编写) ←→ Flow ADT ←→ JSON(机器传输)
↕
Python(代码组合)
三列对比
| 组合子 |
YAML |
JSON |
Python |
| 单 agent |
agent: A |
Agent(cls) |
agent(A) |
| agent+参数 |
agent: {name: A, timeout: 30} |
Agent(cls, timeout) |
agent(A, timeout=30) |
| 顺序 |
steps: [A, B, C] |
FlatMap(first, next) |
a.flat_map(b) |
| 分发并行 |
each: [A, B] |
Zip(left, right) |
a.zip(b) |
| 广播并行 |
all: [A, B] |
Broadcast(flows) |
broadcast(a, b) |
| 竞争 |
race: [A, B, C] |
Race(flows) |
race(a, b, c) |
| 法定人数 |
at_least: {n: 2, flows: [A,B,C]} |
AtLeast(n, flows) |
at_least(2, a, b, c) |
| 类型路由 |
branch: {source: X, mapping: {...}} |
Branch(source, mapping) |
x.branch({T: a}) |
| 兜底 |
fallback_to: {source: A, fallback: B} |
FallbackTo(source, fallback) |
a.fallback_to(b) |
| 异常恢复 |
recover_with: {source: A, handler: B} |
RecoverWith(source, handler) |
a.recover_with(b) |
| 循环 |
loop: {body: A, max_iter: 10} |
Loop(body, max_iter) |
loop(a, max_iter=10) |
| 旁路通知 |
notify: {source: A, side: L} |
Notify(source, side) |
a.notify(l) |
| 同步副作用 |
tap: {source: A, side: T} |
Tap(source, side) |
a.tap(t) |
| 守卫 |
guard: {source: A, check: C} |
Guard(source, check) |
a.guard(c) |
| 子flow |
flow: Name |
展开为 ADT |
变量引用 |
数据传递
| 组合子 |
类型签名 |
输入 |
输出 |
| flat_map |
Flow[I,M] → Flow[M,O] = Flow[I,O] |
M 链式传递 |
O |
| zip |
Flow[I,O] × Flow[I2,O2] = Flow[(I,I2), (O,O2)] |
拆 tuple |
tuple |
| broadcast |
Flow[I,O] × N = Flow[I, (O,...)] |
同一个 I 广播 |
tuple |
| race |
Flow[I,O] × N = Flow[I, O] |
同一个 I 广播 |
最快的 O |
| at_least |
Flow[I,O] × N = Flow[I, QuorumResult[O]] |
同一个 I 广播 |
QuorumResult |
| branch |
Flow[I, A\|B] → {A: Flow[A,O], B: Flow[B,O]} |
isinstance 路由 |
O |
| fallback_to |
Flow[I,O] × Flow[I,O] = Flow[I, O] |
同一个 I |
成功方的 O |
| recover_with |
Flow[I,O] × Flow[Exception,O] = Flow[I, O] |
Exception |
O |
| loop |
Flow[I, Continue[I]\|Done[O]] = Flow[I, O] |
Continue → 下轮 I |
Done 的 O |
| notify |
Flow[I,O] × Flow[O,Any] = Flow[I, O] |
O 给 side(后台) |
O 穿透 |
| tap |
Flow[I,O] × Flow[O,Any] = Flow[I, O] |
O 给 side(同步) |
O 穿透 |
| guard |
Flow[I,O] × Flow[O,bool] = Flow[I, O] |
O 给 check |
O 穿透(或异常) |
新增代码
| 文件 |
改动 |
flow/flow.py |
新增 _Notify, _Tap, _Guard;Flow 加 notify(), tap(), guard() |
flow/combinators.py |
新增 broadcast() |
flow/interpreter.py |
新增三个 case |
flow/serialize.py |
新增四个 to_dict/from_dict |
flow/yaml_parser.py |
新建 |
完整示例
flow: ResearchReport
steps:
- at_least:
n: 2
flows:
- agent: Google
- agent: Bing
- agent: DuckDuckGo
- all:
- agent: Researcher
- agent: Analyst
- guard:
source:
notify:
source:
fallback_to:
source: {agent: Writer, timeout: 60}
fallback: {agent: BackupWriter}
side: {agent: AuditLogger}
check: {agent: QualityChecker}
- loop:
body: {agent: Reviewer}
max_iter: 5
{"type":"FlatMap",
"first":{"type":"AtLeast","n":2,
"flows":[{"type":"Agent","cls":"Google"},
{"type":"Agent","cls":"Bing"},
{"type":"Agent","cls":"DuckDuckGo"}]},
"next":{"type":"FlatMap",
"first":{"type":"Broadcast",
"flows":[{"type":"Agent","cls":"Researcher"},
{"type":"Agent","cls":"Analyst"}]},
"next":{"type":"FlatMap",
"first":{"type":"Guard",
"source":{"type":"Notify",
"source":{"type":"FallbackTo",
"source":{"type":"Agent","cls":"Writer","timeout":60},
"fallback":{"type":"Agent","cls":"BackupWriter"}},
"side":{"type":"Agent","cls":"AuditLogger"}},
"check":{"type":"Agent","cls":"QualityChecker"}},
"next":{"type":"Loop",
"body":{"type":"Agent","cls":"Reviewer"},
"max_iter":5}}}}
pipeline = (
at_least(2, agent(Google), agent(Bing), agent(DuckDuckGo))
.flat_map(broadcast(agent(Researcher), agent(Analyst)))
.flat_map(
agent(Writer, timeout=60)
.fallback_to(agent(BackupWriter))
.notify(agent(AuditLogger))
.guard(agent(QualityChecker))
)
.flat_map(loop(agent(Reviewer), max_iter=5))
)
静态类型校验
YAML 上传后,不执行,纯静态检查类型链是否匹配。类型来自 Python agent 类的泛型参数。
校验规则
| 组合子 |
校验 |
| flat_map(A, B) |
A 的 O == B 的 I |
| zip(A, B) |
输入必须是 tuple,A 和 B 各取一份 |
| broadcast(A, B) |
A 和 B 的 I 类型相同 |
| race(A, B) |
A 和 B 的 I 相同,O 相同 |
| at_least(n, A, B) |
同 race |
| branch(source, {T: flow}) |
source 的 O 是 T 的超类型 |
| fallback_to(A, B) |
A 和 B 的 I 相同,O 相同 |
| recover_with(A, B) |
B 的 I == Exception |
| notify/tap(A, side) |
side 的 I == A 的 O |
| guard(A, check) |
check 的 I == A 的 O,check 的 O == bool |
类型提取
import typing
def get_io_types(agent_cls: type) -> tuple[type, type] | None:
"""从 AgentActor[I, O] 提取 I 和 O。"""
for base in getattr(agent_cls, '__orig_bases__', ()):
args = typing.get_args(base)
if len(args) >= 2:
return args[0], args[1]
return None
CLI 使用
$ python -m flow validate pipeline.yaml --registry myapp.agents
✓ Researcher [str → ResearchResult]
✓ Analyst [str → Analysis]
✓ Writer [ResearchResult → Report]
✗ Reviewer [str → bool]
Error: Writer outputs Report, but Reviewer expects str
1 error, 3 passed
跳过校验的情况
| 情况 |
行为 |
agent 没写泛型参数 AgentActor |
跳过,不报错 |
类型是 Any |
跳过,兼容所有 |
| registry 找不到 agent |
报错:未注册 |
文件
| 文件 |
内容 |
flow/validate.py |
类型校验逻辑 |
flow/__main__.py |
CLI 入口 python -m flow validate |
跨语言 & 跨系统
Flow DSL 是协议规范,不绑定 Python。任何语言实现三样东西即可对接:
- 解析器 — YAML/JSON → 本语言的 Flow ADT
- Interpreter — 遍历 ADT,按组合子语义执行
- Agent 接口 —
execute(input) → output
┌──────────────────────────────────────┐
│ Flow JSON/YAML(规范层) │
│ 组合子语义确定,语言无关 │
└──────────────┬───────────────────────┘
│
┌──────────┼──────────┐
▼ ▼ ▼
Python Go/Rust 任何语言
asyncio goroutine 本地运行时
│ │ │
▼ ▼ ▼
本地 Agent gRPC Agent A2A HTTP Agent
混合编排
同一个 pipeline 里的 agent 可以跨语言、跨进程、跨网络。Interpreter 按 cls 前缀路由:
{"type":"FlatMap",
"first": {"type":"Agent", "cls":"Researcher"},
"next": {"type":"Agent", "cls":"a2a://writer.example.com"}}
| cls 前缀 |
路由 |
| 无前缀 |
本地 spawn |
a2a:// |
A2A HTTP JSON-RPC |
grpc:// |
gRPC 调用 |
mcp:// |
MCP tool 调用 |
混合使用(YAML + Python)
YAML 定义可序列化骨架,Python 补充含 callable 的部分:
pipeline = Flow.from_yaml("pipeline.yaml", registry=agent_registry)
pipeline = pipeline.map(lambda r: r.summary).filter(lambda r: len(r) > 100)
result = await system.run_flow(pipeline, input)
新增代码总结
| 文件 |
改动 |
flow/flow.py |
新增 _Notify, _Tap, _Guard;Flow 加 notify(), tap(), guard() |
flow/combinators.py |
新增 broadcast() |
flow/interpreter.py |
新增三个 case |
flow/serialize.py |
新增四个 to_dict/from_dict |
flow/validate.py |
新建 — 静态类型校验 |
flow/yaml_parser.py |
新建 — YAML → Flow ADT 解析器 |
flow/__main__.py |
新建 — CLI 入口 |