🔄 状态与 Reducer

LG1 章节点返回的是"部分更新",但没说清"多个节点写同一字段时怎么合并"。本章解开这个关键谜题:reducer——它决定了状态如何被聚合。理解 reducer,就理解了 messages 列表为什么能正确累积。

本章目标

  • 理解默认合并策略(覆盖)和它的陷阱
  • 掌握 Annotated[T, reducer] 语法为字段指定合并函数
  • 会用 add_messages reducer 正确累积消息列表
  • 初步建立"通道(Channel)"概念——reducer 的底层实现

合并问题:默认覆盖的陷阱

LG1 我们写 return {"messages": state["messages"] + [msg]}——手动把旧消息和新消息拼起来。如果不这么做会怎样?

📄 陷阱:默认策略是"覆盖" python
class State(TypedDict):
    messages: list[str]   # 没有指定 reducer

def node_a(state):
    return {"messages": ["来自A"]}   # 只返回新值

def node_b(state):
    return {"messages": ["来自B"]}

# node_a 和 node_b 都写了 messages,结果如何?
# 默认行为:后写覆盖先写!state["messages"] 最后只剩 ["来自B"]
# 这几乎从不是你想要的——你想要的是累积 ["来自A", "来自B"]
无 reducer(默认覆盖) node_a 写 ["A"] node_b 写 ["B"] 最终: ["B"] ✗ 有 reducer(如 add_messages) node_a 写 ["A"] node_b 写 ["B"] 最终: ["A", "B"] ✓
图 LG2.1 · 没有 reducer 时,节点写入会相互覆盖而非累积

Annotated[T, reducer]:指定合并函数

解决方法是给字段标注一个 reducer 函数。用 Annotated 类型:

📄 Annotated 语法 + 自定义 reducer python
from typing import Annotated
from typing_extensions import TypedDict
import operator

# reducer 签名:(当前值, 新值) → 合并后的值
def concat_list(current: list, new: list) -> list:
    return current + new

class State(TypedDict):
    # Annotated[类型, reducer] —— 为字段指定合并函数
    messages: Annotated[list[str], concat_list]   # 用自定义 reducer 累积
    count: int                                     # 无 reducer,默认覆盖

# 现在节点只需返回"新增"的部分,不用手动拼接
def node_a(state):
    return {"messages": ["来自A"]}   # 只返回新增,reducer 自动累积!

def node_b(state):
    return {"messages": ["来自B"]}   # 同上

# 两个节点都写 messages,最终: ["来自A", "来自B"]  ✓

# 常用技巧:operator.add 就是列表拼接
class State2(TypedDict):
    items: Annotated[list, operator.add]   # 等价于上面的 concat_list
💡 reducer 让节点代码更简洁

有了 reducer,节点只返回"新增"的部分,不用手动 state["x"] + new。这正是 LG1 我们没解释的"部分更新"的完整含义——返回的会被 reducer 智能合并。这是 LangGraph 状态管理的核心便利。

add_messages:消息列表的专用 reducer

对消息列表,LangGraph 提供了专门的 reducer add_messages(源码 graph/message.py:61)。它比简单拼接更智能:

📄 langgraph/graph/message.py:61 · add_messages python
def add_messages(left, right):
    """消息列表的 reducer。

    比简单拼接更智能:
    1. 自动去重:同 id 的消息会被替换(而非重复)——支持"更新已有消息"
    2. 处理 RemoveMessage:可以删除指定 id 的消息(消息裁剪用)
    3. 自动给无 id 的消息生成 id
    """

# 标准用法:状态里用 Annotated[..., add_messages]
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]   # ← 关键!

# 现在节点只追加新消息,add_messages 自动处理累积/去重/删除
def chat_node(state: State) -> dict:
    # 只返回"新增"的 AIMessage,不用管历史
    return {"messages": [AIMessage(content="你好!")]}
⚠️ 这是 Agent 状态的标准做法

回顾 LC7 章:Agent 的历史 = 消息列表。LangGraph 用 Annotated[list[AnyMessage], add_messages] 承载整个对话历史——这就是用 messages 当 Agent 状态的底层机制。LG8 的 create_react_agent 内部状态就是这么定义的。

MessagesState:开箱即用的状态

因为"用 messages 当状态"太常见,LangGraph 提供了 MessagesState 预定义状态:

📄 MessagesState 等价于自己定义 python
from langgraph.graph import MessagesState

# MessagesState 内部就是:
class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

# 直接用,省得自己写
graph = StateGraph(MessagesState)
# 也可以扩展它,加自己的字段
class MyState(MessagesState):
    user_name: str   # 在 messages 基础上加字段

通道(Channel)概念入门

reducer 的底层是通道(Channel)——LG5 章会深入讲,这里先建立概念:

