📜 Functional API

StateGraph 是"声明式图",但有时你想要更自然的命令式写法——用普通函数、循环、if/else。Functional API@entrypoint / @task)让你用最熟悉的 Python 代码,也能享受图的全部能力(中断、并行、检查点)。

本章目标

  • 理解 @entrypoint@task 两个装饰器
  • 掌握 task 调用返回 future 的机制(实现并行)
  • 理解 previous 参数(跨调用的状态记忆)
  • 知道何时用 StateGraph vs Functional API

两种风格对比

同一个工作流,LangGraph 提供两种表达方式:

维度StateGraph(声明式)Functional API(命令式)
写法add_node / add_edge 构图普通函数 + 循环 + if
控制流边和条件边Python 原生 if/while/for
并行Send 扇出task 返回 future,并发收集
可视化✓ 可 draw_ascii✗ 较难可视化
适合复杂、需要可视化的图命令式逻辑、偶尔并行/中断
底层都编译成 Pregel(LG5)同左
📌 重要:底层完全一样

两种风格编译后都是 Pregel 图(LG5 详解)。Functional API 不是"简化版",而是"另一种表达"。它放弃了可视化,换来更自然的命令式写法。选择哪种,取决于你的工作流是否需要图形化展示

@task:可并行的工作单元

@task 把普通函数变成"图的任务"。它最特别的一点:调用时返回 future,不是直接结果——这是实现并行的关键。

📄 langgraph/func/__init__.py:132 · task 装饰器 python
def task(
    __func_or_none__,
    *,
    name=None,
    retry_policy=None,     # 失败重试策略
    cache_policy=None,     # 结果缓存
    timeout=None,          # 超时控制
):
    """用 @task 装饰器定义一个 LangGraph 任务。

    重要规则:
    - task 只能在 entrypoint 或 StateGraph 内部调用
    - 调用 task 函数会【返回一个 future】(不是直接结果!)
    - future 可以并发收集 → 实现并行
    - 启用 checkpointer 时,输入输出必须可序列化
    """

@entrypoint:工作流入口

@entrypoint 定义整个工作流。它装饰的函数是入口,可以注入特殊参数:

📄 langgraph/func/__init__.py:262 · entrypoint 装饰器 python
class entrypoint:
    """用 @entrypoint 定义一个 LangGraph 工作流。

    被装饰函数必须接收【单个参数】(输入)。多参数用 dict 传。

    可注入的特殊参数(运行时自动注入):
    - config   : RunnableConfig,运行时配置
    - previous : 同一 thread 上次调用的返回值(需 checkpointer)—— 跨调用记忆!
    - runtime  : Runtime 对象(含 context/store/writer)

    状态管理:
    - previous 让你能访问"上次运行的结果"
    - 若想让保存的值≠返回值,用 entrypoint.final
    """

可运行代码

🚀 命令式 + 并行

pip install langgraph。下面例子展示 task 的 future 并行机制。

📄 lg4_functional.py · task 并行 + entrypoint python
# pip install langgraph
import time
from langgraph.func import task, entrypoint

# ============ ① @task 返回 future,实现并行 ============
@task
def slow_square(x: int) -> int:
    """一个慢任务(假装调 LLM)。"""
    time.sleep(1)   # 假装耗时
    return x * x

@entrypoint()
def parallel_squares(nums: list[int]) -> list[int]:
    """工作流:并行计算多个平方。"""
    # 关键:调用 task 返回的是 future,不会立即执行!
    futures = [slow_square(n) for n in nums]
    # 此时 3 个 task 已经【并发启动】,不是串行!

    # 收集 future 的结果(.result() 阻塞等待)
    results = [f.result() for f in futures]
    return results

start = time.time()
print(parallel_squares.invoke([1, 2, 3]))  # → [1, 4, 9]
print(f"耗时: {time.time()-start:.1f}s")    # → ~1s(并行),而非 3s(串行)


# ============ ② previous:跨调用的记忆 ============
from langgraph.checkpoint.memory import InMemorySaver

@entrypoint(checkpointer=InMemorySaver())
def counter(data, previous) -> int:
    """每次调用,在上次结果基础上累加。"""
    # previous = 上次同一 thread 的返回值(首次为 None)
    prev = previous or 0
    return prev + data["add"]

