Documentation Index
Fetch the complete documentation index at: https://docs.egregorelabs.io/llms.txt
Use this file to discover all available pages before exploring further.
Message Hooks
Message hooks let you intercept, inspect, and modify messages as they flow through the agent - perfect for content filtering, PII redaction, validation, and audit logging.
Overview
Message hooks provide 3 execution points:
| Hook | Fires | Use Case |
|------|-------|----------|
| on_user | User message received | Input validation, PII filtering, preprocessing |
| on_provider | Provider response received | Output filtering, verification, postprocessing |
| on_error | Message processing error | Error handling, logging, retry logic |
Hook Registration
Decorator Syntax
from egregore import Agent
agent = Agent( provider = "openai:gpt-4" )
@agent.hooks.message.on_user
def filter_user_input(ctx):
    """Redact sensitive patterns from a user message before it is sent on.

    Returns:
        A ``(content, was_modified)`` tuple; the flag is True only when the
        redacted text differs from the incoming text.
    """
    incoming = ctx.message_content
    redacted = redact_pii(incoming)
    changed = redacted != incoming
    return redacted, changed
@agent.hooks.message.on_provider
def verify_provider_response(ctx):
    """Prefix a provider response with a verification badge.

    Returns:
        A ``(content, was_modified)`` tuple. The flag is always True because
        the badge is prepended unconditionally.
    """
    body = ctx.message_content
    badge = "✓ VERIFIED"
    return f"{badge}\n\n{body}", True
Subscribe API
# Dynamic registration
sub_id = agent.on( "message:on_user" , lambda ctx : (redact_pii(ctx.message_content), True ))
# Temporary hooks
with agent.subscription( "message:on_provider" , add_verification):
agent.call( "Process with verification" )
Message Hook Context
Every message hook receives a MessageExecContext with:
@dataclass
class MessageExecContext:
    """Context object passed to every message hook."""

    # Identity
    agent_id: str  # Agent instance ID
    execution_id: str  # Unique execution ID
    agent: Agent  # Full agent reference

    # Message data
    message_content: str  # Message content (editable)
    message_type: str  # "user", "provider", or "system"

    # Context access
    context: Context  # Full context tree

    # Error information (on_error only)
    error: Exception  # Exception that occurred

    # Metadata
    metadata: dict  # Additional message data
Message hooks that modify content must return a tuple:
@agent.hooks.message.on_user
def modify_message(ctx):
    """Run the message through ``process`` and report whether it changed.

    Returns:
        A ``(content, was_modified)`` tuple.
    """
    result = process(ctx.message_content)
    # Compare against the message as it stands after processing ran.
    changed = result != ctx.message_content
    return result, changed
Important:
Return (modified_content, True) if the content was changed.
Return (original_content, False) if no changes were made.
Returning only the content, without the boolean, raises an error.
Returning None passes the message through unchanged.
Execution Flow
On User Message Hook
Fires when user message is received - before sending to provider:
@agent.hooks.message.on_user
def preprocess_user_input(ctx):
    """Preprocess user messages before provider sees them.

    Steps, in order: reject over-long input, collapse all whitespace runs to
    single spaces, strip triple-backtick fences, and expand the abbreviations
    ``w/`` and ``b/c``.

    Returns:
        A ``(content, was_modified)`` tuple. The flag reflects whether the
        preprocessing actually changed the text, so untouched messages are
        reported as unmodified.

    Raises:
        ValueError: If the incoming message is longer than 10,000 characters.
    """
    original = ctx.message_content

    # Validate input length (checked on the raw message, before normalizing)
    if len(original) > 10000:
        raise ValueError("Message too long (max 10,000 characters)")

    # Normalize whitespace
    content = " ".join(original.split())

    # Remove markdown artifacts
    content = content.replace("```", "")

    # Expand abbreviations
    content = content.replace("w/", "with")
    content = content.replace("b/c", "because")

    return content, content != original
