# Reporting

Workflow reporting provides comprehensive execution metrics, performance analysis, and real-time monitoring.

## Core Concept

The reporting system automatically tracks workflow execution and provides detailed insights:

```python
from egregore.core.workflow import node, Sequence

@node("processor")
def processor(data: dict) -> dict:
    return {"processed": data}

# Create and run workflow
workflow = Sequence(processor)
result = workflow.run({"data": "value"})

# Access reporting system
reporting = workflow.controller.reporting

# Get execution metrics
metrics = reporting.get_execution_metrics()
print(f"Success rate: {metrics.success_rate * 100}%")
print(f"Average duration: {metrics.average_node_duration:.3f}s")
```
## Accessing Reports

### Via Controller

```python
# Access through workflow controller
workflow = Sequence(node1, node2, node3)
workflow.run(data)

# Get reporting system
reporting = workflow.controller.reporting

# Get various reports
metrics = reporting.get_execution_metrics()
performance = reporting.get_performance_summary()
status = reporting.get_execution_status()
```
## Execution Metrics

### WorkflowMetrics

Comprehensive execution statistics:

```python
metrics = reporting.get_execution_metrics()

# Basic counts
print(f"Total nodes executed: {metrics.total_nodes_executed}")
print(f"Successful: {metrics.successful_executions}")
print(f"Failed: {metrics.failed_executions}")
print(f"Success rate: {metrics.success_rate * 100:.1f}%")

# Timing metrics
print(f"Total time: {metrics.total_execution_time:.3f}s")
print(f"Average duration: {metrics.average_node_duration:.3f}s")
print(f"Min duration: {metrics.min_node_duration:.3f}s")
print(f"Max duration: {metrics.max_node_duration:.3f}s")

# Node statistics
print(f"Unique nodes: {metrics.unique_nodes_count}")
print(f"Execution counts: {metrics.node_execution_counts}")

# Timing
print(f"Started: {metrics.execution_start_time}")
print(f"Ended: {metrics.execution_end_time}")
```

```python
@dataclass
class WorkflowMetrics:
    total_nodes_executed: int
    successful_executions: int
    failed_executions: int
    success_rate: float
    total_execution_time: float
    average_node_duration: float
    min_node_duration: float
    max_node_duration: float
    unique_nodes_count: int
    execution_start_time: Optional[datetime]
    execution_end_time: Optional[datetime]
    node_execution_counts: Dict[str, int]
```
## Performance Analysis

### PerformanceSummary

Detailed performance breakdown:

```python
performance = reporting.get_performance_summary()

# Overall statistics
print(f"Average: {performance.average_node_duration:.3f}s")
print(f"Median: {performance.median_node_duration:.3f}s")
print(f"Total time: {performance.total_workflow_time:.3f}s")

# Slowest nodes
print("Slowest nodes:")
for node_name, avg_duration in performance.slowest_nodes:
    print(f"  {node_name}: {avg_duration:.3f}s")

# Fastest nodes
print("Fastest nodes:")
for node_name, avg_duration in performance.fastest_nodes:
    print(f"  {node_name}: {avg_duration:.3f}s")

# Bottlenecks
if performance.bottlenecks:
    print("Bottlenecks:")
    for node in performance.bottlenecks:
        print(f"  - {node}")

# Outliers
if performance.performance_outliers:
    print(f"Found {len(performance.performance_outliers)} outliers")
```

```python
@dataclass
class PerformanceSummary:
    average_node_duration: float
    median_node_duration: float
    slowest_nodes: List[tuple]   # [(node_name, avg_duration), ...]
    fastest_nodes: List[tuple]
    bottlenecks: List[str]
    performance_outliers: List[ExecutionEntry]
    total_workflow_time: float
    nodes_analyzed: int
```
## Real-Time Status

### ExecutionStatus

Current workflow execution state:

```python
status = reporting.get_execution_status()

# Controller state
print(f"State: {status.controller_state}")
# States: ready, running, paused, stopped, completed, error

# Execution path
print(f"Current path: {status.current_execution_path}")
print(f"Depth: {status.execution_depth}")

# Progress
print(f"Total executions: {status.total_executions}")
print(f"Running time: {status.running_time:.2f}s")

# Current node
print(f"Last executed: {status.last_executed_node}")
print(f"Current node: {status.current_node}")
print(f"Active: {status.is_active}")
```

```python
@dataclass
class ExecutionStatus:
    controller_state: str
    current_execution_path: List[str]
    execution_depth: int
    total_executions: int
    running_time: float
    last_executed_node: Optional[str]
    current_node: Optional[str]
    is_active: bool
```
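Because `get_execution_status()` reflects the controller's live state, one way to use it is to poll from a monitoring thread while the workflow runs. The following is a minimal sketch, not a documented pattern: it assumes the reporting object is available before `run()` starts, that reading status from another thread is safe in your setup, and that `slow_step` and the 0.1 s poll interval are purely illustrative.

```python
import threading
import time

from egregore.core.workflow import node, Sequence

@node("slow_step")
def slow_step(data: dict) -> dict:
    time.sleep(2)  # simulate a long-running node
    return {"processed": data}

workflow = Sequence(slow_step)

def poll_status():
    """Print live execution status until the workflow finishes."""
    reporting = workflow.controller.reporting
    while True:
        status = reporting.get_execution_status()
        print(f"[{status.controller_state}] current={status.current_node} "
              f"elapsed={status.running_time:.2f}s")
        if not status.is_active and status.controller_state in ("completed", "error"):
            break
        time.sleep(0.1)  # illustrative poll interval

monitor = threading.Thread(target=poll_status, daemon=True)
monitor.start()
workflow.run({"data": "value"})
monitor.join(timeout=1.0)
```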
## Error Reporting

### ErrorReport

Comprehensive error analysis:

```python
error_report = reporting.get_error_report()

# Error counts
print(f"Total errors: {error_report.total_errors}")
print(f"Error rate: {error_report.error_rate * 100:.1f}%")

# Error categories
print("Error types:")
for error_type, count in error_report.error_categories.items():
    print(f"  {error_type}: {count}")

# Failed nodes
print("Failed nodes:")
for node in error_report.failed_nodes:
    print(f"  - {node}")

# Most common errors
print("Most common errors:")
for error_type, count in error_report.most_common_errors:
    print(f"  {error_type}: {count}")

# Detailed error information
for error in error_report.error_details:
    print(f"Node: {error['node_name']}")
    print(f"  Type: {error['error_type']}")
    print(f"  Message: {error['error_message']}")
    if error.get('stack_trace'):
        print(f"  Stack: {error['stack_trace']}")
```

```python
@dataclass
class ErrorReport:
    total_errors: int
    error_categories: Dict[str, int]
    failed_nodes: List[str]
    error_details: List[Dict[str, Any]]
    error_rate: float
    most_common_errors: List[tuple]
```
## Performance Thresholds

### Set Thresholds

Define performance expectations:

```python
from egregore.core.workflow.reporting import PerformanceThreshold

# Set global threshold
global_threshold = PerformanceThreshold(
    node_name=None,        # None = applies to all nodes
    max_duration=1.0,      # 1 second max
    warning_duration=0.5,  # 0.5 second warning
    alert_callback=lambda alert: print(f"Alert: {alert}"),
    description="Global performance threshold"
)
reporting.set_performance_threshold("global", global_threshold)

# Set node-specific threshold
node_threshold = PerformanceThreshold(
    node_name="slow_processor",
    max_duration=5.0,
    warning_duration=2.0,
    alert_callback=None,
    description="Slow processor threshold"
)
reporting.set_performance_threshold("slow_processor", node_threshold)
```
### Monitor Violations

Check for threshold violations:

```python
# Get performance alerts
alerts = reporting.get_performance_alerts()

for alert in alerts:
    print(f"Alert ID: {alert.alert_id}")
    print(f"Node: {alert.node_name}")
    print(f"Severity: {alert.severity}")  # 'warning' or 'critical'
    print(f"Actual: {alert.actual_duration:.3f}s")
    print(f"Threshold: {alert.threshold_duration:.3f}s")
    print(f"Violation factor: {alert.violation_factor:.2f}x")
    print(f"Message: {alert.message}")
```
## Bottleneck Analysis

### Identify Bottlenecks

Find performance issues:

```python
bottlenecks = reporting.analyze_bottlenecks()

# Bottleneck nodes
for node_info in bottlenecks.bottleneck_nodes:
    print(f"Node: {node_info['node_name']}")
    print(f"  Average: {node_info['avg_duration']:.3f}s")
    print(f"  Impact: {node_info['impact']:.1f}%")

# Performance outliers
for outlier in bottlenecks.performance_outliers:
    print(f"Outlier: {outlier['node_name']}")
    print(f"  Duration: {outlier['duration']:.3f}s")
    print(f"  Deviation: {outlier['deviation']:.2f}x normal")

# Optimization suggestions
print("Suggestions:")
for suggestion in bottlenecks.optimization_suggestions:
    print(f"  - {suggestion}")

# Critical path
print(f"Critical path: {' -> '.join(bottlenecks.critical_path)}")

# Total impact
print(f"Total bottleneck impact: {bottlenecks.total_bottleneck_impact:.2f}s")
```
## Performance Trends

### Trend Analysis

Track performance over time:

```python
# Get trend for specific node
trend = reporting.get_performance_trend("processor")

print(f"Trend: {trend.trend_direction}")        # improving, degrading, stable
print(f"Strength: {trend.trend_strength:.2f}")  # -1.0 to 1.0
print(f"Current avg: {trend.current_avg_duration:.3f}s")
print(f"Historical avg: {trend.historical_avg_duration:.3f}s")
print(f"Change: {trend.change_percentage:+.1f}%")
print(f"Confidence: {trend.confidence_level:.2f}")
print(f"Data points: {trend.data_points}")
print(f"Recent outliers: {trend.recent_outliers}")
```
## Performance Comparison

### Compare Periods

Compare performance between time periods:

```python
from datetime import datetime

# Compare current vs baseline
comparison = reporting.compare_performance(
    baseline_start=datetime(2025, 1, 1),
    baseline_end=datetime(2025, 1, 7),
    current_start=datetime(2025, 1, 8),
    current_end=datetime(2025, 1, 14)
)

print(f"Comparison: {comparison.comparison_type}")
print(f"Baseline: {comparison.baseline_name}")
print(f"  Average: {comparison.baseline_avg_duration:.3f}s")
print(f"  Data points: {comparison.baseline_data_points}")
print(f"Current: {comparison.current_name}")
print(f"  Average: {comparison.current_avg_duration:.3f}s")
print(f"  Data points: {comparison.current_data_points}")
print(f"Performance change: {comparison.performance_change:+.1f}%")
print(f"Significance: {comparison.significance_level:.2f}")
print(f"Recommendation: {comparison.recommendation}")
```
## Export Reports

### JSON Export

```python
# Export metrics as JSON
metrics_json = reporting.export_metrics_json()
print(metrics_json)

# Save to file
with open("workflow_metrics.json", "w") as f:
    f.write(metrics_json)
```

### HTML Report

```python
# Generate HTML report
html = reporting.generate_html_report()

# Save to file
with open("workflow_report.html", "w") as f:
    f.write(html)
```
## Event Subscribers

### Subscribe to Events

Get notified of workflow events:

```python
def on_execution_complete(event_data):
    """Called when execution completes."""
    print(f"Completed: {event_data['node_name']}")
    print(f"Duration: {event_data['duration']:.3f}s")

def on_execution_error(event_data):
    """Called when execution fails."""
    print(f"Error in: {event_data['node_name']}")
    print(f"Error: {event_data['error']}")

# Subscribe to events
reporting.subscribe("execution_complete", on_execution_complete)
reporting.subscribe("execution_error", on_execution_error)

# Run workflow (callbacks triggered automatically)
workflow.run(data)
```
## Common Patterns

### Performance Dashboard

```python
def print_performance_dashboard(workflow: Sequence):
    """Display comprehensive performance dashboard."""
    reporting = workflow.controller.reporting

    # Metrics
    metrics = reporting.get_execution_metrics()
    print("=== Execution Metrics ===")
    print(f"Total nodes: {metrics.total_nodes_executed}")
    print(f"Success rate: {metrics.success_rate * 100:.1f}%")
    print(f"Total time: {metrics.total_execution_time:.2f}s")

    # Performance
    performance = reporting.get_performance_summary()
    print("\n=== Performance ===")
    print(f"Average: {performance.average_node_duration:.3f}s")
    print(f"Median: {performance.median_node_duration:.3f}s")
    if performance.bottlenecks:
        print(f"\nBottlenecks: {', '.join(performance.bottlenecks)}")

    # Errors
    errors = reporting.get_error_report()
    if errors.total_errors > 0:
        print("\n=== Errors ===")
        print(f"Total errors: {errors.total_errors}")
        print(f"Error rate: {errors.error_rate * 100:.1f}%")
```
### Continuous Monitoring

```python
def monitor_workflow_health(workflow: Sequence, data: dict):
    """Monitor workflow health in real time."""
    reporting = workflow.controller.reporting

    # Set thresholds
    threshold = PerformanceThreshold(
        node_name=None,
        max_duration=1.0,
        warning_duration=0.5,
        alert_callback=lambda alert: send_alert(alert),
        description="Health monitoring"
    )
    reporting.set_performance_threshold("health", threshold)

    # Subscribe to errors
    def on_error(event_data):
        send_error_notification(event_data)

    reporting.subscribe("execution_error", on_error)

    # Run workflow
    workflow.run(data)

    # Check health
    alerts = reporting.get_performance_alerts()
    if alerts:
        print(f"Health issues: {len(alerts)} alerts")
```
### Performance Regression Detection

```python
def detect_regression(workflow: Sequence, baseline_metrics: dict, data: dict) -> bool:
    """Detect a performance regression against baseline metrics."""
    reporting = workflow.controller.reporting

    # Run workflow
    workflow.run(data)

    # Get current metrics
    current_metrics = reporting.get_execution_metrics()

    # Compare against baseline
    regression_threshold = 1.2  # 20% slower
    if current_metrics.average_node_duration > baseline_metrics["average"] * regression_threshold:
        print("⚠️ Performance regression detected!")
        print(f"Baseline: {baseline_metrics['average']:.3f}s")
        print(f"Current: {current_metrics.average_node_duration:.3f}s")
        return True
    return False
```
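One way to produce the `baseline_metrics` dict consumed above is to capture it from a known-good run and persist it to disk. This is a minimal sketch, assuming the single `"average"` key used by `detect_regression` is all you need; the file name and example payload are illustrative.

```python
import json

data = {"data": "value"}  # example payload

# Capture a baseline from a known-good run
workflow.run(data)
baseline = workflow.controller.reporting.get_execution_metrics()
with open("baseline_metrics.json", "w") as f:
    json.dump({"average": baseline.average_node_duration}, f)

# Later: load the baseline and check a fresh run against it
with open("baseline_metrics.json") as f:
    baseline_metrics = json.load(f)

if detect_regression(workflow, baseline_metrics, data):
    print("Investigate before deploying")
```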
## Best Practices

### Monitor critical workflows

```python
# Good: Set thresholds for production workflows
threshold = PerformanceThreshold(
    node_name=None,
    max_duration=5.0,
    warning_duration=2.0,
    alert_callback=alert_ops_team,
    description="Production threshold"
)
reporting.set_performance_threshold("production", threshold)

# Bad: No monitoring
workflow.run(data)  # No visibility
```
### Export metrics for analysis

```python
# Good: Export and save metrics
metrics_json = reporting.export_metrics_json()
with open(f"metrics_{datetime.now().isoformat()}.json", "w") as f:
    f.write(metrics_json)

# Bad: No metric retention
metrics = reporting.get_execution_metrics()
# Lost after execution
```
### Track trends over time

```python
# Good: Analyze trends
trend = reporting.get_performance_trend("critical_node")
if trend.trend_direction == "degrading":
    investigate_performance_issue()

# Bad: Only check current metrics
metrics = reporting.get_execution_metrics()
# No historical context
```
### Subscribe to critical events

```python
# Good: React to errors immediately
def on_error(event):
    log_error(event)
    notify_team(event)
    trigger_fallback()

reporting.subscribe("execution_error", on_error)

# Bad: Check errors after execution
errors = reporting.get_error_report()
# Too late to react
```
### Use bottleneck analysis

```python
# Good: Identify and optimize bottlenecks
bottlenecks = reporting.analyze_bottlenecks()
for suggestion in bottlenecks.optimization_suggestions:
    apply_optimization(suggestion)

# Bad: Ignore performance issues
workflow.run(data)
# Slow workflows stay slow
```
## Performance Overhead

### Reporting Impact
- Metric collection: Less than 1% overhead
- Event callbacks: ~0.1ms per event
- Trend analysis: ~5ms per analysis
- Total impact: Negligible for typical workflows
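
If you want to verify these numbers on your own workloads, a simple check is to time the same workflow with reporting enabled and disabled, using the `reporting_enabled` flag shown in the next snippet. This is a minimal sketch: the no-op node and iteration count are illustrative, and it assumes the flag takes effect when set before `run()`.

```python
import time

from egregore.core.workflow import node, Sequence

@node("noop")
def noop(data: dict) -> dict:
    return data

def time_runs(enabled: bool, iterations: int = 100) -> float:
    """Average wall-clock time per run with reporting on or off."""
    workflow = Sequence(noop)
    workflow.controller.reporting_enabled = enabled  # assumed to apply to subsequent runs
    start = time.perf_counter()
    for _ in range(iterations):
        workflow.run({"data": "value"})
    return (time.perf_counter() - start) / iterations

with_reporting = time_runs(True)
without_reporting = time_runs(False)
print(f"With reporting:    {with_reporting * 1000:.3f} ms/run")
print(f"Without reporting: {without_reporting * 1000:.3f} ms/run")
print(f"Overhead:          {(with_reporting - without_reporting) * 1000:.3f} ms/run")
```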
### Optimization

```python
# Disable reporting for performance-critical workflows
workflow.controller.reporting_enabled = False

# Or selectively disable features
reporting.disable_event_subscribers()
reporting.disable_trend_analysis()
```

