📜 Functional API
StateGraph 是"声明式图",但有时你想要更自然的命令式写法——用普通函数、循环、if/else。Functional API(@entrypoint / @task)让你用最熟悉的 Python 代码,也能享受图的全部能力(中断、并行、检查点)。
本章目标
- 理解
@entrypoint和@task两个装饰器 - 掌握 task 调用返回 future 的机制(实现并行)
- 理解
previous参数(跨调用的状态记忆) - 知道何时用 StateGraph vs Functional API
两种风格对比
同一个工作流,LangGraph 提供两种表达方式:
| 维度 | StateGraph(声明式) | Functional API(命令式) |
|---|---|---|
| 写法 | add_node / add_edge 构图 | 普通函数 + 循环 + if |
| 控制流 | 边和条件边 | Python 原生 if/while/for |
| 并行 | Send 扇出 | task 返回 future,并发收集 |
| 可视化 | ✓ 可 draw_ascii | ✗ 较难可视化 |
| 适合 | 复杂、需要可视化的图 | 命令式逻辑、偶尔并行/中断 |
| 底层 | 都编译成 Pregel(LG5) | 同左 |
两种风格编译后都是 Pregel 图(LG5 详解)。Functional API 不是"简化版",而是"另一种表达"。它放弃了可视化,换来更自然的命令式写法。选择哪种,取决于你的工作流是否需要图形化展示。
@task:可并行的工作单元
@task 把普通函数变成"图的任务"。它最特别的一点:调用时返回 future,不是直接结果——这是实现并行的关键。
def task(
__func_or_none__,
*,
name=None,
retry_policy=None, # 失败重试策略
cache_policy=None, # 结果缓存
timeout=None, # 超时控制
):
"""用 @task 装饰器定义一个 LangGraph 任务。
重要规则:
- task 只能在 entrypoint 或 StateGraph 内部调用
- 调用 task 函数会【返回一个 future】(不是直接结果!)
- future 可以并发收集 → 实现并行
- 启用 checkpointer 时,输入输出必须可序列化
"""
@entrypoint:工作流入口
@entrypoint 定义整个工作流。它装饰的函数是入口,可以注入特殊参数:
class entrypoint:
"""用 @entrypoint 定义一个 LangGraph 工作流。
被装饰函数必须接收【单个参数】(输入)。多参数用 dict 传。
可注入的特殊参数(运行时自动注入):
- config : RunnableConfig,运行时配置
- previous : 同一 thread 上次调用的返回值(需 checkpointer)—— 跨调用记忆!
- runtime : Runtime 对象(含 context/store/writer)
状态管理:
- previous 让你能访问"上次运行的结果"
- 若想让保存的值≠返回值,用 entrypoint.final
"""
可运行代码
pip install langgraph。下面例子展示 task 的 future 并行机制。
# pip install langgraph
import time
from langgraph.func import task, entrypoint
# ============ ① @task 返回 future,实现并行 ============
@task
def slow_square(x: int) -> int:
"""一个慢任务(假装调 LLM)。"""
time.sleep(1) # 假装耗时
return x * x
@entrypoint()
def parallel_squares(nums: list[int]) -> list[int]:
"""工作流:并行计算多个平方。"""
# 关键:调用 task 返回的是 future,不会立即执行!
futures = [slow_square(n) for n in nums]
# 此时 3 个 task 已经【并发启动】,不是串行!
# 收集 future 的结果(.result() 阻塞等待)
results = [f.result() for f in futures]
return results
start = time.time()
print(parallel_squares.invoke([1, 2, 3])) # → [1, 4, 9]
print(f"耗时: {time.time()-start:.1f}s") # → ~1s(并行),而非 3s(串行)
# ============ ② previous:跨调用的记忆 ============
from langgraph.checkpoint.memory import InMemorySaver
@entrypoint(checkpointer=InMemorySaver())
def counter(data, previous) -> int:
"""每次调用,在上次结果基础上累加。"""
# previous = 上次同一 thread 的返回值(首次为 None)
prev = previous or 0
return prev + data["add"]
config = {"configurable": {"thread_id": "t1"}}
print(counter.invoke({"add": 5}, config)) # → 5 (previous=None)
print(counter.invoke({"add": 3}, config)) # → 8 (previous=5)
print(counter.invoke({"add": 10}, config)) # → 18 (previous=8)
# previous 实现了"跨调用记忆",类似消息历史的效果
# ============ ③ 命令式的复杂控制流 ============
@task
def retrieve(query: str) -> str:
return f"关于{query}的文档"
@task
def generate(query: str, context: str) -> str:
return f"基于[{context}]回答{query}"
@entrypoint()
def rag(query: str) -> str:
"""命令式 RAG:比 StateGraph 更自然的写法。"""
ctx = retrieve(query).result() # 串行等待
answer = generate(query, ctx).result()
# 如果想判断"够不够"再检索,直接写 if:
# if len(ctx) < 10: ctx = retrieve(query + " 更多").result()
return answer
print(rag.invoke("什么是Agent")) # → 基于文档回答
注意 slow_square(n) 调用不立即返回结果,而是返回 future。只有调用 .result() 时才阻塞等待。这就是为什么 3 个 sleep(1) 的任务总共只花 ~1 秒——它们并发执行了。如果写成 slow_square(n).result() 一行调用,就退化成串行。
entrypoint.final:保存值 ≠ 返回值
有时你想保存到 checkpoint 的值和返回给调用者的值不同(比如返回精简结果,但保存完整中间状态):
@entrypoint(checkpointer=InMemorySaver())
def workflow(data, previous):
# previous 拿到的是【保存的】完整状态
state = previous or {"count": 0, "history": []}
state["count"] += 1
state["history"].append(data)
# 返回精简信息给调用者,但保存完整 state
return entrypoint.final(
value=f"已处理 {state['count']} 次", # ← 返回给调用者
save=state, # ← 保存到 checkpoint(下次 previous 拿这个)
)
何时用哪种
生产 Agent 推荐 StateGraph(可视化、可控、可中断在节点级)。Functional API 适合快速原型和数据处理 pipeline(命令式更顺手)。两者可以混用——task 可以在 StateGraph 节点里调用。
与生产实践对照
| LangGraph 概念 | OpenCode 对应 | |
|---|---|---|
| 命令式循环(while/for) | → | runLoop 的 while(true) 主循环(最命令式!) |
| task future 并行 | → | Effect Stream 的并发处理(工具结果并行) |
| previous 跨调用记忆 | → | Session 持久化(按 sessionID 存全部消息) |
| retry_policy 失败重试 | → | session/retry.ts 的重试逻辑 |
| timeout 超时控制 | → | AbortController + 请求超时 |
OpenCode 的 runLoop 是纯命令式的 while(true) 循环(OC2 会精读),更接近 Functional API 的风格,而非声明式 StateGraph。这是 TypeScript 生态的常见选择——显式循环比构图更直观。学完 OC2 你会深有体会。
小结
- Functional API(
@entrypoint/@task)是命令式写法,底层同样编译成 Pregel。 @task调用返回 future,多个 task 可并发执行,.result()收集。@entrypoint支持previous(跨调用记忆)、config、runtime注入。entrypoint.final(value=, save=)分离"返回值"和"保存值"。- 选择:需要可视化/复杂图用 StateGraph;命令式逻辑/快速原型用 Functional API。
前三章你学会了"怎么用"StateGraph 和 Functional API。下一章 LG5 · Pregel 引擎概念——核心章!我们钻进底层,看 StateGraph 编译后到底变成了什么。用图解讲清 Pregel 的 BSP 三阶段模型和 Channel 通道机制(概念入门级)。