Skip to main content

Agent Discovery

Native agent discovery provides automatic agent tracking and management within workflows. Discovery is always enabled — no setup required.

Core Concept

Workflows automatically track all agents created during execution, providing immediate access to agent instances, states, and control capabilities.
from egregore import Agent
from egregore.core.workflow import node, Sequence, get_current_agents

@node("process_with_agents")
def process_with_agents(data: dict) -> dict:
    """Ask every agent in the current workflow to analyze ``data["text"]``.

    Agents are discovered automatically; no registration step is needed.

    Args:
        data: Node input; must contain a ``"text"`` key whenever at least
            one agent is discovered.

    Returns:
        ``{"analyses": [...], "agent_count": int}`` with one analysis per
        discovered agent.
    """
    agents = get_current_agents()

    # Only the agent instances are needed, so iterate values() instead of
    # items() with an unused agent_id.
    results = [agent.call(f"Analyze: {data['text']}") for agent in agents.values()]

    return {"analyses": results, "agent_count": len(agents)}

# Create workflow with agents
# NOTE(review): these agents are constructed before (and outside) the run
# and outside any workflow_context; confirm they are attributed to this
# workflow — compare the explicit workflow_context example further down.
agent1 = Agent(provider="openai:gpt-4")
agent2 = Agent(provider="anthropic:claude-3.5-sonnet")

# A single-node workflow; process_with_agents reads data["text"].
workflow = Sequence(process_with_agents)
result = workflow.run({"text": "Sample data"})
# Agents automatically tracked during execution

Discovery API

get_current_agents()

Access all agents in the current workflow:
from egregore.core.workflow import get_current_agents

@node("list_agents")
def list_agents(data: dict) -> dict:
    """Report every agent discovered in the current workflow.

    Returns:
        A dict with the discovered agent ids, how many there are, and the
        ``agent_id -> agent`` mapping itself.
    """
    discovered = get_current_agents()
    ids = list(discovered.keys())
    count = len(discovered)
    return {"agent_ids": ids, "agent_count": count, "agents": discovered}
Returns: Dictionary of agent_id -> agent instance

get_current_agent_states()

Get the current state of all agents:
from egregore.core.workflow import get_current_agent_states

@node("check_agent_states")
def check_agent_states(data: dict) -> dict:
    """Snapshot all agent states and count how many are ``"busy"``.

    Documented state strings: "idle", "busy", "waiting", "error".
    """
    states = get_current_agent_states()
    busy_count = sum(1 for state in states.values() if state == "busy")
    return {"states": states, "busy_count": busy_count}
Returns: Dictionary of agent_id -> state string

Possible States:
  • "idle": Agent available for work
  • "busy": Agent processing request
  • "waiting": Agent waiting for response
  • "error": Agent encountered error

interrupt_current_agents()

Stop all agents in the current workflow:
from egregore.core.workflow import interrupt_current_agents

@node("emergency_stop")
def emergency_stop(data: dict) -> dict:
    """Interrupt every agent in the current workflow.

    Args:
        data: Node input; an optional ``"reason"`` is echoed back.

    Returns:
        ``{"status": "stopped", "reason": <reason>}``.
    """
    stop_reason = data.get("reason", "User requested stop")
    interrupt_current_agents()
    return {"status": "stopped", "reason": stop_reason}
Use Cases:
  • Emergency shutdown
  • Cost limit exceeded
  • User cancellation
  • Timeout handling

apply_policy_to_current_agents()

Apply cross-cutting policies to all agents:
from egregore.core.workflow import apply_policy_to_current_agents

@node("apply_rate_limiting")
def apply_rate_limiting(data: dict) -> dict:
    """Cap every agent in the workflow at ``max_calls`` calls per minute.

    Args:
        data: Node input; optional ``"max_calls"`` (default 100).

    Returns:
        ``{"status": "rate_limit_applied", "limit": <max_calls>}``.
    """
    limit = data.get("max_calls", 100)

    def rate_limit_policy(agent_id: str, agent):
        # Policy callbacks receive (agent_id, agent); only the agent is used.
        agent.set_rate_limit(max_calls_per_minute=limit)

    apply_policy_to_current_agents(rate_limit_policy)
    return {"status": "rate_limit_applied", "limit": limit}
Common Policies:
  • Rate limiting
  • Timeout configuration
  • Retry policies
  • Logging configuration
  • Cost tracking

get_agents_in_node()

Get agents created within a specific node:
from egregore.core.workflow import get_agents_in_node, node

