Skip to main content

Message Hooks

Message hooks let you intercept, inspect, and modify messages as they flow through the agent - perfect for content filtering, PII redaction, validation, and audit logging.

Overview

Message hooks provide 3 execution points:
HookFiresUse Case
on_userUser message receivedInput validation, PII filtering, preprocessing
on_providerProvider response receivedOutput filtering, verification, postprocessing
on_errorMessage processing errorError handling, logging, retry logic

Hook Registration

Decorator Syntax

from egregore import Agent

agent = Agent(provider="openai:gpt-4")

@agent.hooks.message.on_user
def filter_user_input(ctx):
    """Filter user messages before sending to provider."""
    content = ctx.message_content

    # Remove sensitive patterns
    filtered = redact_pii(content)

    # Return (modified_content, was_modified)
    return filtered, filtered != content

@agent.hooks.message.on_provider
def verify_provider_response(ctx):
    """Verify provider responses before adding to context."""
    content = ctx.message_content

    # Add verification badge
    verified = f"✓ VERIFIED\n\n{content}"

    return verified, True

Subscribe API

# Dynamic registration
sub_id = agent.on("message:on_user", lambda ctx: (redact_pii(ctx.message_content), True))

# Temporary hooks
with agent.subscription("message:on_provider", add_verification):
    agent.call("Process with verification")

Message Hook Context

Every message hook receives a MessageExecContext with:
@dataclass
class MessageExecContext:
    # Identity
    agent_id: str                  # Agent instance ID
    execution_id: str              # Unique execution ID
    agent: Agent                   # Full agent reference

    # Message data
    message_content: str           # Message content (editable)
    message_type: str              # "user", "provider", or "system"

    # Context access
    context: Context               # Full context tree

    # Error information (on_error only)
    error: Exception               # Exception that occurred

    # Metadata
    metadata: dict                 # Additional message data

Return Value Format

Message hooks that modify content must return a tuple:
@agent.hooks.message.on_user
def modify_message(ctx):
    modified_content = process(ctx.message_content)
    was_modified = modified_content != ctx.message_content

    # Return (content, was_modified)
    return modified_content, was_modified
Important:
  • Return (modified_content, True) if content was changed
  • Return (original_content, False) if no changes made
  • Returning only content without boolean will raise error
  • Returning None passes message through unchanged

Execution Flow

On User Message Hook

Fires when user message is received - before sending to provider:
@agent.hooks.message.on_user
def preprocess_user_input(ctx):
    """Preprocess user messages before provider sees them."""
    content = ctx.message_content

    # Validate input length
    if len(content) > 10000:
        raise ValueError("Message too long (max 10,000 characters)")

    # Normalize whitespace
    content = " ".join(content.split())

    # Remove markdown artifacts
    content = content.replace("```", "")

    # Expand abbreviations
    content = content.replace("w/", "with")
    content = content.replace("b/c", "because")

    return content, True

On Provider Message Hook

Fires when provider response is received - before adding to context:
@agent.hooks.message.on_provider
def postprocess_provider_response(ctx):
    """Postprocess provider responses before context insertion."""
    content = ctx.message_content

    # Add timestamp
    from datetime import datetime
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    content = f"[{timestamp}]\n\n{content}"

    # Verify no code execution attempts
    if "exec(" in content or "eval(" in content:
        content = content.replace("exec(", "# BLOCKED: exec(")
        content = content.replace("eval(", "# BLOCKED: eval(")

    # Add verification badge
    content = f"✓ Verified Safe\n\n{content}"

    return content, True

On Error Hook

Fires when message processing fails:
@agent.hooks.message.on_error
def handle_message_error(ctx):
    """Handle errors during message processing."""
    error = ctx.error
    message_type = ctx.message_type

    # Log error
    logger.error(f"Message error ({message_type}): {error}")

    # Store error for debugging
    ctx.agent.state.set(
        "last_message_error",
        {
            "type": message_type,
            "error": str(error),
            "content_preview": ctx.message_content[:200],
        },
        source="message_hooks"
    )

    # Track error count
    error_count = ctx.agent.state.get("message_error_count", 0)
    ctx.agent.state.set("message_error_count", error_count + 1, source="hooks")

    # Don't modify content on error
    return ctx.message_content, False

Usage Patterns

Pattern 1: PII Redaction

Remove sensitive information from user messages:
import re

@agent.hooks.message.on_user
def redact_pii(ctx):
    """Redact personally identifiable information."""
    content = ctx.message_content
    original = content

    # Redact email addresses
    content = re.sub(
        r'\b[\w.+-]+@[\w-]+\.[\w.-]+\b',
        '[EMAIL_REDACTED]',
        content
    )

    # Redact phone numbers (US format)
    content = re.sub(
        r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        '[PHONE_REDACTED]',
        content
    )

    # Redact SSN
    content = re.sub(
        r'\b\d{3}-\d{2}-\d{4}\b',
        '[SSN_REDACTED]',
        content
    )

    # Redact credit card numbers
    content = re.sub(
        r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
        '[CC_REDACTED]',
        content
    )

    was_modified = content != original

    if was_modified:
        print("⚠️  PII detected and redacted from user message")

    return content, was_modified

