> ## Documentation Index
> Fetch the complete documentation index at: https://docs.egregorelabs.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Parallel Execution

> Execute multiple nodes concurrently with resource optimization and timeout control

# Parallel Execution

**Parallel execution** allows multiple nodes to run concurrently, dramatically improving workflow performance for independent operations.

## Basic Parallel Execution

Use the `parallel()` function to execute nodes concurrently:

```python theme={null}
from egregore.core.workflow import parallel, node, Sequence

@node("fetch_api")
def fetch_api(url: str) -> dict:
    # API call
    return {"source": "api", "data": "..."}

@node("fetch_database")
def fetch_database(query: str) -> dict:
    # Database query
    return {"source": "db", "data": "..."}

@node("fetch_cache")
def fetch_cache(key: str) -> dict:
    # Cache lookup
    return {"source": "cache", "data": "..."}

# Execute all three concurrently
parallel_fetch = parallel(
    fetch_api,
    fetch_database,
    fetch_cache
)

result = parallel_fetch({"url": "...", "query": "...", "key": "..."})
```

## Result Collection

Parallel nodes return a **dictionary** with terminal node names as keys:

```python theme={null}
result = parallel_fetch({...})
# Returns:
# {
#     "fetch_api": {"source": "api", "data": "..."},
#     "fetch_database": {"source": "db", "data": "..."},
#     "fetch_cache": {"source": "cache", "data": "..."}
# }

# Access individual results
api_result = result["fetch_api"]
db_result = result["fetch_database"]
cache_result = result["fetch_cache"]
```

### Using Results in Next Node

The next node receives the dictionary of results:

```python theme={null}
@node("combine_results")
def combine_results(data: dict) -> dict:
    """Combine parallel results."""
    return {
        "api": data["fetch_api"],
        "db": data["fetch_database"],
        "cache": data["fetch_cache"],
        "combined": True
    }

workflow = Sequence(
    parallel(fetch_api, fetch_database, fetch_cache),
    combine_results
)
```

## Concurrency Control

Limit the number of concurrent executions:

```python theme={null}
# Limit to 2 concurrent operations
limited_parallel = parallel(
    task1,
    task2,
    task3,
    task4,
    max_concurrent=2  # Only 2 run at a time
)

# Execution order:
# 1. task1 and task2 start
# 2. When one finishes, task3 starts
# 3. When another finishes, task4 starts
```

### Why Limit Concurrency?

* **Memory constraints**: Each concurrent task uses memory
* **API rate limits**: Avoid overwhelming external services
* **CPU limits**: Prevent system overload
* **Resource fairness**: Share resources with other processes

```python theme={null}
# Heavy memory operations - limit concurrency
memory_intensive = parallel(
    process_large_dataset1,
    process_large_dataset2,
    process_large_dataset3,
    max_concurrent=2  # Prevent memory exhaustion
)

# API calls with rate limits
api_calls = parallel(
    api_request1,
    api_request2,
    api_request3,
    api_request4,
    max_concurrent=3  # Respect rate limits
)
```

## Timeout Control

Set a timeout for the entire parallel block:

```python theme={null}
# Timeout after 30 seconds
timed_parallel = parallel(
    long_running_task1,
    long_running_task2,
    long_running_task3,
    timeout=30.0  # seconds
)

try:
    result = timed_parallel(data)
except ParallelTimeoutError:
    print("Parallel execution timed out")
```

### Per-Branch Timeout

Timeout applies to the entire parallel block, not individual branches:

```python theme={null}
# All branches must complete within 30 seconds total
parallel(task1, task2, task3, timeout=30.0)

# If you need per-task timeouts, implement within nodes:
@node("task_with_timeout")
def task_with_timeout(data: dict) -> dict:
    import asyncio
    try:
        return asyncio.wait_for(
            async_operation(data),
            timeout=10.0
        )
    except asyncio.TimeoutError:
        return {"status": "timeout"}
```

## Resource Optimization

Parallel nodes automatically optimize resource usage:

```python theme={null}
# Automatic optimization (default)
optimized = parallel(
    node1, node2, node3, node4, node5,
    # Automatically:
    # - Analyzes available CPU and memory
    # - Calculates optimal worker count
    # - Creates balanced execution batches
    # - Monitors resource usage
)

# Disable optimization if needed
unoptimized = parallel(
    node1, node2, node3,
    optimization_enabled=False  # Manual control only
)
```