On Provider Message Hook
Fires when provider response is received - before adding to context:
@agent.hooks.message.on_provider
def postprocess_provider_response(ctx):
    """Postprocess provider responses before context insertion.

    The response is prefixed with a ``[YYYY-MM-DD HH:MM:SS]`` timestamp taken
    from ``datetime.now()``, any ``exec(`` / ``eval(`` occurrences are
    neutralized, and a verification badge is placed in front.

    Returns:
        A ``(content, was_modified)`` tuple. The flag is always True because
        the timestamp and badge are added unconditionally.
    """
    from datetime import datetime

    content = ctx.message_content

    # Add timestamp (no stray space between the month and day fields)
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    content = f"[{timestamp}]\n\n{content}"

    # Verify no code execution attempts; str.replace is a no-op when the
    # substring is absent, so no membership pre-check is needed.
    content = content.replace("exec(", "# BLOCKED: exec(")
    content = content.replace("eval(", "# BLOCKED: eval(")

    # Add verification badge
    content = f"✓ Verified Safe\n\n{content}"

    return content, True
On Error Hook
Fires when message processing fails:
@agent.hooks.message.on_error
def handle_message_error(ctx):
    """Log a message-processing failure and record it in agent state.

    Writes a ``last_message_error`` snapshot (message type, error text and
    the first 200 characters of the content) and increments
    ``message_error_count``.

    Returns:
        ``(ctx.message_content, False)`` - the content is never altered here.
    """
    failure = ctx.error
    kind = ctx.message_type

    logger.error(f"Message error ({kind}): {failure}")

    # Snapshot kept for later debugging.
    snapshot = {
        "type": kind,
        "error": str(failure),
        "content_preview": ctx.message_content[:200],
    }
    ctx.agent.state.set("last_message_error", snapshot, source="message_hooks")

    # Running tally of failures.
    seen_so_far = ctx.agent.state.get("message_error_count", 0)
    ctx.agent.state.set("message_error_count", seen_so_far + 1, source="hooks")

    return ctx.message_content, False
Usage Patterns
Pattern 1: PII Redaction
Remove sensitive information from user messages:
import re
@agent.hooks.message.on_user
def redact_pii(ctx):
    """Replace common PII patterns in a user message with placeholders.

    E-mail addresses, US-format phone numbers, SSNs and 16-digit card numbers
    are substituted, in that order. A warning is printed when anything was
    redacted.

    Returns:
        A ``(content, was_modified)`` tuple.
    """
    source_text = ctx.message_content

    # (pattern, placeholder) pairs; applied sequentially in this order.
    rules = (
        (r'\b[\w.+-]+@[\w-]+\.[\w.-]+\b', '[EMAIL_REDACTED]'),
        (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_REDACTED]'),
        (r'\b\d{3}-\d{2}-\d{4}\b', '[SSN_REDACTED]'),
        (r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CC_REDACTED]'),
    )

    scrubbed = source_text
    for pattern, placeholder in rules:
        scrubbed = re.sub(pattern, placeholder, scrubbed)

    if scrubbed == source_text:
        return scrubbed, False

    print("⚠️ PII detected and redacted from user message")
    return scrubbed, True
Pattern 2: Content Validation
Enforce content policies before processing:
class ContentValidator:
    """Reject user messages that match blocked phrases or violate length limits."""

    def __init__(self):
        # Case-insensitive regular expressions; a match anywhere in the
        # message causes rejection.
        self.blocked_patterns = [
            r'(?i)execute\s+code',
            r'(?i)run\s+command',
            r'(?i)delete\s+all',
        ]

    def validate(self, ctx):
        """Validate user message content.

        Returns:
            ``(content, False)`` - validation never modifies the message.

        Raises:
            ValueError: If a blocked pattern matches, or the message is
                shorter than 3 or longer than 50,000 characters.
        """
        text = ctx.message_content

        # Blocked phrases are checked before the length limits.
        offending = next(
            (candidate for candidate in self.blocked_patterns
             if re.search(candidate, text)),
            None,
        )
        if offending is not None:
            raise ValueError(
                f"Message contains blocked pattern: {offending}\n"
                f"Please rephrase your request."
            )

        size = len(text)
        if size < 3:
            raise ValueError("Message too short (minimum 3 characters)")
        if size > 50000:
            raise ValueError("Message too long (maximum 50,000 characters)")

        return text, False
