📡 可观测与追踪

Agent 是多步骤黑盒——LLM 调用、工具执行、状态流转都看不见。生产 Agent 必须可观测:哪步慢、哪步贵、哪步出错。本章精读 LangChain 的 Callbacks 三层体系、LangSmith 追踪集成、LangGraph 的 stream_mode=debug,让 Agent 从黑盒变透明。

本章目标

  • 理解为什么 Agent 比普通应用更需要可观测
  • 精读 Callbacks 三层体系(Handler → Manager → Tracer)
  • 掌握 LangSmith 追踪集成与运行树概念
  • 会用自定义 callback 做 token 成本监控
  • 对比 LangGraph stream_mode=debug 与 OpenCode 的 OTel 方案

为什么 Agent 特别需要可观测

普通应用出错了,看异常栈就行。但 Agent 的复杂度在于:

挑战普通应用Agent
执行路径代码写死,可预测LLM 动态决定,每次不同
失败原因异常栈清晰可能是幻觉/选错工具/prompt 问题,难定位
成本固定(服务器/带宽)按 token 计费,一次跑可能花几块
性能瓶颈代码 profiling可能是 LLM 慢/工具慢/检索慢,要分辨
💡 可观测的三个维度
  • 追踪(Tracing):每一步做了什么、耗时多久——调试用
  • 指标(Metrics):token 用量、成本、延迟分布——监控用
  • 日志(Logging):中间状态、错误详情——审计用

Callbacks 三层体系

LangChain 的可观测基础是 callbacks,分三层:

📄 callbacks/base.py:496 · BaseCallbackHandler(第一层) python
class BaseCallbackHandler:
    """所有回调处理器的基类。定义全套生命周期钩子。"""

    # LLM 生命周期
    def on_llm_start(self, serialized, prompts, *, run_id, parent_run_id, **kwargs): ...
    def on_llm_new_token(self, token, *, run_id, **kwargs): ...   # 流式 token
    def on_llm_end(self, response, *, run_id, **kwargs): ...
    def on_llm_error(self, error, *, run_id, **kwargs): ...

    # Chain 生命周期
    def on_chain_start(self, serialized, inputs, *, run_id, ...): ...
    def on_chain_end(self, outputs, *, run_id, ...): ...

    # Tool 生命周期
    def on_tool_start(self, serialized, input_str, *, run_id, ...): ...
    def on_tool_end(self, output, *, run_id, ...): ...

    # Retriever 生命周期
    def on_retriever_start(self, ...): ...

    # 每个 run 都有 run_id 和 parent_run_id —— 构成父子 run 树!
Callbacks 三层体系 ① Handler 处理器 最细粒度钩子 on_llm_start on_tool_end on_chain_* 你实现这个 如 StreamingStdOut base.py:496 ② Manager 管理器 串多个 handler 构建 run 树 父子回调分发 按类型分发: LLMRun/ChainRun/ToolRun manager.py:1377 ③ Tracer 追踪器 特殊的 Handler 序列化 run 树 上报到平台 如 LangChainTracer → 上报 LangSmith tracers/base.py:33
图 AD7.1 · 三层体系:Handler(钩子)→ Manager(编排)→ Tracer(上报)
📌 关键概念:run 树

每次 invoke 产生一棵 run 树:根 run(整个调用)→ 子 run(每次 LLM/Tool/Chain 调用)。通过 run_idparent_run_id 串联。这棵树就是追踪的可视化基础——LangSmith 显示的就是这棵树。理解 run 树是理解可观测的关键。

内置 Handler 速览

Handler源码用途
StdOutCallbackHandlercallbacks/stdout.py:16打印事件到 stdout(最简调试)
StreamingStdOutCallbackHandlercallbacks/streaming_stdout.py:18流式打印 token
UsageMetadataCallbackHandlercallbacks/usage.py:18★统计 token 用量(成本监控)
FileCallbackHandlercallbacks/file.py:21写事件到文件

实战:自定义成本监控 callback

🚀 自己实现 callback

统计每次 Agent 调用的 token 成本——生产必备的可观测能力。

📄 ad7_cost_callback.py · token 成本监控 python
# pip install langchain langchain-openai
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI

class CostMonitorHandler(BaseCallbackHandler):
    """统计每次 LLM 调用的 token 成本。"""

    def __init__(self, pricing: dict):
        self.pricing = pricing          # {"gpt-4o-mini": {"input": 0.15, "output": 0.6}}(每百万 token $)
        self.total_input = 0
        self.total_output = 0
        self.total_cost = 0.0
        self.calls = 0

    def on_llm_end(self, response, *, run_id, parent_run_id, **kwargs):
        """每次 LLM 调用结束时,累加 token 和成本。"""
        self.calls += 1
        for generation in response.flatten():
            msg = generation.generations[0][0].message
            usage = getattr(msg, "usage_metadata", None)
            if usage:
                model = "gpt-4o-mini"  # 实际从 llm_output 取
                inp, out = usage.get("input_tokens", 0), usage.get("output_tokens", 0)
                self.total_input += inp
                self.total_output += out
                cost = (inp * self.pricing[model]["input"] +
                        out * self.pricing[model]["output"]) / 1_000_000
                self.total_cost += cost
                print(f"  [调用{self.calls}] in={inp} out={out} cost=${cost:.4f}")

    def report(self):
        print(f"\n===== 成本报告 =====")
        print(f"总调用: {self.calls}")
        print(f"总 token: in={self.total_input} out={self.total_output}")
        print(f"总成本: ${self.total_cost:.4f}")

