Documentation Index: fetch the complete documentation index at https://docs.egregorelabs.io/llms.txt to discover all available pages before exploring further.
Workflows Overview
Workflows provide a powerful node-based execution system for orchestrating complex agent operations with parallel processing, conditional logic, and built-in agent discovery.
Core Concepts
Egregore’s workflow system is built on these fundamental concepts:
Nodes
Nodes are the building blocks of workflows, callable units that transform data:

```python
from egregore.core.workflow import node

@node
def process_data(data: dict) -> dict:
    """Process some data."""
    return {"processed": True, **data}

# Use it
result = process_data({"input": "value"})
# Result: {"processed": True, "input": "value"}
```
Sequences
Sequences chain nodes together into execution graphs:
```python
from egregore.core.workflow import Sequence

# Create workflow
workflow = Sequence(
    extract_data,
    transform_data,
    load_data,
)

# Execute
result = workflow.run(initial_data)
```
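Conceptually, a sequence is left-to-right function composition: each node's output becomes the next node's input. A minimal plain-Python sketch of that idea (an illustration of the concept, not the library's actual implementation):

```python
from functools import reduce

def make_sequence(*nodes):
    """Chain callables so each node's output feeds the next node's input."""
    def run(initial_data):
        return reduce(lambda data, node: node(data), nodes, initial_data)
    return run

# Three tiny stand-in nodes
extract = lambda d: {"extracted": d["source"]}
transform = lambda d: {"transformed": d["extracted"].upper()}
load = lambda d: {"status": "complete", "value": d["transformed"]}

pipeline = make_sequence(extract, transform, load)
result = pipeline({"source": "hello"})
# result == {"status": "complete", "value": "HELLO"}
```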
Parallel Execution
Parallel nodes execute multiple operations concurrently:
```python
from egregore.core.workflow import parallel

results = parallel(
    fetch_api_data,
    query_database,
    read_file,
)(input_data)
# Results: {"fetch_api_data": {...}, "query_database": {...}, "read_file": {...}}
```
Agent Discovery
Native agent discovery lets workflows access and manage agents automatically:
```python
from egregore.core.workflow import get_current_agents, node

@node
def process_with_agents(data: dict) -> dict:
    # Automatically discover agents in the current workflow
    agents = get_current_agents()
    for agent in agents:
        print(f"Found agent: {agent.agent_id}")
    return data
```
Key Features
- Node Decorators: `@node`, `@decision`, and `@parallel` decorators for quick workflow creation
- Parallel Processing: execute multiple nodes concurrently with dictionary-based result collection
- Conditional Logic: dynamic routing with `@decision` nodes based on runtime conditions
- Native Agent Discovery: built-in agent tracking and management within workflows
- Shared State: cross-node state management with `SharedState` and `workflow_state`
- Type Safety: optional type checking with `WorkflowTypeChecker` for development
- Validation System: pre-execution validation with cycle detection and dependency checking
- Reporting & Metrics: built-in performance tracking and execution metrics
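The dictionary-based result collection used by parallel nodes can be pictured as running each node in a thread pool and keying its result by the function's name. A minimal sketch of the concept in plain Python (an illustration, not the library's implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(nodes, input_data):
    """Run each node concurrently; key each result by the function's __name__."""
    with ThreadPoolExecutor() as pool:
        futures = {node.__name__: pool.submit(node, input_data) for node in nodes}
        return {name: future.result() for name, future in futures.items()}

# Two tiny stand-in nodes
def fetch_api_data(data):
    return {"api": data}

def query_database(data):
    return {"rows": 3}

results = run_parallel([fetch_api_data, query_database], "input")
# results == {"fetch_api_data": {"api": "input"}, "query_database": {"rows": 3}}
```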
Quick Start
Basic Workflow
```python
from egregore.core.workflow import node, Sequence

# Create nodes
@node
def extract(data: dict) -> dict:
    """Extract data from source."""
    return {"extracted": data["source"]}

@node
def transform(data: dict) -> dict:
    """Transform extracted data."""
    return {"transformed": data["extracted"].upper()}

@node
def load(data: dict) -> dict:
    """Load transformed data."""
    print(f"Loading: {data['transformed']}")
    return {"status": "complete"}

# Create workflow
etl_workflow = Sequence(
    extract,
    transform,
    load,
)

# Execute
result = etl_workflow.run({"source": "hello world"})
# Prints: Loading: HELLO WORLD
# Returns: {"status": "complete"}
```
Parallel Workflow
```python
from egregore.core.workflow import node, parallel, Sequence

@node
def fetch_user(user_id: str) -> dict:
    return {"name": "Alice", "id": user_id}

@node
def fetch_posts(user_id: str) -> list:
    return [{"id": 1, "title": "Post 1"}]

@node
def fetch_comments(user_id: str) -> list:
    return [{"id": 1, "text": "Comment 1"}]

@node
def combine(data: dict) -> dict:
    """Combine results from parallel fetches."""
    return {
        "user": data["fetch_user"],
        "posts": data["fetch_posts"],
        "comments": data["fetch_comments"],
    }

# Create workflow with parallel data fetching
user_workflow = Sequence(
    parallel(fetch_user, fetch_posts, fetch_comments),
    combine,
)

# Execute - parallel nodes run concurrently
result = user_workflow.run("user_123")
```
Workflow with Agents
```python
from egregore import Agent
from egregore.core.workflow import node, Sequence, get_current_agents

agent1 = Agent(provider="openai:gpt-4")
agent2 = Agent(provider="anthropic:claude-3.5-sonnet")

@node
def analyze_with_agents(data: dict) -> dict:
    """Use discovered agents for analysis."""
    # Agents are discovered automatically
    agents = get_current_agents()
    results = []
    for agent in agents:
        response = agent.call(f"Analyze: {data['text']}")
        results.append(response)
    return {"analyses": results}

workflow = Sequence(analyze_with_agents)

# Agents are tracked automatically during execution
result = workflow.run({"text": "Some data to analyze"})
```
Workflow Components
1. Node Types
| Type | Decorator / Class | Use Case |
| --- | --- | --- |
| Standard | `@node` | Data processing, transformations |
| Decision | `@decision` | Conditional routing, branching logic |
| Parallel | `@parallel` | Concurrent execution |
| Async | `AsyncNode` | Async operations |
| Batch | `BatchNode` | Batch processing |
2. Execution Control
| Component | Purpose |
| --- | --- |
| `Sequence` | Chain nodes into workflows |
| `WorkflowController` | Start/stop/pause workflows |
| `SharedState` | Cross-node state management |
| `workflow_state` | Decorator for stateful nodes |
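As an illustration of the cross-node state idea, a shared-state object can be as small as a lock-guarded dictionary that every node reads and writes. The class and method names below are assumptions made for the sketch, not the actual `SharedState` API:

```python
import threading

class SimpleSharedState:
    """A minimal thread-safe key-value store, in the spirit of SharedState."""
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def set(self, key, value):
        with self._lock:
            self._data[key] = value

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

state = SimpleSharedState()
state.set("records_seen", 42)
# state.get("records_seen") == 42
```

The lock matters because parallel nodes may touch the same keys from different threads.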
3. Agent Discovery
| Function | Purpose |
| --- | --- |
| `get_current_agents()` | Get all agents in the workflow |
| `get_current_agent_states()` | Get agent states |
| `interrupt_current_agents()` | Stop all agents |
| `apply_policy_to_current_agents()` | Apply policies |
| `get_agents_in_node()` | Get agents in a specific node |
4. Validation & Type Checking
| Component | Purpose |
| --- | --- |
| `SequenceValidator` | Pre-execution validation |
| `CycleDetectionValidator` | Detect circular dependencies |
| `DependencyValidator` | Check node dependencies |
| `WorkflowTypeChecker` | Type safety checking |
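To see what a cycle-detection validator checks, here is a minimal depth-first search over a node dependency graph. This is a sketch of the standard technique, not the `CycleDetectionValidator` implementation:

```python
def has_cycle(graph):
    """Detect a cycle in a directed graph given as {node: [successors]}."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / in progress / done
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for succ in graph.get(node, []):
            if color.get(succ, WHITE) == GRAY:
                return True  # back edge to an in-progress node: cycle found
            if color.get(succ, WHITE) == WHITE and visit(succ):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in graph)

# A linear workflow has no cycle; wiring load back to extract creates one
linear = {"extract": ["transform"], "transform": ["load"], "load": []}
looped = {"extract": ["transform"], "transform": ["load"], "load": ["extract"]}
# has_cycle(linear) == False, has_cycle(looped) == True
```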
5. Reporting
| Component | Purpose |
| --- | --- |
| `WorkflowMetricsCollector` | Collect execution metrics |
| `PerformanceSummary` | Performance analysis |
| `ExecutionStatus` | Track execution state |
| `ErrorReport` | Error tracking |
Common Patterns
ETL Pipeline
```python
workflow = Sequence(
    extract_from_source,
    validate_data,
    transform_format,
    enrich_data,
    load_to_destination,
)
```
Parallel Data Gathering
```python
workflow = Sequence(
    parallel(
        fetch_api_1,
        fetch_api_2,
        fetch_api_3,
    ),
    merge_results,
    process_merged,
)
```
Conditional Branching
```python
from egregore.core.workflow import decision, Sequence

@decision
def route_by_type(data: dict) -> str:
    """Route based on data type."""
    return "process_json" if data["type"] == "json" else "process_csv"

workflow = Sequence(
    extract_data,
    route_by_type,  # Returns the name of the next node
    # Execution continues at the selected node
)
```
Agent Coordination
```python
from egregore.core.workflow import get_current_agents, node

@node
def coordinate_agents(task: dict) -> dict:
    """Coordinate multiple agents on a task."""
    agents = get_current_agents()
    # Distribute work
    results = []
    for i, agent in enumerate(agents):
        subtask = task["subtasks"][i]
        result = agent.call(subtask["prompt"])
        results.append(result)
    return {"combined_results": results}
```
Performance
- Parallel execution: multiple nodes run concurrently in separate threads
- Resource optimization: parallel nodes share resources efficiently
- Terminal node collection: results are gathered from all terminal nodes
- Agent discovery overhead: less than 1 ms per node
- Type checking: disabled by default for production (enable it during development)
Error Handling
Workflows provide comprehensive error handling:
```python
from egregore.core.workflow import ParallelExecutionError, WorkflowStoppedException

try:
    result = workflow.run(data)
except ParallelExecutionError as e:
    # Parallel node execution failed
    print(f"Failed nodes: {e.failed_nodes}")
    print(f"Successful results: {e.partial_results}")
except WorkflowStoppedException:
    # The workflow was interrupted
    print("Workflow stopped")
```
What’s Next?
- Creating Nodes: learn to create workflow nodes
- Parallel Execution: execute nodes concurrently
- Agent Discovery: access and manage agents in workflows
- Shared State: share state across nodes
- Validation: validate workflows before execution
- Type Safety: type checking for workflows
- Reporting: track workflow performance
- Best Practices: workflow design patterns