Skip to main content

Tool Hooks

Tool hooks let you observe and modify tool execution at every stage - from initial detection through final results.

Overview

Tool hooks provide 6 execution points:
| Hook | Fires | Use Case |
|------|-------|----------|
| pre_exec | Before tool loop starts | Setup, validation, rate limiting |
| post_exec | After tool loop completes | Cleanup, aggregation, reporting |
| pre_call | Before each individual tool | Per-tool logging, parameter injection |
| post_call | After each individual tool | Result processing, caching |
| on_error | Tool execution fails | Error handling, retries, alerts |
| intercept | Dual-phase (validation + modification) | Input validation, output transformation |

Hook Registration

Decorator Syntax

from egregore import Agent

# Agent instance whose `hooks.tool` decorators are used for registration below
agent = Agent(provider="openai:gpt-4")

@agent.hooks.tool.pre_call
def before_tool(ctx):
    """Print the tool name and its positional/keyword arguments before a call."""
    report_lines = (
        f"[START] {ctx.tool_name}",
        f"  Args: {ctx.tool_args}",
        f"  Kwargs: {ctx.tool_kwargs}",
    )
    for line in report_lines:
        print(line)

@agent.hooks.tool.post_call
def after_tool(ctx):
    """Print the tool name and its result once a call has finished."""
    header = f"[DONE] {ctx.tool_name}"
    detail = f"  Result: {ctx.tool_result}"
    print(header, detail, sep="\n")

@agent.hooks.tool.on_error
def tool_error(ctx):
    """Print the name of the failing tool together with its exception."""
    failed_tool, exc = ctx.tool_name, ctx.error
    print(f"[ERROR] {failed_tool}: {exc}")

Subscribe API

# Dynamic registration: the value returned by agent.on() is kept as sub_id
sub_id = agent.on(
    "tool:pre_call",
    lambda ctx: print(f"Tool: {ctx.tool_name}"),
)

# Temporary hooks: handlers are passed as an event-name -> callable mapping
# (presumably active only while the with-block runs - confirm against the API)
temporary_hooks = {
    "tool:pre_call": on_start,
    "tool:post_call": on_end,
    "tool:on_error": on_error,
}
with agent.subscription(temporary_hooks):
    agent.call("Use calculator")

Tool Hook Context

Every tool hook receives a ToolExecContext with:
@dataclass
class ToolExecContext:
    """Context object passed to every tool hook.

    Fields are grouped by purpose: identity, the tool call being made, the
    call's result or error, the full context tree, and free-form metadata.
    """

    # Identity
    agent_id: str                  # Agent instance ID
    execution_id: str              # Unique execution ID
    agent: Agent                   # Full agent reference

    # Tool information
    tool_name: str                 # Name of tool being called
    tool_args: tuple               # Positional arguments
    tool_kwargs: dict              # Keyword arguments
    tool_params: dict              # Combined parameters

    # Results (post_call only)
    tool_result: Any               # Tool return value

    # Error handling (on_error only)
    error: Exception               # Exception raised

    # Full context access
    context: Context               # Complete context tree

    # Metadata
    metadata: dict                 # Additional hook data

Execution Flow

Pre-Execution Hook

Fires before tool loop starts - useful for setup:
@agent.hooks.tool.pre_exec
def setup_tool_execution(ctx):
    """Record the start time and emit a start event before the tool loop."""
    execution_id = ctx.execution_id
    print(f"Starting tool execution: {execution_id}")

    # Stash the wall-clock start time in agent state for later hooks to read
    started_at = time.time()
    ctx.agent.state.set("tool_start_time", started_at, source="hooks")

    # Report the start to monitoring
    event_payload = {
        "agent_id": ctx.agent_id,
        "execution_id": execution_id,
    }
    monitor.log_event("tool_execution_start", event_payload)

Post-Execution Hook

Fires after tool loop completes - useful for cleanup:
@agent.hooks.tool.post_exec
def teardown_tool_execution(ctx):
    """Report how long the tool loop took.

    Reads ``tool_start_time`` from agent state and logs the elapsed wall-clock
    time as the ``tool_execution_duration`` metric. When no start time is
    stored, the duration cannot be computed, so the hook reports completion
    without timing instead of failing on ``time.time() - None``.
    """
    start_time = ctx.agent.state.get("tool_start_time")
    if start_time is None:
        # NOTE(review): assumes state.get() yields None for a missing key
        print("Tool execution complete: duration unknown")
        return

    duration = time.time() - start_time
    print(f"Tool execution complete: {duration:.2f}s")

    # Report metrics
    monitor.log_metric("tool_execution_duration", duration)