@node("create_specialized_agents")
def create_specialized_agents(data: dict) -> dict:
    """Create agents for specific tasks."""
    # Agents created in this node
    # The local names are never read; constructing the agents inside this
    # node is what get_agents_in_node("create_specialized_agents") relies on.
    analyst = Agent(provider="openai:gpt-4", name="analyst")
    writer = Agent(provider="anthropic:claude-3.5-sonnet", name="writer")

    # Hard-coded to match the two agents constructed above.
    return {"agents_created": 2}

@node("access_node_agents")
def access_node_agents(data: dict) -> dict:
    """List only the agents created by the create_specialized_agents node."""
    created_there = get_agents_in_node("create_specialized_agents")
    return {
        "node_agent_count": len(created_there),
        "node_agents": [*created_there.keys()],
    }
Use Cases:
  • Track which nodes created which agents
  • Clean up agents per node
  • Debug agent creation flow
  • Isolate agent groups

monitor_current_workflow()

Add monitoring callbacks for agent lifecycle events:
from egregore.core.workflow import monitor_current_workflow

def agent_event_logger(event: str, agent_id: str, data: dict):
    """Print one lifecycle event as ``[event] Agent <agent_id>: <data>``."""
    line = f"[{event}] Agent {agent_id}: {data}"
    print(line)

@node("setup_monitoring")
def setup_monitoring(data: dict) -> dict:
    """Register agent_event_logger as this workflow's lifecycle callback."""
    callback = agent_event_logger
    monitor_current_workflow(callback)
    return {"monitoring": "enabled"}

# Events logged:
# - agent_created: When agent is instantiated
# - agent_called: When agent.call() is invoked
# - agent_completed: When agent finishes request
# - agent_error: When agent encounters error
# - agent_interrupted: When agent is stopped
Event Types:
  • agent_created: Agent instantiated
  • agent_called: Agent processing request
  • agent_completed: Request finished
  • agent_error: Error occurred
  • agent_interrupted: Agent stopped
Event Data:
{
    "timestamp": 1234567890.123,
    "agent_id": "agent_abc123",
    "provider": "openai:gpt-4",
    "node_name": "process_data",  # Where event occurred
    "details": {...}  # Event-specific data
}

Workflow Context

Auto-Initialization

Discovery works automatically without setup:
from egregore.core.workflow import node, Sequence, get_current_agents

# No initialization needed - just use the API
@node("auto_discovery")
def auto_discovery(data: dict) -> dict:
    """Return how many agents discovery sees, without any prior setup."""
    return {"discovered": len(get_current_agents())}

Explicit Workflow Naming

Use workflow_context for explicit workflow names:
from egregore.core.workflow import workflow_context, get_current_agents

# NOTE(review): process_data and data are not defined in this snippet;
# they are assumed to come from surrounding code.
with workflow_context("data_processing"):
    # All agents in this context are tracked under "data_processing"
    agent1 = Agent(provider="openai:gpt-4")
    agent2 = Agent(provider="anthropic:claude-3.5-sonnet")

    workflow = Sequence(process_data)
    result = workflow.run(data)

    # Access agents in this workflow
    # (still inside the with-block, so discovery is scoped to "data_processing")
    agents = get_current_agents()
Benefits:
  • Clear workflow identification
  • Multiple concurrent workflows
  • Better logging and debugging
  • Workflow isolation

Nested Workflows

Workflows maintain their own agent contexts:
# Contexts nest: entering "inner" scopes discovery to the inner workflow,
# and leaving it scopes discovery back to "outer".
with workflow_context("outer"):
    outer_agent = Agent(provider="openai:gpt-4")

    with workflow_context("inner"):
        inner_agent = Agent(provider="anthropic:claude-3.5-sonnet")

        # get_current_agents() returns only inner_agent
        agents = get_current_agents()

    # Back in outer context - returns only outer_agent
    agents = get_current_agents()

Common Patterns

Multi-Agent Coordination

from egregore.core.workflow import node, get_current_agents, Sequence

@node("distribute_work")
def distribute_work(task: dict) -> dict:
    """Split ``task`` into one subtask per agent and run each on its agent.

    Args:
        task: Node input handed to ``split_task``; each produced subtask is
            presumably a dict with a ``"prompt"`` key (read below).

    Returns:
        ``{"results": [...]}`` with one ``{"agent_id", "subtask", "result"}``
        entry per agent/subtask pair; empty when no agents are discovered.
    """
    agents = get_current_agents()
    if not agents:
        # Nothing to distribute to; don't ask split_task for zero parts.
        return {"results": []}

    subtasks = split_task(task, num_parts=len(agents))

    # zip pairs agents with subtasks positionally and stops at the shorter.
    results = [
        {
            "agent_id": agent_id,
            "subtask": subtask,
            "result": agent.call(subtask["prompt"]),
        }
        for (agent_id, agent), subtask in zip(agents.items(), subtasks)
    ]

    return {"results": results}

