Skip to content

strands.experimental.bidi.io.audio

Send and receive audio data from devices.

Reads user audio from an input device and sends agent audio to an output device using PyAudio. If the user interrupts the agent, the output buffer is cleared to stop playback.

Audio configuration is provided by the model via agent.model.config["audio"].

BidiOutputEvent = BidiConnectionStartEvent | BidiConnectionRestartEvent | BidiResponseStartEvent | BidiAudioStreamEvent | BidiTranscriptStreamEvent | BidiInterruptionEvent | BidiResponseCompleteEvent | BidiUsageEvent | BidiConnectionCloseEvent | BidiErrorEvent | ToolUseStreamEvent module-attribute

Union of different bidi output event types.

logger = logging.getLogger(__name__) module-attribute

BidiAgent

Agent for bidirectional streaming conversations.

Enables real-time audio and text interaction with AI models through persistent connections. Supports concurrent tool execution and interruption handling.

Source code in strands/experimental/bidi/agent/agent.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
class BidiAgent:
    """Agent for bidirectional streaming conversations.

    Enables real-time audio and text interaction with AI models through persistent
    connections. Supports concurrent tool execution and interruption handling.

    Lifecycle: construct the agent, call `start` (or enter it as an async context
    manager), exchange data with `send` / `receive`, then call `stop`. `run` wraps
    that whole lifecycle around a set of input and output channels.
    """

    def __init__(
        self,
        model: BidiModel | str | None = None,
        tools: list[str | AgentTool | ToolProvider] | None = None,
        system_prompt: str | None = None,
        messages: Messages | None = None,
        record_direct_tool_call: bool = True,
        load_tools_from_directory: bool = False,
        agent_id: str | None = None,
        name: str | None = None,
        description: str | None = None,
        hooks: list[HookProvider] | None = None,
        state: AgentState | dict | None = None,
        session_manager: "SessionManager | None" = None,
        tool_executor: ToolExecutor | None = None,
        **kwargs: Any,
    ):
        """Initialize bidirectional agent.

        Args:
            model: BidiModel instance, string model_id, or None to use a default BidiNovaSonicModel.
            tools: Optional list of tools with flexible format support.
            system_prompt: Optional system prompt for conversations.
            messages: Optional conversation history to initialize with.
            record_direct_tool_call: Whether to record direct tool calls in message history.
            load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
            agent_id: Optional ID for the agent, useful for connection management and multi-agent scenarios.
            name: Name of the Agent.
            description: Description of what the Agent does.
            hooks: Optional list of hook providers to register for lifecycle events.
            state: Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
            session_manager: Manager for handling agent sessions including conversation history and state.
                If provided, it is registered as a hook provider on this agent.
            tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
                Defaults to a ConcurrentToolExecutor.
            **kwargs: Additional configuration for future extensibility. Currently unused by this constructor.

        Raises:
            ValueError: If state is neither an AgentState object nor a dict. Errors raised by the model
                constructor or by agent_id validation propagate unchanged.
        """
        # A falsy model selects the default model; a string is treated as a model id.
        self.model = (
            BidiNovaSonicModel()
            if not model
            else BidiNovaSonicModel(model_id=model)
            if isinstance(model, str)
            else model
        )
        self.system_prompt = system_prompt
        self.messages = messages or []

        # Agent identification
        self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
        self.name = name or _DEFAULT_AGENT_NAME
        self.description = description

        # Tool execution configuration
        self.record_direct_tool_call = record_direct_tool_call
        self.load_tools_from_directory = load_tools_from_directory

        # Initialize tool registry
        self.tool_registry = ToolRegistry()

        if tools is not None:
            self.tool_registry.process_tools(tools)

        self.tool_registry.initialize_tools(self.load_tools_from_directory)

        # Initialize tool watcher if directory loading is enabled.
        # NOTE: the `tool_watcher` attribute only exists when directory loading is on.
        if self.load_tools_from_directory:
            self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

        # Initialize agent state management
        if state is not None:
            if isinstance(state, dict):
                self.state = AgentState(state)
            elif isinstance(state, AgentState):
                self.state = state
            else:
                raise ValueError("state must be an AgentState object or a dict")
        else:
            self.state = AgentState()

        # Initialize other components
        self._tool_caller = _ToolCaller(self)

        # Initialize tool executor
        self.tool_executor = tool_executor or ConcurrentToolExecutor()

        # Initialize hooks registry
        self.hooks = HookRegistry()
        if hooks:
            for hook in hooks:
                self.hooks.add_hook(hook)

        # Initialize session management functionality
        self._session_manager = session_manager
        if self._session_manager:
            self.hooks.add_hook(self._session_manager)

        self._loop = _BidiAgentLoop(self)

        # TODO: Determine if full support is required
        self._interrupt_state = _InterruptState()

        # Lock to ensure that paired messages are added to history in sequence without interference
        self._message_lock = asyncio.Lock()

        self._started = False

        # Emit the initialization event last so that hook callbacks observe a fully
        # constructed agent (interrupt state, message lock and started flag included)
        # and nothing they set on the agent is overwritten afterwards.
        self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

    @property
    def tool(self) -> _ToolCaller:
        """Call tool as a function.

        Returns:
            ToolCaller for method-style tool execution.

        Example:
            ```
            agent = BidiAgent(model=model, tools=[calculator])
            agent.tool.calculator(expression="2+2")
            ```
        """
        return self._tool_caller

    @property
    def tool_names(self) -> list[str]:
        """Get a list of all registered tool names.

        Returns:
            Names of all tools available to this agent.
        """
        all_tools = self.tool_registry.get_all_tools_config()
        return list(all_tools.keys())

    async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
        """Start a persistent bidirectional conversation connection.

        Initializes the streaming connection and starts background tasks for processing
        model events, tool execution, and connection management.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Raises:
            RuntimeError:
                If agent already started.

        Example:
            ```python
            await agent.start(invocation_state={
                "user_id": "user_123",
                "session_id": "session_456",
                "database": db_connection,
            })
            ```
        """
        if self._started:
            raise RuntimeError("agent already started | call stop before starting again")

        logger.debug("agent starting")
        await self._loop.start(invocation_state)
        # Only mark the agent as started once the loop has started without raising.
        self._started = True

    async def send(self, input_data: BidiAgentInput | dict[str, Any]) -> None:
        """Send input to the model (text, audio, image, or event dict).

        Unified method for sending text, audio, and image input to the model during
        an active conversation session. Accepts TypedEvent instances or plain dicts
        (e.g., from WebSocket clients) which are automatically reconstructed.

        Args:
            input_data: Can be:

                - str: Text message from user
                - BidiInputEvent: TypedEvent
                - dict: Event dictionary with a "type" key of "bidi_text_input",
                  "bidi_audio_input" or "bidi_image_input"; the remaining keys are
                  passed as keyword arguments to the matching event class

        Raises:
            RuntimeError: If start has not been called.
            ValueError: If invalid input type.

        Example:
            ```python
            await agent.send("Hello")
            await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
            await agent.send({"type": "bidi_text_input", "text": "Hello", "role": "user"})
            ```
        """
        if not self._started:
            raise RuntimeError("agent not started | call start before sending")

        input_event: BidiInputEvent

        if isinstance(input_data, str):
            input_event = BidiTextInputEvent(text=input_data)

        elif isinstance(input_data, BidiInputEvent):
            input_event = input_data

        elif isinstance(input_data, dict) and "type" in input_data:
            input_type = input_data["type"]
            # "type" selects the event class; everything else becomes constructor kwargs.
            input_data = {key: value for key, value in input_data.items() if key != "type"}
            if input_type == "bidi_text_input":
                input_event = BidiTextInputEvent(**input_data)
            elif input_type == "bidi_audio_input":
                input_event = BidiAudioInputEvent(**input_data)
            elif input_type == "bidi_image_input":
                input_event = BidiImageInputEvent(**input_data)
            else:
                raise ValueError(f"input_type=<{input_type}> | input type not supported")

        else:
            raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")

        await self._loop.send(input_event)

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive events from the model including audio, text, and tool calls.

        Yields:
            Model output events processed by background tasks including audio output,
            text responses, tool calls, and connection updates.

        Raises:
            RuntimeError: If start has not been called. Because this is an async generator,
                the error surfaces when iteration begins, not when `receive()` is called.
        """
        if not self._started:
            raise RuntimeError("agent not started | call start before receiving")

        async for event in self._loop.receive():
            yield event

    async def stop(self) -> None:
        """End the conversation connection and cleanup all resources.

        Terminates the streaming connection, cancels background tasks, and
        closes the connection to the model provider.
        """
        # Clear the flag first so send/receive reject new calls while the loop shuts down.
        self._started = False
        await self._loop.stop()

    async def __aenter__(self, invocation_state: dict[str, Any] | None = None) -> "BidiAgent":
        """Async context manager entry point.

        Automatically starts the bidirectional connection when entering the context.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.
                Note that `async with` calls `__aenter__` without arguments, so this is
                None unless the method is invoked directly.

        Returns:
            Self for use in the context.
        """
        logger.debug("context_manager=<enter> | starting agent")
        await self.start(invocation_state)
        return self

    async def __aexit__(self, *_: Any) -> None:
        """Async context manager exit point.

        Automatically ends the connection and cleans up resources when exiting
        the context, regardless of whether an exception occurred.
        """
        logger.debug("context_manager=<exit> | stopping agent")
        await self.stop()

    async def run(
        self, inputs: list[BidiInput], outputs: list[BidiOutput], invocation_state: dict[str, Any] | None = None
    ) -> None:
        """Run the agent using provided IO channels for bidirectional communication.

        Starts the agent, starts every channel that is a BidiInput / BidiOutput instance,
        then forwards input events to the model and model events to all outputs until the
        receive stream ends. Channels and the agent are stopped on exit, including on error.

        Args:
            inputs: Input callables to read data from a source
            outputs: Output callables to receive events from the agent
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Example:
            ```python
            # Using model defaults:
            model = BidiNovaSonicModel()
            audio_io = BidiAudioIO()
            text_io = BidiTextIO()
            agent = BidiAgent(model=model, tools=[calculator])
            await agent.run(
                inputs=[audio_io.input()],
                outputs=[audio_io.output(), text_io.output()],
                invocation_state={"user_id": "user_123"}
            )

            # Using custom audio config:
            model = BidiNovaSonicModel(
                provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
            )
            audio_io = BidiAudioIO()
            agent = BidiAgent(model=model, tools=[calculator])
            await agent.run(
                inputs=[audio_io.input()],
                outputs=[audio_io.output()],
            )
            ```
        """

        async def run_inputs() -> None:
            async def task(input_: BidiInput) -> None:
                # Loops until cancelled (see run_outputs) or until a call raises.
                while True:
                    event = await input_()
                    await self.send(event)

            await asyncio.gather(*[task(input_) for input_ in inputs])

        async def run_outputs(inputs_task: asyncio.Task) -> None:
            async for event in self.receive():
                # Fan each model event out to every output channel.
                await asyncio.gather(*[output(event) for output in outputs])

            # The receive stream ended, so stop reading further input.
            inputs_task.cancel()

        try:
            await self.start(invocation_state)

            input_starts = [input_.start for input_ in inputs if isinstance(input_, BidiInput)]
            output_starts = [output.start for output in outputs if isinstance(output, BidiOutput)]
            for start in [*input_starts, *output_starts]:
                await start(self)

            async with _TaskGroup() as task_group:
                inputs_task = task_group.create_task(run_inputs())
                task_group.create_task(run_outputs(inputs_task))

        finally:
            input_stops = [input_.stop for input_ in inputs if isinstance(input_, BidiInput)]
            output_stops = [output.stop for output in outputs if isinstance(output, BidiOutput)]

            await stop_all(*input_stops, *output_stops, self.stop)

    async def _append_messages(self, *messages: Message) -> None:
        """Append messages to history in sequence without interference.

        The message lock ensures that paired messages are added to history in sequence without interference. For
        example, tool use and tool result messages must be added adjacent to each other.

        Args:
            *messages: Messages to add into history, in order. A BidiMessageAddedEvent hook
                is invoked after each one is appended.
        """
        async with self._message_lock:
            for message in messages:
                self.messages.append(message)
                await self.hooks.invoke_callbacks_async(BidiMessageAddedEvent(agent=self, message=message))

tool property

Call tool as a function.

Returns:

Type Description
_ToolCaller

ToolCaller for method-style tool execution.

Example
agent = BidiAgent(model=model, tools=[calculator])
agent.tool.calculator(expression="2+2")

tool_names property

Get a list of all registered tool names.

Returns:

Type Description
list[str]

Names of all tools available to this agent.

__aenter__(invocation_state=None) async

Async context manager entry point.

Automatically starts the bidirectional connection when entering the context.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Returns:

Type Description
BidiAgent

Self for use in the context.

Source code in strands/experimental/bidi/agent/agent.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
async def __aenter__(self, invocation_state: dict[str, Any] | None = None) -> "BidiAgent":
    """Start the agent on entry to an ``async with`` block.

    Logs the entry, delegates to ``start`` and hands the agent back so it can be
    bound with ``async with agent as a``.

    Args:
        invocation_state: Optional context forwarded to ``start`` (and from there to
            tools), e.g. user_id, session_id or database connections. ``async with``
            itself calls ``__aenter__`` with no arguments, so this stays None unless
            the method is invoked directly.

    Returns:
        This agent instance.
    """
    logger.debug("context_manager=<enter> | starting agent")
    # Errors raised by start() propagate to the caller.
    await self.start(invocation_state)
    return self

__aexit__(*_) async

Async context manager exit point.

Automatically ends the connection and cleans up resources when exiting the context, regardless of whether an exception occurred.

Source code in strands/experimental/bidi/agent/agent.py
324
325
326
327
328
329
330
331
async def __aexit__(self, *_: Any) -> None:
    """Stop the agent on exit from an ``async with`` block.

    The exception details passed by the context-manager protocol are accepted and
    ignored: the agent is stopped whether or not the block raised. Returning None
    means an exception from the block is not suppressed.
    """
    logger.debug("context_manager=<exit> | stopping agent")
    await self.stop()

__init__(model=None, tools=None, system_prompt=None, messages=None, record_direct_tool_call=True, load_tools_from_directory=False, agent_id=None, name=None, description=None, hooks=None, state=None, session_manager=None, tool_executor=None, **kwargs)

Initialize bidirectional agent.

Parameters:

Name Type Description Default
model BidiModel | str | None

BidiModel instance, string model_id, or None for default detection.

None
tools list[str | AgentTool | ToolProvider] | None

Optional list of tools with flexible format support.

None
system_prompt str | None

Optional system prompt for conversations.

None
messages Messages | None

Optional conversation history to initialize with.

None
record_direct_tool_call bool

Whether to record direct tool calls in message history.

True
load_tools_from_directory bool

Whether to load and automatically reload tools in the ./tools/ directory.

False
agent_id str | None

Optional ID for the agent, useful for connection management and multi-agent scenarios.

None
name str | None

Name of the Agent.

None
description str | None

Description of what the Agent does.

None
hooks list[HookProvider] | None

Optional list of hook providers to register for lifecycle events.

None
state AgentState | dict | None

Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.

None
session_manager SessionManager | None

Manager for handling agent sessions including conversation history and state. If provided, enables session-based persistence and state management.

None
tool_executor ToolExecutor | None

Definition of tool execution strategy (e.g., sequential, concurrent, etc.).

None
**kwargs Any

Additional configuration for future extensibility.

{}

Raises:

Type Description
ValueError

If model configuration is invalid or state is invalid type.

TypeError

If model type is unsupported.

Source code in strands/experimental/bidi/agent/agent.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def __init__(
    self,
    model: BidiModel | str | None = None,
    tools: list[str | AgentTool | ToolProvider] | None = None,
    system_prompt: str | None = None,
    messages: Messages | None = None,
    record_direct_tool_call: bool = True,
    load_tools_from_directory: bool = False,
    agent_id: str | None = None,
    name: str | None = None,
    description: str | None = None,
    hooks: list[HookProvider] | None = None,
    state: AgentState | dict | None = None,
    session_manager: "SessionManager | None" = None,
    tool_executor: ToolExecutor | None = None,
    **kwargs: Any,
):
    """Initialize bidirectional agent.

    Args:
        model: BidiModel instance, string model_id, or None to use a default BidiNovaSonicModel.
        tools: Optional list of tools with flexible format support.
        system_prompt: Optional system prompt for conversations.
        messages: Optional conversation history to initialize with.
        record_direct_tool_call: Whether to record direct tool calls in message history.
        load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
        agent_id: Optional ID for the agent, useful for connection management and multi-agent scenarios.
        name: Name of the Agent.
        description: Description of what the Agent does.
        hooks: Optional list of hook providers to register for lifecycle events.
        state: Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
        session_manager: Manager for handling agent sessions including conversation history and state.
            If provided, it is registered as a hook provider on this agent.
        tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
            Defaults to a ConcurrentToolExecutor.
        **kwargs: Additional configuration for future extensibility. Currently unused by this constructor.

    Raises:
        ValueError: If state is neither an AgentState object nor a dict. Errors raised by the model
            constructor or by agent_id validation propagate unchanged.
    """
    # A falsy model selects the default model; a string is treated as a model id.
    self.model = (
        BidiNovaSonicModel()
        if not model
        else BidiNovaSonicModel(model_id=model)
        if isinstance(model, str)
        else model
    )
    self.system_prompt = system_prompt
    self.messages = messages or []

    # Agent identification
    self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
    self.name = name or _DEFAULT_AGENT_NAME
    self.description = description

    # Tool execution configuration
    self.record_direct_tool_call = record_direct_tool_call
    self.load_tools_from_directory = load_tools_from_directory

    # Initialize tool registry
    self.tool_registry = ToolRegistry()

    if tools is not None:
        self.tool_registry.process_tools(tools)

    self.tool_registry.initialize_tools(self.load_tools_from_directory)

    # Initialize tool watcher if directory loading is enabled.
    # NOTE: the `tool_watcher` attribute only exists when directory loading is on.
    if self.load_tools_from_directory:
        self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

    # Initialize agent state management
    if state is not None:
        if isinstance(state, dict):
            self.state = AgentState(state)
        elif isinstance(state, AgentState):
            self.state = state
        else:
            raise ValueError("state must be an AgentState object or a dict")
    else:
        self.state = AgentState()

    # Initialize other components
    self._tool_caller = _ToolCaller(self)

    # Initialize tool executor
    self.tool_executor = tool_executor or ConcurrentToolExecutor()

    # Initialize hooks registry
    self.hooks = HookRegistry()
    if hooks:
        for hook in hooks:
            self.hooks.add_hook(hook)

    # Initialize session management functionality
    self._session_manager = session_manager
    if self._session_manager:
        self.hooks.add_hook(self._session_manager)

    self._loop = _BidiAgentLoop(self)

    # TODO: Determine if full support is required
    self._interrupt_state = _InterruptState()

    # Lock to ensure that paired messages are added to history in sequence without interference
    self._message_lock = asyncio.Lock()

    self._started = False

    # Emit the initialization event last so that hook callbacks observe a fully
    # constructed agent (interrupt state, message lock and started flag included)
    # and nothing they set on the agent is overwritten afterwards.
    self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

receive() async

Receive events from the model including audio, text, and tool calls.

Yields:

Type Description
AsyncGenerator[BidiOutputEvent, None]

Model output events processed by background tasks including audio output, text responses, tool calls, and connection updates.

Raises:

Type Description
RuntimeError

If start has not been called.

Source code in strands/experimental/bidi/agent/agent.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Stream output events produced by the agent loop.

    Yields:
        Each event produced by the agent loop's own ``receive`` stream, in order.

    Raises:
        RuntimeError: If the agent has not been started. Because this is an async
            generator, the check runs when iteration begins rather than when
            ``receive()`` is called.
    """
    if not self._started:
        raise RuntimeError("agent not started | call start before receiving")

    # Re-yield the loop's stream unchanged; this method only adds the started check.
    loop_events = self._loop.receive()
    async for output_event in loop_events:
        yield output_event

