
strands.experimental.bidi.models.openai_realtime

OpenAI Realtime API provider for Strands bidirectional streaming.

Provides real-time audio and text communication through OpenAI's Realtime API with WebSocket connections, voice activity detection, and function calling.
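
For orientation, here is a minimal text round-trip sketch. It is not taken from the library's examples: the import paths are inferred from the module name and the source file paths shown on this page, the event classes are assumed to live in strands/experimental/bidi/types/events.py, and OPENAI_API_KEY is assumed to be set in the environment.

import asyncio

from strands.experimental.bidi.models.openai_realtime import BidiOpenAIRealtimeModel
from strands.experimental.bidi.types.events import BidiResponseCompleteEvent, BidiTextInputEvent


async def main() -> None:
    model = BidiOpenAIRealtimeModel()  # reads OPENAI_API_KEY from the environment
    await model.start(system_prompt="You are a helpful assistant.")
    try:
        await model.send(BidiTextInputEvent(text="Hello!", role="user"))
        async for event in model.receive():
            # Every event is a dict-like TypedEvent carrying a "type" key.
            print(event.get("type"))
            if isinstance(event, BidiResponseCompleteEvent):
                break
    finally:
        await model.stop()


asyncio.run(main())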

AudioSampleRate = Literal[16000, 24000, 48000] module-attribute

Audio sample rate in Hz.

BidiInputEvent = BidiTextInputEvent | BidiAudioInputEvent | BidiImageInputEvent module-attribute

Union of different bidi input event types.

BidiOutputEvent = BidiConnectionStartEvent | BidiConnectionRestartEvent | BidiResponseStartEvent | BidiAudioStreamEvent | BidiTranscriptStreamEvent | BidiInterruptionEvent | BidiResponseCompleteEvent | BidiUsageEvent | BidiConnectionCloseEvent | BidiErrorEvent | ToolUseStreamEvent module-attribute

Union of different bidi output event types.

DEFAULT_MODEL = 'gpt-realtime' module-attribute

DEFAULT_SAMPLE_RATE = 24000 module-attribute

DEFAULT_SESSION_CONFIG = {'type': 'realtime', 'instructions': 'You are a helpful assistant. Please speak in English and keep your responses clear and concise.', 'output_modalities': ['audio'], 'audio': {'input': {'format': {'type': 'audio/pcm', 'rate': DEFAULT_SAMPLE_RATE}, 'transcription': {'model': 'gpt-4o-transcribe'}, 'turn_detection': {'type': 'server_vad', 'threshold': 0.5, 'prefix_padding_ms': 300, 'silence_duration_ms': 500}}, 'output': {'format': {'type': 'audio/pcm', 'rate': DEFAULT_SAMPLE_RATE}, 'voice': 'alloy'}}} module-attribute

Messages = List[Message] module-attribute

A list of messages representing a conversation.

OPENAI_MAX_TIMEOUT_S = 3000 module-attribute

Max timeout before closing connection.

OpenAI documents a 60-minute limit on realtime sessions but emits no warning as the limit approaches. As a workaround, we configure a client-side max timeout so the connection closure can be handled gracefully. The maximum, which is also the default, is set to 50 minutes (3000 seconds) to leave a buffer before the real limit is reached.
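
A shorter client-side timeout can be requested per instance. A hedged sketch, assuming the client_config keys documented under __init__ below (the API key value is a placeholder):

from strands.experimental.bidi.models.openai_realtime import BidiOpenAIRealtimeModel

# Cap the session at 10 minutes instead of the 50-minute ceiling; values above
# OPENAI_MAX_TIMEOUT_S (3000 s) are rejected by __init__.
model = BidiOpenAIRealtimeModel(client_config={"api_key": "sk-...", "timeout_s": 600})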

OPENAI_REALTIME_URL = 'wss://api.openai.com/v1/realtime' module-attribute

Role = Literal['user', 'assistant'] module-attribute

Role of a message sender.

  • "user": Messages from the user to the assistant.
  • "assistant": Messages from the assistant to the user.

StopReason = Literal['complete', 'error', 'interrupted', 'tool_use'] module-attribute

Reason for the model ending its response generation.

  • "complete": Model completed its response.
  • "error": Model encountered an error.
  • "interrupted": Model was interrupted by the user.
  • "tool_use": Model is requesting a tool use.

logger = logging.getLogger(__name__) module-attribute

AudioConfig

Bases: TypedDict

Audio configuration for bidirectional streaming models.

Defines standard audio parameters that model providers use to specify their audio processing requirements. All fields are optional to support models that may not use audio or only need specific parameters.

Model providers build this configuration by merging user-provided values with their own defaults. The resulting configuration is then used by audio I/O implementations to configure hardware appropriately.

Attributes:

Name Type Description
input_rate AudioSampleRate

Input sample rate in Hz (e.g., 16000, 24000, 48000)

output_rate AudioSampleRate

Output sample rate in Hz (e.g., 16000, 24000, 48000)

channels AudioChannel

Number of audio channels (1=mono, 2=stereo)

format AudioFormat

Audio encoding format

voice str

Voice identifier for text-to-speech (e.g., "alloy", "matthew")
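
For example, a caller might supply the following configuration, which a provider then merges with its own defaults (import path inferred from the source file shown below):

from strands.experimental.bidi.types.model import AudioConfig

audio: AudioConfig = {
    "input_rate": 16000,
    "output_rate": 24000,
    "channels": 1,
    "format": "pcm",
    "voice": "alloy",
}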

Source code in strands/experimental/bidi/types/model.py
class AudioConfig(TypedDict, total=False):
    """Audio configuration for bidirectional streaming models.

    Defines standard audio parameters that model providers use to specify
    their audio processing requirements. All fields are optional to support
    models that may not use audio or only need specific parameters.

    Model providers build this configuration by merging user-provided values
    with their own defaults. The resulting configuration is then used by
    audio I/O implementations to configure hardware appropriately.

    Attributes:
        input_rate: Input sample rate in Hz (e.g., 16000, 24000, 48000)
        output_rate: Output sample rate in Hz (e.g., 16000, 24000, 48000)
        channels: Number of audio channels (1=mono, 2=stereo)
        format: Audio encoding format
        voice: Voice identifier for text-to-speech (e.g., "alloy", "matthew")
    """

    input_rate: AudioSampleRate
    output_rate: AudioSampleRate
    channels: AudioChannel
    format: AudioFormat
    voice: str

BidiAudioInputEvent

Bases: TypedEvent

Audio input event for sending audio to the model.

Used for sending audio data through the send() method.

Parameters:

Name Type Description Default
audio str

Base64-encoded audio string to send to model.

required
format AudioFormat | str

Audio format from SUPPORTED_AUDIO_FORMATS.

required
sample_rate AudioSampleRate

Sample rate from SUPPORTED_SAMPLE_RATES.

required
channels AudioChannel

Channel count from SUPPORTED_CHANNELS.

required
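
A hedged construction sketch using the signature shown below; the captured PCM bytes are a placeholder and the import path is inferred from the source file location:

import base64

from strands.experimental.bidi.types.events import BidiAudioInputEvent

# Hypothetical capture: 100 ms of 16-bit mono silence at 16 kHz.
pcm_bytes = b"\x00\x00" * 1600

event = BidiAudioInputEvent(
    audio=base64.b64encode(pcm_bytes).decode("ascii"),
    format="pcm",
    sample_rate=16000,
    channels=1,
)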
Source code in strands/experimental/bidi/types/events.py
class BidiAudioInputEvent(TypedEvent):
    """Audio input event for sending audio to the model.

    Used for sending audio data through the send() method.

    Parameters:
        audio: Base64-encoded audio string to send to model.
        format: Audio format from SUPPORTED_AUDIO_FORMATS.
        sample_rate: Sample rate from SUPPORTED_SAMPLE_RATES.
        channels: Channel count from SUPPORTED_CHANNELS.
    """

    def __init__(
        self,
        audio: str,
        format: AudioFormat | str,
        sample_rate: AudioSampleRate,
        channels: AudioChannel,
    ):
        """Initialize audio input event."""
        super().__init__(
            {
                "type": "bidi_audio_input",
                "audio": audio,
                "format": format,
                "sample_rate": sample_rate,
                "channels": channels,
            }
        )

    @property
    def audio(self) -> str:
        """Base64-encoded audio string."""
        return cast(str, self["audio"])

    @property
    def format(self) -> AudioFormat:
        """Audio encoding format."""
        return cast(AudioFormat, self["format"])

    @property
    def sample_rate(self) -> AudioSampleRate:
        """Number of audio samples per second in Hz."""
        return cast(AudioSampleRate, self["sample_rate"])

    @property
    def channels(self) -> AudioChannel:
        """Number of audio channels (1=mono, 2=stereo)."""
        return cast(AudioChannel, self["channels"])

audio property

Base64-encoded audio string.

channels property

Number of audio channels (1=mono, 2=stereo).

format property

Audio encoding format.

sample_rate property

Number of audio samples per second in Hz.

__init__(audio, format, sample_rate, channels)

Initialize audio input event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    audio: str,
    format: AudioFormat | str,
    sample_rate: AudioSampleRate,
    channels: AudioChannel,
):
    """Initialize audio input event."""
    super().__init__(
        {
            "type": "bidi_audio_input",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels,
        }
    )

BidiAudioStreamEvent

Bases: TypedEvent

Streaming audio output from the model.

Parameters:

Name Type Description Default
audio str

Base64-encoded audio string.

required
format AudioFormat

Audio encoding format.

required
sample_rate AudioSampleRate

Number of audio samples per second in Hz.

required
channels AudioChannel

Number of audio channels (1=mono, 2=stereo).

required
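
A hedged sketch of consuming this event (import path inferred from the source file shown below; the actual playback sink is out of scope):

import base64

from strands.experimental.bidi.types.events import BidiAudioStreamEvent


def handle_audio(event: BidiAudioStreamEvent) -> bytes:
    # Decode the base64 payload back into raw PCM bytes for a playback sink.
    pcm = base64.b64decode(event.audio)
    # event.sample_rate and event.channels describe how to play the chunk.
    return pcm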
Source code in strands/experimental/bidi/types/events.py
class BidiAudioStreamEvent(TypedEvent):
    """Streaming audio output from the model.

    Parameters:
        audio: Base64-encoded audio string.
        format: Audio encoding format.
        sample_rate: Number of audio samples per second in Hz.
        channels: Number of audio channels (1=mono, 2=stereo).
    """

    def __init__(
        self,
        audio: str,
        format: AudioFormat,
        sample_rate: AudioSampleRate,
        channels: AudioChannel,
    ):
        """Initialize audio stream event."""
        super().__init__(
            {
                "type": "bidi_audio_stream",
                "audio": audio,
                "format": format,
                "sample_rate": sample_rate,
                "channels": channels,
            }
        )

    @property
    def audio(self) -> str:
        """Base64-encoded audio string."""
        return cast(str, self["audio"])

    @property
    def format(self) -> AudioFormat:
        """Audio encoding format."""
        return cast(AudioFormat, self["format"])

    @property
    def sample_rate(self) -> AudioSampleRate:
        """Number of audio samples per second in Hz."""
        return cast(AudioSampleRate, self["sample_rate"])

    @property
    def channels(self) -> AudioChannel:
        """Number of audio channels (1=mono, 2=stereo)."""
        return cast(AudioChannel, self["channels"])

audio property

Base64-encoded audio string.

channels property

Number of audio channels (1=mono, 2=stereo).

format property

Audio encoding format.

sample_rate property

Number of audio samples per second in Hz.

__init__(audio, format, sample_rate, channels)

Initialize audio stream event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    audio: str,
    format: AudioFormat,
    sample_rate: AudioSampleRate,
    channels: AudioChannel,
):
    """Initialize audio stream event."""
    super().__init__(
        {
            "type": "bidi_audio_stream",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels,
        }
    )

BidiConnectionStartEvent

Bases: TypedEvent

Streaming connection established and ready for interaction.

Parameters:

Name Type Description Default
connection_id str

Unique identifier for this streaming connection.

required
model str

Model identifier (e.g., "gpt-realtime", "gemini-2.0-flash-live").

required
Source code in strands/experimental/bidi/types/events.py
class BidiConnectionStartEvent(TypedEvent):
    """Streaming connection established and ready for interaction.

    Parameters:
        connection_id: Unique identifier for this streaming connection.
        model: Model identifier (e.g., "gpt-realtime", "gemini-2.0-flash-live").
    """

    def __init__(self, connection_id: str, model: str):
        """Initialize connection start event."""
        super().__init__(
            {
                "type": "bidi_connection_start",
                "connection_id": connection_id,
                "model": model,
            }
        )

    @property
    def connection_id(self) -> str:
        """Unique identifier for this streaming connection."""
        return cast(str, self["connection_id"])

    @property
    def model(self) -> str:
        """Model identifier (e.g., 'gpt-realtime', 'gemini-2.0-flash-live')."""
        return cast(str, self["model"])

connection_id property

Unique identifier for this streaming connection.

model property

Model identifier (e.g., 'gpt-realtime', 'gemini-2.0-flash-live').

__init__(connection_id, model)

Initialize connection start event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, connection_id: str, model: str):
    """Initialize connection start event."""
    super().__init__(
        {
            "type": "bidi_connection_start",
            "connection_id": connection_id,
            "model": model,
        }
    )

BidiInterruptionEvent

Bases: TypedEvent

Model generation was interrupted.

Parameters:

Name Type Description Default
reason Literal['user_speech', 'error']

Why the interruption occurred.

required
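
A typical consumer reacts to an interruption by discarding any audio it has queued for playback. A hedged sketch, assuming the import path shown under the source file and a hypothetical playback_queue:

import asyncio

from strands.experimental.bidi.types.events import BidiInterruptionEvent

# Hypothetical queue of PCM chunks awaiting playback.
playback_queue: asyncio.Queue[bytes] = asyncio.Queue()


def on_event(event) -> None:
    # Drop queued playback audio as soon as the user starts speaking.
    if isinstance(event, BidiInterruptionEvent) and event.reason == "user_speech":
        while not playback_queue.empty():
            playback_queue.get_nowait()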
Source code in strands/experimental/bidi/types/events.py
class BidiInterruptionEvent(TypedEvent):
    """Model generation was interrupted.

    Parameters:
        reason: Why the interruption occurred.
    """

    def __init__(self, reason: Literal["user_speech", "error"]):
        """Initialize interruption event."""
        super().__init__(
            {
                "type": "bidi_interruption",
                "reason": reason,
            }
        )

    @property
    def reason(self) -> str:
        """Why the interruption occurred."""
        return cast(str, self["reason"])

reason property

Why the interruption occurred.

__init__(reason)

Initialize interruption event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, reason: Literal["user_speech", "error"]):
    """Initialize interruption event."""
    super().__init__(
        {
            "type": "bidi_interruption",
            "reason": reason,
        }
    )

BidiModel

Bases: Protocol

Protocol for bidirectional streaming models.

This interface defines the contract for models that support persistent streaming connections with real-time audio and text communication. Implementations handle provider-specific protocols while exposing a standardized event-based API.

Attributes:

Name Type Description
config dict[str, Any]

Configuration dictionary with provider-specific settings.
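
A minimal consumer loop against the protocol might look like the following sketch; the import path is inferred from the source file location and event handling is left as a placeholder:

from strands.experimental.bidi.models.model import BidiModel


async def run(model: BidiModel, prompt: str) -> None:
    await model.start(system_prompt=prompt)
    try:
        async for event in model.receive():
            print(event.get("type"))  # dispatch to application handlers here
    finally:
        await model.stop()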

Source code in strands/experimental/bidi/models/model.py
class BidiModel(Protocol):
    """Protocol for bidirectional streaming models.

    This interface defines the contract for models that support persistent streaming
    connections with real-time audio and text communication. Implementations handle
    provider-specific protocols while exposing a standardized event-based API.

    Attributes:
        config: Configuration dictionary with provider-specific settings.
    """

    config: dict[str, Any]

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish a persistent streaming connection with the model.

        Opens a bidirectional connection that remains active for real-time communication.
        The connection supports concurrent sending and receiving of events until explicitly
        closed. Must be called before any send() or receive() operations.

        Args:
            system_prompt: System instructions to configure model behavior.
            tools: Tool specifications that the model can invoke during the conversation.
            messages: Initial conversation history to provide context.
            **kwargs: Provider-specific configuration options.
        """
        ...

    async def stop(self) -> None:
        """Close the streaming connection and release resources.

        Terminates the active bidirectional connection and cleans up any associated
        resources such as network connections, buffers, or background tasks. After
        calling stop(), the model instance cannot be used until start() is called again.
        """
        ...

    def receive(self) -> AsyncIterable[BidiOutputEvent]:
        """Receive streaming events from the model.

        Continuously yields events from the model as they arrive over the connection.
        Events are normalized to a provider-agnostic format for uniform processing.
        This method should be called in a loop or async task to process model responses.

        The stream continues until the connection is closed or an error occurs.

        Yields:
            BidiOutputEvent: Standardized event objects containing audio output,
                transcripts, tool calls, or control signals.
        """
        ...

    async def send(
        self,
        content: BidiInputEvent | ToolResultEvent,
    ) -> None:
        """Send content to the model over the active connection.

        Transmits user input or tool results to the model during an active streaming
        session. Supports multiple content types including text, audio, images, and
        tool execution results. Can be called multiple times during a conversation.

        Args:
            content: The content to send. Must be one of:

                - BidiTextInputEvent: Text message from the user
                - BidiAudioInputEvent: Audio data for speech input
                - BidiImageInputEvent: Image data for visual understanding
                - ToolResultEvent: Result from a tool execution

        Example:
            ```
            await model.send(BidiTextInputEvent(text="Hello", role="user"))
            await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
            await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
            await model.send(ToolResultEvent(tool_result))
            ```
        """
        ...

receive()

Receive streaming events from the model.

Continuously yields events from the model as they arrive over the connection. Events are normalized to a provider-agnostic format for uniform processing. This method should be called in a loop or async task to process model responses.

The stream continues until the connection is closed or an error occurs.

Yields:

Name Type Description
BidiOutputEvent AsyncIterable[BidiOutputEvent]

Standardized event objects containing audio output, transcripts, tool calls, or control signals.

Source code in strands/experimental/bidi/models/model.py
def receive(self) -> AsyncIterable[BidiOutputEvent]:
    """Receive streaming events from the model.

    Continuously yields events from the model as they arrive over the connection.
    Events are normalized to a provider-agnostic format for uniform processing.
    This method should be called in a loop or async task to process model responses.

    The stream continues until the connection is closed or an error occurs.

    Yields:
        BidiOutputEvent: Standardized event objects containing audio output,
            transcripts, tool calls, or control signals.
    """
    ...

send(content) async

Send content to the model over the active connection.

Transmits user input or tool results to the model during an active streaming session. Supports multiple content types including text, audio, images, and tool execution results. Can be called multiple times during a conversation.

Parameters:

Name Type Description Default
content BidiInputEvent | ToolResultEvent

The content to send. Must be one of:

  • BidiTextInputEvent: Text message from the user
  • BidiAudioInputEvent: Audio data for speech input
  • BidiImageInputEvent: Image data for visual understanding
  • ToolResultEvent: Result from a tool execution
required
Example
await model.send(BidiTextInputEvent(text="Hello", role="user"))
await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
await model.send(ToolResultEvent(tool_result))
Source code in strands/experimental/bidi/models/model.py
async def send(
    self,
    content: BidiInputEvent | ToolResultEvent,
) -> None:
    """Send content to the model over the active connection.

    Transmits user input or tool results to the model during an active streaming
    session. Supports multiple content types including text, audio, images, and
    tool execution results. Can be called multiple times during a conversation.

    Args:
        content: The content to send. Must be one of:

            - BidiTextInputEvent: Text message from the user
            - BidiAudioInputEvent: Audio data for speech input
            - BidiImageInputEvent: Image data for visual understanding
            - ToolResultEvent: Result from a tool execution

    Example:
        ```
        await model.send(BidiTextInputEvent(text="Hello", role="user"))
        await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
        await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
        await model.send(ToolResultEvent(tool_result))
        ```
    """
    ...

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish a persistent streaming connection with the model.

Opens a bidirectional connection that remains active for real-time communication. The connection supports concurrent sending and receiving of events until explicitly closed. Must be called before any send() or receive() operations.

Parameters:

Name Type Description Default
system_prompt str | None

System instructions to configure model behavior.

None
tools list[ToolSpec] | None

Tool specifications that the model can invoke during the conversation.

None
messages Messages | None

Initial conversation history to provide context.

None
**kwargs Any

Provider-specific configuration options.

{}
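
A hedged sketch of starting a connection with a tool; the tool spec shape follows the name/description/inputSchema structure consumed elsewhere on this page, and the weather tool itself is hypothetical:

from strands.experimental.bidi.models.model import BidiModel

weather_tool = {
    "name": "get_weather",
    "description": "Return the current weather for a city.",
    "inputSchema": {
        "json": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        }
    },
}


async def start_with_tools(model: BidiModel) -> None:
    await model.start(
        system_prompt="You are a helpful assistant.",
        tools=[weather_tool],
    )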
Source code in strands/experimental/bidi/models/model.py
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish a persistent streaming connection with the model.

    Opens a bidirectional connection that remains active for real-time communication.
    The connection supports concurrent sending and receiving of events until explicitly
    closed. Must be called before any send() or receive() operations.

    Args:
        system_prompt: System instructions to configure model behavior.
        tools: Tool specifications that the model can invoke during the conversation.
        messages: Initial conversation history to provide context.
        **kwargs: Provider-specific configuration options.
    """
    ...

stop() async

Close the streaming connection and release resources.

Terminates the active bidirectional connection and cleans up any associated resources such as network connections, buffers, or background tasks. After calling stop(), the model instance cannot be used until start() is called again.

Source code in strands/experimental/bidi/models/model.py
async def stop(self) -> None:
    """Close the streaming connection and release resources.

    Terminates the active bidirectional connection and cleans up any associated
    resources such as network connections, buffers, or background tasks. After
    calling stop(), the model instance cannot be used until start() is called again.
    """
    ...

BidiModelTimeoutError

Bases: Exception

Model timeout error.

Bidirectional models are often configured with a connection time limit; Nova Sonic, for example, keeps the connection open for at most 8 minutes. Upon receiving a timeout, the agent loop restarts the model connection so as to create a seamless, uninterrupted experience for the user.
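
A hedged sketch of that restart loop; the surrounding handling code is hypothetical, and only the exception and its restart_config attribute come from this class:

from strands.experimental.bidi.models.model import BidiModelTimeoutError


async def run_with_restart(model) -> None:
    await model.start()
    while True:
        try:
            async for event in model.receive():
                print(event.get("type"))  # hypothetical event handling
            break
        except BidiModelTimeoutError as timeout:
            # Re-establish the connection, passing any restart hints from the model.
            await model.stop()
            await model.start(**timeout.restart_config)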

Source code in strands/experimental/bidi/models/model.py
class BidiModelTimeoutError(Exception):
    """Model timeout error.

    Bidirectional models are often configured with a connection time limit. Nova sonic for example keeps the connection
    open for 8 minutes max. Upon receiving a timeout, the agent loop is configured to restart the model connection so as
    to create a seamless, uninterrupted experience for the user.
    """

    def __init__(self, message: str, **restart_config: Any) -> None:
        """Initialize error.

        Args:
            message: Timeout message from model.
            **restart_config: Configure restart specific behaviors in the call to model start.
        """
        super().__init__(message)

        self.restart_config = restart_config

__init__(message, **restart_config)

Initialize error.

Parameters:

Name Type Description Default
message str

Timeout message from model.

required
**restart_config Any

Configure restart specific behaviors in the call to model start.

{}
Source code in strands/experimental/bidi/models/model.py
def __init__(self, message: str, **restart_config: Any) -> None:
    """Initialize error.

    Args:
        message: Timeout message from model.
        **restart_config: Configure restart specific behaviors in the call to model start.
    """
    super().__init__(message)

    self.restart_config = restart_config

BidiOpenAIRealtimeModel

Bases: BidiModel

OpenAI Realtime API implementation for bidirectional streaming.

Combines model configuration and connection state in a single class. Manages WebSocket connection to OpenAI's Realtime API with automatic VAD, function calling, and event conversion to Strands format.
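
A hedged construction sketch; the configuration keys mirror those consumed by __init__ and _resolve_provider_config below, and the API key value is a placeholder:

from strands.experimental.bidi.models.openai_realtime import BidiOpenAIRealtimeModel

model = BidiOpenAIRealtimeModel(
    model_id="gpt-realtime",
    provider_config={
        "audio": {"voice": "alloy", "output_rate": 24000},
        "inference": {"output_modalities": ["audio"]},
    },
    client_config={"api_key": "sk-..."},
)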

Source code in strands/experimental/bidi/models/openai_realtime.py
class BidiOpenAIRealtimeModel(BidiModel):
    """OpenAI Realtime API implementation for bidirectional streaming.

    Combines model configuration and connection state in a single class.
    Manages WebSocket connection to OpenAI's Realtime API with automatic VAD,
    function calling, and event conversion to Strands format.
    """

    _websocket: ClientConnection
    _start_time: int

    def __init__(
        self,
        model_id: str = DEFAULT_MODEL,
        provider_config: dict[str, Any] | None = None,
        client_config: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize OpenAI Realtime bidirectional model.

        Args:
            model_id: Model identifier (default: gpt-realtime)
            provider_config: Model behavior (audio, instructions, turn_detection, etc.)
            client_config: Authentication (api_key, organization, project)
                Falls back to OPENAI_API_KEY, OPENAI_ORGANIZATION, OPENAI_PROJECT env vars
            **kwargs: Reserved for future parameters.

        """
        # Store model ID
        self.model_id = model_id

        # Resolve client config with defaults and env vars
        self._client_config = self._resolve_client_config(client_config or {})

        # Resolve provider config with defaults
        self.config = self._resolve_provider_config(provider_config or {})

        # Store client config values for later use
        self.api_key = self._client_config["api_key"]
        self.organization = self._client_config.get("organization")
        self.project = self._client_config.get("project")
        self.timeout_s = self._client_config["timeout_s"]

        if self.timeout_s > OPENAI_MAX_TIMEOUT_S:
            raise ValueError(
                f"timeout_s=<{self.timeout_s}>, max_timeout_s=<{OPENAI_MAX_TIMEOUT_S}> | timeout exceeds max limit"
            )

        # Connection state (initialized in start())
        self._connection_id: str | None = None

        self._function_call_buffer: dict[str, Any] = {}

        logger.debug("model=<%s> | openai realtime model initialized", model_id)

    def _resolve_client_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Resolve client config with env var fallback (config takes precedence)."""
        resolved = config.copy()

        if "api_key" not in resolved:
            resolved["api_key"] = os.getenv("OPENAI_API_KEY")

        if not resolved.get("api_key"):
            raise ValueError(
                "OpenAI API key is required. Provide via client_config={'api_key': '...'} "
                "or set OPENAI_API_KEY environment variable."
            )
        if "organization" not in resolved:
            env_org = os.getenv("OPENAI_ORGANIZATION")
            if env_org:
                resolved["organization"] = env_org

        if "project" not in resolved:
            env_project = os.getenv("OPENAI_PROJECT")
            if env_project:
                resolved["project"] = env_project

        if "timeout_s" not in resolved:
            resolved["timeout_s"] = OPENAI_MAX_TIMEOUT_S

        return resolved

    def _resolve_provider_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Merge user config with defaults (user takes precedence)."""
        default_audio: AudioConfig = {
            "input_rate": cast(AudioSampleRate, DEFAULT_SAMPLE_RATE),
            "output_rate": cast(AudioSampleRate, DEFAULT_SAMPLE_RATE),
            "channels": 1,
            "format": "pcm",
            "voice": "alloy",
        }

        resolved = {
            "audio": {
                **default_audio,
                **config.get("audio", {}),
            },
            "inference": config.get("inference", {}),
        }
        return resolved

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish bidirectional connection to OpenAI Realtime API.

        Args:
            system_prompt: System instructions for the model.
            tools: List of tools available to the model.
            messages: Conversation history to initialize with.
            **kwargs: Additional configuration options.
        """
        if self._connection_id:
            raise RuntimeError("model already started | call stop before starting again")

        logger.debug("openai realtime connection starting")

        # Initialize connection state
        self._connection_id = str(uuid.uuid4())
        self._start_time = int(time.time())

        self._function_call_buffer = {}

        # Establish WebSocket connection
        url = f"{OPENAI_REALTIME_URL}?model={self.model_id}"

        headers = [("Authorization", f"Bearer {self.api_key}")]
        if self.organization:
            headers.append(("OpenAI-Organization", self.organization))
        if self.project:
            headers.append(("OpenAI-Project", self.project))

        self._websocket = await websockets.connect(url, additional_headers=headers)
        logger.debug("connection_id=<%s> | websocket connected successfully", self._connection_id)

        # Configure session
        session_config = self._build_session_config(system_prompt, tools)
        await self._send_event({"type": "session.update", "session": session_config})

        # Add conversation history if provided
        if messages:
            await self._add_conversation_history(messages)

    def _create_text_event(self, text: str, role: str, is_final: bool = True) -> BidiTranscriptStreamEvent:
        """Create standardized transcript event.

        Args:
            text: The transcript text
            role: The role (will be normalized to lowercase)
            is_final: Whether this is the final transcript
        """
        # Normalize role to lowercase and ensure it's either "user" or "assistant"
        normalized_role = role.lower() if isinstance(role, str) else "assistant"
        if normalized_role not in ["user", "assistant"]:
            normalized_role = "assistant"

        return BidiTranscriptStreamEvent(
            delta={"text": text},
            text=text,
            role=cast(Role, normalized_role),
            is_final=is_final,
            current_transcript=text if is_final else None,
        )

    def _create_voice_activity_event(self, activity_type: str) -> BidiInterruptionEvent | None:
        """Create standardized interruption event for voice activity."""
        # Only speech_started triggers interruption
        if activity_type == "speech_started":
            return BidiInterruptionEvent(reason="user_speech")
        # Other voice activity events are logged but don't create events
        return None

    def _build_session_config(self, system_prompt: str | None, tools: list[ToolSpec] | None) -> dict[str, Any]:
        """Build session configuration for OpenAI Realtime API."""
        config: dict[str, Any] = DEFAULT_SESSION_CONFIG.copy()

        if system_prompt:
            config["instructions"] = system_prompt

        if tools:
            config["tools"] = self._convert_tools_to_openai_format(tools)

        # Apply user-provided session configuration
        supported_params = {
            "max_output_tokens",
            "output_modalities",
            "tool_choice",
        }
        for key, value in self.config["inference"].items():
            if key in supported_params:
                config[key] = value
            else:
                logger.warning("parameter=<%s> | ignoring unsupported session parameter", key)

        audio_config = self.config["audio"]

        if "voice" in audio_config:
            config.setdefault("audio", {}).setdefault("output", {})["voice"] = audio_config["voice"]

        if "input_rate" in audio_config:
            config.setdefault("audio", {}).setdefault("input", {}).setdefault("format", {})["rate"] = audio_config[
                "input_rate"
            ]

        if "output_rate" in audio_config:
            config.setdefault("audio", {}).setdefault("output", {}).setdefault("format", {})["rate"] = audio_config[
                "output_rate"
            ]

        return config

    def _convert_tools_to_openai_format(self, tools: list[ToolSpec]) -> list[dict]:
        """Convert Strands tool specifications to OpenAI Realtime API format."""
        openai_tools = []

        for tool in tools:
            input_schema = tool["inputSchema"]
            if "json" in input_schema:
                schema = (
                    json.loads(input_schema["json"]) if isinstance(input_schema["json"], str) else input_schema["json"]
                )
            else:
                schema = input_schema

            # OpenAI Realtime API expects flat structure, not nested under "function"
            openai_tool = {
                "type": "function",
                "name": tool["name"],
                "description": tool["description"],
                "parameters": schema,
            }
            openai_tools.append(openai_tool)

        return openai_tools

    async def _add_conversation_history(self, messages: Messages) -> None:
        """Add conversation history to the session.

        Converts agent message history to OpenAI Realtime API format using
        conversation.item.create events for each message.

        Note: OpenAI Realtime API has a 32-character limit on call_id, so we truncate
        UUIDs consistently to ensure tool calls and their results match.

        Args:
            messages: List of conversation messages with role and content.
        """
        # Track tool call IDs to ensure consistency between calls and results
        call_id_map: dict[str, str] = {}

        # First pass: collect all tool call IDs
        for message in messages:
            for block in message.get("content", []):
                if "toolUse" in block:
                    tool_use = block["toolUse"]
                    original_id = tool_use["toolUseId"]
                    call_id = original_id[:32]
                    call_id_map[original_id] = call_id

        # Second pass: send messages
        for message in messages:
            role = message["role"]
            content_blocks = message.get("content", [])

            # Build content array for OpenAI format
            openai_content = []

            for block in content_blocks:
                if "text" in block:
                    # Text content - use appropriate type based on role
                    # User messages use "input_text", assistant messages use "output_text"
                    if role == "user":
                        openai_content.append({"type": "input_text", "text": block["text"]})
                    else:  # assistant
                        openai_content.append({"type": "output_text", "text": block["text"]})
                elif "toolUse" in block:
                    # Tool use - create as function_call item
                    tool_use = block["toolUse"]
                    original_id = tool_use["toolUseId"]
                    # Use pre-mapped call_id
                    call_id = call_id_map[original_id]

                    tool_item = {
                        "type": "conversation.item.create",
                        "item": {
                            "type": "function_call",
                            "call_id": call_id,
                            "name": tool_use["name"],
                            "arguments": json.dumps(tool_use["input"]),
                        },
                    }
                    await self._send_event(tool_item)
                    continue  # Tool use is sent separately, not in message content
                elif "toolResult" in block:
                    # Tool result - create as function_call_output item
                    tool_result = block["toolResult"]
                    original_id = tool_result["toolUseId"]

                    # Validate content types and serialize, preserving structure
                    result_output = ""
                    if "content" in tool_result:
                        # First validate all content types are supported
                        for result_block in tool_result["content"]:
                            if "text" not in result_block and "json" not in result_block:
                                # Unsupported content type - raise error
                                raise ValueError(
                                    f"tool_use_id=<{original_id}>, content_types=<{list(result_block.keys())}> | "
                                    f"Content type not supported by OpenAI Realtime API"
                                )

                        # Preserve structure by JSON-dumping the entire content array
                        result_output = json.dumps(tool_result["content"])

                    # Use mapped call_id if available, otherwise skip orphaned result
                    if original_id not in call_id_map:
                        continue  # Skip this tool result since we don't have the call

                    call_id = call_id_map[original_id]

                    result_item = {
                        "type": "conversation.item.create",
                        "item": {
                            "type": "function_call_output",
                            "call_id": call_id,
                            "output": result_output,
                        },
                    }
                    await self._send_event(result_item)
                    continue  # Tool result is sent separately, not in message content

            # Only create message item if there's text content
            if openai_content:
                conversation_item = {
                    "type": "conversation.item.create",
                    "item": {"type": "message", "role": role, "content": openai_content},
                }
                await self._send_event(conversation_item)

        logger.debug("message_count=<%d> | conversation history added to openai session", len(messages))

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive OpenAI events and convert to Strands TypedEvent format."""
        if not self._connection_id:
            raise RuntimeError("model not started | call start before sending/receiving")

        yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

        while True:
            duration = time.time() - self._start_time
            if duration >= self.timeout_s:
                raise BidiModelTimeoutError(f"timeout_s=<{self.timeout_s}>")

            try:
                message = await asyncio.wait_for(self._websocket.recv(), timeout=10)
            except asyncio.TimeoutError:
                continue

            openai_event = json.loads(message)

            for event in self._convert_openai_event(openai_event) or []:
                yield event

    def _convert_openai_event(self, openai_event: dict[str, Any]) -> list[BidiOutputEvent] | None:
        """Convert OpenAI events to Strands TypedEvent format."""
        event_type = openai_event.get("type")

        # Turn start - response begins
        if event_type == "response.created":
            response = openai_event.get("response", {})
            response_id = response.get("id", str(uuid.uuid4()))
            return [BidiResponseStartEvent(response_id=response_id)]

        # Audio output
        elif event_type == "response.output_audio.delta":
            # Audio is already base64 string from OpenAI
            # Use the resolved output sample rate from our merged configuration
            sample_rate = self.config["audio"]["output_rate"]

            # Channels from config is guaranteed to be 1 or 2
            channels = cast(Literal[1, 2], self.config["audio"]["channels"])
            return [
                BidiAudioStreamEvent(
                    audio=openai_event["delta"],
                    format="pcm",
                    sample_rate=sample_rate,
                    channels=channels,
                )
            ]

        # Assistant text output events - combine multiple similar events
        elif event_type in ["response.output_text.delta", "response.output_audio_transcript.delta"]:
            role = openai_event.get("role", "assistant")
            return [
                self._create_text_event(
                    openai_event["delta"], role.lower() if isinstance(role, str) else "assistant", is_final=False
                )
            ]

        elif event_type in ["response.output_audio_transcript.done"]:
            role = openai_event.get("role", "assistant").lower()
            return [self._create_text_event(openai_event["transcript"], role)]

        elif event_type in ["response.output_text.done"]:
            role = openai_event.get("role", "assistant").lower()
            return [self._create_text_event(openai_event["text"], role)]

        # User transcription events - combine multiple similar events
        elif event_type in [
            "conversation.item.input_audio_transcription.delta",
            "conversation.item.input_audio_transcription.completed",
        ]:
            text_key = "delta" if "delta" in event_type else "transcript"
            text = openai_event.get(text_key, "")
            role = openai_event.get("role", "user")
            is_final = "completed" in event_type
            return (
                [self._create_text_event(text, role.lower() if isinstance(role, str) else "user", is_final=is_final)]
                if text.strip()
                else None
            )

        elif event_type == "conversation.item.input_audio_transcription.segment":
            segment_data = openai_event.get("segment", {})
            text = segment_data.get("text", "")
            role = segment_data.get("role", "user")
            return (
                [self._create_text_event(text, role.lower() if isinstance(role, str) else "user")]
                if text.strip()
                else None
            )

        elif event_type == "conversation.item.input_audio_transcription.failed":
            error_info = openai_event.get("error", {})
            logger.warning("error=<%s> | openai transcription failed", error_info.get("message", "unknown error"))
            return None

        # Function call processing
        elif event_type == "response.function_call_arguments.delta":
            call_id = openai_event.get("call_id")
            delta = openai_event.get("delta", "")
            if call_id:
                if call_id not in self._function_call_buffer:
                    self._function_call_buffer[call_id] = {"call_id": call_id, "name": "", "arguments": delta}
                else:
                    self._function_call_buffer[call_id]["arguments"] += delta
            return None

        elif event_type == "response.function_call_arguments.done":
            call_id = openai_event.get("call_id")
            if call_id and call_id in self._function_call_buffer:
                function_call = self._function_call_buffer[call_id]
                try:
                    tool_use: ToolUse = {
                        "toolUseId": call_id,
                        "name": function_call["name"],
                        "input": json.loads(function_call["arguments"]) if function_call["arguments"] else {},
                    }
                    del self._function_call_buffer[call_id]
                    # Return ToolUseStreamEvent for consistency with standard agent
                    return [ToolUseStreamEvent(delta={"toolUse": tool_use}, current_tool_use=dict(tool_use))]
                except (json.JSONDecodeError, KeyError) as e:
                    logger.warning("call_id=<%s>, error=<%s> | error parsing function arguments", call_id, e)
                    del self._function_call_buffer[call_id]
            return None

        # Voice activity detection - speech_started triggers interruption
        elif event_type == "input_audio_buffer.speech_started":
            # This is the primary interruption signal - handle it first
            return [BidiInterruptionEvent(reason="user_speech")]

        # Response cancelled - handle interruption
        elif event_type == "response.cancelled":
            response = openai_event.get("response", {})
            response_id = response.get("id", "unknown")
            logger.debug("response_id=<%s> | openai response cancelled", response_id)
            return [BidiResponseCompleteEvent(response_id=response_id, stop_reason="interrupted")]

        # Turn complete and usage - response finished
        elif event_type == "response.done":
            response = openai_event.get("response", {})
            response_id = response.get("id", "unknown")
            status = response.get("status", "completed")
            usage = response.get("usage")

            # Map OpenAI status to our stop_reason
            stop_reason_map = {
                "completed": "complete",
                "cancelled": "interrupted",
                "failed": "error",
                "incomplete": "interrupted",
            }

            # Build list of events to return
            events: list[Any] = []

            # Always add response complete event
            events.append(
                BidiResponseCompleteEvent(
                    response_id=response_id,
                    stop_reason=cast(StopReason, stop_reason_map.get(status, "complete")),
                ),
            )

            # Add usage event if available
            if usage:
                input_details = usage.get("input_token_details", {})
                output_details = usage.get("output_token_details", {})

                # Build modality details
                modality_details = []

                # Text modality
                text_input = input_details.get("text_tokens", 0)
                text_output = output_details.get("text_tokens", 0)
                if text_input > 0 or text_output > 0:
                    modality_details.append(
                        {"modality": "text", "input_tokens": text_input, "output_tokens": text_output}
                    )

                # Audio modality
                audio_input = input_details.get("audio_tokens", 0)
                audio_output = output_details.get("audio_tokens", 0)
                if audio_input > 0 or audio_output > 0:
                    modality_details.append(
                        {"modality": "audio", "input_tokens": audio_input, "output_tokens": audio_output}
                    )

                # Image modality
                image_input = input_details.get("image_tokens", 0)
                if image_input > 0:
                    modality_details.append({"modality": "image", "input_tokens": image_input, "output_tokens": 0})

                # Cached tokens
                cached_tokens = input_details.get("cached_tokens", 0)

                # Add usage event
                events.append(
                    BidiUsageEvent(
                        input_tokens=usage.get("input_tokens", 0),
                        output_tokens=usage.get("output_tokens", 0),
                        total_tokens=usage.get("total_tokens", 0),
                        modality_details=cast(list[ModalityUsage], modality_details) if modality_details else None,
                        cache_read_input_tokens=cached_tokens if cached_tokens > 0 else None,
                    )
                )

            # Return list of events
            return events

        # Lifecycle events (log only) - combine multiple similar events
        elif event_type in ["conversation.item.retrieve", "conversation.item.added"]:
            item = openai_event.get("item", {})
            action = "retrieved" if "retrieve" in event_type else "added"
            logger.debug("action=<%s>, item_id=<%s> | openai conversation item event", action, item.get("id"))
            return None

        elif event_type == "conversation.item.done":
            logger.debug("item_id=<%s> | openai conversation item done", openai_event.get("item", {}).get("id"))
            return None

        # Response output events - combine similar events
        elif event_type in [
            "response.output_item.added",
            "response.output_item.done",
            "response.content_part.added",
            "response.content_part.done",
        ]:
            item_data = openai_event.get("item") or openai_event.get("part")
            logger.debug(
                "event_type=<%s>, item_id=<%s> | openai output event",
                event_type,
                item_data.get("id") if item_data else "unknown",
            )

            # Track function call names from response.output_item.added
            if event_type == "response.output_item.added":
                item = openai_event.get("item", {})
                if item.get("type") == "function_call":
                    call_id = item.get("call_id")
                    function_name = item.get("name")
                    if call_id and function_name:
                        if call_id not in self._function_call_buffer:
                            self._function_call_buffer[call_id] = {
                                "call_id": call_id,
                                "name": function_name,
                                "arguments": "",
                            }
                        else:
                            self._function_call_buffer[call_id]["name"] = function_name
            return None

        # Session/buffer events - combine simple log-only events
        elif event_type in [
            "input_audio_buffer.committed",
            "input_audio_buffer.cleared",
            "session.created",
            "session.updated",
        ]:
            logger.debug("event_type=<%s> | openai event received", event_type)
            return None

        elif event_type == "error":
            error_data = openai_event.get("error", {})
            error_code = error_data.get("code", "")

            # Suppress expected errors that don't affect session state
            if error_code == "response_cancel_not_active":
                # This happens when trying to cancel a response that's not active
                # It's safe to ignore as the session remains functional
                logger.debug("openai response cancel attempted when no response active")
                return None

            # Log other errors
            logger.error("error=<%s> | openai realtime error", error_data)
            return None

        else:
            logger.debug("event_type=<%s> | unhandled openai event type", event_type)
            return None

    async def send(
        self,
        content: BidiInputEvent | ToolResultEvent,
    ) -> None:
        """Unified send method for all content types. Sends the given content to OpenAI.

        Dispatches to appropriate internal handler based on content type.

        Args:
            content: Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

        Raises:
            ValueError: If content type not supported (e.g., image content).
        """
        if not self._connection_id:
            raise RuntimeError("model not started | call start before sending")

        # Note: TypedEvent inherits from dict, so isinstance checks for TypedEvent must come first
        if isinstance(content, BidiTextInputEvent):
            await self._send_text_content(content.text)
        elif isinstance(content, BidiAudioInputEvent):
            await self._send_audio_content(content)
        elif isinstance(content, ToolResultEvent):
            tool_result = content.get("tool_result")
            if tool_result:
                await self._send_tool_result(tool_result)
        else:
            raise ValueError(f"content_type={type(content)} | content not supported")

    async def _send_audio_content(self, audio_input: BidiAudioInputEvent) -> None:
        """Internal: Send audio content to OpenAI for processing."""
        # Audio is already base64 encoded in the event
        await self._send_event({"type": "input_audio_buffer.append", "audio": audio_input.audio})

    async def _send_text_content(self, text: str) -> None:
        """Internal: Send text content to OpenAI for processing."""
        item_data = {"type": "message", "role": "user", "content": [{"type": "input_text", "text": text}]}
        await self._send_event({"type": "conversation.item.create", "item": item_data})
        await self._send_event({"type": "response.create"})

    async def _send_interrupt(self) -> None:
        """Internal: Send interruption signal to OpenAI."""
        await self._send_event({"type": "response.cancel"})

    async def _send_tool_result(self, tool_result: ToolResult) -> None:
        """Internal: Send tool result back to OpenAI."""
        tool_use_id = tool_result.get("toolUseId")

        logger.debug("tool_use_id=<%s> | sending openai tool result", tool_use_id)

        # Validate content types and serialize, preserving structure
        result_output = ""
        if "content" in tool_result:
            # First validate all content types are supported
            for block in tool_result["content"]:
                if "text" not in block and "json" not in block:
                    # Unsupported content type - raise error
                    raise ValueError(
                        f"tool_use_id=<{tool_use_id}>, content_types=<{list(block.keys())}> | "
                        f"Content type not supported by OpenAI Realtime API"
                    )

            # Preserve structure by JSON-dumping the entire content array
            result_output = json.dumps(tool_result["content"])

        item_data = {"type": "function_call_output", "call_id": tool_use_id, "output": result_output}
        await self._send_event({"type": "conversation.item.create", "item": item_data})
        await self._send_event({"type": "response.create"})

    async def stop(self) -> None:
        """Close session and cleanup resources."""
        logger.debug("openai realtime connection cleanup starting")

        async def stop_websocket() -> None:
            if not hasattr(self, "_websocket"):
                return

            await self._websocket.close()

        async def stop_connection() -> None:
            self._connection_id = None

        await stop_all(stop_websocket, stop_connection)

        logger.debug("openai realtime connection closed")

    async def _send_event(self, event: dict[str, Any]) -> None:
        """Send event to OpenAI via WebSocket."""
        message = json.dumps(event)
        await self._websocket.send(message)
        logger.debug("event_type=<%s> | openai event sent", event.get("type"))

__init__(model_id=DEFAULT_MODEL, provider_config=None, client_config=None, **kwargs)

Initialize OpenAI Realtime bidirectional model.

Parameters:

Name Type Description Default
model_id str

Model identifier (default: gpt-realtime)

DEFAULT_MODEL
provider_config dict[str, Any] | None

Model behavior (audio, instructions, turn_detection, etc.)

None
client_config dict[str, Any] | None

Authentication (api_key, organization, project). Falls back to the OPENAI_API_KEY, OPENAI_ORGANIZATION, and OPENAI_PROJECT environment variables.

None
**kwargs Any

Reserved for future parameters.

{}
Source code in strands/experimental/bidi/models/openai_realtime.py
def __init__(
    self,
    model_id: str = DEFAULT_MODEL,
    provider_config: dict[str, Any] | None = None,
    client_config: dict[str, Any] | None = None,
    **kwargs: Any,
) -> None:
    """Initialize OpenAI Realtime bidirectional model.

    Args:
        model_id: Model identifier (default: gpt-realtime)
        provider_config: Model behavior (audio, instructions, turn_detection, etc.)
        client_config: Authentication (api_key, organization, project).
            Falls back to OPENAI_API_KEY, OPENAI_ORGANIZATION, OPENAI_PROJECT env vars.
        **kwargs: Reserved for future parameters.

    """
    # Store model ID
    self.model_id = model_id

    # Resolve client config with defaults and env vars
    self._client_config = self._resolve_client_config(client_config or {})

    # Resolve provider config with defaults
    self.config = self._resolve_provider_config(provider_config or {})

    # Store client config values for later use
    self.api_key = self._client_config["api_key"]
    self.organization = self._client_config.get("organization")
    self.project = self._client_config.get("project")
    self.timeout_s = self._client_config["timeout_s"]

    if self.timeout_s > OPENAI_MAX_TIMEOUT_S:
        raise ValueError(
            f"timeout_s=<{self.timeout_s}>, max_timeout_s=<{OPENAI_MAX_TIMEOUT_S}> | timeout exceeds max limit"
        )

    # Connection state (initialized in start())
    self._connection_id: str | None = None

    self._function_call_buffer: dict[str, Any] = {}

    logger.debug("model=<%s> | openai realtime model initialized", model_id)

receive() async

Receive OpenAI events and convert to Strands TypedEvent format.

Source code in strands/experimental/bidi/models/openai_realtime.py
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive OpenAI events and convert to Strands TypedEvent format."""
    if not self._connection_id:
        raise RuntimeError("model not started | call start before sending/receiving")

    yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

    while True:
        duration = time.time() - self._start_time
        if duration >= self.timeout_s:
            raise BidiModelTimeoutError(f"timeout_s=<{self.timeout_s}>")

        try:
            message = await asyncio.wait_for(self._websocket.recv(), timeout=10)
        except asyncio.TimeoutError:
            continue

        openai_event = json.loads(message)

        for event in self._convert_openai_event(openai_event) or []:
            yield event
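
Example (a sketch of consuming the receive() generator on an already-started model; the import path follows the events source file path listed further down this page):

from strands.experimental.bidi.types.events import (
    BidiResponseCompleteEvent,
    BidiTranscriptStreamEvent,
)

async def pump_events(model) -> None:
    """Drain events from a started model and react to two common types."""
    async for event in model.receive():
        if isinstance(event, BidiTranscriptStreamEvent):
            # Incremental transcript text for user or assistant speech.
            print(f"[{event.role}] {event.text}")
        elif isinstance(event, BidiResponseCompleteEvent):
            print(f"response {event.response_id} ended: {event.stop_reason}")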

send(content) async

Unified send method for all content types. Sends the given content to OpenAI.

Dispatches to appropriate internal handler based on content type.

Parameters:

Name Type Description Default
content BidiInputEvent | ToolResultEvent

Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

required

Raises:

Type Description
ValueError

If content type not supported (e.g., image content).

Source code in strands/experimental/bidi/models/openai_realtime.py
async def send(
    self,
    content: BidiInputEvent | ToolResultEvent,
) -> None:
    """Unified send method for all content types. Sends the given content to OpenAI.

    Dispatches to appropriate internal handler based on content type.

    Args:
        content: Typed event (BidiTextInputEvent, BidiAudioInputEvent, BidiImageInputEvent, or ToolResultEvent).

    Raises:
        ValueError: If content type not supported (e.g., image content).
    """
    if not self._connection_id:
        raise RuntimeError("model not started | call start before sending")

    # Note: TypedEvent inherits from dict, so isinstance checks for TypedEvent must come first
    if isinstance(content, BidiTextInputEvent):
        await self._send_text_content(content.text)
    elif isinstance(content, BidiAudioInputEvent):
        await self._send_audio_content(content)
    elif isinstance(content, ToolResultEvent):
        tool_result = content.get("tool_result")
        if tool_result:
            await self._send_tool_result(tool_result)
    else:
        raise ValueError(f"content_type={type(content)} | content not supported")

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish bidirectional connection to OpenAI Realtime API.

Parameters:

Name Type Description Default
system_prompt str | None

System instructions for the model.

None
tools list[ToolSpec] | None

List of tools available to the model.

None
messages Messages | None

Conversation history to initialize with.

None
**kwargs Any

Additional configuration options.

{}
Source code in strands/experimental/bidi/models/openai_realtime.py
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish bidirectional connection to OpenAI Realtime API.

    Args:
        system_prompt: System instructions for the model.
        tools: List of tools available to the model.
        messages: Conversation history to initialize with.
        **kwargs: Additional configuration options.
    """
    if self._connection_id:
        raise RuntimeError("model already started | call stop before starting again")

    logger.debug("openai realtime connection starting")

    # Initialize connection state
    self._connection_id = str(uuid.uuid4())
    self._start_time = int(time.time())

    self._function_call_buffer = {}

    # Establish WebSocket connection
    url = f"{OPENAI_REALTIME_URL}?model={self.model_id}"

    headers = [("Authorization", f"Bearer {self.api_key}")]
    if self.organization:
        headers.append(("OpenAI-Organization", self.organization))
    if self.project:
        headers.append(("OpenAI-Project", self.project))

    self._websocket = await websockets.connect(url, additional_headers=headers)
    logger.debug("connection_id=<%s> | websocket connected successfully", self._connection_id)

    # Configure session
    session_config = self._build_session_config(system_prompt, tools)
    await self._send_event({"type": "session.update", "session": session_config})

    # Add conversation history if provided
    if messages:
        await self._add_conversation_history(messages)
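
Example (a sketch of starting a session with a system prompt and prior history; the Message shape with a role and a content list of text blocks follows the Strands Messages type and is shown here as an assumption):

async def begin(model) -> None:
    history = [
        {"role": "user", "content": [{"text": "My name is Ada."}]},
        {"role": "assistant", "content": [{"text": "Nice to meet you, Ada."}]},
    ]
    await model.start(
        system_prompt="You are a concise voice assistant.",
        messages=history,
    )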

stop() async

Close session and cleanup resources.

Source code in strands/experimental/bidi/models/openai_realtime.py
async def stop(self) -> None:
    """Close session and cleanup resources."""
    logger.debug("openai realtime connection cleanup starting")

    async def stop_websocket() -> None:
        if not hasattr(self, "_websocket"):
            return

        await self._websocket.close()

    async def stop_connection() -> None:
        self._connection_id = None

    await stop_all(stop_websocket, stop_connection)

    logger.debug("openai realtime connection closed")

BidiResponseCompleteEvent

Bases: TypedEvent

Model finished generating response.

Parameters:

Name Type Description Default
response_id str

ID of the response that completed (matches response.start).

required
stop_reason StopReason

Why the response ended.

required
Source code in strands/experimental/bidi/types/events.py
class BidiResponseCompleteEvent(TypedEvent):
    """Model finished generating response.

    Parameters:
        response_id: ID of the response that completed (matches response.start).
        stop_reason: Why the response ended.
    """

    def __init__(
        self,
        response_id: str,
        stop_reason: StopReason,
    ):
        """Initialize response complete event."""
        super().__init__(
            {
                "type": "bidi_response_complete",
                "response_id": response_id,
                "stop_reason": stop_reason,
            }
        )

    @property
    def response_id(self) -> str:
        """Unique identifier for this response."""
        return cast(str, self["response_id"])

    @property
    def stop_reason(self) -> StopReason:
        """Why the response ended."""
        return cast(StopReason, self["stop_reason"])

response_id property

Unique identifier for this response.

stop_reason property

Why the response ended.

__init__(response_id, stop_reason)

Initialize response complete event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    response_id: str,
    stop_reason: StopReason,
):
    """Initialize response complete event."""
    super().__init__(
        {
            "type": "bidi_response_complete",
            "response_id": response_id,
            "stop_reason": stop_reason,
        }
    )
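
Example (a small sketch; because TypedEvent subclasses dict, fields are readable both as keys and via the properties above):

from strands.experimental.bidi.types.events import BidiResponseCompleteEvent

event = BidiResponseCompleteEvent(response_id="resp_1", stop_reason="tool_use")

assert event["type"] == "bidi_response_complete"
assert event.response_id == "resp_1"

if event.stop_reason == "tool_use":
    # The model paused to request a tool; send a ToolResultEvent to continue.
    pass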

BidiResponseStartEvent

Bases: TypedEvent

Model starts generating a response.

Parameters:

Name Type Description Default
response_id str

Unique identifier for this response (used in response.complete).

required
Source code in strands/experimental/bidi/types/events.py
class BidiResponseStartEvent(TypedEvent):
    """Model starts generating a response.

    Parameters:
        response_id: Unique identifier for this response (used in response.complete).
    """

    def __init__(self, response_id: str):
        """Initialize response start event."""
        super().__init__({"type": "bidi_response_start", "response_id": response_id})

    @property
    def response_id(self) -> str:
        """Unique identifier for this response."""
        return cast(str, self["response_id"])

response_id property

Unique identifier for this response.

__init__(response_id)

Initialize response start event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, response_id: str):
    """Initialize response start event."""
    super().__init__({"type": "bidi_response_start", "response_id": response_id})

BidiTextInputEvent

Bases: TypedEvent

Text input event for sending text to the model.

Used for sending text content through the send() method.

Parameters:

Name Type Description Default
text str

The text content to send to the model.

required
role Role

The role of the message sender (default: "user").

'user'
Source code in strands/experimental/bidi/types/events.py
class BidiTextInputEvent(TypedEvent):
    """Text input event for sending text to the model.

    Used for sending text content through the send() method.

    Parameters:
        text: The text content to send to the model.
        role: The role of the message sender (default: "user").
    """

    def __init__(self, text: str, role: Role = "user"):
        """Initialize text input event."""
        super().__init__(
            {
                "type": "bidi_text_input",
                "text": text,
                "role": role,
            }
        )

    @property
    def text(self) -> str:
        """The text content to send to the model."""
        return cast(str, self["text"])

    @property
    def role(self) -> Role:
        """The role of the message sender."""
        return cast(Role, self["role"])

role property

The role of the message sender.

text property

The text content to send to the model.

__init__(text, role='user')

Initialize text input event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, text: str, role: Role = "user"):
    """Initialize text input event."""
    super().__init__(
        {
            "type": "bidi_text_input",
            "text": text,
            "role": role,
        }
    )
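
Example (an illustrative text input event; the commented send call assumes a started model instance):

from strands.experimental.bidi.types.events import BidiTextInputEvent

event = BidiTextInputEvent(text="Summarize that in one sentence.")

assert event["type"] == "bidi_text_input"
assert event.role == "user"  # default role

# Typically forwarded to a started model:
# await model.send(event)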

BidiTranscriptStreamEvent

Bases: ModelStreamEvent

Audio transcription streaming (user or assistant speech).

Supports incremental transcript updates for providers that send partial transcripts before the final version.

Parameters:

Name Type Description Default
delta ContentBlockDelta

The incremental transcript change (ContentBlockDelta).

required
text str

The delta text (same as delta content for convenience).

required
role Role

Who is speaking ("user" or "assistant").

required
is_final bool

Whether this is the final/complete transcript.

required
current_transcript str | None

The accumulated transcript text so far (None for first delta).

None
Source code in strands/experimental/bidi/types/events.py
class BidiTranscriptStreamEvent(ModelStreamEvent):
    """Audio transcription streaming (user or assistant speech).

    Supports incremental transcript updates for providers that send partial
    transcripts before the final version.

    Parameters:
        delta: The incremental transcript change (ContentBlockDelta).
        text: The delta text (same as delta content for convenience).
        role: Who is speaking ("user" or "assistant").
        is_final: Whether this is the final/complete transcript.
        current_transcript: The accumulated transcript text so far (None for first delta).
    """

    def __init__(
        self,
        delta: ContentBlockDelta,
        text: str,
        role: Role,
        is_final: bool,
        current_transcript: str | None = None,
    ):
        """Initialize transcript stream event."""
        super().__init__(
            {
                "type": "bidi_transcript_stream",
                "delta": delta,
                "text": text,
                "role": role,
                "is_final": is_final,
                "current_transcript": current_transcript,
            }
        )

    @property
    def delta(self) -> ContentBlockDelta:
        """The incremental transcript change."""
        return cast(ContentBlockDelta, self["delta"])

    @property
    def text(self) -> str:
        """The text content to send to the model."""
        return cast(str, self["text"])

    @property
    def role(self) -> Role:
        """The role of the message sender."""
        return cast(Role, self["role"])

    @property
    def is_final(self) -> bool:
        """Whether this is the final/complete transcript."""
        return cast(bool, self["is_final"])

    @property
    def current_transcript(self) -> str | None:
        """The accumulated transcript text so far."""
        return cast(str | None, self.get("current_transcript"))

current_transcript property

The accumulated transcript text so far.

delta property

The incremental transcript change.

is_final property

Whether this is the final/complete transcript.

role property

The role of the message sender.

text property

The delta transcript text.

__init__(delta, text, role, is_final, current_transcript=None)

Initialize transcript stream event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    delta: ContentBlockDelta,
    text: str,
    role: Role,
    is_final: bool,
    current_transcript: str | None = None,
):
    """Initialize transcript stream event."""
    super().__init__(
        {
            "type": "bidi_transcript_stream",
            "delta": delta,
            "text": text,
            "role": role,
            "is_final": is_final,
            "current_transcript": current_transcript,
        }
    )
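
Example (a handling sketch that relies only on the documented fields; whether the final event repeats the full transcript is provider-specific, so current_transcript is preferred when present):

from strands.experimental.bidi.types.events import BidiTranscriptStreamEvent

def handle_transcript(event: BidiTranscriptStreamEvent) -> None:
    """Print partial updates and mark the final transcript."""
    # current_transcript holds the accumulated text so far (None on the first delta).
    accumulated = event.current_transcript or event.text
    marker = "final" if event.is_final else "partial"
    print(f"[{event.role}] ({marker}) {accumulated}")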

BidiUsageEvent

Bases: TypedEvent

Token usage event with modality breakdown for bidirectional streaming.

Tracks token consumption across different modalities (audio, text, images) during bidirectional streaming sessions.

Parameters:

Name Type Description Default
input_tokens int

Total tokens used for all input modalities.

required
output_tokens int

Total tokens used for all output modalities.

required
total_tokens int

Sum of input and output tokens.

required
modality_details list[ModalityUsage] | None

Optional list of token usage per modality.

None
cache_read_input_tokens int | None

Optional tokens read from cache.

None
cache_write_input_tokens int | None

Optional tokens written to cache.

None
Source code in strands/experimental/bidi/types/events.py
class BidiUsageEvent(TypedEvent):
    """Token usage event with modality breakdown for bidirectional streaming.

    Tracks token consumption across different modalities (audio, text, images)
    during bidirectional streaming sessions.

    Parameters:
        input_tokens: Total tokens used for all input modalities.
        output_tokens: Total tokens used for all output modalities.
        total_tokens: Sum of input and output tokens.
        modality_details: Optional list of token usage per modality.
        cache_read_input_tokens: Optional tokens read from cache.
        cache_write_input_tokens: Optional tokens written to cache.
    """

    def __init__(
        self,
        input_tokens: int,
        output_tokens: int,
        total_tokens: int,
        modality_details: list[ModalityUsage] | None = None,
        cache_read_input_tokens: int | None = None,
        cache_write_input_tokens: int | None = None,
    ):
        """Initialize usage event."""
        data: dict[str, Any] = {
            "type": "bidi_usage",
            "inputTokens": input_tokens,
            "outputTokens": output_tokens,
            "totalTokens": total_tokens,
        }
        if modality_details is not None:
            data["modality_details"] = modality_details
        if cache_read_input_tokens is not None:
            data["cacheReadInputTokens"] = cache_read_input_tokens
        if cache_write_input_tokens is not None:
            data["cacheWriteInputTokens"] = cache_write_input_tokens
        super().__init__(data)

    @property
    def input_tokens(self) -> int:
        """Total tokens used for all input modalities."""
        return cast(int, self["inputTokens"])

    @property
    def output_tokens(self) -> int:
        """Total tokens used for all output modalities."""
        return cast(int, self["outputTokens"])

    @property
    def total_tokens(self) -> int:
        """Sum of input and output tokens."""
        return cast(int, self["totalTokens"])

    @property
    def modality_details(self) -> list[ModalityUsage]:
        """Optional list of token usage per modality."""
        return cast(list[ModalityUsage], self.get("modality_details", []))

    @property
    def cache_read_input_tokens(self) -> int | None:
        """Optional tokens read from cache."""
        return cast(int | None, self.get("cacheReadInputTokens"))

    @property
    def cache_write_input_tokens(self) -> int | None:
        """Optional tokens written to cache."""
        return cast(int | None, self.get("cacheWriteInputTokens"))

cache_read_input_tokens property

Optional tokens read from cache.

cache_write_input_tokens property

Optional tokens written to cache.

input_tokens property

Total tokens used for all input modalities.

modality_details property

Optional list of token usage per modality.

output_tokens property

Total tokens used for all output modalities.

total_tokens property

Sum of input and output tokens.

__init__(input_tokens, output_tokens, total_tokens, modality_details=None, cache_read_input_tokens=None, cache_write_input_tokens=None)

Initialize usage event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    input_tokens: int,
    output_tokens: int,
    total_tokens: int,
    modality_details: list[ModalityUsage] | None = None,
    cache_read_input_tokens: int | None = None,
    cache_write_input_tokens: int | None = None,
):
    """Initialize usage event."""
    data: dict[str, Any] = {
        "type": "bidi_usage",
        "inputTokens": input_tokens,
        "outputTokens": output_tokens,
        "totalTokens": total_tokens,
    }
    if modality_details is not None:
        data["modality_details"] = modality_details
    if cache_read_input_tokens is not None:
        data["cacheReadInputTokens"] = cache_read_input_tokens
    if cache_write_input_tokens is not None:
        data["cacheWriteInputTokens"] = cache_write_input_tokens
    super().__init__(data)
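
Example (a sketch that reads the documented properties, including the optional per-modality breakdown):

from strands.experimental.bidi.types.events import BidiUsageEvent

def log_usage(event: BidiUsageEvent) -> None:
    """Print a one-line summary plus any per-modality breakdown."""
    print(f"in={event.input_tokens} out={event.output_tokens} total={event.total_tokens}")
    for detail in event.modality_details:  # empty list when no breakdown was reported
        print(f"  {detail['modality']}: in={detail['input_tokens']} out={detail['output_tokens']}")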

ModalityUsage

Bases: dict

Token usage for a specific modality.

Attributes:

Name Type Description
modality Literal['text', 'audio', 'image', 'cached']

Type of content.

input_tokens int

Tokens used for this modality's input.

output_tokens int

Tokens used for this modality's output.

Source code in strands/experimental/bidi/types/events.py
class ModalityUsage(dict):
    """Token usage for a specific modality.

    Attributes:
        modality: Type of content.
        input_tokens: Tokens used for this modality's input.
        output_tokens: Tokens used for this modality's output.
    """

    modality: Literal["text", "audio", "image", "cached"]
    input_tokens: int
    output_tokens: int

ToolResult

Bases: TypedDict

Result of a tool execution.

Attributes:

Name Type Description
content list[ToolResultContent]

List of result content returned by the tool.

status ToolResultStatus

The status of the tool execution ("success" or "error").

toolUseId str

The unique identifier of the tool use request that produced this result.

Source code in strands/types/tools.py
class ToolResult(TypedDict):
    """Result of a tool execution.

    Attributes:
        content: List of result content returned by the tool.
        status: The status of the tool execution ("success" or "error").
        toolUseId: The unique identifier of the tool use request that produced this result.
    """

    content: list[ToolResultContent]
    status: ToolResultStatus
    toolUseId: str
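
Example (an illustrative ToolResult; the values are made up, and the "text" and "json" content block keys are the ones the OpenAI Realtime provider above accepts):

from strands.types.tools import ToolResult

result: ToolResult = {
    "toolUseId": "call_123",  # matches the tool use request that triggered the call
    "status": "success",
    "content": [
        {"text": "57F and overcast"},
        {"json": {"temperature_f": 57, "condition": "overcast"}},
    ],
}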

ToolResultEvent

Bases: TypedEvent

Event emitted when a tool execution completes.

Source code in strands/types/_events.py
class ToolResultEvent(TypedEvent):
    """Event emitted when a tool execution completes."""

    def __init__(self, tool_result: ToolResult) -> None:
        """Initialize with the completed tool result.

        Args:
            tool_result: Final result from the tool execution
        """
        super().__init__({"type": "tool_result", "tool_result": tool_result})

    @property
    def tool_use_id(self) -> str:
        """The toolUseId associated with this result."""
        return cast(ToolResult, self.get("tool_result"))["toolUseId"]

    @property
    def tool_result(self) -> ToolResult:
        """Final result from the completed tool execution."""
        return cast(ToolResult, self.get("tool_result"))

    @property
    @override
    def is_callback_event(self) -> bool:
        return False

tool_result property

Final result from the completed tool execution.

tool_use_id property

The toolUseId associated with this result.

__init__(tool_result)

Initialize with the completed tool result.

Parameters:

Name Type Description Default
tool_result ToolResult

Final result from the tool execution

required
Source code in strands/types/_events.py
def __init__(self, tool_result: ToolResult) -> None:
    """Initialize with the completed tool result.

    Args:
        tool_result: Final result from the tool execution
    """
    super().__init__({"type": "tool_result", "tool_result": tool_result})

ToolSpec

Bases: TypedDict

Specification for a tool that can be used by an agent.

Attributes:

Name Type Description
description str

A human-readable description of what the tool does.

inputSchema JSONSchema

JSON Schema defining the expected input parameters.

name str

The unique name of the tool.

outputSchema NotRequired[JSONSchema]

Optional JSON Schema defining the expected output format. Note: Not all model providers support this field. Providers that don't support it should filter it out before sending to their API.

Source code in strands/types/tools.py
class ToolSpec(TypedDict):
    """Specification for a tool that can be used by an agent.

    Attributes:
        description: A human-readable description of what the tool does.
        inputSchema: JSON Schema defining the expected input parameters.
        name: The unique name of the tool.
        outputSchema: Optional JSON Schema defining the expected output format.
            Note: Not all model providers support this field. Providers that don't
            support it should filter it out before sending to their API.
    """

    description: str
    inputSchema: JSONSchema
    name: str
    outputSchema: NotRequired[JSONSchema]
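
Example (an illustrative ToolSpec; the {"json": {...}} wrapper around the schema is an assumption made here for illustration, not something this page documents):

from strands.types.tools import ToolSpec

weather_spec: ToolSpec = {
    "name": "get_weather",
    "description": "Look up current weather for a city.",
    "inputSchema": {
        "json": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        }
    },
}

# Passed to the provider when the session starts:
# await model.start(tools=[weather_spec])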

ToolUse

Bases: TypedDict

A request from the model to use a specific tool with the provided input.

Attributes:

Name Type Description
input Any

The input parameters for the tool. Can be any JSON-serializable type.

name str

The name of the tool to invoke.

toolUseId str

A unique identifier for this specific tool use request.

Source code in strands/types/tools.py
class ToolUse(TypedDict):
    """A request from the model to use a specific tool with the provided input.

    Attributes:
        input: The input parameters for the tool.
            Can be any JSON-serializable type.
        name: The name of the tool to invoke.
        toolUseId: A unique identifier for this specific tool use request.
    """

    input: Any
    name: str
    toolUseId: str

ToolUseStreamEvent

Bases: ModelStreamEvent

Event emitted during tool use input streaming.

Source code in strands/types/_events.py
class ToolUseStreamEvent(ModelStreamEvent):
    """Event emitted during tool use input streaming."""

    def __init__(self, delta: ContentBlockDelta, current_tool_use: dict[str, Any]) -> None:
        """Initialize with delta and current tool use state."""
        super().__init__({"type": "tool_use_stream", "delta": delta, "current_tool_use": current_tool_use})

__init__(delta, current_tool_use)

Initialize with delta and current tool use state.

Source code in strands/types/_events.py
def __init__(self, delta: ContentBlockDelta, current_tool_use: dict[str, Any]) -> None:
    """Initialize with delta and current tool use state."""
    super().__init__({"type": "tool_use_stream", "delta": delta, "current_tool_use": current_tool_use})

stop_all(*funcs) async

Call all stops in sequence and aggregate errors.

A failure in one stop call will not block subsequent stop calls.

Parameters:

Name Type Description Default
funcs Callable[..., Awaitable[None]]

Stop functions to call in sequence.

()

Raises:

Type Description
RuntimeError

If any stop function raises an exception.

Source code in strands/experimental/bidi/_async/__init__.py
async def stop_all(*funcs: Callable[..., Awaitable[None]]) -> None:
    """Call all stops in sequence and aggregate errors.

    A failure in one stop call will not block subsequent stop calls.

    Args:
        funcs: Stop functions to call in sequence.

    Raises:
        RuntimeError: If any stop function raises an exception.
    """
    exceptions = []
    for func in funcs:
        try:
            await func()
        except Exception as exception:
            exceptions.append({"func_name": func.__name__, "exception": repr(exception)})

    if exceptions:
        raise RuntimeError(f"exceptions={exceptions} | failed stop sequence")