Validation

Validation ensures workflow integrity before execution by detecting cycles, verifying dependencies, and checking configuration.

Core Concept

The validation system performs comprehensive pre-execution checks to catch errors early:
from egregore.core.workflow import node, Sequence, validate_sequence

@node("processor")
def processor(data: dict) -> dict:
    return {"processed": data}

# Create workflow
workflow = Sequence(processor)

# Validate before running
result = validate_sequence(workflow)

if result.is_valid:
    print("Workflow is valid!")
    workflow.run({"value": 42})  # Example input
else:
    print("Validation failed:")
    print(result.get_summary())

Quick Validation

validate_sequence()

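Validate a workflow with the default validators:
from egregore.core.workflow import node, validate_sequence, Sequence

# Placeholder nodes for illustration
@node("step1")
def node1(data): return data

@node("step2")
def node2(data): return data

@node("step3")
def node3(data): return data

workflow = Sequence(node1, node2, node3)
data = {"value": 42}  # Example input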

# Validate with defaults
result = validate_sequence(workflow)

# Check result
if result.is_valid:
    workflow.run(data)
else:
    # Handle validation errors
    for error in result.errors:
        print(f"ERROR: {error}")

raise_if_invalid()

Raise an exception automatically if validation fails:
# Validate and raise if invalid
result = validate_sequence(workflow)
result.raise_if_invalid()  # Raises ValueError if validation failed

# If we get here, validation passed
workflow.run(data)

Validation Result

ValidationResult Structure

@dataclass
class ValidationResult:
    is_valid: bool
    errors: List[ValidationError]
    warnings: List[ValidationWarning]
    suggestions: List[ValidationSuggestion]

Access validation details:
result = validate_sequence(workflow)

# Check overall status
print(f"Valid: {result.is_valid}")

# Check errors (prevent execution)
print(f"Errors: {len(result.errors)}")
for error in result.errors:
    print(f"  - {error.message}")
    if error.suggestion:
        print(f"    Suggestion: {error.suggestion}")

# Check warnings (execution allowed)
print(f"Warnings: {len(result.warnings)}")
for warning in result.warnings:
    print(f"  - {warning.message}")

# Check suggestions (improvements)
print(f"Suggestions: {len(result.suggestions)}")
for suggestion in result.suggestions:
    print(f"  - {suggestion.message}")

get_summary()

Get a formatted validation report:
result = validate_sequence(workflow)

# Print formatted summary
print(result.get_summary())

# Output:
# Validation Status: INVALID
# Errors: 1
#   - ERROR: Cycle detected: A -> B -> C -> A
# Warnings: 1
#   - WARNING: Node appears to be a dead end
# Suggestions: 0
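The sample output above follows a simple layout: a status line, then a count and one line per item for each category. The sketch below is a hypothetical stand-in (`SketchResult` and `Issue` are invented names, not egregore's `ValidationResult`) that reproduces that layout together with the `raise_if_invalid()` behavior described earlier. Deriving `is_valid` from the error list, and the `VALID` and `SUGGESTION` labels, are assumptions.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Issue:
    """Hypothetical minimal issue record: just a message."""
    message: str


@dataclass
class SketchResult:
    """Hypothetical stand-in for ValidationResult (not egregore's class)."""
    errors: List[Issue] = field(default_factory=list)
    warnings: List[Issue] = field(default_factory=list)
    suggestions: List[Issue] = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        # Only errors block execution; warnings and suggestions do not
        return not self.errors

    def get_summary(self) -> str:
        status = "VALID" if self.is_valid else "INVALID"
        lines = [f"Validation Status: {status}"]
        sections = (
            ("Errors", "ERROR", self.errors),
            ("Warnings", "WARNING", self.warnings),
            ("Suggestions", "SUGGESTION", self.suggestions),
        )
        for label, prefix, items in sections:
            lines.append(f"{label}: {len(items)}")
            lines.extend(f"  - {prefix}: {item.message}" for item in items)
        return "\n".join(lines)

    def raise_if_invalid(self) -> None:
        # Mirrors the documented behavior: ValueError when validation failed
        if not self.is_valid:
            raise ValueError(self.get_summary())


result = SketchResult(
    errors=[Issue("Cycle detected: A -> B -> C -> A")],
    warnings=[Issue("Node appears to be a dead end")],
)
print(result.get_summary())
```

Running this prints the same six lines as the sample output above. A result that has only warnings passes `raise_if_invalid()` without raising.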

Built-in Validators

CycleDetectionValidator

Detects circular dependencies in workflow graphs:
from egregore.core.workflow import node, decision, Sequence, validate_sequence

@node("A")
def node_a(data): return data

@node("B")
def node_b(data): return data

@node("C")
def node_c(data): return data

# Create cycle: A -> B -> C -> route -> A
@decision("route")
def route(data):
    return "A"  # Always returns to A

# This will fail validation
workflow = Sequence(node_a >> node_b >> node_c >> route)
result = validate_sequence(workflow)

# Result:
# ERROR: Cycle detected: A -> B -> C -> route -> A
# Suggestion: Remove one of the connections in the cycle or add max_iter to decision nodes
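
Cycle Detection Algorithm:
  • Uses depth-first search (DFS) with three-color marking (WHITE, GRAY, BLACK)
  • WHITE: Unvisited nodes
  • GRAY: Currently being explored (on the recursion stack)
  • BLACK: Fully explored (all descendants visited)
  • An edge to a node that is still GRAY (an ancestor on the current DFS path, or the node itself) is a back edge, which means a cycle exists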
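The three-color scheme above can be sketched in plain Python over an adjacency dict that maps each node name to its successors. This is an illustrative stand-alone version, not egregore's `CycleDetectionValidator`; the graph representation and the `find_cycle` name are assumptions.

```python
from enum import Enum
from typing import Dict, List, Optional


class Color(Enum):
    WHITE = 0  # unvisited
    GRAY = 1   # on the current DFS path (recursion stack)
    BLACK = 2  # fully explored


def find_cycle(graph: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one cycle as a list of names (first name repeated at the end), or None."""
    # Every node, including ones that only appear as edge targets, starts WHITE
    color: Dict[str, Color] = {name: Color.WHITE for name in graph}
    for targets in graph.values():
        for target in targets:
            color.setdefault(target, Color.WHITE)
    path: List[str] = []  # nodes currently GRAY, in visiting order

    def visit(name: str) -> Optional[List[str]]:
        color[name] = Color.GRAY
        path.append(name)
        for nxt in graph.get(name, []):
            if color[nxt] is Color.GRAY:
                # Back edge: nxt is on the current path, so path from nxt onward is a cycle
                return path[path.index(nxt):] + [nxt]
            if color[nxt] is Color.WHITE:
                cycle = visit(nxt)
                if cycle:
                    return cycle
            # BLACK successors were already fully explored without finding a cycle
        path.pop()
        color[name] = Color.BLACK
        return None

    # Recursive for brevity; very deep graphs would need an explicit stack
    for name in list(color):
        if color[name] is Color.WHITE:
            cycle = visit(name)
            if cycle:
                return cycle
    return None


print(" -> ".join(find_cycle({"A": ["B"], "B": ["C"], "C": ["route"], "route": ["A"]})))
```

This prints `A -> B -> C -> route -> A`. Reaching a BLACK node, as in a diamond (`A -> B -> C` plus `A -> C`), is not a cycle, which is why three colors are needed rather than a single visited set. Each node and edge is processed once, giving the O(V + E) cost listed under Performance Considerations.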

DependencyValidator

Validates node dependencies and structure:
from egregore.core.workflow import node, parallel, Sequence, validate_sequence

# Empty parallel node (error)
empty_parallel = parallel()  # No child nodes

result = validate_sequence(Sequence(empty_parallel))
# ERROR: Parallel node has no child nodes
# Suggestion: Add nodes to parallel execution or use a regular node

# Duplicate names in parallel (error)
@node("fetch")
def fetch1(data): return data

@node("fetch")  # Same name!
def fetch2(data): return data

parallel_with_duplicates = parallel(fetch1, fetch2)
result = validate_sequence(Sequence(parallel_with_duplicates))
# ERROR: Parallel node contains duplicate names: fetch
# Suggestion: Ensure all parallel nodes have unique names

Checks performed:
  • Parallel nodes have at least one child
  • No duplicate names in parallel branches
  • Valid concurrency limits (positive integers)
  • Decision nodes have at least one pattern
  • Structural warnings for dead-end nodes
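The parallel-node checks in the list above are simple to express. The sketch below models a parallel node as a hypothetical `ParallelSpec` holding child names and an optional `max_concurrency` limit; neither name comes from egregore, and the concurrency error message is invented. The first two messages mirror the ones shown earlier. Decision-pattern and dead-end checks are not covered.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ParallelSpec:
    """Hypothetical stand-in for a parallel node: child names and an optional concurrency limit."""
    children: List[str] = field(default_factory=list)
    max_concurrency: Optional[int] = None


def check_parallel(spec: ParallelSpec) -> List[str]:
    """Return error messages for the parallel-node checks listed above."""
    errors: List[str] = []

    # At least one child
    if not spec.children:
        errors.append("Parallel node has no child nodes")

    # No duplicate child names (Counter tallies each name)
    duplicates = sorted(name for name, count in Counter(spec.children).items() if count > 1)
    if duplicates:
        errors.append(f"Parallel node contains duplicate names: {', '.join(duplicates)}")

    # Concurrency limit, if set, must be a positive integer
    # (bool is rejected explicitly because it is a subclass of int in Python)
    limit = spec.max_concurrency
    if limit is not None and (isinstance(limit, bool) or not isinstance(limit, int) or limit <= 0):
        errors.append(f"Invalid concurrency limit: {limit!r} (must be a positive integer)")

    return errors


print(check_parallel(ParallelSpec(children=["fetch", "fetch"])))
```

The call above reports only the duplicate-name error, because the node does have children and no limit is set.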

SchemaValidator

Validates workflow configuration and schema:
# Empty sequence (error)
empty_workflow = Sequence(None)
result = validate_sequence(empty_workflow)
# ERROR: Sequence has no start node
# Suggestion: Provide a start node when creating the sequence

# Single-node workflow (warning)
single_node = Sequence(processor)
result = validate_sequence(single_node)
# WARNING: Sequence contains only one node
# Suggestion: Consider if this should be a simple function call instead

Checks performed:
  • Sequence has a start node
  • Sequence contains executable nodes
  • Meaningful workflow structure (for example, a warning when a sequence contains only one node)
  • Basic configuration validity

Custom Validators

Create Custom Validator

Subclass BaseValidator to define custom validation rules:
from collections import deque

from egregore.core.workflow.validation import (
    BaseValidator, ValidationResult, ValidationError, ValidationWarning
)

class CustomValidator(BaseValidator):
    """Custom validation logic."""

    @property
    def validator_name(self) -> str:
        return "CustomValidation"

    def validate(self, sequence) -> ValidationResult:
        """Perform custom validation."""
        errors = []  # Appending ValidationError instances here marks the workflow invalid
        warnings = []

        for node in self._get_all_nodes(sequence):
            if not hasattr(node, "name"):
                continue  # Skip objects that don't expose a name

            # Example: Flag nodes with an empty name
            if not node.name:
                warnings.append(ValidationWarning(
                    message=f"Node {node} has no name",
                    location=node,
                    suggestion="Add a name for better debugging"
                ))
                continue  # Nothing more to check without a name

            # Example: Flag leftover test nodes
            if "test" in node.name.lower():
                warnings.append(ValidationWarning(
                    message=f"Node name contains 'test': {node.name}",
                    location=node,
                    suggestion="Remove test nodes before production"
                ))

        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
            suggestions=[]
        )

    def _get_all_nodes(self, sequence):
        """Collect nodes reachable from the start node via next_node (breadth-first).

        This simple helper only follows next_node links; it does not
        descend into parallel children or decision branches.
        """
        nodes = []
        visited = set()
        queue = deque([sequence.start])

        while queue:
            node = queue.popleft()
            if node is None or node in visited:
                continue
            visited.add(node)
            nodes.append(node)

            # Follow the link to the next node, if any
            next_node = getattr(node, "next_node", None)
            if next_node:
                queue.append(next_node)

        return nodes

Use Custom Validator

# Create validator
custom_validator = CustomValidator()

# Validate with custom validator
result = validate_sequence(workflow, validators=[custom_validator])

# Or combine with built-in validators
from egregore.core.workflow.validators import (
    CycleDetectionValidator, DependencyValidator
)

result = validate_sequence(workflow, validators=[
    CycleDetectionValidator(),
    DependencyValidator(),
    custom_validator
])

Validation Pipeline

SequenceValidator

Coordinate multiple validators:
from egregore.core.workflow.validation import SequenceValidator
from egregore.core.workflow.validators import (
    CycleDetectionValidator,
    DependencyValidator,
    SchemaValidator
)

# Create validation pipeline
validator = SequenceValidator()

# Add validators
validator.add_validator(CycleDetectionValidator())
validator.add_validator(DependencyValidator())
validator.add_validator(SchemaValidator())
validator.add_validator(CustomValidator())

# Run validation
result = validator.validate_sequence(workflow)

# Get formatted report
report = validator.get_validation_report(workflow)
print(report)
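How SequenceValidator combines its validators internally isn't described on this page. As a rough mental model only, a pipeline can run each registered validator in turn and merge their findings into one result. The sketch below assumes every validator runs even after an earlier one reports errors, which may not match egregore's behavior; `Pipeline` and `Result` are hypothetical names, and validators are modeled as plain callables.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Result:
    """Hypothetical merged result: the pipeline is valid only if no validator reported errors."""
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        return not self.errors


Validator = Callable[[object], Result]


class Pipeline:
    """Hypothetical sketch of a validator pipeline; not egregore's SequenceValidator."""

    def __init__(self) -> None:
        self._validators: List[Validator] = []

    def add_validator(self, validator: Validator) -> None:
        self._validators.append(validator)

    def validate(self, target: object) -> Result:
        merged = Result()
        for validator in self._validators:
            # Keep going after a failure so the report lists every problem at once
            result = validator(target)
            merged.errors.extend(result.errors)
            merged.warnings.extend(result.warnings)
        return merged


pipeline = Pipeline()
pipeline.add_validator(lambda target: Result(errors=["Cycle detected"]))
pipeline.add_validator(lambda target: Result(warnings=["Node appears to be a dead end"]))
print(pipeline.validate(None).is_valid)
```

Collecting every finding in one pass means a single validation run surfaces all problems, instead of forcing a fix-and-rerun loop for each one.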

create_default_validator()

Create validator with all default validators:
from egregore.core.workflow.validation import create_default_validator

# Create default validator (includes all built-in validators)
validator = create_default_validator()

# Add custom validators
validator.add_validator(CustomValidator())

# Validate
result = validator.validate_sequence(workflow)

Validation Severity

Severity Levels

ERROR - Prevents execution:
ValidationError(
    message="Cycle detected in workflow",
    severity=ValidationSeverity.ERROR,
    location=node,
    suggestion="Break the cycle or add max_iter"
)

WARNING - Execution is allowed, but potential issues exist:
ValidationWarning(
    message="Node appears to be a dead end",
    severity=ValidationSeverity.WARNING,
    location=node,
    suggestion="Consider connecting to another node"
)

INFO - Suggestions for improvement:
ValidationSuggestion(
    message="Consider using parallel execution",
    severity=ValidationSeverity.INFO,
    location=node,
    suggestion="These nodes could run concurrently"
)

Common Patterns

Pre-Execution Validation

from egregore.core.workflow import validate_sequence, Sequence

def run_workflow_safely(workflow: Sequence, data: dict) -> dict:
    """Run workflow with validation."""
    # Validate first
    result = validate_sequence(workflow)

    if not result.is_valid:
        raise ValueError(f"Workflow validation failed:\n{result.get_summary()}")

    # Show warnings
    if result.warnings:
        print("Warnings:")
        for warning in result.warnings:
            print(f"  - {warning}")

    # Execute
    return workflow.run(data)

Development vs Production

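import logging
import os

from egregore.core.workflow import validate_sequence

logger = logging.getLogger(__name__)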

# Strict validation in development
if os.getenv("ENV") == "development":
    result = validate_sequence(workflow)
    result.raise_if_invalid()

    # Show all warnings
    for warning in result.warnings:
        print(f"WARNING: {warning}")

# Lenient in production (log issues instead of raising)
else:
    result = validate_sequence(workflow)

    if not result.is_valid:
        # Log errors and continue; nothing here stops execution, so only take
        # this path if you have confirmed the errors are non-critical
        for error in result.errors:
            logger.error(f"Validation error: {error}")

    for warning in result.warnings:
        logger.warning(f"Validation warning: {warning}")

CI/CD Integration

def test_workflow_validation():
    """Test workflow passes validation."""
    workflow = create_production_workflow()

    result = validate_sequence(workflow)

    # Assert no errors
    assert result.is_valid, f"Validation failed:\n{result.get_summary()}"

    # Assert no warnings (strict)
    assert len(result.warnings) == 0, f"Workflow has warnings:\n{result.get_summary()}"

Validation Report

def generate_validation_report(workflow: Sequence) -> dict:
    """Generate detailed validation report."""
    result = validate_sequence(workflow)

    return {
        "valid": result.is_valid,
        "summary": result.get_summary(),
        "error_count": len(result.errors),
        "warning_count": len(result.warnings),
        "suggestion_count": len(result.suggestions),
        "errors": [
            {
                "message": error.message,
                "severity": error.severity.value,
                "location": str(error.location) if error.location else None,
                "suggestion": error.suggestion
            }
            for error in result.errors
        ],
        "warnings": [
            {
                "message": warning.message,
                "severity": warning.severity.value,
                "location": str(warning.location) if warning.location else None,
                "suggestion": warning.suggestion
            }
            for warning in result.warnings
        ]
    }

Best Practices

# Good: Validate before deployment
def deploy_workflow(workflow: Sequence):
    result = validate_sequence(workflow)
    if not result.is_valid:
        raise ValueError("Cannot deploy invalid workflow")

    # Deploy
    production_system.deploy(workflow)

# Bad: Deploy without validation
def deploy_workflow(workflow: Sequence):
    production_system.deploy(workflow)  # May fail at runtime

# Good: Test validation passes
def test_workflow_is_valid():
    workflow = create_workflow()
    result = validate_sequence(workflow)
    assert result.is_valid

# Bad: No validation testing
def test_workflow():
    workflow = create_workflow()
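    result = workflow.run(test_data)  # Structural errors only surface at runtime

# Good: Domain-specific validation
class BusinessRuleValidator(BaseValidator):
    @property
    def validator_name(self) -> str:
        return "BusinessRules"

    def validate(self, sequence) -> ValidationResult:
        errors = []
        # Validate business logic, for example:
        # - Required approval nodes
        # - Data sensitivity checks
        # - Compliance requirements
        return ValidationResult(
            is_valid=not errors, errors=errors, warnings=[], suggestions=[]
        )

# Bad: No domain validation
result = validate_sequence(workflow)  # Only structural checks

# Good: Comprehensive logging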
result = validate_sequence(workflow)
logger.info(f"Validation result: {result.get_summary()}")

if not result.is_valid:
    for error in result.errors:
        logger.error(f"Validation error: {error}")

# Bad: Silent validation
result = validate_sequence(workflow)
if result.is_valid:
    workflow.run(data)

# Good: Review warnings
result = validate_sequence(workflow)

if result.warnings:
    print("Review these warnings:")
    for warning in result.warnings:
        print(f"  {warning}")
        if warning.suggestion:
            print(f"  Suggestion: {warning.suggestion}")

# Bad: Ignore warnings
result = validate_sequence(workflow)
if result.is_valid:
    workflow.run(data)  # Warnings ignored

Performance Considerations

Validation Overhead

  • Cycle detection: O(V + E) where V=nodes, E=edges
  • Dependency validation: O(V) node traversal
  • Schema validation: O(1) basic checks
  • Total: typically under 10 ms for workflows with fewer than 100 nodes

When to Validate

# Validate once at startup (recommended)
workflow = create_workflow()
validate_sequence(workflow).raise_if_invalid()

# Run many times without re-validation
for data in dataset:
    workflow.run(data)

# Avoid: re-validating an unchanged workflow on every run
for data in dataset:
    validate_sequence(workflow)  # Redundant: repeats the same checks each iteration
    workflow.run(data)

Caching Validation Results

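from egregore.core.workflow import Sequence, validate_sequence
from egregore.core.workflow.validation import ValidationResult


class ValidatedWorkflow:
    """Workflow with cached validation.

    The cached result goes stale if the wrapped workflow is modified
    after the first validate() call.
    """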

    def __init__(self, workflow: Sequence):
        self.workflow = workflow
        self._validation_result = None

    def validate(self) -> ValidationResult:
        """Validate and cache result."""
        if self._validation_result is None:
            self._validation_result = validate_sequence(self.workflow)
        return self._validation_result

    def run(self, data: dict) -> dict:
        """Run with automatic validation."""
        self.validate().raise_if_invalid()
        return self.workflow.run(data)

Error Messages

Common Validation Errors

Cycle detected:
ERROR: Cycle detected: node_a -> node_b -> node_c -> node_a
Suggestion: Remove one of the connections in the cycle or add max_iter to decision nodes
Empty parallel node:
ERROR: Parallel node has no child nodes
Suggestion: Add nodes to parallel execution or use a regular node
Duplicate names:
ERROR: Parallel node contains duplicate names: fetch
Suggestion: Ensure all parallel nodes have unique names
No start node:
ERROR: Sequence has no start node
Suggestion: Provide a start node when creating the sequence

What’s Next?

Type Safety

Type checking for workflows

Reporting

Track workflow performance

Best Practices

Workflow design patterns

Creating Nodes

Back to node creation