config = {"configurable": {"thread_id": "t1"}}
print(counter.invoke({"add": 5}, config))   # → 5   (previous=None)
print(counter.invoke({"add": 3}, config))   # → 8   (previous=5)
print(counter.invoke({"add": 10}, config))  # → 18  (previous=8)
# previous 实现了"跨调用记忆",类似消息历史的效果


# ============ ③ 命令式的复杂控制流 ============
@task
def retrieve(query: str) -> str:
    return f"关于{query}的文档"

@task
def generate(query: str, context: str) -> str:
    return f"基于[{context}]回答{query}"

@entrypoint()
def rag(query: str) -> str:
    """命令式 RAG:比 StateGraph 更自然的写法。"""
    ctx = retrieve(query).result()   # 串行等待
    answer = generate(query, ctx).result()
    # 如果想判断"够不够"再检索,直接写 if:
    # if len(ctx) < 10: ctx = retrieve(query + " 更多").result()
    return answer

print(rag.invoke("什么是Agent"))  # → 基于文档回答
调用 task() 返回 future task 1 (并发) task 2 (并发) task 3 (并发) .result() 收集结果
图 LG4.1 · task 返回 future,多个 task 并发启动,.result() 统一收集
⚠️ future 是关键

注意 slow_square(n) 调用不立即返回结果,而是返回 future。只有调用 .result() 时才阻塞等待。这就是为什么 3 个 sleep(1) 的任务总共只花 ~1 秒——它们并发执行了。如果写成 slow_square(n).result() 一行调用,就退化成串行。

entrypoint.final:保存值 ≠ 返回值

有时你想保存到 checkpoint 的值返回给调用者的值不同(比如返回精简结果,但保存完整中间状态):

📄 entrypoint.final 分离保存与返回 python
@entrypoint(checkpointer=InMemorySaver())
def workflow(data, previous):
    # previous 拿到的是【保存的】完整状态
    state = previous or {"count": 0, "history": []}
    state["count"] += 1
    state["history"].append(data)

    # 返回精简信息给调用者,但保存完整 state
    return entrypoint.final(
        value=f"已处理 {state['count']} 次",   # ← 返回给调用者
        save=state,                            # ← 保存到 checkpoint(下次 previous 拿这个)
    )

何时用哪种

用 StateGraph • 需要可视化(draw_ascii/mermaid) • 复杂的多节点协作 • 团队协作、需要文档化 • 需要中断点在特定节点 • Map-reduce(用 Send) 用 Functional API • 命令式逻辑(if/while 自然) • 偶尔并行(task future) • 简单工作流,不想构图 • 从普通函数快速迁移 • 跨调用记忆(previous)
图 LG4.2 · 两种风格的选择指南
💡 实用建议

生产 Agent 推荐 StateGraph(可视化、可控、可中断在节点级)。Functional API 适合快速原型数据处理 pipeline(命令式更顺手)。两者可以混用——task 可以在 StateGraph 节点里调用。

与生产实践对照

LangGraph 概念OpenCode 对应
命令式循环(while/for)runLoop 的 while(true) 主循环(最命令式!)
task future 并行Effect Stream 的并发处理(工具结果并行)
previous 跨调用记忆Session 持久化(按 sessionID 存全部消息)
retry_policy 失败重试session/retry.ts 的重试逻辑
timeout 超时控制AbortController + 请求超时
🔍 OpenCode 其实更像 Functional API

OpenCode 的 runLoop 是纯命令式的 while(true) 循环(OC2 会精读),更接近 Functional API 的风格,而非声明式 StateGraph。这是 TypeScript 生态的常见选择——显式循环比构图更直观。学完 OC2 你会深有体会。

小结

下一章 · 引擎概念 ★

前三章你学会了"怎么用"StateGraph 和 Functional API。下一章 LG5 · Pregel 引擎概念——核心章!我们钻进底层,看 StateGraph 编译后到底变成了什么。用图解讲清 Pregel 的 BSP 三阶段模型和 Channel 通道机制(概念入门级)。