# Workflows Overview

> Node-based execution graphs with parallel processing and native agent discovery

**Workflows** are node-based execution graphs for orchestrating agent operations, with parallel processing, conditional logic, and built-in agent discovery.

## Core Concepts

Egregore's workflow system is built on these fundamental concepts:

### Nodes

**Nodes** are the building blocks of workflows - callable units that transform data:

```python theme={null}
from egregore.core.workflow import node

@node
def process_data(data: dict) -> dict:
    """Process some data."""
    return {"processed": True, **data}

# Use it
result = process_data({"input": "value"})
# Result: {"processed": True, "input": "value"}
```

### Sequences

**Sequences** chain nodes together into execution graphs:

```python theme={null}
from egregore.core.workflow import Sequence

# Create workflow (extract_data, transform_data, and load_data are
# @node functions defined elsewhere)
workflow = Sequence(
    extract_data,
    transform_data,
    load_data
)

# Execute
result = workflow.run(initial_data)
```
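
Conceptually, a sequence feeds each node's return value into the next node. The sketch below illustrates that idea with plain functions and `functools.reduce`. `run_in_order` and the three step functions are hypothetical stand-ins for illustration, not Egregore's implementation of `Sequence`.

```python
from functools import reduce
from typing import Any, Callable

def run_in_order(steps: list[Callable[[Any], Any]], initial: Any) -> Any:
    """Pass `initial` through each step, feeding every output to the next step."""
    return reduce(lambda value, step: step(value), steps, initial)

# Hypothetical steps, for illustration only
def extract_data(raw: str) -> list[str]:
    return raw.split(",")

def transform_data(items: list[str]) -> list[str]:
    return [item.strip().upper() for item in items]

def load_data(items: list[str]) -> dict:
    return {"loaded": len(items), "items": items}

result = run_in_order([extract_data, transform_data, load_data], "a, b, c")
print(result)  # {'loaded': 3, 'items': ['A', 'B', 'C']}
```

The practical consequence is that each node's return type should match what the next node expects as input.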

### Parallel Execution

**Parallel nodes** execute multiple operations concurrently:

```python theme={null}
from egregore.core.workflow import parallel

results = parallel(
    fetch_api_data,
    query_database,
    read_file
)(input_data)

# Results: {"fetch_api_data": {...}, "query_database": {...}, "read_file": {...}}
```
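
To make the dictionary-shaped result concrete, here is a minimal standard-library sketch of the same pattern: every function receives the same input, runs in a thread pool, and its result is stored under the function's name. `run_concurrently` and the three fetch functions are hypothetical, and this is not how Egregore's `parallel` is necessarily implemented.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

def run_concurrently(*funcs: Callable[[Any], Any]) -> Callable[[Any], dict]:
    """Return a callable that runs every function on the same input in a thread pool."""
    def runner(input_data: Any) -> dict:
        with ThreadPoolExecutor(max_workers=max(1, len(funcs))) as pool:
            # Submit everything first so the functions can overlap in time
            futures = {f.__name__: pool.submit(f, input_data) for f in funcs}
            # .result() blocks until that function finishes and re-raises its exception
            return {name: future.result() for name, future in futures.items()}
    return runner

# Hypothetical data sources, for illustration only
def fetch_api_data(key: str) -> dict:
    return {"source": "api", "key": key}

def query_database(key: str) -> dict:
    return {"source": "db", "key": key}

def read_file(key: str) -> dict:
    return {"source": "file", "key": key}

results = run_concurrently(fetch_api_data, query_database, read_file)("k1")
print(sorted(results))  # ['fetch_api_data', 'query_database', 'read_file']
```

Keying results by function name means two functions with the same `__name__` would overwrite each other in this sketch, so give parallel functions distinct names.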

### Agent Discovery

**Native agent discovery** lets workflows access and manage agents automatically:

```python theme={null}
from egregore.core.workflow import node, get_current_agents

@node
def process_with_agents(data: dict) -> dict:
    # Automatically discover agents in current workflow
    agents = get_current_agents()

    for agent in agents:
        print(f"Found agent: {agent.agent_id}")

    return data
```

## Key Features

<CardGroup cols={2}>
  <Card title="Node Decorators" icon="cube">
    `@node`, `@decision`, `@parallel` decorators for quick workflow creation
  </Card>

  <Card title="Parallel Processing" icon="arrows-split-up-and-left">
    Execute multiple nodes concurrently with dictionary-based result collection
  </Card>

  <Card title="Conditional Logic" icon="code-branch">
    Dynamic routing with `@decision` nodes based on runtime conditions
  </Card>

  <Card title="Native Agent Discovery" icon="magnifying-glass">
    Built-in agent tracking and management within workflows
  </Card>

  <Card title="Shared State" icon="database">
    Cross-node state management with `SharedState` and `workflow_state`
  </Card>

  <Card title="Type Safety" icon="shield">
    Optional type checking with `WorkflowTypeChecker` for development
  </Card>

  <Card title="Validation System" icon="check-circle">
    Pre-execution validation with cycle detection and dependency checking
  </Card>

  <Card title="Reporting & Metrics" icon="chart-line">
    Built-in performance tracking and execution metrics
  </Card>
</CardGroup>

## Quick Start

### Basic Workflow

```python theme={null}
from egregore.core.workflow import node, Sequence

# Create nodes
@node
def extract(data: dict) -> dict:
    """Extract data from source."""
    return {"extracted": data["source"]}

@node
def transform(data: dict) -> dict:
    """Transform extracted data."""
    return {"transformed": data["extracted"].upper()}

@node
def load(data: dict) -> dict:
    """Load transformed data."""
    print(f"Loading: {data['transformed']}")
    return {"status": "complete"}

# Create workflow
etl_workflow = Sequence(
    extract,
    transform,
    load
)

# Execute
result = etl_workflow.run({"source": "hello world"})
# Prints: Loading: HELLO WORLD
# Returns: {"status": "complete"}
```

### Parallel Workflow

```python theme={null}
from egregore.core.workflow import parallel, node, Sequence

@node
def fetch_user(user_id: str) -> dict:
    return {"name": "Alice", "id": user_id}

@node
def fetch_posts(user_id: str) -> list:
    return [{"id": 1, "title": "Post 1"}]

@node
def fetch_comments(user_id: str) -> list:
    return [{"id": 1, "text": "Comment 1"}]

@node
def combine(data: dict) -> dict:
    """Combine results from parallel fetches."""
    return {
        "user": data["fetch_user"],
        "posts": data["fetch_posts"],
        "comments": data["fetch_comments"],
    }

# Create workflow with parallel data fetching
user_workflow = Sequence(
    parallel(fetch_user, fetch_posts, fetch_comments),
    combine
)

# Execute - parallel nodes run concurrently
result = user_workflow.run("user_123")
```

### Workflow with Agents

```python theme={null}
from egregore import Agent
from egregore.core.workflow import node, Sequence, get_current_agents

agent1 = Agent(provider="openai:gpt-4")
agent2 = Agent(provider="anthropic:claude-3.5-sonnet")

@node
def analyze_with_agents(data: dict) -> dict:
    """Use discovered agents for analysis."""
    # Agents automatically discovered
    agents = get_current_agents()

    results = []
    for agent in agents:
        response = agent.call(f"Analyze: {data['text']}")
        results.append(response)

    return {"analyses": results}

workflow = Sequence(analyze_with_agents)

# Agents automatically tracked during execution
result = workflow.run({"text": "Some data to analyze"})
```

## Workflow Components

### 1. Node Types

| Type         | Decorator   | Use Case                             |
| ------------ | ----------- | ------------------------------------ |
| **Standard** | `@node`     | Data processing, transformations     |
| **Decision** | `@decision` | Conditional routing, branching logic |
| **Parallel** | `@parallel` | Concurrent execution                 |
| **Async**    | `AsyncNode` | Async operations                     |
| **Batch**    | `BatchNode` | Batch processing                     |

### 2. Execution Control

| Component              | Purpose                      |
| ---------------------- | ---------------------------- |
| **Sequence**           | Chain nodes into workflows   |
| **WorkflowController** | Start/stop/pause workflows   |
| **SharedState**        | Cross-node state management  |
| **workflow\_state**    | Decorator for stateful nodes |
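
Because parallel nodes run in separate threads, any state shared across nodes needs synchronized access. The sketch below shows the general idea with a lock-protected dictionary. `LockedState` and `count_item` are hypothetical names, and this is not Egregore's `SharedState` API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class LockedState:
    """Hypothetical thread-safe key/value store; not Egregore's SharedState."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

    def update(self, key, func, default=None):
        """Atomically replace the value at `key` with func(current value)."""
        with self._lock:
            self._data[key] = func(self._data.get(key, default))
            return self._data[key]

state = LockedState()

def count_item(_item):
    # The read-modify-write happens under one lock, so no increments are lost
    state.update("processed", lambda n: n + 1, default=0)

with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(count_item, range(100)))

print(state.get("processed"))  # 100
```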

### 3. Agent Discovery

| Function                           | Purpose                     |
| ---------------------------------- | --------------------------- |
| `get_current_agents()`             | Get all agents in workflow  |
| `get_current_agent_states()`       | Get agent states            |
| `interrupt_current_agents()`       | Stop all agents             |
| `apply_policy_to_current_agents()` | Apply policies              |
| `get_agents_in_node()`             | Get agents in specific node |

### 4. Validation & Type Checking

| Component                   | Purpose                      |
| --------------------------- | ---------------------------- |
| **SequenceValidator**       | Pre-execution validation     |
| **CycleDetectionValidator** | Detect circular dependencies |
| **DependencyValidator**     | Check node dependencies      |
| **WorkflowTypeChecker**     | Type safety checking         |
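
As an illustration of what cycle detection involves, the sketch below runs a depth-first search over a graph given as an adjacency dictionary and reports the first cycle it finds. `find_cycle` and the graph layout are assumptions made for this example. They do not describe the interface of `CycleDetectionValidator`.

```python
from typing import Optional

def find_cycle(graph: dict[str, list[str]]) -> Optional[list[str]]:
    """Return one cycle as a list of node names, or None if the graph is acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {node: WHITE for node in graph}
    path: list[str] = []

    def visit(node: str) -> Optional[list[str]]:
        color[node] = GRAY
        path.append(node)
        for successor in graph.get(node, []):
            state = color.get(successor, WHITE)
            if state == GRAY:
                # Back edge: the cycle is the path slice starting at `successor`
                return path[path.index(successor):] + [successor]
            if state == WHITE:
                cycle = visit(successor)
                if cycle:
                    return cycle
        path.pop()
        color[node] = BLACK
        return None

    for node in graph:
        if color[node] == WHITE:
            cycle = visit(node)
            if cycle:
                return cycle
    return None

acyclic = {"extract": ["transform"], "transform": ["load"], "load": []}
cyclic = {"extract": ["transform"], "transform": ["load"], "load": ["transform"]}

print(find_cycle(acyclic))  # None
print(find_cycle(cyclic))   # ['transform', 'load', 'transform']
```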

### 5. Reporting

| Component                    | Purpose                   |
| ---------------------------- | ------------------------- |
| **WorkflowMetricsCollector** | Collect execution metrics |
| **PerformanceSummary**       | Performance analysis      |
| **ExecutionStatus**          | Track execution state     |
| **ErrorReport**              | Error tracking            |

## Common Patterns

### ETL Pipeline

```python theme={null}
workflow = Sequence(
    extract_from_source,
    validate_data,
    transform_format,
    enrich_data,
    load_to_destination
)
```

### Parallel Data Gathering

```python theme={null}
workflow = Sequence(
    parallel(
        fetch_api_1,
        fetch_api_2,
        fetch_api_3
    ),
    merge_results,
    process_merged
)
```

### Conditional Branching

```python theme={null}
from egregore.core.workflow import decision, Sequence

@decision
def route_by_type(data: dict) -> str:
    """Route based on data type."""
    return "process_json" if data["type"] == "json" else "process_csv"

workflow = Sequence(
    extract_data,
    route_by_type,  # Returns next node name
    # Execution continues to selected node
)
```
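
The routing idea can be sketched in plain Python as a lookup from the returned name to a handler. The `BRANCHES` table, `run_with_routing`, and both handlers are hypothetical, and how Egregore resolves the returned name to a node is not shown here.

```python
from typing import Callable

# Hypothetical handlers, for illustration only
def process_json(data: dict) -> dict:
    return {"handled_by": "process_json", "payload": data["payload"]}

def process_csv(data: dict) -> dict:
    return {"handled_by": "process_csv", "payload": data["payload"]}

def route_by_type(data: dict) -> str:
    """Return the name of the branch that should handle `data`."""
    return "process_json" if data["type"] == "json" else "process_csv"

# Map branch names to the callables they refer to
BRANCHES: dict[str, Callable[[dict], dict]] = {
    "process_json": process_json,
    "process_csv": process_csv,
}

def run_with_routing(data: dict) -> dict:
    branch_name = route_by_type(data)
    try:
        handler = BRANCHES[branch_name]
    except KeyError:
        raise ValueError(f"No branch registered for {branch_name!r}") from None
    return handler(data)

print(run_with_routing({"type": "json", "payload": "{}"})["handled_by"])  # process_json
print(run_with_routing({"type": "csv", "payload": "a,b"})["handled_by"])  # process_csv
```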

### Agent Coordination

```python theme={null}
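from egregore.core.workflow import node, get_current_agents

@node
def coordinate_agents(task: dict) -> dict:
    """Coordinate multiple agents on a task."""
    agents = get_current_agents()

    # Distribute work: pair each agent with one subtask.
    # zip() stops at the shorter sequence, so a mismatch between the number
    # of agents and subtasks does not raise IndexError.
    results = []
    for agent, subtask in zip(agents, task["subtasks"]):
        result = agent.call(subtask["prompt"])
        results.append(result)

    return {"combined_results": results}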

## Performance Considerations

* **Parallel execution**: Multiple nodes run concurrently in separate threads
* **Resource optimization**: Parallel nodes share resources efficiently
* **Terminal node collection**: Results gathered from all terminal nodes
* **Agent discovery overhead**: Less than 1ms per node
* **Type checking**: Disabled by default for production (enable for development)

## Error Handling

Workflows raise distinct exceptions for failed parallel nodes and for interrupted runs:

```python theme={null}
# Assumption: WorkflowStoppedException is exported from the same module
from egregore.core.workflow import ParallelExecutionError, WorkflowStoppedException

try:
    result = workflow.run(data)
except ParallelExecutionError as e:
    # Parallel node execution failed
    print(f"Failed nodes: {e.failed_nodes}")
    print(f"Successful results: {e.partial_results}")
except WorkflowStoppedException:
    # Workflow was interrupted
    print("Workflow stopped")
```
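
The following standard-library sketch shows one way an error can carry both the failed nodes and the results that did succeed. `ParallelFailure`, `run_all`, and the two node functions are hypothetical. The attribute names mirror `failed_nodes` and `partial_results` from the example above, but their exact types in Egregore are not specified here.

```python
from concurrent.futures import ThreadPoolExecutor

class ParallelFailure(Exception):
    """Hypothetical stand-in for an exception that carries partial results."""

    def __init__(self, failed_nodes: dict, partial_results: dict):
        super().__init__(f"{len(failed_nodes)} node(s) failed: {sorted(failed_nodes)}")
        self.failed_nodes = failed_nodes        # name -> exception raised
        self.partial_results = partial_results  # name -> successful return value

def run_all(funcs, input_data):
    """Run every function; raise ParallelFailure if any of them fails."""
    results, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max(1, len(funcs))) as pool:
        futures = {f.__name__: pool.submit(f, input_data) for f in funcs}
        for name, future in futures.items():
            try:
                results[name] = future.result()
            except Exception as exc:
                failures[name] = exc
    if failures:
        raise ParallelFailure(failures, results)
    return results

def ok_node(x):
    return x * 2

def bad_node(x):
    raise RuntimeError("boom")

caught = None
try:
    run_all([ok_node, bad_node], 21)
except ParallelFailure as e:
    caught = e
    print(f"Failed nodes: {list(e.failed_nodes)}")        # Failed nodes: ['bad_node']
    print(f"Successful results: {e.partial_results}")     # Successful results: {'ok_node': 42}
```

Collecting every outcome before raising lets the caller decide whether the partial results are enough to continue.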

## What's Next?

<CardGroup cols={2}>
  <Card title="Creating Nodes" icon="cube" href="/features/workflows/creating-nodes">
    Learn to create workflow nodes
  </Card>

  <Card title="Parallel Execution" icon="arrows-split-up-and-left" href="/features/workflows/parallel-execution">
    Execute nodes concurrently
  </Card>

  <Card title="Agent Discovery" icon="magnifying-glass" href="/features/workflows/agent-discovery">
    Access and manage agents in workflows
  </Card>

  <Card title="Shared State" icon="database" href="/features/workflows/shared-state">
    Share state across nodes
  </Card>

  <Card title="Validation" icon="check-circle" href="/features/workflows/validation">
    Validate workflows before execution
  </Card>

  <Card title="Type Safety" icon="shield" href="/features/workflows/type-safety">
    Type checking for workflows
  </Card>

  <Card title="Reporting" icon="chart-line" href="/features/workflows/reporting">
    Track workflow performance
  </Card>

  <Card title="Best Practices" icon="star" href="/features/workflows/best-practices">
    Workflow design patterns
  </Card>
</CardGroup>
