Parallel Execution

Parallel execution allows multiple nodes to run concurrently, dramatically improving workflow performance for independent operations.

Basic Parallel Execution

Use the parallel() function to execute nodes concurrently:
from egregore.core.workflow import parallel, node, Sequence

@node("fetch_api")
def fetch_api(url: str) -> dict:
    # API call
    return {"source": "api", "data": "..."}

@node("fetch_database")
def fetch_database(query: str) -> dict:
    # Database query
    return {"source": "db", "data": "..."}

@node("fetch_cache")
def fetch_cache(key: str) -> dict:
    # Cache lookup
    return {"source": "cache", "data": "..."}

# Execute all three concurrently
parallel_fetch = parallel(
    fetch_api,
    fetch_database,
    fetch_cache
)

result = parallel_fetch({"url": "...", "query": "...", "key": "..."})

Result Collection

Parallel nodes return a dictionary with terminal node names as keys:
result = parallel_fetch({...})
# Returns:
# {
#     "fetch_api": {"source": "api", "data": "..."},
#     "fetch_database": {"source": "db", "data": "..."},
#     "fetch_cache": {"source": "cache", "data": "..."}
# }

# Access individual results
api_result = result["fetch_api"]
db_result = result["fetch_database"]
cache_result = result["fetch_cache"]

Using Results in Next Node

The next node receives the dictionary of results:
@node("combine_results")
def combine_results(data: dict) -> dict:
    """Combine parallel results."""
    return {
        "api": data["fetch_api"],
        "db": data["fetch_database"],
        "cache": data["fetch_cache"],
        "combined": True
    }

workflow = Sequence(
    parallel(fetch_api, fetch_database, fetch_cache),
    combine_results
)

Concurrency Control

Limit the number of concurrent executions:
# Limit to 2 concurrent operations
limited_parallel = parallel(
    task1,
    task2,
    task3,
    task4,
    max_concurrent=2  # Only 2 run at a time
)

# Execution order:
# 1. task1 and task2 start
# 2. When one finishes, task3 starts
# 3. When another finishes, task4 starts
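
The queuing behavior described in the comments above is the standard bounded-concurrency pattern. As a generic illustration only (not egregore's internal implementation), a plain asyncio semaphore produces the same ordering for hypothetical async callables:
import asyncio

async def run_with_limit(tasks, max_concurrent=2):
    # A semaphore admits at most max_concurrent coroutines at once;
    # as each one finishes, the next queued task starts.
    sem = asyncio.Semaphore(max_concurrent)

    async def guarded(task):
        async with sem:
            return await task()

    return await asyncio.gather(*(guarded(t) for t in tasks))

# e.g. asyncio.run(run_with_limit([job1, job2, job3, job4], max_concurrent=2))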

Why Limit Concurrency?

  • Memory constraints: Each concurrent task uses memory
  • API rate limits: Avoid overwhelming external services
  • CPU limits: Prevent system overload
  • Resource fairness: Share resources with other processes

# Heavy memory operations - limit concurrency
memory_intensive = parallel(
    process_large_dataset1,
    process_large_dataset2,
    process_large_dataset3,
    max_concurrent=2  # Prevent memory exhaustion
)

# API calls with rate limits
api_calls = parallel(
    api_request1,
    api_request2,
    api_request3,
    api_request4,
    max_concurrent=3  # Respect rate limits
)

Timeout Control

Set a timeout for the entire parallel block:
# Timeout after 30 seconds
# (assuming ParallelTimeoutError is importable from egregore.core.workflow,
# alongside ParallelExecutionError)
from egregore.core.workflow import ParallelTimeoutError

timed_parallel = parallel(
    long_running_task1,
    long_running_task2,
    long_running_task3,
    timeout=30.0  # seconds
)

try:
    result = timed_parallel(data)
except ParallelTimeoutError:
    print("Parallel execution timed out")

Per-Branch Timeout

Timeout applies to the entire parallel block, not individual branches:
# All branches must complete within 30 seconds total
parallel(task1, task2, task3, timeout=30.0)

# If you need per-task timeouts, implement within nodes:
@node("task_with_timeout")
def task_with_timeout(data: dict) -> dict:
    import asyncio
    try:
        return asyncio.wait_for(
            async_operation(data),
            timeout=10.0
        )
    except asyncio.TimeoutError:
        return {"status": "timeout"}

Resource Optimization

Parallel nodes automatically optimize resource usage:
# Automatic optimization (default)
optimized = parallel(
    node1, node2, node3, node4, node5,
    # Automatically:
    # - Analyzes available CPU and memory
    # - Calculates optimal worker count
    # - Creates balanced execution batches
    # - Monitors resource usage
)

# Disable optimization if needed
unoptimized = parallel(
    node1, node2, node3,
    optimization_enabled=False  # Manual control only
)

Resource Allocation

The system automatically:
  1. Analyzes available resources:
    • CPU cores available
    • Available memory (70% utilization target)
    • Estimated memory per node (50MB default)
  2. Calculates optimal workers (see the sketch after this list):
    optimal_workers = min(
        max_concurrent or branch_count,
        available_cpu_cores,
        available_memory / estimated_memory_per_node
    )
    
  3. Creates execution batches:
    • Balances load across workers
    • Prevents resource exhaustion
    • Maintains system stability
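
As a rough, hypothetical sketch of the step-2 formula (the real logic lives inside parallel() and may differ; the 70% target and 50MB-per-node figure are the defaults quoted above, and estimate_optimal_workers is an invented name):
import os
from typing import Optional

def estimate_optimal_workers(branch_count: int,
                             max_concurrent: Optional[int] = None,
                             available_memory_mb: float = 8192.0,
                             memory_per_node_mb: float = 50.0) -> int:
    # Hypothetical restatement of the allocation rule described above
    cpu_cores = os.cpu_count() or 1
    memory_budget_mb = available_memory_mb * 0.70            # 70% utilization target
    memory_bound = int(memory_budget_mb // memory_per_node_mb)
    concurrency_cap = max_concurrent or branch_count
    return max(1, min(concurrency_cap, cpu_cores, memory_bound))

# Example: 10 branches on an 8-core box with ~8 GB free -> min(10, 8, 114) = 8 workers
print(estimate_optimal_workers(branch_count=10))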

Manual Resource Configuration

For fine-grained control:
# Explicit concurrency limit
parallel(
    *heavy_tasks,
    max_concurrent=4  # Exact worker count
)

# Disable optimization for predictable behavior
parallel(
    *tasks,
    max_concurrent=2,
    optimization_enabled=False
)

Error Handling

Individual Branch Failures

If any branch fails, the entire parallel block raises an error:
from egregore.core.workflow import ParallelExecutionError

@node("safe_task")
def safe_task(data: dict) -> dict:
    return {"status": "success"}

@node("failing_task")
def failing_task(data: dict) -> dict:
    raise ValueError("Task failed!")

try:
    result = parallel(safe_task, failing_task)(data)
except ParallelExecutionError as e:
    print(f"Parallel execution failed: {e}")
    # Error includes which branch failed

Graceful Error Handling

Handle errors within nodes for graceful degradation:
@node("resilient_task")
def resilient_task(data: dict) -> dict:
    """Task with built-in error handling."""
    try:
        result = risky_operation(data)
        return {"status": "success", "result": result}
    except Exception as e:
        # Log error but don't crash
        return {"status": "error", "error": str(e)}

# Parallel block completes even if operations fail
result = parallel(resilient_task, another_task)(data)

# Check individual results
if result["resilient_task"]["status"] == "error":
    handle_error(result["resilient_task"]["error"])

Workflow Integration

In Sequences

Parallel nodes integrate seamlessly with sequential workflows:
workflow = Sequence(
    extract_data,           # Sequential
    parallel(               # Parallel
        transform_type_a,
        transform_type_b,
        transform_type_c
    ),
    combine_results,        # Sequential
    save_to_database        # Sequential
)

Nested Parallel Execution

Parallel blocks can contain parallel blocks:
inner_parallel = parallel(subtask1, subtask2, subtask3)

outer_parallel = parallel(
    main_task1,
    inner_parallel,  # Nested parallel execution
    main_task3
)

# Execution:
# - main_task1, inner_parallel, and main_task3 run concurrently
# - Within inner_parallel, subtask1/2/3 run concurrently

With Decision Nodes

Combine parallel execution with conditional routing:
# assuming `decision` is importable from egregore.core.workflow, like `node`
from egregore.core.workflow import decision

@decision("route_by_size")
def route_by_size(data: dict) -> str:
    size = len(data["items"])
    return "parallel_process" if size > 100 else "sequential_process"

@node("sequential_process")
def sequential_process(data: dict) -> dict:
    return process_sequentially(data)

# For large datasets, use parallel processing
parallel_process = parallel(
    process_chunk1,
    process_chunk2,
    process_chunk3,
    process_chunk4
)

workflow = Sequence(
    load_data,
    route_by_size,
    # Execution branches based on data size
)

Performance Considerations

When to Use Parallel Execution

Good candidates:
  • Independent operations (no data dependencies)
  • I/O-bound tasks (API calls, file operations)
  • CPU-bound tasks with sufficient cores
  • Long-running operations

Poor candidates:
  • Operations with dependencies between them
  • Very fast operations (overhead exceeds benefit)
  • Memory-intensive operations (without concurrency limits)
  • Operations requiring strict ordering

Overhead Analysis

# Parallel execution overhead: ~1-2ms per branch
# Benefits kick in when operations take >10ms each

# Not worth it (too fast):
@node("add_numbers")
def add_numbers(a: int, b: int) -> int:
    return a + b  # <1ms - sequential is faster

# Worth it (I/O bound):
import requests

@node("fetch_from_api")
def fetch_from_api(url: str) -> dict:
    return requests.get(url).json()  # 100-500ms - parallel is much faster

Resource Impact

Branches   No limit          max_concurrent=2   Optimization
3          3 workers         2 workers          Auto-calculated
10         10 workers        2 workers          CPU-limited
100        100 workers ⚠️     2 workers          Memory-limited

Recommendation: Use max_concurrent or enable optimization for >5 branches.

Best Practices

# Good: Independent operations
parallel(
    fetch_user_profile,
    fetch_user_posts,
    fetch_user_comments
)

# Bad: Dependent operations
parallel(
    fetch_user_id,      # Must complete first!
    fetch_user_profile  # Depends on user_id
)

# Good: Limited concurrency
parallel(
    *api_requests,
    max_concurrent=5  # Respect rate limits
)

# Bad: Unlimited concurrency
parallel(*api_requests)  # May overwhelm API

# Good: With timeout
parallel(
    *long_tasks,
    timeout=60.0  # Fail if not done in 60s
)

# Bad: No timeout
parallel(*long_tasks)  # May hang indefinitely

# Good: Resilient nodes
@node("safe_fetch")
def safe_fetch(url: str) -> dict:
    try:
        return fetch(url)
    except Exception as e:
        return {"error": str(e)}

# Bad: Unhandled errors
@node("unsafe_fetch")
def unsafe_fetch(url: str) -> dict:
    return fetch(url)  # Crashes entire parallel block

# Good: Clear names
@node("fetch_user_data")
def fetch_user_data(data: dict) -> dict: ...

@node("fetch_order_data")
def fetch_order_data(data: dict) -> dict: ...

@node("fetch_payment_data")
def fetch_payment_data(data: dict) -> dict: ...

result = parallel(...)(data)
user = result["fetch_user_data"]  # Clear

# Bad: Generic names
@node("fetch1")
def fetch1(data: dict) -> dict: ...

@node("fetch2")
def fetch2(data: dict) -> dict: ...

@node("fetch3")
def fetch3(data: dict) -> dict: ...

result = parallel(...)(data)
user = result["fetch1"]  # Which fetch is this?

Advanced Patterns

Map-Reduce Pattern

@node("split_work")
def split_work(data: dict) -> list:
    """Split data into chunks."""
    return [chunk for chunk in split(data, size=100)]

# Create parallel processors dynamically
def create_processors(chunk_count: int):
    return [
        node(f"process_chunk_{i}")(
            lambda chunk: process(chunk)
        )
        for i in range(chunk_count)
    ]

@node("reduce_results")
def reduce_results(results: dict) -> dict:
    """Combine parallel results."""
    return merge(results.values())

# Map-reduce workflow
workflow = Sequence(
    split_work,
    # Create parallel processors based on chunk count
    # (would need dynamic workflow construction)
    reduce_results
)
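
One way to wire this up, sketched under the assumption that decorated nodes remain directly callable (as the earlier examples suggest) and that every parallel branch receives the splitter's full chunk list; raw_data, split, process, and merge are placeholder names:
# Hypothetical sketch: build the parallel stage once the chunk count is known
chunks = split_work(raw_data)                # run the splitter up front
processors = create_processors(len(chunks))  # one node per chunk

map_reduce = Sequence(
    split_work,                  # emits the list of chunks
    parallel(*processors),       # each branch processes chunks[i]
    reduce_results,              # merges the per-chunk results
)

result = map_reduce(raw_data)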

Fan-Out Fan-In

# Fan-out: One input, many parallel operations
fan_out = parallel(
    process_for_system_a,
    process_for_system_b,
    process_for_system_c
)

# Fan-in: Many results, one output
@node("fan_in")
def fan_in(results: dict) -> dict:
    """Combine all parallel results."""
    return {
        "system_a": results["process_for_system_a"],
        "system_b": results["process_for_system_b"],
        "system_c": results["process_for_system_c"],
        "combined": True
    }

workflow = Sequence(
    prepare_data,
    fan_out,
    fan_in,
    send_notifications
)

What’s Next?