Documentation Index Fetch the complete documentation index at: https://docs.egregorelabs.io/llms.txt
Use this file to discover all available pages before exploring further.
Subscribe API
The Subscribe API (codename: Synapse) provides a lightweight, imperative way to bind and unbind agent hooks without decorators.
Overview
While decorators (@agent.hooks.*) work great for permanent hooks, the Subscribe API is designed for:
Dynamic hook registration : Register/unregister hooks at runtime
Temporary subscriptions : Use context managers for scoped hooks
Conditional monitoring : Enable hooks only when needed
Bulk operations : Register multiple hooks as a group
Core Methods
agent.on()
Register a single hook by friendly name.
sub_id = agent.on(name: str , fn: Callable) -> str
Parameters:
name: Friendly hook name (e.g., "stream:chunk", "tool:pre_exec")
fn: Callback function (sync or async)
Returns:
Subscription ID string for later unsubscription
Example:
from egregore import Agent
agent = Agent( provider = "openai:gpt-4" )
# Register streaming hook
def on_chunk ( ctx ):
print (ctx.delta, end = "" , flush = True )
sub_id = agent.on( "stream:chunk" , on_chunk)
# Use the agent - hook fires automatically
await agent.acall( "Write a poem about AI" )
# Unsubscribe when done
agent.unsubscribe(sub_id)
agent.subscribe()
Register multiple hooks at once.
group_id = agent.subscribe(mapping: dict[ str , Callable]) -> str
Parameters:
mapping: Dictionary of {friendly_name: callback_function}
Returns:
Group subscription ID for unsubscribing all at once
Example:
# Register multiple tool hooks
def on_tool_start ( ctx ):
print ( f "[START] { ctx.tool_name } " )
def on_tool_end ( ctx ):
print ( f "[DONE] { ctx.tool_name } : { ctx.tool_result } " )
def on_tool_error ( ctx ):
print ( f "[ERROR] { ctx.tool_name } : { ctx.error } " )
# Subscribe as group
group_id = agent.subscribe({
"tool:pre_call" : on_tool_start,
"tool:post_call" : on_tool_end,
"tool:on_error" : on_tool_error,
})
agent.call( "Use the calculator and file reader" )
# Unsubscribe all at once
agent.unsubscribe(group_id)
agent.unsubscribe()
Unregister a subscription (single or group).
agent.unsubscribe(sub_id: str ) -> None
Parameters:
sub_id: Subscription ID from on() or subscribe()
Important:
Idempotent : Safe to call multiple times
No errors : Never raises exception for non-existent IDs
Group aware : Unsubscribes all hooks in a group
Example:
sub_id = agent.on( "stream:chunk" , on_chunk)
agent.unsubscribe(sub_id)
agent.unsubscribe(sub_id) # Safe - no error
agent.subscription()
Create a context manager for temporary subscriptions.
agent.subscription(
name_or_mapping: str | dict[ str , Callable],
fn: Callable = None
) -> SubscriptionContext
Parameters:
name_or_mapping: Either a friendly name (str) or a mapping dict
fn: Callback function (required if name_or_mapping is str)
Returns:
Context manager that auto-unsubscribes on exit
Supports both sync and async usage.
Example (single hook):
# Temporary streaming hook
async with agent.subscription( "stream:chunk" , on_chunk):
await agent.acall( "Stream this response" )
# Auto-unsubscribed after block
Example (multiple hooks):
# Temporary monitoring
with agent.subscription({
"tool:pre_exec" : on_start,
"tool:post_exec" : on_end,
"context:after_change" : on_context_change,
}):
agent.call( "Do some work with policies" )
# All hooks auto-unsubscribed
Early unsubscription:
async with agent.subscription( "stream:chunk" , on_chunk) as ctx:
await agent.acall( "First call" )
ctx.unsubscribe() # Manually unsubscribe early
await agent.acall( "Second call - no hook" )
Available Hook Names
All hook names follow the format category:event.
Streaming Hooks
"stream:chunk" # Every chunk during streaming
"stream:tool_detect" # Tool call detected in stream
"stream:content" # Content chunk received
"stream:tool_start" # Tool call start chunk
"stream:tool_delta" # Tool argument delta
"stream:tool_complete" # Tool call complete
"stream:tool_result" # Tool result received
"tool:pre_exec" # Before tool execution starts
"tool:post_exec" # After tool execution completes
"tool:pre_call" # Before individual tool call
"tool:post_call" # After individual tool call
"tool:on_error" # Tool execution error
"tool:intercept" # Intercept tool results (dual-phase)
Context Hooks
"context:before_change" # Before any context operation
"context:after_change" # After any context operation
"context:on_add" # Component added
"context:on_dispatch" # Component dispatched
"context:on_update" # Component updated
Message Hooks
"message:on_user" # User message received
"message:on_provider" # Provider response received
"message:on_error" # Message processing error
Scaffold Hooks
"scaffold:op_complete" # Scaffold operation completed
"scaffold:state_change" # Scaffold state changed
Usage Patterns
Pattern 1: Dynamic Monitoring
Enable/disable logging based on conditions:
agent = Agent( provider = "openai:gpt-4" )
# Enable detailed logging for specific task
if task.requires_audit:
sub_id = agent.subscribe({
"tool:pre_call" : log_tool_start,
"tool:post_call" : log_tool_end,
"context:after_change" : log_context_ops,
})
result = agent.call(task.prompt)
# Disable logging after task
if task.requires_audit:
agent.unsubscribe(sub_id)
Pattern 2: Temporary Streaming
Stream responses for interactive sessions only:
async def interactive_chat ():
async with agent.subscription( "stream:chunk" , print_chunk):
while True :
user_input = await get_user_input()
if user_input == "exit" :
break
await agent.acall(user_input)
# Streaming auto-disabled after session
Pattern 3: Testing and Debugging
Capture hook data for assertions:
def test_tool_execution ():
tool_calls = []
def capture_tool ( ctx ):
tool_calls.append(ctx.tool_name)
with agent.subscription( "tool:post_call" , capture_tool):
agent.call( "Use calculator to add 5 + 3" )
assert "calculator" in tool_calls
Pattern 4: Rate Limiting
Implement custom rate limiting:
from collections import deque
from datetime import datetime, timedelta
class RateLimiter :
def __init__ ( self , max_calls : int , window : timedelta):
self .max_calls = max_calls
self .window = window
self .calls = deque()
def check ( self , ctx ):
now = datetime.now()
# Remove old calls outside window
while self .calls and self .calls[ 0 ] < now - self .window:
self .calls.popleft()
# Check limit
if len ( self .calls) >= self .max_calls:
raise RuntimeError ( f "Rate limit exceeded: { self .max_calls } calls per { self .window } " )
self .calls.append(now)
# Apply rate limiting
limiter = RateLimiter( max_calls = 10 , window = timedelta( minutes = 1 ))
agent.on( "tool:pre_call" , limiter.check)
Pattern 5: Progressive Enhancement
Add hooks incrementally:
agent = Agent( provider = "openai:gpt-4" )
# Basic usage - no hooks
agent.call( "Simple query" )
# Add streaming for next call
sub1 = agent.on( "stream:chunk" , stream_handler)
agent.call( "Stream this response" )
# Add tool monitoring
sub2 = agent.subscribe({
"tool:pre_call" : tool_logger,
"tool:on_error" : error_handler,
})
agent.call( "Complex task with tools" )
# Remove all hooks
agent.unsubscribe(sub1)
agent.unsubscribe(sub2)
agent.call( "Back to basic" )
Integration with Decorator Hooks
Subscribe API and decorator hooks work together seamlessly:
agent = Agent( provider = "openai:gpt-4" )
# Permanent hook via decorator
@agent.hooks.tool.pre_call
def permanent_logger ( ctx ):
print ( f "[PERMANENT] { ctx.tool_name } " )
# Temporary hook via subscribe API
with agent.subscription( "tool:post_call" , lambda ctx : print ( f "[TEMP] Done: { ctx.tool_name } " )):
agent.call( "Execute tools" )
# Both hooks fire together
# After context - only permanent hook remains
agent.call( "Execute more tools" )
Execution order:
Permanent hooks fire first (in registration order)
Subscribe API hooks fire second (in registration order)
Error Handling
Unknown Hook Names
Helpful suggestions when hook name is misspelled:
try :
agent.on( "stream:chonk" , handler) # Typo
except ValueError as e:
print (e)
# Unknown hook name: 'stream:chonk'
# Did you mean one of these?
# - stream:chunk
# - stream:content
Non-Callable Functions
try :
agent.on( "stream:chunk" , "not_a_function" )
except TypeError as e:
print (e)
# Hook function must be callable, got str
Safe Unsubscription
# Always safe - never raises errors
agent.unsubscribe( "non_existent_id" )
agent.unsubscribe( "sub_12345" ) # Already unsubscribed
Best Practices
Use context managers for temporary hooks
# Good: Auto-cleanup
async with agent.subscription( "stream:chunk" , handler):
await agent.acall( "Stream this" )
# Bad: Manual cleanup required
sub_id = agent.on( "stream:chunk" , handler)
await agent.acall( "Stream this" )
agent.unsubscribe(sub_id) # Easy to forget
Group related hooks with subscribe()
Store subscription IDs for later cleanup
class MonitoredAgent :
def __init__ ( self , agent ):
self .agent = agent
self .subscriptions = []
def enable_monitoring ( self ):
sub_id = self .agent.subscribe({
"tool:pre_call" : self .log_tool,
"context:after_change" : self .log_context,
})
self .subscriptions.append(sub_id)
def disable_monitoring ( self ):
for sub_id in self .subscriptions:
self .agent.unsubscribe(sub_id)
self .subscriptions.clear()
Use decorators for permanent hooks
# Good: Permanent application-wide logging
@agent.hooks.tool.pre_call
def log_all_tools ( ctx ):
logger.info( f "Tool: { ctx.tool_name } " )
# Bad: Subscribe API for permanent hooks
sub_id = agent.on( "tool:pre_call" , log_all_tools)
# Now you have to manage sub_id forever
Minimal overhead : Subscribe API is a thin layer over decorator system
No double-firing : Same hooks registered twice are tracked separately
Idempotent unsubscribe : Safe to call multiple times
Context manager overhead : Negligible (less than 1ms per subscription)
Comparison: Subscribe API vs Decorators
Feature Subscribe API Decorators Registration Runtime Definition time Unsubscription Yes (anytime) No Context manager Yes No Conditional hooks Easy Requires logic in hook Testing Excellent Good Bulk operations subscribe()Multiple decorators Friendly names Yes No (use attributes) Best for Dynamic, temporary Permanent, application-wide
What’s Next?
Tool Hooks Tool execution lifecycle hooks
Context Hooks Context tree operation hooks
Streaming Hooks Real-time content processing
Message Hooks Message handling and modification