Documentation Index
Fetch the complete documentation index at: https://docs.egregorelabs.io/llms.txt
Use this file to discover all available pages before exploring further.
Tool Hooks
Tool hooks let you observe and modify tool execution at every stage - from initial detection through final results.
Overview
Tool hooks provide 6 execution points:
| Hook | Fires | Use Case |
| --- | --- | --- |
| `pre_exec` | Before tool loop starts | Setup, validation, rate limiting |
| `post_exec` | After tool loop completes | Cleanup, aggregation, reporting |
| `pre_call` | Before each individual tool | Per-tool logging, parameter injection |
| `post_call` | After each individual tool | Result processing, caching |
| `on_error` | Tool execution fails | Error handling, retries, alerts |
| `intercept` | Dual-phase (validation + modification) | Input validation, output transformation |
Hook Registration
Decorator Syntax
from egregore import Agent

# Agent instance that the hook examples register their handlers on.
agent = Agent(provider="openai:gpt-4")
@agent.hooks.tool.pre_call
def before_tool(ctx):
    """Print the tool name and its positional and keyword arguments."""
    print(f"[START] {ctx.tool_name}")
    print(f"  Args: {ctx.tool_args}")
    print(f"  Kwargs: {ctx.tool_kwargs}")
@agent.hooks.tool.post_call
def after_tool(ctx):
    """Print the tool name and the value it returned."""
    print(f"[DONE] {ctx.tool_name}")
    print(f"  Result: {ctx.tool_result}")
@agent.hooks.tool.on_error
def tool_error(ctx):
    """Print the name of the failing tool together with its exception."""
    print(f"[ERROR] {ctx.tool_name}: {ctx.error}")
Subscribe API
# Dynamic registration
sub_id = agent.on("tool:pre_call", lambda ctx: print(f"Tool: {ctx.tool_name}"))

# Temporary hooks
# (on_start, on_end and on_error are handler callables defined elsewhere.)
with agent.subscription({
    "tool:pre_call": on_start,
    "tool:post_call": on_end,
    "tool:on_error": on_error,
}):
    agent.call("Use calculator")
Tool Hook Context
Every tool hook receives a ToolExecContext with:
@dataclass
class ToolExecContext:
    """Context object received by every tool hook."""

    # Identity
    agent_id: str  # Agent instance ID
    execution_id: str  # Unique execution ID
    agent: Agent  # Full agent reference

    # Tool information
    tool_name: str  # Name of tool being called
    tool_args: tuple  # Positional arguments
    tool_kwargs: dict  # Keyword arguments
    tool_params: dict  # Combined parameters

    # Results (post_call only)
    tool_result: Any  # Tool return value

    # Error handling (on_error only)
    error: Exception  # Exception raised

    # Full context access
    context: Context  # Complete context tree

    # Metadata
    metadata: dict  # Additional hook data
Execution Flow
Pre-Execution Hook
Fires before tool loop starts - useful for setup:
@agent.hooks.tool.pre_exec
def setup_tool_execution(ctx):
    """Setup before tool loop.

    Prints the execution ID, stores the current time in agent state under
    ``tool_start_time`` and emits a ``tool_execution_start`` monitoring event.
    """
    print(f"Starting tool execution: {ctx.execution_id}")

    # Access agent state
    ctx.agent.state.set("tool_start_time", time.time(), source="hooks")

    # Log to monitoring
    monitor.log_event("tool_execution_start", {
        "agent_id": ctx.agent_id,
        "execution_id": ctx.execution_id,
    })
Post-Execution Hook
Fires after tool loop completes - useful for cleanup:
@agent.hooks.tool.post_exec
def teardown_tool_execution(ctx):
    """Cleanup after tool loop.

    Reads ``tool_start_time`` from agent state, prints the elapsed time and
    reports it as the ``tool_execution_duration`` metric.
    """
    start_time = ctx.agent.state.get("tool_start_time")
    if start_time is None:
        # Nothing was recorded for this execution, so there is no duration to
        # report; ``time.time() - None`` would raise TypeError.
        return

    duration = time.time() - start_time
    print(f"Tool execution complete: {duration:.2f}s")

    # Report metrics
    monitor.log_metric("tool_execution_duration", duration)
