Skip to content

strands.experimental.bidi.agent.agent

Bidirectional Agent for real-time streaming conversations.

Provides real-time audio and text interaction through persistent streaming connections. Unlike traditional request-response patterns, this agent maintains long-running conversations where users can interrupt, provide additional input, and receive continuous responses including audio output.

Key capabilities:

  • Persistent conversation connections with concurrent processing
  • Real-time audio input/output streaming
  • Automatic interruption detection and tool execution
  • Event-driven communication with model providers

BidiAgent

Agent for bidirectional streaming conversations.

Enables real-time audio and text interaction with AI models through persistent connections. Supports concurrent tool execution and interruption handling.

Source code in strands/experimental/bidi/agent/agent.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
class BidiAgent:
    """Agent for bidirectional streaming conversations.

    Enables real-time audio and text interaction with AI models through persistent
    connections. Supports concurrent tool execution and interruption handling.
    """

    def __init__(
        self,
        model: BidiModel | str | None = None,
        tools: list[str | AgentTool | ToolProvider] | None = None,
        system_prompt: str | None = None,
        messages: Messages | None = None,
        record_direct_tool_call: bool = True,
        load_tools_from_directory: bool = False,
        agent_id: str | None = None,
        name: str | None = None,
        description: str | None = None,
        hooks: list[HookProvider] | None = None,
        state: AgentState | dict | None = None,
        session_manager: "SessionManager | None" = None,
        tool_executor: ToolExecutor | None = None,
        **kwargs: Any,
    ):
        """Initialize bidirectional agent.

        Args:
            model: BidiModel instance, string model_id, or None for default detection.
            tools: Optional list of tools with flexible format support.
            system_prompt: Optional system prompt for conversations.
            messages: Optional conversation history to initialize with.
            record_direct_tool_call: Whether to record direct tool calls in message history.
            load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
            agent_id: Optional ID for the agent, useful for connection management and multi-agent scenarios.
            name: Name of the Agent.
            description: Description of what the Agent does.
            hooks: Optional list of hook providers to register for lifecycle events.
            state: Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
            session_manager: Manager for handling agent sessions including conversation history and state.
                If provided, enables session-based persistence and state management.
            tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
            **kwargs: Additional configuration for future extensibility.

        Raises:
            ValueError: If model configuration is invalid or state is invalid type.
            TypeError: If model type is unsupported.
        """
        self.model = (
            BidiNovaSonicModel()
            if not model
            else BidiNovaSonicModel(model_id=model)
            if isinstance(model, str)
            else model
        )
        self.system_prompt = system_prompt
        self.messages = messages or []

        # Agent identification
        self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
        self.name = name or _DEFAULT_AGENT_NAME
        self.description = description

        # Tool execution configuration
        self.record_direct_tool_call = record_direct_tool_call
        self.load_tools_from_directory = load_tools_from_directory

        # Initialize tool registry
        self.tool_registry = ToolRegistry()

        if tools is not None:
            self.tool_registry.process_tools(tools)

        self.tool_registry.initialize_tools(self.load_tools_from_directory)

        # Initialize tool watcher if directory loading is enabled
        if self.load_tools_from_directory:
            self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

        # Initialize agent state management
        if state is not None:
            if isinstance(state, dict):
                self.state = AgentState(state)
            elif isinstance(state, AgentState):
                self.state = state
            else:
                raise ValueError("state must be an AgentState object or a dict")
        else:
            self.state = AgentState()

        # Initialize other components
        self._tool_caller = _ToolCaller(self)

        # Initialize tool executor
        self.tool_executor = tool_executor or ConcurrentToolExecutor()

        # Initialize hooks registry
        self.hooks = HookRegistry()
        if hooks:
            for hook in hooks:
                self.hooks.add_hook(hook)

        # Initialize session management functionality
        self._session_manager = session_manager
        if self._session_manager:
            self.hooks.add_hook(self._session_manager)

        self._loop = _BidiAgentLoop(self)

        # Emit initialization event
        self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

        # TODO: Determine if full support is required
        self._interrupt_state = _InterruptState()

        self._started = False

    @property
    def tool(self) -> _ToolCaller:
        """Call tool as a function.

        Returns:
            ToolCaller for method-style tool execution.

        Example:
            ```
            agent = BidiAgent(model=model, tools=[calculator])
            agent.tool.calculator(expression="2+2")
            ```
        """
        return self._tool_caller

    @property
    def tool_names(self) -> list[str]:
        """Get a list of all registered tool names.

        Returns:
            Names of all tools available to this agent.
        """
        all_tools = self.tool_registry.get_all_tools_config()
        return list(all_tools.keys())

    async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
        """Start a persistent bidirectional conversation connection.

        Initializes the streaming connection and starts background tasks for processing
        model events, tool execution, and connection management.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Raises:
            RuntimeError:
                If agent already started.

        Example:
            ```python
            await agent.start(invocation_state={
                "user_id": "user_123",
                "session_id": "session_456",
                "database": db_connection,
            })
            ```
        """
        if self._started:
            raise RuntimeError("agent already started | call stop before starting again")

        logger.debug("agent starting")
        await self._loop.start(invocation_state)
        self._started = True

    async def send(self, input_data: BidiAgentInput | dict[str, Any]) -> None:
        """Send input to the model (text, audio, image, or event dict).

        Unified method for sending text, audio, and image input to the model during
        an active conversation session. Accepts TypedEvent instances or plain dicts
        (e.g., from WebSocket clients) which are automatically reconstructed.

        Args:
            input_data: Can be:

                - str: Text message from user
                - BidiInputEvent: TypedEvent
                - dict: Event dictionary (will be reconstructed to TypedEvent)

        Raises:
            RuntimeError: If start has not been called.
            ValueError: If invalid input type.

        Example:
            await agent.send("Hello")
            await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
            await agent.send({"type": "bidirectional_text_input", "text": "Hello", "role": "user"})
        """
        if not self._started:
            raise RuntimeError("agent not started | call start before sending")

        input_event: BidiInputEvent

        if isinstance(input_data, str):
            input_event = BidiTextInputEvent(text=input_data)

        elif isinstance(input_data, BidiInputEvent):
            input_event = input_data

        elif isinstance(input_data, dict) and "type" in input_data:
            input_type = input_data["type"]
            input_data = {key: value for key, value in input_data.items() if key != "type"}
            if input_type == "bidi_text_input":
                input_event = BidiTextInputEvent(**input_data)
            elif input_type == "bidi_audio_input":
                input_event = BidiAudioInputEvent(**input_data)
            elif input_type == "bidi_image_input":
                input_event = BidiImageInputEvent(**input_data)
            else:
                raise ValueError(f"input_type=<{input_type}> | input type not supported")

        else:
            raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")

        await self._loop.send(input_event)

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive events from the model including audio, text, and tool calls.

        Yields:
            Model output events processed by background tasks including audio output,
            text responses, tool calls, and connection updates.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._started:
            raise RuntimeError("agent not started | call start before receiving")

        async for event in self._loop.receive():
            yield event

    async def stop(self) -> None:
        """End the conversation connection and cleanup all resources.

        Terminates the streaming connection, cancels background tasks, and
        closes the connection to the model provider.
        """
        self._started = False
        await self._loop.stop()

    async def __aenter__(self, invocation_state: dict[str, Any] | None = None) -> "BidiAgent":
        """Async context manager entry point.

        Automatically starts the bidirectional connection when entering the context.

        Args:
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Returns:
            Self for use in the context.
        """
        logger.debug("context_manager=<enter> | starting agent")
        await self.start(invocation_state)
        return self

    async def __aexit__(self, *_: Any) -> None:
        """Async context manager exit point.

        Automatically ends the connection and cleans up resources including
        when exiting the context, regardless of whether an exception occurred.
        """
        logger.debug("context_manager=<exit> | stopping agent")
        await self.stop()

    async def run(
        self, inputs: list[BidiInput], outputs: list[BidiOutput], invocation_state: dict[str, Any] | None = None
    ) -> None:
        """Run the agent using provided IO channels for bidirectional communication.

        Args:
            inputs: Input callables to read data from a source
            outputs: Output callables to receive events from the agent
            invocation_state: Optional context to pass to tools during execution.
                This allows passing custom data (user_id, session_id, database connections, etc.)
                that tools can access via their invocation_state parameter.

        Example:
            ```python
            # Using model defaults:
            model = BidiNovaSonicModel()
            audio_io = BidiAudioIO()
            text_io = BidiTextIO()
            agent = BidiAgent(model=model, tools=[calculator])
            await agent.run(
                inputs=[audio_io.input()],
                outputs=[audio_io.output(), text_io.output()],
                invocation_state={"user_id": "user_123"}
            )

            # Using custom audio config:
            model = BidiNovaSonicModel(
                provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
            )
            audio_io = BidiAudioIO()
            agent = BidiAgent(model=model, tools=[calculator])
            await agent.run(
                inputs=[audio_io.input()],
                outputs=[audio_io.output()],
            )
            ```
        """

        async def run_inputs() -> None:
            async def task(input_: BidiInput) -> None:
                while True:
                    event = await input_()
                    await self.send(event)

            await asyncio.gather(*[task(input_) for input_ in inputs])

        async def run_outputs(inputs_task: asyncio.Task) -> None:
            async for event in self.receive():
                await asyncio.gather(*[output(event) for output in outputs])

            inputs_task.cancel()

        try:
            await self.start(invocation_state)

            input_starts = [input_.start for input_ in inputs if isinstance(input_, BidiInput)]
            output_starts = [output.start for output in outputs if isinstance(output, BidiOutput)]
            for start in [*input_starts, *output_starts]:
                await start(self)

            async with asyncio.TaskGroup() as task_group:
                inputs_task = task_group.create_task(run_inputs())
                task_group.create_task(run_outputs(inputs_task))

        finally:
            input_stops = [input_.stop for input_ in inputs if isinstance(input_, BidiInput)]
            output_stops = [output.stop for output in outputs if isinstance(output, BidiOutput)]

            await stop_all(*input_stops, *output_stops, self.stop)

