📡 流式输出

Agent 执行可能涉及多步、多工具,耗时较长。用户不想干等,需要实时看到进度。LangGraph 提供 7 种 StreamMode,让你精确控制"流式吐出什么"——从状态快照到逐字 token,从节点开始/结束到自定义数据。

本章目标

  • 掌握 7 种 StreamMode 各自的输出内容和适用场景
  • 会用 stream() 替代 invoke() 获得流式输出
  • 掌握 StreamWriter 在节点内发自定义流数据
  • 能同时用多种 stream_mode 获得完整的实时视图

invoke vs stream

前面章节都用 invoke()——它阻塞到执行完才返回。但 Agent 场景下,用户需要看到过程:

📄 invoke 阻塞 vs stream 实时 python
# invoke:等到全部结束才返回(用户干等)
result = app.invoke(inputs)
print(result)

# stream:边执行边产出(用户实时看到)
for chunk in app.stream(inputs):
    print(chunk)   # 每个 chunk 是一个中间更新

7 种 StreamMode

核心概念。通过 stream_mode 参数选择"流式吐出什么":

📄 langgraph/types.py:120 · StreamMode 七种 python
StreamMode = Literal[
    "values",      # 每步后输出【完整状态】
    "updates",     # 每步后只输出【节点名 + 该步的更新】
    "custom",      # 节点内用 StreamWriter 发【自定义数据】
    "messages",    # LLM 调用【逐 token】输出 + 元数据
    "checkpoints", # 每次存 checkpoint 时输出事件
    "tasks",       # 任务开始/结束时输出事件
    "debug",       # checkpoints + tasks(调试用)
]
模式输出内容典型用途
values每步后的完整状态快照看状态如何一步步演变
updates节点名 + 本步的更新(增量)最常用,看每步改了啥
messagesLLM 输出逐 token + 元数据逐字打字效果(聊天UI必备)
custom节点内 writer() 发的任意数据自定义进度、中间结果
tasks任务开始/结束事件看任务并行情况
checkpoints存 checkpoint 事件监控持久化
debugcheckpoints + tasks综合调试

values vs updates:最常用

这俩是日常最常用的。区别在于"吐完整状态还是增量":

values 模式(完整状态) step1 后: {"count": 1} ← 完整 step2 后: {"count": 1, "msg": "hi"} ← 完整(含历史) step3 后: {"count": 2, "msg": "hi"} ← 完整 updates 模式(增量) {"node1": {"count": 1}} ← 只本步改的 {"node2": {"msg": "hi"}} ← 只本步改的 {"node1": {"count": 2}} ← 只本步改的
图 LG7.1 · values 吐完整状态(越来越大),updates 吐增量(轻量)

可运行代码

🚀 对比各种 stream_mode

pip install langgraph。下面用同一个图对比不同模式的输出。

📄 lg7_streaming.py · 各种 stream_mode 对比 python
# pip install langgraph
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import StreamWriter

class State(TypedDict):
    count: int
    log: list[str]

def step_a(state, writer: StreamWriter):
    writer({"progress": "A 开始处理"})   # custom 流:发自定义数据
    return {"count": state["count"] + 1, "log": state["log"] + ["A"]}

def step_b(state):
    return {"count": state["count"] + 1, "log": state["log"] + ["B"]}

graph = StateGraph(State)
graph.add_node("a", step_a)
graph.add_node("b", step_b)
graph.add_edge(START, "a"); graph.add_edge("a", "b"); graph.add_edge("b", END)
app = graph.compile()

inputs = {"count": 0, "log": []}

# ============ ① updates(最常用)============
print("=== updates ===")
for chunk in app.stream(inputs, stream_mode="updates"):
    print(chunk)
# {'a': {'count': 1, 'log': ['A']}}
# {'b': {'count': 2, 'log': ['A', 'B']}}

# ============ ② values(完整状态)============
print("\n=== values ===")
for chunk in app.stream(inputs, stream_mode="values"):
    print(chunk)
# {'count': 1, 'log': ['A']}
# {'count': 2, 'log': ['A', 'B']}

# ============ ③ custom(节点内自定义流)============
print("\n=== custom ===")
for chunk in app.stream(inputs, stream_mode="custom"):
    print(chunk)
# {'progress': 'A 开始处理'}

