Skip to content

strands.experimental.bidi.models

Bidirectional model interfaces and implementations.

strands.experimental.bidi.models.model

Bidirectional streaming model interface.

Defines the abstract interface for models that support real-time bidirectional communication with persistent connections. Unlike traditional request-response models, bidirectional models maintain an open connection for streaming audio, text, and tool interactions.

Features:

  • Persistent connection management with connect/close lifecycle
  • Real-time bidirectional communication (send and receive simultaneously)
  • Provider-agnostic event normalization
  • Support for audio, text, image, and tool result streaming

BidiModel

Bases: Protocol

Protocol for bidirectional streaming models.

This interface defines the contract for models that support persistent streaming connections with real-time audio and text communication. Implementations handle provider-specific protocols while exposing a standardized event-based API.

Attributes:

Name Type Description
config dict[str, Any]

Configuration dictionary with provider-specific settings.

Source code in strands/experimental/bidi/models/model.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class BidiModel(Protocol):
    """Protocol for bidirectional streaming models.

    This interface defines the contract for models that support persistent streaming
    connections with real-time audio and text communication. Implementations handle
    provider-specific protocols while exposing a standardized event-based API.

    Attributes:
        config: Configuration dictionary with provider-specific settings.
    """

    config: dict[str, Any]

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish a persistent streaming connection with the model.

        Opens a bidirectional connection that remains active for real-time communication.
        The connection supports concurrent sending and receiving of events until explicitly
        closed. Must be called before any send() or receive() operations.

        Args:
            system_prompt: System instructions to configure model behavior.
            tools: Tool specifications that the model can invoke during the conversation.
            messages: Initial conversation history to provide context.
            **kwargs: Provider-specific configuration options.
        """
        ...

    async def stop(self) -> None:
        """Close the streaming connection and release resources.

        Terminates the active bidirectional connection and cleans up any associated
        resources such as network connections, buffers, or background tasks. After
        calling close(), the model instance cannot be used until start() is called again.
        """
        ...

    def receive(self) -> AsyncIterable[BidiOutputEvent]:
        """Receive streaming events from the model.

        Continuously yields events from the model as they arrive over the connection.
        Events are normalized to a provider-agnostic format for uniform processing.
        This method should be called in a loop or async task to process model responses.

        The stream continues until the connection is closed or an error occurs.

        Yields:
            BidiOutputEvent: Standardized event objects containing audio output,
                transcripts, tool calls, or control signals.
        """
        ...

    async def send(
        self,
        content: BidiInputEvent | ToolResultEvent,
    ) -> None:
        """Send content to the model over the active connection.

        Transmits user input or tool results to the model during an active streaming
        session. Supports multiple content types including text, audio, images, and
        tool execution results. Can be called multiple times during a conversation.

        Args:
            content: The content to send. Must be one of:

                - BidiTextInputEvent: Text message from the user
                - BidiAudioInputEvent: Audio data for speech input
                - BidiImageInputEvent: Image data for visual understanding
                - ToolResultEvent: Result from a tool execution

        Example:
            ```
            await model.send(BidiTextInputEvent(text="Hello", role="user"))
            await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
            await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
            await model.send(ToolResultEvent(tool_result))
            ```
        """
        ...

receive()

Receive streaming events from the model.

Continuously yields events from the model as they arrive over the connection. Events are normalized to a provider-agnostic format for uniform processing. This method should be called in a loop or async task to process model responses.

The stream continues until the connection is closed or an error occurs.

Yields:

Name Type Description
BidiOutputEvent AsyncIterable[BidiOutputEvent]

Standardized event objects containing audio output, transcripts, tool calls, or control signals.

Source code in strands/experimental/bidi/models/model.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def receive(self) -> AsyncIterable[BidiOutputEvent]:
    """Receive streaming events from the model.

    Continuously yields events from the model as they arrive over the connection.
    Events are normalized to a provider-agnostic format for uniform processing.
    This method should be called in a loop or async task to process model responses.

    The stream continues until the connection is closed or an error occurs.

    Yields:
        BidiOutputEvent: Standardized event objects containing audio output,
            transcripts, tool calls, or control signals.
    """
    ...

send(content) async

Send content to the model over the active connection.

Transmits user input or tool results to the model during an active streaming session. Supports multiple content types including text, audio, images, and tool execution results. Can be called multiple times during a conversation.

Parameters:

Name Type Description Default
content BidiInputEvent | ToolResultEvent

The content to send. Must be one of:

  • BidiTextInputEvent: Text message from the user
  • BidiAudioInputEvent: Audio data for speech input
  • BidiImageInputEvent: Image data for visual understanding
  • ToolResultEvent: Result from a tool execution
required
Example
await model.send(BidiTextInputEvent(text="Hello", role="user"))
await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
await model.send(ToolResultEvent(tool_result))
Source code in strands/experimental/bidi/models/model.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
async def send(
    self,
    content: BidiInputEvent | ToolResultEvent,
) -> None:
    """Send content to the model over the active connection.

    Transmits user input or tool results to the model during an active streaming
    session. Supports multiple content types including text, audio, images, and
    tool execution results. Can be called multiple times during a conversation.

    Args:
        content: The content to send. Must be one of:

            - BidiTextInputEvent: Text message from the user
            - BidiAudioInputEvent: Audio data for speech input
            - BidiImageInputEvent: Image data for visual understanding
            - ToolResultEvent: Result from a tool execution

    Example:
        ```
        await model.send(BidiTextInputEvent(text="Hello", role="user"))
        await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
        await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
        await model.send(ToolResultEvent(tool_result))
        ```
    """
    ...

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish a persistent streaming connection with the model.

Opens a bidirectional connection that remains active for real-time communication. The connection supports concurrent sending and receiving of events until explicitly closed. Must be called before any send() or receive() operations.

Parameters:

Name Type Description Default
system_prompt str | None

System instructions to configure model behavior.

None
tools list[ToolSpec] | None

Tool specifications that the model can invoke during the conversation.

None
messages Messages | None

Initial conversation history to provide context.

None
**kwargs Any

Provider-specific configuration options.

{}
Source code in strands/experimental/bidi/models/model.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish a persistent streaming connection with the model.

    Opens a bidirectional connection that remains active for real-time communication.
    The connection supports concurrent sending and receiving of events until explicitly
    closed. Must be called before any send() or receive() operations.

    Args:
        system_prompt: System instructions to configure model behavior.
        tools: Tool specifications that the model can invoke during the conversation.
        messages: Initial conversation history to provide context.
        **kwargs: Provider-specific configuration options.
    """
    ...

stop() async

Close the streaming connection and release resources.

Terminates the active bidirectional connection and cleans up any associated resources such as network connections, buffers, or background tasks. After calling close(), the model instance cannot be used until start() is called again.

Source code in strands/experimental/bidi/models/model.py
64
65
66
67
68
69
70
71
async def stop(self) -> None:
    """Close the streaming connection and release resources.

    Terminates the active bidirectional connection and cleans up any associated
    resources such as network connections, buffers, or background tasks. After
    calling close(), the model instance cannot be used until start() is called again.
    """
    ...

BidiModelTimeoutError

Bases: Exception

Model timeout error.

Bidirectional models are often configured with a connection time limit. Nova sonic for example keeps the connection open for 8 minutes max. Upon receiving a timeout, the agent loop is configured to restart the model connection so as to create a seamless, uninterrupted experience for the user.

Source code in strands/experimental/bidi/models/model.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class BidiModelTimeoutError(Exception):
    """Model timeout error.

    Bidirectional models are often configured with a connection time limit. Nova sonic for example keeps the connection
    open for 8 minutes max. Upon receiving a timeout, the agent loop is configured to restart the model connection so as
    to create a seamless, uninterrupted experience for the user.
    """

    def __init__(self, message: str, **restart_config: Any) -> None:
        """Initialize error.

        Args:
            message: Timeout message from model.
            **restart_config: Configure restart specific behaviors in the call to model start.
        """
        super().__init__(self, message)

        self.restart_config = restart_config

__init__(message, **restart_config)

Initialize error.

Parameters:

Name Type Description Default
message str

Timeout message from model.

required
**restart_config Any

Configure restart specific behaviors in the call to model start.

{}
Source code in strands/experimental/bidi/models/model.py
125
126
127
128
129
130
131
132
133
134
def __init__(self, message: str, **restart_config: Any) -> None:
    """Initialize error.

    Args:
        message: Timeout message from model.
        **restart_config: Configure restart specific behaviors in the call to model start.
    """
    super().__init__(self, message)

    self.restart_config = restart_config

strands.experimental.bidi.models.gemini_live

Gemini Live API bidirectional model provider using official Google GenAI SDK.

Implements the BidiModel interface for Google's Gemini Live API using the official Google GenAI SDK for simplified and robust WebSocket communication.

Key improvements over custom WebSocket implementation:

  • Uses official google-genai SDK with native Live API support
  • Simplified session management with client.aio.live.connect()
  • Built-in tool integration and event handling
  • Automatic WebSocket connection management and error handling
  • Native support for audio/text streaming and interruption

BidiGeminiLiveModel

Bases: BidiModel

Gemini Live API implementation using official Google GenAI SDK.

Combines model configuration and connection state in a single class. Provides a clean interface to Gemini Live API using the official SDK, eliminating custom WebSocket handling and providing robust error handling.