validator = ContentValidator()
agent.on( "message:on_user" , validator.validate)
Pattern 3: Response Formatting
Standardize provider response formatting:
@agent.hooks.message.on_provider
def format_response(ctx):
    """Apply consistent formatting to provider responses.

    For responses longer than 500 characters, every paragraph (split on
    blank lines) that is longer than 200 characters and does not already
    start with ``#`` receives a ``## Section N`` header. Independently of
    length, bare opening code fences are tagged as ``plaintext``.

    Returns:
        A ``(content, was_modified)`` tuple; the flag is True only when the
        formatting actually changed the text.
    """
    original = ctx.message_content
    content = original

    # Add markdown section headers for long responses
    if len(content) > 500:
        content = "\n\n".join(
            f"## Section {index + 1}\n\n{para}"
            if len(para) > 200 and not para.startswith("#")
            else para
            for index, para in enumerate(content.split("\n\n"))
        )

    # Ensure code blocks have language tags. Fences are tracked in pairs so
    # that only an *opening* bare fence is tagged; a blanket substitution
    # would also turn closing fences into "```plaintext".
    lines = content.split("\n")
    inside_code_block = False
    for index, line in enumerate(lines):
        if not line.startswith("```"):
            continue
        if not inside_code_block and line.strip() == "```":
            lines[index] = "```plaintext"
        inside_code_block = not inside_code_block
    content = "\n".join(lines)

    return content, content != original
Pattern 4: Audit Logging
Log all message traffic for compliance:
import json
from datetime import datetime
class MessageAuditor:
    """Append every user and provider message to a JSON-lines audit file."""

    def __init__(self, log_file: str = "message_audit.log"):
        # Path of the file that audit entries are appended to.
        self.log_file = log_file

    def audit_user(self, ctx):
        """Record a user message and pass it through unchanged."""
        self._log_message("USER", ctx.message_content)
        return ctx.message_content, False

    def audit_provider(self, ctx):
        """Record a provider response and pass it through unchanged."""
        self._log_message("PROVIDER", ctx.message_content)
        return ctx.message_content, False

    def _log_message(self, msg_type: str, content: str):
        """Append one JSON object (timestamp, type, content, length) as a line."""
        record = dict(
            timestamp=datetime.now().isoformat(),
            type=msg_type,
            content=content,
            content_length=len(content),
        )
        with open(self.log_file, "a") as sink:
            sink.write(json.dumps(record) + "\n")
auditor = MessageAuditor()
agent.on( "message:on_user" , auditor.audit_user)
agent.on( "message:on_provider" , auditor.audit_provider)
Pattern 5: Translation
Translate messages between languages:
class MessageTranslator:
    """Translate user input into a working language and responses back again.

    ``translate_user_input`` records the detected language in agent state
    under ``"user_language"``; ``translate_provider_response`` reads that key
    to decide whether the response must be translated back.
    """

    def __init__(self, target_language: str = "en"):
        # Language that messages are translated into before processing.
        self.target_language = target_language

    def translate_user_input(self, ctx):
        """Translate user input to target language.

        The detected language is stored in agent state so the provider-side
        hook can translate the reply back; without this, the
        ``"user_language"`` lookup there would always fall back to "en".

        Returns:
            A ``(content, was_modified)`` tuple.
        """
        content = ctx.message_content

        # Detect language and remember it for the response hook
        detected_lang = self._detect_language(content)
        ctx.agent.state.set("user_language", detected_lang, source="message_hooks")

        if detected_lang != self.target_language:
            # Translate to target language
            translated = self._translate(content, detected_lang, self.target_language)
            print(f"Translated from {detected_lang} to {self.target_language}")
            return translated, True

        return content, False

    def translate_provider_response(self, ctx):
        """Translate provider response to user's language.

        Returns:
            A ``(content, was_modified)`` tuple.
        """
        # Get user's original language from state
        user_lang = ctx.agent.state.get("user_language", "en")

        if user_lang != self.target_language:
            content = ctx.message_content
            translated = self._translate(content, self.target_language, user_lang)
            return translated, True

        return ctx.message_content, False

    def _detect_language(self, text: str) -> str:
        """Detect text language (simplified)."""
        # In practice, use langdetect or similar
        return "en"

    def _translate(self, text: str, from_lang: str, to_lang: str) -> str:
        """Translate text (simplified)."""
        # In practice, use translation API
        return text