State 字段 messages: Annotated[list, add_messages] 编译时映射 每个字段 → 一个 Channel LG5 详解 Channel 通道 update() 调 reducer 累积写入值
图 LG2.2 · 每个 State 字段编译后对应一个 Channel,reducer 就是 Channel 的 update 逻辑
📄 langgraph/channels/base.py:19 · BaseChannel(LG5 精读) python
class BaseChannel(Generic[Value, Update, Checkpoint], ABC):
    """所有通道的基类。"""
    __slots__ = ("key", "typ")

    @property
    @abstractmethod
    def ValueType(self) -> Any:
        """存储的值类型"""

    @property
    @abstractmethod
    def UpdateType(self) -> Any:
        """接收的更新类型"""

    def checkpoint(self) -> Checkpoint | Any:
        """返回可序列化的当前状态快照(用于持久化)"""
    # update() 方法:调用 reducer 聚合新值 —— 这就是 reducer 的执行处
    # get() 方法:返回当前值
🔮 LG5 会讲透

现在你只需知道:reducer 是给开发者的"语法糖",底层是 Channel。编译时,Annotated[list, add_messages] 会被映射成一个 BinaryOperatorAggregate 通道,它的 update() 就是调用 add_messages。LG5 的 Pregel 引擎概念会把这个机制完整讲清楚。

可运行代码

🚀 观察 reducer 的效果

pip install langgraph langchain-core。下面例子对比有无 reducer 的差异。

📄 lg2_reducers.py · reducer 实操 python
# pip install langgraph langchain-core
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.graph.message import MessagesState
from langchain_core.messages import HumanMessage, AIMessage

# ============ ① 无 reducer:覆盖(反面教材)============
class BadState(TypedDict):
    log: list[str]

def append_a(state):
    return {"log": ["A"]}   # 想追加,但...

g1 = StateGraph(BadState)
g1.add_node("a", append_a)
g1.add_node("b", lambda s: {"log": ["B"]})
g1.add_edge(START, "a"); g1.add_edge("a", "b"); g1.add_edge("b", END)
app1 = g1.compile()
print("无reducer:", app1.invoke({"log": []}))   # → {'log': ['B']}  被覆盖了!

# ============ ② 有 reducer:累积 ✓ ============
import operator
class GoodState(TypedDict):
    log: Annotated[list[str], operator.add]   # ← 加了 reducer

g2 = StateGraph(GoodState)
g2.add_node("a", lambda s: {"log": ["A"]})    # 只返回新增
g2.add_node("b", lambda s: {"log": ["B"]})
g2.add_edge(START, "a"); g2.add_edge("a", "b"); g2.add_edge("b", END)
app2 = g2.compile()
print("有reducer:", app2.invoke({"log": []}))  # → {'log': ['A', 'B']}  累积!

# ============ ③ add_messages:消息累积(Agent 标准做法)============
class ChatState(TypedDict):
    messages: Annotated[list, add_messages]

def chatbot(state: ChatState) -> dict:
    user_msg = state["messages"][-1].content
    return {"messages": [AIMessage(content=f"你说的是:{user_msg}")]}

g3 = StateGraph(ChatState)
g3.add_node("chatbot", chatbot)
g3.add_edge(START, "chatbot"); g3.add_edge("chatbot", END)
app3 = g3.compile()
result = app3.invoke({"messages": [HumanMessage(content="你好")]})
print("\n消息历史:")
for m in result["messages"]:
    print(f"  [{m.type}] {m.content}")
# → [human] 你好
#   [ai] 你说的是:你好   ← add_messages 自动累积了!

# ============ ④ 直接用 MessagesState(更省事)============
g4 = StateGraph(MessagesState)   # 内置 messages 字段
g4.add_node("bot", chatbot)
g4.add_edge(START, "bot"); g4.add_edge("bot", END)
app4 = g4.compile()
# 等价于上面的 ChatState 写法

进阶:消息裁剪

回顾 F2 章:context window 有限,消息历史不能无限增长。add_messages 配合 RemoveMessage 可以删除旧消息:

📄 用 RemoveMessage 裁剪历史(F2 章的记忆管理) python
from langchain_core.messages import RemoveMessage

def prune_old_messages(state):
    # 删除除了最后 N 条之外的所有消息
    keep = 6
    messages = state["messages"]
    delete_ids = [RemoveMessage(id=m.id) for m in messages[:-keep]]
    return {"messages": delete_ids}   # add_messages 会处理 RemoveMessage

# 这就是 Agent 控制上下文长度的标准技巧
# OpenCode 的 compaction 是更高级的版本(OC5)

与生产实践对照

LangGraph 概念OpenCode 对应
State + messages reducerSession DB 的消息表(按 sessionID 累积)
add_messages 去重/更新updatePart/updatePartDelta(流式更新同一 Part)
RemoveMessage 裁剪compaction(压缩历史,OC5)
Channel 通道(底层)事件总线 EventV2Bridge(状态变更解耦)

小结

下一章

现在状态会正确累积了。但 LG1 的循环用的是简单 if/else 路由。下一章 LG3 · 条件边与 Send:深入学习 add_conditional_edges 实现动态路由,以及 Send 实现 map-reduce 并行扇出——这是构建复杂 Agent 控制流的关键。