run(inputs, outputs, invocation_state=None) async

Run the agent using provided IO channels for bidirectional communication.

Parameters:

Name Type Description Default
inputs list[BidiInput]

Input callables to read data from a source

required
outputs list[BidiOutput]

Output callables to receive events from the agent

required
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None
Example
# Using model defaults:
model = BidiNovaSonicModel()
audio_io = BidiAudioIO()
text_io = BidiTextIO()
agent = BidiAgent(model=model, tools=[calculator])
await agent.run(
    inputs=[audio_io.input()],
    outputs=[audio_io.output(), text_io.output()],
    invocation_state={"user_id": "user_123"}
)

# Using custom audio config:
model = BidiNovaSonicModel(
    provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
)
audio_io = BidiAudioIO()
agent = BidiAgent(model=model, tools=[calculator])
await agent.run(
    inputs=[audio_io.input()],
    outputs=[audio_io.output()],
)
Source code in strands/experimental/bidi/agent/agent.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
async def run(
    self, inputs: list[BidiInput], outputs: list[BidiOutput], invocation_state: dict[str, Any] | None = None
) -> None:
    """Drive a full conversation over the given input and output channels.

    Starts the agent, starts every channel that is a BidiInput / BidiOutput instance,
    then concurrently forwards input events to the model and fans model events out to
    all outputs until the receive stream ends. Channels and the agent are stopped on
    the way out, including when an error occurred.

    Args:
        inputs: Input callables to read data from a source
        outputs: Output callables to receive events from the agent
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Example:
        ```python
        # Using model defaults:
        model = BidiNovaSonicModel()
        audio_io = BidiAudioIO()
        text_io = BidiTextIO()
        agent = BidiAgent(model=model, tools=[calculator])
        await agent.run(
            inputs=[audio_io.input()],
            outputs=[audio_io.output(), text_io.output()],
            invocation_state={"user_id": "user_123"}
        )

        # Using custom audio config:
        model = BidiNovaSonicModel(
            provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
        )
        audio_io = BidiAudioIO()
        agent = BidiAgent(model=model, tools=[calculator])
        await agent.run(
            inputs=[audio_io.input()],
            outputs=[audio_io.output()],
        )
        ```
    """

    async def pump(source: BidiInput) -> None:
        # Forward events from one input channel to the model until cancelled or a call raises.
        while True:
            await self.send(await source())

    async def run_inputs() -> None:
        # One pump per input channel, all running concurrently.
        pumps = [pump(source) for source in inputs]
        await asyncio.gather(*pumps)

    async def run_outputs(inputs_task: asyncio.Task) -> None:
        async for event in self.receive():
            # Deliver each model event to every output channel.
            deliveries = [sink(event) for sink in outputs]
            await asyncio.gather(*deliveries)

        # The receive stream ended, so stop reading further input.
        inputs_task.cancel()

    try:
        await self.start(invocation_state)

        # Inputs are started before outputs, one at a time.
        starters = [channel.start for channel in inputs if isinstance(channel, BidiInput)]
        starters += [channel.start for channel in outputs if isinstance(channel, BidiOutput)]
        for starter in starters:
            await starter(self)

        async with _TaskGroup() as task_group:
            inputs_task = task_group.create_task(run_inputs())
            task_group.create_task(run_outputs(inputs_task))

    finally:
        stoppers = [channel.stop for channel in inputs if isinstance(channel, BidiInput)]
        stoppers += [channel.stop for channel in outputs if isinstance(channel, BidiOutput)]

        # The agent's own stop goes last, after every channel stop.
        await stop_all(*stoppers, self.stop)

