介绍PocketFlow跟踪:让您的AI工作流变得轻松可观测

✍️ 作者: Remy

将您的PocketFlow工作流从黑盒转变为完全可观察、可调试的系统,只需一行代码。


挑战:AI工作流的可观测性

构建AI工作流令人兴奋,但调试它们呢?没那么有趣。当您的PocketFlow工作流失败或行为异常时,您常常会问:

  • 哪个节点失败了,为什么?
  • 节点之间传输了哪些数据?
  • 每个步骤花费了多长时间?
  • 错误具体发生在何处?
  • 如何优化性能瓶颈?

传统的日志方法要求您手动为每个节点添加监控代码,这会使您的代码变得杂乱无章。您最终会花费更多时间在调试基础设施上,而不是构建实际的AI逻辑。

有没有更好的方法?

PocketFlow Tracing介绍

我们很高兴地宣布推出pocketflow-tracing,这是一个新包,它只需极少的代码更改即可为您的PocketFlow工作流带来企业级的可观测性。它基于Langfuse构建,提供了全面的跟踪、监控和调试功能,彻底改变了您开发和维护AI工作流的方式。

主要优点

🎯 零摩擦集成:只需一个@trace_flow()装饰器即可添加全面跟踪
📊 完全可见性:自动跟踪每个节点的执行、输入、输出和错误
性能洞察:跟踪执行时间并识别瓶颈
🔍 丰富的调试:在Langfuse的强大仪表板中查看详细跟踪
🚀 生产就绪:适用于开发和生产环境
🔄 异步支持:与AsyncFlow和AsyncNode完全兼容

前后对比:看看差异

前:手动日志噩梦

import logging
import time
from pocketflow import Node, Flow

logger = logging.getLogger(__name__)

class DataProcessingNode(Node):
    def prep(self, shared):
        start_time = time.time()
        logger.info(f"Starting prep phase with input: {shared}")
        try:
            data = shared.get("input_data")
            result = self._validate_data(data)
            logger.info(f"Prep completed in {time.time() - start_time:.2f}s")
            return result
        except Exception as e:
            logger.error(f"Prep failed: {e}")
            raise
    
    def exec(self, data):
        start_time = time.time()
        logger.info(f"Starting exec phase with data: {data}")
        try:
            processed = self._process_data(data)
            logger.info(f"Exec completed in {time.time() - start_time:.2f}s")
            return processed
        except Exception as e:
            logger.error(f"Exec failed: {e}")
            raise
    
    def post(self, shared, prep_res, exec_res):
        start_time = time.time()
        logger.info(f"Starting post phase")
        try:
            shared["output"] = exec_res
            logger.info(f"Post completed in {time.time() - start_time:.2f}s")
            return "default"
        except Exception as e:
            logger.error(f"Post failed: {e}")
            raise

class DataProcessingFlow(Flow):
    def __init__(self):
        super().__init__(start=DataProcessingNode())

后:干净的代码和自动跟踪

from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow

class DataProcessingNode(Node):
    def prep(self, shared):
        data = shared.get("input_data")
        return self._validate_data(data)
    
    def exec(self, data):
        return self._process_data(data)
    
    def post(self, shared, prep_res, exec_res):
        shared["output"] = exec_res
        return "default"

@trace_flow()  # 🎉 就是这样!一行代码实现全面可观测性
class DataProcessingFlow(Flow):
    def __init__(self):
        super().__init__(start=DataProcessingNode())

差异显著:您的代码保持干净并专注于业务逻辑,而全面跟踪则自动在后台进行。

安装和设置

开始使用PocketFlow跟踪非常简单:

1. 安装包

pip install pocketflow-tracing

该包包含了所有必要的依赖项,包括Langfuse v2 SDK和用于配置管理的python-dotenv。

2. 配置环境

在项目根目录下创建一个.env文件:

