Skip to main content

Subscribe API

The Subscribe API (codename: Synapse) provides a lightweight, imperative way to bind and unbind agent hooks without decorators.

Overview

While decorators (@agent.hooks.*) work great for permanent hooks, the Subscribe API is designed for:
  • Dynamic hook registration: Register/unregister hooks at runtime
  • Temporary subscriptions: Use context managers for scoped hooks
  • Conditional monitoring: Enable hooks only when needed
  • Bulk operations: Register multiple hooks as a group

Core Methods

agent.on()

Register a single hook by friendly name.
sub_id = agent.on(name: str, fn: Callable) -> str
Parameters:
  • name: Friendly hook name (e.g., "stream:chunk", "tool:pre_exec")
  • fn: Callback function (sync or async)
Returns:
  • Subscription ID string for later unsubscription
Example:
from egregore import Agent

agent = Agent(provider="openai:gpt-4")

# Register streaming hook
def on_chunk(ctx):
    print(ctx.delta, end="", flush=True)

sub_id = agent.on("stream:chunk", on_chunk)

# Use the agent - hook fires automatically
await agent.acall("Write a poem about AI")

# Unsubscribe when done
agent.unsubscribe(sub_id)

agent.subscribe()

Register multiple hooks at once.
group_id = agent.subscribe(mapping: dict[str, Callable]) -> str
Parameters:
  • mapping: Dictionary of {friendly_name: callback_function}
Returns:
  • Group subscription ID for unsubscribing all at once
Example:
# Register multiple tool hooks
def on_tool_start(ctx):
    print(f"[START] {ctx.tool_name}")

def on_tool_end(ctx):
    print(f"[DONE] {ctx.tool_name}: {ctx.tool_result}")

def on_tool_error(ctx):
    print(f"[ERROR] {ctx.tool_name}: {ctx.error}")

# Subscribe as group
group_id = agent.subscribe({
    "tool:pre_call": on_tool_start,
    "tool:post_call": on_tool_end,
    "tool:on_error": on_tool_error,
})

agent.call("Use the calculator and file reader")

# Unsubscribe all at once
agent.unsubscribe(group_id)

agent.unsubscribe()

Unregister a subscription (single or group).
agent.unsubscribe(sub_id: str) -> None
Parameters:
  • sub_id: Subscription ID from on() or subscribe()
Important:
  • Idempotent: Safe to call multiple times
  • No errors: Never raises exception for non-existent IDs
  • Group aware: Unsubscribes all hooks in a group
Example:
sub_id = agent.on("stream:chunk", on_chunk)
agent.unsubscribe(sub_id)
agent.unsubscribe(sub_id)  # Safe - no error

agent.subscription()

Create a context manager for temporary subscriptions.
agent.subscription(
    name_or_mapping: str | dict[str, Callable],
    fn: Callable = None
) -> SubscriptionContext
Parameters:
  • name_or_mapping: Either a friendly name (str) or a mapping dict
  • fn: Callback function (required if name_or_mapping is str)
Returns:
  • Context manager that auto-unsubscribes on exit
Supports both sync and async usage. Example (single hook):
# Temporary streaming hook
async with agent.subscription("stream:chunk", on_chunk):
    await agent.acall("Stream this response")
# Auto-unsubscribed after block
Example (multiple hooks):
# Temporary monitoring
with agent.subscription({
    "tool:pre_exec": on_start,
    "tool:post_exec": on_end,
    "context:after_change": on_context_change,
}):
    agent.call("Do some work with policies")
# All hooks auto-unsubscribed
Early unsubscription:
async with agent.subscription("stream:chunk", on_chunk) as ctx:
    await agent.acall("First call")
    ctx.unsubscribe()  # Manually unsubscribe early
    await agent.acall("Second call - no hook")

Available Hook Names

All hook names follow the format category:event.

Streaming Hooks

"stream:chunk"          # Every chunk during streaming
"stream:tool_detect"    # Tool call detected in stream
"stream:content"        # Content chunk received
"stream:tool_start"     # Tool call start chunk
"stream:tool_delta"     # Tool argument delta
"stream:tool_complete"  # Tool call complete
"stream:tool_result"    # Tool result received

Tool Hooks

"tool:pre_exec"     # Before tool execution starts
"tool:post_exec"    # After tool execution completes
"tool:pre_call"     # Before individual tool call
"tool:post_call"    # After individual tool call
"tool:on_error"     # Tool execution error
"tool:intercept"    # Intercept tool results (dual-phase)

Context Hooks

"context:before_change"  # Before any context operation
"context:after_change"   # After any context operation
"context:on_add"         # Component added
"context:on_dispatch"    # Component dispatched
"context:on_update"      # Component updated

Message Hooks

"message:on_user"      # User message received
"message:on_provider"  # Provider response received
"message:on_error"     # Message processing error

Scaffold Hooks

"scaffold:op_complete"    # Scaffold operation completed
"scaffold:state_change"   # Scaffold state changed

Usage Patterns

Pattern 1: Dynamic Monitoring

Enable/disable logging based on conditions:
agent = Agent(provider="openai:gpt-4")

# Enable detailed logging for specific task
if task.requires_audit:
    sub_id = agent.subscribe({
        "tool:pre_call": log_tool_start,
        "tool:post_call": log_tool_end,
        "context:after_change": log_context_ops,
    })