# 使用:把 callback 通过 config 注入
handler = CostMonitorHandler(pricing={"gpt-4o-mini": {"input": 0.15, "output": 0.6}})
model = ChatOpenAI(model="gpt-4o-mini")

# 方式①:通过 config 的 callbacks 注入
result = model.invoke(
    "讲个笑话",
    config={"callbacks": [handler]},
)
handler.report()

# 方式②:构造时绑定(对该 model 的所有调用生效)
# model = ChatOpenAI(model="gpt-4o-mini", callbacks=[handler])
💡 callback 的注入方式

两种注入:① config={"callbacks": [...]} 按调用注入(灵活,推荐);② 构造时 callbacks= 绑定到组件(全局)。LCEL 链也支持 chain.with_config(callbacks=[...])。所有 Runnable 都自动支持 callbacks——这是统一接口(LC5)的好处。

LangSmith 追踪集成

LangSmith 是 LangChain 的官方追踪平台。LangChainTracer 把 run 树上报到云端可视化:

📄 tracers/langchain.py:134 · LangChainTracer python
class LangChainTracer(BaseTracer):
    """上报 run 到 LangSmith 的 tracer。

    __init__ 接收 project_name(:142);
    _persist_run(:281)把 Run 上报到 LangSmith。
    """
    def __init__(self, project_name=None, ...): ...

# 最简集成:设置环境变量,全自动追踪
import os
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = "lsv2_..."
os.environ["LANGSMITH_PROJECT"] = "my-agent"

# 之后所有 invoke 自动追踪!无需改代码
agent.invoke({"messages": [...]})
# 去 LangSmith 网页能看到完整 run 树:
# 哪个 LLM 调用、调了什么 prompt、用了多少 token、工具执行多久
agent.invoke 本地执行 LangChainTracer 构建 run 树 root run └ llm run └ tool run _persist_run 上报 LangSmith 云端 可视化 run 树
图 AD7.2 · LangSmith 追踪:Tracer 构建 run 树 → 上报 → 云端可视化

LangGraph 的 stream_mode=debug

LangGraph 复用 LangChain callbacks,但有自己独有的可观测——stream_mode="debug"(LG7 提过):

📄 pregel/debug.py + types.py:120 · debug 流模式 python
# types.py:133 注释:debug = checkpoints + tasks
StreamMode = Literal[..., "debug", ...]  # :120

# debug 模式:透出 Pregel 内部的 task 调度 + checkpoint 变化
# 由 pregel/debug.py 的 map_debug_tasks / map_debug_checkpoint 映射

for event in app.stream(inputs, stream_mode="debug"):
    if "tasks" in event:
        print(f"任务调度: {event['tasks']}")
    if "checkpoint" in event:
        print(f"状态快照: {event['checkpoint']['channel_values']}")
# 这是 LG5 Pregel 引擎最底层的可观测——看每个 super-step 内部

OpenCode 的 OpenTelemetry 方案

OpenCode 用 OpenTelemetry(业界标准)做零侵入追踪:

📄 tool/tool.ts:145 · Effect.withSpan(OTel 集成) typescript
// wrap 装饰器(OC3 讲过)在工具执行外包 OTel span
function wrap(id, def, truncate, agents) {
  return async (args, ctx) => {
    // ...校验...
    return await def.execute(args, ctx).pipe(
      Effect.orDie,
      // ★ 每个工具调用一个 OTel span,带属性
      Effect.withSpan("Tool.execute", {
        attributes: {
          "tool.name": id,
          "session.id": ctx.sessionID,
          "message.id": ctx.messageID,
          "tool.call_id": ctx.callID,
        }
      })
    )
  }
}
// span 上报到 OTel collector → Jaeger/Grafana 可视化
// 比 LangSmith 更通用(不绑定特定平台)

OpenCode 的事件总线

除了 OTel span,OpenCode 还有面向 UI 的事件总线

📄 event-v2-bridge.ts:12 · 事件总线 typescript
// EventV2Bridge 服务:包装核心 EventV2,给事件附加 location(实例/工作区/项目)
// 订阅核心事件转发到 GlobalBus(UI 消费)
// durable 事件还会转成 sync 事件(跨设备同步)

// status.ts:39 会话状态变化时发布 SessionStatusEvent
// 这是面向 UI 的实时状态推送(idle/busy/retry)

与生产实践对照

LangChain/LangGraphOpenCode
BaseCallbackHandler 钩子Effect 的生命周期钩子
run 树(run_id/parent_run_id)OTel span 树(trace_id/span_id/parent)
LangSmith 追踪OTel collector → Jaeger/Grafana
UsageMetadataCallbackHandlersession.ts getUsage 成本计算
stream_mode=debugEventV2Bridge 事件流
verbose=True 打印Effect.logInfo/logDebug
🧭 两种追踪范式

LangChain 用自家的 callbacks + LangSmith(深度集成,但绑定生态)。OpenCode 用业界标准 OpenTelemetry(通用,可对接任何可观测平台)。前者上手快、可视化好;后者更开放、可移植。选择取决于你是否愿意绑定 LangSmith 生态。

小结

下一章

下一章 AD8 · 容错与韧性 ★——可观测让你"看到"问题,容错让你"解决"问题。精读重试/降级/限流/超时,构建不怕失败的 Agent。