# Streaming Hooks

> Process real-time content streams with chunk-level hooks

**Streaming hooks** let you process content as it arrives during a streaming response, which is useful for displaying progress, filtering content, or detecting tool calls.

## Overview

Streaming hooks provide seven execution points:

| Hook               | Fires              | Use Case                     |
| ------------------ | ------------------ | ---------------------------- |
| `on_chunk`         | Every chunk        | Real-time display, filtering |
| `on_content`       | Content chunks     | Text processing, display     |
| `on_tool_detect`   | Tool call detected | Tool call awareness          |
| `on_tool_start`    | Tool call starts   | Tool execution tracking      |
| `on_tool_delta`    | Tool arg delta     | Parameter building           |
| `on_tool_complete` | Tool call complete | Tool execution start         |
| `on_tool_result`   | Tool result        | Result processing            |
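
To make the table concrete, here is a small library-independent sketch of which hooks are expected to fire for a given chunk type. It assumes, based on the table and the `chunk_type` values listed on this page, that `on_chunk` fires for every chunk and that each type-specific hook fires for its matching chunk type. `TYPE_SPECIFIC_HOOKS` and `hooks_for` are illustrative names, not part of egregore, and the order in which the two hooks run is an assumption. The chunk type that triggers `on_tool_detect` is not documented here, so it is left out.

```python
# Illustrative mapping only: chunk types as listed on this page, mapped to
# the type-specific hook that the table says fires for them.
TYPE_SPECIFIC_HOOKS = {
    "content": "on_content",
    "tool_start": "on_tool_start",
    "tool_delta": "on_tool_delta",
    "tool_complete": "on_tool_complete",
    "tool_result": "on_tool_result",
}


def hooks_for(chunk_type: str) -> list[str]:
    """Return the hook names expected to fire for one chunk."""
    fired = ["on_chunk"]  # fires for every chunk
    specific = TYPE_SPECIFIC_HOOKS.get(chunk_type)
    if specific is not None:
        fired.append(specific)
    return fired


for chunk_type in ["content", "tool_start", "tool_delta", "tool_complete"]:
    print(f"{chunk_type:>13} -> {', '.join(hooks_for(chunk_type))}")
```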

## Hook Registration

### Decorator Syntax

```python theme={null}
from egregore import Agent

agent = Agent(provider="openai:gpt-4")

@agent.hooks.streaming.on_chunk
def process_chunk(ctx):
    """Process every chunk."""
    print(ctx.delta, end="", flush=True)

@agent.hooks.streaming.on_tool_detect
def on_tool_detected(ctx):
    """Alert when tool call detected."""
    print("\n[TOOL CALL DETECTED]\n")
```

### Subscribe API

```python theme={null}
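# Dynamic registration
sub_id = agent.on("stream:chunk", lambda ctx: print(ctx.delta, end=""))

def display_chunk(ctx):
    """Print each delta as it arrives."""
    print(ctx.delta, end="", flush=True)

# Temporary hook: active only inside the `async with` block
async with agent.subscription("stream:chunk", display_chunk):
    async for _ in agent.astream("Write a story"):
        pass  # display_chunk handles output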
```

## Stream Hook Context

Every streaming hook receives a `StreamExecContext` with:

```python theme={null}
@dataclass
class StreamExecContext:
    # Identity
    agent_id: str                  # Agent instance ID
    execution_id: str              # Unique execution ID
    agent: Agent                   # Full agent reference

    # Chunk data
    chunk_data: Any                # Raw chunk from provider
    chunk_type: str                # "content", "tool_start", "tool_delta", etc.

    # Accumulated data
    accumulated_content: str       # Content accumulated so far
    finish_reason: str             # "stop", "tool_calls", etc.

    # Tool data
    tool_calls: List[Any]          # Complete tool calls
    partial_tool_calls: List[Any]  # In-progress tool calls

    # Context access
    context: Context               # Full context tree

    # Metadata
    metadata: dict                 # Additional stream data
```

### Delta Property

The `delta` property extracts the text from a chunk regardless of the provider's chunk format:

```python theme={null}
@agent.hooks.streaming.on_chunk
def show_delta(ctx):
    # Automatically handles different chunk formats
    print(ctx.delta, end="")  # Works with OpenAI, Anthropic, etc.
```
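
The kind of normalization `delta` performs can be pictured with a small stand-alone function. This is a sketch modeled on the manual-extraction fallback shown under Best Practices, not egregore's actual implementation. `extract_delta` is a hypothetical name, and the two chunk shapes it handles (a dict with a `delta` key, an object with a `delta` attribute) are assumptions.

```python
from types import SimpleNamespace
from typing import Any


def extract_delta(chunk_data: Any) -> str:
    """Best-effort text extraction from a provider chunk (illustrative)."""
    if isinstance(chunk_data, dict):
        value = chunk_data.get("delta", "")
    else:
        value = getattr(chunk_data, "delta", "")
    # Treat missing or non-text deltas as "no text" rather than raising.
    return value if isinstance(value, str) else ""


print(repr(extract_delta({"delta": "Hello"})))
print(repr(extract_delta(SimpleNamespace(delta=" world"))))
print(repr(extract_delta(object())))
```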

## Execution Flow

### On Chunk Hook

Fires for **every chunk** received during streaming:

```python theme={null}
@agent.hooks.streaming.on_chunk
def display_chunks(ctx):
    """Display every chunk as it arrives."""
    # Extract text delta
    text = ctx.delta

    if text:
        print(text, end="", flush=True)

    # Track progress
    ctx.agent.state.set(
        "stream_progress",
        len(ctx.accumulated_content),
        source="streaming"
    )
```

### On Content Hook

Fires specifically for **content chunks** (not tool-related):

```python theme={null}
@agent.hooks.streaming.on_content
def process_content(ctx):
    """Process only text content chunks."""
    text = ctx.delta or ""
    print(f"Content: {text}")

    # Flag chunks that contain a sensitive keyword
    if "sensitive" in text.lower():
        print("\n[Sensitive content detected]\n")
```

### On Tool Detect Hook

Fires when a **tool call is first detected** in the stream:

```python theme={null}
import logging

logger = logging.getLogger(__name__)

@agent.hooks.streaming.on_tool_detect
def on_tool_detected(ctx):
    """Alert when tool call detected."""
    print("\n\n🔧 Tool call detected in stream\n")

    # Log detection
    logger.info("Streaming tool call detected", extra={
        "agent_id": ctx.agent_id,
        "accumulated_content": ctx.accumulated_content,
    })
```

### On Tool Start Hook

Fires when a **tool call chunk starts**:

```python theme={null}
@agent.hooks.streaming.on_tool_start
def on_tool_start(ctx):
    """Handle tool call start."""
    if ctx.partial_tool_calls:
        tool = ctx.partial_tool_calls[0]
        print(f"\n[TOOL: {tool.get('function', {}).get('name', 'unknown')}]")
```

### On Tool Delta Hook

Fires for each **tool argument delta** during streaming:

```python theme={null}
@agent.hooks.streaming.on_tool_delta
def on_tool_delta(ctx):
    """Process tool argument deltas."""
    if ctx.partial_tool_calls:
        for call in ctx.partial_tool_calls:
            args = call.get('function', {}).get('arguments', '')
            print(f"Args building: {args[:50]}...")
```

### On Tool Complete Hook

Fires when a **tool call is complete**:

```python theme={null}
@agent.hooks.streaming.on_tool_complete
def on_tool_complete(ctx):
    """Handle completed tool call."""
    if ctx.tool_calls:
        for call in ctx.tool_calls:
            tool_name = call.get('function', {}).get('name')
            args = call.get('function', {}).get('arguments')
            print(f"\n✓ Tool ready: {tool_name}({args})\n")
```
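
Providers that follow the OpenAI tool-call format deliver `arguments` as a JSON-encoded string rather than a dict, so a completed call usually needs decoding before use. The helper below is a sketch that assumes the dict shape used in the examples above (`call['function']['name']` and `call['function']['arguments']`). `parse_tool_call` is a hypothetical name, not an egregore API.

```python
import json
from typing import Any


def parse_tool_call(call: dict) -> tuple[str, Any]:
    """Return (tool name, decoded arguments) for an OpenAI-style tool call."""
    function = call.get("function", {})
    name = function.get("name", "unknown")
    raw_args = function.get("arguments", "")
    if isinstance(raw_args, str):
        try:
            return name, json.loads(raw_args)
        except json.JSONDecodeError:
            # Leave malformed or still-partial JSON untouched for the caller.
            return name, raw_args
    return name, raw_args  # already decoded (for example, a dict)


name, args = parse_tool_call(
    {"function": {"name": "calculator", "arguments": '{"expression": "2 + 2"}'}}
)
print(name, args["expression"])
```

Returning the raw string on a decode error keeps the helper usable from `on_tool_delta` as well, where the arguments are typically still incomplete.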

### On Tool Result Hook

Fires when a **tool result is received**:

```python theme={null}
@agent.hooks.streaming.on_tool_result
def on_tool_result(ctx):
    """Process tool results."""
    print("\n[Tool execution results received]\n")

    # Extract result from chunk
    if hasattr(ctx.chunk_data, 'tool_result'):
        result = ctx.chunk_data.tool_result
        print(f"Result: {result}")
```

## Usage Patterns

### Pattern 1: Real-Time Display

Display streaming content as it arrives:

```python theme={null}
@agent.hooks.streaming.on_chunk
def display_stream(ctx):
    """Real-time content display."""
    # Print delta without newline
    print(ctx.delta, end="", flush=True)

# Use with streaming
async for response in agent.astream("Write a story about AI"):
    pass  # Hook handles display

print()  # Final newline
```

### Pattern 2: Progress Tracking

Track streaming progress with a visual indicator:

```python theme={null}
class StreamProgressTracker:
    def __init__(self):
        self.char_count = 0
        self.chunk_count = 0

    def track(self, ctx):
        """Track progress."""
        self.char_count += len(ctx.delta or "")  # Tolerate chunks without text
        self.chunk_count += 1

        # Show progress every 10 chunks
        if self.chunk_count % 10 == 0:
            print(f"\r[{self.char_count} chars, {self.chunk_count} chunks]", end="")

tracker = StreamProgressTracker()
agent.on("stream:chunk", tracker.track)

async for _ in agent.astream("Generate long content"):
    pass  # Hook tracks progress
print(f"\nTotal: {tracker.char_count} characters")
```

### Pattern 3: Content Filtering

Filter sensitive content in real time:

```python theme={null}
import re

@agent.hooks.streaming.on_chunk
def filter_content(ctx):
    """Filter sensitive patterns."""
    # Get original delta
    text = ctx.delta

    # Apply filters
    filtered = text
    filtered = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', filtered)
    filtered = re.sub(r'\b[\w.+-]+@[\w-]+\.[\w.-]+\b', '[EMAIL]', filtered)

    # Update chunk_data if modified
    if filtered != text:
        if isinstance(ctx.chunk_data, dict):
            ctx.chunk_data['delta'] = filtered
        elif hasattr(ctx.chunk_data, 'delta'):
            ctx.chunk_data.delta = filtered

        # Report the redaction without echoing the original sensitive text
        print("\n[Filtered sensitive content]\n")
```
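
Per-chunk matching misses a pattern that is split across two deltas, for example an SSN whose last digits arrive in the next chunk. One way to handle that is to hold back a short tail of text until it can no longer be part of a match. The class below is a stand-alone sketch of that idea for the fixed-length SSN pattern only. `BoundarySafeRedactor` and its methods are hypothetical names, and wiring its output into egregore's stream is not shown.

```python
import re

SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
SSN_LENGTH = 11  # len("123-45-6789")


class BoundarySafeRedactor:
    """Redact SSN-shaped text even when a match is split across chunks."""

    def __init__(self) -> None:
        self._pending = ""  # text received but not yet released

    def feed(self, delta: str) -> str:
        """Accept one delta and return the text that is safe to release."""
        self._pending = SSN_PATTERN.sub("[SSN]", self._pending + delta)
        # Hold back the last SSN_LENGTH characters: an incomplete match is at
        # most SSN_LENGTH - 1 long, and one extra character keeps the leading
        # word boundary intact.
        cut = max(0, len(self._pending) - SSN_LENGTH)
        released, self._pending = self._pending[:cut], self._pending[cut:]
        return released

    def finish(self) -> str:
        """Release whatever is still held back at the end of the stream."""
        released, self._pending = self._pending, ""
        return released


redactor = BoundarySafeRedactor()
chunks = ["My SSN is 123-", "45-67", "89, thanks."]  # simulated deltas
output = "".join(redactor.feed(chunk) for chunk in chunks) + redactor.finish()
print(output)
```

Variable-length patterns such as email addresses have no fixed upper bound, so they would need a different holdback rule. A match that ends exactly at the end of a chunk is redacted immediately, which errs on the side of over-redaction.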

### Pattern 4: Chunk Buffering

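Collect chunks into batches and process each batch when the buffer fills. Call `flush()` once more after the stream ends so that a partially filled buffer is not lost: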

```python theme={null}
class ChunkBuffer:
    def __init__(self, buffer_size: int = 50):
        self.buffer_size = buffer_size
        self.buffer = []

    def add_chunk(self, ctx):
        """Buffer chunks."""
        self.buffer.append(ctx.delta or "")  # Tolerate chunks without text

        # Process when buffer full
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        """Process buffered content."""
        content = "".join(self.buffer)
        print(f"\n[Buffer: {len(content)} chars]\n")
        print(content)
        self.buffer.clear()

buffer = ChunkBuffer(buffer_size=100)
agent.on("stream:chunk", buffer.add_chunk)
```

### Pattern 5: Tool Call Monitoring

Monitor tool calls during streaming:

```python theme={null}
class ToolCallMonitor:
    def __init__(self):
        self.detected_calls = []

    def on_detect(self, ctx):
        """Track tool call detection."""
        print("\n\n🔧 TOOL CALL DETECTED\n")

    def on_complete(self, ctx):
        """Track completed tool calls."""
        if ctx.tool_calls:
            for call in ctx.tool_calls:
                tool_info = {
                    "name": call.get('function', {}).get('name'),
                    "args": call.get('function', {}).get('arguments'),
                }
                self.detected_calls.append(tool_info)
                print(f"✓ {tool_info['name']} ready")

    def report(self):
        """Report detected calls."""
        print(f"\n\nTotal tool calls: {len(self.detected_calls)}")
        for call in self.detected_calls:
            print(f"  - {call['name']}")

monitor = ToolCallMonitor()
agent.on("stream:tool_detect", monitor.on_detect)
agent.on("stream:tool_complete", monitor.on_complete)

async for _ in agent.astream("Use calculator and search"):
    pass  # Hooks record tool calls
monitor.report()
```

### Pattern 6: Streaming Analytics

Collect analytics during streaming:

```python theme={null}
from datetime import datetime

class StreamAnalytics:
    def __init__(self):
        self.start_time = None
        self.first_chunk_time = None
        self.chunk_times = []
        self.total_chars = 0

    def start(self):
        """Record when the request is sent; call just before streaming."""
        self.start_time = datetime.now()

    def on_chunk(self, ctx):
        """Collect chunk metrics."""
        now = datetime.now()

        if self.start_time is None:
            # start() was not called, so latency is measured from the first chunk
            self.start_time = now

        if self.first_chunk_time is None:
            self.first_chunk_time = now
            ttfc = (now - self.start_time).total_seconds()
            print(f"\nTime to first chunk: {ttfc:.3f}s")

        self.chunk_times.append(now)
        self.total_chars += len(ctx.delta or "")

    def report(self):
        """Generate analytics report."""
        if not self.chunk_times:
            return

        duration = (self.chunk_times[-1] - self.start_time).total_seconds()
        chars_per_sec = self.total_chars / duration if duration > 0 else 0

        print("\n\nStreaming Analytics:")
        print(f"  Duration: {duration:.2f}s")
        print(f"  Total chunks: {len(self.chunk_times)}")
        print(f"  Total chars: {self.total_chars}")
        print(f"  Speed: {chars_per_sec:.1f} chars/sec")

analytics = StreamAnalytics()
agent.on("stream:chunk", analytics.on_chunk)

analytics.start()  # Without this, time to first chunk is always 0
async for _ in agent.astream("Generate content"):
    pass  # Hook collects metrics
analytics.report()
```

## Chunk Types

Streaming hooks handle different chunk types:

### Content Chunks

```python theme={null}
@agent.hooks.streaming.on_content
def process_content(ctx):
    # ctx.chunk_type == "content"
    # ctx.delta contains text content
    print(ctx.delta, end="")
```

### Tool Start Chunks

```python theme={null}
@agent.hooks.streaming.on_tool_start
def process_tool_start(ctx):
    # ctx.chunk_type == "tool_start"
    # ctx.partial_tool_calls contains initial tool data
    if ctx.partial_tool_calls:
        tool = ctx.partial_tool_calls[0]
        print(f"\nStarting tool: {tool.get('function', {}).get('name')}")
```

### Tool Delta Chunks

```python theme={null}
@agent.hooks.streaming.on_tool_delta
def process_tool_delta(ctx):
    # ctx.chunk_type == "tool_delta"
    # ctx.partial_tool_calls building up
    pass
```

### Tool Complete Chunks

```python theme={null}
@agent.hooks.streaming.on_tool_complete
def process_tool_complete(ctx):
    # ctx.chunk_type == "tool_complete"
    # ctx.tool_calls contains complete tool calls
    pass
```

### Tool Result Chunks

```python theme={null}
@agent.hooks.streaming.on_tool_result
def process_tool_result(ctx):
    # ctx.chunk_type == "tool_result"
    # Result available in chunk_data
    pass
```

## Best Practices

<AccordionGroup>
  <Accordion title="Use on_chunk for universal streaming">
    ```python theme={null}
    # Good: Universal streaming hook
    @agent.hooks.streaming.on_chunk
    def display_all(ctx):
        # Handles all chunk types
        if ctx.chunk_type == "content":
            print(ctx.delta, end="")
        elif ctx.chunk_type == "tool_start":
            print("\n[TOOL]")

    # Also good: Specific hooks
    @agent.hooks.streaming.on_content
    def display_content(ctx):
        # Only content chunks
        print(ctx.delta, end="")
    ```
  </Accordion>

  <Accordion title="Keep streaming hooks fast">
    ```python theme={null}
    # Good: Fast processing
    @agent.hooks.streaming.on_chunk
    def fast_display(ctx):
        print(ctx.delta, end="", flush=True)

    # Bad: Slow processing blocks stream
    @agent.hooks.streaming.on_chunk
    def slow_process(ctx):
        time.sleep(0.1)  # Blocks every chunk!
        expensive_analysis(ctx.delta)
    ```
  </Accordion>

  <Accordion title="Use delta property for text extraction">
    ```python theme={null}
    # Good: Use delta property
    @agent.hooks.streaming.on_chunk
    def display(ctx):
        text = ctx.delta  # Handles different formats
        print(text, end="")

    # Bad: Manual extraction
    @agent.hooks.streaming.on_chunk
    def display(ctx):
        if isinstance(ctx.chunk_data, dict):
            text = ctx.chunk_data.get('delta', '')
        elif hasattr(ctx.chunk_data, 'delta'):
            text = ctx.chunk_data.delta
        else:
            text = str(ctx.chunk_data)
        print(text, end="")
    ```
  </Accordion>

  <Accordion title="Check chunk_type for specific processing">
    ```python theme={null}
    # Good: Filter by chunk type
    @agent.hooks.streaming.on_chunk
    def process(ctx):
        if ctx.chunk_type == "content":
            handle_content(ctx.delta)
        elif ctx.chunk_type == "tool_start":
            handle_tool_start(ctx.partial_tool_calls)

    # Also good: Use specific hooks
    @agent.hooks.streaming.on_content
    def print_content(ctx):
        # Only processes content chunks
        print(ctx.delta, end="")
    ```
  </Accordion>
</AccordionGroup>
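
When per-chunk work is unavoidable, one common way to keep the hook itself fast is to hand the text to a background worker and return immediately. The sketch below uses only the standard library. `enqueue_delta` plays the role of the hook body, and `slow_analysis` is a hypothetical stand-in for expensive processing. How egregore schedules hooks is not covered here, so treat this as a general pattern rather than a library feature.

```python
import queue
import threading

work_queue = queue.Queue()  # deltas waiting for background processing
results = []                # filled by the worker thread
_STOP = object()            # sentinel that tells the worker to exit


def slow_analysis(text):
    """Hypothetical stand-in for expensive per-chunk work."""
    return text.upper()


def worker():
    """Drain the queue until the stop sentinel arrives."""
    while True:
        item = work_queue.get()
        if item is _STOP:
            break
        results.append(slow_analysis(item))


def enqueue_delta(delta):
    """What the hook would do: queue the text and return right away."""
    work_queue.put_nowait(delta)


thread = threading.Thread(target=worker, daemon=True)
thread.start()

for delta in ["Hello", ", ", "world"]:  # simulated stream deltas
    enqueue_delta(delta)

work_queue.put(_STOP)  # signal end of stream
thread.join()
print("".join(results))
```

In a real hook, the body would reduce to a single call such as `enqueue_delta(ctx.delta)`, with the sentinel sent once the stream has finished.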

## Performance Considerations

* **Hook overhead**: Less than 2ms per chunk
* **Chunk frequency**: Can be 50-100+ chunks/second
* **Async support**: Async hooks run concurrently
* **Buffer impact**: Batching chunks (for example, 100 at a time) reduces how often downstream processing runs
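
These figures can be combined into a rough time budget. The arithmetic below uses the numbers quoted above (up to 2 ms of hook overhead, 50 to 100 chunks per second) purely as inputs. `hook_time_fraction` is an illustrative helper, and real overhead depends on what your hooks do.

```python
def hook_time_fraction(overhead_ms: float, chunks_per_second: float) -> float:
    """Fraction of each second of streaming spent inside hooks."""
    return (overhead_ms / 1000.0) * chunks_per_second


for rate in (50, 100):
    fraction = hook_time_fraction(overhead_ms=2.0, chunks_per_second=rate)
    print(f"{rate} chunks/s at 2 ms each -> {fraction:.0%} of wall-clock time")
```

If hooks run inline with the stream, a hook that takes 2 ms at 100 chunks per second uses about a fifth of the available time, which is why slow work belongs outside the hook.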

## Integration with Agent Methods

Streaming hooks work with both `stream()` and `astream()`:

```python theme={null}
# Sync streaming
@agent.hooks.streaming.on_chunk
def display(ctx):
    print(ctx.delta, end="")

for response in agent.stream("Write a story"):
    pass  # Hook handles display

# Async streaming
async for response in agent.astream("Write a story"):
    pass  # Hook handles display
```

## Error Handling

Errors in streaming hooks don't stop the stream. Catching exceptions inside the hook lets you log failures in your own format:

```python theme={null}
import logging

logger = logging.getLogger(__name__)

@agent.hooks.streaming.on_chunk
def risky_processing(ctx):
    try:
        process_chunk(ctx.delta)  # Your own processing function
    except Exception:
        # Log with traceback but don't propagate
        logger.exception("Chunk processing failed")
        # Stream continues
```
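
If several hooks need the same guard, the try/except can be factored into a decorator. This is a plain-Python sketch: `log_hook_errors` is a hypothetical helper, and stacking it beneath `@agent.hooks.streaming.on_chunk` is assumed to work like any other decorator composition.

```python
import functools
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)


def log_hook_errors(hook):
    """Wrap a hook so exceptions are logged instead of propagating."""
    @functools.wraps(hook)
    def wrapper(ctx):
        try:
            return hook(ctx)
        except Exception:
            # logger.exception records the message plus the traceback.
            logger.exception("Streaming hook %s failed", hook.__name__)
            return None
    return wrapper


@log_hook_errors
def shout(ctx):
    return ctx.delta.upper()  # raises AttributeError if delta is None


# SimpleNamespace stands in for a real hook context in this demo.
print(shout(SimpleNamespace(delta="ok")))
print(shout(SimpleNamespace(delta=None)))
```

`functools.wraps` preserves the hook's name and docstring, which keeps log messages and debugging output readable.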

## What's Next?

<CardGroup cols={2}>
  <Card title="Tool Hooks" icon="wrench" href="/features/hooks/tool-hooks">
    Tool execution lifecycle hooks
  </Card>

  <Card title="Message Hooks" icon="envelope" href="/features/hooks/message-hooks">
    Message handling and modification
  </Card>

  <Card title="Subscribe API" icon="plug" href="/features/hooks/subscribe-api">
    Dynamic hook registration
  </Card>

  <Card title="Event Streaming" icon="wave-pulse" href="/guides/basic-usage/event-streaming">
    Learn about agent.events()
  </Card>
</CardGroup>
