> ## Documentation Index
> Fetch the complete documentation index at: https://docs.egregorelabs.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Message Hooks

> Intercept and modify messages before and after provider processing

# Message Hooks

**Message hooks** let you intercept, inspect, and modify messages as they flow through the agent - perfect for content filtering, PII redaction, validation, and audit logging.

## Overview

Message hooks provide 3 execution points:

| Hook          | Fires                      | Use Case                                       |
| ------------- | -------------------------- | ---------------------------------------------- |
| `on_user`     | User message received      | Input validation, PII filtering, preprocessing |
| `on_provider` | Provider response received | Output filtering, verification, postprocessing |
| `on_error`    | Message processing error   | Error handling, logging, retry logic           |

## Hook Registration

### Decorator Syntax

```python theme={null}
from egregore import Agent

agent = Agent(provider="openai:gpt-4")

@agent.hooks.message.on_user
def filter_user_input(ctx):
    """Filter user messages before sending to provider."""
    content = ctx.message_content

    # Remove sensitive patterns
    filtered = redact_pii(content)

    # Return (modified_content, was_modified)
    return filtered, filtered != content

@agent.hooks.message.on_provider
def verify_provider_response(ctx):
    """Verify provider responses before adding to context."""
    content = ctx.message_content

    # Add verification badge
    verified = f"✓ VERIFIED\n\n{content}"

    return verified, True
```

### Subscribe API

```python theme={null}
# Dynamic registration
sub_id = agent.on("message:on_user", lambda ctx: (redact_pii(ctx.message_content), True))

# Temporary hooks
with agent.subscription("message:on_provider", add_verification):
    agent.call("Process with verification")
```

## Message Hook Context

Every message hook receives a `MessageExecContext` with:

```python theme={null}
@dataclass
class MessageExecContext:
    # Identity
    agent_id: str                  # Agent instance ID
    execution_id: str              # Unique execution ID
    agent: Agent                   # Full agent reference

    # Message data
    message_content: str           # Message content (editable)
    message_type: str              # "user", "provider", or "system"

    # Context access
    context: Context               # Full context tree

    # Error information (on_error only)
    error: Exception               # Exception that occurred

    # Metadata
    metadata: dict                 # Additional message data
```

### Return Value Format

Message hooks that modify content must return a tuple:

```python theme={null}
@agent.hooks.message.on_user
def modify_message(ctx):
    modified_content = process(ctx.message_content)
    was_modified = modified_content != ctx.message_content

    # Return (content, was_modified)
    return modified_content, was_modified
```

**Important:**

* Return `(modified_content, True)` if content was changed
* Return `(original_content, False)` if no changes made
* Returning only content without boolean will raise error
* Returning `None` passes message through unchanged

## Execution Flow

### On User Message Hook

Fires when **user message is received** - before sending to provider:

````python theme={null}
@agent.hooks.message.on_user
def preprocess_user_input(ctx):
    """Preprocess user messages before provider sees them."""
    content = ctx.message_content

    # Validate input length
    if len(content) > 10000:
        raise ValueError("Message too long (max 10,000 characters)")

    # Normalize whitespace
    content = " ".join(content.split())

    # Remove markdown artifacts
    content = content.replace("```", "")

    # Expand abbreviations
    content = content.replace("w/", "with")
    content = content.replace("b/c", "because")

    return content, True
````

### On Provider Message Hook

Fires when **provider response is received** - before adding to context:

```python theme={null}
@agent.hooks.message.on_provider
def postprocess_provider_response(ctx):
    """Postprocess provider responses before context insertion."""
    content = ctx.message_content

    # Add timestamp
    from datetime import datetime
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    content = f"[{timestamp}]\n\n{content}"

    # Verify no code execution attempts
    if "exec(" in content or "eval(" in content:
        content = content.replace("exec(", "# BLOCKED: exec(")
        content = content.replace("eval(", "# BLOCKED: eval(")

    # Add verification badge
    content = f"✓ Verified Safe\n\n{content}"

    return content, True
```

### On Error Hook

Fires when **message processing fails**:

