# Shared State

Shared state enables data sharing and communication between workflow nodes through the `SharedState` class and the `workflow_state()` function.
## Core Concept

Every workflow has an associated shared state that tracks execution history, stores data, and provides access to node outputs:

```python
from egregore.core.workflow import node, Sequence, SharedState

@node("extract")
def extract(data: dict, state: SharedState) -> dict:
    """State parameter automatically injected."""
    # Store data in state
    state["extracted_count"] = len(data["items"])
    return {"items": data["items"]}

@node("transform")
def transform(data: dict, state: SharedState) -> dict:
    """Access data from previous nodes."""
    count = state["extracted_count"]  # Access stored value
    return {"transformed": data["items"], "original_count": count}

workflow = Sequence(extract, transform)
result = workflow.run({"items": [1, 2, 3]})
```
## SharedState Parameter

### Automatic Injection

Add a `SharedState` parameter to any node to access the shared state:

```python
@node("processor")
def processor(data: dict, state: SharedState) -> dict:
    """State automatically injected by the workflow system."""
    # Access state
    previous_count = state.get("count", 0)
    state["count"] = previous_count + 1
    return {"processed": data, "execution": previous_count + 1}
```
**Important**: Only one `SharedState` parameter is allowed per node.
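For example, a node that declares two state parameters is invalid (a hypothetical sketch; how the violation is reported is not specified here):

```python
@node("invalid")
def invalid(data: dict, state: SharedState, extra: SharedState) -> dict:
    # Invalid: two SharedState parameters in one signature.
    # Only one SharedState parameter is permitted per node.
    return data
```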
### State Access Methods

```python
@node("state_demo")
def state_demo(data: dict, state: SharedState) -> dict:
    """Demonstrate state access methods."""
    # Dictionary-style access
    state["key"] = "value"
    value = state["key"]

    # get() with a default
    count = state.get("counter", 0)

    # Containment check
    if "key" in state:
        print("Key exists")

    # Iteration over key/value pairs
    for key, value in state:
        print(f"{key}: {value}")

    return data
```
## workflow_state() Function

Access state from decision functions and nested code:

```python
from egregore.core.workflow import decision, workflow_state

@decision("route_by_count")
def route_by_count(data: dict) -> str:
    """Decision function using workflow_state()."""
    # Access a previous node's output
    count = workflow_state("counter")["count"]
    return "high_volume" if count > 100 else "low_volume"

@node("high_volume")
def high_volume(data: dict) -> dict:
    return {"route": "high", "data": data}

@node("low_volume")
def low_volume(data: dict) -> dict:
    return {"route": "low", "data": data}
```
### Access Patterns

**By node name**:

```python
# Get entire node output
output = workflow_state("processor")

# Access a nested attribute
count = workflow_state("processor", "result.count")
```

**By execution index**:

```python
# Get the last node output
last = workflow_state(-1)

# Get the first node output
first = workflow_state(0)

# With a nested attribute
value = workflow_state(-2, "data.items")
```
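Index-based access is useful in decision functions that don't know the name of the upstream node. A minimal routing sketch (the `"retry"` and `"continue"` targets are hypothetical node names, and the previous output is assumed to be a dict):

```python
@decision("retry_router")
def retry_router(data: dict) -> str:
    """Route based on whichever node ran immediately before this decision."""
    last = workflow_state(-1)  # output of the most recently executed node
    return "retry" if last.get("error") else "continue"
```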
## Accessing Node Outputs

### get_node_output()

Get the output of a previously executed node:

```python
@node("validator")
def validator(data: dict) -> dict:
    return {"valid": True, "data": data}

@node("processor")
def processor(data: dict, state: SharedState) -> dict:
    """Access the validator's output."""
    # Get output from a previous node
    validation = state.get_node_output("validator")
    if validation["valid"]:
        return {"status": "processed", "data": validation["data"]}
    else:
        return {"status": "skipped"}
```
### get_node_attribute()

Access nested attributes with dot notation:

```python
@node("fetcher")
def fetcher(data: dict) -> dict:
    return {
        "response": {
            "status": 200,
            "data": {"items": [1, 2, 3]},
            "metadata": {"count": 3},
        }
    }

@node("counter")
def counter(data: dict, state: SharedState) -> dict:
    """Extract a nested attribute."""
    # Get a nested value using dot notation
    count = state.get_node_attribute("fetcher", "response.metadata.count")
    return {"item_count": count}
```
## State Properties

### initial_input

Access the workflow's original input:
@node ( "final_processor" )
def final_processor ( data : dict , state : SharedState) -> dict :
"""Compare with initial input."""
original = state.initial_input
return {
"initial" : original,
"final" : data,
"changed" : original != data
}
### final_output

Get the most recent node output:

```python
@node("checker")
def checker(data: dict, state: SharedState) -> dict:
    """Access the workflow's final output so far."""
    # Note: final_output updates after each node
    last_result = state.final_output
    return {"current": data, "previous": last_result}
```
### previous_output

Access the immediately previous node's output:

```python
@node("accumulator")
def accumulator(data: dict, state: SharedState) -> dict:
    """Build on the previous output."""
    prev = state.previous_output
    if isinstance(prev, dict):
        # Merge with the previous output
        return {**prev, **data}
    else:
        return data
```
### execution_count

Track how many nodes have executed:

```python
@node("progress_tracker")
def progress_tracker(data: dict, state: SharedState) -> dict:
    """Track execution progress."""
    count = state.execution_count
    return {
        "data": data,
        "progress": f"{count} nodes executed",
    }
```
## Parallel Node Outputs

### get_parallel_outputs()

Access results from parallel branches:

```python
from egregore.core.workflow import parallel

# fetch_api, fetch_database, and fetch_cache are @node-decorated
# functions defined elsewhere
parallel_fetch = parallel(
    fetch_api,
    fetch_database,
    fetch_cache,
)

@node("combine_parallel")
def combine_parallel(data: dict, state: SharedState) -> dict:
    """Combine parallel results."""
    # Access all parallel outputs
    parallel_results = state.get_parallel_outputs("parallel_fetch")
    return {
        "api": parallel_results.get("fetch_api"),
        "db": parallel_results.get("fetch_database"),
        "cache": parallel_results.get("fetch_cache"),
    }

workflow = Sequence(parallel_fetch, combine_parallel)
```
### Dictionary-based Access

Parallel results are automatically stored in the state dict, keyed by node name:

```python
@node("parallel_processor")
def parallel_processor(data: dict, state: SharedState) -> dict:
    """Access parallel results by name."""
    # Results are stored with node names as keys
    api_result = state["fetch_api"]
    db_result = state["fetch_database"]
    # merge() is a user-defined helper
    return {
        "combined": merge(api_result, db_result)
    }
```
## Execution History

### get_execution_history()

Get the full execution history:

```python
@node("history_analyzer")
def history_analyzer(data: dict, state: SharedState) -> dict:
    """Analyze execution history."""
    # Get all executions
    history = state.get_execution_history()

    # Filter by node name
    validator_runs = state.get_execution_history(node_name="validator")

    # Limit results
    recent = state.get_execution_history(limit=5)

    return {
        "total_executions": len(history),
        "validator_runs": len(validator_runs),
    }
```
### get_node_execution_count()

Count how many times a node has executed:

```python
@node("loop_controller")
def loop_controller(data: dict, state: SharedState) -> dict:
    """Control loop execution."""
    # Count executions
    count = state.get_node_execution_count("processor")
    if count > 10:
        return {"status": "limit_reached"}
    return {"status": "continue", "iterations": count}
```
### get_execution_sequence()

Get the sequence of executed node names:

```python
@node("flow_analyzer")
def flow_analyzer(data: dict, state: SharedState) -> dict:
    """Analyze execution flow."""
    # Get the sequence of node names
    sequence = state.get_execution_sequence()

    # Get the last 5 nodes
    recent_nodes = state.get_execution_sequence(limit=5)

    return {
        "flow": sequence,
        "path_length": len(sequence),
    }
```
## Loop Detection

### detect_execution_loop()

Detect repetitive execution patterns:

```python
@decision("smart_router")
def smart_router(data: dict) -> str:
    """Route with loop detection."""
    # Check whether selecting "processor" would create a loop
    would_loop = workflow_state.detect_execution_loop("processor", pattern_length=3)
    if would_loop:
        return "exit_node"  # Break the potential loop
    else:
        return "processor"  # Continue processing
```
### check_node_execution_limit()

Enforce per-node execution limits:

```python
@decision("limit_checker")
def limit_checker(data: dict) -> str:
    """Check execution limits."""
    # Check whether the node has exceeded its limit
    check = workflow_state.check_node_execution_limit("processor", max_executions=5)
    if check["allowed"]:
        return "processor"
    else:
        return "exit_node"  # Limit reached
```
## Generic State Store

### store Property

Store arbitrary workflow data:

```python
@node("data_collector")
def data_collector(data: dict, state: SharedState) -> dict:
    """Store generic data."""
    # Use state.store for arbitrary data
    state.store["timestamps"] = []
    state.store["metrics"] = {"count": 0, "total": 0}
    return data

@node("data_user")
def data_user(data: dict, state: SharedState) -> dict:
    """Access stored data."""
    timestamps = state.store.get("timestamps", [])
    metrics = state.store.get("metrics", {})
    return {"data": data, "metrics": metrics}
```
**Use Cases**:

- Accumulating metrics across nodes
- Storing configuration data
- Caching computed values
- Temporary workflow-level storage
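For example, configuration stored once at the start of a workflow can be read by any later node. A minimal sketch using only `state.store` (the node names and config values are illustrative):

```python
@node("load_config")
def load_config(data: dict, state: SharedState) -> dict:
    """Store configuration once for downstream nodes."""
    state.store["config"] = {"batch_size": 50, "retries": 3}
    return data

@node("batch_worker")
def batch_worker(data: dict, state: SharedState) -> dict:
    """Read the shared configuration from the store."""
    config = state.store.get("config", {})
    return {"data": data, "batch_size": config.get("batch_size", 10)}
```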
## State Serialization

### Save State

Persist state to disk:

```python
@node("save_checkpoint")
def save_checkpoint(data: dict, state: SharedState) -> dict:
    """Save the workflow state."""
    # Save as JSON
    state.save("workflow_state.json", format="json")

    # Save as pickle (preserves Python objects)
    state.save("workflow_state.pkl", format="pickle")

    # Save without output values (smaller file)
    state.save("workflow_state_meta.json", format="json", include_outputs=False)

    return {"checkpoint": "saved"}
```
### Load State

Restore previously saved state:

```python
from egregore.core.workflow import SharedState

# Load from a file
state = SharedState.load_from_file("workflow_state.json", format="json")

# Or load into an existing state
existing_state = SharedState("workflow")
existing_state.load("workflow_state.json", format="json")
```
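A common pattern is to resume from a checkpoint if one exists and to start fresh otherwise. A minimal sketch (the checkpoint filename is an assumption, and attaching the restored state to a new run is handled by your workflow's run API, not shown here):

```python
import os
from egregore.core.workflow import SharedState

# Resume from a checkpoint if one exists; otherwise start fresh.
if os.path.exists("workflow_state.json"):
    state = SharedState.load_from_file("workflow_state.json", format="json")
else:
    state = SharedState("workflow")
```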
### to_dict() / from_dict()

Manual serialization:

```python
@node("export_state")
def export_state(data: dict, state: SharedState) -> dict:
    """Export state as a dictionary."""
    # Get the state as a dict
    state_dict = state.to_dict(include_outputs=True)
    # Save to a custom backend (custom_storage is a user-defined object)
    custom_storage.save(state_dict)
    return {"exported": True}

@node("import_state")
def import_state(data: dict, state: SharedState) -> dict:
    """Import state from a dictionary."""
    # Load from the custom backend
    state_dict = custom_storage.load()
    # Restore the state
    state.from_dict(state_dict)
    return {"imported": True}
```
## Advanced Patterns

### State-Based Routing

```python
@decision("adaptive_router")
def adaptive_router(data: dict) -> str:
    """Route based on execution history."""
    # Get execution counts
    fast_count = workflow_state.get_node_execution_count("fast_processor")
    thorough_count = workflow_state.get_node_execution_count("thorough_processor")

    # Balance load between the two processors
    if fast_count < thorough_count:
        return "fast_processor"
    else:
        return "thorough_processor"
```
### Accumulator Pattern

```python
@node("accumulator")
def accumulator(item: dict, state: SharedState) -> dict:
    """Accumulate results across executions."""
    # Initialize on the first run
    if "results" not in state.store:
        state.store["results"] = []

    # Accumulate
    state.store["results"].append(item)

    return {
        "current": item,
        "total": len(state.store["results"]),
    }
```
### Conditional Processing

```python
@node("conditional_processor")
def conditional_processor(data: dict, state: SharedState) -> dict:
    """Process based on previous results."""
    # Check whether validation passed
    validation = state.get_node_output("validator")
    if validation and validation.get("valid"):
        # Full processing (process_thoroughly is a user-defined helper)
        return process_thoroughly(data)
    else:
        # Skip processing
        return {"status": "skipped", "reason": "validation_failed"}
```
### Metrics Collection

```python
import time

@node("metrics_collector")
def metrics_collector(data: dict, state: SharedState) -> dict:
    """Collect workflow metrics."""
    # Initialize metrics on the first run
    if "metrics" not in state.store:
        state.store["metrics"] = {
            "items_processed": 0,
            "errors": 0,
            "start_time": time.time(),
        }

    # Update metrics
    metrics = state.store["metrics"]
    metrics["items_processed"] += 1
    if data.get("error"):
        metrics["errors"] += 1

    # Calculate elapsed time
    elapsed = time.time() - metrics["start_time"]

    return {
        "data": data,
        "metrics": {
            **metrics,
            "elapsed": elapsed,
            "items_per_second": metrics["items_processed"] / elapsed,
        },
    }
```
## Best Practices

### Use store for temporary data

```python
# Good: use store for temporary data
@node("processor")
def processor(data: dict, state: SharedState) -> dict:
    state.store["temp_cache"] = compute_expensive_data()
    return process(data, state.store["temp_cache"])

# Bad: pollute the main state dict
@node("processor")
def processor(data: dict, state: SharedState) -> dict:
    state["temp_cache"] = compute_expensive_data()  # Mixed with node outputs
    return process(data)
```
### Check whether a node has executed before accessing its output

```python
# Good: check before access
@node("safe_processor")
def safe_processor(data: dict, state: SharedState) -> dict:
    if "validator" in state:
        validation = state["validator"]
        # Use validation
    return process(data)

# Bad: assume the node executed
@node("unsafe_processor")
def unsafe_processor(data: dict, state: SharedState) -> dict:
    validation = state["validator"]  # May not exist!
    return process(data)
```
### Use workflow_state() in decision functions

```python
# Good: use workflow_state() in a decision
@decision("router")
def router(data: dict) -> str:
    count = workflow_state("counter")["count"]
    return "high" if count > 100 else "low"

# Bad: pass a state parameter to a decision
@decision("router")
def router(data: dict, state: SharedState) -> str:  # NOT SUPPORTED
    return "high"
```
### Initialize store data at workflow start

```python
# Good: initialize early
@node("init")
def init(data: dict, state: SharedState) -> dict:
    state.store["metrics"] = {"count": 0}
    state.store["results"] = []
    return data

workflow = Sequence(init, process1, process2)

# Bad: initialize in the middle of the workflow
@node("process2")
def process2(data: dict, state: SharedState) -> dict:
    if "metrics" not in state.store:  # Fragile
        state.store["metrics"] = {}
    return data
```
### Save state at checkpoints

```python
# Good: checkpoint at critical points
@node("checkpoint")
def checkpoint(data: dict, state: SharedState) -> dict:
    state.save("checkpoint.json", include_outputs=False)
    return data

workflow = Sequence(
    load_data,
    process_batch1,
    checkpoint,  # Save after batch 1
    process_batch2,
    checkpoint,  # Save after batch 2
    finalize,
)

# Bad: never save state
workflow = Sequence(
    load_data,
    process_batch1,
    process_batch2,  # Lose all data on a crash
    finalize,
)
```
## State Access Overhead

- **Dictionary access**: less than 1 ms
- **get_node_output()**: less than 1 ms (cached)
- **workflow_state()**: less than 1 ms
- **Serialization**: ~10 ms per MB
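These figures are indicative and will vary by environment. To spot-check access overhead yourself, a rough micro-benchmark inside a node might look like this (a sketch; the node name and iteration count are arbitrary):

```python
import timeit
from egregore.core.workflow import node, SharedState

@node("bench")
def bench(data: dict, state: SharedState) -> dict:
    """Rough micro-benchmark of dictionary-style state access."""
    state["counter"] = 0
    elapsed = timeit.timeit(lambda: state.get("counter", 0), number=10_000)
    return {"avg_access_us": elapsed / 10_000 * 1e6}
```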
## Memory Impact

- **Per node output**: ~100-500 bytes of metadata
- **Execution history**: ~50 bytes per entry
- **Store data**: depends on the stored values
## Optimization Tips

```python
# Cache expensive lookups in the store
@node("optimizer")
def optimizer(data: dict, state: SharedState) -> dict:
    if "cached_result" not in state.store:
        state.store["cached_result"] = expensive_computation()
    result = state.store["cached_result"]
    return process(data, result)
```

```python
# Limit execution history size to prevent unbounded growth
state.default_max_node_executions = 100
```
## Error Handling

### Missing Node Output

```python
@node("safe_accessor")
def safe_accessor(data: dict, state: SharedState) -> dict:
    """Safely access a node output."""
    try:
        result = state.get_node_output("processor")
    except NodeNotFoundError:
        # Node hasn't executed yet
        result = None
    return {"result": result}
```
### Invalid State Access

```python
@node("defensive_access")
def defensive_access(data: dict, state: SharedState) -> dict:
    """Defensive state access."""
    # Use get() with a default
    count = state.get("counter", 0)

    # Check existence before indexing
    if "validator" in state:
        validation = state["validator"]
    else:
        validation = {"valid": False}

    return process(data, count, validation)
```
## What's Next?

- **Validation**: Validate workflows before execution
- **Type Safety**: Type checking for workflows
- **Reporting**: Track workflow performance
- **Best Practices**: Workflow design patterns