⚡ 短期记忆深度

LG6 章你学会了用 Checkpointer 实现对话记忆和人在回路。但那只是入门——本章深挖 Checkpointer 的四个核心方法、thread_id 的精确语义、时间旅行调试、状态分支(fork)、消息裁剪策略,让你真正掌握 Agent 的"短期记忆工程"。

本章目标

  • 精读 BaseCheckpointSaver 的四个核心方法及调用时机
  • 理解 CheckpointTuple 数据结构与版本追踪机制
  • 掌握 thread_id 的两种用法(单次工作流 vs 对话记忆)
  • 会用时间旅行调试(get_state_history)回溯任意历史状态
  • 掌握消息裁剪的多种策略及取舍

短期记忆的本质

回顾 F2 章:LLM 无状态,多轮对话靠"每次重发历史"。但历史太长会撑爆 context。短期记忆 = 在 context window 内、单次会话内的对话状态管理

维度短期记忆(Checkpoint)长期记忆(Store,AD6)
范围单个会话/thread 内跨会话/跨 thread
绑定thread_idnamespace
内容对话历史 + 中间状态 + interrupt用户画像、偏好、知识
检索按 thread_id 直接取按 namespace + 语义搜索
典型问题历史太长→裁剪/压缩记什么、何时记、如何召回
📚 本章只讲短期记忆

本章深挖 Checkpoint(短期)。跨会话的长期记忆(Store)在 AD6。两者协作构成完整记忆系统。

BaseCheckpointSaver 四方法精读

LG6 只提了方法名,现在精读每个方法的职责和调用时机

📄 langgraph/checkpoint/base/__init__.py:176 · 四个核心方法 python
class BaseCheckpointSaver(Generic[V]):
    """checkpointer 基类。让 agent 在多次交互间持久化状态。

    thread_id 是存取 checkpoint 的主键。没有它,
    checkpointer 无法保存状态、从中断恢复、或启用时间旅行。
    """

    serde: SerializerProtocol = JsonPlusSerializer()  # 序列化器

    # ====== 四个核心方法 ======

    def get_tuple(self, config) -> CheckpointTuple | None:
        """① 取【最新】的 checkpoint(含元数据、父指针)。
        调用时机:每次 invoke/stream 开始时,加载上次状态。"""

    def put(self, config, checkpoint, metadata, new_versions) -> RunnableConfig:
        """② 存一个完整的 checkpoint(状态快照)。
        调用时机:每个 super-step 结束(LG5 的 Update 阶段)。
        返回新 checkpoint 的 config(含 checkpoint_id)。"""

    def put_writes(self, config, writes, task_id) -> None:
        """③ 存【单个任务的写入】(增量,未提交的状态)。
        调用时机:任务执行中实时写入,支持中断恢复(部分完成可续)。
        这是实现"断点续跑"的关键——不必等整个 step 完成。"""

    def list(self, config, *, filter=None, before=None, limit=None):
        """④ 列出某 thread 的【历史 checkpoint 序列】。
        调用时机:时间旅行调试、回溯分析。
        支持 filter(按元数据过滤)、before(某点之前)、limit(数量)。"""

    # get 是 get_tuple 的便捷封装(只取 .checkpoint)
    def get(self, config) -> Checkpoint | None:
        if value := self.get_tuple(config):
            return value.checkpoint
四方法在一次 invoke 中的调用时序 invoke 开始 ① get_tuple super-step 执行 ③ put_writes(任务写入) step 结束 ② put(存快照) 调试时 ④ list(历史) 关键:put_writes 支持中断恢复 任务执行一半崩溃 → 下次 get_tuple 能读到已写入的部分 → 从断点续跑(而非重头)
图 AD5.1 · 四方法的调用时序:get→put_writes→put,list 用于调试

CheckpointTuple 数据结构

每个 checkpoint 不只是状态,还含版本追踪和父子关系

📄 checkpoint/base/__init__.py:139 · CheckpointTuple python
class CheckpointTuple(NamedTuple):
    config: RunnableConfig          # 本 checkpoint 的 config(含 checkpoint_id)
    checkpoint: Checkpoint          # 状态快照本体
    metadata: CheckpointMetadata     # 元数据(step/source/writes/parents)
    parent_config: RunnableConfig | None   # ★ 父 checkpoint 指针(构成历史链)
    pending_writes: list             # 待写入(未完成的任务)

