Documentation Index Fetch the complete documentation index at: https://docs.egregorelabs.io/llms.txt
Use this file to discover all available pages before exploring further.
Scaffold IPC
Scaffolds communicate through agent state - a formal IPC system with source tracking, type safety, and observability.
The Problem
Direct scaffold access creates tight coupling:
# Bad: Direct coupling
class AnalyzerScaffold ( BaseContextScaffold ):
def render ( self ):
# Tightly coupled to notes scaffold
notes = self .agent.scaffolds[ "notes" ]
count = len (notes.state.notes) # Breaks if notes scaffold missing
Agent State Solution
Use agent state for formal IPC:
# Good: Formal IPC
class NotesScaffold ( BaseContextScaffold ):
def render ( self ):
# Publish state
self .agent.state.set( "note_count" , len ( self .state.notes), source = "notes" )
class AnalyzerScaffold ( BaseContextScaffold ):
def render ( self ):
# Subscribe to state
note_count = self .agent.state.get( "note_count" , default = 0 )
return TextContent( content = f "Notes: { note_count } " , key = "analysis" )
State API
Setting State
# Basic set
self .agent.state.set( "key" , "value" , source = "scaffold_name" )
# With metadata
self .agent.state.set(
"status" ,
"processing" ,
source = "task_scaffold" ,
metadata = { "priority" : "high" }
)
Getting State
# Get with default
value = self .agent.state.get( "key" , default = None )
# Check existence
if self .agent.state.exists( "key" ):
value = self .agent.state.get( "key" )
# Get with metadata
state_entry = self .agent.state.get_entry( "key" )
print (state_entry.value, state_entry.source, state_entry.metadata)
Communication Patterns
Producer-Consumer
class ProducerScaffold ( BaseContextScaffold ):
scaffold_type = "producer"
@operation
def process_data ( self , data : str ) -> str :
result = self .expensive_computation(data)
# Publish result
self .agent.state.set( "processed_data" , result, source = "producer" )
return "Processing complete"
class ConsumerScaffold ( BaseContextScaffold ):
scaffold_type = "consumer"
def render ( self ):
# Consume published data
data = self .agent.state.get( "processed_data" )
if data:
return TextContent( content = f "Using: { data } " , key = "consumer_output" )
Event Broadcasting
class EventScaffold ( BaseContextScaffold ):
def render ( self ):
# Broadcast event
self .agent.state.set(
"task_completed" ,
True ,
source = "task_scaffold" ,
metadata = { "task_id" : 123 }
)
class ListenerScaffold ( BaseContextScaffold ):
def render ( self ):
# Listen for event
if self .agent.state.get( "task_completed" ):
metadata = self .agent.state.get_entry( "task_completed" ).metadata
task_id = metadata.get( "task_id" )
return TextContent( content = f "Task { task_id } done" , key = "notification" )
State Aggregation
class AggregatorScaffold ( BaseContextScaffold ):
def render ( self ):
# Aggregate from multiple sources
metrics = {
"notes" : self .agent.state.get( "note_count" , default = 0 ),
"tasks" : self .agent.state.get( "task_count" , default = 0 ),
"files" : self .agent.state.get( "file_count" , default = 0 )
}
total = sum (metrics.values())
return TextContent(
content = f "Total items: { total } " ,
key = "aggregate_metrics"
)
Best Practices
# Good: Clear naming
self .agent.state.set( "task_completion_status" , "done" , source = "tasks" )
# Bad: Cryptic names
self .agent.state.set( "s" , "d" , source = "t" )
# Good: Track source
self .agent.state.set( "count" , 5 , source = "counter_scaffold" )
# Bad: No source tracking
self .agent.state.set( "count" , 5 , source = "unknown" )
Provide defaults when reading
# Good: Safe with default
count = self .agent.state.get( "count" , default = 0 )
# Bad: May raise exception
count = self .agent.state.get( "count" )
What’s Next?
Creating Scaffolds Build custom scaffolds
Built-in Scaffolds Explore InternalNotes, FileManager, Shell