Creating Nodes
Nodes are the building blocks of workflows: callable functions that transform data and can be chained together into execution graphs.
Node Decorator
The @node decorator converts functions into workflow nodes:
from egregore.core.workflow import node

@node("process_data")
def process_data(input_data: dict) -> dict:
    """Process input data."""
    return {
        "processed": True,
        "value": input_data["value"] * 2,
    }

# Use it
result = process_data({"value": 5})
# Returns: {"processed": True, "value": 10}
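Conceptually, a decorator like `@node("name")` wraps a plain function in an object that stays callable and remembers its name. The sketch below uses hypothetical `SimpleNode` and `simple_node` names to illustrate that idea only; it is not egregore's implementation.

```python
from typing import Any, Callable


class SimpleNode:
    """Hypothetical wrapper: a named, callable unit of work."""

    def __init__(self, name: str, func: Callable[..., Any]) -> None:
        self.name = name
        self.func = func

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Calling the node simply delegates to the wrapped function
        return self.func(*args, **kwargs)


def simple_node(name: str) -> Callable[[Callable[..., Any]], SimpleNode]:
    """Hypothetical decorator factory with the same call shape as @node("name")."""
    def wrap(func: Callable[..., Any]) -> SimpleNode:
        return SimpleNode(name, func)
    return wrap


@simple_node("process_data")
def process_data(input_data: dict) -> dict:
    return {"processed": True, "value": input_data["value"] * 2}


print(process_data.name)           # process_data
print(process_data({"value": 5}))  # {'processed': True, 'value': 10}
```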
Node Names
Node names are used for:
Result storage in shared state
Debugging and logging
Graph visualization
from egregore.core.workflow import node, Sequence

@node("extract")
def extract_data(source: str) -> dict:
    return {"data": source}

@node("transform")
def transform_data(data: dict) -> dict:
    return {"transformed": data["data"].upper()}

# Names help track execution
workflow = Sequence(extract_data, transform_data)
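Result storage by name can be pictured as a runner that feeds each step's output into the next step and records it under that step's name. This is a hypothetical sketch using plain `(name, function)` pairs and a plain dict in place of shared state; egregore's `Sequence` may store results differently.

```python
from typing import Any, Callable, Dict, List, Tuple

NamedStep = Tuple[str, Callable[[Any], Any]]


def run_sequence(steps: List[NamedStep], initial: Any) -> Tuple[Any, Dict[str, Any]]:
    """Run steps in order, storing each output in a dict keyed by step name."""
    results: Dict[str, Any] = {}
    value = initial
    for name, func in steps:
        value = func(value)    # previous output becomes the next input
        results[name] = value  # name-keyed storage makes each step traceable
    return value, results


steps: List[NamedStep] = [
    ("extract", lambda source: {"data": source}),
    ("transform", lambda data: {"transformed": data["data"].upper()}),
]
final, results = run_sequence(steps, "hello")
print(final)    # {'transformed': 'HELLO'}
print(results)  # {'extract': {'data': 'hello'}, 'transform': {'transformed': 'HELLO'}}
```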
Parameter Mapping
Nodes automatically map the previous node's output to their input parameters, based on how many parameters the node declares and on the type of that output:
Single Parameter
Single-parameter nodes receive the entire previous output:
@node("processor")
def process(input_data: dict) -> dict:
    """Receives complete previous output."""
    return {"result": input_data["value"]}

@node("analyze")
def analyze(data: dict) -> str:
    """Also receives complete output."""
    return f"Analyzed: {data}"
Multiple Parameters with Tuple/List
Multiple parameters map positionally from tuple/list outputs:
from egregore.core.workflow import node, Sequence

@node("split")
def split_data(data: str) -> tuple:
    """Returns tuple of values."""
    parts = data.split(",")
    return (parts[0], parts[1], parts[2])

@node("combine")
def combine(first: str, second: str, third: str) -> str:
    """Maps positionally: first=parts[0], second=parts[1], third=parts[2]."""
    return f"{first}-{second}-{third}"

workflow = Sequence(split_data, combine)
result = workflow.run("A,B,C")
# Returns: "A-B-C"
Multiple Parameters with Dict
Multiple parameters map by name from dict outputs:
from egregore.core.workflow import node, Sequence

@node("fetch_user")
def fetch_user(user_id: str) -> dict:
    """Returns dict with named fields."""
    return {
        "name": "Alice",
        "age": 30,
        "email": "alice@example.com",
    }

@node("format_profile")
def format_profile(name: str, age: int, email: str) -> str:
    """Maps by name: name="Alice", age=30, email="alice@example.com"."""
    return f"{name} ({age}) - {email}"

workflow = Sequence(fetch_user, format_profile)
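The three mapping rules above amount to a dispatch on the parameter count and the output type. The sketch below is a hypothetical `map_inputs` helper built on `inspect.signature`; it implements the rules as this page describes them and is not egregore's actual mapping code, which may handle more cases.

```python
import inspect
from typing import Any, Callable, Dict, Tuple


def map_inputs(func: Callable[..., Any], previous: Any) -> Tuple[tuple, Dict[str, Any]]:
    """Return (args, kwargs) for calling func with the previous node's output."""
    params = list(inspect.signature(func).parameters)
    if len(params) == 1:
        # Single parameter: pass the whole previous output
        return (previous,), {}
    if isinstance(previous, (tuple, list)):
        # Multiple parameters + tuple/list output: map positionally
        return tuple(previous), {}
    if isinstance(previous, dict):
        # Multiple parameters + dict output: map by parameter name
        return (), {name: previous[name] for name in params}
    raise TypeError(f"cannot map {type(previous).__name__} onto {params}")


def call_with_mapping(func: Callable[..., Any], previous: Any) -> Any:
    args, kwargs = map_inputs(func, previous)
    return func(*args, **kwargs)


def combine(first: str, second: str, third: str) -> str:
    return f"{first}-{second}-{third}"


def format_profile(name: str, age: int, email: str) -> str:
    return f"{name} ({age}) - {email}"


print(call_with_mapping(combine, ("A", "B", "C")))  # A-B-C
print(call_with_mapping(format_profile, {"name": "Alice", "age": 30, "email": "alice@example.com"}))
# Alice (30) - alice@example.com
```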
Shared State Parameter
Nodes can access shared state by adding a SharedState parameter:
from egregore.core.workflow import node, SharedState

@node("process")
def process(data: dict, state: SharedState) -> dict:
    """State parameter automatically injected."""
    # Access shared state
    counter = state.get("counter", 0)
    state["counter"] = counter + 1
    return {"processed": data, "count": counter + 1}
Important: Only one SharedState parameter is allowed per node.
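One way a runner could inject state is to find the parameter annotated with the state type, pass the state object to it, and reject nodes that declare more than one. In this hypothetical sketch, a `dict` subclass named `DemoState` stands in for egregore's `SharedState`, `call_with_state` is an illustrative helper rather than a library function, and the node is assumed to have a single input parameter besides the state.

```python
import inspect
from typing import Any, Callable, Dict


class DemoState(dict):
    """Hypothetical stand-in for SharedState: a dict shared across nodes."""


def call_with_state(func: Callable[..., Any], data: Any, state: DemoState) -> Any:
    """Pass `state` to the DemoState-annotated parameter and `data` to the other one."""
    params = inspect.signature(func).parameters
    # Compares real annotation objects; string annotations would need get_type_hints
    state_params = [name for name, p in params.items() if p.annotation is DemoState]
    if len(state_params) > 1:
        # Mirrors the documented rule: at most one state parameter per node
        raise TypeError(f"{func.__name__} declares more than one state parameter")
    kwargs: Dict[str, Any] = {name: state for name in state_params}
    data_params = [name for name in params if name not in state_params]
    if data_params:
        kwargs[data_params[0]] = data  # input goes to the first non-state parameter
    return func(**kwargs)


def process(data: dict, state: DemoState) -> dict:
    counter = state.get("counter", 0)
    state["counter"] = counter + 1
    return {"processed": data, "count": counter + 1}


shared = DemoState()
call_with_state(process, {"x": 1}, shared)
print(call_with_state(process, {"x": 2}, shared))  # {'processed': {'x': 2}, 'count': 2}
print(shared)                                      # {'counter': 2}
```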
Decision Nodes
Decision nodes route execution based on runtime conditions:
from egregore.core.workflow import decision, node, Sequence

@decision("route_by_type")
def route_by_type(data: dict) -> str:
    """Return name of next node to execute."""
    if data["type"] == "json":
        return "process_json"
    elif data["type"] == "csv":
        return "process_csv"
    else:
        return "process_default"

@node("process_json")
def process_json(data: dict) -> dict:
    return {"format": "json", "data": data}

@node("process_csv")
def process_csv(data: dict) -> dict:
    return {"format": "csv", "data": data}

@node("process_default")
def process_default(data: dict) -> dict:
    return {"format": "default", "data": data}

# Decision node returns next node name as string
workflow = Sequence(
    route_by_type,
    # Execution continues to selected node
)
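The routing step can be pictured as: call the decision function, look up the returned name among the candidate nodes, and run that node on the same input. This hypothetical sketch uses plain functions in a dict; how egregore wires candidate nodes into a `Sequence` is not covered by it.

```python
from typing import Any, Callable, Dict


def run_decision(
    decide: Callable[[Any], str],
    branches: Dict[str, Callable[[Any], Any]],
    data: Any,
) -> Any:
    """Call the decision function, then run the branch whose name it returned."""
    choice = decide(data)
    if choice not in branches:
        raise KeyError(f"decision returned unknown node name: {choice!r}")
    return branches[choice](data)


def route_by_type(data: dict) -> str:
    if data["type"] == "json":
        return "process_json"
    elif data["type"] == "csv":
        return "process_csv"
    return "process_default"


branches = {
    "process_json": lambda d: {"format": "json", "data": d},
    "process_csv": lambda d: {"format": "csv", "data": d},
    "process_default": lambda d: {"format": "default", "data": d},
}

print(run_decision(route_by_type, branches, {"type": "csv"}))
# {'format': 'csv', 'data': {'type': 'csv'}}
```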
Pattern Matching
Decision nodes support pattern matching for complex routing:
from egregore.core.workflow import decision, _

@decision("route")
def route(value: int) -> str:
    """Route based on value ranges."""
    # Each comparison is parenthesized because Python's `>>` and `|`
    # bind more tightly than `<`
    return (
        (_ < 0) >> "process_negative"
        | (_ < 10) >> "process_small"
        | (_ < 100) >> "process_medium"
        | _ >> "process_large"  # Default case
    )
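Expressions like these rely on Python operator overloading: `_ < 10` builds a predicate object instead of a bool, `>>` attaches a target name, and `|` chains rules so the first matching one wins. The sketch below shows that technique with hypothetical `Placeholder`, `Predicate`, and `RuleChain` classes; it is not egregore's `_` implementation. Each comparison is parenthesized because `>>` and `|` bind more tightly than `<` in Python.

```python
from typing import Any, Callable, List, Tuple


class RuleChain:
    """Ordered (predicate, target) pairs; the first matching predicate wins."""

    def __init__(self, rules: List[Tuple[Callable[[Any], bool], str]]) -> None:
        self.rules = rules

    def __or__(self, other: "RuleChain") -> "RuleChain":
        return RuleChain(self.rules + other.rules)

    def resolve(self, value: Any) -> str:
        for predicate, target in self.rules:
            if predicate(value):
                return target
        raise LookupError(f"no rule matched {value!r}")


class Predicate:
    def __init__(self, test: Callable[[Any], bool]) -> None:
        self.test = test

    def __rshift__(self, target: str) -> RuleChain:
        # `predicate >> "name"` turns a predicate into a one-rule chain
        return RuleChain([(self.test, target)])


class Placeholder(Predicate):
    """The bare placeholder matches anything; comparisons build narrower predicates."""

    def __init__(self) -> None:
        super().__init__(lambda value: True)

    def __lt__(self, bound: Any) -> Predicate:
        return Predicate(lambda value: value < bound)


_ = Placeholder()

rules = (
    (_ < 0) >> "process_negative"
    | (_ < 10) >> "process_small"
    | (_ < 100) >> "process_medium"
    | _ >> "process_large"  # default: the bare placeholder always matches
)

print(rules.resolve(-5))   # process_negative
print(rules.resolve(42))   # process_medium
print(rules.resolve(500))  # process_large
```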
Parallel Nodes
Parallel nodes execute multiple operations concurrently:
from egregore.core.workflow import parallel, node

@node("fetch_api")
def fetch_api(url: str) -> dict:
    return {"source": "api", "data": "..."}

@node("fetch_db")
def fetch_db(query: str) -> dict:
    return {"source": "db", "data": "..."}

@node("fetch_cache")
def fetch_cache(key: str) -> dict:
    return {"source": "cache", "data": "..."}

# Execute all three concurrently
fetch_all = parallel(
    fetch_api,
    fetch_db,
    fetch_cache
)

# Results collected as dict with node names as keys
result = fetch_all({"url": "...", "query": "...", "key": "..."})
# Returns: {
#     "fetch_api": {"source": "api", "data": "..."},
#     "fetch_db": {"source": "db", "data": "..."},
#     "fetch_cache": {"source": "cache", "data": "..."}
# }
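In plain Python, a similar fan-out can be built by submitting each function to a thread pool with the same input and gathering the results into a dict keyed by name. This is a hypothetical sketch using `concurrent.futures`; egregore's `parallel` may use a different concurrency mechanism.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(branches: Dict[str, Callable[[Any], Any]], data: Any) -> Dict[str, Any]:
    """Run every branch on the same input concurrently; collect results by name."""
    with ThreadPoolExecutor(max_workers=max(1, len(branches))) as pool:
        futures = {name: pool.submit(func, data) for name, func in branches.items()}
        # .result() blocks until each branch finishes and re-raises its exception
        return {name: future.result() for name, future in futures.items()}


def fetch_api(params: dict) -> dict:
    return {"source": "api", "data": params["url"]}


def fetch_db(params: dict) -> dict:
    return {"source": "db", "data": params["query"]}


def fetch_cache(params: dict) -> dict:
    return {"source": "cache", "data": params["key"]}


result = run_parallel(
    {"fetch_api": fetch_api, "fetch_db": fetch_db, "fetch_cache": fetch_cache},
    {"url": "https://example.com", "query": "SELECT 1", "key": "user:1"},
)
print(result["fetch_db"])  # {'source': 'db', 'data': 'SELECT 1'}
```

Threads suit I/O-bound branches like these fetches; under CPython's global interpreter lock, CPU-bound branches would need processes to run truly in parallel.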
Node Classes
BaseNode
All nodes inherit from BaseNode:
from egregore.core.workflow import BaseNode

class CustomNode(BaseNode):
    def execute(self, *args, **kwargs):
        """Custom execution logic."""
        return self.process(*args, **kwargs)

    def process(self, data):
        """Your logic here."""
        return data
AsyncNode
For async operations:
import asyncio

from egregore.core.workflow import AsyncNode

class AsyncFetchNode(AsyncNode):
    async def execute(self, url: str):
        """Async execution."""
        # Placeholder for real async I/O, such as an HTTP request
        await asyncio.sleep(0.1)
        return {"url": url, "status": "fetched"}
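A runner that accepts both regular and async callables has to await coroutine results. A minimal sketch of that idea, assuming nothing about egregore's internals: the hypothetical `run_node` calls the function and awaits the result only when it is awaitable.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_node(func: Callable[..., Any], *args: Any) -> Any:
    """Call a sync or async callable and return its final value."""
    result = func(*args)
    if inspect.isawaitable(result):
        # Async callables return a coroutine that still has to be awaited
        result = await result
    return result


def double(x: int) -> int:
    return x * 2


async def fetch(url: str) -> dict:
    await asyncio.sleep(0.01)  # stands in for real async I/O
    return {"url": url, "status": "fetched"}


async def main() -> None:
    print(await run_node(double, 21))                    # 42
    print(await run_node(fetch, "https://example.com"))  # {'url': 'https://example.com', 'status': 'fetched'}


asyncio.run(main())
```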
BatchNode
For batch processing:
from egregore.core.workflow import BatchNode

class BatchProcessorNode(BatchNode):
    def execute_batch(self, items: list):
        """Process multiple items."""
        return [self.process(item) for item in items]

    def process(self, item):
        """Per-item logic here."""
        return item
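Batch processing means one call handles many items instead of one. A hypothetical sketch of feeding a list to a batch handler in fixed-size chunks (`batches` and `process_in_batches` are illustrative helpers, not egregore APIs):

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def batches(items: List[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def process_in_batches(
    items: List[T], size: int, handle_batch: Callable[[List[T]], List[R]]
) -> List[R]:
    """Call handle_batch once per chunk and concatenate the per-chunk results."""
    results: List[R] = []
    for batch in batches(items, size):
        results.extend(handle_batch(batch))
    return results


batch_sizes: List[int] = []


def square_batch(batch: List[int]) -> List[int]:
    batch_sizes.append(len(batch))  # record how many items each call received
    return [x * x for x in batch]


print(process_in_batches(list(range(7)), 3, square_batch))  # [0, 1, 4, 9, 16, 25, 36]
print(batch_sizes)  # [3, 3, 1]
```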
Node Attributes
You can attach custom attributes to a node:
@node("retry_node")
def retry_node(data: dict) -> dict:
    return {"processed": data}

# Set custom attributes
retry_node.max_retries = 3
retry_node.fallback_value = {}
retry_node.timeout = 30

# Access in workflow
workflow = Sequence(retry_node)
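Attributes like these are plain Python attributes on the node object, so any code holding the node can read them. This page does not say whether egregore's runner acts on `max_retries` or `fallback_value`; the hypothetical `run_with_retries` helper below shows how your own code could honor them with `getattr` defaults (it ignores `timeout`).

```python
from typing import Any, Callable, Optional


def run_with_retries(func: Callable[[Any], Any], data: Any) -> Any:
    """Retry up to func.max_retries times, then return func.fallback_value if set."""
    max_retries = max(0, getattr(func, "max_retries", 0))
    last_error: Optional[Exception] = None
    for _attempt in range(max_retries + 1):  # first try plus retries
        try:
            return func(data)
        except Exception as error:
            last_error = error
    if hasattr(func, "fallback_value"):
        return func.fallback_value
    assert last_error is not None
    raise last_error


attempts = {"count": 0}


def flaky(data: dict) -> dict:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"ok": True, **data}


flaky.max_retries = 3  # plain function attributes
flaky.fallback_value = {}

print(run_with_retries(flaky, {"id": 1}))  # {'ok': True, 'id': 1}
print(attempts["count"])                   # 3
```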
Node Registry
Nodes are automatically registered by name:
from egregore.core.workflow import node, node_registry

@node("my_processor")
def my_processor(data):
    return data

# Access registered nodes by name
registered = node_registry["my_processor"]
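Automatic registration is usually done by having the decorator write into a module-level dict as a side effect. The sketch below uses a hypothetical `registry` dict and `registered` decorator (not egregore's `node_registry`) and rejects duplicate names; whether egregore raises, warns, or overwrites on a duplicate name is not specified on this page.

```python
from typing import Any, Callable, Dict

registry: Dict[str, Callable[..., Any]] = {}


def registered(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator: record the function in `registry` under `name`."""
    def wrap(func: Callable[..., Any]) -> Callable[..., Any]:
        if name in registry:
            # Silent overwrites make bugs hard to trace, so fail loudly instead
            raise ValueError(f"node name already registered: {name!r}")
        registry[name] = func
        return func
    return wrap


@registered("my_processor")
def my_processor(data):
    return data


print(registry["my_processor"] is my_processor)  # True
```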
Type Hints
Type hints enable automatic validation and documentation:
from typing import Dict, List

@node("typed_processor")
def typed_processor(data: Dict[str, int]) -> List[str]:
    """Process with type hints."""
    return [str(v) for v in data.values()]
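Runtime validation from hints typically reads them with `typing.get_type_hints` and checks each argument. A deliberately shallow, hypothetical sketch (`check_args` is illustrative, not an egregore function): it checks only the outer type, so `Dict[str, int]` is checked as `dict` and element types are ignored.

```python
from typing import Any, Callable, Dict, List, get_origin, get_type_hints


def check_args(func: Callable[..., Any], **kwargs: Any) -> None:
    """Raise TypeError if a keyword argument does not match its hint's outer type."""
    hints = get_type_hints(func)
    for name, value in kwargs.items():
        hint = hints.get(name)
        if hint is None or hint is Any:
            continue  # unannotated or Any: nothing to check
        expected = get_origin(hint) or hint  # Dict[str, int] -> dict
        if isinstance(expected, type) and not isinstance(value, expected):
            raise TypeError(f"{name}: expected {expected.__name__}, got {type(value).__name__}")


def typed_processor(data: Dict[str, int]) -> List[str]:
    return [str(v) for v in data.values()]


check_args(typed_processor, data={"a": 1})  # passes silently
try:
    check_args(typed_processor, data=["not", "a", "dict"])
except TypeError as error:
    print(error)  # data: expected dict, got list
```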
Best Practices
Use descriptive node names
# Good: Clear purpose
@node("extract_user_data")
def extract_user_data(response: dict) -> dict:
    return response["user"]

# Bad: Unclear purpose
@node("process")
def process(data: dict) -> dict:
    return data["user"]
Keep each node focused on one task
# Good: Single responsibility
@node("validate_email")
def validate_email(email: str) -> bool:
    return "@" in email and "." in email

@node("send_email")
def send_email(email: str, message: str) -> dict:
    return send(email, message)  # send() stands in for your email client

# Bad: Multiple responsibilities
@node("handle_email")
def handle_email(email: str, message: str) -> dict:
    if "@" not in email:
        raise ValueError("Invalid email")
    return send(email, message)
Use type hints for clarity
import json

# Good: Clear types
@node("parse_json")
def parse_json(text: str) -> dict:
    return json.loads(text)

# Bad: No type hints
@node("parse_json")
def parse_json(text):
    return json.loads(text)
Handle errors appropriately
# Good: Explicit error handling
@node("safe_division")
def safe_division(a: float, b: float) -> float:
    if b == 0:
        return 0.0  # Or raise ValueError
    return a / b

# Bad: Unhandled errors
@node("division")
def division(a: float, b: float) -> float:
    return a / b  # Raises ZeroDivisionError when b is 0
Document your nodes
# Good: Clear documentation
@node("filter_active_users")
def filter_active_users(users: list) -> list:
    """Filter users with active status.

    Args:
        users: List of user dicts with 'status' field

    Returns:
        List of active users only
    """
    return [u for u in users if u.get("status") == "active"]

# Bad: No documentation
@node("filter")
def filter(data: list) -> list:
    return [d for d in data if d.get("status") == "active"]
Node Lifecycle
A single node execution goes through these steps:
from egregore.core.workflow import node, Sequence, SharedState

@node("lifecycle_example")
def lifecycle_example(data: dict, state: SharedState) -> dict:
    """Node lifecycle demonstration."""
    # 1. Receive input (automatic parameter mapping)
    # 2. Access shared state if needed
    counter = state.get("counter", 0)
    # 3. Execute logic
    result = {"processed": data["value"] * 2}
    # 4. Update state if needed
    state["counter"] = counter + 1
    # 5. Return output (automatically passed to next node)
    return result
Performance Characteristics
Function overhead: Less than 0.1 ms per node call
Parameter mapping: Cached for performance (6x speedup)
State access: O(1) dictionary lookups
Type checking: Disabled by default in production
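Caching parameter mapping means the costly part, introspecting a function's signature, happens once per function rather than on every call. A hypothetical sketch of the idea with `functools.lru_cache`; it does not describe how egregore implements its cache.

```python
import inspect
from functools import lru_cache
from typing import Any, Callable, Tuple


@lru_cache(maxsize=None)
def param_names(func: Callable[..., Any]) -> Tuple[str, ...]:
    """Inspect a signature once per function; later calls are served from the cache."""
    # Return a tuple so the cached value cannot be mutated by callers
    return tuple(inspect.signature(func).parameters)


def format_profile(name: str, age: int, email: str) -> str:
    return f"{name} ({age}) - {email}"


param_names(format_profile)
param_names(format_profile)
print(param_names(format_profile))  # ('name', 'age', 'email')
info = param_names.cache_info()
print(info.hits, info.misses)       # 2 1
```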
Error Handling
Nodes can raise exceptions to halt workflow execution:
from egregore.core.workflow import node, Sequence

@node("validate")
def validate(data: dict) -> dict:
    """Validation node."""
    if "required_field" not in data:
        raise ValueError("Missing required field")
    return data

try:
    # `process` is any downstream node, such as the one defined earlier
    workflow = Sequence(validate, process)
    workflow.run({"invalid": "data"})
except ValueError as e:
    print(f"Validation failed: {e}")
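Halting on an exception follows naturally from running steps in a loop without catching errors. The hypothetical sketch below also wraps the error in an illustrative `StepFailed` exception that names the failing step and keeps the original exception as `__cause__`; egregore's own error reporting may differ.

```python
from typing import Any, Callable, List, Tuple


class StepFailed(Exception):
    """Hypothetical wrapper error naming the step that raised."""


def run_steps(steps: List[Tuple[str, Callable[[Any], Any]]], data: Any) -> Any:
    for name, func in steps:
        try:
            data = func(data)
        except Exception as error:
            # `from error` keeps the original exception attached as __cause__
            raise StepFailed(f"step {name!r} failed: {error}") from error
    return data


def validate(data: dict) -> dict:
    if "required_field" not in data:
        raise ValueError("Missing required field")
    return data


executed: List[str] = []


def process(data: dict) -> dict:
    executed.append("process")
    return {"processed": data}


try:
    run_steps([("validate", validate), ("process", process)], {"invalid": "data"})
except StepFailed as error:
    print(error)                           # step 'validate' failed: Missing required field
    print(type(error.__cause__).__name__)  # ValueError
print(executed)  # [] because the later step never ran
```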
What’s Next?
Parallel Execution: Execute nodes concurrently
Shared State: Share data across nodes
Validation: Validate workflows before execution
Type Safety: Type checking for workflows