Events [Experimental]¶
Experimental Feature
This feature is experimental and may change in future versions. Use with caution in production environments.
Bidirectional streaming events enable real-time monitoring and processing of audio, text, and tool execution during persistent conversations. Unlike standard streaming which uses async iterators or callbacks, bidirectional streaming uses send() and receive() methods for explicit control over the conversation flow.
Event Model¶
Bidirectional streaming uses a different event model than standard streaming:
Standard Streaming:
- Uses
stream_async()or callback handlers - Request-response pattern (one invocation per call)
- Events flow in one direction (model → application)
Bidirectional Streaming:
- Uses
send()andreceive()methods - Persistent connection (multiple turns per connection)
- Events flow in both directions (application ↔ model)
- Supports real-time audio and interruptions
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel
async def main():
model = BidiNovaSonicModel()
async with BidiAgent(model=model) as agent:
# Send input to model
await agent.send("What is 2+2?")
# Receive events from model
async for event in agent.receive():
print(f"Event: {event['type']}")
asyncio.run(main())
Input Event Types¶
Events sent to the model via agent.send().
BidiTextInputEvent¶
Send text input to the model.
await agent.send("What is the weather?")
# Or explicitly:
from strands.experimental.bidi.types.events import BidiTextInputEvent
await agent.send(BidiTextInputEvent(text="What is the weather?", role="user"))
BidiAudioInputEvent¶
Send audio input to the model. Audio must be base64-encoded.
import base64
from strands.experimental.bidi.types.events import BidiAudioInputEvent
audio_bytes = record_audio() # Your audio capture logic
audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')
await agent.send(BidiAudioInputEvent(
audio=audio_base64,
format="pcm",
sample_rate=16000,
channels=1
))
BidiImageInputEvent¶
Send image input to the model. Images must be base64-encoded.
import base64
from strands.experimental.bidi.types.events import BidiImageInputEvent
with open("image.jpg", "rb") as f:
image_bytes = f.read()
image_base64 = base64.b64encode(image_bytes).decode('utf-8')
await agent.send(BidiImageInputEvent(
image=image_base64,
mime_type="image/jpeg"
))
Output Event Types¶
Events received from the model via agent.receive().
Connection Lifecycle Events¶
Events that track the connection state throughout the conversation.
BidiConnectionStartEvent¶
Emitted when the streaming connection is established and ready for interaction.
{
"type": "bidi_connection_start",
"connection_id": "conn_abc123",
"model": "amazon.nova-sonic-v1:0"
}
Properties:
connection_id: Unique identifier for this streaming connectionmodel: Model identifier (e.g., "amazon.nova-sonic-v1:0", "gemini-2.0-flash-live")
BidiConnectionRestartEvent¶
Emitted when the agent is restarting the model connection after a timeout. The conversation history is preserved and the connection resumes automatically.
{
"type": "bidi_connection_restart",
"timeout_error": BidiModelTimeoutError(...)
}
Properties:
timeout_error: The timeout error that triggered the restart
Usage:
async for event in agent.receive():
if event["type"] == "bidi_connection_restart":
print("Connection restarting, please wait...")
# Connection resumes automatically with full history
See Connection Lifecycle for more details on timeout handling.
BidiConnectionCloseEvent¶
Emitted when the streaming connection is closed.
{
"type": "bidi_connection_close",
"connection_id": "conn_abc123",
"reason": "user_request"
}
Properties:
connection_id: Unique identifier for this streaming connectionreason: Why the connection closed"client_disconnect": Client disconnected"timeout": Connection timed out"error": Error occurred"complete": Conversation completed normally"user_request": User requested closure (viastop_conversationtool)
Response Lifecycle Events¶
Events that track individual model responses within the conversation.
BidiResponseStartEvent¶
Emitted when the model begins generating a response.
{
"type": "bidi_response_start",
"response_id": "resp_xyz789"
}
Properties:
response_id: Unique identifier for this response (matchesBidiResponseCompleteEvent)
BidiResponseCompleteEvent¶
Emitted when the model finishes generating a response.
{
"type": "bidi_response_complete",
"response_id": "resp_xyz789",
"stop_reason": "complete"
}
Properties:
response_id: Unique identifier for this responsestop_reason: Why the response ended"complete": Model completed its response"interrupted": User interrupted the response"tool_use": Model is requesting tool execution"error": Error occurred during generation
Audio Events¶
Events for streaming audio input and output.
BidiAudioStreamEvent¶
Emitted when the model generates audio output. Audio is base64-encoded for JSON compatibility.
{
"type": "bidi_audio_stream",
"audio": "base64_encoded_audio_data...",
"format": "pcm",
"sample_rate": 16000,
"channels": 1
}
Properties:
audio: Base64-encoded audio stringformat: Audio encoding format ("pcm","wav","opus","mp3")sample_rate: Sample rate in Hz (16000,24000,48000)channels: Number of audio channels (1= mono,2= stereo)
Usage:
import base64
async for event in agent.receive():
if event["type"] == "bidi_audio_stream":
# Decode and play audio
audio_bytes = base64.b64decode(event["audio"])
play_audio(audio_bytes, sample_rate=event["sample_rate"])
Transcript Events¶
Events for speech-to-text transcription of both user and assistant speech.
BidiTranscriptStreamEvent¶
Emitted when speech is transcribed. Supports incremental updates for providers that send partial transcripts.
{
"type": "bidi_transcript_stream",
"delta": {"text": "Hello"},
"text": "Hello",
"role": "assistant",
"is_final": True,
"current_transcript": "Hello world"
}
Properties:
delta: The incremental transcript changetext: The delta text (same as delta content)role: Who is speaking ("user"or"assistant")is_final: Whether this is the final/complete transcriptcurrent_transcript: The accumulated transcript text so far (None for first delta)
Usage:
async for event in agent.receive():
if event["type"] == "bidi_transcript_stream":
role = event["role"]
text = event["text"]
is_final = event["is_final"]
if is_final:
print(f"{role}: {text}")
else:
print(f"{role} (preview): {text}")
Interruption Events¶
Events for handling user interruptions during model responses.
BidiInterruptionEvent¶
Emitted when the model's response is interrupted, typically by user speech detected via voice activity detection.
{
"type": "bidi_interruption",
"reason": "user_speech"
}
Properties:
reason: Why the interruption occurred"user_speech": User started speaking (most common)"error": Error caused interruption
Usage:
async for event in agent.receive():
if event["type"] == "bidi_interruption":
print(f"Interrupted by {event['reason']}")
# Audio output automatically cleared
# Model ready for new input
BidiInterruptionEvent vs Human-in-the-Loop Interrupts
BidiInterruptionEvent is different from human-in-the-loop (HIL) interrupts. BidiInterruptionEvent is emitted when the model detects user speech during audio conversations and automatically stops generating the current response. HIL interrupts pause agent execution to request human approval or input before continuing, typically used for tool execution approval. BidiInterruptionEvent is automatic and audio-specific, while HIL interrupts are programmatic and require explicit handling.
See Interruptions for more details on interruption handling.
Tool Events¶
Events for tool execution during conversations. Bidirectional streaming reuses the standard ToolUseStreamEvent from Strands.
ToolUseStreamEvent¶
Emitted when the model requests tool execution. See Tools Overview for details.
{
"type": "tool_use_stream",
"current_tool_use": {
"toolUseId": "tool_123",
"name": "calculator",
"input": {"expression": "2+2"}
}
}
Properties:
current_tool_use: Information about the tool being usedtoolUseId: Unique ID for this tool usename: Name of the toolinput: Tool input parameters
Tools execute automatically in the background and results are sent back to the model without blocking the conversation.
Usage Events¶
Events for tracking token consumption across different modalities.
BidiUsageEvent¶
Emitted periodically to report token usage with modality breakdown.
{
"type": "bidi_usage",
"inputTokens": 150,
"outputTokens": 75,
"totalTokens": 225,
"modality_details": [
{"modality": "text", "input_tokens": 100, "output_tokens": 50},
{"modality": "audio", "input_tokens": 50, "output_tokens": 25}
]
}
Properties:
inputTokens: Total tokens used for all input modalitiesoutputTokens: Total tokens used for all output modalitiestotalTokens: Sum of input and output tokensmodality_details: Optional list of token usage per modalitycacheReadInputTokens: Optional tokens read from cachecacheWriteInputTokens: Optional tokens written to cache
Error Events¶
Events for error handling during conversations.
BidiErrorEvent¶
Emitted when an error occurs during the session.
{
"type": "bidi_error",
"message": "Connection failed",
"code": "ConnectionError",
"details": {"retry_after": 5}
}
Properties:
message: Human-readable error messagecode: Error code (exception class name)details: Optional additional error contexterror: The original exception (accessible via property, not in JSON)
Usage:
async for event in agent.receive():
if event["type"] == "bidi_error":
print(f"Error: {event['message']}")
# Access original exception if needed
if hasattr(event, 'error'):
raise event.error
Event Flow Examples¶
Basic Audio Conversation¶
import asyncio
from strands.experimental.bidi import BidiAgent, BidiAudioIO
from strands.experimental.bidi.models import BidiNovaSonicModel
async def main():
model = BidiNovaSonicModel()
agent = BidiAgent(model=model)
audio_io = BidiAudioIO()
await agent.start()
# Process events from audio conversation
async for event in agent.receive():
if event["type"] == "bidi_connection_start":
print(f"🔗 Connected to {event['model']}")
elif event["type"] == "bidi_response_start":
print(f"▶️ Response starting: {event['response_id']}")
elif event["type"] == "bidi_audio_stream":
print(f"🔊 Audio chunk: {len(event['audio'])} bytes")
elif event["type"] == "bidi_transcript_stream":
if event["is_final"]:
print(f"{event['role']}: {event['text']}")
elif event["type"] == "bidi_response_complete":
print(f"✅ Response complete: {event['stop_reason']}")
await agent.stop()
asyncio.run(main())
Tracking Transcript State¶
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel
async def main():
model = BidiNovaSonicModel()
async with BidiAgent(model=model) as agent:
await agent.send("Tell me about Python")
# Track incremental transcript updates
current_speaker = None
current_text = ""
async for event in agent.receive():
if event["type"] == "bidi_transcript_stream":
role = event["role"]
if role != current_speaker:
if current_text:
print(f"\n{current_speaker}: {current_text}")
current_speaker = role
current_text = ""
current_text = event.get("current_transcript", event["text"])
if event["is_final"]:
print(f"\n{role}: {current_text}")
current_text = ""
asyncio.run(main())
Tool Execution During Conversation¶
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel
from strands_tools import calculator
async def main():
model = BidiNovaSonicModel()
agent = BidiAgent(model=model, tools=[calculator])
async with agent as agent:
await agent.send("What is 25 times 48?")
async for event in agent.receive():
event_type = event["type"]
if event_type == "bidi_transcript_stream" and event["is_final"]:
print(f"{event['role']}: {event['text']}")
elif event_type == "tool_use_stream":
tool_use = event["current_tool_use"]
print(f"🔧 Using tool: {tool_use['name']}")
print(f" Input: {tool_use['input']}")
elif event_type == "bidi_response_complete":
if event["stop_reason"] == "tool_use":
print(" Tool executing in background...")
asyncio.run(main())
Handling Interruptions¶
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel
async def main():
model = BidiNovaSonicModel()
async with BidiAgent(model=model) as agent:
await agent.send("Tell me a long story about space exploration")
interruption_count = 0
async for event in agent.receive():
if event["type"] == "bidi_transcript_stream" and event["is_final"]:
print(f"{event['role']}: {event['text']}")
elif event["type"] == "bidi_interruption":
interruption_count += 1
print(f"\n⚠️ Interrupted (#{interruption_count})")
elif event["type"] == "bidi_response_complete":
if event["stop_reason"] == "interrupted":
print(f"Response interrupted {interruption_count} times")
asyncio.run(main())
Connection Restart Handling¶
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel
async def main():
model = BidiNovaSonicModel() # 8-minute timeout
async with BidiAgent(model=model) as agent:
# Continuous conversation that handles restarts
async for event in agent.receive():
if event["type"] == "bidi_connection_restart":
print("⚠️ Connection restarting (timeout)...")
print(" Conversation history preserved")
# Connection resumes automatically
elif event["type"] == "bidi_connection_start":
print(f"✅ Connected to {event['model']}")
elif event["type"] == "bidi_transcript_stream" and event["is_final"]:
print(f"{event['role']}: {event['text']}")
asyncio.run(main())
Hook Events¶
Hook events are a separate concept from streaming events. While streaming events flow through agent.receive() during conversations, hook events are callbacks that trigger at specific lifecycle points (like initialization, message added, or interruption). Hook events allow you to inject custom logic for cross-cutting concerns like logging, analytics, and session persistence without processing the event stream directly.
For details on hook events and usage patterns, see the Hooks documentation.