Skip to content

Event System

The event system in Bedrock Swarm provides comprehensive tracking and monitoring of agent activities, tool executions, and system operations. It enables detailed tracing of operations and facilitates debugging and monitoring.

Class Documentation

System for tracking and managing events in the agent swarm.

This class provides: 1. Event creation and storage 2. Event querying and filtering 3. Event relationship tracking (parent/child) 4. Chronological event ordering

Source code in src/bedrock_swarm/events.py
def __init__(self) -> None:
    """Initialize the event system."""
    self.events: List[Event] = []
    self.current_event_id: Optional[str] = None

Functions

create_event(type: EventType, agent_name: str, run_id: str, thread_id: str, details: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> str

Create and record a new event.

PARAMETER DESCRIPTION
type

Type of event

TYPE: EventType

agent_name

Name of agent involved

TYPE: str

run_id

ID of the run this event belongs to

TYPE: str

thread_id

ID of the thread this event belongs to

TYPE: str

details

Event-specific details

TYPE: Dict[str, Any]

metadata

Optional additional metadata

TYPE: Optional[Dict[str, Any]] DEFAULT: None

RETURNS DESCRIPTION
str

ID of the created event

Source code in src/bedrock_swarm/events.py
def create_event(
    self,
    type: EventType,
    agent_name: str,
    run_id: str,
    thread_id: str,
    details: Dict[str, Any],
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Create and record a new event.

    Args:
        type: Type of event
        agent_name: Name of agent involved
        run_id: ID of the run this event belongs to
        thread_id: ID of the thread this event belongs to
        details: Event-specific details
        metadata: Optional additional metadata

    Returns:
        ID of the created event
    """
    event_id = str(uuid4())
    event: Event = {
        "id": event_id,
        "type": type,
        "timestamp": datetime.now().isoformat(),
        "agent_name": agent_name,
        "run_id": run_id,
        "thread_id": thread_id,
        "parent_event_id": self.current_event_id,
        "details": details,
        "metadata": metadata or {},
    }

    self.events.append(event)
    return event_id

start_event_scope(event_id: str) -> None

Start a new event scope.

This sets the current event as the parent for any new events until the scope is ended.

PARAMETER DESCRIPTION
event_id

ID of the event to set as current

TYPE: str

Source code in src/bedrock_swarm/events.py
def start_event_scope(self, event_id: str) -> None:
    """Start a new event scope.

    This sets the current event as the parent for any new events
    until the scope is ended.

    Args:
        event_id: ID of the event to set as current
    """
    self.current_event_id = event_id

end_event_scope() -> None

End the current event scope.

Source code in src/bedrock_swarm/events.py
def end_event_scope(self) -> None:
    """End the current event scope."""
    self.current_event_id = None

get_events(run_id: Optional[str] = None, thread_id: Optional[str] = None, agent_name: Optional[str] = None, event_type: Optional[EventType] = None) -> List[Event]

Get events matching the specified filters.

PARAMETER DESCRIPTION
run_id

Optional run ID to filter by

TYPE: Optional[str] DEFAULT: None

thread_id

Optional thread ID to filter by

TYPE: Optional[str] DEFAULT: None

agent_name

Optional agent name to filter by

TYPE: Optional[str] DEFAULT: None

event_type

Optional event type to filter by

TYPE: Optional[EventType] DEFAULT: None

RETURNS DESCRIPTION
List[Event]

List of matching events in chronological order

Source code in src/bedrock_swarm/events.py
def get_events(
    self,
    run_id: Optional[str] = None,
    thread_id: Optional[str] = None,
    agent_name: Optional[str] = None,
    event_type: Optional[EventType] = None,
) -> List[Event]:
    """Get events matching the specified filters.

    Args:
        run_id: Optional run ID to filter by
        thread_id: Optional thread ID to filter by
        agent_name: Optional agent name to filter by
        event_type: Optional event type to filter by

    Returns:
        List of matching events in chronological order
    """
    filtered = self.events

    if run_id:
        filtered = [e for e in filtered if e["run_id"] == run_id]
    if thread_id:
        filtered = [e for e in filtered if e["thread_id"] == thread_id]
    if agent_name:
        filtered = [e for e in filtered if e["agent_name"] == agent_name]
    if event_type:
        filtered = [e for e in filtered if e["type"] == event_type]

    return filtered

get_event_chain(event_id: str) -> List[Event]

Get the chain of events leading to the specified event.

This follows the parent_event_id links to build the chain.

PARAMETER DESCRIPTION
event_id

ID of the event to get the chain for

TYPE: str

RETURNS DESCRIPTION
List[Event]

List of events in the chain, from root to specified event

Source code in src/bedrock_swarm/events.py
def get_event_chain(self, event_id: str) -> List[Event]:
    """Get the chain of events leading to the specified event.

    This follows the parent_event_id links to build the chain.

    Args:
        event_id: ID of the event to get the chain for

    Returns:
        List of events in the chain, from root to specified event
    """
    chain = []
    current = next((e for e in self.events if e["id"] == event_id), None)

    while current:
        chain.append(current)
        if current["parent_event_id"]:
            current = next(
                (e for e in self.events if e["id"] == current["parent_event_id"]),
                None,
            )
        else:
            break

    return list(reversed(chain))  # Return in chronological order

format_event(event: Event) -> str

Format an event for display.

PARAMETER DESCRIPTION
event

Event to format

TYPE: Event

RETURNS DESCRIPTION
str

Formatted string representation of the event

Source code in src/bedrock_swarm/events.py
def format_event(self, event: Event) -> str:
    """Format an event for display.

    Args:
        event: Event to format

    Returns:
        Formatted string representation of the event
    """
    # Parse timestamp for formatting
    timestamp = datetime.fromisoformat(event["timestamp"])
    time_str = timestamp.strftime("%H:%M:%S.%f")[:-3]

    # Format basic event info
    lines = [
        f"[{time_str}] {event['type'].upper()} - Agent: {event['agent_name']}",
    ]

    # Add event details
    if event["details"]:
        for key, value in event["details"].items():
            if isinstance(value, dict):
                lines.append(f"  {key}:")
                for k, v in value.items():
                    lines.append(f"    {k}: {v}")
            else:
                lines.append(f"  {key}: {value}")

    # Add metadata if present
    if event["metadata"]:
        lines.append("  Metadata:")
        for key, value in event["metadata"].items():
            lines.append(f"    {key}: {value}")

    return "\n".join(lines)

format_event_chain(event_id: str) -> str

Format a chain of events for display.

PARAMETER DESCRIPTION
event_id

ID of the event to get the chain for

TYPE: str

RETURNS DESCRIPTION
str

Formatted string representation of the event chain

Source code in src/bedrock_swarm/events.py
def format_event_chain(self, event_id: str) -> str:
    """Format a chain of events for display.

    Args:
        event_id: ID of the event to get the chain for

    Returns:
        Formatted string representation of the event chain
    """
    chain = self.get_event_chain(event_id)
    return "\n\n".join(self.format_event(event) for event in chain)

Event Types

The system supports these event types:

  1. Agent Events:
  2. agent_start: Agent begins processing
  3. agent_complete: Agent completes processing
  4. message_sent: Message sent between agents
  5. message_received: Message received by agent

  6. Tool Events:

  7. tool_start: Tool execution begins
  8. tool_complete: Tool execution completes
  9. tool_error: Tool execution error

  10. Run Events:

  11. run_start: Run begins
  12. run_complete: Run completes
  13. error: Error occurred

Event Structure

Each event follows this structure:

{
    "id": "evt_123",              # Unique event ID
    "type": "agent_start",        # Event type
    "timestamp": "2024-02-29...", # ISO format timestamp
    "agent_name": "calculator",   # Name of agent involved
    "run_id": "run_456",         # ID of the run
    "thread_id": "thread_789",   # ID of the thread
    "parent_event_id": "evt_122", # ID of parent event
    "details": {                  # Event-specific details
        "message": "Processing request",
        "status": "active"
    },
    "metadata": {                # Additional metadata
        "version": "1.0",
        "environment": "prod"
    }
}

Usage Examples

from bedrock_swarm.agency import EventSystem

# Create event system
events = EventSystem()

# Record an event
event_id = events.record_event(
    type="agent_start",
    agent_name="calculator",
    run_id="run_123",
    thread_id="thread_456",
    details={"message": "Starting calculation"}
)

# Get event chain
chain = events.get_event_chain(event_id)
print(chain)  # Shows event and all related events

# Filter events
agent_events = events.filter_events(
    agent_name="calculator",
    event_type="tool_start"
)

# Get event trace
trace = events.get_trace(run_id="run_123")
print(trace)  # Shows complete trace of run

Event Chains

Events can form parent-child relationships:

  1. Parent Events:
  2. Agent start events
  3. Run start events
  4. Tool start events

  5. Child Events:

  6. Tool execution events
  7. Message events
  8. Completion events

Event Filtering

The system supports filtering by:

  1. Event type
  2. Agent name
  3. Time range
  4. Run ID
  5. Thread ID

Implementation Details

The event system provides:

  1. Event Recording:
  2. Unique ID generation
  3. Timestamp management
  4. Parent-child linking
  5. Metadata attachment

  6. Event Retrieval:

  7. Chain reconstruction
  8. Trace generation
  9. Event filtering
  10. Chronological ordering

  11. Event Analysis:

  12. Duration calculation
  13. Success rate tracking
  14. Performance monitoring
  15. Error pattern detection

Best Practices

  1. Event Recording:

    # Record start of operation
    start_id = events.record_event(
        type="operation_start",
        details={"operation": "calculation"}
    )
    
    try:
        # Perform operation
        result = perform_calculation()
    
        # Record success
        events.record_event(
            type="operation_complete",
            parent_id=start_id,
            details={"result": result}
        )
    except Exception as e:
        # Record error
        events.record_event(
            type="error",
            parent_id=start_id,
            details={"error": str(e)}
        )
    

  2. Event Monitoring:

    # Monitor specific agent
    agent_trace = events.get_agent_trace("calculator")
    
    # Monitor tool usage
    tool_events = events.filter_events(
        event_type="tool_start",
        tool_name="calculator"
    )
    
    # Monitor errors
    error_events = events.filter_events(
        event_type="error"
    )
    

  3. Performance Analysis:

    # Get operation duration
    duration = events.calculate_duration(
        start_event_id,
        end_event_id
    )
    
    # Get success rate
    success_rate = events.calculate_success_rate(
        agent_name="calculator"
    )