# Subscribe API

> Imperative hook binding with agent.on(), agent.subscribe(), and context managers

The **Subscribe API** (codename: Synapse) provides a lightweight, imperative way to bind and unbind agent hooks without decorators.

## Overview

While decorators (`@agent.hooks.*`) work great for permanent hooks, the Subscribe API is designed for:

* **Dynamic hook registration**: Register/unregister hooks at runtime
* **Temporary subscriptions**: Use context managers for scoped hooks
* **Conditional monitoring**: Enable hooks only when needed
* **Bulk operations**: Register multiple hooks as a group

## Core Methods

### agent.on()

Register a single hook by friendly name.

```python theme={null}
agent.on(name: str, fn: Callable) -> str
```

**Parameters:**

* `name`: Friendly hook name (e.g., `"stream:chunk"`, `"tool:pre_exec"`)
* `fn`: Callback function (sync or async)

**Returns:**

* Subscription ID string for later unsubscription

**Example:**

```python theme={null}
from egregore import Agent

agent = Agent(provider="openai:gpt-4")

# Register streaming hook
def on_chunk(ctx):
    print(ctx.delta, end="", flush=True)

sub_id = agent.on("stream:chunk", on_chunk)

# Use the agent - hook fires automatically
await agent.acall("Write a poem about AI")

# Unsubscribe when done
agent.unsubscribe(sub_id)
```

### agent.subscribe()

Register multiple hooks at once.

```python theme={null}
agent.subscribe(mapping: dict[str, Callable]) -> str
```

**Parameters:**

* `mapping`: Dictionary of `{friendly_name: callback_function}`

**Returns:**

* Group subscription ID for unsubscribing all at once

**Example:**

```python theme={null}
# Register multiple tool hooks
def on_tool_start(ctx):
    print(f"[START] {ctx.tool_name}")

def on_tool_end(ctx):
    print(f"[DONE] {ctx.tool_name}: {ctx.tool_result}")

def on_tool_error(ctx):
    print(f"[ERROR] {ctx.tool_name}: {ctx.error}")

# Subscribe as group
group_id = agent.subscribe({
    "tool:pre_call": on_tool_start,
    "tool:post_call": on_tool_end,
    "tool:on_error": on_tool_error,
})

agent.call("Use the calculator and file reader")

# Unsubscribe all at once
agent.unsubscribe(group_id)
```

### agent.unsubscribe()

Unregister a subscription (single or group).

```python theme={null}
agent.unsubscribe(sub_id: str) -> None
```

**Parameters:**

* `sub_id`: Subscription ID from `on()` or `subscribe()`

**Important:**

* **Idempotent**: Safe to call multiple times
* **No errors**: Never raises an exception for non-existent IDs
* **Group aware**: Unsubscribes all hooks in a group

**Example:**

```python theme={null}
sub_id = agent.on("stream:chunk", on_chunk)
agent.unsubscribe(sub_id)
agent.unsubscribe(sub_id)  # Safe - no error
```
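The ID-based semantics above can be modeled with a small registry. The following is a minimal sketch, not Egregore's implementation: `ToyRegistry` and its `emit()` method are hypothetical names introduced only to show how a single ID can cover one hook or a whole group, and why a repeated `unsubscribe()` is harmless.

```python
import itertools
from typing import Any, Callable


class ToyRegistry:
    """Hypothetical stand-in that mimics on/subscribe/unsubscribe semantics."""

    def __init__(self) -> None:
        # One entry per subscription ID; a group is simply an entry with several hooks.
        self._groups: dict[str, list[tuple[str, Callable]]] = {}
        self._counter = itertools.count(1)

    def on(self, name: str, fn: Callable) -> str:
        return self.subscribe({name: fn})

    def subscribe(self, mapping: dict[str, Callable]) -> str:
        sub_id = f"sub_{next(self._counter)}"
        self._groups[sub_id] = list(mapping.items())
        return sub_id

    def unsubscribe(self, sub_id: str) -> None:
        # pop() with a default makes unknown or already-removed IDs a no-op.
        self._groups.pop(sub_id, None)

    def emit(self, name: str, ctx: Any) -> None:
        # Hypothetical dispatch helper used only for this demonstration.
        for hooks in list(self._groups.values()):
            for hook_name, fn in hooks:
                if hook_name == name:
                    fn(ctx)


registry = ToyRegistry()
seen: list[str] = []

single = registry.on("stream:chunk", seen.append)
group = registry.subscribe({
    "tool:pre_call": seen.append,
    "tool:post_call": seen.append,
})

registry.emit("stream:chunk", "chunk")
registry.emit("tool:pre_call", "start")

registry.unsubscribe(group)           # removes both tool hooks
registry.unsubscribe(group)           # second call is a no-op
registry.unsubscribe("never_issued")  # unknown ID is a no-op
registry.emit("tool:post_call", "end")  # no longer delivered
```

After this runs, `seen` holds only the two events delivered before the group was removed.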

### agent.subscription()

Create a context manager for temporary subscriptions.

```python theme={null}
agent.subscription(
    name_or_mapping: str | dict[str, Callable],
    fn: Callable | None = None
) -> SubscriptionContext
```

**Parameters:**

* `name_or_mapping`: Either a friendly name (str) or a mapping dict
* `fn`: Callback function (required if `name_or_mapping` is a `str`)

**Returns:**

* Context manager that auto-unsubscribes on exit

**Supports both sync and async usage.**

**Example (single hook):**

```python theme={null}
# Temporary streaming hook
async with agent.subscription("stream:chunk", on_chunk):
    await agent.acall("Stream this response")
# Auto-unsubscribed after block
```

**Example (multiple hooks):**

```python theme={null}
# Temporary monitoring
with agent.subscription({
    "tool:pre_exec": on_start,
    "tool:post_exec": on_end,
    "context:after_change": on_context_change,
}):
    agent.call("Do some work with policies")
# All hooks auto-unsubscribed
```

**Early unsubscription:**

```python theme={null}
async with agent.subscription("stream:chunk", on_chunk) as ctx:
    await agent.acall("First call")
    ctx.unsubscribe()  # Manually unsubscribe early
    await agent.acall("Second call - no hook")
```
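For readers curious how one object can serve both `with` and `async with`, here is a minimal sketch under stated assumptions. `FakeAgent` and `ScopedSubscription` are hypothetical names, not Egregore classes; the point is only that implementing both protocol pairs and keeping `unsubscribe()` idempotent gives the behavior described above, including early unsubscription.

```python
import asyncio
from typing import Callable


class FakeAgent:
    """Hypothetical stand-in that only tracks registered hooks."""

    def __init__(self) -> None:
        self.hooks: dict[str, tuple[str, Callable]] = {}
        self._next = 0

    def on(self, name: str, fn: Callable) -> str:
        self._next += 1
        sub_id = f"sub_{self._next}"
        self.hooks[sub_id] = (name, fn)
        return sub_id

    def unsubscribe(self, sub_id: str) -> None:
        self.hooks.pop(sub_id, None)


class ScopedSubscription:
    """Illustrative context manager usable with `with` and `async with`."""

    def __init__(self, agent: FakeAgent, name: str, fn: Callable) -> None:
        self._agent, self._name, self._fn = agent, name, fn
        self._sub_id: str | None = None

    def unsubscribe(self) -> None:
        # Safe to call early and again on exit.
        if self._sub_id is not None:
            self._agent.unsubscribe(self._sub_id)
            self._sub_id = None

    def __enter__(self) -> "ScopedSubscription":
        self._sub_id = self._agent.on(self._name, self._fn)
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.unsubscribe()
        return False  # do not swallow exceptions from the block

    async def __aenter__(self) -> "ScopedSubscription":
        return self.__enter__()

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        return self.__exit__(exc_type, exc, tb)


agent = FakeAgent()

with ScopedSubscription(agent, "stream:chunk", print):
    active_in_sync_block = len(agent.hooks)


async def demo() -> int:
    async with ScopedSubscription(agent, "stream:chunk", print) as scope:
        scope.unsubscribe()  # early unsubscription inside the block
        return len(agent.hooks)


active_after_early_unsubscribe = asyncio.run(demo())
active_at_end = len(agent.hooks)
```

The sync block sees one active hook, the async block sees none after the early `unsubscribe()`, and nothing remains registered once both blocks have exited.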

## Available Hook Names

All hook names follow the format `category:event`.
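Because every name has the same `category:event` shape, it can be split with plain string operations. The helper below is a hypothetical illustration (`split_hook_name` is not part of the documented API) that could be used, for example, to group hook names by category in logging code.

```python
def split_hook_name(name: str) -> tuple[str, str]:
    """Split 'category:event' into its two parts, rejecting malformed names."""
    category, sep, event = name.partition(":")
    if not sep or not category or not event:
        raise ValueError(f"Expected 'category:event', got {name!r}")
    return category, event


names = ["stream:chunk", "tool:pre_call", "tool:post_call"]

# Group event names under their category.
by_category: dict[str, list[str]] = {}
for hook_name in names:
    category, event = split_hook_name(hook_name)
    by_category.setdefault(category, []).append(event)
```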

### Streaming Hooks

```python theme={null}
"stream:chunk"          # Every chunk during streaming
"stream:tool_detect"    # Tool call detected in stream
"stream:content"        # Content chunk received
"stream:tool_start"     # Tool call start chunk
"stream:tool_delta"     # Tool argument delta
"stream:tool_complete"  # Tool call complete
"stream:tool_result"    # Tool result received
```

### Tool Hooks

```python theme={null}
"tool:pre_exec"     # Before tool execution starts
"tool:post_exec"    # After tool execution completes
"tool:pre_call"     # Before individual tool call
"tool:post_call"    # After individual tool call
"tool:on_error"     # Tool execution error
"tool:intercept"    # Intercept tool results (dual-phase)
```

### Context Hooks

```python theme={null}
"context:before_change"  # Before any context operation
"context:after_change"   # After any context operation
"context:on_add"         # Component added
"context:on_dispatch"    # Component dispatched
"context:on_update"      # Component updated
```

### Message Hooks

```python theme={null}
"message:on_user"      # User message received
"message:on_provider"  # Provider response received
"message:on_error"     # Message processing error
```

### Scaffold Hooks

```python theme={null}
"scaffold:op_complete"    # Scaffold operation completed
"scaffold:state_change"   # Scaffold state changed
```

## Usage Patterns

### Pattern 1: Dynamic Monitoring

Enable/disable logging based on conditions:

```python theme={null}
agent = Agent(provider="openai:gpt-4")

# Enable detailed logging for specific task
if task.requires_audit:
    sub_id = agent.subscribe({
        "tool:pre_call": log_tool_start,
        "tool:post_call": log_tool_end,
        "context:after_change": log_context_ops,
    })

result = agent.call(task.prompt)

# Disable logging after task
if task.requires_audit:
    agent.unsubscribe(sub_id)
```
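If `agent.call()` raises, the trailing `unsubscribe()` in the snippet above never runs. One way to make cleanup unconditional is to pick between a subscription context and `contextlib.nullcontext()`. The sketch below uses a hypothetical `FakeAgent` stand-in whose `subscription()` accepts only a mapping, so it illustrates the control flow rather than the real agent.

```python
from contextlib import contextmanager, nullcontext
from typing import Callable


class FakeAgent:
    """Hypothetical stand-in; tracks active subscription groups only."""

    def __init__(self) -> None:
        self.active: dict[str, dict[str, Callable]] = {}
        self._next = 0

    def subscribe(self, mapping: dict[str, Callable]) -> str:
        self._next += 1
        sub_id = f"sub_{self._next}"
        self.active[sub_id] = dict(mapping)
        return sub_id

    def unsubscribe(self, sub_id: str) -> None:
        self.active.pop(sub_id, None)

    @contextmanager
    def subscription(self, mapping: dict[str, Callable]):
        sub_id = self.subscribe(mapping)
        try:
            yield sub_id
        finally:
            # Runs even if the body raises.
            self.unsubscribe(sub_id)

    def call(self, prompt: str) -> int:
        # For the demo, report how many groups were active during the call.
        return len(self.active)


def run_task(agent: FakeAgent, prompt: str, requires_audit: bool) -> int:
    audit_hooks = {"tool:pre_call": print, "tool:post_call": print}
    # nullcontext() is a no-op context manager for the non-audited path.
    scope = agent.subscription(audit_hooks) if requires_audit else nullcontext()
    with scope:
        return agent.call(prompt)


agent = FakeAgent()
audited = run_task(agent, "audited task", requires_audit=True)
plain = run_task(agent, "plain task", requires_audit=False)
```

The audited call runs with one active group, the plain call with none, and no subscription outlives `run_task()`.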

### Pattern 2: Temporary Streaming

Stream responses for interactive sessions only:

```python theme={null}
async def interactive_chat():
    async with agent.subscription("stream:chunk", print_chunk):
        while True:
            user_input = await get_user_input()
            if user_input == "exit":
                break
            await agent.acall(user_input)
    # Streaming auto-disabled after session
```

### Pattern 3: Testing and Debugging

Capture hook data for assertions:

```python theme={null}
def test_tool_execution():
    tool_calls = []

    def capture_tool(ctx):
        tool_calls.append(ctx.tool_name)

    with agent.subscription("tool:post_call", capture_tool):
        agent.call("Use calculator to add 5 + 3")

    assert "calculator" in tool_calls
```

### Pattern 4: Rate Limiting

Implement custom rate limiting:

```python theme={null}
from collections import deque
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, max_calls: int, window: timedelta):
        self.max_calls = max_calls
        self.window = window
        self.calls = deque()

    def check(self, ctx):
        now = datetime.now()

        # Remove old calls outside window
        while self.calls and self.calls[0] < now - self.window:
            self.calls.popleft()

        # Check limit
        if len(self.calls) >= self.max_calls:
            raise RuntimeError(f"Rate limit exceeded: {self.max_calls} calls per {self.window}")

        self.calls.append(now)

# Apply rate limiting
limiter = RateLimiter(max_calls=10, window=timedelta(minutes=1))
agent.on("tool:pre_call", limiter.check)
```
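The sliding-window logic can be exercised without an agent by calling `check()` directly. The variant below adds a `clock` parameter, which is an assumption made for this sketch and not part of the original class, so that time can be advanced deterministically in a test.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Callable


class RateLimiter:
    """Same sliding-window check as above, with an injectable clock (assumed)."""

    def __init__(self, max_calls: int, window: timedelta,
                 clock: Callable[[], datetime] = datetime.now) -> None:
        self.max_calls = max_calls
        self.window = window
        self.clock = clock
        self.calls: deque[datetime] = deque()

    def check(self, ctx=None) -> None:
        now = self.clock()
        # Drop timestamps that have fallen out of the window.
        while self.calls and self.calls[0] < now - self.window:
            self.calls.popleft()
        if len(self.calls) >= self.max_calls:
            raise RuntimeError(
                f"Rate limit exceeded: {self.max_calls} calls per {self.window}"
            )
        self.calls.append(now)


# A mutable one-element list acts as a controllable fake clock.
fake_now = [datetime(2024, 1, 1, 12, 0, 0)]
limiter = RateLimiter(max_calls=2, window=timedelta(seconds=60),
                      clock=lambda: fake_now[0])

limiter.check()
limiter.check()

try:
    limiter.check()
    third_call_allowed = True
except RuntimeError:
    third_call_allowed = False

# Move past the window; earlier calls expire and new calls are accepted.
fake_now[0] += timedelta(seconds=61)
try:
    limiter.check()
    allowed_after_window = True
except RuntimeError:
    allowed_after_window = False
```

Note that a limiter like this works by raising inside the hook; how a raised exception in a `tool:pre_call` hook affects the tool call depends on the agent's hook error handling, which this page does not specify.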

### Pattern 5: Progressive Enhancement

Add hooks incrementally:

```python theme={null}
agent = Agent(provider="openai:gpt-4")

# Basic usage - no hooks
agent.call("Simple query")

# Add streaming for next call
sub1 = agent.on("stream:chunk", stream_handler)
agent.call("Stream this response")

# Add tool monitoring
sub2 = agent.subscribe({
    "tool:pre_call": tool_logger,
    "tool:on_error": error_handler,
})
agent.call("Complex task with tools")

# Remove all hooks
agent.unsubscribe(sub1)
agent.unsubscribe(sub2)
agent.call("Back to basic")
```
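When hooks accumulate incrementally, the standard library's `contextlib.ExitStack` can collect the matching `unsubscribe()` calls so none are forgotten. This is a sketch against a hypothetical `FakeAgent` stand-in; only `on()`, `subscribe()`, and `unsubscribe()` are modeled, with the signatures documented above.

```python
from contextlib import ExitStack
from typing import Callable


class FakeAgent:
    """Hypothetical stand-in; tracks active subscriptions only."""

    def __init__(self) -> None:
        self.active: dict[str, dict[str, Callable]] = {}
        self._next = 0

    def on(self, name: str, fn: Callable) -> str:
        return self.subscribe({name: fn})

    def subscribe(self, mapping: dict[str, Callable]) -> str:
        self._next += 1
        sub_id = f"sub_{self._next}"
        self.active[sub_id] = dict(mapping)
        return sub_id

    def unsubscribe(self, sub_id: str) -> None:
        self.active.pop(sub_id, None)


agent = FakeAgent()

with ExitStack() as stack:
    sub1 = agent.on("stream:chunk", print)
    stack.callback(agent.unsubscribe, sub1)  # queued for exit

    sub2 = agent.subscribe({
        "tool:pre_call": print,
        "tool:on_error": print,
    })
    stack.callback(agent.unsubscribe, sub2)  # queued for exit

    active_inside = len(agent.active)

# Leaving the block runs the queued callbacks in reverse order.
active_after = len(agent.active)
```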

## Integration with Decorator Hooks

Subscribe API hooks and decorator hooks can coexist on the same agent:

```python theme={null}
agent = Agent(provider="openai:gpt-4")

# Permanent hook via decorator
@agent.hooks.tool.pre_call
def permanent_logger(ctx):
    print(f"[PERMANENT] {ctx.tool_name}")

# Temporary hook via subscribe API
with agent.subscription("tool:post_call", lambda ctx: print(f"[TEMP] Done: {ctx.tool_name}")):
    agent.call("Execute tools")
    # Both hooks fire together

# After context - only permanent hook remains
agent.call("Execute more tools")
```

**Execution order:**

1. Permanent hooks fire first (in registration order)
2. Subscribe API hooks fire second (in registration order)
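The ordering rule can be pictured as two lists that are walked one after the other. The following toy dispatcher is purely illustrative (`ToyDispatcher` is a hypothetical name and says nothing about how Egregore stores hooks internally); it shows that interleaved registration does not change the permanent-first order.

```python
from typing import Any, Callable


class ToyDispatcher:
    """Hypothetical model of the documented firing order."""

    def __init__(self) -> None:
        self.permanent: list[Callable] = []   # decorator-style hooks
        self.subscribed: list[Callable] = []  # Subscribe API hooks

    def fire(self, ctx: Any) -> None:
        # Permanent hooks first, then subscribed hooks, each in registration order.
        for fn in self.permanent + self.subscribed:
            fn(ctx)


order: list[str] = []
dispatcher = ToyDispatcher()

# Register in an interleaved order on purpose.
dispatcher.subscribed.append(lambda ctx: order.append("subscribed-1"))
dispatcher.permanent.append(lambda ctx: order.append("permanent-1"))
dispatcher.subscribed.append(lambda ctx: order.append("subscribed-2"))
dispatcher.permanent.append(lambda ctx: order.append("permanent-2"))

dispatcher.fire(None)
```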

## Error Handling

### Unknown Hook Names

A misspelled hook name raises a `ValueError` whose message suggests close matches:

```python theme={null}
try:
    agent.on("stream:chonk", handler)  # Typo
except ValueError as e:
    print(e)
    # Unknown hook name: 'stream:chonk'
    # Did you mean one of these?
    #   - stream:chunk
    #   - stream:content
```
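A message like this could be produced with the standard library's `difflib.get_close_matches`. Whether Egregore does it this way is not stated here, so treat the following as a sketch: `validate_hook_name` and the shortened `KNOWN_HOOKS` tuple are hypothetical, and the exact suggestions depend on the cutoff chosen.

```python
import difflib

# Shortened, illustrative subset of the hook names listed on this page.
KNOWN_HOOKS = (
    "stream:chunk",
    "stream:content",
    "tool:pre_call",
    "tool:post_call",
    "tool:on_error",
)


def validate_hook_name(name: str, known: tuple[str, ...] = KNOWN_HOOKS) -> str:
    """Return the name if it is known, else raise with close-match suggestions."""
    if name in known:
        return name
    # Best matches come first; cutoff=0.6 is difflib's default similarity threshold.
    suggestions = difflib.get_close_matches(name, known, n=3, cutoff=0.6)
    message = f"Unknown hook name: {name!r}"
    if suggestions:
        lines = "\n".join(f"  - {s}" for s in suggestions)
        message += f"\nDid you mean one of these?\n{lines}"
    raise ValueError(message)


try:
    validate_hook_name("stream:chonk")
except ValueError as exc:
    error_text = str(exc)
    suggested = difflib.get_close_matches("stream:chonk", KNOWN_HOOKS, n=3)
```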

### Non-Callable Functions

```python theme={null}
try:
    agent.on("stream:chunk", "not_a_function")
except TypeError as e:
    print(e)
    # Hook function must be callable, got str
```
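The same kind of check is easy to reproduce in your own wrappers using the built-in `callable()`. The helper name `ensure_callable` is hypothetical; the error text simply mirrors the message shown above.

```python
from typing import Any


def ensure_callable(fn: Any) -> Any:
    """Reject anything that cannot be invoked as a hook."""
    if not callable(fn):
        raise TypeError(
            f"Hook function must be callable, got {type(fn).__name__}"
        )
    return fn


accepted = ensure_callable(print)

try:
    ensure_callable("not_a_function")
    rejected_message = ""
except TypeError as exc:
    rejected_message = str(exc)
```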

### Safe Unsubscription

```python theme={null}
# Always safe - never raises errors
agent.unsubscribe("non_existent_id")
agent.unsubscribe("sub_12345")  # Already unsubscribed
```

## Best Practices

<AccordionGroup>
  <Accordion title="Use context managers for temporary hooks">
    ```python theme={null}
    # Good: Auto-cleanup
    async with agent.subscription("stream:chunk", handler):
        await agent.acall("Stream this")

    # Bad: Manual cleanup required
    sub_id = agent.on("stream:chunk", handler)
    await agent.acall("Stream this")
    agent.unsubscribe(sub_id)  # Easy to forget
    ```
  </Accordion>

  <Accordion title="Group related hooks with subscribe()">
    ```python theme={null}
    # Good: Manage as unit
    group_id = agent.subscribe({
        "tool:pre_call": on_start,
        "tool:post_call": on_end,
        "tool:on_error": on_error,
    })
    agent.unsubscribe(group_id)

    # Bad: Individual management
    sub1 = agent.on("tool:pre_call", on_start)
    sub2 = agent.on("tool:post_call", on_end)
    sub3 = agent.on("tool:on_error", on_error)
    agent.unsubscribe(sub1)  # Easy to miss sub2, sub3
    ```
  </Accordion>

  <Accordion title="Store subscription IDs for later cleanup">
    ```python theme={null}
    class MonitoredAgent:
        def __init__(self, agent):
            self.agent = agent
            self.subscriptions = []

        def enable_monitoring(self):
            sub_id = self.agent.subscribe({
                "tool:pre_call": self.log_tool,
                "context:after_change": self.log_context,
            })
            self.subscriptions.append(sub_id)

        def disable_monitoring(self):
            for sub_id in self.subscriptions:
                self.agent.unsubscribe(sub_id)
            self.subscriptions.clear()
    ```
  </Accordion>

  <Accordion title="Use decorators for permanent hooks">
    ```python theme={null}
    # Good: Permanent application-wide logging
    @agent.hooks.tool.pre_call
    def log_all_tools(ctx):
        logger.info(f"Tool: {ctx.tool_name}")

    # Bad: Subscribe API for permanent hooks
    sub_id = agent.on("tool:pre_call", log_all_tools)
    # Now you have to manage sub_id forever
    ```
  </Accordion>
</AccordionGroup>

## Performance Considerations

* **Minimal overhead**: The Subscribe API is a thin layer over the decorator system
* **No double-firing**: Registering the same hook twice creates two separately tracked subscriptions, each with its own ID
* **Idempotent unsubscribe**: Safe to call multiple times
* **Context manager overhead**: Negligible (less than 1ms per subscription)

## Comparison: Subscribe API vs Decorators

| Feature           | Subscribe API      | Decorators                  |
| ----------------- | ------------------ | --------------------------- |
| Registration      | Runtime            | Definition time             |
| Unsubscription    | Yes (anytime)      | No                          |
| Context manager   | Yes                | No                          |
| Conditional hooks | Easy               | Requires logic in hook      |
| Testing           | Excellent          | Good                        |
| Bulk operations   | `subscribe()`      | Multiple decorators         |
| Friendly names    | Yes                | No (use attributes)         |
| Best for          | Dynamic, temporary | Permanent, application-wide |

## What's Next?

<CardGroup cols={2}>
  <Card title="Tool Hooks" icon="wrench" href="/features/hooks/tool-hooks">
    Tool execution lifecycle hooks
  </Card>

  <Card title="Context Hooks" icon="database" href="/features/hooks/context-hooks">
    Context tree operation hooks
  </Card>

  <Card title="Streaming Hooks" icon="wave-pulse" href="/features/hooks/streaming-hooks">
    Real-time content processing
  </Card>

  <Card title="Message Hooks" icon="envelope" href="/features/hooks/message-hooks">
    Message handling and modification
  </Card>
</CardGroup>
