
Workflows Overview

Workflows provide a powerful node-based execution system for orchestrating complex agent operations with parallel processing, conditional logic, and built-in agent discovery.

Core Concepts

Egregore’s workflow system is built on these fundamental concepts:

Nodes

Nodes are the building blocks of a workflow. Each node is a callable unit that transforms data:
from egregore.core.workflow import node

@node
def process_data(data: dict) -> dict:
    """Process some data."""
    return {"processed": True, **data}

# Use it
result = process_data({"input": "value"})
# Result: {"processed": True, "input": "value"}
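To make the pattern concrete, here is a minimal plain-Python sketch of what a node decorator can look like. This is an illustration of the idea, not Egregore's actual implementation; the `is_node` attribute is a hypothetical marker:

```python
import functools

# Hypothetical sketch of a node decorator (illustration only, not
# Egregore's implementation): wrap the function, tag it as a node,
# and keep it directly callable with its original signature.
def node(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    wrapper.is_node = True  # marker so a runner can recognize node callables
    return wrapper

@node
def process_data(data: dict) -> dict:
    """Process some data."""
    return {"processed": True, **data}

print(process_data({"input": "value"}))
# {'processed': True, 'input': 'value'}
```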

Sequences

Sequences chain nodes together into execution graphs:
from egregore.core.workflow import Sequence

# Create workflow
workflow = Sequence(
    extract_data,
    transform_data,
    load_data
)

# Execute
result = workflow.run(initial_data)
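Conceptually, a sequence pipes each node's output into the next node's input. A minimal sketch of that chaining (illustrative only, not the `Sequence` class itself):

```python
from functools import reduce

# Minimal sketch of sequential chaining (illustration only): each
# node's output becomes the next node's input.
def run_sequence(nodes, initial_data):
    return reduce(lambda data, fn: fn(data), nodes, initial_data)

# Tiny stand-in nodes for demonstration
extract = lambda d: {"extracted": d["source"]}
transform = lambda d: {"transformed": d["extracted"].upper()}

result = run_sequence([extract, transform], {"source": "hello"})
# result == {"transformed": "HELLO"}
```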

Parallel Execution

Parallel nodes execute multiple operations concurrently:
from egregore.core.workflow import parallel

results = parallel(
    fetch_api_data,
    query_database,
    read_file
)(input_data)

# Results: {"fetch_api_data": {...}, "query_database": {...}, "read_file": {...}}

Agent Discovery

Native agent discovery lets workflows access and manage agents automatically:
from egregore.core.workflow import get_current_agents

@node
def process_with_agents(data: dict) -> dict:
    # Automatically discover agents in current workflow
    agents = get_current_agents()

    for agent in agents:
        print(f"Found agent: {agent.agent_id}")

    return data

Key Features

Node Decorators

@node, @decision, @parallel decorators for quick workflow creation

Parallel Processing

Execute multiple nodes concurrently with dictionary-based result collection

Conditional Logic

Dynamic routing with @decision nodes based on runtime conditions

Native Agent Discovery

Built-in agent tracking and management within workflows

Shared State

Cross-node state management with SharedState and workflow_state

Type Safety

Optional type checking with WorkflowTypeChecker for development

Validation System

Pre-execution validation with cycle detection and dependency checking

Reporting & Metrics

Built-in performance tracking and execution metrics

Quick Start

Basic Workflow

from egregore import Agent
from egregore.core.workflow import node, Sequence

# Create nodes
@node
def extract(data: dict) -> dict:
    """Extract data from source."""
    return {"extracted": data["source"]}

@node
def transform(data: dict) -> dict:
    """Transform extracted data."""
    return {"transformed": data["extracted"].upper()}

@node
def load(data: dict) -> dict:
    """Load transformed data."""
    print(f"Loading: {data['transformed']}")
    return {"status": "complete"}

# Create workflow
etl_workflow = Sequence(
    extract,
    transform,
    load
)

# Execute
result = etl_workflow.run({"source": "hello world"})
# Prints: Loading: HELLO WORLD
# Returns: {"status": "complete"}

Parallel Workflow

from egregore.core.workflow import parallel, node, Sequence

@node
def fetch_user(user_id: str) -> dict:
    return {"name": "Alice", "id": user_id}

@node
def fetch_posts(user_id: str) -> list:
    return [{"id": 1, "title": "Post 1"}]

@node
def fetch_comments(user_id: str) -> list:
    return [{"id": 1, "text": "Comment 1"}]

@node
def combine(data: dict) -> dict:
    """Combine results from parallel fetches."""
    return {
        "user": data["fetch_user"],
        "posts": data["fetch_posts"],
        "comments": data["fetch_comments"],
    }

# Create workflow with parallel data fetching
user_workflow = Sequence(
    parallel(fetch_user, fetch_posts, fetch_comments),
    combine
)

# Execute - parallel nodes run concurrently
result = user_workflow.run("user_123")

Workflow with Agents

from egregore import Agent
from egregore.core.workflow import node, Sequence, get_current_agents

agent1 = Agent(provider="openai:gpt-4")
agent2 = Agent(provider="anthropic:claude-3.5-sonnet")

@node
def analyze_with_agents(data: dict) -> dict:
    """Use discovered agents for analysis."""
    # Agents automatically discovered
    agents = get_current_agents()

    results = []
    for agent in agents:
        response = agent.call(f"Analyze: {data['text']}")
        results.append(response)

    return {"analyses": results}

workflow = Sequence(analyze_with_agents)

# Agents automatically tracked during execution
result = workflow.run({"text": "Some data to analyze"})

Workflow Components

1. Node Types

Type       Decorator / Class   Use Case
Standard   @node               Data processing, transformations
Decision   @decision           Conditional routing, branching logic
Parallel   @parallel           Concurrent execution
Async      AsyncNode           Async operations
Batch      BatchNode           Batch processing

2. Execution Control

Component            Purpose
Sequence             Chain nodes into workflows
WorkflowController   Start/stop/pause workflows
SharedState          Cross-node state management
workflow_state       Decorator for stateful nodes

3. Agent Discovery

Function                           Purpose
get_current_agents()               Get all agents in the workflow
get_current_agent_states()         Get agent states
interrupt_current_agents()         Stop all agents
apply_policy_to_current_agents()   Apply policies
get_agents_in_node()               Get agents in a specific node

4. Validation & Type Checking

Component                 Purpose
SequenceValidator         Pre-execution validation
CycleDetectionValidator   Detect circular dependencies
DependencyValidator       Check node dependencies
WorkflowTypeChecker       Type safety checking
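The exact validator APIs are documented elsewhere, but the core idea behind cycle detection is standard graph coloring. A generic sketch (illustration only; `CycleDetectionValidator`'s interface may differ), over an adjacency dict of node name to successor names:

```python
# Generic cycle-detection sketch over a node graph (illustrative only,
# not the CycleDetectionValidator API). graph maps node name -> list of
# successor node names.
def has_cycle(graph: dict) -> bool:
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current path / done
    color = {n: WHITE for n in graph}

    def visit(n):
        color[n] = GRAY
        for succ in graph.get(n, []):
            if color.get(succ, WHITE) == GRAY:  # back edge -> cycle
                return True
            if color.get(succ, WHITE) == WHITE and visit(succ):
                return True
        color[n] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in graph)

print(has_cycle({"a": ["b"], "b": ["a"]}))  # True  (a -> b -> a)
print(has_cycle({"a": ["b"], "b": []}))     # False (linear chain)
```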

5. Reporting

Component                  Purpose
WorkflowMetricsCollector   Collect execution metrics
PerformanceSummary         Performance analysis
ExecutionStatus            Track execution state
ErrorReport                Error tracking
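A minimal sketch of how per-node execution metrics can be collected with a timing wrapper. This is a hypothetical illustration of the idea; the real `WorkflowMetricsCollector` API may look quite different:

```python
import time

# Hypothetical per-node timing collector (illustration only, not the
# WorkflowMetricsCollector API): wrap a node and record its runtime.
class MetricsCollector:
    def __init__(self):
        self.timings = {}  # node name -> last execution time in seconds

    def timed(self, fn):
        def wrapper(data):
            start = time.perf_counter()
            result = fn(data)
            self.timings[fn.__name__] = time.perf_counter() - start
            return result
        return wrapper

metrics = MetricsCollector()

@metrics.timed
def transform(d):
    return {**d, "ok": True}

transform({"x": 1})
print(metrics.timings)  # per-node durations, keyed by function name
```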

Common Patterns

ETL Pipeline

workflow = Sequence(
    extract_from_source,
    validate_data,
    transform_format,
    enrich_data,
    load_to_destination
)

Parallel Data Gathering

workflow = Sequence(
    parallel(
        fetch_api_1,
        fetch_api_2,
        fetch_api_3
    ),
    merge_results,
    process_merged
)

Conditional Branching

from egregore.core.workflow import decision

@decision
def route_by_type(data: dict) -> str:
    """Route based on data type."""
    return "process_json" if data["type"] == "json" else "process_csv"

workflow = Sequence(
    extract_data,
    route_by_type,  # Returns next node name
    # Execution continues to selected node
)
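The routing mechanics can be sketched in plain Python: the decision callable returns the *name* of the next node, and the runner resolves that name against a registry of named nodes. This is an illustration of the pattern, not Egregore's internals:

```python
# Sketch of name-based decision routing (illustration only): the
# decision function picks a node name, the runner looks it up and
# dispatches the data to it.
def route_by_type(data: dict) -> str:
    return "process_json" if data["type"] == "json" else "process_csv"

registry = {
    "process_json": lambda d: {"format": "json", **d},
    "process_csv": lambda d: {"format": "csv", **d},
}

def run_decision(decision, registry, data):
    next_node = registry[decision(data)]  # resolve name -> node
    return next_node(data)

result = run_decision(route_by_type, registry, {"type": "json"})
# result["format"] == "json"
```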

Agent Coordination

@node
def coordinate_agents(task: dict) -> dict:
    """Coordinate multiple agents on a task."""
    agents = get_current_agents()

    # Distribute subtasks across agents; zip stops at the shorter
    # list, so mismatched agent/subtask counts don't raise IndexError
    results = []
    for agent, subtask in zip(agents, task["subtasks"]):
        result = agent.call(subtask["prompt"])
        results.append(result)

    return {"combined_results": results}

Performance Considerations

  • Parallel execution: Multiple nodes run concurrently in separate threads
  • Resource optimization: Parallel nodes share resources efficiently
  • Terminal node collection: Results gathered from all terminal nodes
  • Agent discovery overhead: Less than 1ms per node
  • Type checking: Disabled by default for production (enable for development)
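The thread-based, dictionary-keyed collection described above can be sketched with the standard library (illustrative only; Egregore's `parallel()` internals may differ):

```python
from concurrent.futures import ThreadPoolExecutor

# Sketch of dictionary-based parallel result collection using threads
# (illustration only, not Egregore's parallel() implementation): each
# node runs concurrently and results are keyed by function name.
def run_parallel(nodes, input_data):
    with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
        futures = {fn.__name__: pool.submit(fn, input_data) for fn in nodes}
        return {name: fut.result() for name, fut in futures.items()}

def fetch_a(x): return x + 1
def fetch_b(x): return x * 2

print(run_parallel([fetch_a, fetch_b], 10))
# {'fetch_a': 11, 'fetch_b': 20}
```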

Error Handling

Workflows provide comprehensive error handling:
from egregore.core.workflow import ParallelExecutionError, WorkflowStoppedException

try:
    result = workflow.run(data)
except ParallelExecutionError as e:
    # Parallel node execution failed
    print(f"Failed nodes: {e.failed_nodes}")
    print(f"Successful results: {e.partial_results}")
except WorkflowStoppedException:
    # Workflow was interrupted
    print("Workflow stopped")

What’s Next?

Creating Nodes

Learn to create workflow nodes

Parallel Execution

Execute nodes concurrently

Agent Discovery

Access and manage agents in workflows

Shared State

Share state across nodes

Validation

Validate workflows before execution

Type Safety

Type checking for workflows

Reporting

Track workflow performance

Best Practices

Workflow design patterns