tool property

Call tool as a function.

Returns:

Type Description
_ToolCaller

ToolCaller for method-style tool execution.

Example
agent = BidiAgent(model=model, tools=[calculator])
agent.tool.calculator(expression="2+2")

tool_names property

Get a list of all registered tool names.

Returns:

Type Description
list[str]

Names of all tools available to this agent.

__aenter__(invocation_state=None) async

Async context manager entry point.

Automatically starts the bidirectional connection when entering the context.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Returns:

Type Description
BidiAgent

Self for use in the context.

Source code in strands/experimental/bidi/agent/agent.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
async def __aenter__(self, invocation_state: dict[str, Any] | None = None) -> "BidiAgent":
    """Async context manager entry point.

    Automatically starts the bidirectional connection when entering the context.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Returns:
        Self for use in the context.
    """
    logger.debug("context_manager=<enter> | starting agent")
    await self.start(invocation_state)
    return self

__aexit__(*_) async

Async context manager exit point.

Automatically ends the connection and cleans up resources including when exiting the context, regardless of whether an exception occurred.

Source code in strands/experimental/bidi/agent/agent.py
321
322
323
324
325
326
327
328
async def __aexit__(self, *_: Any) -> None:
    """Async context manager exit point.

    Automatically ends the connection and cleans up resources including
    when exiting the context, regardless of whether an exception occurred.
    """
    logger.debug("context_manager=<exit> | stopping agent")
    await self.stop()

