Documentation Index Fetch the complete documentation index at: https://docs.egregorelabs.io/llms.txt
Use this file to discover all available pages before exploring further.
Streaming Hooks
Streaming hooks let you process content as it arrives in real-time during streaming responses - perfect for displaying progress, filtering content, or detecting tool calls.
Overview
Streaming hooks provide 7 execution points:
| Hook | Fires | Use Case |
|------|-------|----------|
| on_chunk | Every chunk | Real-time display, filtering |
| on_content | Content chunks | Text processing, display |
| on_tool_detect | Tool call detected | Tool call awareness |
| on_tool_start | Tool call starts | Tool execution tracking |
| on_tool_delta | Tool arg delta | Parameter building |
| on_tool_complete | Tool call complete | Tool execution start |
| on_tool_result | Tool result | Result processing |
Hook Registration
Decorator Syntax
from egregore import Agent
agent = Agent( provider = "openai:gpt-4" )
@agent.hooks.streaming.on_chunk
def process_chunk ( ctx ):
"""Process every chunk."""
print (ctx.delta, end = "" , flush = True )
@agent.hooks.streaming.on_tool_detect
def on_tool_detected ( ctx ):
"""Alert when tool call detected."""
print ( f " \n [TOOL CALL DETECTED] \n " )
Subscribe API
# Dynamic registration
sub_id = agent.on( "stream:chunk" , lambda ctx : print (ctx.delta, end = "" ))
# Temporary hooks
async with agent.subscription( "stream:chunk" , display_chunk):
await agent.astream( "Write a story" )
Stream Hook Context
Every streaming hook receives a StreamExecContext with:
@dataclass
class StreamExecContext :
# Identity
agent_id: str # Agent instance ID
execution_id: str # Unique execution ID
agent: Agent # Full agent reference
# Chunk data
chunk_data: Any # Raw chunk from provider
chunk_type: str # "content", "tool_start", "tool_delta", etc.
# Accumulated data
accumulated_content: str # Content accumulated so far
finish_reason: str # "stop", "tool_calls", etc.
# Tool data
tool_calls: List[Any] # Complete tool calls
partial_tool_calls: List[Any] # In-progress tool calls
# Context access
context: Context # Full context tree
# Metadata
metadata: dict # Additional stream data
Delta Property
The delta property intelligently extracts text from chunks:
@agent.hooks.streaming.on_chunk
def show_delta ( ctx ):
# Automatically handles different chunk formats
print (ctx.delta, end = "" ) # Works with OpenAI, Anthropic, etc.
Execution Flow
On Chunk Hook
Fires for every chunk received during streaming:
@agent.hooks.streaming.on_chunk
def display_chunks ( ctx ):
"""Display every chunk as it arrives."""
# Extract text delta
text = ctx.delta
if text:
print (text, end = "" , flush = True )
# Track progress
ctx.agent.state.set(
"stream_progress" ,
len (ctx.accumulated_content),
source = "streaming"
)
On Content Hook
Fires specifically for content chunks (not tool-related):
@agent.hooks.streaming.on_content
def process_content ( ctx ):
"""Process only text content chunks."""
print ( f "Content: { ctx.delta } " )
# Apply content filtering
if "sensitive" in ctx.delta.lower():
print ( " \n [Content filtered] \n " )
On Tool Detect Hook
Fires when a tool call is first detected in the stream:
@agent.hooks.streaming.on_tool_detect
def on_tool_detected ( ctx ):
"""Alert when tool call detected."""
print ( " \n\n 🔧 Tool call detected in stream \n " )
# Log detection
logger.info( "Streaming tool call detected" , extra = {
"agent_id" : ctx.agent_id,
"accumulated_content" : ctx.accumulated_content,
})
On Tool Start Hook
Fires when a tool call chunk starts:
@agent.hooks.streaming.on_tool_start
def on_tool_start ( ctx ):
"""Handle tool call start."""
if ctx.partial_tool_calls:
tool = ctx.partial_tool_calls[ 0 ]
print ( f " \n [TOOL: { tool.get( 'function' , {}).get( 'name' , 'unknown' ) } ]" )
On Tool Delta Hook
Fires for each tool argument delta during streaming:
@agent.hooks.streaming.on_tool_delta
def on_tool_delta ( ctx ):
"""Process tool argument deltas."""
if ctx.partial_tool_calls:
for call in ctx.partial_tool_calls:
args = call.get( 'function' , {}).get( 'arguments' , '' )
print ( f "Args building: { args[: 50 ] } ..." )
On Tool Complete Hook
Fires when a tool call is complete:
@agent.hooks.streaming.on_tool_complete
def on_tool_complete ( ctx ):
"""Handle completed tool call."""
if ctx.tool_calls:
for call in ctx.tool_calls:
tool_name = call.get( 'function' , {}).get( 'name' )
args = call.get( 'function' , {}).get( 'arguments' )
print ( f " \n ✓ Tool ready: { tool_name } ( { args } ) \n " )
On Tool Result Hook
Fires when a tool result is received:
@agent.hooks.streaming.on_tool_result
def on_tool_result ( ctx ):
"""Process tool results."""
print ( " \n [Tool execution results received] \n " )
# Extract result from chunk
if hasattr (ctx.chunk_data, 'tool_result' ):
result = ctx.chunk_data.tool_result
print ( f "Result: { result } " )
Usage Patterns
Pattern 1: Real-Time Display
Display streaming content as it arrives:
@agent.hooks.streaming.on_chunk
def display_stream ( ctx ):
"""Real-time content display."""
# Print delta without newline
print (ctx.delta, end = "" , flush = True )
# Use with streaming
async for response in agent.astream( "Write a story about AI" ):
pass # Hook handles display
print () # Final newline
Pattern 2: Progress Tracking
Track streaming progress with visual indicator:
class StreamProgressTracker :
def __init__ ( self ):
self .char_count = 0
self .chunk_count = 0
def track ( self , ctx ):
"""Track progress."""
self .char_count += len (ctx.delta)
self .chunk_count += 1
# Show progress every 10 chunks
if self .chunk_count % 10 == 0 :
print ( f " \r [ { self .char_count } chars, { self .chunk_count } chunks]" , end = "" )
tracker = StreamProgressTracker()
agent.on( "stream:chunk" , tracker.track)
await agent.astream( "Generate long content" )
print ( f " \n Total: { tracker.char_count } characters" )
Pattern 3: Content Filtering
Filter sensitive content in real-time:
import re
@agent.hooks.streaming.on_chunk
def filter_content ( ctx ):
"""Filter sensitive patterns."""
# Get original delta
text = ctx.delta
# Apply filters
filtered = text
filtered = re.sub( r ' \b\d {3} - \d {2} - \d {4} \b ' , '[SSN]' , filtered)
filtered = re.sub( r ' \b [ \w .+- ] + @ [ \w - ] + \. [ \w .- ] + \b ' , '[EMAIL]' , filtered)
# Update chunk_data if modified
if filtered != text:
if isinstance (ctx.chunk_data, dict ):
ctx.chunk_data[ 'delta' ] = filtered
elif hasattr (ctx.chunk_data, 'delta' ):
ctx.chunk_data.delta = filtered
print ( f " \n [Filtered: { text } -> { filtered } ] \n " )
Pattern 4: Chunk Buffering
Buffer chunks for processing:
class ChunkBuffer :
def __init__ ( self , buffer_size : int = 50 ):
self .buffer_size = buffer_size
self .buffer = []
def add_chunk ( self , ctx ):
"""Buffer chunks."""
self .buffer.append(ctx.delta)
# Process when buffer full
if len ( self .buffer) >= self .buffer_size:
self .flush()
def flush ( self ):
"""Process buffered content."""
content = "" .join( self .buffer)
print ( f " \n [Buffer: { len (content) } chars] \n " )
print (content)
self .buffer.clear()
buffer = ChunkBuffer( buffer_size = 100 )
agent.on( "stream:chunk" , buffer.add_chunk)
Pattern 5: Tool Call Monitoring
Monitor tool calls during streaming:
class ToolCallMonitor :
def __init__ ( self ):
self .detected_calls = []
def on_detect ( self , ctx ):
"""Track tool call detection."""
print ( " \n\n 🔧 TOOL CALL DETECTED \n " )
def on_complete ( self , ctx ):
"""Track completed tool calls."""
if ctx.tool_calls:
for call in ctx.tool_calls:
tool_info = {
"name" : call.get( 'function' , {}).get( 'name' ),
"args" : call.get( 'function' , {}).get( 'arguments' ),
}
self .detected_calls.append(tool_info)
print ( f "✓ { tool_info[ 'name' ] } ready" )
def report ( self ):
"""Report detected calls."""
print ( f " \n\n Total tool calls: { len ( self .detected_calls) } " )
for call in self .detected_calls:
print ( f " - { call[ 'name' ] } " )
monitor = ToolCallMonitor()
agent.on( "stream:tool_detect" , monitor.on_detect)
agent.on( "stream:tool_complete" , monitor.on_complete)
await agent.astream( "Use calculator and search" )
monitor.report()
Pattern 6: Streaming Analytics
Collect analytics during streaming:
from datetime import datetime
class StreamAnalytics :
def __init__ ( self ):
self .start_time = None
self .first_chunk_time = None
self .chunk_times = []
self .total_chars = 0
def on_chunk ( self , ctx ):
"""Collect chunk metrics."""
now = datetime.now()
if self .start_time is None :
self .start_time = now
if self .first_chunk_time is None :
self .first_chunk_time = now
ttfb = (now - self .start_time).total_seconds()
print ( f " \n Time to first byte: { ttfb :.3f} s" )
self .chunk_times.append(now)
self .total_chars += len (ctx.delta)
def report ( self ):
"""Generate analytics report."""
if not self .chunk_times:
return
duration = ( self .chunk_times[ - 1 ] - self .start_time).total_seconds()
chars_per_sec = self .total_chars / duration if duration > 0 else 0
print ( f " \n\n Streaming Analytics:" )
print ( f " Duration: { duration :.2f} s" )
print ( f " Total chunks: { len ( self .chunk_times) } " )
print ( f " Total chars: { self .total_chars } " )
print ( f " Speed: { chars_per_sec :.1f} chars/sec" )
analytics = StreamAnalytics()
agent.on( "stream:chunk" , analytics.on_chunk)
await agent.astream( "Generate content" )
analytics.report()
Chunk Types
Streaming hooks handle different chunk types:
Content Chunks
@agent.hooks.streaming.on_content
def process_content ( ctx ):
# ctx.chunk_type == "content"
# ctx.delta contains text content
print (ctx.delta, end = "" )
@agent.hooks.streaming.on_tool_start
def process_tool_start ( ctx ):
# ctx.chunk_type == "tool_start"
# ctx.partial_tool_calls contains initial tool data
if ctx.partial_tool_calls:
tool = ctx.partial_tool_calls[ 0 ]
print ( f " \n Starting tool: { tool.get( 'function' , {}).get( 'name' ) } " )
@agent.hooks.streaming.on_tool_delta
def process_tool_delta ( ctx ):
# ctx.chunk_type == "tool_delta"
# ctx.partial_tool_calls building up
pass
@agent.hooks.streaming.on_tool_complete
def process_tool_complete ( ctx ):
# ctx.chunk_type == "tool_complete"
# ctx.tool_calls contains complete tool calls
pass
@agent.hooks.streaming.on_tool_result
def process_tool_result ( ctx ):
# ctx.chunk_type == "tool_result"
# Result available in chunk_data
pass
Best Practices
Use on_chunk for universal streaming
# Good: Universal streaming hook
@agent.hooks.streaming.on_chunk
def display_all ( ctx ):
# Handles all chunk types
if ctx.chunk_type == "content" :
print (ctx.delta, end = "" )
elif ctx.chunk_type == "tool_start" :
print ( " \n [TOOL]" )
# Also good: Specific hooks
@agent.hooks.streaming.on_content
def display_content ( ctx ):
# Only content chunks
print (ctx.delta, end = "" )
Keep streaming hooks fast
# Good: Fast processing
@agent.hooks.streaming.on_chunk
def fast_display ( ctx ):
print (ctx.delta, end = "" , flush = True )
# Bad: Slow processing blocks stream
@agent.hooks.streaming.on_chunk
def slow_process ( ctx ):
time.sleep( 0.1 ) # Blocks every chunk!
expensive_analysis(ctx.delta)
Use delta property for text extraction
Check chunk_type for specific processing
# Good: Filter by chunk type
@agent.hooks.streaming.on_chunk
def process ( ctx ):
if ctx.chunk_type == "content" :
handle_content(ctx.delta)
elif ctx.chunk_type == "tool_start" :
handle_tool_start(ctx.partial_tool_calls)
# Also good: Use specific hooks
@agent.hooks.streaming.on_content
def handle_content ( ctx ):
# Only processes content chunks
print (ctx.delta, end = "" )
Performance Considerations
Hook overhead: Less than 2ms per chunk
Chunk frequency: Can be 50-100+ chunks/second
Async support: Async hooks run concurrently
Buffer impact: Large buffers (100+ chunks) reduce hook calls
Integration with Agent Methods
Streaming hooks work with both stream() and astream():
# Sync streaming
@agent.hooks.streaming.on_chunk
def display ( ctx ):
print (ctx.delta, end = "" )
for response in agent.stream( "Write a story" ):
pass # Hook handles display
# Async streaming
async for response in agent.astream( "Write a story" ):
pass # Hook handles display
Error Handling
Errors in streaming hooks don’t stop the stream:
@agent.hooks.streaming.on_chunk
def risky_processing ( ctx ):
try :
process_chunk(ctx.delta)
except Exception as e:
# Log but don't propagate
logger.error( f "Chunk processing failed: { e } " )
# Stream continues
What’s Next?
Tool Hooks Tool execution lifecycle hooks
Message Hooks Message handling and modification
Subscribe API Dynamic hook registration
Event Streaming Learn about agent.events()