### Resource Allocation

The system automatically:

1. **Analyzes available resources**:
   * CPU cores available
   * Available memory (70% utilization target)
   * Estimated memory per node (50MB default)

2. **Calculates optimal workers**:
   ```python theme={null}
   optimal_workers = min(
       max_concurrent or branch_count,
       available_cpu_cores,
       available_memory / estimated_memory_per_node
   )
   ```

3. **Creates execution batches**:
   * Balances load across workers
   * Prevents resource exhaustion
   * Maintains system stability

### Manual Resource Configuration

For fine-grained control:

```python theme={null}
# Explicit concurrency limit
parallel(
    *heavy_tasks,
    max_concurrent=4  # Exact worker count
)

# Disable optimization for predictable behavior
parallel(
    *tasks,
    max_concurrent=2,
    optimization_enabled=False
)
```

## Error Handling

### Individual Branch Failures

If any branch fails, the entire parallel block raises an error:

```python theme={null}
from egregore.core.workflow import ParallelExecutionError

@node("safe_task")
def safe_task(data: dict) -> dict:
    return {"status": "success"}

@node("failing_task")
def failing_task(data: dict) -> dict:
    raise ValueError("Task failed!")

try:
    result = parallel(safe_task, failing_task)(data)
except ParallelExecutionError as e:
    print(f"Parallel execution failed: {e}")
    # Error includes which branch failed
```

### Graceful Error Handling

Handle errors within nodes for graceful degradation:

```python theme={null}
@node("resilient_task")
def resilient_task(data: dict) -> dict:
    """Task with built-in error handling."""
    try:
        result = risky_operation(data)
        return {"status": "success", "result": result}
    except Exception as e:
        # Log error but don't crash
        return {"status": "error", "error": str(e)}

# Parallel block completes even if operations fail
result = parallel(resilient_task, another_task)(data)

# Check individual results
if result["resilient_task"]["status"] == "error":
    handle_error(result["resilient_task"]["error"])
```

## Workflow Integration

### In Sequences

Parallel nodes integrate seamlessly with sequential workflows:

```python theme={null}
workflow = Sequence(
    extract_data,           # Sequential
    parallel(               # Parallel
        transform_type_a,
        transform_type_b,
        transform_type_c
    ),
    combine_results,        # Sequential
    save_to_database        # Sequential
)
```

### Nested Parallel Execution

Parallel blocks can contain parallel blocks:

```python theme={null}
inner_parallel = parallel(subtask1, subtask2, subtask3)

outer_parallel = parallel(
    main_task1,
    inner_parallel,  # Nested parallel execution
    main_task3
)

# Execution:
# - main_task1, inner_parallel, and main_task3 run concurrently
# - Within inner_parallel, subtask1/2/3 run concurrently
```

### With Decision Nodes

Combine parallel execution with conditional routing:

```python theme={null}
@decision("route_by_size")
def route_by_size(data: dict) -> str:
    size = len(data["items"])
    return "parallel_process" if size > 100 else "sequential_process"

@node("sequential_process")
def sequential_process(data: dict) -> dict:
    return process_sequentially(data)

# For large datasets, use parallel processing
parallel_process = parallel(
    process_chunk1,
    process_chunk2,
    process_chunk3,
    process_chunk4
)

workflow = Sequence(
    load_data,
    route_by_size,
    # Execution branches based on data size
)
```

## Performance Considerations

### When to Use Parallel Execution

**Good candidates**:

* Independent operations (no data dependencies)
* I/O-bound tasks (API calls, file operations)
* CPU-bound tasks with sufficient cores
* Long-running operations

**Poor candidates**:

* Operations with dependencies between them
* Very fast operations (overhead exceeds benefit)
* Memory-intensive operations (without concurrency limits)
* Operations requiring strict ordering

### Overhead Analysis

```python theme={null}
# Parallel execution overhead: ~1-2ms per branch
# Benefits kick in when operations take >10ms each

# Not worth it (too fast):
@node("add_numbers")
def add_numbers(a: int, b: int) -> int:
    return a + b  # <1ms - sequential is faster

# Worth it (I/O bound):
@node("fetch_from_api")
def fetch_from_api(url: str) -> dict:
    return requests.get(url).json()  # 100-500ms - parallel is much faster
```

### Resource Impact

