Best Practices

This guide collects proven patterns and principles for designing robust, maintainable, and performant workflows.

Core Concept

Following established patterns and principles ensures your workflows are reliable, testable, and easy to maintain:
from egregore.core.workflow import node, Sequence, validate_sequence

# Good practice: Clear naming, single responsibility, error handling
@node("load_user_data")
def load_user_data(user_id: int) -> dict:
    """Load user data from database."""
    if not user_id:
        raise ValueError("User ID required")

    return database.get_user(user_id)

@node("validate_user")
def validate_user(user: dict) -> dict:
    """Validate user has required fields."""
    required = ["id", "email", "name"]
    if not all(field in user for field in required):
        raise ValueError("Missing required fields")

    return user

@node("send_email")
def send_email(user: dict) -> bool:
    """Send welcome email to user."""
    return email_service.send(user["email"], "Welcome!")

# Build workflow with validation
workflow = Sequence(load_user_data >> validate_user >> send_email)

# Validate before running
result = validate_sequence(workflow)
if not result.is_valid:
    raise ValueError(f"Invalid workflow: {result.get_summary()}")

# Execute safely
result = workflow.execute(123)

Workflow Design Patterns

ETL (Extract-Transform-Load)

Pattern: Data processing pipeline with distinct stages:
from egregore.core.workflow import node, Sequence

@node("extract")
def extract(source: str) -> list:
    """Extract data from source."""
    return database.query(source)

@node("transform")
def transform(raw_data: list) -> list:
    """Transform data to target format."""
    return [
        {
            "id": item["user_id"],
            "name": item["full_name"].title(),
            "email": item["email_address"].lower()
        }
        for item in raw_data
    ]

@node("load")
def load(transformed_data: list) -> int:
    """Load data to destination."""
    target_db.bulk_insert(transformed_data)
    return len(transformed_data)

# ETL pipeline
etl = Sequence(extract >> transform >> load)
count = etl.execute("users_table")
print(f"Processed {count} records")
When to use: Data migration, report generation, batch processing

Fan-Out / Fan-In

Pattern: Parallel processing with result aggregation:
from egregore.core.workflow import node, parallel, Sequence

@node("fetch_user")
def fetch_user(user_id: int) -> dict:
    return {"id": user_id, "name": "Alice"}

@node("fetch_orders")
def fetch_orders(user_id: int) -> list:
    return database.get_orders(user_id)

@node("fetch_preferences")
def fetch_preferences(user_id: int) -> dict:
    return database.get_preferences(user_id)

@node("fetch_activity")
def fetch_activity(user_id: int) -> list:
    return database.get_activity(user_id)

@node("aggregate_profile")
def aggregate_profile(results: dict) -> dict:
    """Aggregate parallel results."""
    return {
        "user": results["fetch_user"],
        "orders": results["fetch_orders"],
        "preferences": results["fetch_preferences"],
        "activity": results["fetch_activity"]
    }

# Fan out all four fetches in parallel, fan in to the aggregator
workflow = Sequence(
    parallel(fetch_user, fetch_orders, fetch_preferences, fetch_activity) >>
    aggregate_profile
)

profile = workflow.execute(123)
When to use: Multi-source data gathering, parallel API calls, concurrent processing

Map-Reduce

Pattern: Apply operation to each item, then reduce results:
from egregore.core.workflow import node, parallel, Sequence

@node("split_batch")
def split_batch(items: list) -> list:
    """Split items into processable chunks."""
    chunk_size = 10
    return [
        items[i:i + chunk_size]
        for i in range(0, len(items), chunk_size)
    ]

@node("map_process")
def map_process(chunk: list) -> list:
    """Process individual chunk."""
    return [item.upper() for item in chunk]

@node("reduce_results")
def reduce_results(results: dict) -> list:
    """Combine all chunk results."""
    all_results = []
    for chunk_results in results.values():
        all_results.extend(chunk_results)
    return all_results

# Map-reduce pattern
workflow = Sequence(
    split_batch >>
    parallel(map_process, map_process, map_process) >>
    reduce_results
)

processed = workflow.execute(["item1", "item2", "item3", ...])
When to use: Batch processing, large dataset operations, distributed computing

Retry with Backoff

Pattern: Automatic retry with exponential backoff for transient failures:
@node("call_external_api")
def call_external_api(endpoint: str) -> dict:
    """Call external API with retry logic."""
    response = requests.get(f"https://api.example.com/{endpoint}")

    if response.status_code >= 500:
        # Transient error - workflow will retry
        raise ConnectionError(f"API returned {response.status_code}")

    return response.json()

workflow = Sequence(call_external_api)

result = workflow.execute(
    "users",
    config={
        "error_config": {
            "strategy": "default",
            "max_retries": 5,
            "base_delay": 1.0,  # 1 second base
            "max_delay": 30.0   # Cap at 30 seconds
        }
    }
)
When to use: Network calls, database operations, external service integration
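
Assuming the default strategy doubles the delay on each attempt (an assumption; the exact schedule is an implementation detail), these settings imply a wait of min(base_delay * 2**n, max_delay) before retry n:
# Hypothetical backoff schedule implied by the config above
base_delay, max_delay = 1.0, 30.0

for attempt in range(5):  # max_retries = 5
    delay = min(base_delay * (2 ** attempt), max_delay)
    print(f"retry {attempt + 1}: wait {delay:.0f}s")
# retry 1: 1s, retry 2: 2s, retry 3: 4s, retry 4: 8s, retry 5: 16s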

Circuit Breaker

Pattern: Fail fast when downstream service is unavailable:
import requests
from datetime import datetime, timedelta
from egregore.core.workflow import node, Sequence

# Track failures
failure_tracker = {
    "count": 0,
    "last_failure": None,
    "circuit_open": False
}

@node("protected_call")
def protected_call(endpoint: str) -> dict:
    """Call with circuit breaker protection."""
    # Check circuit breaker
    if failure_tracker["circuit_open"]:
        time_since_failure = datetime.now() - failure_tracker["last_failure"]
        if time_since_failure < timedelta(minutes=5):
            # Circuit still open
            raise ValueError("Circuit breaker open - service unavailable")
        else:
            # Try to close circuit
            failure_tracker["circuit_open"] = False
            failure_tracker["count"] = 0

    try:
        response = requests.get(f"https://api.example.com/{endpoint}")
        # Success - reset counter
        failure_tracker["count"] = 0
        return response.json()
    except Exception as e:
        # Track failure
        failure_tracker["count"] += 1
        failure_tracker["last_failure"] = datetime.now()

        # Open circuit after 3 failures
        if failure_tracker["count"] >= 3:
            failure_tracker["circuit_open"] = True

        raise

workflow = Sequence(protected_call)
When to use: Protect against cascading failures, rate limiting, service degradation
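
The module-level failure_tracker above is shared mutable state, the same hazard flagged under Hidden State below. Wrapping it in a small class keeps the breaker reusable and testable. A minimal sketch, not an egregore feature:
from datetime import datetime, timedelta

class CircuitBreaker:
    """Open after `threshold` consecutive failures; stay open for
    `cooldown`, then allow a single trial call (half-open)."""

    def __init__(self, threshold: int = 3,
                 cooldown: timedelta = timedelta(minutes=5)):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None

    def check(self) -> None:
        """Raise if the circuit is open and still cooling down."""
        if self.opened_at is None:
            return
        if datetime.now() - self.opened_at < self.cooldown:
            raise ValueError("Circuit breaker open - service unavailable")
        self.opened_at = None  # cooldown elapsed: allow a trial call
        self.failures = 0

    def record_success(self) -> None:
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = datetime.now()

breaker = CircuitBreaker()

@node("protected_call_v2")
def protected_call_v2(endpoint: str) -> dict:
    breaker.check()
    try:
        response = requests.get(f"https://api.example.com/{endpoint}")
        breaker.record_success()
        return response.json()
    except Exception:
        breaker.record_failure()
        raise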

Saga Pattern

Pattern: Compensating transactions for distributed workflows:
@node("reserve_inventory")
def reserve_inventory(order: dict) -> dict:
    """Reserve items in inventory."""
    reservation_id = inventory.reserve(order["items"])
    return {**order, "reservation_id": reservation_id}

@node("process_payment")
def process_payment(order: dict) -> dict:
    """Process payment for order."""
    try:
        payment_id = payment_service.charge(order["amount"])
        return {**order, "payment_id": payment_id}
    except Exception as e:
        # Compensate: release inventory
        inventory.release(order["reservation_id"])
        raise

@node("confirm_order")
def confirm_order(order: dict) -> dict:
    """Confirm order completion."""
    try:
        order_id = orders_db.create(order)
        return {**order, "order_id": order_id}
    except Exception as e:
        # Compensate: refund payment and release inventory
        payment_service.refund(order["payment_id"])
        inventory.release(order["reservation_id"])
        raise

# Saga workflow with compensation
workflow = Sequence(
    reserve_inventory >>
    process_payment >>
    confirm_order
)
When to use: Distributed transactions, multi-step business processes, order processing
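
As a saga grows, inline compensation gets repetitive. One common refactor, sketched here rather than provided by the library, threads an explicit undo stack through the order and unwinds it newest-first on failure:
@node("reserve_inventory_v2")
def reserve_inventory_v2(order: dict) -> dict:
    """Reserve items and register the compensating action."""
    reservation_id = inventory.reserve(order["items"])
    # {**order, ...} copies the dict shallowly, so the same undo
    # list object flows through every step of the pipeline
    order["undo"].append(lambda: inventory.release(reservation_id))
    return {**order, "reservation_id": reservation_id}

def run_saga(workflow, order: dict) -> dict:
    """Execute a saga; on failure, run compensations newest-first."""
    order.setdefault("undo", [])
    try:
        return workflow.execute(order)
    except Exception:
        for compensate in reversed(order["undo"]):
            compensate()
        raise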

Node Design Principles

Single Responsibility

Principle: Each node should do one thing well:
# Good: Single responsibility per node
@node("fetch_user")
def fetch_user(user_id: int) -> dict:
    """Only fetches user data."""
    return database.get_user(user_id)

@node("validate_user")
def validate_user(user: dict) -> dict:
    """Only validates user data."""
    if not user.get("email"):
        raise ValueError("Email required")
    return user

@node("send_notification")
def send_notification(user: dict) -> bool:
    """Only sends notification."""
    return email_service.send(user["email"], "Welcome!")

# Bad: Multiple responsibilities in one node
@node("process_user")
def process_user(user_id: int) -> bool:
    """Does too many things."""
    # Fetching
    user = database.get_user(user_id)

    # Validation
    if not user.get("email"):
        raise ValueError("Email required")

    # Notification
    return email_service.send(user["email"], "Welcome!")
Benefits: Easier testing, better reusability, clearer error handling

Idempotency

Principle: Nodes should be safe to retry without side effects:
# Good: Idempotent operations
@node("create_or_update_user")
def create_or_update_user(user_data: dict) -> dict:
    """Idempotent - safe to retry."""
    user_id = user_data["id"]

    # Check if exists
    existing = database.get_user(user_id)

    if existing:
        # Update (idempotent)
        database.update_user(user_id, user_data)
    else:
        # Create (idempotent with unique ID)
        database.create_user(user_data)

    return database.get_user(user_id)

# Bad: Non-idempotent operations
@node("increment_counter")
def increment_counter(user_id: int) -> int:
    """Not idempotent - retry causes issues."""
    current = database.get_counter(user_id)
    new_value = current + 1  # Multiple retries = wrong count
    database.set_counter(user_id, new_value)
    return new_value
How to achieve: Use unique IDs, check-before-write, stateless operations
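
For operations that are inherently non-idempotent, like the counter above, an idempotency key makes retries safe: record each operation's unique ID and skip IDs already applied. A sketch; operation_seen and record_operation are hypothetical helpers that must check and record atomically (e.g. via a unique constraint on op_id):
@node("increment_counter_safely")
def increment_counter_safely(user_id: int, op_id: str) -> int:
    """Each op_id is applied at most once, so retries are no-ops."""
    # operation_seen/record_operation are hypothetical; a real store
    # needs an atomic check-and-record (e.g. unique index on op_id)
    if database.operation_seen(op_id):
        return database.get_counter(user_id)  # retry: already applied

    new_value = database.get_counter(user_id) + 1
    database.set_counter(user_id, new_value)
    database.record_operation(op_id)
    return new_value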

Type Hints

Principle: Always provide complete type hints:
from typing import Dict, List

# Good: Complete type hints
@node("process_items")
def process_items(items: List[Dict[str, int]]) -> Dict[str, int]:
    """Clear input/output types."""
    return {
        "total": sum(item["value"] for item in items),
        "count": len(items)
    }

# Bad: Missing or incomplete type hints
@node("process_items")
def process_items(items):  # No type hints
    """Unclear what this expects or returns."""
    return {"total": sum(item["value"] for item in items)}
Benefits: Type checking, better IDE support, self-documenting code

Pure Functions

Principle: Prefer pure functions without side effects:
# Good: Pure function
@node("calculate_total")
def calculate_total(items: List[dict]) -> float:
    """Pure - no side effects, deterministic."""
    return sum(item["price"] * item["quantity"] for item in items)

# Acceptable: Clear side effects with SharedState
@node("update_metrics")
def update_metrics(total: float, state: SharedState) -> float:
    """Side effects via SharedState (expected)."""
    state["total_revenue"] = state.get("total_revenue", 0) + total
    return total

# Bad: Hidden global side effects
total_revenue = 0

@node("calculate_and_track")
def calculate_and_track(items: List[dict]) -> float:
    """Hidden global mutation - hard to test."""
    global total_revenue
    result = sum(item["price"] * item["quantity"] for item in items)
    total_revenue += result  # Hidden side effect
    return result
Benefits: Easier testing, predictable behavior, thread-safe

Error Handling

Recovery Strategies

Choose appropriate strategy for each scenario:
from egregore.core.workflow import node, Sequence

# Critical validation - fail fast
@node("validate_payment")
def validate_payment(payment: dict) -> dict:
    """Critical validation - don't retry."""
    if payment["amount"] <= 0:
        raise ValueError("Invalid payment amount")
    return payment

# External API - retry with backoff
@node("call_payment_gateway")
def call_payment_gateway(payment: dict) -> dict:
    """Retry transient failures."""
    response = gateway.charge(payment)
    if response.status_code >= 500:
        raise ConnectionError("Gateway unavailable")
    return response.json()

# Non-critical feature - use fallback
@node("fetch_recommendations")
def fetch_recommendations(user_id: int) -> list:
    """Use fallback on failure."""
    try:
        return recommendation_service.get(user_id)
    except Exception:
        return []  # Empty list as fallback

# Build workflow with appropriate error configs
workflow = Sequence(
    validate_payment >>      # Fails fast
    call_payment_gateway >>  # Retries
    fetch_recommendations    # Has fallback
)

result = workflow.execute(
    payment_data,
    config={
        "error_config": {
            "strategy": "default",
            "max_retries": 3
        }
    }
)

Fallback Values

Provide meaningful defaults:
@node("get_user_preferences")
def get_user_preferences(user_id: int) -> dict:
    """Fetch preferences with fallback."""
    raise ConnectionError("Preferences service down")

# Set fallback value
get_user_preferences.fallback_value = {
    "theme": "light",
    "notifications": True,
    "language": "en"
}

@node("personalize_content")
def personalize_content(preferences: dict) -> dict:
    """Use preferences (or fallback)."""
    return {
        "theme": preferences["theme"],
        "content": customize(preferences)
    }

workflow = Sequence(get_user_preferences >> personalize_content)

# Even if preferences fail, workflow continues with fallback
result = workflow.execute(
    123,
    config={"error_config": {"max_retries": 2}}
)

Error Context

Provide helpful error messages:
@node("process_document")
def process_document(doc: dict) -> dict:
    """Process with helpful error context."""
    if "content" not in doc:
        raise ValueError(
            f"Document missing 'content' field. "
            f"Available fields: {list(doc.keys())}"
        )

    if not doc["content"].strip():
        raise ValueError(
            f"Document content is empty. "
            f"Document ID: {doc.get('id', 'unknown')}"
        )

    # Process
    return {"processed": doc["content"]}

Performance Optimization

Parallel Execution

Use parallel nodes for independent operations:
from egregore.core.workflow import node, parallel, Sequence

# Sequential (slow)
workflow_slow = Sequence(
    fetch_user >>
    fetch_orders >>
    fetch_preferences >>
    fetch_activity
)
# Total time: 400ms (100ms each)

# Parallel (fast)
workflow_fast = Sequence(
    fetch_user >>
    parallel(fetch_orders, fetch_preferences, fetch_activity)
)
# Total time: 200ms (100ms + max(100ms, 100ms, 100ms))

Caching

Cache expensive operations:
from functools import lru_cache
from egregore.core.workflow import node, Sequence

# Cache expensive calculations
@lru_cache(maxsize=128)
def expensive_calculation(x: int) -> int:
    """Cached computation."""
    import time
    time.sleep(1)  # Simulate expensive work
    return x * x

@node("calculate")
def calculate(x: int) -> int:
    """Use cached function."""
    return expensive_calculation(x)

workflow = Sequence(calculate)

# First call: 1 second
result1 = workflow.execute(5)

# Second call: instant (cached)
result2 = workflow.execute(5)
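
Note that lru_cache holds entries forever, which suits pure computations but not data that goes stale. A small TTL wrapper is a common alternative; this is an illustrative sketch (rates_api is a stand-in for any external call):
import time
from functools import wraps

def ttl_cache(seconds: float):
    """Cache results per argument tuple, expiring after `seconds`."""
    def decorator(fn):
        store = {}  # args -> (expires_at, value)

        @wraps(fn)
        def wrapper(*args):
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]
            value = fn(*args)
            store[args] = (now + seconds, value)
            return value
        return wrapper
    return decorator

@ttl_cache(seconds=60)
def fetch_exchange_rate(currency: str) -> float:
    return rates_api.get(currency)  # hypothetical external service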

Lazy Evaluation

Defer work until needed:
@node("prepare_data")
def prepare_data(source: str) -> dict:
    """Return data generator, not full dataset."""
    return {
        "source": source,
        "iterator": database.iter_records(source)  # Generator
    }

@node("process_subset")
def process_subset(data: dict) -> list:
    """Process only what's needed."""
    # Only loads 10 records
    return [next(data["iterator"]) for _ in range(10)]

Resource Management

Manage memory and connections:
@node("batch_process")
def batch_process(items: list) -> int:
    """Process in batches to manage memory."""
    batch_size = 100
    processed = 0

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # Process batch
        database.bulk_insert(batch)
        processed += len(batch)

        # Drop the reference so the batch can be garbage-collected
        del batch

    return processed
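
Connections deserve the same care as memory: acquire late and release deterministically. A sketch using a context manager; db_pool and its acquire/release API are illustrative:
from contextlib import contextmanager

@contextmanager
def pooled_connection(pool):
    """Borrow a connection and guarantee it is returned."""
    conn = pool.acquire()
    try:
        yield conn
    finally:
        pool.release(conn)

@node("store_results")
def store_results(rows: list) -> int:
    """Hold the connection only for the duration of the write."""
    with pooled_connection(db_pool) as conn:  # db_pool: hypothetical pool
        conn.bulk_insert("results", rows)
    return len(rows)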

Testing Strategies

Unit Testing Nodes

Test nodes in isolation:
import pytest
from egregore.core.workflow import node

@node("calculate_discount")
def calculate_discount(amount: float, coupon: str) -> float:
    """Calculate discount from coupon code."""
    discounts = {"SAVE10": 0.10, "SAVE20": 0.20}
    discount_pct = discounts.get(coupon, 0.0)
    return amount * (1 - discount_pct)

def test_calculate_discount_with_valid_coupon():
    """Test discount calculation with valid coupon."""
    result = calculate_discount.execute(100.0, "SAVE10")
    assert result == 90.0

def test_calculate_discount_with_invalid_coupon():
    """Test discount calculation with invalid coupon."""
    result = calculate_discount.execute(100.0, "INVALID")
    assert result == 100.0

def test_calculate_discount_edge_cases():
    """Test edge cases."""
    assert calculate_discount.execute(0.0, "SAVE10") == 0.0
    assert calculate_discount.execute(100.0, "") == 100.0

Integration Testing Workflows

Test complete workflows:
def test_order_processing_workflow():
    """Test complete order processing."""
    workflow = Sequence(
        validate_order >>
        reserve_inventory >>
        process_payment >>
        send_confirmation
    )

    order = {
        "items": [{"id": 1, "quantity": 2}],
        "amount": 50.0,
        "email": "test@example.com"
    }

    result = workflow.execute(order)

    assert result["status"] == "confirmed"
    assert result["order_id"] is not None
    assert result["payment_id"] is not None

Testing Error Handling

Test error scenarios:
def test_workflow_handles_payment_failure():
    """Test workflow handles payment failure gracefully."""
    workflow = Sequence(
        validate_order >>
        reserve_inventory >>
        process_payment >>
        send_confirmation
    )

    # Invalid payment triggers error
    order = {"amount": -10.0}

    with pytest.raises(ValueError) as exc_info:
        workflow.execute(order)

    assert "Invalid payment amount" in str(exc_info.value)

def test_workflow_retries_transient_failures():
    """Test retry behavior."""
    attempt_count = [0]

    @node("flaky_operation")
    def flaky_operation(x: int) -> int:
        attempt_count[0] += 1
        if attempt_count[0] < 3:
            raise ConnectionError("Temporary failure")
        return x * 2

    workflow = Sequence(flaky_operation)

    result = workflow.execute(
        5,
        config={
            "error_config": {
                "strategy": "default",
                "max_retries": 5
            }
        }
    )

    assert result == 10
    assert attempt_count[0] == 3  # Failed twice, succeeded third time

Validation Testing

Test workflow structure:
from egregore.core.workflow import validate_sequence

def test_workflow_structure_is_valid():
    """Test workflow passes validation."""
    workflow = create_production_workflow()

    result = validate_sequence(workflow)

    assert result.is_valid, f"Validation failed: {result.get_summary()}"
    assert len(result.errors) == 0
    assert len(result.warnings) == 0

def test_invalid_workflow_detected():
    """Test validation catches structural issues."""
    # Create workflow with cycle
    workflow_with_cycle = create_cyclic_workflow()

    result = validate_sequence(workflow_with_cycle)

    assert not result.is_valid
    assert len(result.errors) > 0
    assert "cycle" in result.errors[0].message.lower()

Production Deployment

Pre-Deployment Validation

Always validate before deploying:
from egregore.core.workflow import validate_sequence, set_type_checking_mode

def deploy_workflow(workflow_factory):
    """Deploy workflow with validation."""
    # Enable strict type checking
    set_type_checking_mode(strict=True)

    # Create workflow
    workflow = workflow_factory()

    # Validate structure
    result = validate_sequence(workflow)
    if not result.is_valid:
        raise ValueError(f"Workflow validation failed: {result.get_summary()}")

    # Validate types
    # (Type errors will raise in strict mode during construction)

    # Test with sample data
    test_result = workflow.execute(get_test_data())
    assert test_result is not None

    # Deploy
    production_system.deploy(workflow)

    print(f"✅ Workflow '{workflow.name}' deployed successfully")

Monitoring

Monitor workflow health:
from egregore.core.workflow import Sequence

def monitor_workflow_health(workflow: Sequence, data):
    """Set up monitoring for a production workflow run."""
    reporting = workflow.controller.reporting

    # Set performance thresholds
    from egregore.analytics.workflow_reporting import PerformanceThreshold

    threshold = PerformanceThreshold(
        node_name=None,  # All nodes
        max_duration=5.0,
        warning_duration=2.0,
        alert_callback=lambda alert: send_alert_to_ops(alert)
    )
    reporting.set_performance_threshold("production", threshold)

    # Subscribe to errors
    def on_error(event_data):
        log_error_to_monitoring(event_data)
        notify_team(event_data)

    reporting.subscribe("execution_error", on_error)

    # Run workflow
    result = workflow.execute(data)

    # Check health after execution
    metrics = reporting.get_execution_metrics()
    if metrics.success_rate < 0.95:
        alert_ops_team(f"Success rate dropped to {metrics.success_rate}")

    alerts = reporting.get_performance_alerts()
    if alerts:
        notify_performance_issues(alerts)

    return result

Graceful Degradation

Handle service unavailability:
@node("get_recommendations")
def get_recommendations(user_id: int) -> list:
    """Get recommendations with fallback."""
    try:
        return recommendation_service.get(user_id, timeout=2)
    except Exception:
        # Degrade to simple recommendations
        return get_simple_recommendations(user_id)

def get_simple_recommendations(user_id: int) -> list:
    """Simple fallback recommendations."""
    # Return popular items or cached recommendations
    return cache.get("popular_items", [])

Configuration Management

Externalize configuration:
import os
from typing import Dict

def get_workflow_config() -> Dict:
    """Load workflow configuration from environment."""
    return {
        "error_config": {
            "strategy": os.getenv("WORKFLOW_ERROR_STRATEGY", "default"),
            "max_retries": int(os.getenv("WORKFLOW_MAX_RETRIES", "3")),
            "base_delay": float(os.getenv("WORKFLOW_BASE_DELAY", "1.0")),
            "max_delay": float(os.getenv("WORKFLOW_MAX_DELAY", "30.0"))
        },
        "timeouts": {
            "external_api": int(os.getenv("API_TIMEOUT", "10")),
            "database": int(os.getenv("DB_TIMEOUT", "5"))
        }
    }

# Use configuration
workflow = Sequence(node1 >> node2 >> node3)
config = get_workflow_config()

result = workflow.execute(data, config=config)

Common Anti-Patterns

God Nodes

Avoid: Nodes that do too much
# Bad: God node
@node("do_everything")
def do_everything(user_id: int) -> dict:
    """Does too many unrelated things."""
    user = database.get_user(user_id)
    orders = database.get_orders(user_id)
    recommendations = ml_service.recommend(user_id)
    email_service.send(user["email"], "Update")
    cache.set(f"user:{user_id}", user)
    metrics.increment("user_processed")
    return {"user": user, "orders": orders, "recommendations": recommendations}

# Good: Focused nodes
@node("fetch_user")
def fetch_user(user_id: int) -> dict:
    return database.get_user(user_id)

@node("fetch_orders")
def fetch_orders(user_id: int) -> list:
    return database.get_orders(user_id)

@node("get_recommendations")
def get_recommendations(user_id: int) -> list:
    return ml_service.recommend(user_id)

Tight Coupling

Avoid: Nodes that depend on specific implementations
# Bad: Tightly coupled
@node("process_and_store")
def process_and_store(data: dict) -> bool:
    """Hardcoded to specific database."""
    processed = transform(data)
    postgres_db.insert("users", processed)  # Tight coupling
    return True

# Good: Dependency injection
@node("process")
def process(data: dict) -> dict:
    """Pure transformation."""
    return transform(data)

@node("store")
def store(data: dict, db=None) -> bool:
    """Accepts any database implementation."""
    db = db or get_default_db()
    db.insert("users", data)
    return True

Hidden State

Avoid: Nodes that rely on hidden global state
# Bad: Hidden global state
current_user = None

@node("process_user")
def process_user(user_id: int) -> dict:
    """Relies on hidden global."""
    global current_user
    current_user = database.get_user(user_id)
    return current_user

# Good: Explicit state via SharedState
@node("process_user")
def process_user(user_id: int, state: SharedState) -> dict:
    """Explicit state management."""
    user = database.get_user(user_id)
    state["current_user"] = user
    return user

Ignoring Errors

Avoid: Swallowing errors without handling
# Bad: Silent failures
@node("fetch_data")
def fetch_data(endpoint: str) -> dict:
    """Hides errors."""
    try:
        return api.get(endpoint)
    except Exception:
        return {}  # Silent failure - could hide critical issues

# Good: Explicit error handling
@node("fetch_data")
def fetch_data(endpoint: str) -> dict:
    """Handles errors appropriately."""
    try:
        return api.get(endpoint)
    except ConnectionError as e:
        # Transient error - let workflow retry
        raise
    except ValueError as e:
        # Invalid data - log and use fallback
        logger.error(f"Invalid data from {endpoint}: {e}")
        return get_cached_data(endpoint)

Real-World Examples

User Registration Workflow

from egregore.core.workflow import node, Sequence, validate_sequence

@node("validate_registration")
def validate_registration(data: dict) -> dict:
    """Validate registration data."""
    required = ["email", "password", "name"]
    if not all(field in data for field in required):
        raise ValueError("Missing required fields")

    if len(data["password"]) < 8:
        raise ValueError("Password too short")

    return data

@node("check_existing_user")
def check_existing_user(data: dict) -> dict:
    """Check if user already exists."""
    if database.user_exists(data["email"]):
        raise ValueError("Email already registered")

    return data

@node("create_user")
def create_user(data: dict) -> dict:
    """Create user account."""
    user_id = database.create_user({
        "email": data["email"],
        "password": hash_password(data["password"]),
        "name": data["name"]
    })

    return {**data, "user_id": user_id}

@node("send_welcome_email")
def send_welcome_email(user: dict) -> dict:
    """Send welcome email (non-critical)."""
    try:
        email_service.send(user["email"], "Welcome!", template="welcome")
    except Exception as e:
        logger.warning(f"Failed to send welcome email: {e}")
        # Don't fail workflow for email issues

    return user

@node("create_profile")
def create_profile(user: dict) -> dict:
    """Create user profile."""
    profile_id = database.create_profile({
        "user_id": user["user_id"],
        "name": user["name"]
    })

    return {**user, "profile_id": profile_id}

# Build registration workflow
registration_workflow = Sequence(
    validate_registration >>
    check_existing_user >>
    create_user >>
    send_welcome_email >>
    create_profile
)

# Validate workflow
result = validate_sequence(registration_workflow)
assert result.is_valid

# Use in production
new_user = registration_workflow.execute({
    "email": "user@example.com",
    "password": "securepassword123",
    "name": "Alice Smith"
})

Data Pipeline

@node("extract_logs")
def extract_logs(date_range: dict) -> list:
    """Extract logs from S3."""
    return s3.get_logs(date_range["start"], date_range["end"])

@node("parse_logs")
def parse_logs(raw_logs: list) -> list:
    """Parse log entries."""
    return [parse_log_line(line) for line in raw_logs]

@node("filter_errors")
def filter_errors(logs: list) -> list:
    """Filter to error logs only."""
    return [log for log in logs if log["level"] == "ERROR"]

@node("aggregate_metrics")
def aggregate_metrics(error_logs: list) -> dict:
    """Calculate metrics from errors."""
    by_service = {}
    for log in error_logs:
        service = log["service"]
        by_service[service] = by_service.get(service, 0) + 1

    return {
        "total_errors": len(error_logs),
        "by_service": by_service,
        "time_range": error_logs[0]["timestamp"] if error_logs else None
    }

@node("store_metrics")
def store_metrics(metrics: dict) -> bool:
    """Store metrics in database."""
    database.insert("error_metrics", metrics)
    return True

# ETL pipeline
pipeline = Sequence(
    extract_logs >>
    parse_logs >>
    filter_errors >>
    aggregate_metrics >>
    store_metrics
)

# Run daily
result = pipeline.execute({
    "start": "2025-01-01",
    "end": "2025-01-02"
})

Best Practices Summary

  • ETL: Extract-Transform-Load for data processing
  • Fan-Out/Fan-In: Parallel processing with aggregation
  • Map-Reduce: Distributed processing of large datasets
  • Retry with Backoff: Handle transient failures with exponential backoff
  • Circuit Breaker: Fail fast to prevent cascading failures
  • Saga: Distributed transactions with compensating rollback
  • Single Responsibility: One node, one purpose
  • Idempotency: Safe to retry without side effects
  • Type Hints: Always provide complete type annotations
  • Pure Functions: Minimize side effects
  • Recovery Strategies: Choose the appropriate strategy per scenario
  • Fallback Values: Provide meaningful defaults
  • Error Context: Provide helpful error messages
  • Parallel Execution: Use parallel nodes for independent work
  • Caching: Cache expensive computations
  • Lazy Evaluation: Defer work until needed
  • Resource Management: Batch processing, connection pooling
  • Unit Tests: Test nodes in isolation
  • Integration Tests: Test complete workflows
  • Error Tests: Validate error-handling behavior
  • Validation Tests: Verify workflow structure
  • Performance Tests: Measure and track performance
  • Pre-Deployment Validation: Always validate before deploying
  • Monitoring: Track metrics, set thresholds, and alert on issues
  • Graceful Degradation: Handle service unavailability
  • Configuration: Externalize settings
  • Documentation: Document workflows and nodes

What’s Next?