Thread Management

The thread management system in Bedrock Swarm coordinates conversations between users and agents. Each thread keeps one conversation's history, passes messages through its agent, executes any requested tools, and tracks the runs created along the way.

Class Documentation

A Thread manages a single conversation between a user and an agent.

The Thread is responsible for:

  1. Maintaining conversation history
  2. Processing messages through the agent
  3. Handling tool executions
  4. Recording all interactions
  5. Managing runs and their states

PARAMETER   DESCRIPTION
agent       The agent that will process messages in this thread. TYPE: BedrockAgent

Source code in src/bedrock_swarm/agency/thread.py
def __init__(self, agent: BedrockAgent) -> None:
    """Initialize a new thread.

    Args:
        agent: The agent that will process messages in this thread
    """
    self.id = str(uuid4())
    self.agent = agent
    self.history: List[Message] = []
    self.created_at = datetime.now()
    self.last_message_at: Optional[datetime] = None
    self.current_run: Optional[Run] = None
    self.runs: List[Run] = []
    self.event_system = None  # Will be set by Agency
    logger.debug(f"Created new thread {self.id} for agent {agent.name}")
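
The constructor leaves event_system as None until an Agency assigns it, while process_message calls create_event, start_event_scope, and end_event_scope on it. For experiments outside an Agency, a stand-in along the following lines might be enough. RecordingEventSystem is a hypothetical name, not part of Bedrock Swarm; it mirrors only the three calls visible in the source on this page, and the parent_id field is an assumption about how scopes nest.

```python
from typing import Any, Dict, List, Optional
from uuid import uuid4


class RecordingEventSystem:
    """Hypothetical stand-in; mirrors only the calls Thread makes on this page."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []
        self._scopes: List[str] = []  # stack of currently open event scopes

    def create_event(
        self,
        type: str,
        agent_name: str,
        run_id: Optional[str],
        thread_id: str,
        details: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Store the event and return its ID, which Thread passes to start_event_scope."""
        event_id = str(uuid4())
        self.events.append(
            {
                "id": event_id,
                "type": type,
                "agent_name": agent_name,
                "run_id": run_id,
                "thread_id": thread_id,
                "details": details or {},
                # Assumption: a nested event points at the innermost open scope.
                "parent_id": self._scopes[-1] if self._scopes else None,
            }
        )
        return event_id

    def start_event_scope(self, event_id: str) -> None:
        self._scopes.append(event_id)

    def end_event_scope(self) -> None:
        if self._scopes:
            self._scopes.pop()


# Replay the call order process_message and _execute_tools use.
events = RecordingEventSystem()
start_id = events.create_event(
    type="agent_start", agent_name="demo", run_id="run-1", thread_id="thread-1"
)
events.start_event_scope(start_id)
events.create_event(
    type="tool_start", agent_name="demo", run_id="run-1", thread_id="thread-1"
)
events.end_event_scope()
```

With the real class, an object like this would presumably be assigned to thread.event_system before the first call to process_message.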

Attributes

thread_id: str property

Get the thread ID.

RETURNS   DESCRIPTION
str       Thread ID

Functions

process_message(content: str) -> str

Process a message through this thread.

This method:

  1. Records the incoming message
  2. Creates and tracks a new run
  3. Processes it through the agent
  4. Handles any tool executions
  5. Records and returns the response

PARAMETER   DESCRIPTION
content     Message to process. TYPE: str

RETURNS     DESCRIPTION
str         The final response text

Source code in src/bedrock_swarm/agency/thread.py
def process_message(self, content: str) -> str:
    """Process a message through this thread.

    This method:
    1. Records the incoming message
    2. Creates and tracks a new run
    3. Processes it through the agent
    4. Handles any tool executions
    5. Records and returns the response

    Args:
        content: Message to process

    Returns:
        The final response text
    """
    logger.debug(f"Thread {self.id}: Processing message: {content}")

    # Record user message
    self._record_message(
        "user",
        content,
        metadata={"type": "user_message", "timestamp": datetime.now().isoformat()},
    )

    # Create new run
    self.current_run = Run()
    self.runs.append(self.current_run)
    self.current_run.status = "in_progress"
    logger.debug(f"Thread {self.id}: Created new run {self.current_run.id}")

    # Create agent start event
    agent_start_id = self.event_system.create_event(
        type="agent_start",
        agent_name=self.agent.name,
        run_id=self.current_run.id,
        thread_id=self.id,
        details={"message": content},
    )
    self.event_system.start_event_scope(agent_start_id)

    try:
        # Get initial response from agent
        logger.debug(f"Thread {self.id}: Getting initial response from agent")
        response = self.agent.generate(content)
        logger.debug(f"Thread {self.id}: Initial response: {response}")

        # Handle potential tool calls
        if response.get("type") == "tool_call" and response.get("tool_calls"):
            logger.debug(f"Thread {self.id}: Processing tool calls")
            # Set run status for tool execution
            self.current_run.require_action(
                {"type": "tool_calls", "tool_calls": response["tool_calls"]}
            )

            # Record tool call intent
            self._record_message(
                "assistant",
                json.dumps(response["tool_calls"]),
                metadata={
                    "type": "tool_call_intent",
                    "run_id": self.current_run.id,
                    "timestamp": datetime.now().isoformat(),
                    "tool_calls": response["tool_calls"],
                },
            )

            try:
                # Execute tools and get final response
                tool_outputs = self._execute_tools(response["tool_calls"])
                logger.debug(f"Thread {self.id}: Tool outputs: {tool_outputs}")

                # Record tool execution results
                for output in tool_outputs:
                    self._record_message(
                        "system",
                        str(output["output"]),
                        metadata={
                            "type": "tool_result",
                            "tool_call_id": output["tool_call_id"],
                            "timestamp": datetime.now().isoformat(),
                        },
                    )

                # Get final response incorporating tool results
                logger.debug(
                    f"Thread {self.id}: Getting final response with tool results"
                )
                final_response = self._get_final_response(content, tool_outputs)
                response_text = final_response["content"]
                logger.debug(f"Thread {self.id}: Final response: {response_text}")
            except Exception as e:
                logger.error(f"Thread {self.id}: Error executing tools: {str(e)}")
                response_text = f"I encountered an error while processing your request: {str(e)}"

        else:
            # Use direct response if no tool call
            response_text = response.get("content", "")
            logger.debug(
                f"Thread {self.id}: Using direct response: {response_text}"
            )

        # Record assistant's response
        self._record_message(
            "assistant",
            response_text,
            metadata={
                "type": "assistant_response",
                "run_id": self.current_run.id,
                "timestamp": datetime.now().isoformat(),
                "has_tool_calls": bool(response.get("tool_calls")),
            },
        )

        # Create agent complete event
        self.event_system.create_event(
            type="agent_complete",
            agent_name=self.agent.name,
            run_id=self.current_run.id,
            thread_id=self.id,
            details={"response": response_text},
        )

        # Mark run as completed
        self.current_run.complete()
        logger.debug(f"Thread {self.id}: Run completed successfully")

    except Exception as e:
        # Handle any errors
        error_msg = str(e)
        logger.error(f"Thread {self.id}: Error processing message: {error_msg}")
        if self.current_run:
            self.current_run.fail(error_msg)

        # Record error message
        self._record_message(
            "system",
            error_msg,
            metadata={
                "type": "error",
                "run_id": self.current_run.id if self.current_run else None,
                "timestamp": datetime.now().isoformat(),
            },
        )

        # Create error event
        self.event_system.create_event(
            type="error",
            agent_name=self.agent.name,
            run_id=self.current_run.id,
            thread_id=self.id,
            details={"error": error_msg},
        )

        response_text = f"Error processing message: {error_msg}"

    finally:
        self.event_system.end_event_scope()

    return response_text
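
The branch between a direct reply and the tool path depends on the shape of the dict returned by agent.generate. The sketch below isolates that condition so the three cases are easy to see. The helper name needs_tool_execution and the sample payloads are illustrative, not part of the library.

```python
from typing import Any, Dict


def needs_tool_execution(response: Dict[str, Any]) -> bool:
    """Mirror the condition process_message uses to enter the tool path."""
    return response.get("type") == "tool_call" and bool(response.get("tool_calls"))


# Illustrative response shapes; the field values are made up.
direct = {"type": "message", "content": "It is 3 pm."}
with_tools = {
    "type": "tool_call",
    "tool_calls": [
        {"id": "call_1", "function": {"name": "get_time", "arguments": "{}"}}
    ],
}
empty_tools = {"type": "tool_call", "tool_calls": []}

print(needs_tool_execution(direct))       # False
print(needs_tool_execution(with_tools))   # True
print(needs_tool_execution(empty_tools))  # False
```

A tool_call response with an empty tool_calls list takes the direct path, so response.get("content", "") yields an empty string when no content key is present.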

_execute_tools(tool_calls: List[ToolCall]) -> List[ToolOutput]

Execute a list of tool calls.

Source code in src/bedrock_swarm/agency/thread.py
def _execute_tools(self, tool_calls: List[ToolCall]) -> List[ToolOutput]:
    """Execute a list of tool calls."""
    logger.debug(f"Thread {self.id}: Executing {len(tool_calls)} tool calls")
    tool_outputs = []

    for tool_call in tool_calls:
        logger.debug(f"Thread {self.id}: Executing tool call: {tool_call}")
        # Create tool start event
        tool_start_id = self.event_system.create_event(
            type="tool_start",
            agent_name=self.agent.name,
            run_id=self.current_run.id if self.current_run else "none",
            thread_id=self.id,
            details={
                "tool_name": tool_call["function"]["name"],
                "arguments": tool_call["function"]["arguments"],
            },
        )
        self.event_system.start_event_scope(tool_start_id)

        try:
            # Get the tool
            tool = self.agent.tools.get(tool_call["function"]["name"])
            if not tool:
                raise ValueError(f"Tool {tool_call['function']['name']} not found")

            # Parse arguments - handle both string and dict formats
            args = tool_call["function"]["arguments"]
            if isinstance(args, str):
                try:
                    arguments = json.loads(args)
                except json.JSONDecodeError as e:
                    raise ValueError(f"Invalid tool arguments JSON: {e}")
            else:
                arguments = args  # Already a dict

            logger.debug(f"Thread {self.id}: Parsed arguments: {arguments}")

            # Execute tool
            logger.debug(
                f"Thread {self.id}: Executing tool {tool_call['function']['name']}"
            )
            result = tool.execute(**arguments, thread=self)
            logger.debug(f"Thread {self.id}: Tool result: {result}")

            # Add to outputs
            output = {
                "tool_call_id": tool_call["id"],
                "output": result,
            }
            tool_outputs.append(output)

            # Create tool complete event
            self.event_system.create_event(
                type="tool_complete",
                agent_name=self.agent.name,
                run_id=self.current_run.id if self.current_run else "none",
                thread_id=self.id,
                details={
                    "tool_name": tool_call["function"]["name"],
                    "arguments": arguments,
                    "result": result,
                },
            )

        except Exception as e:
            error_msg = (
                f"Error executing tool {tool_call['function']['name']}: {str(e)}"
            )
            logger.error(error_msg)
            self.event_system.create_event(
                type="tool_error",
                agent_name=self.agent.name,
                run_id=self.current_run.id if self.current_run else "none",
                thread_id=self.id,
                details={
                    "error": error_msg,
                    "tool_name": tool_call["function"]["name"],
                    "arguments": tool_call["function"]["arguments"],
                },
            )
            raise
        finally:
            self.event_system.end_event_scope()

    return tool_outputs
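
Two details of the tool invocation are easy to miss: arguments may arrive as a JSON string or as a dict, and the thread is always passed as a keyword argument named thread. The sketch below reproduces both with stand-ins. parse_tool_arguments and EchoTool are hypothetical names used only for illustration.

```python
import json
from typing import Any, Dict, Union


def parse_tool_arguments(args: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Accept arguments as a JSON string or as an already-parsed dict."""
    if isinstance(args, str):
        try:
            return json.loads(args)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid tool arguments JSON: {e}") from e
    return args


class EchoTool:
    """Hypothetical tool; it accepts the extra `thread` keyword like the real call site passes."""

    def execute(self, text: str, thread: Any = None) -> str:
        return text.upper()


# String and dict forms end up as the same keyword arguments.
from_string = parse_tool_arguments('{"text": "hello"}')
from_dict = parse_tool_arguments({"text": "hello"})
result = EchoTool().execute(**from_string, thread=None)

# A model-supplied argument named "thread" collides with the thread keyword.
collided = False
try:
    EchoTool().execute(**{"text": "x", "thread": "mine"}, thread=None)
except TypeError:
    collided = True
```

Given that collision, tool schemas are probably best kept free of a parameter called thread.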

_execute_single_tool(tool_call: ToolCall) -> ToolResult

Execute a single tool call.

PARAMETER    DESCRIPTION
tool_call    The tool call to execute. TYPE: ToolCall

RETURNS      DESCRIPTION
ToolResult   Result of the tool execution

Source code in src/bedrock_swarm/agency/thread.py
def _execute_single_tool(self, tool_call: ToolCall) -> ToolResult:
    """Execute a single tool call.

    Args:
        tool_call: The tool call to execute

    Returns:
        Result of the tool execution
    """
    tool_name = tool_call["function"]["name"]

    # Check if tool exists
    if tool_name not in self.agent.tools:
        return {
            "success": False,
            "result": "",
            "error": f"Tool {tool_name} not found",
        }

    try:
        # Parse arguments - handle both string and dict formats for backward compatibility
        args = tool_call["function"]["arguments"]
        if isinstance(args, str):
            args = json.loads(args)

        # Get and execute tool
        tool = self.agent.tools[tool_name]
        result = tool.execute(**args, thread=self)

        return {"success": True, "result": str(result), "error": None}
    except Exception as e:
        return {"success": False, "result": "", "error": str(e)}

_get_final_response(original_message: str, tool_outputs: List[ToolOutput]) -> Dict[str, str]

Get final response after tool execution.

This method builds a comprehensive prompt that includes:

  1. Recent conversation history for context
  2. The original message
  3. Tool execution results
  4. Instructions for response formatting

Source code in src/bedrock_swarm/agency/thread.py
def _get_final_response(
    self, original_message: str, tool_outputs: List[ToolOutput]
) -> Dict[str, str]:
    """Get final response after tool execution.

    This method builds a comprehensive prompt that includes:
    1. Recent conversation history for context
    2. The original message
    3. Tool execution results
    4. Instructions for response formatting
    """
    # Get recent conversation history (last 5 messages)
    recent_history = self.get_context_window(5)
    history_context = []
    for msg in recent_history[:-1]:  # Exclude the current message
        history_context.append(f"{msg.role}: {msg.content}")

    # Format tool results
    tool_results = []
    for output in tool_outputs:
        tool_results.append(f"Tool result: {output['output']}")

    # Build comprehensive prompt with history and context
    prompt = (
        f"<conversation_history>\n"
        f"{chr(10).join(history_context) if history_context else 'No previous context'}\n"
        f"</conversation_history>\n\n"
        f"<current_context>\n"
        f"Current question: {original_message}\n"
        f"Tool results:\n{chr(10).join(tool_results)}\n"
        f"</current_context>\n\n"
        f"<instructions>\n"
        f"Based on the conversation history and tool results above:\n"
        f"1. Provide a natural language response that directly answers the current question\n"
        f"2. Maintain context from the previous conversation if relevant\n"
        f"3. Format your response as a proper JSON message object\n"
        f"4. Be concise but complete in your response\n"
        f"</instructions>"
    )

    logger.debug(f"Thread {self.id}: Getting final response with prompt: {prompt}")
    response = self.agent.generate(prompt)

    # Ensure we're returning a message response
    if isinstance(response, dict):
        if response.get("type") == "message":
            return response
        elif response.get("type") == "tool_call":
            # If we get another tool call, convert it to a message
            return {
                "type": "message",
                "content": "Based on the tool results and conversation history: "
                + str(tool_results[0] if tool_results else "No results available"),
            }

    # If we get an invalid response, create a message from the tool results
    return {
        "type": "message",
        "content": "Based on the tool results and conversation history: "
        + str(tool_results[0] if tool_results else "No results available"),
    }
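
The tail of _get_final_response normalizes whatever the agent returns into a message dict. A compact restatement of that fallback, assuming tool_results is the list of formatted strings built earlier in the method, might look like this. ensure_message is a hypothetical helper name.

```python
from typing import Any, Dict, List


def ensure_message(response: Any, tool_results: List[str]) -> Dict[str, str]:
    """Return message responses unchanged; otherwise fall back to the first tool result."""
    if isinstance(response, dict) and response.get("type") == "message":
        return response
    fallback = tool_results[0] if tool_results else "No results available"
    return {
        "type": "message",
        "content": "Based on the tool results and conversation history: "
        + str(fallback),
    }


results = ["Tool result: 42", "Tool result: 7"]

ok = ensure_message({"type": "message", "content": "The answer is 42."}, results)
second_call = ensure_message({"type": "tool_call", "tool_calls": []}, results)
not_a_dict = ensure_message("plain text", [])

print(ok["content"])           # The answer is 42.
print(second_call["content"])  # Based on the tool results and conversation history: Tool result: 42
print(not_a_dict["content"])   # Based on the tool results and conversation history: No results available
```

Only the first tool result appears in the fallback text, so a follow-up tool call after several tool executions drops the remaining outputs from the reply.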

_record_message(role: str, content: str, metadata: Optional[Dict] = None) -> None

Record a message in the thread history.

PARAMETER   DESCRIPTION
role        The role of the message sender (user/assistant/system). TYPE: str
content     The message content. TYPE: str
metadata    Optional metadata about the message. TYPE: Optional[Dict] DEFAULT: None

Source code in src/bedrock_swarm/agency/thread.py
def _record_message(
    self, role: str, content: str, metadata: Optional[Dict] = None
) -> None:
    """Record a message in the thread history.

    Args:
        role: The role of the message sender (user/assistant/system)
        content: The message content
        metadata: Optional metadata about the message
    """
    now = datetime.now()

    # Ensure metadata includes basic timing information
    if metadata is None:
        metadata = {}

    metadata.update(
        {
            "timestamp": now.isoformat(),
            "thread_id": self.id,
            "run_id": self.current_run.id if self.current_run else None,
        }
    )

    self.history.append(
        Message(role=role, content=content, timestamp=now, metadata=metadata)
    )
    self.last_message_at = now
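
Because _record_message calls metadata.update with its own timestamp, thread_id, and run_id, those three keys always win over values the caller supplied, and the caller's dict is modified in place. The sketch below isolates that merge. merge_metadata is a hypothetical helper, and the dates are arbitrary.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def merge_metadata(
    metadata: Optional[Dict[str, Any]],
    now: datetime,
    thread_id: str,
    run_id: Optional[str],
) -> Dict[str, Any]:
    """Apply the same update order as _record_message."""
    if metadata is None:
        metadata = {}
    metadata.update(
        {"timestamp": now.isoformat(), "thread_id": thread_id, "run_id": run_id}
    )
    return metadata


caller_metadata = {"type": "user_message", "timestamp": "2000-01-01T00:00:00"}
now = datetime(2024, 5, 1, 12, 0, 0)
merged = merge_metadata(caller_metadata, now, "thread-1", None)

print(merged["timestamp"])        # 2024-05-01T12:00:00
print(merged["type"])             # user_message
print(merged is caller_metadata)  # True
```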

get_history() -> List[Message]

Get the complete message history.

RETURNS         DESCRIPTION
List[Message]   List of all messages in chronological order

Source code in src/bedrock_swarm/agency/thread.py
def get_history(self) -> List[Message]:
    """Get the complete message history.

    Returns:
        List of all messages in chronological order
    """
    return self.history.copy()
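
list.copy() produces a shallow copy: the returned list is independent of the thread's history, but the Message objects inside it are shared. The following sketch shows the difference with a stand-in Message dataclass, which is an assumption and not the library's own class.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Message:
    """Stand-in with only the two fields needed for the demonstration."""

    role: str
    content: str


history: List[Message] = [Message("user", "hi"), Message("assistant", "hello")]
snapshot = history.copy()  # what get_history returns

# Changing the copied list leaves the original list alone...
snapshot.append(Message("user", "extra"))
print(len(history))  # 2

# ...but editing a message through the copy changes the shared object.
snapshot[0].content = "edited"
print(history[0].content)  # edited
```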

get_last_message() -> Optional[Message]

Get the most recent message.

RETURNS             DESCRIPTION
Optional[Message]   The last message or None if history is empty

Source code in src/bedrock_swarm/agency/thread.py
def get_last_message(self) -> Optional[Message]:
    """Get the most recent message.

    Returns:
        The last message or None if history is empty
    """
    return self.history[-1] if self.history else None

get_context_window(n: int = 5) -> List[Message]

Get the n most recent messages.

PARAMETER   DESCRIPTION
n           Number of messages to return. TYPE: int DEFAULT: 5

RETURNS         DESCRIPTION
List[Message]   List of up to n most recent messages

Source code in src/bedrock_swarm/agency/thread.py
def get_context_window(self, n: int = 5) -> List[Message]:
    """Get the n most recent messages.

    Args:
        n: Number of messages to return

    Returns:
        List of up to n most recent messages
    """
    return self.history[-n:] if self.history else []
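
The slice history[-n:] behaves as expected for positive n, including values larger than the history. For n = 0 it returns the whole history, because -0 equals 0 and history[0:] is the full list. The sketch below demonstrates this on a plain list of strings; guarded_context_window is a hypothetical variant, not something the library provides.

```python
from typing import List


def context_window(history: List[str], n: int = 5) -> List[str]:
    """Same slice as get_context_window, applied to a plain list."""
    return history[-n:] if history else []


def guarded_context_window(history: List[str], n: int = 5) -> List[str]:
    """Hypothetical variant that returns nothing for n <= 0."""
    return history[-n:] if n > 0 else []


history = ["m1", "m2", "m3", "m4"]

print(context_window(history, 2))          # ['m3', 'm4']
print(context_window(history, 10))         # ['m1', 'm2', 'm3', 'm4']
print(context_window(history, 0))          # ['m1', 'm2', 'm3', 'm4']
print(guarded_context_window(history, 0))  # []
```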

get_run(run_id: str) -> Optional[Run]

Get a specific run by ID.

PARAMETER   DESCRIPTION
run_id      ID of the run to retrieve. TYPE: str

RETURNS         DESCRIPTION
Optional[Run]   The run if found, None otherwise

Source code in src/bedrock_swarm/agency/thread.py
def get_run(self, run_id: str) -> Optional[Run]:
    """Get a specific run by ID.

    Args:
        run_id: ID of the run to retrieve

    Returns:
        The run if found, None otherwise
    """
    return next((run for run in self.runs if run.id == run_id), None)

get_current_run() -> Optional[Run]

Get the current run.

RETURNS         DESCRIPTION
Optional[Run]   The current run if one exists

Source code in src/bedrock_swarm/agency/thread.py
def get_current_run(self) -> Optional[Run]:
    """Get the current run.

    Returns:
        The current run if one exists
    """
    return self.current_run

cancel_run(run_id: str) -> bool

Cancel a specific run.

PARAMETER   DESCRIPTION
run_id      ID of the run to cancel. TYPE: str

RETURNS     DESCRIPTION
bool        True if run was cancelled, False otherwise

Source code in src/bedrock_swarm/agency/thread.py
def cancel_run(self, run_id: str) -> bool:
    """Cancel a specific run.

    Args:
        run_id: ID of the run to cancel

    Returns:
        True if run was cancelled, False otherwise
    """
    run = self.get_run(run_id)
    if run and run.status in ["queued", "in_progress", "requires_action"]:
        run.fail("Run cancelled by user")
        return True
    return False
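
The lookup and cancellation logic can be exercised without the library by using a stand-in Run. The Run class below is an assumption: the real Run is not shown on this page, and the sketch simply supposes that fail() sets status to "failed" and stores the reason.

```python
from typing import List, Optional
from uuid import uuid4


class Run:
    """Stand-in; assumes fail() sets status to "failed" and keeps the reason."""

    def __init__(self, status: str = "queued") -> None:
        self.id = str(uuid4())
        self.status = status
        self.error: Optional[str] = None

    def fail(self, error: str) -> None:
        self.status = "failed"
        self.error = error


CANCELLABLE = ("queued", "in_progress", "requires_action")


def get_run(runs: List[Run], run_id: str) -> Optional[Run]:
    # next() with a default avoids StopIteration when nothing matches.
    return next((run for run in runs if run.id == run_id), None)


def cancel_run(runs: List[Run], run_id: str) -> bool:
    run = get_run(runs, run_id)
    if run and run.status in CANCELLABLE:
        run.fail("Run cancelled by user")
        return True
    return False


active = Run(status="in_progress")
finished = Run(status="completed")
runs = [active, finished]

print(cancel_run(runs, active.id))    # True
print(active.status)                  # failed
print(cancel_run(runs, finished.id))  # False
print(cancel_run(runs, "missing"))    # False
```

Under that assumption a cancelled run ends up in the same state as a failed one, distinguishable only by its error text.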

Features

The thread system provides:

  1. Thread Creation:
     - Dynamic thread creation
     - Thread configuration
     - Resource allocation

  2. Thread Management:
     - Start/stop control
     - Pause/resume
     - Status monitoring
     - Resource cleanup

  3. Thread Communication:
     - Message passing
     - Event handling
     - State synchronization

Usage Examples

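from bedrock_swarm.agency import Thread

# `agent` is an existing BedrockAgent instance; creating one is outside
# the scope of this page.
thread = Thread(agent)

# process_message() emits events through thread.event_system, which the
# Agency sets when it manages the thread.
response = thread.process_message("What is the current time in Tokyo?")
print(response)

# Inspect the conversation recorded so far
for message in thread.get_history():
    print(message.role, message.content)

# Look at the run created for the message
run = thread.get_current_run()
print(run.status)

# Cancel a run that is still queued, in progress, or awaiting action;
# returns False for a run that has already finished
thread.cancel_run(run.id)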

Thread States

Threads can be in these states:

  1. Created: Initial state
  2. Running: Active execution
  3. Paused: Temporarily stopped
  4. Completed: Finished execution
  5. Failed: Error state

Error Handling

The thread system handles:

  1. Thread creation errors
  2. Execution failures
  3. Resource allocation errors
  4. Communication errors
  5. Cleanup failures

Implementation Details

The thread implementation includes:

  1. Async execution support
  2. Resource management
  3. Error recovery
  4. State persistence
  5. Performance monitoring

Thread Configuration

Threads can be configured with:

thread_config = {
    "name": "thread_name",
    "priority": "high",
    "timeout": 300,
    "retry_policy": {
        "max_retries": 3,
        "delay": 1.0
    },
    "resources": {
        "memory_limit": "1GB",
        "cpu_limit": 2
    }
}