Pre-Call Hook

Fires before each individual tool call:
@agent.hooks.tool.pre_call
def before_tool_call(ctx):
    """Log each tool call and reject unsafe ``database_query`` calls.

    Raises:
        ValueError: If a ``database_query`` call has no ``query`` keyword
            argument, the query is not a string, or it contains a
            destructive statement.
    """
    logger.info(f"Calling tool: {ctx.tool_name}")
    logger.debug(f"  Parameters: {ctx.tool_params}")

    # Only database queries are validated
    if ctx.tool_name != "database_query":
        return

    if "query" not in ctx.tool_kwargs:
        raise ValueError("Missing required 'query' parameter")

    query = ctx.tool_kwargs["query"]
    if not isinstance(query, str):
        # Fail with a clear validation error rather than AttributeError on .lower()
        raise ValueError("'query' parameter must be a string")

    # Collapse whitespace runs so "DROP   TABLE" or "delete\nfrom" cannot slip
    # past the substring check. This is a coarse blocklist, not a SQL parser.
    normalized = " ".join(query.lower().split())
    if "drop table" in normalized or "delete from" in normalized:
        raise ValueError("Destructive queries not allowed")

Post-Call Hook

Fires after each individual tool call:
@agent.hooks.tool.post_call
def after_tool_call(ctx):
    """Print, cache, and count the result of a completed tool call."""
    tool, result = ctx.tool_name, ctx.tool_result
    print(f"{tool} returned: {result}")

    # Cache the result under "<tool>:<hash of stringified params>"
    # NOTE(review): built-in hash() of a str is salted per process, so these
    # keys are only stable within one interpreter run
    params_digest = hash(str(ctx.tool_params))
    cache.set(f"{tool}:{params_digest}", result, ttl=300)

    # Bump the per-tool call counter kept in agent state
    counter_key = f"tool_{tool}_calls"
    previous_count = ctx.agent.state.get(counter_key, 0)
    ctx.agent.state.set(counter_key, previous_count + 1, source="hooks")

Error Hook

Fires when tool execution fails:
@agent.hooks.tool.on_error
def handle_tool_error(ctx):
    """Report a failed tool call: print it, track it, alert on critical tools."""
    tool, exc = ctx.tool_name, ctx.error
    print(f"Tool {tool} failed: {exc}")

    # Forward the exception plus call details to error tracking
    details = {
        "tool": tool,
        "params": ctx.tool_params,
        "execution_id": ctx.execution_id,
    }
    error_tracker.log_exception(exc, context=details)

    # Critical tools additionally trigger an alert
    critical_tools = ("database_query", "api_call")
    if tool in critical_tools:
        alerts.send(f"Critical tool {tool} failed: {exc}")

    # Retry logic could be added here; timeouts are only flagged for now
    timed_out = isinstance(exc, TimeoutError)
    if timed_out:
        print("  (Timeout - may retry)")

Intercept Hook

Dual-phase hook for validation and output modification:
@agent.hooks.tool.intercept
def intercept_tool_call(ctx):
    """Validate inputs before a call and transform outputs after it.

    The phase is chosen by whether ``ctx`` has a ``tool_result`` attribute.

    Raises:
        PermissionError: In the validation phase, when ``delete_file`` is
            called while the ``delete_allowed`` state value is falsy.
    """
    tool = ctx.tool_name

    if hasattr(ctx, 'tool_result'):
        # Phase 2: output modification (after call)
        print(f"[TRANSFORM] {tool}")

        # Redact sensitive fields from dict results of get_user_data
        needs_redaction = tool == "get_user_data" and isinstance(ctx.tool_result, dict)
        if needs_redaction:
            ctx.tool_result = redact_pii(ctx.tool_result)
        return

    # Phase 1: input validation (before call)
    print(f"[VALIDATE] {tool}")

    # Reject calls to sensitive tools unless explicitly allowed
    deletion_blocked = tool == "delete_file" and not ctx.agent.state.get("delete_allowed")
    if deletion_blocked:
        raise PermissionError("File deletion not permitted")

Usage Patterns

Pattern 1: Tool Usage Tracking

