# Creating Nodes

> Build workflow nodes with @node, @decision, and @parallel decorators

**Nodes** are the building blocks of workflows: callable functions that transform data and can be chained together into execution graphs.

## Node Decorator

The `@node` decorator converts functions into workflow nodes:

```python theme={null}
from egregore.core.workflow import node

@node("process_data")
def process_data(input_data: dict) -> dict:
    """Process input data."""
    return {
        "processed": True,
        "value": input_data["value"] * 2
    }

# Use it
result = process_data({"value": 5})
# Returns: {"processed": True, "value": 10}
```

### Node Names

Node names are used for:

* Result storage in shared state
* Debugging and logging
* Graph visualization

```python theme={null}
from egregore.core.workflow import node, Sequence

@node("extract")
def extract_data(source: str) -> dict:
    return {"data": source}

@node("transform")
def transform_data(data: dict) -> dict:
    return {"transformed": data["data"].upper()}

# Names help track execution
workflow = Sequence(extract_data, transform_data)
```
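To make the role of names concrete, here is a minimal standard-library sketch of a runner that stores each node's result under its name. It does not use egregore: `named`, `run_and_record`, and the `node_name` attribute are hypothetical names chosen for illustration, and the library's own storage mechanism may differ.

```python
def named(name):
    """Hypothetical stand-in for a naming decorator: tags a function with a name."""
    def decorator(func):
        func.node_name = name
        return func
    return decorator


@named("extract")
def extract_data(source: str) -> dict:
    return {"data": source}


@named("transform")
def transform_data(data: dict) -> dict:
    return {"transformed": data["data"].upper()}


def run_and_record(funcs, value):
    """Run functions in order and keep every intermediate result by name."""
    results = {}
    for func in funcs:
        value = func(value)
        results[func.node_name] = value
    return value, results


final, results = run_and_record([extract_data, transform_data], "hello")
print(final)               # {'transformed': 'HELLO'}
print(results["extract"])  # {'data': 'hello'}
```

Keying results by name is what makes a log line or a graph label traceable back to a specific step.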

## Parameter Mapping

Nodes automatically map the previous node's output to their input parameters. The mapping depends on how many parameters the node declares and on the type of the previous output:

### Single Parameter

Single-parameter nodes receive the entire previous output:

```python theme={null}
@node("processor")
def process(input_data: dict) -> dict:
    """Receives complete previous output."""
    return {"result": input_data["value"]}

@node("analyze")
def analyze(data: dict) -> str:
    """Also receives complete output."""
    return f"Analyzed: {data}"
```

### Multiple Parameters with Tuple/List

Multiple parameters map positionally from tuple/list outputs:

```python theme={null}
from egregore.core.workflow import node, Sequence

@node("split")
def split_data(data: str) -> tuple:
    """Returns tuple of values."""
    parts = data.split(",")
    return (parts[0], parts[1], parts[2])

@node("combine")
def combine(first: str, second: str, third: str) -> str:
    """Maps positionally: first=parts[0], second=parts[1], third=parts[2]."""
    return f"{first}-{second}-{third}"

workflow = Sequence(split_data, combine)
result = workflow.run("A,B,C")
# Returns: "A-B-C"
```

### Multiple Parameters with Dict

Multiple parameters map by name from dict outputs:

```python theme={null}
from egregore.core.workflow import node, Sequence

@node("fetch_user")
def fetch_user(user_id: str) -> dict:
    """Returns dict with named fields."""
    return {
        "name": "Alice",
        "age": 30,
        "email": "alice@example.com"
    }

@node("format_profile")
def format_profile(name: str, age: int, email: str) -> str:
    """Maps by name: name="Alice", age=30, email="alice@..."."""
    return f"{name} ({age}) - {email}"

workflow = Sequence(fetch_user, format_profile)
```
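The three mapping rules above can be sketched with the standard library alone. This is an illustration of the idea, not egregore's implementation: `map_arguments` is a hypothetical helper, and the real mapper may handle more cases such as defaults or type-based matching.

```python
import inspect
from typing import Any, Callable


def map_arguments(func: Callable, previous_output: Any) -> dict:
    """Build keyword arguments for func from the previous node's output."""
    names = list(inspect.signature(func).parameters)
    if len(names) == 1:
        # Single parameter: pass the whole output through.
        return {names[0]: previous_output}
    if isinstance(previous_output, (tuple, list)):
        # Tuple/list output: map positionally.
        return dict(zip(names, previous_output))
    if isinstance(previous_output, dict):
        # Dict output: map by parameter name.
        return {name: previous_output[name] for name in names}
    raise TypeError(f"Cannot map {type(previous_output).__name__} to {names}")


def combine(first: str, second: str, third: str) -> str:
    return f"{first}-{second}-{third}"


def format_profile(name: str, age: int, email: str) -> str:
    return f"{name} ({age}) - {email}"


combined = combine(**map_arguments(combine, ("A", "B", "C")))
profile = format_profile(**map_arguments(
    format_profile, {"name": "Alice", "age": 30, "email": "alice@example.com"}
))
print(combined)  # A-B-C
print(profile)   # Alice (30) - alice@example.com
```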

### Shared State Parameter

Nodes can access shared state by adding a `SharedState` parameter:

```python theme={null}
from egregore.core.workflow import node, SharedState

@node("process")
def process(data: dict, state: SharedState) -> dict:
    """State parameter automatically injected."""
    # Access shared state
    counter = state.get("counter", 0)
    state["counter"] = counter + 1

    return {"processed": data, "count": counter + 1}
```

**Important**: Only one `SharedState` parameter is allowed per node.
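Injection by annotation can be sketched as follows. This is a standard-library illustration under assumptions: `State` is a hypothetical stand-in for `SharedState`, and `call_with_state` is an invented helper, so the library's actual mechanism may differ.

```python
import inspect


class State(dict):
    """Hypothetical stand-in for a shared-state container."""


def call_with_state(func, data, state: State):
    """Call func, injecting state into the parameter annotated as State."""
    params = inspect.signature(func).parameters
    state_params = [name for name, p in params.items() if p.annotation is State]
    if len(state_params) > 1:
        raise TypeError("Only one state parameter is allowed per node")
    kwargs = {state_params[0]: state} if state_params else {}
    return func(data, **kwargs)


def process(data: dict, state: State) -> dict:
    counter = state.get("counter", 0)
    state["counter"] = counter + 1
    return {"processed": data, "count": counter + 1}


shared = State()
first = call_with_state(process, {"value": 1}, shared)
second = call_with_state(process, {"value": 2}, shared)
print(first["count"], second["count"], shared["counter"])  # 1 2 2
```

Because the same `State` object is passed to every call, updates made by one call are visible to the next.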

## Decision Nodes

Decision nodes route execution based on runtime conditions:

```python theme={null}
from egregore.core.workflow import decision, node, Sequence

@decision("route_by_type")
def route_by_type(data: dict) -> str:
    """Return name of next node to execute."""
    if data["type"] == "json":
        return "process_json"
    elif data["type"] == "csv":
        return "process_csv"
    else:
        return "process_default"

@node("process_json")
def process_json(data: dict) -> dict:
    return {"format": "json", "data": data}

@node("process_csv")
def process_csv(data: dict) -> dict:
    return {"format": "csv", "data": data}

@node("process_default")
def process_default(data: dict) -> dict:
    return {"format": "default", "data": data}

# Decision node returns next node name as string
workflow = Sequence(
    route_by_type,
    # Execution continues to selected node
)
```
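Conceptually, a decision node returns a name and the runner looks up the node registered under it. A minimal sketch of that dispatch, using plain functions and a dict in place of egregore's graph (`branches` and `run_decision` are hypothetical names):

```python
from typing import Callable, Dict


def route_by_type(data: dict) -> str:
    """Return the name of the branch that should handle data."""
    if data["type"] == "json":
        return "process_json"
    if data["type"] == "csv":
        return "process_csv"
    return "process_default"


# Branch table: node name -> callable (plain functions stand in for nodes).
branches: Dict[str, Callable[[dict], dict]] = {
    "process_json": lambda data: {"format": "json", "data": data},
    "process_csv": lambda data: {"format": "csv", "data": data},
    "process_default": lambda data: {"format": "default", "data": data},
}


def run_decision(decide, table, data):
    """Ask the decision function for a name, then call the matching branch."""
    target = decide(data)
    if target not in table:
        raise KeyError(f"Decision returned unknown node name: {target!r}")
    return table[target](data)


result = run_decision(route_by_type, branches, {"type": "csv"})
print(result)  # {'format': 'csv', 'data': {'type': 'csv'}}
```

Failing loudly on an unknown name is a design choice in this sketch: it surfaces a typo in a returned name at the point of routing.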

### Pattern Matching

Decision nodes support pattern matching for complex routing:

```python theme={null}
from egregore.core.workflow import decision, _

@decision("route")
def route(value: int) -> str:
    """Route based on value ranges."""
    # Each comparison is parenthesized because >> and | bind tighter than <
    return (
        (_ < 0) >> "process_negative"
        | (_ < 10) >> "process_small"
        | (_ < 100) >> "process_medium"
        | _ >> "process_large"  # Default case
    )
```
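A placeholder-based routing expression of this kind can be built with operator overloading. The sketch below is a standalone illustration, not egregore's `_` object: `Placeholder`, `Condition`, and `Cases` are hypothetical classes. In Python, `>>` and `|` have higher precedence than `<`, so each comparison is wrapped in parentheses.

```python
class Cases:
    """An ordered list of (predicate, target) pairs; chained with `|`."""

    def __init__(self, pairs):
        self.pairs = pairs

    def __or__(self, other):
        return Cases(self.pairs + other.pairs)

    def match(self, value):
        """Return the target of the first predicate that accepts value."""
        for predicate, target in self.pairs:
            if predicate(value):
                return target
        raise ValueError(f"No case matched {value!r}")


class Condition:
    """A predicate waiting for its target, supplied with `>>`."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __rshift__(self, target):
        return Cases([(self.predicate, target)])


class Placeholder:
    """Stands for the value being routed."""

    def __lt__(self, bound):
        return Condition(lambda value: value < bound)

    def __rshift__(self, target):
        # A bare placeholder matches anything: the default case.
        return Cases([(lambda value: True, target)])


_ = Placeholder()

router = (
    (_ < 0) >> "process_negative"
    | (_ < 10) >> "process_small"
    | (_ < 100) >> "process_medium"
    | _ >> "process_large"
)

print(router.match(-5), router.match(5), router.match(50), router.match(500))
# process_negative process_small process_medium process_large
```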

## Parallel Nodes

Parallel nodes execute multiple operations concurrently:

```python theme={null}
from egregore.core.workflow import parallel, node

@node("fetch_api")
def fetch_api(url: str) -> dict:
    return {"source": "api", "data": "..."}

@node("fetch_db")
def fetch_db(query: str) -> dict:
    return {"source": "db", "data": "..."}

@node("fetch_cache")
def fetch_cache(key: str) -> dict:
    return {"source": "cache", "data": "..."}

# Execute all three concurrently
fetch_all = parallel(
    fetch_api,
    fetch_db,
    fetch_cache
)

# Results collected as dict with node names as keys
result = fetch_all({"url": "...", "query": "...", "key": "..."})
# Returns: {
#   "fetch_api": {"source": "api", "data": "..."},
#   "fetch_db": {"source": "db", "data": "..."},
#   "fetch_cache": {"source": "cache", "data": "..."}
# }
```
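The fan-out and collect pattern can be sketched with `concurrent.futures` from the standard library. This is an illustration only: `run_parallel` is a hypothetical helper, and it assumes every function receives the same input dict, which may not match how `parallel` distributes inputs.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_api(inputs: dict) -> dict:
    return {"source": "api", "data": inputs["url"]}


def fetch_db(inputs: dict) -> dict:
    return {"source": "db", "data": inputs["query"]}


def fetch_cache(inputs: dict) -> dict:
    return {"source": "cache", "data": inputs["key"]}


def run_parallel(funcs, inputs: dict) -> dict:
    """Run every function on the same inputs and key results by function name."""
    with ThreadPoolExecutor(max_workers=len(funcs)) as pool:
        futures = {func.__name__: pool.submit(func, inputs) for func in funcs}
        # result() blocks until that task finishes and re-raises its exception.
        return {name: future.result() for name, future in futures.items()}


results = run_parallel(
    [fetch_api, fetch_db, fetch_cache],
    {"url": "u", "query": "q", "key": "k"},
)
print(sorted(results))  # ['fetch_api', 'fetch_cache', 'fetch_db']
```

Threads suit I/O-bound work such as network or database calls. CPU-bound work in Python would usually call for processes instead.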

## Node Classes

### BaseNode

All nodes inherit from `BaseNode`:

```python theme={null}
from egregore.core.workflow import BaseNode

class CustomNode(BaseNode):
    def execute(self, *args, **kwargs):
        """Custom execution logic."""
        return self.process(*args, **kwargs)

    def process(self, data):
        """Your logic here."""
        return data
```

### AsyncNode

For async operations:

```python theme={null}
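import asyncio

from egregore.core.workflow import AsyncNode

class AsyncFetchNode(AsyncNode):
    async def execute(self, url: str):
        """Async execution."""
        # Placeholder for a real async operation such as an HTTP request
        await asyncio.sleep(0)
        result = {"url": url, "data": "..."}
        return result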
```

### BatchNode

For batch processing:

```python theme={null}
from egregore.core.workflow import BatchNode

class BatchProcessorNode(BatchNode):
    def execute_batch(self, items: list):
        """Process multiple items."""
        return [self.process(item) for item in items]

    def process(self, item):
        """Per-item logic goes here (placeholder)."""
        return item
```

## Node Attributes

Nodes can have custom attributes:

```python theme={null}
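from egregore.core.workflow import node, Sequence

@node("retry_node")
def retry_node(data: dict) -> dict:
    # Placeholder body; replace with your own processing logic
    return {"processed": data}

# Set custom attributes
retry_node.max_retries = 3
retry_node.fallback_value = {}
retry_node.timeout = 30

# Use the node in a workflow
workflow = Sequence(retry_node)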
```
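This page does not say how egregore consumes such attributes, so the following is only a sketch of how a runner could read them. `run_with_retries` and `flaky` are hypothetical, and plain function attributes stand in for node attributes.

```python
def run_with_retries(func, data):
    """Retry func up to func.max_retries extra times, then return its fallback."""
    max_retries = getattr(func, "max_retries", 0)
    fallback = getattr(func, "fallback_value", None)
    for _attempt in range(max_retries + 1):
        try:
            return func(data)
        except Exception:
            continue  # try again until the attempts are used up
    return fallback


calls = {"count": 0}


def flaky(data: dict) -> dict:
    """Fails on the first two calls, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return {"ok": True, "attempts": calls["count"]}


flaky.max_retries = 3
flaky.fallback_value = {}

result = run_with_retries(flaky, {"value": 1})
print(result)  # {'ok': True, 'attempts': 3}
```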

## Node Registry

Nodes are automatically registered by name:

```python theme={null}
from egregore.core.workflow import node, node_registry

# Defining a node registers it under its name
@node("my_processor")
def my_processor(data):
    return data

# Look the node up in the registry by name
registered = node_registry["my_processor"]
```
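Registration by name is a common decorator pattern. A minimal sketch of it, independent of egregore (`registry` and `register` are hypothetical names; the library's registry may behave differently, for example when a name is reused):

```python
from typing import Callable, Dict

# Module-level mapping from name to callable.
registry: Dict[str, Callable] = {}


def register(name: str):
    """Decorator factory: store the decorated function under name."""
    def decorator(func: Callable) -> Callable:
        registry[name] = func  # a later registration with the same name wins
        return func
    return decorator


@register("my_processor")
def my_processor(data):
    return data


print(registry["my_processor"] is my_processor)  # True
print(registry["my_processor"]({"value": 1}))    # {'value': 1}
```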

## Type Hints

Type hints enable automatic validation and documentation:

```python theme={null}
from typing import Dict, List

from egregore.core.workflow import node

@node("typed_processor")
def typed_processor(values: Dict[str, int]) -> List[str]:
    """Process with type hints."""
    return [str(v) for v in values.values()]
```
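As an illustration of how type hints can drive validation, the sketch below checks arguments against plain class annotations before calling a function. It is an assumption-laden example, not egregore's validator: `call_checked` is a hypothetical helper, and parameterized hints such as `Dict[str, int]` are deliberately skipped.

```python
import inspect
from typing import get_origin, get_type_hints


def call_checked(func, *args, **kwargs):
    """Call func after checking arguments against plain class annotations."""
    hints = get_type_hints(func)
    bound = inspect.signature(func).bind(*args, **kwargs)
    for name, value in bound.arguments.items():
        expected = hints.get(name)
        # Only plain classes (int, str, dict, ...) are checked here.
        is_plain_class = isinstance(expected, type) and get_origin(expected) is None
        if is_plain_class and not isinstance(value, expected):
            raise TypeError(
                f"{func.__name__}: {name} expected {expected.__name__}, "
                f"got {type(value).__name__}"
            )
    return func(*args, **kwargs)


def double(value: int) -> int:
    return value * 2


doubled = call_checked(double, 21)
print(doubled)  # 42

try:
    call_checked(double, "21")
    message = None
except TypeError as exc:
    message = str(exc)
print(message)  # double: value expected int, got str
```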

## Best Practices

<AccordionGroup>
  <Accordion title="Use descriptive node names">
    ```python theme={null}
    # Good: Clear purpose
    @node("extract_user_data")
    def extract_user_data(response: dict) -> dict:
        return response["user"]

    # Bad: Unclear purpose
    @node("process")
    def process(data: dict) -> dict:
        return data["user"]
    ```
  </Accordion>

  <Accordion title="Keep nodes focused">
    ```python theme={null}
    # Good: Single responsibility
    @node("validate_email")
    def validate_email(email: str) -> bool:
        return "@" in email and "." in email

    @node("send_email")
    def send_email(email: str, message: str) -> dict:
        return send(email, message)

    # Bad: Multiple responsibilities
    @node("handle_email")
    def handle_email(email: str, message: str) -> dict:
        if "@" not in email:
            raise ValueError("Invalid email")
        return send(email, message)
    ```
  </Accordion>

  <Accordion title="Use type hints for clarity">
    ```python theme={null}
    import json

    # Good: Clear types
    @node("parse_json")
    def parse_json(text: str) -> dict:
        return json.loads(text)

    # Bad: No type hints
    @node("parse_json")
    def parse_json(text):
        return json.loads(text)
    ```
  </Accordion>

  <Accordion title="Handle errors appropriately">
    ```python theme={null}
    # Good: Explicit error handling
    @node("safe_division")
    def safe_division(a: float, b: float) -> float:
        if b == 0:
            return 0.0  # Or raise ValueError
        return a / b

    # Bad: Unhandled errors
    @node("division")
    def division(a: float, b: float) -> float:
        return a / b  # Crashes on division by zero
    ```
  </Accordion>

  <Accordion title="Document node behavior">
    ```python theme={null}
    # Good: Clear documentation
    @node("filter_active_users")
    def filter_active_users(users: list) -> list:
        """Filter users with active status.

        Args:
            users: List of user dicts with 'status' field

        Returns:
            List of active users only
        """
        return [u for u in users if u.get("status") == "active"]

    # Bad: No documentation
    @node("filter")
    def filter(data: list) -> list:
        return [d for d in data if d.get("status") == "active"]
    ```
  </Accordion>
</AccordionGroup>

## Node Lifecycle

Each time a node runs, it goes through the steps numbered in the comments below:

```python theme={null}
from egregore.core.workflow import node, Sequence, SharedState

@node("lifecycle_example")
def lifecycle_example(data: dict, state: SharedState) -> dict:
    """Node lifecycle demonstration."""

    # 1. Receive input (automatic parameter mapping)
    # 2. Access shared state if needed
    counter = state.get("counter", 0)

    # 3. Execute logic
    result = {"processed": data["value"] * 2}

    # 4. Update state if needed
    state["counter"] = counter + 1

    # 5. Return output (automatically passed to next node)
    return result
```

## Performance Considerations

* **Function overhead**: Less than 0.1ms per node call
* **Parameter mapping**: Cached (about 6x faster than mapping without the cache)
* **State access**: O(1) dictionary lookups
* **Type checking**: Disabled by default in production
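One plausible way to cache parameter mapping is to memoize signature inspection per function. This is an assumption about the general technique, not a description of egregore's cache, and it makes no claim about the speedup figure above. `parameter_names` is a hypothetical helper.

```python
import inspect
from functools import lru_cache


@lru_cache(maxsize=None)
def parameter_names(func):
    """Inspect a function's signature once and reuse the result afterwards."""
    return tuple(inspect.signature(func).parameters)


def combine(first, second, third):
    return f"{first}-{second}-{third}"


names_first = parameter_names(combine)   # computed via inspect
names_second = parameter_names(combine)  # served from the cache
info = parameter_names.cache_info()
print(names_first, info.hits, info.misses)
# ('first', 'second', 'third') 1 1
```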

## Error Handling

Nodes can raise exceptions to halt workflow execution:

```python theme={null}
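from egregore.core.workflow import node, Sequence

@node("validate")
def validate(data: dict) -> dict:
    """Validation node."""
    if "required_field" not in data:
        raise ValueError("Missing required field")
    return data

@node("process")
def process(data: dict) -> dict:
    """Runs only if validation succeeds."""
    return {"processed": data}

try:
    workflow = Sequence(validate, process)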
    workflow.run({"invalid": "data"})
except ValueError as e:
    print(f"Validation failed: {e}")
```
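The halting behavior follows from ordinary exception propagation. A standalone sketch with plain functions (`run_sequence` and the `executed` list are hypothetical and exist only to show which steps ran):

```python
executed = []


def validate(data: dict) -> dict:
    executed.append("validate")
    if "required_field" not in data:
        raise ValueError("Missing required field")
    return data


def process(data: dict) -> dict:
    executed.append("process")
    return {"processed": data}


def run_sequence(funcs, data):
    """Call each function in order; an exception stops the loop immediately."""
    for func in funcs:
        data = func(data)
    return data


try:
    run_sequence([validate, process], {"invalid": "data"})
    error_message = None
except ValueError as exc:
    error_message = str(exc)

print(error_message)  # Missing required field
print(executed)       # ['validate']
```

The second function never runs because the exception leaves the loop before it is reached.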

## What's Next?

<CardGroup cols={2}>
  <Card title="Parallel Execution" icon="arrows-split-up-and-left" href="/features/workflows/parallel-execution">
    Execute nodes concurrently
  </Card>

  <Card title="Shared State" icon="database" href="/features/workflows/shared-state">
    Share data across nodes
  </Card>

  <Card title="Validation" icon="check-circle" href="/features/workflows/validation">
    Validate workflows before execution
  </Card>

  <Card title="Type Safety" icon="shield" href="/features/workflows/type-safety">
    Type checking for workflows
  </Card>
</CardGroup>