Pre-Call Hook
Fires before each individual tool call:
@agent.hooks.tool.pre_call
def before_tool_call(ctx):
    """Log each tool call and reject unsafe ``database_query`` calls.

    Raises:
        ValueError: If a ``database_query`` call has no ``query`` keyword
            argument, the query is not a string, or it contains
            ``drop table`` / ``delete from``.
    """
    logger.info(f"Calling tool: {ctx.tool_name}")
    logger.debug(f"  Parameters: {ctx.tool_params}")

    # Validate parameters
    if ctx.tool_name != "database_query":
        return

    if "query" not in ctx.tool_kwargs:
        raise ValueError("Missing required 'query' parameter")

    query = ctx.tool_kwargs["query"]
    if not isinstance(query, str):
        raise ValueError("'query' parameter must be a string")

    # Check for dangerous queries. Whitespace is collapsed first so that
    # "DROP   TABLE" or "delete\nfrom" cannot slip past the substring test.
    # This is a simple denylist, not a SQL parser.
    normalized = " ".join(query.lower().split())
    if "drop table" in normalized or "delete from" in normalized:
        raise ValueError("Destructive queries not allowed")
Post-Call Hook
Fires after each individual tool call:
@agent.hooks.tool.post_call
def after_tool_call(ctx):
    """Process tool results.

    Prints the result, stores it in ``cache`` for 300 seconds and increments
    the per-tool call counter kept in agent state.
    """
    import hashlib
    import json

    print(f"{ctx.tool_name} returned: {ctx.tool_result}")

    # Cache result. The built-in hash() of a str is salted per process and
    # str(dict) depends on insertion order, so neither gives a stable key.
    # A digest of canonical (sorted-key) JSON does.
    params_blob = json.dumps(ctx.tool_params, sort_keys=True, default=repr)
    digest = hashlib.sha256(params_blob.encode("utf-8")).hexdigest()
    cache_key = f"{ctx.tool_name}:{digest}"
    cache.set(cache_key, ctx.tool_result, ttl=300)

    # Track usage
    counter_key = f"tool_{ctx.tool_name}_calls"
    ctx.agent.state.set(
        counter_key,
        ctx.agent.state.get(counter_key, 0) + 1,
        source="hooks",
    )
Error Hook
Fires when tool execution fails:
@agent.hooks.tool.on_error
def handle_tool_error(ctx):
    """Handle tool errors gracefully.

    Prints the failure, forwards the exception to ``error_tracker`` and sends
    an alert when the failing tool is ``database_query`` or ``api_call``.
    """
    print(f"Tool {ctx.tool_name} failed: {ctx.error}")

    # Log to error tracking
    error_tracker.log_exception(ctx.error, context={
        "tool": ctx.tool_name,
        "params": ctx.tool_params,
        "execution_id": ctx.execution_id,
    })

    # Send alert for critical tools
    if ctx.tool_name in ("database_query", "api_call"):
        alerts.send(f"Critical tool {ctx.tool_name} failed: {ctx.error}")

    # Could implement retry logic here
    if isinstance(ctx.error, TimeoutError):
        print("  (Timeout - may retry)")
Intercept Hook
Dual-phase hook for validation and output modification:
@agent.hooks.tool.intercept
def intercept_tool_call(ctx):
    """Validate inputs and modify outputs.

    Raises:
        PermissionError: If ``delete_file`` is called while the agent state
            value ``delete_allowed`` is falsy.
    """
    # NOTE(review): phase detection relies on ``tool_result`` being absent
    # from ctx before the call. If the attribute is always present, only the
    # transform branch will ever run - confirm against the library.
    if not hasattr(ctx, "tool_result"):
        # Phase 1: Input validation (before call)
        print(f"[VALIDATE] {ctx.tool_name}")

        # Reject calls to sensitive tools
        if ctx.tool_name == "delete_file":
            if not ctx.agent.state.get("delete_allowed"):
                raise PermissionError("File deletion not permitted")
    else:
        # Phase 2: Output modification (after call)
        print(f"[TRANSFORM] {ctx.tool_name}")

        # Transform result
        if ctx.tool_name == "get_user_data":
            # Redact sensitive fields
            if isinstance(ctx.tool_result, dict):
                ctx.tool_result = redact_pii(ctx.tool_result)
