💾 检查点与人在回路

这是 LangGraph 相比 AgentExecutor(LC8)最大的优势之一:持久化 + 中断 + 恢复。本章学习 Checkpointer 如何保存状态,以及 interrupt / Command 如何实现"人在回路"——让 Agent 在关键操作前暂停,等人工批准后再继续。

本章目标

  • 理解 Checkpointer 的作用(按 thread_id 持久化状态)
  • 会用 InMemorySaver 让图拥有记忆
  • 掌握 interrupt()(节点内中断)+ Command(resume=)(恢复)
  • 理解两种人在回路方式:声明式中断(节点前后)vs 命令式中断(节点内)
  • 能实现"工具执行前需人工批准"的 Agent

为什么需要检查点

回忆 LC8 的 AgentExecutor:执行完就结束,状态丢失,无法中断恢复。但真实场景需要:

Checkpointer 就是解决这些的——按 thread_id 持久化每个 super-step 的状态快照(LG5 的 Update 阶段存盘)。

BaseCheckpointSaver

📄 langgraph/checkpoint/base/__init__.py:176 · BaseCheckpointSaver python
class BaseCheckpointSaver(Generic[V]):
    """创建图 checkpointer 的基类。

    Checkpointer 让 LangGraph agent 在多次交互间持久化状态。

    配置 checkpointer 后,调用图时要在 config 里传 thread_id:
        config = {"configurable": {"thread_id": "my-thread"}}
        graph.invoke(inputs, config)

    thread_id 是存取 checkpoint 的主键。没有它,
    checkpointer 无法保存状态、从中断恢复、或启用时间旅行调试。

    thread_id 怎么选:
    - 单次工作流:每次用唯一 ID(如 uuid4)
    - 对话记忆:同一对话复用同一 thread_id 累积状态
    """

    # 核心方法:
    def get_tuple(self, config): ...   # 取最新的 checkpoint
    def put(self, config, checkpoint, metadata, new_versions): ...  # 存 checkpoint
    def put_writes(self, config, writes, task_id): ...  # 存待写入
    def list(self, config): ...        # 列出历史 checkpoint(时间旅行用)
    # 异步版:aget_tuple / aput / aput_writes / alist
invoke + thread_id super-step Plan Execute Update + 存 checkpoint Checkpointer 按 thread_id 存 step1 快照 step2 快照 step3 快照
图 LG6.1 · 每个 super-step 结束,状态快照按 thread_id 存入 Checkpointer

三种内置 Checkpointer

Checkpointer源码用途
InMemorySavercheckpoint/memory/内存,学习/测试用
SqliteSaverlibs/checkpoint-sqlite/SQLite 文件,单机持久化
PostgresSaverlibs/checkpoint-postgres/PostgreSQL,生产环境

对话记忆:复用 thread_id

📄 lg6_memory.py · 用 thread_id 实现对话记忆 python
# pip install langgraph langchain-openai
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END, MessagesState
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

model = ChatOpenAI(model="gpt-4o-mini")

def chatbot(state):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

graph = StateGraph(MessagesState)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot"); graph.add_edge("chatbot", END)

# 关键:编译时传入 checkpointer
app = graph.compile(checkpointer=InMemorySaver())

# 同一对话复用同一 thread_id → 状态累积(对话记忆!)
config = {"configurable": {"thread_id": "conversation-1"}}

# 第一轮
app.invoke({"messages": [HumanMessage("我叫小明")]}, config)

# 第二轮(同一 thread_id,记得上一轮)
result = app.invoke({"messages": [HumanMessage("我叫什么?")]}, config)
print(result["messages"][-1].content)  # → "你叫小明"
# 因为 checkpointer 把第一轮的消息历史存了,第二轮自动加载
💡 thread_id 是"会话标识"

thread_id 就是 OpenCode 里 sessionID 的等价物——标识一个独立对话。同一 thread_id 的多次 invoke 共享状态历史;不同 thread_id 互相隔离。这就是 Agent"记忆"的实现方式。

interrupt + Command:人在回路

这是 Checkpointer 最强大的应用。interrupt() 在节点内暂停图执行,Command(resume=) 提供人工输入后恢复。

📄 langgraph/types.py:811 · interrupt 函数 python
def interrupt(value: Any) -> Any:
    """在节点内用可恢复的异常中断图。

    实现【人在回路】:暂停执行,把 value 暴露给客户端。
    value 可以传达上下文,或请求恢复所需的信息。

    在一个节点内,首次调用此函数会抛出 GraphInterrupt 异常,
    暂停执行。value 随异常发送给执行图的客户端。

    客户端恢复时,必须用 Command 指定一个值来继续执行。
    图会从【节点开头】重新执行所有逻辑(重要!)。

    如果一个节点有多个 interrupt,LangGraph 按顺序匹配恢复值。

    ⚠️ 使用 interrupt 必须启用 checkpointer(依赖状态持久化)。
    """