# ============ ④ 多模式同时(返回元组)============
print("\n=== updates + custom 同时 ===")
for mode, chunk in app.stream(inputs, stream_mode=["updates", "custom"]):
    print(f"[{mode}] {chunk}")
# [custom] {'progress': 'A 开始处理'}
# [updates] {'a': {...}}
# [updates] {'b': {...}}

messages 模式:逐 token 输出

对聊天 UI 最重要——LLM 输出逐字流式,实现打字机效果:

📄 messages 模式:聊天 UI 的打字机效果 python
# pip install langgraph langchain-openai
from langgraph.graph import StateGraph, START, END, MessagesState
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

def chatbot(state):
    return {"messages": [model.invoke(state["messages"])]}

app = StateGraph(MessagesState).add_node("chatbot", chatbot).add_edge(START,"chatbot").add_edge("chatbot",END).compile()

# messages 模式:逐 token 输出
for msg, metadata in app.stream(
    {"messages": [("human", "讲个笑话")]},
    stream_mode="messages",
):
    # msg 是 AIMessageChunk(每次一小片 token)
    # metadata 含 langgraph_node, langgraph_step 等
    print(msg.content, end="", flush=True)
print()
# 实时逐字输出:"从前有个..."(打字机效果)

StreamWriter:节点内自定义流

当你想在节点执行过程中主动推送进度/中间结果(而不只是节点返回值),用 StreamWriter

📄 langgraph/types.py:136 · StreamWriter python
StreamWriter = Callable[[Any], None]
"""接受单个参数、写入输出流的 Callable。
当节点把 writer 作为关键字参数请求时,自动注入。
不用 stream_mode="custom" 时,它是空操作(no-op)。"""

# 用法:节点签名声明 writer 参数,自动注入
def long_task(state, writer: StreamWriter) -> dict:
    for i in range(3):
        # 模拟分步处理,每步推送进度
        writer({"step": i, "progress": f"{i*33}% 已完成"})
        time.sleep(1)
    return {"result": "完成"}

# stream_mode="custom" 才能收到 writer 发的数据
for chunk in app.stream(inputs, stream_mode="custom"):
    print(chunk)
# {'step': 0, 'progress': '0% 已完成'}
# {'step': 1, 'progress': '33% 已完成'}
# {'step': 2, 'progress': '66% 已完成'}
💡 StreamWriter 让长任务有反馈

没有 StreamWriter,节点执行时用户只能干等(流里没东西直到节点返回)。有了它,你可以在节点内部主动推送进度——这对耗时工具(批量处理、长检索)非常重要,避免用户以为卡死了。

stream_events:Runnable 标准接口

除了 stream(stream_mode=),还有 stream_events()——这是 LangChain Runnable 的标准事件流接口(LC5 提过 Runnable 统一接口)。它产出更细粒度的事件:

📄 stream_events(v2 事件流) python
async for event in app.astream_events(inputs, version="v2"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # LLM 逐 token
        print(event["data"]["chunk"].content, end="")
    elif kind == "on_tool_start":
        print(f"\n[工具开始] {event['name']}")
    elif kind == "on_tool_end":
        print(f"[工具结束] {event['name']}")
    # 还有 on_chain_start/end, on_chat_model_start/end 等

与生产实践对照

LangGraph 概念OpenCode 对应
stream() 流式输出LLMEvent 流(贯穿整个 processor)
messages 逐 tokentext-delta 事件(ai-sdk.ts:76)
updates 节点级更新tool-call/tool-result 事件
StreamWriter 自定义流工具的 metadata() 回写状态
tasks 任务事件step-start/step-finish 事件
stream_events 统一接口统一的 LLMEvent 归一化(16种映射)
🔍 OpenCode 的流式是"全链路"的

OpenCode 把整个 agent 执行过程都流式化:从 LLM 的 text-delta(逐字)、到 tool-call/tool-result(工具生命周期)、到 reasoning(推理链)、再到 step-start/step-finish(步骤边界)。它的 session/llm/ai-sdk.ts 把 AI SDK 的 16 种原始事件归一化成统一的 LLMEvent 流——这是 OC2 会精读的关键设计。LangGraph 的 7 种 StreamMode 是它的概念源头。

小结

下一章 · LangGraph 终章 ★

所有概念都学完了!下一章 LG8 · 预构建 Agent:用 create_react_agent + ToolNode,一行代码组装完整的 ReAct Agent。这是 LG1-LG7 所有知识的集大成,也是你日常最常用的 API。