Streaming Events
Strands Agents SDK provides real-time streaming capabilities that allow you to monitor and process events as they occur during agent execution. This enables responsive user interfaces, real-time monitoring, and custom output formatting.
Strands provides two approaches for handling streaming events:
- Async Iterators: Ideal for asynchronous server frameworks
- Callback Handlers (Python only): Perfect for synchronous applications and custom event processing
Both methods receive the same event types but differ in their execution model and use cases.
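Because both approaches deliver the same event payloads, a single processing function can serve either one. The sketch below illustrates this with a stand-in event source (`fake_stream` and `SAMPLE_EVENTS` are hypothetical, not part of the SDK) so that it runs without a model. With a real agent you would iterate `agent.stream_async(...)` or pass the handler as `callback_handler` instead.

```python
import asyncio

# Hypothetical sample events shaped like the dictionaries described on this page.
SAMPLE_EVENTS = ({"init_event_loop": True}, {"data": "2+2"}, {"data": " = 4"})


def extract_text(event: dict) -> str:
    """Return the text chunk carried by an event, or '' if it has none."""
    return event.get("data", "")


async def fake_stream():
    # Stand-in for agent.stream_async(...): yields event dicts one by one.
    for event in SAMPLE_EVENTS:
        yield event


async def collect_with_iterator() -> str:
    # Async iterator model: the caller pulls events from the stream.
    parts = []
    async for event in fake_stream():
        parts.append(extract_text(event))
    return "".join(parts)


def collect_with_callback() -> str:
    # Callback model: the producer pushes each event as keyword arguments.
    parts = []

    def handler(**kwargs):
        parts.append(extract_text(kwargs))

    for event in SAMPLE_EVENTS:
        handler(**event)
    return "".join(parts)


iterator_text = asyncio.run(collect_with_iterator())
callback_text = collect_with_callback()
```

Keeping the event logic in a plain function such as `extract_text` means it can be tested without an agent and reused under either execution model.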
Event Types
All streaming methods yield the same set of events:
Lifecycle Events
In Python, events are dictionaries that may contain the following keys:
- init_event_loop: True when the agent invocation begins initializing
- start_event_loop: True when the event loop is starting
- message: Present when a new message is created
- event: Raw event from the model stream
- force_stop: True if the event loop was forced to stop
  - force_stop_reason: Reason for the forced stop
- result: The final AgentResult
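As a rough illustration of how these keys can be told apart, the sketch below maps an event dictionary to a short label. The function name `lifecycle_label`, the label strings, and the sample values (including the stop reason) are illustrative assumptions, not SDK definitions.

```python
from typing import Optional


def lifecycle_label(event: dict) -> Optional[str]:
    """Map a streamed event dict to a short lifecycle label, or None."""
    if event.get("init_event_loop"):
        return "init"
    if event.get("start_event_loop"):
        return "start"
    if event.get("force_stop"):
        reason = event.get("force_stop_reason", "unknown reason")
        return f"force_stop: {reason}"
    if "result" in event:
        return "result"
    if "message" in event:
        return "message"
    # Not a lifecycle event (for example a text chunk or tool event).
    return None


# Hypothetical events for demonstration only.
sample_events = [
    {"init_event_loop": True},
    {"start_event_loop": True},
    {"data": "Hello"},
    {"message": {"role": "assistant"}},
    {"force_stop": True, "force_stop_reason": "max tokens"},
]
labels = [lifecycle_label(e) for e in sample_events]
```

Using `event.get(...)` for the boolean flags and `in` for the presence-only keys mirrors the wording of the list above: flags are described as True, while `message` and `result` are described as present.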
In TypeScript, each event emitted from the agent is a class with a type attribute that has a unique value. To identify an event, use instanceof on the class or an equality check on the event.type value.
- BeforeInvocationEvent: Start of agent loop (before any iterations)
- AfterInvocationEvent: End of agent loop (after all iterations complete)
  - error?: Optional error if the loop terminated due to an exception
- BeforeModelEvent: Before model invocation
  - messages: Array of messages being sent to the model
- AfterModelEvent: After model invocation
  - message: Assistant message returned by the model
  - stopReason: Why generation stopped
- BeforeToolsEvent: Before tools execution
  - message: Assistant message containing tool use blocks
- AfterToolsEvent: After tools execution
  - message: User message containing tool results
Model Stream Events
Python event keys:
- data: Text chunk from the model's output
- delta: Raw delta content from the model
- reasoning: True for reasoning events
  - reasoningText: Text from the reasoning process
  - reasoning_signature: Signature from the reasoning process
  - redactedContent: Reasoning content redacted by the model
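A common use of these keys is to keep the model's answer separate from its reasoning output. The sketch below assumes that a reasoning event carries `reasoning` set to True together with `reasoningText` in the same dictionary; that pairing is an assumption based on the list above, and the sample events are made up for illustration.

```python
def accumulate(events):
    """Split a sequence of event dicts into answer text and reasoning text."""
    answer_parts, reasoning_parts = [], []
    for event in events:
        if event.get("reasoning") and "reasoningText" in event:
            # Assumed shape: reasoning text arrives alongside reasoning=True.
            reasoning_parts.append(event["reasoningText"])
        elif "data" in event:
            answer_parts.append(event["data"])
    return "".join(answer_parts), "".join(reasoning_parts)


# Hypothetical stream: two reasoning chunks followed by two answer chunks.
sample_events = [
    {"reasoning": True, "reasoningText": "42 + 7 "},
    {"reasoning": True, "reasoningText": "is 49."},
    {"data": "The answer "},
    {"data": "is 49."},
]
answer, reasoning = accumulate(sample_events)
```

Events that carry neither key, such as lifecycle events, are simply skipped, so the same function can be fed the full stream.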
TypeScript event classes:
- ModelMessageStartEvent: Start of a message from the model
- ModelContentBlockStartEvent: Start of a content block from a model for text, toolUse, reasoning, etc.
- ModelContentBlockDeltaEvent: Content deltas for text, tool input, or reasoning
- ModelContentBlockStopEvent: End of a content block
- ModelMessageStopEvent: End of a message
- ModelMetadataEvent: Usage and metrics metadata
Tool Events
Python event keys:
- current_tool_use: Information about the current tool being used, including:
  - toolUseId: Unique ID for this tool use
  - name: Name of the tool
  - input: Tool input parameters (accumulated as streaming occurs)
- tool_stream_event: Information about an event streamed from a tool, including:
  - tool_use: The ToolUse for the tool that streamed the event
  - data: The data streamed from the tool
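Since `input` is accumulated while streaming, the same tool use can appear in many events with progressively more complete input. One way to handle this, sketched below, is to keep only the latest snapshot per `toolUseId`. The helper name `latest_tool_inputs` and the sample ID and input strings are hypothetical, and the exact type of `input` during streaming is not specified here.

```python
def latest_tool_inputs(events):
    """Keep the most recent name and input seen for each toolUseId."""
    tools = {}
    for event in events:
        tool_use = event.get("current_tool_use") or {}
        tool_id = tool_use.get("toolUseId")
        if tool_id is None:
            continue  # Not a tool event, or the ID has not arrived yet.
        # Later events overwrite earlier ones, so the last snapshot wins.
        tools[tool_id] = {
            "name": tool_use.get("name"),
            "input": tool_use.get("input"),
        }
    return tools


# Hypothetical events: the input for one tool use grows across two events.
sample_events = [
    {"current_tool_use": {"toolUseId": "tool-1", "name": "calculator", "input": '{"expr'}},
    {"data": "Let me calculate that."},
    {"current_tool_use": {"toolUseId": "tool-1", "name": "calculator", "input": '{"expression": "42+7"}'}},
]
tools = latest_tool_inputs(sample_events)
```

Keying on `toolUseId` rather than `name` keeps repeated calls to the same tool distinct.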
TypeScript event classes:
- BeforeToolsEvent: Information about the current tool being used, including:
  - message: The assistant message containing tool use blocks
- ToolStreamEvent: Information about an event streamed from a tool, including:
  - data: The data streamed from the tool
Multi-Agent Events
Multi-agent systems (Graph and Swarm) emit additional coordination events:
- multiagent_node_start: When a node begins execution
  - type: "multiagent_node_start"
  - node_id: Unique identifier for the node
  - node_type: Type of node ("agent", "swarm", "graph")
- multiagent_node_stream: Forwarded events from agents/multi-agents with node context
  - type: "multiagent_node_stream"
  - node_id: Identifier of the node generating the event
  - event: The original agent event (nested)
- multiagent_node_stop: When a node completes execution
  - type: "multiagent_node_stop"
  - node_id: Unique identifier for the node
  - node_result: Complete NodeResult with execution details, metrics, and status
- multiagent_handoff: When control is handed off between agents (Swarm) or batch transitions (Graph)
  - type: "multiagent_handoff"
  - from_node_ids: List of node IDs completing execution
  - to_node_ids: List of node IDs beginning execution
  - message: Optional handoff message (typically used in Swarm)
- multiagent_result: Final multi-agent result
  - type: "multiagent_result"
  - result: The final GraphResult or SwarmResult
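To show how these coordination events fit together, the sketch below folds a sequence of them into a per-node summary plus a list of handoffs. It assumes each event is a dictionary with the fields listed above and that the nested `event` in `multiagent_node_stream` is a regular agent event with an optional `data` key. The function name, node IDs, and sample events are hypothetical.

```python
def summarize(events):
    """Group multi-agent events by node and record handoffs between nodes."""
    nodes = {}
    handoffs = []

    def node_entry(node_id):
        # Create a record on first sight, in case a start event was missed.
        return nodes.setdefault(node_id, {"type": None, "text": "", "done": False})

    for event in events:
        kind = event.get("type")
        if kind == "multiagent_node_start":
            node_entry(event["node_id"])["type"] = event["node_type"]
        elif kind == "multiagent_node_stream":
            # Unwrap the nested agent event and keep any text it carries.
            node_entry(event["node_id"])["text"] += event["event"].get("data", "")
        elif kind == "multiagent_node_stop":
            node_entry(event["node_id"])["done"] = True
        elif kind == "multiagent_handoff":
            handoffs.append((tuple(event["from_node_ids"]), tuple(event["to_node_ids"])))
    return nodes, handoffs


# Hypothetical event sequence for a two-node system.
sample_events = [
    {"type": "multiagent_node_start", "node_id": "researcher", "node_type": "agent"},
    {"type": "multiagent_node_stream", "node_id": "researcher", "event": {"data": "Paris"}},
    {"type": "multiagent_node_stream", "node_id": "researcher", "event": {"start_event_loop": True}},
    {"type": "multiagent_node_stop", "node_id": "researcher", "node_result": None},
    {"type": "multiagent_handoff", "from_node_ids": ["researcher"], "to_node_ids": ["writer"]},
    {"type": "multiagent_node_start", "node_id": "writer", "node_type": "agent"},
]
nodes, handoffs = summarize(sample_events)
```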
See Graph streaming and Swarm streaming for usage examples.
Multi-agent events are coming soon to TypeScript!
Quick Examples
Async Iterator Pattern (Python)
# Assumes `agent` is an existing Agent and this runs inside an async function
async for event in agent.stream_async("Calculate 2+2"):
    if "data" in event:
        print(event["data"], end="")
Callback Handler Pattern (Python)
from strands import Agent

def handle_events(**kwargs):
    # Print each text chunk as it arrives
    if "data" in kwargs:
        print(kwargs["data"], end="")

agent = Agent(callback_handler=handle_events)
agent("Calculate 2+2")
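When a callback needs to keep state between events, a callable object can take the place of a plain function. The sketch below is a minimal, hypothetical `TextCollector` that buffers text chunks. It is driven here by hand-written sample events, and it assumes the agent would call it with keyword arguments in the same way as `handle_events` above.

```python
class TextCollector:
    """Callable that buffers text chunks passed as keyword arguments."""

    def __init__(self):
        self.chunks = []

    def __call__(self, **kwargs):
        # Called once per event; only text chunks are stored.
        if "data" in kwargs:
            self.chunks.append(kwargs["data"])

    @property
    def text(self) -> str:
        return "".join(self.chunks)


collector = TextCollector()

# With a real agent this would be wired up as Agent(callback_handler=collector).
# Here the events are simulated so the sketch runs without a model.
for event in ({"data": "2+2 "}, {"complete": True}, {"data": "is 4"}):
    collector(**event)
```

After the run, `collector.text` holds the concatenated output, which is convenient when the text should be post-processed rather than printed as it arrives.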
Async Iterator Pattern (TypeScript)
const agent = new Agent({ tools: [notebook] })
for await (const event of agent.stream('Calculate 2+2')) {
  if (event.type === 'modelContentBlockDeltaEvent' && event.delta.type === 'textDelta') {
    // Print out the model text delta event data
    process.stdout.write(event.delta.text)
  }
}
console.log("\nDone!")
Identifying Events Emitted from Agent
This example demonstrates how to identify events emitted from an agent:
import asyncio

from strands import Agent
from strands_tools import calculator

def process_event(event):
    """Shared event processor for both async iterators and callback handlers"""
    # Track event loop lifecycle
    if event.get("init_event_loop", False):
        print("Event loop initialized")
    elif event.get("start_event_loop", False):
        print("Event loop cycle starting")
    elif "message" in event:
        print(f"New message created: {event['message']['role']}")
    elif event.get("complete", False):
        print("Cycle completed")
    elif event.get("force_stop", False):
        print(f"Event loop force-stopped: {event.get('force_stop_reason', 'unknown reason')}")

    # Track tool usage
    if "current_tool_use" in event and event["current_tool_use"].get("name"):
        tool_name = event["current_tool_use"]["name"]
        print(f"Using tool: {tool_name}")

    # Show text snippets (truncated to 20 characters)
    if "data" in event:
        data_snippet = event["data"][:20] + ("..." if len(event["data"]) > 20 else "")
        print(f"Text: {data_snippet}")

async def main():
    # callback_handler=None so that only process_event prints output
    agent = Agent(tools=[calculator], callback_handler=None)
    async for event in agent.stream_async("What is the capital of France and what is 42+7?"):
        process_event(event)

asyncio.run(main())
function processEvent(event: AgentStreamEvent): void {
  // Track agent loop lifecycle
  switch (event.type) {
    case 'beforeInvocationEvent':
      console.log('Agent loop initialized')
      break
    case 'beforeModelCallEvent':
      console.log('Agent loop cycle starting')
      break
    case 'afterModelCallEvent':
      console.log(`New message created: ${event.stopData?.message.role}`)
      break
    case 'beforeToolsEvent':
      console.log("About to execute tool!")
      break
    case 'afterToolsEvent':
      console.log("Finished executing tool!")
      break
    case 'afterInvocationEvent':
      console.log('Agent loop completed')
      break
  }

  // Track tool usage
  if (event.type === 'modelContentBlockStartEvent' && event.start?.type === 'toolUseStart') {
    console.log(`\nUsing tool: ${event.start.name}`)
  }

  // Show text snippets
  if (event.type === 'modelContentBlockDeltaEvent' && event.delta.type === 'textDelta') {
    process.stdout.write(event.delta.text)
  }
}

// Assumes `agent` is an existing Agent configured with a notebook tool
const responseGenerator = agent.stream(
  'What is the capital of France and what is 42+7? Record in the notebook.'
)
for await (const event of responseGenerator) {
  processEvent(event)
}
Sub-Agent Streaming Example
This example combines the agents-as-tools pattern with tool streaming to show how to stream events from sub-agents:
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator

from strands import Agent, tool
from strands_tools import calculator

@dataclass
class SubAgentResult:
    """Pairs a sub-agent with one of the events it streamed."""
    agent: Agent
    event: dict

@tool
async def math_agent(query: str) -> AsyncIterator:
    """Solve math problems using the calculator tool."""
    agent = Agent(
        name="Math Expert",
        system_prompt="You are a math expert. Use the calculator tool for calculations.",
        callback_handler=None,
        tools=[calculator]
    )
    result = None
    async for event in agent.stream_async(query):
        # Forward every sub-agent event to the caller as a tool stream event
        yield SubAgentResult(agent=agent, event=event)
        if "result" in event:
            result = event["result"]
    # The last yielded value is the tool's final output
    yield str(result)

def process_sub_agent_events(event):
    """Shared processor for sub-agent streaming events"""
    tool_stream = event.get("tool_stream_event", {}).get("data")
    if isinstance(tool_stream, SubAgentResult):
        current_tool = tool_stream.event.get("current_tool_use", {})
        tool_name = current_tool.get("name")
        if tool_name:
            print(f"Agent '{tool_stream.agent.name}' using tool '{tool_name}'")
    # Also show regular text output
    if "data" in event:
        print(event["data"], end="")

# With an async iterator
orchestrator_async_iterator = Agent(
    system_prompt="Route math questions to the math_agent tool.",
    callback_handler=None,
    tools=[math_agent]
)

async def main():
    async for event in orchestrator_async_iterator.stream_async("What is 3+3?"):
        process_sub_agent_events(event)

asyncio.run(main())

# With a callback handler
def handle_events(**kwargs):
    process_sub_agent_events(kwargs)

orchestrator_callback = Agent(
    system_prompt="Route math questions to the math_agent tool.",
    callback_handler=handle_events,
    tools=[math_agent]
)
orchestrator_callback("What is 3+3?")
import { Agent, tool } from '@strands-agents/sdk'
import { z } from 'zod'

// Create the math agent
const mathAgent = new Agent({
  systemPrompt: 'You are a math expert. Answer a math problem in one sentence',
  printer: false,
})

// Wrap the math agent in a streaming tool
const calculator = tool({
  name: 'mathAgent',
  description: 'Agent that calculates the answer to a math problem input.',
  inputSchema: z.object({input: z.string()}),
  callback: async function* (input): AsyncGenerator<string, string, unknown> {
    // Stream from the sub-agent
    const generator = mathAgent.stream(input.input)
    let result = await generator.next()
    while (!result.done) {
      // Process events from the sub-agent
      if (result.value.type === 'modelContentBlockDeltaEvent' && result.value.delta.type === 'textDelta') {
        yield result.value.delta.text
      }
      result = await generator.next()
    }
    // The generator's return value holds the sub-agent's final message
    return result.value.lastMessage.content[0]!.type === "textBlock"
      ? result.value.lastMessage.content[0]!.text
      : result.value.lastMessage.content[0]!.toString()
  }
})

const agent = new Agent({tools: [calculator]})
for await (const event of agent.stream("What is 2 * 3? Use your tool.")) {
  if (event.type === "toolStreamEvent") {
    console.log(`Tool Event: ${JSON.stringify(event.data)}`)
  }
}
console.log("\nDone!")
Next Steps
- Learn about Async Iterators for asynchronous streaming
- Explore Callback Handlers for synchronous event processing
- See the Agent API Reference for complete method documentation