Track which tools are used and how often:
class ToolTracker:
    """Counts how many times each tool name has been recorded."""

    def __init__(self):
        # Maps tool name -> number of recorded calls
        self.usage = {}

    def track(self, ctx):
        """Increment the counter for ``ctx.tool_name``."""
        name = ctx.tool_name
        try:
            self.usage[name] += 1
        except KeyError:
            # First time this tool is seen
            self.usage[name] = 1

    def report(self):
        """Print one line per tool, ordered by tool name."""
        print("Tool Usage:")
        for name in sorted(self.usage):
            print(f"  {name}: {self.usage[name]} calls")

# Register the tracker's bound method as a post_call hook
tracker = ToolTracker()
agent.on("tool:post_call", tracker.track)

agent.call("Use calculator and file reader multiple times")
tracker.report()
# Example output:
# Tool Usage:
#   calculator: 3 calls
#   file_reader: 2 calls

Pattern 2: Rate Limiting

Prevent too many tool calls in a time window:
import time
from collections import deque
from datetime import datetime, timedelta

class ToolRateLimiter:
    """Sliding-window limit on the number of tool calls.

    Accepted calls are timestamped with ``time.monotonic()`` rather than the
    naive wall clock, so DST shifts and system clock adjustments cannot
    stretch or shrink the window.

    Args:
        max_calls: Maximum number of calls accepted within one window.
        window: Length of the sliding window.
    """

    def __init__(self, max_calls: int = 10, window: timedelta = timedelta(minutes=1)):
        self.max_calls = max_calls
        self.window = window
        # Monotonic timestamps (seconds) of accepted calls, oldest first
        self.calls = deque()

    def check(self, ctx):
        """Record one call, or raise if the window is already full.

        Args:
            ctx: Hook context; not inspected by the limiter.

        Raises:
            RuntimeError: If ``max_calls`` calls were already accepted within
                the current window.
        """
        now = time.monotonic()
        cutoff = now - self.window.total_seconds()

        # Drop timestamps that have fallen out of the window
        while self.calls and self.calls[0] < cutoff:
            self.calls.popleft()

        # Rejected calls are not recorded, so they do not extend the block
        if len(self.calls) >= self.max_calls:
            raise RuntimeError(
                f"Rate limit exceeded: {self.max_calls} calls per {self.window}"
            )

        self.calls.append(now)

# Allow at most 5 tool calls per 10-second window; checked before each tool call
limiter = ToolRateLimiter(max_calls=5, window=timedelta(seconds=10))
agent.on("tool:pre_call", limiter.check)

Pattern 3: Parameter Injection

Automatically add context to tool calls:
@agent.hooks.tool.pre_call
def inject_user_context(ctx):
    """Fill in missing ``user_id`` / ``api_key`` keyword arguments for specific tools."""
    tool, kwargs = ctx.tool_name, ctx.tool_kwargs

    # Database queries get the current user's id unless one was supplied
    if tool == "database_query" and "user_id" not in kwargs:
        kwargs["user_id"] = ctx.agent.state.get("current_user_id")

    # External API calls get the key from the API_KEY environment variable
    if tool == "api_call" and "api_key" not in kwargs:
        kwargs["api_key"] = os.getenv("API_KEY")

Pattern 4: Result Caching

Cache tool results to avoid redundant calls:
class ToolCache:
    """In-memory store of tool results keyed by tool name and parameters."""

    def __init__(self, ttl: int = 300):
        self.ttl = ttl
        # Maps cache key -> (result, time.time() when stored)
        self.cache = {}

    def cache_key(self, ctx):
        """Build the key "<tool_name>:<hash of str(tool_params)>"."""
        params_hash = hash(str(ctx.tool_params))
        return f"{ctx.tool_name}:{params_hash}"

    def pre_call(self, ctx):
        """Announce a cache hit when a fresh entry exists for this call."""
        entry = self.cache.get(self.cache_key(ctx))
        if entry is None:
            return

        _cached_result, stored_at = entry
        age = time.time() - stored_at
        if age < self.ttl:
            print(f"[CACHE HIT] {ctx.tool_name}")
            # Skipping the tool call here would need custom logic
            # (and deeper integration)

    def post_call(self, ctx):
        """Store the call's result together with the current time."""
        entry = (ctx.tool_result, time.time())
        self.cache[self.cache_key(ctx)] = entry
        print(f"[CACHE STORE] {ctx.tool_name}")

# One cache instance serves both hooks: lookup before each call, store after it
cache = ToolCache(ttl=300)
agent.on("tool:pre_call", cache.pre_call)
agent.on("tool:post_call", cache.post_call)