Usage Patterns
Pattern 1: Usage Tracking
Track which tools are used and how often:
class ToolTracker:
    """Count how many times each tool has been called."""

    def __init__(self):
        # Maps tool name -> number of calls seen so far.
        self.usage = {}

    def track(self, ctx):
        """Increment the call count for ``ctx.tool_name``."""
        tool = ctx.tool_name
        self.usage[tool] = self.usage.get(tool, 0) + 1

    def report(self):
        """Print one line per tool with its call count, sorted by tool name."""
        print("Tool Usage:")
        for tool, count in sorted(self.usage.items()):
            print(f"  {tool}: {count} calls")
tracker = ToolTracker()
agent.on("tool:post_call", tracker.track)

agent.call("Use calculator and file reader multiple times")

tracker.report()
# Tool Usage:
#   calculator: 3 calls
#   file_reader: 2 calls
Pattern 2: Rate Limiting
Prevent too many tool calls in a time window:
import time
from collections import deque
from datetime import datetime, timedelta


class ToolRateLimiter:
    """Sliding-window limiter: at most ``max_calls`` calls per ``window``."""

    def __init__(self, max_calls: int = 10, window: timedelta = timedelta(minutes=1)):
        self.max_calls = max_calls
        self.window = window
        # time.monotonic() readings of the accepted calls, oldest first.
        self.calls = deque()

    def check(self, ctx):
        """Record one call, or raise if the window is already full.

        Raises:
            RuntimeError: If ``max_calls`` calls were already accepted within
                the last ``window``.
        """
        # A monotonic clock is used because wall-clock time can jump (DST,
        # NTP, manual changes), which would stretch or shrink the window.
        now = time.monotonic()
        cutoff = now - self.window.total_seconds()

        # Remove old calls
        while self.calls and self.calls[0] < cutoff:
            self.calls.popleft()

        # Check limit
        if len(self.calls) >= self.max_calls:
            raise RuntimeError(
                f"Rate limit exceeded: {self.max_calls} calls per {self.window}"
            )

        self.calls.append(now)
# Register the limiter so it is consulted before every individual tool call.
limiter = ToolRateLimiter(max_calls=5, window=timedelta(seconds=10))
agent.on("tool:pre_call", limiter.check)
Pattern 3: Parameter Injection
Automatically add context to tool calls:
@agent.hooks.tool.pre_call
def inject_user_context(ctx):
    """Inject user info into tool calls.

    Fills in missing keyword arguments by mutating ``ctx.tool_kwargs`` in
    place; values already supplied by the caller are left untouched.
    """
    # Add user_id to all database queries
    if ctx.tool_name == "database_query":
        if "user_id" not in ctx.tool_kwargs:
            ctx.tool_kwargs["user_id"] = ctx.agent.state.get("current_user_id")

    # Add API keys to external calls
    if ctx.tool_name == "api_call":
        if "api_key" not in ctx.tool_kwargs:
            ctx.tool_kwargs["api_key"] = os.getenv("API_KEY")
Pattern 4: Result Caching
Cache tool results to avoid redundant calls:
import hashlib
import json
import time


class ToolCache:
    """In-memory cache of tool results with a time-to-live in seconds."""

    def __init__(self, ttl: int = 300):
        # Maps cache key -> (result, time.time() when stored).
        self.cache = {}
        self.ttl = ttl

    def cache_key(self, ctx):
        """Return a key built from the tool name and a digest of its parameters.

        Parameters are serialized as sorted-key JSON, so two calls with equal
        parameters map to the same key regardless of dict insertion order.
        Values JSON cannot encode fall back to their ``repr``.
        """
        blob = json.dumps(ctx.tool_params, sort_keys=True, default=repr)
        digest = hashlib.sha256(blob.encode("utf-8")).hexdigest()
        return f"{ctx.tool_name}:{digest}"

    def pre_call(self, ctx):
        """Check cache before tool call."""
        key = self.cache_key(ctx)
        entry = self.cache.get(key)
        if entry is None:
            return

        _result, stored_at = entry
        if time.time() - stored_at < self.ttl:
            print(f"[CACHE HIT] {ctx.tool_name}")
            # Could skip tool call here with custom logic
            # (would require deeper integration)
        else:
            # Expired: drop the stale entry instead of keeping it forever.
            del self.cache[key]

    def post_call(self, ctx):
        """Store result in cache."""
        key = self.cache_key(ctx)
        self.cache[key] = (ctx.tool_result, time.time())
        print(f"[CACHE STORE] {ctx.tool_name}")
