Shared State

Shared state enables data sharing and communication between workflow nodes through the SharedState class and workflow_state() function.

Core Concept

Every workflow has an associated shared state that tracks execution history, stores data, and provides access to node outputs:
from egregore.core.workflow import node, Sequence, SharedState

@node("extract")
def extract(data: dict, state: SharedState) -> dict:
    """State parameter automatically injected."""
    # Store data in state
    state["extracted_count"] = len(data["items"])

    return {"items": data["items"]}

@node("transform")
def transform(data: dict, state: SharedState) -> dict:
    """Access data from previous nodes."""
    count = state["extracted_count"]  # Access stored value

    return {"transformed": data["items"], "original_count": count}

workflow = Sequence(extract, transform)
result = workflow.run({"items": [1, 2, 3]})
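
Assuming Sequence.run returns the final node's output, result here would be {"transformed": [1, 2, 3], "original_count": 3}; transform read the count that extract stored in shared state.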

SharedState Parameter

Automatic Injection

Add a SharedState parameter to any node to access state:
@node("processor")
def processor(data: dict, state: SharedState) -> dict:
    """State automatically injected by workflow system."""
    # Access state
    previous_count = state.get("count", 0)
    state["count"] = previous_count + 1

    return {"processed": data, "execution": previous_count + 1}

Important: Only one SharedState parameter is allowed per node.
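
The state parameter itself is optional. A node that never reads or writes shared state can omit it entirely, as later examples (validator, fetcher) do; a minimal sketch:
@node("stateless")
def stateless(data: dict) -> dict:
    """No SharedState parameter: this node ignores shared state."""
    return {"doubled": [x * 2 for x in data.get("items", [])]}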

State Access Methods

@node("state_demo")
def state_demo(data: dict, state: SharedState) -> dict:
    """Demonstrate state access methods."""

    # Dictionary-style access
    state["key"] = "value"
    value = state["key"]

    # get() with default
    count = state.get("counter", 0)

    # Contains check
    if "key" in state:
        print("Key exists")

    # Iteration
    for key, value in state:
        print(f"{key}: {value}")

    return data

workflow_state() Function

Access state from decision functions and nested code:
from egregore.core.workflow import decision, workflow_state

@decision("route_by_count")
def route_by_count(data: dict) -> str:
    """Decision function using workflow_state()."""
    # Access previous node output
    count = workflow_state("counter")["count"]

    return "high_volume" if count > 100 else "low_volume"

@node("high_volume")
def high_volume(data: dict) -> dict:
    return {"route": "high", "data": data}

@node("low_volume")
def low_volume(data: dict) -> dict:
    return {"route": "low", "data": data}

Access Patterns

By node name:
# Get entire node output
output = workflow_state("processor")

# Access nested attribute
count = workflow_state("processor", "result.count")

By execution index:
# Get last node output
last = workflow_state(-1)

# Get first node output
first = workflow_state(0)

# With nested attribute
value = workflow_state(-2, "data.items")

Accessing Node Outputs

get_node_output()

Get output from previously executed nodes:
@node("validator")
def validator(data: dict) -> dict:
    return {"valid": True, "data": data}

@node("processor")
def processor(data: dict, state: SharedState) -> dict:
    """Access validator output."""
    # Get output from previous node
    validation = state.get_node_output("validator")

    if validation["valid"]:
        return {"status": "processed", "data": validation["data"]}
    else:
        return {"status": "skipped"}

get_node_attribute()

Access nested attributes with dot notation:
@node("fetcher")
def fetcher(data: dict) -> dict:
    return {
        "response": {
            "status": 200,
            "data": {"items": [1, 2, 3]},
            "metadata": {"count": 3}
        }
    }

@node("counter")
def counter(data: dict, state: SharedState) -> dict:
    """Extract nested attribute."""
    # Get nested value using dot notation
    count = state.get_node_attribute("fetcher", "response.metadata.count")

    return {"item_count": count}

State Properties

initial_input

Access the workflow’s original input:
@node("final_processor")
def final_processor(data: dict, state: SharedState) -> dict:
    """Compare with initial input."""
    original = state.initial_input

    return {
        "initial": original,
        "final": data,
        "changed": original != data
    }

final_output

Get the most recent node output:
@node("checker")
def checker(data: dict, state: SharedState) -> dict:
    """Access final output from workflow."""
    # Note: final_output updates after each node
    last_result = state.final_output

    return {"current": data, "previous": last_result}

previous_output

Access the immediately previous node’s output:
@node("accumulator")
def accumulator(data: dict, state: SharedState) -> dict:
    """Build on previous output."""
    prev = state.previous_output

    if isinstance(prev, dict):
        # Merge with previous
        return {**prev, **data}
    else:
        return data

execution_count

Track how many nodes have executed:
@node("progress_tracker")
def progress_tracker(data: dict, state: SharedState) -> dict:
    """Track execution progress."""
    count = state.execution_count

    return {
        "data": data,
        "progress": f"{count} nodes executed"
    }

Parallel Node Outputs

get_parallel_outputs()

Access results from parallel branches:
from egregore.core.workflow import parallel

parallel_fetch = parallel(
    fetch_api,
    fetch_database,
    fetch_cache
)

@node("combine_parallel")
def combine_parallel(data: dict, state: SharedState) -> dict:
    """Combine parallel results."""
    # Access all parallel outputs
    parallel_results = state.get_parallel_outputs("parallel_fetch")

    return {
        "api": parallel_results.get("fetch_api"),
        "db": parallel_results.get("fetch_database"),
        "cache": parallel_results.get("fetch_cache")
    }

workflow = Sequence(parallel_fetch, combine_parallel)
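
The three fetch nodes are assumed to exist elsewhere; a minimal sketch of what they might look like (payloads are illustrative):
@node("fetch_api")
def fetch_api(data: dict) -> dict:
    # Stand-in for a real API call
    return {"source": "api", "items": [1, 2]}

@node("fetch_database")
def fetch_database(data: dict) -> dict:
    # Stand-in for a real database query
    return {"source": "db", "items": [3, 4]}

@node("fetch_cache")
def fetch_cache(data: dict) -> dict:
    # Stand-in for a cache lookup
    return {"source": "cache", "items": [5]}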

Dictionary-based Access

Parallel results are automatically stored in the state dictionary, keyed by node name:
@node("parallel_processor")
def parallel_processor(data: dict, state: SharedState) -> dict:
    """Access parallel results by name."""
    # Results stored with node names as keys
    api_result = state["fetch_api"]
    db_result = state["fetch_database"]

    return {
        # merge() is a user-supplied helper that combines the two results
        "combined": merge(api_result, db_result)
    }

Execution History

get_execution_history()

Get full execution history:
@node("history_analyzer")
def history_analyzer(data: dict, state: SharedState) -> dict:
    """Analyze execution history."""
    # Get all executions
    history = state.get_execution_history()

    # Filter by node name
    validator_runs = state.get_execution_history(node_name="validator")

    # Limit results
    recent = state.get_execution_history(limit=5)

    return {
        "total_executions": len(history),
        "validator_runs": len(validator_runs)
    }

get_node_execution_count()

Count how many times a node has executed:
@node("loop_controller")
def loop_controller(data: dict, state: SharedState) -> dict:
    """Control loop execution."""
    # Count executions
    count = state.get_node_execution_count("processor")

    if count > 10:
        return {"status": "limit_reached"}

    return {"status": "continue", "iterations": count}

get_execution_sequence()

Get sequence of executed node names:
@node("flow_analyzer")
def flow_analyzer(data: dict, state: SharedState) -> dict:
    """Analyze execution flow."""
    # Get sequence of node names
    sequence = state.get_execution_sequence()

    # Get last 5 nodes
    recent_nodes = state.get_execution_sequence(limit=5)

    return {
        "flow": sequence,
        "path_length": len(sequence)
    }

Loop Detection

detect_execution_loop()

Detect repetitive execution patterns:
@decision("smart_router")
def smart_router(data: dict) -> str:
    """Route with loop detection."""
    # Check if selecting "processor" would create a loop
    would_loop = workflow_state.detect_execution_loop("processor", pattern_length=3)

    if would_loop:
        return "exit_node"  # Break potential loop
    else:
        return "processor"  # Continue processing

check_node_execution_limit()

Enforce execution limits per node:
@decision("limit_checker")
def limit_checker(data: dict) -> str:
    """Check execution limits."""
    # Check if node has exceeded limit
    check = workflow_state.check_node_execution_limit("processor", max_executions=5)

    if check["allowed"]:
        return "processor"
    else:
        return "exit_node"  # Limit reached

Generic State Store

store Property

Store arbitrary workflow data:
@node("data_collector")
def data_collector(data: dict, state: SharedState) -> dict:
    """Store generic data."""
    # Use state.store for arbitrary data
    state.store["timestamps"] = []
    state.store["metrics"] = {"count": 0, "total": 0}

    return data

@node("data_user")
def data_user(data: dict, state: SharedState) -> dict:
    """Access stored data."""
    timestamps = state.store.get("timestamps", [])
    metrics = state.store.get("metrics", {})

    return {"data": data, "metrics": metrics}

Use Cases:
  • Accumulating metrics across nodes
  • Storing configuration data
  • Caching computed values
  • Temporary workflow-level storage

State Serialization

Save State

Persist state to disk:
@node("save_checkpoint")
def save_checkpoint(data: dict, state: SharedState) -> dict:
    """Save workflow state."""
    # Save as JSON
    state.save("workflow_state.json", format="json")

    # Save as pickle (preserves Python objects)
    state.save("workflow_state.pkl", format="pickle")

    # Save without output values (smaller file)
    state.save("workflow_state_meta.json", format="json", include_outputs=False)

    return {"checkpoint": "saved"}

Load State

Restore previously saved state:
from egregore.core.workflow import SharedState

# Load from file
state = SharedState.load_from_file("workflow_state.json", format="json")

# Or load into existing state
existing_state = SharedState("workflow")
existing_state.load("workflow_state.json", format="json")
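
A restored state exposes the same accessors as a live one. For example, assuming the file was written by the save_checkpoint node above:
# Stored values and node outputs are available again after loading
restored = SharedState.load_from_file("workflow_state.json", format="json")
cached_metrics = restored.store.get("metrics", {})
last_result = restored.final_output
print(f"Resumed after {restored.execution_count} node executions")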

to_dict() / from_dict()

Manual serialization:
@node("export_state")
def export_state(data: dict, state: SharedState) -> dict:
    """Export state as dictionary."""
    # Get state as dict
    state_dict = state.to_dict(include_outputs=True)

    # Save to custom format
    custom_storage.save(state_dict)

    return {"exported": True}

@node("import_state")
def import_state(data: dict, state: SharedState) -> dict:
    """Import state from dictionary."""
    # Load from custom format
    state_dict = custom_storage.load()

    # Restore state
    state.from_dict(state_dict)

    return {"imported": True}

Advanced Patterns

State-Based Routing

@decision("adaptive_router")
def adaptive_router(data: dict) -> str:
    """Route based on execution history."""
    # Get execution counts
    fast_count = workflow_state.get_node_execution_count("fast_processor")
    thorough_count = workflow_state.get_node_execution_count("thorough_processor")

    # Balance load
    if fast_count < thorough_count:
        return "fast_processor"
    else:
        return "thorough_processor"

Accumulator Pattern

@node("accumulator")
def accumulator(item: dict, state: SharedState) -> dict:
    """Accumulate results across executions."""
    # Initialize if first run
    if "results" not in state.store:
        state.store["results"] = []

    # Accumulate
    state.store["results"].append(item)

    return {
        "current": item,
        "total": len(state.store["results"])
    }

Conditional Processing

@node("conditional_processor")
def conditional_processor(data: dict, state: SharedState) -> dict:
    """Process based on previous results."""
    # Check if validation passed
    validation = state.get_node_output("validator")

    if validation and validation.get("valid"):
        # Full processing
        return process_thoroughly(data)
    else:
        # Skip processing
        return {"status": "skipped", "reason": "validation_failed"}

Metrics Collection

@node("metrics_collector")
def metrics_collector(data: dict, state: SharedState) -> dict:
    """Collect workflow metrics."""
    # Initialize metrics
    if "metrics" not in state.store:
        state.store["metrics"] = {
            "items_processed": 0,
            "errors": 0,
            "start_time": time.time()
        }

    # Update metrics
    metrics = state.store["metrics"]
    metrics["items_processed"] += 1

    if data.get("error"):
        metrics["errors"] += 1

    # Calculate elapsed time
    elapsed = time.time() - metrics["start_time"]

    return {
        "data": data,
        "metrics": {
            **metrics,
            "elapsed": elapsed,
            "items_per_second": metrics["items_processed"] / elapsed
        }
    }

Best Practices

# Good: Use store for temporary data
@node("processor")
def processor(data: dict, state: SharedState) -> dict:
    state.store["temp_cache"] = compute_expensive_data()
    return process(data, state.store["temp_cache"])

# Bad: Pollute main state dict
@node("processor")
def processor(data: dict, state: SharedState) -> dict:
    state["temp_cache"] = compute_expensive_data()  # Mixed with node outputs
    return process(data)

# Good: Check before access
@node("safe_processor")
def safe_processor(data: dict, state: SharedState) -> dict:
    if "validator" in state:
        validation = state["validator"]
        # Use validation
    return process(data)

# Bad: Assume node executed
@node("unsafe_processor")
def unsafe_processor(data: dict, state: SharedState) -> dict:
    validation = state["validator"]  # May not exist!
    return process(data)

# Good: Use workflow_state() in decision
@decision("router")
def router(data: dict) -> str:
    count = workflow_state("counter")["count"]
    return "high" if count > 100 else "low"

# Bad: Pass state parameter to decision
@decision("router")
def router(data: dict, state: SharedState) -> str:  # NOT SUPPORTED
    return "high"

# Good: Initialize early
@node("init")
def init(data: dict, state: SharedState) -> dict:
    state.store["metrics"] = {"count": 0}
    state.store["results"] = []
    return data

workflow = Sequence(init, process1, process2)

# Bad: Initialize in middle of workflow
@node("process2")
def process2(data: dict, state: SharedState) -> dict:
    if "metrics" not in state.store:  # Fragile
        state.store["metrics"] = {}
    return data

# Good: Checkpoint at critical points
@node("checkpoint")
def checkpoint(data: dict, state: SharedState) -> dict:
    state.save("checkpoint.json", include_outputs=False)
    return data

workflow = Sequence(
    load_data,
    process_batch1,
    checkpoint,  # Save after batch 1
    process_batch2,
    checkpoint,  # Save after batch 2
    finalize
)

# Bad: Never save state
workflow = Sequence(
    load_data,
    process_batch1,
    process_batch2,  # Lose all data if crashes
    finalize
)

Performance Considerations

State Access Overhead

  • Dictionary access: Less than 1ms
  • get_node_output(): Less than 1ms (cached)
  • workflow_state(): Less than 1ms
  • Serialization: ~10ms per MB

Memory Impact

  • Per node output: ~100-500 bytes metadata
  • Execution history: ~50 bytes per entry
  • Store data: Depends on stored values
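
These figures are rough guidelines; a quick way to sanity-check access overhead in your own environment is a timing node (a sketch reusing the imports from the first example):
import time

@node("benchmark")
def benchmark(data: dict, state: SharedState) -> dict:
    """Time 1,000 state reads and report the average per access."""
    start = time.perf_counter()
    for _ in range(1000):
        _ = state.get("counter", 0)
    per_access = (time.perf_counter() - start) / 1000

    return {"avg_state_access_seconds": per_access}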

Optimization Tips

# Cache expensive lookups
@node("optimizer")
def optimizer(data: dict, state: SharedState) -> dict:
    # Cache in store
    if "cached_result" not in state.store:
        state.store["cached_result"] = expensive_computation()

    result = state.store["cached_result"]
    return process(data, result)

# Limit execution history size
state.default_max_node_executions = 100  # Prevent unbounded growth

Error Handling

Missing Node Output

@node("safe_accessor")
def safe_accessor(data: dict, state: SharedState) -> dict:
    """Safely access node output."""
    try:
        result = state.get_node_output("processor")
    except NodeNotFoundError:
        # Node hasn't executed yet
        result = None

    return {"result": result}

Invalid State Access

@node("defensive_access")
def defensive_access(data: dict, state: SharedState) -> dict:
    """Defensive state access."""
    # Use get() with default
    count = state.get("counter", 0)

    # Check existence
    if "validator" in state:
        validation = state["validator"]
    else:
        validation = {"valid": False}

    return process(data, count, validation)

What’s Next?