
Validation

Validation ensures workflow integrity before execution by detecting cycles, verifying dependencies, and checking configuration.

Core Concept

The validation system performs comprehensive pre-execution checks to catch errors early:
from egregore.core.workflow import node, Sequence, validate_sequence

@node("processor")
def processor(data: dict) -> dict:
    return {"processed": data}

# Create workflow and some example input
workflow = Sequence(processor)
data = {"value": 42}  # example input passed to workflow.run() below

# Validate before running
result = validate_sequence(workflow)

if result.is_valid:
    print("Workflow is valid!")
    workflow.run(data)
else:
    print("Validation failed:")
    print(result.get_summary())

Quick Validation

validate_sequence()

Validate a workflow with default validators:
from egregore.core.workflow import validate_sequence, Sequence

workflow = Sequence(node1, node2, node3)

# Validate with defaults
result = validate_sequence(workflow)

# Check result
if result.is_valid:
    workflow.run(data)
else:
    # Handle validation errors
    for error in result.errors:
        print(f"ERROR: {error}")

raise_if_invalid()

Raise an exception automatically when validation fails:
# Validate and raise if invalid
result = validate_sequence(workflow)
result.raise_if_invalid()  # Raises ValueError if validation failed

# If we get here, validation passed
workflow.run(data)

Validation Result

ValidationResult Structure

@dataclass
class ValidationResult:
    is_valid: bool
    errors: List[ValidationError]
    warnings: List[ValidationWarning]
    suggestions: List[ValidationSuggestion]

Access validation details:
result = validate_sequence(workflow)

# Check overall status
print(f"Valid: {result.is_valid}")

# Check errors (prevent execution)
print(f"Errors: {len(result.errors)}")
for error in result.errors:
    print(f"  - {error.message}")
    if error.suggestion:
        print(f"    Suggestion: {error.suggestion}")

# Check warnings (execution allowed)
print(f"Warnings: {len(result.warnings)}")
for warning in result.warnings:
    print(f"  - {warning.message}")

# Check suggestions (improvements)
print(f"Suggestions: {len(result.suggestions)}")
for suggestion in result.suggestions:
    print(f"  - {suggestion.message}")

get_summary()

Get a formatted validation report:
result = validate_sequence(workflow)

# Print formatted summary
print(result.get_summary())

# Output:
# Validation Status: INVALID
# Errors: 1
#   - ERROR: Cycle detected: A -> B -> C -> A
# Warnings: 1
#   - WARNING: Node appears to be a dead end
# Suggestions: 0
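
To make the report format concrete, here is a minimal, self-contained sketch of a result object that produces a summary in the shape shown above. `SketchResult` is a hypothetical stand-in, not the library's `ValidationResult`: it stores plain strings instead of `ValidationError` objects, derives `is_valid` from the error list rather than storing it, and the `INFO` prefix for suggestions is an assumption.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchResult:
    """Hypothetical stand-in for a validation result (not the egregore class)."""
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    suggestions: List[str] = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        # Only errors block execution; warnings and suggestions do not.
        return not self.errors

    def get_summary(self) -> str:
        status = "VALID" if self.is_valid else "INVALID"
        lines = [f"Validation Status: {status}"]
        sections = (
            ("Errors", "ERROR", self.errors),
            ("Warnings", "WARNING", self.warnings),
            ("Suggestions", "INFO", self.suggestions),  # prefix is an assumption
        )
        for label, prefix, items in sections:
            lines.append(f"{label}: {len(items)}")
            lines.extend(f"  - {prefix}: {item}" for item in items)
        return "\n".join(lines)

    def raise_if_invalid(self) -> None:
        # Mirrors the documented behavior: ValueError when validation failed.
        if not self.is_valid:
            raise ValueError(self.get_summary())


result = SketchResult(
    errors=["Cycle detected: A -> B -> C -> A"],
    warnings=["Node appears to be a dead end"],
)
print(result.get_summary())
```

Deriving `is_valid` from the error list keeps the two from drifting apart. Whether the real class does the same is not stated here.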

Built-in Validators

CycleDetectionValidator

Detects circular dependencies in workflow graphs:
from egregore.core.workflow import node, decision, Sequence, validate_sequence

@node("A")
def node_a(data): return data

@node("B")
def node_b(data): return data

@node("C")
def node_c(data): return data

# Create cycle: A -> B -> C -> route -> A
@decision("route")
def route(data):
    return "A"  # Always returns to A

# This will fail validation
workflow = Sequence(node_a >> node_b >> node_c >> route)
result = validate_sequence(workflow)

# Result:
# ERROR: Cycle detected: A -> B -> C -> route -> A
# Suggestion: Remove one of the connections in the cycle or add max_iter to decision nodes

Cycle Detection Algorithm:
  • Uses DFS (Depth-First Search) with three-color marking (WHITE, GRAY, BLACK)
  • WHITE: Unvisited nodes
  • GRAY: Currently being explored (on the recursion stack)
  • BLACK: Fully explored
  • An edge from the node being explored to a GRAY node is a back edge, which means a cycle was found
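
The coloring scheme can be sketched on a plain adjacency dictionary. This is an illustrative, self-contained version of three-color DFS, not the `CycleDetectionValidator` source. The `graph` format (node name mapped to a list of successor names) and the `find_cycle` helper are assumptions made for the example.

```python
from enum import Enum


class Color(Enum):
    WHITE = 0  # unvisited
    GRAY = 1   # on the current DFS path (recursion stack)
    BLACK = 2  # fully explored


def find_cycle(graph):
    """Return one cycle as a list of node names, or None if there is none.

    `graph` maps each node name to a list of successor names.
    """
    nodes = set(graph)
    for successors in graph.values():
        nodes.update(successors)
    color = {name: Color.WHITE for name in nodes}
    path = []  # nodes currently GRAY, in visiting order

    def visit(node):
        color[node] = Color.GRAY
        path.append(node)
        for nxt in graph.get(node, []):
            if color[nxt] is Color.GRAY:
                # Back edge: nxt is an ancestor on the current path.
                return path[path.index(nxt):] + [nxt]
            if color[nxt] is Color.WHITE:
                cycle = visit(nxt)
                if cycle:
                    return cycle
        path.pop()
        color[node] = Color.BLACK
        return None

    for node in graph:
        if color[node] is Color.WHITE:
            cycle = visit(node)
            if cycle:
                return cycle
    return None


cyclic = {"A": ["B"], "B": ["C"], "C": ["route"], "route": ["A"]}
print(" -> ".join(find_cycle(cyclic)))  # A -> B -> C -> route -> A
```

Each node is colored GRAY once and BLACK once, and each edge is examined once, which gives O(V + E) running time. The recursive form is fine for small graphs; very deep workflows would need an explicit stack to stay under Python's recursion limit.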

DependencyValidator

Validates node dependencies and structure:
from egregore.core.workflow import node, parallel, Sequence, validate_sequence

# Empty parallel node (error)
empty_parallel = parallel()  # No child nodes

result = validate_sequence(Sequence(empty_parallel))
# ERROR: Parallel node has no child nodes
# Suggestion: Add nodes to parallel execution or use a regular node

# Duplicate names in parallel (error)
@node("fetch")
def fetch1(data): return data

@node("fetch")  # Same name!
def fetch2(data): return data

parallel_with_duplicates = parallel(fetch1, fetch2)
result = validate_sequence(Sequence(parallel_with_duplicates))
# ERROR: Parallel node contains duplicate names: fetch
# Suggestion: Ensure all parallel nodes have unique names

Checks performed:
  • Parallel nodes have at least one child
  • No duplicate names in parallel branches
  • Valid concurrency limits (positive integers)
  • Decision nodes have at least one pattern
  • Structural warnings for dead-end nodes
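
As a rough illustration of the first three checks, the sketch below validates a parallel block described only by its child names and an optional concurrency limit. `check_parallel` is a hypothetical helper, not part of egregore. The first two messages are copied from the examples above, while the concurrency message is invented for the sketch.

```python
from collections import Counter


def check_parallel(child_names, max_concurrent=None):
    """Return a list of error messages for a parallel block (hypothetical helper)."""
    errors = []

    # Check 1: at least one child.
    if not child_names:
        errors.append("Parallel node has no child nodes")

    # Check 2: child names must be unique.
    counts = Counter(child_names)
    duplicates = sorted(name for name, count in counts.items() if count > 1)
    if duplicates:
        errors.append(
            "Parallel node contains duplicate names: " + ", ".join(duplicates)
        )

    # Check 3: concurrency limit, when given, must be a positive integer.
    if max_concurrent is not None:
        is_int = isinstance(max_concurrent, int) and not isinstance(max_concurrent, bool)
        if not is_int or max_concurrent <= 0:
            # Message text is an assumption, not taken from the library.
            errors.append(f"Invalid concurrency limit: {max_concurrent!r}")

    return errors


print(check_parallel(["fetch", "fetch"]))
```

Collecting every problem into a list, rather than raising on the first one, means a single validation pass can report all issues at once.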

SchemaValidator

Validates workflow configuration and schema:
# Empty sequence (error)
empty_workflow = Sequence(None)
result = validate_sequence(empty_workflow)
# ERROR: Sequence has no start node
# Suggestion: Provide a start node when creating the sequence

# Single-node workflow (warning)
single_node = Sequence(processor)
result = validate_sequence(single_node)
# WARNING: Sequence contains only one node
# Suggestion: Consider if this should be a simple function call instead

Checks performed:
  • Sequence has a start node
  • Sequence contains executable nodes
  • Meaningful workflow structure
  • Basic configuration validity

Custom Validators

Create Custom Validator

Implement BaseValidator to create custom validation rules:
from egregore.core.workflow.validation import (
    BaseValidator, ValidationResult, ValidationError, ValidationWarning
)

class CustomValidator(BaseValidator):
    """Custom validation logic."""

    @property
    def validator_name(self) -> str:
        return "CustomValidation"

    def validate(self, sequence) -> ValidationResult:
        """Perform custom validation."""
        errors = []
        warnings = []

        # Custom validation logic
        nodes = self._get_all_nodes(sequence)

        for node in nodes:
            # Read the name once; it may be missing, None, or empty
            name = getattr(node, 'name', None)

            # Example: Check node names
            if hasattr(node, 'name') and not name:
                warnings.append(ValidationWarning(
                    message=f"Node {node} has no name",
                    location=node,
                    suggestion="Add a name for better debugging"
                ))

            # Example: Check for specific pattern (skip unnamed nodes so
            # .lower() is never called on None)
            if name and 'test' in name.lower():
                warnings.append(ValidationWarning(
                    message=f"Node name contains 'test': {name}",
                    location=node,
                    suggestion="Remove test nodes before production"
                ))

        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
            suggestions=[]  # the dataclass above declares this field without a default
        )

    def _get_all_nodes(self, sequence):
        """Helper to get all nodes."""
        from collections import deque
        nodes = []
        visited = set()
        queue = deque([sequence.start])

        while queue:
            node = queue.popleft()
            if node in visited or node is None:
                continue
            visited.add(node)
            nodes.append(node)

            # Add connected nodes
            if hasattr(node, 'next_node') and node.next_node:
                queue.append(node.next_node)

        return nodes

Use Custom Validator

# Create validator
custom_validator = CustomValidator()

# Validate with custom validator
result = validate_sequence(workflow, validators=[custom_validator])

# Or combine with built-in validators
from egregore.core.workflow.validators import (
    CycleDetectionValidator, DependencyValidator
)

result = validate_sequence(workflow, validators=[
    CycleDetectionValidator(),
    DependencyValidator(),
    custom_validator
])

Validation Pipeline

SequenceValidator

Coordinate multiple validators:
from egregore.core.workflow.validation import SequenceValidator
from egregore.core.workflow.validators import (
    CycleDetectionValidator,
    DependencyValidator,
    SchemaValidator
)

# Create validation pipeline
validator = SequenceValidator()

# Add validators
validator.add_validator(CycleDetectionValidator())
validator.add_validator(DependencyValidator())
validator.add_validator(SchemaValidator())
validator.add_validator(CustomValidator())

# Run validation
result = validator.validate_sequence(workflow)

# Get formatted report
report = validator.get_validation_report(workflow)
print(report)
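
Conceptually, a pipeline like this runs each registered validator and merges what they report. The sketch below shows that idea with plain callables and dictionaries. `SketchPipeline` and both example validators are hypothetical, and it assumes every validator runs even after an earlier one reports errors, which may differ from how `SequenceValidator` behaves.

```python
from typing import Callable, Dict, List

# In this sketch a validator is any callable that takes a workflow and returns
# a dict with optional "errors" and "warnings" lists (an assumed shape).
Validator = Callable[[object], Dict[str, List[str]]]


class SketchPipeline:
    """Hypothetical stand-in for a validator pipeline (not the egregore class)."""

    def __init__(self) -> None:
        self._validators: List[Validator] = []

    def add_validator(self, validator: Validator) -> None:
        self._validators.append(validator)

    def validate(self, workflow: object) -> dict:
        errors: List[str] = []
        warnings: List[str] = []
        for validator in self._validators:  # run in registration order
            outcome = validator(workflow)
            errors.extend(outcome.get("errors", []))
            warnings.extend(outcome.get("warnings", []))
        # The merged result is valid only if no validator reported an error.
        return {"is_valid": not errors, "errors": errors, "warnings": warnings}


def require_nodes(workflow):
    # Here a "workflow" is just a list of node names.
    return {"errors": [] if workflow else ["Sequence has no start node"]}


def flag_test_names(workflow):
    flagged = [name for name in workflow if "test" in name.lower()]
    return {"warnings": [f"Node name contains 'test': {name}" for name in flagged]}


pipeline = SketchPipeline()
pipeline.add_validator(require_nodes)
pipeline.add_validator(flag_test_names)
report = pipeline.validate(["fetch", "test_transform"])
print(report)
```

Running every validator gives one complete report per pass, at the cost of doing work that a fail-fast pipeline would skip.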

create_default_validator()

Create a validator preloaded with all built-in validators:
from egregore.core.workflow.validation import create_default_validator

# Create default validator (includes all built-in validators)
validator = create_default_validator()

# Add custom validators
validator.add_validator(CustomValidator())

# Validate
result = validator.validate_sequence(workflow)

Validation Severity

Severity Levels

ERROR - Prevents execution:
ValidationError(
    message="Cycle detected in workflow",
    severity=ValidationSeverity.ERROR,
    location=node,
    suggestion="Break the cycle or add max_iter"
)

WARNING - Execution allowed but issues exist:
ValidationWarning(
    message="Node appears to be a dead end",
    severity=ValidationSeverity.WARNING,
    location=node,
    suggestion="Consider connecting to another node"
)

INFO - Suggestions for improvement:
ValidationSuggestion(
    message="Consider using parallel execution",
    severity=ValidationSeverity.INFO,
    location=node,
    suggestion="These nodes could run concurrently"
)
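
The practical difference between the levels is which ones block execution. The following self-contained sketch models that with a small enum and a strict mode that also treats warnings as blocking, in the spirit of the strict CI check later on this page. `Severity` and `can_execute` are hypothetical names, and the string values of the enum members are assumptions.

```python
from enum import Enum
from typing import Iterable, Tuple


class Severity(Enum):
    """Hypothetical stand-in for ValidationSeverity; values are assumed."""
    ERROR = "error"      # prevents execution
    WARNING = "warning"  # execution allowed, but something looks off
    INFO = "info"        # improvement suggestion


def can_execute(issues: Iterable[Tuple[Severity, str]], strict: bool = False) -> bool:
    """Return True if no issue has a blocking severity.

    In strict mode warnings block execution too.
    """
    blocking = {Severity.ERROR, Severity.WARNING} if strict else {Severity.ERROR}
    return all(severity not in blocking for severity, _message in issues)


issues = [
    (Severity.WARNING, "Node appears to be a dead end"),
    (Severity.INFO, "Consider using parallel execution"),
]
print(can_execute(issues))               # True: warnings do not block by default
print(can_execute(issues, strict=True))  # False: strict mode blocks on warnings
```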

Common Patterns

Pre-Execution Validation

from egregore.core.workflow import validate_sequence, Sequence

def run_workflow_safely(workflow: Sequence, data: dict) -> dict:
    """Run workflow with validation."""
    # Validate first
    result = validate_sequence(workflow)

    if not result.is_valid:
        raise ValueError(f"Workflow validation failed:\n{result.get_summary()}")

    # Show warnings
    if result.warnings:
        print("Warnings:")
        for warning in result.warnings:
            print(f"  - {warning}")

    # Execute
    return workflow.run(data)

Development vs Production

import logging
import os

logger = logging.getLogger(__name__)

# Strict validation in development
if os.getenv("ENV") == "development":
    result = validate_sequence(workflow)
    result.raise_if_invalid()

    # Show all warnings
    for warning in result.warnings:
        print(f"WARNING: {warning}")

# Lenient in production (log warnings only)
else:
    result = validate_sequence(workflow)

    if not result.is_valid:
        # Log errors but continue if non-critical
        for error in result.errors:
            logger.error(f"Validation error: {error}")

    for warning in result.warnings:
        logger.warning(f"Validation warning: {warning}")

CI/CD Integration

def test_workflow_validation():
    """Test workflow passes validation."""
    workflow = create_production_workflow()

    result = validate_sequence(workflow)

    # Assert no errors
    assert result.is_valid, f"Validation failed:\n{result.get_summary()}"

    # Assert no warnings (strict)
    assert len(result.warnings) == 0, f"Workflow has warnings:\n{result.get_summary()}"

Validation Report

def generate_validation_report(workflow: Sequence) -> dict:
    """Generate detailed validation report."""
    result = validate_sequence(workflow)

    return {
        "valid": result.is_valid,
        "summary": result.get_summary(),
        "error_count": len(result.errors),
        "warning_count": len(result.warnings),
        "suggestion_count": len(result.suggestions),
        "errors": [
            {
                "message": error.message,
                "severity": error.severity.value,
                "location": str(error.location) if error.location else None,
                "suggestion": error.suggestion
            }
            for error in result.errors
        ],
        "warnings": [
            {
                "message": warning.message,
                "severity": warning.severity.value,
                "location": str(warning.location) if warning.location else None,
                "suggestion": warning.suggestion
            }
            for warning in result.warnings
        ]
    }

Best Practices

# Good: Validate before deployment
def deploy_workflow(workflow: Sequence):
    result = validate_sequence(workflow)
    if not result.is_valid:
        raise ValueError("Cannot deploy invalid workflow")

    # Deploy
    production_system.deploy(workflow)

# Bad: Deploy without validation
def deploy_workflow(workflow: Sequence):
    production_system.deploy(workflow)  # May fail at runtime

# Good: Test validation passes
def test_workflow_is_valid():
    workflow = create_workflow()
    result = validate_sequence(workflow)
    assert result.is_valid

# Bad: No validation testing
def test_workflow():
    workflow = create_workflow()
    result = workflow.run(test_data)  # Fails at runtime

# Good: Domain-specific validation
class BusinessRuleValidator(BaseValidator):
    def validate(self, sequence):
        # Validate business logic
        # - Required approval nodes
        # - Data sensitivity checks
        # - Compliance requirements
        pass

# Bad: No domain validation
result = validate_sequence(workflow)  # Only structural checks

# Good: Comprehensive logging
result = validate_sequence(workflow)
logger.info(f"Validation result: {result.get_summary()}")

if not result.is_valid:
    for error in result.errors:
        logger.error(f"Validation error: {error}")

# Bad: Silent validation
result = validate_sequence(workflow)
if result.is_valid:
    workflow.run(data)

# Good: Review warnings
result = validate_sequence(workflow)

if result.warnings:
    print("Review these warnings:")
    for warning in result.warnings:
        print(f"  {warning}")
        if warning.suggestion:
            print(f"  Suggestion: {warning.suggestion}")

# Bad: Ignore warnings
result = validate_sequence(workflow)
if result.is_valid:
    workflow.run(data)  # Warnings ignored

Performance Considerations

Validation Overhead

  • Cycle detection: O(V + E) where V=nodes, E=edges
  • Dependency validation: O(V) node traversal
  • Schema validation: O(1) basic checks
  • Total: Typically under 10 ms for workflows with fewer than 100 nodes
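
These figures depend on hardware and workflow shape, so it can be worth measuring on your own workflows. The sketch below times an O(V + E) traversal over a 100-node chain with `time.perf_counter`. `best_time` and `count_reachable` are illustrative helpers, and the traversal is only a stand-in for a real validation call.

```python
import time
from collections import deque


def best_time(func, *args, repeats=5):
    """Return the fastest wall-clock time in seconds over `repeats` calls."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        func(*args)
        best = min(best, time.perf_counter() - start)
    return best


def count_reachable(graph, start):
    """Breadth-first traversal that visits each node and edge once: O(V + E)."""
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in graph.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return len(seen)


# A 100-node chain: 0 -> 1 -> ... -> 99
chain = {i: [i + 1] for i in range(99)}
elapsed = best_time(count_reachable, chain, 0)
print(f"best of 5 runs: {elapsed * 1000:.3f} ms")
```

To measure actual validation cost, pass a function that calls `validate_sequence(workflow)` in place of `count_reachable`. Taking the best of several runs reduces noise from other processes.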

When to Validate

# Validate once at startup (recommended)
workflow = create_workflow()
validate_sequence(workflow).raise_if_invalid()

# Run many times without re-validation
for data in dataset:
    workflow.run(data)

# Don't validate on every run (slow)
for data in dataset:
    validate_sequence(workflow)  # SLOW - unnecessary
    workflow.run(data)

Caching Validation Results

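from egregore.core.workflow import Sequence, validate_sequence
from egregore.core.workflow.validation import ValidationResult

class ValidatedWorkflow: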
    """Workflow with cached validation."""

    def __init__(self, workflow: Sequence):
        self.workflow = workflow
        self._validation_result = None

    def validate(self) -> ValidationResult:
        """Validate and cache result."""
        if self._validation_result is None:
            self._validation_result = validate_sequence(self.workflow)
        return self._validation_result

    def run(self, data: dict) -> dict:
        """Run with automatic validation."""
        self.validate().raise_if_invalid()
        return self.workflow.run(data)

Error Messages

Common Validation Errors

Cycle detected:
ERROR: Cycle detected: node_a -> node_b -> node_c -> node_a
Suggestion: Remove one of the connections in the cycle or add max_iter to decision nodes

Empty parallel node:
ERROR: Parallel node has no child nodes
Suggestion: Add nodes to parallel execution or use a regular node

Duplicate names:
ERROR: Parallel node contains duplicate names: fetch
Suggestion: Ensure all parallel nodes have unique names

No start node:
ERROR: Sequence has no start node
Suggestion: Provide a start node when creating the sequence