Pattern 2: Content Validation

Enforce content policies before processing:
class ContentValidator:
    def __init__(self):
        self.blocked_patterns = [
            r'(?i)execute\s+code',
            r'(?i)run\s+command',
            r'(?i)delete\s+all',
        ]

    def validate(self, ctx):
        """Validate user message content."""
        content = ctx.message_content

        # Check for blocked patterns
        for pattern in self.blocked_patterns:
            if re.search(pattern, content):
                raise ValueError(
                    f"Message contains blocked pattern: {pattern}\n"
                    f"Please rephrase your request."
                )

        # Check message length
        if len(content) < 3:
            raise ValueError("Message too short (minimum 3 characters)")

        if len(content) > 50000:
            raise ValueError("Message too long (maximum 50,000 characters)")

        # No modifications
        return content, False

validator = ContentValidator()
agent.on("message:on_user", validator.validate)

Pattern 3: Response Formatting

Standardize provider response formatting:
@agent.hooks.message.on_provider
def format_response(ctx):
    """Apply consistent formatting to provider responses."""
    content = ctx.message_content

    # Add markdown section headers for long responses
    if len(content) > 500:
        # Split into paragraphs
        paragraphs = content.split('\n\n')

        # Add headers to longer paragraphs
        formatted_paragraphs = []
        for i, para in enumerate(paragraphs):
            if len(para) > 200 and not para.startswith('#'):
                formatted_paragraphs.append(f"## Section {i+1}\n\n{para}")
            else:
                formatted_paragraphs.append(para)

        content = '\n\n'.join(formatted_paragraphs)

    # Ensure code blocks have language tags
    content = re.sub(
        r'```\n',
        '```plaintext\n',
        content
    )

    return content, True

Pattern 4: Audit Logging

Log all message traffic for compliance:
import json
from datetime import datetime

class MessageAuditor:
    def __init__(self, log_file: str = "message_audit.log"):
        self.log_file = log_file

    def audit_user(self, ctx):
        """Audit user messages."""
        self._log_message("USER", ctx.message_content)
        return ctx.message_content, False

    def audit_provider(self, ctx):
        """Audit provider responses."""
        self._log_message("PROVIDER", ctx.message_content)
        return ctx.message_content, False

    def _log_message(self, msg_type: str, content: str):
        """Write audit entry."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "type": msg_type,
            "content": content,
            "content_length": len(content),
        }

        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

auditor = MessageAuditor()
agent.on("message:on_user", auditor.audit_user)
agent.on("message:on_provider", auditor.audit_provider)

Pattern 5: Translation

Translate messages between languages:
class MessageTranslator:
    def __init__(self, target_language: str = "en"):
        self.target_language = target_language

    def translate_user_input(self, ctx):
        """Translate user input to target language."""
        content = ctx.message_content

        # Detect language
        detected_lang = self._detect_language(content)

        if detected_lang != self.target_language:
            # Translate to target language
            translated = self._translate(content, detected_lang, self.target_language)
            print(f"Translated from {detected_lang} to {self.target_language}")
            return translated, True

        return content, False

    def translate_provider_response(self, ctx):
        """Translate provider response to user's language."""
        # Get user's original language from state
        user_lang = ctx.agent.state.get("user_language", "en")

        if user_lang != self.target_language:
            content = ctx.message_content
            translated = self._translate(content, self.target_language, user_lang)
            return translated, True

        return ctx.message_content, False

    def _detect_language(self, text: str) -> str:
        """Detect text language (simplified)."""
        # In practice, use langdetect or similar
        return "en"

    def _translate(self, text: str, from_lang: str, to_lang: str) -> str:
        """Translate text (simplified)."""
        # In practice, use translation API
        return text

translator = MessageTranslator(target_language="en")
agent.on("message:on_user", translator.translate_user_input)
agent.on("message:on_provider", translator.translate_provider_response)

Pattern 6: Rate Limiting

Implement message rate limiting:
from collections import deque
from datetime import datetime, timedelta

class MessageRateLimiter:
    def __init__(self, max_messages: int = 10, window: timedelta = timedelta(minutes=1)):
        self.max_messages = max_messages
        self.window = window
        self.message_times = deque()

    def check_rate_limit(self, ctx):
        """Enforce rate limiting on user messages."""
        now = datetime.now()

        # Remove messages outside window
        while self.message_times and self.message_times[0] < now - self.window:
            self.message_times.popleft()

        # Check limit
        if len(self.message_times) >= self.max_messages:
            window_seconds = int(self.window.total_seconds())
            raise RuntimeError(
                f"Rate limit exceeded: {self.max_messages} messages per {window_seconds}s. "
                f"Please wait before sending more messages."
            )

        # Record this message
        self.message_times.append(now)

        # No content modification
        return ctx.message_content, False

