strands.experimental.bidi.agent.agent

Bidirectional Agent for real-time streaming conversations.

Provides real-time audio and text interaction through persistent streaming connections. Unlike traditional request-response patterns, this agent maintains long-running conversations where users can interrupt, provide additional input, and receive continuous responses including audio output.

Key capabilities:

  • Persistent conversation connections with concurrent processing
  • Real-time audio input/output streaming
  • Automatic interruption detection and tool execution
  • Event-driven communication with model providers
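
The concurrency pattern behind these capabilities can be sketched with the standard library alone. The snippet below is a minimal illustration, not the SDK's implementation: `pump_inputs`, `fake_model`, and `converse` are hypothetical names, and the "model" is a local echo task standing in for a real provider connection. It shows one task feeding input while another drains output, with the input task cancelled once the output side ends.

```python
import asyncio


async def pump_inputs(source: asyncio.Queue, to_model: asyncio.Queue) -> None:
    """Forward user input to the model side until cancelled."""
    while True:
        item = await source.get()
        await to_model.put(item)


async def fake_model(to_model: asyncio.Queue, from_model: asyncio.Queue) -> None:
    """Hypothetical stand-in for a model provider: echoes input, closes on 'bye'."""
    while True:
        text = await to_model.get()
        await from_model.put(f"echo: {text}")
        if text == "bye":
            await from_model.put(None)  # sentinel: connection closed
            return


async def converse(user_inputs: list[str]) -> list[str]:
    """Run input and output concurrently and collect everything received."""
    source: asyncio.Queue = asyncio.Queue()
    to_model: asyncio.Queue = asyncio.Queue()
    from_model: asyncio.Queue = asyncio.Queue()
    for text in user_inputs:
        source.put_nowait(text)

    inputs_task = asyncio.create_task(pump_inputs(source, to_model))
    model_task = asyncio.create_task(fake_model(to_model, from_model))

    received = []
    try:
        # Drain output until the sentinel arrives.
        while (event := await from_model.get()) is not None:
            received.append(event)
    finally:
        # Output finished: stop the input pump and clean up both tasks.
        inputs_task.cancel()
        model_task.cancel()
        await asyncio.gather(inputs_task, model_task, return_exceptions=True)
    return received
```

Calling `asyncio.run(converse(["hi", "bye"]))` drives the whole exchange on one event loop; neither side blocks the other.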

AgentState = JSONSerializableDict module-attribute

BidiAgentInput = str | BidiTextInputEvent | BidiAudioInputEvent | BidiImageInputEvent module-attribute

BidiInputEvent = BidiTextInputEvent | BidiAudioInputEvent | BidiImageInputEvent module-attribute

Union of different bidi input event types.
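
As a rough sketch of how these input shapes might be normalized into a single event type, the snippet below accepts a plain string, an event object, or a dict carrying a `type` discriminator. `TextInput`, `AudioInput`, `EVENT_TYPES`, and `normalize` are hypothetical stand-ins defined locally so the example runs without the SDK; their fields are assumptions. Only the discriminator strings (`bidi_text_input`, `bidi_audio_input`) are taken from the `send` source listed on this page.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TextInput:  # hypothetical stand-in for BidiTextInputEvent
    text: str
    role: str = "user"


@dataclass
class AudioInput:  # hypothetical stand-in for BidiAudioInputEvent
    audio: str
    format: str


# Maps the "type" discriminator of a plain dict to an event class.
EVENT_TYPES = {
    "bidi_text_input": TextInput,
    "bidi_audio_input": AudioInput,
}


def normalize(input_data: Any):
    """Convert any accepted input shape into an event object."""
    if isinstance(input_data, str):
        return TextInput(text=input_data)
    if isinstance(input_data, (TextInput, AudioInput)):
        return input_data
    if isinstance(input_data, dict) and "type" in input_data:
        event_cls = EVENT_TYPES.get(input_data["type"])
        if event_cls is None:
            raise ValueError(f"input type not supported: {input_data['type']}")
        # Drop the discriminator; the remaining keys become constructor arguments.
        fields = {key: value for key, value in input_data.items() if key != "type"}
        return event_cls(**fields)
    raise ValueError("invalid input: must be str, event, or event dict")
```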

BidiOutputEvent = BidiConnectionStartEvent | BidiConnectionRestartEvent | BidiResponseStartEvent | BidiAudioStreamEvent | BidiTranscriptStreamEvent | BidiInterruptionEvent | BidiResponseCompleteEvent | BidiUsageEvent | BidiConnectionCloseEvent | BidiErrorEvent | ToolUseStreamEvent module-attribute

Union of different bidi output event types.

Messages = list[Message] module-attribute

A list of messages representing a conversation.

_DEFAULT_AGENT_ID = 'default' module-attribute

_DEFAULT_AGENT_NAME = 'Strands Agents' module-attribute

logger = logging.getLogger(__name__) module-attribute

AgentTool

Bases: ABC

Abstract base class for all SDK tools.

This class defines the interface that all tool implementations must follow. Each tool must provide its name, specification, and implement a stream method that executes the tool's functionality.
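
To make the contract concrete, here is a minimal sketch of a subclass that satisfies it. It uses a local stand-in base class (`SketchTool`) so it runs without the SDK installed; `EchoTool`, the dict keys (`toolUseId`, `input`, `status`, `content`), and the use of a plain synchronous generator are all assumptions made for illustration, so check the real `ToolUse` and `ToolGenerator` types before relying on them.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator


class SketchTool(ABC):
    """Local stand-in mirroring the abstract members described above."""

    @property
    @abstractmethod
    def tool_name(self) -> str: ...

    @property
    @abstractmethod
    def tool_spec(self) -> dict[str, Any]: ...

    @property
    @abstractmethod
    def tool_type(self) -> str: ...

    @abstractmethod
    def stream(
        self, tool_use: dict[str, Any], invocation_state: dict[str, Any], **kwargs: Any
    ) -> Iterator[Any]: ...


class EchoTool(SketchTool):
    """Hypothetical tool that returns its input text unchanged."""

    @property
    def tool_name(self) -> str:
        return "echo"

    @property
    def tool_spec(self) -> dict[str, Any]:
        return {"name": "echo", "description": "Echo the input text."}

    @property
    def tool_type(self) -> str:
        return "python"

    def stream(self, tool_use, invocation_state, **kwargs):
        text = tool_use["input"]["text"]
        yield {"progress": "echoing"}  # intermediate event
        # The last event yielded is treated as the tool result.
        yield {"toolUseId": tool_use["toolUseId"], "status": "success", "content": [{"text": text}]}
```

Because every abstract member is overridden, `EchoTool()` can be instantiated, whereas `SketchTool()` raises `TypeError`.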

Source code in strands/types/tools.py
class AgentTool(ABC):
    """Abstract base class for all SDK tools.

    This class defines the interface that all tool implementations must follow. Each tool must provide its name,
    specification, and implement a stream method that executes the tool's functionality.
    """

    _is_dynamic: bool

    def __init__(self) -> None:
        """Initialize the base agent tool with default dynamic state."""
        self._is_dynamic = False

    @property
    @abstractmethod
    # pragma: no cover
    def tool_name(self) -> str:
        """The unique name of the tool used for identification and invocation."""
        pass

    @property
    @abstractmethod
    # pragma: no cover
    def tool_spec(self) -> ToolSpec:
        """Tool specification that describes its functionality and parameters."""
        pass

    @property
    @abstractmethod
    # pragma: no cover
    def tool_type(self) -> str:
        """The type of the tool implementation (e.g., 'python', 'javascript', 'lambda').

        Used for categorization and appropriate handling.
        """
        pass

    @property
    def supports_hot_reload(self) -> bool:
        """Whether the tool supports automatic reloading when modified.

        Returns:
            False by default.
        """
        return False

    @abstractmethod
    # pragma: no cover
    def stream(self, tool_use: ToolUse, invocation_state: dict[str, Any], **kwargs: Any) -> ToolGenerator:
        """Stream tool events and return the final result.

        Args:
            tool_use: The tool use request containing tool ID and parameters.
            invocation_state: Caller-provided kwargs that were passed to the agent when it was invoked (agent(),
                              agent.invoke_async(), etc.).
            **kwargs: Additional keyword arguments for future extensibility.

        Yields:
            Tool events with the last being the tool result.
        """
        ...

    @property
    def is_dynamic(self) -> bool:
        """Whether the tool was dynamically loaded during runtime.

        Dynamic tools may have different lifecycle management.

        Returns:
            True if loaded dynamically, False otherwise.
        """
        return self._is_dynamic

    def mark_dynamic(self) -> None:
        """Mark this tool as dynamically loaded."""
        self._is_dynamic = True

    def get_display_properties(self) -> dict[str, str]:
        """Get properties to display in UI representations of this tool.

        Subclasses can extend this to include additional properties.

        Returns:
            Dictionary of property names and their string values.
        """
        return {
            "Name": self.tool_name,
            "Type": self.tool_type,
        }

is_dynamic property

Whether the tool was dynamically loaded during runtime.

Dynamic tools may have different lifecycle management.

Returns:

Type Description
bool

True if loaded dynamically, False otherwise.

supports_hot_reload property

Whether the tool supports automatic reloading when modified.

Returns:

Type Description
bool

False by default.

tool_name abstractmethod property

The unique name of the tool used for identification and invocation.

tool_spec abstractmethod property

Tool specification that describes its functionality and parameters.

tool_type abstractmethod property

The type of the tool implementation (e.g., 'python', 'javascript', 'lambda').

Used for categorization and appropriate handling.

__init__()

Initialize the base agent tool with default dynamic state.

Source code in strands/types/tools.py
def __init__(self) -> None:
    """Initialize the base agent tool with default dynamic state."""
    self._is_dynamic = False

get_display_properties()

Get properties to display in UI representations of this tool.

Subclasses can extend this to include additional properties.

Returns:

Type Description
dict[str, str]

Dictionary of property names and their string values.
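
One way a subclass might extend the returned dictionary is to call the parent implementation and add keys to its result. The sketch below uses a hypothetical `BaseTool` stand-in that reproduces only this method; `VersionedTool` and its `version` attribute are invented for illustration.

```python
class BaseTool:
    """Hypothetical stand-in exposing only what this sketch needs."""

    tool_name = "calculator"
    tool_type = "python"

    def get_display_properties(self) -> dict[str, str]:
        return {"Name": self.tool_name, "Type": self.tool_type}


class VersionedTool(BaseTool):
    version = "1.2.0"  # hypothetical extra attribute

    def get_display_properties(self) -> dict[str, str]:
        # Start from the base properties, then add subclass-specific ones.
        properties = super().get_display_properties()
        properties["Version"] = self.version
        return properties
```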

Source code in strands/types/tools.py
def get_display_properties(self) -> dict[str, str]:
    """Get properties to display in UI representations of this tool.

    Subclasses can extend this to include additional properties.

    Returns:
        Dictionary of property names and their string values.
    """
    return {
        "Name": self.tool_name,
        "Type": self.tool_type,
    }

mark_dynamic()

Mark this tool as dynamically loaded.

Source code in strands/types/tools.py
def mark_dynamic(self) -> None:
    """Mark this tool as dynamically loaded."""
    self._is_dynamic = True

stream(tool_use, invocation_state, **kwargs) abstractmethod

Stream tool events and return the final result.

Parameters:

Name Type Description Default
tool_use ToolUse

The tool use request containing tool ID and parameters.

required
invocation_state dict[str, Any]

Caller-provided kwargs that were passed to the agent when it was invoked (agent(), agent.invoke_async(), etc.).

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Yields:

Type Description
ToolGenerator

Tool events with the last being the tool result.
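
The "last event is the result" convention can be illustrated with a plain generator and a small consumer. This is a sketch under assumptions: `word_count_stream` and `split_events` are hypothetical helpers, the event dictionaries are invented, and a synchronous generator is used for brevity even though the SDK's `ToolGenerator` type may differ.

```python
from typing import Any, Iterable, Iterator


def word_count_stream(text: str) -> Iterator[dict[str, Any]]:
    """Hypothetical tool body: yields progress events, then the result."""
    words = text.split()
    for index, _ in enumerate(words, start=1):
        yield {"progress": f"{index}/{len(words)}"}
    # Final event: the tool result.
    yield {"status": "success", "content": [{"text": str(len(words))}]}


def split_events(events: Iterable[Any]) -> tuple[list[Any], Any]:
    """Separate intermediate events from the final one (the tool result)."""
    collected = list(events)
    if not collected:
        raise ValueError("stream yielded no events, so there is no result")
    return collected[:-1], collected[-1]
```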

Source code in strands/types/tools.py
@abstractmethod
# pragma: no cover
def stream(self, tool_use: ToolUse, invocation_state: dict[str, Any], **kwargs: Any) -> ToolGenerator:
    """Stream tool events and return the final result.

    Args:
        tool_use: The tool use request containing tool ID and parameters.
        invocation_state: Caller-provided kwargs that were passed to the agent when it was invoked (agent(),
                          agent.invoke_async(), etc.).
        **kwargs: Additional keyword arguments for future extensibility.

    Yields:
        Tool events with the last being the tool result.
    """
    ...

BidiAgent

Agent for bidirectional streaming conversations.

Enables real-time audio and text interaction with AI models through persistent connections. Supports concurrent tool execution and interruption handling.

Source code in strands/experimental/bidi/agent/agent.py
class BidiAgent:
    """Agent for bidirectional streaming conversations.

    Enables real-time audio and text interaction with AI models through persistent
    connections. Supports concurrent tool execution and interruption handling.
    """

    def __init__(
        self,
        model: BidiModel | str | None = None,
        tools: list[str | AgentTool | ToolProvider] | None = None,
        system_prompt: str | None = None,
        messages: Messages | None = None,
        record_direct_tool_call: bool = True,
        load_tools_from_directory: bool = False,
        agent_id: str | None = None,
        name: str | None = None,
        description: str | None = None,
        hooks: list[HookProvider] | None = None,
        state: AgentState | dict | None = None,
        session_manager: "SessionManager | None" = None,
        tool_executor: ToolExecutor | None = None,
        **kwargs: Any,
    ):
        """Initialize bidirectional agent.

        Args:
            model: BidiModel instance, a model_id string for BidiNovaSonicModel, or None to use
                BidiNovaSonicModel with its defaults.
            tools: Optional list of tools with flexible format support.
            system_prompt: Optional system prompt for conversations.
            messages: Optional conversation history to initialize with.
            record_direct_tool_call: Whether to record direct tool calls in message history.
            load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
            agent_id: Optional ID for the agent, useful for connection management and multi-agent scenarios.
            name: Name of the Agent.
            description: Description of what the Agent does.
            hooks: Optional list of hook providers to register for lifecycle events.
            state: Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
            session_manager: Manager for handling agent sessions including conversation history and state.
                If provided, enables session-based persistence and state management.
            tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
            **kwargs: Additional configuration for future extensibility.

        Raises:
            ValueError: If model configuration is invalid or state is invalid type.
            TypeError: If model type is unsupported.
        """
        if isinstance(model, BidiModel):
            self.model = model
        else:
            from ..models.nova_sonic import BidiNovaSonicModel

            self.model = BidiNovaSonicModel(model_id=model) if isinstance(model, str) else BidiNovaSonicModel()

        self.system_prompt = system_prompt
        self.messages = messages or []

        # Agent identification
        self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
        self.name = name or _DEFAULT_AGENT_NAME
        self.description = description

        # Tool execution configuration
        self.record_direct_tool_call = record_direct_tool_call
        self.load_tools_from_directory = load_tools_from_directory

        # Initialize tool registry
        self.tool_registry = ToolRegistry()

        if tools is not None:
            self.tool_registry.process_tools(tools)

        self.tool_registry.initialize_tools(self.load_tools_from_directory)

        # Initialize tool watcher if directory loading is enabled
        if self.load_tools_from_directory:
            self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

        # Initialize agent state management
        if state is not None:
            if isinstance(state, dict):
                self.state = AgentState(state)
            elif isinstance(state, AgentState):
                self.state = state
            else:
                raise ValueError("state must be an AgentState object or a dict")
        else:
            self.state = AgentState()

        # Initialize other components
        self._tool_caller = _ToolCaller(self)

        # Initialize tool executor
        self.tool_executor = tool_executor or ConcurrentToolExecutor()

        # Initialize hooks registry
        self.hooks = HookRegistry()
        if hooks:
            for hook in hooks:
                self.hooks.add_hook(hook)

        # Initialize session management functionality
        self._session_manager = session_manager
        if self._session_manager:
            self.hooks.add_hook(self._session_manager)

        self._loop = _BidiAgentLoop(self)

        # Emit initialization event
        self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

        # TODO: Determine if full support is required
        self._interrupt_state = _InterruptState()

        # Lock to ensure that paired messages are added to history in sequence without interference
        self._message_lock = asyncio.Lock()

        self._started = False

    @property
    def tool(self) -> _ToolCaller:
        """Call tool as a function.

        Returns:
            ToolCaller for method-style tool execution.

        Example:
            ```
            agent = BidiAgent(model=model, tools=[calculator])
            agent.tool.calculator(expression="2+2")
            ```
        """
        return self._tool_caller

    @property
    def tool_names(self) -> list[str]:
        """Get a list of all registered tool names.

        Returns:
            Names of all tools available to this agent.
        """
        all_tools = self.tool_registry.get_all_tools_config()
        return list(all_tools.keys())

    async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
        """Start a persistent bidirectional conversation connection.

        Initializes the streaming connection and starts background tasks for processing
        model events, tool execution, and connection management.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Raises:
            RuntimeError:
                If agent already started.

        Example:
            ```python
            await agent.start(invocation_state={
                "user_id": "user_123",
                "session_id": "session_456",
                "database": db_connection,
            })
            ```
        """
        if self._started:
            raise RuntimeError("agent already started | call stop before starting again")

        logger.debug("agent starting")
        await self._loop.start(invocation_state)
        self._started = True

    async def send(self, input_data: BidiAgentInput | dict[str, Any]) -> None:
        """Send input to the model (text, audio, image, or event dict).

        Unified method for sending text, audio, and image input to the model during
        an active conversation session. Accepts TypedEvent instances or plain dicts
        (e.g., from WebSocket clients) which are automatically reconstructed.

        Args:
            input_data: Can be:

                - str: Text message from user
                - BidiInputEvent: TypedEvent
                - dict: Event dictionary (will be reconstructed to TypedEvent)

        Raises:
            RuntimeError: If start has not been called.
            ValueError: If invalid input type.

        Example:
            await agent.send("Hello")
            await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
            await agent.send({"type": "bidi_text_input", "text": "Hello", "role": "user"})
        """
        if not self._started:
            raise RuntimeError("agent not started | call start before sending")

        input_event: BidiInputEvent

        if isinstance(input_data, str):
            input_event = BidiTextInputEvent(text=input_data)

        elif isinstance(input_data, BidiInputEvent):
            input_event = input_data

        elif isinstance(input_data, dict) and "type" in input_data:
            input_type = input_data["type"]
            input_data = {key: value for key, value in input_data.items() if key != "type"}
            if input_type == "bidi_text_input":
                input_event = BidiTextInputEvent(**input_data)
            elif input_type == "bidi_audio_input":
                input_event = BidiAudioInputEvent(**input_data)
            elif input_type == "bidi_image_input":
                input_event = BidiImageInputEvent(**input_data)
            else:
                raise ValueError(f"input_type=<{input_type}> | input type not supported")

        else:
            raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")

        await self._loop.send(input_event)

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive events from the model including audio, text, and tool calls.

        Yields:
            Model output events processed by background tasks including audio output,
            text responses, tool calls, and connection updates.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._started:
            raise RuntimeError("agent not started | call start before receiving")

        async for event in self._loop.receive():
            yield event

    async def stop(self) -> None:
        """End the conversation connection and cleanup all resources.

        Terminates the streaming connection, cancels background tasks, and
        closes the connection to the model provider.
        """
        self._started = False
        await self._loop.stop()

    async def __aenter__(self, invocation_state: dict[str, Any] | None = None) -> "BidiAgent":
        """Async context manager entry point.

        Automatically starts the bidirectional connection when entering the context.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Returns:
            Self for use in the context.
        """
        logger.debug("context_manager=<enter> | starting agent")
        await self.start(invocation_state)
        return self

    async def __aexit__(self, *_: Any) -> None:
        """Async context manager exit point.

        Automatically ends the connection and cleans up resources when exiting
        the context, regardless of whether an exception occurred.
        """
        logger.debug("context_manager=<exit> | stopping agent")
        await self.stop()

    async def run(
        self, inputs: list[BidiInput], outputs: list[BidiOutput], invocation_state: dict[str, Any] | None = None
    ) -> None:
        """Run the agent using provided IO channels for bidirectional communication.

        Args:
            inputs: Input callables to read data from a source
            outputs: Output callables to receive events from the agent
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Example:
            ```python
            # Using model defaults:
            model = BidiNovaSonicModel()
            audio_io = BidiAudioIO()
            text_io = BidiTextIO()
            agent = BidiAgent(model=model, tools=[calculator])
            await agent.run(
                inputs=[audio_io.input()],
                outputs=[audio_io.output(), text_io.output()],
                invocation_state={"user_id": "user_123"}
            )

            # Using custom audio config:
            model = BidiNovaSonicModel(
                provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
            )
            audio_io = BidiAudioIO()
            agent = BidiAgent(model=model, tools=[calculator])
            await agent.run(
                inputs=[audio_io.input()],
                outputs=[audio_io.output()],
            )
            ```
        """

        async def run_inputs() -> None:
            async def task(input_: BidiInput) -> None:
                while True:
                    event = await input_()
                    await self.send(event)

            await asyncio.gather(*[task(input_) for input_ in inputs])

        async def run_outputs(inputs_task: asyncio.Task) -> None:
            async for event in self.receive():
                await asyncio.gather(*[output(event) for output in outputs])

            inputs_task.cancel()

        try:
            await self.start(invocation_state)

            input_starts = [input_.start for input_ in inputs if isinstance(input_, BidiInput)]
            output_starts = [output.start for output in outputs if isinstance(output, BidiOutput)]
            for start in [*input_starts, *output_starts]:
                await start(self)

            async with _TaskGroup() as task_group:
                inputs_task = task_group.create_task(run_inputs())
                task_group.create_task(run_outputs(inputs_task))

        finally:
            input_stops = [input_.stop for input_ in inputs if isinstance(input_, BidiInput)]
            output_stops = [output.stop for output in outputs if isinstance(output, BidiOutput)]

            await stop_all(*input_stops, *output_stops, self.stop)

    async def _append_messages(self, *messages: Message) -> None:
        """Append messages to history in sequence without interference.

        The message lock ensures that paired messages are added to history in sequence without interference. For
        example, tool use and tool result messages must be added adjacent to each other.

        Args:
            *messages: List of messages to add into history.
        """
        async with self._message_lock:
            for message in messages:
                self.messages.append(message)
                await self.hooks.invoke_callbacks_async(BidiMessageAddedEvent(agent=self, message=message))

tool property

Call tool as a function.

Returns:

Type Description
_ToolCaller

ToolCaller for method-style tool execution.

Example
agent = BidiAgent(model=model, tools=[calculator])
agent.tool.calculator(expression="2+2")
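
Method-style access like this is commonly built on `__getattr__`. The sketch below is a hypothetical illustration of that general Python technique, not a description of how `_ToolCaller` works internally; `ToolCallerSketch` and the toy `calculator` function are invented for the example.

```python
from typing import Any, Callable


class ToolCallerSketch:
    """Hypothetical illustration of attribute-style tool dispatch."""

    def __init__(self, tools: dict[str, Callable[..., Any]]) -> None:
        self._tools = tools

    def __getattr__(self, name: str) -> Callable[..., Any]:
        # Invoked only when normal attribute lookup fails, so `_tools` itself
        # is found the usual way and unknown names are treated as tool names.
        try:
            return self._tools[name]
        except KeyError:
            raise AttributeError(f"tool '{name}' not found") from None


def calculator(expression: str) -> int:
    """Toy tool: adds two integers written as 'a+b'."""
    left, right = expression.split("+")
    return int(left) + int(right)


caller = ToolCallerSketch({"calculator": calculator})
```

With this in place, `caller.calculator(expression="2+2")` looks up the registered callable by attribute name and invokes it.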

tool_names property

Get a list of all registered tool names.

Returns:

Type Description
list[str]

Names of all tools available to this agent.

__aenter__(invocation_state=None) async

Async context manager entry point.

Automatically starts the bidirectional connection when entering the context.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Returns:

Type Description
BidiAgent

Self for use in the context.
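
Note that Python's `async with` statement calls `__aenter__()` without arguments, so on that path `invocation_state` keeps its default of `None`; passing context would presumably require calling `start()` directly. The sketch below demonstrates both points with a hypothetical `ConnectionSketch` class that only mimics the start/stop shape shown on this page: the state seen inside the block is `None`, and cleanup still runs when the block raises.

```python
import asyncio
from typing import Any


class ConnectionSketch:
    """Hypothetical stand-in with the same start/stop shape as the agent."""

    def __init__(self) -> None:
        self.started = False
        self.invocation_state = None

    async def start(self, invocation_state=None) -> None:
        self.started = True
        self.invocation_state = invocation_state

    async def stop(self) -> None:
        self.started = False

    async def __aenter__(self, invocation_state=None):
        await self.start(invocation_state)
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.stop()


async def demo_context() -> tuple[bool, bool, Any]:
    connection = ConnectionSketch()
    try:
        async with connection as active:
            inside = active.started
            seen_state = active.invocation_state  # None: `async with` passes no arguments
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    # __aexit__ ran despite the exception, so the connection is stopped.
    return inside, connection.started, seen_state
```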

Source code in strands/experimental/bidi/agent/agent.py
async def __aenter__(self, invocation_state: dict[str, Any] | None = None) -> "BidiAgent":
    """Async context manager entry point.

    Automatically starts the bidirectional connection when entering the context.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Returns:
        Self for use in the context.
    """
    logger.debug("context_manager=<enter> | starting agent")
    await self.start(invocation_state)
    return self

__aexit__(*_) async

Async context manager exit point.

Automatically ends the connection and cleans up resources when exiting the context, regardless of whether an exception occurred.

Source code in strands/experimental/bidi/agent/agent.py
async def __aexit__(self, *_: Any) -> None:
    """Async context manager exit point.

    Automatically ends the connection and cleans up resources when exiting
    the context, regardless of whether an exception occurred.
    """
    logger.debug("context_manager=<exit> | stopping agent")
    await self.stop()

__init__(model=None, tools=None, system_prompt=None, messages=None, record_direct_tool_call=True, load_tools_from_directory=False, agent_id=None, name=None, description=None, hooks=None, state=None, session_manager=None, tool_executor=None, **kwargs)

Initialize bidirectional agent.

Parameters:

Name Type Description Default
model BidiModel | str | None

BidiModel instance, a model_id string for BidiNovaSonicModel, or None to use BidiNovaSonicModel with its defaults.

None
tools list[str | AgentTool | ToolProvider] | None

Optional list of tools with flexible format support.

None
system_prompt str | None

Optional system prompt for conversations.

None
messages Messages | None

Optional conversation history to initialize with.

None
record_direct_tool_call bool

Whether to record direct tool calls in message history.

True
load_tools_from_directory bool

Whether to load and automatically reload tools in the ./tools/ directory.

False
agent_id str | None

Optional ID for the agent, useful for connection management and multi-agent scenarios.

None
name str | None

Name of the Agent.

None
description str | None

Description of what the Agent does.

None
hooks list[HookProvider] | None

Optional list of hook providers to register for lifecycle events.

None
state AgentState | dict | None

Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.

None
session_manager SessionManager | None

Manager for handling agent sessions including conversation history and state. If provided, enables session-based persistence and state management.

None
tool_executor ToolExecutor | None

Definition of tool execution strategy (e.g., sequential, concurrent, etc.).

None
**kwargs Any

Additional configuration for future extensibility.

{}

Raises:

Type Description
ValueError

If model configuration is invalid or state is invalid type.

TypeError

If model type is unsupported.
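
The `state` handling described above (accept a dict, accept an existing state object, default to empty, reject anything else with `ValueError`) might be sketched as follows. `StateSketch` and `normalize_state` are hypothetical stand-ins so the example runs without the SDK; the JSON check in the constructor is an assumption suggested by the `JSONSerializableDict` alias, not confirmed behavior.

```python
import json
from typing import Any, Optional


class StateSketch:
    """Hypothetical stand-in for a JSON-serializable state container."""

    def __init__(self, initial: Optional[dict[str, Any]] = None) -> None:
        data = dict(initial or {})
        json.dumps(data)  # raises TypeError if a value is not JSON-serializable
        self.data = data


def normalize_state(state: Any) -> StateSketch:
    """Mirror the branching used for the `state` argument."""
    if state is None:
        return StateSketch()
    if isinstance(state, dict):
        return StateSketch(state)
    if isinstance(state, StateSketch):
        return state
    raise ValueError("state must be a StateSketch object or a dict")
```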

Source code in strands/experimental/bidi/agent/agent.py
def __init__(
    self,
    model: BidiModel | str | None = None,
    tools: list[str | AgentTool | ToolProvider] | None = None,
    system_prompt: str | None = None,
    messages: Messages | None = None,
    record_direct_tool_call: bool = True,
    load_tools_from_directory: bool = False,
    agent_id: str | None = None,
    name: str | None = None,
    description: str | None = None,
    hooks: list[HookProvider] | None = None,
    state: AgentState | dict | None = None,
    session_manager: "SessionManager | None" = None,
    tool_executor: ToolExecutor | None = None,
    **kwargs: Any,
):
    """Initialize bidirectional agent.

    Args:
        model: BidiModel instance, a model_id string for BidiNovaSonicModel, or None to use
            BidiNovaSonicModel with its defaults.
        tools: Optional list of tools with flexible format support.
        system_prompt: Optional system prompt for conversations.
        messages: Optional conversation history to initialize with.
        record_direct_tool_call: Whether to record direct tool calls in message history.
        load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
        agent_id: Optional ID for the agent, useful for connection management and multi-agent scenarios.
        name: Name of the Agent.
        description: Description of what the Agent does.
        hooks: Optional list of hook providers to register for lifecycle events.
        state: Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
        session_manager: Manager for handling agent sessions including conversation history and state.
            If provided, enables session-based persistence and state management.
        tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
        **kwargs: Additional configuration for future extensibility.

    Raises:
        ValueError: If model configuration is invalid or state is invalid type.
        TypeError: If model type is unsupported.
    """
    if isinstance(model, BidiModel):
        self.model = model
    else:
        from ..models.nova_sonic import BidiNovaSonicModel

        self.model = BidiNovaSonicModel(model_id=model) if isinstance(model, str) else BidiNovaSonicModel()

    self.system_prompt = system_prompt
    self.messages = messages or []

    # Agent identification
    self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
    self.name = name or _DEFAULT_AGENT_NAME
    self.description = description

    # Tool execution configuration
    self.record_direct_tool_call = record_direct_tool_call
    self.load_tools_from_directory = load_tools_from_directory

    # Initialize tool registry
    self.tool_registry = ToolRegistry()

    if tools is not None:
        self.tool_registry.process_tools(tools)

    self.tool_registry.initialize_tools(self.load_tools_from_directory)

    # Initialize tool watcher if directory loading is enabled
    if self.load_tools_from_directory:
        self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

    # Initialize agent state management
    if state is not None:
        if isinstance(state, dict):
            self.state = AgentState(state)
        elif isinstance(state, AgentState):
            self.state = state
        else:
            raise ValueError("state must be an AgentState object or a dict")
    else:
        self.state = AgentState()

    # Initialize other components
    self._tool_caller = _ToolCaller(self)

    # Initialize tool executor
    self.tool_executor = tool_executor or ConcurrentToolExecutor()

    # Initialize hooks registry
    self.hooks = HookRegistry()
    if hooks:
        for hook in hooks:
            self.hooks.add_hook(hook)

    # Initialize session management functionality
    self._session_manager = session_manager
    if self._session_manager:
        self.hooks.add_hook(self._session_manager)

    self._loop = _BidiAgentLoop(self)

    # Emit initialization event
    self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

    # TODO: Determine if full support is required
    self._interrupt_state = _InterruptState()

    # Lock to ensure that paired messages are added to history in sequence without interference
    self._message_lock = asyncio.Lock()

    self._started = False

receive() async

Receive events from the model including audio, text, and tool calls.

Yields:

Type Description
AsyncGenerator[BidiOutputEvent, None]

Model output events processed by background tasks including audio output, text responses, tool calls, and connection updates.

Raises:

Type Description
RuntimeError

If start has not been called.

Source code in strands/experimental/bidi/agent/agent.py
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive events from the model including audio, text, and tool calls.

    Yields:
        Model output events processed by background tasks including audio output,
        text responses, tool calls, and connection updates.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._started:
        raise RuntimeError("agent not started | call start before receiving")

    async for event in self._loop.receive():
        yield event
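
Because receive() is an async generator, the started check runs when iteration begins, not when receive() is called. The sketch below illustrates that timing with a hypothetical stand-in class (`SketchAgent` is not part of the SDK and mirrors only the guard shown above).

```python
import asyncio
from typing import AsyncGenerator


class SketchAgent:
    """Hypothetical stand-in that mirrors only the guard in receive()."""

    def __init__(self) -> None:
        self._started = False

    async def receive(self) -> AsyncGenerator[str, None]:
        if not self._started:
            raise RuntimeError("agent not started | call start before receiving")
        for event in ("event-1", "event-2"):
            yield event


async def demo() -> list:
    agent = SketchAgent()
    log = []

    stream = agent.receive()  # no error yet: the generator body has not run
    try:
        async for _ in stream:
            pass
    except RuntimeError as error:
        log.append(f"error: {error}")  # raised on the first iteration

    agent._started = True  # stands in for a successful start()
    async for event in agent.receive():
        log.append(event)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In practice this means the `async for` loop, not the call to receive(), is what needs to sit inside any error handling.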

run(inputs, outputs, invocation_state=None) async

Run the agent using provided IO channels for bidirectional communication.

Parameters:

Name Type Description Default
inputs list[BidiInput]

Input callables to read data from a source

required
outputs list[BidiOutput]

Output callables to receive events from the agent

required
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None
Example
# Using model defaults:
model = BidiNovaSonicModel()
audio_io = BidiAudioIO()
text_io = BidiTextIO()
agent = BidiAgent(model=model, tools=[calculator])
await agent.run(
    inputs=[audio_io.input()],
    outputs=[audio_io.output(), text_io.output()],
    invocation_state={"user_id": "user_123"}
)

# Using custom audio config:
model = BidiNovaSonicModel(
    provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
)
audio_io = BidiAudioIO()
agent = BidiAgent(model=model, tools=[calculator])
await agent.run(
    inputs=[audio_io.input()],
    outputs=[audio_io.output()],
)
Source code in strands/experimental/bidi/agent/agent.py
async def run(
    self, inputs: list[BidiInput], outputs: list[BidiOutput], invocation_state: dict[str, Any] | None = None
) -> None:
    """Run the agent using provided IO channels for bidirectional communication.

    Args:
        inputs: Input callables to read data from a source
        outputs: Output callables to receive events from the agent
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Example:
        ```python
        # Using model defaults:
        model = BidiNovaSonicModel()
        audio_io = BidiAudioIO()
        text_io = BidiTextIO()
        agent = BidiAgent(model=model, tools=[calculator])
        await agent.run(
            inputs=[audio_io.input()],
            outputs=[audio_io.output(), text_io.output()],
            invocation_state={"user_id": "user_123"}
        )

        # Using custom audio config:
        model = BidiNovaSonicModel(
            provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
        )
        audio_io = BidiAudioIO()
        agent = BidiAgent(model=model, tools=[calculator])
        await agent.run(
            inputs=[audio_io.input()],
            outputs=[audio_io.output()],
        )
        ```
    """

    async def run_inputs() -> None:
        async def task(input_: BidiInput) -> None:
            while True:
                event = await input_()
                await self.send(event)

        await asyncio.gather(*[task(input_) for input_ in inputs])

    async def run_outputs(inputs_task: asyncio.Task) -> None:
        async for event in self.receive():
            await asyncio.gather(*[output(event) for output in outputs])

        inputs_task.cancel()

    try:
        await self.start(invocation_state)

        input_starts = [input_.start for input_ in inputs if isinstance(input_, BidiInput)]
        output_starts = [output.start for output in outputs if isinstance(output, BidiOutput)]
        for start in [*input_starts, *output_starts]:
            await start(self)

        async with _TaskGroup() as task_group:
            inputs_task = task_group.create_task(run_inputs())
            task_group.create_task(run_outputs(inputs_task))

    finally:
        input_stops = [input_.stop for input_ in inputs if isinstance(input_, BidiInput)]
        output_stops = [output.stop for output in outputs if isinstance(output, BidiOutput)]

        await stop_all(*input_stops, *output_stops, self.stop)
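
The input and output pumps above can be sketched with plain asyncio tasks. This is a simplified illustration, not the SDK implementation: it swaps the internal `_TaskGroup` for `asyncio.gather`, and `pump`, `read_input`, `collect`, and the loopback queue are hypothetical names introduced for the example.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def pump(
    read_input: Callable[[], Awaitable[str]],
    send: Callable[[str], Awaitable[None]],
    receive: Callable[[], AsyncIterator[str]],
    outputs: list,
) -> None:
    async def run_inputs() -> None:
        while True:  # ends only through cancellation
            await send(await read_input())

    async def run_outputs(inputs_task: asyncio.Task) -> None:
        async for event in receive():
            # Every output sees every event.
            await asyncio.gather(*[output(event) for output in outputs])
        inputs_task.cancel()  # the stream ended, so stop reading input

    inputs_task = asyncio.create_task(run_inputs())
    outputs_task = asyncio.create_task(run_outputs(inputs_task))
    # return_exceptions=True keeps the expected cancellation from propagating.
    await asyncio.gather(inputs_task, outputs_task, return_exceptions=True)


async def demo() -> list:
    pending = ["hello", "bye"]
    wire: asyncio.Queue = asyncio.Queue()  # loopback in place of a model
    seen = []

    async def read_input() -> str:
        if pending:
            return pending.pop(0)
        await asyncio.Event().wait()  # idle source: block until cancelled
        raise AssertionError("unreachable")

    async def send(text: str) -> None:
        await wire.put(text)

    async def receive() -> AsyncIterator[str]:
        while True:
            text = await wire.get()
            yield f"echo:{text}"
            if text == "bye":
                return  # end of stream

    async def collect(event: str) -> None:
        seen.append(event)

    await pump(read_input, send, receive, [collect])
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The input loop never finishes on its own, which is why the output side cancels it once the receive stream is exhausted.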

send(input_data) async

Send input to the model (text, audio, image, or event dict).

Unified method for sending text, audio, and image input to the model during an active conversation session. Accepts TypedEvent instances or plain dicts (e.g., from WebSocket clients) which are automatically reconstructed.

Parameters:

Name Type Description Default
input_data BidiAgentInput | dict[str, Any]

Can be:

  • str: Text message from user
  • BidiInputEvent: TypedEvent
  • dict: Event dictionary (will be reconstructed to TypedEvent)
required

Raises:

Type Description
RuntimeError

If start has not been called.

ValueError

If invalid input type.

Example

await agent.send("Hello")
await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
await agent.send({"type": "bidi_text_input", "text": "Hello", "role": "user"})

Source code in strands/experimental/bidi/agent/agent.py
async def send(self, input_data: BidiAgentInput | dict[str, Any]) -> None:
    """Send input to the model (text, audio, image, or event dict).

    Unified method for sending text, audio, and image input to the model during
    an active conversation session. Accepts TypedEvent instances or plain dicts
    (e.g., from WebSocket clients) which are automatically reconstructed.

    Args:
        input_data: Can be:

            - str: Text message from user
            - BidiInputEvent: TypedEvent
            - dict: Event dictionary (will be reconstructed to TypedEvent)

    Raises:
        RuntimeError: If start has not been called.
        ValueError: If invalid input type.

    Example:
        await agent.send("Hello")
        await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
        await agent.send({"type": "bidi_text_input", "text": "Hello", "role": "user"})
    """
    if not self._started:
        raise RuntimeError("agent not started | call start before sending")

    input_event: BidiInputEvent

    if isinstance(input_data, str):
        input_event = BidiTextInputEvent(text=input_data)

    elif isinstance(input_data, BidiInputEvent):
        input_event = input_data

    elif isinstance(input_data, dict) and "type" in input_data:
        input_type = input_data["type"]
        input_data = {key: value for key, value in input_data.items() if key != "type"}
        if input_type == "bidi_text_input":
            input_event = BidiTextInputEvent(**input_data)
        elif input_type == "bidi_audio_input":
            input_event = BidiAudioInputEvent(**input_data)
        elif input_type == "bidi_image_input":
            input_event = BidiImageInputEvent(**input_data)
        else:
            raise ValueError(f"input_type=<{input_type}> | input type not supported")

    else:
        raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")

    await self._loop.send(input_event)
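
The normalization performed by send() can be sketched as a lookup from the "type" tag to an event class. This is an alternative formulation for illustration only: `TextEvent`, `ImageEvent`, `EVENT_TYPES`, and `normalize` are hypothetical stand-ins, and only the type tags and error messages are taken from the source above.

```python
from typing import Any


class TextEvent(dict):
    """Hypothetical stand-in for BidiTextInputEvent."""

    def __init__(self, text: str, role: str = "user") -> None:
        super().__init__({"type": "bidi_text_input", "text": text, "role": role})


class ImageEvent(dict):
    """Hypothetical stand-in for BidiImageInputEvent."""

    def __init__(self, image: str, mime_type: str) -> None:
        super().__init__({"type": "bidi_image_input", "image": image, "mime_type": mime_type})


# Maps the wire-level "type" tag to the class that rebuilds the event.
EVENT_TYPES = {"bidi_text_input": TextEvent, "bidi_image_input": ImageEvent}


def normalize(input_data: Any) -> dict:
    """Turn a str, event object, or tagged dict into an event object."""
    if isinstance(input_data, str):
        return TextEvent(text=input_data)
    # Check event classes before plain dicts: the stand-ins subclass dict.
    if isinstance(input_data, (TextEvent, ImageEvent)):
        return input_data
    if isinstance(input_data, dict) and "type" in input_data:
        input_type = input_data["type"]
        fields = {key: value for key, value in input_data.items() if key != "type"}
        if input_type not in EVENT_TYPES:
            raise ValueError(f"input_type=<{input_type}> | input type not supported")
        return EVENT_TYPES[input_type](**fields)
    raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")


if __name__ == "__main__":
    print(normalize("Hello"))
    print(normalize({"type": "bidi_text_input", "text": "Hi"}))
```

Note that a dict with an unrecognized tag, such as "bidirectional_text_input", is rejected with a ValueError rather than being passed through.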

start(invocation_state=None) async

Start a persistent bidirectional conversation connection.

Initializes the streaming connection and starts background tasks for processing model events, tool execution, and connection management.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Raises:

Type Description
RuntimeError

If agent already started.

Example
await agent.start(invocation_state={
    "user_id": "user_123",
    "session_id": "session_456",
    "database": db_connection,
})
Source code in strands/experimental/bidi/agent/agent.py
async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
    """Start a persistent bidirectional conversation connection.

    Initializes the streaming connection and starts background tasks for processing
    model events, tool execution, and connection management.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Raises:
        RuntimeError:
            If agent already started.

    Example:
        ```python
        await agent.start(invocation_state={
            "user_id": "user_123",
            "session_id": "session_456",
            "database": db_connection,
        })
        ```
    """
    if self._started:
        raise RuntimeError("agent already started | call stop before starting again")

    logger.debug("agent starting")
    await self._loop.start(invocation_state)
    self._started = True

stop() async

End the conversation connection and clean up all resources.

Terminates the streaming connection, cancels background tasks, and closes the connection to the model provider.

Source code in strands/experimental/bidi/agent/agent.py
async def stop(self) -> None:
    """End the conversation connection and clean up all resources.

    Terminates the streaming connection, cancels background tasks, and
    closes the connection to the model provider.
    """
    self._started = False
    await self._loop.stop()
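
When start(), send(), and stop() are called manually rather than through run(), wrapping the conversation in try/finally ensures stop() always runs. The sketch below uses a hypothetical `SketchAgent` that mirrors only the started flag; the assumption that an agent can be started again after stop() is inferred from the "call stop before starting again" error message.

```python
import asyncio


class SketchAgent:
    """Hypothetical stand-in that mirrors only the _started bookkeeping."""

    def __init__(self) -> None:
        self._started = False
        self.calls = []

    async def start(self) -> None:
        if self._started:
            raise RuntimeError("agent already started | call stop before starting again")
        self.calls.append("start")
        self._started = True

    async def send(self, text: str) -> None:
        if not self._started:
            raise RuntimeError("agent not started | call start before sending")
        self.calls.append(f"send:{text}")

    async def stop(self) -> None:
        self._started = False
        self.calls.append("stop")


async def converse(agent: SketchAgent, text: str) -> None:
    await agent.start()
    try:
        await agent.send(text)
    finally:
        await agent.stop()  # always release the connection, even on error


async def demo() -> list:
    agent = SketchAgent()
    await converse(agent, "first")
    await converse(agent, "second")  # starting again after stop() succeeds
    return agent.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```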

BidiAgentInitializedEvent dataclass

Bases: BidiHookEvent

Event triggered when a BidiAgent has finished initialization.

This event is fired after the BidiAgent has been fully constructed and all built-in components have been initialized. Hook providers can use this event to perform setup tasks that require a fully initialized agent.

Source code in strands/experimental/hooks/events.py
@dataclass
class BidiAgentInitializedEvent(BidiHookEvent):
    """Event triggered when a BidiAgent has finished initialization.

    This event is fired after the BidiAgent has been fully constructed and all
    built-in components have been initialized. Hook providers can use this
    event to perform setup tasks that require a fully initialized agent.
    """

    pass

BidiAudioInputEvent

Bases: TypedEvent

Audio input event for sending audio to the model.

Used for sending audio data through the send() method.

Parameters:

Name Type Description Default
audio str

Base64-encoded audio string to send to model.

required
format AudioFormat | str

Audio format from SUPPORTED_AUDIO_FORMATS.

required
sample_rate AudioSampleRate

Sample rate from SUPPORTED_SAMPLE_RATES.

required
channels AudioChannel

Channel count from SUPPORTED_CHANNELS.

required
Source code in strands/experimental/bidi/types/events.py
class BidiAudioInputEvent(TypedEvent):
    """Audio input event for sending audio to the model.

    Used for sending audio data through the send() method.

    Parameters:
        audio: Base64-encoded audio string to send to model.
        format: Audio format from SUPPORTED_AUDIO_FORMATS.
        sample_rate: Sample rate from SUPPORTED_SAMPLE_RATES.
        channels: Channel count from SUPPORTED_CHANNELS.
    """

    def __init__(
        self,
        audio: str,
        format: AudioFormat | str,
        sample_rate: AudioSampleRate,
        channels: AudioChannel,
    ):
        """Initialize audio input event."""
        super().__init__(
            {
                "type": "bidi_audio_input",
                "audio": audio,
                "format": format,
                "sample_rate": sample_rate,
                "channels": channels,
            }
        )

    @property
    def audio(self) -> str:
        """Base64-encoded audio string."""
        return cast(str, self["audio"])

    @property
    def format(self) -> AudioFormat:
        """Audio encoding format."""
        return cast(AudioFormat, self["format"])

    @property
    def sample_rate(self) -> AudioSampleRate:
        """Number of audio samples per second in Hz."""
        return cast(AudioSampleRate, self["sample_rate"])

    @property
    def channels(self) -> AudioChannel:
        """Number of audio channels (1=mono, 2=stereo)."""
        return cast(AudioChannel, self["channels"])

audio property

Base64-encoded audio string.

channels property

Number of audio channels (1=mono, 2=stereo).

format property

Audio encoding format.

sample_rate property

Number of audio samples per second in Hz.

__init__(audio, format, sample_rate, channels)

Initialize audio input event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    audio: str,
    format: AudioFormat | str,
    sample_rate: AudioSampleRate,
    channels: AudioChannel,
):
    """Initialize audio input event."""
    super().__init__(
        {
            "type": "bidi_audio_input",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels,
        }
    )
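
Since the audio field is a Base64 string rather than raw bytes, captured PCM data has to be encoded before it is wrapped in an event. The sketch below builds the dict form of the event using only the standard library. The helper names are hypothetical, and the 16 kHz mono 16-bit settings are assumptions chosen for the example, not values confirmed to be in the supported sets.

```python
import base64

SAMPLE_RATE = 16000  # Hz; assumed to be a supported rate
CHANNELS = 1  # mono
BYTES_PER_SAMPLE = 2  # 16-bit PCM


def silence_chunk(duration_ms: int) -> bytes:
    """Return raw 16-bit PCM silence of the given duration."""
    frames = SAMPLE_RATE * duration_ms // 1000
    return b"\x00" * (frames * CHANNELS * BYTES_PER_SAMPLE)


def audio_payload(pcm: bytes) -> dict:
    """Build the dict form of an audio input event from raw PCM bytes."""
    return {
        "type": "bidi_audio_input",
        "audio": base64.b64encode(pcm).decode("ascii"),
        "format": "pcm",
        "sample_rate": SAMPLE_RATE,
        "channels": CHANNELS,
    }


if __name__ == "__main__":
    payload = audio_payload(silence_chunk(10))
    print(payload["type"], len(payload["audio"]))
```

A payload of this shape carries the same keys that the constructor above stores, so it could be passed to the agent's send() as an event dict.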

BidiImageInputEvent

Bases: TypedEvent

Image input event for sending images/video frames to the model.

Used for sending image data through the send() method.

Parameters:

Name Type Description Default
image str

Base64-encoded image string.

required
mime_type str

MIME type (e.g., "image/jpeg", "image/png").

required
Source code in strands/experimental/bidi/types/events.py
class BidiImageInputEvent(TypedEvent):
    """Image input event for sending images/video frames to the model.

    Used for sending image data through the send() method.

    Parameters:
        image: Base64-encoded image string.
        mime_type: MIME type (e.g., "image/jpeg", "image/png").
    """

    def __init__(
        self,
        image: str,
        mime_type: str,
    ):
        """Initialize image input event."""
        super().__init__(
            {
                "type": "bidi_image_input",
                "image": image,
                "mime_type": mime_type,
            }
        )

    @property
    def image(self) -> str:
        """Base64-encoded image string."""
        return cast(str, self["image"])

    @property
    def mime_type(self) -> str:
        """MIME type of the image (e.g., "image/jpeg", "image/png")."""
        return cast(str, self["mime_type"])

image property

Base64-encoded image string.

mime_type property

MIME type of the image (e.g., "image/jpeg", "image/png").

__init__(image, mime_type)

Initialize image input event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    image: str,
    mime_type: str,
):
    """Initialize image input event."""
    super().__init__(
        {
            "type": "bidi_image_input",
            "image": image,
            "mime_type": mime_type,
        }
    )
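
An image event needs two things: the Base64-encoded bytes and a matching MIME type. As a rough sketch, the MIME type can be derived from a file name with the standard library; `image_payload` is a hypothetical helper, and the four bytes used below are only a placeholder for real image data.

```python
import base64
import mimetypes


def image_payload(filename: str, data: bytes) -> dict:
    """Build the dict form of an image input event from raw image bytes."""
    mime_type, _ = mimetypes.guess_type(filename)
    if mime_type is None or not mime_type.startswith("image/"):
        raise ValueError(f"filename=<{filename}> | not a recognized image type")
    return {
        "type": "bidi_image_input",
        "image": base64.b64encode(data).decode("ascii"),
        "mime_type": mime_type,
    }


if __name__ == "__main__":
    # Placeholder bytes (the start of a JPEG header), not a full image.
    print(image_payload("frame.jpg", b"\xff\xd8\xff\xe0"))
```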

BidiInput

Bases: Protocol

Protocol for bidirectional input callables.

Input callables read data from a source (microphone, camera, websocket, etc.) and return events to be sent to the agent.

Source code in strands/experimental/bidi/types/io.py
@runtime_checkable
class BidiInput(Protocol):
    """Protocol for bidirectional input callables.

    Input callables read data from a source (microphone, camera, websocket, etc.)
    and return events to be sent to the agent.
    """

    async def start(self, agent: "BidiAgent") -> None:
        """Start input."""
        return

    async def stop(self) -> None:
        """Stop input."""
        return

    def __call__(self) -> Awaitable[BidiInputEvent]:
        """Read input data from the source.

        Returns:
            Awaitable that resolves to an input event (audio, text, image, etc.)
        """
        ...

__call__()

Read input data from the source.

Returns:

Type Description
Awaitable[BidiInputEvent]

Awaitable that resolves to an input event (audio, text, image, etc.)

Source code in strands/experimental/bidi/types/io.py
def __call__(self) -> Awaitable[BidiInputEvent]:
    """Read input data from the source.

    Returns:
        Awaitable that resolves to an input event (audio, text, image, etc.)
    """
    ...

start(agent) async

Start input.

Source code in strands/experimental/bidi/types/io.py
async def start(self, agent: "BidiAgent") -> None:
    """Start input."""
    return

stop() async

Stop input.

Source code in strands/experimental/bidi/types/io.py
async def stop(self) -> None:
    """Stop input."""
    return
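
A custom input only needs the three members listed above. The sketch below shows a queue-backed text input and how a runtime-checkable protocol tells it apart from a bare coroutine function. `InputLike` is a local stand-in with the same member names as BidiInput, and `QueueTextInput` and `bare_input` are hypothetical; nothing here imports the SDK.

```python
import asyncio
from typing import Any, Awaitable, Protocol, runtime_checkable


@runtime_checkable
class InputLike(Protocol):
    """Local stand-in with the same three members as BidiInput."""

    async def start(self, agent: Any) -> None: ...
    async def stop(self) -> None: ...
    def __call__(self) -> Awaitable[dict]: ...


class QueueTextInput:
    """Hypothetical input that hands out queued text as event dicts."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self.running = False

    async def start(self, agent: Any) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False

    def push(self, text: str) -> None:
        self._queue.put_nowait(text)

    async def __call__(self) -> dict:
        # An async __call__ returns an awaitable, as the protocol asks.
        text = await self._queue.get()
        return {"type": "bidi_text_input", "text": text, "role": "user"}


async def bare_input() -> dict:
    """A plain coroutine function: callable, but without start/stop."""
    return {"type": "bidi_text_input", "text": "hi", "role": "user"}


async def demo() -> tuple:
    source = QueueTextInput()
    await source.start(agent=None)
    source.push("hello")
    event = await source()
    await source.stop()
    # isinstance() checks only that the protocol's members are present.
    return event, isinstance(source, InputLike), isinstance(bare_input, InputLike)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This matches how run() treats its arguments: start and stop are collected only from inputs that pass the isinstance check, so a bare callable is still read from but has no lifecycle hooks.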

BidiMessageAddedEvent dataclass

Bases: BidiHookEvent

Event triggered when BidiAgent adds a message to the conversation.

This event is fired whenever the BidiAgent adds a new message to its internal message history, including user messages (from transcripts), assistant responses, and tool results. Hook providers can use this event for logging, monitoring, or implementing custom message processing logic.

Note: This event is only triggered for messages added by the framework itself, not for messages manually added by tools or external code.

Attributes:

Name Type Description
message Message

The message that was added to the conversation history.

Source code in strands/experimental/hooks/events.py
@dataclass
class BidiMessageAddedEvent(BidiHookEvent):
    """Event triggered when BidiAgent adds a message to the conversation.

    This event is fired whenever the BidiAgent adds a new message to its internal
    message history, including user messages (from transcripts), assistant responses,
    and tool results. Hook providers can use this event for logging, monitoring, or
    implementing custom message processing logic.

    Note: This event is only triggered for messages added by the framework
    itself, not for messages manually added by tools or external code.

    Attributes:
        message: The message that was added to the conversation history.
    """

    message: Message

BidiModel

Bases: Protocol

Protocol for bidirectional streaming models.

This interface defines the contract for models that support persistent streaming connections with real-time audio and text communication. Implementations handle provider-specific protocols while exposing a standardized event-based API.

Attributes:

Name Type Description
config dict[str, Any]

Configuration dictionary with provider-specific settings.

Source code in strands/experimental/bidi/models/model.py
@runtime_checkable
class BidiModel(Protocol):
    """Protocol for bidirectional streaming models.

    This interface defines the contract for models that support persistent streaming
    connections with real-time audio and text communication. Implementations handle
    provider-specific protocols while exposing a standardized event-based API.

    Attributes:
        config: Configuration dictionary with provider-specific settings.
    """

    config: dict[str, Any]

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish a persistent streaming connection with the model.

        Opens a bidirectional connection that remains active for real-time communication.
        The connection supports concurrent sending and receiving of events until explicitly
        closed. Must be called before any send() or receive() operations.

        Args:
            system_prompt: System instructions to configure model behavior.
            tools: Tool specifications that the model can invoke during the conversation.
            messages: Initial conversation history to provide context.
            **kwargs: Provider-specific configuration options.
        """
        ...

    async def stop(self) -> None:
        """Close the streaming connection and release resources.

        Terminates the active bidirectional connection and cleans up any associated
        resources such as network connections, buffers, or background tasks. After
        calling stop(), the model instance cannot be used until start() is called again.
        """
        ...

    def receive(self) -> AsyncIterable[BidiOutputEvent]:
        """Receive streaming events from the model.

        Continuously yields events from the model as they arrive over the connection.
        Events are normalized to a provider-agnostic format for uniform processing.
        This method should be called in a loop or async task to process model responses.

        The stream continues until the connection is closed or an error occurs.

        Yields:
            BidiOutputEvent: Standardized event objects containing audio output,
                transcripts, tool calls, or control signals.
        """
        ...

    async def send(
        self,
        content: BidiInputEvent | ToolResultEvent,
    ) -> None:
        """Send content to the model over the active connection.

        Transmits user input or tool results to the model during an active streaming
        session. Supports multiple content types including text, audio, images, and
        tool execution results. Can be called multiple times during a conversation.

        Args:
            content: The content to send. Must be one of:

                - BidiTextInputEvent: Text message from the user
                - BidiAudioInputEvent: Audio data for speech input
                - BidiImageInputEvent: Image data for visual understanding
                - ToolResultEvent: Result from a tool execution

        Example:
            ```
            await model.send(BidiTextInputEvent(text="Hello", role="user"))
            await model.send(BidiAudioInputEvent(audio="base64...", format="pcm", sample_rate=16000, channels=1))
            await model.send(BidiImageInputEvent(image="base64...", mime_type="image/jpeg"))
            await model.send(ToolResultEvent(tool_result))
            ```
        """
        ...

receive()

Receive streaming events from the model.

Continuously yields events from the model as they arrive over the connection. Events are normalized to a provider-agnostic format for uniform processing. This method should be called in a loop or async task to process model responses.

The stream continues until the connection is closed or an error occurs.

Yields:

Name Type Description
BidiOutputEvent AsyncIterable[BidiOutputEvent]

Standardized event objects containing audio output, transcripts, tool calls, or control signals.

Source code in strands/experimental/bidi/models/model.py
def receive(self) -> AsyncIterable[BidiOutputEvent]:
    """Receive streaming events from the model.

    Continuously yields events from the model as they arrive over the connection.
    Events are normalized to a provider-agnostic format for uniform processing.
    This method should be called in a loop or async task to process model responses.

    The stream continues until the connection is closed or an error occurs.

    Yields:
        BidiOutputEvent: Standardized event objects containing audio output,
            transcripts, tool calls, or control signals.
    """
    ...

send(content) async

Send content to the model over the active connection.

Transmits user input or tool results to the model during an active streaming session. Supports multiple content types including text, audio, images, and tool execution results. Can be called multiple times during a conversation.

Parameters:

Name Type Description Default
content BidiInputEvent | ToolResultEvent

The content to send. Must be one of:

  • BidiTextInputEvent: Text message from the user
  • BidiAudioInputEvent: Audio data for speech input
  • BidiImageInputEvent: Image data for visual understanding
  • ToolResultEvent: Result from a tool execution
required
Example
await model.send(BidiTextInputEvent(text="Hello", role="user"))
await model.send(BidiAudioInputEvent(audio="base64...", format="pcm", sample_rate=16000, channels=1))
await model.send(BidiImageInputEvent(image="base64...", mime_type="image/jpeg"))
await model.send(ToolResultEvent(tool_result))
Source code in strands/experimental/bidi/models/model.py
async def send(
    self,
    content: BidiInputEvent | ToolResultEvent,
) -> None:
    """Send content to the model over the active connection.

    Transmits user input or tool results to the model during an active streaming
    session. Supports multiple content types including text, audio, images, and
    tool execution results. Can be called multiple times during a conversation.

    Args:
        content: The content to send. Must be one of:

            - BidiTextInputEvent: Text message from the user
            - BidiAudioInputEvent: Audio data for speech input
            - BidiImageInputEvent: Image data for visual understanding
            - ToolResultEvent: Result from a tool execution

    Example:
        ```
        await model.send(BidiTextInputEvent(text="Hello", role="user"))
        await model.send(BidiAudioInputEvent(audio="base64...", format="pcm", sample_rate=16000, channels=1))
        await model.send(BidiImageInputEvent(image="base64...", mime_type="image/jpeg"))
        await model.send(ToolResultEvent(tool_result))
        ```
    """
    ...

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish a persistent streaming connection with the model.

Opens a bidirectional connection that remains active for real-time communication. The connection supports concurrent sending and receiving of events until explicitly closed. Must be called before any send() or receive() operations.

Parameters:

Name Type Description Default
system_prompt str | None

System instructions to configure model behavior.

None
tools list[ToolSpec] | None

Tool specifications that the model can invoke during the conversation.

None
messages Messages | None

Initial conversation history to provide context.

None
**kwargs Any

Provider-specific configuration options.

{}
Source code in strands/experimental/bidi/models/model.py
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish a persistent streaming connection with the model.

    Opens a bidirectional connection that remains active for real-time communication.
    The connection supports concurrent sending and receiving of events until explicitly
    closed. Must be called before any send() or receive() operations.

    Args:
        system_prompt: System instructions to configure model behavior.
        tools: Tool specifications that the model can invoke during the conversation.
        messages: Initial conversation history to provide context.
        **kwargs: Provider-specific configuration options.
    """
    ...

stop() async

Close the streaming connection and release resources.

Terminates the active bidirectional connection and cleans up any associated resources such as network connections, buffers, or background tasks. After calling stop(), the model instance cannot be used until start() is called again.

Source code in strands/experimental/bidi/models/model.py
async def stop(self) -> None:
    """Close the streaming connection and release resources.

    Terminates the active bidirectional connection and cleans up any associated
    resources such as network connections, buffers, or background tasks. After
    calling stop(), the model instance cannot be used until start() is called again.
    """
    ...
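
To make the shape of the protocol concrete, here is a minimal in-memory model with the same four methods. It is a sketch for experimentation only: `EchoModel` is hypothetical, events are plain dicts rather than SDK event classes, and the "echo" event type is invented for the example.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


class EchoModel:
    """Hypothetical model with the four protocol methods; echoes text back."""

    def __init__(self) -> None:
        self.config: dict = {}
        self._events: Optional[asyncio.Queue] = None

    async def start(self, system_prompt=None, tools=None, messages=None, **kwargs: Any) -> None:
        self._events = asyncio.Queue()  # stands in for opening a connection

    async def stop(self) -> None:
        if self._events is not None:
            await self._events.put(None)  # sentinel: ends receive()

    async def send(self, content: dict) -> None:
        if self._events is None:
            raise RuntimeError("model not started")
        await self._events.put({"type": "echo", "text": content["text"]})

    async def receive(self) -> AsyncIterator[dict]:
        if self._events is None:
            raise RuntimeError("model not started")
        while True:
            event = await self._events.get()
            if event is None:
                return  # connection closed
            yield event


async def demo() -> list:
    model = EchoModel()
    await model.start(system_prompt="You are terse.")
    received = []

    async def consume() -> None:
        async for event in model.receive():
            received.append(event)

    async def produce() -> None:
        await model.send({"type": "bidi_text_input", "text": "ping", "role": "user"})
        await model.stop()

    # Sending and receiving run concurrently over the same connection.
    await asyncio.gather(consume(), produce())
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A real implementation would translate provider messages into the SDK's output events inside receive(); the queue here only stands in for that connection.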

BidiOutput

Bases: Protocol

Protocol for bidirectional output callables.

Output callables receive events from the agent and handle them appropriately (play audio, display text, send over websocket, etc.).

Source code in strands/experimental/bidi/types/io.py
@runtime_checkable
class BidiOutput(Protocol):
    """Protocol for bidirectional output callables.

    Output callables receive events from the agent and handle them appropriately
    (play audio, display text, send over websocket, etc.).
    """

    async def start(self, agent: "BidiAgent") -> None:
        """Start output."""
        return

    async def stop(self) -> None:
        """Stop output."""
        return

    def __call__(self, event: BidiOutputEvent) -> Awaitable[None]:
        """Process output events from the agent.

        Args:
            event: Output event from the agent (audio, text, tool calls, etc.)
        """
        ...
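
A minimal sketch of an object that satisfies this protocol shape. To stay runnable without the SDK installed, it uses a local `OutputLike` protocol that mirrors the three members shown above, and a hypothetical `CollectingOutput` that simply records events; the event payload is an illustrative dict, not a real SDK event.

```python
import asyncio
from typing import Any, Awaitable, Protocol, runtime_checkable


@runtime_checkable
class OutputLike(Protocol):
    """Local stand-in mirroring the members of BidiOutput (assumption)."""

    async def start(self, agent: Any) -> None: ...

    async def stop(self) -> None: ...

    def __call__(self, event: Any) -> Awaitable[None]: ...


class CollectingOutput:
    """Hypothetical output that records every event it receives."""

    def __init__(self) -> None:
        self.events: list = []
        self.started = False

    async def start(self, agent: Any) -> None:
        self.started = True

    async def stop(self) -> None:
        self.started = False

    async def __call__(self, event: Any) -> None:
        # An async __call__ returns an awaitable, as the protocol requires.
        self.events.append(event)


async def demo() -> CollectingOutput:
    output = CollectingOutput()
    await output.start(agent=None)
    await output({"type": "example_event", "text": "hi"})
    await output.stop()
    return output


collected = asyncio.run(demo())
```

Because the protocol is runtime-checkable, `isinstance(collected, OutputLike)` succeeds without `CollectingOutput` inheriting from it; the check only verifies that the members exist, not their signatures.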

__call__(event)

Process output events from the agent.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | BidiOutputEvent | Output event from the agent (audio, text, tool calls, etc.) | required |

Source code in strands/experimental/bidi/types/io.py
def __call__(self, event: BidiOutputEvent) -> Awaitable[None]:
    """Process output events from the agent.

    Args:
        event: Output event from the agent (audio, text, tool calls, etc.)
    """
    ...

start(agent) async

Start output.

Source code in strands/experimental/bidi/types/io.py
async def start(self, agent: "BidiAgent") -> None:
    """Start output."""
    return

stop() async

Stop output.

Source code in strands/experimental/bidi/types/io.py
async def stop(self) -> None:
    """Stop output."""
    return

BidiTextInputEvent

Bases: TypedEvent

Text input event for sending text to the model.

Used for sending text content through the send() method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | str | The text content to send to the model. | required |
| role | Role | The role of the message sender (default: "user"). | 'user' |

Source code in strands/experimental/bidi/types/events.py
class BidiTextInputEvent(TypedEvent):
    """Text input event for sending text to the model.

    Used for sending text content through the send() method.

    Parameters:
        text: The text content to send to the model.
        role: The role of the message sender (default: "user").
    """

    def __init__(self, text: str, role: Role = "user"):
        """Initialize text input event."""
        super().__init__(
            {
                "type": "bidi_text_input",
                "text": text,
                "role": role,
            }
        )

    @property
    def text(self) -> str:
        """The text content to send to the model."""
        return cast(str, self["text"])

    @property
    def role(self) -> Role:
        """The role of the message sender."""
        return cast(Role, self["role"])
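
The constructor above stores its payload as a mapping, which suggests the event can be serialized directly. The sketch below assumes `TypedEvent` behaves like a `dict` subclass (inferred from the `self["text"]` accessors, not confirmed here) and uses local stand-in classes so it runs without the SDK.

```python
import json
from typing import cast


class TypedEventStandIn(dict):
    """Stand-in for TypedEvent, assumed here to be a dict subclass."""

    def __init__(self, data: dict) -> None:
        super().__init__(data)


class TextInputEvent(TypedEventStandIn):
    """Mirrors the BidiTextInputEvent constructor shown above."""

    def __init__(self, text: str, role: str = "user") -> None:
        super().__init__({"type": "bidi_text_input", "text": text, "role": role})

    @property
    def text(self) -> str:
        return cast(str, self["text"])

    @property
    def role(self) -> str:
        return cast(str, self["role"])


event = TextInputEvent("What is the weather?")
# Dict-based events can be serialized as-is, e.g. for a websocket transport.
payload = json.dumps(event)
```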

role property

The role of the message sender.

text property

The text content to send to the model.

__init__(text, role='user')

Initialize text input event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, text: str, role: Role = "user"):
    """Initialize text input event."""
    super().__init__(
        {
            "type": "bidi_text_input",
            "text": text,
            "role": role,
        }
    )

ConcurrentToolExecutor

Bases: ToolExecutor

Concurrent tool executor.

Source code in strands/tools/executors/concurrent.py
class ConcurrentToolExecutor(ToolExecutor):
    """Concurrent tool executor."""

    @override
    async def _execute(
        self,
        agent: "Agent",
        tool_uses: list[ToolUse],
        tool_results: list[ToolResult],
        cycle_trace: Trace,
        cycle_span: Any,
        invocation_state: dict[str, Any],
        structured_output_context: "StructuredOutputContext | None" = None,
    ) -> AsyncGenerator[TypedEvent, None]:
        """Execute tools concurrently.

        Args:
            agent: The agent for which tools are being executed.
            tool_uses: Metadata and inputs for the tools to be executed.
            tool_results: List of tool results from each tool execution.
            cycle_trace: Trace object for the current event loop cycle.
            cycle_span: Span object for tracing the cycle.
            invocation_state: Context for the tool invocation.
            structured_output_context: Context for structured output handling.

        Yields:
            Events from the tool execution stream.
        """
        task_queue: asyncio.Queue[tuple[int, Any]] = asyncio.Queue()
        task_events = [asyncio.Event() for _ in tool_uses]
        stop_event = object()

        tasks = [
            asyncio.create_task(
                self._task(
                    agent,
                    tool_use,
                    tool_results,
                    cycle_trace,
                    cycle_span,
                    invocation_state,
                    task_id,
                    task_queue,
                    task_events[task_id],
                    stop_event,
                    structured_output_context,
                )
            )
            for task_id, tool_use in enumerate(tool_uses)
        ]

        task_count = len(tasks)
        while task_count:
            task_id, event = await task_queue.get()
            if event is stop_event:
                task_count -= 1
                continue

            yield event
            task_events[task_id].set()

    async def _task(
        self,
        agent: "Agent",
        tool_use: ToolUse,
        tool_results: list[ToolResult],
        cycle_trace: Trace,
        cycle_span: Any,
        invocation_state: dict[str, Any],
        task_id: int,
        task_queue: asyncio.Queue,
        task_event: asyncio.Event,
        stop_event: object,
        structured_output_context: "StructuredOutputContext | None",
    ) -> None:
        """Execute a single tool and put results in the task queue.

        Args:
            agent: The agent executing the tool.
            tool_use: Tool use metadata and inputs.
            tool_results: List of tool results from each tool execution.
            cycle_trace: Trace object for the current event loop cycle.
            cycle_span: Span object for tracing the cycle.
            invocation_state: Context for tool execution.
            task_id: Unique identifier for this task.
            task_queue: Queue to put tool events into.
            task_event: Event to signal when task can continue.
            stop_event: Sentinel object to signal task completion.
            structured_output_context: Context for structured output handling.
        """
        try:
            events = ToolExecutor._stream_with_trace(
                agent, tool_use, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context
            )
            async for event in events:
                task_queue.put_nowait((task_id, event))
                await task_event.wait()
                task_event.clear()

        finally:
            task_queue.put_nowait((task_id, stop_event))
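
The executor above merges several event streams through one queue, pauses each producer until its event has been yielded, and uses a sentinel to count finished producers. A stripped-down sketch of that pattern, with a hypothetical `fake_tool` stream in place of real tool execution:

```python
import asyncio
from typing import Any, AsyncGenerator, AsyncIterator, List


async def fake_tool(name: str, steps: int) -> AsyncIterator[str]:
    """Hypothetical tool stream that yields a few progress events."""
    for i in range(steps):
        await asyncio.sleep(0)
        yield f"{name}:{i}"


async def merge_streams(streams: List[AsyncIterator[str]]) -> AsyncGenerator[str, None]:
    queue: asyncio.Queue = asyncio.Queue()
    resume = [asyncio.Event() for _ in streams]
    stop = object()  # sentinel marking the end of one stream

    async def worker(task_id: int, stream: AsyncIterator[str]) -> None:
        try:
            async for item in stream:
                queue.put_nowait((task_id, item))
                # Block this producer until the consumer has yielded the item.
                await resume[task_id].wait()
                resume[task_id].clear()
        finally:
            # Always report completion, even if the stream raised.
            queue.put_nowait((task_id, stop))

    tasks = [asyncio.create_task(worker(i, s)) for i, s in enumerate(streams)]
    remaining = len(tasks)
    while remaining:
        task_id, item = await queue.get()
        if item is stop:
            remaining -= 1
            continue
        yield item
        resume[task_id].set()


async def collect() -> List[Any]:
    streams = [fake_tool("a", 2), fake_tool("b", 3)]
    return [item async for item in merge_streams(streams)]


results = asyncio.run(collect())
```

Events from different streams may interleave in any order, but each stream's own events keep their relative order because a producer cannot enqueue its next item until the previous one has been consumed.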

HookProvider

Bases: Protocol

Protocol for objects that provide hook callbacks to an agent.

Hook providers offer a composable way to extend agent functionality by subscribing to various events in the agent lifecycle. This protocol enables building reusable components that can hook into agent events.

Example
class MyHookProvider(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(StartRequestEvent, self.on_request_start)
        registry.add_callback(EndRequestEvent, self.on_request_end)

agent = Agent(hooks=[MyHookProvider()])
Source code in strands/hooks/registry.py
@runtime_checkable
class HookProvider(Protocol):
    """Protocol for objects that provide hook callbacks to an agent.

    Hook providers offer a composable way to extend agent functionality by
    subscribing to various events in the agent lifecycle. This protocol enables
    building reusable components that can hook into agent events.

    Example:
        ```python
        class MyHookProvider(HookProvider):
            def register_hooks(self, registry: HookRegistry) -> None:
                registry.add_callback(StartRequestEvent, self.on_request_start)
                registry.add_callback(EndRequestEvent, self.on_request_end)

        agent = Agent(hooks=[MyHookProvider()])
        ```
    """

    def register_hooks(self, registry: "HookRegistry", **kwargs: Any) -> None:
        """Register callback functions for specific event types.

        Args:
            registry: The hook registry to register callbacks with.
            **kwargs: Additional keyword arguments for future extensibility.
        """
        ...

register_hooks(registry, **kwargs)

Register callback functions for specific event types.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| registry | HookRegistry | The hook registry to register callbacks with. | required |
| `**kwargs` | Any | Additional keyword arguments for future extensibility. | {} |

Source code in strands/hooks/registry.py
def register_hooks(self, registry: "HookRegistry", **kwargs: Any) -> None:
    """Register callback functions for specific event types.

    Args:
        registry: The hook registry to register callbacks with.
        **kwargs: Additional keyword arguments for future extensibility.
    """
    ...

HookRegistry

Registry for managing hook callbacks associated with event types.

The HookRegistry maintains a mapping of event types to callback functions and provides methods for registering callbacks and invoking them when events occur.

The registry handles callback ordering, including reverse ordering for cleanup events, and provides type-safe event dispatching.

Source code in strands/hooks/registry.py
class HookRegistry:
    """Registry for managing hook callbacks associated with event types.

    The HookRegistry maintains a mapping of event types to callback functions
    and provides methods for registering callbacks and invoking them when
    events occur.

    The registry handles callback ordering, including reverse ordering for
    cleanup events, and provides type-safe event dispatching.
    """

    def __init__(self) -> None:
        """Initialize an empty hook registry."""
        self._registered_callbacks: dict[type, list[HookCallback]] = {}

    def add_callback(self, event_type: type[TEvent], callback: HookCallback[TEvent]) -> None:
        """Register a callback function for a specific event type.

        Args:
            event_type: The class type of events this callback should handle.
            callback: The callback function to invoke when events of this type occur.

        Example:
            ```python
            def my_handler(event: StartRequestEvent):
                print("Request started")

            registry.add_callback(StartRequestEvent, my_handler)
            ```
        """
        # Related issue: https://github.com/strands-agents/sdk-python/issues/330
        if event_type.__name__ == "AgentInitializedEvent" and inspect.iscoroutinefunction(callback):
            raise ValueError("AgentInitializedEvent can only be registered with a synchronous callback")

        callbacks = self._registered_callbacks.setdefault(event_type, [])
        callbacks.append(callback)

    def add_hook(self, hook: HookProvider) -> None:
        """Register all callbacks from a hook provider.

        This method allows bulk registration of callbacks by delegating to
        the hook provider's register_hooks method. This is the preferred
        way to register multiple related callbacks.

        Args:
            hook: The hook provider containing callbacks to register.

        Example:
            ```python
            class MyHooks(HookProvider):
                def register_hooks(self, registry: HookRegistry):
                    registry.add_callback(StartRequestEvent, self.on_start)
                    registry.add_callback(EndRequestEvent, self.on_end)

            registry.add_hook(MyHooks())
            ```
        """
        hook.register_hooks(self)

    async def invoke_callbacks_async(self, event: TInvokeEvent) -> tuple[TInvokeEvent, list[Interrupt]]:
        """Invoke all registered callbacks for the given event.

        This method finds all callbacks registered for the event's type and
        invokes them in the appropriate order. For events with should_reverse_callbacks=True,
        callbacks are invoked in reverse registration order. Any exceptions raised by callback
        functions will propagate to the caller.

        Additionally, this method aggregates interrupts raised by the user to instantiate human-in-the-loop workflows.

        Args:
            event: The event to dispatch to registered callbacks.

        Returns:
            The event dispatched to registered callbacks and any interrupts raised by the user.

        Raises:
            ValueError: If interrupt name is used more than once.

        Example:
            ```python
            event = StartRequestEvent(agent=my_agent)
            await registry.invoke_callbacks_async(event)
            ```
        """
        interrupts: dict[str, Interrupt] = {}

        for callback in self.get_callbacks_for(event):
            try:
                if inspect.iscoroutinefunction(callback):
                    await callback(event)
                else:
                    callback(event)

            except InterruptException as exception:
                interrupt = exception.interrupt
                if interrupt.name in interrupts:
                    message = f"interrupt_name=<{interrupt.name}> | interrupt name used more than once"
                    logger.error(message)
                    raise ValueError(message) from exception

                # Each callback is allowed to raise their own interrupt.
                interrupts[interrupt.name] = interrupt

        return event, list(interrupts.values())

    def invoke_callbacks(self, event: TInvokeEvent) -> tuple[TInvokeEvent, list[Interrupt]]:
        """Invoke all registered callbacks for the given event.

        This method finds all callbacks registered for the event's type and
        invokes them in the appropriate order. For events with should_reverse_callbacks=True,
        callbacks are invoked in reverse registration order. Any exceptions raised by callback
        functions will propagate to the caller.

        Additionally, this method aggregates interrupts raised by the user to instantiate human-in-the-loop workflows.

        Args:
            event: The event to dispatch to registered callbacks.

        Returns:
            The event dispatched to registered callbacks and any interrupts raised by the user.

        Raises:
            RuntimeError: If at least one callback is async.
            ValueError: If interrupt name is used more than once.

        Example:
            ```python
            event = StartRequestEvent(agent=my_agent)
            registry.invoke_callbacks(event)
            ```
        """
        callbacks = list(self.get_callbacks_for(event))
        interrupts: dict[str, Interrupt] = {}

        if any(inspect.iscoroutinefunction(callback) for callback in callbacks):
            raise RuntimeError(f"event=<{event}> | use invoke_callbacks_async to invoke async callback")

        for callback in callbacks:
            try:
                callback(event)
            except InterruptException as exception:
                interrupt = exception.interrupt
                if interrupt.name in interrupts:
                    message = f"interrupt_name=<{interrupt.name}> | interrupt name used more than once"
                    logger.error(message)
                    raise ValueError(message) from exception

                # Each callback is allowed to raise their own interrupt.
                interrupts[interrupt.name] = interrupt

        return event, list(interrupts.values())

    def has_callbacks(self) -> bool:
        """Check if the registry has any registered callbacks.

        Returns:
            True if there are any registered callbacks, False otherwise.

        Example:
            ```python
            if registry.has_callbacks():
                print("Registry has callbacks registered")
            ```
        """
        return bool(self._registered_callbacks)

    def get_callbacks_for(self, event: TEvent) -> Generator[HookCallback[TEvent], None, None]:
        """Get callbacks registered for the given event in the appropriate order.

        This method returns callbacks in registration order for normal events,
        or reverse registration order for events that have should_reverse_callbacks=True.
        This enables proper cleanup ordering for teardown events.

        Args:
            event: The event to get callbacks for.

        Yields:
            Callback functions registered for this event type, in the appropriate order.

        Example:
            ```python
            event = EndRequestEvent(agent=my_agent)
            for callback in registry.get_callbacks_for(event):
                callback(event)
            ```
        """
        event_type = type(event)

        callbacks = self._registered_callbacks.get(event_type, [])
        if event.should_reverse_callbacks:
            yield from reversed(callbacks)
        else:
            yield from callbacks

__init__()

Initialize an empty hook registry.

Source code in strands/hooks/registry.py
def __init__(self) -> None:
    """Initialize an empty hook registry."""
    self._registered_callbacks: dict[type, list[HookCallback]] = {}

add_callback(event_type, callback)

Register a callback function for a specific event type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | type[TEvent] | The class type of events this callback should handle. | required |
| callback | HookCallback[TEvent] | The callback function to invoke when events of this type occur. | required |

Example
def my_handler(event: StartRequestEvent):
    print("Request started")

registry.add_callback(StartRequestEvent, my_handler)
Source code in strands/hooks/registry.py
def add_callback(self, event_type: type[TEvent], callback: HookCallback[TEvent]) -> None:
    """Register a callback function for a specific event type.

    Args:
        event_type: The class type of events this callback should handle.
        callback: The callback function to invoke when events of this type occur.

    Example:
        ```python
        def my_handler(event: StartRequestEvent):
            print("Request started")

        registry.add_callback(StartRequestEvent, my_handler)
        ```
    """
    # Related issue: https://github.com/strands-agents/sdk-python/issues/330
    if event_type.__name__ == "AgentInitializedEvent" and inspect.iscoroutinefunction(callback):
        raise ValueError("AgentInitializedEvent can only be registered with a synchronous callback")

    callbacks = self._registered_callbacks.setdefault(event_type, [])
    callbacks.append(callback)

add_hook(hook)

Register all callbacks from a hook provider.

This method allows bulk registration of callbacks by delegating to the hook provider's register_hooks method. This is the preferred way to register multiple related callbacks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| hook | HookProvider | The hook provider containing callbacks to register. | required |

Example
class MyHooks(HookProvider):
    def register_hooks(self, registry: HookRegistry):
        registry.add_callback(StartRequestEvent, self.on_start)
        registry.add_callback(EndRequestEvent, self.on_end)

registry.add_hook(MyHooks())
Source code in strands/hooks/registry.py
def add_hook(self, hook: HookProvider) -> None:
    """Register all callbacks from a hook provider.

    This method allows bulk registration of callbacks by delegating to
    the hook provider's register_hooks method. This is the preferred
    way to register multiple related callbacks.

    Args:
        hook: The hook provider containing callbacks to register.

    Example:
        ```python
        class MyHooks(HookProvider):
            def register_hooks(self, registry: HookRegistry):
                registry.add_callback(StartRequestEvent, self.on_start)
                registry.add_callback(EndRequestEvent, self.on_end)

        registry.add_hook(MyHooks())
        ```
    """
    hook.register_hooks(self)

get_callbacks_for(event)

Get callbacks registered for the given event in the appropriate order.

This method returns callbacks in registration order for normal events, or reverse registration order for events that have should_reverse_callbacks=True. This enables proper cleanup ordering for teardown events.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | TEvent | The event to get callbacks for. | required |

Yields:

| Type | Description |
| --- | --- |
| HookCallback[TEvent] | Callback functions registered for this event type, in the appropriate order. |

Example
event = EndRequestEvent(agent=my_agent)
for callback in registry.get_callbacks_for(event):
    callback(event)
Source code in strands/hooks/registry.py
def get_callbacks_for(self, event: TEvent) -> Generator[HookCallback[TEvent], None, None]:
    """Get callbacks registered for the given event in the appropriate order.

    This method returns callbacks in registration order for normal events,
    or reverse registration order for events that have should_reverse_callbacks=True.
    This enables proper cleanup ordering for teardown events.

    Args:
        event: The event to get callbacks for.

    Yields:
        Callback functions registered for this event type, in the appropriate order.

    Example:
        ```python
        event = EndRequestEvent(agent=my_agent)
        for callback in registry.get_callbacks_for(event):
            callback(event)
        ```
    """
    event_type = type(event)

    callbacks = self._registered_callbacks.get(event_type, [])
    if event.should_reverse_callbacks:
        yield from reversed(callbacks)
    else:
        yield from callbacks
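
The ordering rule above (registration order normally, reverse order when `should_reverse_callbacks` is true) can be sketched with a simplified stand-in. `MiniRegistry`, `StartEvent`, `EndEvent`, and `LoggingHooks` are hypothetical names for illustration; the stand-in omits interrupts and async callbacks.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class StartEvent:
    """Hypothetical event dispatched in registration order."""
    should_reverse_callbacks: bool = False


@dataclass
class EndEvent:
    """Hypothetical cleanup event dispatched in reverse order."""
    should_reverse_callbacks: bool = True


class MiniRegistry:
    """Simplified stand-in for HookRegistry (no interrupts, sync only)."""

    def __init__(self) -> None:
        self._callbacks: Dict[type, List[Callable]] = {}

    def add_callback(self, event_type: type, callback: Callable) -> None:
        self._callbacks.setdefault(event_type, []).append(callback)

    def add_hook(self, hook) -> None:
        hook.register_hooks(self)

    def invoke_callbacks(self, event) -> None:
        callbacks = self._callbacks.get(type(event), [])
        ordered = reversed(callbacks) if event.should_reverse_callbacks else callbacks
        for callback in ordered:
            callback(event)


class LoggingHooks:
    def __init__(self, name: str, log: List[str]) -> None:
        self.name, self.log = name, log

    def register_hooks(self, registry: MiniRegistry, **kwargs) -> None:
        registry.add_callback(StartEvent, lambda e: self.log.append(f"{self.name} start"))
        registry.add_callback(EndEvent, lambda e: self.log.append(f"{self.name} end"))


log: List[str] = []
registry = MiniRegistry()
registry.add_hook(LoggingHooks("outer", log))
registry.add_hook(LoggingHooks("inner", log))
registry.invoke_callbacks(StartEvent())
registry.invoke_callbacks(EndEvent())
# log is now ["outer start", "inner start", "inner end", "outer end"]
```

Reversing the order for cleanup events gives hooks a stack-like nesting: the hook registered last is the first to tear down.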

has_callbacks()

Check if the registry has any registered callbacks.

Returns:

| Type | Description |
| --- | --- |
| bool | True if there are any registered callbacks, False otherwise. |

Example
if registry.has_callbacks():
    print("Registry has callbacks registered")
Source code in strands/hooks/registry.py
def has_callbacks(self) -> bool:
    """Check if the registry has any registered callbacks.

    Returns:
        True if there are any registered callbacks, False otherwise.

    Example:
        ```python
        if registry.has_callbacks():
            print("Registry has callbacks registered")
        ```
    """
    return bool(self._registered_callbacks)

invoke_callbacks(event)

Invoke all registered callbacks for the given event.

This method finds all callbacks registered for the event's type and invokes them in the appropriate order. For events with should_reverse_callbacks=True, callbacks are invoked in reverse registration order. Any exceptions raised by callback functions will propagate to the caller.

Additionally, this method aggregates interrupts raised by the user to instantiate human-in-the-loop workflows.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | TInvokeEvent | The event to dispatch to registered callbacks. | required |

Returns:

| Type | Description |
| --- | --- |
| tuple[TInvokeEvent, list[Interrupt]] | The event dispatched to registered callbacks and any interrupts raised by the user. |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If at least one callback is async. |
| ValueError | If interrupt name is used more than once. |

Example
event = StartRequestEvent(agent=my_agent)
registry.invoke_callbacks(event)
Source code in strands/hooks/registry.py
def invoke_callbacks(self, event: TInvokeEvent) -> tuple[TInvokeEvent, list[Interrupt]]:
    """Invoke all registered callbacks for the given event.

    This method finds all callbacks registered for the event's type and
    invokes them in the appropriate order. For events with should_reverse_callbacks=True,
    callbacks are invoked in reverse registration order. Any exceptions raised by callback
    functions will propagate to the caller.

    Additionally, this method aggregates interrupts raised by the user to instantiate human-in-the-loop workflows.

    Args:
        event: The event to dispatch to registered callbacks.

    Returns:
        The event dispatched to registered callbacks and any interrupts raised by the user.

    Raises:
        RuntimeError: If at least one callback is async.
        ValueError: If interrupt name is used more than once.

    Example:
        ```python
        event = StartRequestEvent(agent=my_agent)
        registry.invoke_callbacks(event)
        ```
    """
    callbacks = list(self.get_callbacks_for(event))
    interrupts: dict[str, Interrupt] = {}

    if any(inspect.iscoroutinefunction(callback) for callback in callbacks):
        raise RuntimeError(f"event=<{event}> | use invoke_callbacks_async to invoke async callback")

    for callback in callbacks:
        try:
            callback(event)
        except InterruptException as exception:
            interrupt = exception.interrupt
            if interrupt.name in interrupts:
                message = f"interrupt_name=<{interrupt.name}> | interrupt name used more than once"
                logger.error(message)
                raise ValueError(message) from exception

            # Each callback is allowed to raise their own interrupt.
            interrupts[interrupt.name] = interrupt

    return event, list(interrupts.values())
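
The interrupt handling above collects at most one interrupt per name and fails when a name repeats. A minimal sketch of that aggregation, using local stand-ins for `Interrupt` and `InterruptException` (the real classes carry more than a name) and hypothetical callbacks:

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Interrupt:
    """Stand-in for the SDK's Interrupt; only the name is modeled here."""
    name: str


class InterruptException(Exception):
    """Stand-in exception carrying an Interrupt."""

    def __init__(self, interrupt: Interrupt) -> None:
        super().__init__(interrupt.name)
        self.interrupt = interrupt


def collect_interrupts(callbacks: List[Callable[[object], None]], event: object) -> List[Interrupt]:
    interrupts: Dict[str, Interrupt] = {}
    for callback in callbacks:
        try:
            callback(event)
        except InterruptException as exc:
            name = exc.interrupt.name
            if name in interrupts:
                raise ValueError(f"interrupt name used more than once: {name}") from exc
            interrupts[name] = exc.interrupt
    return list(interrupts.values())


def ask_approval(event: object) -> None:
    raise InterruptException(Interrupt("approval"))


def ask_confirmation(event: object) -> None:
    raise InterruptException(Interrupt("confirmation"))


def no_interrupt(event: object) -> None:
    pass


names = [i.name for i in collect_interrupts([ask_approval, no_interrupt, ask_confirmation], None)]
# names == ["approval", "confirmation"]
```

An interrupt does not stop dispatch: the remaining callbacks still run, and only exceptions other than the interrupt type propagate to the caller.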

invoke_callbacks_async(event) async

Invoke all registered callbacks for the given event.

This method finds all callbacks registered for the event's type and invokes them in the appropriate order. For events with should_reverse_callbacks=True, callbacks are invoked in reverse registration order. Any exceptions raised by callback functions will propagate to the caller.

Additionally, this method aggregates interrupts raised by the user to instantiate human-in-the-loop workflows.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | TInvokeEvent | The event to dispatch to registered callbacks. | required |

Returns:

| Type | Description |
| --- | --- |
| tuple[TInvokeEvent, list[Interrupt]] | The event dispatched to registered callbacks and any interrupts raised by the user. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If interrupt name is used more than once. |

Example
event = StartRequestEvent(agent=my_agent)
await registry.invoke_callbacks_async(event)
Source code in strands/hooks/registry.py
async def invoke_callbacks_async(self, event: TInvokeEvent) -> tuple[TInvokeEvent, list[Interrupt]]:
    """Invoke all registered callbacks for the given event.

    This method finds all callbacks registered for the event's type and
    invokes them in the appropriate order. For events with should_reverse_callbacks=True,
    callbacks are invoked in reverse registration order. Any exceptions raised by callback
    functions will propagate to the caller.

    Additionally, this method aggregates interrupts raised by the user to instantiate human-in-the-loop workflows.

    Args:
        event: The event to dispatch to registered callbacks.

    Returns:
        The event dispatched to registered callbacks and any interrupts raised by the user.

    Raises:
        ValueError: If interrupt name is used more than once.

    Example:
        ```python
        event = StartRequestEvent(agent=my_agent)
        await registry.invoke_callbacks_async(event)
        ```
    """
    interrupts: dict[str, Interrupt] = {}

    for callback in self.get_callbacks_for(event):
        try:
            if inspect.iscoroutinefunction(callback):
                await callback(event)
            else:
                callback(event)

        except InterruptException as exception:
            interrupt = exception.interrupt
            if interrupt.name in interrupts:
                message = f"interrupt_name=<{interrupt.name}> | interrupt name used more than once"
                logger.error(message)
                raise ValueError(message) from exception

            # Each callback is allowed to raise their own interrupt.
            interrupts[interrupt.name] = interrupt

    return event, list(interrupts.values())
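
The difference between the two dispatch methods comes down to how coroutine functions are treated: the async variant awaits them, while the sync variant refuses to run if any are registered. A small sketch of both behaviors, with hypothetical `dispatch` and `dispatch_sync` helpers standing in for the registry methods:

```python
import asyncio
import inspect
from typing import Any, Callable, List, Tuple


async def dispatch(callbacks: List[Callable[[Any], Any]], event: Any) -> None:
    """Await coroutine functions and call plain functions directly."""
    for callback in callbacks:
        if inspect.iscoroutinefunction(callback):
            await callback(event)
        else:
            callback(event)


def dispatch_sync(callbacks: List[Callable[[Any], Any]], event: Any) -> None:
    """Reject async callbacks up front instead of silently skipping them."""
    if any(inspect.iscoroutinefunction(cb) for cb in callbacks):
        raise RuntimeError("use the async dispatcher to invoke async callbacks")
    for callback in callbacks:
        callback(event)


seen: List[Tuple[str, Any]] = []


def sync_callback(event: Any) -> None:
    seen.append(("sync", event))


async def async_callback(event: Any) -> None:
    await asyncio.sleep(0)
    seen.append(("async", event))


asyncio.run(dispatch([sync_callback, async_callback], "evt"))
# seen == [("sync", "evt"), ("async", "evt")]
```

Note that `inspect.iscoroutinefunction` inspects the function object itself, so a plain function or lambda that merely returns a coroutine is treated as synchronous by this check.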

Message

Bases: TypedDict

A message in a conversation with the agent.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| content | list[ContentBlock] | The message content. |
| role | Role | The role of the message sender. |

Source code in strands/types/content.py
class Message(TypedDict):
    """A message in a conversation with the agent.

    Attributes:
        content: The message content.
        role: The role of the message sender.
    """

    content: list[ContentBlock]
    role: Role
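
Since `Message` is a `TypedDict`, messages are plain dictionaries at runtime. The sketch below uses local stand-ins: `TextBlock` models only a `text` field of a content block, and the role is assumed to be `"user"` or `"assistant"`; neither is confirmed by the listing above.

```python
from typing import List, Literal, TypedDict


class TextBlock(TypedDict):
    """Simplified stand-in for ContentBlock, modeling only a text field."""
    text: str


class ChatMessage(TypedDict):
    """Stand-in with the same two keys as Message."""
    content: List[TextBlock]
    role: Literal["user", "assistant"]


def last_user_text(messages: List[ChatMessage]) -> str:
    """Return the joined text of the most recent user message, or ""."""
    for message in reversed(messages):
        if message["role"] == "user":
            return " ".join(block["text"] for block in message["content"])
    return ""


history: List[ChatMessage] = [
    {"role": "user", "content": [{"text": "Hello"}]},
    {"role": "assistant", "content": [{"text": "Hi, how can I help?"}]},
]
```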

SessionManager

Bases: HookProvider, ABC

Abstract interface for managing sessions.

A session manager is responsible for persisting an agent's conversation and state across its interactions. Changes made to the agent's conversation, state, or other attributes should be persisted immediately after they are made. The methods defined in this class are called at important lifecycle events for an agent, and the changes they handle should be persisted in the session.

Source code in strands/session/session_manager.py
class SessionManager(HookProvider, ABC):
    """Abstract interface for managing sessions.

    A session manager is responsible for persisting an agent's conversation and state across its interactions.
    Changes made to the agent's conversation, state, or other attributes should be persisted immediately after
    they are made. The methods defined in this class are called at important lifecycle events for an agent,
    and the changes they handle should be persisted in the session.
    """

    def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
        """Register hooks for persisting the agent to the session."""
        # After the normal Agent initialization behavior, call the session initialize function to restore the agent
        registry.add_callback(AgentInitializedEvent, lambda event: self.initialize(event.agent))

        # For each message appended to the Agents messages, store that message in the session
        registry.add_callback(MessageAddedEvent, lambda event: self.append_message(event.message, event.agent))

        # Sync the agent into the session for each message in case the agent state was updated
        registry.add_callback(MessageAddedEvent, lambda event: self.sync_agent(event.agent))

        # After an agent was invoked, sync it with the session to capture any conversation manager state updates
        registry.add_callback(AfterInvocationEvent, lambda event: self.sync_agent(event.agent))

        registry.add_callback(MultiAgentInitializedEvent, lambda event: self.initialize_multi_agent(event.source))
        registry.add_callback(AfterNodeCallEvent, lambda event: self.sync_multi_agent(event.source))
        registry.add_callback(AfterMultiAgentInvocationEvent, lambda event: self.sync_multi_agent(event.source))

        # Register BidiAgent hooks
        registry.add_callback(BidiAgentInitializedEvent, lambda event: self.initialize_bidi_agent(event.agent))
        registry.add_callback(BidiMessageAddedEvent, lambda event: self.append_bidi_message(event.message, event.agent))
        registry.add_callback(BidiMessageAddedEvent, lambda event: self.sync_bidi_agent(event.agent))
        registry.add_callback(BidiAfterInvocationEvent, lambda event: self.sync_bidi_agent(event.agent))

    @abstractmethod
    def redact_latest_message(self, redact_message: Message, agent: "Agent", **kwargs: Any) -> None:
        """Redact the message most recently appended to the agent in the session.

        Args:
            redact_message: New message to use that contains the redact content
            agent: Agent to apply the message redaction to
            **kwargs: Additional keyword arguments for future extensibility.
        """

    @abstractmethod
    def append_message(self, message: Message, agent: "Agent", **kwargs: Any) -> None:
        """Append a message to the agent's session.

        Args:
            message: Message to add to the agent in the session
            agent: Agent to append the message to
            **kwargs: Additional keyword arguments for future extensibility.
        """

    @abstractmethod
    def sync_agent(self, agent: "Agent", **kwargs: Any) -> None:
        """Serialize and sync the agent with the session storage.

        Args:
            agent: Agent who should be synchronized with the session storage
            **kwargs: Additional keyword arguments for future extensibility.
        """

    @abstractmethod
    def initialize(self, agent: "Agent", **kwargs: Any) -> None:
        """Initialize an agent with a session.

        Args:
            agent: Agent to initialize
            **kwargs: Additional keyword arguments for future extensibility.
        """

    def sync_multi_agent(self, source: "MultiAgentBase", **kwargs: Any) -> None:
        """Serialize and sync multi-agent with the session storage.

        Args:
            source: Multi-agent source object to persist
            **kwargs: Additional keyword arguments for future extensibility.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support multi-agent persistence "
            "(sync_multi_agent). Provide an implementation or use a "
            "SessionManager with session_type=SessionType.MULTI_AGENT."
        )

    def initialize_multi_agent(self, source: "MultiAgentBase", **kwargs: Any) -> None:
        """Read multi-agent state from persistent storage.

        Args:
            **kwargs: Additional keyword arguments for future extensibility.
            source: Multi-agent state to initialize.

        Returns:
            Multi-agent state dictionary or empty dict if not found.

        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support multi-agent persistence "
            "(initialize_multi_agent). Provide an implementation or use a "
            "SessionManager with session_type=SessionType.MULTI_AGENT."
        )

    def initialize_bidi_agent(self, agent: "BidiAgent", **kwargs: Any) -> None:
        """Initialize a bidirectional agent with a session.

        Args:
            agent: BidiAgent to initialize
            **kwargs: Additional keyword arguments for future extensibility.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support bidirectional agent persistence "
            "(initialize_bidi_agent). Provide an implementation or use a "
            "SessionManager with bidirectional agent support."
        )

    def append_bidi_message(self, message: Message, agent: "BidiAgent", **kwargs: Any) -> None:
        """Append a message to the bidirectional agent's session.

        Args:
            message: Message to add to the agent in the session
            agent: BidiAgent to append the message to
            **kwargs: Additional keyword arguments for future extensibility.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support bidirectional agent persistence "
            "(append_bidi_message). Provide an implementation or use a "
            "SessionManager with bidirectional agent support."
        )

    def sync_bidi_agent(self, agent: "BidiAgent", **kwargs: Any) -> None:
        """Serialize and sync the bidirectional agent with the session storage.

        Args:
            agent: BidiAgent who should be synchronized with the session storage
            **kwargs: Additional keyword arguments for future extensibility.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support bidirectional agent persistence "
            "(sync_bidi_agent). Provide an implementation or use a "
            "SessionManager with bidirectional agent support."
        )
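
The four abstract methods can be illustrated with an in-memory implementation. This is a sketch under stated assumptions, not the SDK's own session manager: `FakeAgent`, `SessionManagerSketch`, and `InMemorySessionManager` are hypothetical names, and the agent is reduced to an id, a message list, and a state dictionary.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeAgent:
    """Hypothetical stand-in for an agent; not the SDK's Agent class."""

    agent_id: str
    messages: list[dict[str, Any]] = field(default_factory=list)
    state: dict[str, Any] = field(default_factory=dict)


class SessionManagerSketch(ABC):
    """Mirrors only the four abstract methods of the interface."""

    @abstractmethod
    def initialize(self, agent: FakeAgent, **kwargs: Any) -> None: ...

    @abstractmethod
    def append_message(self, message: dict[str, Any], agent: FakeAgent, **kwargs: Any) -> None: ...

    @abstractmethod
    def redact_latest_message(self, redact_message: dict[str, Any], agent: FakeAgent, **kwargs: Any) -> None: ...

    @abstractmethod
    def sync_agent(self, agent: FakeAgent, **kwargs: Any) -> None: ...


class InMemorySessionManager(SessionManagerSketch):
    def __init__(self) -> None:
        self._messages: dict[str, list[dict[str, Any]]] = {}
        self._state: dict[str, dict[str, Any]] = {}

    def initialize(self, agent, **kwargs):
        # Restore any previously stored conversation and state onto the agent.
        agent.messages = list(self._messages.setdefault(agent.agent_id, []))
        agent.state = dict(self._state.setdefault(agent.agent_id, {}))

    def append_message(self, message, agent, **kwargs):
        self._messages.setdefault(agent.agent_id, []).append(message)

    def redact_latest_message(self, redact_message, agent, **kwargs):
        # Replace the most recently stored message with the redacted version.
        self._messages[agent.agent_id][-1] = redact_message

    def sync_agent(self, agent, **kwargs):
        self._state[agent.agent_id] = dict(agent.state)


manager = InMemorySessionManager()
first = FakeAgent("a1")
manager.initialize(first)

manager.append_message({"role": "user", "content": [{"text": "my secret"}]}, first)
manager.redact_latest_message({"role": "user", "content": [{"text": "[redacted]"}]}, first)
first.state["turns"] = 1
manager.sync_agent(first)

# A fresh agent with the same id picks up the persisted conversation and state.
restored = FakeAgent("a1")
manager.initialize(restored)
```

After the final `initialize` call, `restored` holds the redacted message and the synced state, which is the restore-on-startup behavior the interface is designed around.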

append_bidi_message(message, agent, **kwargs)

Append a message to the bidirectional agent's session.

Parameters:

Name Type Description Default
message Message

Message to add to the agent in the session

required
agent BidiAgent

BidiAgent to append the message to

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/session/session_manager.py
def append_bidi_message(self, message: Message, agent: "BidiAgent", **kwargs: Any) -> None:
    """Append a message to the bidirectional agent's session.

    Args:
        message: Message to add to the agent in the session
        agent: BidiAgent to append the message to
        **kwargs: Additional keyword arguments for future extensibility.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not support bidirectional agent persistence "
        "(append_bidi_message). Provide an implementation or use a "
        "SessionManager with bidirectional agent support."
    )
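
Because the default body raises `NotImplementedError`, bidirectional persistence is opt-in: a subclass supports it only if it overrides the method. A minimal sketch of that pattern follows, using hypothetical `BaseManager`, `TextOnlyManager`, and `BidiCapableManager` classes rather than the SDK's own.

```python
from typing import Any


class BaseManager:
    """Hypothetical base mirroring the opt-in default of append_bidi_message."""

    def append_bidi_message(self, message: dict[str, Any], agent: Any, **kwargs: Any) -> None:
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support bidirectional agent persistence "
            "(append_bidi_message)."
        )


class TextOnlyManager(BaseManager):
    """Inherits the default, so bidirectional persistence stays unsupported."""


class BidiCapableManager(BaseManager):
    """Overrides the default to opt in."""

    def __init__(self) -> None:
        self.stored: list[dict[str, Any]] = []

    def append_bidi_message(self, message, agent, **kwargs):
        self.stored.append(message)


def try_append(manager: BaseManager, message: dict[str, Any]) -> str:
    """Return 'stored' on success, or the error text if unsupported."""
    try:
        manager.append_bidi_message(message, agent=None)
        return "stored"
    except NotImplementedError as exc:
        return str(exc)


message = {"role": "user", "content": [{"text": "hello"}]}
unsupported = try_append(TextOnlyManager(), message)
supported = try_append(BidiCapableManager(), message)
```

Using `self.__class__.__name__` in the error text means the message names the concrete subclass that lacks support, not the base class.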

append_message(message, agent, **kwargs) abstractmethod

Append a message to the agent's session.

Parameters:

Name Type Description Default
message Message

Message to add to the agent in the session

required
agent Agent

Agent to append the message to

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/session/session_manager.py
@abstractmethod
def append_message(self, message: Message, agent: "Agent", **kwargs: Any) -> None:
    """Append a message to the agent's session.

    Args:
        message: Message to add to the agent in the session
        agent: Agent to append the message to
        **kwargs: Additional keyword arguments for future extensibility.
    """

initialize(agent, **kwargs) abstractmethod

Initialize an agent with a session.

Parameters:

Name Type Description Default
agent Agent

Agent to initialize

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/session/session_manager.py
@abstractmethod
def initialize(self, agent: "Agent", **kwargs: Any) -> None:
    """Initialize an agent with a session.

    Args:
        agent: Agent to initialize
        **kwargs: Additional keyword arguments for future extensibility.
    """

initialize_bidi_agent(agent, **kwargs)

Initialize a bidirectional agent with a session.

Parameters:

Name Type Description Default
agent BidiAgent

BidiAgent to initialize

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/session/session_manager.py
def initialize_bidi_agent(self, agent: "BidiAgent", **kwargs: Any) -> None:
    """Initialize a bidirectional agent with a session.

    Args:
        agent: BidiAgent to initialize
        **kwargs: Additional keyword arguments for future extensibility.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not support bidirectional agent persistence "
        "(initialize_bidi_agent). Provide an implementation or use a "
        "SessionManager with bidirectional agent support."
    )

initialize_multi_agent(source, **kwargs)

Read multi-agent state from persistent storage.

Parameters:

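Name Type Description Default
source MultiAgentBase

Multi-agent state to initialize.

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
None

The signature is annotated `-> None`, although the docstring describes a multi-agent state dictionary (or an empty dict if not found). The base implementation raises NotImplementedError.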

Source code in strands/session/session_manager.py
def initialize_multi_agent(self, source: "MultiAgentBase", **kwargs: Any) -> None:
    """Read multi-agent state from persistent storage.

    Args:
        **kwargs: Additional keyword arguments for future extensibility.
        source: Multi-agent state to initialize.

    Returns:
        Multi-agent state dictionary or empty dict if not found.

    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not support multi-agent persistence "
        "(initialize_multi_agent). Provide an implementation or use a "
        "SessionManager with session_type=SessionType.MULTI_AGENT."
    )

redact_latest_message(redact_message, agent, **kwargs) abstractmethod

Redact the message most recently appended to the agent in the session.

Parameters:

Name Type Description Default
redact_message Message

New message containing the redacted content

required
agent Agent

Agent to apply the message redaction to

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/session/session_manager.py
@abstractmethod
def redact_latest_message(self, redact_message: Message, agent: "Agent", **kwargs: Any) -> None:
    """Redact the message most recently appended to the agent in the session.

    Args:
        redact_message: New message to use that contains the redact content
        agent: Agent to apply the message redaction to
        **kwargs: Additional keyword arguments for future extensibility.
    """

register_hooks(registry, **kwargs)

Register hooks for persisting the agent to the session.

Source code in strands/session/session_manager.py
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
    """Register hooks for persisting the agent to the session."""
    # After the normal Agent initialization behavior, call the session initialize function to restore the agent
    registry.add_callback(AgentInitializedEvent, lambda event: self.initialize(event.agent))

    # For each message appended to the Agents messages, store that message in the session
    registry.add_callback(MessageAddedEvent, lambda event: self.append_message(event.message, event.agent))

    # Sync the agent into the session for each message in case the agent state was updated
    registry.add_callback(MessageAddedEvent, lambda event: self.sync_agent(event.agent))

    # After an agent was invoked, sync it with the session to capture any conversation manager state updates
    registry.add_callback(AfterInvocationEvent, lambda event: self.sync_agent(event.agent))

    registry.add_callback(MultiAgentInitializedEvent, lambda event: self.initialize_multi_agent(event.source))
    registry.add_callback(AfterNodeCallEvent, lambda event: self.sync_multi_agent(event.source))
    registry.add_callback(AfterMultiAgentInvocationEvent, lambda event: self.sync_multi_agent(event.source))

    # Register BidiAgent hooks
    registry.add_callback(BidiAgentInitializedEvent, lambda event: self.initialize_bidi_agent(event.agent))
    registry.add_callback(BidiMessageAddedEvent, lambda event: self.append_bidi_message(event.message, event.agent))
    registry.add_callback(BidiMessageAddedEvent, lambda event: self.sync_bidi_agent(event.agent))
    registry.add_callback(BidiAfterInvocationEvent, lambda event: self.sync_bidi_agent(event.agent))
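
The registration pattern above, where several callbacks are attached to the same event type, can be sketched with a toy registry. Only the `add_callback(event_type, callback)` call shape is taken from the source; `MiniRegistry`, its `invoke` method, the `MessageAdded` event, and the registration-order dispatch are assumptions made for illustration and may differ from the real `HookRegistry`.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class MessageAdded:
    """Hypothetical event type standing in for MessageAddedEvent."""

    agent: str
    message: dict[str, Any]


class MiniRegistry:
    """Toy registry: maps an event type to callbacks, run in registration order."""

    def __init__(self) -> None:
        self._callbacks: dict[type, list[Callable[[Any], None]]] = defaultdict(list)

    def add_callback(self, event_type: type, callback: Callable[[Any], None]) -> None:
        self._callbacks[event_type].append(callback)

    def invoke(self, event: Any) -> None:
        for callback in self._callbacks[type(event)]:
            callback(event)


calls: list[str] = []
registry = MiniRegistry()

# Two callbacks on one event type: first store the message, then sync the agent.
registry.add_callback(MessageAdded, lambda event: calls.append(f"append:{event.message['role']}"))
registry.add_callback(MessageAdded, lambda event: calls.append(f"sync:{event.agent}"))

registry.invoke(MessageAdded(agent="a1", message={"role": "user", "content": []}))
```

In this sketch, one emitted event triggers both the append and the sync step, which is why a single added message results in two persistence calls.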

sync_agent(agent, **kwargs) abstractmethod

Serialize and sync the agent with the session storage.

Parameters:

Name Type Description Default
agent Agent

Agent that should be synchronized with the session storage

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/session/session_manager.py
@abstractmethod
def sync_agent(self, agent: "Agent", **kwargs: Any) -> None:
    """Serialize and sync the agent with the session storage.

    Args:
        agent: Agent who should be synchronized with the session storage
        **kwargs: Additional keyword arguments for future extensibility.
    """

sync_bidi_agent(agent, **kwargs)

Serialize and sync the bidirectional agent with the session storage.

Parameters:

Name Type Description Default
agent BidiAgent

BidiAgent that should be synchronized with the session storage

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/session/session_manager.py
def sync_bidi_agent(self, agent: "BidiAgent", **kwargs: Any) -> None:
    """Serialize and sync the bidirectional agent with the session storage.

    Args:
        agent: BidiAgent who should be synchronized with the session storage
        **kwargs: Additional keyword arguments for future extensibility.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not support bidirectional agent persistence "
        "(sync_bidi_agent). Provide an implementation or use a "
        "SessionManager with bidirectional agent support."
    )

sync_multi_agent(source, **kwargs)

Serialize and sync multi-agent with the session storage.

Parameters:

Name Type Description Default
source MultiAgentBase

Multi-agent source object to persist

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/session/session_manager.py
def sync_multi_agent(self, source: "MultiAgentBase", **kwargs: Any) -> None:
    """Serialize and sync multi-agent with the session storage.

    Args:
        source: Multi-agent source object to persist
        **kwargs: Additional keyword arguments for future extensibility.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not support multi-agent persistence "
        "(sync_multi_agent). Provide an implementation or use a "
        "SessionManager with session_type=SessionType.MULTI_AGENT."
    )

ToolExecutor

Bases: ABC

Abstract base class for tool executors.

Source code in strands/tools/executors/_executor.py
class ToolExecutor(abc.ABC):
    """Abstract base class for tool executors."""

    @staticmethod
    def _is_agent(agent: "Agent | BidiAgent") -> bool:
        """Check if the agent is an Agent instance, otherwise we assume BidiAgent.

        Note, we use a runtime import to avoid a circular dependency error.
        """
        from ...agent import Agent

        return isinstance(agent, Agent)

    @staticmethod
    async def _invoke_before_tool_call_hook(
        agent: "Agent | BidiAgent",
        tool_func: Any,
        tool_use: ToolUse,
        invocation_state: dict[str, Any],
    ) -> tuple[BeforeToolCallEvent | BidiBeforeToolCallEvent, list[Interrupt]]:
        """Invoke the appropriate before tool call hook based on agent type."""
        kwargs = {
            "selected_tool": tool_func,
            "tool_use": tool_use,
            "invocation_state": invocation_state,
        }
        event = (
            BeforeToolCallEvent(agent=cast("Agent", agent), **kwargs)
            if ToolExecutor._is_agent(agent)
            else BidiBeforeToolCallEvent(agent=cast("BidiAgent", agent), **kwargs)
        )

        return await agent.hooks.invoke_callbacks_async(event)

    @staticmethod
    async def _invoke_after_tool_call_hook(
        agent: "Agent | BidiAgent",
        selected_tool: Any,
        tool_use: ToolUse,
        invocation_state: dict[str, Any],
        result: ToolResult,
        exception: Exception | None = None,
        cancel_message: str | None = None,
    ) -> tuple[AfterToolCallEvent | BidiAfterToolCallEvent, list[Interrupt]]:
        """Invoke the appropriate after tool call hook based on agent type."""
        kwargs = {
            "selected_tool": selected_tool,
            "tool_use": tool_use,
            "invocation_state": invocation_state,
            "result": result,
            "exception": exception,
            "cancel_message": cancel_message,
        }
        event = (
            AfterToolCallEvent(agent=cast("Agent", agent), **kwargs)
            if ToolExecutor._is_agent(agent)
            else BidiAfterToolCallEvent(agent=cast("BidiAgent", agent), **kwargs)
        )

        return await agent.hooks.invoke_callbacks_async(event)

    @staticmethod
    async def _stream(
        agent: "Agent | BidiAgent",
        tool_use: ToolUse,
        tool_results: list[ToolResult],
        invocation_state: dict[str, Any],
        structured_output_context: StructuredOutputContext | None = None,
        **kwargs: Any,
    ) -> AsyncGenerator[TypedEvent, None]:
        """Stream tool events.

        This method adds additional logic to the stream invocation including:

        - Tool lookup and validation
        - Before/after hook execution
        - Tracing and metrics collection
        - Error handling and recovery
        - Interrupt handling for human-in-the-loop workflows

        Args:
            agent: The agent (Agent or BidiAgent) for which the tool is being executed.
            tool_use: Metadata and inputs for the tool to be executed.
            tool_results: List of tool results from each tool execution.
            invocation_state: Context for the tool invocation.
            structured_output_context: Context for structured output management.
            **kwargs: Additional keyword arguments for future extensibility.

        Yields:
            Tool events with the last being the tool result.
        """
        logger.debug("tool_use=<%s> | streaming", tool_use)
        tool_name = tool_use["name"]
        structured_output_context = structured_output_context or StructuredOutputContext()

        tool_info = agent.tool_registry.dynamic_tools.get(tool_name)
        tool_func = tool_info if tool_info is not None else agent.tool_registry.registry.get(tool_name)
        tool_spec = tool_func.tool_spec if tool_func is not None else None

        current_span = trace_api.get_current_span()
        if current_span and tool_spec is not None:
            current_span.set_attribute("gen_ai.tool.description", tool_spec["description"])
            input_schema = tool_spec["inputSchema"]
            if "json" in input_schema:
                current_span.set_attribute("gen_ai.tool.json_schema", serialize(input_schema["json"]))

        invocation_state.update(
            {
                "agent": agent,
                "model": agent.model,
                "messages": agent.messages,
                "system_prompt": agent.system_prompt,
                "tool_config": ToolConfig(  # for backwards compatibility
                    tools=[{"toolSpec": tool_spec} for tool_spec in agent.tool_registry.get_all_tool_specs()],
                    toolChoice=cast(ToolChoice, {"auto": ToolChoiceAuto()}),
                ),
            }
        )

        # Retry loop for tool execution - hooks can set after_event.retry = True to retry
        while True:
            before_event, interrupts = await ToolExecutor._invoke_before_tool_call_hook(
                agent, tool_func, tool_use, invocation_state
            )

            if interrupts:
                yield ToolInterruptEvent(tool_use, interrupts)
                return

            if before_event.cancel_tool:
                cancel_message = (
                    before_event.cancel_tool if isinstance(before_event.cancel_tool, str) else "tool cancelled by user"
                )
                yield ToolCancelEvent(tool_use, cancel_message)

                cancel_result: ToolResult = {
                    "toolUseId": str(tool_use.get("toolUseId")),
                    "status": "error",
                    "content": [{"text": cancel_message}],
                }

                after_event, _ = await ToolExecutor._invoke_after_tool_call_hook(
                    agent, None, tool_use, invocation_state, cancel_result, cancel_message=cancel_message
                )
                yield ToolResultEvent(after_event.result)
                tool_results.append(after_event.result)
                return

            try:
                selected_tool = before_event.selected_tool
                tool_use = before_event.tool_use
                invocation_state = before_event.invocation_state

                if not selected_tool:
                    if tool_func == selected_tool:
                        logger.error(
                            "tool_name=<%s>, available_tools=<%s> | tool not found in registry",
                            tool_name,
                            list(agent.tool_registry.registry.keys()),
                        )
                    else:
                        logger.debug(
                            "tool_name=<%s>, tool_use_id=<%s> | a hook resulted in a non-existing tool call",
                            tool_name,
                            str(tool_use.get("toolUseId")),
                        )

                    result: ToolResult = {
                        "toolUseId": str(tool_use.get("toolUseId")),
                        "status": "error",
                        "content": [{"text": f"Unknown tool: {tool_name}"}],
                    }

                    after_event, _ = await ToolExecutor._invoke_after_tool_call_hook(
                        agent, selected_tool, tool_use, invocation_state, result
                    )
                    # Check if retry requested for unknown tool error
                    # Use getattr because BidiAfterToolCallEvent doesn't have retry attribute
                    if getattr(after_event, "retry", False):
                        logger.debug("tool_name=<%s> | retry requested, retrying tool call", tool_name)
                        continue
                    yield ToolResultEvent(after_event.result)
                    tool_results.append(after_event.result)
                    return
                if structured_output_context.is_enabled:
                    kwargs["structured_output_context"] = structured_output_context
                async for event in selected_tool.stream(tool_use, invocation_state, **kwargs):
                    # Internal optimization; for built-in AgentTools, we yield TypedEvents out of .stream()
                    # so that we don't needlessly yield ToolStreamEvents for non-generator callbacks.
                    # In which case, as soon as we get a ToolResultEvent we're done and for ToolStreamEvent
                    # we yield it directly; all other cases (non-sdk AgentTools), we wrap events in
                    # ToolStreamEvent and the last event is just the result.

                    if isinstance(event, ToolInterruptEvent):
                        yield event
                        return

                    if isinstance(event, ToolResultEvent):
                        # below the last "event" must point to the tool_result
                        event = event.tool_result
                        break

                    if isinstance(event, ToolStreamEvent):
                        yield event
                    else:
                        yield ToolStreamEvent(tool_use, event)

                result = cast(ToolResult, event)

                after_event, _ = await ToolExecutor._invoke_after_tool_call_hook(
                    agent, selected_tool, tool_use, invocation_state, result
                )

                # Check if retry requested (getattr for BidiAfterToolCallEvent compatibility)
                if getattr(after_event, "retry", False):
                    logger.debug("tool_name=<%s> | retry requested, retrying tool call", tool_name)
                    continue

                yield ToolResultEvent(after_event.result)
                tool_results.append(after_event.result)
                return

            except Exception as e:
                logger.exception("tool_name=<%s> | failed to process tool", tool_name)
                error_result: ToolResult = {
                    "toolUseId": str(tool_use.get("toolUseId")),
                    "status": "error",
                    "content": [{"text": f"Error: {str(e)}"}],
                }

                after_event, _ = await ToolExecutor._invoke_after_tool_call_hook(
                    agent, selected_tool, tool_use, invocation_state, error_result, exception=e
                )
                # Check if retry requested (getattr for BidiAfterToolCallEvent compatibility)
                if getattr(after_event, "retry", False):
                    logger.debug("tool_name=<%s> | retry requested after exception, retrying tool call", tool_name)
                    continue
                yield ToolResultEvent(after_event.result)
                tool_results.append(after_event.result)
                return

    @staticmethod
    async def _stream_with_trace(
        agent: "Agent",
        tool_use: ToolUse,
        tool_results: list[ToolResult],
        cycle_trace: Trace,
        cycle_span: Any,
        invocation_state: dict[str, Any],
        structured_output_context: StructuredOutputContext | None = None,
        **kwargs: Any,
    ) -> AsyncGenerator[TypedEvent, None]:
        """Execute tool with tracing and metrics collection.

        Args:
            agent: The agent for which the tool is being executed.
            tool_use: Metadata and inputs for the tool to be executed.
            tool_results: List of tool results from each tool execution.
            cycle_trace: Trace object for the current event loop cycle.
            cycle_span: Span object for tracing the cycle.
            invocation_state: Context for the tool invocation.
            structured_output_context: Context for structured output management.
            **kwargs: Additional keyword arguments for future extensibility.

        Yields:
            Tool events with the last being the tool result.
        """
        tool_name = tool_use["name"]
        structured_output_context = structured_output_context or StructuredOutputContext()

        tracer = get_tracer()

        tool_call_span = tracer.start_tool_call_span(
            tool_use, cycle_span, custom_trace_attributes=agent.trace_attributes
        )
        tool_trace = Trace(f"Tool: {tool_name}", parent_id=cycle_trace.id, raw_name=tool_name)
        tool_start_time = time.time()

        with trace_api.use_span(tool_call_span):
            async for event in ToolExecutor._stream(
                agent, tool_use, tool_results, invocation_state, structured_output_context, **kwargs
            ):
                yield event

            if isinstance(event, ToolInterruptEvent):
                tracer.end_tool_call_span(tool_call_span, tool_result=None)
                return

            result_event = cast(ToolResultEvent, event)
            result = result_event.tool_result

            tool_success = result.get("status") == "success"
            tool_duration = time.time() - tool_start_time
            message = Message(role="user", content=[{"toolResult": result}])
            if ToolExecutor._is_agent(agent):
                agent.event_loop_metrics.add_tool_usage(tool_use, tool_duration, tool_trace, tool_success, message)
            cycle_trace.add_child(tool_trace)

            tracer.end_tool_call_span(tool_call_span, result)

    @abc.abstractmethod
    # pragma: no cover
    def _execute(
        self,
        agent: "Agent",
        tool_uses: list[ToolUse],
        tool_results: list[ToolResult],
        cycle_trace: Trace,
        cycle_span: Any,
        invocation_state: dict[str, Any],
        structured_output_context: "StructuredOutputContext | None" = None,
    ) -> AsyncGenerator[TypedEvent, None]:
        """Execute the given tools according to this executor's strategy.

        Args:
            agent: The agent for which tools are being executed.
            tool_uses: Metadata and inputs for the tools to be executed.
            tool_results: List of tool results from each tool execution.
            cycle_trace: Trace object for the current event loop cycle.
            cycle_span: Span object for tracing the cycle.
            invocation_state: Context for the tool invocation.
            structured_output_context: Context for structured output management.

        Yields:
            Events from the tool execution stream.
        """
        pass
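
The retry loop in `_stream` can be reduced to a small standalone sketch: run the tool, convert any exception into an error result, let an after-call hook inspect the result, and repeat while the hook sets `retry`. The names `AfterCall`, `run_with_retry`, `flaky_tool`, and `retry_on_error` are hypothetical, and the sketch omits the before-call hook, cancellation, interrupts, and tracing.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class AfterCall:
    """Hypothetical after-call event; a hook may set retry=True."""

    result: dict[str, Any]
    retry: bool = False


async def run_with_retry(
    tool: Callable[[], Awaitable[str]],
    after_hook: Callable[[AfterCall], None],
) -> dict[str, Any]:
    while True:
        try:
            text = await tool()
            result = {"status": "success", "content": [{"text": text}]}
        except Exception as exc:
            # Failures become error results instead of propagating to the caller.
            result = {"status": "error", "content": [{"text": f"Error: {exc}"}]}

        event = AfterCall(result=result)
        after_hook(event)

        # getattr mirrors the source, where some event types lack a retry attribute.
        if getattr(event, "retry", False):
            continue
        return event.result


attempts: list[int] = []


async def flaky_tool() -> str:
    """Fails on the first two calls, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("temporary failure")
    return "done"


def retry_on_error(event: AfterCall) -> None:
    # Request a retry only when the tool produced an error result.
    event.retry = event.result["status"] == "error"


final = asyncio.run(run_with_retry(flaky_tool, retry_on_error))
```

A hook like `retry_on_error` retries without bound if the tool never succeeds, so a real hook would likely track an attempt count and stop after a limit.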

ToolProvider

Bases: ABC

Interface for providing tools with lifecycle management.

Provides a way to load a collection of tools and clean them up when done, with lifecycle managed by the agent.

Source code in strands/tools/tool_provider.py
class ToolProvider(ABC):
    """Interface for providing tools with lifecycle management.

    Provides a way to load a collection of tools and clean them up
    when done, with lifecycle managed by the agent.
    """

    @abstractmethod
    async def load_tools(self, **kwargs: Any) -> Sequence["AgentTool"]:
        """Load and return the tools in this provider.

        Args:
            **kwargs: Additional arguments for future compatibility.

        Returns:
            List of tools that are ready to use.
        """
        pass

    @abstractmethod
    def add_consumer(self, consumer_id: Any, **kwargs: Any) -> None:
        """Add a consumer to this tool provider.

        Args:
            consumer_id: Unique identifier for the consumer.
            **kwargs: Additional arguments for future compatibility.
        """
        pass

    @abstractmethod
    def remove_consumer(self, consumer_id: Any, **kwargs: Any) -> None:
        """Remove a consumer from this tool provider.

        This method must be idempotent - calling it multiple times with the same ID
        should have no additional effect after the first call.

        Provider may clean up resources when no consumers remain.

        Args:
            consumer_id: Unique identifier for the consumer.
            **kwargs: Additional arguments for future compatibility.
        """
        pass

add_consumer(consumer_id, **kwargs) abstractmethod

Add a consumer to this tool provider.

Parameters:

Name | Type | Description | Default
consumer_id | Any | Unique identifier for the consumer. | required
**kwargs | Any | Additional arguments for future compatibility. | {}

Source code in strands/tools/tool_provider.py
@abstractmethod
def add_consumer(self, consumer_id: Any, **kwargs: Any) -> None:
    """Add a consumer to this tool provider.

    Args:
        consumer_id: Unique identifier for the consumer.
        **kwargs: Additional arguments for future compatibility.
    """
    pass

load_tools(**kwargs) abstractmethod async

Load and return the tools in this provider.

Parameters:

Name | Type | Description | Default
**kwargs | Any | Additional arguments for future compatibility. | {}

Returns:

Type | Description
Sequence[AgentTool] | List of tools that are ready to use.

Source code in strands/tools/tool_provider.py
@abstractmethod
async def load_tools(self, **kwargs: Any) -> Sequence["AgentTool"]:
    """Load and return the tools in this provider.

    Args:
        **kwargs: Additional arguments for future compatibility.

    Returns:
        List of tools that are ready to use.
    """
    pass

remove_consumer(consumer_id, **kwargs) abstractmethod

Remove a consumer from this tool provider.

This method must be idempotent - calling it multiple times with the same ID should have no additional effect after the first call.

Provider may clean up resources when no consumers remain.
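One way to satisfy the idempotency requirement is to track consumer IDs in a set and release resources only on the transition to "no consumers". The sketch below is illustrative only: `ConsumerTracker`, `_cleanup`, and `cleanup_calls` are hypothetical names, and real providers may manage resources differently.

```python
from typing import Any


class ConsumerTracker:
    """Hypothetical helper: tracks consumers and cleans up once when none remain."""

    def __init__(self) -> None:
        self._consumers: set[Any] = set()
        self.cleanup_calls = 0  # counts how often resources were released

    def add_consumer(self, consumer_id: Any) -> None:
        self._consumers.add(consumer_id)

    def remove_consumer(self, consumer_id: Any) -> None:
        if consumer_id not in self._consumers:
            # Unknown or already-removed ID: nothing to do, so repeats are safe.
            return
        self._consumers.remove(consumer_id)
        if not self._consumers:
            self._cleanup()

    def _cleanup(self) -> None:
        # Placeholder for closing connections, stopping background tasks, etc.
        self.cleanup_calls += 1


tracker = ConsumerTracker()
tracker.add_consumer("a")
tracker.add_consumer("b")
tracker.remove_consumer("a")
tracker.remove_consumer("a")  # repeat: no additional effect
tracker.remove_consumer("b")  # last consumer leaves, cleanup runs
tracker.remove_consumer("b")  # repeat: cleanup is not run again
```

Returning early for unknown IDs is what prevents a repeated call from triggering cleanup a second time.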

Parameters:

Name | Type | Description | Default
consumer_id | Any | Unique identifier for the consumer. | required
**kwargs | Any | Additional arguments for future compatibility. | {}

Source code in strands/tools/tool_provider.py
@abstractmethod
def remove_consumer(self, consumer_id: Any, **kwargs: Any) -> None:
    """Remove a consumer from this tool provider.

    This method must be idempotent - calling it multiple times with the same ID
    should have no additional effect after the first call.

    Provider may clean up resources when no consumers remain.

    Args:
        consumer_id: Unique identifier for the consumer.
        **kwargs: Additional arguments for future compatibility.
    """
    pass

ToolRegistry

Central registry for all tools available to the agent.

This class manages tool registration, validation, discovery, and invocation.
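As part of registration, the `register_tool` source shown below rejects a new name that differs from an existing one only by `-` versus `_`. The rule can be sketched in isolation as follows; `find_name_conflict` is a hypothetical helper written for illustration, not part of the SDK.

```python
from __future__ import annotations


def find_name_conflict(registered: list[str], new_name: str) -> str | None:
    """Return the registered name that collides with new_name, or None.

    Two names collide when they are equal after replacing '-' with '_'.
    An exact match is not reported here, because register_tool handles
    exact duplicates in a separate step.
    """
    if new_name in registered:
        return None
    normalized = new_name.replace("-", "_")
    for name in registered:
        if name.replace("-", "_") == normalized:
            return name
    return None


registered = ["file_read", "say-hello"]
conflict_a = find_name_conflict(registered, "file-read")   # collides with "file_read"
conflict_b = find_name_conflict(registered, "say_hello")   # collides with "say-hello"
conflict_c = find_name_conflict(registered, "calculator")  # no collision
```

In the registry itself a collision raises `ValueError` rather than returning the conflicting name.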

Source code in strands/tools/registry.py
class ToolRegistry:
    """Central registry for all tools available to the agent.

    This class manages tool registration, validation, discovery, and invocation.
    """

    def __init__(self) -> None:
        """Initialize the tool registry."""
        self.registry: dict[str, AgentTool] = {}
        self.dynamic_tools: dict[str, AgentTool] = {}
        self.tool_config: dict[str, Any] | None = None
        self._tool_providers: list[ToolProvider] = []
        self._registry_id = str(uuid.uuid4())

    def process_tools(self, tools: list[Any]) -> list[str]:
        """Process tools list.

        Process list of tools that can contain local file path string, module import path string,
        imported modules, @tool decorated functions, or instances of AgentTool.

        Args:
            tools: List of tool specifications. Can be:

                1. Local file path to a module based tool: `./path/to/module/tool.py`
                2. Module import path

                    2.1. Path to a module based tool: `strands_tools.file_read`
                    2.2. Path to a module with multiple AgentTool instances (@tool decorated):
                        `tests.fixtures.say_tool`
                    2.3. Path to a module and a specific function: `tests.fixtures.say_tool:say`

                3. A module for a module based tool
                4. Instances of AgentTool (@tool decorated functions)
                5. Dictionaries with name/path keys (deprecated)


        Returns:
            List of tool names that were processed.
        """
        tool_names = []

        def add_tool(tool: Any) -> None:
            try:
                # String based tool
                # Can be a file path, a module path, or a module path with a targeted function. Examples:
                # './path/to/tool.py'
                # 'my.module.tool'
                # 'my.module.tool:tool_name'
                if isinstance(tool, str):
                    tools = load_tool_from_string(tool)
                    for a_tool in tools:
                        a_tool.mark_dynamic()
                        self.register_tool(a_tool)
                        tool_names.append(a_tool.tool_name)

                # Dictionary with name and path
                elif isinstance(tool, dict) and "name" in tool and "path" in tool:
                    tools = load_tool_from_string(tool["path"])

                    tool_found = False
                    for a_tool in tools:
                        if a_tool.tool_name == tool["name"]:
                            a_tool.mark_dynamic()
                            self.register_tool(a_tool)
                            tool_names.append(a_tool.tool_name)
                            tool_found = True

                    if not tool_found:
                        raise ValueError(f'Tool "{tool["name"]}" not found in "{tool["path"]}"')

                # Dictionary with path only
                elif isinstance(tool, dict) and "path" in tool:
                    tools = load_tool_from_string(tool["path"])

                    for a_tool in tools:
                        a_tool.mark_dynamic()
                        self.register_tool(a_tool)
                        tool_names.append(a_tool.tool_name)

                # Imported Python module
                elif hasattr(tool, "__file__") and inspect.ismodule(tool):
                    # Extract the tool name from the module name
                    module_tool_name = tool.__name__.split(".")[-1]

                    tools = load_tools_from_module(tool, module_tool_name)
                    for a_tool in tools:
                        self.register_tool(a_tool)
                        tool_names.append(a_tool.tool_name)

                # Case 5: AgentTools (which also covers @tool)
                elif isinstance(tool, AgentTool):
                    self.register_tool(tool)
                    tool_names.append(tool.tool_name)

                # Case 6: Nested iterable (list, tuple, etc.) - add each sub-tool
                elif isinstance(tool, Iterable) and not isinstance(tool, (str, bytes, bytearray)):
                    for t in tool:
                        add_tool(t)

                # Case 7: ToolProvider
                elif isinstance(tool, ToolProvider):
                    self._tool_providers.append(tool)
                    tool.add_consumer(self._registry_id)

                    async def get_tools() -> Sequence[AgentTool]:
                        return await tool.load_tools()

                    provider_tools = run_async(get_tools)

                    for provider_tool in provider_tools:
                        self.register_tool(provider_tool)
                        tool_names.append(provider_tool.tool_name)
                else:
                    logger.warning("tool=<%s> | unrecognized tool specification", tool)

            except Exception as e:
                exception_str = str(e)
                logger.exception("tool_name=<%s> | failed to load tool", tool)
                raise ValueError(f"Failed to load tool {tool}: {exception_str}") from e

        for tool in tools:
            add_tool(tool)
        return tool_names

    def load_tool_from_filepath(self, tool_name: str, tool_path: str) -> None:
        """DEPRECATED: Load a tool from a file path.

        Args:
            tool_name: Name of the tool.
            tool_path: Path to the tool file.

        Raises:
            FileNotFoundError: If the tool file is not found.
            ValueError: If the tool cannot be loaded.
        """
        warnings.warn(
            "load_tool_from_filepath is deprecated and will be removed in Strands SDK 2.0. "
            "`process_tools` automatically handles loading tools from a filepath.",
            DeprecationWarning,
            stacklevel=2,
        )

        from .loader import ToolLoader

        try:
            tool_path = expanduser(tool_path)
            if not os.path.exists(tool_path):
                raise FileNotFoundError(f"Tool file not found: {tool_path}")

            loaded_tools = ToolLoader.load_tools(tool_path, tool_name)
            for t in loaded_tools:
                t.mark_dynamic()
                # Because we're explicitly registering the tool we don't need an allowlist
                self.register_tool(t)
        except Exception as e:
            exception_str = str(e)
            logger.exception("tool_name=<%s> | failed to load tool", tool_name)
            raise ValueError(f"Failed to load tool {tool_name}: {exception_str}") from e

    def get_all_tools_config(self) -> dict[str, Any]:
        """Dynamically generate tool configuration by combining built-in and dynamic tools.

        Returns:
            Dictionary containing all tool configurations.
        """
        tool_config = {}
        logger.debug("getting tool configurations")

        # Add all registered tools
        for tool_name, tool in self.registry.items():
            # Make a shallow copy to avoid rebinding keys on the original (nested values are still shared)
            spec = tool.tool_spec.copy()
            try:
                # Normalize the schema before validation
                spec = normalize_tool_spec(spec)
                self.validate_tool_spec(spec)
                tool_config[tool_name] = spec
                logger.debug("tool_name=<%s> | loaded tool config", tool_name)
            except ValueError as e:
                logger.warning("tool_name=<%s> | spec validation failed | %s", tool_name, e)

        # Add any dynamic tools
        for tool_name, tool in self.dynamic_tools.items():
            if tool_name not in tool_config:
                # Make a shallow copy to avoid rebinding keys on the original (nested values are still shared)
                spec = tool.tool_spec.copy()
                try:
                    # Normalize the schema before validation
                    spec = normalize_tool_spec(spec)
                    self.validate_tool_spec(spec)
                    tool_config[tool_name] = spec
                    logger.debug("tool_name=<%s> | loaded dynamic tool config", tool_name)
                except ValueError as e:
                    logger.warning("tool_name=<%s> | dynamic tool spec validation failed | %s", tool_name, e)

        logger.debug("tool_count=<%s> | tools configured", len(tool_config))
        return tool_config

    # mypy has problems converting between DecoratedFunctionTool <-> AgentTool
    def register_tool(self, tool: AgentTool) -> None:
        """Register a tool function with the given name.

        Args:
            tool: The tool to register.
        """
        logger.debug(
            "tool_name=<%s>, tool_type=<%s>, is_dynamic=<%s> | registering tool",
            tool.tool_name,
            tool.tool_type,
            tool.is_dynamic,
        )

        # Reject a duplicate tool name unless the tool supports hot reloading
        if tool.tool_name in self.registry and not tool.supports_hot_reload:
            raise ValueError(
                f"Tool name '{tool.tool_name}' already exists. Cannot register tools with exact same name."
            )

        # Check for normalized name conflicts (- vs _)
        if self.registry.get(tool.tool_name) is None:
            normalized_name = tool.tool_name.replace("-", "_")

            matching_tools = [
                tool_name
                for (tool_name, tool) in self.registry.items()
                if tool_name.replace("-", "_") == normalized_name
            ]

            if matching_tools:
                raise ValueError(
                    f"Tool name '{tool.tool_name}' already exists as '{matching_tools[0]}'."
                    " Cannot add a duplicate tool which differs by a '-' or '_'"
                )

        # Register in main registry
        self.registry[tool.tool_name] = tool

        # Register in dynamic tools if applicable
        if tool.is_dynamic:
            self.dynamic_tools[tool.tool_name] = tool

            if not tool.supports_hot_reload:
                logger.debug("tool_name=<%s>, tool_type=<%s> | skipping hot reloading", tool.tool_name, tool.tool_type)
                return

            logger.debug(
                "tool_name=<%s>, tool_registry=<%s>, dynamic_tools=<%s> | tool registered",
                tool.tool_name,
                list(self.registry.keys()),
                list(self.dynamic_tools.keys()),
            )

    def replace(self, new_tool: AgentTool) -> None:
        """Replace an existing tool with a new implementation.

        This performs a swap of the tool implementation in the registry.
        The replacement takes effect on the next agent invocation.

        Args:
            new_tool: New tool implementation. Its name must match the tool being replaced.

        Raises:
            ValueError: If the tool doesn't exist.
        """
        tool_name = new_tool.tool_name

        if tool_name not in self.registry:
            raise ValueError(f"Cannot replace tool '{tool_name}' - tool does not exist")

        # Update main registry
        self.registry[tool_name] = new_tool

        # Update dynamic_tools to match new tool's dynamic status
        if new_tool.is_dynamic:
            self.dynamic_tools[tool_name] = new_tool
        elif tool_name in self.dynamic_tools:
            del self.dynamic_tools[tool_name]

    def get_tools_dirs(self) -> list[Path]:
        """Get all tool directory paths.

        Returns:
            A list of Path objects for current working directory's "./tools/".
        """
        # Current working directory's tools directory
        cwd_tools_dir = Path.cwd() / "tools"

        # Return all directories that exist
        tool_dirs = []
        for directory in [cwd_tools_dir]:
            if directory.exists() and directory.is_dir():
                tool_dirs.append(directory)
                logger.debug("tools_dir=<%s> | found tools directory", directory)
            else:
                logger.debug("tools_dir=<%s> | tools directory not found", directory)

        return tool_dirs

    def discover_tool_modules(self) -> dict[str, Path]:
        """Discover available tool modules in all tools directories.

        Returns:
            Dictionary mapping tool names to their full paths.
        """
        tool_modules = {}
        tools_dirs = self.get_tools_dirs()

        for tools_dir in tools_dirs:
            logger.debug("tools_dir=<%s> | scanning", tools_dir)

            # Find Python tools
            for extension in ["*.py"]:
                for item in tools_dir.glob(extension):
                    if item.is_file() and not item.name.startswith("__"):
                        module_name = item.stem
                        # If tool already exists, newer paths take precedence
                        if module_name in tool_modules:
                            logger.debug("tools_dir=<%s>, module_name=<%s> | tool overridden", tools_dir, module_name)
                        tool_modules[module_name] = item

        logger.debug("tool_modules=<%s> | discovered", list(tool_modules.keys()))
        return tool_modules

    def reload_tool(self, tool_name: str) -> None:
        """Reload a specific tool module.

        Args:
            tool_name: Name of the tool to reload.

        Raises:
            FileNotFoundError: If the tool file cannot be found.
            ImportError: If there are issues importing the tool module.
            ValueError: If the tool specification is invalid or required components are missing.
            Exception: For other errors during tool reloading.
        """
        try:
            # Check for tool file
            logger.debug("tool_name=<%s> | searching directories for tool", tool_name)
            tools_dirs = self.get_tools_dirs()
            tool_path = None

            # Search for the tool file in all tool directories
            for tools_dir in tools_dirs:
                temp_path = tools_dir / f"{tool_name}.py"
                if temp_path.exists():
                    tool_path = temp_path
                    break

            if not tool_path:
                raise FileNotFoundError(f"No tool file found for: {tool_name}")

            logger.debug("tool_name=<%s> | reloading tool", tool_name)

            # Add tool directory to path temporarily
            tool_dir = str(tool_path.parent)
            sys.path.insert(0, tool_dir)
            try:
                # Load the module directly using spec
                spec = util.spec_from_file_location(tool_name, str(tool_path))
                if spec is None:
                    raise ImportError(f"Could not load spec for {tool_name}")

                module = util.module_from_spec(spec)
                sys.modules[tool_name] = module

                if spec.loader is None:
                    raise ImportError(f"Could not load {tool_name}")

                spec.loader.exec_module(module)

            finally:
                # Remove the temporary path
                sys.path.remove(tool_dir)

            # Look for function-based tools first
            try:
                function_tools = self._scan_module_for_tools(module)

                if function_tools:
                    for function_tool in function_tools:
                        # Register the function-based tool
                        self.register_tool(function_tool)

                        # Update tool configuration if available
                        if self.tool_config is not None:
                            self._update_tool_config(self.tool_config, {"spec": function_tool.tool_spec})

                    logger.debug("tool_name=<%s> | successfully reloaded function-based tool from module", tool_name)
                    return
            except ImportError:
                logger.debug("function tool loader not available | falling back to traditional tools")

            # Fall back to traditional module-level tools
            if not hasattr(module, "TOOL_SPEC"):
                raise ValueError(
                    f"Tool {tool_name} is missing TOOL_SPEC (neither at module level nor as a decorated function)"
                )

            expected_func_name = tool_name
            if not hasattr(module, expected_func_name):
                raise ValueError(f"Tool {tool_name} is missing {expected_func_name} function")

            tool_function = getattr(module, expected_func_name)
            if not callable(tool_function):
                raise ValueError(f"Tool {tool_name} function is not callable")

            # Validate tool spec
            self.validate_tool_spec(module.TOOL_SPEC)

            new_tool = PythonAgentTool(tool_name, module.TOOL_SPEC, tool_function)

            # Register the tool
            self.register_tool(new_tool)

            # Update tool configuration if available
            if self.tool_config is not None:
                self._update_tool_config(self.tool_config, {"spec": module.TOOL_SPEC})
            logger.debug("tool_name=<%s> | successfully reloaded tool", tool_name)

        except Exception:
            logger.exception("tool_name=<%s> | failed to reload tool", tool_name)
            raise

    def initialize_tools(self, load_tools_from_directory: bool = False) -> None:
        """Initialize all tools by discovering and loading them dynamically from all tool directories.

        Args:
            load_tools_from_directory: Whether to load the tools discovered in the tools directories.
                When False, discovered tool modules are skipped.
        """
        self.tool_config = None

        # Then discover and load other tools
        tool_modules = self.discover_tool_modules()
        successful_loads = 0
        total_tools = len(tool_modules)
        tool_import_errors = {}

        # Process Python tools
        for tool_name, tool_path in tool_modules.items():
            if tool_name in ["__init__"]:
                continue

            if not load_tools_from_directory:
                continue

            try:
                # Add directory to path temporarily
                tool_dir = str(tool_path.parent)
                sys.path.insert(0, tool_dir)
                try:
                    module = import_module(tool_name)
                finally:
                    if tool_dir in sys.path:
                        sys.path.remove(tool_dir)

                # Process Python tool
                if tool_path.suffix == ".py":
                    # Check for decorated function tools first
                    try:
                        function_tools = self._scan_module_for_tools(module)

                        if function_tools:
                            for function_tool in function_tools:
                                self.register_tool(function_tool)
                                successful_loads += 1
                        else:
                            # Fall back to traditional tools
                            # Check for expected tool function
                            expected_func_name = tool_name
                            if hasattr(module, expected_func_name):
                                tool_function = getattr(module, expected_func_name)
                                if not callable(tool_function):
                                    logger.warning(
                                        "tool_name=<%s> | tool function exists but is not callable", tool_name
                                    )
                                    continue

                                # Validate tool spec before registering
                                if not hasattr(module, "TOOL_SPEC"):
                                    logger.warning("tool_name=<%s> | tool is missing TOOL_SPEC | skipping", tool_name)
                                    continue

                                try:
                                    self.validate_tool_spec(module.TOOL_SPEC)
                                except ValueError as e:
                                    logger.warning("tool_name=<%s> | tool spec validation failed | %s", tool_name, e)
                                    continue

                                tool_spec = module.TOOL_SPEC
                                tool = PythonAgentTool(tool_name, tool_spec, tool_function)
                                self.register_tool(tool)
                                successful_loads += 1

                            else:
                                logger.warning("tool_name=<%s> | tool function missing", tool_name)
                    except ImportError:
                        # Function tool loader not available, fall back to traditional tools
                        # Check for expected tool function
                        expected_func_name = tool_name
                        if hasattr(module, expected_func_name):
                            tool_function = getattr(module, expected_func_name)
                            if not callable(tool_function):
                                logger.warning("tool_name=<%s> | tool function exists but is not callable", tool_name)
                                continue

                            # Validate tool spec before registering
                            if not hasattr(module, "TOOL_SPEC"):
                                logger.warning("tool_name=<%s> | tool is missing TOOL_SPEC | skipping", tool_name)
                                continue

                            try:
                                self.validate_tool_spec(module.TOOL_SPEC)
                            except ValueError as e:
                                logger.warning("tool_name=<%s> | tool spec validation failed | %s", tool_name, e)
                                continue

                            tool_spec = module.TOOL_SPEC
                            tool = PythonAgentTool(tool_name, tool_spec, tool_function)
                            self.register_tool(tool)
                            successful_loads += 1

                        else:
                            logger.warning("tool_name=<%s> | tool function missing", tool_name)

            except Exception as e:
                logger.warning("tool_name=<%s> | failed to load tool | %s", tool_name, e)
                tool_import_errors[tool_name] = str(e)

        # Log summary
        logger.debug("tool_count=<%d>, success_count=<%d> | finished loading tools", total_tools, successful_loads)
        if tool_import_errors:
            for tool_name, error in tool_import_errors.items():
                logger.debug("tool_name=<%s> | import error | %s", tool_name, error)

    def get_all_tool_specs(self) -> list[ToolSpec]:
        """Get the tool specs for all tools in this registry.

        Returns:
            A list of ToolSpecs.
        """
        all_tools = self.get_all_tools_config()
        tools: list[ToolSpec] = [tool_spec for tool_spec in all_tools.values()]
        return tools

    def register_dynamic_tool(self, tool: AgentTool) -> None:
        """Register a tool dynamically for temporary use.

        Args:
            tool: The tool to register dynamically.

        Raises:
            ValueError: If a tool with this name already exists.
        """
        if tool.tool_name in self.registry or tool.tool_name in self.dynamic_tools:
            raise ValueError(f"Tool '{tool.tool_name}' already exists")

        self.dynamic_tools[tool.tool_name] = tool
        logger.debug("Registered dynamic tool: %s", tool.tool_name)

    def validate_tool_spec(self, tool_spec: ToolSpec) -> None:
        """Validate tool specification against required schema.

        Args:
            tool_spec: Tool specification to validate.

        Raises:
            ValueError: If the specification is invalid.
        """
        required_fields = ["name", "description"]
        missing_fields = [field for field in required_fields if field not in tool_spec]
        if missing_fields:
            raise ValueError(f"Missing required fields in tool spec: {', '.join(missing_fields)}")

        if "json" not in tool_spec["inputSchema"]:
            # Convert direct schema to proper format
            json_schema = normalize_schema(tool_spec["inputSchema"])
            tool_spec["inputSchema"] = {"json": json_schema}
            return

        # Validate json schema fields
        json_schema = tool_spec["inputSchema"]["json"]

        # Ensure schema has required fields
        if "type" not in json_schema:
            json_schema["type"] = "object"
        if "properties" not in json_schema:
            json_schema["properties"] = {}
        if "required" not in json_schema:
            json_schema["required"] = []

        # Validate property definitions
        for prop_name, prop_def in json_schema.get("properties", {}).items():
            if not isinstance(prop_def, dict):
                json_schema["properties"][prop_name] = {
                    "type": "string",
                    "description": f"Property {prop_name}",
                }
                continue

            # It is expected that type and description are already included in referenced $def.
            if "$ref" in prop_def:
                continue

            has_composition = any(kw in prop_def for kw in _COMPOSITION_KEYWORDS)
            if "type" not in prop_def and not has_composition:
                prop_def["type"] = "string"
            if "description" not in prop_def:
                prop_def["description"] = f"Property {prop_name}"

    class NewToolDict(TypedDict):
        """Dictionary type for adding or updating a tool in the configuration.

        Attributes:
            spec: The tool specification that defines the tool's interface and behavior.
        """

        spec: ToolSpec

    def _update_tool_config(self, tool_config: dict[str, Any], new_tool: NewToolDict) -> None:
        """Update tool configuration with a new tool.

        Args:
            tool_config: The current tool configuration dictionary.
            new_tool: The new tool to add/update.

        Raises:
            ValueError: If the new tool spec is invalid.
        """
        if not new_tool.get("spec"):
            raise ValueError("Invalid tool format - missing spec")

        # Validate tool spec before updating
        try:
            self.validate_tool_spec(new_tool["spec"])
        except ValueError as e:
            raise ValueError(f"Tool specification validation failed: {str(e)}") from e

        new_tool_name = new_tool["spec"]["name"]
        existing_tool_idx = None

        # Find if tool already exists
        for idx, tool_entry in enumerate(tool_config["tools"]):
            if tool_entry["toolSpec"]["name"] == new_tool_name:
                existing_tool_idx = idx
                break

        # Update existing tool or add new one
        new_tool_entry = {"toolSpec": new_tool["spec"]}
        if existing_tool_idx is not None:
            tool_config["tools"][existing_tool_idx] = new_tool_entry
            logger.debug("tool_name=<%s> | updated existing tool", new_tool_name)
        else:
            tool_config["tools"].append(new_tool_entry)
            logger.debug("tool_name=<%s> | added new tool", new_tool_name)

    def _scan_module_for_tools(self, module: Any) -> list[AgentTool]:
        """Scan a module for function-based tools.

        Args:
            module: The module to scan.

        Returns:
            List of FunctionTool instances found in the module.
        """
        tools: list[AgentTool] = []

        for name, obj in inspect.getmembers(module):
            if isinstance(obj, DecoratedFunctionTool):
                # Create a function tool with correct name
                try:
                    # Cast as AgentTool for mypy
                    tools.append(cast(AgentTool, obj))
                except Exception as e:
                    logger.warning("tool_name=<%s> | failed to create function tool | %s", name, e)

        return tools

    def cleanup(self, **kwargs: Any) -> None:
        """Synchronously clean up all tool providers in this registry."""
        # Attempt cleanup of all providers even if one fails to minimize resource leakage
        exceptions = []
        for provider in self._tool_providers:
            try:
                provider.remove_consumer(self._registry_id)
                logger.debug("provider=<%s> | removed provider consumer", type(provider).__name__)
            except Exception as e:
                exceptions.append(e)
                logger.error(
                    "provider=<%s>, error=<%s> | failed to remove provider consumer", type(provider).__name__, e
                )

        if exceptions:
            raise exceptions[0]

NewToolDict

Bases: TypedDict

Dictionary type for adding or updating a tool in the configuration.

Attributes:

Name Type Description
spec ToolSpec

The tool specification that defines the tool's interface and behavior.

Source code in strands/tools/registry.py
class NewToolDict(TypedDict):
    """Dictionary type for adding or updating a tool in the configuration.

    Attributes:
        spec: The tool specification that defines the tool's interface and behavior.
    """

    spec: ToolSpec

__init__()

Initialize the tool registry.

Source code in strands/tools/registry.py
def __init__(self) -> None:
    """Initialize the tool registry."""
    self.registry: dict[str, AgentTool] = {}
    self.dynamic_tools: dict[str, AgentTool] = {}
    self.tool_config: dict[str, Any] | None = None
    self._tool_providers: list[ToolProvider] = []
    self._registry_id = str(uuid.uuid4())

cleanup(**kwargs)

Synchronously clean up all tool providers in this registry.

Source code in strands/tools/registry.py
def cleanup(self, **kwargs: Any) -> None:
    """Synchronously clean up all tool providers in this registry."""
    # Attempt cleanup of all providers even if one fails to minimize resource leakage
    exceptions = []
    for provider in self._tool_providers:
        try:
            provider.remove_consumer(self._registry_id)
            logger.debug("provider=<%s> | removed provider consumer", type(provider).__name__)
        except Exception as e:
            exceptions.append(e)
            logger.error(
                "provider=<%s>, error=<%s> | failed to remove provider consumer", type(provider).__name__, e
            )

    if exceptions:
        raise exceptions[0]
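
The collect-then-raise flow in cleanup can be tried in isolation. The following is a minimal sketch, not the SDK's code: `FakeProvider` and `cleanup_all` are hypothetical names introduced for illustration, and only the control flow mirrors the listing above.

```python
class FakeProvider:
    """Hypothetical stand-in for a tool provider; not part of the SDK."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail
        self.released = False

    def remove_consumer(self, consumer_id: str) -> None:
        if self.fail:
            raise RuntimeError(f"{self.name} failed to release {consumer_id}")
        self.released = True


def cleanup_all(providers: list[FakeProvider], consumer_id: str) -> None:
    """Try every provider, then re-raise the first failure, if any."""
    errors: list[Exception] = []
    for provider in providers:
        try:
            provider.remove_consumer(consumer_id)
        except Exception as exc:
            # Keep going so that later providers are still released.
            errors.append(exc)
    if errors:
        raise errors[0]


providers = [FakeProvider("a"), FakeProvider("b", fail=True), FakeProvider("c")]
try:
    cleanup_all(providers, "registry-1")
except RuntimeError as exc:
    first_error = str(exc)
```

Only the first exception is re-raised. In the listing above, any further failures are visible only through the `logger.error` calls.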

discover_tool_modules()

Discover available tool modules in all tools directories.

Returns:

Type Description
dict[str, Path]

Dictionary mapping tool names to their full paths.

Source code in strands/tools/registry.py
def discover_tool_modules(self) -> dict[str, Path]:
    """Discover available tool modules in all tools directories.

    Returns:
        Dictionary mapping tool names to their full paths.
    """
    tool_modules = {}
    tools_dirs = self.get_tools_dirs()

    for tools_dir in tools_dirs:
        logger.debug("tools_dir=<%s> | scanning", tools_dir)

        # Find Python tools
        for extension in ["*.py"]:
            for item in tools_dir.glob(extension):
                if item.is_file() and not item.name.startswith("__"):
                    module_name = item.stem
                    # If tool already exists, newer paths take precedence
                    if module_name in tool_modules:
                        logger.debug("tools_dir=<%s>, module_name=<%s> | tool overridden", tools_dir, module_name)
                    tool_modules[module_name] = item

    logger.debug("tool_modules=<%s> | discovered", list(tool_modules.keys()))
    return tool_modules
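
The scanning rule (top-level `*.py` files, dunder files skipped, later directories overriding earlier ones) can be sketched without the SDK. The helper name `discover` and the directory layout below are assumptions made for this example. In the section shown here `get_tools_dirs` returns at most one directory, so the override branch would only matter if more directories were added.

```python
import tempfile
from pathlib import Path


def discover(tools_dirs: list[Path]) -> dict[str, Path]:
    """Map module names to files; later directories override earlier ones."""
    found: dict[str, Path] = {}
    for tools_dir in tools_dirs:
        for item in tools_dir.glob("*.py"):
            if item.is_file() and not item.name.startswith("__"):
                found[item.stem] = item
    return found


with tempfile.TemporaryDirectory() as tmp:
    first = Path(tmp) / "first"
    second = Path(tmp) / "second"
    for directory in (first, second):
        directory.mkdir()
        (directory / "__init__.py").write_text("")
        (directory / "calculator.py").write_text("# placeholder tool module\n")
    (first / "notes.txt").write_text("ignored")  # not a .py file, so never matched

    modules = discover([first, second])
    names = sorted(modules)
    winner = modules["calculator"].parent.name
```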

get_all_tool_specs()

Get all the tool specs for all tools in this registry.

Returns:

Type Description
list[ToolSpec]

A list of ToolSpecs.

Source code in strands/tools/registry.py
def get_all_tool_specs(self) -> list[ToolSpec]:
    """Get all the tool specs for all tools in this registry.

    Returns:
        A list of ToolSpecs.
    """
    all_tools = self.get_all_tools_config()
    tools: list[ToolSpec] = [tool_spec for tool_spec in all_tools.values()]
    return tools

get_all_tools_config()

Dynamically generate tool configuration by combining built-in and dynamic tools.

Returns:

Type Description
dict[str, Any]

Dictionary containing all tool configurations.

Source code in strands/tools/registry.py
def get_all_tools_config(self) -> dict[str, Any]:
    """Dynamically generate tool configuration by combining built-in and dynamic tools.

    Returns:
        Dictionary containing all tool configurations.
    """
    tool_config = {}
    logger.debug("getting tool configurations")

    # Add all registered tools
    for tool_name, tool in self.registry.items():
        # Copy the spec before normalizing (dict.copy() is shallow, so nested values are still shared)
        spec = tool.tool_spec.copy()
        try:
            # Normalize the schema before validation
            spec = normalize_tool_spec(spec)
            self.validate_tool_spec(spec)
            tool_config[tool_name] = spec
            logger.debug("tool_name=<%s> | loaded tool config", tool_name)
        except ValueError as e:
            logger.warning("tool_name=<%s> | spec validation failed | %s", tool_name, e)

    # Add any dynamic tools
    for tool_name, tool in self.dynamic_tools.items():
        if tool_name not in tool_config:
            # Copy the spec before normalizing (dict.copy() is shallow, so nested values are still shared)
            spec = tool.tool_spec.copy()
            try:
                # Normalize the schema before validation
                spec = normalize_tool_spec(spec)
                self.validate_tool_spec(spec)
                tool_config[tool_name] = spec
                logger.debug("tool_name=<%s> | loaded dynamic tool config", tool_name)
            except ValueError as e:
                logger.warning("tool_name=<%s> | dynamic tool spec validation failed | %s", tool_name, e)

    logger.debug("tool_count=<%s> | tools configured", len(tool_config))
    return tool_config
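
One detail worth keeping in mind when reading the listing above: `dict.copy()` produces a shallow copy, so nested objects such as `inputSchema` stay shared with the original unless a later step replaces them. The sketch below uses a hypothetical spec dictionary (not a real tool) to contrast this with `copy.deepcopy`. Whether `normalize_tool_spec` returns fresh nested objects is not shown in this section, so that part is left open.

```python
import copy

# Hypothetical spec used only for illustration.
original = {
    "name": "echo",
    "description": "Echo the input",
    "inputSchema": {"json": {"type": "object", "properties": {}}},
}

shallow = original.copy()
deep = copy.deepcopy(original)

# Mutating a nested dict through the shallow copy also changes the original.
shallow["inputSchema"]["json"]["required"] = []
shares_nested = "required" in original["inputSchema"]["json"]

# The deep copy was taken before the mutation and is unaffected by it.
deep_isolated = "required" not in deep["inputSchema"]["json"]
```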

get_tools_dirs()

Get all tool directory paths.

Returns:

Type Description
list[Path]

A list of Path objects for current working directory's "./tools/".

Source code in strands/tools/registry.py
def get_tools_dirs(self) -> list[Path]:
    """Get all tool directory paths.

    Returns:
        A list of Path objects for current working directory's "./tools/".
    """
    # Current working directory's tools directory
    cwd_tools_dir = Path.cwd() / "tools"

    # Return all directories that exist
    tool_dirs = []
    for directory in [cwd_tools_dir]:
        if directory.exists() and directory.is_dir():
            tool_dirs.append(directory)
            logger.debug("tools_dir=<%s> | found tools directory", directory)
        else:
            logger.debug("tools_dir=<%s> | tools directory not found", directory)

    return tool_dirs

initialize_tools(load_tools_from_directory=False)

Initialize all tools by discovering and loading them dynamically from all tool directories.

Parameters:

Name Type Description Default
load_tools_from_directory bool

Whether to import and register the tools discovered in the tool directories. When False, discovered modules are skipped.

False
Source code in strands/tools/registry.py
def initialize_tools(self, load_tools_from_directory: bool = False) -> None:
    """Initialize all tools by discovering and loading them dynamically from all tool directories.

    Args:
        load_tools_from_directory: Whether to import and register the tools discovered in the tool
            directories. When False, discovered modules are skipped.
    """
    self.tool_config = None

    # Then discover and load other tools
    tool_modules = self.discover_tool_modules()
    successful_loads = 0
    total_tools = len(tool_modules)
    tool_import_errors = {}

    # Process Python tools
    for tool_name, tool_path in tool_modules.items():
        if tool_name in ["__init__"]:
            continue

        if not load_tools_from_directory:
            continue

        try:
            # Add directory to path temporarily
            tool_dir = str(tool_path.parent)
            sys.path.insert(0, tool_dir)
            try:
                module = import_module(tool_name)
            finally:
                if tool_dir in sys.path:
                    sys.path.remove(tool_dir)

            # Process Python tool
            if tool_path.suffix == ".py":
                # Check for decorated function tools first
                try:
                    function_tools = self._scan_module_for_tools(module)

                    if function_tools:
                        for function_tool in function_tools:
                            self.register_tool(function_tool)
                            successful_loads += 1
                    else:
                        # Fall back to traditional tools
                        # Check for expected tool function
                        expected_func_name = tool_name
                        if hasattr(module, expected_func_name):
                            tool_function = getattr(module, expected_func_name)
                            if not callable(tool_function):
                                logger.warning(
                                    "tool_name=<%s> | tool function exists but is not callable", tool_name
                                )
                                continue

                            # Validate tool spec before registering
                            if not hasattr(module, "TOOL_SPEC"):
                                logger.warning("tool_name=<%s> | tool is missing TOOL_SPEC | skipping", tool_name)
                                continue

                            try:
                                self.validate_tool_spec(module.TOOL_SPEC)
                            except ValueError as e:
                                logger.warning("tool_name=<%s> | tool spec validation failed | %s", tool_name, e)
                                continue

                            tool_spec = module.TOOL_SPEC
                            tool = PythonAgentTool(tool_name, tool_spec, tool_function)
                            self.register_tool(tool)
                            successful_loads += 1

                        else:
                            logger.warning("tool_name=<%s> | tool function missing", tool_name)
                except ImportError:
                    # Function tool loader not available, fall back to traditional tools
                    # Check for expected tool function
                    expected_func_name = tool_name
                    if hasattr(module, expected_func_name):
                        tool_function = getattr(module, expected_func_name)
                        if not callable(tool_function):
                            logger.warning("tool_name=<%s> | tool function exists but is not callable", tool_name)
                            continue

                        # Validate tool spec before registering
                        if not hasattr(module, "TOOL_SPEC"):
                            logger.warning("tool_name=<%s> | tool is missing TOOL_SPEC | skipping", tool_name)
                            continue

                        try:
                            self.validate_tool_spec(module.TOOL_SPEC)
                        except ValueError as e:
                            logger.warning("tool_name=<%s> | tool spec validation failed | %s", tool_name, e)
                            continue

                        tool_spec = module.TOOL_SPEC
                        tool = PythonAgentTool(tool_name, tool_spec, tool_function)
                        self.register_tool(tool)
                        successful_loads += 1

                    else:
                        logger.warning("tool_name=<%s> | tool function missing", tool_name)

        except Exception as e:
            logger.warning("tool_name=<%s> | failed to load tool | %s", tool_name, e)
            tool_import_errors[tool_name] = str(e)

    # Log summary
    logger.debug("tool_count=<%d>, success_count=<%d> | finished loading tools", total_tools, successful_loads)
    if tool_import_errors:
        for tool_name, error in tool_import_errors.items():
            logger.debug("tool_name=<%s> | import error | %s", tool_name, error)
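
The import step above (put the tool's directory on `sys.path`, import by name, then restore the path) can be sketched on its own. This is an illustration under stated assumptions, not the SDK's loader: `import_from_dir` and the module name `shout_demo_tool` are hypothetical, and the generated file follows the "function named after the module plus `TOOL_SPEC`" convention that the fallback branch checks for.

```python
import sys
import tempfile
from importlib import import_module
from pathlib import Path


def import_from_dir(module_name: str, directory: Path):
    """Import `module_name` from `directory`, restoring sys.path afterwards."""
    dir_str = str(directory)
    sys.path.insert(0, dir_str)
    try:
        return import_module(module_name)
    finally:
        # Always undo the temporary path entry, even if the import fails.
        if dir_str in sys.path:
            sys.path.remove(dir_str)


with tempfile.TemporaryDirectory() as tmp:
    tool_file = Path(tmp) / "shout_demo_tool.py"
    tool_file.write_text(
        "TOOL_SPEC = {'name': 'shout_demo_tool', 'description': 'Upper-case text'}\n"
        "def shout_demo_tool(text):\n"
        "    return text.upper()\n"
    )
    module = import_from_dir("shout_demo_tool", Path(tmp))

    # Same lookups the fallback branch performs: a callable named after the module, plus TOOL_SPEC.
    func = getattr(module, "shout_demo_tool")
    result = func("hi")
    has_spec = hasattr(module, "TOOL_SPEC")
    path_restored = tmp not in sys.path
```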

load_tool_from_filepath(tool_name, tool_path)

DEPRECATED: Load a tool from a file path.

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required
tool_path str

Path to the tool file.

required

Raises:

Type Description
FileNotFoundError

If the tool file is not found.

ValueError

If the tool cannot be loaded.

Source code in strands/tools/registry.py
def load_tool_from_filepath(self, tool_name: str, tool_path: str) -> None:
    """DEPRECATED: Load a tool from a file path.

    Args:
        tool_name: Name of the tool.
        tool_path: Path to the tool file.

    Raises:
        FileNotFoundError: If the tool file is not found.
        ValueError: If the tool cannot be loaded.
    """
    warnings.warn(
        "load_tool_from_filepath is deprecated and will be removed in Strands SDK 2.0. "
        "`process_tools` automatically handles loading tools from a filepath.",
        DeprecationWarning,
        stacklevel=2,
    )

    from .loader import ToolLoader

    try:
        tool_path = expanduser(tool_path)
        if not os.path.exists(tool_path):
            raise FileNotFoundError(f"Tool file not found: {tool_path}")

        loaded_tools = ToolLoader.load_tools(tool_path, tool_name)
        for t in loaded_tools:
            t.mark_dynamic()
            # Because we're explicitly registering the tool we don't need an allowlist
            self.register_tool(t)
    except Exception as e:
        exception_str = str(e)
        logger.exception("tool_name=<%s> | failed to load tool", tool_name)
        raise ValueError(f"Failed to load tool {tool_name}: {exception_str}") from e
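
The deprecation mechanism used above is the standard-library `warnings.warn` call with `stacklevel=2`, which attributes the warning to the caller rather than to the deprecated function itself. A small sketch follows; `old_loader` and `new_loader` are hypothetical names that stand in for a deprecated entry point and its replacement.

```python
import warnings


def new_loader(path: str) -> str:
    """Hypothetical replacement function."""
    return f"loaded:{path}"


def old_loader(path: str) -> str:
    """Hypothetical deprecated function that forwards to the replacement."""
    warnings.warn(
        "old_loader is deprecated; use new_loader instead.",
        DeprecationWarning,
        stacklevel=2,  # report the caller's line, not this one
    )
    return new_loader(path)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is not filtered out
    value = old_loader("./tools/echo.py")

categories = [w.category for w in caught]
```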

process_tools(tools)

Process tools list.

Process list of tools that can contain local file path string, module import path string, imported modules, @tool decorated functions, or instances of AgentTool.

Parameters:

Name Type Description Default
tools list[Any]

List of tool specifications. Can be:

  1. Local file path to a module based tool: ./path/to/module/tool.py
  2. Module import path

    2.1. Path to a module based tool: strands_tools.file_read
    2.2. Path to a module with multiple AgentTool instances (@tool decorated): tests.fixtures.say_tool
    2.3. Path to a module and a specific function: tests.fixtures.say_tool:say

  3. A module for a module based tool

  4. Instances of AgentTool (@tool decorated functions)
  5. Dictionaries with name/path keys (deprecated)
required

Returns:

Type Description
list[str]

List of tool names that were processed.

Source code in strands/tools/registry.py
def process_tools(self, tools: list[Any]) -> list[str]:
    """Process tools list.

    Process list of tools that can contain local file path string, module import path string,
    imported modules, @tool decorated functions, or instances of AgentTool.

    Args:
        tools: List of tool specifications. Can be:

            1. Local file path to a module based tool: `./path/to/module/tool.py`
            2. Module import path

                2.1. Path to a module based tool: `strands_tools.file_read`
                2.2. Path to a module with multiple AgentTool instances (@tool decorated):
                    `tests.fixtures.say_tool`
                2.3. Path to a module and a specific function: `tests.fixtures.say_tool:say`

            3. A module for a module based tool
            4. Instances of AgentTool (@tool decorated functions)
            5. Dictionaries with name/path keys (deprecated)


    Returns:
        List of tool names that were processed.
    """
    tool_names = []

    def add_tool(tool: Any) -> None:
        try:
            # String based tool
            # Can be a file path, a module path, or a module path with a targeted function. Examples:
            # './path/to/tool.py'
            # 'my.module.tool'
            # 'my.module.tool:tool_name'
            if isinstance(tool, str):
                tools = load_tool_from_string(tool)
                for a_tool in tools:
                    a_tool.mark_dynamic()
                    self.register_tool(a_tool)
                    tool_names.append(a_tool.tool_name)

            # Dictionary with name and path
            elif isinstance(tool, dict) and "name" in tool and "path" in tool:
                tools = load_tool_from_string(tool["path"])

                tool_found = False
                for a_tool in tools:
                    if a_tool.tool_name == tool["name"]:
                        a_tool.mark_dynamic()
                        self.register_tool(a_tool)
                        tool_names.append(a_tool.tool_name)
                        tool_found = True

                if not tool_found:
                    raise ValueError(f'Tool "{tool["name"]}" not found in "{tool["path"]}"')

            # Dictionary with path only
            elif isinstance(tool, dict) and "path" in tool:
                tools = load_tool_from_string(tool["path"])

                for a_tool in tools:
                    a_tool.mark_dynamic()
                    self.register_tool(a_tool)
                    tool_names.append(a_tool.tool_name)

            # Imported Python module
            elif hasattr(tool, "__file__") and inspect.ismodule(tool):
                # Extract the tool name from the module name
                module_tool_name = tool.__name__.split(".")[-1]

                tools = load_tools_from_module(tool, module_tool_name)
                for a_tool in tools:
                    self.register_tool(a_tool)
                    tool_names.append(a_tool.tool_name)

            # Case 5: AgentTools (which also covers @tool)
            elif isinstance(tool, AgentTool):
                self.register_tool(tool)
                tool_names.append(tool.tool_name)

            # Case 6: Nested iterable (list, tuple, etc.) - add each sub-tool
            elif isinstance(tool, Iterable) and not isinstance(tool, (str, bytes, bytearray)):
                for t in tool:
                    add_tool(t)

            # Case 7: ToolProvider
            elif isinstance(tool, ToolProvider):
                self._tool_providers.append(tool)
                tool.add_consumer(self._registry_id)

                async def get_tools() -> Sequence[AgentTool]:
                    return await tool.load_tools()

                provider_tools = run_async(get_tools)

                for provider_tool in provider_tools:
                    self.register_tool(provider_tool)
                    tool_names.append(provider_tool.tool_name)
            else:
                logger.warning("tool=<%s> | unrecognized tool specification", tool)

        except Exception as e:
            exception_str = str(e)
            logger.exception("tool_name=<%s> | failed to load tool", tool)
            raise ValueError(f"Failed to load tool {tool}: {exception_str}") from e

    for tool in tools:
        add_tool(tool)
    return tool_names
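
The nested-iterable branch of `add_tool` is a recursive flatten that treats strings and bytes as single items. The sketch below isolates that idea. `flatten_specs` is a hypothetical helper, and unlike the listing it also keeps every dict whole, which is a simplification made for this example.

```python
from collections.abc import Iterable
from typing import Any


def flatten_specs(specs: Iterable[Any]) -> list[Any]:
    """Flatten nested lists/tuples of tool specs, keeping strings and dicts whole."""
    flat: list[Any] = []

    def add(item: Any) -> None:
        # Strings, bytes, and dicts are iterable but are treated as single specs here.
        if isinstance(item, (str, bytes, bytearray, dict)):
            flat.append(item)
        elif isinstance(item, Iterable):
            for sub in item:
                add(sub)
        else:
            flat.append(item)

    for spec in specs:
        add(spec)
    return flat


flat = flatten_specs(["./tools/a.py", ["pkg.mod:func", ("pkg.other",)], {"path": "x.py"}])
```

Branch order matters in the listing: a dict is itself iterable, so a dict without a `path` key would fall through to the nested-iterable branch and be iterated key by key.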

register_dynamic_tool(tool)

Register a tool dynamically for temporary use.

Parameters:

Name Type Description Default
tool AgentTool

The tool to register dynamically

required

Raises:

Type Description
ValueError

If a tool with this name already exists

Source code in strands/tools/registry.py
def register_dynamic_tool(self, tool: AgentTool) -> None:
    """Register a tool dynamically for temporary use.

    Args:
        tool: The tool to register dynamically

    Raises:
        ValueError: If a tool with this name already exists
    """
    if tool.tool_name in self.registry or tool.tool_name in self.dynamic_tools:
        raise ValueError(f"Tool '{tool.tool_name}' already exists")

    self.dynamic_tools[tool.tool_name] = tool
    logger.debug("Registered dynamic tool: %s", tool.tool_name)

register_tool(tool)

Register a tool function with the given name.

Parameters:

Name Type Description Default
tool AgentTool

The tool to register.

required
Source code in strands/tools/registry.py
def register_tool(self, tool: AgentTool) -> None:
    """Register a tool function with the given name.

    Args:
        tool: The tool to register.
    """
    logger.debug(
        "tool_name=<%s>, tool_type=<%s>, is_dynamic=<%s> | registering tool",
        tool.tool_name,
        tool.tool_type,
        tool.is_dynamic,
    )

    # Check duplicate tool name, throw on duplicate tool names except if hot_reloading is enabled
    if tool.tool_name in self.registry and not tool.supports_hot_reload:
        raise ValueError(
            f"Tool name '{tool.tool_name}' already exists. Cannot register tools with exact same name."
        )

    # Check for normalized name conflicts (- vs _)
    if self.registry.get(tool.tool_name) is None:
        normalized_name = tool.tool_name.replace("-", "_")

        matching_tools = [
            tool_name
            for (tool_name, tool) in self.registry.items()
            if tool_name.replace("-", "_") == normalized_name
        ]

        if matching_tools:
            raise ValueError(
                f"Tool name '{tool.tool_name}' already exists as '{matching_tools[0]}'."
                " Cannot add a duplicate tool which differs by a '-' or '_'"
            )

    # Register in main registry
    self.registry[tool.tool_name] = tool

    # Register in dynamic tools if applicable
    if tool.is_dynamic:
        self.dynamic_tools[tool.tool_name] = tool

        if not tool.supports_hot_reload:
            logger.debug("tool_name=<%s>, tool_type=<%s> | skipping hot reloading", tool.tool_name, tool.tool_type)
            return

        logger.debug(
            "tool_name=<%s>, tool_registry=<%s>, dynamic_tools=<%s> | tool registered",
            tool.tool_name,
            list(self.registry.keys()),
            list(self.dynamic_tools.keys()),
        )
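
The normalized-name check treats `-` and `_` as equivalent, so `file-read` cannot be registered next to `file_read`. A minimal sketch of that comparison follows; `find_normalized_conflict` is a hypothetical helper, not an SDK function.

```python
from typing import Optional


def find_normalized_conflict(existing_names: list[str], new_name: str) -> Optional[str]:
    """Return an existing name that equals `new_name` once '-' is treated as '_'."""
    if new_name in existing_names:
        return None  # exact duplicates are handled by a separate check
    normalized = new_name.replace("-", "_")
    for name in existing_names:
        if name.replace("-", "_") == normalized:
            return name
    return None


registered = ["file_read", "http-get"]
conflict_a = find_normalized_conflict(registered, "file-read")
conflict_b = find_normalized_conflict(registered, "http_get")
conflict_c = find_normalized_conflict(registered, "calculator")
```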

reload_tool(tool_name)

Reload a specific tool module.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to reload.

required

Raises:

Type Description
FileNotFoundError

If the tool file cannot be found.

ImportError

If there are issues importing the tool module.

ValueError

If the tool specification is invalid or required components are missing.

Exception

For other errors during tool reloading.

Source code in strands/tools/registry.py
def reload_tool(self, tool_name: str) -> None:
    """Reload a specific tool module.

    Args:
        tool_name: Name of the tool to reload.

    Raises:
        FileNotFoundError: If the tool file cannot be found.
        ImportError: If there are issues importing the tool module.
        ValueError: If the tool specification is invalid or required components are missing.
        Exception: For other errors during tool reloading.
    """
    try:
        # Check for tool file
        logger.debug("tool_name=<%s> | searching directories for tool", tool_name)
        tools_dirs = self.get_tools_dirs()
        tool_path = None

        # Search for the tool file in all tool directories
        for tools_dir in tools_dirs:
            temp_path = tools_dir / f"{tool_name}.py"
            if temp_path.exists():
                tool_path = temp_path
                break

        if not tool_path:
            raise FileNotFoundError(f"No tool file found for: {tool_name}")

        logger.debug("tool_name=<%s> | reloading tool", tool_name)

        # Add tool directory to path temporarily
        tool_dir = str(tool_path.parent)
        sys.path.insert(0, tool_dir)
        try:
            # Load the module directly using spec
            spec = util.spec_from_file_location(tool_name, str(tool_path))
            if spec is None:
                raise ImportError(f"Could not load spec for {tool_name}")

            module = util.module_from_spec(spec)
            sys.modules[tool_name] = module

            if spec.loader is None:
                raise ImportError(f"Could not load {tool_name}")

            spec.loader.exec_module(module)

        finally:
            # Remove the temporary path
            sys.path.remove(tool_dir)

        # Look for function-based tools first
        try:
            function_tools = self._scan_module_for_tools(module)

            if function_tools:
                for function_tool in function_tools:
                    # Register the function-based tool
                    self.register_tool(function_tool)

                    # Update tool configuration if available
                    if self.tool_config is not None:
                        self._update_tool_config(self.tool_config, {"spec": function_tool.tool_spec})

                logger.debug("tool_name=<%s> | successfully reloaded function-based tool from module", tool_name)
                return
        except ImportError:
            logger.debug("function tool loader not available | falling back to traditional tools")

        # Fall back to traditional module-level tools
        if not hasattr(module, "TOOL_SPEC"):
            raise ValueError(
                f"Tool {tool_name} is missing TOOL_SPEC (neither at module level nor as a decorated function)"
            )

        expected_func_name = tool_name
        if not hasattr(module, expected_func_name):
            raise ValueError(f"Tool {tool_name} is missing {expected_func_name} function")

        tool_function = getattr(module, expected_func_name)
        if not callable(tool_function):
            raise ValueError(f"Tool {tool_name} function is not callable")

        # Validate tool spec
        self.validate_tool_spec(module.TOOL_SPEC)

        new_tool = PythonAgentTool(tool_name, module.TOOL_SPEC, tool_function)

        # Register the tool
        self.register_tool(new_tool)

        # Update tool configuration if available
        if self.tool_config is not None:
            self._update_tool_config(self.tool_config, {"spec": module.TOOL_SPEC})
        logger.debug("tool_name=<%s> | successfully reloaded tool", tool_name)

    except Exception:
        logger.exception("tool_name=<%s> | failed to reload tool", tool_name)
        raise
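
The reload path relies on `importlib.util.spec_from_file_location` and `module_from_spec` to execute the file again as a fresh module object. The sketch below shows that mechanism by itself. `load_module_from_file` and the module name `greet_demo_tool` are hypothetical, and the two file versions deliberately differ in length so that bytecode cached within the same second is not reused.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path


def load_module_from_file(name: str, path: Path):
    """Execute the file at `path` as a fresh module object registered under `name`."""
    spec = importlib.util.spec_from_file_location(name, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not load {name}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module  # replace any previously loaded version
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    tool_file = Path(tmp) / "greet_demo_tool.py"

    tool_file.write_text("def greet_demo_tool():\n    return 'v1'\n")
    first = load_module_from_file("greet_demo_tool", tool_file).greet_demo_tool()

    # Edit the file on disk, then load it again to pick up the change.
    tool_file.write_text("def greet_demo_tool():\n    return 'v2, edited'\n")
    second = load_module_from_file("greet_demo_tool", tool_file).greet_demo_tool()
```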

replace(new_tool)

Replace an existing tool with a new implementation.

This performs a swap of the tool implementation in the registry. The replacement takes effect on the next agent invocation.

Parameters:

Name Type Description Default
new_tool AgentTool

New tool implementation. Its name must match the tool being replaced.

required

Raises:

Type Description
ValueError

If the tool doesn't exist.

Source code in strands/tools/registry.py
def replace(self, new_tool: AgentTool) -> None:
    """Replace an existing tool with a new implementation.

    This performs a swap of the tool implementation in the registry.
    The replacement takes effect on the next agent invocation.

    Args:
        new_tool: New tool implementation. Its name must match the tool being replaced.

    Raises:
        ValueError: If the tool doesn't exist.
    """
    tool_name = new_tool.tool_name

    if tool_name not in self.registry:
        raise ValueError(f"Cannot replace tool '{tool_name}' - tool does not exist")

    # Update main registry
    self.registry[tool_name] = new_tool

    # Update dynamic_tools to match new tool's dynamic status
    if new_tool.is_dynamic:
        self.dynamic_tools[tool_name] = new_tool
    elif tool_name in self.dynamic_tools:
        del self.dynamic_tools[tool_name]
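The swap bookkeeping above can be sketched with a stripped-down stand-in. `FakeTool` and `MiniRegistry` are hypothetical names used only for illustration; they are not part of the SDK and only mirror the `registry` / `dynamic_tools` handling shown in the source.

```python
from dataclasses import dataclass, field


@dataclass
class FakeTool:
    """Hypothetical stand-in for AgentTool with only the fields replace() reads."""

    tool_name: str
    is_dynamic: bool = False


@dataclass
class MiniRegistry:
    """Hypothetical registry mirroring the bookkeeping in ToolRegistry.replace."""

    registry: dict[str, FakeTool] = field(default_factory=dict)
    dynamic_tools: dict[str, FakeTool] = field(default_factory=dict)

    def replace(self, new_tool: FakeTool) -> None:
        name = new_tool.tool_name
        if name not in self.registry:
            raise ValueError(f"Cannot replace tool '{name}' - tool does not exist")
        self.registry[name] = new_tool
        # Keep dynamic_tools in sync with the new tool's dynamic status.
        if new_tool.is_dynamic:
            self.dynamic_tools[name] = new_tool
        else:
            self.dynamic_tools.pop(name, None)


old = FakeTool("calculator", is_dynamic=True)
reg = MiniRegistry(registry={"calculator": old}, dynamic_tools={"calculator": old})
new = FakeTool("calculator", is_dynamic=False)
reg.replace(new)  # the non-dynamic replacement also drops the stale dynamic entry
```

Replacing a dynamic tool with a non-dynamic one removes it from `dynamic_tools`, so the two mappings do not drift apart.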

validate_tool_spec(tool_spec)

Validate tool specification against required schema.

Parameters:

Name Type Description Default
tool_spec ToolSpec

Tool specification to validate.

required

Raises:

Type Description
ValueError

If the specification is invalid.

Source code in strands/tools/registry.py
def validate_tool_spec(self, tool_spec: ToolSpec) -> None:
    """Validate tool specification against required schema.

    Args:
        tool_spec: Tool specification to validate.

    Raises:
        ValueError: If the specification is invalid.
    """
    required_fields = ["name", "description"]
    missing_fields = [field for field in required_fields if field not in tool_spec]
    if missing_fields:
        raise ValueError(f"Missing required fields in tool spec: {', '.join(missing_fields)}")

    if "json" not in tool_spec["inputSchema"]:
        # Convert direct schema to proper format
        json_schema = normalize_schema(tool_spec["inputSchema"])
        tool_spec["inputSchema"] = {"json": json_schema}
        return

    # Validate json schema fields
    json_schema = tool_spec["inputSchema"]["json"]

    # Ensure schema has required fields
    if "type" not in json_schema:
        json_schema["type"] = "object"
    if "properties" not in json_schema:
        json_schema["properties"] = {}
    if "required" not in json_schema:
        json_schema["required"] = []

    # Validate property definitions
    for prop_name, prop_def in json_schema.get("properties", {}).items():
        if not isinstance(prop_def, dict):
            json_schema["properties"][prop_name] = {
                "type": "string",
                "description": f"Property {prop_name}",
            }
            continue

        # It is expected that type and description are already included in referenced $def.
        if "$ref" in prop_def:
            continue

        has_composition = any(kw in prop_def for kw in _COMPOSITION_KEYWORDS)
        if "type" not in prop_def and not has_composition:
            prop_def["type"] = "string"
        if "description" not in prop_def:
            prop_def["description"] = f"Property {prop_name}"
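As a rough illustration of the defaulting rules in the property loop above, the following sketch applies them to a plain dictionary. `fill_property_defaults` is a hypothetical helper, and the contents of `COMPOSITION_KEYWORDS` are an assumption; the SDK's `_COMPOSITION_KEYWORDS` is not shown in this section.

```python
from typing import Any

# Assumed value for illustration only; the real _COMPOSITION_KEYWORDS lives in the SDK.
COMPOSITION_KEYWORDS = ("anyOf", "oneOf", "allOf", "not")


def fill_property_defaults(json_schema: dict[str, Any]) -> dict[str, Any]:
    """Apply the defaulting rules from the loop above to json_schema, in place."""
    json_schema.setdefault("type", "object")
    json_schema.setdefault("properties", {})
    json_schema.setdefault("required", [])

    for name, prop in json_schema["properties"].items():
        if not isinstance(prop, dict):
            # Non-dict definitions are replaced by a generic string property.
            json_schema["properties"][name] = {"type": "string", "description": f"Property {name}"}
            continue
        if "$ref" in prop:
            # Referenced definitions are expected to carry their own type and description.
            continue
        has_composition = any(kw in prop for kw in COMPOSITION_KEYWORDS)
        if "type" not in prop and not has_composition:
            prop["type"] = "string"
        prop.setdefault("description", f"Property {name}")
    return json_schema


schema = fill_property_defaults(
    {
        "properties": {
            "a": "not a dict",
            "b": {"anyOf": [{"type": "string"}, {"type": "integer"}]},
            "c": {"$ref": "#/$defs/C"},
            "d": {},
        }
    }
)
```

Properties that use a composition keyword keep their missing `type`, while `$ref` properties are left untouched.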

ToolWatcher

Watches tool directories for changes and reloads tools when they are modified.

Source code in strands/tools/watcher.py
class ToolWatcher:
    """Watches tool directories for changes and reloads tools when they are modified."""

    # This class uses class variables for the observer and handlers because watchdog allows only one Observer instance
    # per directory. Using class variables ensures that all ToolWatcher instances share a single Observer, with the
    # MasterChangeHandler routing file system events to the appropriate individual handlers for each registry. This
    # design pattern avoids conflicts when multiple tool registries are watching the same directories.

    _shared_observer = None
    _watched_dirs: set[str] = set()
    _observer_started = False
    _registry_handlers: dict[str, dict[int, "ToolWatcher.ToolChangeHandler"]] = {}

    def __init__(self, tool_registry: ToolRegistry) -> None:
        """Initialize a tool watcher for the given tool registry.

        Args:
            tool_registry: The tool registry to report changes to.
        """
        self.tool_registry = tool_registry
        self.start()

    class ToolChangeHandler(FileSystemEventHandler):
        """Handler for tool file changes."""

        def __init__(self, tool_registry: ToolRegistry) -> None:
            """Initialize a tool change handler.

            Args:
                tool_registry: The tool registry to update when tools change.
            """
            self.tool_registry = tool_registry

        def on_modified(self, event: Any) -> None:
            """Reload tool if file modification detected.

            Args:
                event: The file system event that triggered this handler.
            """
            if event.src_path.endswith(".py"):
                tool_path = Path(event.src_path)
                tool_name = tool_path.stem

                if tool_name not in ["__init__"]:
                    logger.debug("tool_name=<%s> | tool change detected", tool_name)
                    try:
                        self.tool_registry.reload_tool(tool_name)
                    except Exception as e:
                        logger.error("tool_name=<%s>, exception=<%s> | failed to reload tool", tool_name, str(e))

    class MasterChangeHandler(FileSystemEventHandler):
        """Master handler that delegates to all registered handlers."""

        def __init__(self, dir_path: str) -> None:
            """Initialize a master change handler for a specific directory.

            Args:
                dir_path: The directory path to watch.
            """
            self.dir_path = dir_path

        def on_modified(self, event: Any) -> None:
            """Delegate file modification events to all registered handlers.

            Args:
                event: The file system event that triggered this handler.
            """
            if event.src_path.endswith(".py"):
                tool_path = Path(event.src_path)
                tool_name = tool_path.stem

                if tool_name not in ["__init__"]:
                    # Delegate to all registered handlers for this directory
                    for handler in ToolWatcher._registry_handlers.get(self.dir_path, {}).values():
                        try:
                            handler.on_modified(event)
                        except Exception as e:
                            logger.error("exception=<%s> | handler error", str(e))

    def start(self) -> None:
        """Start watching all tools directories for changes."""
        # Initialize shared observer if not already done
        if ToolWatcher._shared_observer is None:
            ToolWatcher._shared_observer = Observer()

        # Create handler for this instance
        self.tool_change_handler = self.ToolChangeHandler(self.tool_registry)
        registry_id = id(self.tool_registry)

        # Get tools directories to watch
        tools_dirs = self.tool_registry.get_tools_dirs()

        for tools_dir in tools_dirs:
            dir_str = str(tools_dir)

            # Initialize the registry handlers dict for this directory if needed
            if dir_str not in ToolWatcher._registry_handlers:
                ToolWatcher._registry_handlers[dir_str] = {}

            # Store this handler with its registry id
            ToolWatcher._registry_handlers[dir_str][registry_id] = self.tool_change_handler

            # Schedule or update the master handler for this directory
            if dir_str not in ToolWatcher._watched_dirs:
                # First time seeing this directory, create a master handler
                master_handler = self.MasterChangeHandler(dir_str)
                ToolWatcher._shared_observer.schedule(master_handler, dir_str, recursive=False)
                ToolWatcher._watched_dirs.add(dir_str)
                logger.debug("tools_dir=<%s> | started watching tools directory", tools_dir)
            else:
                # Directory already being watched, just log it
                logger.debug("tools_dir=<%s> | directory already being watched", tools_dir)

        # Start the observer if not already started
        if not ToolWatcher._observer_started:
            ToolWatcher._shared_observer.start()
            ToolWatcher._observer_started = True
            logger.debug("tool directory watching initialized")
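The routing idea behind the shared observer (one master handler per directory, fanning out to one handler per registry) can be sketched without watchdog. The names `handlers`, `register`, and `dispatch` below are hypothetical and stand in for `_registry_handlers`, the registration in `start`, and `MasterChangeHandler.on_modified`.

```python
from collections import defaultdict
from pathlib import Path
from typing import Callable

# Hypothetical stand-in for ToolWatcher._registry_handlers:
# directory -> registry id -> callback receiving the tool name.
handlers: dict[str, dict[int, Callable[[str], None]]] = defaultdict(dict)


def register(dir_path: str, registry_id: int, callback: Callable[[str], None]) -> None:
    """Attach one registry's callback to a watched directory."""
    handlers[dir_path][registry_id] = callback


def dispatch(dir_path: str, src_path: str) -> int:
    """Route a modification event to every callback for dir_path; return the call count."""
    if not src_path.endswith(".py"):
        return 0
    tool_name = Path(src_path).stem
    if tool_name == "__init__":
        return 0
    called = 0
    for callback in handlers.get(dir_path, {}).values():
        callback(tool_name)
        called += 1
    return called


reloaded: list[tuple[str, str]] = []
register("/tools", 1, lambda name: reloaded.append(("registry-1", name)))
register("/tools", 2, lambda name: reloaded.append(("registry-2", name)))
dispatch("/tools", "/tools/calculator.py")  # both registries are notified once
```

Keying the inner mapping by registry id means that registering the same registry twice for a directory overwrites its handler instead of duplicating it.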

MasterChangeHandler

Bases: FileSystemEventHandler

Master handler that delegates to all registered handlers.

Source code in strands/tools/watcher.py
class MasterChangeHandler(FileSystemEventHandler):
    """Master handler that delegates to all registered handlers."""

    def __init__(self, dir_path: str) -> None:
        """Initialize a master change handler for a specific directory.

        Args:
            dir_path: The directory path to watch.
        """
        self.dir_path = dir_path

    def on_modified(self, event: Any) -> None:
        """Delegate file modification events to all registered handlers.

        Args:
            event: The file system event that triggered this handler.
        """
        if event.src_path.endswith(".py"):
            tool_path = Path(event.src_path)
            tool_name = tool_path.stem

            if tool_name not in ["__init__"]:
                # Delegate to all registered handlers for this directory
                for handler in ToolWatcher._registry_handlers.get(self.dir_path, {}).values():
                    try:
                        handler.on_modified(event)
                    except Exception as e:
                        logger.error("exception=<%s> | handler error", str(e))

__init__(dir_path)

Initialize a master change handler for a specific directory.

Parameters:

Name Type Description Default
dir_path str

The directory path to watch.

required
Source code in strands/tools/watcher.py
def __init__(self, dir_path: str) -> None:
    """Initialize a master change handler for a specific directory.

    Args:
        dir_path: The directory path to watch.
    """
    self.dir_path = dir_path

on_modified(event)

Delegate file modification events to all registered handlers.

Parameters:

Name Type Description Default
event Any

The file system event that triggered this handler.

required
Source code in strands/tools/watcher.py
def on_modified(self, event: Any) -> None:
    """Delegate file modification events to all registered handlers.

    Args:
        event: The file system event that triggered this handler.
    """
    if event.src_path.endswith(".py"):
        tool_path = Path(event.src_path)
        tool_name = tool_path.stem

        if tool_name not in ["__init__"]:
            # Delegate to all registered handlers for this directory
            for handler in ToolWatcher._registry_handlers.get(self.dir_path, {}).values():
                try:
                    handler.on_modified(event)
                except Exception as e:
                    logger.error("exception=<%s> | handler error", str(e))

ToolChangeHandler

Bases: FileSystemEventHandler

Handler for tool file changes.

Source code in strands/tools/watcher.py
class ToolChangeHandler(FileSystemEventHandler):
    """Handler for tool file changes."""

    def __init__(self, tool_registry: ToolRegistry) -> None:
        """Initialize a tool change handler.

        Args:
            tool_registry: The tool registry to update when tools change.
        """
        self.tool_registry = tool_registry

    def on_modified(self, event: Any) -> None:
        """Reload tool if file modification detected.

        Args:
            event: The file system event that triggered this handler.
        """
        if event.src_path.endswith(".py"):
            tool_path = Path(event.src_path)
            tool_name = tool_path.stem

            if tool_name not in ["__init__"]:
                logger.debug("tool_name=<%s> | tool change detected", tool_name)
                try:
                    self.tool_registry.reload_tool(tool_name)
                except Exception as e:
                    logger.error("tool_name=<%s>, exception=<%s> | failed to reload tool", tool_name, str(e))

__init__(tool_registry)

Initialize a tool change handler.

Parameters:

Name Type Description Default
tool_registry ToolRegistry

The tool registry to update when tools change.

required
Source code in strands/tools/watcher.py
def __init__(self, tool_registry: ToolRegistry) -> None:
    """Initialize a tool change handler.

    Args:
        tool_registry: The tool registry to update when tools change.
    """
    self.tool_registry = tool_registry

on_modified(event)

Reload tool if file modification detected.

Parameters:

Name Type Description Default
event Any

The file system event that triggered this handler.

required
Source code in strands/tools/watcher.py
def on_modified(self, event: Any) -> None:
    """Reload tool if file modification detected.

    Args:
        event: The file system event that triggered this handler.
    """
    if event.src_path.endswith(".py"):
        tool_path = Path(event.src_path)
        tool_name = tool_path.stem

        if tool_name not in ["__init__"]:
            logger.debug("tool_name=<%s> | tool change detected", tool_name)
            try:
                self.tool_registry.reload_tool(tool_name)
            except Exception as e:
                logger.error("tool_name=<%s>, exception=<%s> | failed to reload tool", tool_name, str(e))

__init__(tool_registry)

Initialize a tool watcher for the given tool registry.

Parameters:

Name Type Description Default
tool_registry ToolRegistry

The tool registry to report changes to.

required
Source code in strands/tools/watcher.py
def __init__(self, tool_registry: ToolRegistry) -> None:
    """Initialize a tool watcher for the given tool registry.

    Args:
        tool_registry: The tool registry to report changes to.
    """
    self.tool_registry = tool_registry
    self.start()

start()

Start watching all tools directories for changes.

Source code in strands/tools/watcher.py
def start(self) -> None:
    """Start watching all tools directories for changes."""
    # Initialize shared observer if not already done
    if ToolWatcher._shared_observer is None:
        ToolWatcher._shared_observer = Observer()

    # Create handler for this instance
    self.tool_change_handler = self.ToolChangeHandler(self.tool_registry)
    registry_id = id(self.tool_registry)

    # Get tools directories to watch
    tools_dirs = self.tool_registry.get_tools_dirs()

    for tools_dir in tools_dirs:
        dir_str = str(tools_dir)

        # Initialize the registry handlers dict for this directory if needed
        if dir_str not in ToolWatcher._registry_handlers:
            ToolWatcher._registry_handlers[dir_str] = {}

        # Store this handler with its registry id
        ToolWatcher._registry_handlers[dir_str][registry_id] = self.tool_change_handler

        # Schedule or update the master handler for this directory
        if dir_str not in ToolWatcher._watched_dirs:
            # First time seeing this directory, create a master handler
            master_handler = self.MasterChangeHandler(dir_str)
            ToolWatcher._shared_observer.schedule(master_handler, dir_str, recursive=False)
            ToolWatcher._watched_dirs.add(dir_str)
            logger.debug("tools_dir=<%s> | started watching tools directory", tools_dir)
        else:
            # Directory already being watched, just log it
            logger.debug("tools_dir=<%s> | directory already being watched", tools_dir)

    # Start the observer if not already started
    if not ToolWatcher._observer_started:
        ToolWatcher._shared_observer.start()
        ToolWatcher._observer_started = True
        logger.debug("tool directory watching initialized")

_BidiAgentLoop

Agent loop.

Attributes:

Name Type Description
_agent

BidiAgent instance to loop over.

_started

Flag indicating whether the agent loop has started.

_task_pool

Tracks active async tasks created in the loop.

_event_queue Queue

Queue of model and tool call events for the receiver.

_invocation_state dict[str, Any]

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

_send_gate

Gates the sending of events to the model. Blocks while the agent is resetting the model connection after a timeout.

Source code in strands/experimental/bidi/agent/loop.py
class _BidiAgentLoop:
    """Agent loop.

    Attributes:
        _agent: BidiAgent instance to loop.
        _started: Flag if agent loop has started.
        _task_pool: Track active async tasks created in loop.
        _event_queue: Queue model and tool call events for receiver.
        _invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.
        _send_gate: Gate the sending of events to the model.
            Blocks when agent is resetting the model connection after timeout.
    """

    def __init__(self, agent: "BidiAgent") -> None:
        """Initialize members of the agent loop.

        Note, before receiving events from the loop, the user must call `start`.

        Args:
            agent: Bidirectional agent to loop over.
        """
        self._agent = agent
        self._started = False
        self._task_pool = _TaskPool()
        self._event_queue: asyncio.Queue
        self._invocation_state: dict[str, Any]

        self._send_gate = asyncio.Event()

    async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
        """Start the agent loop.

        The agent model is started as part of this call.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Raises:
            RuntimeError: If loop already started.
        """
        if self._started:
            raise RuntimeError("loop already started | call stop before starting again")

        logger.debug("agent loop starting")
        await self._agent.hooks.invoke_callbacks_async(BidiBeforeInvocationEvent(agent=self._agent))

        await self._agent.model.start(
            system_prompt=self._agent.system_prompt,
            tools=self._agent.tool_registry.get_all_tool_specs(),
            messages=self._agent.messages,
        )

        self._event_queue = asyncio.Queue(maxsize=1)

        self._task_pool = _TaskPool()
        self._task_pool.create(self._run_model())

        self._invocation_state = invocation_state or {}
        self._send_gate.set()
        self._started = True

    async def stop(self) -> None:
        """Stop the agent loop."""
        logger.debug("agent loop stopping")

        self._started = False
        self._send_gate.clear()
        self._invocation_state = {}

        async def stop_tasks() -> None:
            await self._task_pool.cancel()

        async def stop_model() -> None:
            await self._agent.model.stop()

        try:
            await stop_all(stop_tasks, stop_model)
        finally:
            await self._agent.hooks.invoke_callbacks_async(BidiAfterInvocationEvent(agent=self._agent))

    async def send(self, event: BidiInputEvent | ToolResultEvent) -> None:
        """Send model event.

        Additionally, add text input to messages array.

        Args:
            event: User input event or tool result.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._started:
            raise RuntimeError("loop not started | call start before sending")

        if not self._send_gate.is_set():
            logger.debug("waiting for model send signal")
            await self._send_gate.wait()

        if isinstance(event, BidiTextInputEvent):
            message: Message = {"role": "user", "content": [{"text": event.text}]}
            await self._agent._append_messages(message)

        await self._agent.model.send(event)

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive model and tool call events.

        Returns:
            Model and tool call events.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._started:
            raise RuntimeError("loop not started | call start before receiving")

        while True:
            event = await self._event_queue.get()
            if isinstance(event, BidiModelTimeoutError):
                logger.debug("model timeout error received")
                yield BidiConnectionRestartEvent(event)
                await self._restart_connection(event)
                continue

            if isinstance(event, Exception):
                raise event

            # Check for graceful shutdown event
            if isinstance(event, BidiConnectionCloseEvent) and event.reason == "user_request":
                yield event
                break

            yield event

    async def _restart_connection(self, timeout_error: BidiModelTimeoutError) -> None:
        """Restart the model connection after timeout.

        Args:
            timeout_error: Timeout error reported by the model.
        """
        logger.debug("reseting model connection")

        self._send_gate.clear()

        await self._agent.hooks.invoke_callbacks_async(BidiBeforeConnectionRestartEvent(self._agent, timeout_error))

        restart_exception = None
        try:
            await self._agent.model.stop()
            await self._agent.model.start(
                self._agent.system_prompt,
                self._agent.tool_registry.get_all_tool_specs(),
                self._agent.messages,
                **timeout_error.restart_config,
            )
            self._task_pool.create(self._run_model())
        except Exception as exception:
            restart_exception = exception
        finally:
            await self._agent.hooks.invoke_callbacks_async(
                BidiAfterConnectionRestartEvent(self._agent, restart_exception)
            )

        self._send_gate.set()

    async def _run_model(self) -> None:
        """Task for running the model.

        Events are streamed through the event queue.
        """
        logger.debug("model task starting")

        try:
            async for event in self._agent.model.receive():
                await self._event_queue.put(event)

                if isinstance(event, BidiTranscriptStreamEvent):
                    if event["is_final"]:
                        message: Message = {"role": event["role"], "content": [{"text": event["text"]}]}
                        await self._agent._append_messages(message)

                elif isinstance(event, ToolUseStreamEvent):
                    tool_use = event["current_tool_use"]
                    self._task_pool.create(self._run_tool(tool_use))

                elif isinstance(event, BidiInterruptionEvent):
                    await self._agent.hooks.invoke_callbacks_async(
                        BidiInterruptionHookEvent(
                            agent=self._agent,
                            reason=event["reason"],
                            interrupted_response_id=event.get("interrupted_response_id"),
                        )
                    )

        except Exception as error:
            await self._event_queue.put(error)

    async def _run_tool(self, tool_use: ToolUse) -> None:
        """Task for running tool requested by the model using the tool executor.

        Args:
            tool_use: Tool use request from model.
        """
        logger.debug("tool_name=<%s> | tool execution starting", tool_use["name"])

        tool_results: list[ToolResult] = []

        invocation_state: dict[str, Any] = {
            **self._invocation_state,
            "agent": self._agent,
            "model": self._agent.model,
            "messages": self._agent.messages,
            "system_prompt": self._agent.system_prompt,
        }

        try:
            tool_events = self._agent.tool_executor._stream(
                self._agent,
                tool_use,
                tool_results,
                invocation_state,
                structured_output_context=None,
            )

            async for tool_event in tool_events:
                if isinstance(tool_event, ToolInterruptEvent):
                    self._agent._interrupt_state.deactivate()
                    interrupt_names = [interrupt.name for interrupt in tool_event.interrupts]
                    raise RuntimeError(f"interrupts={interrupt_names} | tool interrupts are not supported in bidi")

                await self._event_queue.put(tool_event)

            # Normal flow for all tools (including stop_conversation)
            tool_result_event = cast(ToolResultEvent, tool_event)

            tool_use_message: Message = {"role": "assistant", "content": [{"toolUse": tool_use}]}
            tool_result_message: Message = {"role": "user", "content": [{"toolResult": tool_result_event.tool_result}]}
            await self._agent._append_messages(tool_use_message, tool_result_message)

            await self._event_queue.put(ToolResultMessageEvent(tool_result_message))

            # Check for stop_conversation before sending to model
            if tool_use["name"] == "stop_conversation":
                logger.info("tool_name=<%s> | conversation stop requested, skipping model send", tool_use["name"])
                connection_id = getattr(self._agent.model, "_connection_id", "unknown")
                await self._event_queue.put(
                    BidiConnectionCloseEvent(connection_id=connection_id, reason="user_request")
                )
                return  # Skip the model send

            # Send result to model (all tools except stop_conversation)
            await self.send(tool_result_event)

        except Exception as error:
            await self._event_queue.put(error)
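The dictionary literal in `_run_tool` spreads the caller's `invocation_state` first and then sets `agent`, `model`, `messages`, and `system_prompt`, so a caller-supplied key with one of those names is overwritten. A minimal sketch of that merge order, using a hypothetical `build_tool_state` helper and plain strings in place of real agent and model objects:

```python
from typing import Any


def build_tool_state(user_state: dict[str, Any], agent: Any, model: Any) -> dict[str, Any]:
    """Merge caller-supplied state with loop-provided entries; later keys win."""
    return {
        **user_state,
        "agent": agent,  # overrides any "agent" key supplied by the caller
        "model": model,
    }


state = build_tool_state(
    {"user_id": "u-123", "agent": "caller value"},
    agent="loop agent",
    model="loop model",
)
```

Custom keys such as `user_id` pass through unchanged, while the loop-provided names take precedence.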

__init__(agent)

Initialize members of the agent loop.

Note: the user must call start before receiving events from the loop.

Parameters:

Name Type Description Default
agent BidiAgent

Bidirectional agent to loop over.

required
Source code in strands/experimental/bidi/agent/loop.py
def __init__(self, agent: "BidiAgent") -> None:
    """Initialize members of the agent loop.

    Note, before receiving events from the loop, the user must call `start`.

    Args:
        agent: Bidirectional agent to loop over.
    """
    self._agent = agent
    self._started = False
    self._task_pool = _TaskPool()
    self._event_queue: asyncio.Queue
    self._invocation_state: dict[str, Any]

    self._send_gate = asyncio.Event()

receive() async

Receive model and tool call events.

Returns:

Type Description
AsyncGenerator[BidiOutputEvent, None]

Model and tool call events.

Raises:

Type Description
RuntimeError

If start has not been called.

Source code in strands/experimental/bidi/agent/loop.py
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive model and tool call events.

    Returns:
        Model and tool call events.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._started:
        raise RuntimeError("loop not started | call start before receiving")

    while True:
        event = await self._event_queue.get()
        if isinstance(event, BidiModelTimeoutError):
            logger.debug("model timeout error received")
            yield BidiConnectionRestartEvent(event)
            await self._restart_connection(event)
            continue

        if isinstance(event, Exception):
            raise event

        # Check for graceful shutdown event
        if isinstance(event, BidiConnectionCloseEvent) and event.reason == "user_request":
            yield event
            break

        yield event
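The receive loop uses the queue for both events and errors: an exception object placed on the queue is re-raised in the consumer. The sketch below illustrates that pattern in isolation. `drain` and `collect` are hypothetical names, and the string `"close"` stands in for a `BidiConnectionCloseEvent` with reason `"user_request"`.

```python
import asyncio
from collections.abc import AsyncGenerator
from typing import Any


async def drain(queue: asyncio.Queue) -> AsyncGenerator[Any, None]:
    """Yield queued items, re-raise queued exceptions, stop after the close sentinel."""
    while True:
        item = await queue.get()
        if isinstance(item, Exception):
            raise item
        yield item
        if item == "close":  # hypothetical sentinel for a graceful shutdown event
            break


async def collect(items: list[Any]) -> tuple[list[Any], str]:
    """Feed items through a queue and report what the consumer saw and any error text."""
    queue: asyncio.Queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    seen: list[Any] = []
    error = ""
    try:
        async for item in drain(queue):
            seen.append(item)
    except ValueError as exc:
        error = str(exc)
    return seen, error


closed = asyncio.run(collect(["audio", "close", "ignored"]))
failed = asyncio.run(collect(["audio", ValueError("model failed")]))
```

Passing errors through the same queue keeps them ordered relative to the events that preceded them.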

send(event) async

Send model event.

Additionally, add text input to messages array.

Parameters:

Name Type Description Default
event BidiInputEvent | ToolResultEvent

User input event or tool result.

required

Raises:

Type Description
RuntimeError

If start has not been called.

Source code in strands/experimental/bidi/agent/loop.py
async def send(self, event: BidiInputEvent | ToolResultEvent) -> None:
    """Send model event.

    Additionally, add text input to messages array.

    Args:
        event: User input event or tool result.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._started:
        raise RuntimeError("loop not started | call start before sending")

    if not self._send_gate.is_set():
        logger.debug("waiting for model send signal")
        await self._send_gate.wait()

    if isinstance(event, BidiTextInputEvent):
        message: Message = {"role": "user", "content": [{"text": event.text}]}
        await self._agent._append_messages(message)

    await self._agent.model.send(event)
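The send gate is an `asyncio.Event`: senders wait while it is cleared and proceed once it is set. A minimal sketch of that gating, assuming a hypothetical `gated_send` coroutine that records payloads in a list instead of talking to a model:

```python
import asyncio


async def gated_send(gate: asyncio.Event, sent: list[str], payload: str) -> None:
    """Wait until the gate is open, then record the payload as sent."""
    await gate.wait()
    sent.append(payload)


async def demo() -> list[str]:
    gate = asyncio.Event()  # a new Event starts cleared, i.e. the gate is closed
    sent: list[str] = []
    task = asyncio.create_task(gated_send(gate, sent, "hello"))
    await asyncio.sleep(0)  # let the sender run until it blocks on the gate
    sent.append("restart finished")
    gate.set()  # reopen the gate, as the loop does after a connection restart
    await task
    return sent


result = asyncio.run(demo())
```

The sender's payload is recorded only after the gate is reopened, which is the ordering the loop relies on during a restart.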

start(invocation_state=None) async

Start the agent loop.

The agent model is started as part of this call.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Raises:

Type Description
RuntimeError

If loop already started.

Source code in strands/experimental/bidi/agent/loop.py
async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
    """Start the agent loop.

    The agent model is started as part of this call.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Raises:
        RuntimeError: If loop already started.
    """
    if self._started:
        raise RuntimeError("loop already started | call stop before starting again")

    logger.debug("agent loop starting")
    await self._agent.hooks.invoke_callbacks_async(BidiBeforeInvocationEvent(agent=self._agent))

    await self._agent.model.start(
        system_prompt=self._agent.system_prompt,
        tools=self._agent.tool_registry.get_all_tool_specs(),
        messages=self._agent.messages,
    )

    self._event_queue = asyncio.Queue(maxsize=1)

    self._task_pool = _TaskPool()
    self._task_pool.create(self._run_model())

    self._invocation_state = invocation_state or {}
    self._send_gate.set()
    self._started = True

stop() async

Stop the agent loop.

Source code in strands/experimental/bidi/agent/loop.py
async def stop(self) -> None:
    """Stop the agent loop."""
    logger.debug("agent loop stopping")

    self._started = False
    self._send_gate.clear()
    self._invocation_state = {}

    async def stop_tasks() -> None:
        await self._task_pool.cancel()

    async def stop_model() -> None:
        await self._agent.model.stop()

    try:
        await stop_all(stop_tasks, stop_model)
    finally:
        await self._agent.hooks.invoke_callbacks_async(BidiAfterInvocationEvent(agent=self._agent))
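
Because start raises if the loop is already running and send raises if it is not, callers typically pair start with stop in a try/finally. The sketch below assumes a hypothetical FakeLoop with the same start/send/stop shape; it is not the SDK class and skips the model, hooks, and task pool entirely.

```python
import asyncio


class FakeLoop:
    """Hypothetical stand-in exposing the same start/send/stop shape."""

    def __init__(self) -> None:
        self.started = False
        self.log: list[str] = []

    async def start(self) -> None:
        if self.started:
            raise RuntimeError("loop already started | call stop before starting again")
        self.started = True
        self.log.append("start")

    async def send(self, text: str) -> None:
        if not self.started:
            raise RuntimeError("loop not started | call start before sending")
        self.log.append(f"send:{text}")

    async def stop(self) -> None:
        self.started = False
        self.log.append("stop")


async def run_session(loop: FakeLoop, inputs: list[str]) -> None:
    await loop.start()
    try:
        for text in inputs:
            await loop.send(text)
    finally:
        # Always stop, even if a send fails, so the loop can be started again.
        await loop.stop()
```

After run_session returns, the stand-in loop is back in its stopped state and can be started again.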

_InterruptState dataclass

Track the state of interrupt events raised by the user.

Note, interrupt state is cleared after resuming.

Attributes:

Name Type Description
interrupts dict[str, Interrupt]

Interrupts raised by the user.

context dict[str, Any]

Additional context associated with an interrupt event.

activated bool

True if agent is in an interrupt state, False otherwise.

Source code in strands/interrupt.py
@dataclass
class _InterruptState:
    """Track the state of interrupt events raised by the user.

    Note, interrupt state is cleared after resuming.

    Attributes:
        interrupts: Interrupts raised by the user.
        context: Additional context associated with an interrupt event.
        activated: True if agent is in an interrupt state, False otherwise.
    """

    interrupts: dict[str, Interrupt] = field(default_factory=dict)
    context: dict[str, Any] = field(default_factory=dict)
    activated: bool = False

    def activate(self) -> None:
        """Activate the interrupt state."""
        self.activated = True

    def deactivate(self) -> None:
        """Deactivate the interrupt state.

        Interrupts and context are cleared.
        """
        self.interrupts = {}
        self.context = {}
        self.activated = False

    def resume(self, prompt: "AgentInput") -> None:
        """Configure the interrupt state if resuming from an interrupt event.

        Args:
            prompt: User responses if resuming from interrupt.

        Raises:
            TypeError: If in interrupt state but user did not provide responses.
            KeyError: If a response references an interrupt ID that is not tracked.
        """
        if not self.activated:
            return

        if not isinstance(prompt, list):
            raise TypeError(f"prompt_type={type(prompt)} | must resume from interrupt with list of interruptResponse's")

        invalid_types = [
            content_type for content in prompt for content_type in content if content_type != "interruptResponse"
        ]
        if invalid_types:
            raise TypeError(
                f"content_types=<{invalid_types}> | must resume from interrupt with list of interruptResponse's"
            )

        contents = cast(list["InterruptResponseContent"], prompt)
        for content in contents:
            interrupt_id = content["interruptResponse"]["interruptId"]
            interrupt_response = content["interruptResponse"]["response"]

            if interrupt_id not in self.interrupts:
                raise KeyError(f"interrupt_id=<{interrupt_id}> | no interrupt found")

            self.interrupts[interrupt_id].response = interrupt_response

        self.context["responses"] = contents

    def to_dict(self) -> dict[str, Any]:
        """Serialize to dict for session management."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "_InterruptState":
        """Initialize interrupt state from serialized interrupt state.

        Interrupt state can be serialized with the `to_dict` method.
        """
        return cls(
            interrupts={
                interrupt_id: Interrupt(**interrupt_data) for interrupt_id, interrupt_data in data["interrupts"].items()
            },
            context=data["context"],
            activated=data["activated"],
        )

activate()

Activate the interrupt state.

Source code in strands/interrupt.py
def activate(self) -> None:
    """Activate the interrupt state."""
    self.activated = True

deactivate()

Deactivate the interrupt state.

Interrupts and context are cleared.

Source code in strands/interrupt.py
def deactivate(self) -> None:
    """Deactivate the interrupt state.

    Interrupts and context are cleared.
    """
    self.interrupts = {}
    self.context = {}
    self.activated = False

from_dict(data) classmethod

Initialize interrupt state from serialized interrupt state.

Interrupt state can be serialized with the to_dict method.

Source code in strands/interrupt.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "_InterruptState":
    """Initialize interrupt state from serialized interrupt state.

    Interrupt state can be serialized with the `to_dict` method.
    """
    return cls(
        interrupts={
            interrupt_id: Interrupt(**interrupt_data) for interrupt_id, interrupt_data in data["interrupts"].items()
        },
        context=data["context"],
        activated=data["activated"],
    )
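
The reason from_dict rebuilds each Interrupt is that dataclasses.asdict converts nested dataclass instances into plain dicts. A minimal round-trip sketch follows, assuming hypothetical FakeInterrupt and FakeInterruptState stand-ins; the fields of the real Interrupt class may differ.

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class FakeInterrupt:
    """Hypothetical stand-in for Interrupt; field names are assumptions."""

    id: str
    name: str
    response: Any = None


@dataclass
class FakeInterruptState:
    """Hypothetical stand-in mirroring the three documented attributes."""

    interrupts: dict[str, FakeInterrupt] = field(default_factory=dict)
    context: dict[str, Any] = field(default_factory=dict)
    activated: bool = False

    def to_dict(self) -> dict[str, Any]:
        # asdict recurses into the dict values, so each FakeInterrupt becomes a dict.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "FakeInterruptState":
        # Rebuild the nested dataclass instances that asdict flattened.
        return cls(
            interrupts={key: FakeInterrupt(**value) for key, value in data["interrupts"].items()},
            context=data["context"],
            activated=data["activated"],
        )


state = FakeInterruptState(
    interrupts={"i1": FakeInterrupt(id="i1", name="approval")},
    activated=True,
)
restored = FakeInterruptState.from_dict(state.to_dict())
```

Here restored compares equal to state, while the intermediate dict holds only plain dicts and primitive values.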

resume(prompt)

Configure the interrupt state if resuming from an interrupt event.

Parameters:

Name Type Description Default
prompt AgentInput

User responses if resuming from interrupt.

required

Raises:

Type Description
TypeError

If in interrupt state but user did not provide responses.

KeyError

If a response references an interrupt ID that is not tracked.

Source code in strands/interrupt.py
def resume(self, prompt: "AgentInput") -> None:
    """Configure the interrupt state if resuming from an interrupt event.

    Args:
        prompt: User responses if resuming from interrupt.

    Raises:
        TypeError: If in interrupt state but user did not provide responses.
        KeyError: If a response references an interrupt ID that is not tracked.
    """
    if not self.activated:
        return

    if not isinstance(prompt, list):
        raise TypeError(f"prompt_type={type(prompt)} | must resume from interrupt with list of interruptResponse's")

    invalid_types = [
        content_type for content in prompt for content_type in content if content_type != "interruptResponse"
    ]
    if invalid_types:
        raise TypeError(
            f"content_types=<{invalid_types}> | must resume from interrupt with list of interruptResponse's"
        )

    contents = cast(list["InterruptResponseContent"], prompt)
    for content in contents:
        interrupt_id = content["interruptResponse"]["interruptId"]
        interrupt_response = content["interruptResponse"]["response"]

        if interrupt_id not in self.interrupts:
            raise KeyError(f"interrupt_id=<{interrupt_id}> | no interrupt found")

        self.interrupts[interrupt_id].response = interrupt_response

    self.context["responses"] = contents
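
The shape of prompt that resume accepts can be read off the checks above: a list of content blocks, each with a single interruptResponse key carrying interruptId and response. The sketch below reproduces those checks in a standalone function; validate_responses and FakeInterrupt are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeInterrupt:
    """Hypothetical stand-in for Interrupt; only the fields used here."""

    id: str
    name: str
    response: Any = None


def validate_responses(prompt: Any, interrupts: dict[str, FakeInterrupt]) -> None:
    """Mirror the checks in resume: list shape, content keys, known IDs."""
    if not isinstance(prompt, list):
        raise TypeError("must resume from interrupt with a list of interruptResponse blocks")

    # Every key of every content block must be "interruptResponse".
    invalid = [key for content in prompt for key in content if key != "interruptResponse"]
    if invalid:
        raise TypeError(f"content_types=<{invalid}> | unexpected content")

    for content in prompt:
        body = content["interruptResponse"]
        if body["interruptId"] not in interrupts:
            raise KeyError(f"interrupt_id=<{body['interruptId']}> | no interrupt found")
        # Attach the user's answer to the matching interrupt.
        interrupts[body["interruptId"]].response = body["response"]


pending = {"i1": FakeInterrupt(id="i1", name="approval")}
validate_responses(
    [{"interruptResponse": {"interruptId": "i1", "response": "yes"}}],
    pending,
)
```

A plain string or a text content block is rejected with TypeError, and an unknown interruptId is rejected with KeyError.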

to_dict()

Serialize to dict for session management.

Source code in strands/interrupt.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to dict for session management."""
    return asdict(self)

_TaskGroup

Shim of asyncio.TaskGroup for use in Python 3.10.

Attributes:

Name Type Description
_tasks set[Task]

Set of tasks in group.

Source code in strands/experimental/bidi/_async/_task_group.py
class _TaskGroup:
    """Shim of asyncio.TaskGroup for use in Python 3.10.

    Attributes:
        _tasks: Set of tasks in group.
    """

    _tasks: set[asyncio.Task]

    def create_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Create an async task and add to group.

        Returns:
            The created task.
        """
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        return task

    async def __aenter__(self) -> "_TaskGroup":
        """Set up a self-managed task group context."""
        self._tasks = set()
        return self

    async def __aexit__(self, *_: Any) -> None:
        """Execute tasks in group.

        The following execution rules are enforced:
        - The context stops executing all tasks if at least one task raises an Exception or the context is cancelled.
        - The context re-raises Exceptions to the caller.
        - The context re-raises CancelledErrors to the caller only if the context itself was cancelled.
        """
        try:
            pending_tasks = self._tasks
            while pending_tasks:
                done_tasks, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_EXCEPTION)

                if any(exception := done_task.exception() for done_task in done_tasks if not done_task.cancelled()):
                    break

            else:  # all tasks completed/cancelled successfully
                return

            for pending_task in pending_tasks:
                pending_task.cancel()

            await asyncio.gather(*pending_tasks, return_exceptions=True)
            raise cast(BaseException, exception)

        except asyncio.CancelledError:  # context itself was cancelled
            for task in self._tasks:
                task.cancel()

            await asyncio.gather(*self._tasks, return_exceptions=True)
            raise

        finally:
            self._tasks = set()

__aenter__() async

Set up a self-managed task group context.

Source code in strands/experimental/bidi/_async/_task_group.py
async def __aenter__(self) -> "_TaskGroup":
    """Set up a self-managed task group context."""
    self._tasks = set()
    return self

__aexit__(*_) async

Execute tasks in group.

The following execution rules are enforced:

  • The context stops executing all tasks if at least one task raises an Exception or the context is cancelled.
  • The context re-raises Exceptions to the caller.
  • The context re-raises CancelledErrors to the caller only if the context itself was cancelled.

Source code in strands/experimental/bidi/_async/_task_group.py
async def __aexit__(self, *_: Any) -> None:
    """Execute tasks in group.

    The following execution rules are enforced:
    - The context stops executing all tasks if at least one task raises an Exception or the context is cancelled.
    - The context re-raises Exceptions to the caller.
    - The context re-raises CancelledErrors to the caller only if the context itself was cancelled.
    """
    try:
        pending_tasks = self._tasks
        while pending_tasks:
            done_tasks, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_EXCEPTION)

            if any(exception := done_task.exception() for done_task in done_tasks if not done_task.cancelled()):
                break

        else:  # all tasks completed/cancelled successfully
            return

        for pending_task in pending_tasks:
            pending_task.cancel()

        await asyncio.gather(*pending_tasks, return_exceptions=True)
        raise cast(BaseException, exception)

    except asyncio.CancelledError:  # context itself was cancelled
        for task in self._tasks:
            task.cancel()

        await asyncio.gather(*self._tasks, return_exceptions=True)
        raise

    finally:
        self._tasks = set()
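
The fail-fast rule can be observed with a small standalone helper built on asyncio.wait and FIRST_EXCEPTION. This is a sketch under the assumption that a plain function is enough to show the behaviour; run_all is a hypothetical name, and it does not handle cancellation of the caller the way __aexit__ does.

```python
import asyncio
from typing import Any, Coroutine


async def run_all(*coros: Coroutine[Any, Any, Any]) -> None:
    """Await coroutines together; on the first failure cancel the rest and re-raise."""
    pending = {asyncio.create_task(coro) for coro in coros}
    failure = None

    while pending and failure is None:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in done:
            if not task.cancelled() and task.exception() is not None:
                failure = task.exception()
                break

    if failure is None:
        return  # every task completed or was cancelled without raising

    for task in pending:
        task.cancel()
    # Wait for the cancellations to land before surfacing the original error.
    await asyncio.gather(*pending, return_exceptions=True)
    raise failure


async def demo() -> tuple[str, bool]:
    cancelled = False

    async def slow() -> None:
        nonlocal cancelled
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            cancelled = True
            raise

    async def boom() -> None:
        raise ValueError("boom")

    try:
        await run_all(slow(), boom())
    except ValueError as error:
        return str(error), cancelled
    return "no error", cancelled
```

In demo, the failing task's ValueError reaches the caller and the long-running sibling is cancelled rather than left running.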

create_task(coro)

Create an async task and add to group.

Returns:

Type Description
Task

The created task.

Source code in strands/experimental/bidi/_async/_task_group.py
def create_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    """Create an async task and add to group.

    Returns:
        The created task.
    """
    task = asyncio.create_task(coro)
    self._tasks.add(task)
    return task

_ToolCaller

Call tool as a function.

Source code in strands/tools/_caller.py
class _ToolCaller:
    """Call tool as a function."""

    def __init__(self, agent: "Agent | BidiAgent") -> None:
        """Initialize instance.

        Args:
            agent: Agent reference that will accept tool results.
        """
        # WARNING: Do not add any other member variables or methods as this could result in a name conflict with
        #          agent tools and thus break their execution.
        self._agent = agent

    def __getattr__(self, name: str) -> Callable[..., Any]:
        """Call tool as a function.

        This method enables the method-style interface (e.g., `agent.tool.tool_name(param="value")`).
        It matches underscore-separated names to hyphenated tool names (e.g., 'some_thing' matches 'some-thing').

        Args:
            name: The name of the attribute (tool) being accessed.

        Returns:
            A function that when called will execute the named tool.

        Raises:
            AttributeError: If no tool with the given name exists or if multiple tools match the given name.
        """

        def caller(
            user_message_override: str | None = None,
            record_direct_tool_call: bool | None = None,
            **kwargs: Any,
        ) -> Any:
            """Call a tool directly by name.

            Args:
                user_message_override: Optional custom message to record instead of default
                record_direct_tool_call: Whether to record direct tool calls in message history. Overrides class
                    attribute if provided.
                **kwargs: Keyword arguments to pass to the tool.

            Returns:
                The result returned by the tool.

            Raises:
                AttributeError: If the tool doesn't exist.
            """
            if self._agent._interrupt_state.activated:
                raise RuntimeError("cannot directly call tool during interrupt")

            if record_direct_tool_call is not None:
                should_record_direct_tool_call = record_direct_tool_call
            else:
                should_record_direct_tool_call = self._agent.record_direct_tool_call

            should_lock = should_record_direct_tool_call

            from ..agent import Agent  # Locally imported to avoid circular reference

            acquired_lock = (
                should_lock
                and isinstance(self._agent, Agent)
                and self._agent._invocation_lock.acquire_lock(blocking=False)
            )
            if should_lock and not acquired_lock:
                raise ConcurrencyException(
                    "Direct tool call cannot be made while the agent is in the middle of an invocation. "
                    "Set record_direct_tool_call=False to allow direct tool calls during agent invocation."
                )

            try:
                normalized_name = self._find_normalized_tool_name(name)

                # Create unique tool ID and set up the tool request
                tool_id = f"tooluse_{name}_{random.randint(100000000, 999999999)}"
                tool_use: ToolUse = {
                    "toolUseId": tool_id,
                    "name": normalized_name,
                    "input": kwargs.copy(),
                }
                tool_results: list[ToolResult] = []
                invocation_state = kwargs

                async def acall() -> ToolResult:
                    async for event in ToolExecutor._stream(self._agent, tool_use, tool_results, invocation_state):
                        if isinstance(event, ToolInterruptEvent):
                            self._agent._interrupt_state.deactivate()
                            raise RuntimeError("cannot raise interrupt in direct tool call")

                    tool_result = tool_results[0]

                    if should_record_direct_tool_call:
                        # Create a record of this tool execution in the message history
                        await self._record_tool_execution(tool_use, tool_result, user_message_override)

                    return tool_result

                tool_result = run_async(acall)

                # TODO: https://github.com/strands-agents/sdk-python/issues/1311
                if isinstance(self._agent, Agent):
                    self._agent.conversation_manager.apply_management(self._agent)

                return tool_result

            finally:
                if acquired_lock and isinstance(self._agent, Agent):
                    self._agent._invocation_lock.release()

        return caller

    def _find_normalized_tool_name(self, name: str) -> str:
        """Lookup the tool represented by name, replacing characters with underscores as necessary."""
        tool_registry = self._agent.tool_registry.registry

        if tool_registry.get(name):
            return name

        # If the desired name contains underscores, it might be a placeholder for characters that can't be
        # represented as python identifiers but are valid as tool names, such as dashes. In that case, find
        # all tools that can be represented with the normalized name
        if "_" in name:
            filtered_tools = [
                tool_name for (tool_name, tool) in tool_registry.items() if tool_name.replace("-", "_") == name
            ]

            # The registry itself defends against similar names, so we can just take the first match
            if filtered_tools:
                return filtered_tools[0]

        raise AttributeError(f"Tool '{name}' not found")

    async def _record_tool_execution(
        self,
        tool: ToolUse,
        tool_result: ToolResult,
        user_message_override: str | None,
    ) -> None:
        """Record a tool execution in the message history.

        Creates a sequence of messages that represent the tool execution:

        1. A user message describing the tool call
        2. An assistant message with the tool use
        3. A user message with the tool result
        4. An assistant message acknowledging the tool call

        Args:
            tool: The tool call information.
            tool_result: The result returned by the tool.
            user_message_override: Optional custom message to include.
        """
        # Filter tool input parameters to only include those defined in tool spec
        filtered_input = self._filter_tool_parameters_for_recording(tool["name"], tool["input"])

        # Create user message describing the tool call
        input_parameters = json.dumps(filtered_input, default=lambda o: f"<<non-serializable: {type(o).__qualname__}>>")

        user_msg_content: list[ContentBlock] = [
            {"text": (f"agent.tool.{tool['name']} direct tool call.\nInput parameters: {input_parameters}\n")}
        ]

        # Add override message if provided
        if user_message_override:
            user_msg_content.insert(0, {"text": f"{user_message_override}\n"})

        # Create filtered tool use for message history
        filtered_tool: ToolUse = {
            "toolUseId": tool["toolUseId"],
            "name": tool["name"],
            "input": filtered_input,
        }

        # Create the message sequence
        user_msg: Message = {
            "role": "user",
            "content": user_msg_content,
        }
        tool_use_msg: Message = {
            "role": "assistant",
            "content": [{"toolUse": filtered_tool}],
        }
        tool_result_msg: Message = {
            "role": "user",
            "content": [{"toolResult": tool_result}],
        }
        assistant_msg: Message = {
            "role": "assistant",
            "content": [{"text": f"agent.tool.{tool['name']} was called."}],
        }

        # Add to message history
        await self._agent._append_messages(user_msg, tool_use_msg, tool_result_msg, assistant_msg)

    def _filter_tool_parameters_for_recording(self, tool_name: str, input_params: dict[str, Any]) -> dict[str, Any]:
        """Filter input parameters to only include those defined in the tool specification.

        Args:
            tool_name: Name of the tool to get specification for
            input_params: Original input parameters

        Returns:
            Filtered parameters containing only those defined in tool spec
        """
        all_tools_config = self._agent.tool_registry.get_all_tools_config()
        tool_spec = all_tools_config.get(tool_name)

        if not tool_spec or "inputSchema" not in tool_spec:
            return input_params.copy()

        properties = tool_spec["inputSchema"]["json"]["properties"]
        return {k: v for k, v in input_params.items() if k in properties}
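
The parameter filtering used when recording a direct tool call can be tried on its own. The sketch below assumes a tool spec shaped like the one read by _filter_tool_parameters_for_recording (inputSchema, then json, then properties); filter_params and the sample spec are hypothetical.

```python
from typing import Any, Optional


def filter_params(tool_spec: Optional[dict[str, Any]], params: dict[str, Any]) -> dict[str, Any]:
    """Keep only the parameters declared in the spec's input schema."""
    if not tool_spec or "inputSchema" not in tool_spec:
        # Without a schema there is nothing to filter against; return a copy.
        return params.copy()

    properties = tool_spec["inputSchema"]["json"]["properties"]
    return {key: value for key, value in params.items() if key in properties}


# Hypothetical spec for a weather tool that declares a single "city" parameter.
weather_spec = {"inputSchema": {"json": {"properties": {"city": {"type": "string"}}}}}
recorded = filter_params(weather_spec, {"city": "Paris", "session_id": "abc"})
```

Only city survives in recorded, so extra keyword arguments passed to the tool call do not end up in the message history.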

__getattr__(name)

Call tool as a function.

This method enables the method-style interface (e.g., agent.tool.tool_name(param="value")). It matches underscore-separated names to hyphenated tool names (e.g., 'some_thing' matches 'some-thing').

Parameters:

Name Type Description Default
name str

The name of the attribute (tool) being accessed.

required

Returns:

Type Description
Callable[..., Any]

A function that when called will execute the named tool.

Raises:

Type Description
AttributeError

If no tool with the given name exists or if multiple tools match the given name.

Source code in strands/tools/_caller.py
def __getattr__(self, name: str) -> Callable[..., Any]:
    """Call tool as a function.

    This method enables the method-style interface (e.g., `agent.tool.tool_name(param="value")`).
    It matches underscore-separated names to hyphenated tool names (e.g., 'some_thing' matches 'some-thing').

    Args:
        name: The name of the attribute (tool) being accessed.

    Returns:
        A function that when called will execute the named tool.

    Raises:
        AttributeError: If no tool with the given name exists or if multiple tools match the given name.
    """

    def caller(
        user_message_override: str | None = None,
        record_direct_tool_call: bool | None = None,
        **kwargs: Any,
    ) -> Any:
        """Call a tool directly by name.

        Args:
            user_message_override: Optional custom message to record instead of default
            record_direct_tool_call: Whether to record direct tool calls in message history. Overrides class
                attribute if provided.
            **kwargs: Keyword arguments to pass to the tool.

        Returns:
            The result returned by the tool.

        Raises:
            AttributeError: If the tool doesn't exist.
        """
        if self._agent._interrupt_state.activated:
            raise RuntimeError("cannot directly call tool during interrupt")

        if record_direct_tool_call is not None:
            should_record_direct_tool_call = record_direct_tool_call
        else:
            should_record_direct_tool_call = self._agent.record_direct_tool_call

        should_lock = should_record_direct_tool_call

        from ..agent import Agent  # Locally imported to avoid circular reference

        acquired_lock = (
            should_lock
            and isinstance(self._agent, Agent)
            and self._agent._invocation_lock.acquire_lock(blocking=False)
        )
        if should_lock and not acquired_lock:
            raise ConcurrencyException(
                "Direct tool call cannot be made while the agent is in the middle of an invocation. "
                "Set record_direct_tool_call=False to allow direct tool calls during agent invocation."
            )

        try:
            normalized_name = self._find_normalized_tool_name(name)

            # Create unique tool ID and set up the tool request
            tool_id = f"tooluse_{name}_{random.randint(100000000, 999999999)}"
            tool_use: ToolUse = {
                "toolUseId": tool_id,
                "name": normalized_name,
                "input": kwargs.copy(),
            }
            tool_results: list[ToolResult] = []
            invocation_state = kwargs

            async def acall() -> ToolResult:
                async for event in ToolExecutor._stream(self._agent, tool_use, tool_results, invocation_state):
                    if isinstance(event, ToolInterruptEvent):
                        self._agent._interrupt_state.deactivate()
                        raise RuntimeError("cannot raise interrupt in direct tool call")

                tool_result = tool_results[0]

                if should_record_direct_tool_call:
                    # Create a record of this tool execution in the message history
                    await self._record_tool_execution(tool_use, tool_result, user_message_override)

                return tool_result

            tool_result = run_async(acall)

            # TODO: https://github.com/strands-agents/sdk-python/issues/1311
            if isinstance(self._agent, Agent):
                self._agent.conversation_manager.apply_management(self._agent)

            return tool_result

        finally:
            if acquired_lock and isinstance(self._agent, Agent):
                self._agent._invocation_lock.release()

    return caller
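
The underscore-to-hyphen matching can be sketched as a standalone lookup. This assumes the registry is a plain dict keyed by tool name; find_tool_name is a hypothetical helper, and it tests membership with the in operator where the SDK uses registry.get.

```python
def find_tool_name(name: str, registry: dict[str, object]) -> str:
    """Resolve an attribute-style name to a registered tool name."""
    if name in registry:
        return name

    # Underscores may stand in for hyphens, which are not valid in Python identifiers.
    if "_" in name:
        matches = [tool_name for tool_name in registry if tool_name.replace("-", "_") == name]
        if matches:
            return matches[0]

    raise AttributeError(f"Tool '{name}' not found")


# Hypothetical registry with one hyphenated tool name.
registry = {"some-thing": object(), "calculator": object()}
resolved = find_tool_name("some_thing", registry)
```

With this registry, resolved is the hyphenated name, exact names are returned unchanged, and unknown names raise AttributeError.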

__init__(agent)

Initialize instance.

Parameters:

Name Type Description Default
agent Agent | BidiAgent

Agent reference that will accept tool results.

required

Source code in strands/tools/_caller.py
def __init__(self, agent: "Agent | BidiAgent") -> None:
    """Initialize instance.

    Args:
        agent: Agent reference that will accept tool results.
    """
    # WARNING: Do not add any other member variables or methods as this could result in a name conflict with
    #          agent tools and thus break their execution.
    self._agent = agent

stop_all(*funcs) async

Call all stops in sequence and aggregate errors.

A failure in one stop call will not block subsequent stop calls.

Parameters:

Name Type Description Default
funcs Callable[..., Awaitable[None]]

Stop functions to call in sequence.

()

Raises:

Type Description
RuntimeError

If any stop function raises an exception.

Source code in strands/experimental/bidi/_async/__init__.py
async def stop_all(*funcs: Callable[..., Awaitable[None]]) -> None:
    """Call all stops in sequence and aggregate errors.

    A failure in one stop call will not block subsequent stop calls.

    Args:
        funcs: Stop functions to call in sequence.

    Raises:
        RuntimeError: If any stop function raises an exception.
    """
    exceptions = []
    for func in funcs:
        try:
            await func()
        except Exception as exception:
            exceptions.append({"func_name": func.__name__, "exception": repr(exception)})

    if exceptions:
        raise RuntimeError(f"exceptions={exceptions} | failed stop sequence")
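
The effect of aggregating errors is that a failing stop function does not prevent later ones from running. The following sketch re-implements the listing above under the hypothetical name stop_all_sketch so it can run without the SDK installed; the two stop functions are illustrative only.

```python
import asyncio
from typing import Awaitable, Callable


async def stop_all_sketch(*funcs: Callable[..., Awaitable[None]]) -> None:
    """Local copy of the documented behaviour, for illustration only."""
    exceptions = []
    for func in funcs:
        try:
            await func()
        except Exception as exception:
            exceptions.append({"func_name": func.__name__, "exception": repr(exception)})

    if exceptions:
        raise RuntimeError(f"exceptions={exceptions} | failed stop sequence")


async def demo() -> tuple[list[str], str]:
    calls: list[str] = []

    async def stop_tasks() -> None:
        calls.append("tasks")
        raise ValueError("task pool failed")

    async def stop_model() -> None:
        calls.append("model")

    try:
        await stop_all_sketch(stop_tasks, stop_model)
    except RuntimeError as error:
        return calls, str(error)
    return calls, ""
```

In demo, stop_model still runs after stop_tasks fails, and the single RuntimeError names the function that failed.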