send(input_data) async

Send input to the model (text, audio, image, or event dict).

Unified method for sending text, audio, and image input to the model during an active conversation session. Accepts TypedEvent instances or plain dicts (e.g., from WebSocket clients) which are automatically reconstructed.

Parameters:

Name Type Description Default
input_data BidiAgentInput | dict[str, Any]

Can be:

  • str: Text message from user
  • BidiInputEvent: TypedEvent
  • dict: Event dictionary (will be reconstructed to TypedEvent)
required

Raises:

Type Description
RuntimeError

If start has not been called.

ValueError

If invalid input type.

Example

await agent.send("Hello")
await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
await agent.send({"type": "bidi_text_input", "text": "Hello", "role": "user"})

Source code in strands/experimental/bidi/agent/agent.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
async def send(self, input_data: BidiAgentInput | dict[str, Any]) -> None:
    """Send input to the model (text, audio, image, or event dict).

    Unified method for sending text, audio, and image input to the model during
    an active conversation session. Accepts TypedEvent instances or plain dicts
    (e.g., from WebSocket clients) which are automatically reconstructed.

    Args:
        input_data: Can be:

            - str: Text message from user (wrapped in a BidiTextInputEvent)
            - BidiInputEvent: TypedEvent, forwarded unchanged
            - dict: Event dictionary whose "type" key is one of "bidi_text_input",
              "bidi_audio_input" or "bidi_image_input"; all remaining keys are
              passed as keyword arguments to the matching event constructor

    Raises:
        RuntimeError: If the agent is not currently started.
        ValueError: If invalid input type, or a dict with an unsupported "type".

    Example:
        await agent.send("Hello")
        await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
        await agent.send({"type": "bidi_text_input", "text": "Hello", "role": "user"})
    """
    if not self._started:
        raise RuntimeError("agent not started | call start before sending")

    input_event: BidiInputEvent

    if isinstance(input_data, str):
        input_event = BidiTextInputEvent(text=input_data)

    elif isinstance(input_data, BidiInputEvent):
        input_event = input_data

    elif isinstance(input_data, dict) and "type" in input_data:
        input_type = input_data["type"]
        # Drop the discriminator; every other key becomes a constructor kwarg.
        # A separate name is used so the caller-supplied parameter is not rebound.
        event_kwargs = {key: value for key, value in input_data.items() if key != "type"}
        if input_type == "bidi_text_input":
            input_event = BidiTextInputEvent(**event_kwargs)
        elif input_type == "bidi_audio_input":
            input_event = BidiAudioInputEvent(**event_kwargs)
        elif input_type == "bidi_image_input":
            input_event = BidiImageInputEvent(**event_kwargs)
        else:
            raise ValueError(f"input_type=<{input_type}> | input type not supported")

    else:
        raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")

    await self._loop.send(input_event)