@node("combine_results")
def combine_results(data: dict) -> dict:
    """Merge the per-agent ``data["results"]`` produced by distribute_work."""
    return {"combined": merge_results(data["results"])}

# distribute_work returns {"results": [...]}, which combine_results reads.
workflow = Sequence(distribute_work, combine_results)

Agent Load Balancing

@node("load_balance")
def load_balance(tasks: list) -> dict:
    """Distribute ``tasks`` across the agents currently reported as idle.

    Tasks are assigned round-robin, so every task is processed even when
    there are more tasks than idle agents.

    Args:
        tasks: Task dicts, each with a ``"prompt"`` key.

    Returns:
        ``{"results": [{"agent_id", "result"}, ...]}`` in task order, or
        ``{"status": "no_idle_agents"}`` when no agent is ``"idle"``.
    """
    import itertools

    agents = get_current_agents()
    states = get_current_agent_states()

    idle_agents = [
        (aid, agent) for aid, agent in agents.items()
        if states.get(aid) == "idle"
    ]

    if not idle_agents:
        return {"status": "no_idle_agents"}

    # Cycle over idle agents: a plain zip(tasks, idle_agents) stops at the
    # shorter sequence and would silently drop the surplus tasks.
    results = []
    for task, (agent_id, agent) in zip(tasks, itertools.cycle(idle_agents)):
        results.append({"agent_id": agent_id, "result": agent.call(task["prompt"])})

    return {"results": results}

Conditional Agent Selection

@node("route_to_specialist")
def route_to_specialist(request: dict) -> dict:
    """Route ``request`` to the agent whose name matches its type.

    ``"code"`` requests go to the agent named ``"coder"``; ``"analysis"``
    requests go to ``"analyst"``. Any other type, or a type whose specialist
    is not present, falls back to the first discovered agent.

    Returns:
        ``{"result": ..., "handled_by": <agent name or None>}``, or
        ``{"status": "no_agents"}`` when no agents are discovered.
    """
    agents = get_current_agents()
    if not agents:
        # Otherwise the default lookup below would raise IndexError.
        return {"status": "no_agents"}

    specialist_names = {"code": "coder", "analysis": "analyst"}
    wanted = specialist_names.get(request["type"])

    specialist = find_agent_by_name(agents, wanted) if wanted else None
    if specialist is None:
        # Unknown type or missing specialist: use the first agent instead of
        # calling .call() on None.
        specialist = next(iter(agents.values()))

    result = specialist.call(request["prompt"])
    # Agents without a name attribute would otherwise raise AttributeError.
    return {"result": result, "handled_by": getattr(specialist, "name", None)}

def find_agent_by_name(agents: dict, name: str):
    """Return the first agent in ``agents`` whose ``name`` attribute equals ``name``.

    Agents lacking a ``name`` attribute compare as ``""``. Returns ``None``
    when no agent matches.
    """
    matches = (
        candidate
        for candidate in agents.values()
        if getattr(candidate, "name", "") == name
    )
    return next(matches, None)

Agent Health Monitoring

from egregore.core.workflow import get_current_agent_states

@node("health_check")
def health_check(data: dict) -> dict:
    """Probe each discovered agent with a tiny test call and report its health.

    Returns:
        ``{"health": {agent_id: report}}``. A responsive agent keeps its
        reported state (``"unknown"`` if none) plus the probe output; an agent
        whose probe raised is reported as ``"error"`` with the exception text.
    """
    agents = get_current_agents()
    states = get_current_agent_states()

    report = {}
    for aid, agent in agents.items():
        observed_state = states.get(aid, "unknown")
        try:
            probe = agent.call("Test", max_tokens=10)
        except Exception as exc:
            # Any failure during the probe marks the agent unresponsive.
            report[aid] = {
                "state": "error",
                "responsive": False,
                "error": str(exc),
            }
        else:
            report[aid] = {
                "state": observed_state,
                "responsive": True,
                "test_result": probe,
            }

    return {"health": report}

Progressive Agent Policies

from egregore.core.workflow import apply_policy_to_current_agents