```python theme={null}
@agent.hooks.message.on_error
def handle_message_error(ctx):
    """Handle errors during message processing."""
    error = ctx.error
    message_type = ctx.message_type

    # Log error
    logger.error(f"Message error ({message_type}): {error}")

    # Store error for debugging
    ctx.agent.state.set(
        "last_message_error",
        {
            "type": message_type,
            "error": str(error),
            "content_preview": ctx.message_content[:200],
        },
        source="message_hooks"
    )

    # Track error count
    error_count = ctx.agent.state.get("message_error_count", 0)
    ctx.agent.state.set("message_error_count", error_count + 1, source="hooks")

    # Don't modify content on error
    return ctx.message_content, False
```

## Usage Patterns

### Pattern 1: PII Redaction

Remove sensitive information from user messages:

```python theme={null}
import re

@agent.hooks.message.on_user
def redact_pii(ctx):
    """Redact personally identifiable information."""
    content = ctx.message_content
    original = content

    # Redact email addresses
    content = re.sub(
        r'\b[\w.+-]+@[\w-]+\.[\w.-]+\b',
        '[EMAIL_REDACTED]',
        content
    )

    # Redact phone numbers (US format)
    content = re.sub(
        r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        '[PHONE_REDACTED]',
        content
    )

    # Redact SSN
    content = re.sub(
        r'\b\d{3}-\d{2}-\d{4}\b',
        '[SSN_REDACTED]',
        content
    )

    # Redact credit card numbers
    content = re.sub(
        r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
        '[CC_REDACTED]',
        content
    )

    was_modified = content != original

    if was_modified:
        print("⚠️  PII detected and redacted from user message")

    return content, was_modified
```

### Pattern 2: Content Validation

Enforce content policies before processing:

```python theme={null}
class ContentValidator:
    def __init__(self):
        self.blocked_patterns = [
            r'(?i)execute\s+code',
            r'(?i)run\s+command',
            r'(?i)delete\s+all',
        ]

    def validate(self, ctx):
        """Validate user message content."""
        content = ctx.message_content

        # Check for blocked patterns
        for pattern in self.blocked_patterns:
            if re.search(pattern, content):
                raise ValueError(
                    f"Message contains blocked pattern: {pattern}\n"
                    f"Please rephrase your request."
                )

        # Check message length
        if len(content) < 3:
            raise ValueError("Message too short (minimum 3 characters)")

        if len(content) > 50000:
            raise ValueError("Message too long (maximum 50,000 characters)")

        # No modifications
        return content, False

validator = ContentValidator()
agent.on("message:on_user", validator.validate)
```

### Pattern 3: Response Formatting

Standardize provider response formatting:

````python theme={null}
@agent.hooks.message.on_provider
def format_response(ctx):
    """Apply consistent formatting to provider responses."""
    content = ctx.message_content

    # Add markdown section headers for long responses
    if len(content) > 500:
        # Split into paragraphs
        paragraphs = content.split('\n\n')

        # Add headers to longer paragraphs
        formatted_paragraphs = []
        for i, para in enumerate(paragraphs):
            if len(para) > 200 and not para.startswith('#'):
                formatted_paragraphs.append(f"## Section {i+1}\n\n{para}")
            else:
                formatted_paragraphs.append(para)

        content = '\n\n'.join(formatted_paragraphs)

    # Ensure code blocks have language tags
    content = re.sub(
        r'```\n',
        '```plaintext\n',
        content
    )

    return content, True
````

### Pattern 4: Audit Logging

Log all message traffic for compliance:

```python theme={null}
import json
from datetime import datetime

class MessageAuditor:
    def __init__(self, log_file: str = "message_audit.log"):
        self.log_file = log_file

    def audit_user(self, ctx):
        """Audit user messages."""
        self._log_message("USER", ctx.message_content)
        return ctx.message_content, False

    def audit_provider(self, ctx):
        """Audit provider responses."""
        self._log_message("PROVIDER", ctx.message_content)
        return ctx.message_content, False

    def _log_message(self, msg_type: str, content: str):
        """Write audit entry."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "type": msg_type,
            "content": content,
            "content_length": len(content),
        }

        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

auditor = MessageAuditor()
agent.on("message:on_user", auditor.audit_user)
agent.on("message:on_provider", auditor.audit_provider)
```