start(invocation_state=None) async

Start a persistent bidirectional conversation connection.

Initializes the streaming connection and starts background tasks for processing model events, tool execution, and connection management.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Raises:

Type Description
RuntimeError

If agent already started.

Example
await agent.start(invocation_state={
    "user_id": "user_123",
    "session_id": "session_456",
    "database": db_connection,
})
Source code in strands/experimental/bidi/agent/agent.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
    """Start a persistent bidirectional conversation connection.

    Initializes the streaming connection and starts background tasks for processing
    model events, tool execution, and connection management.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Raises:
        RuntimeError:
            If agent already started.

    Example:
        ```python
        await agent.start(invocation_state={
            "user_id": "user_123",
            "session_id": "session_456",
            "database": db_connection,
        })
        ```
    """
    if self._started:
        raise RuntimeError("agent already started | call stop before starting again")

    logger.debug("agent starting")
    await self._loop.start(invocation_state)
    self._started = True

stop() async

End the conversation connection and cleanup all resources.

Terminates the streaming connection, cancels background tasks, and closes the connection to the model provider.

Source code in strands/experimental/bidi/agent/agent.py
298
299
300
301
302
303
304
305
async def stop(self) -> None:
    """Shut the conversation down.

    Marks the agent as not started, then awaits the agent loop's ``stop``.
    """
    # The flag is cleared before awaiting, so it is already False while the loop stops.
    self._started = False
    loop = self._loop
    await loop.stop()

BidiAudioIO

Send and receive audio data from devices.

Source code in strands/experimental/bidi/io/audio.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
class BidiAudioIO:
    """Send and receive audio data from devices.

    Holds one device configuration and hands it to the input and output
    handlers it creates.
    """

    def __init__(self, **config: Any) -> None:
        """Store the device configuration; no device is opened here.

        Args:
            **config: Optional device configuration:

                - input_buffer_size (int): Maximum input buffer size (default: None)
                - input_device_index (int): Specific input device (default: None = system default)
                - input_frames_per_buffer (int): Input buffer size (default: 512)
                - output_buffer_size (int): Maximum output buffer size (default: None)
                - output_device_index (int): Specific output device (default: None = system default)
                - output_frames_per_buffer (int): Output buffer size (default: 512)
        """
        self._config = config

    def input(self) -> _BidiAudioInput:
        """Build an audio BidiInput from the stored configuration."""
        device_config = self._config
        return _BidiAudioInput(device_config)

    def output(self) -> _BidiAudioOutput:
        """Build an audio BidiOutput from the stored configuration."""
        device_config = self._config
        return _BidiAudioOutput(device_config)

__init__(**config)

Initialize audio devices.

Parameters:

Name Type Description Default
**config Any

Optional device configuration:

  • input_buffer_size (int): Maximum input buffer size (default: None)
  • input_device_index (int): Specific input device (default: None = system default)
  • input_frames_per_buffer (int): Input buffer size (default: 512)
  • output_buffer_size (int): Maximum output buffer size (default: None)
  • output_device_index (int): Specific output device (default: None = system default)
  • output_frames_per_buffer (int): Output buffer size (default: 512)
{}
Source code in strands/experimental/bidi/io/audio.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def __init__(self, **config: Any) -> None:
    """Store the device configuration; no device is opened here.

    Args:
        **config: Optional device configuration:

            - input_buffer_size (int): Maximum input buffer size (default: None)
            - input_device_index (int): Specific input device (default: None = system default)
            - input_frames_per_buffer (int): Input buffer size (default: 512)
            - output_buffer_size (int): Maximum output buffer size (default: None)
            - output_device_index (int): Specific output device (default: None = system default)
            - output_frames_per_buffer (int): Output buffer size (default: 512)
    """
    # The keyword arguments are kept as a single dict.
    self._config = config

input()

Return audio processing BidiInput.

Source code in strands/experimental/bidi/io/audio.py
288
289
290
def input(self) -> _BidiAudioInput:
    """Build an audio BidiInput from the stored configuration."""
    device_config = self._config
    return _BidiAudioInput(device_config)

output()

Return audio processing BidiOutput.

Source code in strands/experimental/bidi/io/audio.py
292
293
294
def output(self) -> _BidiAudioOutput:
    """Build an audio BidiOutput from the stored configuration."""
    device_config = self._config
    return _BidiAudioOutput(device_config)

