Reporting

Workflow reporting provides comprehensive execution metrics, performance analysis, and real-time monitoring.

Core Concept

The reporting system automatically tracks workflow execution and provides detailed insights:
from egregore.core.workflow import node, Sequence

@node("processor")
def processor(data: dict) -> dict:
    return {"processed": data}

# Create and run workflow
workflow = Sequence(processor)
result = workflow.run({"data": "value"})

# Access reporting system
reporting = workflow.controller.reporting

# Get execution metrics
metrics = reporting.get_execution_metrics()
print(f"Success rate: {metrics.success_rate * 100:.1f}%")
print(f"Average duration: {metrics.average_node_duration:.3f}s")

Accessing Reports

Via Controller

# Access through workflow controller
workflow = Sequence(node1, node2, node3)
workflow.run(data)

# Get reporting system
reporting = workflow.controller.reporting

# Get various reports
metrics = reporting.get_execution_metrics()
performance = reporting.get_performance_summary()
status = reporting.get_execution_status()

Execution Metrics

WorkflowMetrics

Comprehensive execution statistics:
metrics = reporting.get_execution_metrics()

# Basic counts
print(f"Total nodes executed: {metrics.total_nodes_executed}")
print(f"Successful: {metrics.successful_executions}")
print(f"Failed: {metrics.failed_executions}")
print(f"Success rate: {metrics.success_rate * 100:.1f}%")

# Timing metrics
print(f"Total time: {metrics.total_execution_time:.3f}s")
print(f"Average duration: {metrics.average_node_duration:.3f}s")
print(f"Min duration: {metrics.min_node_duration:.3f}s")
print(f"Max duration: {metrics.max_node_duration:.3f}s")

# Node statistics
print(f"Unique nodes: {metrics.unique_nodes_count}")
print(f"Execution counts: {metrics.node_execution_counts}")

# Timing
print(f"Started: {metrics.execution_start_time}")
print(f"Ended: {metrics.execution_end_time}")
Metrics Structure:
@dataclass
class WorkflowMetrics:
    total_nodes_executed: int
    successful_executions: int
    failed_executions: int
    success_rate: float
    total_execution_time: float
    average_node_duration: float
    min_node_duration: float
    max_node_duration: float
    unique_nodes_count: int
    execution_start_time: Optional[datetime]
    execution_end_time: Optional[datetime]
    node_execution_counts: Dict[str, int]
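
As a usage sketch, the fields above are enough to build a compact per-run summary. `summarize_metrics` below is a hypothetical helper, not part of the library:

def summarize_metrics(metrics) -> str:
    """Render a WorkflowMetrics instance as a one-line summary (illustrative helper)."""
    return (
        f"{metrics.total_nodes_executed} nodes, "
        f"{metrics.success_rate * 100:.1f}% ok, "
        f"{metrics.total_execution_time:.2f}s total, "
        f"avg {metrics.average_node_duration:.3f}s"
    )

print(summarize_metrics(reporting.get_execution_metrics()))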

Performance Analysis

PerformanceSummary

Detailed performance breakdown:
performance = reporting.get_performance_summary()

# Overall statistics
print(f"Average: {performance.average_node_duration:.3f}s")
print(f"Median: {performance.median_node_duration:.3f}s")
print(f"Total time: {performance.total_workflow_time:.3f}s")

# Slowest nodes
print("Slowest nodes:")
for node_name, avg_duration in performance.slowest_nodes:
    print(f"  {node_name}: {avg_duration:.3f}s")

# Fastest nodes
print("Fastest nodes:")
for node_name, avg_duration in performance.fastest_nodes:
    print(f"  {node_name}: {avg_duration:.3f}s")

# Bottlenecks
if performance.bottlenecks:
    print("Bottlenecks:")
    for node in performance.bottlenecks:
        print(f"  - {node}")

# Outliers
if performance.performance_outliers:
    print(f"Found {len(performance.performance_outliers)} outliers")
Performance Structure:
@dataclass
class PerformanceSummary:
    average_node_duration: float
    median_node_duration: float
    slowest_nodes: List[tuple]  # [(node_name, avg_duration), ...]
    fastest_nodes: List[tuple]
    bottlenecks: List[str]
    performance_outliers: List[ExecutionEntry]
    total_workflow_time: float
    nodes_analyzed: int
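
A small sketch of how these fields compose: flag any node whose average runs well above the median. The 2x cutoff is an illustrative choice; the library applies its own rule when populating `bottlenecks`:

def slow_outliers(performance, factor: float = 2.0):
    """Return (node_name, avg_duration) pairs well above the median (illustrative cutoff)."""
    cutoff = performance.median_node_duration * factor
    return [(name, avg) for name, avg in performance.slowest_nodes if avg > cutoff]

for name, avg in slow_outliers(reporting.get_performance_summary()):
    print(f"{name} averages {avg:.3f}s, above the median-based cutoff")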

Real-Time Status

ExecutionStatus

Current workflow execution state:
status = reporting.get_execution_status()

# Controller state
print(f"State: {status.controller_state}")
# States: ready, running, paused, stopped, completed, error

# Execution path
print(f"Current path: {status.current_execution_path}")
print(f"Depth: {status.execution_depth}")

# Progress
print(f"Total executions: {status.total_executions}")
print(f"Running time: {status.running_time:.2f}s")

# Current node
print(f"Last executed: {status.last_executed_node}")
print(f"Current node: {status.current_node}")
print(f"Active: {status.is_active}")
Status Structure:
@dataclass
class ExecutionStatus:
    controller_state: str
    current_execution_path: List[str]
    execution_depth: int
    total_executions: int
    running_time: float
    last_executed_node: Optional[str]
    current_node: Optional[str]
    is_active: bool
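
For live monitoring, the status can be polled from a background thread while the workflow runs in the main thread. This is a minimal sketch that assumes `get_execution_status()` is safe to call concurrently; verify that for your deployment:

import threading
import time

def watch_status(reporting, interval: float = 0.5):
    """Print a status line until the run reaches a terminal state."""
    def poll():
        while True:
            status = reporting.get_execution_status()
            print(f"[{status.controller_state}] node={status.current_node} "
                  f"depth={status.execution_depth} t={status.running_time:.1f}s")
            if status.controller_state in ("completed", "stopped", "error"):
                break
            time.sleep(interval)
    thread = threading.Thread(target=poll, daemon=True)
    thread.start()
    return thread

# Start the watcher, then run the workflow in the main thread
watcher = watch_status(workflow.controller.reporting)
workflow.run({"data": "value"})
watcher.join(timeout=2.0)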

Error Reporting

ErrorReport

Comprehensive error analysis:
error_report = reporting.get_error_report()

# Error counts
print(f"Total errors: {error_report.total_errors}")
print(f"Error rate: {error_report.error_rate * 100:.1f}%")

# Error categories
print("Error types:")
for error_type, count in error_report.error_categories.items():
    print(f"  {error_type}: {count}")

# Failed nodes
print("Failed nodes:")
for node in error_report.failed_nodes:
    print(f"  - {node}")

# Most common errors
print("Most common errors:")
for error_type, count in error_report.most_common_errors:
    print(f"  {error_type}: {count}")

# Detailed error information
for error in error_report.error_details:
    print(f"Node: {error['node_name']}")
    print(f"  Type: {error['error_type']}")
    print(f"  Message: {error['error_message']}")
    if error.get('stack_trace'):
        print(f"  Stack: {error['stack_trace']}")
Error Structure:
@dataclass
class ErrorReport:
    total_errors: int
    error_categories: Dict[str, int]
    failed_nodes: List[str]
    error_details: List[Dict[str, Any]]
    error_rate: float
    most_common_errors: List[tuple]
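
The flat `error_details` list is easy to regroup. A minimal sketch that indexes errors by node, using the dict keys shown earlier (`node_name`, `error_type`):

from collections import defaultdict

def errors_by_node(error_report) -> dict:
    """Group error types by the node that raised them."""
    grouped = defaultdict(list)
    for error in error_report.error_details:
        grouped[error["node_name"]].append(error["error_type"])
    return dict(grouped)

for node_name, error_types in errors_by_node(reporting.get_error_report()).items():
    print(f"{node_name}: {', '.join(error_types)}")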

Performance Thresholds

Set Thresholds

Define performance expectations:
from egregore.core.workflow.reporting import PerformanceThreshold

# Set global threshold
global_threshold = PerformanceThreshold(
    node_name=None,  # None = applies to all nodes
    max_duration=1.0,  # 1 second max
    warning_duration=0.5,  # 0.5 second warning
    alert_callback=lambda alert: print(f"Alert: {alert}"),
    description="Global performance threshold"
)

reporting.set_performance_threshold("global", global_threshold)

# Set node-specific threshold
node_threshold = PerformanceThreshold(
    node_name="slow_processor",
    max_duration=5.0,
    warning_duration=2.0,
    alert_callback=None,
    description="Slow processor threshold"
)

reporting.set_performance_threshold("slow_processor", node_threshold)

Monitor Violations

Check for threshold violations:
# Get performance alerts
alerts = reporting.get_performance_alerts()

for alert in alerts:
    print(f"Alert ID: {alert.alert_id}")
    print(f"Node: {alert.node_name}")
    print(f"Severity: {alert.severity}")  # 'warning' or 'critical'
    print(f"Actual: {alert.actual_duration:.3f}s")
    print(f"Threshold: {alert.threshold_duration:.3f}s")
    print(f"Violation factor: {alert.violation_factor:.2f}x")
    print(f"Message: {alert.message}")
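
A common follow-up is to split alerts by severity and escalate only the critical ones. `page_on_call` here is a hypothetical notification hook you would supply:

def triage_alerts(alerts):
    """Log warnings; escalate criticals through a user-supplied hook."""
    for alert in alerts:
        if alert.severity == "critical":
            page_on_call(alert.message)  # hypothetical escalation hook
        else:
            print(f"warning: {alert.node_name} at {alert.actual_duration:.3f}s")

triage_alerts(reporting.get_performance_alerts())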

Bottleneck Analysis

Identify Bottlenecks

Find performance issues:
bottlenecks = reporting.analyze_bottlenecks()

# Bottleneck nodes
for node_info in bottlenecks.bottleneck_nodes:
    print(f"Node: {node_info['node_name']}")
    print(f"  Average: {node_info['avg_duration']:.3f}s")
    print(f"  Impact: {node_info['impact']:.1f}%")

# Performance outliers
for outlier in bottlenecks.performance_outliers:
    print(f"Outlier: {outlier['node_name']}")
    print(f"  Duration: {outlier['duration']:.3f}s")
    print(f"  Deviation: {outlier['deviation']:.2f}x normal")

# Optimization suggestions
print("Suggestions:")
for suggestion in bottlenecks.optimization_suggestions:
    print(f"  - {suggestion}")

# Critical path
print(f"Critical path: {' -> '.join(bottlenecks.critical_path)}")

# Total impact
print(f"Total bottleneck impact: {bottlenecks.total_bottleneck_impact:.2f}s")

Trend Analysis

Track performance over time:
# Get trend for specific node
trend = reporting.get_performance_trend("processor")

print(f"Trend: {trend.trend_direction}")  # improving, degrading, stable
print(f"Strength: {trend.trend_strength:.2f}")  # -1.0 to 1.0
print(f"Current avg: {trend.current_avg_duration:.3f}s")
print(f"Historical avg: {trend.historical_avg_duration:.3f}s")
print(f"Change: {trend.change_percentage:+.1f}%")
print(f"Confidence: {trend.confidence_level:.2f}")
print(f"Data points: {trend.data_points}")
print(f"Recent outliers: {trend.recent_outliers}")
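
One way to act on a trend is to require both a direction and a minimum confidence before raising a flag. The 0.8 floor below is an illustrative policy choice, not a library default:

def should_investigate(trend, min_confidence: float = 0.8) -> bool:
    """Flag a node only when the degradation signal is confident enough."""
    return trend.trend_direction == "degrading" and trend.confidence_level >= min_confidence

if should_investigate(reporting.get_performance_trend("processor")):
    print("processor is trending slower; investigate before the next release")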

Performance Comparison

Compare Periods

Compare performance between time periods:
from datetime import datetime

# Compare current vs baseline
comparison = reporting.compare_performance(
    baseline_start=datetime(2025, 1, 1),
    baseline_end=datetime(2025, 1, 7),
    current_start=datetime(2025, 1, 8),
    current_end=datetime(2025, 1, 14)
)

print(f"Comparison: {comparison.comparison_type}")
print(f"Baseline: {comparison.baseline_name}")
print(f"  Average: {comparison.baseline_avg_duration:.3f}s")
print(f"  Data points: {comparison.baseline_data_points}")

print(f"Current: {comparison.current_name}")
print(f"  Average: {comparison.current_avg_duration:.3f}s")
print(f"  Data points: {comparison.current_data_points}")

print(f"Performance change: {comparison.performance_change:+.1f}%")
print(f"Significance: {comparison.significance_level:.2f}")
print(f"Recommendation: {comparison.recommendation}")
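
In CI, the comparison result can act as a hard gate. The 20% budget and 0.95 significance cutoff below are illustrative policy choices:

def regression_gate(comparison, max_slowdown_pct: float = 20.0, min_significance: float = 0.95):
    """Fail the build when a significant slowdown exceeds the allowed budget."""
    if (comparison.performance_change > max_slowdown_pct
            and comparison.significance_level >= min_significance):
        raise RuntimeError(
            f"Performance regressed {comparison.performance_change:+.1f}% "
            f"(significance {comparison.significance_level:.2f})"
        )

regression_gate(comparison)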

Export Reports

JSON Export

# Export metrics as JSON
metrics_json = reporting.export_metrics_json()
print(metrics_json)

# Save to file
with open("workflow_metrics.json", "w") as f:
    f.write(metrics_json)
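
The exported string is plain JSON, so it can be parsed back for offline analysis. This sketch only inspects the top-level keys rather than assuming a specific schema:

import json

data = json.loads(metrics_json)
print("Exported fields:", sorted(data.keys()))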

HTML Report

# Generate HTML report
html = reporting.generate_html_report()

# Save to file
with open("workflow_report.html", "w") as f:
    f.write(html)

Event Subscribers

Subscribe to Events

Get notified of workflow events:
def on_execution_complete(event_data):
    """Called when execution completes."""
    print(f"Completed: {event_data['node_name']}")
    print(f"Duration: {event_data['duration']:.3f}s")

def on_execution_error(event_data):
    """Called when execution fails."""
    print(f"Error in: {event_data['node_name']}")
    print(f"Error: {event_data['error']}")

# Subscribe to events
reporting.subscribe("execution_complete", on_execution_complete)
reporting.subscribe("execution_error", on_execution_error)

# Run workflow (callbacks triggered automatically)
workflow.run(data)
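
Subscribers can also feed custom aggregation. This sketch assumes the `execution_complete` payload carries the `node_name` and `duration` keys shown above:

import statistics
from collections import defaultdict

durations = defaultdict(list)

def record_duration(event_data):
    """Accumulate per-node durations for custom statistics."""
    durations[event_data["node_name"]].append(event_data["duration"])

reporting.subscribe("execution_complete", record_duration)
workflow.run(data)

for name, values in durations.items():
    print(f"{name}: median {statistics.median(values):.3f}s over {len(values)} runs")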

Common Patterns

Performance Dashboard

def print_performance_dashboard(workflow: Sequence):
    """Display comprehensive performance dashboard."""
    reporting = workflow.controller.reporting

    # Metrics
    metrics = reporting.get_execution_metrics()
    print("=== Execution Metrics ===")
    print(f"Total nodes: {metrics.total_nodes_executed}")
    print(f"Success rate: {metrics.success_rate * 100:.1f}%")
    print(f"Total time: {metrics.total_execution_time:.2f}s")

    # Performance
    performance = reporting.get_performance_summary()
    print("\n=== Performance ===")
    print(f"Average: {performance.average_node_duration:.3f}s")
    print(f"Median: {performance.median_node_duration:.3f}s")

    if performance.bottlenecks:
        print(f"\nBottlenecks: {', '.join(performance.bottlenecks)}")

    # Errors
    errors = reporting.get_error_report()
    if errors.total_errors > 0:
        print("\n=== Errors ===")
        print(f"Total errors: {errors.total_errors}")
        print(f"Error rate: {errors.error_rate * 100:.1f}%")

Continuous Monitoring

def monitor_workflow_health(workflow: Sequence, data: dict):
    """Monitor workflow health in real-time."""
    reporting = workflow.controller.reporting

    # Set thresholds
    threshold = PerformanceThreshold(
        node_name=None,
        max_duration=1.0,
        warning_duration=0.5,
        alert_callback=lambda alert: send_alert(alert),  # send_alert: user-supplied hook
        description="Health monitoring"
    )
    reporting.set_performance_threshold("health", threshold)

    # Subscribe to errors
    def on_error(event_data):
        send_error_notification(event_data)

    reporting.subscribe("execution_error", on_error)

    # Run workflow
    workflow.run(data)

    # Check health
    alerts = reporting.get_performance_alerts()
    if alerts:
        print(f"Health issues: {len(alerts)} alerts")

Performance Regression Detection

def detect_regression(workflow: Sequence, data: dict, baseline_metrics: dict):
    """Detect performance regression against a saved baseline."""
    reporting = workflow.controller.reporting

    # Run workflow
    workflow.run(data)

    # Get current metrics
    current_metrics = reporting.get_execution_metrics()

    # Compare
    regression_threshold = 1.2  # 20% slower
    if current_metrics.average_node_duration > baseline_metrics["average"] * regression_threshold:
        print("⚠️ Performance regression detected!")
        print(f"Baseline: {baseline_metrics['average']:.3f}s")
        print(f"Current: {current_metrics.average_node_duration:.3f}s")
        return True

    return False
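
To use the detector, capture a baseline from a known-good run first. The dict shape here matches what `detect_regression` reads; `rollback_deployment` is a hypothetical response hook:

# Capture a baseline from a known-good run
workflow.run(data)
good = workflow.controller.reporting.get_execution_metrics()
baseline = {"average": good.average_node_duration}

# Later runs are checked against it
if detect_regression(workflow, data, baseline):
    rollback_deployment()  # hypothetical response hook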

Best Practices

# Good: Set thresholds for production workflows
threshold = PerformanceThreshold(
    node_name=None,
    max_duration=5.0,
    warning_duration=2.0,
    alert_callback=alert_ops_team,
    description="Production threshold"
)
reporting.set_performance_threshold("production", threshold)

# Bad: No monitoring
workflow.run(data)  # No visibility

# Good: Export and save metrics
metrics_json = reporting.export_metrics_json()
with open(f"metrics_{datetime.now():%Y%m%dT%H%M%S}.json", "w") as f:  # filename-safe timestamp
    f.write(metrics_json)

# Bad: No metric retention
metrics = reporting.get_execution_metrics()
# Lost after execution

# Good: React to errors immediately
def on_error(event):
    log_error(event)
    notify_team(event)
    trigger_fallback()

reporting.subscribe("execution_error", on_error)

# Bad: Check errors after execution
errors = reporting.get_error_report()
# Too late to react

# Good: Identify and optimize bottlenecks
bottlenecks = reporting.analyze_bottlenecks()
for suggestion in bottlenecks.optimization_suggestions:
    apply_optimization(suggestion)

# Bad: Ignore performance issues
workflow.run(data)
# Slow workflows stay slow

Performance Overhead

Reporting Impact

  • Metric collection: Less than 1% overhead
  • Event callbacks: ~0.1ms per event
  • Trend analysis: ~5ms per analysis
  • Total impact: Negligible for typical workflows

Optimization

# Disable reporting for performance-critical workflows
workflow.controller.reporting_enabled = False

# Or selectively disable features
reporting.disable_event_subscribers()
reporting.disable_trend_analysis()
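
If reporting must stay available for other runs, disable it only around the hot path. This sketch uses just the `reporting_enabled` flag shown above:

controller = workflow.controller
controller.reporting_enabled = False  # skip metric collection for this run
try:
    workflow.run(data)
finally:
    controller.reporting_enabled = True  # restore reporting afterwards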