result = agent.call(task.prompt)

# Disable logging after task
if task.requires_audit:
    agent.unsubscribe(sub_id)

Pattern 2: Temporary Streaming

Stream responses for interactive sessions only:
async def interactive_chat():
    async with agent.subscription("stream:chunk", print_chunk):
        while True:
            user_input = await get_user_input()
            if user_input == "exit":
                break
            await agent.acall(user_input)
    # Streaming auto-disabled after session

Pattern 3: Testing and Debugging

Capture hook data for assertions:
def test_tool_execution():
    tool_calls = []

    def capture_tool(ctx):
        tool_calls.append(ctx.tool_name)

    with agent.subscription("tool:post_call", capture_tool):
        agent.call("Use calculator to add 5 + 3")

    assert "calculator" in tool_calls

Pattern 4: Rate Limiting

Implement custom rate limiting:
from collections import deque
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, max_calls: int, window: timedelta):
        self.max_calls = max_calls
        self.window = window
        self.calls = deque()

    def check(self, ctx):
        now = datetime.now()

        # Remove old calls outside window
        while self.calls and self.calls[0] < now - self.window:
            self.calls.popleft()

        # Check limit
        if len(self.calls) >= self.max_calls:
            raise RuntimeError(f"Rate limit exceeded: {self.max_calls} calls per {self.window}")

        self.calls.append(now)

# Apply rate limiting
limiter = RateLimiter(max_calls=10, window=timedelta(minutes=1))
agent.on("tool:pre_call", limiter.check)

Pattern 5: Progressive Enhancement

Add hooks incrementally:
agent = Agent(provider="openai:gpt-4")

# Basic usage - no hooks
agent.call("Simple query")

# Add streaming for next call
sub1 = agent.on("stream:chunk", stream_handler)
agent.call("Stream this response")

# Add tool monitoring
sub2 = agent.subscribe({
    "tool:pre_call": tool_logger,
    "tool:on_error": error_handler,
})
agent.call("Complex task with tools")

# Remove all hooks
agent.unsubscribe(sub1)
agent.unsubscribe(sub2)
agent.call("Back to basic")

Integration with Decorator Hooks

Subscribe API and decorator hooks work together seamlessly:
agent = Agent(provider="openai:gpt-4")

# Permanent hook via decorator
@agent.hooks.tool.pre_call
def permanent_logger(ctx):
    print(f"[PERMANENT] {ctx.tool_name}")

# Temporary hook via subscribe API
with agent.subscription("tool:post_call", lambda ctx: print(f"[TEMP] Done: {ctx.tool_name}")):
    agent.call("Execute tools")
    # Both hooks fire together

# After context - only permanent hook remains
agent.call("Execute more tools")
Execution order:
  1. Permanent hooks fire first (in registration order)
  2. Subscribe API hooks fire second (in registration order)

Error Handling

Unknown Hook Names

Helpful suggestions when hook name is misspelled:
try:
    agent.on("stream:chonk", handler)  # Typo
except ValueError as e:
    print(e)
    # Unknown hook name: 'stream:chonk'
    # Did you mean one of these?
    #   - stream:chunk
    #   - stream:content

Non-Callable Functions

try:
    agent.on("stream:chunk", "not_a_function")
except TypeError as e:
    print(e)
    # Hook function must be callable, got str

Safe Unsubscription

# Always safe - never raises errors
agent.unsubscribe("non_existent_id")
agent.unsubscribe("sub_12345")  # Already unsubscribed

Best Practices

# Good: Auto-cleanup
async with agent.subscription("stream:chunk", handler):
    await agent.acall("Stream this")

# Bad: Manual cleanup required
sub_id = agent.on("stream:chunk", handler)
await agent.acall("Stream this")
agent.unsubscribe(sub_id)  # Easy to forget
class MonitoredAgent:
    def __init__(self, agent):
        self.agent = agent
        self.subscriptions = []

    def enable_monitoring(self):
        sub_id = self.agent.subscribe({
            "tool:pre_call": self.log_tool,
            "context:after_change": self.log_context,
        })
        self.subscriptions.append(sub_id)

    def disable_monitoring(self):
        for sub_id in self.subscriptions:
            self.agent.unsubscribe(sub_id)
        self.subscriptions.clear()
# Good: Permanent application-wide logging
@agent.hooks.tool.pre_call
def log_all_tools(ctx):
    logger.info(f"Tool: {ctx.tool_name}")

# Bad: Subscribe API for permanent hooks
sub_id = agent.on("tool:pre_call", log_all_tools)
# Now you have to manage sub_id forever

Performance Considerations

  • Minimal overhead: Subscribe API is a thin layer over decorator system
  • No double-firing: Same hooks registered twice are tracked separately
  • Idempotent unsubscribe: Safe to call multiple times
  • Context manager overhead: Negligible (less than 1ms per subscription)

Comparison: Subscribe API vs Decorators

FeatureSubscribe APIDecorators
RegistrationRuntimeDefinition time
UnsubscriptionYes (anytime)No
Context managerYesNo
Conditional hooksEasyRequires logic in hook
TestingExcellentGood
Bulk operationssubscribe()Multiple decorators
Friendly namesYesNo (use attributes)
Best forDynamic, temporaryPermanent, application-wide

What’s Next?