# One cache instance handles both phases: lookup before, store after.
cache = ToolCache(ttl=300)
agent.on("tool:pre_call", cache.pre_call)
agent.on("tool:post_call", cache.post_call)
Pattern 5: Audit Logging
Complete audit trail of tool usage:
import json
from datetime import datetime
class ToolAuditor:
    """Append one JSON line per tool call or tool error to a log file."""

    def __init__(self, log_file: str):
        self.log_file = log_file

    def _append(self, entry: dict) -> None:
        """Serialize *entry* as a single JSON line and append it to the log."""
        # default=repr: tool parameters may contain values json cannot
        # encode; the audit hook records their repr instead of raising.
        line = json.dumps(entry, default=repr)
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    def log_call(self, ctx):
        """Log tool call details."""
        self._append({
            "timestamp": datetime.now().isoformat(),
            "agent_id": ctx.agent_id,
            "execution_id": ctx.execution_id,
            "tool": ctx.tool_name,
            "parameters": ctx.tool_params,
            "result": str(ctx.tool_result)[:200],  # Truncate
        })

    def log_error(self, ctx):
        """Log tool errors."""
        self._append({
            "timestamp": datetime.now().isoformat(),
            "agent_id": ctx.agent_id,
            # Recorded so error entries can be matched to their call entries.
            "execution_id": ctx.execution_id,
            "tool": ctx.tool_name,
            "error": str(ctx.error),
            "error_type": type(ctx.error).__name__,
        })
# Successful calls and failures are appended to the same log file.
auditor = ToolAuditor("tool_audit.log")
agent.on("tool:post_call", auditor.log_call)
agent.on("tool:on_error", auditor.log_error)
Pattern 6: Sensitive Data Redaction
Automatically redact sensitive information from tool results:
import re
@agent.hooks.tool.post_call
def redact_sensitive_data(ctx):
    """Redact PII from tool results.

    Only string results are processed; any other result type is left as is.
    """
    if not isinstance(ctx.tool_result, str):
        return

    text = ctx.tool_result

    # Redact email addresses. The TLD class is [A-Za-z]; writing it as
    # [A-Z|a-z] would also accept a literal "|".
    text = re.sub(
        r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "[EMAIL_REDACTED]",
        text,
    )

    # Redact phone numbers
    text = re.sub(
        r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
        "[PHONE_REDACTED]",
        text,
    )

    # Redact credit cards
    text = re.sub(
        r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
        "[CARD_REDACTED]",
        text,
    )

    ctx.tool_result = text
Hook Execution Order
Multiple hooks execute in registration order:
@agent.hooks.tool.pre_call
def first_hook(ctx):
    """Print "1"; registered first."""
    print("1")
@agent.hooks.tool.pre_call
def second_hook(ctx):
    """Print "2"; registered second."""
    print("2")
agent.call("Use calculator")
# Output:
# 1
# 2
# [tool executes]
Lifecycle sequence:
1. pre_exec hook fires (once per execution)
2. For each tool call:
   - pre_call hooks fire
   - Tool executes
   - post_call hooks fire (or on_error if failed)
3. post_exec hook fires (once per execution)
Async Hook Support
All tool hooks support both sync and async functions:
@agent.hooks.tool.pre_call
async def async_before_tool(ctx):
    """Async pre-call hook.

    Raises:
        PermissionError: If the tool name is not among the permissions
            returned for the current user.
    """
    # Async database lookup
    user_perms = await db.get_user_permissions(ctx.agent.state.get("user_id"))

    # Validate permissions
    if ctx.tool_name not in user_perms:
        raise PermissionError(f"User cannot access {ctx.tool_name}")
@agent.hooks.tool.post_call
async def async_after_tool(ctx):
    """Async post-call hook."""
    # Async logging
    await analytics.log_tool_usage(
        tool=ctx.tool_name,
        duration=ctx.metadata.get("duration"),
        result_size=len(str(ctx.tool_result)),
    )
Error Propagation
Errors in hooks propagate to error hooks:
@agent.hooks.tool.pre_call
def validate_tool(ctx):
    """Reject the ``dangerous_operation`` tool.

    Raises:
        ValueError: If the tool being called is ``dangerous_operation``.
    """
    if ctx.tool_name == "dangerous_operation":
        raise ValueError("Operation not permitted")