# Langfuse配置
LANGFUSE_SECRET_KEY=sk-lf-your-secret-key-here
LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key-here
LANGFUSE_HOST=https://your-self-hosted-langfuse

# 可选:开发时启用调试模式
POCKETFLOW_TRACING_DEBUG=true

获取Langfuse凭证:

  1. langfuse.com或您的自托管URL注册免费账户
  2. 创建一个新的项目
  3. 从项目设置中复制您的API密钥
  4. 使用https://cloud.langfuse.com作为托管版本,或您的自托管URL

3. 为您的流添加跟踪

from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow

# 您的现有节点代码保持不变
class MyNode(Node):
    def prep(self, shared):
        return shared["input"]
    
    def exec(self, data):
        return f"Processed: {data}"
    
    def post(self, shared, prep_res, exec_res):
        shared["output"] = exec_res
        return "default"

# 只需添加装饰器 - 不需要其他更改!
@trace_flow()
class MyFlow(Flow):
    def __init__(self):
        super().__init__(start=MyNode())

# 正常运行您的流
flow = MyFlow()
shared = {"input": "Hello World"}
flow.run(shared)

就这样!您的工作流现在已完全跟踪并可观察。

完整的工作示例

让我们构建一个现实示例,展示PocketFlow跟踪的强大功能:

#!/usr/bin/env python3
"""
客户支持票证分析工作流
演示多节点AI工作流的全面跟踪。
"""

from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow
import time
import random

class TicketClassificationNode(Node):
    """根据优先级和类别对支持票证进行分类。"""
    
    def prep(self, shared):
        ticket = shared.get("ticket", {})
        return {
            "subject": ticket.get("subject", ""),
            "description": ticket.get("description", ""),
            "customer_tier": ticket.get("customer_tier", "standard")
        }
    
    def exec(self, ticket_data):
        # 模拟AI分类
        time.sleep(0.1)  # 模拟处理时间
        
        # 简单分类逻辑
        subject = ticket_data["subject"].lower()
        description = ticket_data["description"].lower()
        
        if "urgent" in subject or "critical" in description:
            priority = "high"
        elif "billing" in subject or "payment" in description:
            priority = "medium"
        else:
            priority = "low"
            
        category = "technical" if "error" in description else "general"
        
        return {
            "priority": priority,
            "category": category,
            "confidence": random.uniform(0.8, 0.99)
        }
    
    def post(self, shared, prep_res, exec_res):
        shared["classification"] = exec_res
        return "route_ticket"

class TicketRoutingNode(Node):
    """根据分类将票证路由到适当的团队。"""
    
    def prep(self, shared):
        classification = shared.get("classification", {})
        customer_tier = shared.get("ticket", {}).get("customer_tier", "standard")
        return {
            "priority": classification.get("priority", "low"),
            "category": classification.get("category", "general"),
            "customer_tier": customer_tier
        }
    
    def exec(self, routing_data):
        # 模拟路由逻辑
        time.sleep(0.05)
        
        if routing_data["priority"] == "high":
            team = "escalation"
        elif routing_data["category"] == "technical":
            team = "engineering"
        elif routing_data["customer_tier"] == "premium":
            team = "premium_support"
        else:
            team = "general_support"
            
        return {
            "assigned_team": team,
            "estimated_response_time": "2 hours" if routing_data["priority"] == "high" else "24 hours"
        }
    
    def post(self, shared, prep_res, exec_res):
        shared["routing"] = exec_res
        return "default"

@trace_flow(flow_name="CustomerSupportTicketAnalysis")
class CustomerSupportFlow(Flow):
    """完整的客户支持票证分析工作流。"""
    
    def __init__(self):
        # 创建节点
        classifier = TicketClassificationNode()
        router = TicketRoutingNode()
        
        # 定义流
        classifier >> router
        
        super().__init__(start=classifier)