Source code in strands/experimental/bidi/models/gemini_live.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
class BidiGeminiLiveModel(BidiModel):
    """Gemini Live API implementation using official Google GenAI SDK.

    Combines model configuration and connection state in a single class.
    Provides a clean interface to Gemini Live API using the official SDK,
    eliminating custom WebSocket handling and providing robust error handling.
    """

    def __init__(
        self,
        model_id: str = "gemini-2.5-flash-native-audio-preview-09-2025",
        provider_config: dict[str, Any] | None = None,
        client_config: dict[str, Any] | None = None,
        **kwargs: Any,
    ):
        """Initialize Gemini Live API bidirectional model.

        Args:
            model_id: Model identifier (default: gemini-2.5-flash-native-audio-preview-09-2025)
            provider_config: Model behavior (audio, inference)
            client_config: Authentication (api_key, http_options)
            **kwargs: Reserved for future parameters.

        """
        # Store model ID
        self.model_id = model_id

        # Resolve client config with defaults
        self._client_config = self._resolve_client_config(client_config or {})

        # Resolve provider config with defaults
        self.config = self._resolve_provider_config(provider_config or {})

        # Store API key for later use
        self.api_key = self._client_config.get("api_key")

        # Create Gemini client
        self._client = genai.Client(**self._client_config)

        # Connection state (initialized in start())
        self._live_session: Any = None
        self._live_session_context_manager: Any = None
        self._live_session_handle: str | None = None
        self._connection_id: str | None = None

    def _resolve_client_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Resolve client config (sets default http_options if not provided)."""
        resolved = config.copy()

        # Set default http_options if not provided
        if "http_options" not in resolved:
            resolved["http_options"] = {"api_version": "v1alpha"}

        return resolved

    def _resolve_provider_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Merge user config with defaults (user takes precedence)."""
        default_audio: AudioConfig = {
            "input_rate": GEMINI_INPUT_SAMPLE_RATE,
            "output_rate": GEMINI_OUTPUT_SAMPLE_RATE,
            "channels": GEMINI_CHANNELS,
            "format": "pcm",
        }
        default_inference = {
            "response_modalities": ["AUDIO"],
            "outputAudioTranscription": {},
            "inputAudioTranscription": {},
        }

        resolved = {
            "audio": {
                **default_audio,
                **config.get("audio", {}),
            },
            "inference": {
                **default_inference,
                **config.get("inference", {}),
            },
        }
        return resolved

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish bidirectional connection with Gemini Live API.

        Args:
            system_prompt: System instructions for the model.
            tools: List of tools available to the model.
            messages: Conversation history to initialize with.
            **kwargs: Additional configuration options.
        """
        if self._connection_id:
            raise RuntimeError("model already started | call stop before starting again")

        self._connection_id = str(uuid.uuid4())

        # Build live config
        live_config = self._build_live_config(system_prompt, tools, **kwargs)

        # Create the context manager and session
        self._live_session_context_manager = self._client.aio.live.connect(
            model=self.model_id, config=cast(LiveConnectConfigOrDict, live_config)
        )
        self._live_session = await self._live_session_context_manager.__aenter__()

        # Gemini itself restores message history when resuming from session
        if messages and "live_session_handle" not in kwargs:
            await self._send_message_history(messages)

    async def _send_message_history(self, messages: Messages) -> None:
        """Send conversation history to Gemini Live API.

        Sends each message as a separate turn with the correct role to maintain
        proper conversation context. Follows the same pattern as the non-bidirectional
        Gemini model implementation.
        """
        if not messages:
            return

        # Convert each message to Gemini format and send separately
        for message in messages:
            content_parts = []
            for content_block in message["content"]:
                if "text" in content_block:
                    content_parts.append(genai_types.Part(text=content_block["text"]))

            if content_parts:
                # Map role correctly - Gemini uses "user" and "model" roles
                # "assistant" role from Messages format maps to "model" in Gemini
                role = "model" if message["role"] == "assistant" else message["role"]
                content = genai_types.Content(role=role, parts=content_parts)
                await self._live_session.send_client_content(turns=content)

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive Gemini Live API events and convert to provider-agnostic format."""
        if not self._connection_id:
            raise RuntimeError("model not started | call start before receiving")

        yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

        # Wrap in while loop to restart after turn_complete (SDK limitation workaround)
        while True:
            async for message in self._live_session.receive():
                for event in self._convert_gemini_live_event(message):
                    yield event

    def _convert_gemini_live_event(self, message: LiveServerMessage) -> list[BidiOutputEvent]:
        """Convert Gemini Live API events to provider-agnostic format.

        Handles different types of content:

        - inputTranscription: User's speech transcribed to text
        - outputTranscription: Model's audio transcribed to text
        - modelTurn text: Text response from the model
        - usageMetadata: Token usage information

        Returns:
            List of event dicts (empty list if no events to emit).

        Raises:
            BidiModelTimeoutError: If gemini responds with go away message.
        """
        if message.go_away:
            raise BidiModelTimeoutError(
                message.go_away.model_dump_json(), live_session_handle=self._live_session_handle
            )

        if message.session_resumption_update:
            resumption_update = message.session_resumption_update
            if resumption_update.resumable and resumption_update.new_handle:
                self._live_session_handle = resumption_update.new_handle
                logger.debug("session_handle=<%s> | updating gemini session handle", self._live_session_handle)
            return []

        # Handle interruption first (from server_content)
        if message.server_content and message.server_content.interrupted:
            return [BidiInterruptionEvent(reason="user_speech")]

        # Handle input transcription (user's speech) - emit as transcript event
        if message.server_content and message.server_content.input_transcription:
            input_transcript = message.server_content.input_transcription
            # Check if the transcription object has text content
            if hasattr(input_transcript, "text") and input_transcript.text:
                transcription_text = input_transcript.text
                logger.debug("text_length=<%d> | gemini input transcription detected", len(transcription_text))
                return [
                    BidiTranscriptStreamEvent(
                        delta={"text": transcription_text},
                        text=transcription_text,
                        role="user",
                        # TODO: https://github.com/googleapis/python-genai/issues/1504
                        is_final=bool(input_transcript.finished),
                        current_transcript=transcription_text,
                    )
                ]

        # Handle output transcription (model's audio) - emit as transcript event
        if message.server_content and message.server_content.output_transcription:
            output_transcript = message.server_content.output_transcription
            # Check if the transcription object has text content
            if hasattr(output_transcript, "text") and output_transcript.text:
                transcription_text = output_transcript.text
                logger.debug("text_length=<%d> | gemini output transcription detected", len(transcription_text))
                return [
                    BidiTranscriptStreamEvent(
                        delta={"text": transcription_text},
                        text=transcription_text,
                        role="assistant",
                        # TODO: https://github.com/googleapis/python-genai/issues/1504
                        is_final=bool(output_transcript.finished),
                        current_transcript=transcription_text,
                    )
                ]

        # Handle audio output using SDK's built-in data property
        # Check this BEFORE text to avoid triggering warning on mixed content
        if message.data:
            # Convert bytes to base64 string for JSON serializability
            audio_b64 = base64.b64encode(message.data).decode("utf-8")
            return [
                BidiAudioStreamEvent(
                    audio=audio_b64,
                    format="pcm",
                    sample_rate=cast(AudioSampleRate, self.config["audio"]["output_rate"]),
                    channels=cast(AudioChannel, self.config["audio"]["channels"]),
                )
            ]

        # Handle text output from model_turn (avoids warning by checking parts directly)
        if message.server_content and message.server_content.model_turn:
            model_turn = message.server_content.model_turn
            if model_turn.parts:
                # Concatenate all text parts (Gemini may send multiple parts)
                text_parts = []
                for part in model_turn.parts:
                    # Check if part has text attribute and it's not empty
                    if hasattr(part, "text") and part.text:
                        text_parts.append(part.text)

                if text_parts:
                    full_text = " ".join(text_parts)
                    return [
                        BidiTranscriptStreamEvent(
                            delta={"text": full_text},
                            text=full_text,
                            role="assistant",
                            is_final=True,
                            current_transcript=full_text,
                        )
                    ]

        # Handle tool calls - return list to support multiple tool calls
        if message.tool_call and message.tool_call.function_calls:
            tool_events: list[BidiOutputEvent] = []
            for func_call in message.tool_call.function_calls:
                tool_use_event: ToolUse = {
                    "toolUseId": cast(str, func_call.id),
                    "name": cast(str, func_call.name),
                    "input": func_call.args or {},
                }
                # Create ToolUseStreamEvent for consistency with standard agent
                tool_events.append(
                    ToolUseStreamEvent(delta={"toolUse": tool_use_event}, current_tool_use=dict(tool_use_event))
                )
            return tool_events

        # Handle usage metadata
        if hasattr(message, "usage_metadata") and message.usage_metadata:
            usage = message.usage_metadata

            # Build modality details from token details
            modality_details = []

            # Process prompt tokens details
            if usage.prompt_tokens_details:
                for detail in usage.prompt_tokens_details:
                    if detail.modality and detail.token_count:
                        modality_details.append(
                            {
                                "modality": str(detail.modality).lower(),
                                "input_tokens": detail.token_count,
                                "output_tokens": 0,
                            }
                        )

            # Process response tokens details
            if usage.response_tokens_details:
                for detail in usage.response_tokens_details:
                    if detail.modality and detail.token_count:
                        # Find or create modality entry
                        modality_str = str(detail.modality).lower()
                        existing = next((m for m in modality_details if m["modality"] == modality_str), None)
                        if existing:
                            existing["output_tokens"] = detail.token_count
                        else:
                            modality_details.append(
                                {"modality": modality_str, "input_tokens": 0, "output_tokens": detail.token_count}
                            )

            return [
                BidiUsageEvent(
                    input_tokens=usage.prompt_token_count or 0,
                    output_tokens=usage.response_token_count or 0,
                    total_tokens=usage.total_token_count or 0,
                    modality_details=cast(list[ModalityUsage], modality_details) if modality_details else None,
                    cache_read_input_tokens=usage.cached_content_token_count
                    if usage.cached_content_token_count
                    else None,
                )
            ]

        # Silently ignore setup_complete and generation_complete messages
        return []

    async def send(
        self,
        content: BidiInputEvent | ToolResultEvent,
    ) -> None:
        """Unified send method for all content types. Sends the given inputs to Google Live API.

        Dispatches to appropriate internal handler based on content type.

        Args:
            content: Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

        Raises:
            ValueError: If content type not supported (e.g., image content).
        """
        if not self._connection_id:
            raise RuntimeError("model not started | call start before sending/receiving")

        if isinstance(content, BidiTextInputEvent):
            await self._send_text_content(content.text)
        elif isinstance(content, BidiAudioInputEvent):
            await self._send_audio_content(content)
        elif isinstance(content, BidiImageInputEvent):
            await self._send_image_content(content)
        elif isinstance(content, ToolResultEvent):
            tool_result = content.get("tool_result")
            if tool_result:
                await self._send_tool_result(tool_result)
        else:
            raise ValueError(f"content_type={type(content)} | content not supported")

    async def _send_audio_content(self, audio_input: BidiAudioInputEvent) -> None:
        """Internal: Send audio content using Gemini Live API.

        Gemini Live expects continuous audio streaming via send_realtime_input.
        This automatically triggers VAD and can interrupt ongoing responses.
        """
        # Decode base64 audio to bytes for SDK
        audio_bytes = base64.b64decode(audio_input.audio)

        # Create audio blob for the SDK
        mime_type = f"audio/pcm;rate={self.config['audio']['input_rate']}"
        audio_blob = genai_types.Blob(data=audio_bytes, mime_type=mime_type)

        # Send real-time audio input - this automatically handles VAD and interruption
        await self._live_session.send_realtime_input(audio=audio_blob)

    async def _send_image_content(self, image_input: BidiImageInputEvent) -> None:
        """Internal: Send image content using Gemini Live API.

        Sends image frames following the same pattern as the GitHub example.
        Images are sent as base64-encoded data with MIME type.
        """
        # Image is already base64 encoded in the event
        msg = {"mime_type": image_input.mime_type, "data": image_input.image}

        # Send using the same method as the GitHub example
        await self._live_session.send(input=msg)

    async def _send_text_content(self, text: str) -> None:
        """Internal: Send text content using Gemini Live API."""
        # Create content with text
        content = genai_types.Content(role="user", parts=[genai_types.Part(text=text)])

        # Send as client content
        await self._live_session.send_client_content(turns=content)

    async def _send_tool_result(self, tool_result: ToolResult) -> None:
        """Internal: Send tool result using Gemini Live API."""
        tool_use_id = tool_result.get("toolUseId")
        content = tool_result.get("content", [])

        # Validate all content types are supported
        for block in content:
            if "text" not in block and "json" not in block:
                # Unsupported content type - raise error
                raise ValueError(
                    f"tool_use_id=<{tool_use_id}>, content_types=<{list(block.keys())}> | "
                    f"Content type not supported by Gemini Live API"
                )

        # Optimize for single content item - unwrap the array
        if len(content) == 1:
            result_data = cast(dict[str, Any], content[0])
        else:
            # Multiple items - send as array
            result_data = {"result": content}

        # Create function response
        func_response = genai_types.FunctionResponse(
            id=tool_use_id,
            name=tool_use_id,  # Gemini uses name as identifier
            response=result_data,
        )

        # Send tool response
        await self._live_session.send_tool_response(function_responses=[func_response])

    async def stop(self) -> None:
        """Close Gemini Live API connection."""

        async def stop_session() -> None:
            if not self._live_session_context_manager:
                return

            await self._live_session_context_manager.__aexit__(None, None, None)

        async def stop_connection() -> None:
            self._connection_id = None

        await stop_all(stop_session, stop_connection)

    def _build_live_config(
        self, system_prompt: str | None = None, tools: list[ToolSpec] | None = None, **kwargs: Any
    ) -> dict[str, Any]:
        """Build LiveConnectConfig for the official SDK.

        Simply passes through all config parameters from provider_config, allowing users
        to configure any Gemini Live API parameter directly.
        """
        config_dict: dict[str, Any] = self.config["inference"].copy()

        config_dict["session_resumption"] = {"handle": kwargs.get("live_session_handle")}

        # Add system instruction if provided
        if system_prompt:
            config_dict["system_instruction"] = system_prompt

        # Add tools if provided
        if tools:
            config_dict["tools"] = self._format_tools_for_live_api(tools)

        if "voice" in self.config["audio"]:
            config_dict.setdefault("speech_config", {}).setdefault("voice_config", {}).setdefault(
                "prebuilt_voice_config", {}
            )["voice_name"] = self.config["audio"]["voice"]

        return config_dict

    def _format_tools_for_live_api(self, tool_specs: list[ToolSpec]) -> list[genai_types.Tool]:
        """Format tool specs for Gemini Live API."""
        if not tool_specs:
            return []

        return [
            genai_types.Tool(
                function_declarations=[
                    genai_types.FunctionDeclaration(
                        description=tool_spec["description"],
                        name=tool_spec["name"],
                        parameters_json_schema=tool_spec["inputSchema"]["json"],
                    )
                    for tool_spec in tool_specs
                ],
            ),
        ]