BidiAudioInputEvent

Bases: TypedEvent

Audio input event for sending audio to the model.

Used for sending audio data through the send() method.

Parameters:

Name Type Description Default
audio str

Base64-encoded audio string to send to model.

required
format AudioFormat | str

Audio format from SUPPORTED_AUDIO_FORMATS.

required
sample_rate AudioSampleRate

Sample rate from SUPPORTED_SAMPLE_RATES.

required
channels AudioChannel

Channel count from SUPPORTED_CHANNELS.

required
Source code in strands/experimental/bidi/types/events.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class BidiAudioInputEvent(TypedEvent):
    """Audio input event for sending audio to the model.

    Used for sending audio data through the send() method. The event payload
    carries the type tag "bidi_audio_input" alongside the fields below.

    Parameters:
        audio: Base64-encoded audio string to send to model.
        format: Audio format from SUPPORTED_AUDIO_FORMATS.
        sample_rate: Sample rate from SUPPORTED_SAMPLE_RATES.
        channels: Channel count from SUPPORTED_CHANNELS.
    """

    def __init__(
        self,
        audio: str,
        format: AudioFormat | str,
        sample_rate: AudioSampleRate,
        channels: AudioChannel,
    ):
        """Build the event payload and hand it to TypedEvent."""
        payload = {
            "type": "bidi_audio_input",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels,
        }
        super().__init__(payload)

    @property
    def audio(self) -> str:
        """Base64-encoded audio string."""
        value = self["audio"]
        return cast(str, value)

    @property
    def format(self) -> AudioFormat:
        """Audio encoding format."""
        value = self["format"]
        return cast(AudioFormat, value)

    @property
    def sample_rate(self) -> AudioSampleRate:
        """Number of audio samples per second in Hz."""
        value = self["sample_rate"]
        return cast(AudioSampleRate, value)

    @property
    def channels(self) -> AudioChannel:
        """Number of audio channels (1=mono, 2=stereo)."""
        value = self["channels"]
        return cast(AudioChannel, value)

audio property

Base64-encoded audio string.

channels property

Number of audio channels (1=mono, 2=stereo).

format property

Audio encoding format.

sample_rate property

Number of audio samples per second in Hz.

__init__(audio, format, sample_rate, channels)

Initialize audio input event.

Source code in strands/experimental/bidi/types/events.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def __init__(
    self,
    audio: str,
    format: AudioFormat | str,
    sample_rate: AudioSampleRate,
    channels: AudioChannel,
):
    """Build the "bidi_audio_input" payload and hand it to the parent initializer."""
    payload = {
        "type": "bidi_audio_input",
        "audio": audio,
        "format": format,
        "sample_rate": sample_rate,
        "channels": channels,
    }
    super().__init__(payload)

BidiAudioStreamEvent

Bases: TypedEvent

Streaming audio output from the model.

Parameters:

Name Type Description Default
audio str

Base64-encoded audio string.

required
format AudioFormat

Audio encoding format.

required
sample_rate AudioSampleRate

Number of audio samples per second in Hz.

required
channels AudioChannel

Number of audio channels (1=mono, 2=stereo).

required
Source code in strands/experimental/bidi/types/events.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class BidiAudioStreamEvent(TypedEvent):
    """Streaming audio output from the model.

    The event payload carries the type tag "bidi_audio_stream" alongside the
    fields below.

    Parameters:
        audio: Base64-encoded audio string.
        format: Audio encoding format.
        sample_rate: Number of audio samples per second in Hz.
        channels: Number of audio channels (1=mono, 2=stereo).
    """

    def __init__(
        self,
        audio: str,
        format: AudioFormat,
        sample_rate: AudioSampleRate,
        channels: AudioChannel,
    ):
        """Build the event payload and hand it to TypedEvent."""
        payload = {
            "type": "bidi_audio_stream",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels,
        }
        super().__init__(payload)

    @property
    def audio(self) -> str:
        """Base64-encoded audio string."""
        value = self["audio"]
        return cast(str, value)

    @property
    def format(self) -> AudioFormat:
        """Audio encoding format."""
        value = self["format"]
        return cast(AudioFormat, value)

    @property
    def sample_rate(self) -> AudioSampleRate:
        """Number of audio samples per second in Hz."""
        value = self["sample_rate"]
        return cast(AudioSampleRate, value)

    @property
    def channels(self) -> AudioChannel:
        """Number of audio channels (1=mono, 2=stereo)."""
        value = self["channels"]
        return cast(AudioChannel, value)

audio property

Base64-encoded audio string.

channels property

Number of audio channels (1=mono, 2=stereo).

format property

Audio encoding format.

sample_rate property

Number of audio samples per second in Hz.

__init__(audio, format, sample_rate, channels)

Initialize audio stream event.

Source code in strands/experimental/bidi/types/events.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def __init__(
    self,
    audio: str,
    format: AudioFormat,
    sample_rate: AudioSampleRate,
    channels: AudioChannel,
):
    """Build the "bidi_audio_stream" payload and hand it to the parent initializer."""
    payload = {
        "type": "bidi_audio_stream",
        "audio": audio,
        "format": format,
        "sample_rate": sample_rate,
        "channels": channels,
    }
    super().__init__(payload)

BidiInput

Bases: Protocol

Protocol for bidirectional input callables.

Input callables read data from a source (microphone, camera, websocket, etc.) and return events to be sent to the agent.

Source code in strands/experimental/bidi/types/io.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@runtime_checkable
class BidiInput(Protocol):
    """Protocol for bidirectional input callables.

    Input callables read data from a source (microphone, camera, websocket, etc.)
    and return events to be sent to the agent. ``start`` and ``stop`` have
    no-op default bodies.
    """

    async def start(self, agent: "BidiAgent") -> None:
        """Start input (no-op by default)."""
        return None

    async def stop(self) -> None:
        """Stop input (no-op by default)."""
        return None

    def __call__(self) -> Awaitable[BidiInputEvent]:
        """Read input data from the source.

        Returns:
            Awaitable that resolves to an input event (audio, text, image, etc.)
        """
        ...

__call__()

Read input data from the source.

Returns:

Type Description
Awaitable[BidiInputEvent]

Awaitable that resolves to an input event (audio, text, image, etc.)

Source code in strands/experimental/bidi/types/io.py
32
33
34
35
36
37
38
def __call__(self) -> Awaitable[BidiInputEvent]:
    """Read the next piece of input from the source.

    Protocol stub: the body is intentionally empty.

    Returns:
        Awaitable that resolves to an input event (audio, text, image, etc.)
    """
    ...

start(agent) async

Start input.

Source code in strands/experimental/bidi/types/io.py
24
25
26
async def start(self, agent: "BidiAgent") -> None:
    """Start input (no-op by default; ``agent`` is unused here)."""
    return None

stop() async

Stop input.

Source code in strands/experimental/bidi/types/io.py
28
29
30
async def stop(self) -> None:
    """Stop input (no-op by default)."""
    return None

BidiInterruptionEvent

Bases: TypedEvent

Model generation was interrupted.

Parameters:

Name Type Description Default
reason Literal['user_speech', 'error']

Why the interruption occurred.

required
Source code in strands/experimental/bidi/types/events.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class BidiInterruptionEvent(TypedEvent):
    """Model generation was interrupted.

    The event payload carries the type tag "bidi_interruption" and the reason.

    Parameters:
        reason: Why the interruption occurred.
    """

    def __init__(self, reason: Literal["user_speech", "error"]):
        """Build the event payload and hand it to TypedEvent."""
        payload = {
            "type": "bidi_interruption",
            "reason": reason,
        }
        super().__init__(payload)

    @property
    def reason(self) -> str:
        """Why the interruption occurred."""
        value = self["reason"]
        return cast(str, value)