### Pattern 5: Translation

Translate messages between languages:

```python theme={null}
class MessageTranslator:
    def __init__(self, target_language: str = "en"):
        self.target_language = target_language

    def translate_user_input(self, ctx):
        """Translate user input to target language."""
        content = ctx.message_content

        # Detect language
        detected_lang = self._detect_language(content)

        if detected_lang != self.target_language:
            # Translate to target language
            translated = self._translate(content, detected_lang, self.target_language)
            print(f"Translated from {detected_lang} to {self.target_language}")
            return translated, True

        return content, False

    def translate_provider_response(self, ctx):
        """Translate provider response to user's language."""
        # Get user's original language from state
        user_lang = ctx.agent.state.get("user_language", "en")

        if user_lang != self.target_language:
            content = ctx.message_content
            translated = self._translate(content, self.target_language, user_lang)
            return translated, True

        return ctx.message_content, False

    def _detect_language(self, text: str) -> str:
        """Detect text language (simplified)."""
        # In practice, use langdetect or similar
        return "en"

    def _translate(self, text: str, from_lang: str, to_lang: str) -> str:
        """Translate text (simplified)."""
        # In practice, use translation API
        return text

translator = MessageTranslator(target_language="en")
agent.on("message:on_user", translator.translate_user_input)
agent.on("message:on_provider", translator.translate_provider_response)
```

### Pattern 6: Rate Limiting

Implement message rate limiting:

```python theme={null}
from collections import deque
from datetime import datetime, timedelta

class MessageRateLimiter:
    def __init__(self, max_messages: int = 10, window: timedelta = timedelta(minutes=1)):
        self.max_messages = max_messages
        self.window = window
        self.message_times = deque()

    def check_rate_limit(self, ctx):
        """Enforce rate limiting on user messages."""
        now = datetime.now()

        # Remove messages outside window
        while self.message_times and self.message_times[0] < now - self.window:
            self.message_times.popleft()

        # Check limit
        if len(self.message_times) >= self.max_messages:
            window_seconds = int(self.window.total_seconds())
            raise RuntimeError(
                f"Rate limit exceeded: {self.max_messages} messages per {window_seconds}s. "
                f"Please wait before sending more messages."
            )

        # Record this message
        self.message_times.append(now)

        # No content modification
        return ctx.message_content, False

limiter = MessageRateLimiter(max_messages=10, window=timedelta(minutes=1))
agent.on("message:on_user", limiter.check_rate_limit)
```

## Message Types

Message hooks handle different message types:

### User Messages

```python theme={null}
@agent.hooks.message.on_user
def process_user_message(ctx):
    # ctx.message_type == "user"
    # ctx.message_content contains user input
    print(f"User: {ctx.message_content}")
    return ctx.message_content, False
```

### Provider Messages

```python theme={null}
@agent.hooks.message.on_provider
def process_provider_message(ctx):
    # ctx.message_type == "provider"
    # ctx.message_content contains provider response
    print(f"Provider: {ctx.message_content}")
    return ctx.message_content, False
```

### System Messages

```python theme={null}
@agent.hooks.message.on_user
def process_system_message(ctx):
    # ctx.message_type == "system"
    # ctx.message_content contains system instructions
    if ctx.message_type == "system":
        print(f"System: {ctx.message_content}")
    return ctx.message_content, False
```

## Integration with ProviderThread

Message hooks fire before messages enter the ProviderThread:

```python theme={null}
# Message flow:
# 1. User calls agent.call("message")
# 2. on_user hook fires → can modify user message
# 3. Message sent to provider
# 4. Provider responds
# 5. on_provider hook fires → can modify provider response
# 6. Response added to ProviderThread (d0,0,0)
# 7. Response added to context tree
```

**Example with ProviderThread access:**

