Scaffold Hooks
Scaffold hooks let you observe scaffold operations and state changes - perfect for monitoring scaffold health, tracking IPC messages, and debugging scaffold behavior.Overview
Scaffold hooks provide 2 execution points:| Hook | Fires | Use Case |
|---|---|---|
on_op_complete | Scaffold operation completes | Track @operation executions, monitor results |
on_state_change | Scaffold state changes | Monitor IPC messages, track state updates |
Hook Registration
Decorator Syntax
Copy
from egregore import Agent
agent = Agent(provider="openai:gpt-4")
@agent.hooks.scaffold.on_op_complete
def track_operations(ctx):
"""Track scaffold operation completions."""
print(f"Operation complete: {ctx.operation_name}")
print(f"Result: {ctx.result}")
@agent.hooks.scaffold.on_state_change
def monitor_state(ctx):
"""Monitor scaffold state changes."""
print(f"State changed in {ctx.scaffold_type}")
print(f"Changed fields: {ctx.changed_fields}")
Subscribe API
Copy
# Dynamic registration
sub_id = agent.on("scaffold:on_op_complete", lambda ctx: print(f"Op: {ctx.operation_name}"))
# Temporary hooks
with agent.subscription("scaffold:on_state_change", monitor_state):
agent.call("Execute scaffolds")
Scaffold Hook Context
Every scaffold hook receives aScaffoldExecContext with:
Copy
@dataclass
class ScaffoldExecContext:
# Identity
agent_id: str # Agent instance ID
execution_id: str # Unique execution ID
agent: Agent # Full agent reference
# Scaffold identification
scaffold_type: str # Scaffold type identifier
scaffold_instance: BaseContextScaffold # Scaffold instance
# Operation details (on_op_complete)
operation_name: str # @operation method name
operation_args: tuple # Method arguments
operation_kwargs: dict # Method keyword arguments
result: Any # Operation result
# State change details (on_state_change)
old_state: dict # State before change
new_state: dict # State after change
changed_fields: List[str] # Field names that changed
# Context access
context: Context # Full context tree
# Metadata
metadata: dict # Additional scaffold data
Execution Flow
On Operation Complete Hook
Fires when scaffold @operation completes:Copy
@agent.hooks.scaffold.on_op_complete
def track_operation_performance(ctx):
"""Track scaffold operation performance."""
print(f"[SCAFFOLD OP] {ctx.scaffold_type}.{ctx.operation_name}")
# Log operation call
logger.info(f"Scaffold operation completed", extra={
"scaffold_type": ctx.scaffold_type,
"operation": ctx.operation_name,
"args": ctx.operation_args,
"kwargs": ctx.operation_kwargs,
"result_type": type(ctx.result).__name__,
})
# Track operation count
op_key = f"{ctx.scaffold_type}.{ctx.operation_name}"
count = ctx.agent.state.get(f"op_count_{op_key}", 0)
ctx.agent.state.set(f"op_count_{op_key}", count + 1, source="scaffold_hooks")
# Store last result
ctx.agent.state.set(
f"last_result_{op_key}",
ctx.result,
source="scaffold_hooks"
)
On State Change Hook
Fires when scaffold state changes (via agent.state.set()):Copy
@agent.hooks.scaffold.on_state_change
def monitor_scaffold_state(ctx):
"""Monitor all scaffold state changes."""
print(f"[STATE CHANGE] {ctx.scaffold_type}")
print(f" Changed: {', '.join(ctx.changed_fields)}")
# Log state changes
for field in ctx.changed_fields:
old_value = ctx.old_state.get(field)
new_value = ctx.new_state.get(field)
logger.debug(
f"Scaffold state change: {ctx.scaffold_type}.{field}",
extra={
"scaffold": ctx.scaffold_type,
"field": field,
"old_value": old_value,
"new_value": new_value,
}
)
# Track change count
change_count = ctx.agent.state.get(f"state_changes_{ctx.scaffold_type}", 0)
ctx.agent.state.set(
f"state_changes_{ctx.scaffold_type}",
change_count + 1,
source="scaffold_hooks"
)
Usage Patterns
Pattern 1: Operation Performance Monitoring
Track operation execution times and results:Copy
from datetime import datetime
class OperationMonitor:
def __init__(self):
self.operation_times = {}
self.operation_results = {}
def track_operation(self, ctx):
"""Track operation execution."""
op_key = f"{ctx.scaffold_type}.{ctx.operation_name}"
# Record execution
execution = {
"timestamp": datetime.now().isoformat(),
"args": ctx.operation_args,
"kwargs": ctx.operation_kwargs,
"result": ctx.result,
}
# Store in history
if op_key not in self.operation_times:
self.operation_times[op_key] = []
self.operation_times[op_key].append(execution)
# Store latest result
self.operation_results[op_key] = ctx.result
print(f"Tracked {op_key}: {len(self.operation_times[op_key])} executions")
def get_stats(self, scaffold_type: str = None):
"""Get operation statistics."""
if scaffold_type:
return {
k: v for k, v in self.operation_times.items()
if k.startswith(f"{scaffold_type}.")
}
return self.operation_times
monitor = OperationMonitor()
agent.on("scaffold:on_op_complete", monitor.track_operation)
# Later: query stats
agent.call("Use scaffolds")
stats = monitor.get_stats("memory")
print(f"Memory scaffold operations: {len(stats)}")
Pattern 2: IPC Message Tracking
Monitor inter-scaffold communication via state changes:Copy
class IPCTracker:
def __init__(self):
self.ipc_messages = []
def track_ipc(self, ctx):
"""Track IPC messages between scaffolds."""
# IPC messages use state.set() with source tracking
changed_fields = ctx.changed_fields
for field in changed_fields:
old_value = ctx.old_state.get(field)
new_value = ctx.new_state.get(field)
# Record IPC message
message = {
"timestamp": datetime.now().isoformat(),
"scaffold": ctx.scaffold_type,
"field": field,
"old_value": old_value,
"new_value": new_value,
}
self.ipc_messages.append(message)
# Log if value changed significantly
if old_value != new_value:
print(f"IPC: {ctx.scaffold_type}.{field} = {new_value}")
def get_messages(self, scaffold_type: str = None):
"""Get IPC messages."""
if scaffold_type:
return [m for m in self.ipc_messages if m["scaffold"] == scaffold_type]
return self.ipc_messages
tracker = IPCTracker()
agent.on("scaffold:on_state_change", tracker.track_ipc)
# Later: analyze IPC traffic
messages = tracker.get_messages("memory")
print(f"Memory scaffold received {len(messages)} IPC messages")
Pattern 3: Scaffold Health Monitoring
Monitor scaffold operations for errors:Copy
class ScaffoldHealthMonitor:
def __init__(self):
self.error_counts = {}
self.success_counts = {}
def monitor_operations(self, ctx):
"""Monitor scaffold operation health."""
op_key = f"{ctx.scaffold_type}.{ctx.operation_name}"
# Track success/failure
if isinstance(ctx.result, Exception):
# Operation failed
self.error_counts[op_key] = self.error_counts.get(op_key, 0) + 1
print(f"⚠️ {op_key} failed: {ctx.result}")
# Alert on repeated failures
if self.error_counts[op_key] > 3:
print(f"🚨 ALERT: {op_key} has failed {self.error_counts[op_key]} times")
else:
# Operation succeeded
self.success_counts[op_key] = self.success_counts.get(op_key, 0) + 1
def get_health_report(self):
"""Generate health report."""
all_ops = set(list(self.error_counts.keys()) + list(self.success_counts.keys()))
report = []
for op in all_ops:
errors = self.error_counts.get(op, 0)
successes = self.success_counts.get(op, 0)
total = errors + successes
if total > 0:
success_rate = (successes / total) * 100
report.append({
"operation": op,
"total": total,
"successes": successes,
"errors": errors,
"success_rate": f"{success_rate:.1f}%",
})
return report
health = ScaffoldHealthMonitor()
agent.on("scaffold:on_op_complete", health.monitor_operations)
# Later: check health
report = health.get_health_report()
for entry in report:
print(f"{entry['operation']}: {entry['success_rate']} success rate")
Pattern 4: State Change Auditing
Audit all scaffold state changes for compliance:Copy
import json
class StateAuditor:
def __init__(self, audit_file: str = "scaffold_audit.log"):
self.audit_file = audit_file
def audit_state_change(self, ctx):
"""Audit scaffold state changes."""
# Create audit entry
entry = {
"timestamp": datetime.now().isoformat(),
"scaffold_type": ctx.scaffold_type,
"changed_fields": ctx.changed_fields,
"changes": {},
}
# Record each field change
for field in ctx.changed_fields:
entry["changes"][field] = {
"old": ctx.old_state.get(field),
"new": ctx.new_state.get(field),
}
# Write to audit log
with open(self.audit_file, "a") as f:
f.write(json.dumps(entry) + "\n")
# Log to console
print(f"[AUDIT] {ctx.scaffold_type}: {', '.join(ctx.changed_fields)}")
auditor = StateAuditor()
agent.on("scaffold:on_state_change", auditor.audit_state_change)
Pattern 5: Operation Result Validation
Validate operation results meet requirements:Copy
class ResultValidator:
def __init__(self):
self.validators = {}
def register_validator(self, scaffold_type: str, operation_name: str, validator):
"""Register result validator for operation."""
key = f"{scaffold_type}.{operation_name}"
self.validators[key] = validator
def validate_result(self, ctx):
"""Validate operation result."""
op_key = f"{ctx.scaffold_type}.{ctx.operation_name}"
if op_key in self.validators:
validator = self.validators[op_key]
try:
is_valid = validator(ctx.result)
if not is_valid:
print(f"⚠️ Invalid result from {op_key}")
logger.warning(f"Operation result validation failed", extra={
"operation": op_key,
"result": ctx.result,
})
else:
print(f"✓ Valid result from {op_key}")
except Exception as e:
print(f"❌ Validation error for {op_key}: {e}")
validator = ResultValidator()
# Register validators
validator.register_validator(
"memory",
"recall",
lambda result: isinstance(result, list) and len(result) > 0
)
validator.register_validator(
"task_manager",
"get_tasks",
lambda result: isinstance(result, dict) and "tasks" in result
)
agent.on("scaffold:on_op_complete", validator.validate_result)
Pattern 6: Scaffold Debugging
Debug scaffold behavior with detailed logging:Copy
class ScaffoldDebugger:
def __init__(self, debug_scaffold: str = None):
self.debug_scaffold = debug_scaffold # None = debug all
def debug_operation(self, ctx):
"""Debug scaffold operations."""
# Filter by scaffold if specified
if self.debug_scaffold and ctx.scaffold_type != self.debug_scaffold:
return
print(f"\n{'='*60}")
print(f"SCAFFOLD OPERATION: {ctx.scaffold_type}.{ctx.operation_name}")
print(f"{'='*60}")
# Print arguments
if ctx.operation_args:
print(f"Args: {ctx.operation_args}")
if ctx.operation_kwargs:
print(f"Kwargs: {ctx.operation_kwargs}")
# Print result
print(f"\nResult:")
print(f" Type: {type(ctx.result).__name__}")
print(f" Value: {ctx.result}")
print(f"{'='*60}\n")
def debug_state_change(self, ctx):
"""Debug scaffold state changes."""
# Filter by scaffold if specified
if self.debug_scaffold and ctx.scaffold_type != self.debug_scaffold:
return
print(f"\n{'='*60}")
print(f"STATE CHANGE: {ctx.scaffold_type}")
print(f"{'='*60}")
# Print changed fields
for field in ctx.changed_fields:
old = ctx.old_state.get(field)
new = ctx.new_state.get(field)
print(f"\n{field}:")
print(f" Old: {old}")
print(f" New: {new}")
print(f"{'='*60}\n")
# Debug all scaffolds
debugger = ScaffoldDebugger()
agent.on("scaffold:on_op_complete", debugger.debug_operation)
agent.on("scaffold:on_state_change", debugger.debug_state_change)
# Or debug specific scaffold
memory_debugger = ScaffoldDebugger(debug_scaffold="memory")
agent.on("scaffold:on_op_complete", memory_debugger.debug_operation)
Integration with Scaffold IPC
Scaffold hooks are essential for monitoring IPC communication:Copy
# Scaffold A sends IPC message
class SenderScaffold(BaseContextScaffold):
def send_message(self):
# This triggers on_state_change hook
self.agent.state.set("shared_data", {"value": 42}, source="sender")
# Scaffold B receives via hook
@agent.hooks.scaffold.on_state_change
def receive_ipc(ctx):
if "shared_data" in ctx.changed_fields:
new_value = ctx.new_state.get("shared_data")
print(f"Received IPC message: {new_value}")
Best Practices
Use on_op_complete for operation tracking, on_state_change for IPC monitoring
Use on_op_complete for operation tracking, on_state_change for IPC monitoring
Copy
# Good: Track operations
@agent.hooks.scaffold.on_op_complete
def track_ops(ctx):
logger.info(f"Operation: {ctx.scaffold_type}.{ctx.operation_name}")
# Good: Monitor IPC
@agent.hooks.scaffold.on_state_change
def monitor_ipc(ctx):
for field in ctx.changed_fields:
print(f"IPC: {field} changed")
Filter by scaffold_type for targeted monitoring
Filter by scaffold_type for targeted monitoring
Copy
# Good: Monitor specific scaffold
@agent.hooks.scaffold.on_op_complete
def monitor_memory_ops(ctx):
if ctx.scaffold_type == "memory":
print(f"Memory operation: {ctx.operation_name}")
# Bad: Monitor all scaffolds when only need one
@agent.hooks.scaffold.on_op_complete
def monitor_all(ctx):
# Processes every scaffold operation
if ctx.scaffold_type == "memory":
print(f"Memory operation: {ctx.operation_name}")
Access scaffold instance for deeper inspection
Access scaffold instance for deeper inspection
Copy
@agent.hooks.scaffold.on_op_complete
def inspect_scaffold(ctx):
# Access full scaffold instance
scaffold = ctx.scaffold_instance
# Get scaffold's internal state
if hasattr(scaffold, 'internal_counter'):
print(f"Internal counter: {scaffold.internal_counter}")
# Call scaffold methods
if hasattr(scaffold, 'get_stats'):
stats = scaffold.get_stats()
print(f"Scaffold stats: {stats}")
Use changed_fields to identify relevant state changes
Use changed_fields to identify relevant state changes
Copy
# Good: Only process relevant changes
@agent.hooks.scaffold.on_state_change
def process_important_changes(ctx):
important_fields = ["task_status", "memory_count", "error_state"]
relevant_changes = [f for f in ctx.changed_fields if f in important_fields]
if relevant_changes:
print(f"Important changes: {relevant_changes}")
# Bad: Process all state changes
@agent.hooks.scaffold.on_state_change
def process_all_changes(ctx):
# Processes every state change, even unimportant ones
print(f"Changed: {ctx.changed_fields}")
Performance Considerations
- Hook overhead: Less than 2ms per scaffold hook
- State change frequency: Can be high in IPC-heavy systems
- Operation tracking: Minimal impact on scaffold performance
- Async support: Scaffold hooks can be async functions
Integration with Agent State
Scaffold hooks have full access to agent state:Copy
@agent.hooks.scaffold.on_op_complete
def track_with_state(ctx):
"""Use agent state in scaffold hooks."""
# Read state
operation_count = ctx.agent.state.get("total_operations", 0)
# Update state
ctx.agent.state.set(
"total_operations",
operation_count + 1,
source="scaffold_hooks"
)
# Store operation history
history = ctx.agent.state.get("operation_history", [])
history.append({
"scaffold": ctx.scaffold_type,
"operation": ctx.operation_name,
"timestamp": datetime.now().isoformat(),
})
ctx.agent.state.set("operation_history", history, source="scaffold_hooks")
Error Handling
Errors in scaffold hooks don’t stop scaffold operations:Copy
@agent.hooks.scaffold.on_op_complete
def risky_tracking(ctx):
try:
process_operation(ctx)
except Exception as e:
# Log but don't propagate
logger.error(f"Scaffold hook error: {e}")
# Scaffold operation continues
@agent.hooks.scaffold.on_state_change
def safe_monitoring(ctx):
try:
monitor_state_change(ctx)
except Exception as e:
# Log error without stopping state change
logger.error(f"State monitoring error: {e}")