def main():
    """运行客户支持工作流示例。"""
    print("🎫 客户支持票证分析")
    print("=" * 40)
    
    # 示例票证数据
    tickets = [
        {
            "id": "TICKET-001",
            "subject": "URGENT: Payment processing error",
            "description": "Customer cannot complete payment, getting error 500",
            "customer_tier": "premium"
        },
        {
            "id": "TICKET-002", 
            "subject": "Question about billing cycle",
            "description": "When does my billing cycle reset?",
            "customer_tier": "standard"
        }
    ]
    
    flow = CustomerSupportFlow()
    
    for ticket in tickets:
        print(f"\n📥 Processing {ticket['id']}: {ticket['subject']}")
        
        shared = {"ticket": ticket}
        result = flow.run(shared)
        
        print(f"📊 Classification: {shared['classification']}")
        print(f"🎯 Routing: {shared['routing']}")
        print(f"✅ Result: {result}")
    
    print(f"\n📈 在Langfuse仪表板中查看详细跟踪!")

if __name__ == "__main__":
    main()

什么会被捕获

当您运行此示例时,PocketFlow跟踪会自动捕获:

流级别数据

  • 执行元数据:开始时间、结束时间、总持续时间
  • 输入状态:流开始时的完整共享数据
  • 输出状态:流完成时的最终共享数据
  • 流结果:流的最终返回值
  • 错误处理:任何带有完整堆栈跟踪的异常

节点级别数据

对于每个节点(TicketClassificationNode、TicketRoutingNode),系统会跟踪:

prep() 阶段:

  • 输入:完整的共享状态
  • 输出:prep方法返回的准备数据
  • 持续时间:确切执行时间
  • 错误:prep阶段期间的任何异常

exec() 阶段:

  • 输入:prep阶段的数据
  • 输出:处理结果
  • 持续时间:核心执行时间
  • 重试信息:如果配置了重试
  • 错误:带有上下文的处理失败

post() 阶段:

  • 输入:共享状态、prep结果、exec结果
  • 输出:流控制的动作字符串
  • 持续时间:后处理时间
  • 状态更改:共享数据如何被修改
  • 错误:任何后处理问题

在Langfuse中查看您的跟踪

运行您的跟踪工作流后,打开您的Langfuse仪表板以探索丰富的可观测性数据:

仪表板概览