@node("progressive_rate_limiting")
def progressive_rate_limiting(data: dict) -> dict:
    """Apply a rate limit whose strictness grows with ``data["total_calls"]``.

    Tiers (first match wins):
        * more than 10000 calls -> "strict", 10 calls/minute
        * more than 5000 calls  -> "moderate", 50 calls/minute
        * otherwise             -> no policy applied

    Returns:
        ``{"policy": "applied" | "none", "tier": "strict" | "moderate" | "none"}``
        describing what was actually done.
    """
    total_calls = data.get("total_calls", 0)

    # (threshold, tier name, max calls per minute), strictest first.
    tiers = (
        (10000, "strict", 10),
        (5000, "moderate", 50),
    )

    for threshold, tier, max_calls in tiers:
        if total_calls > threshold:
            def tier_policy(agent_id: str, agent):
                agent.set_rate_limit(max_calls_per_minute=max_calls)
            apply_policy_to_current_agents(tier_policy)
            return {"policy": "applied", "tier": tier}

    # Below every threshold nothing is applied, so report that honestly
    # instead of claiming a "moderate" policy.
    return {"policy": "none", "tier": "none"}

Global Workflow Management

Access Global Manager

from egregore.core.workflow import get_workflow_manager

# Get global workflow manager
manager = get_workflow_manager()

# Get all discovered agents across all workflows
all_agents = manager.get_all_discovered_agents()

# Get agents in specific workflow
workflow_agents = manager.get_workflow_agents("data_processing")

# Interrupt all workflows
# NOTE(review): presumably affects every workflow the manager tracks, not
# only the current one — take care before copying this line. The summary
# below is requested after this interrupt.
manager.interrupt_all_workflows()

# Get global summary
summary = manager.get_global_summary()
# Returns:
# {
#     "global_agent_discovery": {...},
#     "workflows": {
#         "workflow_1": {...},
#         "workflow_2": {...}
#     },
#     "total_workflows": 2
# }

Cross-Workflow Operations

from egregore.core.workflow import get_workflow_manager, workflow_context

# NOTE(review): process_node, analyze_node, data1 and data2 are not defined
# in this snippet; they are assumed to come from surrounding code.

# Workflow 1
with workflow_context("processing"):
    agent1 = Agent(provider="openai:gpt-4")
    workflow1 = Sequence(process_node)
    workflow1.run(data1)

# Workflow 2
with workflow_context("analysis"):
    agent2 = Agent(provider="anthropic:claude-3.5-sonnet")
    workflow2 = Sequence(analyze_node)
    workflow2.run(data2)

# Access all workflows
# Both contexts have exited; agents are looked up by workflow name here.
manager = get_workflow_manager()
processing_agents = manager.get_workflow_agents("processing")
analysis_agents = manager.get_workflow_agents("analysis")

print(f"Processing: {len(processing_agents)} agents")
print(f"Analysis: {len(analysis_agents)} agents")

Performance Considerations

Discovery Overhead

  • Agent registration: Less than 1ms per agent
  • get_current_agents(): Less than 1ms (dictionary lookup)
  • State queries: Less than 1ms
  • Policy application: ~5ms per agent

Memory Impact

  • Per agent: ~100 bytes of tracking data
  • Per workflow: ~500 bytes of metadata
  • Total: Negligible for typical workflows (less than 1MB for 1000 agents)

Thread Safety

All discovery operations are thread-safe:
import threading

def worker_workflow():
    """Open a per-thread workflow context, create one agent, and count agents."""
    context_name = f"worker_{threading.current_thread().name}"
    with workflow_context(context_name):
        agent = Agent(provider="openai:gpt-4")
        visible = get_current_agents()  # Thread-safe
        return len(visible)

# Multiple threads safely create workflows
# NOTE: threading.Thread ignores its target's return value, so the agent
# counts returned by worker_workflow are discarded here; collect them
# explicitly (e.g. into a shared list) if you need them.
threads = [threading.Thread(target=worker_workflow) for _ in range(10)]
for t in threads:
    t.start()
# Wait for every worker to finish.
for t in threads:
    t.join()

Best Practices

# Good: Explicit workflow names
with workflow_context("user_processing"):
    workflow = Sequence(process_users)
    result = workflow.run(data)
    # Called inside the named context, so discovery is scoped to "user_processing".
    agents = get_current_agents()

# Bad: Auto-generated workflow names
workflow = Sequence(process_users)
result = workflow.run(data)  # Workflow name is random UUID
# Good: Check states first
@node("smart_distribute")
def smart_distribute(tasks: list) -> dict:
    """Hand ``tasks`` only to agents whose current state is ``"idle"``."""
    agents = get_current_agents()
    states = get_current_agent_states()

    def _is_idle(agent_id) -> bool:
        return states.get(agent_id) == "idle"

    available = [agent for agent_id, agent in agents.items() if _is_idle(agent_id)]
    return distribute_to_agents(tasks, available)

