⚡ 短期记忆深度
LG6 章你学会了用 Checkpointer 实现对话记忆和人在回路。但那只是入门——本章深挖 Checkpointer 的四个核心方法、thread_id 的精确语义、时间旅行调试、状态分支(fork)、消息裁剪策略,让你真正掌握 Agent 的"短期记忆工程"。
本章目标
- 精读
BaseCheckpointSaver的四个核心方法及调用时机 - 理解 CheckpointTuple 数据结构与版本追踪机制
- 掌握 thread_id 的两种用法(单次工作流 vs 对话记忆)
- 会用时间旅行调试(get_state_history)回溯任意历史状态
- 掌握消息裁剪的多种策略及取舍
短期记忆的本质
回顾 F2 章:LLM 无状态,多轮对话靠"每次重发历史"。但历史太长会撑爆 context。短期记忆 = 在 context window 内、单次会话内的对话状态管理:
| 维度 | 短期记忆(Checkpoint) | 长期记忆(Store,AD6) |
|---|---|---|
| 范围 | 单个会话/thread 内 | 跨会话/跨 thread |
| 绑定 | thread_id | namespace |
| 内容 | 对话历史 + 中间状态 + interrupt | 用户画像、偏好、知识 |
| 检索 | 按 thread_id 直接取 | 按 namespace + 语义搜索 |
| 典型问题 | 历史太长→裁剪/压缩 | 记什么、何时记、如何召回 |
本章深挖 Checkpoint(短期)。跨会话的长期记忆(Store)在 AD6。两者协作构成完整记忆系统。
BaseCheckpointSaver 四方法精读
LG6 只提了方法名,现在精读每个方法的职责和调用时机:
class BaseCheckpointSaver(Generic[V]):
"""checkpointer 基类。让 agent 在多次交互间持久化状态。
thread_id 是存取 checkpoint 的主键。没有它,
checkpointer 无法保存状态、从中断恢复、或启用时间旅行。
"""
serde: SerializerProtocol = JsonPlusSerializer() # 序列化器
# ====== 四个核心方法 ======
def get_tuple(self, config) -> CheckpointTuple | None:
"""① 取【最新】的 checkpoint(含元数据、父指针)。
调用时机:每次 invoke/stream 开始时,加载上次状态。"""
def put(self, config, checkpoint, metadata, new_versions) -> RunnableConfig:
"""② 存一个完整的 checkpoint(状态快照)。
调用时机:每个 super-step 结束(LG5 的 Update 阶段)。
返回新 checkpoint 的 config(含 checkpoint_id)。"""
def put_writes(self, config, writes, task_id) -> None:
"""③ 存【单个任务的写入】(增量,未提交的状态)。
调用时机:任务执行中实时写入,支持中断恢复(部分完成可续)。
这是实现"断点续跑"的关键——不必等整个 step 完成。"""
def list(self, config, *, filter=None, before=None, limit=None):
"""④ 列出某 thread 的【历史 checkpoint 序列】。
调用时机:时间旅行调试、回溯分析。
支持 filter(按元数据过滤)、before(某点之前)、limit(数量)。"""
# get 是 get_tuple 的便捷封装(只取 .checkpoint)
def get(self, config) -> Checkpoint | None:
if value := self.get_tuple(config):
return value.checkpoint
CheckpointTuple 数据结构
每个 checkpoint 不只是状态,还含版本追踪和父子关系:
class CheckpointTuple(NamedTuple):
config: RunnableConfig # 本 checkpoint 的 config(含 checkpoint_id)
checkpoint: Checkpoint # 状态快照本体
metadata: CheckpointMetadata # 元数据(step/source/writes/parents)
parent_config: RunnableConfig | None # ★ 父 checkpoint 指针(构成历史链)
pending_writes: list # 待写入(未完成的任务)
class Checkpoint(TypedDict):
v: int # 版本号
id: str # checkpoint 唯一 ID
ts: str # 时间戳
channel_values: dict # ★ 各 channel 的当前值(就是状态)
channel_versions: dict # 各 channel 的版本(追踪变化)
versions_seen: dict # 各节点已看到的版本(决定是否要执行)
pending_sends: list # 待处理的 Send
class CheckpointMetadata(TypedDict):
source: Literal["input", "loop", "update"] # 来源:输入/循环/手动更新
step: int # 第几步
writes: dict # 本步的写入
parents: list # 父 checkpoint IDs(分支用)
每个 checkpoint 通过 parent_config 指向上一个——形成链表式的历史。这就是时间旅行调试的基础:list() 沿着 parent 链回溯。metadata.parents(列表)甚至支持多个父,实现状态分支(fork)——一个 checkpoint 可以有多个子,形成树。
thread_id 的两种语义
源码 docstring(base/__init__.py:194)明确两种用法:
| 用法 | thread_id 选择 | 效果 | 场景 |
|---|---|---|---|
| 单次工作流 | 每次用新 uuid | 独立运行,互不影响 | 批量任务、一次性 pipeline |
| 对话记忆 | 同一对话复用同一 id | 状态累积(历史叠加) | 聊天、多轮交互 Agent |
时间旅行调试
这是 Checkpointer 最酷的能力——回溯到任意历史状态,从那里重新执行:
# pip install langgraph
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class State(TypedDict):
count: int
def step(state):
return {"count": state["count"] + 1}
g = StateGraph(State)
g.add_node("step", step)
g.add_edge(START, "step")
g.add_edge("step", "step") # 自循环(演示用,实际要加退出条件)
app = g.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "t1"}}
# 跑几步
for _ in range(3):
app.invoke({"count": 0} if _ == 0 else None, config)
# ★ 时间旅行:列出所有历史状态
print("=== 历史状态 ===")
states = list(app.get_state_history(config))
for s in reversed(states):
print(f"step {s.metadata.get('step')}: count={s.values.get('count')}")
# ★ 回溯到第 1 步,从那里【分叉】重新跑(不影响原历史)
print("\n=== 从 step 1 分叉重跑 ===")
step1_config = states[-2].config # 第1步的 config
result = app.invoke(None, step1_config) # 从该 checkpoint 继续
print("分叉后的 count:", result["count"])
# 原历史还在,这是【新分支】——状态树!
不只是炫技:① 调试——Agent 行为异常时回溯每步状态找问题;② What-if——"如果第3步用不同参数会怎样",从那步分叉重跑;③ 撤销——回到满意的状态丢弃后续(OpenCode 的 session revert 就是这个,OC5)。这是普通 while 循环(LC8 AgentExecutor)做不到的——它没有持久化历史。
消息裁剪策略对比
F2 章强调 context 有限。长对话必须裁剪。LangGraph + LangChain 提供多种策略:
| 策略 | 原理 | 源码/方法 | 优缺点 |
|---|---|---|---|
| 保留最近 N 条 | 滑窗,丢弃最旧 | LG2 的 RemoveMessage | ✓简单 ✗丢失早期关键信息 |
| 按 token 数截断 | 累计 token 超阈值才删 | trim_messages(token_count) | ✓精确控 token |
| 摘要压缩 | 旧消息总结成一段 | OpenCode compaction | ✓保留信息 ✗摘要损失 |
| 保留首尾 | 留 system + 最近 + 删中间 | trim_messages(strategy) | ✓保人设+近期 |
from langchain_core.messages import trim_messages
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
messages = [
SystemMessage(content="你是助手"),
HumanMessage(content="问题1"), AIMessage(content="答1"),
HumanMessage(content="问题2"), AIMessage(content="答2"),
HumanMessage(content="问题3"), AIMessage(content="答3"),
]
# ① 按 token 数裁剪(精确控量)
trimmed = trim_messages(
messages,
max_tokens=100,
strategy="last", # 保留最近的
token_counter=len, # 简化:用字符数当 token
include_system=True, # 始终保留 system
start_on="human", # 从 human 消息开始(不截断半句对话)
)
# ② 保留首尾(system + 最近 N 条)
trimmed2 = trim_messages(
messages,
max_tokens=100,
strategy="last",
include_system=True,
allow_partial=False, # 不截断单条消息
)
# ③ 用 LangGraph 的 pre_model_hook 在每次调 LLM 前自动裁剪
def auto_trim(state):
return {"messages": trim_messages(
state["messages"], max_tokens=4000, strategy="last",
include_system=True, token_counter=model,
)}
agent = create_react_agent(model, tools, pre_model_hook=auto_trim)
# 每次 LLM 调用前自动裁剪,无需手动管
实战:会话分支(fork)
生产场景:用户想"从第3轮对话分叉,尝试不同方向"。用 checkpoint 的 fork 能力:
# 假设用户在对话中途想"重来",从某轮分叉
config_original = {"configurable": {"thread_id": "chat-1"}}
# 跑了5轮后,用户说"从第3轮重新开始,换个方向"
history = list(app.get_state_history(config_original))
round3_state = history[-3] # 第3轮的状态(倒数第3个)
# 方法①:用原 thread 从 round3 继续(会覆盖后续)
app.update_state(round3_state.config, {"messages": [HumanMessage("新方向")]})
app.invoke(None, round3_state.config)
# 方法②:创建新 thread,复制 round3 状态(保留原历史)
config_fork = {"configurable": {"thread_id": "chat-1-fork"}}
# 把 round3 的状态写入新 thread,从此分叉独立发展
# 原 chat-1 的5轮历史完整保留,chat-1-fork 从第3轮另起
进阶:自定义 Checkpointer
实现自己的持久化后端(如 Redis、MongoDB),继承 BaseCheckpointSaver 实现四方法:
from langgraph.checkpoint.base import BaseCheckpointSaver, CheckpointTuple
class RedisCheckpointer(BaseCheckpointSaver):
def __init__(self, redis_client):
super().__init__()
self.redis = redis_client
def get_tuple(self, config):
thread_id = config["configurable"]["thread_id"]
data = self.redis.hgetall(f"checkpoint:{thread_id}:latest")
if not data:
return None
# 反序列化返回 CheckpointTuple
return self.serde.loads_typed(data)
def put(self, config, checkpoint, metadata, new_versions):
thread_id = config["configurable"]["thread_id"]
# 序列化存储,同时记 parent 链
key = f"checkpoint:{thread_id}:{checkpoint['id']}"
self.redis.hset(key, mapping=self.serde.dumps_typed((checkpoint, metadata)))
self.redis.set(f"checkpoint:{thread_id}:latest", checkpoint["id"])
# ... put_writes 和 list 类似
除非有特殊需求,生产直接用 PostgresSaver(AD9 详讲)或 SqliteSaver。它们已经处理好序列化、并发、迁移。自己写容易踩坑(序列化兼容、并发写入、版本迁移)。理解四方法原理是为了调试和优化,不是必须自己实现。
与生产实践对照
| Checkpoint 概念 | OpenCode 对应 | |
|---|---|---|
| thread_id(会话标识) | → | sessionID(会话唯一 ID) |
| put 每步存快照 | → | 每条 Message/Part 落库(事件溯源) |
| get_tuple 加载历史 | → | runLoop 读 MessageV2.filterCompacted |
| list 时间旅行 | → | session revert(OC5,回到某点) |
| parent_config 历史链 | → | 消息的 parentID 字段 |
| 消息裁剪 | → | compaction 压缩(更高级,OC5) |
| 会话 fork | → | session fork(session.ts 的 ForkInput) |
关键差异:LangGraph 存每步的完整快照(put),OpenCode 存每条消息的增量事件(事件溯源)。快照法恢复快(直接读)、占空间;事件溯源省空间、可重放但需累积。两种都是合理的工程选择,各有取舍。
小结
- 短期记忆 = 单会话内的状态管理,靠 Checkpointer + thread_id。
- 四方法:
get_tuple(取最新)/put(存快照)/put_writes(增量写,支持中断恢复)/list(历史,时间旅行)。 - CheckpointTuple 含 parent_config,构成历史链;支持分叉成树。
- thread_id 两用法:uuid(单次工作流)/ 复用(对话记忆)。
- 时间旅行:回溯任意状态分叉重跑(调试/What-if/撤销)。
- 消息裁剪四策略:滑窗/token/摘要/首尾;生产用 PostgresSaver 别自己写。
下一章 AD6 · 长期记忆 Store ★——短期记忆只在单会话内。怎么让 Agent 跨会话记住用户?精读 LangGraph 的 BaseStore + 语义搜索,对比 OpenCode 的文件式记忆。