reason property

Why the interruption occurred.

__init__(reason)

Initialize interruption event.

Source code in strands/experimental/bidi/types/events.py
370
371
372
373
374
375
376
377
def __init__(self, reason: Literal["user_speech", "error"]):
    """Build the "bidi_interruption" payload and hand it to the parent initializer."""
    payload = {
        "type": "bidi_interruption",
        "reason": reason,
    }
    super().__init__(payload)

BidiOutput

Bases: Protocol

Protocol for bidirectional output callables.

Output callables receive events from the agent and handle them appropriately (play audio, display text, send over websocket, etc.).

Source code in strands/experimental/bidi/types/io.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@runtime_checkable
class BidiOutput(Protocol):
    """Protocol for bidirectional output callables.

    Output callables receive events from the agent and handle them appropriately
    (play audio, display text, send over websocket, etc.). ``start`` and ``stop``
    have no-op default bodies.
    """

    async def start(self, agent: "BidiAgent") -> None:
        """Start output (no-op by default)."""
        return None

    async def stop(self) -> None:
        """Stop output (no-op by default)."""
        return None

    def __call__(self, event: BidiOutputEvent) -> Awaitable[None]:
        """Process output events from the agent.

        Args:
            event: Output event from the agent (audio, text, tool calls, etc.)
        """
        ...

__call__(event)

Process output events from the agent.

Parameters:

Name Type Description Default
event BidiOutputEvent

Output event from the agent (audio, text, tool calls, etc.)

required
Source code in strands/experimental/bidi/types/io.py
57
58
59
60
61
62
63
def __call__(self, event: BidiOutputEvent) -> Awaitable[None]:
    """Handle one output event from the agent.

    Protocol stub: the body is intentionally empty.

    Args:
        event: Output event from the agent (audio, text, tool calls, etc.)
    """
    ...

start(agent) async

Start output.

Source code in strands/experimental/bidi/types/io.py
49
50
51
async def start(self, agent: "BidiAgent") -> None:
    """Start output (no-op by default; ``agent`` is unused here)."""
    return None

stop() async

Stop output.

Source code in strands/experimental/bidi/types/io.py
53
54
55
async def stop(self) -> None:
    """Stop output (no-op by default)."""
    return None

_BidiAudioBuffer

Buffer chunks of audio data between agent and PyAudio.

Source code in strands/experimental/bidi/io/audio.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class _BidiAudioBuffer:
    """Buffer chunks of audio data between agent and PyAudio."""

    _buffer: queue.Queue
    _data: bytearray

    def __init__(self, size: int | None = None):
        """Initialize buffer settings.

        Args:
            size: Size of the buffer (default: unbounded).
        """
        self._size = size or 0

    def start(self) -> None:
        """Setup buffer."""
        self._buffer = queue.Queue(self._size)
        self._data = bytearray()

    def stop(self) -> None:
        """Tear down buffer."""
        if hasattr(self, "_data"):
            self._data.clear()
        if hasattr(self, "_buffer"):
            # Unblocking waited get calls by putting an empty chunk
            # Note, Queue.shutdown exists but is a 3.13+ only feature
            # We simulate shutdown with the below logic
            self._buffer.put_nowait(b"")
            self._buffer = queue.Queue(self._size)

    def put(self, chunk: bytes) -> None:
        """Put data chunk into buffer.

        If full, removes the oldest chunk.
        """
        if self._buffer.full():
            logger.debug("buffer is full | removing oldest chunk")
            try:
                self._buffer.get_nowait()
            except queue.Empty:
                logger.debug("buffer already empty")
                pass

        self._buffer.put_nowait(chunk)

    def get(self, byte_count: int | None = None) -> bytes:
        """Get the number of bytes specified from the buffer.

        Args:
            byte_count: Number of bytes to get from buffer.

                - If the number of bytes specified is not available, the return is padded with silence.
                - If the number of bytes is not specified, get the first chunk put in the buffer.

        Returns:
            Specified number of bytes.
        """
        if not byte_count:
            self._data.extend(self._buffer.get())
            byte_count = len(self._data)

        while len(self._data) < byte_count:
            try:
                self._data.extend(self._buffer.get_nowait())
            except queue.Empty:
                break

        padding_bytes = b"\x00" * max(byte_count - len(self._data), 0)
        self._data.extend(padding_bytes)

        data = self._data[:byte_count]
        del self._data[:byte_count]

        return bytes(data)

    def clear(self) -> None:
        """Clear the buffer."""
        while True:
            try:
                self._buffer.get_nowait()
            except queue.Empty:
                break

__init__(size=None)

Initialize buffer settings.

Parameters:

Name Type Description Default
size int | None

Size of the buffer (default: unbounded).

None
Source code in strands/experimental/bidi/io/audio.py
32
33
34
35
36
37
38
def __init__(self, size: int | None = None):
    """Initialize buffer settings.

    Args:
        size: Size of the buffer (default: unbounded).
    """
    self._size = size or 0

clear()

Clear the buffer.

Source code in strands/experimental/bidi/io/audio.py
101
102
103
104
105
106
107
def clear(self) -> None:
    """Discard every chunk currently waiting in the queue."""
    try:
        # Keep popping without blocking; queue.Empty signals the queue is drained.
        while True:
            self._buffer.get_nowait()
    except queue.Empty:
        return

get(byte_count=None)

Get the number of bytes specified from the buffer.

Parameters:

Name Type Description Default
byte_count int | None

Number of bytes to get from buffer.

  • If the number of bytes specified is not available, the return is padded with silence.
  • If the number of bytes is not specified, get the first chunk put in the buffer.
None

Returns:

Type Description
bytes

Specified number of bytes.

Source code in strands/experimental/bidi/io/audio.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def get(self, byte_count: int | None = None) -> bytes:
    """Get the number of bytes specified from the buffer.

    Args:
        byte_count: Number of bytes to get from buffer.

            - If the number of bytes specified is not available, the return is padded with silence.
            - If the number of bytes is not specified, get the first chunk put in the buffer.

    Returns:
        Specified number of bytes.
    """
    if not byte_count:
        self._data.extend(self._buffer.get())
        byte_count = len(self._data)

    while len(self._data) < byte_count:
        try:
            self._data.extend(self._buffer.get_nowait())
        except queue.Empty:
            break

    padding_bytes = b"\x00" * max(byte_count - len(self._data), 0)
    self._data.extend(padding_bytes)

    data = self._data[:byte_count]
    del self._data[:byte_count]

    return bytes(data)

put(chunk)

Put data chunk into buffer.

If full, removes the oldest chunk.

Source code in strands/experimental/bidi/io/audio.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def put(self, chunk: bytes) -> None:
    """Queue a data chunk.

    If the queue is full, the oldest chunk is removed first.
    """
    if not self._buffer.full():
        self._buffer.put_nowait(chunk)
        return

    logger.debug("buffer is full | removing oldest chunk")
    try:
        self._buffer.get_nowait()
    except queue.Empty:
        # The queue was drained between the full() check and the removal.
        logger.debug("buffer already empty")

    self._buffer.put_nowait(chunk)

start()

Setup buffer.

Source code in strands/experimental/bidi/io/audio.py
40
41
42
43
def start(self) -> None:
    """Create a fresh chunk queue (capacity ``_size``) and an empty leftover-byte store."""
    self._buffer = queue.Queue(maxsize=self._size)
    self._data = bytearray()

stop()

Tear down buffer.

