Skip to main content

Scaffold Hooks

Scaffold hooks let you observe scaffold operations and state changes, which makes them well suited to monitoring scaffold health, tracking IPC messages, and debugging scaffold behavior.

Overview

Scaffold hooks provide 2 execution points:
Hook | Fires | Use Case
on_op_complete | Scaffold operation completes | Track @operation executions, monitor results
on_state_change | Scaffold state changes | Monitor IPC messages, track state updates

Hook Registration

Decorator Syntax

from egregore import Agent

# Agent instance that the hook examples in this document register against
# (via agent.hooks.scaffold.* decorators and agent.on(...)).
agent = Agent(provider="openai:gpt-4")

@agent.hooks.scaffold.on_op_complete
def track_operations(ctx):
    """Print the name and result of a completed scaffold operation."""
    op_name = ctx.operation_name
    print(f"Operation complete: {op_name}")
    outcome = ctx.result
    print(f"Result: {outcome}")

@agent.hooks.scaffold.on_state_change
def monitor_state(ctx):
    """Print which scaffold changed state and which fields changed."""
    origin = ctx.scaffold_type
    print(f"State changed in {origin}")
    touched = ctx.changed_fields
    print(f"Changed fields: {touched}")

Subscribe API

# Dynamic registration: attach a callable to a hook by its
# "scaffold:<hook_name>" event string. The return value is kept as sub_id
# (presumably a handle for removing the subscription later — confirm).
sub_id = agent.on("scaffold:on_op_complete", lambda ctx: print(f"Op: {ctx.operation_name}"))

# Temporary hooks: monitor_state is subscribed for the body of the with-block
# (assumed to be unsubscribed again on exit — confirm against the API).
with agent.subscription("scaffold:on_state_change", monitor_state):
    agent.call("Execute scaffolds")

Scaffold Hook Context

Every scaffold hook receives a ScaffoldExecContext with:
@dataclass
class ScaffoldExecContext:
    """Context object handed to every scaffold hook.

    The "operation" fields are documented for ``on_op_complete`` and the
    "state change" fields for ``on_state_change``. What the fields of the
    other group contain for a given hook is not shown here.
    """
    # Identity
    agent_id: str                  # Agent instance ID
    execution_id: str              # Unique execution ID
    agent: Agent                   # Full agent reference

    # Scaffold identification
    scaffold_type: str             # Scaffold type identifier
    scaffold_instance: BaseContextScaffold  # Scaffold instance

    # Operation details (on_op_complete)
    operation_name: str            # @operation method name
    operation_args: tuple          # Method arguments
    operation_kwargs: dict         # Method keyword arguments
    result: Any                    # Operation result

    # State change details (on_state_change)
    old_state: dict                # State before change
    new_state: dict                # State after change
    changed_fields: List[str]      # Field names that changed

    # Context access
    context: Context               # Full context tree

    # Metadata
    metadata: dict                 # Additional scaffold data

Execution Flow

On Operation Complete Hook

Fires when scaffold @operation completes:
@agent.hooks.scaffold.on_op_complete
def track_operation_performance(ctx):
    """Log a completed scaffold operation and update per-operation counters.

    For each completed ``@operation`` this hook prints a marker line, emits a
    structured log record, increments ``op_count_<scaffold>.<operation>`` in
    agent state, and stores the latest result under
    ``last_result_<scaffold>.<operation>``.

    Args:
        ctx: The ScaffoldExecContext describing the completed operation.
    """
    op_key = f"{ctx.scaffold_type}.{ctx.operation_name}"
    print(f"[SCAFFOLD OP] {op_key}")

    # Log operation call.
    # Keys in ``extra`` are copied onto the LogRecord and must not reuse a
    # built-in LogRecord attribute name: a key called "args" makes logging
    # raise KeyError when the record is created. Hence the "operation_" prefix.
    logger.info("Scaffold operation completed", extra={
        "scaffold_type": ctx.scaffold_type,
        "operation": ctx.operation_name,
        "operation_args": ctx.operation_args,
        "operation_kwargs": ctx.operation_kwargs,
        "result_type": type(ctx.result).__name__,
    })

    # Track operation count
    count = ctx.agent.state.get(f"op_count_{op_key}", 0)
    ctx.agent.state.set(f"op_count_{op_key}", count + 1, source="scaffold_hooks")

    # Store last result
    ctx.agent.state.set(
        f"last_result_{op_key}",
        ctx.result,
        source="scaffold_hooks"
    )