| Branches | No Limit       | max\_concurrent=2 | Optimization    |
| -------- | -------------- | ----------------- | --------------- |
| 3        | 3 workers      | 2 workers         | Auto-calculated |
| 10       | 10 workers     | 2 workers         | CPU-limited     |
| 100      | 100 workers ⚠️ | 2 workers         | Memory-limited  |

**Recommendation**: Use `max_concurrent` or enable optimization for >5 branches.

## Best Practices

<AccordionGroup>
  <Accordion title="Use parallel for independent operations only">
    ```python theme={null}
    # Good: Independent operations
    parallel(
        fetch_user_profile,
        fetch_user_posts,
        fetch_user_comments
    )

    # Bad: Dependent operations
    parallel(
        fetch_user_id,      # Must complete first!
        fetch_user_profile  # Depends on user_id
    )
    ```
  </Accordion>

  <Accordion title="Set appropriate concurrency limits">
    ```python theme={null}
    # Good: Limited concurrency
    parallel(
        *api_requests,
        max_concurrent=5  # Respect rate limits
    )

    # Bad: Unlimited concurrency
    parallel(*api_requests)  # May overwhelm API
    ```
  </Accordion>

  <Accordion title="Use timeouts for long-running operations">
    ```python theme={null}
    # Good: With timeout
    parallel(
        *long_tasks,
        timeout=60.0  # Fail if not done in 60s
    )

    # Bad: No timeout
    parallel(*long_tasks)  # May hang indefinitely
    ```
  </Accordion>

  <Accordion title="Handle errors gracefully">
    ```python theme={null}
    # Good: Resilient nodes
    @node("safe_fetch")
    def safe_fetch(url: str) -> dict:
        try:
            return fetch(url)
        except Exception as e:
            return {"error": str(e)}

    # Bad: Unhandled errors
    @node("unsafe_fetch")
    def unsafe_fetch(url: str) -> dict:
        return fetch(url)  # Crashes entire parallel block
    ```
  </Accordion>

  <Accordion title="Name nodes clearly for result access">
    ```python theme={null}
    # Good: Clear names
    @node("fetch_user_data")
    @node("fetch_order_data")
    @node("fetch_payment_data")

    result = parallel(...)(data)
    user = result["fetch_user_data"]  # Clear

    # Bad: Generic names
    @node("fetch1")
    @node("fetch2")
    @node("fetch3")

    result = parallel(...)(data)
    user = result["fetch1"]  # Which fetch is this?
    ```
  </Accordion>
</AccordionGroup>

## Advanced Patterns

### Map-Reduce Pattern

```python theme={null}
@node("split_work")
def split_work(data: dict) -> list:
    """Split data into chunks."""
    return [chunk for chunk in split(data, size=100)]

# Create parallel processors dynamically
def create_processors(chunk_count: int):
    return [
        node(f"process_chunk_{i}")(
            lambda chunk: process(chunk)
        )
        for i in range(chunk_count)
    ]

@node("reduce_results")
def reduce_results(results: dict) -> dict:
    """Combine parallel results."""
    return merge(results.values())

# Map-reduce workflow
workflow = Sequence(
    split_work,
    # Create parallel processors based on chunk count
    # (would need dynamic workflow construction)
    reduce_results
)
```

### Fan-Out Fan-In

```python theme={null}
# Fan-out: One input, many parallel operations
fan_out = parallel(
    process_for_system_a,
    process_for_system_b,
    process_for_system_c
)

# Fan-in: Many results, one output
@node("fan_in")
def fan_in(results: dict) -> dict:
    """Combine all parallel results."""
    return {
        "system_a": results["process_for_system_a"],
        "system_b": results["process_for_system_b"],
        "system_c": results["process_for_system_c"],
        "combined": True
    }

workflow = Sequence(
    prepare_data,
    fan_out,
    fan_in,
    send_notifications
)
```

## What's Next?

<CardGroup cols={2}>
  <Card title="Agent Discovery" icon="magnifying-glass" href="/features/workflows/agent-discovery">
    Access and manage agents in workflows
  </Card>

  <Card title="Shared State" icon="database" href="/features/workflows/shared-state">
    Share data across nodes
  </Card>

  <Card title="Validation" icon="check-circle" href="/features/workflows/validation">
    Validate workflows before execution
  </Card>

  <Card title="Reporting" icon="chart-line" href="/features/workflows/reporting">
    Track workflow performance
  </Card>
</CardGroup>