Source code in strands/experimental/bidi/io/audio.py
45
46
47
48
49
50
51
52
53
54
def stop(self) -> None:
    """Tear down buffer.

    Drops leftover bytes, wakes a consumer blocked in ``get`` and swaps in a
    fresh queue. Safe to call before ``start`` and on a full buffer.
    """
    if hasattr(self, "_data"):
        self._data.clear()
    if hasattr(self, "_buffer"):
        # Unblocking waited get calls by putting an empty chunk
        # Note, Queue.shutdown exists but is a 3.13+ only feature
        # We simulate shutdown with the below logic
        try:
            self._buffer.put_nowait(b"")
        except queue.Full:
            # A full bounded queue has data available, so a get call on it
            # returns immediately and needs no wake-up chunk. Without this
            # guard, stopping a full buffer raised queue.Full.
            pass
        self._buffer = queue.Queue(self._size)

_BidiAudioInput

Bases: BidiInput

Handle audio input from user.

Attributes:

Name Type Description
_audio PyAudio

PyAudio instance for audio system access.

_stream Stream

Audio input stream.

_buffer

Buffer for sharing audio data between agent and PyAudio.

Source code in strands/experimental/bidi/io/audio.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class _BidiAudioInput(BidiInput):
    """Handle audio input from user.

    PyAudio delivers captured audio to ``_callback``, which queues it in the
    buffer; awaiting the instance returns the next chunk as a
    BidiAudioInputEvent.

    Attributes:
        _audio: PyAudio instance for audio system access.
        _stream: Audio input stream.
        _buffer: Buffer for sharing audio data between agent and PyAudio.
    """

    _audio: pyaudio.PyAudio
    _stream: pyaudio.Stream

    # Defaults used when the matching "input_*" config key is absent.
    _BUFFER_SIZE = None
    _DEVICE_INDEX = None
    _FRAMES_PER_BUFFER = 512

    def __init__(self, config: dict[str, Any]) -> None:
        """Read the "input_*" settings from config and create the buffer."""
        defaults = _BidiAudioInput
        self._buffer_size = config.get("input_buffer_size", defaults._BUFFER_SIZE)
        self._device_index = config.get("input_device_index", defaults._DEVICE_INDEX)
        self._frames_per_buffer = config.get("input_frames_per_buffer", defaults._FRAMES_PER_BUFFER)

        self._buffer = _BidiAudioBuffer(self._buffer_size)

    async def start(self, agent: "BidiAgent") -> None:
        """Start input stream.

        Args:
            agent: The BidiAgent instance, providing access to model configuration.
        """
        logger.debug("starting audio input stream")

        audio_config = agent.model.config["audio"]
        self._channels = audio_config["channels"]
        self._format = audio_config["format"]
        self._rate = audio_config["input_rate"]

        self._buffer.start()
        self._audio = pyaudio.PyAudio()
        # Callback mode: PyAudio pushes captured frames to self._callback.
        stream_options = {
            "channels": self._channels,
            "format": pyaudio.paInt16,
            "frames_per_buffer": self._frames_per_buffer,
            "input": True,
            "input_device_index": self._device_index,
            "rate": self._rate,
            "stream_callback": self._callback,
        }
        self._stream = self._audio.open(**stream_options)

        logger.debug("audio input stream started")

    async def stop(self) -> None:
        """Stop input stream.

        Each resource is released only if it was created, so this also works
        when ``start`` never ran or failed part-way.
        """
        logger.debug("stopping audio input stream")

        if hasattr(self, "_stream"):
            self._stream.close()

        if hasattr(self, "_audio"):
            self._audio.terminate()

        if hasattr(self, "_buffer"):
            self._buffer.stop()

        logger.debug("audio input stream stopped")

    async def __call__(self) -> BidiAudioInputEvent:
        """Read audio from input stream."""
        # The buffer read blocks, so it runs in a worker thread.
        raw_audio = await asyncio.to_thread(self._buffer.get)
        encoded_audio = base64.b64encode(raw_audio).decode("utf-8")

        return BidiAudioInputEvent(
            audio=encoded_audio,
            channels=self._channels,
            format=self._format,
            sample_rate=self._rate,
        )

    def _callback(self, in_data: bytes, *_: Any) -> tuple[None, Any]:
        """Callback to receive audio data from PyAudio."""
        self._buffer.put(in_data)
        keep_streaming = pyaudio.paContinue
        return (None, keep_streaming)

__call__() async

Read audio from input stream.

Source code in strands/experimental/bidi/io/audio.py
173
174
175
176
177
178
179
180
181
182
async def __call__(self) -> BidiAudioInputEvent:
    """Return the next buffered audio chunk as a base64-encoded input event."""
    # The buffer read blocks, so it runs in a worker thread.
    raw_audio = await asyncio.to_thread(self._buffer.get)
    encoded_audio = base64.b64encode(raw_audio).decode("utf-8")

    return BidiAudioInputEvent(
        audio=encoded_audio,
        channels=self._channels,
        format=self._format,
        sample_rate=self._rate,
    )

__init__(config)

Extract configs.

Source code in strands/experimental/bidi/io/audio.py
126
127
128
129
130
131
132
def __init__(self, config: dict[str, Any]) -> None:
    """Read the "input_*" settings from config and create the buffer.

    Missing keys fall back to the class-level defaults on _BidiAudioInput.
    """
    defaults = _BidiAudioInput
    self._buffer_size = config.get("input_buffer_size", defaults._BUFFER_SIZE)
    self._device_index = config.get("input_device_index", defaults._DEVICE_INDEX)
    self._frames_per_buffer = config.get("input_frames_per_buffer", defaults._FRAMES_PER_BUFFER)

    self._buffer = _BidiAudioBuffer(self._buffer_size)

start(agent) async

Start input stream.

Parameters:

Name Type Description Default
agent BidiAgent

The BidiAgent instance, providing access to model configuration.

required
Source code in strands/experimental/bidi/io/audio.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
async def start(self, agent: "BidiAgent") -> None:
    """Open the PyAudio input stream using the model's audio configuration.

    Args:
        agent: The BidiAgent instance, providing access to model configuration.
    """
    logger.debug("starting audio input stream")

    audio_config = agent.model.config["audio"]
    self._channels = audio_config["channels"]
    self._format = audio_config["format"]
    self._rate = audio_config["input_rate"]

    self._buffer.start()
    self._audio = pyaudio.PyAudio()
    # Callback mode: PyAudio pushes captured frames to self._callback.
    stream_options = {
        "channels": self._channels,
        "format": pyaudio.paInt16,
        "frames_per_buffer": self._frames_per_buffer,
        "input": True,
        "input_device_index": self._device_index,
        "rate": self._rate,
        "stream_callback": self._callback,
    }
    self._stream = self._audio.open(**stream_options)

    logger.debug("audio input stream started")

stop() async

Stop input stream.

Source code in strands/experimental/bidi/io/audio.py
160
161
162
163
164
165
166
167
168
169
170
171
async def stop(self) -> None:
    """Close the input stream, terminate PyAudio and stop the buffer.

    Each resource is released only if the attribute exists, so this also works
    when ``start`` never ran or failed part-way.
    """
    logger.debug("stopping audio input stream")

    if hasattr(self, "_stream"):
        self._stream.close()

    if hasattr(self, "_audio"):
        self._audio.terminate()

    if hasattr(self, "_buffer"):
        self._buffer.stop()

    logger.debug("audio input stream stopped")

_BidiAudioOutput

Bases: BidiOutput

Handle audio output from bidi agent.

Attributes:

Name Type Description
_audio PyAudio

PyAudio instance for audio system access.

_stream Stream

Audio output stream.

_buffer

Buffer for sharing audio data between agent and PyAudio.