__init__(model_id='gemini-2.5-flash-native-audio-preview-09-2025', provider_config=None, client_config=None, **kwargs)

Initialize Gemini Live API bidirectional model.

Parameters:

Name Type Description Default
model_id str

Model identifier (default: gemini-2.5-flash-native-audio-preview-09-2025)

'gemini-2.5-flash-native-audio-preview-09-2025'
provider_config dict[str, Any] | None

Model behavior (audio, inference)

None
client_config dict[str, Any] | None

Authentication (api_key, http_options)

None
**kwargs Any

Reserved for future parameters.

{}
Source code in strands/experimental/bidi/models/gemini_live.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(
    self,
    model_id: str = "gemini-2.5-flash-native-audio-preview-09-2025",
    provider_config: dict[str, Any] | None = None,
    client_config: dict[str, Any] | None = None,
    **kwargs: Any,
):
    """Initialize Gemini Live API bidirectional model.

    Args:
        model_id: Model identifier (default: gemini-2.5-flash-native-audio-preview-09-2025)
        provider_config: Model behavior (audio, inference)
        client_config: Authentication (api_key, http_options)
        **kwargs: Reserved for future parameters.

    """
    # Store model ID
    self.model_id = model_id

    # Resolve client config with defaults
    self._client_config = self._resolve_client_config(client_config or {})

    # Resolve provider config with defaults
    self.config = self._resolve_provider_config(provider_config or {})

    # Store API key for later use
    self.api_key = self._client_config.get("api_key")

    # Create Gemini client
    self._client = genai.Client(**self._client_config)

    # Connection state (initialized in start())
    self._live_session: Any = None
    self._live_session_context_manager: Any = None
    self._live_session_handle: str | None = None
    self._connection_id: str | None = None

receive() async

Receive Gemini Live API events and convert to provider-agnostic format.

Source code in strands/experimental/bidi/models/gemini_live.py
192
193
194
195
196
197
198
199
200
201
202
203
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive Gemini Live API events and convert to provider-agnostic format."""
    if not self._connection_id:
        raise RuntimeError("model not started | call start before receiving")

    yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

    # Wrap in while loop to restart after turn_complete (SDK limitation workaround)
    while True:
        async for message in self._live_session.receive():
            for event in self._convert_gemini_live_event(message):
                yield event

send(content) async

Unified send method for all content types. Sends the given inputs to Google Live API.

Dispatches to appropriate internal handler based on content type.

Parameters:

Name Type Description Default
content BidiInputEvent | ToolResultEvent

Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

required

Raises:

Type Description
ValueError

If content type not supported (e.g., image content).

Source code in strands/experimental/bidi/models/gemini_live.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
async def send(
    self,
    content: BidiInputEvent | ToolResultEvent,
) -> None:
    """Unified send method for all content types. Sends the given inputs to Google Live API.

    Dispatches to appropriate internal handler based on content type.

    Args:
        content: Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

    Raises:
        ValueError: If content type not supported (e.g., image content).
    """
    if not self._connection_id:
        raise RuntimeError("model not started | call start before sending/receiving")

    if isinstance(content, BidiTextInputEvent):
        await self._send_text_content(content.text)
    elif isinstance(content, BidiAudioInputEvent):
        await self._send_audio_content(content)
    elif isinstance(content, BidiImageInputEvent):
        await self._send_image_content(content)
    elif isinstance(content, ToolResultEvent):
        tool_result = content.get("tool_result")
        if tool_result:
            await self._send_tool_result(tool_result)
    else:
        raise ValueError(f"content_type={type(content)} | content not supported")

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish bidirectional connection with Gemini Live API.

Parameters:

Name Type Description Default
system_prompt str | None

System instructions for the model.

None
tools list[ToolSpec] | None

List of tools available to the model.

None
messages Messages | None

Conversation history to initialize with.

None
**kwargs Any

Additional configuration options.

{}
Source code in strands/experimental/bidi/models/gemini_live.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish bidirectional connection with Gemini Live API.

    Args:
        system_prompt: System instructions for the model.
        tools: List of tools available to the model.
        messages: Conversation history to initialize with.
        **kwargs: Additional configuration options.
    """
    if self._connection_id:
        raise RuntimeError("model already started | call stop before starting again")

    self._connection_id = str(uuid.uuid4())

    # Build live config
    live_config = self._build_live_config(system_prompt, tools, **kwargs)

    # Create the context manager and session
    self._live_session_context_manager = self._client.aio.live.connect(
        model=self.model_id, config=cast(LiveConnectConfigOrDict, live_config)
    )
    self._live_session = await self._live_session_context_manager.__aenter__()

    # Gemini itself restores message history when resuming from session
    if messages and "live_session_handle" not in kwargs:
        await self._send_message_history(messages)

stop() async

Close Gemini Live API connection.

Source code in strands/experimental/bidi/models/gemini_live.py
470
471
472
473
474
475
476
477
478
479
480
481
482
async def stop(self) -> None:
    """Close Gemini Live API connection."""

    async def stop_session() -> None:
        if not self._live_session_context_manager:
            return

        await self._live_session_context_manager.__aexit__(None, None, None)

    async def stop_connection() -> None:
        self._connection_id = None

    await stop_all(stop_session, stop_connection)

strands.experimental.bidi.models.nova_sonic

Nova Sonic bidirectional model provider for real-time streaming conversations.

Implements the BidiModel interface for Amazon's Nova Sonic, handling the complex event sequencing and audio processing required by Nova Sonic's InvokeModelWithBidirectionalStream protocol.

Nova Sonic specifics:

  • Hierarchical event sequences: connectionStart → promptStart → content streaming
  • Base64-encoded audio format with hex encoding
  • Tool execution with content containers and identifier tracking
  • 8-minute connection limits with proper cleanup sequences
  • Interruption detection through stopReason events

BidiNovaSonicModel

Bases: BidiModel

Nova Sonic implementation for bidirectional streaming.

Combines model configuration and connection state in a single class. Manages Nova Sonic's complex event sequencing, audio format conversion, and tool execution patterns while providing the standard BidiModel interface.

Attributes:

Name Type Description
_stream DuplexEventStream

open bedrock stream to nova sonic.