On State Change Hook

Fires when scaffold state changes (via agent.state.set()):
@agent.hooks.scaffold.on_state_change
def monitor_scaffold_state(ctx):
    """Print and debug-log every changed field, then bump a change counter.

    The counter lives in agent state under ``state_changes_<scaffold_type>``.
    """
    # NOTE(review): this hook itself calls agent.state.set(); whether that
    # re-triggers on_state_change is not visible here — confirm.
    print(f"[STATE CHANGE] {ctx.scaffold_type}")
    joined = ', '.join(ctx.changed_fields)
    print(f"  Changed: {joined}")

    # One debug record per changed field, carrying the before/after values.
    for name in ctx.changed_fields:
        before = ctx.old_state.get(name)
        after = ctx.new_state.get(name)
        details = {
            "scaffold": ctx.scaffold_type,
            "field": name,
            "old_value": before,
            "new_value": after,
        }
        logger.debug(f"Scaffold state change: {ctx.scaffold_type}.{name}", extra=details)

    # Read-modify-write the per-scaffold change counter.
    seen = ctx.agent.state.get(f"state_changes_{ctx.scaffold_type}", 0)
    ctx.agent.state.set(f"state_changes_{ctx.scaffold_type}", seen + 1, source="scaffold_hooks")

Usage Patterns

Pattern 1: Operation Performance Monitoring

Track when each operation ran, what it was called with, and what it returned:
from datetime import datetime

class OperationMonitor:
    """Collects a timestamped record of every completed scaffold operation.

    Attributes are keyed by ``"<scaffold_type>.<operation_name>"``:
    ``operation_times`` maps each key to its list of execution records and
    ``operation_results`` maps each key to the most recent result.
    """

    def __init__(self):
        self.operation_times = {}
        self.operation_results = {}

    def track_operation(self, ctx):
        """Record one execution described by *ctx* and print the running total."""
        op_key = f"{ctx.scaffold_type}.{ctx.operation_name}"

        # Build the record first so nothing is stored if ctx is incomplete.
        record = {
            "timestamp": datetime.now().isoformat(),
            "args": ctx.operation_args,
            "kwargs": ctx.operation_kwargs,
            "result": ctx.result,
        }

        # Append to this operation's history, creating the list on first use.
        runs = self.operation_times.setdefault(op_key, [])
        runs.append(record)

        # Remember the latest result separately for quick lookup.
        self.operation_results[op_key] = ctx.result

        print(f"Tracked {op_key}: {len(runs)} executions")

    def get_stats(self, scaffold_type: str = None):
        """Return the execution history, optionally limited to one scaffold.

        Without *scaffold_type* the internal dict itself is returned; with it,
        a new dict holding only keys that start with ``"<scaffold_type>."``.
        """
        if not scaffold_type:
            return self.operation_times

        prefix = f"{scaffold_type}."
        selected = {}
        for key, runs in self.operation_times.items():
            if key.startswith(prefix):
                selected[key] = runs
        return selected

monitor = OperationMonitor()
# Register the bound method as the on_op_complete callback.
agent.on("scaffold:on_op_complete", monitor.track_operation)

# Later: query stats
agent.call("Use scaffolds")
stats = monitor.get_stats("memory")
# get_stats() returns a dict keyed by "<scaffold>.<operation>", so len(stats)
# is the number of distinct memory operations seen, not the number of calls.
print(f"Memory scaffold operations: {len(stats)}")