# Bad: No state checking
@node("blind_distribute")
def blind_distribute(tasks: list) -> dict:
    """Counter-example: hand ``tasks`` to every agent without checking state.

    Busy agents may be overloaded because no state is consulted.
    """
    every_agent = [*get_current_agents().values()]
    return distribute_to_agents(tasks, every_agent)
# Good: Policies applied before work
# Node order in Sequence is the point here: setup_policies must precede the
# node that actually uses the agents.
workflow = Sequence(
    setup_policies,     # Apply rate limits, timeouts
    process_data,       # Use configured agents
    finalize
)

# Bad: Policies applied too late
workflow = Sequence(
    process_data,       # Agents unconfigured
    setup_policies,     # Too late - work already done
    finalize
)
# Good: Comprehensive monitoring
def production_monitor(event: str, agent_id: str, data: dict):
    """Log each agent lifecycle event; additionally notify on ``agent_error``.

    Uses ``logger``, ``time`` and ``alert_system`` from the enclosing module
    (not defined in this snippet).
    """
    context = {
        "agent_id": agent_id,
        "data": data,
        "timestamp": time.time(),
    }
    logger.info(f"Agent event: {event}", extra=context)

    if event != "agent_error":
        return
    # Error events are also forwarded to alert_system.notify.
    alert_system.notify(f"Agent {agent_id} error: {data}")

@node("setup_production")
def setup_production(data: dict) -> dict:
    """Attach production_monitor to this workflow; pass ``data`` through unchanged."""
    monitor_current_workflow(production_monitor)
    passthrough = data
    return passthrough

# Bad: No monitoring
@node("no_monitoring")
def no_monitoring(data: dict) -> dict:
    """Counter-example: process ``data`` with no monitoring callback registered.

    Agent failures here go unobserved.
    """
    result = process(data)
    return result
# Good: Interrupt on failure
@node("safe_processing")
def safe_processing(data: dict) -> dict:
    """Process ``data``; on failure, interrupt all agents and re-raise.

    Raises:
        Whatever ``process`` raised, unchanged.
    """
    try:
        return process(data)
    except Exception:
        # Stop every agent in the workflow so none keep working on a failed
        # run, then propagate the original exception (the binding was unused).
        interrupt_current_agents()
        raise

# Bad: Agents continue after failure
@node("unsafe_processing")
def unsafe_processing(data: dict) -> dict:
    """Counter-example: swallow processing errors without stopping agents."""
    try:
        outcome = process(data)
    except Exception:
        outcome = {"error": True}  # Agents keep running
    return outcome

Error Handling

No Agents Discovered

@node("handle_no_agents")
def handle_no_agents(data: dict) -> dict:
    """Report the agent count, first creating a fallback agent if none exist."""
    agents = get_current_agents()
    if agents:
        return {"agent_count": len(agents)}

    # Bound to a local so the fallback agent is still referenced while
    # discovery is queried again.
    fallback = Agent(provider="openai:gpt-4")
    return {"agent_count": len(get_current_agents())}

Agent State Errors

@node("handle_agent_errors")
def handle_agent_errors(data: dict) -> dict:
    """Report agents in the ``"error"`` state and analyze ``data`` with the rest.

    Args:
        data: Node input; must contain a ``"text"`` key whenever at least one
            healthy agent exists.

    Returns:
        ``{"analyses": [...], "agent_count": int}`` covering only the agents
        not reported as ``"error"``.
    """
    agents = get_current_agents()
    states = get_current_agent_states()

    # Log errors
    for agent_id, state in states.items():
        if state == "error":
            print(f"Agent {agent_id} in error state")

    # Continue with healthy agents only
    healthy_agents = {
        aid: agent for aid, agent in agents.items()
        if states.get(aid) != "error"
    }

    # Call the healthy agents directly. The process_with_agents node takes
    # the node's data dict and re-discovers *all* agents itself, so handing it
    # this filtered mapping would not actually exclude the failed agents.
    analyses = [
        agent.call(f"Analyze: {data['text']}")
        for agent in healthy_agents.values()
    ]
    return {"analyses": analyses, "agent_count": len(healthy_agents)}

Discovery API Failures

from egregore.core.workflow import get_current_agents

@node("safe_discovery")
def safe_discovery(data: dict) -> dict:
    """Read discovered agents, degrading to an empty mapping if the API fails."""
    try:
        discovered = get_current_agents()
    except Exception as err:
        print(f"Discovery failed: {err}")
        discovered = {}
    return {"discovered": len(discovered), "agents": discovered}

What’s Next?