Async Iterators for Streaming¶
Async iterators provide asynchronous streaming of agent events, allowing you to process events as they occur in real time. This approach is ideal for asynchronous frameworks where you need fine-grained control over the async execution flow.
For a complete list of available events including text generation, tool usage, lifecycle, and reasoning events, see the streaming overview.
Basic Usage¶
Python uses the stream_async method, the streaming counterpart to invoke_async, for asynchronous streaming. This is ideal for frameworks like FastAPI, aiohttp, or Django Channels.
Note: Python also supports synchronous event handling via callback handlers.
import asyncio
from strands import Agent
from strands_tools import calculator

# Initialize our agent without a callback handler
agent = Agent(
    tools=[calculator],
    callback_handler=None
)

# Async function that iterates over streamed agent events
async def process_streaming_response():
    agent_stream = agent.stream_async("Calculate 2+2")
    async for event in agent_stream:
        print(event)

# Run the agent
asyncio.run(process_streaming_response())
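As noted above, Python also supports synchronous event handling through callback handlers. The following is a minimal sketch, assuming the handler receives the same event keys as the async stream (for example, "data" for text chunks):

from strands import Agent
from strands_tools import calculator

# Minimal callback-handler sketch; assumes the handler is called with the
# same event keys as stream_async (e.g. "data" carries streamed text chunks)
def print_text_chunks(**kwargs):
    if "data" in kwargs:
        print(kwargs["data"], end="")

agent = Agent(
    tools=[calculator],
    callback_handler=print_text_chunks
)

# Synchronous invocation; text chunks are printed as they arrive
agent("Calculate 2+2")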
TypeScript uses the stream method for streaming, which is async by default. This is ideal for frameworks like Express.js or NestJS.
// Initialize our agent without a printer
const agent = new Agent({
  tools: [notebook],
  printer: false,
})

// Async function that iterates over streamed agent events
async function processStreamingResponse(): Promise<void> {
  for await (const event of agent.stream('Record that my favorite color is blue!')) {
    console.log(event)
  }
}

// Run the agent
await processStreamingResponse()
Server Examples¶
Here's how to integrate streaming with web frameworks to create a streaming endpoint:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from strands import Agent
from strands_tools import calculator, http_request

app = FastAPI()

class PromptRequest(BaseModel):
    prompt: str

@app.post("/stream")
async def stream_response(request: PromptRequest):
    async def generate():
        agent = Agent(
            tools=[calculator, http_request],
            callback_handler=None
        )
        try:
            async for event in agent.stream_async(request.prompt):
                if "data" in event:
                    # Only stream text chunks to the client
                    yield event["data"]
        except Exception as e:
            yield f"Error: {str(e)}"

    return StreamingResponse(
        generate(),
        media_type="text/plain"
    )
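Assuming the FastAPI app above is saved as app.py (an assumed filename) and uvicorn is installed, you can serve it and stream from the endpoint with curl, mirroring the Express example below:

uvicorn app:app --port 8000
curl localhost:8000/stream -d '{"prompt": "What is 2+2?"}' -H "Content-Type: application/json"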
Note: This is a conceptual example. Install Express.js with npm install express @types/express to use it in your project.
// Install Express: npm install express @types/express
import express from 'express'

interface PromptRequest {
  prompt: string
}

async function handleStreamRequest(req: any, res: any) {
  console.log(`Got Request: ${JSON.stringify(req.body)}`)
  const { prompt } = req.body as PromptRequest

  const agent = new Agent({
    tools: [notebook],
    printer: false,
  })

  for await (const event of agent.stream(prompt)) {
    res.write(`${JSON.stringify(event)}\n`)
  }
  res.end()
}

const app = express()
app.use(express.json())
app.post('/stream', handleStreamRequest)
app.listen(3000)
You can then curl your local server with:
curl localhost:3000/stream -d '{"prompt": "Hello"}' -H "Content-Type: application/json"
Agentic Loop¶
This async stream processor illustrates the event loop lifecycle events and how they relate to each other. It's useful for understanding the flow of execution in the Strands agent:
import asyncio
from strands import Agent
from strands_tools import calculator

# Initialize our agent without a callback handler
agent = Agent(
    tools=[calculator],
    callback_handler=None
)

# This will show the full event lifecycle in the console
async def process_events():
    async for event in agent.stream_async("What is the capital of France and what is 42+7?"):
        # Track event loop lifecycle
        if event.get("init_event_loop", False):
            print("🔄 Event loop initialized")
        elif event.get("start_event_loop", False):
            print("▶️ Event loop cycle starting")
        elif "message" in event:
            print(f"📬 New message created: {event['message']['role']}")
        elif event.get("complete", False):
            print("✅ Cycle completed")
        elif event.get("force_stop", False):
            print(f"🛑 Event loop force-stopped: {event.get('force_stop_reason', 'unknown reason')}")

        # Track tool usage
        if "current_tool_use" in event and event["current_tool_use"].get("name"):
            tool_name = event["current_tool_use"]["name"]
            print(f"🔧 Using tool: {tool_name}")

        # Show only a snippet of text to keep output clean
        if "data" in event:
            # Only show first 20 chars of each chunk for demo purposes
            data_snippet = event["data"][:20] + ("..." if len(event["data"]) > 20 else "")
            print(f"📟 Text: {data_snippet}")

asyncio.run(process_events())
The output will show the sequence of events:
- First the event loop initializes (init_event_loop)
- Then the cycle begins (start_event_loop)
- New cycles may start multiple times during execution (start_event_loop)
- Text generation and tool usage events occur during the cycle
- Finally, the cycle completes (complete) or may be force-stopped (force_stop)
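As a practical follow-up, here is a minimal sketch, using only the event keys shown above, that ignores lifecycle events and accumulates the streamed text chunks into the final response:

import asyncio
from strands import Agent
from strands_tools import calculator

async def collect_final_text(prompt: str) -> str:
    agent = Agent(tools=[calculator], callback_handler=None)
    chunks = []
    async for event in agent.stream_async(prompt):
        # "data" events carry the text generated during each cycle
        if "data" in event:
            chunks.append(event["data"])
    return "".join(chunks)

print(asyncio.run(collect_final_text("What is 42+7?")))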
In TypeScript, the same lifecycle can be tracked by switching on the typed events:

function processEvent(event: AgentStreamEvent): void {
  // Track agent loop lifecycle
  switch (event.type) {
    case 'beforeInvocationEvent':
      console.log('🔄 Agent loop initialized')
      break
    case 'beforeModelCallEvent':
      console.log('▶️ Agent loop cycle starting')
      break
    case 'afterModelCallEvent':
      console.log(`📬 New message created: ${event.stopData?.message.role}`)
      break
    case 'beforeToolsEvent':
      console.log('About to execute tool!')
      break
    case 'afterToolsEvent':
      console.log('Finished executing tool!')
      break
    case 'afterInvocationEvent':
      console.log('✅ Agent loop completed')
      break
  }

  // Track tool usage
  if (event.type === 'modelContentBlockStartEvent' && event.start?.type === 'toolUseStart') {
    console.log(`\n🔧 Using tool: ${event.start.name}`)
  }

  // Show text snippets
  if (event.type === 'modelContentBlockDeltaEvent' && event.delta.type === 'textDelta') {
    process.stdout.write(event.delta.text)
  }
}

const responseGenerator = agent.stream(
  'What is the capital of France and what is 42+7? Record in the notebook.'
)

for await (const event of responseGenerator) {
  processEvent(event)
}
The output will show the sequence of events:
- First the invocation starts (beforeInvocationEvent)
- Then the model is called (beforeModelCallEvent)
- The model generates content with delta events (modelContentBlockDeltaEvent)
- Tools may be executed (beforeToolsEvent, afterToolsEvent)
- The model may be called again in subsequent cycles
- Finally, the invocation completes (afterInvocationEvent)