💾 检查点与人在回路
这是 LangGraph 相比 AgentExecutor(LC8)最大的优势之一:持久化 + 中断 + 恢复。本章学习 Checkpointer 如何保存状态,以及 interrupt / Command 如何实现"人在回路"——让 Agent 在关键操作前暂停,等人工批准后再继续。
本章目标
- 理解 Checkpointer 的作用(按 thread_id 持久化状态)
- 会用
InMemorySaver让图拥有记忆 - 掌握
interrupt()(节点内中断)+Command(resume=)(恢复) - 理解两种人在回路方式:声明式中断(节点前后)vs 命令式中断(节点内)
- 能实现"工具执行前需人工批准"的 Agent
为什么需要检查点
回忆 LC8 的 AgentExecutor:执行完就结束,状态丢失,无法中断恢复。但真实场景需要:
- 对话记忆:用户每次发消息,Agent 要记得之前说过什么
- 中断恢复:执行到一半(如要花钱、删数据)暂停,等人确认
- 时间旅行:回溯到某个历史状态,从那里重新跑
- 容错:进程崩溃后能从最近的 checkpoint 续跑
Checkpointer 就是解决这些的——按 thread_id 持久化每个 super-step 的状态快照(LG5 的 Update 阶段存盘)。
BaseCheckpointSaver
class BaseCheckpointSaver(Generic[V]):
"""创建图 checkpointer 的基类。
Checkpointer 让 LangGraph agent 在多次交互间持久化状态。
配置 checkpointer 后,调用图时要在 config 里传 thread_id:
config = {"configurable": {"thread_id": "my-thread"}}
graph.invoke(inputs, config)
thread_id 是存取 checkpoint 的主键。没有它,
checkpointer 无法保存状态、从中断恢复、或启用时间旅行调试。
thread_id 怎么选:
- 单次工作流:每次用唯一 ID(如 uuid4)
- 对话记忆:同一对话复用同一 thread_id 累积状态
"""
# 核心方法:
def get_tuple(self, config): ... # 取最新的 checkpoint
def put(self, config, checkpoint, metadata, new_versions): ... # 存 checkpoint
def put_writes(self, config, writes, task_id): ... # 存待写入
def list(self, config): ... # 列出历史 checkpoint(时间旅行用)
# 异步版:aget_tuple / aput / aput_writes / alist
三种内置 Checkpointer
| Checkpointer | 源码 | 用途 |
|---|---|---|
InMemorySaver | checkpoint/memory/ | 内存,学习/测试用 |
SqliteSaver | libs/checkpoint-sqlite/ | SQLite 文件,单机持久化 |
PostgresSaver | libs/checkpoint-postgres/ | PostgreSQL,生产环境 |
对话记忆:复用 thread_id
# pip install langgraph langchain-openai
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END, MessagesState
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
model = ChatOpenAI(model="gpt-4o-mini")
def chatbot(state):
response = model.invoke(state["messages"])
return {"messages": [response]}
graph = StateGraph(MessagesState)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot"); graph.add_edge("chatbot", END)
# 关键:编译时传入 checkpointer
app = graph.compile(checkpointer=InMemorySaver())
# 同一对话复用同一 thread_id → 状态累积(对话记忆!)
config = {"configurable": {"thread_id": "conversation-1"}}
# 第一轮
app.invoke({"messages": [HumanMessage("我叫小明")]}, config)
# 第二轮(同一 thread_id,记得上一轮)
result = app.invoke({"messages": [HumanMessage("我叫什么?")]}, config)
print(result["messages"][-1].content) # → "你叫小明"
# 因为 checkpointer 把第一轮的消息历史存了,第二轮自动加载
thread_id 就是 OpenCode 里 sessionID 的等价物——标识一个独立对话。同一 thread_id 的多次 invoke 共享状态历史;不同 thread_id 互相隔离。这就是 Agent"记忆"的实现方式。
interrupt + Command:人在回路
这是 Checkpointer 最强大的应用。interrupt() 在节点内暂停图执行,Command(resume=) 提供人工输入后恢复。
def interrupt(value: Any) -> Any:
"""在节点内用可恢复的异常中断图。
实现【人在回路】:暂停执行,把 value 暴露给客户端。
value 可以传达上下文,或请求恢复所需的信息。
在一个节点内,首次调用此函数会抛出 GraphInterrupt 异常,
暂停执行。value 随异常发送给执行图的客户端。
客户端恢复时,必须用 Command 指定一个值来继续执行。
图会从【节点开头】重新执行所有逻辑(重要!)。
如果一个节点有多个 interrupt,LangGraph 按顺序匹配恢复值。
⚠️ 使用 interrupt 必须启用 checkpointer(依赖状态持久化)。
"""
@dataclass
class Command:
"""更新图状态并发送消息的指令。
Args:
update: 要应用到状态的更新
resume: 恢复执行用的值(配合 interrupt 使用)
- 可以是 {interrupt_id: resume_value} 映射
- 或单个值(恢复下一个 interrupt)
goto: 下一步导航到的节点名
graph: 发送到哪个图(Command.PARENT 发往父图)
"""
人在回路完整示例
# pip install langgraph
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
class State(TypedDict):
email_content: str
approved: bool
def write_email(state) -> dict:
"""节点1:生成邮件草稿。"""
return {"email_content": "尊敬的客户,您的订单已确认..."}
def human_review(state) -> Command:
"""节点2:暂停,等人工审核。"""
# interrupt() 暂停执行,把邮件内容暴露给客户端
# 第二次进入(恢复时),这里会【返回】人工的输入值
user_decision = interrupt({
"question": "这封邮件可以发送吗?",
"email": state["email_content"],
})
# user_decision 是人工通过 Command(resume=...) 传进来的
return Command(update={"approved": user_decision == "yes"})
def send_email(state) -> dict:
if state["approved"]:
return {"email_content": state["email_content"] + " [已发送]"}
return {"email_content": "[发送被拒绝]"}
# 构建图
graph = StateGraph(State)
graph.add_node("write", write_email)
graph.add_node("review", human_review)
graph.add_node("send", send_email)
graph.add_edge(START, "write")
graph.add_edge("write", "review")
graph.add_edge("review", "send")
graph.add_edge("send", END)
app = graph.compile(checkpointer=InMemorySaver()) # interrupt 必须有 checkpointer!
config = {"configurable": {"thread_id": "email-1"}}
# 第一次调用:会在 review 节点 interrupt 暂停
result = app.invoke({"email_content": ""}, config)
print("暂停时的状态:", result) # 注意:执行到 interrupt 就停了
# 查看中断信息
state = app.get_state(config)
print("中断值:", state.tasks[0].interrupts[0].value) # → 邮件内容 + 问题
# 人工决策后,用 Command 恢复
result = app.invoke(Command(resume="yes"), config) # 批准发送
print("最终:", result["email_content"]) # → ...[已发送]
恢复时,图会从节点开头重新执行,不是从中断处继续!所以 user_decision = interrupt(...) 这行在恢复时会直接返回 resume 值(不再抛异常)。这是 LangGraph 的设计:节点是幂等的。写节点时要注意这点——不要把有副作用的代码放在 interrupt 之前。
两种人在回路方式
| 方式 | 用法 | 适合 |
|---|---|---|
声明式中断compile(interrupt_before=) |
编译时指定"在哪些节点【之前】暂停" | 简单场景:固定在危险节点前停 |
命令式中断interrupt() |
节点内调用,可携带上下文给人工 | 复杂场景:动态判断、需要人工输入数据 |
# 在 "send_email" 节点【之前】暂停
app = graph.compile(
checkpointer=InMemorySaver(),
interrupt_before=["send_email"], # ← 声明式中断
)
# 第一次调用:执行到 send_email 前暂停
result = app.invoke(inputs, config)
# 人工检查 state 后,再次调用【同样的 invoke】即可继续
result = app.invoke(None, config) # 传 None 表示"从断点继续"
时间旅行(进阶)
Checkpointer 存了每一步的快照,可以回溯到任意历史状态重新执行:
# 列出所有历史 checkpoint
for state in app.get_state_history(config):
print(f"step {state.metadata.get('step')}: {state.values}")
# 回溯到某个历史状态,从那里重新跑
app.invoke(None, history_state.config) # 用历史 config 从该点继续
# 用于:调试、What-if 分析、撤销操作
与生产实践对照
| LangGraph 概念 | OpenCode 对应 | |
|---|---|---|
| Checkpointer + thread_id | → | Session DB(按 sessionID 持久化全部消息/Part) |
| InMemorySaver(内存) | → | Session 内存缓存 + SQLite/Postgres 持久化 |
| interrupt 暂停 | → | Permission.ask(危险工具前询问,OC4) |
| Command(resume=) 恢复 | → | 用户在 TUI 点击"允许" → 工具继续执行 |
| 状态快照 per step | → | 每个 Part/Message 落库(事件溯源) |
| 时间旅行 get_state_history | → | session revert(撤销到某点,OC5) |
LangGraph 的 interrupt 是通用机制;OpenCode 把它特化成权限系统(OC4):每个工具调用前,根据配置决定是 allow(直接执行)、ask(弹窗询问,等价 interrupt)还是 deny(拒绝)。这是 HITL 在真实产品里的落地形态。
小结
- Checkpointer 按
thread_id持久化每个 super-step 的状态快照,是实现记忆/中断/恢复/时间旅行的基础。 InMemorySaver学习用,SqliteSaver/PostgresSaver生产用。- 复用同一 thread_id = 对话记忆;
interrupt()+Command(resume=)= 人在回路。 - 两种 HITL:声明式
interrupt_before(简单)vs 命令式interrupt()(灵活,可携带上下文)。 - 恢复时节点从头重新执行,
interrupt()直接返回 resume 值。
下一章 LG7 · 流式输出:Agent 执行可能很久,用户需要实时看到进度。学习 LangGraph 的 7 种 StreamMode,把 Agent 的思考过程实时呈现给用户。