Pattern 5: Audit Logging

Complete audit trail of tool usage:
import json
from datetime import datetime

class ToolAuditor:
    """Appends one JSON object per line to a log file for tool calls and errors.

    Args:
        log_file: Path of the file that entries are appended to.
    """

    def __init__(self, log_file: str):
        self.log_file = log_file

    def _append(self, entry):
        """Serialize *entry* as a single JSON line and append it to the log file."""
        # default=str is deliberate: tool parameters may hold arbitrary objects,
        # and the audit hook should record their string form instead of raising
        # TypeError in the middle of a tool call.
        line = json.dumps(entry, default=str)
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    def log_call(self, ctx):
        """Log tool call details, with the result truncated to 200 characters."""
        self._append({
            "timestamp": datetime.now().isoformat(),
            "agent_id": ctx.agent_id,
            "execution_id": ctx.execution_id,
            "tool": ctx.tool_name,
            "parameters": ctx.tool_params,
            "result": str(ctx.tool_result)[:200],  # Truncate
        })

    def log_error(self, ctx):
        """Log the message and type name of a tool error."""
        self._append({
            "timestamp": datetime.now().isoformat(),
            "agent_id": ctx.agent_id,
            "tool": ctx.tool_name,
            "error": str(ctx.error),
            "error_type": type(ctx.error).__name__,
        })

# Successful calls and errors are appended to the same log file
auditor = ToolAuditor("tool_audit.log")
agent.on("tool:post_call", auditor.log_call)
agent.on("tool:on_error", auditor.log_error)

Pattern 6: Sensitive Data Redaction

Automatically redact sensitive information from tool results:
import re

@agent.hooks.tool.post_call
def redact_sensitive_data(ctx):
    """Redact PII from string tool results.

    Replaces email addresses, 10-digit phone numbers and 16-digit card
    numbers in ``ctx.tool_result`` with placeholder tokens. Results that are
    not strings are left untouched.
    """
    if not isinstance(ctx.tool_result, str):
        return

    # Applied in this order: email, phone, card
    redactions = (
        # The TLD class is [A-Za-z]: inside [...] a "|" is a literal pipe, not
        # alternation, so the former [A-Z|a-z] also accepted "|" characters.
        (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL_REDACTED]'),
        (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_REDACTED]'),
        (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD_REDACTED]'),
    )

    text = ctx.tool_result
    for pattern, placeholder in redactions:
        text = re.sub(pattern, placeholder, text)
    ctx.tool_result = text

Hook Execution Order

Multiple hooks execute in registration order:
@agent.hooks.tool.pre_call
def first_hook(ctx):
    """Print "1"; this is the first pre_call hook registered in this example."""
    print("1")

@agent.hooks.tool.pre_call
def second_hook(ctx):
    """Print "2"; this is the second pre_call hook registered in this example."""
    print("2")

# Trigger a tool call; the pre_call hooks print before the tool runs
agent.call("Use calculator")
# Output:
# 1
# 2
# [tool executes]
Lifecycle sequence:
  1. pre_exec hook fires (once per execution)
  2. For each tool call:
    • pre_call hooks fire
    • Tool executes
    • post_call hooks fire (or on_error if failed)
  3. post_exec hook fires (once per execution)

Async Hook Support

All tool hooks support both sync and async functions:
@agent.hooks.tool.pre_call
async def async_before_tool(ctx):
    """Reject the call unless the user's permissions include this tool.

    Raises:
        PermissionError: If ``ctx.tool_name`` is not among the permissions
            returned for the ``user_id`` stored in agent state.
    """
    # Async database lookup
    user_id = ctx.agent.state.get("user_id")
    allowed_tools = await db.get_user_permissions(user_id)

    # Validate permissions
    tool = ctx.tool_name
    if tool in allowed_tools:
        return
    raise PermissionError(f"User cannot access {tool}")

@agent.hooks.tool.post_call
async def async_after_tool(ctx):
    """Send the tool name, duration and result size to analytics."""
    usage = {
        "tool": ctx.tool_name,
        "duration": ctx.metadata.get("duration"),
        # Size is measured on the string form of the result
        "result_size": len(str(ctx.tool_result)),
    }
    # Async logging
    await analytics.log_tool_usage(**usage)

Error Propagation

