介绍PocketFlow跟踪:让您的AI工作流变得轻松可观测
将您的PocketFlow工作流从黑盒转变为完全可观察、可调试的系统,只需一行代码。
挑战:AI工作流的可观测性
构建AI工作流令人兴奋,但调试它们呢?没那么有趣。当您的PocketFlow工作流失败或行为异常时,您常常会问:
- 哪个节点失败了,为什么?
- 节点之间传输了哪些数据?
- 每个步骤花费了多长时间?
- 错误具体发生在何处?
- 如何优化性能瓶颈?
传统的日志方法要求您手动为每个节点添加监控代码,这会使您的代码变得杂乱无章。您最终会花费更多时间在调试基础设施上,而不是构建实际的AI逻辑。
有没有更好的方法?
PocketFlow Tracing介绍
我们很高兴地宣布推出pocketflow-tracing,这是一个新包,它只需极少的代码更改即可为您的PocketFlow工作流带来企业级的可观测性。它基于Langfuse构建,提供了全面的跟踪、监控和调试功能,彻底改变了您开发和维护AI工作流的方式。
主要优点
🎯 零摩擦集成:只需一个@trace_flow()
装饰器即可添加全面跟踪
📊 完全可见性:自动跟踪每个节点的执行、输入、输出和错误
⚡ 性能洞察:跟踪执行时间并识别瓶颈
🔍 丰富的调试:在Langfuse的强大仪表板中查看详细跟踪
🚀 生产就绪:适用于开发和生产环境
🔄 异步支持:与AsyncFlow和AsyncNode完全兼容
前后对比:看看差异
前:手动日志噩梦
import logging
import time
from pocketflow import Node, Flow
logger = logging.getLogger(__name__)
class DataProcessingNode(Node):
def prep(self, shared):
start_time = time.time()
logger.info(f"Starting prep phase with input: {shared}")
try:
data = shared.get("input_data")
result = self._validate_data(data)
logger.info(f"Prep completed in {time.time() - start_time:.2f}s")
return result
except Exception as e:
logger.error(f"Prep failed: {e}")
raise
def exec(self, data):
start_time = time.time()
logger.info(f"Starting exec phase with data: {data}")
try:
processed = self._process_data(data)
logger.info(f"Exec completed in {time.time() - start_time:.2f}s")
return processed
except Exception as e:
logger.error(f"Exec failed: {e}")
raise
def post(self, shared, prep_res, exec_res):
start_time = time.time()
logger.info(f"Starting post phase")
try:
shared["output"] = exec_res
logger.info(f"Post completed in {time.time() - start_time:.2f}s")
return "default"
except Exception as e:
logger.error(f"Post failed: {e}")
raise
class DataProcessingFlow(Flow):
def __init__(self):
super().__init__(start=DataProcessingNode())
后:干净的代码和自动跟踪
from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow
class DataProcessingNode(Node):
def prep(self, shared):
data = shared.get("input_data")
return self._validate_data(data)
def exec(self, data):
return self._process_data(data)
def post(self, shared, prep_res, exec_res):
shared["output"] = exec_res
return "default"
@trace_flow() # 🎉 就是这样!一行代码实现全面可观测性
class DataProcessingFlow(Flow):
def __init__(self):
super().__init__(start=DataProcessingNode())
差异显著:您的代码保持干净并专注于业务逻辑,而全面跟踪则自动在后台进行。
安装和设置
开始使用PocketFlow跟踪非常简单:
1. 安装包
pip install pocketflow-tracing
该包包含了所有必要的依赖项,包括Langfuse v2 SDK和用于配置管理的python-dotenv。
2. 配置环境
在项目根目录下创建一个.env
文件:
# Langfuse配置
LANGFUSE_SECRET_KEY=sk-lf-your-secret-key-here
LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key-here
LANGFUSE_HOST=https://your-self-hosted-langfuse
# 可选:开发时启用调试模式
POCKETFLOW_TRACING_DEBUG=true
获取Langfuse凭证:
- 在langfuse.com或您的自托管URL注册免费账户
- 创建一个新的项目
- 从项目设置中复制您的API密钥
- 使用
https://cloud.langfuse.com
作为托管版本,或您的自托管URL
3. 为您的流添加跟踪
from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow
# 您的现有节点代码保持不变
class MyNode(Node):
def prep(self, shared):
return shared["input"]
def exec(self, data):
return f"Processed: {data}"
def post(self, shared, prep_res, exec_res):
shared["output"] = exec_res
return "default"
# 只需添加装饰器 - 不需要其他更改!
@trace_flow()
class MyFlow(Flow):
def __init__(self):
super().__init__(start=MyNode())
# 正常运行您的流
flow = MyFlow()
shared = {"input": "Hello World"}
flow.run(shared)
就这样!您的工作流现在已完全跟踪并可观察。
完整的工作示例
让我们构建一个现实示例,展示PocketFlow跟踪的强大功能:
#!/usr/bin/env python3
"""
客户支持票证分析工作流
演示多节点AI工作流的全面跟踪。
"""
from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow
import time
import random
class TicketClassificationNode(Node):
"""根据优先级和类别对支持票证进行分类。"""
def prep(self, shared):
ticket = shared.get("ticket", {})
return {
"subject": ticket.get("subject", ""),
"description": ticket.get("description", ""),
"customer_tier": ticket.get("customer_tier", "standard")
}
def exec(self, ticket_data):
# 模拟AI分类
time.sleep(0.1) # 模拟处理时间
# 简单分类逻辑
subject = ticket_data["subject"].lower()
description = ticket_data["description"].lower()
if "urgent" in subject or "critical" in description:
priority = "high"
elif "billing" in subject or "payment" in description:
priority = "medium"
else:
priority = "low"
category = "technical" if "error" in description else "general"
return {
"priority": priority,
"category": category,
"confidence": random.uniform(0.8, 0.99)
}
def post(self, shared, prep_res, exec_res):
shared["classification"] = exec_res
return "route_ticket"
class TicketRoutingNode(Node):
"""根据分类将票证路由到适当的团队。"""
def prep(self, shared):
classification = shared.get("classification", {})
customer_tier = shared.get("ticket", {}).get("customer_tier", "standard")
return {
"priority": classification.get("priority", "low"),
"category": classification.get("category", "general"),
"customer_tier": customer_tier
}
def exec(self, routing_data):
# 模拟路由逻辑
time.sleep(0.05)
if routing_data["priority"] == "high":
team = "escalation"
elif routing_data["category"] == "technical":
team = "engineering"
elif routing_data["customer_tier"] == "premium":
team = "premium_support"
else:
team = "general_support"
return {
"assigned_team": team,
"estimated_response_time": "2 hours" if routing_data["priority"] == "high" else "24 hours"
}
def post(self, shared, prep_res, exec_res):
shared["routing"] = exec_res
return "default"
@trace_flow(flow_name="CustomerSupportTicketAnalysis")
class CustomerSupportFlow(Flow):
"""完整的客户支持票证分析工作流。"""
def __init__(self):
# 创建节点
classifier = TicketClassificationNode()
router = TicketRoutingNode()
# 定义流
classifier >> router
super().__init__(start=classifier)
def main():
"""运行客户支持工作流示例。"""
print("🎫 客户支持票证分析")
print("=" * 40)
# 示例票证数据
tickets = [
{
"id": "TICKET-001",
"subject": "URGENT: Payment processing error",
"description": "Customer cannot complete payment, getting error 500",
"customer_tier": "premium"
},
{
"id": "TICKET-002",
"subject": "Question about billing cycle",
"description": "When does my billing cycle reset?",
"customer_tier": "standard"
}
]
flow = CustomerSupportFlow()
for ticket in tickets:
print(f"\n📥 Processing {ticket['id']}: {ticket['subject']}")
shared = {"ticket": ticket}
result = flow.run(shared)
print(f"📊 Classification: {shared['classification']}")
print(f"🎯 Routing: {shared['routing']}")
print(f"✅ Result: {result}")
print(f"\n📈 在Langfuse仪表板中查看详细跟踪!")
if __name__ == "__main__":
main()
什么会被捕获
当您运行此示例时,PocketFlow跟踪会自动捕获:
流级别数据
- 执行元数据:开始时间、结束时间、总持续时间
- 输入状态:流开始时的完整共享数据
- 输出状态:流完成时的最终共享数据
- 流结果:流的最终返回值
- 错误处理:任何带有完整堆栈跟踪的异常
节点级别数据
对于每个节点(TicketClassificationNode、TicketRoutingNode),系统会跟踪:
prep() 阶段:
- 输入:完整的共享状态
- 输出:prep方法返回的准备数据
- 持续时间:确切执行时间
- 错误:prep阶段期间的任何异常
exec() 阶段:
- 输入:prep阶段的数据
- 输出:处理结果
- 持续时间:核心执行时间
- 重试信息:如果配置了重试
- 错误:带有上下文的处理失败
post() 阶段:
- 输入:共享状态、prep结果、exec结果
- 输出:流控制的动作字符串
- 持续时间:后处理时间
- 状态更改:共享数据如何被修改
- 错误:任何后处理问题
在Langfuse中查看您的跟踪
运行您的跟踪工作流后,打开您的Langfuse仪表板以探索丰富的可观测性数据:
仪表板概览
导航到您的Langfuse主机URL(例如,https://cloud.langfuse.com
),您将看到:
- 跟踪列表:所有您的流执行,带有时间戳和状态
- 性能指标:平均执行时间和成功率
- 错误跟踪:带有详细错误信息的失败执行
- 搜索和过滤:按名称、时间和状态查找特定跟踪
详细跟踪视图
点击任何跟踪以查看:
- 层次结构跨度视图:流 → 节点 → 阶段的树结构
- 时间线可视化:确切执行时间
- 输入/输出数据:完整数据流通过您的工作流
- 性能分解:每个阶段花费的时间
- 错误详细信息:失败时的堆栈跟踪和错误上下文
跟踪提供了前所未有的工作流执行可见性,使调试和优化变得简单。
高级配置
自定义流名称和元数据
@trace_flow(
flow_name="ProductionDataPipeline",
session_id="batch-2024-001",
user_id="data-team"
)
class DataPipeline(Flow):
pass
环境配置
from pocketflow_tracing import TracingConfig
# 加载自定义配置
config = TracingConfig.from_env()
config.debug = True
config.trace_inputs = True
config.trace_outputs = True
@trace_flow(config=config)
class MyFlow(Flow):
pass
选择性跟踪控制
# 精细调整要跟踪的内容
POCKETFLOW_TRACE_INPUTS=true
POCKETFLOW_TRACE_OUTPUTS=true
POCKETFLOW_TRACE_PREP=true
POCKETFLOW_TRACE_EXEC=true
POCKETFLOW_TRACE_POST=true
POCKETFLOW_TRACE_ERRORS=true
故障排除指南
常见问题及解决方案
1. “仪表板中没有跟踪出现”
# 启用调试模式查看发生了什么
export POCKETFLOW_TRACING_DEBUG=true
python your_flow.py
2. “Langfuse连接失败”
- 验证您的
.env
文件具有正确的凭证 - 检查到Langfuse主机的网络连接
- 确保Langfuse服务器正在运行(对于自托管)
3. “导入错误”
# 确保包已正确安装
pip install pocketflow-tracing
# 对于开发安装
pip install -e ".[dev]"
4. “性能影响担忧”
- 跟踪添加的开销极小(通常每个节点<5ms)
- 通过设置
POCKETFLOW_TRACING_DEBUG=false
禁用生产环境中的跟踪 - 使用选择性跟踪以减少数据量
调试模式输出
当启用调试模式时,您将看到详细的输出,例如:
[TRACING] 初始化Langfuse客户端...
[TRACING] 开始跟踪:CustomerSupportTicketAnalysis
[TRACING] 开始节点跨度:TicketClassificationNode.prep
[TRACING] 节点跨度完成,耗时0.12ms
[TRACING] 跟踪已刷新到Langfuse
生产考虑
性能影响
- 极小开销:跟踪通常每个节点执行增加1-5ms
- 异步处理:跟踪异步发送以避免阻塞
- 可配置深度:控制要跟踪的数据量
安全性和隐私
- 数据净化:为敏感数据实现自定义序列化器
- 环境分离:为开发/测试/生产使用不同的Langfuse项目
- 访问控制:利用Langfuse内置的用户管理
扩展考虑
- 批处理:批处理跟踪以高效传输
- 错误恢复:跟踪失败不会影响工作流执行
- 资源管理:自动清理跟踪数据
为什么这很重要
PocketFlow跟踪代表了我们处理AI工作流开发方式的根本转变:
之前:开发人员将30-40%的时间用于调试工作流,对执行流程和数据转换的可见性有限。
之后:开箱即用的完全可观测性使开发人员能够专注于构建更好的AI逻辑,而不是调试基础设施。
实际影响:
- 🚀 更快的开发:立即识别问题,而不是猜测
- 🔧 更容易调试:确切地看到发生了什么和何时发生
- 📈 性能优化:使用精确的计时数据识别瓶颈
- 🛡️ 生产信心:实时监控工作流
- 👥 团队协作:共享跟踪以进行协作调试
今天就开始
准备好通过全面的可观测性来转变您的PocketFlow工作流吗?
1. 安装包
pip install pocketflow-tracing
2. 设置环境
# 复制示例环境文件
curl -o .env https://raw.githubusercontent.com/The-Pocket/PocketFlow/main/cookbook/pocketflow-tracing/.env.example
# 使用您的Langfuse凭证编辑
nano .env
3. 为您的流添加一行
from pocketflow_tracing import trace_flow
@trace_flow() # ← 添加此行
class YourFlow(Flow):
# 您的现有代码保持不变!
pass
4. 运行并观察
执行您的流,并在您的Langfuse仪表板中观看神奇的效果。
加入社区
PocketFlow跟踪是开源的,我们欢迎贡献!无论您想:
- 🐛 报告错误或建议功能
- 📚 改进文档
- 🔧 添加新的跟踪功能
- 🎯 分享用例和示例
访问我们的GitHub仓库并加入正在增长的开发人员社区,他们正在构建可观测的AI工作流。
下一步是什么?
这只是开始。我们正在开发令人兴奋的功能,例如:
- 自定义指标:定义和跟踪特定领域的指标
- 实时警报:当工作流失败或性能不佳时获得通知
- A/B测试:使用内置实验比较不同的工作流版本
- 成本跟踪:监控API使用量和工作流的成本
- 集成生态系统:与流行的MLOps和监控工具连接
准备好让您的PocketFlow工作流完全可观测了吗? 今天安装pocketflow-tracing,体验全面、零摩擦