@agent.hooks.tool.on_error
def handle_error(ctx):
    """Catches errors from other hooks too."""
    origin = ctx.metadata.get("original_hook_type", "tool")
    print(f"Error in {origin}: {ctx.error}")

    # Error context includes:
    # - ctx.error: The exception
    # - ctx.metadata['original_hook_type']: Where error originated
    # - ctx.metadata['failed_hook']: Which hook failed
Best Practices
Use pre_call for validation, post_call for processing
# Good: Validate inputs before execution
@agent.hooks.tool.pre_call
def validate_inputs(ctx):
    """Require an explicit ``confirm`` keyword argument for ``delete_file``.

    Raises:
        ValueError: If ``delete_file`` is called without a truthy ``confirm``.
    """
    if ctx.tool_name == "delete_file":
        if not ctx.tool_kwargs.get("confirm"):
            raise ValueError("Must confirm file deletion")
# Good: Process outputs after execution
@agent.hooks.tool.post_call
def process_outputs(ctx):
    """Store the result of ``get_data`` in agent state under ``last_data``."""
    if ctx.tool_name == "get_data":
        ctx.agent.state.set("last_data", ctx.tool_result, source="hooks")
Keep hooks fast and focused
# Good: Fast, single-purpose hook
@agent.hooks.tool.pre_call
def log_tool(ctx):
    """Log the name of the tool about to be called."""
    logger.info(f"Tool: {ctx.tool_name}")
# Bad: Slow, multi-purpose hook
@agent.hooks.tool.pre_call
def slow_hook(ctx):
    """Counter-example: sleeps, then validates, logs and sends metrics."""
    time.sleep(1)  # Blocks every tool call!
    validate_tool(ctx)
    log_to_database(ctx)
    send_metrics(ctx)
    # Too many responsibilities
Use on_error for graceful degradation
@agent.hooks.tool.on_error
def handle_tool_failure(ctx):
    """Provide fallback when tool fails."""
    if ctx.tool_name == "api_call":
        # Log error but don't re-raise
        logger.error(f"API call failed: {ctx.error}")

        # Set fallback in agent state
        ctx.agent.state.set("api_unavailable", True, source="hooks")

        # Don't re-raise - allow agent to continue
        return  # Returning None = don't propagate error

    # NOTE(review): every other tool also falls through to an implicit
    # ``return None`` here; confirm whether their errors are meant to be
    # suppressed as well.
Use intercept for dual-phase operations
@agent.hooks.tool.intercept
def validate_and_transform(ctx):
    """Validate ``database_query`` input and anonymize ``get_user_data`` output."""
    # Check if this is pre-call (no result yet)
    # NOTE(review): relies on ``tool_result`` being absent from ctx before the
    # call; if the attribute always exists, only Phase 2 runs - confirm.
    if not hasattr(ctx, "tool_result"):
        # Phase 1: Input validation
        if ctx.tool_name == "database_query":
            validate_sql(ctx.tool_kwargs["query"])
    else:
        # Phase 2: Output transformation
        if ctx.tool_name == "get_user_data":
            ctx.tool_result = anonymize_data(ctx.tool_result)
- Hook overhead: Less than 5ms per hook on average
- Async hooks: Run concurrently when possible
- Error hooks: Fire even if other hooks fail
- Context creation: Reuses agent reference (no copying)
Integration with Scaffolds
Tool hooks work seamlessly with scaffold operations:
from egregore.core.context_scaffolds.base import BaseContextScaffold, operation
class TaskScaffold(BaseContextScaffold):
    """Scaffold of type ``tasks`` exposing an ``add_task`` operation."""

    scaffold_type = "tasks"

    @operation
    def add_task(self, task: str) -> str:
        """Add task - triggers tool hooks."""
        self.state.tasks.append(task)
        return f"Added: {task}"
# Tool hooks fire for scaffold operations
@agent.hooks.tool.pre_call
def log_scaffold_ops(ctx):
    """Print tool calls whose name starts with the ``tasks_`` prefix."""
    if ctx.tool_name.startswith("tasks_"):
        print(f"Scaffold operation: {ctx.tool_name}")
agent.call("Add task: write documentation")
# Output: Scaffold operation: tasks_add_task
What’s Next?
- Context Hooks: Context tree operation hooks
- Streaming Hooks: Real-time content processing
- Subscribe API: Dynamic hook registration
- Modifying Tool Outputs: Comprehensive guide to output modification