from egregore.core.workflow import get_agents_in_node, node

# NOTE(review): `Agent` is not imported in this snippet - presumably it is
# imported earlier in the document; confirm the import path.


@node("create_specialized_agents")
def create_specialized_agents(data: dict) -> dict:
    """Build one agent per specialised task inside this node.

    Returns:
        A dict with the number of agents created under ``"agents_created"``.
    """
    # Agents created in this node
    analyst = Agent(provider="openai:gpt-4", name="analyst")
    writer = Agent(provider="anthropic:claude-3.5-sonnet", name="writer")

    return {"agents_created": 2}


@node("access_node_agents")
def access_node_agents(data: dict) -> dict:
    """Look up the agents registered under ``create_specialized_agents``.

    Returns:
        A dict holding the number of agents found (``"node_agent_count"``)
        and the keys of the returned mapping (``"node_agents"``).
    """
    # Get only agents created in previous node
    created = get_agents_in_node("create_specialized_agents")

    agent_count = len(created)
    agent_keys = list(created.keys())
    return {
        "node_agent_count": agent_count,
        "node_agents": agent_keys,
    }
from egregore.core.workflow import node, Sequence, get_current_agents


# No initialization needed - just use the API
@node("auto_discovery")
def auto_discovery(data: dict) -> dict:
    """Report how many agents ``get_current_agents()`` returns.

    Returns:
        A dict with that count under ``"discovered"``.
    """
    found = get_current_agents()  # Works immediately
    return {"discovered": len(found)}
from egregore.core.workflow import workflow_context, get_current_agents

# NOTE(review): `Agent`, `Sequence`, `process_data` and `data` are not defined
# in this snippet - presumably they come from earlier in the document.

with workflow_context("data_processing"):
    # All agents in this context are tracked under "data_processing"
    agent1 = Agent(provider="openai:gpt-4")
    agent2 = Agent(provider="anthropic:claude-3.5-sonnet")

    workflow = Sequence(process_data)
    result = workflow.run(data)

    # Access agents in this workflow
    agents = get_current_agents()
with workflow_context("outer"):
    outer_agent = Agent(provider="openai:gpt-4")

    with workflow_context("inner"):
        inner_agent = Agent(provider="anthropic:claude-3.5-sonnet")

        # get_current_agents() returns only inner_agent
        agents = get_current_agents()

    # Back in outer context - returns only outer_agent
    agents = get_current_agents()
from egregore.core.workflow import get_workflow_manager

# Get global workflow manager
manager = get_workflow_manager()

# Get all discovered agents across all workflows
all_agents = manager.get_all_discovered_agents()

# Get agents in specific workflow
workflow_agents = manager.get_workflow_agents("data_processing")

# Interrupt all workflows
manager.interrupt_all_workflows()

# Get global summary
summary = manager.get_global_summary()
# Returns:
# {
#     "global_agent_discovery": {...},
#     "workflows": {
#         "workflow_1": {...},
#         "workflow_2": {...}
#     },
#     "total_workflows": 2
# }
import threading


def worker_workflow():
    """Create an agent in a per-thread workflow context and count its agents.

    The context is named ``worker_<thread name>`` after the thread running
    this function.

    Returns:
        The number of agents ``get_current_agents()`` reports in that context.
    """
    context_name = f"worker_{threading.current_thread().name}"
    with workflow_context(context_name):
        agent = Agent(provider="openai:gpt-4")
        agents = get_current_agents()  # Thread-safe
        return len(agents)


# Multiple threads safely create workflows
threads = [threading.Thread(target=worker_workflow) for _ in range(10)]

for t in threads:
    t.start()

for t in threads:
    t.join()
# Good: Explicit workflow names
with workflow_context("user_processing"):
    workflow = Sequence(process_users)
    result = workflow.run(data)
    agents = get_current_agents()

# Bad: Auto-generated workflow names
workflow = Sequence(process_users)
result = workflow.run(data)  # Workflow name is random UUID
Check agent states before work distribution
# Good: Check states first
@node("smart_distribute")
def smart_distribute(tasks: list) -> dict:
    """Distribute tasks only to agents whose reported state is ``"idle"``.

    Agents and states are both keyed by the same identifier, so each agent's
    state is looked up by its key.
    """
    current_agents = get_current_agents()
    agent_states = get_current_agent_states()

    idle_agents = [
        agent
        for agent_id, agent in current_agents.items()
        if agent_states.get(agent_id) == "idle"
    ]

    # Distribute only to idle agents
    return distribute_to_agents(tasks, idle_agents)


# Bad: No state checking
@node("blind_distribute")
def blind_distribute(tasks: list) -> dict:
    """Distribute tasks to every current agent without checking its state."""
    every_agent = list(get_current_agents().values())

    # May overload busy agents
    return distribute_to_agents(tasks, every_agent)
Apply policies early in workflow
# Good: Policies applied before work
workflow = Sequence(
    setup_policies,  # Apply rate limits, timeouts
    process_data,  # Use configured agents
    finalize,
)

# Bad: Policies applied too late
workflow = Sequence(
    process_data,  # Agents unconfigured
    setup_policies,  # Too late - work already done
    finalize,
)
@node("handle_no_agents")
def handle_no_agents(data: dict) -> dict:
    """Create a fallback agent when the current workflow has none.

    Returns:
        A dict with the number of agents under ``"agent_count"``, counted
        after any fallback agent has been created.
    """
    available = get_current_agents()

    if not available:
        # Create fallback agent
        fallback = Agent(provider="openai:gpt-4")
        available = get_current_agents()  # Now has fallback

    return {"agent_count": len(available)}
@node("handle_agent_errors")
def handle_agent_errors(data: dict) -> dict:
    """Process with all agents, leaving out any whose state is ``"error"``.

    Each agent in the error state is reported on stdout. When no agent is in
    that state, the full agent mapping is passed on unchanged.
    """
    agents = get_current_agents()
    states = get_current_agent_states()

    failed_ids = [
        agent_id for agent_id, state in states.items() if state == "error"
    ]

    # Nothing failed: use every agent as-is.
    if not failed_ids:
        return process_with_agents(agents)

    # Log errors
    for agent_id in failed_ids:
        print(f"Agent {agent_id} in error state")

    # Continue with healthy agents only
    healthy_agents = {
        agent_id: agent
        for agent_id, agent in agents.items()
        if states.get(agent_id) != "error"
    }
    return process_with_agents(healthy_agents)