
Streaming Hooks

Streaming hooks let you process content in real time as it arrives during a streaming response. They are well suited to displaying progress, filtering content, and detecting tool calls.

Overview

Streaming hooks provide 7 execution points:
Hook               Fires                  Use case
on_chunk           Every chunk            Real-time display, filtering
on_content         Content chunks         Text processing, display
on_tool_detect     Tool call detected     Tool call awareness
on_tool_start      Tool call starts       Tool execution tracking
on_tool_delta      Tool argument delta    Parameter building
on_tool_complete   Tool call complete     Tool execution start
on_tool_result     Tool result received   Result processing

Hook Registration

Decorator Syntax

from egregore import Agent

agent = Agent(provider="openai:gpt-4")

@agent.hooks.streaming.on_chunk
def process_chunk(ctx):
    """Process every chunk."""
    print(ctx.delta, end="", flush=True)

@agent.hooks.streaming.on_tool_detect
def on_tool_detected(ctx):
    """Alert when tool call detected."""
    print("\n[TOOL CALL DETECTED]\n")

Subscribe API

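# Dynamic registration
sub_id = agent.on("stream:chunk", lambda ctx: print(ctx.delta, end=""))

# Temporary hooks: registered only inside the async with block
def display_chunk(ctx):
    print(ctx.delta, end="", flush=True)

async with agent.subscription("stream:chunk", display_chunk):
    async for _ in agent.astream("Write a story"):
        pass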
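The temporary-hook form registers a handler on entry and removes it on exit. The sketch below models that lifecycle with a hypothetical `ToyEventBus` (its `on`, `off`, `emit`, and `subscription` methods are illustrative stand-ins, not egregore's implementation), to show why the handler stops firing once the `async with` block ends, even if the body raises.

```python
import asyncio
import itertools
from contextlib import asynccontextmanager
from typing import Any, Callable, Dict


class ToyEventBus:
    """Hypothetical stand-in for an agent's hook registry (not the egregore API)."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.handlers: Dict[int, tuple] = {}

    def on(self, event: str, handler: Callable[[Any], None]) -> int:
        sub_id = next(self._ids)
        self.handlers[sub_id] = (event, handler)
        return sub_id

    def off(self, sub_id: int) -> None:
        self.handlers.pop(sub_id, None)

    def emit(self, event: str, ctx: Any) -> None:
        # Copy so handlers may unsubscribe while being called
        for name, handler in list(self.handlers.values()):
            if name == event:
                handler(ctx)

    @asynccontextmanager
    async def subscription(self, event: str, handler: Callable[[Any], None]):
        sub_id = self.on(event, handler)
        try:
            yield sub_id
        finally:
            # Unregister even if the body raised
            self.off(sub_id)


async def demo() -> list:
    bus = ToyEventBus()
    seen = []
    async with bus.subscription("stream:chunk", seen.append):
        bus.emit("stream:chunk", "Once")
        bus.emit("stream:chunk", " upon")
    bus.emit("stream:chunk", " a time")  # No longer subscribed
    return seen


print(asyncio.run(demo()))  # ['Once', ' upon']
```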

Stream Hook Context

Every streaming hook receives a StreamExecContext with:
@dataclass
class StreamExecContext:
    # Identity
    agent_id: str                  # Agent instance ID
    execution_id: str              # Unique execution ID
    agent: Agent                   # Full agent reference

    # Chunk data
    chunk_data: Any                # Raw chunk from provider
    chunk_type: str                # "content", "tool_start", "tool_delta", etc.

    # Accumulated data
    accumulated_content: str       # Content accumulated so far
    finish_reason: str             # "stop", "tool_calls", etc.

    # Tool data
    tool_calls: List[Any]          # Complete tool calls
    partial_tool_calls: List[Any]  # In-progress tool calls

    # Context access
    context: Context               # Full context tree

    # Metadata
    metadata: dict                 # Additional stream data
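Because hooks are plain callables that take the context, you can exercise them without a provider by passing a stand-in object. This sketch assumes a hypothetical `FakeStreamCtx` that mirrors only the fields a hook reads; in the real StreamExecContext, `delta` is a property derived from the chunk rather than a stored field. `ContentCollector` and `replay` are likewise illustrative names.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeStreamCtx:
    """Hypothetical test double covering only the fields these hooks read."""
    delta: str = ""
    chunk_type: str = "content"
    accumulated_content: str = ""
    tool_calls: List[Any] = field(default_factory=list)
    partial_tool_calls: List[Any] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


class ContentCollector:
    """Example hook logic under test: keep only text content."""

    def __init__(self) -> None:
        self.parts: List[str] = []

    def __call__(self, ctx: Any) -> None:
        if ctx.chunk_type == "content" and ctx.delta:
            self.parts.append(ctx.delta)


def replay(hook, deltas: List[str]) -> None:
    """Feed deltas to a hook one chunk at a time, the way a stream would."""
    accumulated = ""
    for d in deltas:
        accumulated += d
        hook(FakeStreamCtx(delta=d, accumulated_content=accumulated))


collector = ContentCollector()
replay(collector, ["Hel", "lo", ", world"])
collector(FakeStreamCtx(chunk_type="tool_start"))  # Ignored: not a content chunk
print("".join(collector.parts))  # Hello, world
```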

Delta Property

The delta property extracts the text from a chunk, whatever the provider's chunk format:
@agent.hooks.streaming.on_chunk
def show_delta(ctx):
    # Automatically handles different chunk formats
    print(ctx.delta, end="")  # Works with OpenAI, Anthropic, etc.
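To illustrate the kind of normalization the delta property performs, here is a sketch of an extractor. The chunk shapes it handles (a plain string, a dict or object with a `delta` field, and Anthropic-style text deltas that nest the text under `delta.text`) are assumptions chosen for illustration; `extract_delta` is hypothetical and is not egregore's actual implementation.

```python
from types import SimpleNamespace
from typing import Any


def extract_delta(chunk: Any) -> str:
    """Hypothetical normalizer; not egregore's actual delta implementation."""
    if chunk is None:
        return ""
    if isinstance(chunk, str):
        return chunk
    # Dicts and objects may both carry a "delta" field
    if isinstance(chunk, dict):
        value = chunk.get("delta", "")
    else:
        value = getattr(chunk, "delta", "")
    # Anthropic-style text deltas nest the text: {"type": "text_delta", "text": "..."}
    if isinstance(value, dict):
        value = value.get("text", "")
    elif value is not None and not isinstance(value, str):
        value = getattr(value, "text", "")
    return value or ""


print(extract_delta({"delta": "Hel"}) + extract_delta(SimpleNamespace(delta="lo")))  # Hello
```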

Execution Flow

On Chunk Hook

Fires for every chunk received during streaming:
@agent.hooks.streaming.on_chunk
def display_chunks(ctx):
    """Display every chunk as it arrives."""
    # Extract text delta
    text = ctx.delta

    if text:
        print(text, end="", flush=True)

    # Track progress
    ctx.agent.state.set(
        "stream_progress",
        len(ctx.accumulated_content),
        source="streaming"
    )

On Content Hook

Fires specifically for content chunks (not tool-related):
@agent.hooks.streaming.on_content
def process_content(ctx):
    """Process only text content chunks."""
    print(f"Content: {ctx.delta}")

    # Flag sensitive content (this only prints a notice; it does not remove text)
    if "sensitive" in (ctx.delta or "").lower():
        print("\n[Sensitive content detected]\n")

On Tool Detect Hook

Fires when a tool call is first detected in the stream:
import logging

logger = logging.getLogger(__name__)

@agent.hooks.streaming.on_tool_detect
def on_tool_detected(ctx):
    """Alert when tool call detected."""
    print("\n\n🔧 Tool call detected in stream\n")

    # Log detection
    logger.info("Streaming tool call detected", extra={
        "agent_id": ctx.agent_id,
        "accumulated_content": ctx.accumulated_content,
    })

On Tool Start Hook

Fires when a tool call chunk starts:
@agent.hooks.streaming.on_tool_start
def on_tool_start(ctx):
    """Handle tool call start."""
    if ctx.partial_tool_calls:
        tool = ctx.partial_tool_calls[0]
        print(f"\n[TOOL: {tool.get('function', {}).get('name', 'unknown')}]")

On Tool Delta Hook

Fires for each tool argument delta during streaming:
@agent.hooks.streaming.on_tool_delta
def on_tool_delta(ctx):
    """Process tool argument deltas."""
    if ctx.partial_tool_calls:
        for call in ctx.partial_tool_calls:
            args = call.get('function', {}).get('arguments', '')
            print(f"Args building: {args[:50]}...")
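Tool arguments typically stream as fragments of a JSON string, so they only parse once the final fragment arrives. If you ever assemble them yourself, for example from raw `chunk_data`, a sketch like the following keeps one buffer per tool-call index. The `ToolArgAccumulator` class and its `add`/`parsed` methods are hypothetical, and keying by index assumes OpenAI-style tool-call deltas.

```python
import json
from typing import Any, Dict, List


class ToolArgAccumulator:
    """Hypothetical helper: joins streamed argument fragments per tool-call index."""

    def __init__(self) -> None:
        self.names: Dict[int, str] = {}
        self.fragments: Dict[int, List[str]] = {}

    def add(self, index: int, name: str = "", fragment: str = "") -> None:
        # In OpenAI-style deltas the first one for a call carries the name;
        # later ones carry argument fragments
        if name:
            self.names[index] = name
        self.fragments.setdefault(index, []).append(fragment)

    def parsed(self, index: int) -> Any:
        """Return the parsed arguments, or None while the JSON is still incomplete."""
        raw = "".join(self.fragments.get(index, []))
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return None


acc = ToolArgAccumulator()
acc.add(0, name="calculator", fragment='{"expr')
print(acc.parsed(0))  # None
acc.add(0, fragment='ession": "2+2"}')
print(acc.parsed(0))  # {'expression': '2+2'}
```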

On Tool Complete Hook

Fires when a tool call is complete:
@agent.hooks.streaming.on_tool_complete
def on_tool_complete(ctx):
    """Handle completed tool call."""
    if ctx.tool_calls:
        for call in ctx.tool_calls:
            tool_name = call.get('function', {}).get('name')
            args = call.get('function', {}).get('arguments')
            print(f"\n✓ Tool ready: {tool_name}({args})\n")
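The `arguments` value printed above is typically a JSON string. A small formatter can parse it into keyword-style output and degrade gracefully when parsing fails. The `describe_tool_call` helper is hypothetical and assumes the dict layout used in the hook (`call['function']['name']` and `call['function']['arguments']`).

```python
import json
from typing import Any, Dict


def describe_tool_call(call: Dict[str, Any]) -> str:
    """Render a completed tool call as name(key=value, ...)."""
    fn = call.get("function", {})
    name = fn.get("name") or "unknown"
    raw_args = fn.get("arguments") or "{}"
    # Arguments may be a JSON string or an already-parsed dict
    if isinstance(raw_args, str):
        try:
            args = json.loads(raw_args)
        except json.JSONDecodeError:
            return f"{name}(<unparseable args: {raw_args!r}>)"
    else:
        args = raw_args
    if not isinstance(args, dict):
        return f"{name}({args!r})"
    rendered = ", ".join(f"{key}={value!r}" for key, value in args.items())
    return f"{name}({rendered})"


print(describe_tool_call(
    {"function": {"name": "search", "arguments": '{"query": "weather", "limit": 3}'}}
))  # search(query='weather', limit=3)
```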

On Tool Result Hook

Fires when a tool result is received:
@agent.hooks.streaming.on_tool_result
def on_tool_result(ctx):
    """Process tool results."""
    print("\n[Tool execution results received]\n")

    # Extract result from chunk
    if hasattr(ctx.chunk_data, 'tool_result'):
        result = ctx.chunk_data.tool_result
        print(f"Result: {result}")

Usage Patterns

Pattern 1: Real-Time Display

Display streaming content as it arrives:
@agent.hooks.streaming.on_chunk
def display_stream(ctx):
    """Real-time content display."""
    # Print delta without newline
    print(ctx.delta, end="", flush=True)

# Use with streaming
async for response in agent.astream("Write a story about AI"):
    pass  # Hook handles display

print()  # Final newline

Pattern 2: Progress Tracking

Track streaming progress with a visual indicator:
class StreamProgressTracker:
    def __init__(self):
        self.char_count = 0
        self.chunk_count = 0

    def track(self, ctx):
        """Track progress."""
        self.char_count += len(ctx.delta or "")
        self.chunk_count += 1

        # Show progress every 10 chunks
        if self.chunk_count % 10 == 0:
            print(f"\r[{self.char_count} chars, {self.chunk_count} chunks]", end="")

tracker = StreamProgressTracker()
agent.on("stream:chunk", tracker.track)

async for _ in agent.astream("Generate long content"):
    pass
print(f"\nTotal: {tracker.char_count} characters")

Pattern 3: Content Filtering

Filter sensitive content in real-time:
import re

@agent.hooks.streaming.on_chunk
def filter_content(ctx):
    """Filter sensitive patterns."""
    # Get original delta
    text = ctx.delta

    # Apply filters
    filtered = text
    filtered = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', filtered)
    filtered = re.sub(r'\b[\w.+-]+@[\w-]+\.[\w.-]+\b', '[EMAIL]', filtered)

    # Update chunk_data if modified
    if filtered != text:
        if isinstance(ctx.chunk_data, dict):
            ctx.chunk_data['delta'] = filtered
        elif hasattr(ctx.chunk_data, 'delta'):
            ctx.chunk_data.delta = filtered

        # Report the redaction without echoing the sensitive original
        print("\n[Sensitive content filtered]\n")
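The hook above checks each chunk on its own, so a pattern split across chunks (an SSN arriving as `123-4` and then `5-6789`) slips through. One workaround, sketched here, holds back the text after the last whitespace until more arrives; this is sound for these two patterns because neither can match across whitespace. `HoldbackRedactor` and `redact` are hypothetical helpers: you would call `feed(ctx.delta)` from an `on_chunk` hook, display what it returns, and call `flush()` when the stream ends. A long run without whitespace is held until `flush()`.

```python
import re

# Patterns from the example above; neither can match across whitespace
PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
    (re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"), "[EMAIL]"),
]


def redact(text: str) -> str:
    for pattern, label in PATTERNS:
        text = pattern.sub(label, text)
    return text


class HoldbackRedactor:
    """Hypothetical helper: releases text only up to the last whitespace."""

    def __init__(self) -> None:
        self.pending = ""

    def feed(self, delta: str) -> str:
        """Add a chunk; return redacted text that is now safe to display."""
        self.pending += delta
        # Position of the last whitespace character, or -1 if there is none yet
        cut = max(self.pending.rfind(ch) for ch in " \t\n")
        if cut == -1:
            return ""
        ready, self.pending = self.pending[: cut + 1], self.pending[cut + 1 :]
        return redact(ready)

    def flush(self) -> str:
        """Release whatever is still held back once the stream ends."""
        ready, self.pending = self.pending, ""
        return redact(ready)


redactor = HoldbackRedactor()
chunks = ["Call 123-4", "5-6789 or mail bob@exa", "mple.com today"]
shown = "".join(redactor.feed(c) for c in chunks) + redactor.flush()
print(shown)  # Call [SSN] or mail [EMAIL] today
```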

Pattern 4: Chunk Buffering

Buffer chunks for processing:
class ChunkBuffer:
    def __init__(self, buffer_size: int = 50):
        self.buffer_size = buffer_size
        self.buffer = []

    def add_chunk(self, ctx):
        """Buffer chunks."""
        self.buffer.append(ctx.delta or "")

        # Process when buffer full
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        """Process buffered content."""
        if not self.buffer:
            return
        content = "".join(self.buffer)
        print(f"\n[Buffer: {len(content)} chars]\n")
        print(content)
        self.buffer.clear()

buffer = ChunkBuffer(buffer_size=100)
agent.on("stream:chunk", buffer.add_chunk)

async for _ in agent.astream("Write a story"):
    pass
buffer.flush()  # Process whatever remains after the last full batch

Pattern 5: Tool Call Monitoring

Monitor tool calls during streaming:
class ToolCallMonitor:
    def __init__(self):
        self.detected_calls = []

    def on_detect(self, ctx):
        """Track tool call detection."""
        print("\n\n🔧 TOOL CALL DETECTED\n")

    def on_complete(self, ctx):
        """Track completed tool calls."""
        if ctx.tool_calls:
            for call in ctx.tool_calls:
                tool_info = {
                    "name": call.get('function', {}).get('name'),
                    "args": call.get('function', {}).get('arguments'),
                }
                self.detected_calls.append(tool_info)
                print(f"✓ {tool_info['name']} ready")

    def report(self):
        """Report detected calls."""
        print(f"\n\nTotal tool calls: {len(self.detected_calls)}")
        for call in self.detected_calls:
            print(f"  - {call['name']}")

monitor = ToolCallMonitor()
agent.on("stream:tool_detect", monitor.on_detect)
agent.on("stream:tool_complete", monitor.on_complete)

async for _ in agent.astream("Use calculator and search"):
    pass
monitor.report()

Pattern 6: Streaming Analytics

Collect analytics during streaming:
import time

class StreamAnalytics:
    def __init__(self):
        self.start_time = None
        self.first_chunk_time = None
        self.chunk_times = []
        self.total_chars = 0

    def start(self):
        """Call right before starting the stream so first-chunk latency is measured from the request."""
        self.start_time = time.perf_counter()

    def on_chunk(self, ctx):
        """Collect chunk metrics."""
        now = time.perf_counter()  # Monotonic clock, suited to measuring durations

        if self.start_time is None:
            # start() was not called; fall back to the first chunk (latency reads as 0)
            self.start_time = now

        if self.first_chunk_time is None:
            self.first_chunk_time = now
            first_chunk_latency = now - self.start_time
            print(f"\nTime to first chunk: {first_chunk_latency:.3f}s")

        self.chunk_times.append(now)
        self.total_chars += len(ctx.delta or "")

    def report(self):
        """Generate analytics report."""
        if not self.chunk_times:
            return

        duration = self.chunk_times[-1] - self.start_time
        chars_per_sec = self.total_chars / duration if duration > 0 else 0

        print("\n\nStreaming Analytics:")
        print(f"  Duration: {duration:.2f}s")
        print(f"  Total chunks: {len(self.chunk_times)}")
        print(f"  Total chars: {self.total_chars}")
        print(f"  Speed: {chars_per_sec:.1f} chars/sec")

analytics = StreamAnalytics()
agent.on("stream:chunk", analytics.on_chunk)

analytics.start()
async for _ in agent.astream("Generate content"):
    pass
analytics.report()
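Average chars/sec can hide stalls. If each `on_chunk` call records `time.perf_counter()`, the gaps between consecutive chunks give a latency distribution. The `gap_stats` helper below is a hypothetical sketch; it uses `statistics.quantiles` with the inclusive method so the p95 stays within the observed range of gaps.

```python
import statistics
from typing import Dict, List


def gap_stats(timestamps: List[float]) -> Dict[str, float]:
    """Summarize the gaps (in seconds) between consecutive chunk timestamps."""
    gaps = [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]
    if not gaps:
        return {}
    if len(gaps) == 1:
        # quantiles() requires at least two data points on older Python versions
        only = gaps[0]
        return {"mean": only, "p50": only, "p95": only, "max": only}
    # 99 cut points; index 94 is the 95th percentile
    cuts = statistics.quantiles(gaps, n=100, method="inclusive")
    return {
        "mean": statistics.fmean(gaps),
        "p50": statistics.median(gaps),
        "p95": cuts[94],
        "max": max(gaps),
    }


# Example: steady 1 s gaps with one 7 s stall
print(gap_stats([0, 1, 2, 3, 10]))
```

The mean (2.5 s) in this example understates the stall, while the max and p95 expose it.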

Chunk Types

Streaming hooks handle different chunk types:

Content Chunks

@agent.hooks.streaming.on_content
def process_content(ctx):
    # ctx.chunk_type == "content"
    # ctx.delta contains text content
    print(ctx.delta, end="")

Tool Start Chunks

@agent.hooks.streaming.on_tool_start
def process_tool_start(ctx):
    # ctx.chunk_type == "tool_start"
    # ctx.partial_tool_calls contains initial tool data
    if ctx.partial_tool_calls:
        tool = ctx.partial_tool_calls[0]
        print(f"\nStarting tool: {tool.get('function', {}).get('name')}")

Tool Delta Chunks

@agent.hooks.streaming.on_tool_delta
def process_tool_delta(ctx):
    # ctx.chunk_type == "tool_delta"
    # ctx.partial_tool_calls building up
    pass

Tool Complete Chunks

@agent.hooks.streaming.on_tool_complete
def process_tool_complete(ctx):
    # ctx.chunk_type == "tool_complete"
    # ctx.tool_calls contains complete tool calls
    pass

Tool Result Chunks

@agent.hooks.streaming.on_tool_result
def process_tool_result(ctx):
    # ctx.chunk_type == "tool_result"
    # Result available in chunk_data
    pass
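When a single `on_chunk` hook handles several chunk types, a dictionary keyed by `chunk_type` keeps the routing flat as types are added. In this sketch, `dispatch` would be the body registered with `@agent.hooks.streaming.on_chunk`; the handler names are hypothetical, and `SimpleNamespace` stands in for the real context.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, List

log: List[str] = []


def handle_text(ctx: Any) -> None:
    log.append(f"text:{ctx.delta}")


def handle_tool_start(ctx: Any) -> None:
    name = ctx.partial_tool_calls[0].get("function", {}).get("name", "unknown")
    log.append(f"tool:{name}")


# chunk_type values as listed above, mapped to their handlers
HANDLERS: Dict[str, Callable[[Any], None]] = {
    "content": handle_text,
    "tool_start": handle_tool_start,
}


def dispatch(ctx: Any) -> None:
    """Body for a single on_chunk hook; chunk types without a handler are ignored."""
    handler = HANDLERS.get(ctx.chunk_type)
    if handler is not None:
        handler(ctx)


dispatch(SimpleNamespace(chunk_type="content", delta="Hi", partial_tool_calls=[]))
dispatch(SimpleNamespace(chunk_type="tool_start", delta="",
                         partial_tool_calls=[{"function": {"name": "search"}}]))
dispatch(SimpleNamespace(chunk_type="tool_delta", delta="", partial_tool_calls=[]))
print(log)  # ['text:Hi', 'tool:search']
```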

Best Practices

# Good: Universal streaming hook
@agent.hooks.streaming.on_chunk
def display_all(ctx):
    # Handles all chunk types
    if ctx.chunk_type == "content":
        print(ctx.delta, end="")
    elif ctx.chunk_type == "tool_start":
        print("\n[TOOL]")

# Also good: Specific hooks
@agent.hooks.streaming.on_content
def display_content(ctx):
    # Only content chunks
    print(ctx.delta, end="")

# Good: Fast processing
@agent.hooks.streaming.on_chunk
def fast_display(ctx):
    print(ctx.delta, end="", flush=True)

# Bad: Slow processing blocks stream
import time

@agent.hooks.streaming.on_chunk
def slow_process(ctx):
    time.sleep(0.1)  # Blocks on every chunk!
    expensive_analysis(ctx.delta)  # Placeholder for heavy per-chunk work

# Good: Use delta property
@agent.hooks.streaming.on_chunk
def display(ctx):
    text = ctx.delta  # Handles different formats
    print(text, end="")

# Bad: Manual extraction
@agent.hooks.streaming.on_chunk
def display(ctx):
    if isinstance(ctx.chunk_data, dict):
        text = ctx.chunk_data.get('delta', '')
    elif hasattr(ctx.chunk_data, 'delta'):
        text = ctx.chunk_data.delta
    else:
        text = str(ctx.chunk_data)
    print(text, end="")

# Good: Filter by chunk type
@agent.hooks.streaming.on_chunk
def process(ctx):
    # handle_content and handle_tool_start are your own helper functions
    if ctx.chunk_type == "content":
        handle_content(ctx.delta)
    elif ctx.chunk_type == "tool_start":
        handle_tool_start(ctx.partial_tool_calls)

# Also good: Use specific hooks
@agent.hooks.streaming.on_content
def content_only(ctx):
    # Only processes content chunks
    print(ctx.delta, end="")

Performance Considerations

  • Hook overhead: Less than 2ms per chunk
  • Chunk frequency: Can be 50-100+ chunks/second
  • Async support: Async hooks run concurrently
  • Buffer impact: Buffering chunks (for example 100+ per batch, as in Pattern 4) reduces how often expensive processing runs
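The Best Practices section flags `time.sleep` inside a hook as blocking. One way to keep hooks fast is to hand slow work to a background thread so the hook only enqueues. `BackgroundWorker` is a hypothetical helper built on the standard `queue.Queue` and `threading.Thread`; a hook would call `worker.submit(ctx.delta)`, and you would call `worker.close()` after the stream to wait for pending work.

```python
import logging
import queue
import threading
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)


class BackgroundWorker:
    """Hypothetical helper: runs slow per-chunk work on one background thread."""

    def __init__(self, work: Callable[[str], None]) -> None:
        self._work = work
        self._queue: "queue.Queue[Optional[str]]" = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, text: str) -> None:
        """Called from the hook: enqueue and return immediately."""
        self._queue.put(text)

    def close(self) -> None:
        """Signal the worker to stop after queued items, then wait for it."""
        self._queue.put(None)  # Sentinel marks the end of work
        self._thread.join()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:
                break
            try:
                self._work(item)
            except Exception:
                # One failed item should not stop processing of later ones
                logger.exception("Background chunk work failed")


results: List[str] = []
worker = BackgroundWorker(lambda text: results.append(text.upper()))
for delta in ["a", "b", "c"]:
    worker.submit(delta)  # In a hook this would be worker.submit(ctx.delta)
worker.close()
print(results)  # ['A', 'B', 'C']
```

A single worker thread processes items in FIFO order, so per-chunk work still sees the text in stream order.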

Integration with Agent Methods

Streaming hooks work with both stream() and astream():
# Sync streaming
@agent.hooks.streaming.on_chunk
def display(ctx):
    print(ctx.delta, end="")

for response in agent.stream("Write a story"):
    pass  # Hook handles display

# Async streaming
async for response in agent.astream("Write a story"):
    pass  # Hook handles display

Error Handling

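Errors in streaming hooks don’t stop the stream. Catching exceptions inside the hook lets you log them with your own context:
import logging

logger = logging.getLogger(__name__)

@agent.hooks.streaming.on_chunk
def risky_processing(ctx):
    try:
        analyze_text(ctx.delta)  # Placeholder for your own processing
    except Exception as e:
        # Log but don't propagate
        logger.error(f"Chunk processing failed: {e}")
        # Stream continues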
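To apply the same try/except to every hook without repeating it, you can wrap hooks in a decorator. `safe_hook` below is a hypothetical helper (not part of egregore); it logs the failure with the hook name and `chunk_type`, and handles both sync and async hooks. Stack it beneath the registration decorator so that `@agent.hooks.streaming.on_chunk` receives the wrapped function.

```python
import functools
import inspect
import logging
from types import SimpleNamespace
from typing import Any, Callable

logger = logging.getLogger("stream_hooks")


def _log_failure(fn: Callable, ctx: Any) -> None:
    # Called from inside an except block, so logger.exception records the traceback
    logger.exception(
        "Streaming hook %s failed (chunk_type=%s)",
        fn.__name__,
        getattr(ctx, "chunk_type", "?"),
    )


def safe_hook(fn: Callable) -> Callable:
    """Wrap a sync or async hook so exceptions are logged instead of raised."""
    if inspect.iscoroutinefunction(fn):
        @functools.wraps(fn)
        async def async_wrapper(ctx: Any) -> None:
            try:
                await fn(ctx)
            except Exception:
                _log_failure(fn, ctx)
        return async_wrapper

    @functools.wraps(fn)
    def wrapper(ctx: Any) -> None:
        try:
            fn(ctx)
        except Exception:
            _log_failure(fn, ctx)
    return wrapper


# With an agent:
# @agent.hooks.streaming.on_chunk
# @safe_hook
# def parse_number(ctx): ...

@safe_hook
def parse_number(ctx: Any) -> None:
    int(ctx.delta)  # Raises ValueError for non-numeric text


parse_number(SimpleNamespace(delta="abc", chunk_type="content"))  # Logged, not raised
print("stream continues")
```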