导航到您的Langfuse主机URL(例如,https://cloud.langfuse.com),您将看到:

  • 跟踪列表:所有您的流执行,带有时间戳和状态
  • 性能指标:平均执行时间和成功率
  • 错误跟踪:带有详细错误信息的失败执行
  • 搜索和过滤:按名称、时间和状态查找特定跟踪

详细跟踪视图

点击任何跟踪以查看:

  • 层次结构跨度视图:流 → 节点 → 阶段的树结构
  • 时间线可视化:确切执行时间
  • 输入/输出数据:完整数据流通过您的工作流
  • 性能分解:每个阶段花费的时间
  • 错误详细信息:失败时的堆栈跟踪和错误上下文

跟踪提供了前所未有的工作流执行可见性,使调试和优化变得简单。

高级配置

自定义流名称和元数据

@trace_flow(
    flow_name="ProductionDataPipeline",
    session_id="batch-2024-001", 
    user_id="data-team"
)
class DataPipeline(Flow):
    pass

环境配置

from pocketflow_tracing import TracingConfig

# 加载自定义配置
config = TracingConfig.from_env()
config.debug = True
config.trace_inputs = True
config.trace_outputs = True

@trace_flow(config=config)
class MyFlow(Flow):
    pass

选择性跟踪控制

# 精细调整要跟踪的内容
POCKETFLOW_TRACE_INPUTS=true
POCKETFLOW_TRACE_OUTPUTS=true
POCKETFLOW_TRACE_PREP=true
POCKETFLOW_TRACE_EXEC=true
POCKETFLOW_TRACE_POST=true
POCKETFLOW_TRACE_ERRORS=true

故障排除指南

常见问题及解决方案

1. “仪表板中没有跟踪出现”

# 启用调试模式查看发生了什么
export POCKETFLOW_TRACING_DEBUG=true
python your_flow.py

2. “Langfuse连接失败”

  • 验证您的.env文件具有正确的凭证
  • 检查到Langfuse主机的网络连接
  • 确保Langfuse服务器正在运行(对于自托管)

3. “导入错误”

# 确保包已正确安装
pip install pocketflow-tracing

# 对于开发安装
pip install -e ".[dev]"

4. “性能影响担忧”

  • 跟踪添加的开销极小(通常每个节点<5ms)
  • 通过设置POCKETFLOW_TRACING_DEBUG=false禁用生产环境中的跟踪
  • 使用选择性跟踪以减少数据量

调试模式输出

当启用调试模式时,您将看到详细的输出,例如:

[TRACING] 初始化Langfuse客户端...
[TRACING] 开始跟踪:CustomerSupportTicketAnalysis
[TRACING] 开始节点跨度:TicketClassificationNode.prep
[TRACING] 节点跨度完成,耗时0.12ms
[TRACING] 跟踪已刷新到Langfuse

生产考虑

性能影响

  • 极小开销:跟踪通常每个节点执行增加1-5ms
  • 异步处理:跟踪异步发送以避免阻塞
  • 可配置深度:控制要跟踪的数据量

安全性和隐私

  • 数据净化:为敏感数据实现自定义序列化器
  • 环境分离:为开发/测试/生产使用不同的Langfuse项目
  • 访问控制:利用Langfuse内置的用户管理

扩展考虑

  • 批处理:批处理跟踪以高效传输
  • 错误恢复:跟踪失败不会影响工作流执行
  • 资源管理:自动清理跟踪数据

为什么这很重要

PocketFlow跟踪代表了我们处理AI工作流开发方式的根本转变:

之前:开发人员将30-40%的时间用于调试工作流,对执行流程和数据转换的可见性有限。

之后:开箱即用的完全可观测性使开发人员能够专注于构建更好的AI逻辑,而不是调试基础设施。

实际影响

  • 🚀 更快的开发:立即识别问题,而不是猜测
  • 🔧 更容易调试:确切地看到发生了什么和何时发生
  • 📈 性能优化:使用精确的计时数据识别瓶颈
  • 🛡️ 生产信心:实时监控工作流
  • 👥 团队协作:共享跟踪以进行协作调试

今天就开始

准备好通过全面的可观测性来转变您的PocketFlow工作流吗?

1. 安装包

pip install pocketflow-tracing

2. 设置环境

# 复制示例环境文件
curl -o .env https://raw.githubusercontent.com/The-Pocket/PocketFlow/main/cookbook/pocketflow-tracing/.env.example

# 使用您的Langfuse凭证编辑
nano .env

3. 为您的流添加一行

from pocketflow_tracing import trace_flow

@trace_flow()  # ← 添加此行
class YourFlow(Flow):
    # 您的现有代码保持不变!
    pass

4. 运行并观察

执行您的流,并在您的Langfuse仪表板中观看神奇的效果。

加入社区

PocketFlow跟踪是开源的,我们欢迎贡献!无论您想:

  • 🐛 报告错误或建议功能
  • 📚 改进文档
  • 🔧 添加新的跟踪功能
  • 🎯 分享用例和示例

访问我们的GitHub仓库并加入正在增长的开发人员社区,他们正在构建可观测的AI工作流。

下一步是什么?

这只是开始。我们正在开发令人兴奋的功能,例如:

  • 自定义指标:定义和跟踪特定领域的指标
  • 实时警报:当工作流失败或性能不佳时获得通知
  • A/B测试:使用内置实验比较不同的工作流版本
  • 成本跟踪:监控API使用量和工作流的成本
  • 集成生态系统:与流行的MLOps和监控工具连接

准备好让您的PocketFlow工作流完全可观测了吗? 今天安装pocketflow-tracing,体验全面、零摩擦