translator = MessageTranslator( target_language = "en" )
agent.on( "message:on_user" , translator.translate_user_input)
agent.on( "message:on_provider" , translator.translate_provider_response)
Pattern 6: Rate Limiting
Implement message rate limiting:
from collections import deque
from datetime import datetime, timedelta
class MessageRateLimiter:
    """Sliding-window limiter for user messages.

    Keeps the timestamps of accepted messages and rejects a new one when
    ``max_messages`` timestamps already fall inside ``window``.
    """

    def __init__(self, max_messages: int = 10, window: timedelta = timedelta(minutes=1)):
        self.max_messages = max_messages
        self.window = window
        # Timestamps of accepted messages, oldest first.
        self.message_times = deque()

    def check_rate_limit(self, ctx):
        """Enforce rate limiting on user messages.

        Returns:
            ``(ctx.message_content, False)`` when the message is accepted.

        Raises:
            RuntimeError: If the window already holds ``max_messages`` entries.
        """
        current = datetime.now()
        cutoff = current - self.window
        accepted = self.message_times

        # Drop timestamps that have aged out of the window.
        while accepted and accepted[0] < cutoff:
            accepted.popleft()

        if len(accepted) < self.max_messages:
            # Under the limit: record and pass through untouched.
            accepted.append(current)
            return ctx.message_content, False

        window_seconds = int(self.window.total_seconds())
        raise RuntimeError(
            f"Rate limit exceeded: {self.max_messages} messages per {window_seconds}s. "
            f"Please wait before sending more messages."
        )
limiter = MessageRateLimiter( max_messages = 10 , window = timedelta( minutes = 1 ))
agent.on( "message:on_user" , limiter.check_rate_limit)
Message Types
Message hooks handle different message types:
User Messages
@agent.hooks.message.on_user
def process_user_message(ctx):
    """Print the user's input and pass it through unmodified."""
    # For this hook ctx.message_type == "user" and ctx.message_content
    # holds the user input.
    line = f"User: {ctx.message_content}"
    print(line)
    return ctx.message_content, False
Provider Messages
@agent.hooks.message.on_provider
def process_provider_message(ctx):
    """Print the provider's response and pass it through unmodified."""
    # For this hook ctx.message_type == "provider" and ctx.message_content
    # holds the provider response.
    line = f"Provider: {ctx.message_content}"
    print(line)
    return ctx.message_content, False
System Messages
@agent.hooks.message.on_user
def process_system_message(ctx):
    """Print system messages; every message is passed through unmodified."""
    # Only messages whose type is "system" (system instructions) are echoed.
    is_system = ctx.message_type == "system"
    if is_system:
        print(f"System: {ctx.message_content}")
    return ctx.message_content, False
Integration with ProviderThread
Message hooks fire before messages enter the ProviderThread:
# Message flow:
# 1. User calls agent.call("message")
# 2. on_user hook fires → can modify user message
# 3. Message sent to provider
# 4. Provider responds
# 5. on_provider hook fires → can modify provider response
# 6. Response added to ProviderThread (d0,0,0)
# 7. Response added to context tree
Example with ProviderThread access:
@agent.hooks.message.on_provider
def track_provider_messages(ctx):
    """Count assistant-role messages in the agent's thread and store the total.

    The count is written to agent state under ``"provider_message_count"``.

    Returns:
        ``(ctx.message_content, False)`` - the response is not modified.
    """
    # The thread is reached through the agent reference on the context.
    history = ctx.agent.thread.current.all_messages

    assistant_total = len([entry for entry in history if entry.role == "assistant"])

    ctx.agent.state.set("provider_message_count", assistant_total, source="hooks")

    return ctx.message_content, False