Source code in strands/experimental/bidi/io/audio.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
class _BidiAudioOutput(BidiOutput):
    """Handle audio output from bidi agent.

    Audio chunks received from the agent are placed in a buffer; PyAudio pulls
    from that buffer through ``_callback``. An interruption event clears the
    buffer.

    Attributes:
        _audio: PyAudio instance for audio system access.
        _stream: Audio output stream.
        _buffer: Buffer for sharing audio data between agent and PyAudio.
    """

    _audio: pyaudio.PyAudio
    _stream: pyaudio.Stream

    # Defaults used when the corresponding key is absent from the config dict.
    _BUFFER_SIZE = None
    _DEVICE_INDEX = None
    _FRAMES_PER_BUFFER = 512

    def __init__(self, config: dict[str, Any]) -> None:
        """Extract configs.

        Args:
            config: Output settings. Recognized keys are ``output_buffer_size``,
                ``output_device_index``, and ``output_frames_per_buffer``; missing
                keys fall back to the class-level defaults.
        """
        self._buffer_size = config.get("output_buffer_size", _BidiAudioOutput._BUFFER_SIZE)
        self._device_index = config.get("output_device_index", _BidiAudioOutput._DEVICE_INDEX)
        self._frames_per_buffer = config.get("output_frames_per_buffer", _BidiAudioOutput._FRAMES_PER_BUFFER)

        self._buffer = _BidiAudioBuffer(self._buffer_size)

    async def start(self, agent: "BidiAgent") -> None:
        """Start output stream.

        Args:
            agent: The BidiAgent instance, providing access to model configuration.
        """
        logger.debug("starting audio output stream")

        self._channels = agent.model.config["audio"]["channels"]
        self._rate = agent.model.config["audio"]["output_rate"]

        self._buffer.start()
        self._audio = pyaudio.PyAudio()
        self._stream = self._audio.open(
            channels=self._channels,
            format=pyaudio.paInt16,
            frames_per_buffer=self._frames_per_buffer,
            output=True,
            output_device_index=self._device_index,
            rate=self._rate,
            stream_callback=self._callback,
        )

        logger.debug("audio output stream started")

    async def stop(self) -> None:
        """Stop output stream.

        Closes the stream, terminates the PyAudio instance, and stops the buffer,
        in that order. Any of these attributes that was never set is skipped.

        Each cleanup step runs even if an earlier one raises, so a failing
        ``close()`` cannot leave the PyAudio instance or the buffer un-released.
        The exception still propagates once cleanup has been attempted.
        """
        logger.debug("stopping audio output stream")

        try:
            if hasattr(self, "_stream"):
                self._stream.close()
        finally:
            try:
                if hasattr(self, "_audio"):
                    self._audio.terminate()
            finally:
                if hasattr(self, "_buffer"):
                    self._buffer.stop()

        logger.debug("audio output stream stopped")

    async def __call__(self, event: BidiOutputEvent) -> None:
        """Send audio to output stream.

        Audio stream events are base64-decoded and buffered for playback.
        Interruption events clear the buffer. Other events are ignored.

        Args:
            event: Output event emitted by the agent.
        """
        if isinstance(event, BidiAudioStreamEvent):
            data = base64.b64decode(event["audio"])
            self._buffer.put(data)
            logger.debug("audio_bytes=<%d> | audio chunk buffered for playback", len(data))

        elif isinstance(event, BidiInterruptionEvent):
            logger.debug("reason=<%s> | clearing audio buffer due to interruption", event["reason"])
            self._buffer.clear()

    def _callback(self, _in_data: None, frame_count: int, *_: Any) -> tuple[bytes, Any]:
        """Callback to send audio data to PyAudio.

        Args:
            _in_data: Unused; no input data is consumed by this output callback.
            frame_count: Number of frames PyAudio is requesting.
            *_: Remaining positional callback arguments, unused here.

        Returns:
            Tuple of the audio bytes taken from the buffer and ``pyaudio.paContinue``.
        """
        # A frame holds one sample per channel, so the request must be scaled by the
        # channel count; otherwise a multi-channel stream is handed too few bytes.
        # Fall back to a single channel if start() has not set the channel count yet.
        channels = getattr(self, "_channels", 1)
        byte_count = frame_count * channels * pyaudio.get_sample_size(pyaudio.paInt16)
        data = self._buffer.get(byte_count)
        return (data, pyaudio.paContinue)

__call__(event) async

Send audio to output stream.

Source code in strands/experimental/bidi/io/audio.py
252
253
254
255
256
257
258
259
260
261
async def __call__(self, event: BidiOutputEvent) -> None:
    """Route an agent output event to the playback buffer.

    Audio stream events are base64-decoded and queued for playback; interruption
    events empty the buffer. Any other event type is ignored.
    """
    if isinstance(event, BidiAudioStreamEvent):
        audio_bytes = base64.b64decode(event["audio"])
        self._buffer.put(audio_bytes)
        logger.debug("audio_bytes=<%d> | audio chunk buffered for playback", len(audio_bytes))
        return

    if isinstance(event, BidiInterruptionEvent):
        logger.debug("reason=<%s> | clearing audio buffer due to interruption", event["reason"])
        self._buffer.clear()

__init__(config)

Extract configs.

Source code in strands/experimental/bidi/io/audio.py
206
207
208
209
210
211
212
def __init__(self, config: dict[str, Any]) -> None:
    """Read output settings from ``config`` and create the audio buffer.

    Keys missing from ``config`` fall back to the ``_BidiAudioOutput`` class defaults.
    """
    # Defaults are always read from _BidiAudioOutput itself, not from type(self).
    defaults = _BidiAudioOutput

    self._buffer_size = config.get("output_buffer_size", defaults._BUFFER_SIZE)
    self._device_index = config.get("output_device_index", defaults._DEVICE_INDEX)
    self._frames_per_buffer = config.get("output_frames_per_buffer", defaults._FRAMES_PER_BUFFER)

    self._buffer = _BidiAudioBuffer(self._buffer_size)

start(agent) async

Start output stream.

Parameters:

Name Type Description Default
agent BidiAgent

The BidiAgent instance, providing access to model configuration.

required
Source code in strands/experimental/bidi/io/audio.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
async def start(self, agent: "BidiAgent") -> None:
    """Open the PyAudio output stream using the model's audio settings.

    Args:
        agent: The BidiAgent instance; its ``model.config["audio"]`` entry supplies
            the channel count and output sample rate.
    """
    logger.debug("starting audio output stream")

    audio_config = agent.model.config["audio"]
    self._channels = audio_config["channels"]
    self._rate = audio_config["output_rate"]

    # The buffer is started before the stream is opened with self._callback.
    self._buffer.start()

    self._audio = pyaudio.PyAudio()
    stream_options = {
        "channels": self._channels,
        "format": pyaudio.paInt16,
        "frames_per_buffer": self._frames_per_buffer,
        "output": True,
        "output_device_index": self._device_index,
        "rate": self._rate,
        "stream_callback": self._callback,
    }
    self._stream = self._audio.open(**stream_options)

    logger.debug("audio output stream started")

stop() async

Stop output stream.

Source code in strands/experimental/bidi/io/audio.py
239
240
241
242
243
244
245
246
247
248
249
250
async def stop(self) -> None:
    """Stop output stream.

    Closes the stream, terminates the PyAudio instance, and stops the buffer, in
    that order. Any of these attributes that was never set is skipped.

    Each cleanup step runs even if an earlier one raises, so a failing
    ``close()`` cannot leave the PyAudio instance or the buffer un-released.
    The exception still propagates to the caller once cleanup has been attempted.
    """
    logger.debug("stopping audio output stream")

    try:
        if hasattr(self, "_stream"):
            self._stream.close()
    finally:
        try:
            if hasattr(self, "_audio"):
                self._audio.terminate()
        finally:
            if hasattr(self, "_buffer"):
                self._buffer.stop()

    logger.debug("audio output stream stopped")