class Checkpoint(TypedDict):
    v: int                           # 版本号
    id: str                          # checkpoint 唯一 ID
    ts: str                          # 时间戳
    channel_values: dict             # ★ 各 channel 的当前值(就是状态)
    channel_versions: dict           # 各 channel 的版本(追踪变化)
    versions_seen: dict              # 各节点已看到的版本(决定是否要执行)
    pending_sends: list              # 待处理的 Send

class CheckpointMetadata(TypedDict):
    source: Literal["input", "loop", "update"]   # 来源:输入/循环/手动更新
    step: int                          # 第几步
    writes: dict                       # 本步的写入
    parents: list                      # 父 checkpoint IDs(分支用)
🧭 parent_config 构成历史链

每个 checkpoint 通过 parent_config 指向上一个——形成链表式的历史。这就是时间旅行调试的基础:list() 沿着 parent 链回溯。metadata.parents(列表)甚至支持多个父,实现状态分支(fork)——一个 checkpoint 可以有多个子,形成树。

thread_id 的两种语义

源码 docstring(base/__init__.py:194)明确两种用法:

用法thread_id 选择效果场景
单次工作流每次用新 uuid独立运行,互不影响批量任务、一次性 pipeline
对话记忆同一对话复用同一 id状态累积(历史叠加)聊天、多轮交互 Agent

时间旅行调试

这是 Checkpointer 最酷的能力——回溯到任意历史状态,从那里重新执行

📄 ad5_time_travel.py · 时间旅行 python
# pip install langgraph
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class State(TypedDict):
    count: int

def step(state):
    return {"count": state["count"] + 1}

g = StateGraph(State)
g.add_node("step", step)
g.add_edge(START, "step")
g.add_edge("step", "step")  # 自循环(演示用,实际要加退出条件)
app = g.compile(checkpointer=InMemorySaver())

config = {"configurable": {"thread_id": "t1"}}

# 跑几步
for _ in range(3):
    app.invoke({"count": 0} if _ == 0 else None, config)

# ★ 时间旅行:列出所有历史状态
print("=== 历史状态 ===")
states = list(app.get_state_history(config))
for s in reversed(states):
    print(f"step {s.metadata.get('step')}: count={s.values.get('count')}")

# ★ 回溯到第 1 步,从那里【分叉】重新跑(不影响原历史)
print("\n=== 从 step 1 分叉重跑 ===")
step1_config = states[-2].config   # 第1步的 config
result = app.invoke(None, step1_config)   # 从该 checkpoint 继续
print("分叉后的 count:", result["count"])
# 原历史还在,这是【新分支】——状态树!
状态分支树(时间旅行产生分叉) step0 step1 step2 step3 原始执行链 step1' step2' ★ 从 step1 分叉 同一 thread 下,分叉产生独立分支,原历史不受影响
图 AD5.2 · 时间旅行分叉:从历史 checkpoint 创建新分支(What-if 分析)
💡 时间旅行的实用价值

不只是炫技:① 调试——Agent 行为异常时回溯每步状态找问题;② What-if——"如果第3步用不同参数会怎样",从那步分叉重跑;③ 撤销——回到满意的状态丢弃后续(OpenCode 的 session revert 就是这个,OC5)。这是普通 while 循环(LC8 AgentExecutor)做不到的——它没有持久化历史。

消息裁剪策略对比

F2 章强调 context 有限。长对话必须裁剪。LangGraph + LangChain 提供多种策略:

策略原理源码/方法优缺点
保留最近 N 条滑窗,丢弃最旧LG2 的 RemoveMessage✓简单 ✗丢失早期关键信息
按 token 数截断累计 token 超阈值才删trim_messages(token_count)✓精确控 token
摘要压缩旧消息总结成一段OpenCode compaction✓保留信息 ✗摘要损失
保留首尾留 system + 最近 + 删中间trim_messages(strategy)✓保人设+近期
📄 ad5_pruning.py · 三种裁剪策略 python
from langchain_core.messages import trim_messages
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

messages = [
    SystemMessage(content="你是助手"),
    HumanMessage(content="问题1"), AIMessage(content="答1"),
    HumanMessage(content="问题2"), AIMessage(content="答2"),
    HumanMessage(content="问题3"), AIMessage(content="答3"),
]