limiter = MessageRateLimiter(max_messages=10, window=timedelta(minutes=1))
agent.on("message:on_user", limiter.check_rate_limit)

Message Types

Message hooks handle different message types:

User Messages

@agent.hooks.message.on_user
def process_user_message(ctx):
    # ctx.message_type == "user"
    # ctx.message_content contains user input
    print(f"User: {ctx.message_content}")
    return ctx.message_content, False

Provider Messages

@agent.hooks.message.on_provider
def process_provider_message(ctx):
    # ctx.message_type == "provider"
    # ctx.message_content contains provider response
    print(f"Provider: {ctx.message_content}")
    return ctx.message_content, False

System Messages

@agent.hooks.message.on_user
def process_system_message(ctx):
    # ctx.message_type == "system"
    # ctx.message_content contains system instructions
    if ctx.message_type == "system":
        print(f"System: {ctx.message_content}")
    return ctx.message_content, False

Integration with ProviderThread

Message hooks fire before messages enter the ProviderThread:
# Message flow:
# 1. User calls agent.call("message")
# 2. on_user hook fires → can modify user message
# 3. Message sent to provider
# 4. Provider responds
# 5. on_provider hook fires → can modify provider response
# 6. Response added to ProviderThread (d0,0,0)
# 7. Response added to context tree
Example with ProviderThread access:
@agent.hooks.message.on_provider
def track_provider_messages(ctx):
    """Track provider message count."""
    # Access thread through context
    thread = ctx.agent.thread

    # Count provider messages
    provider_count = sum(
        1 for msg in thread.current.all_messages
        if msg.role == "assistant"
    )

    # Store count in state
    ctx.agent.state.set("provider_message_count", provider_count, source="hooks")

    # No modification
    return ctx.message_content, False

Best Practices

# Good: Return (content, was_modified)
@agent.hooks.message.on_user
def modify_message(ctx):
    modified = process(ctx.message_content)
    return modified, True

# Bad: Return only content
@agent.hooks.message.on_user
def modify_message(ctx):
    return process(ctx.message_content)  # Missing boolean!
# Good: Validate input before processing
@agent.hooks.message.on_user
def validate_input(ctx):
    if len(ctx.message_content) > 10000:
        raise ValueError("Message too long")
    return ctx.message_content, False

# Good: Verify output before delivery
@agent.hooks.message.on_provider
def verify_output(ctx):
    if contains_sensitive_data(ctx.message_content):
        return redact(ctx.message_content), True
    return ctx.message_content, False
# Good: Track when modifications occur
@agent.hooks.message.on_user
def filter_with_logging(ctx):
    original = ctx.message_content
    filtered = apply_filters(original)

    was_modified = filtered != original

    if was_modified:
        logger.info(f"Modified user message: {len(original)}{len(filtered)} chars")

    return filtered, was_modified
@agent.hooks.message.on_user
def detect_language(ctx):
    """Detect and store user's language."""
    lang = detect_language(ctx.message_content)
    ctx.agent.state.set("user_language", lang, source="message_hooks")
    return ctx.message_content, False

@agent.hooks.message.on_provider
def translate_response(ctx):
    """Translate response to user's language."""
    user_lang = ctx.agent.state.get("user_language", "en")
    if user_lang != "en":
        translated = translate(ctx.message_content, to_lang=user_lang)
        return translated, True
    return ctx.message_content, False

Performance Considerations

  • Hook overhead: Less than 2ms per message hook
  • Modification impact: Only modified messages incur serialization cost
  • Async support: Message hooks can be async functions
  • Error handling: Errors in hooks propagate and cancel message processing

Error Handling

Errors in message hooks stop message processing:
@agent.hooks.message.on_user
def strict_validation(ctx):
    """Strict validation that can halt processing."""
    try:
        validate_message(ctx.message_content)
    except ValidationError as e:
        # Log and re-raise to stop processing
        logger.error(f"Message validation failed: {e}")
        raise  # Stops message from being processed

    return ctx.message_content, False

@agent.hooks.message.on_error
def handle_validation_errors(ctx):
    """Handle errors gracefully."""
    error = ctx.error

    if isinstance(error, ValidationError):
        # Log validation error
        logger.warning(f"Validation error: {error}")

        # Store for user feedback
        ctx.agent.state.set("last_validation_error", str(error), source="hooks")

    # Don't modify content
    return ctx.message_content, False

What’s Next?