Skip to content

Managing Threads

Threads in Bedrock Swarm represent individual conversation flows between users and agents. They manage message history, handle tool executions, and track the state of interactions.

Thread Architecture

classDiagram
    class Thread {
        +id: str
        +agent: BedrockAgent
        +history: List[Message]
        +event_system: EventSystem
        +created_at: datetime
        +last_message_at: datetime
        +current_run: Run
        +runs: List[Run]
        +process_message()
        +get_history()
        +get_context_window()
    }

    class Run {
        +id: str
        +status: str
        +started_at: datetime
        +completed_at: datetime
        +required_action: Dict
        +last_error: str
        +complete()
        +fail()
        +require_action()
    }

    class Message {
        +role: str
        +content: str
        +timestamp: datetime
        +thread_id: str
        +metadata: Dict
    }

    Thread "1" --> "1" BedrockAgent : uses
    Thread "1" --> "*" Message : contains
    Thread "1" --> "*" Run : manages

Creating Threads

Basic Thread Creation

from bedrock_swarm.agency.thread import Thread
from bedrock_swarm.agents.base import BedrockAgent

# Create agent
agent = BedrockAgent(
    model_id="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
    tools=[CalculatorTool()]
)

# Create thread
thread = Thread(agent=agent)

# Process messages
response = thread.process_message("What is 2 + 2?")
print(response)

Through Agency

# Create agency
agency = Agency(specialists=[calculator, time_expert])

# Create new thread
thread = agency.create_thread()

# Or get existing thread
thread = agency.get_thread(thread_id)

Thread Components

1. Message History

# Get all messages
history = thread.get_history()

# Get recent context
context = thread.get_context_window(n=5)

# Get last message
last_msg = thread.get_last_message()

2. Runs

Each message processing creates a new run:

stateDiagram-v2
    [*] --> queued
    queued --> in_progress
    in_progress --> requires_action
    in_progress --> completed
    in_progress --> failed
    requires_action --> in_progress
    requires_action --> failed
    completed --> [*]
    failed --> [*]

3. Event System

# Events are automatically created
thread.event_system.create_event(
    type="custom_event",
    agent_name=thread.agent.name,
    run_id=thread.current_run.id,
    thread_id=thread.id,
    details={"action": "special_task"}
)

Thread Behavior

1. Message Processing Flow

sequenceDiagram
    participant User
    participant Thread
    participant Agent
    participant Tools

    User->>Thread: process_message()
    Thread->>Thread: Create new run
    Thread->>Agent: generate()

    alt Needs Tool
        Agent->>Tools: execute()
        Tools-->>Agent: result
        Agent->>Thread: tool result
        Thread->>Agent: get final response
    end

    Agent-->>Thread: response
    Thread-->>User: formatted response

2. Run Management

# Get current run
run = thread.get_current_run()

# Get specific run
run = thread.get_run(run_id)

# Cancel run
thread.cancel_run(run_id)

3. Context Management

# Thread maintains conversation context
thread.process_message("What is 15 * 7?")
thread.process_message("Add 10 to that result")  # Understands "that result"

Best Practices

1. Thread Organization

# Good: Separate threads for different contexts
calculation_thread = agency.create_thread("calculator")
time_thread = agency.create_thread("time_expert")

# Bad: Mixed concerns in single thread
thread.process_message("What is 2 + 2?")
thread.process_message("What time is it in Tokyo?")

2. Error Handling

try:
    response = thread.process_message("Complex query")
except Exception as e:
    # Check run status
    run = thread.get_current_run()
    if run.status == "failed":
        print(f"Run failed: {run.last_error}")

    # Create error event
    thread.event_system.create_event(
        type="error",
        details={"error": str(e)}
    )

3. Memory Management

# Limit context window for efficiency
context = thread.get_context_window(n=5)

# Clear old history if needed
if len(thread.history) > 1000:
    # Archive or process old messages
    pass

Advanced Usage

1. Custom Run Handling

class CustomThread(Thread):
    def process_message(self, content: str) -> str:
        # Create run with custom status
        self.current_run = Run()
        self.current_run.status = "custom_status"

        try:
            # Custom processing
            result = self._custom_process(content)
            self.current_run.complete()
            return result
        except Exception as e:
            self.current_run.fail(str(e))
            raise

2. Thread Switching

# Switch agent in thread
thread.agent = new_agent

# Or through agency
agency.get_completion(
    message="Complex query",
    thread_id=thread.id,
    recipient_agent=specialist_agent
)

3. Event Monitoring

# Track thread events
events = thread.event_system.get_events(
    thread_id=thread.id,
    event_type="tool_complete"
)

# Format event chain
chain = thread.event_system.format_event_chain(event_id)

4. Thread State

class ThreadState:
    def __init__(self, thread: Thread):
        self.message_count = len(thread.history)
        self.last_activity = thread.last_message_at
        self.current_run_status = (
            thread.current_run.status
            if thread.current_run
            else None
        )
        self.tool_usage = self._count_tool_usage(thread)

    def _count_tool_usage(self, thread: Thread) -> Dict[str, int]:
        tool_counts = {}
        for event in thread.event_system.get_events(
            thread_id=thread.id,
            event_type="tool_complete"
        ):
            tool_name = event["details"]["tool_name"]
            tool_counts[tool_name] = tool_counts.get(tool_name, 0) + 1
        return tool_counts