```python theme={null}
@agent.hooks.message.on_provider
def track_provider_messages(ctx):
    """Track provider message count."""
    # Access thread through context
    thread = ctx.agent.thread

    # Count provider messages
    provider_count = sum(
        1 for msg in thread.current.all_messages
        if msg.role == "assistant"
    )

    # Store count in state
    ctx.agent.state.set("provider_message_count", provider_count, source="hooks")

    # No modification
    return ctx.message_content, False
```

## Best Practices

<AccordionGroup>
  <Accordion title="Always return tuple from modification hooks">
    ```python theme={null}
    # Good: Return (content, was_modified)
    @agent.hooks.message.on_user
    def modify_message(ctx):
        modified = process(ctx.message_content)
        return modified, True

    # Bad: Return only content
    @agent.hooks.message.on_user
    def modify_message(ctx):
        return process(ctx.message_content)  # Missing boolean!
    ```
  </Accordion>

  <Accordion title="Use on_user for input validation, on_provider for output verification">
    ```python theme={null}
    # Good: Validate input before processing
    @agent.hooks.message.on_user
    def validate_input(ctx):
        if len(ctx.message_content) > 10000:
            raise ValueError("Message too long")
        return ctx.message_content, False

    # Good: Verify output before delivery
    @agent.hooks.message.on_provider
    def verify_output(ctx):
        if contains_sensitive_data(ctx.message_content):
            return redact(ctx.message_content), True
        return ctx.message_content, False
    ```
  </Accordion>

  <Accordion title="Log modifications for debugging">
    ```python theme={null}
    # Good: Track when modifications occur
    @agent.hooks.message.on_user
    def filter_with_logging(ctx):
        original = ctx.message_content
        filtered = apply_filters(original)

        was_modified = filtered != original

        if was_modified:
            logger.info(f"Modified user message: {len(original)} → {len(filtered)} chars")

        return filtered, was_modified
    ```
  </Accordion>

  <Accordion title="Use agent.state for cross-hook communication">
    ```python theme={null}
    @agent.hooks.message.on_user
    def detect_language(ctx):
        """Detect and store user's language."""
        lang = detect_language(ctx.message_content)
        ctx.agent.state.set("user_language", lang, source="message_hooks")
        return ctx.message_content, False

    @agent.hooks.message.on_provider
    def translate_response(ctx):
        """Translate response to user's language."""
        user_lang = ctx.agent.state.get("user_language", "en")
        if user_lang != "en":
            translated = translate(ctx.message_content, to_lang=user_lang)
            return translated, True
        return ctx.message_content, False
    ```
  </Accordion>
</AccordionGroup>

## Performance Considerations

* **Hook overhead**: Less than 2ms per message hook
* **Modification impact**: Only modified messages incur serialization cost
* **Async support**: Message hooks can be async functions
* **Error handling**: Errors in hooks propagate and cancel message processing

## Error Handling

Errors in message hooks stop message processing:

```python theme={null}
@agent.hooks.message.on_user
def strict_validation(ctx):
    """Strict validation that can halt processing."""
    try:
        validate_message(ctx.message_content)
    except ValidationError as e:
        # Log and re-raise to stop processing
        logger.error(f"Message validation failed: {e}")
        raise  # Stops message from being processed

    return ctx.message_content, False

@agent.hooks.message.on_error
def handle_validation_errors(ctx):
    """Handle errors gracefully."""
    error = ctx.error

    if isinstance(error, ValidationError):
        # Log validation error
        logger.warning(f"Validation error: {error}")

        # Store for user feedback
        ctx.agent.state.set("last_validation_error", str(error), source="hooks")

    # Don't modify content
    return ctx.message_content, False
```

## What's Next?

<CardGroup cols={2}>
  <Card title="Scaffold Hooks" icon="layer-group" href="/features/hooks/scaffold-hooks">
    Scaffold state change hooks
  </Card>

  <Card title="Tool Hooks" icon="wrench" href="/features/hooks/tool-hooks">
    Tool execution lifecycle hooks
  </Card>

  <Card title="Subscribe API" icon="plug" href="/features/hooks/subscribe-api">
    Dynamic hook registration
  </Card>

  <Card title="Context Hooks" icon="database" href="/features/hooks/context-hooks">
    Context tree operation hooks
  </Card>
</CardGroup>
