Skip to main content

Creating Nodes

Nodes are the building blocks of workflows - callable functions that transform data and can be chained together into execution graphs.

Node Decorator

The @node decorator converts functions into workflow nodes:
from egregore.core.workflow import node

@node("process_data")
def process_data(input_data: dict) -> dict:
    """Double the ``"value"`` entry of the input and mark it as processed.

    Args:
        input_data: Mapping that must contain a ``"value"`` entry.

    Returns:
        A dict with ``"processed"`` set to ``True`` and ``"value"`` doubled.
    """
    doubled = input_data["value"] * 2
    return {"processed": True, "value": doubled}

# Use it
result = process_data({"value": 5})
# Returns: {"processed": True, "value": 10}

Node Names

Node names are used for:
  • Result storage in shared state
  • Debugging and logging
  • Graph visualization
@node("extract")
def extract_data(source: str) -> dict:
    """Wrap the raw source under the ``"data"`` key."""
    extracted = {"data": source}
    return extracted

@node("transform")
def transform_data(data: dict) -> dict:
    """Upper-case the ``"data"`` entry and return it under ``"transformed"``."""
    upper_cased = data["data"].upper()
    return {"transformed": upper_cased}

# Names help track execution
workflow = Sequence(extract_data, transform_data)

Parameter Mapping

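Nodes automatically map the previous node's output to their input parameters. How the output is mapped depends on its type (single value, tuple/list, or dict) and on how many parameters the node declares: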

Single Parameter

Single-parameter nodes receive the entire previous output:
@node("processor")
def process(input_data: dict) -> dict:
    """Receive the complete previous output and expose its ``"value"`` as ``"result"``."""
    value = input_data["value"]
    return {"result": value}

@node("analyze")
def analyze(data: dict) -> str:
    """Receive the complete previous output and describe it as a string."""
    return "Analyzed: {}".format(data)

Multiple Parameters with Tuple/List

Multiple parameters map positionally from tuple/list outputs:
@node("split")
def split_data(data: str) -> tuple:
    """Split a comma-separated string and return its first three fields.

    Args:
        data: Text with at least three comma-separated fields.

    Returns:
        A 3-tuple of the first three fields.

    Raises:
        IndexError: If ``data`` has fewer than three fields.
    """
    fields = data.split(",")
    return tuple(fields[position] for position in range(3))

@node("combine")
def combine(first: str, second: str, third: str) -> str:
    """Join the three positional inputs with dashes.

    The parameters are filled positionally from the previous node's tuple:
    first=parts[0], second=parts[1], third=parts[2].
    """
    return "{}-{}-{}".format(first, second, third)

workflow = Sequence(split_data, combine)
result = workflow.run("A,B,C")
# Returns: "A-B-C"

Multiple Parameters with Dict

Multiple parameters map by name from dict outputs:
@node("fetch_user")
def fetch_user(user_id: str) -> dict:
    """Return a user record as a dict with named fields.

    The example ignores ``user_id`` and always returns the same sample record.
    """
    user_record = {
        "name": "Alice",
        "age": 30,
        "email": "alice@example.com",
    }
    return user_record

@node("format_profile")
def format_profile(name: str, age: int, email: str) -> str:
    """Render a one-line profile from fields mapped by name.

    The parameters are filled from the previous node's dict keys of the same
    name, e.g. name="Alice", age=30, email="alice@...".
    """
    return "{} ({}) - {}".format(name, age, email)

workflow = Sequence(fetch_user, format_profile)

Shared State Parameter

Nodes can access shared state by adding a SharedState parameter:
from egregore.core.workflow import SharedState

@node("process")
def process(data: dict, state: SharedState) -> dict:
    """Count this invocation in shared state and return the data with the count.

    Args:
        data: Output of the previous node.
        state: Shared state; this parameter is injected automatically.

    Returns:
        A dict holding the input under ``"processed"`` and the updated
        counter under ``"count"``.
    """
    # Read the current counter (0 if unset) and store the incremented value.
    updated_count = state.get("counter", 0) + 1
    state["counter"] = updated_count

    return {"processed": data, "count": updated_count}
Important: Only one SharedState parameter is allowed per node.

Decision Nodes

Decision nodes route execution based on runtime conditions:
from egregore.core.workflow import decision

@decision("route_by_type")
def route_by_type(data: dict) -> str:
    """Return the name of the next node to execute, chosen by ``data["type"]``.

    Returns:
        ``"process_json"`` for ``"json"``, ``"process_csv"`` for ``"csv"``,
        and ``"process_default"`` for anything else.
    """
    kind = data["type"]
    if kind == "json":
        return "process_json"
    if kind == "csv":
        return "process_csv"
    return "process_default"

@node("process_json")
def process_json(data: dict) -> dict:
    """Tag the incoming data with the ``"json"`` format."""
    tagged = {"format": "json", "data": data}
    return tagged

@node("process_csv")
def process_csv(data: dict) -> dict:
    """Tag the incoming data with the ``"csv"`` format."""
    tagged = {"format": "csv", "data": data}
    return tagged

@node("process_default")
def process_default(data: dict) -> dict:
    """Tag the incoming data with the ``"default"`` format."""
    tagged = {"format": "default", "data": data}
    return tagged

# Decision node returns next node name as string
workflow = Sequence(
    route_by_type,
    # Execution continues to selected node
)

Pattern Matching

Decision nodes support pattern matching for complex routing:
from egregore.core.workflow import decision, _

@decision("route")
def route(value: int) -> str:
    """Route based on value ranges.

    Each comparison on the ``_`` placeholder is parenthesized on purpose:
    in Python, ``>>`` and ``|`` bind more tightly than ``<``, so the
    unparenthesized form would be parsed as a chained comparison starting
    with ``0 >> "process_negative"`` and raise ``TypeError``.

    Returns:
        The pattern expression mapping value ranges to node names; the last
        arm is the default case.
    """
    # NOTE(review): assumes `_` comparisons return an object that supports
    # `>>` and `|` — confirm against the egregore pattern-matching API.
    return (
        (_ < 0) >> "process_negative"
        | (_ < 10) >> "process_small"
        | (_ < 100) >> "process_medium"
        | _ >> "process_large"  # Default case
    )

Parallel Nodes

Parallel nodes execute multiple operations concurrently:
from egregore.core.workflow import parallel, node

@node("fetch_api")
def fetch_api(url: str) -> dict:
    """Return a placeholder payload labelled as coming from the API."""
    payload = {"source": "api", "data": "..."}
    return payload

@node("fetch_db")
def fetch_db(query: str) -> dict:
    """Return a placeholder payload labelled as coming from the database."""
    payload = {"source": "db", "data": "..."}
    return payload

@node("fetch_cache")
def fetch_cache(key: str) -> dict:
    """Return a placeholder payload labelled as coming from the cache."""
    payload = {"source": "cache", "data": "..."}
    return payload

# Execute all three concurrently
fetch_all = parallel(
    fetch_api,
    fetch_db,
    fetch_cache
)

# Results collected as dict with node names as keys
result = fetch_all({"url": "...", "query": "...", "key": "..."})
# Returns: {
#   "fetch_api": {"source": "api", "data": "..."},
#   "fetch_db": {"source": "db", "data": "..."},
#   "fetch_cache": {"source": "cache", "data": "..."}
# }

Node Classes

BaseNode

All nodes inherit from BaseNode:
from egregore.core.workflow import BaseNode

class CustomNode(BaseNode):
    """Example of a hand-written node built directly on ``BaseNode``."""

    def execute(self, *args, **kwargs):
        """Run the node by forwarding every argument to :meth:`process`."""
        outcome = self.process(*args, **kwargs)
        return outcome

    def process(self, data):
        """Hold the node's own logic; this example returns ``data`` unchanged."""
        return data

AsyncNode

For async operations:
from egregore.core.workflow import AsyncNode

class AsyncFetchNode(AsyncNode):
    """Example node whose ``execute`` method is a coroutine."""

    async def execute(self, url: str):
        """Async execution.

        Args:
            url: Address of the resource to fetch.

        Returns:
            A placeholder payload; replace it with the awaited result of a
            real asynchronous call.
        """
        # Async operations (e.g. awaiting an HTTP client) go here. The
        # placeholder keeps the example runnable instead of returning an
        # undefined name.
        result = {"source": url, "data": "..."}
        return result

BatchNode

For batch processing:
from egregore.core.workflow import BatchNode

class BatchProcessorNode(BatchNode):
    """Example node that handles a whole list of items per call."""

    def execute_batch(self, items: list):
        """Process multiple items and return the results in input order.

        NOTE(review): relies on a ``process`` method that is not defined in
        this class — presumably inherited or to be supplied; confirm.
        """
        processed = []
        for item in items:
            processed.append(self.process(item))
        return processed

Node Attributes

Nodes can have custom attributes:
@node("retry_node")
def retry_node(data: dict) -> dict:
    """Delegate to ``process`` and return its result unchanged."""
    outcome = process(data)
    return outcome

# Set custom attributes
retry_node.max_retries = 3
retry_node.fallback_value = {}
retry_node.timeout = 30

# Access in workflow
workflow = Sequence(retry_node)

Node Registry

Nodes are automatically registered by name:
from egregore.core.workflow import node_registry

# Access registered nodes
@node("my_processor")
def my_processor(data):
    """Pass the input through unchanged.

    The node exists only to show that decorating a function registers it
    under the name "my_processor".
    """
    return data

# Available in registry
registered = node_registry["my_processor"]

Type Hints

Type hints enable automatic validation and documentation:
from typing import Dict, List

@node("typed_processor")
def typed_processor(input: Dict[str, int]) -> List[str]:
    """Convert every value of the input mapping to its string form.

    Args:
        input: Mapping whose values are converted.

    Returns:
        The stringified values, in the mapping's iteration order.
    """
    return list(map(str, input.values()))

Best Practices

# Good: Clear purpose
@node("extract_user_data")
def extract_user_data(response: dict) -> dict:
    """Pull the ``"user"`` entry out of a response dict."""
    user = response["user"]
    return user

# Bad: Unclear purpose
# The generic names "process" and "data" give no hint that this node
# extracts the "user" entry.
@node("process")
def process(data: dict) -> dict:
    return data["user"]
# Good: Single responsibility
@node("validate_email")
def validate_email(email: str) -> bool:
    """Report whether the address contains both ``"@"`` and ``"."``."""
    required_symbols = ("@", ".")
    return all(symbol in email for symbol in required_symbols)

@node("send_email")
def send_email(email: str, message: str) -> dict:
    """Send ``message`` to ``email`` via ``send`` and return its result."""
    delivery = send(email, message)
    return delivery

# Bad: Multiple responsibilities
# Validation and sending are fused into one node, so neither step can be
# reused or replaced on its own.
@node("handle_email")
def handle_email(email: str, message: str) -> dict:
    if "@" not in email:
        raise ValueError("Invalid email")
    return send(email, message)
import json

# Good: Clear types
@node("parse_json")
def parse_json(text: str) -> dict:
    """Parse a JSON document from ``text``.

    Args:
        text: JSON-encoded string.

    Returns:
        The decoded value as returned by ``json.loads``.

    Raises:
        json.JSONDecodeError: If ``text`` is not valid JSON.
    """
    return json.loads(text)

# Bad: No type hints
# Without annotations, neither readers nor tooling can tell what the node
# accepts or returns.
@node("parse_json")
def parse_json(text):
    return json.loads(text)
# Good: Explicit error handling
@node("safe_division")
def safe_division(a: float, b: float) -> float:
    """Divide ``a`` by ``b``, handling a zero divisor explicitly.

    Returns:
        ``a / b``, or ``0.0`` when ``b`` is zero.
    """
    # The zero-divisor case yields 0.0 here; raising ValueError is the
    # alternative.
    return a / b if b != 0 else 0.0

# Bad: Unhandled errors
# Nothing guards the divisor, so b == 0 raises ZeroDivisionError.
@node("division")
def division(a: float, b: float) -> float:
    return a / b  # Crashes on division by zero
# Good: Clear documentation
@node("filter_active_users")
def filter_active_users(users: list) -> list:
    """Keep only the users whose status is active.

    Args:
        users: List of user dicts; each may carry a 'status' field

    Returns:
        New list with the users whose 'status' equals "active", in their
        original order
    """
    active_users = []
    for user in users:
        if user.get("status") == "active":
            active_users.append(user)
    return active_users

# Bad: No documentation
# There is no docstring describing the expected input or the result, and the
# function name also shadows Python's built-in filter().
@node("filter")
def filter(data: list) -> list:
    return [d for d in data if d.get("status") == "active"]

Node Lifecycle

The example below walks through the stages of a node's execution:
from egregore.core.workflow import node, Sequence, SharedState

@node("lifecycle_example")
def lifecycle_example(data: dict, state: SharedState) -> dict:
    """Walk through the stages of a single node execution.

    Args:
        data: Output of the previous node; must contain a ``"value"`` entry.
        state: Shared state injected by the workflow.

    Returns:
        A dict with the doubled value under ``"processed"``.
    """

    # 1. Input has already arrived through automatic parameter mapping.
    # 2. Read whatever is needed from shared state.
    previous_count = state.get("counter", 0)

    # 3. Run the node's own logic.
    output = {"processed": data["value"] * 2}

    # 4. Write any changes back to shared state.
    state["counter"] = previous_count + 1

    # 5. Return the output; it is passed to the next node automatically.
    return output

Performance Considerations

  • Function overhead: Less than 0.1ms per node call
  • Parameter mapping: Cached for performance (6x speedup)
  • State access: O(1) dictionary lookups
  • Type checking: Disabled by default in production

Error Handling

Nodes can raise exceptions to halt workflow execution:
@node("validate")
def validate(data: dict) -> dict:
    """Pass ``data`` through only if it contains ``"required_field"``.

    Raises:
        ValueError: If ``"required_field"`` is missing, which halts the
            workflow.
    """
    if "required_field" in data:
        return data
    raise ValueError("Missing required field")

try:
    workflow = Sequence(validate, process)
    workflow.run({"invalid": "data"})
except ValueError as e:
    print(f"Validation failed: {e}")

What’s Next?