Workflows Overview

Workflows provide a powerful node-based execution system for orchestrating complex agent operations with parallel processing, conditional logic, and built-in agent discovery.

Core Concepts

Egregore’s workflow system is built on these fundamental concepts:

Nodes

Nodes are the building blocks of workflows, callable units that transform data:
from egregore.core.workflow import node

@node
def process_data(data: dict) -> dict:
    """Process some data."""
    return {"processed": True, **data}

# Use it
result = process_data({"input": "value"})
# Result: {"processed": True, "input": "value"}

Sequences

Sequences chain nodes together into execution graphs:
from egregore.core.workflow import Sequence

# Create workflow
workflow = Sequence(
    extract_data,
    transform_data,
    load_data
)

# Execute
result = workflow.run(initial_data)

Parallel Execution

Parallel nodes execute multiple operations concurrently:
from egregore.core.workflow import parallel

results = parallel(
    fetch_api_data,
    query_database,
    read_file
)(input_data)

# Results: {"fetch_api_data": {...}, "query_database": {...}, "read_file": {...}}
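Under the hood, this dictionary-based collection can be pictured as a thread-pool fan-out keyed by each branch's function name. A minimal pure-Python sketch of the semantics (an illustration only, not Egregore's actual implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(funcs, input_data):
    """Run each callable concurrently and key results by function name."""
    with ThreadPoolExecutor(max_workers=len(funcs)) as pool:
        futures = {f.__name__: pool.submit(f, input_data) for f in funcs}
        # Collect results once every branch has finished
        return {name: fut.result() for name, fut in futures.items()}

def fetch_api_data(data):
    return {"api": data}

def query_database(data):
    return {"db": data}

results = run_parallel([fetch_api_data, query_database], "input")
# results == {"fetch_api_data": {"api": "input"}, "query_database": {"db": "input"}}
```

Because every branch receives the same input and results are keyed by name, downstream nodes can consume the combined dictionary without caring about execution order.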

Agent Discovery

Native agent discovery lets workflows access and manage agents automatically:
from egregore.core.workflow import get_current_agents

@node
def process_with_agents(data: dict) -> dict:
    # Automatically discover agents in current workflow
    agents = get_current_agents()

    for agent in agents:
        print(f"Found agent: {agent.agent_id}")

    return data

Key Features

Node Decorators

@node, @decision, @parallel decorators for quick workflow creation

Parallel Processing

Execute multiple nodes concurrently with dictionary-based result collection

Conditional Logic

Dynamic routing with @decision nodes based on runtime conditions

Native Agent Discovery

Built-in agent tracking and management within workflows

Shared State

Cross-node state management with SharedState and workflow_state
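As a rough mental model for cross-node state (a pure-Python sketch with hypothetical names, not the library's actual `SharedState` class), think of a mutable store that travels alongside the data payload:

```python
class SharedStateSketch:
    """Illustrative stand-in for a cross-node state store."""
    def __init__(self):
        self._store = {}

    def get(self, key, default=None):
        return self._store.get(key, default)

    def set(self, key, value):
        self._store[key] = value

state = SharedStateSketch()

def count_records(data, state):
    # An earlier node records a fact for later nodes to read
    state.set("record_count", len(data["records"]))
    return data

def summarize(data, state):
    return {"summary": f"{state.get('record_count')} records processed"}

payload = count_records({"records": [1, 2, 3]}, state)
result = summarize(payload, state)
# result == {"summary": "3 records processed"}
```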

Type Safety

Optional type checking with WorkflowTypeChecker for development

Validation System

Pre-execution validation with cycle detection and dependency checking

Reporting & Metrics

Built-in performance tracking and execution metrics

Quick Start

Basic Workflow

from egregore import Agent
from egregore.core.workflow import node, Sequence

# Create nodes
@node
def extract(data: dict) -> dict:
    """Extract data from source."""
    return {"extracted": data["source"]}

@node
def transform(data: dict) -> dict:
    """Transform extracted data."""
    return {"transformed": data["extracted"].upper()}

@node
def load(data: dict) -> dict:
    """Load transformed data."""
    print(f"Loading: {data['transformed']}")
    return {"status": "complete"}

# Create workflow
etl_workflow = Sequence(
    extract,
    transform,
    load
)

# Execute
result = etl_workflow.run({"source": "hello world"})
# Prints: Loading: HELLO WORLD
# Returns: {"status": "complete"}

Parallel Workflow

from egregore.core.workflow import parallel, node, Sequence

@node
def fetch_user(user_id: str) -> dict:
    return {"name": "Alice", "id": user_id}

@node
def fetch_posts(user_id: str) -> list:
    return [{"id": 1, "title": "Post 1"}]

@node
def fetch_comments(user_id: str) -> list:
    return [{"id": 1, "text": "Comment 1"}]

@node
def combine(data: dict) -> dict:
    """Combine results from parallel fetches."""
    return {
        "user": data["fetch_user"],
        "posts": data["fetch_posts"],
        "comments": data["fetch_comments"],
    }

# Create workflow with parallel data fetching
user_workflow = Sequence(
    parallel(fetch_user, fetch_posts, fetch_comments),
    combine
)

# Execute - parallel nodes run concurrently
result = user_workflow.run("user_123")

Workflow with Agents

from egregore import Agent
from egregore.core.workflow import node, Sequence, get_current_agents

agent1 = Agent(provider="openai:gpt-4")
agent2 = Agent(provider="anthropic:claude-3.5-sonnet")

@node
def analyze_with_agents(data: dict) -> dict:
    """Use discovered agents for analysis."""
    # Agents automatically discovered
    agents = get_current_agents()

    results = []
    for agent in agents:
        response = agent.call(f"Analyze: {data['text']}")
        results.append(response)

    return {"analyses": results}

workflow = Sequence(analyze_with_agents)

# Agents automatically tracked during execution
result = workflow.run({"text": "Some data to analyze"})

Workflow Components

1. Node Types

| Type | Decorator | Use Case |
| --- | --- | --- |
| Standard | @node | Data processing, transformations |
| Decision | @decision | Conditional routing, branching logic |
| Parallel | @parallel | Concurrent execution |
| Async | AsyncNode | Async operations |
| Batch | BatchNode | Batch processing |

2. Execution Control

| Component | Purpose |
| --- | --- |
| Sequence | Chain nodes into workflows |
| WorkflowController | Start/stop/pause workflows |
| SharedState | Cross-node state management |
| workflow_state | Decorator for stateful nodes |

3. Agent Discovery

| Function | Purpose |
| --- | --- |
| get_current_agents() | Get all agents in workflow |
| get_current_agent_states() | Get agent states |
| interrupt_current_agents() | Stop all agents |
| apply_policy_to_current_agents() | Apply policies |
| get_agents_in_node() | Get agents in specific node |

4. Validation & Type Checking

| Component | Purpose |
| --- | --- |
| SequenceValidator | Pre-execution validation |
| CycleDetectionValidator | Detect circular dependencies |
| DependencyValidator | Check node dependencies |
| WorkflowTypeChecker | Type safety checking |
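The job of cycle detection can be illustrated with a standard depth-first search over node successors (a generic sketch, independent of how CycleDetectionValidator is implemented internally):

```python
def has_cycle(graph):
    """Detect a cycle in a directed graph given as {node: [successors]}."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {n: WHITE for n in graph}

    def visit(node):
        color[node] = GRAY  # node is on the current DFS path
        for succ in graph.get(node, []):
            if color.get(succ, WHITE) == GRAY:
                return True  # back edge: circular dependency found
            if color.get(succ, WHITE) == WHITE and visit(succ):
                return True
        color[node] = BLACK  # fully explored
        return False

    return any(color[n] == WHITE and visit(n) for n in graph)

acyclic = {"extract": ["transform"], "transform": ["load"], "load": []}
cyclic = {"a": ["b"], "b": ["a"]}
# has_cycle(acyclic) is False; has_cycle(cyclic) is True
```

Running this kind of check before execution is what lets a workflow fail fast on a misconfigured graph instead of looping at runtime.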

5. Reporting

| Component | Purpose |
| --- | --- |
| WorkflowMetricsCollector | Collect execution metrics |
| PerformanceSummary | Performance analysis |
| ExecutionStatus | Track execution state |
| ErrorReport | Error tracking |
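Per-node timing, the core of what a metrics collector records, can be sketched with a simple wrapper (pure Python with hypothetical names; the actual WorkflowMetricsCollector API may differ):

```python
import time

def timed(func):
    """Wrap a node so each call records its wall-clock duration."""
    metrics = []

    def wrapper(data):
        start = time.perf_counter()
        result = func(data)
        metrics.append({"node": func.__name__,
                        "seconds": time.perf_counter() - start})
        return result

    wrapper.metrics = metrics  # expose recorded timings on the wrapper
    return wrapper

@timed
def transform(data):
    return {"transformed": data}

transform("x")
# transform.metrics[0]["node"] == "transform"
```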

Common Patterns

ETL Pipeline

workflow = Sequence(
    extract_from_source,
    validate_data,
    transform_format,
    enrich_data,
    load_to_destination
)

Parallel Data Gathering

workflow = Sequence(
    parallel(
        fetch_api_1,
        fetch_api_2,
        fetch_api_3
    ),
    merge_results,
    process_merged
)

Conditional Branching

from egregore.core.workflow import decision

@decision
def route_by_type(data: dict) -> str:
    """Route based on data type."""
    return "process_json" if data["type"] == "json" else "process_csv"

workflow = Sequence(
    extract_data,
    route_by_type,  # Returns next node name
    # Execution continues to selected node
)
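Conceptually, a decision node is just a function whose return value names the next node, which the workflow then looks up and calls. A simplified pure-Python dispatch (an illustration of the routing semantics, not the Sequence internals):

```python
def process_json(data):
    return {"handled_by": "json"}

def process_csv(data):
    return {"handled_by": "csv"}

def route_by_type(data):
    return "process_json" if data["type"] == "json" else "process_csv"

def run_with_decision(data, decision, branches):
    """Call the decision node, then dispatch to the branch it names."""
    next_name = decision(data)
    return branches[next_name](data)

branches = {"process_json": process_json, "process_csv": process_csv}
result = run_with_decision({"type": "json"}, route_by_type, branches)
# result == {"handled_by": "json"}
```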

Agent Coordination

@node
def coordinate_agents(task: dict) -> dict:
    """Coordinate multiple agents on a task."""
    agents = get_current_agents()

    # Distribute work
    results = []
    for i, agent in enumerate(agents):
        subtask = task["subtasks"][i]
        result = agent.call(subtask["prompt"])
        results.append(result)

    return {"combined_results": results}

Performance Considerations

  • Parallel execution: Multiple nodes run concurrently in separate threads
  • Resource optimization: Parallel nodes share resources efficiently
  • Terminal node collection: Results gathered from all terminal nodes
  • Agent discovery overhead: Less than 1ms per node
  • Type checking: Disabled by default for production (enable for development)

Error Handling

Workflows provide comprehensive error handling:
from egregore.core.workflow import ParallelExecutionError, WorkflowStoppedException

try:
    result = workflow.run(data)
except ParallelExecutionError as e:
    # Parallel node execution failed
    print(f"Failed nodes: {e.failed_nodes}")
    print(f"Successful results: {e.partial_results}")
except WorkflowStoppedException:
    # Workflow was interrupted
    print("Workflow stopped")

What’s Next?