📡 流式输出
Agent 执行可能涉及多步、多工具,耗时较长。用户不想干等,需要实时看到进度。LangGraph 提供 7 种 StreamMode,让你精确控制"流式吐出什么"——从状态快照到逐字 token,从节点开始/结束到自定义数据。
本章目标
- 掌握 7 种 StreamMode 各自的输出内容和适用场景
- 会用
stream()替代invoke()获得流式输出 - 掌握
StreamWriter在节点内发自定义流数据 - 能同时用多种 stream_mode 获得完整的实时视图
invoke vs stream
前面章节都用 invoke()——它阻塞到执行完才返回。但 Agent 场景下,用户需要看到过程:
# invoke:等到全部结束才返回(用户干等)
result = app.invoke(inputs)
print(result)
# stream:边执行边产出(用户实时看到)
for chunk in app.stream(inputs):
print(chunk) # 每个 chunk 是一个中间更新
7 种 StreamMode
核心概念。通过 stream_mode 参数选择"流式吐出什么":
StreamMode = Literal[
"values", # 每步后输出【完整状态】
"updates", # 每步后只输出【节点名 + 该步的更新】
"custom", # 节点内用 StreamWriter 发【自定义数据】
"messages", # LLM 调用【逐 token】输出 + 元数据
"checkpoints", # 每次存 checkpoint 时输出事件
"tasks", # 任务开始/结束时输出事件
"debug", # checkpoints + tasks(调试用)
]
| 模式 | 输出内容 | 典型用途 |
|---|---|---|
values | 每步后的完整状态快照 | 看状态如何一步步演变 |
updates | 节点名 + 本步的更新(增量) | 最常用,看每步改了啥 |
messages | LLM 输出逐 token + 元数据 | 逐字打字效果(聊天UI必备) |
custom | 节点内 writer() 发的任意数据 | 自定义进度、中间结果 |
tasks | 任务开始/结束事件 | 看任务并行情况 |
checkpoints | 存 checkpoint 事件 | 监控持久化 |
debug | checkpoints + tasks | 综合调试 |
values vs updates:最常用
这俩是日常最常用的。区别在于"吐完整状态还是增量":
可运行代码
pip install langgraph。下面用同一个图对比不同模式的输出。
# pip install langgraph
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import StreamWriter
class State(TypedDict):
count: int
log: list[str]
def step_a(state, writer: StreamWriter):
writer({"progress": "A 开始处理"}) # custom 流:发自定义数据
return {"count": state["count"] + 1, "log": state["log"] + ["A"]}
def step_b(state):
return {"count": state["count"] + 1, "log": state["log"] + ["B"]}
graph = StateGraph(State)
graph.add_node("a", step_a)
graph.add_node("b", step_b)
graph.add_edge(START, "a"); graph.add_edge("a", "b"); graph.add_edge("b", END)
app = graph.compile()
inputs = {"count": 0, "log": []}
# ============ ① updates(最常用)============
print("=== updates ===")
for chunk in app.stream(inputs, stream_mode="updates"):
print(chunk)
# {'a': {'count': 1, 'log': ['A']}}
# {'b': {'count': 2, 'log': ['A', 'B']}}
# ============ ② values(完整状态)============
print("\n=== values ===")
for chunk in app.stream(inputs, stream_mode="values"):
print(chunk)
# {'count': 1, 'log': ['A']}
# {'count': 2, 'log': ['A', 'B']}
# ============ ③ custom(节点内自定义流)============
print("\n=== custom ===")
for chunk in app.stream(inputs, stream_mode="custom"):
print(chunk)
# {'progress': 'A 开始处理'}
# ============ ④ 多模式同时(返回元组)============
print("\n=== updates + custom 同时 ===")
for mode, chunk in app.stream(inputs, stream_mode=["updates", "custom"]):
print(f"[{mode}] {chunk}")
# [custom] {'progress': 'A 开始处理'}
# [updates] {'a': {...}}
# [updates] {'b': {...}}
messages 模式:逐 token 输出
对聊天 UI 最重要——LLM 输出逐字流式,实现打字机效果:
# pip install langgraph langchain-openai
from langgraph.graph import StateGraph, START, END, MessagesState
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")
def chatbot(state):
return {"messages": [model.invoke(state["messages"])]}
app = StateGraph(MessagesState).add_node("chatbot", chatbot).add_edge(START,"chatbot").add_edge("chatbot",END).compile()
# messages 模式:逐 token 输出
for msg, metadata in app.stream(
{"messages": [("human", "讲个笑话")]},
stream_mode="messages",
):
# msg 是 AIMessageChunk(每次一小片 token)
# metadata 含 langgraph_node, langgraph_step 等
print(msg.content, end="", flush=True)
print()
# 实时逐字输出:"从前有个..."(打字机效果)
StreamWriter:节点内自定义流
当你想在节点执行过程中主动推送进度/中间结果(而不只是节点返回值),用 StreamWriter:
StreamWriter = Callable[[Any], None]
"""接受单个参数、写入输出流的 Callable。
当节点把 writer 作为关键字参数请求时,自动注入。
不用 stream_mode="custom" 时,它是空操作(no-op)。"""
# 用法:节点签名声明 writer 参数,自动注入
def long_task(state, writer: StreamWriter) -> dict:
for i in range(3):
# 模拟分步处理,每步推送进度
writer({"step": i, "progress": f"{i*33}% 已完成"})
time.sleep(1)
return {"result": "完成"}
# stream_mode="custom" 才能收到 writer 发的数据
for chunk in app.stream(inputs, stream_mode="custom"):
print(chunk)
# {'step': 0, 'progress': '0% 已完成'}
# {'step': 1, 'progress': '33% 已完成'}
# {'step': 2, 'progress': '66% 已完成'}
没有 StreamWriter,节点执行时用户只能干等(流里没东西直到节点返回)。有了它,你可以在节点内部主动推送进度——这对耗时工具(批量处理、长检索)非常重要,避免用户以为卡死了。
stream_events:Runnable 标准接口
除了 stream(stream_mode=),还有 stream_events()——这是 LangChain Runnable 的标准事件流接口(LC5 提过 Runnable 统一接口)。它产出更细粒度的事件:
async for event in app.astream_events(inputs, version="v2"):
kind = event["event"]
if kind == "on_chat_model_stream":
# LLM 逐 token
print(event["data"]["chunk"].content, end="")
elif kind == "on_tool_start":
print(f"\n[工具开始] {event['name']}")
elif kind == "on_tool_end":
print(f"[工具结束] {event['name']}")
# 还有 on_chain_start/end, on_chat_model_start/end 等
与生产实践对照
| LangGraph 概念 | OpenCode 对应 | |
|---|---|---|
| stream() 流式输出 | → | LLMEvent 流(贯穿整个 processor) |
| messages 逐 token | → | text-delta 事件(ai-sdk.ts:76) |
| updates 节点级更新 | → | tool-call/tool-result 事件 |
| StreamWriter 自定义流 | → | 工具的 metadata() 回写状态 |
| tasks 任务事件 | → | step-start/step-finish 事件 |
| stream_events 统一接口 | → | 统一的 LLMEvent 归一化(16种映射) |
OpenCode 把整个 agent 执行过程都流式化:从 LLM 的 text-delta(逐字)、到 tool-call/tool-result(工具生命周期)、到 reasoning(推理链)、再到 step-start/step-finish(步骤边界)。它的 session/llm/ai-sdk.ts 把 AI SDK 的 16 种原始事件归一化成统一的 LLMEvent 流——这是 OC2 会精读的关键设计。LangGraph 的 7 种 StreamMode 是它的概念源头。
小结
stream()替代invoke()获得流式输出;7 种 StreamMode 控制吐什么。- updates(增量,最常用)、values(完整状态)、messages(逐 token,聊天UI必备)。
- custom +
StreamWriter:节点内主动推送自定义进度/中间结果。 - 可同时用多种
stream_mode=["updates", "custom"]获得完整视图。 stream_events()是 Runnable 标准 v2 事件流接口。
所有概念都学完了!下一章 LG8 · 预构建 Agent:用 create_react_agent + ToolNode,一行代码组装完整的 ReAct Agent。这是 LG1-LG7 所有知识的集大成,也是你日常最常用的 API。