__init__(model=None, tools=None, system_prompt=None, messages=None, record_direct_tool_call=True, load_tools_from_directory=False, agent_id=None, name=None, description=None, hooks=None, state=None, session_manager=None, tool_executor=None, **kwargs)

Initialize bidirectional agent.

Parameters:

Name Type Description Default
model BidiModel | str | None

BidiModel instance, string model_id, or None for default detection.

None
tools list[str | AgentTool | ToolProvider] | None

Optional list of tools with flexible format support.

None
system_prompt str | None

Optional system prompt for conversations.

None
messages Messages | None

Optional conversation history to initialize with.

None
record_direct_tool_call bool

Whether to record direct tool calls in message history.

True
load_tools_from_directory bool

Whether to load and automatically reload tools in the ./tools/ directory.

False
agent_id str | None

Optional ID for the agent, useful for connection management and multi-agent scenarios.

None
name str | None

Name of the Agent.

None
description str | None

Description of what the Agent does.

None
hooks list[HookProvider] | None

Optional list of hook providers to register for lifecycle events.

None
state AgentState | dict | None

Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.

None
session_manager SessionManager | None

Manager for handling agent sessions including conversation history and state. If provided, enables session-based persistence and state management.

None
tool_executor ToolExecutor | None

Definition of tool execution strategy (e.g., sequential, concurrent, etc.).

