
strands.experimental.bidi.agent.loop

Agent loop.

The agent loop handles the events received from the model and executes tools when given a tool use request.

BidiInputEvent = BidiTextInputEvent | BidiAudioInputEvent | BidiImageInputEvent module-attribute

Union of different bidi input event types.

BidiOutputEvent = BidiConnectionStartEvent | BidiConnectionRestartEvent | BidiResponseStartEvent | BidiAudioStreamEvent | BidiTranscriptStreamEvent | BidiInterruptionEvent | BidiResponseCompleteEvent | BidiUsageEvent | BidiConnectionCloseEvent | BidiErrorEvent | ToolUseStreamEvent module-attribute

Union of different bidi output event types.
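
A minimal sketch of dispatching on the BidiOutputEvent union when consuming agent output. The import path is assumed from the source file locations shown on this page.

# Sketch of branching on concrete output event types (import path assumed).
from strands.experimental.bidi.types.events import (
    BidiConnectionCloseEvent,
    BidiInterruptionEvent,
    BidiTranscriptStreamEvent,
)

def handle_output(event) -> None:
    """Branch on the concrete output event type."""
    if isinstance(event, BidiTranscriptStreamEvent):
        print(f"{event.role}: {event.text}")
    elif isinstance(event, BidiInterruptionEvent):
        print(f"interrupted: {event.reason}")
    elif isinstance(event, BidiConnectionCloseEvent):
        print(f"connection closed: {event.reason}")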

logger = logging.getLogger(__name__) module-attribute

BidiAfterConnectionRestartEvent dataclass

Bases: BidiHookEvent

Event emitted after agent attempts to restart model connection after timeout.

Attributes:

Name Type Description
exception Exception | None

Populated if an exception was raised during connection restart. A None value means the restart was successful.

Source code in strands/experimental/hooks/events.py
@dataclass
class BidiAfterConnectionRestartEvent(BidiHookEvent):
    """Event emitted after agent attempts to restart model connection after timeout.

    Attributes:
        exception: Populated if exception was raised during connection restart.
            None value means the restart was successful.
    """

    exception: Exception | None = None
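
A sketch of a hook provider that observes connection restarts via BidiBeforeConnectionRestartEvent and BidiAfterConnectionRestartEvent. The register_hooks signature and the registry's add_callback method are assumptions; only add_hook and invoke_callbacks appear on this page.

# Sketch of observing connection restarts; registration API is assumed.
from strands.experimental.hooks.events import (
    BidiAfterConnectionRestartEvent,
    BidiBeforeConnectionRestartEvent,
)

class RestartLogger:
    def register_hooks(self, registry, **kwargs) -> None:  # assumed signature
        registry.add_callback(BidiBeforeConnectionRestartEvent, self.on_before)
        registry.add_callback(BidiAfterConnectionRestartEvent, self.on_after)

    def on_before(self, event: BidiBeforeConnectionRestartEvent) -> None:
        print(f"restarting after timeout: {event.timeout_error}")

    def on_after(self, event: BidiAfterConnectionRestartEvent) -> None:
        if event.exception is None:
            print("restart succeeded")
        else:
            print(f"restart failed: {event.exception}")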

BidiAfterInvocationEvent dataclass

Bases: BidiHookEvent

Event triggered when BidiAgent ends a streaming session.

This event is fired after the BidiAgent has completed a streaming session, regardless of whether it completed successfully or encountered an error. Hook providers can use this event for cleanup, logging, or state persistence.

Note: This event uses reverse callback ordering, meaning callbacks registered later will be invoked first during cleanup.

This event is triggered at the end of agent.stop().

Source code in strands/experimental/hooks/events.py
@dataclass
class BidiAfterInvocationEvent(BidiHookEvent):
    """Event triggered when BidiAgent ends a streaming session.

    This event is fired after the BidiAgent has completed a streaming session,
    regardless of whether it completed successfully or encountered an error.
    Hook providers can use this event for cleanup, logging, or state persistence.

    Note: This event uses reverse callback ordering, meaning callbacks registered
    later will be invoked first during cleanup.

    This event is triggered at the end of agent.stop().
    """

    @property
    def should_reverse_callbacks(self) -> bool:
        """True to invoke callbacks in reverse order."""
        return True

should_reverse_callbacks property

True to invoke callbacks in reverse order.
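
A sketch of session setup and cleanup using the invocation lifecycle events. The add_callback registration method is an assumption not shown on this page.

# Sketch of pairing before/after invocation callbacks (registration API assumed).
from strands.experimental.hooks.events import (
    BidiAfterInvocationEvent,
    BidiBeforeInvocationEvent,
)

class SessionTimer:
    def register_hooks(self, registry, **kwargs) -> None:  # assumed signature
        registry.add_callback(BidiBeforeInvocationEvent, lambda e: print("session starting"))
        # After-invocation callbacks run in reverse registration order (see above).
        registry.add_callback(BidiAfterInvocationEvent, lambda e: print("session ended"))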

BidiAgent

Agent for bidirectional streaming conversations.

Enables real-time audio and text interaction with AI models through persistent connections. Supports concurrent tool execution and interruption handling.
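
A minimal usage sketch. The BidiAgent import path is assumed from the source file location shown below; passing no model falls back to the default BidiNovaSonicModel per __init__.

import asyncio
from strands.experimental.bidi.agent.agent import BidiAgent  # assumed import path

async def main() -> None:
    # model=None falls back to the default BidiNovaSonicModel (see __init__ below)
    async with BidiAgent(system_prompt="Be brief.") as agent:
        await agent.send("Hello")
        async for event in agent.receive():
            print(event)  # audio, transcript, tool, and connection events

asyncio.run(main())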

Source code in strands/experimental/bidi/agent/agent.py
class BidiAgent:
    """Agent for bidirectional streaming conversations.

    Enables real-time audio and text interaction with AI models through persistent
    connections. Supports concurrent tool execution and interruption handling.
    """

    def __init__(
        self,
        model: BidiModel | str | None = None,
        tools: list[str | AgentTool | ToolProvider] | None = None,
        system_prompt: str | None = None,
        messages: Messages | None = None,
        record_direct_tool_call: bool = True,
        load_tools_from_directory: bool = False,
        agent_id: str | None = None,
        name: str | None = None,
        description: str | None = None,
        hooks: list[HookProvider] | None = None,
        state: AgentState | dict | None = None,
        session_manager: "SessionManager | None" = None,
        tool_executor: ToolExecutor | None = None,
        **kwargs: Any,
    ):
        """Initialize bidirectional agent.

        Args:
            model: BidiModel instance, string model_id, or None for default detection.
            tools: Optional list of tools with flexible format support.
            system_prompt: Optional system prompt for conversations.
            messages: Optional conversation history to initialize with.
            record_direct_tool_call: Whether to record direct tool calls in message history.
            load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
            agent_id: Optional ID for the agent, useful for connection management and multi-agent scenarios.
            name: Name of the Agent.
            description: Description of what the Agent does.
            hooks: Optional list of hook providers to register for lifecycle events.
            state: Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
            session_manager: Manager for handling agent sessions including conversation history and state.
                If provided, enables session-based persistence and state management.
            tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
            **kwargs: Additional configuration for future extensibility.

        Raises:
            ValueError: If model configuration is invalid or state is invalid type.
            TypeError: If model type is unsupported.
        """
        self.model = (
            BidiNovaSonicModel()
            if not model
            else BidiNovaSonicModel(model_id=model)
            if isinstance(model, str)
            else model
        )
        self.system_prompt = system_prompt
        self.messages = messages or []

        # Agent identification
        self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
        self.name = name or _DEFAULT_AGENT_NAME
        self.description = description

        # Tool execution configuration
        self.record_direct_tool_call = record_direct_tool_call
        self.load_tools_from_directory = load_tools_from_directory

        # Initialize tool registry
        self.tool_registry = ToolRegistry()

        if tools is not None:
            self.tool_registry.process_tools(tools)

        self.tool_registry.initialize_tools(self.load_tools_from_directory)

        # Initialize tool watcher if directory loading is enabled
        if self.load_tools_from_directory:
            self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

        # Initialize agent state management
        if state is not None:
            if isinstance(state, dict):
                self.state = AgentState(state)
            elif isinstance(state, AgentState):
                self.state = state
            else:
                raise ValueError("state must be an AgentState object or a dict")
        else:
            self.state = AgentState()

        # Initialize other components
        self._tool_caller = _ToolCaller(self)

        # Initialize tool executor
        self.tool_executor = tool_executor or ConcurrentToolExecutor()

        # Initialize hooks registry
        self.hooks = HookRegistry()
        if hooks:
            for hook in hooks:
                self.hooks.add_hook(hook)

        # Initialize session management functionality
        self._session_manager = session_manager
        if self._session_manager:
            self.hooks.add_hook(self._session_manager)

        self._loop = _BidiAgentLoop(self)

        # Emit initialization event
        self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

        # TODO: Determine if full support is required
        self._interrupt_state = _InterruptState()

        # Lock to ensure that paired messages are added to history in sequence without interference
        self._message_lock = asyncio.Lock()

        self._started = False

    @property
    def tool(self) -> _ToolCaller:
        """Call tool as a function.

        Returns:
            ToolCaller for method-style tool execution.

        Example:
            ```
            agent = BidiAgent(model=model, tools=[calculator])
            agent.tool.calculator(expression="2+2")
            ```
        """
        return self._tool_caller

    @property
    def tool_names(self) -> list[str]:
        """Get a list of all registered tool names.

        Returns:
            Names of all tools available to this agent.
        """
        all_tools = self.tool_registry.get_all_tools_config()
        return list(all_tools.keys())

    async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
        """Start a persistent bidirectional conversation connection.

        Initializes the streaming connection and starts background tasks for processing
        model events, tool execution, and connection management.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Raises:
            RuntimeError:
                If agent already started.

        Example:
            ```python
            await agent.start(invocation_state={
                "user_id": "user_123",
                "session_id": "session_456",
                "database": db_connection,
            })
            ```
        """
        if self._started:
            raise RuntimeError("agent already started | call stop before starting again")

        logger.debug("agent starting")
        await self._loop.start(invocation_state)
        self._started = True

    async def send(self, input_data: BidiAgentInput | dict[str, Any]) -> None:
        """Send input to the model (text, audio, image, or event dict).

        Unified method for sending text, audio, and image input to the model during
        an active conversation session. Accepts TypedEvent instances or plain dicts
        (e.g., from WebSocket clients) which are automatically reconstructed.

        Args:
            input_data: Can be:

                - str: Text message from user
                - BidiInputEvent: TypedEvent
                - dict: Event dictionary (will be reconstructed to TypedEvent)

        Raises:
            RuntimeError: If start has not been called.
            ValueError: If invalid input type.

        Example:
            await agent.send("Hello")
            await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
            await agent.send({"type": "bidi_text_input", "text": "Hello", "role": "user"})
        """
        if not self._started:
            raise RuntimeError("agent not started | call start before sending")

        input_event: BidiInputEvent

        if isinstance(input_data, str):
            input_event = BidiTextInputEvent(text=input_data)

        elif isinstance(input_data, BidiInputEvent):
            input_event = input_data

        elif isinstance(input_data, dict) and "type" in input_data:
            input_type = input_data["type"]
            input_data = {key: value for key, value in input_data.items() if key != "type"}
            if input_type == "bidi_text_input":
                input_event = BidiTextInputEvent(**input_data)
            elif input_type == "bidi_audio_input":
                input_event = BidiAudioInputEvent(**input_data)
            elif input_type == "bidi_image_input":
                input_event = BidiImageInputEvent(**input_data)
            else:
                raise ValueError(f"input_type=<{input_type}> | input type not supported")

        else:
            raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")

        await self._loop.send(input_event)

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive events from the model including audio, text, and tool calls.

        Yields:
            Model output events processed by background tasks including audio output,
            text responses, tool calls, and connection updates.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._started:
            raise RuntimeError("agent not started | call start before receiving")

        async for event in self._loop.receive():
            yield event

    async def stop(self) -> None:
        """End the conversation connection and cleanup all resources.

        Terminates the streaming connection, cancels background tasks, and
        closes the connection to the model provider.
        """
        self._started = False
        await self._loop.stop()

    async def __aenter__(self, invocation_state: dict[str, Any] | None = None) -> "BidiAgent":
        """Async context manager entry point.

        Automatically starts the bidirectional connection when entering the context.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Returns:
            Self for use in the context.
        """
        logger.debug("context_manager=<enter> | starting agent")
        await self.start(invocation_state)
        return self

    async def __aexit__(self, *_: Any) -> None:
        """Async context manager exit point.

        Automatically ends the connection and cleans up resources when exiting
        the context, regardless of whether an exception occurred.
        """
        logger.debug("context_manager=<exit> | stopping agent")
        await self.stop()

    async def run(
        self, inputs: list[BidiInput], outputs: list[BidiOutput], invocation_state: dict[str, Any] | None = None
    ) -> None:
        """Run the agent using provided IO channels for bidirectional communication.

        Args:
            inputs: Input callables to read data from a source
            outputs: Output callables to receive events from the agent
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Example:
            ```python
            # Using model defaults:
            model = BidiNovaSonicModel()
            audio_io = BidiAudioIO()
            text_io = BidiTextIO()
            agent = BidiAgent(model=model, tools=[calculator])
            await agent.run(
                inputs=[audio_io.input()],
                outputs=[audio_io.output(), text_io.output()],
                invocation_state={"user_id": "user_123"}
            )

            # Using custom audio config:
            model = BidiNovaSonicModel(
                provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
            )
            audio_io = BidiAudioIO()
            agent = BidiAgent(model=model, tools=[calculator])
            await agent.run(
                inputs=[audio_io.input()],
                outputs=[audio_io.output()],
            )
            ```
        """

        async def run_inputs() -> None:
            async def task(input_: BidiInput) -> None:
                while True:
                    event = await input_()
                    await self.send(event)

            await asyncio.gather(*[task(input_) for input_ in inputs])

        async def run_outputs(inputs_task: asyncio.Task) -> None:
            async for event in self.receive():
                await asyncio.gather(*[output(event) for output in outputs])

            inputs_task.cancel()

        try:
            await self.start(invocation_state)

            input_starts = [input_.start for input_ in inputs if isinstance(input_, BidiInput)]
            output_starts = [output.start for output in outputs if isinstance(output, BidiOutput)]
            for start in [*input_starts, *output_starts]:
                await start(self)

            async with _TaskGroup() as task_group:
                inputs_task = task_group.create_task(run_inputs())
                task_group.create_task(run_outputs(inputs_task))

        finally:
            input_stops = [input_.stop for input_ in inputs if isinstance(input_, BidiInput)]
            output_stops = [output.stop for output in outputs if isinstance(output, BidiOutput)]

            await stop_all(*input_stops, *output_stops, self.stop)

    async def _append_messages(self, *messages: Message) -> None:
        """Append messages to history in sequence without interference.

        The message lock ensures that paired messages are added to history in sequence without interference. For
        example, tool use and tool result messages must be added adjacent to each other.

        Args:
            *messages: List of messages to add into history.
        """
        async with self._message_lock:
            for message in messages:
                self.messages.append(message)
                await self.hooks.invoke_callbacks_async(BidiMessageAddedEvent(agent=self, message=message))

tool property

Call tool as a function.

Returns:

Type Description
_ToolCaller

ToolCaller for method-style tool execution.

Example
agent = BidiAgent(model=model, tools=[calculator])
agent.tool.calculator(expression="2+2")

tool_names property

Get a list of all registered tool names.

Returns:

Type Description
list[str]

Names of all tools available to this agent.

__aenter__(invocation_state=None) async

Async context manager entry point.

Automatically starts the bidirectional connection when entering the context.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Returns:

Type Description
BidiAgent

Self for use in the context.
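
Note that `async with agent:` cannot forward invocation_state into __aenter__. A sketch, calling start()/stop() explicitly when tools need extra context:

async def run_with_context(agent) -> None:
    await agent.start(invocation_state={"user_id": "user_123"})
    try:
        async for event in agent.receive():
            print(event)
    finally:
        await agent.stop()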

Source code in strands/experimental/bidi/agent/agent.py
async def __aenter__(self, invocation_state: dict[str, Any] | None = None) -> "BidiAgent":
    """Async context manager entry point.

    Automatically starts the bidirectional connection when entering the context.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Returns:
        Self for use in the context.
    """
    logger.debug("context_manager=<enter> | starting agent")
    await self.start(invocation_state)
    return self

__aexit__(*_) async

Async context manager exit point.

Automatically ends the connection and cleans up resources when exiting the context, regardless of whether an exception occurred.

Source code in strands/experimental/bidi/agent/agent.py
async def __aexit__(self, *_: Any) -> None:
    """Async context manager exit point.

    Automatically ends the connection and cleans up resources when exiting
    the context, regardless of whether an exception occurred.
    """
    logger.debug("context_manager=<exit> | stopping agent")
    await self.stop()

__init__(model=None, tools=None, system_prompt=None, messages=None, record_direct_tool_call=True, load_tools_from_directory=False, agent_id=None, name=None, description=None, hooks=None, state=None, session_manager=None, tool_executor=None, **kwargs)

Initialize bidirectional agent.

Parameters:

Name Type Description Default
model BidiModel | str | None

BidiModel instance, string model_id, or None for default detection.

None
tools list[str | AgentTool | ToolProvider] | None

Optional list of tools with flexible format support.

None
system_prompt str | None

Optional system prompt for conversations.

None
messages Messages | None

Optional conversation history to initialize with.

None
record_direct_tool_call bool

Whether to record direct tool calls in message history.

True
load_tools_from_directory bool

Whether to load and automatically reload tools in the ./tools/ directory.

False
agent_id str | None

Optional ID for the agent, useful for connection management and multi-agent scenarios.

None
name str | None

Name of the Agent.

None
description str | None

Description of what the Agent does.

None
hooks list[HookProvider] | None

Optional list of hook providers to register for lifecycle events.

None
state AgentState | dict | None

Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.

None
session_manager SessionManager | None

Manager for handling agent sessions including conversation history and state. If provided, enables session-based persistence and state management.

None
tool_executor ToolExecutor | None

Definition of tool execution strategy (e.g., sequential, concurrent, etc.).

None
**kwargs Any

Additional configuration for future extensibility.

{}

Raises:

Type Description
ValueError

If model configuration is invalid or state is invalid type.

TypeError

If model type is unsupported.
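
A construction sketch using the parameters documented above. The `tool` decorator import and the Nova Sonic model id string are assumptions not shown on this page.

from strands import tool  # assumed decorator from the core SDK

@tool
def calculator(expression: str) -> str:
    """Evaluate a simple arithmetic expression (illustration only)."""
    return str(eval(expression))

agent = BidiAgent(
    model="amazon.nova-sonic-v1:0",                  # assumed model id string
    tools=[calculator],
    system_prompt="You are a helpful voice assistant.",
    state={"preferences": {"units": "metric"}},      # dict is wrapped in AgentState
    agent_id="support_agent",
)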

Source code in strands/experimental/bidi/agent/agent.py
def __init__(
    self,
    model: BidiModel | str | None = None,
    tools: list[str | AgentTool | ToolProvider] | None = None,
    system_prompt: str | None = None,
    messages: Messages | None = None,
    record_direct_tool_call: bool = True,
    load_tools_from_directory: bool = False,
    agent_id: str | None = None,
    name: str | None = None,
    description: str | None = None,
    hooks: list[HookProvider] | None = None,
    state: AgentState | dict | None = None,
    session_manager: "SessionManager | None" = None,
    tool_executor: ToolExecutor | None = None,
    **kwargs: Any,
):
    """Initialize bidirectional agent.

    Args:
        model: BidiModel instance, string model_id, or None for default detection.
        tools: Optional list of tools with flexible format support.
        system_prompt: Optional system prompt for conversations.
        messages: Optional conversation history to initialize with.
        record_direct_tool_call: Whether to record direct tool calls in message history.
        load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
        agent_id: Optional ID for the agent, useful for connection management and multi-agent scenarios.
        name: Name of the Agent.
        description: Description of what the Agent does.
        hooks: Optional list of hook providers to register for lifecycle events.
        state: Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
        session_manager: Manager for handling agent sessions including conversation history and state.
            If provided, enables session-based persistence and state management.
        tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
        **kwargs: Additional configuration for future extensibility.

    Raises:
        ValueError: If model configuration is invalid or state is invalid type.
        TypeError: If model type is unsupported.
    """
    self.model = (
        BidiNovaSonicModel()
        if not model
        else BidiNovaSonicModel(model_id=model)
        if isinstance(model, str)
        else model
    )
    self.system_prompt = system_prompt
    self.messages = messages or []

    # Agent identification
    self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
    self.name = name or _DEFAULT_AGENT_NAME
    self.description = description

    # Tool execution configuration
    self.record_direct_tool_call = record_direct_tool_call
    self.load_tools_from_directory = load_tools_from_directory

    # Initialize tool registry
    self.tool_registry = ToolRegistry()

    if tools is not None:
        self.tool_registry.process_tools(tools)

    self.tool_registry.initialize_tools(self.load_tools_from_directory)

    # Initialize tool watcher if directory loading is enabled
    if self.load_tools_from_directory:
        self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

    # Initialize agent state management
    if state is not None:
        if isinstance(state, dict):
            self.state = AgentState(state)
        elif isinstance(state, AgentState):
            self.state = state
        else:
            raise ValueError("state must be an AgentState object or a dict")
    else:
        self.state = AgentState()

    # Initialize other components
    self._tool_caller = _ToolCaller(self)

    # Initialize tool executor
    self.tool_executor = tool_executor or ConcurrentToolExecutor()

    # Initialize hooks registry
    self.hooks = HookRegistry()
    if hooks:
        for hook in hooks:
            self.hooks.add_hook(hook)

    # Initialize session management functionality
    self._session_manager = session_manager
    if self._session_manager:
        self.hooks.add_hook(self._session_manager)

    self._loop = _BidiAgentLoop(self)

    # Emit initialization event
    self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

    # TODO: Determine if full support is required
    self._interrupt_state = _InterruptState()

    # Lock to ensure that paired messages are added to history in sequence without interference
    self._message_lock = asyncio.Lock()

    self._started = False

receive() async

Receive events from the model including audio, text, and tool calls.

Yields:

Type Description
AsyncGenerator[BidiOutputEvent, None]

Model output events processed by background tasks including audio output, text responses, tool calls, and connection updates.

Raises:

Type Description
RuntimeError

If start has not been called.
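
A sketch of draining the receive() stream until the connection closes; the event class is documented later on this page and its import path is assumed from the source file location shown there.

from strands.experimental.bidi.types.events import BidiConnectionCloseEvent  # assumed path

async def consume(agent) -> None:
    async for event in agent.receive():
        print(event)
        if isinstance(event, BidiConnectionCloseEvent):
            break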

Source code in strands/experimental/bidi/agent/agent.py
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive events from the model including audio, text, and tool calls.

    Yields:
        Model output events processed by background tasks including audio output,
        text responses, tool calls, and connection updates.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._started:
        raise RuntimeError("agent not started | call start before receiving")

    async for event in self._loop.receive():
        yield event

run(inputs, outputs, invocation_state=None) async

Run the agent using provided IO channels for bidirectional communication.

Parameters:

Name Type Description Default
inputs list[BidiInput]

Input callables to read data from a source

required
outputs list[BidiOutput]

Output callables to receive events from the agent

required
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None
Example
# Using model defaults:
model = BidiNovaSonicModel()
audio_io = BidiAudioIO()
text_io = BidiTextIO()
agent = BidiAgent(model=model, tools=[calculator])
await agent.run(
    inputs=[audio_io.input()],
    outputs=[audio_io.output(), text_io.output()],
    invocation_state={"user_id": "user_123"}
)

# Using custom audio config:
model = BidiNovaSonicModel(
    provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
)
audio_io = BidiAudioIO()
agent = BidiAgent(model=model, tools=[calculator])
await agent.run(
    inputs=[audio_io.input()],
    outputs=[audio_io.output()],
)
Source code in strands/experimental/bidi/agent/agent.py
async def run(
    self, inputs: list[BidiInput], outputs: list[BidiOutput], invocation_state: dict[str, Any] | None = None
) -> None:
    """Run the agent using provided IO channels for bidirectional communication.

    Args:
        inputs: Input callables to read data from a source
        outputs: Output callables to receive events from the agent
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Example:
        ```python
        # Using model defaults:
        model = BidiNovaSonicModel()
        audio_io = BidiAudioIO()
        text_io = BidiTextIO()
        agent = BidiAgent(model=model, tools=[calculator])
        await agent.run(
            inputs=[audio_io.input()],
            outputs=[audio_io.output(), text_io.output()],
            invocation_state={"user_id": "user_123"}
        )

        # Using custom audio config:
        model = BidiNovaSonicModel(
            provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
        )
        audio_io = BidiAudioIO()
        agent = BidiAgent(model=model, tools=[calculator])
        await agent.run(
            inputs=[audio_io.input()],
            outputs=[audio_io.output()],
        )
        ```
    """

    async def run_inputs() -> None:
        async def task(input_: BidiInput) -> None:
            while True:
                event = await input_()
                await self.send(event)

        await asyncio.gather(*[task(input_) for input_ in inputs])

    async def run_outputs(inputs_task: asyncio.Task) -> None:
        async for event in self.receive():
            await asyncio.gather(*[output(event) for output in outputs])

        inputs_task.cancel()

    try:
        await self.start(invocation_state)

        input_starts = [input_.start for input_ in inputs if isinstance(input_, BidiInput)]
        output_starts = [output.start for output in outputs if isinstance(output, BidiOutput)]
        for start in [*input_starts, *output_starts]:
            await start(self)

        async with _TaskGroup() as task_group:
            inputs_task = task_group.create_task(run_inputs())
            task_group.create_task(run_outputs(inputs_task))

    finally:
        input_stops = [input_.stop for input_ in inputs if isinstance(input_, BidiInput)]
        output_stops = [output.stop for output in outputs if isinstance(output, BidiOutput)]

        await stop_all(*input_stops, *output_stops, self.stop)

send(input_data) async

Send input to the model (text, audio, image, or event dict).

Unified method for sending text, audio, and image input to the model during an active conversation session. Accepts TypedEvent instances or plain dicts (e.g., from WebSocket clients) which are automatically reconstructed.

Parameters:

Name Type Description Default
input_data BidiAgentInput | dict[str, Any]

Can be:

  • str: Text message from user
  • BidiInputEvent: TypedEvent
  • dict: Event dictionary (will be reconstructed to TypedEvent)
required

Raises:

Type Description
RuntimeError

If start has not been called.

ValueError

If invalid input type.

Example

await agent.send("Hello") await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...)) await agent.send({"type": "bidirectional_text_input", "text": "Hello", "role": "user"})

Source code in strands/experimental/bidi/agent/agent.py
async def send(self, input_data: BidiAgentInput | dict[str, Any]) -> None:
    """Send input to the model (text, audio, image, or event dict).

    Unified method for sending text, audio, and image input to the model during
    an active conversation session. Accepts TypedEvent instances or plain dicts
    (e.g., from WebSocket clients) which are automatically reconstructed.

    Args:
        input_data: Can be:

            - str: Text message from user
            - BidiInputEvent: TypedEvent
            - dict: Event dictionary (will be reconstructed to TypedEvent)

    Raises:
        RuntimeError: If start has not been called.
        ValueError: If invalid input type.

    Example:
        await agent.send("Hello")
        await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
        await agent.send({"type": "bidi_text_input", "text": "Hello", "role": "user"})
    """
    if not self._started:
        raise RuntimeError("agent not started | call start before sending")

    input_event: BidiInputEvent

    if isinstance(input_data, str):
        input_event = BidiTextInputEvent(text=input_data)

    elif isinstance(input_data, BidiInputEvent):
        input_event = input_data

    elif isinstance(input_data, dict) and "type" in input_data:
        input_type = input_data["type"]
        input_data = {key: value for key, value in input_data.items() if key != "type"}
        if input_type == "bidi_text_input":
            input_event = BidiTextInputEvent(**input_data)
        elif input_type == "bidi_audio_input":
            input_event = BidiAudioInputEvent(**input_data)
        elif input_type == "bidi_image_input":
            input_event = BidiImageInputEvent(**input_data)
        else:
            raise ValueError(f"input_type=<{input_type}> | input type not supported")

    else:
        raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")

    await self._loop.send(input_event)

start(invocation_state=None) async

Start a persistent bidirectional conversation connection.

Initializes the streaming connection and starts background tasks for processing model events, tool execution, and connection management.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Raises:

Type Description
RuntimeError

If agent already started.

Example
await agent.start(invocation_state={
    "user_id": "user_123",
    "session_id": "session_456",
    "database": db_connection,
})
Source code in strands/experimental/bidi/agent/agent.py
async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
    """Start a persistent bidirectional conversation connection.

    Initializes the streaming connection and starts background tasks for processing
    model events, tool execution, and connection management.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Raises:
        RuntimeError:
            If agent already started.

    Example:
        ```python
        await agent.start(invocation_state={
            "user_id": "user_123",
            "session_id": "session_456",
            "database": db_connection,
        })
        ```
    """
    if self._started:
        raise RuntimeError("agent already started | call stop before starting again")

    logger.debug("agent starting")
    await self._loop.start(invocation_state)
    self._started = True

stop() async

End the conversation connection and cleanup all resources.

Terminates the streaming connection, cancels background tasks, and closes the connection to the model provider.

Source code in strands/experimental/bidi/agent/agent.py
async def stop(self) -> None:
    """End the conversation connection and cleanup all resources.

    Terminates the streaming connection, cancels background tasks, and
    closes the connection to the model provider.
    """
    self._started = False
    await self._loop.stop()

BidiBeforeConnectionRestartEvent dataclass

Bases: BidiHookEvent

Event emitted before agent attempts to restart model connection after timeout.

Attributes:

Name Type Description
timeout_error BidiModelTimeoutError

Timeout error reported by the model.

Source code in strands/experimental/hooks/events.py
@dataclass
class BidiBeforeConnectionRestartEvent(BidiHookEvent):
    """Event emitted before agent attempts to restart model connection after timeout.

    Attributes:
        timeout_error: Timeout error reported by the model.
    """

    timeout_error: "BidiModelTimeoutError"

BidiBeforeInvocationEvent dataclass

Bases: BidiHookEvent

Event triggered when BidiAgent starts a streaming session.

This event is fired before the BidiAgent begins a streaming session, before any model connection or audio processing occurs. Hook providers can use this event to perform session-level setup, logging, or validation.

This event is triggered at the beginning of agent.start().

Source code in strands/experimental/hooks/events.py
@dataclass
class BidiBeforeInvocationEvent(BidiHookEvent):
    """Event triggered when BidiAgent starts a streaming session.

    This event is fired before the BidiAgent begins a streaming session,
    before any model connection or audio processing occurs. Hook providers can
    use this event to perform session-level setup, logging, or validation.

    This event is triggered at the beginning of agent.start().
    """

    pass

BidiConnectionCloseEvent

Bases: TypedEvent

Streaming connection closed.

Parameters:

Name Type Description Default
connection_id str

Unique identifier for this streaming connection (matches BidiConnectionStartEvent).

required
reason Literal['client_disconnect', 'timeout', 'error', 'complete', 'user_request']

Why the connection was closed.

required
Source code in strands/experimental/bidi/types/events.py
class BidiConnectionCloseEvent(TypedEvent):
    """Streaming connection closed.

    Parameters:
        connection_id: Unique identifier for this streaming connection (matches BidiConnectionStartEvent).
        reason: Why the connection was closed.
    """

    def __init__(
        self,
        connection_id: str,
        reason: Literal["client_disconnect", "timeout", "error", "complete", "user_request"],
    ):
        """Initialize connection close event."""
        super().__init__(
            {
                "type": "bidi_connection_close",
                "connection_id": connection_id,
                "reason": reason,
            }
        )

    @property
    def connection_id(self) -> str:
        """Unique identifier for this streaming connection."""
        return cast(str, self["connection_id"])

    @property
    def reason(self) -> str:
        """Why the interruption occurred."""
        return cast(str, self["reason"])

connection_id property

Unique identifier for this streaming connection.

reason property

Why the connection was closed.

__init__(connection_id, reason)

Initialize connection close event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    connection_id: str,
    reason: Literal["client_disconnect", "timeout", "error", "complete", "user_request"],
):
    """Initialize connection close event."""
    super().__init__(
        {
            "type": "bidi_connection_close",
            "connection_id": connection_id,
            "reason": reason,
        }
    )
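
A sketch of reacting to a close reason; the reason values are the literals documented above, and the import path is assumed from the source file location shown.

from strands.experimental.bidi.types.events import BidiConnectionCloseEvent  # assumed path

def on_close(event: BidiConnectionCloseEvent) -> None:
    if event.reason in ("timeout", "error"):
        print(f"connection {event.connection_id} dropped ({event.reason})")
    else:
        print(f"connection {event.connection_id} closed ({event.reason})")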

BidiConnectionRestartEvent

Bases: TypedEvent

Agent is restarting the model connection after timeout.

Source code in strands/experimental/bidi/types/events.py
class BidiConnectionRestartEvent(TypedEvent):
    """Agent is restarting the model connection after timeout."""

    def __init__(self, timeout_error: "BidiModelTimeoutError"):
        """Initialize.

        Args:
            timeout_error: Timeout error reported by the model.
        """
        super().__init__(
            {
                "type": "bidi_connection_restart",
                "timeout_error": timeout_error,
            }
        )

    @property
    def timeout_error(self) -> "BidiModelTimeoutError":
        """Model timeout error."""
        return cast("BidiModelTimeoutError", self["timeout_error"])

timeout_error property

Model timeout error.

__init__(timeout_error)

Initialize.

Parameters:

Name Type Description Default
timeout_error BidiModelTimeoutError

Timeout error reported by the model.

required
Source code in strands/experimental/bidi/types/events.py
def __init__(self, timeout_error: "BidiModelTimeoutError"):
    """Initialize.

    Args:
        timeout_error: Timeout error reported by the model.
    """
    super().__init__(
        {
            "type": "bidi_connection_restart",
            "timeout_error": timeout_error,
        }
    )

BidiInterruptionEvent

Bases: TypedEvent

Model generation was interrupted.

Parameters:

Name Type Description Default
reason Literal['user_speech', 'error']

Why the interruption occurred.

required
Source code in strands/experimental/bidi/types/events.py
class BidiInterruptionEvent(TypedEvent):
    """Model generation was interrupted.

    Parameters:
        reason: Why the interruption occurred.
    """

    def __init__(self, reason: Literal["user_speech", "error"]):
        """Initialize interruption event."""
        super().__init__(
            {
                "type": "bidi_interruption",
                "reason": reason,
            }
        )

    @property
    def reason(self) -> str:
        """Why the interruption occurred."""
        return cast(str, self["reason"])

reason property

Why the interruption occurred.
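
A sketch of barge-in handling: clear any buffered assistant audio when the model is interrupted. `playback` is a hypothetical audio sink, not part of this API.

from strands.experimental.bidi.types.events import BidiInterruptionEvent  # assumed path

def handle_interruption(event: BidiInterruptionEvent, playback) -> None:
    if event.reason == "user_speech":
        playback.clear()  # hypothetical: drop any queued assistant audio
    print(f"interrupted: {event.reason}")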

__init__(reason)

Initialize interruption event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, reason: Literal["user_speech", "error"]):
    """Initialize interruption event."""
    super().__init__(
        {
            "type": "bidi_interruption",
            "reason": reason,
        }
    )

BidiInterruptionHookEvent dataclass

Bases: BidiHookEvent

Event triggered when model generation is interrupted.

This event is fired when the user interrupts the assistant (e.g., by speaking during the assistant's response) or when an error causes interruption. This is specific to bidirectional streaming and doesn't exist in standard agents.

Hook providers can use this event to log interruptions, implement custom interruption handling, or trigger cleanup logic.

Attributes:

Name Type Description
reason Literal['user_speech', 'error']

The reason for the interruption ("user_speech" or "error").

interrupted_response_id str | None

Optional ID of the response that was interrupted.

Source code in strands/experimental/hooks/events.py
@dataclass
class BidiInterruptionEvent(BidiHookEvent):
    """Event triggered when model generation is interrupted.

    This event is fired when the user interrupts the assistant (e.g., by speaking
    during the assistant's response) or when an error causes interruption. This is
    specific to bidirectional streaming and doesn't exist in standard agents.

    Hook providers can use this event to log interruptions, implement custom
    interruption handling, or trigger cleanup logic.

    Attributes:
        reason: The reason for the interruption ("user_speech" or "error").
        interrupted_response_id: Optional ID of the response that was interrupted.
    """

    reason: Literal["user_speech", "error"]
    interrupted_response_id: str | None = None
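
A sketch of a hook callback that logs interruptions using the attributes documented above. Registration through a HookRegistry add_callback method is assumed; only add_hook appears on this page.

def log_interruption(event) -> None:
    # event is the interruption hook event documented above
    print(f"interrupted: reason={event.reason} response_id={event.interrupted_response_id}")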

BidiModelTimeoutError

Bases: Exception

Model timeout error.

Bidirectional models are often configured with a connection time limit. Nova Sonic, for example, keeps the connection open for 8 minutes at most. Upon receiving a timeout, the agent loop restarts the model connection to create a seamless, uninterrupted experience for the user.

Source code in strands/experimental/bidi/models/model.py
class BidiModelTimeoutError(Exception):
    """Model timeout error.

    Bidirectional models are often configured with a connection time limit. Nova Sonic, for example, keeps the
    connection open for 8 minutes at most. Upon receiving a timeout, the agent loop restarts the model connection to
    create a seamless, uninterrupted experience for the user.
    """

    def __init__(self, message: str, **restart_config: Any) -> None:
        """Initialize error.

        Args:
            message: Timeout message from model.
            **restart_config: Configure restart specific behaviors in the call to model start.
        """
        super().__init__(message)

        self.restart_config = restart_config
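
A sketch of a model implementation signaling a timeout. The restart_config keys a given model accepts are not documented on this page; resume_token is hypothetical.

from strands.experimental.bidi.models.model import BidiModelTimeoutError  # assumed path

def on_provider_timeout(session_token: str) -> None:
    raise BidiModelTimeoutError(
        "connection exceeded provider time limit",
        resume_token=session_token,  # hypothetical restart option
    )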

__init__(message, **restart_config)

Initialize error.

Parameters:

Name Type Description Default
message str

Timeout message from model.

required
**restart_config Any

Configure restart specific behaviors in the call to model start.

{}
Source code in strands/experimental/bidi/models/model.py
def __init__(self, message: str, **restart_config: Any) -> None:
    """Initialize error.

    Args:
        message: Timeout message from model.
        **restart_config: Configure restart specific behaviors in the call to model start.
    """
    super().__init__(message)

    self.restart_config = restart_config

BidiTextInputEvent

Bases: TypedEvent

Text input event for sending text to the model.

Used for sending text content through the send() method.

Parameters:

Name Type Description Default
text str

The text content to send to the model.

required
role Role

The role of the message sender (default: "user").

'user'
Source code in strands/experimental/bidi/types/events.py
class BidiTextInputEvent(TypedEvent):
    """Text input event for sending text to the model.

    Used for sending text content through the send() method.

    Parameters:
        text: The text content to send to the model.
        role: The role of the message sender (default: "user").
    """

    def __init__(self, text: str, role: Role = "user"):
        """Initialize text input event."""
        super().__init__(
            {
                "type": "bidi_text_input",
                "text": text,
                "role": role,
            }
        )

    @property
    def text(self) -> str:
        """The text content to send to the model."""
        return cast(str, self["text"])

    @property
    def role(self) -> Role:
        """The role of the message sender."""
        return cast(Role, self["role"])

role property

The role of the message sender.

text property

The text content to send to the model.

__init__(text, role='user')

Initialize text input event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, text: str, role: Role = "user"):
    """Initialize text input event."""
    super().__init__(
        {
            "type": "bidi_text_input",
            "text": text,
            "role": role,
        }
    )

BidiTranscriptStreamEvent

Bases: ModelStreamEvent

Audio transcription streaming (user or assistant speech).

Supports incremental transcript updates for providers that send partial transcripts before the final version.

Parameters:

Name Type Description Default
delta ContentBlockDelta

The incremental transcript change (ContentBlockDelta).

required
text str

The delta text (same as delta content for convenience).

required
role Role

Who is speaking ("user" or "assistant").

required
is_final bool

Whether this is the final/complete transcript.

required
current_transcript str | None

The accumulated transcript text so far (None for first delta).

None
Source code in strands/experimental/bidi/types/events.py
class BidiTranscriptStreamEvent(ModelStreamEvent):
    """Audio transcription streaming (user or assistant speech).

    Supports incremental transcript updates for providers that send partial
    transcripts before the final version.

    Parameters:
        delta: The incremental transcript change (ContentBlockDelta).
        text: The delta text (same as delta content for convenience).
        role: Who is speaking ("user" or "assistant").
        is_final: Whether this is the final/complete transcript.
        current_transcript: The accumulated transcript text so far (None for first delta).
    """

    def __init__(
        self,
        delta: ContentBlockDelta,
        text: str,
        role: Role,
        is_final: bool,
        current_transcript: str | None = None,
    ):
        """Initialize transcript stream event."""
        super().__init__(
            {
                "type": "bidi_transcript_stream",
                "delta": delta,
                "text": text,
                "role": role,
                "is_final": is_final,
                "current_transcript": current_transcript,
            }
        )

    @property
    def delta(self) -> ContentBlockDelta:
        """The incremental transcript change."""
        return cast(ContentBlockDelta, self["delta"])

    @property
    def text(self) -> str:
        """The text content to send to the model."""
        return cast(str, self["text"])

    @property
    def role(self) -> Role:
        """The role of the message sender."""
        return cast(Role, self["role"])

    @property
    def is_final(self) -> bool:
        """Whether this is the final/complete transcript."""
        return cast(bool, self["is_final"])

    @property
    def current_transcript(self) -> str | None:
        """The accumulated transcript text so far."""
        return cast(str | None, self.get("current_transcript"))

current_transcript property

The accumulated transcript text so far.

delta property

The incremental transcript change.

is_final property

Whether this is the final/complete transcript.

role property

The role of the message sender.

text property

The delta text for this transcript update.

__init__(delta, text, role, is_final, current_transcript=None)

Initialize transcript stream event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    delta: ContentBlockDelta,
    text: str,
    role: Role,
    is_final: bool,
    current_transcript: str | None = None,
):
    """Initialize transcript stream event."""
    super().__init__(
        {
            "type": "bidi_transcript_stream",
            "delta": delta,
            "text": text,
            "role": role,
            "is_final": is_final,
            "current_transcript": current_transcript,
        }
    )
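
A sketch of collecting completed transcripts from the stream using the properties documented above; the import path is assumed from the source file location shown.

from strands.experimental.bidi.types.events import BidiTranscriptStreamEvent  # assumed path

def print_final_transcripts(events: list[BidiTranscriptStreamEvent]) -> None:
    for event in events:
        if event.is_final:
            full_text = event.current_transcript or event.text
            print(f"{event.role}: {full_text}")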

Message

Bases: TypedDict

A message in a conversation with the agent.

Attributes:

Name Type Description
content List[ContentBlock]

The message content.

role Role

The role of the message sender.

Source code in strands/types/content.py
class Message(TypedDict):
    """A message in a conversation with the agent.

    Attributes:
        content: The message content.
        role: The role of the message sender.
    """

    content: List[ContentBlock]
    role: Role

ToolInterruptEvent

Bases: TypedEvent

Event emitted when a tool is interrupted.

Source code in strands/types/_events.py
class ToolInterruptEvent(TypedEvent):
    """Event emitted when a tool is interrupted."""

    def __init__(self, tool_use: ToolUse, interrupts: list[Interrupt]) -> None:
        """Set interrupt in the event payload."""
        super().__init__({"tool_interrupt_event": {"tool_use": tool_use, "interrupts": interrupts}})

    @property
    def tool_use_id(self) -> str:
        """The id of the tool interrupted."""
        return cast(ToolUse, cast(dict, self.get("tool_interrupt_event")).get("tool_use"))["toolUseId"]

    @property
    def interrupts(self) -> list[Interrupt]:
        """The interrupt instances."""
        return cast(list[Interrupt], self["tool_interrupt_event"]["interrupts"])

interrupts property

The interrupt instances.

tool_use_id property

The id of the tool interrupted.

__init__(tool_use, interrupts)

Set interrupt in the event payload.

Source code in strands/types/_events.py
def __init__(self, tool_use: ToolUse, interrupts: list[Interrupt]) -> None:
    """Set interrupt in the event payload."""
    super().__init__({"tool_interrupt_event": {"tool_use": tool_use, "interrupts": interrupts}})

ToolResult

Bases: TypedDict

Result of a tool execution.

Attributes:

Name Type Description
content list[ToolResultContent]

List of result content returned by the tool.

status ToolResultStatus

The status of the tool execution ("success" or "error").

toolUseId str

The unique identifier of the tool use request that produced this result.

Source code in strands/types/tools.py
class ToolResult(TypedDict):
    """Result of a tool execution.

    Attributes:
        content: List of result content returned by the tool.
        status: The status of the tool execution ("success" or "error").
        toolUseId: The unique identifier of the tool use request that produced this result.
    """

    content: list[ToolResultContent]
    status: ToolResultStatus
    toolUseId: str

ToolResultEvent

Bases: TypedEvent

Event emitted when a tool execution completes.

Source code in strands/types/_events.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
class ToolResultEvent(TypedEvent):
    """Event emitted when a tool execution completes."""

    def __init__(self, tool_result: ToolResult) -> None:
        """Initialize with the completed tool result.

        Args:
            tool_result: Final result from the tool execution
        """
        super().__init__({"type": "tool_result", "tool_result": tool_result})

    @property
    def tool_use_id(self) -> str:
        """The toolUseId associated with this result."""
        return cast(ToolResult, self.get("tool_result"))["toolUseId"]

    @property
    def tool_result(self) -> ToolResult:
        """Final result from the completed tool execution."""
        return cast(ToolResult, self.get("tool_result"))

    @property
    @override
    def is_callback_event(self) -> bool:
        return False

tool_result property

Final result from the completed tool execution.

tool_use_id property

The toolUseId associated with this result.

__init__(tool_result)

Initialize with the completed tool result.

Parameters:

    tool_result (ToolResult): Final result from the tool execution. Required.

Source code in strands/types/_events.py
278
279
280
281
282
283
284
def __init__(self, tool_result: ToolResult) -> None:
    """Initialize with the completed tool result.

    Args:
        tool_result: Final result from the tool execution
    """
    super().__init__({"type": "tool_result", "tool_result": tool_result})
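
A short sketch of wrapping a result and reading it back through the event's properties, assuming tool_result is built as in the ToolResult example above:

from strands.types._events import ToolResultEvent

event = ToolResultEvent(tool_result)
assert event.tool_use_id == tool_result["toolUseId"]
assert event.tool_result == tool_result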

ToolResultMessageEvent

Bases: TypedEvent

Event emitted when tool results are formatted as a message.

This event is fired when tool execution results are converted into a message format to be added to the conversation history. It provides access to the formatted message containing tool results.

Source code in strands/types/_events.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
class ToolResultMessageEvent(TypedEvent):
    """Event emitted when tool results are formatted as a message.

    This event is fired when tool execution results are converted into a
    message format to be added to the conversation history. It provides
    access to the formatted message containing tool results.
    """

    def __init__(self, message: Any) -> None:
        """Initialize with the model-generated message.

        Args:
            message: Message containing tool results for conversation history
        """
        super().__init__({"message": message})

__init__(message)

Initialize with the model-generated message.

Parameters:

    message (Any): Message containing tool results for conversation history. Required.

Source code in strands/types/_events.py
385
386
387
388
389
390
391
def __init__(self, message: Any) -> None:
    """Initialize with the model-generated message.

    Args:
        message: Message containing tool results for conversation history
    """
    super().__init__({"message": message})
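
For illustration, this mirrors how _BidiAgentLoop._run_tool formats a tool result as a user message before queueing the event; tool_result here is a hypothetical ToolResult dict:

from strands.types._events import ToolResultMessageEvent

tool_result_message = {"role": "user", "content": [{"toolResult": tool_result}]}
event = ToolResultMessageEvent(tool_result_message)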

ToolUse

Bases: TypedDict

A request from the model to use a specific tool with the provided input.

Attributes:

    input (Any): The input parameters for the tool. Can be any JSON-serializable type.
    name (str): The name of the tool to invoke.
    toolUseId (str): A unique identifier for this specific tool use request.

Source code in strands/types/tools.py
52
53
54
55
56
57
58
59
60
61
62
63
64
class ToolUse(TypedDict):
    """A request from the model to use a specific tool with the provided input.

    Attributes:
        input: The input parameters for the tool.
            Can be any JSON-serializable type.
        name: The name of the tool to invoke.
        toolUseId: A unique identifier for this specific tool use request.
    """

    input: Any
    name: str
    toolUseId: str
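
A minimal example of a dict that satisfies this shape; the tool name and input are hypothetical:

from strands.types.tools import ToolUse

tool_use: ToolUse = {
    "toolUseId": "tooluse_123",
    "name": "get_weather",
    "input": {"city": "Seattle"},
}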

ToolUseStreamEvent

Bases: ModelStreamEvent

Event emitted during tool use input streaming.

Source code in strands/types/_events.py
143
144
145
146
147
148
class ToolUseStreamEvent(ModelStreamEvent):
    """Event emitted during tool use input streaming."""

    def __init__(self, delta: ContentBlockDelta, current_tool_use: dict[str, Any]) -> None:
        """Initialize with delta and current tool use state."""
        super().__init__({"type": "tool_use_stream", "delta": delta, "current_tool_use": current_tool_use})

__init__(delta, current_tool_use)

Initialize with delta and current tool use state.

Source code in strands/types/_events.py
146
147
148
def __init__(self, delta: ContentBlockDelta, current_tool_use: dict[str, Any]) -> None:
    """Initialize with delta and current tool use state."""
    super().__init__({"type": "tool_use_stream", "delta": delta, "current_tool_use": current_tool_use})
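
A hedged sketch of constructing the event; the exact ContentBlockDelta shape is an assumption here (a partial JSON chunk of tool input), and the ids are hypothetical:

from strands.types._events import ToolUseStreamEvent

event = ToolUseStreamEvent(
    delta={"toolUse": {"input": '{"city": "Sea'}},  # assumed delta shape carrying a partial input chunk
    current_tool_use={"toolUseId": "tooluse_123", "name": "get_weather", "input": {}},
)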

_BidiAgentLoop

Agent loop.

Attributes:

    _agent: BidiAgent instance to loop.
    _started: Flag if agent loop has started.
    _task_pool: Track active async tasks created in loop.
    _event_queue (Queue): Queue model and tool call events for receiver.
    _invocation_state (dict[str, Any]): Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.
    _send_gate: Gate the sending of events to the model. Blocks when the agent is resetting the model connection after timeout.

Source code in strands/experimental/bidi/agent/loop.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
class _BidiAgentLoop:
    """Agent loop.

    Attributes:
        _agent: BidiAgent instance to loop.
        _started: Flag if agent loop has started.
        _task_pool: Track active async tasks created in loop.
        _event_queue: Queue model and tool call events for receiver.
        _invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.
        _send_gate: Gate the sending of events to the model.
            Blocks when agent is resetting the model connection after timeout.
    """

    def __init__(self, agent: "BidiAgent") -> None:
        """Initialize members of the agent loop.

        Note, before receiving events from the loop, the user must call `start`.

        Args:
            agent: Bidirectional agent to loop over.
        """
        self._agent = agent
        self._started = False
        self._task_pool = _TaskPool()
        self._event_queue: asyncio.Queue
        self._invocation_state: dict[str, Any]

        self._send_gate = asyncio.Event()

    async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
        """Start the agent loop.

        The agent model is started as part of this call.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Raises:
            RuntimeError: If loop already started.
        """
        if self._started:
            raise RuntimeError("loop already started | call stop before starting again")

        logger.debug("agent loop starting")
        await self._agent.hooks.invoke_callbacks_async(BidiBeforeInvocationEvent(agent=self._agent))

        await self._agent.model.start(
            system_prompt=self._agent.system_prompt,
            tools=self._agent.tool_registry.get_all_tool_specs(),
            messages=self._agent.messages,
        )

        self._event_queue = asyncio.Queue(maxsize=1)

        self._task_pool = _TaskPool()
        self._task_pool.create(self._run_model())

        self._invocation_state = invocation_state or {}
        self._send_gate.set()
        self._started = True

    async def stop(self) -> None:
        """Stop the agent loop."""
        logger.debug("agent loop stopping")

        self._started = False
        self._send_gate.clear()
        self._invocation_state = {}

        async def stop_tasks() -> None:
            await self._task_pool.cancel()

        async def stop_model() -> None:
            await self._agent.model.stop()

        try:
            await stop_all(stop_tasks, stop_model)
        finally:
            await self._agent.hooks.invoke_callbacks_async(BidiAfterInvocationEvent(agent=self._agent))

    async def send(self, event: BidiInputEvent | ToolResultEvent) -> None:
        """Send model event.

        Additionally, add text input to messages array.

        Args:
            event: User input event or tool result.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._started:
            raise RuntimeError("loop not started | call start before sending")

        if not self._send_gate.is_set():
            logger.debug("waiting for model send signal")
            await self._send_gate.wait()

        if isinstance(event, BidiTextInputEvent):
            message: Message = {"role": "user", "content": [{"text": event.text}]}
            await self._agent._append_messages(message)

        await self._agent.model.send(event)

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive model and tool call events.

        Returns:
            Model and tool call events.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._started:
            raise RuntimeError("loop not started | call start before receiving")

        while True:
            event = await self._event_queue.get()
            if isinstance(event, BidiModelTimeoutError):
                logger.debug("model timeout error received")
                yield BidiConnectionRestartEvent(event)
                await self._restart_connection(event)
                continue

            if isinstance(event, Exception):
                raise event

            # Check for graceful shutdown event
            if isinstance(event, BidiConnectionCloseEvent) and event.reason == "user_request":
                yield event
                break

            yield event

    async def _restart_connection(self, timeout_error: BidiModelTimeoutError) -> None:
        """Restart the model connection after timeout.

        Args:
            timeout_error: Timeout error reported by the model.
        """
        logger.debug("reseting model connection")

        self._send_gate.clear()

        await self._agent.hooks.invoke_callbacks_async(BidiBeforeConnectionRestartEvent(self._agent, timeout_error))

        restart_exception = None
        try:
            await self._agent.model.stop()
            await self._agent.model.start(
                self._agent.system_prompt,
                self._agent.tool_registry.get_all_tool_specs(),
                self._agent.messages,
                **timeout_error.restart_config,
            )
            self._task_pool.create(self._run_model())
        except Exception as exception:
            restart_exception = exception
        finally:
            await self._agent.hooks.invoke_callbacks_async(
                BidiAfterConnectionRestartEvent(self._agent, restart_exception)
            )

        self._send_gate.set()

    async def _run_model(self) -> None:
        """Task for running the model.

        Events are streamed through the event queue.
        """
        logger.debug("model task starting")

        try:
            async for event in self._agent.model.receive():
                await self._event_queue.put(event)

                if isinstance(event, BidiTranscriptStreamEvent):
                    if event["is_final"]:
                        message: Message = {"role": event["role"], "content": [{"text": event["text"]}]}
                        await self._agent._append_messages(message)

                elif isinstance(event, ToolUseStreamEvent):
                    tool_use = event["current_tool_use"]
                    self._task_pool.create(self._run_tool(tool_use))

                elif isinstance(event, BidiInterruptionEvent):
                    await self._agent.hooks.invoke_callbacks_async(
                        BidiInterruptionHookEvent(
                            agent=self._agent,
                            reason=event["reason"],
                            interrupted_response_id=event.get("interrupted_response_id"),
                        )
                    )

        except Exception as error:
            await self._event_queue.put(error)

    async def _run_tool(self, tool_use: ToolUse) -> None:
        """Task for running tool requested by the model using the tool executor.

        Args:
            tool_use: Tool use request from model.
        """
        logger.debug("tool_name=<%s> | tool execution starting", tool_use["name"])

        tool_results: list[ToolResult] = []

        invocation_state: dict[str, Any] = {
            **self._invocation_state,
            "agent": self._agent,
            "model": self._agent.model,
            "messages": self._agent.messages,
            "system_prompt": self._agent.system_prompt,
        }

        try:
            tool_events = self._agent.tool_executor._stream(
                self._agent,
                tool_use,
                tool_results,
                invocation_state,
                structured_output_context=None,
            )

            async for tool_event in tool_events:
                if isinstance(tool_event, ToolInterruptEvent):
                    self._agent._interrupt_state.deactivate()
                    interrupt_names = [interrupt.name for interrupt in tool_event.interrupts]
                    raise RuntimeError(f"interrupts={interrupt_names} | tool interrupts are not supported in bidi")

                await self._event_queue.put(tool_event)

            # Normal flow for all tools (including stop_conversation)
            tool_result_event = cast(ToolResultEvent, tool_event)

            tool_use_message: Message = {"role": "assistant", "content": [{"toolUse": tool_use}]}
            tool_result_message: Message = {"role": "user", "content": [{"toolResult": tool_result_event.tool_result}]}
            await self._agent._append_messages(tool_use_message, tool_result_message)

            await self._event_queue.put(ToolResultMessageEvent(tool_result_message))

            # Check for stop_conversation before sending to model
            if tool_use["name"] == "stop_conversation":
                logger.info("tool_name=<%s> | conversation stop requested, skipping model send", tool_use["name"])
                connection_id = getattr(self._agent.model, "_connection_id", "unknown")
                await self._event_queue.put(
                    BidiConnectionCloseEvent(connection_id=connection_id, reason="user_request")
                )
                return  # Skip the model send

            # Send result to model (all tools except stop_conversation)
            await self.send(tool_result_event)

        except Exception as error:
            await self._event_queue.put(error)
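
A minimal lifecycle sketch, assuming an already constructed BidiAgent and that BidiTextInputEvent accepts a text argument (the import path for the event type is also an assumption); BidiAgent normally drives this loop internally, so direct use is for illustration only:

from strands.experimental.bidi.agent.loop import _BidiAgentLoop
from strands.experimental.bidi.types.events import BidiTextInputEvent  # assumed import path

async def run(agent) -> None:
    loop = _BidiAgentLoop(agent)
    await loop.start(invocation_state={"user_id": "user-123"})  # hypothetical tool context
    try:
        await loop.send(BidiTextInputEvent(text="Hello"))  # assumed constructor signature
        async for event in loop.receive():
            print(event)  # audio, transcripts, tool results, etc.
    finally:
        await loop.stop()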

__init__(agent)

Initialize members of the agent loop.

Note, before receiving events from the loop, the user must call start.

Parameters:

    agent (BidiAgent): Bidirectional agent to loop over. Required.

Source code in strands/experimental/bidi/agent/loop.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(self, agent: "BidiAgent") -> None:
    """Initialize members of the agent loop.

    Note, before receiving events from the loop, the user must call `start`.

    Args:
        agent: Bidirectional agent to loop over.
    """
    self._agent = agent
    self._started = False
    self._task_pool = _TaskPool()
    self._event_queue: asyncio.Queue
    self._invocation_state: dict[str, Any]

    self._send_gate = asyncio.Event()

receive() async

Receive model and tool call events.

Returns:

    AsyncGenerator[BidiOutputEvent, None]: Model and tool call events.

Raises:

    RuntimeError: If start has not been called.

Source code in strands/experimental/bidi/agent/loop.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive model and tool call events.

    Returns:
        Model and tool call events.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._started:
        raise RuntimeError("loop not started | call start before receiving")

    while True:
        event = await self._event_queue.get()
        if isinstance(event, BidiModelTimeoutError):
            logger.debug("model timeout error received")
            yield BidiConnectionRestartEvent(event)
            await self._restart_connection(event)
            continue

        if isinstance(event, Exception):
            raise event

        # Check for graceful shutdown event
        if isinstance(event, BidiConnectionCloseEvent) and event.reason == "user_request":
            yield event
            break

        yield event
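
A hedged sketch of consuming the stream, where loop is an already started _BidiAgentLoop and handle is a hypothetical application callback; on a model timeout the loop yields a BidiConnectionRestartEvent and reconnects on its own, so the consumer only needs to keep iterating:

async for event in loop.receive():
    if isinstance(event, BidiConnectionRestartEvent):  # assumed importable from the bidi event types
        continue  # connection is being restarted after a timeout; nothing to do
    handle(event)  # hypothetical application callback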

send(event) async

Send model event.

Additionally, add text input to messages array.

Parameters:

    event (BidiInputEvent | ToolResultEvent): User input event or tool result. Required.

Raises:

    RuntimeError: If start has not been called.

Source code in strands/experimental/bidi/agent/loop.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
async def send(self, event: BidiInputEvent | ToolResultEvent) -> None:
    """Send model event.

    Additionally, add text input to messages array.

    Args:
        event: User input event or tool result.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._started:
        raise RuntimeError("loop not started | call start before sending")

    if not self._send_gate.is_set():
        logger.debug("waiting for model send signal")
        await self._send_gate.wait()

    if isinstance(event, BidiTextInputEvent):
        message: Message = {"role": "user", "content": [{"text": event.text}]}
        await self._agent._append_messages(message)

    await self._agent.model.send(event)
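
As a short sketch inside an async context, assuming a started loop and the same BidiTextInputEvent constructor assumption as above; only text input is appended to the messages array before being forwarded to the model:

await loop.send(BidiTextInputEvent(text="What's on my calendar today?"))  # also recorded in agent.messages
await loop.send(ToolResultEvent(tool_result))  # forwarded without a message append; normally done internally by _run_tool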

start(invocation_state=None) async

Start the agent loop.

The agent model is started as part of this call.

Parameters:

    invocation_state (dict[str, Any] | None): Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter. Defaults to None.

Raises:

    RuntimeError: If loop already started.

Source code in strands/experimental/bidi/agent/loop.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
    """Start the agent loop.

    The agent model is started as part of this call.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Raises:
        RuntimeError: If loop already started.
    """
    if self._started:
        raise RuntimeError("loop already started | call stop before starting again")

    logger.debug("agent loop starting")
    await self._agent.hooks.invoke_callbacks_async(BidiBeforeInvocationEvent(agent=self._agent))

    await self._agent.model.start(
        system_prompt=self._agent.system_prompt,
        tools=self._agent.tool_registry.get_all_tool_specs(),
        messages=self._agent.messages,
    )

    self._event_queue = asyncio.Queue(maxsize=1)

    self._task_pool = _TaskPool()
    self._task_pool.create(self._run_model())

    self._invocation_state = invocation_state or {}
    self._send_gate.set()
    self._started = True

stop() async

Stop the agent loop.

Source code in strands/experimental/bidi/agent/loop.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
async def stop(self) -> None:
    """Stop the agent loop."""
    logger.debug("agent loop stopping")

    self._started = False
    self._send_gate.clear()
    self._invocation_state = {}

    async def stop_tasks() -> None:
        await self._task_pool.cancel()

    async def stop_model() -> None:
        await self._agent.model.stop()

    try:
        await stop_all(stop_tasks, stop_model)
    finally:
        await self._agent.hooks.invoke_callbacks_async(BidiAfterInvocationEvent(agent=self._agent))

_TaskPool

Manage pool of active async tasks.

Source code in strands/experimental/bidi/_async/_task_pool.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class _TaskPool:
    """Manage pool of active async tasks."""

    def __init__(self) -> None:
        """Setup task container."""
        self._tasks: set[asyncio.Task] = set()

    def __len__(self) -> int:
        """Number of active tasks."""
        return len(self._tasks)

    def create(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Create async task.

        Adds a cleanup callback to run after the task completes.

        Returns:
            The created task.
        """
        task = asyncio.create_task(coro)
        task.add_done_callback(lambda task: self._tasks.remove(task))

        self._tasks.add(task)
        return task

    async def cancel(self) -> None:
        """Cancel all active tasks in pool."""
        for task in self._tasks:
            task.cancel()

        try:
            await asyncio.gather(*self._tasks)
        except asyncio.CancelledError:
            pass
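
A minimal self-contained sketch of the pool's lifecycle; asyncio.sleep stands in for any long-running coroutine:

import asyncio

from strands.experimental.bidi._async._task_pool import _TaskPool

async def main() -> None:
    pool = _TaskPool()
    pool.create(asyncio.sleep(10))  # task is tracked until it completes or is cancelled
    print(len(pool))                # -> 1
    await pool.cancel()             # cancel and await all remaining tasks
    print(len(pool))                # -> 0

asyncio.run(main())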

__init__()

Setup task container.

Source code in strands/experimental/bidi/_async/_task_pool.py
13
14
15
def __init__(self) -> None:
    """Setup task container."""
    self._tasks: set[asyncio.Task] = set()

__len__()

Number of active tasks.

Source code in strands/experimental/bidi/_async/_task_pool.py
17
18
19
def __len__(self) -> int:
    """Number of active tasks."""
    return len(self._tasks)

cancel() async

Cancel all active tasks in pool.

Source code in strands/experimental/bidi/_async/_task_pool.py
35
36
37
38
39
40
41
42
43
async def cancel(self) -> None:
    """Cancel all active tasks in pool."""
    for task in self._tasks:
        task.cancel()

    try:
        await asyncio.gather(*self._tasks)
    except asyncio.CancelledError:
        pass

create(coro)

Create async task.

Adds a cleanup callback to run after the task completes.

Returns:

    Task: The created task.

Source code in strands/experimental/bidi/_async/_task_pool.py
21
22
23
24
25
26
27
28
29
30
31
32
33
def create(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    """Create async task.

    Adds a cleanup callback to run after the task completes.

    Returns:
        The created task.
    """
    task = asyncio.create_task(coro)
    task.add_done_callback(lambda task: self._tasks.remove(task))

    self._tasks.add(task)
    return task

stop_all(*funcs) async

Call all stops in sequence and aggregate errors.

A failure in one stop call will not block subsequent stop calls.

Parameters:

    *funcs (Callable[..., Awaitable[None]]): Stop functions to call in sequence.

Raises:

    RuntimeError: If any stop function raises an exception.

Source code in strands/experimental/bidi/_async/__init__.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def stop_all(*funcs: Callable[..., Awaitable[None]]) -> None:
    """Call all stops in sequence and aggregate errors.

    A failure in one stop call will not block subsequent stop calls.

    Args:
        funcs: Stop functions to call in sequence.

    Raises:
        RuntimeError: If any stop function raises an exception.
    """
    exceptions = []
    for func in funcs:
        try:
            await func()
        except Exception as exception:
            exceptions.append({"func_name": func.__name__, "exception": repr(exception)})

    if exceptions:
        raise RuntimeError(f"exceptions={exceptions} | failed stop sequence")
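
A self-contained sketch showing that a failing stop function does not prevent the next one from running and that failures surface as a single RuntimeError; the function bodies are hypothetical:

import asyncio

from strands.experimental.bidi._async import stop_all

async def stop_tasks() -> None:
    raise ValueError("task cleanup failed")  # hypothetical failure

async def stop_model() -> None:
    print("model stopped")  # still runs despite the earlier failure

async def main() -> None:
    try:
        await stop_all(stop_tasks, stop_model)
    except RuntimeError as error:
        print(error)  # aggregated report of all failed stop functions

asyncio.run(main())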