Best Practices
Always return tuple from modification hooks
# Good: Return (content, was_modified)
@agent.hooks.message.on_user
def modify_message(ctx):
    """Return the processed content together with the modification flag."""
    modified = process(ctx.message_content)
    return modified, True


# Bad: Return only content
@agent.hooks.message.on_user
def modify_message(ctx):
    """Anti-pattern: returns bare content without the required boolean."""
    return process(ctx.message_content)  # Missing boolean!
Use on_user for input validation, on_provider for output verification
Log modifications for debugging
# Good: Track when modifications occur
@agent.hooks.message.on_user
def filter_with_logging(ctx):
    """Apply the filters and log the size change whenever the text was altered.

    Returns:
        A ``(content, was_modified)`` tuple.
    """
    before = ctx.message_content
    after = apply_filters(before)

    if after == before:
        return after, False

    logger.info(f"Modified user message: {len(before)} → {len(after)} chars")
    return after, True
Use agent.state for cross-hook communication
@agent.hooks.message.on_user
def store_user_language(ctx):
    """Detect and store user's language.

    The detected language is written to agent state under
    ``"user_language"`` so that later hooks can read it.

    Returns:
        ``(ctx.message_content, False)`` - the message is not modified.
    """
    # This hook is deliberately NOT named ``detect_language``: defining it
    # under that name would rebind the module-level name, so the call below
    # would resolve to the hook itself (receiving a str instead of a
    # context) rather than to the language-detection helper.
    lang = detect_language(ctx.message_content)
    ctx.agent.state.set("user_language", lang, source="message_hooks")
    return ctx.message_content, False
@agent.hooks.message.on_provider
def translate_response(ctx):
    """Translate the response into the language stored in agent state.

    Reads ``"user_language"`` from state (default ``"en"``); English users
    get the response back untouched.

    Returns:
        A ``(content, was_modified)`` tuple.
    """
    wanted = ctx.agent.state.get("user_language", "en")
    if wanted == "en":
        return ctx.message_content, False
    return translate(ctx.message_content, to_lang=wanted), True
Hook overhead: Less than 2 ms per message hook
Modification impact: Only modified messages incur serialization cost
Async support: Message hooks can be async functions
Error handling: Errors in hooks propagate and cancel message processing
Error Handling
Errors in message hooks stop message processing:
@agent.hooks.message.on_user
def strict_validation(ctx):
    """Validate the message; a validation failure is logged and re-raised.

    Returns:
        ``(ctx.message_content, False)`` when validation passes.

    Raises:
        ValidationError: Propagated from ``validate_message`` so that
            processing of the message stops.
    """
    try:
        validate_message(ctx.message_content)
    except ValidationError as problem:
        # Log, then let the same exception continue upward.
        logger.error(f"Message validation failed: {problem}")
        raise
    else:
        return ctx.message_content, False
@agent.hooks.message.on_error
def handle_validation_errors(ctx):
    """Record validation failures in agent state; other errors are ignored.

    Returns:
        ``(ctx.message_content, False)`` - the content is never modified.
    """
    failure = ctx.error

    if not isinstance(failure, ValidationError):
        return ctx.message_content, False

    logger.warning(f"Validation error: {failure}")
    # Kept in state so it can be surfaced as user feedback.
    ctx.agent.state.set("last_validation_error", str(failure), source="hooks")

    return ctx.message_content, False
What’s Next?
Scaffold Hooks: Scaffold state change hooks
Tool Hooks: Tool execution lifecycle hooks
Subscribe API: Dynamic hook registration
Context Hooks: Context tree operation hooks