None
**kwargs Any

Additional configuration for future extensibility.

{}

Raises:

Type Description
ValueError

If model configuration is invalid or state is invalid type.

TypeError

If model type is unsupported.

Source code in strands/experimental/bidi/agent/agent.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def __init__(
    self,
    model: BidiModel | str | None = None,
    tools: list[str | AgentTool | ToolProvider] | None = None,
    system_prompt: str | None = None,
    messages: Messages | None = None,
    record_direct_tool_call: bool = True,
    load_tools_from_directory: bool = False,
    agent_id: str | None = None,
    name: str | None = None,
    description: str | None = None,
    hooks: list[HookProvider] | None = None,
    state: AgentState | dict | None = None,
    session_manager: "SessionManager | None" = None,
    tool_executor: ToolExecutor | None = None,
    **kwargs: Any,
):
    """Initialize bidirectional agent.

    Args:
        model: BidiModel instance, string model_id, or None for default detection.
        tools: Optional list of tools with flexible format support.
        system_prompt: Optional system prompt for conversations.
        messages: Optional conversation history to initialize with.
        record_direct_tool_call: Whether to record direct tool calls in message history.
        load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
        agent_id: Optional ID for the agent, useful for connection management and multi-agent scenarios.
        name: Name of the Agent.
        description: Description of what the Agent does.
        hooks: Optional list of hook providers to register for lifecycle events.
        state: Stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
        session_manager: Manager for handling agent sessions including conversation history and state.
            If provided, enables session-based persistence and state management.
        tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
        **kwargs: Additional configuration for future extensibility.

    Raises:
        ValueError: If model configuration is invalid or state is invalid type.
        TypeError: If model type is unsupported.
    """
    self.model = (
        BidiNovaSonicModel()
        if not model
        else BidiNovaSonicModel(model_id=model)
        if isinstance(model, str)
        else model
    )
    self.system_prompt = system_prompt
    self.messages = messages or []

    # Agent identification
    self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
    self.name = name or _DEFAULT_AGENT_NAME
    self.description = description

    # Tool execution configuration
    self.record_direct_tool_call = record_direct_tool_call
    self.load_tools_from_directory = load_tools_from_directory

    # Initialize tool registry
    self.tool_registry = ToolRegistry()

    if tools is not None:
        self.tool_registry.process_tools(tools)

    self.tool_registry.initialize_tools(self.load_tools_from_directory)

    # Initialize tool watcher if directory loading is enabled
    if self.load_tools_from_directory:
        self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

    # Initialize agent state management
    if state is not None:
        if isinstance(state, dict):
            self.state = AgentState(state)
        elif isinstance(state, AgentState):
            self.state = state
        else:
            raise ValueError("state must be an AgentState object or a dict")
    else:
        self.state = AgentState()

    # Initialize other components
    self._tool_caller = _ToolCaller(self)

    # Initialize tool executor
    self.tool_executor = tool_executor or ConcurrentToolExecutor()

    # Initialize hooks registry
    self.hooks = HookRegistry()
    if hooks:
        for hook in hooks:
            self.hooks.add_hook(hook)

    # Initialize session management functionality
    self._session_manager = session_manager
    if self._session_manager:
        self.hooks.add_hook(self._session_manager)

    self._loop = _BidiAgentLoop(self)

    # Emit initialization event
    self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

    # TODO: Determine if full support is required
    self._interrupt_state = _InterruptState()

    self._started = False

receive() async