Errors in hooks propagate to error hooks:
@agent.hooks.tool.pre_call
def validate_tool(ctx):
    """Raise ``ValueError`` when the ``dangerous_operation`` tool is called."""
    is_forbidden = ctx.tool_name == "dangerous_operation"
    if is_forbidden:
        raise ValueError("Operation not permitted")

@agent.hooks.tool.on_error
def handle_error(ctx):
    """Print where an error originated; also receives errors raised by other hooks."""
    origin = ctx.metadata.get('original_hook_type', 'tool')
    print(f"Error in {origin}: {ctx.error}")

    # What the error context carries:
    # - ctx.error: the exception
    # - ctx.metadata['original_hook_type']: where the error originated
    # - ctx.metadata['failed_hook']: which hook failed

Best Practices

# Good: Validate inputs before execution
@agent.hooks.tool.pre_call
def validate_inputs(ctx):
    """Require a truthy ``confirm`` keyword argument for ``delete_file`` calls."""
    if ctx.tool_name != "delete_file":
        return

    confirmed = ctx.tool_kwargs.get("confirm")
    if not confirmed:
        raise ValueError("Must confirm file deletion")

# Good: Process outputs after execution
@agent.hooks.tool.post_call
def process_outputs(ctx):
    """Store the latest ``get_data`` result in agent state under ``last_data``."""
    is_data_call = ctx.tool_name == "get_data"
    if is_data_call:
        ctx.agent.state.set("last_data", ctx.tool_result, source="hooks")
# Good: Fast, single-purpose hook
@agent.hooks.tool.pre_call
def log_tool(ctx):
    """Log the name of the tool about to be called."""
    message = f"Tool: {ctx.tool_name}"
    logger.info(message)

# Bad: Slow, multi-purpose hook
@agent.hooks.tool.pre_call
def slow_hook(ctx):
    """Anti-example: sleeps, validates, logs and sends metrics in one hook."""
    time.sleep(1)  # Blocks every tool call!
    validate_tool(ctx)
    log_to_database(ctx)
    send_metrics(ctx)
    # Too many responsibilities
@agent.hooks.tool.on_error
def handle_tool_failure(ctx):
    """Provide a fallback when the ``api_call`` tool fails."""
    if ctx.tool_name != "api_call":
        return

    # Log the error but don't re-raise
    logger.error(f"API call failed: {ctx.error}")

    # Record the fallback flag in agent state
    ctx.agent.state.set("api_unavailable", True, source="hooks")

    # Falling through returns None = don't propagate error,
    # which lets the agent continue
@agent.hooks.tool.intercept
def validate_and_transform(ctx):
    """Validate SQL input before a call and anonymize user data after it."""
    tool = ctx.tool_name
    # No tool_result attribute yet means this is the pre-call phase
    has_result = hasattr(ctx, 'tool_result')

    # Phase 1: Input validation
    if not has_result and tool == "database_query":
        validate_sql(ctx.tool_kwargs["query"])

    # Phase 2: Output transformation
    if has_result and tool == "get_user_data":
        ctx.tool_result = anonymize_data(ctx.tool_result)

Performance Considerations

  • Hook overhead: Less than 5ms per hook on average
  • Async hooks: Run concurrently when possible
  • Error hooks: Fire even if other hooks fail
  • Context creation: Reuses agent reference (no copying)

Integration with Scaffolds

Tool hooks work seamlessly with scaffold operations:
from egregore.core.context_scaffolds.base import BaseContextScaffold, operation

# Example scaffold with a single operation that appends to a task list
class TaskScaffold(BaseContextScaffold):
    # Type tag; the example below observes the operation as "tasks_add_task"
    # (presumably "<scaffold_type>_<method name>" - confirm against the framework)
    scaffold_type = "tasks"

    @operation
    def add_task(self, task: str) -> str:
        """Add task - triggers tool hooks."""
        # Appends to the task list held on self.state and returns a confirmation
        self.state.tasks.append(task)
        return f"Added: {task}"

# Tool hooks fire for scaffold operations
@agent.hooks.tool.pre_call
def log_scaffold_ops(ctx):
    """Print the name of any tool whose name starts with ``tasks_``."""
    tool = ctx.tool_name
    is_scaffold_op = tool.startswith("tasks_")
    if is_scaffold_op:
        print(f"Scaffold operation: {tool}")

# Ask the agent to add a task; the pre_call hook above prints the operation name
agent.call("Add task: write documentation")
# Output: Scaffold operation: tasks_add_task

What’s Next?