📄 langgraph/types.py:758 · Command(恢复用) python
@dataclass
class Command:
    """更新图状态并发送消息的指令。

    Args:
        update: 要应用到状态的更新
        resume: 恢复执行用的值(配合 interrupt 使用)
            - 可以是 {interrupt_id: resume_value} 映射
            - 或单个值(恢复下一个 interrupt)
        goto: 下一步导航到的节点名
        graph: 发送到哪个图(Command.PARENT 发往父图)
    """

人在回路完整示例

收集信息 节点 interrupt 暂停!等人工 人工审核信息 → Command(resume=批准) 或拒绝 → Command(resume=拒绝) 执行 (如已批准)
图 LG6.2 · interrupt 暂停 → 人工决策 → Command 恢复
📄 lg6_hitl.py · 人在回路完整流程 python
# pip install langgraph
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command

class State(TypedDict):
    email_content: str
    approved: bool

def write_email(state) -> dict:
    """节点1:生成邮件草稿。"""
    return {"email_content": "尊敬的客户,您的订单已确认..."}

def human_review(state) -> Command:
    """节点2:暂停,等人工审核。"""
    # interrupt() 暂停执行,把邮件内容暴露给客户端
    # 第二次进入(恢复时),这里会【返回】人工的输入值
    user_decision = interrupt({
        "question": "这封邮件可以发送吗?",
        "email": state["email_content"],
    })
    # user_decision 是人工通过 Command(resume=...) 传进来的
    return Command(update={"approved": user_decision == "yes"})

def send_email(state) -> dict:
    if state["approved"]:
        return {"email_content": state["email_content"] + " [已发送]"}
    return {"email_content": "[发送被拒绝]"}

# 构建图
graph = StateGraph(State)
graph.add_node("write", write_email)
graph.add_node("review", human_review)
graph.add_node("send", send_email)
graph.add_edge(START, "write")
graph.add_edge("write", "review")
graph.add_edge("review", "send")
graph.add_edge("send", END)

app = graph.compile(checkpointer=InMemorySaver())  # interrupt 必须有 checkpointer!
config = {"configurable": {"thread_id": "email-1"}}

# 第一次调用:会在 review 节点 interrupt 暂停
result = app.invoke({"email_content": ""}, config)
print("暂停时的状态:", result)   # 注意:执行到 interrupt 就停了

# 查看中断信息
state = app.get_state(config)
print("中断值:", state.tasks[0].interrupts[0].value)  # → 邮件内容 + 问题

# 人工决策后,用 Command 恢复
result = app.invoke(Command(resume="yes"), config)   # 批准发送
print("最终:", result["email_content"])   # → ...[已发送]
⚠️ interrupt 的"重新执行"特性

恢复时,图会从节点开头重新执行,不是从中断处继续!所以 user_decision = interrupt(...) 这行在恢复时会直接返回 resume 值(不再抛异常)。这是 LangGraph 的设计:节点是幂等的。写节点时要注意这点——不要把有副作用的代码放在 interrupt 之前。

两种人在回路方式

方式用法适合
声明式中断
compile(interrupt_before=)
编译时指定"在哪些节点【之前】暂停" 简单场景:固定在危险节点前停
命令式中断
interrupt()
节点内调用,可携带上下文给人工 复杂场景:动态判断、需要人工输入数据
📄 声明式中断:compile 时指定 python
# 在 "send_email" 节点【之前】暂停
app = graph.compile(
    checkpointer=InMemorySaver(),
    interrupt_before=["send_email"],   # ← 声明式中断
)

# 第一次调用:执行到 send_email 前暂停
result = app.invoke(inputs, config)

# 人工检查 state 后,再次调用【同样的 invoke】即可继续
result = app.invoke(None, config)   # 传 None 表示"从断点继续"

时间旅行(进阶)

Checkpointer 存了每一步的快照,可以回溯到任意历史状态重新执行:

📄 时间旅行调试 python
# 列出所有历史 checkpoint
for state in app.get_state_history(config):
    print(f"step {state.metadata.get('step')}: {state.values}")

# 回溯到某个历史状态,从那里重新跑
app.invoke(None, history_state.config)   # 用历史 config 从该点继续
# 用于:调试、What-if 分析、撤销操作

与生产实践对照

LangGraph 概念OpenCode 对应
Checkpointer + thread_idSession DB(按 sessionID 持久化全部消息/Part)
InMemorySaver(内存)Session 内存缓存 + SQLite/Postgres 持久化
interrupt 暂停Permission.ask(危险工具前询问,OC4)
Command(resume=) 恢复用户在 TUI 点击"允许" → 工具继续执行
状态快照 per step每个 Part/Message 落库(事件溯源)
时间旅行 get_state_historysession revert(撤销到某点,OC5)
💡 OpenCode 的权限是 HITL 的生产版

LangGraph 的 interrupt 是通用机制;OpenCode 把它特化成权限系统(OC4):每个工具调用前,根据配置决定是 allow(直接执行)、ask(弹窗询问,等价 interrupt)还是 deny(拒绝)。这是 HITL 在真实产品里的落地形态。

小结

下一章

下一章 LG7 · 流式输出:Agent 执行可能很久,用户需要实时看到进度。学习 LangGraph 的 7 种 StreamMode,把 Agent 的思考过程实时呈现给用户。