Pattern 2: IPC Message Tracking

Monitor inter-scaffold communication via state changes:
class IPCTracker:
    """Keeps a chronological list of scaffold state-field changes.

    Each entry in ``ipc_messages`` is a dict with the keys ``timestamp``,
    ``scaffold``, ``field``, ``old_value`` and ``new_value``.
    """

    def __init__(self):
        self.ipc_messages = []

    def track_ipc(self, ctx):
        """Append one message per changed field and print the ones that differ."""
        # IPC messages use state.set() with source tracking
        for name in ctx.changed_fields:
            before = ctx.old_state.get(name)
            after = ctx.new_state.get(name)

            # Every listed field is recorded, even if the value is unchanged.
            self.ipc_messages.append({
                "timestamp": datetime.now().isoformat(),
                "scaffold": ctx.scaffold_type,
                "field": name,
                "old_value": before,
                "new_value": after,
            })

            # Only announce fields whose value actually differs.
            if before != after:
                print(f"IPC: {ctx.scaffold_type}.{name} = {after}")

    def get_messages(self, scaffold_type: str = None):
        """Return all messages, or only those recorded for *scaffold_type*.

        Without a filter the internal list itself is returned.
        """
        if not scaffold_type:
            return self.ipc_messages

        matching = []
        for message in self.ipc_messages:
            if message["scaffold"] == scaffold_type:
                matching.append(message)
        return matching

tracker = IPCTracker()
# Register the bound method as the on_state_change callback.
agent.on("scaffold:on_state_change", tracker.track_ipc)

# Later: analyze IPC traffic
# get_messages("memory") keeps only entries whose "scaffold" is "memory";
# there is one entry per changed field, so this counts field changes.
messages = tracker.get_messages("memory")
print(f"Memory scaffold received {len(messages)} IPC messages")

Pattern 3: Scaffold Health Monitoring

Monitor scaffold operations for errors:
class ScaffoldHealthMonitor:
    """Counts successful and failed scaffold operations.

    ``error_counts`` and ``success_counts`` are dicts keyed by
    ``"<scaffold_type>.<operation_name>"``. An operation is counted as failed
    when its result is an ``Exception`` instance.
    """

    def __init__(self):
        self.error_counts = {}
        self.success_counts = {}

    def monitor_operations(self, ctx):
        """Classify one completed operation as success or failure.

        Prints a warning for each failure and an alert once the same
        operation has failed more than three times.
        """
        op_key = f"{ctx.scaffold_type}.{ctx.operation_name}"

        # Track success/failure
        if isinstance(ctx.result, Exception):
            # Operation failed
            failures = self.error_counts.get(op_key, 0) + 1
            self.error_counts[op_key] = failures
            print(f"⚠️  {op_key} failed: {ctx.result}")

            # Alert on repeated failures
            if failures > 3:
                print(f"🚨 ALERT: {op_key} has failed {failures} times")
        else:
            # Operation succeeded
            self.success_counts[op_key] = self.success_counts.get(op_key, 0) + 1

    def get_health_report(self):
        """Return one summary dict per operation, sorted by operation key.

        Each dict has the keys ``operation``, ``total``, ``successes``,
        ``errors`` and ``success_rate`` (a string such as ``"75.0%"``).
        Sorting keeps the report order stable between runs; iterating a set
        of strings directly would not.
        """
        all_ops = sorted(set(self.error_counts) | set(self.success_counts))

        report = []
        for op in all_ops:
            errors = self.error_counts.get(op, 0)
            successes = self.success_counts.get(op, 0)
            total = errors + successes

            # Guard the division in case counts were reset to zero externally.
            if total > 0:
                success_rate = (successes / total) * 100
                report.append({
                    "operation": op,
                    "total": total,
                    "successes": successes,
                    "errors": errors,
                    "success_rate": f"{success_rate:.1f}%",
                })

        return report