Receive events from the model including audio, text, and tool calls.

Yields:

Type Description
AsyncGenerator[BidiOutputEvent, None]

Model output events processed by background tasks including audio output,

AsyncGenerator[BidiOutputEvent, None]

text responses, tool calls, and connection updates.

Raises:

Type Description
RuntimeError

If start has not been called.

Source code in strands/experimental/bidi/agent/agent.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive events from the model including audio, text, and tool calls.

    Yields:
        Model output events processed by background tasks including audio output,
        text responses, tool calls, and connection updates.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._started:
        raise RuntimeError("agent not started | call start before receiving")

    async for event in self._loop.receive():
        yield event

run(inputs, outputs, invocation_state=None) async

Run the agent using provided IO channels for bidirectional communication.

Parameters:

Name Type Description Default
inputs list[BidiInput]

Input callables to read data from a source

required
outputs list[BidiOutput]

Output callables to receive events from the agent

required
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None
Example
# Using model defaults:
model = BidiNovaSonicModel()
audio_io = BidiAudioIO()
text_io = BidiTextIO()
agent = BidiAgent(model=model, tools=[calculator])
await agent.run(
    inputs=[audio_io.input()],
    outputs=[audio_io.output(), text_io.output()],
    invocation_state={"user_id": "user_123"}
)

# Using custom audio config:
model = BidiNovaSonicModel(
    provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
)
audio_io = BidiAudioIO()
agent = BidiAgent(model=model, tools=[calculator])
await agent.run(
    inputs=[audio_io.input()],
    outputs=[audio_io.output()],
)
Source code in strands/experimental/bidi/agent/agent.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
async def run(
    self, inputs: list[BidiInput], outputs: list[BidiOutput], invocation_state: dict[str, Any] | None = None
) -> None:
    """Run the agent using provided IO channels for bidirectional communication.

    Args:
        inputs: Input callables to read data from a source
        outputs: Output callables to receive events from the agent
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Example:
        ```python
        # Using model defaults:
        model = BidiNovaSonicModel()
        audio_io = BidiAudioIO()
        text_io = BidiTextIO()
        agent = BidiAgent(model=model, tools=[calculator])
        await agent.run(
            inputs=[audio_io.input()],
            outputs=[audio_io.output(), text_io.output()],
            invocation_state={"user_id": "user_123"}
        )

        # Using custom audio config:
        model = BidiNovaSonicModel(
            provider_config={"audio": {"input_rate": 48000, "output_rate": 24000}}
        )
        audio_io = BidiAudioIO()
        agent = BidiAgent(model=model, tools=[calculator])
        await agent.run(
            inputs=[audio_io.input()],
            outputs=[audio_io.output()],
        )
        ```
    """

    async def run_inputs() -> None:
        async def task(input_: BidiInput) -> None:
            while True:
                event = await input_()
                await self.send(event)

        await asyncio.gather(*[task(input_) for input_ in inputs])

    async def run_outputs(inputs_task: asyncio.Task) -> None:
        async for event in self.receive():
            await asyncio.gather(*[output(event) for output in outputs])

        inputs_task.cancel()

    try:
        await self.start(invocation_state)

        input_starts = [input_.start for input_ in inputs if isinstance(input_, BidiInput)]
        output_starts = [output.start for output in outputs if isinstance(output, BidiOutput)]
        for start in [*input_starts, *output_starts]:
            await start(self)

        async with asyncio.TaskGroup() as task_group:
            inputs_task = task_group.create_task(run_inputs())
            task_group.create_task(run_outputs(inputs_task))

    finally:
        input_stops = [input_.stop for input_ in inputs if isinstance(input_, BidiInput)]
        output_stops = [output.stop for output in outputs if isinstance(output, BidiOutput)]

        await stop_all(*input_stops, *output_stops, self.stop)

send(input_data) async

Send input to the model (text, audio, image, or event dict).

Unified method for sending text, audio, and image input to the model during an active conversation session. Accepts TypedEvent instances or plain dicts (e.g., from WebSocket clients) which are automatically reconstructed.

Parameters:

Name Type Description Default
input_data BidiAgentInput | dict[str, Any]