# ① 按 token 数裁剪(精确控量)
trimmed = trim_messages(
    messages,
    max_tokens=100,
    strategy="last",         # 保留最近的
    token_counter=len,       # 简化:用字符数当 token
    include_system=True,     # 始终保留 system
    start_on="human",        # 从 human 消息开始(不截断半句对话)
)

# ② 保留首尾(system + 最近 N 条)
trimmed2 = trim_messages(
    messages,
    max_tokens=100,
    strategy="last",
    include_system=True,
    allow_partial=False,     # 不截断单条消息
)

# ③ 用 LangGraph 的 pre_model_hook 在每次调 LLM 前自动裁剪
def auto_trim(state):
    return {"messages": trim_messages(
        state["messages"], max_tokens=4000, strategy="last",
        include_system=True, token_counter=model,
    )}

agent = create_react_agent(model, tools, pre_model_hook=auto_trim)
# 每次 LLM 调用前自动裁剪,无需手动管

实战:会话分支(fork)

生产场景:用户想"从第3轮对话分叉,尝试不同方向"。用 checkpoint 的 fork 能力:

📄 ad5_fork.py · 会话分叉 python
# 假设用户在对话中途想"重来",从某轮分叉
config_original = {"configurable": {"thread_id": "chat-1"}}

# 跑了5轮后,用户说"从第3轮重新开始,换个方向"
history = list(app.get_state_history(config_original))
round3_state = history[-3]   # 第3轮的状态(倒数第3个)

# 方法①:用原 thread 从 round3 继续(会覆盖后续)
app.update_state(round3_state.config, {"messages": [HumanMessage("新方向")]})
app.invoke(None, round3_state.config)

# 方法②:创建新 thread,复制 round3 状态(保留原历史)
config_fork = {"configurable": {"thread_id": "chat-1-fork"}}
# 把 round3 的状态写入新 thread,从此分叉独立发展
# 原 chat-1 的5轮历史完整保留,chat-1-fork 从第3轮另起

进阶:自定义 Checkpointer

实现自己的持久化后端(如 Redis、MongoDB),继承 BaseCheckpointSaver 实现四方法:

📄 自定义 Redis Checkpointer(骨架) python
from langgraph.checkpoint.base import BaseCheckpointSaver, CheckpointTuple

class RedisCheckpointer(BaseCheckpointSaver):
    def __init__(self, redis_client):
        super().__init__()
        self.redis = redis_client

    def get_tuple(self, config):
        thread_id = config["configurable"]["thread_id"]
        data = self.redis.hgetall(f"checkpoint:{thread_id}:latest")
        if not data:
            return None
        # 反序列化返回 CheckpointTuple
        return self.serde.loads_typed(data)

    def put(self, config, checkpoint, metadata, new_versions):
        thread_id = config["configurable"]["thread_id"]
        # 序列化存储,同时记 parent 链
        key = f"checkpoint:{thread_id}:{checkpoint['id']}"
        self.redis.hset(key, mapping=self.serde.dumps_typed((checkpoint, metadata)))
        self.redis.set(f"checkpoint:{thread_id}:latest", checkpoint["id"])
        # ... put_writes 和 list 类似
⚠️ 生产用现成的,别自己写

除非有特殊需求,生产直接用 PostgresSaver(AD9 详讲)或 SqliteSaver。它们已经处理好序列化、并发、迁移。自己写容易踩坑(序列化兼容、并发写入、版本迁移)。理解四方法原理是为了调试和优化,不是必须自己实现。

与生产实践对照

Checkpoint 概念OpenCode 对应
thread_id(会话标识)sessionID(会话唯一 ID)
put 每步存快照每条 Message/Part 落库(事件溯源)
get_tuple 加载历史runLoop 读 MessageV2.filterCompacted
list 时间旅行session revert(OC5,回到某点)
parent_config 历史链消息的 parentID 字段
消息裁剪compaction 压缩(更高级,OC5)
会话 forksession fork(session.ts 的 ForkInput)
🧭 OpenCode 用"事件溯源"替代"快照"

关键差异:LangGraph 存每步的完整快照(put),OpenCode 存每条消息的增量事件(事件溯源)。快照法恢复快(直接读)、占空间;事件溯源省空间、可重放但需累积。两种都是合理的工程选择,各有取舍。

小结

下一章

下一章 AD6 · 长期记忆 Store ★——短期记忆只在单会话内。怎么让 Agent 跨会话记住用户?精读 LangGraph 的 BaseStore + 语义搜索,对比 OpenCode 的文件式记忆。