health = ScaffoldHealthMonitor()
# Register the bound method as the on_op_complete callback.
agent.on("scaffold:on_op_complete", health.monitor_operations)

# Later: check health
# Each report entry is a dict; "success_rate" is already a formatted string.
report = health.get_health_report()
for entry in report:
    print(f"{entry['operation']}: {entry['success_rate']} success rate")

Pattern 4: State Change Auditing

Audit all scaffold state changes for compliance:
import json

class StateAuditor:
    """Appends one JSON line per scaffold state change to an audit file."""

    def __init__(self, audit_file: str = "scaffold_audit.log"):
        """Remember the path of the audit log; the file is opened per write."""
        self.audit_file = audit_file

    def audit_state_change(self, ctx):
        """Write an audit entry for the change described by *ctx*.

        The entry holds a timestamp, the scaffold type, the changed field
        names, and the old and new value of each field. A one-line summary
        is also printed.
        """
        # Materialise once so the same field list feeds the entry, the loop
        # and the console line, and serialises even if it is not a list.
        fields = list(ctx.changed_fields)

        # Create audit entry
        entry = {
            "timestamp": datetime.now().isoformat(),
            "scaffold_type": ctx.scaffold_type,
            "changed_fields": fields,
            "changes": {
                field: {
                    "old": ctx.old_state.get(field),
                    "new": ctx.new_state.get(field),
                }
                for field in fields
            },
        }

        # State values may be objects JSON cannot encode. Falling back to
        # repr() is deliberate: an approximate audit line is better than a
        # TypeError that drops the entry.
        line = json.dumps(entry, default=repr)

        # Write to audit log (explicit encoding so it does not depend on the
        # platform default).
        with open(self.audit_file, "a", encoding="utf-8") as f:
            f.write(line + "\n")

        # Log to console
        print(f"[AUDIT] {ctx.scaffold_type}: {', '.join(fields)}")

# Uses the default audit file name, "scaffold_audit.log".
auditor = StateAuditor()
agent.on("scaffold:on_state_change", auditor.audit_state_change)

Pattern 5: Operation Result Validation

Validate operation results meet requirements:
class ResultValidator:
    """Runs registered predicate functions against operation results.

    ``validators`` maps ``"<scaffold_type>.<operation_name>"`` to a callable
    that receives the operation result and returns a truthy value when the
    result is acceptable.
    """

    def __init__(self):
        self.validators = {}

    def register_validator(self, scaffold_type: str, operation_name: str, validator):
        """Register result validator for operation.

        A later registration for the same scaffold/operation pair replaces
        the earlier one.
        """
        key = f"{scaffold_type}.{operation_name}"
        self.validators[key] = validator

    def validate_result(self, ctx):
        """Validate the result in *ctx* with its registered validator, if any.

        Prints the outcome, and additionally logs a warning for an invalid
        result. Exceptions raised by the validator are printed rather than
        propagated. Operations without a validator are ignored.
        """
        op_key = f"{ctx.scaffold_type}.{ctx.operation_name}"

        if op_key not in self.validators:
            return

        check = self.validators[op_key]

        # Only the validator call (and the truthiness of its return value) is
        # guarded, so a failure while printing or logging is not misreported
        # as a validation error.
        try:
            is_valid = bool(check(ctx.result))
        except Exception as e:
            print(f"❌ Validation error for {op_key}: {e}")
            return

        if is_valid:
            print(f"✓ Valid result from {op_key}")
            return

        print(f"⚠️  Invalid result from {op_key}")
        logger.warning("Operation result validation failed", extra={
            "operation": op_key,
            "result": ctx.result,
        })

validator = ResultValidator()

# Register validators
# memory.recall is valid only when it returns a non-empty list.
validator.register_validator(
    "memory",
    "recall",
    lambda result: isinstance(result, list) and len(result) > 0
)

# task_manager.get_tasks is valid only when it returns a dict with a "tasks" key.
validator.register_validator(
    "task_manager",
    "get_tasks",
    lambda result: isinstance(result, dict) and "tasks" in result
)

