Introducing PocketFlow Tracing: Effortless Observability for Your AI Workflows
Transform your PocketFlow workflows from black boxes into fully observable, debuggable systems with just one line of code.
The Challenge: Observability in AI Workflows
Building AI workflows is exciting, but debugging them? Not so much. When your PocketFlow workflow fails or behaves unexpectedly, you’re often left asking:
- Which node failed and why?
- What data was flowing between nodes?
- How long did each step take?
- Where exactly did the error occur?
- How can I optimize performance bottlenecks?
Traditional logging approaches require you to manually instrument every node, cluttering your code with observability concerns. You end up spending more time on debugging infrastructure than building your actual AI logic.
What if there was a better way?
Introducing PocketFlow Tracing
We’re excited to announce pocketflow-tracing, a new package that brings enterprise-grade observability to your PocketFlow workflows with minimal code changes. Built on top of Langfuse, it provides comprehensive tracing, monitoring, and debugging capabilities that transform how you develop and maintain AI workflows.
Key Benefits
🎯 Zero-Friction Integration: Add comprehensive tracing with a single @trace_flow() decorator
📊 Complete Visibility: Automatically trace every node execution, input, output, and error
⚡ Performance Insights: Track execution times and identify bottlenecks
🔍 Rich Debugging: View detailed traces in Langfuse’s powerful dashboard
🚀 Production Ready: Built for both development and production environments
🔄 Async Support: Full compatibility with AsyncFlow and AsyncNode (a quick sketch follows this list)
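That last point is worth a quick illustration: tracing an async workflow looks exactly like the sync case. This is a minimal sketch, assuming PocketFlow's async API names (AsyncNode with prep_async/exec_async/post_async, AsyncFlow with run_async); the fetch logic is a placeholder.

import asyncio

from pocketflow import AsyncFlow, AsyncNode
from pocketflow_tracing import trace_flow


class FetchPageNode(AsyncNode):
    async def prep_async(self, shared):
        return shared["url"]

    async def exec_async(self, url):
        await asyncio.sleep(0.1)  # stand-in for an async HTTP call
        return f"fetched {url}"

    async def post_async(self, shared, prep_res, exec_res):
        shared["page"] = exec_res
        return "default"


@trace_flow()  # same decorator, async flow
class FetchFlow(AsyncFlow):
    def __init__(self):
        super().__init__(start=FetchPageNode())


asyncio.run(FetchFlow().run_async({"url": "https://example.com"}))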
Before and After: See the Difference
Before: Manual Logging Nightmare
import logging
import time

from pocketflow import Node, Flow

logger = logging.getLogger(__name__)


class DataProcessingNode(Node):
    def prep(self, shared):
        start_time = time.time()
        logger.info(f"Starting prep phase with input: {shared}")
        try:
            data = shared.get("input_data")
            result = self._validate_data(data)
            logger.info(f"Prep completed in {time.time() - start_time:.2f}s")
            return result
        except Exception as e:
            logger.error(f"Prep failed: {e}")
            raise

    def exec(self, data):
        start_time = time.time()
        logger.info(f"Starting exec phase with data: {data}")
        try:
            processed = self._process_data(data)
            logger.info(f"Exec completed in {time.time() - start_time:.2f}s")
            return processed
        except Exception as e:
            logger.error(f"Exec failed: {e}")
            raise

    def post(self, shared, prep_res, exec_res):
        start_time = time.time()
        logger.info("Starting post phase")
        try:
            shared["output"] = exec_res
            logger.info(f"Post completed in {time.time() - start_time:.2f}s")
            return "default"
        except Exception as e:
            logger.error(f"Post failed: {e}")
            raise


class DataProcessingFlow(Flow):
    def __init__(self):
        super().__init__(start=DataProcessingNode())
After: Clean Code with Automatic Tracing
from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow


class DataProcessingNode(Node):
    def prep(self, shared):
        data = shared.get("input_data")
        return self._validate_data(data)

    def exec(self, data):
        return self._process_data(data)

    def post(self, shared, prep_res, exec_res):
        shared["output"] = exec_res
        return "default"


@trace_flow()  # 🎉 That's it! Full observability with one line
class DataProcessingFlow(Flow):
    def __init__(self):
        super().__init__(start=DataProcessingNode())
The difference is striking: Your code stays clean and focused on business logic, while comprehensive tracing happens automatically behind the scenes.
Installation and Setup
Getting started with PocketFlow tracing is incredibly simple:
1. Install the Package
pip install pocketflow-tracing
The package comes with all necessary dependencies, including Langfuse v2 SDK and python-dotenv for configuration management.
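To confirm the install and see which dependencies were pulled in, you can inspect the package with pip:

pip show pocketflow-tracing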
2. Configure Your Environment
Create a .env file in your project root:
# Langfuse Configuration
LANGFUSE_SECRET_KEY=sk-lf-your-secret-key-here
LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key-here
LANGFUSE_HOST=https://your-self-hosted-langfuse
# Optional: Enable debug mode for development
POCKETFLOW_TRACING_DEBUG=true
Getting Langfuse Credentials:
- Sign up for a free account at langfuse.com, or use your self-hosted instance
- Create a new project
- Copy your API keys from the project settings
- Set LANGFUSE_HOST to https://cloud.langfuse.com for the hosted version, or to your self-hosted URL
3. Add Tracing to Your Flow
from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow


# Your existing node code stays exactly the same
class MyNode(Node):
    def prep(self, shared):
        return shared["input"]

    def exec(self, data):
        return f"Processed: {data}"

    def post(self, shared, prep_res, exec_res):
        shared["output"] = exec_res
        return "default"


# Just add the decorator - no other changes needed!
@trace_flow()
class MyFlow(Flow):
    def __init__(self):
        super().__init__(start=MyNode())


# Run your flow normally
flow = MyFlow()
shared = {"input": "Hello World"}
flow.run(shared)
That’s it! Your workflow is now fully traced and observable.
Complete Working Example
Let’s build a realistic example that demonstrates the power of PocketFlow tracing:
#!/usr/bin/env python3
"""
Customer Support Ticket Analysis Workflow
Demonstrates comprehensive tracing of a multi-node AI workflow.
"""
import random
import time

from pocketflow import Node, Flow
from pocketflow_tracing import trace_flow


class TicketClassificationNode(Node):
    """Classifies support tickets by priority and category."""

    def prep(self, shared):
        ticket = shared.get("ticket", {})
        return {
            "subject": ticket.get("subject", ""),
            "description": ticket.get("description", ""),
            "customer_tier": ticket.get("customer_tier", "standard"),
        }

    def exec(self, ticket_data):
        # Simulate AI classification
        time.sleep(0.1)  # Simulate processing time

        # Simple classification logic
        subject = ticket_data["subject"].lower()
        description = ticket_data["description"].lower()

        if "urgent" in subject or "critical" in description:
            priority = "high"
        elif "billing" in subject or "payment" in description:
            priority = "medium"
        else:
            priority = "low"

        category = "technical" if "error" in description else "general"

        return {
            "priority": priority,
            "category": category,
            "confidence": random.uniform(0.8, 0.99),
        }

    def post(self, shared, prep_res, exec_res):
        shared["classification"] = exec_res
        return "route_ticket"


class TicketRoutingNode(Node):
    """Routes tickets to appropriate teams based on classification."""

    def prep(self, shared):
        classification = shared.get("classification", {})
        customer_tier = shared.get("ticket", {}).get("customer_tier", "standard")
        return {
            "priority": classification.get("priority", "low"),
            "category": classification.get("category", "general"),
            "customer_tier": customer_tier,
        }

    def exec(self, routing_data):
        # Simulate routing logic
        time.sleep(0.05)

        if routing_data["priority"] == "high":
            team = "escalation"
        elif routing_data["category"] == "technical":
            team = "engineering"
        elif routing_data["customer_tier"] == "premium":
            team = "premium_support"
        else:
            team = "general_support"

        return {
            "assigned_team": team,
            "estimated_response_time": "2 hours" if routing_data["priority"] == "high" else "24 hours",
        }

    def post(self, shared, prep_res, exec_res):
        shared["routing"] = exec_res
        return "default"


@trace_flow(flow_name="CustomerSupportTicketAnalysis")
class CustomerSupportFlow(Flow):
    """Complete customer support ticket analysis workflow."""

    def __init__(self):
        # Create nodes
        classifier = TicketClassificationNode()
        router = TicketRoutingNode()

        # Define flow: the classifier's post() returns "route_ticket",
        # so wire that named action explicitly to the router
        classifier - "route_ticket" >> router

        super().__init__(start=classifier)


def main():
    """Run the customer support workflow example."""
    print("🎫 Customer Support Ticket Analysis")
    print("=" * 40)

    # Sample ticket data
    tickets = [
        {
            "id": "TICKET-001",
            "subject": "URGENT: Payment processing error",
            "description": "Customer cannot complete payment, getting error 500",
            "customer_tier": "premium",
        },
        {
            "id": "TICKET-002",
            "subject": "Question about billing cycle",
            "description": "When does my billing cycle reset?",
            "customer_tier": "standard",
        },
    ]

    flow = CustomerSupportFlow()

    for ticket in tickets:
        print(f"\n📥 Processing {ticket['id']}: {ticket['subject']}")
        shared = {"ticket": ticket}
        result = flow.run(shared)

        print(f"📊 Classification: {shared['classification']}")
        print(f"🎯 Routing: {shared['routing']}")
        print(f"✅ Result: {result}")

    print("\n📈 Check your Langfuse dashboard to see detailed traces!")


if __name__ == "__main__":
    main()
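With the route_ticket transition wired explicitly as above, running the script should print output along these lines (the confidence value is a random float that varies between runs, and the final result echoes the last action returned by the flow):

🎫 Customer Support Ticket Analysis
========================================

📥 Processing TICKET-001: URGENT: Payment processing error
📊 Classification: {'priority': 'high', 'category': 'technical', 'confidence': 0.9137428195032943}
🎯 Routing: {'assigned_team': 'escalation', 'estimated_response_time': '2 hours'}
✅ Result: default

📥 Processing TICKET-002: Question about billing cycle
📊 Classification: {'priority': 'medium', 'category': 'general', 'confidence': 0.8524109276301168}
🎯 Routing: {'assigned_team': 'general_support', 'estimated_response_time': '24 hours'}
✅ Result: default

📈 Check your Langfuse dashboard to see detailed traces!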
What Gets Captured
When you run this example, PocketFlow tracing automatically captures:
Flow-Level Data
- Execution Metadata: Start time, end time, total duration
- Input State: Complete shared data when flow begins
- Output State: Final shared data when flow completes
- Flow Result: The final return value from the flow
- Error Handling: Any exceptions with full stack traces
Node-Level Data
For each node (TicketClassificationNode, TicketRoutingNode), the system traces:
prep() Phase:
- Input: Complete shared state
- Output: Prepared data returned by prep method
- Duration: Exact execution time
- Errors: Any exceptions during preparation
exec() Phase:
- Input: Data from prep phase
- Output: Processing results
- Duration: Core execution time
- Retry Information: Retry behavior, if retries are configured (a retry setup is sketched after this list)
- Errors: Processing failures with context
post() Phase:
- Input: Shared state, prep result, exec result
- Output: Action string for flow control
- Duration: Post-processing time
- State Changes: How shared data was modified
- Errors: Any post-processing issues
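Retries are configured on the node itself rather than in the tracing layer. A minimal sketch, assuming PocketFlow's built-in max_retries/wait parameters and exec_fallback hook; call_flaky_api is a hypothetical helper standing in for an unreliable call:

from pocketflow import Node


class FlakyApiNode(Node):
    def __init__(self):
        # Retry exec() up to 3 times, waiting 1 second between attempts
        super().__init__(max_retries=3, wait=1)

    def exec(self, prep_res):
        return call_flaky_api(prep_res)  # hypothetical helper that may raise

    def exec_fallback(self, prep_res, exc):
        # Runs only after the final retry fails, so the flow can continue
        return {"error": str(exc)}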
Viewing Your Traces in Langfuse
After running your traced workflows, open your Langfuse dashboard to explore the rich observability data:
Dashboard Overview
Navigate to your Langfuse host URL (e.g., https://cloud.langfuse.com) and you'll see:
- Traces List: All your flow executions with timestamps and status
- Performance Metrics: Average execution times and success rates
- Error Tracking: Failed executions with detailed error information
- Search and Filtering: Find specific traces by name, time, or status
Detailed Trace View
Click on any trace to see:
- Hierarchical Span View: Flow → Nodes → Phases in a tree structure
- Timeline Visualization: See exactly when each phase executed
- Input/Output Data: Complete data flow through your workflow
- Performance Breakdown: Time spent in each phase
- Error Details: Stack traces and error context when failures occur
The tracing provides unprecedented visibility into your workflow execution, making debugging and optimization straightforward.
Advanced Configuration
Custom Flow Names and Metadata
@trace_flow(
    flow_name="ProductionDataPipeline",
    session_id="batch-2024-001",
    user_id="data-team"
)
class DataPipeline(Flow):
    pass
Environment-Based Configuration
from pocketflow_tracing import TracingConfig

# Load custom configuration
config = TracingConfig.from_env()
config.debug = True
config.trace_inputs = True
config.trace_outputs = True

@trace_flow(config=config)
class MyFlow(Flow):
    pass
Selective Tracing Control
# Fine-tune what gets traced
POCKETFLOW_TRACE_INPUTS=true
POCKETFLOW_TRACE_OUTPUTS=true
POCKETFLOW_TRACE_PREP=true
POCKETFLOW_TRACE_EXEC=true
POCKETFLOW_TRACE_POST=true
POCKETFLOW_TRACE_ERRORS=true
Troubleshooting Guide
Common Issues and Solutions
1. “No traces appearing in dashboard”
# Enable debug mode to see what's happening
export POCKETFLOW_TRACING_DEBUG=true
python your_flow.py
2. “Langfuse connection failed”
- Verify your .env file has correct credentials
- Check network connectivity to the Langfuse host
- Ensure the Langfuse server is running (for self-hosted deployments)
- If all of that looks right, try the quick credential check below
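If the credentials look correct but traces still don't arrive, a quick sanity check against the Langfuse API can narrow things down. A minimal sketch, assuming the Langfuse v2 Python SDK (installed as a dependency) and a .env file in the working directory:

from dotenv import load_dotenv
from langfuse import Langfuse

load_dotenv()  # pick up LANGFUSE_SECRET_KEY / LANGFUSE_PUBLIC_KEY / LANGFUSE_HOST

client = Langfuse()  # reads the LANGFUSE_* variables from the environment
print("Credentials valid:", client.auth_check())  # True when keys and host are reachable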
3. “Import errors”
# Ensure package is properly installed
pip install pocketflow-tracing
# For development installation
pip install -e ".[dev]"
4. “Performance impact concerns”
- Tracing adds minimal overhead (typically <5ms per node)
- Disable debug output in production by setting POCKETFLOW_TRACING_DEBUG=false
- Use selective tracing to reduce data volume
Debug Mode Output
When debug mode is enabled, you’ll see detailed output like:
[TRACING] Initializing Langfuse client...
[TRACING] Starting trace: CustomerSupportTicketAnalysis
[TRACING] Starting node span: TicketClassificationNode.prep
[TRACING] Node span completed in 0.12ms
[TRACING] Trace flushed to Langfuse
Production Considerations
Performance Impact
- Minimal Overhead: Tracing typically adds 1-5ms per node execution
- Async Processing: Traces are sent asynchronously to avoid blocking
- Configurable Depth: Control what data gets traced to manage volume
Security and Privacy
- Data Sanitization: Implement custom serializers for sensitive data (a simple pattern is sketched after this list)
- Environment Separation: Use different Langfuse projects for dev/staging/prod
- Access Control: Leverage Langfuse’s built-in user management
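The custom-serializer hooks are beyond the scope of this post, but one pattern that needs no package support is to keep secrets out of the shared store and out of phase results, since those are what end up in traces. A minimal sketch; call_llm and the environment variable name are illustrative:

import os

from pocketflow import Node


class CallLlmNode(Node):
    def prep(self, shared):
        # Only non-sensitive data flows through shared, so only that is traced
        return shared["prompt"]

    def exec(self, prompt):
        api_key = os.environ["LLM_API_KEY"]  # read the secret here; it never enters shared
        return call_llm(prompt, api_key)     # hypothetical helper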
Scaling Considerations
- Batch Processing: Traces are batched for efficient transmission
- Error Resilience: Tracing failures don’t affect workflow execution
- Resource Management: Automatic cleanup of trace data
Why This Matters
PocketFlow tracing represents a fundamental shift in how we approach AI workflow development:
Before: Developers spent 30-40% of their time debugging workflows with limited visibility into execution flow and data transformations.
After: Complete observability out-of-the-box enables developers to focus on building better AI logic instead of debugging infrastructure.
Real Impact:
- 🚀 Faster Development: Identify issues immediately instead of guessing
- 🔧 Easier Debugging: See exactly what happened and when
- 📈 Performance Optimization: Identify bottlenecks with precise timing data
- 🛡️ Production Confidence: Monitor workflows in real-time
- 👥 Team Collaboration: Share traces for collaborative debugging
Get Started Today
Ready to transform your PocketFlow workflows with comprehensive observability?
1. Install the Package
pip install pocketflow-tracing
2. Set Up Your Environment
# Copy the example environment file
curl -o .env https://raw.githubusercontent.com/The-Pocket/PocketFlow/main/cookbook/pocketflow-tracing/.env.example
# Edit with your Langfuse credentials
nano .env
3. Add One Line to Your Flow
from pocketflow_tracing import trace_flow
@trace_flow()  # ← Add this line
class YourFlow(Flow):
    # Your existing code stays the same!
    pass
4. Run and Observe
Execute your flow and watch the magic happen in your Langfuse dashboard.
Join the Community
PocketFlow tracing is open source and we welcome contributions! Whether you want to:
- 🐛 Report bugs or suggest features
- 📚 Improve documentation
- 🔧 Add new tracing capabilities
- 🎯 Share use cases and examples
Visit our GitHub repository and join the growing community of developers building observable AI workflows.
What’s Next?
This is just the beginning. We’re working on exciting features like:
- Custom Metrics: Define and track domain-specific metrics
- Real-time Alerts: Get notified when workflows fail or perform poorly
- A/B Testing: Compare different workflow versions with built-in experimentation
- Cost Tracking: Monitor API usage and costs across your workflows
- Integration Ecosystem: Connect with popular MLOps and monitoring tools
Ready to make your PocketFlow workflows fully observable? Install pocketflow-tracing today and experience the difference that comprehensive, zero-friction observability makes in your AI development workflow.
Happy tracing! 🚀