Can be:

  • str: Text message from user
  • BidiInputEvent: TypedEvent
  • dict: Event dictionary (will be reconstructed to TypedEvent)
required

Raises:

Type Description
RuntimeError

If start has not been called.

ValueError

If invalid input type.

Example

await agent.send("Hello") await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...)) await agent.send({"type": "bidirectional_text_input", "text": "Hello", "role": "user"})

Source code in strands/experimental/bidi/agent/agent.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
async def send(self, input_data: BidiAgentInput | dict[str, Any]) -> None:
    """Send input to the model (text, audio, image, or event dict).

    Unified method for sending text, audio, and image input to the model during
    an active conversation session. Accepts TypedEvent instances or plain dicts
    (e.g., from WebSocket clients) which are automatically reconstructed.

    Args:
        input_data: Can be:

            - str: Text message from user
            - BidiInputEvent: TypedEvent
            - dict: Event dictionary (will be reconstructed to TypedEvent)

    Raises:
        RuntimeError: If start has not been called.
        ValueError: If invalid input type.

    Example:
        await agent.send("Hello")
        await agent.send(BidiAudioInputEvent(audio="base64...", format="pcm", ...))
        await agent.send({"type": "bidirectional_text_input", "text": "Hello", "role": "user"})
    """
    if not self._started:
        raise RuntimeError("agent not started | call start before sending")

    input_event: BidiInputEvent

    if isinstance(input_data, str):
        input_event = BidiTextInputEvent(text=input_data)

    elif isinstance(input_data, BidiInputEvent):
        input_event = input_data

    elif isinstance(input_data, dict) and "type" in input_data:
        input_type = input_data["type"]
        input_data = {key: value for key, value in input_data.items() if key != "type"}
        if input_type == "bidi_text_input":
            input_event = BidiTextInputEvent(**input_data)
        elif input_type == "bidi_audio_input":
            input_event = BidiAudioInputEvent(**input_data)
        elif input_type == "bidi_image_input":
            input_event = BidiImageInputEvent(**input_data)
        else:
            raise ValueError(f"input_type=<{input_type}> | input type not supported")

    else:
        raise ValueError("invalid input | must be str, BidiInputEvent, or event dict")

    await self._loop.send(input_event)

start(invocation_state=None) async

Start a persistent bidirectional conversation connection.

Initializes the streaming connection and starts background tasks for processing model events, tool execution, and connection management.

Parameters:

Name Type Description Default
invocation_state dict[str, Any] | None

Optional context to pass to tools during execution. This allows passing custom data (user_id, session_id, database connections, etc.) that tools can access via their invocation_state parameter.

None

Raises:

Type Description
RuntimeError

If agent already started.

Example
await agent.start(invocation_state={
    "user_id": "user_123",
    "session_id": "session_456",
    "database": db_connection,
})
Source code in strands/experimental/bidi/agent/agent.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def start(self, invocation_state: dict[str, Any] | None = None) -> None:
    """Start a persistent bidirectional conversation connection.

    Initializes the streaming connection and starts background tasks for processing
    model events, tool execution, and connection management.

    Args:
        invocation_state: Optional context to pass to tools during execution.
            This allows passing custom data (user_id, session_id, database connections, etc.)
            that tools can access via their invocation_state parameter.

    Raises:
        RuntimeError:
            If agent already started.

    Example:
        ```python
        await agent.start(invocation_state={
            "user_id": "user_123",
            "session_id": "session_456",
            "database": db_connection,
        })
        ```
    """
    if self._started:
        raise RuntimeError("agent already started | call stop before starting again")

    logger.debug("agent starting")
    await self._loop.start(invocation_state)
    self._started = True

stop() async

End the conversation connection and cleanup all resources.

Terminates the streaming connection, cancels background tasks, and closes the connection to the model provider.

Source code in strands/experimental/bidi/agent/agent.py
295
296
297
298
299
300
301
302
async def stop(self) -> None:
    """End the conversation connection and cleanup all resources.

    Terminates the streaming connection, cancels background tasks, and
    closes the connection to the model provider.
    """
    self._started = False
    await self._loop.stop()