# Run the validators after every completed scaffold operation.
agent.on("scaffold:on_op_complete", validator.validate_result)

Pattern 6: Scaffold Debugging

Debug scaffold behavior with detailed logging:
class ScaffoldDebugger:
    """Prints verbose, banner-framed dumps of scaffold operations and state changes.

    When ``debug_scaffold`` is set, events from every other scaffold type are
    ignored; when it is ``None`` (or otherwise falsy) everything is printed.
    """

    def __init__(self, debug_scaffold: str = None):
        self.debug_scaffold = debug_scaffold  # None = debug all

    def _is_filtered_out(self, ctx):
        """Return True when a filter is set and *ctx* belongs to another scaffold."""
        return bool(self.debug_scaffold) and ctx.scaffold_type != self.debug_scaffold

    def debug_operation(self, ctx):
        """Print the arguments and result of one completed operation."""
        if self._is_filtered_out(ctx):
            return

        rule = '=' * 60
        print(f"\n{rule}")
        print(f"SCAFFOLD OPERATION: {ctx.scaffold_type}.{ctx.operation_name}")
        print(rule)

        # Empty argument collections are left out of the dump.
        if ctx.operation_args:
            print(f"Args: {ctx.operation_args}")
        if ctx.operation_kwargs:
            print(f"Kwargs: {ctx.operation_kwargs}")

        print("\nResult:")
        print(f"  Type: {type(ctx.result).__name__}")
        print(f"  Value: {ctx.result}")

        print(f"{rule}\n")

    def debug_state_change(self, ctx):
        """Print the old and new value of every changed state field."""
        if self._is_filtered_out(ctx):
            return

        rule = '=' * 60
        print(f"\n{rule}")
        print(f"STATE CHANGE: {ctx.scaffold_type}")
        print(rule)

        for name in ctx.changed_fields:
            before = ctx.old_state.get(name)
            after = ctx.new_state.get(name)

            print(f"\n{name}:")
            print(f"  Old: {before}")
            print(f"  New: {after}")

        print(f"{rule}\n")

# Debug all scaffolds (no filter: debug_scaffold defaults to None)
debugger = ScaffoldDebugger()
agent.on("scaffold:on_op_complete", debugger.debug_operation)
agent.on("scaffold:on_state_change", debugger.debug_state_change)

# Or debug specific scaffold
# Use this instead of the registrations above: with both registered, memory
# operations would be printed by each debugger.
memory_debugger = ScaffoldDebugger(debug_scaffold="memory")
agent.on("scaffold:on_op_complete", memory_debugger.debug_operation)

Integration with Scaffold IPC

Scaffold hooks are essential for monitoring IPC communication:
# Scaffold A sends IPC message
class SenderScaffold(BaseContextScaffold):
    """Example scaffold that publishes a value through agent state."""

    def send_message(self):
        """Store the payload under ``shared_data``, tagged with source ``sender``."""
        payload = {"value": 42}
        # This triggers on_state_change hook
        self.agent.state.set("shared_data", payload, source="sender")

# Scaffold B receives via hook
@agent.hooks.scaffold.on_state_change
def receive_ipc(ctx):
    """Print the new ``shared_data`` value whenever that field changes."""
    if "shared_data" not in ctx.changed_fields:
        return
    payload = ctx.new_state.get("shared_data")
    print(f"Received IPC message: {payload}")

Best Practices

# Good: Track operations
@agent.hooks.scaffold.on_op_complete
def track_ops(ctx):
    """Log the qualified name of each completed scaffold operation."""
    qualified = f"{ctx.scaffold_type}.{ctx.operation_name}"
    logger.info(f"Operation: {qualified}")

# Good: Monitor IPC
@agent.hooks.scaffold.on_state_change
def monitor_ipc(ctx):
    """Print one line for every state field reported as changed."""
    for name in ctx.changed_fields:
        print(f"IPC: {name} changed")
