strands.experimental.bidi.models.nova_sonic

Nova Sonic bidirectional model provider for real-time streaming conversations.

Implements the BidiModel interface for Amazon's Nova Sonic, handling the complex event sequencing and audio processing required by Nova Sonic's InvokeModelWithBidirectionalStream protocol.

Nova Sonic specifics:

  • Hierarchical event sequences: connectionStart → promptStart → content streaming
  • Base64-encoded audio payloads
  • Tool execution with content containers and identifier tracking
  • 8-minute connection limits with proper cleanup sequences
  • Interruption detection through stopReason events

AudioChannel = Literal[1, 2] module-attribute

Number of audio channels.

  • Mono: 1
  • Stereo: 2

AudioSampleRate = Literal[16000, 24000, 48000] module-attribute

Audio sample rate in Hz.

BidiInputEvent = BidiTextInputEvent | BidiAudioInputEvent | BidiImageInputEvent module-attribute

Union of different bidi input event types.

BidiOutputEvent = BidiConnectionStartEvent | BidiConnectionRestartEvent | BidiResponseStartEvent | BidiAudioStreamEvent | BidiTranscriptStreamEvent | BidiInterruptionEvent | BidiResponseCompleteEvent | BidiUsageEvent | BidiConnectionCloseEvent | BidiErrorEvent | ToolUseStreamEvent module-attribute

Union of different bidi output event types.
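
Consumers of this union typically branch on the event's `type` field. A minimal dispatch sketch, using only the stdlib (events are modeled as plain dicts here rather than the actual event classes, and the handler labels are illustrative):

```python
# Sketch: routing bidi output events by their "type" field.
# Events are modeled as plain dicts; the real classes subclass TypedEvent.

def dispatch(event: dict) -> str:
    """Return an illustrative handler label for a bidi output event."""
    handlers = {
        "bidi_audio_stream": "play_audio",
        "bidi_transcript_stream": "show_transcript",
        "bidi_interruption": "stop_playback",
        "bidi_connection_close": "shutdown",
    }
    # Unknown event types fall through to a no-op handler.
    return handlers.get(event["type"], "ignore")

print(dispatch({"type": "bidi_audio_stream", "audio": "..."}))  # play_audio
print(dispatch({"type": "bidi_interruption", "reason": "user_speech"}))  # stop_playback
```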

Messages = List[Message] module-attribute

A list of messages representing a conversation.

NOVA_AUDIO_INPUT_CONFIG = {'mediaType': 'audio/lpcm', 'sampleRateHertz': 16000, 'sampleSizeBits': 16, 'channelCount': 1, 'audioType': 'SPEECH', 'encoding': 'base64'} module-attribute

NOVA_AUDIO_OUTPUT_CONFIG = {'mediaType': 'audio/lpcm', 'sampleRateHertz': 16000, 'sampleSizeBits': 16, 'channelCount': 1, 'voiceId': 'matthew', 'encoding': 'base64', 'audioType': 'SPEECH'} module-attribute

NOVA_TEXT_CONFIG = {'mediaType': 'text/plain'} module-attribute

NOVA_TOOL_CONFIG = {'mediaType': 'application/json'} module-attribute

_NOVA_INFERENCE_CONFIG_KEYS = {'max_tokens': 'maxTokens', 'temperature': 'temperature', 'top_p': 'topP'} module-attribute

logger = logging.getLogger(__name__) module-attribute

AudioConfig

Bases: TypedDict

Audio configuration for bidirectional streaming models.

Defines standard audio parameters that model providers use to specify their audio processing requirements. All fields are optional to support models that may not use audio or only need specific parameters.

Model providers build this configuration by merging user-provided values with their own defaults. The resulting configuration is then used by audio I/O implementations to configure hardware appropriately.

Attributes:

Name Type Description
input_rate AudioSampleRate

Input sample rate in Hz (e.g., 16000, 24000, 48000)

output_rate AudioSampleRate

Output sample rate in Hz (e.g., 16000, 24000, 48000)

channels AudioChannel

Number of audio channels (1=mono, 2=stereo)

format AudioFormat

Audio encoding format

voice str

Voice identifier for text-to-speech (e.g., "alloy", "matthew")

Source code in strands/experimental/bidi/types/model.py
class AudioConfig(TypedDict, total=False):
    """Audio configuration for bidirectional streaming models.

    Defines standard audio parameters that model providers use to specify
    their audio processing requirements. All fields are optional to support
    models that may not use audio or only need specific parameters.

    Model providers build this configuration by merging user-provided values
    with their own defaults. The resulting configuration is then used by
    audio I/O implementations to configure hardware appropriately.

    Attributes:
        input_rate: Input sample rate in Hz (e.g., 16000, 24000, 48000)
        output_rate: Output sample rate in Hz (e.g., 16000, 24000, 48000)
        channels: Number of audio channels (1=mono, 2=stereo)
        format: Audio encoding format
        voice: Voice identifier for text-to-speech (e.g., "alloy", "matthew")
    """

    input_rate: AudioSampleRate
    output_rate: AudioSampleRate
    channels: AudioChannel
    format: AudioFormat
    voice: str
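
The merge described above (provider defaults overridden by user-supplied values) reduces to a plain dict merge. A self-contained sketch, with default values mirroring the Nova input/output configs shown earlier on this page ("tiffany" is just an illustrative alternate voice id):

```python
# Sketch: provider defaults merged with user-supplied audio config.
# User keys take precedence, matching the _resolve_provider_config pattern.

DEFAULT_AUDIO = {
    "input_rate": 16000,
    "output_rate": 16000,
    "channels": 1,
    "format": "pcm",
    "voice": "matthew",
}

def resolve_audio_config(user: dict) -> dict:
    """Merge user values over provider defaults (later mapping wins)."""
    return {**DEFAULT_AUDIO, **user}

resolved = resolve_audio_config({"voice": "tiffany", "output_rate": 24000})
print(resolved["voice"], resolved["output_rate"], resolved["channels"])  # tiffany 24000 1
```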

BidiAudioInputEvent

Bases: TypedEvent

Audio input event for sending audio to the model.

Used for sending audio data through the send() method.

Parameters:

Name Type Description Default
audio str

Base64-encoded audio string to send to model.

required
format AudioFormat | str

Audio format from SUPPORTED_AUDIO_FORMATS.

required
sample_rate AudioSampleRate

Sample rate from SUPPORTED_SAMPLE_RATES.

required
channels AudioChannel

Channel count from SUPPORTED_CHANNELS.

required
Source code in strands/experimental/bidi/types/events.py
class BidiAudioInputEvent(TypedEvent):
    """Audio input event for sending audio to the model.

    Used for sending audio data through the send() method.

    Parameters:
        audio: Base64-encoded audio string to send to model.
        format: Audio format from SUPPORTED_AUDIO_FORMATS.
        sample_rate: Sample rate from SUPPORTED_SAMPLE_RATES.
        channels: Channel count from SUPPORTED_CHANNELS.
    """

    def __init__(
        self,
        audio: str,
        format: AudioFormat | str,
        sample_rate: AudioSampleRate,
        channels: AudioChannel,
    ):
        """Initialize audio input event."""
        super().__init__(
            {
                "type": "bidi_audio_input",
                "audio": audio,
                "format": format,
                "sample_rate": sample_rate,
                "channels": channels,
            }
        )

    @property
    def audio(self) -> str:
        """Base64-encoded audio string."""
        return cast(str, self["audio"])

    @property
    def format(self) -> AudioFormat:
        """Audio encoding format."""
        return cast(AudioFormat, self["format"])

    @property
    def sample_rate(self) -> AudioSampleRate:
        """Number of audio samples per second in Hz."""
        return cast(AudioSampleRate, self["sample_rate"])

    @property
    def channels(self) -> AudioChannel:
        """Number of audio channels (1=mono, 2=stereo)."""
        return cast(AudioChannel, self["channels"])

audio property

Base64-encoded audio string.

channels property

Number of audio channels (1=mono, 2=stereo).

format property

Audio encoding format.

sample_rate property

Number of audio samples per second in Hz.

__init__(audio, format, sample_rate, channels)

Initialize audio input event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    audio: str,
    format: AudioFormat | str,
    sample_rate: AudioSampleRate,
    channels: AudioChannel,
):
    """Initialize audio input event."""
    super().__init__(
        {
            "type": "bidi_audio_input",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels,
        }
    )
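
Because `audio` must be a base64-encoded string, raw PCM bytes from a microphone need to be encoded before the event is constructed. A stdlib-only sketch (the event itself is represented by the resulting kwargs dict, since the event class is not imported here):

```python
import base64

# Sketch: preparing raw 16-bit PCM bytes for a BidiAudioInputEvent.
raw_pcm = b"\x00\x01" * 160  # 160 fake little-endian samples

event_kwargs = {
    "audio": base64.b64encode(raw_pcm).decode("ascii"),
    "format": "pcm",
    "sample_rate": 16000,
    "channels": 1,
}

# Round-trip check: decoding the payload recovers the original bytes.
assert base64.b64decode(event_kwargs["audio"]) == raw_pcm
print(event_kwargs["sample_rate"], event_kwargs["channels"])  # 16000 1
```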

BidiAudioStreamEvent

Bases: TypedEvent

Streaming audio output from the model.

Parameters:

Name Type Description Default
audio str

Base64-encoded audio string.

required
format AudioFormat

Audio encoding format.

required
sample_rate AudioSampleRate

Number of audio samples per second in Hz.

required
channels AudioChannel

Number of audio channels (1=mono, 2=stereo).

required
Source code in strands/experimental/bidi/types/events.py
class BidiAudioStreamEvent(TypedEvent):
    """Streaming audio output from the model.

    Parameters:
        audio: Base64-encoded audio string.
        format: Audio encoding format.
        sample_rate: Number of audio samples per second in Hz.
        channels: Number of audio channels (1=mono, 2=stereo).
    """

    def __init__(
        self,
        audio: str,
        format: AudioFormat,
        sample_rate: AudioSampleRate,
        channels: AudioChannel,
    ):
        """Initialize audio stream event."""
        super().__init__(
            {
                "type": "bidi_audio_stream",
                "audio": audio,
                "format": format,
                "sample_rate": sample_rate,
                "channels": channels,
            }
        )

    @property
    def audio(self) -> str:
        """Base64-encoded audio string."""
        return cast(str, self["audio"])

    @property
    def format(self) -> AudioFormat:
        """Audio encoding format."""
        return cast(AudioFormat, self["format"])

    @property
    def sample_rate(self) -> AudioSampleRate:
        """Number of audio samples per second in Hz."""
        return cast(AudioSampleRate, self["sample_rate"])

    @property
    def channels(self) -> AudioChannel:
        """Number of audio channels (1=mono, 2=stereo)."""
        return cast(AudioChannel, self["channels"])

audio property

Base64-encoded audio string.

channels property

Number of audio channels (1=mono, 2=stereo).

format property

Audio encoding format.

sample_rate property

Number of audio samples per second in Hz.

__init__(audio, format, sample_rate, channels)

Initialize audio stream event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    audio: str,
    format: AudioFormat,
    sample_rate: AudioSampleRate,
    channels: AudioChannel,
):
    """Initialize audio stream event."""
    super().__init__(
        {
            "type": "bidi_audio_stream",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels,
        }
    )
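
On the receiving side, the base64 `audio` payload is decoded back to raw PCM before playback. A minimal stdlib-only sketch (the event is a plain dict here, and the actual playback call is omitted):

```python
import base64

def decode_audio_chunk(event: dict) -> bytes:
    """Decode the base64 audio payload of a bidi_audio_stream event to raw PCM."""
    return base64.b64decode(event["audio"])

chunk = {
    "type": "bidi_audio_stream",
    "audio": base64.b64encode(b"\x01\x02\x03\x04").decode("ascii"),
}
pcm = decode_audio_chunk(chunk)
print(pcm)  # b'\x01\x02\x03\x04'
```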

BidiConnectionStartEvent

Bases: TypedEvent

Streaming connection established and ready for interaction.

Parameters:

Name Type Description Default
connection_id str

Unique identifier for this streaming connection.

required
model str

Model identifier (e.g., "gpt-realtime", "gemini-2.0-flash-live").

required
Source code in strands/experimental/bidi/types/events.py
class BidiConnectionStartEvent(TypedEvent):
    """Streaming connection established and ready for interaction.

    Parameters:
        connection_id: Unique identifier for this streaming connection.
        model: Model identifier (e.g., "gpt-realtime", "gemini-2.0-flash-live").
    """

    def __init__(self, connection_id: str, model: str):
        """Initialize connection start event."""
        super().__init__(
            {
                "type": "bidi_connection_start",
                "connection_id": connection_id,
                "model": model,
            }
        )

    @property
    def connection_id(self) -> str:
        """Unique identifier for this streaming connection."""
        return cast(str, self["connection_id"])

    @property
    def model(self) -> str:
        """Model identifier (e.g., 'gpt-realtime', 'gemini-2.0-flash-live')."""
        return cast(str, self["model"])

connection_id property

Unique identifier for this streaming connection.

model property

Model identifier (e.g., 'gpt-realtime', 'gemini-2.0-flash-live').

__init__(connection_id, model)

Initialize connection start event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, connection_id: str, model: str):
    """Initialize connection start event."""
    super().__init__(
        {
            "type": "bidi_connection_start",
            "connection_id": connection_id,
            "model": model,
        }
    )

BidiInterruptionEvent

Bases: TypedEvent

Model generation was interrupted.

Parameters:

Name Type Description Default
reason Literal['user_speech', 'error']

Why the interruption occurred.

required
Source code in strands/experimental/bidi/types/events.py
class BidiInterruptionEvent(TypedEvent):
    """Model generation was interrupted.

    Parameters:
        reason: Why the interruption occurred.
    """

    def __init__(self, reason: Literal["user_speech", "error"]):
        """Initialize interruption event."""
        super().__init__(
            {
                "type": "bidi_interruption",
                "reason": reason,
            }
        )

    @property
    def reason(self) -> str:
        """Why the interruption occurred."""
        return cast(str, self["reason"])

reason property

Why the interruption occurred.

__init__(reason)

Initialize interruption event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, reason: Literal["user_speech", "error"]):
    """Initialize interruption event."""
    super().__init__(
        {
            "type": "bidi_interruption",
            "reason": reason,
        }
    )

BidiModel

Bases: Protocol

Protocol for bidirectional streaming models.

This interface defines the contract for models that support persistent streaming connections with real-time audio and text communication. Implementations handle provider-specific protocols while exposing a standardized event-based API.

Attributes:

Name Type Description
config dict[str, Any]

Configuration dictionary with provider-specific settings.

Source code in strands/experimental/bidi/models/model.py
class BidiModel(Protocol):
    """Protocol for bidirectional streaming models.

    This interface defines the contract for models that support persistent streaming
    connections with real-time audio and text communication. Implementations handle
    provider-specific protocols while exposing a standardized event-based API.

    Attributes:
        config: Configuration dictionary with provider-specific settings.
    """

    config: dict[str, Any]

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish a persistent streaming connection with the model.

        Opens a bidirectional connection that remains active for real-time communication.
        The connection supports concurrent sending and receiving of events until explicitly
        closed. Must be called before any send() or receive() operations.

        Args:
            system_prompt: System instructions to configure model behavior.
            tools: Tool specifications that the model can invoke during the conversation.
            messages: Initial conversation history to provide context.
            **kwargs: Provider-specific configuration options.
        """
        ...

    async def stop(self) -> None:
        """Close the streaming connection and release resources.

        Terminates the active bidirectional connection and cleans up any associated
        resources such as network connections, buffers, or background tasks. After
        calling close(), the model instance cannot be used until start() is called again.
        """
        ...

    def receive(self) -> AsyncIterable[BidiOutputEvent]:
        """Receive streaming events from the model.

        Continuously yields events from the model as they arrive over the connection.
        Events are normalized to a provider-agnostic format for uniform processing.
        This method should be called in a loop or async task to process model responses.

        The stream continues until the connection is closed or an error occurs.

        Yields:
            BidiOutputEvent: Standardized event objects containing audio output,
                transcripts, tool calls, or control signals.
        """
        ...

    async def send(
        self,
        content: BidiInputEvent | ToolResultEvent,
    ) -> None:
        """Send content to the model over the active connection.

        Transmits user input or tool results to the model during an active streaming
        session. Supports multiple content types including text, audio, images, and
        tool execution results. Can be called multiple times during a conversation.

        Args:
            content: The content to send. Must be one of:

                - BidiTextInputEvent: Text message from the user
                - BidiAudioInputEvent: Audio data for speech input
                - BidiImageInputEvent: Image data for visual understanding
                - ToolResultEvent: Result from a tool execution

        Example:
            ```
            await model.send(BidiTextInputEvent(text="Hello", role="user"))
            await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
            await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
            await model.send(ToolResultEvent(tool_result))
            ```
        """
        ...
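
The lifecycle this protocol implies (start, concurrent send/receive, stop) can be exercised against a stub. `EchoModel` below is purely illustrative and not a real provider; it echoes each text input back as a transcript event and closes the receive stream on `stop()`:

```python
import asyncio
from typing import Any, AsyncIterable

class EchoModel:
    """Minimal stand-in following the BidiModel call pattern (illustrative)."""

    def __init__(self) -> None:
        self.config: dict[str, Any] = {}
        self._queue: asyncio.Queue = asyncio.Queue()

    async def start(self, **kwargs: Any) -> None:
        self._queue = asyncio.Queue()

    async def stop(self) -> None:
        await self._queue.put(None)  # sentinel terminates the receive stream

    async def send(self, content: dict) -> None:
        # Echo text input back as a transcript event.
        await self._queue.put({"type": "bidi_transcript_stream", "text": content["text"]})

    async def receive(self) -> AsyncIterable[dict]:
        while (event := await self._queue.get()) is not None:
            yield event

async def main() -> list[dict]:
    model = EchoModel()
    await model.start()
    await model.send({"type": "bidi_text_input", "text": "Hello"})
    await model.stop()
    return [event async for event in model.receive()]

events = asyncio.run(main())
print(events)  # [{'type': 'bidi_transcript_stream', 'text': 'Hello'}]
```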

receive()

Receive streaming events from the model.

Continuously yields events from the model as they arrive over the connection. Events are normalized to a provider-agnostic format for uniform processing. This method should be called in a loop or async task to process model responses.

The stream continues until the connection is closed or an error occurs.

Yields:

Name Type Description
BidiOutputEvent AsyncIterable[BidiOutputEvent]

Standardized event objects containing audio output, transcripts, tool calls, or control signals.

Source code in strands/experimental/bidi/models/model.py
def receive(self) -> AsyncIterable[BidiOutputEvent]:
    """Receive streaming events from the model.

    Continuously yields events from the model as they arrive over the connection.
    Events are normalized to a provider-agnostic format for uniform processing.
    This method should be called in a loop or async task to process model responses.

    The stream continues until the connection is closed or an error occurs.

    Yields:
        BidiOutputEvent: Standardized event objects containing audio output,
            transcripts, tool calls, or control signals.
    """
    ...

send(content) async

Send content to the model over the active connection.

Transmits user input or tool results to the model during an active streaming session. Supports multiple content types including text, audio, images, and tool execution results. Can be called multiple times during a conversation.

Parameters:

Name Type Description Default
content BidiInputEvent | ToolResultEvent

The content to send. Must be one of:

  • BidiTextInputEvent: Text message from the user
  • BidiAudioInputEvent: Audio data for speech input
  • BidiImageInputEvent: Image data for visual understanding
  • ToolResultEvent: Result from a tool execution
required
Example
```
await model.send(BidiTextInputEvent(text="Hello", role="user"))
await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
await model.send(ToolResultEvent(tool_result))
```
Source code in strands/experimental/bidi/models/model.py
async def send(
    self,
    content: BidiInputEvent | ToolResultEvent,
) -> None:
    """Send content to the model over the active connection.

    Transmits user input or tool results to the model during an active streaming
    session. Supports multiple content types including text, audio, images, and
    tool execution results. Can be called multiple times during a conversation.

    Args:
        content: The content to send. Must be one of:

            - BidiTextInputEvent: Text message from the user
            - BidiAudioInputEvent: Audio data for speech input
            - BidiImageInputEvent: Image data for visual understanding
            - ToolResultEvent: Result from a tool execution

    Example:
        ```
        await model.send(BidiTextInputEvent(text="Hello", role="user"))
        await model.send(BidiAudioInputEvent(audio=bytes, format="pcm", sample_rate=16000, channels=1))
        await model.send(BidiImageInputEvent(image=bytes, mime_type="image/jpeg", encoding="raw"))
        await model.send(ToolResultEvent(tool_result))
        ```
    """
    ...

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish a persistent streaming connection with the model.

Opens a bidirectional connection that remains active for real-time communication. The connection supports concurrent sending and receiving of events until explicitly closed. Must be called before any send() or receive() operations.

Parameters:

Name Type Description Default
system_prompt str | None

System instructions to configure model behavior.

None
tools list[ToolSpec] | None

Tool specifications that the model can invoke during the conversation.

None
messages Messages | None

Initial conversation history to provide context.

None
**kwargs Any

Provider-specific configuration options.

{}
Source code in strands/experimental/bidi/models/model.py
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish a persistent streaming connection with the model.

    Opens a bidirectional connection that remains active for real-time communication.
    The connection supports concurrent sending and receiving of events until explicitly
    closed. Must be called before any send() or receive() operations.

    Args:
        system_prompt: System instructions to configure model behavior.
        tools: Tool specifications that the model can invoke during the conversation.
        messages: Initial conversation history to provide context.
        **kwargs: Provider-specific configuration options.
    """
    ...

stop() async

Close the streaming connection and release resources.

Terminates the active bidirectional connection and cleans up any associated resources such as network connections, buffers, or background tasks. After calling close(), the model instance cannot be used until start() is called again.

Source code in strands/experimental/bidi/models/model.py
async def stop(self) -> None:
    """Close the streaming connection and release resources.

    Terminates the active bidirectional connection and cleans up any associated
    resources such as network connections, buffers, or background tasks. After
    calling close(), the model instance cannot be used until start() is called again.
    """
    ...

BidiModelTimeoutError

Bases: Exception

Model timeout error.

Bidirectional models are often configured with a connection time limit; Nova Sonic, for example, keeps the connection open for at most 8 minutes. Upon receiving a timeout, the agent loop restarts the model connection so as to create a seamless, uninterrupted experience for the user.

Source code in strands/experimental/bidi/models/model.py
class BidiModelTimeoutError(Exception):
    """Model timeout error.

    Bidirectional models are often configured with a connection time limit. Nova Sonic, for example, keeps the connection
    open for 8 minutes max. Upon receiving a timeout, the agent loop is configured to restart the model connection so as
    to create a seamless, uninterrupted experience for the user.
    """

    def __init__(self, message: str, **restart_config: Any) -> None:
        """Initialize error.

        Args:
            message: Timeout message from model.
            **restart_config: Configure restart specific behaviors in the call to model start.
        """
        super().__init__(message)

        self.restart_config = restart_config
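
The restart behavior described above can be sketched as a loop that catches the timeout and feeds `restart_config` back into the next `start()` call. The sketch reproduces the exception class locally so it is self-contained, and `resume=True` is a made-up restart option for illustration:

```python
from typing import Any

class BidiModelTimeoutError(Exception):
    """Local copy of the timeout error so this sketch is self-contained."""

    def __init__(self, message: str, **restart_config: Any) -> None:
        super().__init__(message)
        self.restart_config = restart_config

def run_with_restarts(max_restarts: int = 2) -> list[dict]:
    """Simulate an agent loop that restarts the connection after timeouts."""
    starts: list[dict] = []
    restart_config: dict = {}
    for _ in range(max_restarts + 1):
        starts.append(restart_config)  # stands in for model.start(**restart_config)
        try:
            # Stands in for the model raising on its connection time limit.
            raise BidiModelTimeoutError("connection expired", resume=True)
        except BidiModelTimeoutError as e:
            restart_config = e.restart_config  # carried into the next start()
    return starts

print(run_with_restarts())  # [{}, {'resume': True}, {'resume': True}]
```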

__init__(message, **restart_config)

Initialize error.

Parameters:

Name Type Description Default
message str

Timeout message from model.

required
**restart_config Any

Configure restart specific behaviors in the call to model start.

{}
Source code in strands/experimental/bidi/models/model.py
def __init__(self, message: str, **restart_config: Any) -> None:
    """Initialize error.

    Args:
        message: Timeout message from model.
        **restart_config: Configure restart specific behaviors in the call to model start.
    """
    super().__init__(message)

    self.restart_config = restart_config

BidiNovaSonicModel

Bases: BidiModel

Nova Sonic implementation for bidirectional streaming.

Combines model configuration and connection state in a single class. Manages Nova Sonic's complex event sequencing, audio format conversion, and tool execution patterns while providing the standard BidiModel interface.

Attributes:

Name Type Description
_stream DuplexEventStream

Open Bedrock stream to Nova Sonic.

Source code in strands/experimental/bidi/models/nova_sonic.py
class BidiNovaSonicModel(BidiModel):
    """Nova Sonic implementation for bidirectional streaming.

    Combines model configuration and connection state in a single class.
    Manages Nova Sonic's complex event sequencing, audio format conversion, and
    tool execution patterns while providing the standard BidiModel interface.

    Attributes:
        _stream: Open Bedrock stream to Nova Sonic.
    """

    _stream: DuplexEventStream

    def __init__(
        self,
        model_id: str = "amazon.nova-sonic-v1:0",
        provider_config: dict[str, Any] | None = None,
        client_config: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize Nova Sonic bidirectional model.

        Args:
            model_id: Model identifier (default: amazon.nova-sonic-v1:0)
            provider_config: Model behavior (audio, inference settings)
            client_config: AWS authentication (boto_session OR region, not both)
            **kwargs: Reserved for future parameters.
        """
        # Store model ID
        self.model_id = model_id

        # Resolve client config with defaults
        self._client_config = self._resolve_client_config(client_config or {})

        # Resolve provider config with defaults
        self.config = self._resolve_provider_config(provider_config or {})

        # Store session and region for later use
        self._session = self._client_config["boto_session"]
        self.region = self._client_config["region"]

        # Track API-provided identifiers
        self._connection_id: str | None = None
        self._audio_content_name: str | None = None
        self._current_completion_id: str | None = None

        # Indicates if model is done generating transcript
        self._generation_stage: str | None = None

        # Ensure certain events are sent in sequence when required
        self._send_lock = asyncio.Lock()

        logger.debug("model_id=<%s> | nova sonic model initialized", model_id)

    def _resolve_client_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Resolve AWS client config (creates boto session if needed)."""
        if "boto_session" in config and "region" in config:
            raise ValueError("Cannot specify both 'boto_session' and 'region' in client_config")

        resolved = config.copy()

        # Create boto session if not provided
        if "boto_session" not in resolved:
            resolved["boto_session"] = boto3.Session()

        # Resolve region from session or use default
        if "region" not in resolved:
            resolved["region"] = resolved["boto_session"].region_name or "us-east-1"

        return resolved

    def _resolve_provider_config(self, config: dict[str, Any]) -> dict[str, Any]:
        """Merge user config with defaults (user takes precedence)."""
        default_audio: AudioConfig = {
            "input_rate": cast(AudioSampleRate, NOVA_AUDIO_INPUT_CONFIG["sampleRateHertz"]),
            "output_rate": cast(AudioSampleRate, NOVA_AUDIO_OUTPUT_CONFIG["sampleRateHertz"]),
            "channels": cast(AudioChannel, NOVA_AUDIO_INPUT_CONFIG["channelCount"]),
            "format": "pcm",
            "voice": cast(str, NOVA_AUDIO_OUTPUT_CONFIG["voiceId"]),
        }

        resolved = {
            "audio": {
                **default_audio,
                **config.get("audio", {}),
            },
            "inference": config.get("inference", {}),
        }
        return resolved

    async def start(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs: Any,
    ) -> None:
        """Establish bidirectional connection to Nova Sonic.

        Args:
            system_prompt: System instructions for the model.
            tools: List of tools available to the model.
            messages: Conversation history to initialize with.
            **kwargs: Additional configuration options.

        Raises:
            RuntimeError: If user calls start again without first stopping.
        """
        if self._connection_id:
            raise RuntimeError("model already started | call stop before starting again")

        logger.debug("nova connection starting")

        self._connection_id = str(uuid.uuid4())

        # Get credentials from boto3 session (full credential chain)
        credentials = self._session.get_credentials()

        if not credentials:
            raise ValueError(
                "no AWS credentials found. configure credentials via environment variables, "
                "credential files, IAM roles, or SSO."
            )

        # Use static resolver with credentials configured as properties
        resolver = StaticCredentialsResolver()

        config = Config(
            endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
            region=self.region,
            aws_credentials_identity_resolver=resolver,
            auth_scheme_resolver=HTTPAuthSchemeResolver(),
            auth_schemes={ShapeID("aws.auth#sigv4"): SigV4AuthScheme(service="bedrock")},
            # Configure static credentials as properties
            aws_access_key_id=credentials.access_key,
            aws_secret_access_key=credentials.secret_key,
            aws_session_token=credentials.token,
        )

        self.client = BedrockRuntimeClient(config=config)
        logger.debug("region=<%s> | nova sonic client initialized", self.region)

        self._stream = await self.client.invoke_model_with_bidirectional_stream(
            InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
        )

        init_events = self._build_initialization_events(system_prompt, tools, messages)
        logger.debug("event_count=<%d> | sending nova sonic initialization events", len(init_events))
        await self._send_nova_events(init_events)

        logger.info("connection_id=<%s> | nova sonic connection established", self._connection_id)

    def _build_initialization_events(
        self, system_prompt: str | None, tools: list[ToolSpec] | None, messages: Messages | None
    ) -> list[str]:
        """Build the sequence of initialization events."""
        tools = tools or []
        events = [
            self._get_connection_start_event(),
            self._get_prompt_start_event(tools),
            *self._get_system_prompt_events(system_prompt),
        ]

        # Add conversation history if provided
        if messages:
            events.extend(self._get_message_history_events(messages))
            logger.debug("message_count=<%d> | conversation history added to initialization", len(messages))

        return events

    def _log_event_type(self, nova_event: dict[str, Any]) -> None:
        """Log specific Nova Sonic event types for debugging."""
        if "usageEvent" in nova_event:
            logger.debug("usage=<%s> | nova usage event received", nova_event["usageEvent"])
        elif "textOutput" in nova_event:
            logger.debug("nova text output received")
        elif "toolUse" in nova_event:
            tool_use = nova_event["toolUse"]
            logger.debug(
                "tool_name=<%s>, tool_use_id=<%s> | nova tool use received",
                tool_use["toolName"],
                tool_use["toolUseId"],
            )
        elif "audioOutput" in nova_event:
            audio_content = nova_event["audioOutput"]["content"]
            audio_bytes = base64.b64decode(audio_content)
            logger.debug("audio_bytes=<%d> | nova audio output received", len(audio_bytes))

    async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
        """Receive Nova Sonic events and convert to provider-agnostic format.

        Raises:
            RuntimeError: If start has not been called.
        """
        if not self._connection_id:
            raise RuntimeError("model not started | call start before receiving")

        logger.debug("nova event stream starting")
        yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

        _, output = await self._stream.await_output()
        while True:
            try:
                event_data = await output.receive()

            except ValidationException as error:
                if "InternalErrorCode=531" in error.message:
                    # nova also times out if user is silent for 175 seconds
                    raise BidiModelTimeoutError(error.message) from error
                raise

            except ModelTimeoutException as error:
                raise BidiModelTimeoutError(error.message) from error

            if not event_data:
                continue

            nova_event = json.loads(event_data.value.bytes_.decode("utf-8"))["event"]
            self._log_event_type(nova_event)

            model_event = self._convert_nova_event(nova_event)
            if model_event:
                yield model_event

    async def send(self, content: BidiInputEvent | ToolResultEvent) -> None:
        """Unified send method for all content types. Sends the given content to Nova Sonic.

        Dispatches to appropriate internal handler based on content type.

        Args:
            content: Input event.

        Raises:
            RuntimeError: If start has not been called.
            ValueError: If the content type is not supported (e.g., image content).
        """
        if not self._connection_id:
            raise RuntimeError("model not started | call start before sending")

        if isinstance(content, BidiTextInputEvent):
            await self._send_text_content(content.text)
        elif isinstance(content, BidiAudioInputEvent):
            await self._send_audio_content(content)
        elif isinstance(content, ToolResultEvent):
            tool_result = content.get("tool_result")
            if tool_result:
                await self._send_tool_result(tool_result)
        else:
            raise ValueError(f"content_type={type(content)} | content not supported")

    async def _start_audio_connection(self) -> None:
        """Internal: Start audio input connection (call once before sending audio chunks)."""
        logger.debug("nova audio connection starting")
        self._audio_content_name = str(uuid.uuid4())

        # Build audio input configuration from config
        audio_input_config = {
            "mediaType": "audio/lpcm",
            "sampleRateHertz": self.config["audio"]["input_rate"],
            "sampleSizeBits": 16,
            "channelCount": self.config["audio"]["channels"],
            "audioType": "SPEECH",
            "encoding": "base64",
        }

        audio_content_start = json.dumps(
            {
                "event": {
                    "contentStart": {
                        "promptName": self._connection_id,
                        "contentName": self._audio_content_name,
                        "type": "AUDIO",
                        "interactive": True,
                        "role": "USER",
                        "audioInputConfiguration": audio_input_config,
                    }
                }
            }
        )

        await self._send_nova_events([audio_content_start])

    async def _send_audio_content(self, audio_input: BidiAudioInputEvent) -> None:
        """Internal: Send audio using Nova Sonic protocol-specific format."""
        # Start audio connection if not already active
        if not self._audio_content_name:
            await self._start_audio_connection()

        # Audio is already base64 encoded in the event
        # Send audio input event
        audio_event = json.dumps(
            {
                "event": {
                    "audioInput": {
                        "promptName": self._connection_id,
                        "contentName": self._audio_content_name,
                        "content": audio_input.audio,
                    }
                }
            }
        )

        await self._send_nova_events([audio_event])

    async def _end_audio_input(self) -> None:
        """Internal: End current audio input connection to trigger Nova Sonic processing."""
        if not self._audio_content_name:
            return

        logger.debug("nova audio connection ending")

        audio_content_end = json.dumps(
            {"event": {"contentEnd": {"promptName": self._connection_id, "contentName": self._audio_content_name}}}
        )

        await self._send_nova_events([audio_content_end])
        self._audio_content_name = None

    async def _send_text_content(self, text: str) -> None:
        """Internal: Send text content using Nova Sonic format."""
        content_name = str(uuid.uuid4())
        events = [
            self._get_text_content_start_event(content_name),
            self._get_text_input_event(content_name, text),
            self._get_content_end_event(content_name),
        ]
        await self._send_nova_events(events)

    async def _send_tool_result(self, tool_result: ToolResult) -> None:
        """Internal: Send tool result using Nova Sonic toolResult format."""
        tool_use_id = tool_result["toolUseId"]

        logger.debug("tool_use_id=<%s> | sending nova tool result", tool_use_id)

        # Validate content types and preserve structure
        content = tool_result.get("content", [])

        # Validate all content types are supported
        for block in content:
            if "text" not in block and "json" not in block:
                # Unsupported content type - raise error
                raise ValueError(
                    f"tool_use_id=<{tool_use_id}>, content_types=<{list(block.keys())}> | "
                    f"Content type not supported by Nova Sonic"
                )

        # Optimize for single content item - unwrap the array
        if len(content) == 1:
            result_data = cast(dict[str, Any], content[0])
        else:
            # Multiple items - send as array
            result_data = {"content": content}

        content_name = str(uuid.uuid4())
        events = [
            self._get_tool_content_start_event(content_name, tool_use_id),
            self._get_tool_result_event(content_name, result_data),
            self._get_content_end_event(content_name),
        ]
        await self._send_nova_events(events)

    async def stop(self) -> None:
        """Close Nova Sonic connection with proper cleanup sequence."""
        logger.debug("nova connection cleanup starting")

        async def stop_events() -> None:
            if not self._connection_id:
                return

            await self._end_audio_input()
            cleanup_events = [self._get_prompt_end_event(), self._get_connection_end_event()]
            await self._send_nova_events(cleanup_events)

        async def stop_stream() -> None:
            if not hasattr(self, "_stream"):
                return

            await self._stream.close()

        async def stop_connection() -> None:
            self._connection_id = None

        await stop_all(stop_events, stop_stream, stop_connection)

        logger.debug("nova connection closed")

    def _convert_nova_event(self, nova_event: dict[str, Any]) -> BidiOutputEvent | None:
        """Convert Nova Sonic events to TypedEvent format."""
        # Handle completion start - track completionId
        if "completionStart" in nova_event:
            completion_data = nova_event["completionStart"]
            self._current_completion_id = completion_data.get("completionId")
            logger.debug("completion_id=<%s> | nova completion started", self._current_completion_id)
            return None

        # Handle completion end
        if "completionEnd" in nova_event:
            completion_data = nova_event["completionEnd"]
            completion_id = completion_data.get("completionId", self._current_completion_id)
            stop_reason = completion_data.get("stopReason", "END_TURN")

            event = BidiResponseCompleteEvent(
                response_id=completion_id or str(uuid.uuid4()),  # Fallback to UUID if missing
                stop_reason="interrupted" if stop_reason == "INTERRUPTED" else "complete",
            )

            # Clear completion tracking
            self._current_completion_id = None
            return event

        # Handle audio output
        if "audioOutput" in nova_event:
            # Audio is already base64 string from Nova Sonic
            audio_content = nova_event["audioOutput"]["content"]
            return BidiAudioStreamEvent(
                audio=audio_content,
                format="pcm",
                sample_rate=cast(AudioSampleRate, self.config["audio"]["output_rate"]),
                channels=cast(AudioChannel, self.config["audio"]["channels"]),
            )

        # Handle text output (transcripts)
        elif "textOutput" in nova_event:
            text_output = nova_event["textOutput"]
            text_content = text_output["content"]
            # Check for Nova Sonic interruption pattern
            if '{ "interrupted" : true }' in text_content:
                logger.debug("nova interruption detected in text output")
                return BidiInterruptionEvent(reason="user_speech")

            return BidiTranscriptStreamEvent(
                delta={"text": text_content},
                text=text_content,
                role=text_output["role"].lower(),
                is_final=self._generation_stage == "FINAL",
                current_transcript=text_content,
            )

        # Handle tool use
        if "toolUse" in nova_event:
            tool_use = nova_event["toolUse"]
            tool_use_event: ToolUse = {
                "toolUseId": tool_use["toolUseId"],
                "name": tool_use["toolName"],
                "input": json.loads(tool_use["content"]),
            }
            # Return ToolUseStreamEvent - cast to dict for type compatibility
            return ToolUseStreamEvent(delta={"toolUse": tool_use_event}, current_tool_use=dict(tool_use_event))

        # Handle interruption
        if nova_event.get("stopReason") == "INTERRUPTED":
            logger.debug("nova interruption detected via stop reason")
            return BidiInterruptionEvent(reason="user_speech")

        # Handle usage events - convert to multimodal usage format
        if "usageEvent" in nova_event:
            usage_data = nova_event["usageEvent"]
            total_input = usage_data.get("totalInputTokens", 0)
            total_output = usage_data.get("totalOutputTokens", 0)

            return BidiUsageEvent(
                input_tokens=total_input,
                output_tokens=total_output,
                total_tokens=usage_data.get("totalTokens", total_input + total_output),
            )

        # Handle content start events (emit response start)
        if "contentStart" in nova_event:
            content_data = nova_event["contentStart"]
            if content_data["type"] == "TEXT":
                self._generation_stage = json.loads(content_data["additionalModelFields"])["generationStage"]

            # Emit response start event using API-provided completionId
            # completionId should already be tracked from completionStart event
            return BidiResponseStartEvent(
                response_id=self._current_completion_id or str(uuid.uuid4())  # Fallback to UUID if missing
            )

        if "contentEnd" in nova_event:
            self._generation_stage = None

        # Ignore all other events
        return None

    def _get_connection_start_event(self) -> str:
        """Generate Nova Sonic connection start event."""
        inference_config = {_NOVA_INFERENCE_CONFIG_KEYS[key]: value for key, value in self.config["inference"].items()}
        return json.dumps({"event": {"sessionStart": {"inferenceConfiguration": inference_config}}})

    def _get_prompt_start_event(self, tools: list[ToolSpec]) -> str:
        """Generate Nova Sonic prompt start event with tool configuration."""
        # Build audio output configuration from config
        audio_output_config = {
            "mediaType": "audio/lpcm",
            "sampleRateHertz": self.config["audio"]["output_rate"],
            "sampleSizeBits": 16,
            "channelCount": self.config["audio"]["channels"],
            "voiceId": self.config["audio"].get("voice", "matthew"),
            "encoding": "base64",
            "audioType": "SPEECH",
        }

        prompt_start_event: dict[str, Any] = {
            "event": {
                "promptStart": {
                    "promptName": self._connection_id,
                    "textOutputConfiguration": NOVA_TEXT_CONFIG,
                    "audioOutputConfiguration": audio_output_config,
                }
            }
        }

        if tools:
            tool_config = self._build_tool_configuration(tools)
            prompt_start_event["event"]["promptStart"]["toolUseOutputConfiguration"] = NOVA_TOOL_CONFIG
            prompt_start_event["event"]["promptStart"]["toolConfiguration"] = {"tools": tool_config}

        return json.dumps(prompt_start_event)

    def _build_tool_configuration(self, tools: list[ToolSpec]) -> list[dict[str, Any]]:
        """Build tool configuration from tool specs."""
        tool_config: list[dict[str, Any]] = []
        for tool in tools:
            input_schema = (
                {"json": json.dumps(tool["inputSchema"]["json"])}
                if "json" in tool["inputSchema"]
                else {"json": json.dumps(tool["inputSchema"])}
            )

            tool_config.append(
                {"toolSpec": {"name": tool["name"], "description": tool["description"], "inputSchema": input_schema}}
            )
        return tool_config

    def _get_system_prompt_events(self, system_prompt: str | None) -> list[str]:
        """Generate system prompt events."""
        content_name = str(uuid.uuid4())
        return [
            self._get_text_content_start_event(content_name, "SYSTEM"),
            self._get_text_input_event(content_name, system_prompt or ""),
            self._get_content_end_event(content_name),
        ]

    def _get_message_history_events(self, messages: Messages) -> list[str]:
        """Generate conversation history events from agent messages.

        Converts agent message history to Nova Sonic format following the
        contentStart/textInput/contentEnd pattern for each message.

        Args:
            messages: List of conversation messages with role and content.

        Returns:
            List of JSON event strings for Nova Sonic.
        """
        events = []

        for message in messages:
            role = message["role"].upper()  # Convert to ASSISTANT or USER
            content_blocks = message.get("content", [])

            # Extract text content from content blocks
            text_parts = []
            for block in content_blocks:
                if "text" in block:
                    text_parts.append(block["text"])

            # Combine all text parts
            if text_parts:
                combined_text = "\n".join(text_parts)
                content_name = str(uuid.uuid4())

                # Add contentStart, textInput, and contentEnd events
                events.extend(
                    [
                        self._get_text_content_start_event(content_name, role),
                        self._get_text_input_event(content_name, combined_text),
                        self._get_content_end_event(content_name),
                    ]
                )

        return events

    def _get_text_content_start_event(self, content_name: str, role: str = "USER") -> str:
        """Generate text content start event."""
        return json.dumps(
            {
                "event": {
                    "contentStart": {
                        "promptName": self._connection_id,
                        "contentName": content_name,
                        "type": "TEXT",
                        "role": role,
                        "interactive": True,
                        "textInputConfiguration": NOVA_TEXT_CONFIG,
                    }
                }
            }
        )

    def _get_tool_content_start_event(self, content_name: str, tool_use_id: str) -> str:
        """Generate tool content start event."""
        return json.dumps(
            {
                "event": {
                    "contentStart": {
                        "promptName": self._connection_id,
                        "contentName": content_name,
                        "interactive": False,
                        "type": "TOOL",
                        "role": "TOOL",
                        "toolResultInputConfiguration": {
                            "toolUseId": tool_use_id,
                            "type": "TEXT",
                            "textInputConfiguration": NOVA_TEXT_CONFIG,
                        },
                    }
                }
            }
        )

    def _get_text_input_event(self, content_name: str, text: str) -> str:
        """Generate text input event."""
        return json.dumps(
            {"event": {"textInput": {"promptName": self._connection_id, "contentName": content_name, "content": text}}}
        )

    def _get_tool_result_event(self, content_name: str, result: dict[str, Any]) -> str:
        """Generate tool result event."""
        return json.dumps(
            {
                "event": {
                    "toolResult": {
                        "promptName": self._connection_id,
                        "contentName": content_name,
                        "content": json.dumps(result),
                    }
                }
            }
        )

    def _get_content_end_event(self, content_name: str) -> str:
        """Generate content end event."""
        return json.dumps({"event": {"contentEnd": {"promptName": self._connection_id, "contentName": content_name}}})

    def _get_prompt_end_event(self) -> str:
        """Generate prompt end event."""
        return json.dumps({"event": {"promptEnd": {"promptName": self._connection_id}}})

    def _get_connection_end_event(self) -> str:
        """Generate connection end event."""
        return json.dumps({"event": {"connectionEnd": {}}})

    async def _send_nova_events(self, events: list[str]) -> None:
        """Send event JSON string to Nova Sonic stream.

        A lock is used to send events in sequence when required (e.g., tool result start, content, and end).

        Args:
            events: Jsonified events.
        """
        async with self._send_lock:
            for event in events:
                bytes_data = event.encode("utf-8")
                chunk = InvokeModelWithBidirectionalStreamInputChunk(
                    value=BidirectionalInputPayloadPart(bytes_=bytes_data)
                )
                await self._stream.input_stream.send(chunk)
                logger.debug("nova sonic event sent successfully")
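
The hierarchical event framing the builder methods above produce can be sketched with only the standard library. This is a minimal illustration, not the provider's API: the prompt/content names and the `maxTokens` value are placeholders, and the event shapes mirror `_get_connection_start_event`, `_get_prompt_start_event`, and the text content triple from the listing.

```python
import json
import uuid

# Sketch of the init sequence built by _build_initialization_events:
# sessionStart -> promptStart -> contentStart/textInput/contentEnd.
prompt_name = str(uuid.uuid4())
content_name = str(uuid.uuid4())

events = [
    json.dumps({"event": {"sessionStart": {"inferenceConfiguration": {"maxTokens": 1024}}}}),
    json.dumps({"event": {"promptStart": {
        "promptName": prompt_name,
        "textOutputConfiguration": {"mediaType": "text/plain"},
    }}}),
    json.dumps({"event": {"contentStart": {
        "promptName": prompt_name,
        "contentName": content_name,
        "type": "TEXT",
        "role": "SYSTEM",
        "interactive": True,
        "textInputConfiguration": {"mediaType": "text/plain"},
    }}}),
    json.dumps({"event": {"textInput": {
        "promptName": prompt_name,
        "contentName": content_name,
        "content": "You are a helpful voice assistant.",
    }}}),
    json.dumps({"event": {"contentEnd": {"promptName": prompt_name, "contentName": content_name}}}),
]

# Each entry is a standalone JSON document sent as one stream chunk.
payloads = [json.loads(e)["event"] for e in events]
print([next(iter(p)) for p in payloads])
# -> ['sessionStart', 'promptStart', 'contentStart', 'textInput', 'contentEnd']
```

Ordering matters here: `contentStart`/`textInput`/`contentEnd` always travel as a unit under a single `contentName`, which is why `_send_nova_events` serializes sends behind a lock.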

__init__(model_id='amazon.nova-sonic-v1:0', provider_config=None, client_config=None, **kwargs)

Initialize Nova Sonic bidirectional model.

Parameters:

- model_id (str, default 'amazon.nova-sonic-v1:0'): Model identifier.
- provider_config (dict[str, Any] | None, default None): Model behavior (audio, inference settings).
- client_config (dict[str, Any] | None, default None): AWS authentication (boto_session OR region, not both).
- **kwargs (Any, default {}): Reserved for future parameters.
Source code in strands/experimental/bidi/models/nova_sonic.py
def __init__(
    self,
    model_id: str = "amazon.nova-sonic-v1:0",
    provider_config: dict[str, Any] | None = None,
    client_config: dict[str, Any] | None = None,
    **kwargs: Any,
) -> None:
    """Initialize Nova Sonic bidirectional model.

    Args:
        model_id: Model identifier (default: amazon.nova-sonic-v1:0)
        provider_config: Model behavior (audio, inference settings)
        client_config: AWS authentication (boto_session OR region, not both)
        **kwargs: Reserved for future parameters.
    """
    # Store model ID
    self.model_id = model_id

    # Resolve client config with defaults
    self._client_config = self._resolve_client_config(client_config or {})

    # Resolve provider config with defaults
    self.config = self._resolve_provider_config(provider_config or {})

    # Store session and region for later use
    self._session = self._client_config["boto_session"]
    self.region = self._client_config["region"]

    # Track API-provided identifiers
    self._connection_id: str | None = None
    self._audio_content_name: str | None = None
    self._current_completion_id: str | None = None

    # Tracks the transcript generation stage ("FINAL" once the model finishes a transcript)
    self._generation_stage: str | None = None

    # Ensure certain events are sent in sequence when required
    self._send_lock = asyncio.Lock()

    logger.debug("model_id=<%s> | nova sonic model initialized", model_id)
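
The provider-config merge in `_resolve_provider_config` gives user values precedence over the Nova defaults via dict unpacking. A stdlib-only sketch of that merge semantics, with a hypothetical user override (the default values below are taken from `NOVA_AUDIO_INPUT_CONFIG`/`NOVA_AUDIO_OUTPUT_CONFIG`; "tiffany" is just an illustrative voice id):

```python
# Later unpacked dicts override earlier ones, so user values win.
default_audio = {"input_rate": 16000, "output_rate": 16000, "channels": 1, "format": "pcm", "voice": "matthew"}
user_config = {"audio": {"voice": "tiffany", "output_rate": 24000}}  # hypothetical overrides

resolved = {**default_audio, **user_config.get("audio", {})}
print(resolved["voice"], resolved["output_rate"], resolved["input_rate"])
# -> tiffany 24000 16000
```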

receive() async

Receive Nova Sonic events and convert to provider-agnostic format.

Raises:

- RuntimeError: If start has not been called.

Source code in strands/experimental/bidi/models/nova_sonic.py
async def receive(self) -> AsyncGenerator[BidiOutputEvent, None]:
    """Receive Nova Sonic events and convert to provider-agnostic format.

    Raises:
        RuntimeError: If start has not been called.
    """
    if not self._connection_id:
        raise RuntimeError("model not started | call start before receiving")

    logger.debug("nova event stream starting")
    yield BidiConnectionStartEvent(connection_id=self._connection_id, model=self.model_id)

    _, output = await self._stream.await_output()
    while True:
        try:
            event_data = await output.receive()

        except ValidationException as error:
            if "InternalErrorCode=531" in error.message:
                # nova also times out if user is silent for 175 seconds
                raise BidiModelTimeoutError(error.message) from error
            raise

        except ModelTimeoutException as error:
            raise BidiModelTimeoutError(error.message) from error

        if not event_data:
            continue

        nova_event = json.loads(event_data.value.bytes_.decode("utf-8"))["event"]
        self._log_event_type(nova_event)

        model_event = self._convert_nova_event(nova_event)
        if model_event:
            yield model_event
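
Each chunk received off the stream is a UTF-8 JSON envelope, and audio output arrives base64-encoded inside it. A minimal sketch of the decode path in `receive` and `_log_event_type`, using a fabricated chunk (the PCM bytes here are synthetic, not real model output):

```python
import base64
import json

# Fake raw chunk as it might arrive off the bidirectional stream.
pcm = b"\x00\x01" * 160  # 160 synthetic little-endian int16 samples
raw = json.dumps({"event": {"audioOutput": {"content": base64.b64encode(pcm).decode("ascii")}}}).encode("utf-8")

# Mirror of: json.loads(event_data.value.bytes_.decode("utf-8"))["event"]
nova_event = json.loads(raw.decode("utf-8"))["event"]
audio_bytes = base64.b64decode(nova_event["audioOutput"]["content"])
print(len(audio_bytes))
# -> 320
```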

send(content) async

Unified send method for all content types. Sends the given content to Nova Sonic.

Dispatches to appropriate internal handler based on content type.

Parameters:

- content (BidiInputEvent | ToolResultEvent): Input event. Required.

Raises:

- RuntimeError: If start has not been called.
- ValueError: If the content type is not supported (e.g., image content).

Source code in strands/experimental/bidi/models/nova_sonic.py
async def send(self, content: BidiInputEvent | ToolResultEvent) -> None:
    """Unified send method for all content types. Sends the given content to Nova Sonic.

    Dispatches to appropriate internal handler based on content type.

    Args:
        content: Input event.

    Raises:
        RuntimeError: If start has not been called.
        ValueError: If the content type is not supported (e.g., image content).
    """
    if not self._connection_id:
        raise RuntimeError("model not started | call start before sending")

    if isinstance(content, BidiTextInputEvent):
        await self._send_text_content(content.text)
    elif isinstance(content, BidiAudioInputEvent):
        await self._send_audio_content(content)
    elif isinstance(content, ToolResultEvent):
        tool_result = content.get("tool_result")
        if tool_result:
            await self._send_tool_result(tool_result)
    else:
        raise ValueError(f"content_type={type(content)} | content not supported")
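
Because the audio configurations declare `"encoding": "base64"`, raw PCM must be base64-encoded before it is framed as an `audioInput` event. A stdlib-only sketch of framing one microphone chunk, assuming 16 kHz mono 16-bit PCM; the prompt/content names are illustrative stand-ins for the UUIDs the model tracks:

```python
import base64
import json

chunk = bytes(640)  # 20 ms of silence: 320 samples at 16 kHz, 2 bytes each

audio_event = json.dumps({"event": {"audioInput": {
    "promptName": "prompt-1",
    "contentName": "audio-1",
    "content": base64.b64encode(chunk).decode("ascii"),
}}})

# Round-trip check: the receiver recovers the original PCM bytes.
decoded = base64.b64decode(json.loads(audio_event)["event"]["audioInput"]["content"])
print(len(decoded) == len(chunk))
# -> True
```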

start(system_prompt=None, tools=None, messages=None, **kwargs) async

Establish bidirectional connection to Nova Sonic.

Parameters:

- system_prompt (str | None, default None): System instructions for the model.
- tools (list[ToolSpec] | None, default None): List of tools available to the model.
- messages (Messages | None, default None): Conversation history to initialize with.
- **kwargs (Any, default {}): Additional configuration options.

Raises:

- RuntimeError: If user calls start again without first stopping.

Source code in strands/experimental/bidi/models/nova_sonic.py
async def start(
    self,
    system_prompt: str | None = None,
    tools: list[ToolSpec] | None = None,
    messages: Messages | None = None,
    **kwargs: Any,
) -> None:
    """Establish bidirectional connection to Nova Sonic.

    Args:
        system_prompt: System instructions for the model.
        tools: List of tools available to the model.
        messages: Conversation history to initialize with.
        **kwargs: Additional configuration options.

    Raises:
        RuntimeError: If user calls start again without first stopping.
    """
    if self._connection_id:
        raise RuntimeError("model already started | call stop before starting again")

    logger.debug("nova connection starting")

    self._connection_id = str(uuid.uuid4())

    # Get credentials from boto3 session (full credential chain)
    credentials = self._session.get_credentials()

    if not credentials:
        raise ValueError(
            "no AWS credentials found. configure credentials via environment variables, "
            "credential files, IAM roles, or SSO."
        )

    # Use static resolver with credentials configured as properties
    resolver = StaticCredentialsResolver()

    config = Config(
        endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
        region=self.region,
        aws_credentials_identity_resolver=resolver,
        auth_scheme_resolver=HTTPAuthSchemeResolver(),
        auth_schemes={ShapeID("aws.auth#sigv4"): SigV4AuthScheme(service="bedrock")},
        # Configure static credentials as properties
        aws_access_key_id=credentials.access_key,
        aws_secret_access_key=credentials.secret_key,
        aws_session_token=credentials.token,
    )

    self.client = BedrockRuntimeClient(config=config)
    logger.debug("region=<%s> | nova sonic client initialized", self.region)

    self._stream = await self.client.invoke_model_with_bidirectional_stream(
        InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
    )

    init_events = self._build_initialization_events(system_prompt, tools, messages)
    logger.debug("event_count=<%d> | sending nova sonic initialization events", len(init_events))
    await self._send_nova_events(init_events)

    logger.info("connection_id=<%s> | nova sonic connection established", self._connection_id)

stop() async

Close Nova Sonic connection with proper cleanup sequence.

Source code in strands/experimental/bidi/models/nova_sonic.py
async def stop(self) -> None:
    """Close Nova Sonic connection with proper cleanup sequence."""
    logger.debug("nova connection cleanup starting")

    async def stop_events() -> None:
        if not self._connection_id:
            return

        await self._end_audio_input()
        cleanup_events = [self._get_prompt_end_event(), self._get_connection_end_event()]
        await self._send_nova_events(cleanup_events)

    async def stop_stream() -> None:
        if not hasattr(self, "_stream"):
            return

        await self._stream.close()

    async def stop_connection() -> None:
        self._connection_id = None

    await stop_all(stop_events, stop_stream, stop_connection)

    logger.debug("nova connection closed")

BidiResponseCompleteEvent

Bases: TypedEvent

Model finished generating response.

Parameters:

Name Type Description Default
response_id str

ID of the response that completed (matches response.start).

required
stop_reason StopReason

Why the response ended.

required
Source code in strands/experimental/bidi/types/events.py
class BidiResponseCompleteEvent(TypedEvent):
    """Model finished generating response.

    Parameters:
        response_id: ID of the response that completed (matches response.start).
        stop_reason: Why the response ended.
    """

    def __init__(
        self,
        response_id: str,
        stop_reason: StopReason,
    ):
        """Initialize response complete event."""
        super().__init__(
            {
                "type": "bidi_response_complete",
                "response_id": response_id,
                "stop_reason": stop_reason,
            }
        )

    @property
    def response_id(self) -> str:
        """Unique identifier for this response."""
        return cast(str, self["response_id"])

    @property
    def stop_reason(self) -> StopReason:
        """Why the response ended."""
        return cast(StopReason, self["stop_reason"])

response_id property

Unique identifier for this response.

stop_reason property

Why the response ended.

__init__(response_id, stop_reason)

Initialize response complete event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    response_id: str,
    stop_reason: StopReason,
):
    """Initialize response complete event."""
    super().__init__(
        {
            "type": "bidi_response_complete",
            "response_id": response_id,
            "stop_reason": stop_reason,
        }
    )
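TypedEvent subclasses are dict-backed, so each event is just a typed payload. A minimal sketch of the dictionaries that BidiResponseCompleteEvent and BidiResponseStartEvent wrap, and how a consumer might correlate them (the "end_turn" stop reason is an assumed value, not taken from this page):

```python
# Hypothetical payloads mirroring the dict shapes built in the source above.
start_payload = {"type": "bidi_response_start", "response_id": "resp-1"}
complete_payload = {
    "type": "bidi_response_complete",
    "response_id": "resp-1",
    "stop_reason": "end_turn",  # assumed StopReason value
}

# A consumer matches a completion to its start via response_id.
is_same_response = complete_payload["response_id"] == start_payload["response_id"]
```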

BidiResponseStartEvent

Bases: TypedEvent

Model starts generating a response.

Parameters:

Name Type Description Default
response_id str

Unique identifier for this response (used in response.complete).

required
Source code in strands/experimental/bidi/types/events.py
class BidiResponseStartEvent(TypedEvent):
    """Model starts generating a response.

    Parameters:
        response_id: Unique identifier for this response (used in response.complete).
    """

    def __init__(self, response_id: str):
        """Initialize response start event."""
        super().__init__({"type": "bidi_response_start", "response_id": response_id})

    @property
    def response_id(self) -> str:
        """Unique identifier for this response."""
        return cast(str, self["response_id"])

response_id property

Unique identifier for this response.

__init__(response_id)

Initialize response start event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, response_id: str):
    """Initialize response start event."""
    super().__init__({"type": "bidi_response_start", "response_id": response_id})

BidiTextInputEvent

Bases: TypedEvent

Text input event for sending text to the model.

Used for sending text content through the send() method.

Parameters:

Name Type Description Default
text str

The text content to send to the model.

required
role Role

The role of the message sender (default: "user").

'user'
Source code in strands/experimental/bidi/types/events.py
class BidiTextInputEvent(TypedEvent):
    """Text input event for sending text to the model.

    Used for sending text content through the send() method.

    Parameters:
        text: The text content to send to the model.
        role: The role of the message sender (default: "user").
    """

    def __init__(self, text: str, role: Role = "user"):
        """Initialize text input event."""
        super().__init__(
            {
                "type": "bidi_text_input",
                "text": text,
                "role": role,
            }
        )

    @property
    def text(self) -> str:
        """The text content to send to the model."""
        return cast(str, self["text"])

    @property
    def role(self) -> Role:
        """The role of the message sender."""
        return cast(Role, self["role"])

role property

The role of the message sender.

text property

The text content to send to the model.

__init__(text, role='user')

Initialize text input event.

Source code in strands/experimental/bidi/types/events.py
def __init__(self, text: str, role: Role = "user"):
    """Initialize text input event."""
    super().__init__(
        {
            "type": "bidi_text_input",
            "text": text,
            "role": role,
        }
    )
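A sketch of the payload this constructor builds, showing that role defaults to "user" (the helper function here is illustrative, not part of the library):

```python
def make_text_input(text: str, role: str = "user") -> dict:
    """Mirror of the dict BidiTextInputEvent builds (role defaults to "user")."""
    return {"type": "bidi_text_input", "text": text, "role": role}

# Example payload a caller would pass through send().
event = make_text_input("What's the weather in Seattle?")
```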

BidiTranscriptStreamEvent

Bases: ModelStreamEvent

Audio transcription streaming (user or assistant speech).

Supports incremental transcript updates for providers that send partial transcripts before the final version.

Parameters:

Name Type Description Default
delta ContentBlockDelta

The incremental transcript change (ContentBlockDelta).

required
text str

The delta text (same as delta content for convenience).

required
role Role

Who is speaking ("user" or "assistant").

required
is_final bool

Whether this is the final/complete transcript.

required
current_transcript str | None

The accumulated transcript text so far (None for first delta).

None
Source code in strands/experimental/bidi/types/events.py
class BidiTranscriptStreamEvent(ModelStreamEvent):
    """Audio transcription streaming (user or assistant speech).

    Supports incremental transcript updates for providers that send partial
    transcripts before the final version.

    Parameters:
        delta: The incremental transcript change (ContentBlockDelta).
        text: The delta text (same as delta content for convenience).
        role: Who is speaking ("user" or "assistant").
        is_final: Whether this is the final/complete transcript.
        current_transcript: The accumulated transcript text so far (None for first delta).
    """

    def __init__(
        self,
        delta: ContentBlockDelta,
        text: str,
        role: Role,
        is_final: bool,
        current_transcript: str | None = None,
    ):
        """Initialize transcript stream event."""
        super().__init__(
            {
                "type": "bidi_transcript_stream",
                "delta": delta,
                "text": text,
                "role": role,
                "is_final": is_final,
                "current_transcript": current_transcript,
            }
        )

    @property
    def delta(self) -> ContentBlockDelta:
        """The incremental transcript change."""
        return cast(ContentBlockDelta, self["delta"])

    @property
    def text(self) -> str:
        """The text content to send to the model."""
        return cast(str, self["text"])

    @property
    def role(self) -> Role:
        """The role of the message sender."""
        return cast(Role, self["role"])

    @property
    def is_final(self) -> bool:
        """Whether this is the final/complete transcript."""
        return cast(bool, self["is_final"])

    @property
    def current_transcript(self) -> str | None:
        """The accumulated transcript text so far."""
        return cast(str | None, self.get("current_transcript"))

current_transcript property

The accumulated transcript text so far.

delta property

The incremental transcript change.

is_final property

Whether this is the final/complete transcript.

role property

Who is speaking ("user" or "assistant").

text property

The delta transcript text.

__init__(delta, text, role, is_final, current_transcript=None)

Initialize transcript stream event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    delta: ContentBlockDelta,
    text: str,
    role: Role,
    is_final: bool,
    current_transcript: str | None = None,
):
    """Initialize transcript stream event."""
    super().__init__(
        {
            "type": "bidi_transcript_stream",
            "delta": delta,
            "text": text,
            "role": role,
            "is_final": is_final,
            "current_transcript": current_transcript,
        }
    )
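A sketch of how a consumer might accumulate partial transcripts from dict-shaped events like the ones BidiTranscriptStreamEvent wraps (the "delta" field is omitted here for brevity; the payloads are illustrative):

```python
# Two incremental updates followed by a final marker, as a provider that
# sends partial transcripts might emit them.
events = [
    {"type": "bidi_transcript_stream", "text": "Hello", "role": "user",
     "is_final": False, "current_transcript": None},
    {"type": "bidi_transcript_stream", "text": " world", "role": "user",
     "is_final": True, "current_transcript": "Hello"},
]

# Accumulate delta text until is_final is seen.
transcript = ""
for event in events:
    transcript += event["text"]
    if event["is_final"]:
        break
```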

BidiUsageEvent

Bases: TypedEvent

Token usage event with modality breakdown for bidirectional streaming.

Tracks token consumption across different modalities (audio, text, images) during bidirectional streaming sessions.

Parameters:

Name Type Description Default
input_tokens int

Total tokens used for all input modalities.

required
output_tokens int

Total tokens used for all output modalities.

required
total_tokens int

Sum of input and output tokens.

required
modality_details list[ModalityUsage] | None

Optional list of token usage per modality.

None
cache_read_input_tokens int | None

Optional tokens read from cache.

None
cache_write_input_tokens int | None

Optional tokens written to cache.

None
Source code in strands/experimental/bidi/types/events.py
class BidiUsageEvent(TypedEvent):
    """Token usage event with modality breakdown for bidirectional streaming.

    Tracks token consumption across different modalities (audio, text, images)
    during bidirectional streaming sessions.

    Parameters:
        input_tokens: Total tokens used for all input modalities.
        output_tokens: Total tokens used for all output modalities.
        total_tokens: Sum of input and output tokens.
        modality_details: Optional list of token usage per modality.
        cache_read_input_tokens: Optional tokens read from cache.
        cache_write_input_tokens: Optional tokens written to cache.
    """

    def __init__(
        self,
        input_tokens: int,
        output_tokens: int,
        total_tokens: int,
        modality_details: list[ModalityUsage] | None = None,
        cache_read_input_tokens: int | None = None,
        cache_write_input_tokens: int | None = None,
    ):
        """Initialize usage event."""
        data: dict[str, Any] = {
            "type": "bidi_usage",
            "inputTokens": input_tokens,
            "outputTokens": output_tokens,
            "totalTokens": total_tokens,
        }
        if modality_details is not None:
            data["modality_details"] = modality_details
        if cache_read_input_tokens is not None:
            data["cacheReadInputTokens"] = cache_read_input_tokens
        if cache_write_input_tokens is not None:
            data["cacheWriteInputTokens"] = cache_write_input_tokens
        super().__init__(data)

    @property
    def input_tokens(self) -> int:
        """Total tokens used for all input modalities."""
        return cast(int, self["inputTokens"])

    @property
    def output_tokens(self) -> int:
        """Total tokens used for all output modalities."""
        return cast(int, self["outputTokens"])

    @property
    def total_tokens(self) -> int:
        """Sum of input and output tokens."""
        return cast(int, self["totalTokens"])

    @property
    def modality_details(self) -> list[ModalityUsage]:
        """Optional list of token usage per modality."""
        return cast(list[ModalityUsage], self.get("modality_details", []))

    @property
    def cache_read_input_tokens(self) -> int | None:
        """Optional tokens read from cache."""
        return cast(int | None, self.get("cacheReadInputTokens"))

    @property
    def cache_write_input_tokens(self) -> int | None:
        """Optional tokens written to cache."""
        return cast(int | None, self.get("cacheWriteInputTokens"))

cache_read_input_tokens property

Optional tokens read from cache.

cache_write_input_tokens property

Optional tokens written to cache.

input_tokens property

Total tokens used for all input modalities.

modality_details property

Optional list of token usage per modality.

output_tokens property

Total tokens used for all output modalities.

total_tokens property

Sum of input and output tokens.

__init__(input_tokens, output_tokens, total_tokens, modality_details=None, cache_read_input_tokens=None, cache_write_input_tokens=None)

Initialize usage event.

Source code in strands/experimental/bidi/types/events.py
def __init__(
    self,
    input_tokens: int,
    output_tokens: int,
    total_tokens: int,
    modality_details: list[ModalityUsage] | None = None,
    cache_read_input_tokens: int | None = None,
    cache_write_input_tokens: int | None = None,
):
    """Initialize usage event."""
    data: dict[str, Any] = {
        "type": "bidi_usage",
        "inputTokens": input_tokens,
        "outputTokens": output_tokens,
        "totalTokens": total_tokens,
    }
    if modality_details is not None:
        data["modality_details"] = modality_details
    if cache_read_input_tokens is not None:
        data["cacheReadInputTokens"] = cache_read_input_tokens
    if cache_write_input_tokens is not None:
        data["cacheWriteInputTokens"] = cache_write_input_tokens
    super().__init__(data)
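A sketch of the resulting payload. Note the camelCase token keys, and that the optional cache and modality fields are omitted from the dict entirely when None rather than stored as null:

```python
# Hypothetical usage payload with only the required fields set.
usage = {
    "type": "bidi_usage",
    "inputTokens": 120,
    "outputTokens": 45,
    "totalTokens": 165,
}

# The invariant the constructor parameters imply.
assert usage["totalTokens"] == usage["inputTokens"] + usage["outputTokens"]
# Optional fields are absent, not None.
assert "cacheReadInputTokens" not in usage
```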

ToolResult

Bases: TypedDict

Result of a tool execution.

Attributes:

Name Type Description
content list[ToolResultContent]

List of result content returned by the tool.

status ToolResultStatus

The status of the tool execution ("success" or "error").

toolUseId str

The unique identifier of the tool use request that produced this result.

Source code in strands/types/tools.py
class ToolResult(TypedDict):
    """Result of a tool execution.

    Attributes:
        content: List of result content returned by the tool.
        status: The status of the tool execution ("success" or "error").
        toolUseId: The unique identifier of the tool use request that produced this result.
    """

    content: list[ToolResultContent]
    status: ToolResultStatus
    toolUseId: str
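A hypothetical ToolResult literal for a weather tool; the `{"text": ...}` content shape is an assumed ToolResultContent variant:

```python
# Result echoing the toolUseId of the originating request.
tool_result = {
    "toolUseId": "tooluse_abc123",
    "status": "success",
    "content": [{"text": "72 degrees and sunny"}],  # assumed content shape
}
```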

ToolResultEvent

Bases: TypedEvent

Event emitted when a tool execution completes.

Source code in strands/types/_events.py
class ToolResultEvent(TypedEvent):
    """Event emitted when a tool execution completes."""

    def __init__(self, tool_result: ToolResult) -> None:
        """Initialize with the completed tool result.

        Args:
            tool_result: Final result from the tool execution
        """
        super().__init__({"type": "tool_result", "tool_result": tool_result})

    @property
    def tool_use_id(self) -> str:
        """The toolUseId associated with this result."""
        return cast(ToolResult, self.get("tool_result"))["toolUseId"]

    @property
    def tool_result(self) -> ToolResult:
        """Final result from the completed tool execution."""
        return cast(ToolResult, self.get("tool_result"))

    @property
    @override
    def is_callback_event(self) -> bool:
        return False

tool_result property

Final result from the completed tool execution.

tool_use_id property

The toolUseId associated with this result.

__init__(tool_result)

Initialize with the completed tool result.

Parameters:

Name Type Description Default
tool_result ToolResult

Final result from the tool execution

required
Source code in strands/types/_events.py
def __init__(self, tool_result: ToolResult) -> None:
    """Initialize with the completed tool result.

    Args:
        tool_result: Final result from the tool execution
    """
    super().__init__({"type": "tool_result", "tool_result": tool_result})

ToolSpec

Bases: TypedDict

Specification for a tool that can be used by an agent.

Attributes:

Name Type Description
description str

A human-readable description of what the tool does.

inputSchema JSONSchema

JSON Schema defining the expected input parameters.

name str

The unique name of the tool.

outputSchema NotRequired[JSONSchema]

Optional JSON Schema defining the expected output format. Note: Not all model providers support this field. Providers that don't support it should filter it out before sending to their API.

Source code in strands/types/tools.py
class ToolSpec(TypedDict):
    """Specification for a tool that can be used by an agent.

    Attributes:
        description: A human-readable description of what the tool does.
        inputSchema: JSON Schema defining the expected input parameters.
        name: The unique name of the tool.
        outputSchema: Optional JSON Schema defining the expected output format.
            Note: Not all model providers support this field. Providers that don't
            support it should filter it out before sending to their API.
    """

    description: str
    inputSchema: JSONSchema
    name: str
    outputSchema: NotRequired[JSONSchema]
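A hypothetical ToolSpec literal; the inputSchema body is assumed to be a plain JSON Schema mapping, and outputSchema is left out since it is NotRequired and not all providers support it:

```python
# Illustrative spec for a weather tool.
weather_spec = {
    "name": "get_weather",
    "description": "Return current weather for a city.",
    "inputSchema": {  # assumed plain JSON Schema mapping
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}
```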

ToolUse

Bases: TypedDict

A request from the model to use a specific tool with the provided input.

Attributes:

Name Type Description
input Any

The input parameters for the tool. Can be any JSON-serializable type.

name str

The name of the tool to invoke.

toolUseId str

A unique identifier for this specific tool use request.

Source code in strands/types/tools.py
class ToolUse(TypedDict):
    """A request from the model to use a specific tool with the provided input.

    Attributes:
        input: The input parameters for the tool.
            Can be any JSON-serializable type.
        name: The name of the tool to invoke.
        toolUseId: A unique identifier for this specific tool use request.
    """

    input: Any
    name: str
    toolUseId: str

ToolUseStreamEvent

Bases: ModelStreamEvent

Event emitted during tool use input streaming.

Source code in strands/types/_events.py
class ToolUseStreamEvent(ModelStreamEvent):
    """Event emitted during tool use input streaming."""

    def __init__(self, delta: ContentBlockDelta, current_tool_use: dict[str, Any]) -> None:
        """Initialize with delta and current tool use state."""
        super().__init__({"type": "tool_use_stream", "delta": delta, "current_tool_use": current_tool_use})

__init__(delta, current_tool_use)

Initialize with delta and current tool use state.

Source code in strands/types/_events.py
def __init__(self, delta: ContentBlockDelta, current_tool_use: dict[str, Any]) -> None:
    """Initialize with delta and current tool use state."""
    super().__init__({"type": "tool_use_stream", "delta": delta, "current_tool_use": current_tool_use})

stop_all(*funcs) async

Call all stops in sequence and aggregate errors.

A failure in one stop call will not block subsequent stop calls.

Parameters:

Name Type Description Default
funcs Callable[..., Awaitable[None]]

Stop functions to call in sequence.

()

Raises:

Type Description
RuntimeError

If any stop function raises an exception.

Source code in strands/experimental/bidi/_async/__init__.py
async def stop_all(*funcs: Callable[..., Awaitable[None]]) -> None:
    """Call all stops in sequence and aggregate errors.

    A failure in one stop call will not block subsequent stop calls.

    Args:
        funcs: Stop functions to call in sequence.

    Raises:
        RuntimeError: If any stop function raises an exception.
    """
    exceptions = []
    for func in funcs:
        try:
            await func()
        except Exception as exception:
            exceptions.append({"func_name": func.__name__, "exception": repr(exception)})

    if exceptions:
        raise RuntimeError(f"exceptions={exceptions} | failed stop sequence")
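The aggregation behavior can be exercised standalone with stand-in stop callbacks (a local copy of the function above plus illustrative callbacks, not library code):

```python
import asyncio
from typing import Awaitable, Callable


async def stop_all(*funcs: Callable[..., Awaitable[None]]) -> None:
    """Local copy of stop_all above: run every stop, then aggregate errors."""
    exceptions = []
    for func in funcs:
        try:
            await func()
        except Exception as exception:
            exceptions.append({"func_name": func.__name__, "exception": repr(exception)})

    if exceptions:
        raise RuntimeError(f"exceptions={exceptions} | failed stop sequence")


calls: list[str] = []


async def stop_events() -> None:
    calls.append("events")


async def stop_stream() -> None:
    raise ValueError("socket already closed")  # simulated failure


async def stop_connection() -> None:
    calls.append("connection")


failed = False
try:
    asyncio.run(stop_all(stop_events, stop_stream, stop_connection))
except RuntimeError:
    failed = True

# stop_connection still ran even though stop_stream raised.
```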