Source code in strands/experimental/bidi/models/nova_sonic.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
class BidiNovaSonicModel(BidiModel):
    """Nova Sonic implementation for bidirectional streaming.

    Combines model configuration and connection state in a single class.
    Manages Nova Sonic's complex event sequencing, audio format conversion, and
    tool execution patterns while providing the standard BidiModel interface.

    Attributes:
        _stream: open bedrock stream to nova sonic.
    """

    _stream: DuplexEventStream

    def __init__(
        self,
        model_id: str = "amazon.nova-sonic-v1:0",
        provider_config: dict[str, Any] | None = None,
        client_config: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize Nova Sonic bidirectional model.

        Args:
            model_id: Model identifier (default: amazon.nova-sonic-v1:0)
            provider_config: Model behavior (audio, inference settings)
            client_config: AWS authentication (boto_session OR region, not both)
            **kwargs: Reserved for future parameters.
        """
        # Store model ID
        self.model_id = model_id

        # Resolve client config with defaults
        self._client_config = self._resolve_client_config(client_config or {})

        # Resolve provider config with defaults
        self.config = self._resolve_provider_config(provider_config or {})

        # Store session and region for later use
        self._session = self._client_config["boto_session"]
        self.region = self._client_config["region"]

        # Track API-provided identifiers
        self._connection_id: str | None = None
        self._audio_content_name: str | None = None
        self._current_completion_id: str | None = None

        # Indicates if model is done generating transcript
        self._generation_stage: str | None = None

        # Ensure certain events are sent in sequence when required
        self._send_lock = asyncio.Lock()

        logger.debug("model_id=<%s> | nova sonic model initialized", model_id)

    def _resolve_client_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Resolve AWS client config (creates boto session if needed)."""
        if "boto_session" in config and "region" in config:
            raise ValueError("Cannot specify both 'boto_session' and 'region' in client_config")

        resolved = config.copy()

        # Create boto session if not provided
        if "boto_session" not in resolved:
            resolved["boto_session"] = boto3.Session()

        # Resolve region from session or use default
        if "region" not in resolved:
            resolved["region"] = resolved["boto_session"].region_name or "us-east-1"

        return resolved

    def _resolve_provider_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Merge user config with defaults (user takes precedence)."""
        default_audio: AudioConfig = {
            "input_rate": cast(AudioSampleRate, NOVA_AUDIO_INPUT_CONFIG["sampleRateHertz"]),
            "output_rate": cast(AudioSampleRate, NOVA_AUDIO_OUTPUT_CONFIG["sampleRateHertz"]),
            "channels": cast(AudioChannel, NOVA_AUDIO_INPUT_CONFIG["channelCount"]),
            "format": "pcm",
            "voice": cast(str, NOVA_AUDIO_OUTPUT_CONFIG["voiceId"]),
        }

        resolved = {
            "audio": {
                **default_audio,
                **config.get("audio", {}),
            },
            "inference": config.get("inference", {}),
        }
        return resolved

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish bidirectional connection to Nova Sonic.

        Args:
            system_prompt: System instructions for the model.
            tools: List of tools available to the model.
            messages: Conversation history to initialize with.
            **kwargs: Additional configuration options.

        Raises:
            RuntimeError: If user calls start again without first stopping.
        """
        if self._connection_id:
            raise RuntimeError("model already started | call stop before starting again")

        logger.debug("nova connection starting")

        self._connection_id = str(uuid.uuid4())

        # Get credentials from boto3 session (full credential chain)
        credentials = self._session.get_credentials()

        if not credentials:
            raise ValueError(
                "no AWS credentials found. configure credentials via environment variables, "
                "credential files, IAM roles, or SSO."
            )

        # Use static resolver with credentials configured as properties
        resolver = StaticCredentialsResolver()

        config = Config(
            endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
            region=self.region,
            aws_credentials_identity_resolver=resolver,
            auth_scheme_resolver=HTTPAuthSchemeResolver(),
            auth_schemes={ShapeID("aws.auth#sigv4"): SigV4AuthScheme(service="bedrock")},
            # Configure static credentials as properties
            aws_access_key_id=credentials.access_key,
            aws_secret_access_key=credentials.secret_key,
            aws_session_token=credentials.token,
        )

        self.client = BedrockRuntimeClient(config=config)
        logger.debug("region=<%s> | nova sonic client initialized", self.region)

        client = BedrockRuntimeClient(config=config)
        self._stream = await client.invoke_model_with_bidirectional_stream(
            InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
        )
        logger.debug("region=<%s> | nova sonic client initialized", self.region)

        init_events = self._build_initialization_events(system_prompt, tools, messages)
        logger.debug("event_count=<%d> | sending nova sonic initialization events", len(init_events))
        await self._send_nova_events(init_events)

        logger.info("connection_id=<%s> | nova sonic connection established", self._connection_id)

    def _build_initialization_events(
        self, system_prompt: str | None, tools: list[ToolSpec] | None, messages: Messages | None
    ) -> list[str]:
        """Build the sequence of initialization events."""
        tools = tools or []
        events = [
            self._get_connection_start_event(),
            self._get_prompt_start_event(tools),
            *self._get_system_prompt_events(system_prompt),
        ]

        # Add conversation history if provided
        if messages:
            events.extend(self._get_message_history_events(messages))
            logger.debug("message_count=<%d> | conversation history added to initialization", len(messages))

        return events

    def _log_event_type(self, nova_event: dict[str, Any]) -> None:
        """Log specific Nova Sonic event types for debugging."""
        if "usageEvent" in nova_event:
            logger.debug("usage=<%s> | nova usage event received", nova_event["usageEvent"])
        elif "textOutput" in nova_event:
            logger.debug("nova text output received")
        elif "toolUse" in nova_event:
            tool_use = nova_event["toolUse"]
            logger.debug(
                "tool_name=<%s>, tool_use_id=<%s> | nova tool use received",
                tool_use["toolName"],
                tool_use["toolUseId"],
            )
        elif "audioOutput" in nova_event:
            audio_content = nova_event["audioOutput"]["content"]
            audio_bytes = base64.b64decode(audio_content)
            logger.debug("audio_bytes=<%d> | nova audio output received", len(audio_bytes))

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive Nova Sonic events and convert to provider-agnostic format.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._connection_id:
            raise RuntimeError("model not started | call start before receiving")

        logger.debug("nova event stream starting")
        yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

        _, output = await self._stream.await_output()
        while True:
            try:
                event_data = await output.receive()

            except ValidationException as error:
                if "InternalErrorCode=531" in error.message:
                    # nova also times out if user is silent for 175 seconds
                    raise BidiModelTimeoutError(error.message) from error
                raise

            except ModelTimeoutException as error:
                raise BidiModelTimeoutError(error.message) from error

            if not event_data:
                continue

            nova_event = json.loads(event_data.value.bytes_.decode("utf-8"))["event"]
            self._log_event_type(nova_event)

            model_event = self._convert_nova_event(nova_event)
            if model_event:
                yield model_event

    async def send(self, content: BidiInputEvent | ToolResultEvent) -> None:
        """Unified send method for all content types. Sends the given content to Nova Sonic.

        Dispatches to appropriate internal handler based on content type.

        Args:
            content: Input event.

        Raises:
            ValueError: If content type not supported (e.g., image content).
        """
        if not self._connection_id:
            raise RuntimeError("model not started | call start before sending")

        if isinstance(content, BidiTextInputEvent):
            await self._send_text_content(content.text)
        elif isinstance(content, BidiAudioInputEvent):
            await self._send_audio_content(content)
        elif isinstance(content, ToolResultEvent):
            tool_result = content.get("tool_result")
            if tool_result:
                await self._send_tool_result(tool_result)
        else:
            raise ValueError(f"content_type={type(content)} | content not supported")

    async def _start_audio_connection(self) -> None:
        """Internal: Start audio input connection (call once before sending audio chunks)."""
        logger.debug("nova audio connection starting")
        self._audio_content_name = str(uuid.uuid4())

        # Build audio input configuration from config
        audio_input_config = {
            "mediaType": "audio/lpcm",
            "sampleRateHertz": self.config["audio"]["input_rate"],
            "sampleSizeBits": 16,
            "channelCount": self.config["audio"]["channels"],
            "audioType": "SPEECH",
            "encoding": "base64",
        }

        audio_content_start = json.dumps(
            {
                "event": {
                    "contentStart": {
                        "promptName": self._connection_id,
                        "contentName": self._audio_content_name,
                        "type": "AUDIO",
                        "interactive": True,
                        "role": "USER",
                        "audioInputConfiguration": audio_input_config,
                    }
                }
            }
        )

        await self._send_nova_events([audio_content_start])

    async def _send_audio_content(self, audio_input: BidiAudioInputEvent) -> None:
        """Internal: Send audio using Nova Sonic protocol-specific format."""
        # Start audio connection if not already active
        if not self._audio_content_name:
            await self._start_audio_connection()

        # Audio is already base64 encoded in the event
        # Send audio input event
        audio_event = json.dumps(
            {
                "event": {
                    "audioInput": {
                        "promptName": self._connection_id,
                        "contentName": self._audio_content_name,
                        "content": audio_input.audio,
                    }
                }
            }
        )

        await self._send_nova_events([audio_event])

    async def _end_audio_input(self) -> None:
        """Internal: End current audio input connection to trigger Nova Sonic processing."""
        if not self._audio_content_name:
            return

        logger.debug("nova audio connection ending")

        audio_content_end = json.dumps(
            {"event": {"contentEnd": {"promptName": self._connection_id, "contentName": self._audio_content_name}}}
        )

        await self._send_nova_events([audio_content_end])
        self._audio_content_name = None

    async def _send_text_content(self, text: str) -> None:
        """Internal: Send text content using Nova Sonic format."""
        content_name = str(uuid.uuid4())
        events = [
            self._get_text_content_start_event(content_name),
            self._get_text_input_event(content_name, text),
            self._get_content_end_event(content_name),
        ]
        await self._send_nova_events(events)

    async def _send_tool_result(self, tool_result: ToolResult) -> None:
        """Internal: Send tool result using Nova Sonic toolResult format."""
        tool_use_id = tool_result["toolUseId"]

        logger.debug("tool_use_id=<%s> | sending nova tool result", tool_use_id)

        # Validate content types and preserve structure
        content = tool_result.get("content", [])

        # Validate all content types are supported
        for block in content:
            if "text" not in block and "json" not in block:
                # Unsupported content type - raise error
                raise ValueError(
                    f"tool_use_id=<{tool_use_id}>, content_types=<{list(block.keys())}> | "
                    f"Content type not supported by Nova Sonic"
                )

        # Optimize for single content item - unwrap the array
        if len(content) == 1:
            result_data = cast(dict[str, Any], content[0])
        else:
            # Multiple items - send as array
            result_data = {"content": content}

        content_name = str(uuid.uuid4())
        events = [
            self._get_tool_content_start_event(content_name, tool_use_id),
            self._get_tool_result_event(content_name, result_data),
            self._get_content_end_event(content_name),
        ]
        await self._send_nova_events(events)

    async def stop(self) -> None:
        """Close Nova Sonic connection with proper cleanup sequence."""
        logger.debug("nova connection cleanup starting")

        async def stop_events() -> None:
            if not self._connection_id:
                return

            await self._end_audio_input()
            cleanup_events = [self._get_prompt_end_event(), self._get_connection_end_event()]
            await self._send_nova_events(cleanup_events)

        async def stop_stream() -> None:
            if not hasattr(self, "_stream"):
                return

            await self._stream.close()

        async def stop_connection() -> None:
            self._connection_id = None

        await stop_all(stop_events, stop_stream, stop_connection)

        logger.debug("nova connection closed")

    def _convert_nova_event(self, nova_event: dict[str, Any]) -> BidiOutputEvent | None:
        """Convert Nova Sonic events to TypedEvent format."""
        # Handle completion start - track completionId
        if "completionStart" in nova_event:
            completion_data = nova_event["completionStart"]
            self._current_completion_id = completion_data.get("completionId")
            logger.debug("completion_id=<%s> | nova completion started", self._current_completion_id)
            return None

        # Handle completion end
        if "completionEnd" in nova_event:
            completion_data = nova_event["completionEnd"]
            completion_id = completion_data.get("completionId", self._current_completion_id)
            stop_reason = completion_data.get("stopReason", "END_TURN")

            event = BidiResponseCompleteEvent(
                response_id=completion_id or str(uuid.uuid4()),  # Fallback to UUID if missing
                stop_reason="interrupted" if stop_reason == "INTERRUPTED" else "complete",
            )

            # Clear completion tracking
            self._current_completion_id = None
            return event

        # Handle audio output
        if "audioOutput" in nova_event:
            # Audio is already base64 string from Nova Sonic
            audio_content = nova_event["audioOutput"]["content"]
            return BidiAudioStreamEvent(
                audio=audio_content,
                format="pcm",
                sample_rate=cast(AudioSampleRate, self.config["audio"]["output_rate"]),
                channels=cast(AudioChannel, self.config["audio"]["channels"]),
            )

        # Handle text output (transcripts)
        elif "textOutput" in nova_event:
            text_output = nova_event["textOutput"]
            text_content = text_output["content"]
            # Check for Nova Sonic interruption pattern
            if '{ "interrupted" : true }' in text_content:
                logger.debug("nova interruption detected in text output")
                return BidiInterruptionEvent(reason="user_speech")

            return BidiTranscriptStreamEvent(
                delta={"text": text_content},
                text=text_content,
                role=text_output["role"].lower(),
                is_final=self._generation_stage == "FINAL",
                current_transcript=text_content,
            )

        # Handle tool use
        if "toolUse" in nova_event:
            tool_use = nova_event["toolUse"]
            tool_use_event: ToolUse = {
                "toolUseId": tool_use["toolUseId"],
                "name": tool_use["toolName"],
                "input": json.loads(tool_use["content"]),
            }
            # Return ToolUseStreamEvent - cast to dict for type compatibility
            return ToolUseStreamEvent(delta={"toolUse": tool_use_event}, current_tool_use=dict(tool_use_event))

        # Handle interruption
        if nova_event.get("stopReason") == "INTERRUPTED":
            logger.debug("nova interruption detected via stop reason")
            return BidiInterruptionEvent(reason="user_speech")

        # Handle usage events - convert to multimodal usage format
        if "usageEvent" in nova_event:
            usage_data = nova_event["usageEvent"]
            total_input = usage_data.get("totalInputTokens", 0)
            total_output = usage_data.get("totalOutputTokens", 0)

            return BidiUsageEvent(
                input_tokens=total_input,
                output_tokens=total_output,
                total_tokens=usage_data.get("totalTokens", total_input + total_output),
            )

        # Handle content start events (emit response start)
        if "contentStart" in nova_event:
            content_data = nova_event["contentStart"]
            if content_data["type"] == "TEXT":
                self._generation_stage = json.loads(content_data["additionalModelFields"])["generationStage"]

            # Emit response start event using API-provided completionId
            # completionId should already be tracked from completionStart event
            return BidiResponseStartEvent(
                response_id=self._current_completion_id or str(uuid.uuid4())  # Fallback to UUID if missing
            )

        if "contentEnd" in nova_event:
            self._generation_stage = None

        # Ignore all other events
        return None

    def _get_connection_start_event(self) -> str:
        """Generate Nova Sonic connection start event."""
        inference_config = {_NOVA_INFERENCE_CONFIG_KEYS[key]: value for key, value in self.config["inference"].items()}
        return json.dumps({"event": {"sessionStart": {"inferenceConfiguration": inference_config}}})

    def _get_prompt_start_event(self, tools: list[ToolSpec]) -> str:
        """Generate Nova Sonic prompt start event with tool configuration."""
        # Build audio output configuration from config
        audio_output_config = {
            "mediaType": "audio/lpcm",
            "sampleRateHertz": self.config["audio"]["output_rate"],
            "sampleSizeBits": 16,
            "channelCount": self.config["audio"]["channels"],
            "voiceId": self.config["audio"].get("voice", "matthew"),
            "encoding": "base64",
            "audioType": "SPEECH",
        }

        prompt_start_event: dict[str, Any] = {
            "event": {
                "promptStart": {
                    "promptName": self._connection_id,
                    "textOutputConfiguration": NOVA_TEXT_CONFIG,
                    "audioOutputConfiguration": audio_output_config,
                }
            }
        }

        if tools:
            tool_config = self._build_tool_configuration(tools)
            prompt_start_event["event"]["promptStart"]["toolUseOutputConfiguration"] = NOVA_TOOL_CONFIG
            prompt_start_event["event"]["promptStart"]["toolConfiguration"] = {"tools": tool_config}

        return json.dumps(prompt_start_event)

    def _build_tool_configuration(self, tools: list[ToolSpec]) -> list[dict[str, Any]]:
        """Build tool configuration from tool specs."""
        tool_config: list[dict[str, Any]] = []
        for tool in tools:
            input_schema = (
                {"json": json.dumps(tool["inputSchema"]["json"])}
                if "json" in tool["inputSchema"]
                else {"json": json.dumps(tool["inputSchema"])}
            )

            tool_config.append(
                {"toolSpec": {"name": tool["name"], "description": tool["description"], "inputSchema": input_schema}}
            )
        return tool_config

    def _get_system_prompt_events(self, system_prompt: str | None) -> list[str]:
        """Generate system prompt events."""
        content_name = str(uuid.uuid4())
        return [
            self._get_text_content_start_event(content_name, "SYSTEM"),
            self._get_text_input_event(content_name, system_prompt or ""),
            self._get_content_end_event(content_name),
        ]

    def _get_message_history_events(self, messages: Messages) -> list[str]:
        """Generate conversation history events from agent messages.

        Converts agent message history to Nova Sonic format following the
        contentStart/textInput/contentEnd pattern for each message.

        Args:
            messages: List of conversation messages with role and content.

        Returns:
            List of JSON event strings for Nova Sonic.
        """
        events = []

        for message in messages:
            role = message["role"].upper()  # Convert to ASSISTANT or USER
            content_blocks = message.get("content", [])

            # Extract text content from content blocks
            text_parts = []
            for block in content_blocks:
                if "text" in block:
                    text_parts.append(block["text"])

            # Combine all text parts
            if text_parts:
                combined_text = "\n".join(text_parts)
                content_name = str(uuid.uuid4())

                # Add contentStart, textInput, and contentEnd events
                events.extend(
                    [
                        self._get_text_content_start_event(content_name, role),
                        self._get_text_input_event(content_name, combined_text),
                        self._get_content_end_event(content_name),
                    ]
                )

        return events

    def _get_text_content_start_event(self, content_name: str, role: str = "USER") -> str:
        """Generate text content start event."""
        return json.dumps(
            {
                "event": {
                    "contentStart": {
                        "promptName": self._connection_id,
                        "contentName": content_name,
                        "type": "TEXT",
                        "role": role,
                        "interactive": True,
                        "textInputConfiguration": NOVA_TEXT_CONFIG,
                    }
                }
            }
        )

    def _get_tool_content_start_event(self, content_name: str, tool_use_id: str) -> str:
        """Generate tool content start event."""
        return json.dumps(
            {
                "event": {
                    "contentStart": {
                        "promptName": self._connection_id,
                        "contentName": content_name,
                        "interactive": False,
                        "type": "TOOL",
                        "role": "TOOL",
                        "toolResultInputConfiguration": {
                            "toolUseId": tool_use_id,
                            "type": "TEXT",
                            "textInputConfiguration": NOVA_TEXT_CONFIG,
                        },
                    }
                }
            }
        )

    def _get_text_input_event(self, content_name: str, text: str) -> str:
        """Generate text input event."""
        return json.dumps(
            {"event": {"textInput": {"promptName": self._connection_id, "contentName": content_name, "content": text}}}
        )

    def _get_tool_result_event(self, content_name: str, result: dict[str, Any]) -> str:
        """Generate tool result event."""
        return json.dumps(
            {
                "event": {
                    "toolResult": {
                        "promptName": self._connection_id,
                        "contentName": content_name,
                        "content": json.dumps(result),
                    }
                }
            }
        )

    def _get_content_end_event(self, content_name: str) -> str:
        """Generate content end event."""
        return json.dumps({"event": {"contentEnd": {"promptName": self._connection_id, "contentName": content_name}}})

    def _get_prompt_end_event(self) -> str:
        """Generate prompt end event."""
        return json.dumps({"event": {"promptEnd": {"promptName": self._connection_id}}})

    def _get_connection_end_event(self) -> str:
        """Generate connection end event."""
        return json.dumps({"event": {"connectionEnd": {}}})

    async def _send_nova_events(self, events: list[str]) -> None:
        """Send event JSON string to Nova Sonic stream.

        A lock is used to send events in sequence when required (e.g., tool result start, content, and end).

        Args:
            events: Jsonified events.
        """
        async with self._send_lock:
            for event in events:
                bytes_data = event.encode("utf-8")
                chunk = InvokeModelWithBidirectionalStreamInputChunk(
                    value=BidirectionalInputPayloadPart(bytes_=bytes_data)
                )
                await self._stream.input_stream.send(chunk)
                logger.debug("nova sonic event sent successfully")

__init__(model_id='amazon.nova-sonic-v1:0', provider_config=None, client_config=None, **kwargs)

Initialize Nova Sonic bidirectional model.

Parameters:

Name Type Description Default
model_id str

Model identifier (default: amazon.nova-sonic-v1:0)

'amazon.nova-sonic-v1:0'
provider_config dict[str, Any] | None

Model behavior (audio, inference settings)

None
client_config dict[str, Any] | None

AWS authentication (boto_session OR region, not both)

None
**kwargs Any

Reserved for future parameters.

{}
Source code in strands/experimental/bidi/models/nova_sonic.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def __init__(
    self,
    model_id: str = "amazon.nova-sonic-v1:0",
    provider_config: dict[str, Any] | None = None,
    client_config: dict[str, Any] | None = None,
    **kwargs: Any,
) -> None:
    """Initialize Nova Sonic bidirectional model.

    Args:
        model_id: Model identifier (default: amazon.nova-sonic-v1:0)
        provider_config: Model behavior (audio, inference settings)
        client_config: AWS authentication (boto_session OR region, not both)
        **kwargs: Reserved for future parameters.
    """
    # Store model ID
    self.model_id = model_id

    # Resolve client config with defaults
    self._client_config = self._resolve_client_config(client_config or {})

    # Resolve provider config with defaults
    self.config = self._resolve_provider_config(provider_config or {})

    # Store session and region for later use
    self._session = self._client_config["boto_session"]
    self.region = self._client_config["region"]

    # Track API-provided identifiers
    self._connection_id: str | None = None
    self._audio_content_name: str | None = None
    self._current_completion_id: str | None = None

    # Indicates if model is done generating transcript
    self._generation_stage: str | None = None

    # Ensure certain events are sent in sequence when required
    self._send_lock = asyncio.Lock()

    logger.debug("model_id=<%s> | nova sonic model initialized", model_id)

receive() async

Receive Nova Sonic events and convert to provider-agnostic format.

Raises:

Type Description
RuntimeError

If start has not been called.

Source code in strands/experimental/bidi/models/nova_sonic.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive Nova Sonic events and convert to provider-agnostic format.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._connection_id:
        raise RuntimeError("model not started | call start before receiving")

    logger.debug("nova event stream starting")
    yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

    _, output = await self._stream.await_output()
    while True:
        try:
            event_data = await output.receive()

        except ValidationException as error:
            if "InternalErrorCode=531" in error.message:
                # nova also times out if user is silent for 175 seconds
                raise BidiModelTimeoutError(error.message) from error
            raise

        except ModelTimeoutException as error:
            raise BidiModelTimeoutError(error.message) from error

        if not event_data:
            continue

        nova_event = json.loads(event_data.value.bytes_.decode("utf-8"))["event"]
        self._log_event_type(nova_event)

        model_event = self._convert_nova_event(nova_event)
        if model_event:
            yield model_event

send(content) async

Unified send method for all content types. Sends the given content to Nova Sonic.

Dispatches to appropriate internal handler based on content type.

Parameters:

Name Type Description Default
content BidiInputEvent | ToolResultEvent

Input event.

required

Raises:

Type Description
ValueError

If content type not supported (e.g., image content).

Source code in strands/experimental/bidi/models/nova_sonic.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
async def send(self, content: BidiInputEvent | ToolResultEvent) -> None:
    """Unified send method for all content types. Sends the given content to Nova Sonic.

    Dispatches to appropriate internal handler based on content type.

    Args:
        content: Input event.

    Raises:
        ValueError: If content type not supported (e.g., image content).
    """
    if not self._connection_id:
        raise RuntimeError("model not started | call start before sending")

    if isinstance(content, BidiTextInputEvent):
        await self._send_text_content(content.text)
    elif isinstance(content, BidiAudioInputEvent):
        await self._send_audio_content(content)
    elif isinstance(content, ToolResultEvent):
        tool_result = content.get("tool_result")
        if tool_result:
            await self._send_tool_result(tool_result)
    else:
        raise ValueError(f"content_type={type(content)} | content not supported")

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish bidirectional connection to Nova Sonic.

Parameters:

Name Type Description Default
system_prompt str | None

System instructions for the model.

None
tools list[ToolSpec] | None

List of tools available to the model.

None
messages Messages | None

Conversation history to initialize with.

None
**kwargs Any

Additional configuration options.

{}

Raises:

Type Description
RuntimeError

If user calls start again without first stopping.

Source code in strands/experimental/bidi/models/nova_sonic.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish bidirectional connection to Nova Sonic.

    Args:
        system_prompt: System instructions for the model.
        tools: List of tools available to the model.
        messages: Conversation history to initialize with.
        **kwargs: Additional configuration options.

    Raises:
        RuntimeError: If user calls start again without first stopping.
    """
    if self._connection_id:
        raise RuntimeError("model already started | call stop before starting again")

    logger.debug("nova connection starting")

    self._connection_id = str(uuid.uuid4())

    # Get credentials from boto3 session (full credential chain)
    credentials = self._session.get_credentials()

    if not credentials:
        raise ValueError(
            "no AWS credentials found. configure credentials via environment variables, "
            "credential files, IAM roles, or SSO."
        )

    # Use static resolver with credentials configured as properties
    resolver = StaticCredentialsResolver()

    config = Config(
        endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
        region=self.region,
        aws_credentials_identity_resolver=resolver,
        auth_scheme_resolver=HTTPAuthSchemeResolver(),
        auth_schemes={ShapeID("aws.auth#sigv4"): SigV4AuthScheme(service="bedrock")},
        # Configure static credentials as properties
        aws_access_key_id=credentials.access_key,
        aws_secret_access_key=credentials.secret_key,
        aws_session_token=credentials.token,
    )

    self.client = BedrockRuntimeClient(config=config)
    logger.debug("region=<%s> | nova sonic client initialized", self.region)

    client = BedrockRuntimeClient(config=config)
    self._stream = await client.invoke_model_with_bidirectional_stream(
        InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
    )
    logger.debug("region=<%s> | nova sonic client initialized", self.region)

    init_events = self._build_initialization_events(system_prompt, tools, messages)
    logger.debug("event_count=<%d> | sending nova sonic initialization events", len(init_events))
    await self._send_nova_events(init_events)

    logger.info("connection_id=<%s> | nova sonic connection established", self._connection_id)

stop() async

Close Nova Sonic connection with proper cleanup sequence.

Source code in strands/experimental/bidi/models/nova_sonic.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
async def stop(self) -> None:
    """Close Nova Sonic connection with proper cleanup sequence."""
    logger.debug("nova connection cleanup starting")

    async def stop_events() -> None:
        if not self._connection_id:
            return

        await self._end_audio_input()
        cleanup_events = [self._get_prompt_end_event(), self._get_connection_end_event()]
        await self._send_nova_events(cleanup_events)

    async def stop_stream() -> None:
        if not hasattr(self, "_stream"):
            return

        await self._stream.close()

    async def stop_connection() -> None:
        self._connection_id = None

    await stop_all(stop_events, stop_stream, stop_connection)

    logger.debug("nova connection closed")

strands.experimental.bidi.models.openai_realtime

OpenAI Realtime API provider for Strands bidirectional streaming.

Provides real-time audio and text communication through OpenAI's Realtime API with WebSocket connections, voice activity detection, and function calling.

OPENAI_MAX_TIMEOUT_S = 3000 module-attribute

Max timeout before closing connection.

OpenAI documents a 60 minute limit on realtime sessions (docs). However, OpenAI does not emit any warnings when approaching the limit. As a workaround, we configure a max timeout client side to gracefully handle the connection closure. We set the max to 50 minutes to provide enough buffer before hitting the real limit.

BidiOpenAIRealtimeModel

Bases: BidiModel

OpenAI Realtime API implementation for bidirectional streaming.

Combines model configuration and connection state in a single class. Manages WebSocket connection to OpenAI's Realtime API with automatic VAD, function calling, and event conversion to Strands format.

Source code in strands/experimental/bidi/models/openai_realtime.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
class BidiOpenAIRealtimeModel(BidiModel):
    """OpenAI Realtime API implementation for bidirectional streaming.

    Combines model configuration and connection state in a single class.
    Manages WebSocket connection to OpenAI's Realtime API with automatic VAD,
    function calling, and event conversion to Strands format.
    """

    _websocket: ClientConnection
    _start_time: int

    def __init__(
        self,
        model_id: str = DEFAULT_MODEL,
        provider_config: dict[str, Any] | None = None,
        client_config: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize OpenAI Realtime bidirectional model.

        Args:
            model_id: Model identifier (default: gpt-realtime)
            provider_config: Model behavior (audio, instructions, turn_detection, etc.)
            client_config: Authentication (api_key, organization, project)
                Falls back to OPENAI_API_KEY, OPENAI_ORGANIZATION, OPENAI_PROJECT env vars
            **kwargs: Reserved for future parameters.

        """
        # Store model ID
        self.model_id = model_id

        # Resolve client config with defaults and env vars
        self._client_config = self._resolve_client_config(client_config or {})

        # Resolve provider config with defaults
        self.config = self._resolve_provider_config(provider_config or {})

        # Store client config values for later use
        self.api_key = self._client_config["api_key"]
        self.organization = self._client_config.get("organization")
        self.project = self._client_config.get("project")
        self.timeout_s = self._client_config["timeout_s"]

        if self.timeout_s > OPENAI_MAX_TIMEOUT_S:
            raise ValueError(
                f"timeout_s=<{self.timeout_s}>, max_timeout_s=<{OPENAI_MAX_TIMEOUT_S}> | timeout exceeds max limit"
            )

        # Connection state (initialized in start())
        self._connection_id: str | None = None

        self._function_call_buffer: dict[str, Any] = {}

        logger.debug("model=<%s> | openai realtime model initialized", model_id)

    def _resolve_client_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Resolve client config with env var fallback (config takes precedence)."""
        resolved = config.copy()

        if "api_key" not in resolved:
            resolved["api_key"] = os.getenv("OPENAI_API_KEY")

        if not resolved.get("api_key"):
            raise ValueError(
                "OpenAI API key is required. Provide via client_config={'api_key': '...'} "
                "or set OPENAI_API_KEY environment variable."
            )
        if "organization" not in resolved:
            env_org = os.getenv("OPENAI_ORGANIZATION")
            if env_org:
                resolved["organization"] = env_org

        if "project" not in resolved:
            env_project = os.getenv("OPENAI_PROJECT")
            if env_project:
                resolved["project"] = env_project

        if "timeout_s" not in resolved:
            resolved["timeout_s"] = OPENAI_MAX_TIMEOUT_S

        return resolved

    def _resolve_provider_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Merge user config with defaults (user takes precedence)."""
        default_audio: AudioConfig = {
            "input_rate": cast(AudioSampleRate, DEFAULT_SAMPLE_RATE),
            "output_rate": cast(AudioSampleRate, DEFAULT_SAMPLE_RATE),
            "channels": 1,
            "format": "pcm",
            "voice": "alloy",
        }

        resolved = {
            "audio": {
                **default_audio,
                **config.get("audio", {}),
            },
            "inference": config.get("inference", {}),
        }
        return resolved

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish bidirectional connection to OpenAI Realtime API.

        Args:
            system_prompt: System instructions for the model.
            tools: List of tools available to the model.
            messages: Conversation history to initialize with.
            **kwargs: Additional configuration options.
        """
        if self._connection_id:
            raise RuntimeError("model already started | call stop before starting again")

        logger.debug("openai realtime connection starting")

        # Initialize connection state
        self._connection_id = str(uuid.uuid4())
        self._start_time = int(time.time())

        self._function_call_buffer = {}

        # Establish WebSocket connection
        url = f"{OPENAI_REALTIME_URL}?model={self.model_id}"

        headers = [("Authorization", f"Bearer {self.api_key}")]
        if self.organization:
            headers.append(("OpenAI-Organization", self.organization))
        if self.project:
            headers.append(("OpenAI-Project", self.project))

        self._websocket = await websockets.connect(url, additional_headers=headers)
        logger.debug("connection_id=<%s> | websocket connected successfully", self._connection_id)

        # Configure session
        session_config = self._build_session_config(system_prompt, tools)
        await self._send_event({"type": "session.update", "session": session_config})

        # Add conversation history if provided
        if messages:
            await self._add_conversation_history(messages)

    def _create_text_event(self, text: str, role: str, is_final: bool = True) -> BidiTranscriptStreamEvent:
        """Create standardized transcript event.

        Args:
            text: The transcript text
            role: The role (will be normalized to lowercase)
            is_final: Whether this is the final transcript
        """
        # Normalize role to lowercase and ensure it's either "user" or "assistant"
        normalized_role = role.lower() if isinstance(role, str) else "assistant"
        if normalized_role not in ["user", "assistant"]:
            normalized_role = "assistant"

        return BidiTranscriptStreamEvent(
            delta={"text": text},
            text=text,
            role=cast(Role, normalized_role),
            is_final=is_final,
            current_transcript=text if is_final else None,
        )

    def _create_voice_activity_event(self, activity_type: str) -> BidiInterruptionEvent | None:
        """Create standardized interruption event for voice activity."""
        # Only speech_started triggers interruption
        if activity_type == "speech_started":
            return BidiInterruptionEvent(reason="user_speech")
        # Other voice activity events are logged but don't create events
        return None

    def _build_session_config(self, system_prompt: str | None, tools: list[ToolSpec] | None) -> dict[str, Any]:
        """Build session configuration for OpenAI Realtime API."""
        config: dict[str, Any] = DEFAULT_SESSION_CONFIG.copy()

        if system_prompt:
            config["instructions"] = system_prompt

        if tools:
            config["tools"] = self._convert_tools_to_openai_format(tools)

        # Apply user-provided session configuration
        supported_params = {
            "max_output_tokens",
            "output_modalities",
            "tool_choice",
        }
        for key, value in self.config["inference"].items():
            if key in supported_params:
                config[key] = value
            else:
                logger.warning("parameter=<%s> | ignoring unsupported session parameter", key)

        audio_config = self.config["audio"]

        if "voice" in audio_config:
            config.setdefault("audio", {}).setdefault("output", {})["voice"] = audio_config["voice"]

        if "input_rate" in audio_config:
            config.setdefault("audio", {}).setdefault("input", {}).setdefault("format", {})["rate"] = audio_config[
                "input_rate"
            ]

        if "output_rate" in audio_config:
            config.setdefault("audio", {}).setdefault("output", {}).setdefault("format", {})["rate"] = audio_config[
                "output_rate"
            ]

        return config

    def _convert_tools_to_openai_format(self, tools: list[ToolSpec]) -> list[dict]:
        """Convert Strands tool specifications to OpenAI Realtime API format."""
        openai_tools = []

        for tool in tools:
            input_schema = tool["inputSchema"]
            if "json" in input_schema:
                schema = (
                    json.loads(input_schema["json"]) if isinstance(input_schema["json"], str) else input_schema["json"]
                )
            else:
                schema = input_schema

            # OpenAI Realtime API expects flat structure, not nested under "function"
            openai_tool = {
                "type": "function",
                "name": tool["name"],
                "description": tool["description"],
                "parameters": schema,
            }
            openai_tools.append(openai_tool)

        return openai_tools

    async def _add_conversation_history(self, messages: Messages) -> None:
        """Add conversation history to the session.

        Converts agent message history to OpenAI Realtime API format using
        conversation.item.create events for each message.

        Note: OpenAI Realtime API has a 32-character limit on call_id, so we truncate
        UUIDs consistently to ensure tool calls and their results match.

        Args:
            messages: List of conversation messages with role and content.
        """
        # Track tool call IDs to ensure consistency between calls and results
        call_id_map: dict[str, str] = {}

        # First pass: collect all tool call IDs
        for message in messages:
            for block in message.get("content", []):
                if "toolUse" in block:
                    tool_use = block["toolUse"]
                    original_id = tool_use["toolUseId"]
                    call_id = original_id[:32]
                    call_id_map[original_id] = call_id

        # Second pass: send messages
        for message in messages:
            role = message["role"]
            content_blocks = message.get("content", [])

            # Build content array for OpenAI format
            openai_content = []

            for block in content_blocks:
                if "text" in block:
                    # Text content - use appropriate type based on role
                    # User messages use "input_text", assistant messages use "output_text"
                    if role == "user":
                        openai_content.append({"type": "input_text", "text": block["text"]})
                    else:  # assistant
                        openai_content.append({"type": "output_text", "text": block["text"]})
                elif "toolUse" in block:
                    # Tool use - create as function_call item
                    tool_use = block["toolUse"]
                    original_id = tool_use["toolUseId"]
                    # Use pre-mapped call_id
                    call_id = call_id_map[original_id]

                    tool_item = {
                        "type": "conversation.item.create",
                        "item": {
                            "type": "function_call",
                            "call_id": call_id,
                            "name": tool_use["name"],
                            "arguments": json.dumps(tool_use["input"]),
                        },
                    }
                    await self._send_event(tool_item)
                    continue  # Tool use is sent separately, not in message content
                elif "toolResult" in block:
                    # Tool result - create as function_call_output item
                    tool_result = block["toolResult"]
                    original_id = tool_result["toolUseId"]

                    # Validate content types and serialize, preserving structure
                    result_output = ""
                    if "content" in tool_result:
                        # First validate all content types are supported
                        for result_block in tool_result["content"]:
                            if "text" not in result_block and "json" not in result_block:
                                # Unsupported content type - raise error
                                raise ValueError(
                                    f"tool_use_id=<{original_id}>, content_types=<{list(result_block.keys())}> | "
                                    f"Content type not supported by OpenAI Realtime API"
                                )

                        # Preserve structure by JSON-dumping the entire content array
                        result_output = json.dumps(tool_result["content"])

                    # Use mapped call_id if available, otherwise skip orphaned result
                    if original_id not in call_id_map:
                        continue  # Skip this tool result since we don't have the call

                    call_id = call_id_map[original_id]

                    result_item = {
                        "type": "conversation.item.create",
                        "item": {
                            "type": "function_call_output",
                            "call_id": call_id,
                            "output": result_output,
                        },
                    }
                    await self._send_event(result_item)
                    continue  # Tool result is sent separately, not in message content

            # Only create message item if there's text content
            if openai_content:
                conversation_item = {
                    "type": "conversation.item.create",
                    "item": {"type": "message", "role": role, "content": openai_content},
                }
                await self._send_event(conversation_item)

        logger.debug("message_count=<%d> | conversation history added to openai session", len(messages))

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive OpenAI events and convert to Strands TypedEvent format."""
        if not self._connection_id:
            raise RuntimeError("model not started | call start before sending/receiving")

        yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

        while True:
            duration = time.time() - self._start_time
            if duration >= self.timeout_s:
                raise BidiModelTimeoutError(f"timeout_s=<{self.timeout_s}>")

            try:
                message = await asyncio.wait_for(self._websocket.recv(), timeout=10)
            except asyncio.TimeoutError:
                continue

            openai_event = json.loads(message)

            for event in self._convert_openai_event(openai_event) or []:
                yield event

    def _convert_openai_event(self, openai_event: dict[str, Any]) -> list[BidiOutputEvent] | None:
        """Convert OpenAI events to Strands TypedEvent format."""
        event_type = openai_event.get("type")

        # Turn start - response begins
        if event_type == "response.created":
            response = openai_event.get("response", {})
            response_id = response.get("id", str(uuid.uuid4()))
            return [BidiResponseStartEvent(response_id=response_id)]

        # Audio output
        elif event_type == "response.output_audio.delta":
            # Audio is already base64 string from OpenAI
            # Use the resolved output sample rate from our merged configuration
            sample_rate = self.config["audio"]["output_rate"]

            # Channels from config is guaranteed to be 1 or 2
            channels = cast(Literal[1, 2], self.config["audio"]["channels"])
            return [
                BidiAudioStreamEvent(
                    audio=openai_event["delta"],
                    format="pcm",
                    sample_rate=sample_rate,
                    channels=channels,
                )
            ]

        # Assistant text output events - combine multiple similar events
        elif event_type in ["response.output_text.delta", "response.output_audio_transcript.delta"]:
            role = openai_event.get("role", "assistant")
            return [
                self._create_text_event(
                    openai_event["delta"], role.lower() if isinstance(role, str) else "assistant", is_final=False
                )
            ]

        elif event_type in ["response.output_audio_transcript.done"]:
            role = openai_event.get("role", "assistant").lower()
            return [self._create_text_event(openai_event["transcript"], role)]

        elif event_type in ["response.output_text.done"]:
            role = openai_event.get("role", "assistant").lower()
            return [self._create_text_event(openai_event["text"], role)]

        # User transcription events - combine multiple similar events
        elif event_type in [
            "conversation.item.input_audio_transcription.delta",
            "conversation.item.input_audio_transcription.completed",
        ]:
            text_key = "delta" if "delta" in event_type else "transcript"
            text = openai_event.get(text_key, "")
            role = openai_event.get("role", "user")
            is_final = "completed" in event_type
            return (
                [self._create_text_event(text, role.lower() if isinstance(role, str) else "user", is_final=is_final)]
                if text.strip()
                else None
            )

        elif event_type == "conversation.item.input_audio_transcription.segment":
            segment_data = openai_event.get("segment", {})
            text = segment_data.get("text", "")
            role = segment_data.get("role", "user")
            return (
                [self._create_text_event(text, role.lower() if isinstance(role, str) else "user")]
                if text.strip()
                else None
            )

        elif event_type == "conversation.item.input_audio_transcription.failed":
            error_info = openai_event.get("error", {})
            logger.warning("error=<%s> | openai transcription failed", error_info.get("message", "unknown error"))
            return None

        # Function call processing
        elif event_type == "response.function_call_arguments.delta":
            call_id = openai_event.get("call_id")
            delta = openai_event.get("delta", "")
            if call_id:
                if call_id not in self._function_call_buffer:
                    self._function_call_buffer[call_id] = {"call_id": call_id, "name": "", "arguments": delta}
                else:
                    self._function_call_buffer[call_id]["arguments"] += delta
            return None

        elif event_type == "response.function_call_arguments.done":
            call_id = openai_event.get("call_id")
            if call_id and call_id in self._function_call_buffer:
                function_call = self._function_call_buffer[call_id]
                try:
                    tool_use: ToolUse = {
                        "toolUseId": call_id,
                        "name": function_call["name"],
                        "input": json.loads(function_call["arguments"]) if function_call["arguments"] else {},
                    }
                    del self._function_call_buffer[call_id]
                    # Return ToolUseStreamEvent for consistency with standard agent
                    return [ToolUseStreamEvent(delta={"toolUse": tool_use}, current_tool_use=dict(tool_use))]
                except (json.JSONDecodeError, KeyError) as e:
                    logger.warning("call_id=<%s>, error=<%s> | error parsing function arguments", call_id, e)
                    del self._function_call_buffer[call_id]
            return None

        # Voice activity detection - speech_started triggers interruption
        elif event_type == "input_audio_buffer.speech_started":
            # This is the primary interruption signal - handle it first
            return [BidiInterruptionEvent(reason="user_speech")]

        # Response cancelled - handle interruption
        elif event_type == "response.cancelled":
            response = openai_event.get("response", {})
            response_id = response.get("id", "unknown")
            logger.debug("response_id=<%s> | openai response cancelled", response_id)
            return [BidiResponseCompleteEvent(response_id=response_id, stop_reason="interrupted")]

        # Turn complete and usage - response finished
        elif event_type == "response.done":
            response = openai_event.get("response", {})
            response_id = response.get("id", "unknown")
            status = response.get("status", "completed")
            usage = response.get("usage")

            # Map OpenAI status to our stop_reason
            stop_reason_map = {
                "completed": "complete",
                "cancelled": "interrupted",
                "failed": "error",
                "incomplete": "interrupted",
            }

            # Build list of events to return
            events: list[Any] = []

            # Always add response complete event
            events.append(
                BidiResponseCompleteEvent(
                    response_id=response_id,
                    stop_reason=cast(StopReason, stop_reason_map.get(status, "complete")),
                ),
            )

            # Add usage event if available
            if usage:
                input_details = usage.get("input_token_details", {})
                output_details = usage.get("output_token_details", {})

                # Build modality details
                modality_details = []

                # Text modality
                text_input = input_details.get("text_tokens", 0)
                text_output = output_details.get("text_tokens", 0)
                if text_input > 0 or text_output > 0:
                    modality_details.append(
                        {"modality": "text", "input_tokens": text_input, "output_tokens": text_output}
                    )

                # Audio modality
                audio_input = input_details.get("audio_tokens", 0)
                audio_output = output_details.get("audio_tokens", 0)
                if audio_input > 0 or audio_output > 0:
                    modality_details.append(
                        {"modality": "audio", "input_tokens": audio_input, "output_tokens": audio_output}
                    )

                # Image modality
                image_input = input_details.get("image_tokens", 0)
                if image_input > 0:
                    modality_details.append({"modality": "image", "input_tokens": image_input, "output_tokens": 0})

                # Cached tokens
                cached_tokens = input_details.get("cached_tokens", 0)

                # Add usage event
                events.append(
                    BidiUsageEvent(
                        input_tokens=usage.get("input_tokens", 0),
                        output_tokens=usage.get("output_tokens", 0),
                        total_tokens=usage.get("total_tokens", 0),
                        modality_details=cast(list[ModalityUsage], modality_details) if modality_details else None,
                        cache_read_input_tokens=cached_tokens if cached_tokens > 0 else None,
                    )
                )

            # Return list of events
            return events

        # Lifecycle events (log only) - combine multiple similar events
        elif event_type in ["conversation.item.retrieve", "conversation.item.added"]:
            item = openai_event.get("item", {})
            action = "retrieved" if "retrieve" in event_type else "added"
            logger.debug("action=<%s>, item_id=<%s> | openai conversation item event", action, item.get("id"))
            return None

        elif event_type == "conversation.item.done":
            logger.debug("item_id=<%s> | openai conversation item done", openai_event.get("item", {}).get("id"))
            return None

        # Response output events - combine similar events
        elif event_type in [
            "response.output_item.added",
            "response.output_item.done",
            "response.content_part.added",
            "response.content_part.done",
        ]:
            item_data = openai_event.get("item") or openai_event.get("part")
            logger.debug(
                "event_type=<%s>, item_id=<%s> | openai output event",
                event_type,
                item_data.get("id") if item_data else "unknown",
            )

            # Track function call names from response.output_item.added
            if event_type == "response.output_item.added":
                item = openai_event.get("item", {})
                if item.get("type") == "function_call":
                    call_id = item.get("call_id")
                    function_name = item.get("name")
                    if call_id and function_name:
                        if call_id not in self._function_call_buffer:
                            self._function_call_buffer[call_id] = {
                                "call_id": call_id,
                                "name": function_name,
                                "arguments": "",
                            }
                        else:
                            self._function_call_buffer[call_id]["name"] = function_name
            return None

        # Session/buffer events - combine simple log-only events
        elif event_type in [
            "input_audio_buffer.committed",
            "input_audio_buffer.cleared",
            "session.created",
            "session.updated",
        ]:
            logger.debug("event_type=<%s> | openai event received", event_type)
            return None

        elif event_type == "error":
            error_data = openai_event.get("error", {})
            error_code = error_data.get("code", "")

            # Suppress expected errors that don't affect session state
            if error_code == "response_cancel_not_active":
                # This happens when trying to cancel a response that's not active
                # It's safe to ignore as the session remains functional
                logger.debug("openai response cancel attempted when no response active")
                return None

            # Log other errors
            logger.error("error=<%s> | openai realtime error", error_data)
            return None

        else:
            logger.debug("event_type=<%s> | unhandled openai event type", event_type)
            return None

    async def send(
        self,
        content: BidiInputEvent | ToolResultEvent,
    ) -> None:
        """Unified send method for all content types. Sends the given content to OpenAI.

        Dispatches to appropriate internal handler based on content type.

        Args:
            content: Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

        Raises:
            ValueError: If content type not supported (e.g., image content).
        """
        if not self._connection_id:
            raise RuntimeError("model not started | call start before sending")

        # Note: TypedEvent inherits from dict, so isinstance checks for TypedEvent must come first
        if isinstance(content, BidiTextInputEvent):
            await self._send_text_content(content.text)
        elif isinstance(content, BidiAudioInputEvent):
            await self._send_audio_content(content)
        elif isinstance(content, ToolResultEvent):
            tool_result = content.get("tool_result")
            if tool_result:
                await self._send_tool_result(tool_result)
        else:
            raise ValueError(f"content_type={type(content)} | content not supported")

    async def _send_audio_content(self, audio_input: BidiAudioInputEvent) -> None:
        """Internal: Send audio content to OpenAI for processing."""
        # Audio is already base64 encoded in the event
        await self._send_event({"type": "input_audio_buffer.append", "audio": audio_input.audio})

    async def _send_text_content(self, text: str) -> None:
        """Internal: Send text content to OpenAI for processing."""
        item_data = {"type": "message", "role": "user", "content": [{"type": "input_text", "text": text}]}
        await self._send_event({"type": "conversation.item.create", "item": item_data})
        await self._send_event({"type": "response.create"})

    async def _send_interrupt(self) -> None:
        """Internal: Send interruption signal to OpenAI."""
        await self._send_event({"type": "response.cancel"})

    async def _send_tool_result(self, tool_result: ToolResult) -> None:
        """Internal: Send tool result back to OpenAI."""
        tool_use_id = tool_result.get("toolUseId")

        logger.debug("tool_use_id=<%s> | sending openai tool result", tool_use_id)

        # Validate content types and serialize, preserving structure
        result_output = ""
        if "content" in tool_result:
            # First validate all content types are supported
            for block in tool_result["content"]:
                if "text" not in block and "json" not in block:
                    # Unsupported content type - raise error
                    raise ValueError(
                        f"tool_use_id=<{tool_use_id}>, content_types=<{list(block.keys())}> | "
                        f"Content type not supported by OpenAI Realtime API"
                    )

            # Preserve structure by JSON-dumping the entire content array
            result_output = json.dumps(tool_result["content"])

        item_data = {"type": "function_call_output", "call_id": tool_use_id, "output": result_output}
        await self._send_event({"type": "conversation.item.create", "item": item_data})
        await self._send_event({"type": "response.create"})

    async def stop(self) -> None:
        """Close session and cleanup resources."""
        logger.debug("openai realtime connection cleanup starting")

        async def stop_websocket() -> None:
            if not hasattr(self, "_websocket"):
                return

            await self._websocket.close()

        async def stop_connection() -> None:
            self._connection_id = None

        await stop_all(stop_websocket, stop_connection)

        logger.debug("openai realtime connection closed")

    async def _send_event(self, event: dict[str, Any]) -> None:
        """Send event to OpenAI via WebSocket."""
        message = json.dumps(event)
        await self._websocket.send(message)
        logger.debug("event_type=<%s> | openai event sent", event.get("type"))

__init__(model_id=DEFAULT_MODEL, provider_config=None, client_config=None, **kwargs)

Initialize OpenAI Realtime bidirectional model.

Parameters:

Name Type Description Default
model_id str

Model identifier (default: gpt-realtime)

DEFAULT_MODEL
provider_config dict[str, Any] | None

Model behavior (audio, instructions, turn_detection, etc.)

None
client_config dict[str, Any] | None

Authentication (api_key, organization, project) Falls back to OPENAI_API_KEY, OPENAI_ORGANIZATION, OPENAI_PROJECT env vars

None
**kwargs Any

Reserved for future parameters.

{}
Source code in strands/experimental/bidi/models/openai_realtime.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def __init__(
    self,
    model_id: str = DEFAULT_MODEL,
    provider_config: dict[str, Any] | None = None,
    client_config: dict[str, Any] | None = None,
    **kwargs: Any,
) -> None:
    """Initialize OpenAI Realtime bidirectional model.

    Args:
        model_id: Model identifier (default: gpt-realtime)
        provider_config: Model behavior (audio, instructions, turn_detection, etc.)
        client_config: Authentication (api_key, organization, project)
            Falls back to OPENAI_API_KEY, OPENAI_ORGANIZATION, OPENAI_PROJECT env vars
        **kwargs: Reserved for future parameters.

    """
    # Store model ID
    self.model_id = model_id

    # Resolve client config with defaults and env vars
    self._client_config = self._resolve_client_config(client_config or {})

    # Resolve provider config with defaults
    self.config = self._resolve_provider_config(provider_config or {})

    # Store client config values for later use
    self.api_key = self._client_config["api_key"]
    self.organization = self._client_config.get("organization")
    self.project = self._client_config.get("project")
    self.timeout_s = self._client_config["timeout_s"]

    if self.timeout_s > OPENAI_MAX_TIMEOUT_S:
        raise ValueError(
            f"timeout_s=<{self.timeout_s}>, max_timeout_s=<{OPENAI_MAX_TIMEOUT_S}> | timeout exceeds max limit"
        )

    # Connection state (initialized in start())
    self._connection_id: str | None = None

    self._function_call_buffer: dict[str, Any] = {}

    logger.debug("model=<%s> | openai realtime model initialized", model_id)

receive() async

Receive OpenAI events and convert to Strands TypedEvent format.

Source code in strands/experimental/bidi/models/openai_realtime.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive OpenAI events and convert to Strands TypedEvent format."""
    if not self._connection_id:
        raise RuntimeError("model not started | call start before sending/receiving")

    yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

    while True:
        duration = time.time() - self._start_time
        if duration >= self.timeout_s:
            raise BidiModelTimeoutError(f"timeout_s=<{self.timeout_s}>")

        try:
            message = await asyncio.wait_for(self._websocket.recv(), timeout=10)
        except asyncio.TimeoutError:
            continue

        openai_event = json.loads(message)

        for event in self._convert_openai_event(openai_event) or []:
            yield event

send(content) async

Unified send method for all content types. Sends the given content to OpenAI.

Dispatches to appropriate internal handler based on content type.

Parameters:

Name Type Description Default
content BidiInputEvent | ToolResultEvent

Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

required

Raises:

Type Description
ValueError

If content type not supported (e.g., image content).

Source code in strands/experimental/bidi/models/openai_realtime.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
async def send(
    self,
    content: BidiInputEvent | ToolResultEvent,
) -> None:
    """Unified send method for all content types. Sends the given content to OpenAI.

    Dispatches to appropriate internal handler based on content type.

    Args:
        content: Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

    Raises:
        ValueError: If content type not supported (e.g., image content).
    """
    if not self._connection_id:
        raise RuntimeError("model not started | call start before sending")

    # Note: TypedEvent inherits from dict, so isinstance checks for TypedEvent must come first
    if isinstance(content, BidiTextInputEvent):
        await self._send_text_content(content.text)
    elif isinstance(content, BidiAudioInputEvent):
        await self._send_audio_content(content)
    elif isinstance(content, ToolResultEvent):
        tool_result = content.get("tool_result")
        if tool_result:
            await self._send_tool_result(tool_result)
    else:
        raise ValueError(f"content_type={type(content)} | content not supported")

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish bidirectional connection to OpenAI Realtime API.

Parameters:

Name Type Description Default
system_prompt str | None

System instructions for the model.

None
tools list[ToolSpec] | None

List of tools available to the model.

None
messages Messages | None

Conversation history to initialize with.

None
**kwargs Any

Additional configuration options.

{}
Source code in strands/experimental/bidi/models/openai_realtime.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish bidirectional connection to OpenAI Realtime API.

    Args:
        system_prompt: System instructions for the model.
        tools: List of tools available to the model.
        messages: Conversation history to initialize with.
        **kwargs: Additional configuration options.
    """
    if self._connection_id:
        raise RuntimeError("model already started | call stop before starting again")

    logger.debug("openai realtime connection starting")

    # Initialize connection state
    self._connection_id = str(uuid.uuid4())
    self._start_time = int(time.time())

    self._function_call_buffer = {}

    # Establish WebSocket connection
    url = f"{OPENAI_REALTIME_URL}?model={self.model_id}"

    headers = [("Authorization", f"Bearer {self.api_key}")]
    if self.organization:
        headers.append(("OpenAI-Organization", self.organization))
    if self.project:
        headers.append(("OpenAI-Project", self.project))

    self._websocket = await websockets.connect(url, additional_headers=headers)
    logger.debug("connection_id=<%s> | websocket connected successfully", self._connection_id)

    # Configure session
    session_config = self._build_session_config(system_prompt, tools)
    await self._send_event({"type": "session.update", "session": session_config})

    # Add conversation history if provided
    if messages:
        await self._add_conversation_history(messages)

stop() async

Close session and cleanup resources.

Source code in strands/experimental/bidi/models/openai_realtime.py
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
async def stop(self) -> None:
    """Close session and cleanup resources."""
    logger.debug("openai realtime connection cleanup starting")

    async def stop_websocket() -> None:
        if not hasattr(self, "_websocket"):
            return

        await self._websocket.close()

    async def stop_connection() -> None:
        self._connection_id = None

    await stop_all(stop_websocket, stop_connection)

    logger.debug("openai realtime connection closed")