# Good: Monitor specific scaffold
@agent.hooks.scaffold.on_op_complete
def monitor_memory_ops(ctx):
    """Print completed operations, but only for the ``memory`` scaffold."""
    if ctx.scaffold_type != "memory":
        return
    print(f"Memory operation: {ctx.operation_name}")

# Bad: Monitor all scaffolds when only need one
@agent.hooks.scaffold.on_op_complete
def monitor_all(ctx):
    """Anti-pattern: handle every scaffold's operations without filtering."""
    # No scaffold_type check: the work below runs for every scaffold
    # operation even though only the memory scaffold is of interest.
    print(f"Operation: {ctx.scaffold_type}.{ctx.operation_name}")
@agent.hooks.scaffold.on_op_complete
def inspect_scaffold(ctx):
    # Access full scaffold instance
    scaffold = ctx.scaffold_instance

    # Get scaffold's internal state
    if hasattr(scaffold, 'internal_counter'):
        print(f"Internal counter: {scaffold.internal_counter}")

    # Call scaffold methods
    if hasattr(scaffold, 'get_stats'):
        stats = scaffold.get_stats()
        print(f"Scaffold stats: {stats}")
# Good: Only process relevant changes
@agent.hooks.scaffold.on_state_change
def process_important_changes(ctx):
    """Print only the changed fields that are on the watch list."""
    watched = ["task_status", "memory_count", "error_state"]

    hits = []
    for name in ctx.changed_fields:
        if name in watched:
            hits.append(name)

    if not hits:
        return
    print(f"Important changes: {hits}")

# Bad: Process all state changes
@agent.hooks.scaffold.on_state_change
def process_all_changes(ctx):
    """Anti-pattern: print every state change with no filtering."""
    # Processes every state change, even unimportant ones
    touched = ctx.changed_fields
    print(f"Changed: {touched}")

Performance Considerations

  • Hook overhead: Less than 2ms per scaffold hook
  • State change frequency: Can be high in IPC-heavy systems
  • Operation tracking: Minimal impact on scaffold performance
  • Async support: Scaffold hooks can be async functions

Integration with Agent State

Scaffold hooks have full access to agent state:
@agent.hooks.scaffold.on_op_complete
def track_with_state(ctx):
    """Keep a running operation count and an operation history in agent state.

    Args:
        ctx: The ScaffoldExecContext describing the completed operation.
    """
    state = ctx.agent.state

    # Read state, then write back the incremented counter.
    operation_count = state.get("total_operations", 0)
    state.set(
        "total_operations",
        operation_count + 1,
        source="scaffold_hooks"
    )

    # Store operation history.
    # Build a new list instead of appending to the one get() returns: if
    # get() hands back the stored list itself (not visible here — confirm),
    # an in-place append would alter the stored value before set() is called.
    entry = {
        "scaffold": ctx.scaffold_type,
        "operation": ctx.operation_name,
        "timestamp": datetime.now().isoformat(),
    }
    history = [*state.get("operation_history", []), entry]
    state.set("operation_history", history, source="scaffold_hooks")

Error Handling

Errors in scaffold hooks don’t stop scaffold operations:
@agent.hooks.scaffold.on_op_complete
def risky_tracking(ctx):
    """Run ``process_operation`` without letting its failures leave the hook."""
    try:
        process_operation(ctx)
    except Exception as e:
        # Log but don't propagate. logger.exception logs at ERROR level like
        # logger.error, and also records the traceback for diagnosis.
        logger.exception("Scaffold hook error: %s", e)
        # Scaffold operation continues

@agent.hooks.scaffold.on_state_change
def safe_monitoring(ctx):
    """Run ``monitor_state_change`` without letting its failures leave the hook."""
    try:
        monitor_state_change(ctx)
    except Exception as e:
        # Log error without stopping state change. logger.exception logs at
        # ERROR level and also records the traceback.
        logger.exception("State monitoring error: %s", e)

What’s Next?