Hooks [Experimental]

Experimental Feature

This feature is experimental and may change in future versions. Use with caution in production environments.

Hooks are a composable mechanism for extending BidiAgent by subscribing to events throughout the bidirectional streaming lifecycle. Both built-in components and user code can react to agent behavior through strongly-typed event callbacks.

Overview

The bidirectional streaming hooks system extends the standard agent hooks with additional events specific to real-time streaming conversations, such as connection lifecycle, interruptions, and connection restarts.

For a comprehensive introduction to the hooks concept and general patterns, see the Hooks documentation. This guide focuses on bidirectional streaming-specific events and use cases.

A Hook Event is a specific point in the lifecycle that callbacks can be attached to. A Hook Callback is a function that is invoked when its hook event is emitted.

Hooks enable use cases such as:

  • Monitoring connection state and restarts
  • Tracking interruptions and user behavior
  • Logging conversation history in real time
  • Implementing custom analytics
  • Managing session persistence

Basic Usage

Hook callbacks are registered against specific event types and receive strongly-typed event objects when those events occur during agent execution.
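
To make the relationship between event types and callbacks concrete, here is a minimal, self-contained sketch of a type-keyed registry. It is not the Strands implementation: `MiniRegistry` and `MessageAdded` are hypothetical stand-ins that only illustrate the idea of dispatching a typed event object to the callbacks registered for its type.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class MessageAdded:
    """Hypothetical stand-in for a typed hook event such as BidiMessageAddedEvent."""

    message: dict[str, Any]


class MiniRegistry:
    """Toy registry (not the Strands one): maps an event type to its callbacks."""

    def __init__(self) -> None:
        self._callbacks: dict[type, list[Callable[[Any], Awaitable[None]]]] = defaultdict(list)

    def add_callback(self, event_type: type, callback: Callable[[Any], Awaitable[None]]) -> None:
        self._callbacks[event_type].append(callback)

    async def emit(self, event: Any) -> None:
        # Dispatch on the concrete event type, in registration order.
        for callback in self._callbacks[type(event)]:
            await callback(event)


seen: list[str] = []


async def log_message(event: MessageAdded) -> None:
    # The callback receives the typed event object, not a loose dict of arguments.
    seen.append(event.message["role"])


registry = MiniRegistry()
registry.add_callback(MessageAdded, log_message)
asyncio.run(registry.emit(MessageAdded(message={"role": "user", "content": []})))
```

Keying callbacks by event class is what lets each callback declare the exact event type it expects in its signature.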

Creating a Hook Provider

The HookProvider protocol allows a single object to register callbacks for multiple events:

from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.hooks.events import (
    BidiAgentInitializedEvent,
    BidiBeforeInvocationEvent,
    BidiAfterInvocationEvent,
    BidiMessageAddedEvent
)

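class ConversationLogger:
    """Log all conversation events."""

    def register_hooks(self, registry, **kwargs):
        # Assumes the same HookProvider protocol method as the standard agent hooks:
        # it is called when the provider is added, and binds each callback to its event type.
        registry.add_callback(BidiAgentInitializedEvent, self.on_agent_initialized)
        registry.add_callback(BidiBeforeInvocationEvent, self.on_before_invocation)
        registry.add_callback(BidiMessageAddedEvent, self.on_message_added)
        registry.add_callback(BidiAfterInvocationEvent, self.on_after_invocation)
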
    async def on_agent_initialized(self, event: BidiAgentInitializedEvent):
        print(f"Agent {event.agent.agent_id} initialized")

    async def on_before_invocation(self, event: BidiBeforeInvocationEvent):
        print(f"Starting conversation for agent: {event.agent.name}")

    async def on_message_added(self, event: BidiMessageAddedEvent):
        message = event.message
        role = message['role']
        content = message['content']
        print(f"{role}: {content}")

    async def on_after_invocation(self, event: BidiAfterInvocationEvent):
        print(f"Conversation ended for agent: {event.agent.name}")

# Register the hook provider
agent = BidiAgent(
    model=model,
    hooks=[ConversationLogger()]
)

Registering Individual Callbacks

You can also register individual callbacks:

from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.hooks.events import BidiMessageAddedEvent

agent = BidiAgent(model=model)

async def log_message(event: BidiMessageAddedEvent):
    print(f"Message added: {event.message}")

agent.hooks.add_callback(BidiMessageAddedEvent, log_message)

Hook Event Lifecycle

The following diagram shows when hook events are emitted during a bidirectional streaming session:

flowchart TB
    subgraph Init["Initialization"]
        A[BidiAgentInitializedEvent]
    end

    subgraph Start["Connection Start"]
        B[BidiBeforeInvocationEvent]
        C[Connection Established]
        B --> C
    end

    subgraph Running["Active Conversation"]
        D[BidiMessageAddedEvent]
        E[BidiInterruptionEvent]
        F[Tool Execution Events]
        D --> E
        E --> F
        F --> D
    end

    subgraph Restart["Connection Restart"]
        G[BidiBeforeConnectionRestartEvent]
        H[Reconnection]
        I[BidiAfterConnectionRestartEvent]
        G --> H
        H --> I
    end

    subgraph End["Connection End"]
        J[BidiAfterInvocationEvent]
    end

    Init --> Start
    Start --> Running
    Running --> Restart
    Restart --> Running
    Running --> End

Available Events

The bidirectional streaming hooks system provides events for different stages of the streaming lifecycle:

| Event | Description |
|---|---|
| BidiAgentInitializedEvent | Triggered when a BidiAgent has been constructed and finished initialization |
| BidiBeforeInvocationEvent | Triggered when the agent connection starts (before model.start()) |
| BidiAfterInvocationEvent | Triggered when the agent connection ends (after model.stop()), regardless of success or failure |
| BidiMessageAddedEvent | Triggered when a message is added to the agent's conversation history |
| BidiInterruptionEvent | Triggered when the model's response is interrupted by user speech |
| BidiBeforeConnectionRestartEvent | Triggered before the model connection is restarted due to timeout |
| BidiAfterConnectionRestartEvent | Triggered after the model connection has been restarted |

Cookbook

This section contains practical hook implementations for common use cases.

Tracking Interruptions

Monitor when and why interruptions occur:

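import time

from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.hooks.events import BidiInterruptionEvent

class InterruptionTracker:
    def __init__(self):
        self.interruption_count = 0
        self.interruptions = []

    def register_hooks(self, registry, **kwargs):
        # Bind the callback to its event type (assumes the standard HookProvider protocol).
        registry.add_callback(BidiInterruptionEvent, self.on_interruption)

    async def on_interruption(self, event: BidiInterruptionEvent):
        self.interruption_count += 1
        self.interruptions.append({
            "reason": event.reason,
            "response_id": event.interrupted_response_id,
            "timestamp": time.time()
        })

        print(f"Interruption #{self.interruption_count}: {event.reason}")

        # Log to analytics (`analytics` is a placeholder for your own analytics client)
        analytics.track("conversation_interrupted", {
            "reason": event.reason,
            "agent_id": event.agent.agent_id
        })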

tracker = InterruptionTracker()
agent = BidiAgent(model=model, hooks=[tracker])
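
Once the tracker has collected some entries, the `interruptions` list can be summarized offline. The following is a small sketch that counts entries per reason. The helper name `summarize_interruptions` and the sample reason strings are illustrative assumptions, not values guaranteed by the library.

```python
from collections import Counter
from typing import Any


def summarize_interruptions(interruptions: list[dict[str, Any]]) -> dict[str, int]:
    """Count recorded interruptions per reason."""
    return dict(Counter(entry["reason"] for entry in interruptions))


# Sample data in the same shape the tracker appends; the reason values are made up.
sample = [
    {"reason": "user_speech", "response_id": "r1", "timestamp": 1.0},
    {"reason": "user_speech", "response_id": "r2", "timestamp": 2.5},
    {"reason": "error", "response_id": None, "timestamp": 4.0},
]
summary = summarize_interruptions(sample)
```

In practice you would pass `tracker.interruptions` instead of `sample`.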

Connection Restart Monitoring

Track connection restarts and handle failures:

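import logging

from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.hooks.events import (
    BidiBeforeConnectionRestartEvent,
    BidiAfterConnectionRestartEvent
)

logger = logging.getLogger(__name__)

class ConnectionMonitor:
    def __init__(self):
        self.restart_count = 0
        self.restart_failures = []

    def register_hooks(self, registry, **kwargs):
        # Bind each callback to its event type (assumes the standard HookProvider protocol).
        registry.add_callback(BidiBeforeConnectionRestartEvent, self.on_before_restart)
        registry.add_callback(BidiAfterConnectionRestartEvent, self.on_after_restart)

    async def on_before_restart(self, event: BidiBeforeConnectionRestartEvent):
        self.restart_count += 1
        timeout_error = event.timeout_error

        print(f"Connection restarting (attempt #{self.restart_count})")
        print(f"Timeout reason: {timeout_error}")

        # Log to monitoring system
        logger.warning("Connection timeout: %s", timeout_error)

    async def on_after_restart(self, event: BidiAfterConnectionRestartEvent):
        if event.exception:
            self.restart_failures.append(event.exception)
            print(f"Restart failed: {event.exception}")

            # Alert on repeated failures (`alert_ops_team` is a placeholder for your own alerting)
            if len(self.restart_failures) >= 3:
                alert_ops_team("Multiple connection restart failures")
        else:
            print("Connection successfully restarted")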

monitor = ConnectionMonitor()
agent = BidiAgent(model=model, hooks=[monitor])
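
The monitor above alerts once three failures have accumulated over the whole session. If you would rather alert only when failures cluster in time, a sliding window is one option. The sketch below is a standalone helper under that assumption. `FailureWindow` is a hypothetical name and is not part of the Strands API.

```python
import time
from collections import deque
from typing import Optional


class FailureWindow:
    """Track failure timestamps and report when a threshold is hit within a time window."""

    def __init__(self, threshold: int = 3, window_seconds: float = 60.0) -> None:
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._timestamps: deque = deque()

    def record(self, now: Optional[float] = None) -> bool:
        """Record one failure; return True if the threshold is reached inside the window."""
        now = time.monotonic() if now is None else now
        self._timestamps.append(now)
        # Drop failures that are older than the window.
        while self._timestamps and now - self._timestamps[0] > self.window_seconds:
            self._timestamps.popleft()
        return len(self._timestamps) >= self.threshold


window = FailureWindow(threshold=3, window_seconds=60.0)
results = [window.record(now=t) for t in (0.0, 10.0, 20.0)]
```

Inside `on_after_restart`, calling `window.record()` in the failure branch and alerting only when it returns `True` would replace the `len(self.restart_failures) >= 3` check.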

Conversation Analytics

Collect metrics about conversation patterns:

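import time

from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.hooks.events import (
    BidiBeforeInvocationEvent,
    BidiAfterInvocationEvent,
    BidiMessageAddedEvent,
    BidiInterruptionEvent
)

class ConversationAnalytics:
    def __init__(self):
        self.start_time = None
        self.message_count = 0
        self.user_messages = 0
        self.assistant_messages = 0
        self.tool_calls = 0
        self.interruptions = 0

    def register_hooks(self, registry, **kwargs):
        # Bind each callback to its event type (assumes the standard HookProvider protocol).
        registry.add_callback(BidiBeforeInvocationEvent, self.on_before_invocation)
        registry.add_callback(BidiMessageAddedEvent, self.on_message_added)
        registry.add_callback(BidiInterruptionEvent, self.on_interruption)
        registry.add_callback(BidiAfterInvocationEvent, self.on_after_invocation)

    async def on_before_invocation(self, event: BidiBeforeInvocationEvent):
        # A monotonic clock is not affected by system clock adjustments
        self.start_time = time.monotonic()

    async def on_message_added(self, event: BidiMessageAddedEvent):
        self.message_count += 1

        if event.message['role'] == 'user':
            self.user_messages += 1
        elif event.message['role'] == 'assistant':
            self.assistant_messages += 1

            # Check for tool use
            for content in event.message.get('content', []):
                if 'toolUse' in content:
                    self.tool_calls += 1

    async def on_interruption(self, event: BidiInterruptionEvent):
        self.interruptions += 1

    async def on_after_invocation(self, event: BidiAfterInvocationEvent):
        # start_time stays None if the before-invocation callback never ran
        duration = time.monotonic() - self.start_time if self.start_time is not None else None

        # Log analytics (`analytics` is a placeholder for your own analytics client)
        analytics.track("conversation_completed", {
            "duration": duration,
            "message_count": self.message_count,
            "user_messages": self.user_messages,
            "assistant_messages": self.assistant_messages,
            "tool_calls": self.tool_calls,
            "interruptions": self.interruptions,
            "agent_id": event.agent.agent_id
        })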

analytics_hook = ConversationAnalytics()
agent = BidiAgent(model=model, hooks=[analytics_hook])
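
The tool-use check inside `on_message_added` can be pulled out into a pure function, which makes it easy to unit test without a live agent. This is a sketch that assumes messages carry a list of content blocks and that tool requests appear under a `toolUse` key, as in the example above. `count_tool_uses` is a hypothetical helper name.

```python
from typing import Any


def count_tool_uses(message: dict[str, Any]) -> int:
    """Return how many content blocks in a message contain a 'toolUse' key."""
    content = message.get("content") or []
    return sum(1 for block in content if isinstance(block, dict) and "toolUse" in block)


# Illustrative message; the tool name and input are made up.
assistant_message = {
    "role": "assistant",
    "content": [
        {"text": "Let me check that."},
        {"toolUse": {"name": "get_user_context", "input": {}}},
    ],
}
tool_use_count = count_tool_uses(assistant_message)
```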

Session Persistence

Automatically save conversation state:

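from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.hooks.events import BidiMessageAddedEvent

class SessionPersistence:
    def __init__(self, storage):
        # `storage` is any object exposing an async save_message(agent_id, message) method
        self.storage = storage

    def register_hooks(self, registry, **kwargs):
        # Bind the callback to its event type (assumes the standard HookProvider protocol).
        registry.add_callback(BidiMessageAddedEvent, self.on_message_added)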

    async def on_message_added(self, event: BidiMessageAddedEvent):
        # Save message to storage
        await self.storage.save_message(
            agent_id=event.agent.agent_id,
            message=event.message
        )

persistence = SessionPersistence(storage=my_storage)
agent = BidiAgent(model=model, hooks=[persistence])
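
The example leaves the storage object unspecified. For local testing, an in-memory backend that matches the `save_message(agent_id=..., message=...)` call used above may be enough. The sketch below is one such stand-in. `InMemoryStorage` and `load_messages` are hypothetical names, and nothing here persists across process restarts.

```python
import asyncio
import copy
from collections import defaultdict
from typing import Any


class InMemoryStorage:
    """Hypothetical storage backend that keeps messages in a dict, keyed by agent ID."""

    def __init__(self) -> None:
        self._messages: dict[str, list[dict[str, Any]]] = defaultdict(list)

    async def save_message(self, agent_id: str, message: dict[str, Any]) -> None:
        # Store a deep copy so later mutation of the message does not alter saved history.
        self._messages[agent_id].append(copy.deepcopy(message))

    def load_messages(self, agent_id: str) -> list[dict[str, Any]]:
        # Return a new list so callers cannot append to internal state.
        return list(self._messages.get(agent_id, []))


async def demo() -> list[dict[str, Any]]:
    storage = InMemoryStorage()
    await storage.save_message(agent_id="agent-1", message={"role": "user", "content": [{"text": "hi"}]})
    await storage.save_message(agent_id="agent-1", message={"role": "assistant", "content": [{"text": "hello"}]})
    return storage.load_messages("agent-1")


saved = asyncio.run(demo())
```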

Accessing Invocation State

Invocation state carries context data through an agent invocation. Tools can read it, and hooks can log what happens during the same invocation:

import logging

from strands import tool
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.hooks.events import BidiMessageAddedEvent

logger = logging.getLogger(__name__)

@tool
def get_user_context(invocation_state: dict) -> str:
    """Access user context from invocation state."""
    user_id = invocation_state.get("user_id", "unknown")
    session_id = invocation_state.get("session_id")
    return f"User {user_id} in session {session_id}"

class ContextualLogger:
    def register_hooks(self, registry, **kwargs):
        # Bind the callback to its event type (assumes the standard HookProvider protocol).
        registry.add_callback(BidiMessageAddedEvent, self.on_message_added)

    async def on_message_added(self, event: BidiMessageAddedEvent):
        # Log when messages are added
        logger.info(
            f"Agent {event.agent.agent_id}: "
            f"{event.message['role']} message added"
        )

agent = BidiAgent(
    model=model,
    tools=[get_user_context],
    hooks=[ContextualLogger()]
)

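# Pass context when starting (call this from inside an async function;
# `db_connection` is a placeholder for your own resource)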
await agent.start(invocation_state={
    "user_id": "user_123",
    "session_id": "session_456",
    "database": db_connection
})

Best Practices

Make Your Hook Callbacks Asynchronous

Always make your bidirectional streaming hook callbacks async. Synchronous callbacks will block the agent's communication loop, preventing real-time streaming and potentially causing connection timeouts.

class MyHook:
    async def on_message_added(self, event: BidiMessageAddedEvent):
        # Can use await without blocking communications
        await self.save_to_database(event.message)
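
When a callback has to call a library that only offers a blocking API, declaring the callback `async` is not enough on its own, because the blocking call still stalls the event loop. One common approach is to hand the call to a worker thread with `asyncio.to_thread`. The sketch below uses only the standard library. `blocking_save` is a hypothetical stand-in for synchronous I/O, and the heartbeat task merely shows that the loop keeps running while the save is in progress.

```python
import asyncio
import time


def blocking_save(message: dict) -> str:
    """Hypothetical synchronous I/O call, simulated with a short sleep."""
    time.sleep(0.05)
    return message["role"]


async def on_message_added(message: dict) -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_save, message)


async def main() -> tuple:
    ticks = 0

    async def heartbeat() -> None:
        # Stands in for the agent's communication loop.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    task = asyncio.create_task(heartbeat())
    saved = await on_message_added({"role": "user"})
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return saved, ticks


saved_role, heartbeat_ticks = asyncio.run(main())
```

If `blocking_save` were called directly inside the callback, the heartbeat would not advance until the save returned.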

For additional best practices on performance considerations, error handling, composability, and advanced patterns, see the Hooks documentation.

Next Steps

  • Agent - Learn about BidiAgent configuration and lifecycle
  • Session Management - Persist conversations across sessions
  • Events - Complete guide to bidirectional streaming events
  • API Reference - Complete API documentation