strands.event_loop.event_loop

This module implements the central event loop.

The event loop allows agents to:

  1. Process conversation messages
  2. Execute tools based on model requests
  3. Handle errors and recovery strategies
  4. Manage recursive execution cycles

INITIAL_DELAY = 4 module-attribute

MAX_ATTEMPTS = 6 module-attribute

MAX_DELAY = 240 module-attribute
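
The three constants above imply a retry schedule. As a sketch, assuming classic doubling backoff capped at `MAX_DELAY` (the actual `ModelRetryStrategy` implementation may add jitter or differ in detail):

```python
# Sketch of the retry delay schedule implied by the module constants.
# Assumes plain exponential doubling capped at MAX_DELAY; this is an
# illustration, not the library's exact implementation.
INITIAL_DELAY = 4
MAX_ATTEMPTS = 6
MAX_DELAY = 240

delays = []
delay = INITIAL_DELAY
for attempt in range(1, MAX_ATTEMPTS):  # no delay before the first attempt
    delays.append(delay)
    delay = min(delay * 2, MAX_DELAY)

print(delays)  # [4, 8, 16, 32, 64]
```

With these defaults the cap is never reached within six attempts; it only matters for custom configurations with more attempts or a larger initial delay.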

Messages = list[Message] module-attribute

A list of messages representing a conversation.

StopReason = Literal['content_filtered', 'end_turn', 'guardrail_intervened', 'interrupt', 'max_tokens', 'stop_sequence', 'tool_use'] module-attribute

Reason for the model ending its response generation.

  • "content_filtered": Content was filtered due to policy violation
  • "end_turn": Normal completion of the response
  • "guardrail_intervened": Guardrail system intervened
  • "interrupt": Agent was interrupted for human input
  • "max_tokens": Maximum token limit reached
  • "stop_sequence": Stop sequence encountered
  • "tool_use": Model requested to use a tool
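
Callers typically branch on this value after an invocation. A minimal sketch (the dispatch policy here is illustrative, not prescribed by the library):

```python
# Sketch: dispatching on a StopReason string after an agent call.
# The handling choices below are illustrative assumptions.
def handle_stop(stop_reason: str) -> str:
    if stop_reason == "tool_use":
        return "model requested a tool"
    if stop_reason == "max_tokens":
        return "response truncated; consider raising the token limit"
    if stop_reason == "end_turn":
        return "normal completion"
    # content_filtered, guardrail_intervened, interrupt, stop_sequence
    return f"stopped: {stop_reason}"

print(handle_stop("max_tokens"))
```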

logger = logging.getLogger(__name__) module-attribute

AfterModelCallEvent dataclass

Bases: HookEvent

Event triggered after the model invocation completes.

This event is fired after the agent has finished calling the model, regardless of whether the invocation was successful or resulted in an error. Hook providers can use this event for cleanup, logging, or post-processing.

Note: This event uses reverse callback ordering, meaning callbacks registered later will be invoked first during cleanup.

Note: This event is not fired for invocations to structured_output.

Model Retrying

When `retry` is set to True by a hook callback, the agent will discard the current model response and invoke the model again. This has important implications for streaming consumers:

  • Streaming events from the discarded response will have already been emitted to callers before the retry occurs. Agent invokers consuming streamed events should be prepared to handle this scenario, potentially by tracking retry state or implementing idempotent event processing
  • The original model message is thrown away internally and not added to the conversation history

Attributes:

Name Type Description
invocation_state dict[str, Any]

State and configuration passed through the agent invocation. This can include shared context for multi-agent coordination, request tracking, and dynamic configuration.

stop_response ModelStopResponse | None

The model response data if invocation was successful, None if failed.

exception Exception | None

Exception if the model invocation failed, None if successful.

retry bool

Whether to retry the model invocation. Can be set by hook callbacks to trigger a retry. When True, the current response is discarded and the model is called again. Defaults to False.

Source code in strands/hooks/events.py
@dataclass
class AfterModelCallEvent(HookEvent):
    """Event triggered after the model invocation completes.

    This event is fired after the agent has finished calling the model,
    regardless of whether the invocation was successful or resulted in an error.
    Hook providers can use this event for cleanup, logging, or post-processing.

    Note: This event uses reverse callback ordering, meaning callbacks registered
    later will be invoked first during cleanup.

    Note: This event is not fired for invocations to structured_output.

    Model Retrying:
        When ``retry`` is set to True by a hook callback, the agent will discard
        the current model response and invoke the model again. This has important
        implications for streaming consumers:

        - Streaming events from the discarded response will have already been emitted
          to callers before the retry occurs. Agent invokers consuming streamed events
          should be prepared to handle this scenario, potentially by tracking retry state
          or implementing idempotent event processing
        - The original model message is thrown away internally and not added to the
          conversation history

    Attributes:
        invocation_state: State and configuration passed through the agent invocation.
            This can include shared context for multi-agent coordination, request tracking,
            and dynamic configuration.
        stop_response: The model response data if invocation was successful, None if failed.
        exception: Exception if the model invocation failed, None if successful.
        retry: Whether to retry the model invocation. Can be set by hook callbacks
            to trigger a retry. When True, the current response is discarded and the
            model is called again. Defaults to False.
    """

    @dataclass
    class ModelStopResponse:
        """Model response data from successful invocation.

        Attributes:
            stop_reason: The reason the model stopped generating.
            message: The generated message from the model.
        """

        message: Message
        stop_reason: StopReason

    invocation_state: dict[str, Any] = field(default_factory=dict)
    stop_response: ModelStopResponse | None = None
    exception: Exception | None = None
    retry: bool = False

    def _can_write(self, name: str) -> bool:
        return name == "retry"

    @property
    def should_reverse_callbacks(self) -> bool:
        """True to invoke callbacks in reverse order."""
        return True

should_reverse_callbacks property

True to invoke callbacks in reverse order.
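
A quick sketch of what reverse ordering means in practice; a plain list stands in for the real HookRegistry, and only the documented ordering behavior is illustrated:

```python
# Sketch: reverse callback ordering. A plain list stands in for the
# real HookRegistry; only the LIFO ordering behavior is illustrated.
calls = []
callbacks = [
    lambda: calls.append("first-registered"),
    lambda: calls.append("second-registered"),
]

# For events where should_reverse_callbacks is True, later registrations
# run first, so cleanup unwinds like a stack.
for cb in reversed(callbacks):
    cb()

print(calls)  # ['second-registered', 'first-registered']
```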

ModelStopResponse dataclass

Model response data from successful invocation.

Attributes:

Name Type Description
stop_reason StopReason

The reason the model stopped generating.

message Message

The generated message from the model.

Source code in strands/hooks/events.py
@dataclass
class ModelStopResponse:
    """Model response data from successful invocation.

    Attributes:
        stop_reason: The reason the model stopped generating.
        message: The generated message from the model.
    """

    message: Message
    stop_reason: StopReason

Agent

Bases: AgentBase

Core Agent implementation.

An agent orchestrates the following workflow:

  1. Receives user input
  2. Processes the input using a language model
  3. Decides whether to use tools to gather information or perform actions
  4. Executes those tools and receives results
  5. Continues reasoning with the new information
  6. Produces a final response
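
The six steps above can be sketched with a stubbed model and tool so the control flow is visible; the real Agent delegates these steps to the event loop and tool registry:

```python
# Sketch of the agent workflow with a stubbed "model" decision and a
# stubbed tool. Illustrative only; the real Agent runs this loop through
# the event loop, model provider, and tool registry.
def run_agent(user_input: str) -> str:
    tools = {"add": lambda a, b: a + b}
    # 1-2. receive input; the "model" decides a tool is needed
    if "2+2" in user_input:
        # 3-4. execute the tool and receive its result
        result = tools["add"](2, 2)
        # 5-6. continue reasoning and produce the final response
        return f"The answer is {result}"
    return "I can answer that directly."

print(run_agent("What is 2+2?"))  # The answer is 4
```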
Source code in strands/agent/agent.py
class Agent(AgentBase):
    """Core Agent implementation.

    An agent orchestrates the following workflow:

    1. Receives user input
    2. Processes the input using a language model
    3. Decides whether to use tools to gather information or perform actions
    4. Executes those tools and receives results
    5. Continues reasoning with the new information
    6. Produces a final response
    """

    # For backwards compatibility
    ToolCaller = _ToolCaller

    def __init__(
        self,
        model: Model | str | None = None,
        messages: Messages | None = None,
        tools: list[Union[str, dict[str, str], "ToolProvider", Any]] | None = None,
        system_prompt: str | list[SystemContentBlock] | None = None,
        structured_output_model: type[BaseModel] | None = None,
        callback_handler: Callable[..., Any] | _DefaultCallbackHandlerSentinel | None = _DEFAULT_CALLBACK_HANDLER,
        conversation_manager: ConversationManager | None = None,
        record_direct_tool_call: bool = True,
        load_tools_from_directory: bool = False,
        trace_attributes: Mapping[str, AttributeValue] | None = None,
        *,
        agent_id: str | None = None,
        name: str | None = None,
        description: str | None = None,
        state: AgentState | dict | None = None,
        hooks: list[HookProvider] | None = None,
        session_manager: SessionManager | None = None,
        structured_output_prompt: str | None = None,
        tool_executor: ToolExecutor | None = None,
        retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY,
    ):
        """Initialize the Agent with the specified configuration.

        Args:
            model: Provider for running inference or a string representing the model-id for Bedrock to use.
                Defaults to strands.models.BedrockModel if None.
            messages: List of initial messages to pre-load into the conversation.
                Defaults to an empty list if None.
            tools: List of tools to make available to the agent.
                Can be specified as:

                - String tool names (e.g., "retrieve")
                - File paths (e.g., "/path/to/tool.py")
                - Imported Python modules (e.g., from strands_tools import current_time)
                - Dictionaries with name/path keys (e.g., {"name": "tool_name", "path": "/path/to/tool.py"})
                - ToolProvider instances for managed tool collections
                - Functions decorated with `@strands.tool` decorator.

                If provided, only these tools will be available. If None, all tools will be available.
            system_prompt: System prompt to guide model behavior.
                Can be a string or a list of SystemContentBlock objects for advanced features like caching.
                If None, the model will behave according to its default settings.
            structured_output_model: Pydantic model type(s) for structured output.
                When specified, all agent calls will attempt to return structured output of this type.
                This can be overridden on the agent invocation.
                Defaults to None (no structured output).
            callback_handler: Callback for processing events as they happen during agent execution.
                If not provided (using the default), a new PrintingCallbackHandler instance is created.
                If explicitly set to None, null_callback_handler is used.
            conversation_manager: Manager for conversation history and context window.
                Defaults to strands.agent.conversation_manager.SlidingWindowConversationManager if None.
            record_direct_tool_call: Whether to record direct tool calls in message history.
                Defaults to True.
            load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
                Defaults to False.
            trace_attributes: Custom trace attributes to apply to the agent's trace span.
            agent_id: Optional ID for the agent, useful for session management and multi-agent scenarios.
                Defaults to "default".
            name: name of the Agent
                Defaults to "Strands Agents".
            description: description of what the Agent does
                Defaults to None.
            state: stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
                Defaults to an empty AgentState object.
            hooks: hooks to be added to the agent hook registry
                Defaults to None.
            session_manager: Manager for handling agent sessions including conversation history and state.
                If provided, enables session-based persistence and state management.
            structured_output_prompt: Custom prompt message used when forcing structured output.
                When using structured output, if the model doesn't automatically use the output tool,
                the agent sends a follow-up message to request structured formatting. This parameter
                allows customizing that message.
                Defaults to "You must format the previous response as structured output."
            tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
            retry_strategy: Strategy for retrying model calls on throttling or other transient errors.
                Defaults to ModelRetryStrategy with max_attempts=6, initial_delay=4s, max_delay=240s.
                Implement a custom HookProvider for custom retry logic, or pass None to disable retries.

        Raises:
            ValueError: If agent id contains path separators.
        """
        self.model = BedrockModel() if not model else BedrockModel(model_id=model) if isinstance(model, str) else model
        self.messages = messages if messages is not None else []
        # initializing self._system_prompt for backwards compatibility
        self._system_prompt, self._system_prompt_content = self._initialize_system_prompt(system_prompt)
        self._default_structured_output_model = structured_output_model
        self._structured_output_prompt = structured_output_prompt
        self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
        self.name = name or _DEFAULT_AGENT_NAME
        self.description = description

        # If not provided, create a new PrintingCallbackHandler instance
        # If explicitly set to None, use null_callback_handler
        # Otherwise use the passed callback_handler
        self.callback_handler: Callable[..., Any] | PrintingCallbackHandler
        if isinstance(callback_handler, _DefaultCallbackHandlerSentinel):
            self.callback_handler = PrintingCallbackHandler()
        elif callback_handler is None:
            self.callback_handler = null_callback_handler
        else:
            self.callback_handler = callback_handler

        self.conversation_manager = conversation_manager if conversation_manager else SlidingWindowConversationManager()

        # Process trace attributes to ensure they're of compatible types
        self.trace_attributes: dict[str, AttributeValue] = {}
        if trace_attributes:
            for k, v in trace_attributes.items():
                if isinstance(v, (str, int, float, bool)) or (
                    isinstance(v, list) and all(isinstance(x, (str, int, float, bool)) for x in v)
                ):
                    self.trace_attributes[k] = v

        self.record_direct_tool_call = record_direct_tool_call
        self.load_tools_from_directory = load_tools_from_directory

        self.tool_registry = ToolRegistry()

        # Process tool list if provided
        if tools is not None:
            self.tool_registry.process_tools(tools)

        # Initialize tools and configuration
        self.tool_registry.initialize_tools(self.load_tools_from_directory)
        if load_tools_from_directory:
            self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

        self.event_loop_metrics = EventLoopMetrics()

        # Initialize tracer instance (no-op if not configured)
        self.tracer = get_tracer()
        self.trace_span: trace_api.Span | None = None

        # Initialize agent state management
        if state is not None:
            if isinstance(state, dict):
                self.state = AgentState(state)
            elif isinstance(state, AgentState):
                self.state = state
            else:
                raise ValueError("state must be an AgentState object or a dict")
        else:
            self.state = AgentState()

        self.tool_caller = _ToolCaller(self)

        self.hooks = HookRegistry()

        self._interrupt_state = _InterruptState()

        # Initialize lock for guarding concurrent invocations
        # Using threading.Lock instead of asyncio.Lock because run_async() creates
        # separate event loops in different threads, so asyncio.Lock wouldn't work
        self._invocation_lock = threading.Lock()

        # In the future, we'll have a RetryStrategy base class but until
        # that API is determined we only allow ModelRetryStrategy
        if (
            retry_strategy is not None
            and not isinstance(retry_strategy, _DefaultRetryStrategySentinel)
            and type(retry_strategy) is not ModelRetryStrategy
        ):
            raise ValueError("retry_strategy must be an instance of ModelRetryStrategy")

        # If not provided (using the default), create a new ModelRetryStrategy instance
        # If explicitly set to None, disable retries (max_attempts=1 means no retries)
        # Otherwise use the passed retry_strategy
        if isinstance(retry_strategy, _DefaultRetryStrategySentinel):
            self._retry_strategy = ModelRetryStrategy(
                max_attempts=MAX_ATTEMPTS, max_delay=MAX_DELAY, initial_delay=INITIAL_DELAY
            )
        elif retry_strategy is None:
            # If no retry strategy is passed in, then we turn retries off
            self._retry_strategy = ModelRetryStrategy(max_attempts=1)
        else:
            self._retry_strategy = retry_strategy

        # Initialize session management functionality
        self._session_manager = session_manager
        if self._session_manager:
            self.hooks.add_hook(self._session_manager)

        # Allow conversation_managers to subscribe to hooks
        self.hooks.add_hook(self.conversation_manager)

        # Register retry strategy as a hook
        self.hooks.add_hook(self._retry_strategy)

        self.tool_executor = tool_executor or ConcurrentToolExecutor()

        if hooks:
            for hook in hooks:
                self.hooks.add_hook(hook)
        self.hooks.invoke_callbacks(AgentInitializedEvent(agent=self))

    @property
    def system_prompt(self) -> str | None:
        """Get the system prompt as a string for backwards compatibility.

        Returns the system prompt as a concatenated string when it contains text content,
        or None if no text content is present. This maintains backwards compatibility
        with existing code that expects system_prompt to be a string.

        Returns:
            The system prompt as a string, or None if no text content exists.
        """
        return self._system_prompt

    @system_prompt.setter
    def system_prompt(self, value: str | list[SystemContentBlock] | None) -> None:
        """Set the system prompt and update internal content representation.

        Accepts either a string or list of SystemContentBlock objects.
        When set, both the backwards-compatible string representation and the internal
        content block representation are updated to maintain consistency.

        Args:
            value: System prompt as string, list of SystemContentBlock objects, or None.
                  - str: Simple text prompt (most common use case)
                  - list[SystemContentBlock]: Content blocks with features like caching
                  - None: Clear the system prompt
        """
        self._system_prompt, self._system_prompt_content = self._initialize_system_prompt(value)

    @property
    def tool(self) -> _ToolCaller:
        """Call tool as a function.

        Returns:
            Tool caller through which user can invoke tool as a function.

        Example:
            ```
            agent = Agent(tools=[calculator])
            agent.tool.calculator(...)
            ```
        """
        return self.tool_caller

    @property
    def tool_names(self) -> list[str]:
        """Get a list of all registered tool names.

        Returns:
            Names of all tools available to this agent.
        """
        all_tools = self.tool_registry.get_all_tools_config()
        return list(all_tools.keys())

    def __call__(
        self,
        prompt: AgentInput = None,
        *,
        invocation_state: dict[str, Any] | None = None,
        structured_output_model: type[BaseModel] | None = None,
        structured_output_prompt: str | None = None,
        **kwargs: Any,
    ) -> AgentResult:
        """Process a natural language prompt through the agent's event loop.

        This method implements the conversational interface with multiple input patterns:
        - String input: `agent("hello!")`
        - ContentBlock list: `agent([{"text": "hello"}, {"image": {...}}])`
        - Message list: `agent([{"role": "user", "content": [{"text": "hello"}]}])`
        - No input: `agent()` - uses existing conversation history

        Args:
            prompt: User input in various formats:
                - str: Simple text input
                - list[ContentBlock]: Multi-modal content blocks
                - list[Message]: Complete messages with roles
                - None: Use existing conversation history
            invocation_state: Additional parameters to pass through the event loop.
            structured_output_model: Pydantic model type(s) for structured output (overrides agent default).
            structured_output_prompt: Custom prompt for forcing structured output (overrides agent default).
            **kwargs: Additional parameters to pass through the event loop.[Deprecating]

        Returns:
            Result object containing:

                - stop_reason: Why the event loop stopped (e.g., "end_turn", "max_tokens")
                - message: The final message from the model
                - metrics: Performance metrics from the event loop
                - state: The final state of the event loop
                - structured_output: Parsed structured output when structured_output_model was specified
        """
        return run_async(
            lambda: self.invoke_async(
                prompt,
                invocation_state=invocation_state,
                structured_output_model=structured_output_model,
                structured_output_prompt=structured_output_prompt,
                **kwargs,
            )
        )

    async def invoke_async(
        self,
        prompt: AgentInput = None,
        *,
        invocation_state: dict[str, Any] | None = None,
        structured_output_model: type[BaseModel] | None = None,
        structured_output_prompt: str | None = None,
        **kwargs: Any,
    ) -> AgentResult:
        """Process a natural language prompt through the agent's event loop.

        This method implements the conversational interface with multiple input patterns:
        - String input: Simple text input
        - ContentBlock list: Multi-modal content blocks
        - Message list: Complete messages with roles
        - No input: Use existing conversation history

        Args:
            prompt: User input in various formats:
                - str: Simple text input
                - list[ContentBlock]: Multi-modal content blocks
                - list[Message]: Complete messages with roles
                - None: Use existing conversation history
            invocation_state: Additional parameters to pass through the event loop.
            structured_output_model: Pydantic model type(s) for structured output (overrides agent default).
            structured_output_prompt: Custom prompt for forcing structured output (overrides agent default).
            **kwargs: Additional parameters to pass through the event loop.[Deprecating]

        Returns:
            Result: object containing:

                - stop_reason: Why the event loop stopped (e.g., "end_turn", "max_tokens")
                - message: The final message from the model
                - metrics: Performance metrics from the event loop
                - state: The final state of the event loop
        """
        events = self.stream_async(
            prompt,
            invocation_state=invocation_state,
            structured_output_model=structured_output_model,
            structured_output_prompt=structured_output_prompt,
            **kwargs,
        )
        async for event in events:
            _ = event

        return cast(AgentResult, event["result"])

    def structured_output(self, output_model: type[T], prompt: AgentInput = None) -> T:
        """This method allows you to get structured output from the agent.

        If you pass in a prompt, it will be used temporarily without adding it to the conversation history.
        If you don't pass in a prompt, it will use only the existing conversation history to respond.

        For smaller models, you may want to use the optional prompt to add additional instructions to explicitly
        instruct the model to output the structured data.

        Args:
            output_model: The output model (a JSON schema written as a Pydantic BaseModel)
                that the agent will use when responding.
            prompt: The prompt to use for the agent in various formats:
                - str: Simple text input
                - list[ContentBlock]: Multi-modal content blocks
                - list[Message]: Complete messages with roles
                - None: Use existing conversation history

        Raises:
            ValueError: If no conversation history or prompt is provided.
        """
        warnings.warn(
            "Agent.structured_output method is deprecated."
            " You should pass in `structured_output_model` directly into the agent invocation."
            " see: https://strandsagents.com/latest/documentation/docs/user-guide/concepts/agents/structured-output/",
            category=DeprecationWarning,
            stacklevel=2,
        )

        return run_async(lambda: self.structured_output_async(output_model, prompt))

    async def structured_output_async(self, output_model: type[T], prompt: AgentInput = None) -> T:
        """This method allows you to get structured output from the agent.

        If you pass in a prompt, it will be used temporarily without adding it to the conversation history.
        If you don't pass in a prompt, it will use only the existing conversation history to respond.

        For smaller models, you may want to use the optional prompt to add additional instructions to explicitly
        instruct the model to output the structured data.

        Args:
            output_model: The output model (a JSON schema written as a Pydantic BaseModel)
                that the agent will use when responding.
            prompt: The prompt to use for the agent (will not be added to conversation history).

        Raises:
            ValueError: If no conversation history or prompt is provided.
        """
        if self._interrupt_state.activated:
            raise RuntimeError("cannot call structured output during interrupt")

        warnings.warn(
            "Agent.structured_output_async method is deprecated."
            " You should pass in `structured_output_model` directly into the agent invocation."
            " see: https://strandsagents.com/latest/documentation/docs/user-guide/concepts/agents/structured-output/",
            category=DeprecationWarning,
            stacklevel=2,
        )
        await self.hooks.invoke_callbacks_async(BeforeInvocationEvent(agent=self, invocation_state={}))
        with self.tracer.tracer.start_as_current_span(
            "execute_structured_output", kind=trace_api.SpanKind.CLIENT
        ) as structured_output_span:
            try:
                if not self.messages and not prompt:
                    raise ValueError("No conversation history or prompt provided")

                temp_messages: Messages = self.messages + await self._convert_prompt_to_messages(prompt)

                structured_output_span.set_attributes(
                    {
                        "gen_ai.system": "strands-agents",
                        "gen_ai.agent.name": self.name,
                        "gen_ai.agent.id": self.agent_id,
                        "gen_ai.operation.name": "execute_structured_output",
                    }
                )
                if self.system_prompt:
                    structured_output_span.add_event(
                        "gen_ai.system.message",
                        attributes={"role": "system", "content": serialize([{"text": self.system_prompt}])},
                    )
                for message in temp_messages:
                    structured_output_span.add_event(
                        f"gen_ai.{message['role']}.message",
                        attributes={"role": message["role"], "content": serialize(message["content"])},
                    )
                events = self.model.structured_output(output_model, temp_messages, system_prompt=self.system_prompt)
                async for event in events:
                    if isinstance(event, TypedEvent):
                        event.prepare(invocation_state={})
                        if event.is_callback_event:
                            self.callback_handler(**event.as_dict())

                structured_output_span.add_event(
                    "gen_ai.choice", attributes={"message": serialize(event["output"].model_dump())}
                )
                return event["output"]

            finally:
                await self.hooks.invoke_callbacks_async(AfterInvocationEvent(agent=self, invocation_state={}))

    def cleanup(self) -> None:
        """Clean up resources used by the agent.

        This method cleans up all tool providers that require explicit cleanup,
        such as MCP clients. It should be called when the agent is no longer needed
        to ensure proper resource cleanup.

        Note: This method uses a "belt and braces" approach with automatic cleanup
        through finalizers as a fallback, but explicit cleanup is recommended.
        """
        self.tool_registry.cleanup()

    def __del__(self) -> None:
        """Clean up resources when agent is garbage collected."""
        # __del__ is called even when an exception is thrown in the constructor,
        # so there is no guarantee tool_registry was set.
        if hasattr(self, "tool_registry"):
            self.tool_registry.cleanup()

    async def stream_async(
        self,
        prompt: AgentInput = None,
        *,
        invocation_state: dict[str, Any] | None = None,
        structured_output_model: type[BaseModel] | None = None,
        structured_output_prompt: str | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[Any]:
        """Process a natural language prompt and yield events as an async iterator.

        This method provides an asynchronous interface for streaming agent events with multiple input patterns:
        - String input: Simple text input
        - ContentBlock list: Multi-modal content blocks
        - Message list: Complete messages with roles
        - No input: Use existing conversation history

        Args:
            prompt: User input in various formats:
                - str: Simple text input
                - list[ContentBlock]: Multi-modal content blocks
                - list[Message]: Complete messages with roles
                - None: Use existing conversation history
            invocation_state: Additional parameters to pass through the event loop.
            structured_output_model: Pydantic model type(s) for structured output (overrides agent default).
            structured_output_prompt: Custom prompt for forcing structured output (overrides agent default).
            **kwargs: Additional parameters to pass to the event loop. [Deprecated]

        Yields:
            An async iterator that yields events. Each event is a dictionary containing
                information about the current state of processing, such as:

                - data: Text content being generated
                - complete: Whether this is the final chunk
                - current_tool_use: Information about tools being executed
                - And other event data provided by the callback handler

        Raises:
            ConcurrencyException: If another invocation is already in progress on this agent instance.
            Exception: Any exceptions from the agent invocation will be propagated to the caller.

        Example:
            ```python
            async for event in agent.stream_async("Analyze this data"):
                if "data" in event:
                    yield event["data"]
            ```
        """
        # Acquire lock to prevent concurrent invocations
        # Using threading.Lock instead of asyncio.Lock because run_async() creates
        # separate event loops in different threads
        acquired = self._invocation_lock.acquire(blocking=False)
        if not acquired:
            raise ConcurrencyException(
                "Agent is already processing a request. Concurrent invocations are not supported."
            )

        try:
            self._interrupt_state.resume(prompt)

            self.event_loop_metrics.reset_usage_metrics()

            merged_state = {}
            if kwargs:
                warnings.warn("`**kwargs` parameter is deprecated; use `invocation_state` instead.", stacklevel=2)
                merged_state.update(kwargs)
                if invocation_state is not None:
                    merged_state["invocation_state"] = invocation_state
            else:
                if invocation_state is not None:
                    merged_state = invocation_state

            callback_handler = self.callback_handler
            if kwargs:
                callback_handler = kwargs.get("callback_handler", self.callback_handler)

            # Process input and get message to add (if any)
            messages = await self._convert_prompt_to_messages(prompt)

            self.trace_span = self._start_agent_trace_span(messages)

            with trace_api.use_span(self.trace_span):
                try:
                    events = self._run_loop(messages, merged_state, structured_output_model, structured_output_prompt)

                    async for event in events:
                        event.prepare(invocation_state=merged_state)

                        if event.is_callback_event:
                            as_dict = event.as_dict()
                            callback_handler(**as_dict)
                            yield as_dict

                    result = AgentResult(*event["stop"])
                    callback_handler(result=result)
                    yield AgentResultEvent(result=result).as_dict()

                    self._end_agent_trace_span(response=result)

                except Exception as e:
                    self._end_agent_trace_span(error=e)
                    raise

        finally:
            self._invocation_lock.release()

    async def _run_loop(
        self,
        messages: Messages,
        invocation_state: dict[str, Any],
        structured_output_model: type[BaseModel] | None = None,
        structured_output_prompt: str | None = None,
    ) -> AsyncGenerator[TypedEvent, None]:
        """Execute the agent's event loop with the given message and parameters.

        Args:
            messages: The input messages to add to the conversation.
            invocation_state: Additional parameters to pass to the event loop.
            structured_output_model: Optional Pydantic model type for structured output.
            structured_output_prompt: Optional custom prompt for forcing structured output.

        Yields:
            Events from the event loop cycle.
        """
        before_invocation_event, _interrupts = await self.hooks.invoke_callbacks_async(
            BeforeInvocationEvent(agent=self, invocation_state=invocation_state, messages=messages)
        )
        messages = before_invocation_event.messages if before_invocation_event.messages is not None else messages

        agent_result: AgentResult | None = None
        try:
            yield InitEventLoopEvent()

            await self._append_messages(*messages)

            structured_output_context = StructuredOutputContext(
                structured_output_model or self._default_structured_output_model,
                structured_output_prompt=structured_output_prompt or self._structured_output_prompt,
            )

            # Execute the event loop cycle with retry logic for context limits
            events = self._execute_event_loop_cycle(invocation_state, structured_output_context)
            async for event in events:
                # Signal from the model provider that the message sent by the user should be redacted,
                # likely due to a guardrail.
                if (
                    isinstance(event, ModelStreamChunkEvent)
                    and event.chunk
                    and event.chunk.get("redactContent")
                    and event.chunk["redactContent"].get("redactUserContentMessage")
                ):
                    self.messages[-1]["content"] = self._redact_user_content(
                        self.messages[-1]["content"], str(event.chunk["redactContent"]["redactUserContentMessage"])
                    )
                    if self._session_manager:
                        self._session_manager.redact_latest_message(self.messages[-1], self)
                yield event

            # Capture the result from the final event if available
            if isinstance(event, EventLoopStopEvent):
                agent_result = AgentResult(*event["stop"])

        finally:
            self.conversation_manager.apply_management(self)
            await self.hooks.invoke_callbacks_async(
                AfterInvocationEvent(agent=self, invocation_state=invocation_state, result=agent_result)
            )

    async def _execute_event_loop_cycle(
        self, invocation_state: dict[str, Any], structured_output_context: StructuredOutputContext | None = None
    ) -> AsyncGenerator[TypedEvent, None]:
        """Execute the event loop cycle with retry logic for context window limits.

        This internal method handles the execution of the event loop cycle and implements
        retry logic for handling context window overflow exceptions by reducing the
        conversation context and retrying.

        Args:
            invocation_state: Additional parameters to pass to the event loop.
            structured_output_context: Optional structured output context for this invocation.

        Yields:
            Events of the loop cycle.
        """
        # Add `Agent` to invocation_state to keep backwards-compatibility
        invocation_state["agent"] = self

        if structured_output_context:
            structured_output_context.register_tool(self.tool_registry)

        try:
            events = event_loop_cycle(
                agent=self,
                invocation_state=invocation_state,
                structured_output_context=structured_output_context,
            )
            async for event in events:
                yield event

        except ContextWindowOverflowException as e:
            # Try reducing the context size and retrying
            self.conversation_manager.reduce_context(self, e=e)

            # Sync agent after reduce_context to keep conversation_manager_state up to date in the session
            if self._session_manager:
                self._session_manager.sync_agent(self)

            events = self._execute_event_loop_cycle(invocation_state, structured_output_context)
            async for event in events:
                yield event

        finally:
            if structured_output_context:
                structured_output_context.cleanup(self.tool_registry)

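The reduce-context-and-retry pattern above can be sketched standalone. This is a simplified illustration, not the Strands implementation: `ContextWindowOverflow` and `run_cycle` are stand-ins, and trimming here just drops the oldest message, whereas the real agent delegates reduction to its conversation manager.

```python
class ContextWindowOverflow(Exception):
    """Stand-in for the library's ContextWindowOverflowException."""


def run_with_context_retry(messages, run_cycle, max_retries=3):
    """Run `run_cycle`; on overflow, trim the oldest message and retry."""
    for _ in range(max_retries):
        try:
            return run_cycle(messages)
        except ContextWindowOverflow:
            if len(messages) <= 1:
                raise  # nothing left to trim
            messages = messages[1:]  # drop the oldest message
    raise ContextWindowOverflow("context could not be reduced enough")
```

The real method recurses instead of looping, but the control flow is the same: shrink the conversation, then re-enter the event loop cycle.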
    async def _convert_prompt_to_messages(self, prompt: AgentInput) -> Messages:
        if self._interrupt_state.activated:
            return []

        messages: Messages | None = None
        if prompt is not None:
            # Check if the latest message is toolUse
            if len(self.messages) > 0 and any("toolUse" in content for content in self.messages[-1]["content"]):
                # Add toolResult message after to have a valid conversation
                logger.info(
                    "Agent's latest message is toolUse; appending a toolResult message to keep the conversation valid."
                )
                tool_use_ids = [
                    content["toolUse"]["toolUseId"] for content in self.messages[-1]["content"] if "toolUse" in content
                ]
                await self._append_messages(
                    {
                        "role": "user",
                        "content": generate_missing_tool_result_content(tool_use_ids),
                    }
                )
            if isinstance(prompt, str):
                # String input - convert to user message
                messages = [{"role": "user", "content": [{"text": prompt}]}]
            elif isinstance(prompt, list):
                if len(prompt) == 0:
                    # Empty list
                    messages = []
                # Check if all items in the input list are dictionaries
                elif all(isinstance(item, dict) for item in prompt):
                    # Check if all items are messages
                    if all(all(key in item for key in Message.__annotations__.keys()) for item in prompt):
                        # Messages input - add all messages to conversation
                        messages = cast(Messages, prompt)

                    # Check if all items are content blocks
                    elif all(any(key in ContentBlock.__annotations__.keys() for key in item) for item in prompt):
                        # Treat as List[ContentBlock] input - convert to user message
                        # This allows invalid structures to be passed through to the model
                        messages = [{"role": "user", "content": cast(list[ContentBlock], prompt)}]
        else:
            messages = []
        if messages is None:
            raise ValueError("Input prompt must be of type: `str | list[ContentBlock] | Messages | None`.")
        return messages

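The normalization rules above can be exercised as a standalone function. This is a simplified sketch, not the Strands implementation: it uses plain dicts in place of the `Message`/`ContentBlock` typed dicts and checks for `role`/`content` keys directly.

```python
def normalize_prompt(prompt):
    """Sketch of prompt normalization: str -> one user message; list of
    role/content dicts -> messages as-is; list of content blocks -> one
    user message; None -> no new messages."""
    if prompt is None:
        return []
    if isinstance(prompt, str):
        return [{"role": "user", "content": [{"text": prompt}]}]
    if isinstance(prompt, list):
        if not prompt:
            return []
        if all(isinstance(item, dict) for item in prompt):
            if all("role" in item and "content" in item for item in prompt):
                return prompt  # already full messages
            return [{"role": "user", "content": prompt}]  # content blocks
    raise ValueError("prompt must be str, list, or None")
```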
    def _start_agent_trace_span(self, messages: Messages) -> trace_api.Span:
        """Starts a trace span for the agent.

        Args:
            messages: The input messages.
        """
        model_id = self.model.config.get("model_id") if hasattr(self.model, "config") else None
        return self.tracer.start_agent_span(
            messages=messages,
            agent_name=self.name,
            model_id=model_id,
            tools=self.tool_names,
            system_prompt=self.system_prompt,
            custom_trace_attributes=self.trace_attributes,
            tools_config=self.tool_registry.get_all_tools_config(),
        )

    def _end_agent_trace_span(
        self,
        response: AgentResult | None = None,
        error: Exception | None = None,
    ) -> None:
        """Ends a trace span for the agent.

        Args:
            span: The span to end.
            response: Response to record as a trace attribute.
            error: Error to record as a trace attribute.
        """
        if self.trace_span:
            trace_attributes: dict[str, Any] = {
                "span": self.trace_span,
            }

            if response:
                trace_attributes["response"] = response
            if error:
                trace_attributes["error"] = error

            self.tracer.end_agent_span(**trace_attributes)

    def _initialize_system_prompt(
        self, system_prompt: str | list[SystemContentBlock] | None
    ) -> tuple[str | None, list[SystemContentBlock] | None]:
        """Initialize system prompt fields from constructor input.

        Maintains backwards compatibility by keeping system_prompt as str when string input
        provided, avoiding breaking existing consumers.

        Maps system_prompt input to both string and content block representations:
        - If string: system_prompt=string, _system_prompt_content=[{text: string}]
        - If list with text elements: system_prompt=concatenated_text, _system_prompt_content=list
        - If list without text elements: system_prompt=None, _system_prompt_content=list
        - If None: system_prompt=None, _system_prompt_content=None
        """
        if isinstance(system_prompt, str):
            return system_prompt, [{"text": system_prompt}]
        elif isinstance(system_prompt, list):
            # Concatenate all text elements for backwards compatibility, None if no text found
            text_parts = [block["text"] for block in system_prompt if "text" in block]
            system_prompt_str = "\n".join(text_parts) if text_parts else None
            return system_prompt_str, system_prompt
        else:
            return None, None

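The mapping rules in the docstring above can be sketched standalone (a simplified version using plain dicts for `SystemContentBlock`):

```python
def initialize_system_prompt(system_prompt):
    """Sketch of the mapping rules: returns (string_form, content_blocks)."""
    if isinstance(system_prompt, str):
        return system_prompt, [{"text": system_prompt}]
    if isinstance(system_prompt, list):
        # Concatenate text blocks for backwards compatibility; None if no text
        text_parts = [block["text"] for block in system_prompt if "text" in block]
        return ("\n".join(text_parts) if text_parts else None), system_prompt
    return None, None
```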
    async def _append_messages(self, *messages: Message) -> None:
        """Appends messages to history and invoke the callbacks for the MessageAddedEvent."""
        for message in messages:
            self.messages.append(message)
            await self.hooks.invoke_callbacks_async(MessageAddedEvent(agent=self, message=message))

    def _redact_user_content(self, content: list[ContentBlock], redact_message: str) -> list[ContentBlock]:
        """Redact user content preserving toolResult blocks.

        Args:
            content: content blocks to be redacted
            redact_message: redact message to be replaced

        Returns:
            Redacted content, as follows:
            - if the message contains at least one toolResult block,
                all toolResult block(s) are kept, redacting only the result content;
            - otherwise, the entire content of the message is replaced
                with a single text block with the redact message.
        """
        redacted_content = []
        for block in content:
            if "toolResult" in block:
                block["toolResult"]["content"] = [{"text": redact_message}]
                redacted_content.append(block)

        if not redacted_content:
            # Text content is added only if no toolResult blocks were found
            redacted_content = [{"text": redact_message}]

        return redacted_content

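The redaction rules can be exercised standalone. This sketch mirrors the logic above using plain dicts for content blocks; it is illustrative, not the library function itself.

```python
def redact_user_content(content, redact_message):
    """Keep toolResult blocks (redacting their inner content); if none exist,
    replace everything with a single text block."""
    redacted = []
    for block in content:
        if "toolResult" in block:
            block["toolResult"]["content"] = [{"text": redact_message}]
            redacted.append(block)
    return redacted or [{"text": redact_message}]
```

Preserving toolResult blocks keeps the conversation structurally valid: every toolUse in the prior assistant message still has a matching toolResult, even after the sensitive content is gone.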
system_prompt property writable

Get the system prompt as a string for backwards compatibility.

Returns the system prompt as a concatenated string when it contains text content, or None if no text content is present. This maintains backwards compatibility with existing code that expects system_prompt to be a string.

Returns:

Type Description
str | None

The system prompt as a string, or None if no text content exists.

tool property

Call tool as a function.

Returns:

Type Description
_ToolCaller

Tool caller through which the user can invoke tools as functions.

Example
agent = Agent(tools=[calculator])
agent.tool.calculator(...)

tool_names property

Get a list of all registered tool names.

Returns:

Type Description
list[str]

Names of all tools available to this agent.

__call__(prompt=None, *, invocation_state=None, structured_output_model=None, structured_output_prompt=None, **kwargs)

Process a natural language prompt through the agent's event loop.

This method implements the conversational interface with multiple input patterns:

  • String input: agent("hello!")
  • ContentBlock list: agent([{"text": "hello"}, {"image": {...}}])
  • Message list: agent([{"role": "user", "content": [{"text": "hello"}]}])
  • No input: agent() - uses existing conversation history

Parameters:

Name Type Description Default
prompt AgentInput

User input in various formats:

  • str: Simple text input
  • list[ContentBlock]: Multi-modal content blocks
  • list[Message]: Complete messages with roles
  • None: Use existing conversation history

None
invocation_state dict[str, Any] | None

Additional parameters to pass through the event loop.

None
structured_output_model type[BaseModel] | None

Pydantic model type(s) for structured output (overrides agent default).

None
structured_output_prompt str | None

Custom prompt for forcing structured output (overrides agent default).

None
**kwargs Any

Additional parameters to pass through the event loop. [Deprecated]

{}

Returns:

Type Description
AgentResult

Result object containing:

  • stop_reason: Why the event loop stopped (e.g., "end_turn", "max_tokens")
  • message: The final message from the model
  • metrics: Performance metrics from the event loop
  • state: The final state of the event loop
  • structured_output: Parsed structured output when structured_output_model was specified
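Internally, the synchronous `__call__` drives the async implementation by running the coroutine on a fresh event loop in a worker thread. The following is an illustrative sketch of that pattern (the `run_async` helper here is a stand-in, not the library's implementation):

```python
import asyncio
import threading


def run_async(coro_factory):
    """Run an async callable to completion on a new event loop in a worker
    thread, so it works even when the caller's thread has a running loop."""
    result, error = [], []

    def worker():
        try:
            result.append(asyncio.run(coro_factory()))
        except Exception as e:
            error.append(e)

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    if error:
        raise error[0]
    return result[0]


async def invoke_async(prompt):
    # Stand-in for Agent.invoke_async
    return f"echo: {prompt}"


print(run_async(lambda: invoke_async("hello")))  # prints "echo: hello"
```

Because each call spins up its own event loop in a separate thread, the agent guards against concurrent invocations with a `threading.Lock` rather than an `asyncio.Lock`.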
Source code in strands/agent/agent.py
def __call__(
    self,
    prompt: AgentInput = None,
    *,
    invocation_state: dict[str, Any] | None = None,
    structured_output_model: type[BaseModel] | None = None,
    structured_output_prompt: str | None = None,
    **kwargs: Any,
) -> AgentResult:
    """Process a natural language prompt through the agent's event loop.

    This method implements the conversational interface with multiple input patterns:
    - String input: `agent("hello!")`
    - ContentBlock list: `agent([{"text": "hello"}, {"image": {...}}])`
    - Message list: `agent([{"role": "user", "content": [{"text": "hello"}]}])`
    - No input: `agent()` - uses existing conversation history

    Args:
        prompt: User input in various formats:
            - str: Simple text input
            - list[ContentBlock]: Multi-modal content blocks
            - list[Message]: Complete messages with roles
            - None: Use existing conversation history
        invocation_state: Additional parameters to pass through the event loop.
        structured_output_model: Pydantic model type(s) for structured output (overrides agent default).
        structured_output_prompt: Custom prompt for forcing structured output (overrides agent default).
        **kwargs: Additional parameters to pass through the event loop. [Deprecated]

    Returns:
        Result object containing:

            - stop_reason: Why the event loop stopped (e.g., "end_turn", "max_tokens")
            - message: The final message from the model
            - metrics: Performance metrics from the event loop
            - state: The final state of the event loop
            - structured_output: Parsed structured output when structured_output_model was specified
    """
    return run_async(
        lambda: self.invoke_async(
            prompt,
            invocation_state=invocation_state,
            structured_output_model=structured_output_model,
            structured_output_prompt=structured_output_prompt,
            **kwargs,
        )
    )

__del__()

Clean up resources when agent is garbage collected.

Source code in strands/agent/agent.py
def __del__(self) -> None:
    """Clean up resources when agent is garbage collected."""
    # __del__ is called even when an exception is thrown in the constructor,
    # so there is no guarantee tool_registry was set.
    if hasattr(self, "tool_registry"):
        self.tool_registry.cleanup()

__init__(model=None, messages=None, tools=None, system_prompt=None, structured_output_model=None, callback_handler=_DEFAULT_CALLBACK_HANDLER, conversation_manager=None, record_direct_tool_call=True, load_tools_from_directory=False, trace_attributes=None, *, agent_id=None, name=None, description=None, state=None, hooks=None, session_manager=None, structured_output_prompt=None, tool_executor=None, retry_strategy=_DEFAULT_RETRY_STRATEGY)

Initialize the Agent with the specified configuration.

Parameters:

Name Type Description Default
model Model | str | None

Provider for running inference or a string representing the model-id for Bedrock to use. Defaults to strands.models.BedrockModel if None.

None
messages Messages | None

List of initial messages to pre-load into the conversation. Defaults to an empty list if None.

None
tools list[Union[str, dict[str, str], ToolProvider, Any]] | None

List of tools to make available to the agent. Can be specified as:

  • String tool names (e.g., "retrieve")
  • File paths (e.g., "/path/to/tool.py")
  • Imported Python modules (e.g., from strands_tools import current_time)
  • Dictionaries with name/path keys (e.g., {"name": "tool_name", "path": "/path/to/tool.py"})
  • ToolProvider instances for managed tool collections
  • Functions decorated with @strands.tool decorator.

If provided, only these tools will be available. If None, all tools will be available.

None
system_prompt str | list[SystemContentBlock] | None

System prompt to guide model behavior. Can be a string or a list of SystemContentBlock objects for advanced features like caching. If None, the model will behave according to its default settings.

None
structured_output_model type[BaseModel] | None

Pydantic model type(s) for structured output. When specified, all agent calls will attempt to return structured output of this type. This can be overridden on the agent invocation. Defaults to None (no structured output).

None
callback_handler Callable[..., Any] | _DefaultCallbackHandlerSentinel | None

Callback for processing events as they happen during agent execution. If not provided (using the default), a new PrintingCallbackHandler instance is created. If explicitly set to None, null_callback_handler is used.

_DEFAULT_CALLBACK_HANDLER
conversation_manager ConversationManager | None

Manager for conversation history and context window. Defaults to strands.agent.conversation_manager.SlidingWindowConversationManager if None.

None
record_direct_tool_call bool

Whether to record direct tool calls in message history. Defaults to True.

True
load_tools_from_directory bool

Whether to load and automatically reload tools in the ./tools/ directory. Defaults to False.

False
trace_attributes Mapping[str, AttributeValue] | None

Custom trace attributes to apply to the agent's trace span.

None
agent_id str | None

Optional ID for the agent, useful for session management and multi-agent scenarios. Defaults to "default".

None
name str | None

Name of the Agent. Defaults to "Strands Agents".

None
description str | None

Description of what the Agent does. Defaults to None.

None
state AgentState | dict | None

Stateful information for the agent. Can be either an AgentState object or a JSON-serializable dict. Defaults to an empty AgentState object.

None
hooks list[HookProvider] | None

Hooks to be added to the agent hook registry. Defaults to None.

None
session_manager SessionManager | None

Manager for handling agent sessions including conversation history and state. If provided, enables session-based persistence and state management.

None
structured_output_prompt str | None

Custom prompt message used when forcing structured output. When using structured output, if the model doesn't automatically use the output tool, the agent sends a follow-up message to request structured formatting. This parameter allows customizing that message. Defaults to "You must format the previous response as structured output."

None
tool_executor ToolExecutor | None

Definition of tool execution strategy (e.g., sequential, concurrent, etc.).

None
retry_strategy ModelRetryStrategy | _DefaultRetryStrategySentinel | None

Strategy for retrying model calls on throttling or other transient errors. Defaults to ModelRetryStrategy with max_attempts=6, initial_delay=4s, max_delay=240s. Implement a custom HookProvider for custom retry logic, or pass None to disable retries.

_DEFAULT_RETRY_STRATEGY

Raises:

Type Description
ValueError

If agent id contains path separators.

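The callback_handler parameter distinguishes "not provided" from an explicit None via a private sentinel. The sketch below illustrates that three-way default pattern; the class and handler names are stand-ins for the library's internals:

```python
class _DefaultSentinel:
    """Marks 'argument not provided', distinct from an explicit None."""


_DEFAULT = _DefaultSentinel()


def printing_handler(**kwargs):
    # Stand-in for PrintingCallbackHandler
    print(kwargs)


def null_handler(**kwargs):
    # Stand-in for null_callback_handler: discard all events
    pass


def resolve_callback_handler(handler=_DEFAULT):
    if isinstance(handler, _DefaultSentinel):
        return printing_handler  # not provided: use the default printer
    if handler is None:
        return null_handler  # explicit None: silence output
    return handler  # user-supplied callable
```

A plain `handler=None` default could not tell these first two cases apart, which is why the sentinel exists.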
Source code in strands/agent/agent.py
def __init__(
    self,
    model: Model | str | None = None,
    messages: Messages | None = None,
    tools: list[Union[str, dict[str, str], "ToolProvider", Any]] | None = None,
    system_prompt: str | list[SystemContentBlock] | None = None,
    structured_output_model: type[BaseModel] | None = None,
    callback_handler: Callable[..., Any] | _DefaultCallbackHandlerSentinel | None = _DEFAULT_CALLBACK_HANDLER,
    conversation_manager: ConversationManager | None = None,
    record_direct_tool_call: bool = True,
    load_tools_from_directory: bool = False,
    trace_attributes: Mapping[str, AttributeValue] | None = None,
    *,
    agent_id: str | None = None,
    name: str | None = None,
    description: str | None = None,
    state: AgentState | dict | None = None,
    hooks: list[HookProvider] | None = None,
    session_manager: SessionManager | None = None,
    structured_output_prompt: str | None = None,
    tool_executor: ToolExecutor | None = None,
    retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY,
):
    """Initialize the Agent with the specified configuration.

    Args:
        model: Provider for running inference or a string representing the model-id for Bedrock to use.
            Defaults to strands.models.BedrockModel if None.
        messages: List of initial messages to pre-load into the conversation.
            Defaults to an empty list if None.
        tools: List of tools to make available to the agent.
            Can be specified as:

            - String tool names (e.g., "retrieve")
            - File paths (e.g., "/path/to/tool.py")
            - Imported Python modules (e.g., from strands_tools import current_time)
            - Dictionaries with name/path keys (e.g., {"name": "tool_name", "path": "/path/to/tool.py"})
            - ToolProvider instances for managed tool collections
            - Functions decorated with `@strands.tool` decorator.

            If provided, only these tools will be available. If None, all tools will be available.
        system_prompt: System prompt to guide model behavior.
            Can be a string or a list of SystemContentBlock objects for advanced features like caching.
            If None, the model will behave according to its default settings.
        structured_output_model: Pydantic model type(s) for structured output.
            When specified, all agent calls will attempt to return structured output of this type.
            This can be overridden on the agent invocation.
            Defaults to None (no structured output).
        callback_handler: Callback for processing events as they happen during agent execution.
            If not provided (using the default), a new PrintingCallbackHandler instance is created.
            If explicitly set to None, null_callback_handler is used.
        conversation_manager: Manager for conversation history and context window.
            Defaults to strands.agent.conversation_manager.SlidingWindowConversationManager if None.
        record_direct_tool_call: Whether to record direct tool calls in message history.
            Defaults to True.
        load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
            Defaults to False.
        trace_attributes: Custom trace attributes to apply to the agent's trace span.
        agent_id: Optional ID for the agent, useful for session management and multi-agent scenarios.
            Defaults to "default".
        name: Name of the Agent.
            Defaults to "Strands Agents".
        description: Description of what the Agent does.
            Defaults to None.
        state: Stateful information for the agent. Can be either an AgentState object or a JSON-serializable dict.
            Defaults to an empty AgentState object.
        hooks: Hooks to be added to the agent hook registry.
            Defaults to None.
        session_manager: Manager for handling agent sessions including conversation history and state.
            If provided, enables session-based persistence and state management.
        structured_output_prompt: Custom prompt message used when forcing structured output.
            When using structured output, if the model doesn't automatically use the output tool,
            the agent sends a follow-up message to request structured formatting. This parameter
            allows customizing that message.
            Defaults to "You must format the previous response as structured output."
        tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
        retry_strategy: Strategy for retrying model calls on throttling or other transient errors.
            Defaults to ModelRetryStrategy with max_attempts=6, initial_delay=4s, max_delay=240s.
            Implement a custom HookProvider for custom retry logic, or pass None to disable retries.

    Raises:
        ValueError: If agent id contains path separators.
    """
    self.model = BedrockModel() if not model else BedrockModel(model_id=model) if isinstance(model, str) else model
    self.messages = messages if messages is not None else []
    # initializing self._system_prompt for backwards compatibility
    self._system_prompt, self._system_prompt_content = self._initialize_system_prompt(system_prompt)
    self._default_structured_output_model = structured_output_model
    self._structured_output_prompt = structured_output_prompt
    self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
    self.name = name or _DEFAULT_AGENT_NAME
    self.description = description

    # If not provided, create a new PrintingCallbackHandler instance
    # If explicitly set to None, use null_callback_handler
    # Otherwise use the passed callback_handler
    self.callback_handler: Callable[..., Any] | PrintingCallbackHandler
    if isinstance(callback_handler, _DefaultCallbackHandlerSentinel):
        self.callback_handler = PrintingCallbackHandler()
    elif callback_handler is None:
        self.callback_handler = null_callback_handler
    else:
        self.callback_handler = callback_handler

    self.conversation_manager = conversation_manager if conversation_manager else SlidingWindowConversationManager()

    # Process trace attributes to ensure they're of compatible types
    self.trace_attributes: dict[str, AttributeValue] = {}
    if trace_attributes:
        for k, v in trace_attributes.items():
            if isinstance(v, (str, int, float, bool)) or (
                isinstance(v, list) and all(isinstance(x, (str, int, float, bool)) for x in v)
            ):
                self.trace_attributes[k] = v

    self.record_direct_tool_call = record_direct_tool_call
    self.load_tools_from_directory = load_tools_from_directory

    self.tool_registry = ToolRegistry()

    # Process tool list if provided
    if tools is not None:
        self.tool_registry.process_tools(tools)

    # Initialize tools and configuration
    self.tool_registry.initialize_tools(self.load_tools_from_directory)
    if load_tools_from_directory:
        self.tool_watcher = ToolWatcher(tool_registry=self.tool_registry)

    self.event_loop_metrics = EventLoopMetrics()

    # Initialize tracer instance (no-op if not configured)
    self.tracer = get_tracer()
    self.trace_span: trace_api.Span | None = None

    # Initialize agent state management
    if state is not None:
        if isinstance(state, dict):
            self.state = AgentState(state)
        elif isinstance(state, AgentState):
            self.state = state
        else:
            raise ValueError("state must be an AgentState object or a dict")
    else:
        self.state = AgentState()

    self.tool_caller = _ToolCaller(self)

    self.hooks = HookRegistry()

    self._interrupt_state = _InterruptState()

    # Initialize lock for guarding concurrent invocations
    # Using threading.Lock instead of asyncio.Lock because run_async() creates
    # separate event loops in different threads, so asyncio.Lock wouldn't work
    self._invocation_lock = threading.Lock()

    # In the future, we'll have a RetryStrategy base class, but until
    # that API is determined, we only allow ModelRetryStrategy.
    if (
        retry_strategy is not None
        and not isinstance(retry_strategy, _DefaultRetryStrategySentinel)
        and type(retry_strategy) is not ModelRetryStrategy
    ):
        raise ValueError("retry_strategy must be an instance of ModelRetryStrategy")

    # If not provided (using the default), create a new ModelRetryStrategy instance
    # If explicitly set to None, disable retries (max_attempts=1 means no retries)
    # Otherwise use the passed retry_strategy
    if isinstance(retry_strategy, _DefaultRetryStrategySentinel):
        self._retry_strategy = ModelRetryStrategy(
            max_attempts=MAX_ATTEMPTS, max_delay=MAX_DELAY, initial_delay=INITIAL_DELAY
        )
    elif retry_strategy is None:
        # If no retry strategy is passed in, then we turn retries off
        self._retry_strategy = ModelRetryStrategy(max_attempts=1)
    else:
        self._retry_strategy = retry_strategy

    # Initialize session management functionality
    self._session_manager = session_manager
    if self._session_manager:
        self.hooks.add_hook(self._session_manager)

    # Allow conversation_managers to subscribe to hooks
    self.hooks.add_hook(self.conversation_manager)

    # Register retry strategy as a hook
    self.hooks.add_hook(self._retry_strategy)

    self.tool_executor = tool_executor or ConcurrentToolExecutor()

    if hooks:
        for hook in hooks:
            self.hooks.add_hook(hook)
    self.hooks.invoke_callbacks(AgentInitializedEvent(agent=self))
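The default retry configuration wires the module constants `MAX_ATTEMPTS=6`, `INITIAL_DELAY=4`, and `MAX_DELAY=240` into `ModelRetryStrategy`. A minimal sketch of the delay schedule those constants suggest, assuming a standard doubling backoff capped at the maximum (the actual `ModelRetryStrategy` schedule is not shown in this source and may differ):

```python
INITIAL_DELAY = 4
MAX_DELAY = 240
MAX_ATTEMPTS = 6

def retry_delays(initial: int = INITIAL_DELAY, maximum: int = MAX_DELAY,
                 attempts: int = MAX_ATTEMPTS) -> list[int]:
    """Delay (seconds) before each retry, assuming doubling backoff.

    ``attempts`` counts the initial call, so there are ``attempts - 1`` retries.
    """
    delays = []
    delay = initial
    for _ in range(attempts - 1):
        delays.append(min(delay, maximum))  # cap each delay at the maximum
        delay *= 2
    return delays
```

Under this assumption the defaults would wait 4, 8, 16, 32, and 64 seconds before retries; the 240-second cap only comes into play if `max_attempts` is raised.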

cleanup()

Clean up resources used by the agent.

This method cleans up all tool providers that require explicit cleanup, such as MCP clients. It should be called when the agent is no longer needed to ensure proper resource cleanup.

Note: This method uses a "belt and braces" approach with automatic cleanup through finalizers as a fallback, but explicit cleanup is recommended.

Source code in strands/agent/agent.py
def cleanup(self) -> None:
    """Clean up resources used by the agent.

    This method cleans up all tool providers that require explicit cleanup,
    such as MCP clients. It should be called when the agent is no longer needed
    to ensure proper resource cleanup.

    Note: This method uses a "belt and braces" approach with automatic cleanup
    through finalizers as a fallback, but explicit cleanup is recommended.
    """
    self.tool_registry.cleanup()

invoke_async(prompt=None, *, invocation_state=None, structured_output_model=None, structured_output_prompt=None, **kwargs) async

Process a natural language prompt through the agent's event loop.

This method implements the conversational interface with multiple input patterns:

- String input: Simple text input
- ContentBlock list: Multi-modal content blocks
- Message list: Complete messages with roles
- No input: Use existing conversation history

Parameters:

- prompt (AgentInput, default None): User input in various formats:
    - str: Simple text input
    - list[ContentBlock]: Multi-modal content blocks
    - list[Message]: Complete messages with roles
    - None: Use existing conversation history
- invocation_state (dict[str, Any] | None, default None): Additional parameters to pass through the event loop.
- structured_output_model (type[BaseModel] | None, default None): Pydantic model type(s) for structured output (overrides agent default).
- structured_output_prompt (str | None, default None): Custom prompt for forcing structured output (overrides agent default).
- **kwargs (Any): [Deprecated] Additional parameters to pass through the event loop.

Returns:

- Result (AgentResult): Object containing:
    - stop_reason: Why the event loop stopped (e.g., "end_turn", "max_tokens")
    - message: The final message from the model
    - metrics: Performance metrics from the event loop
    - state: The final state of the event loop
Source code in strands/agent/agent.py
async def invoke_async(
    self,
    prompt: AgentInput = None,
    *,
    invocation_state: dict[str, Any] | None = None,
    structured_output_model: type[BaseModel] | None = None,
    structured_output_prompt: str | None = None,
    **kwargs: Any,
) -> AgentResult:
    """Process a natural language prompt through the agent's event loop.

    This method implements the conversational interface with multiple input patterns:
    - String input: Simple text input
    - ContentBlock list: Multi-modal content blocks
    - Message list: Complete messages with roles
    - No input: Use existing conversation history

    Args:
        prompt: User input in various formats:
            - str: Simple text input
            - list[ContentBlock]: Multi-modal content blocks
            - list[Message]: Complete messages with roles
            - None: Use existing conversation history
        invocation_state: Additional parameters to pass through the event loop.
        structured_output_model: Pydantic model type(s) for structured output (overrides agent default).
        structured_output_prompt: Custom prompt for forcing structured output (overrides agent default).
        **kwargs: Additional parameters to pass through the event loop. [Deprecated]

    Returns:
        Result: object containing:

            - stop_reason: Why the event loop stopped (e.g., "end_turn", "max_tokens")
            - message: The final message from the model
            - metrics: Performance metrics from the event loop
            - state: The final state of the event loop
    """
    events = self.stream_async(
        prompt,
        invocation_state=invocation_state,
        structured_output_model=structured_output_model,
        structured_output_prompt=structured_output_prompt,
        **kwargs,
    )
    async for event in events:
        _ = event

    return cast(AgentResult, event["result"])
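As the source shows, invoke_async drains the stream to exhaustion and then unwraps the final event's "result" key. A self-contained sketch of that drain-then-unwrap pattern, using a hypothetical `fake_stream` in place of `stream_async`:

```python
import asyncio
from typing import Any, AsyncIterator

# Hypothetical stand-in for stream_async: yields streaming events, with the
# final event carrying the result under a "result" key.
async def fake_stream() -> AsyncIterator[dict[str, Any]]:
    yield {"data": "partial "}
    yield {"data": "text"}
    yield {"result": "final answer"}

async def invoke() -> Any:
    event: dict[str, Any] = {}
    # Drain the stream; intermediate events are discarded and only the
    # loop variable's last binding survives.
    async for event in fake_stream():
        _ = event
    return event["result"]
```

The `async for` loop variable keeps its last binding after the loop ends, which is what lets the real method read `event["result"]` without tracking state.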

stream_async(prompt=None, *, invocation_state=None, structured_output_model=None, structured_output_prompt=None, **kwargs) async

Process a natural language prompt and yield events as an async iterator.

This method provides an asynchronous interface for streaming agent events with multiple input patterns:

- String input: Simple text input
- ContentBlock list: Multi-modal content blocks
- Message list: Complete messages with roles
- No input: Use existing conversation history

Parameters:

- prompt (AgentInput, default None): User input in various formats:
    - str: Simple text input
    - list[ContentBlock]: Multi-modal content blocks
    - list[Message]: Complete messages with roles
    - None: Use existing conversation history
- invocation_state (dict[str, Any] | None, default None): Additional parameters to pass through the event loop.
- structured_output_model (type[BaseModel] | None, default None): Pydantic model type(s) for structured output (overrides agent default).
- structured_output_prompt (str | None, default None): Custom prompt for forcing structured output (overrides agent default).
- **kwargs (Any): [Deprecated] Additional parameters to pass to the event loop.

Yields:

- AsyncIterator[Any]: An async iterator that yields events. Each event is a dictionary containing information about the current state of processing, such as:
    - data: Text content being generated
    - complete: Whether this is the final chunk
    - current_tool_use: Information about tools being executed
    - And other event data provided by the callback handler

Raises:

- ConcurrencyException: If another invocation is already in progress on this agent instance.
- Exception: Any exceptions from the agent invocation will be propagated to the caller.

Example:

    async for event in agent.stream_async("Analyze this data"):
        if "data" in event:
            yield event["data"]
Source code in strands/agent/agent.py
async def stream_async(
    self,
    prompt: AgentInput = None,
    *,
    invocation_state: dict[str, Any] | None = None,
    structured_output_model: type[BaseModel] | None = None,
    structured_output_prompt: str | None = None,
    **kwargs: Any,
) -> AsyncIterator[Any]:
    """Process a natural language prompt and yield events as an async iterator.

    This method provides an asynchronous interface for streaming agent events with multiple input patterns:
    - String input: Simple text input
    - ContentBlock list: Multi-modal content blocks
    - Message list: Complete messages with roles
    - No input: Use existing conversation history

    Args:
        prompt: User input in various formats:
            - str: Simple text input
            - list[ContentBlock]: Multi-modal content blocks
            - list[Message]: Complete messages with roles
            - None: Use existing conversation history
        invocation_state: Additional parameters to pass through the event loop.
        structured_output_model: Pydantic model type(s) for structured output (overrides agent default).
        structured_output_prompt: Custom prompt for forcing structured output (overrides agent default).
        **kwargs: Additional parameters to pass to the event loop. [Deprecated]

    Yields:
        An async iterator that yields events. Each event is a dictionary containing
            information about the current state of processing, such as:

            - data: Text content being generated
            - complete: Whether this is the final chunk
            - current_tool_use: Information about tools being executed
            - And other event data provided by the callback handler

    Raises:
        ConcurrencyException: If another invocation is already in progress on this agent instance.
        Exception: Any exceptions from the agent invocation will be propagated to the caller.

    Example:
        ```python
        async for event in agent.stream_async("Analyze this data"):
            if "data" in event:
                yield event["data"]
        ```
    """
    # Acquire lock to prevent concurrent invocations
    # Using threading.Lock instead of asyncio.Lock because run_async() creates
    # separate event loops in different threads
    acquired = self._invocation_lock.acquire(blocking=False)
    if not acquired:
        raise ConcurrencyException(
            "Agent is already processing a request. Concurrent invocations are not supported."
        )

    try:
        self._interrupt_state.resume(prompt)

        self.event_loop_metrics.reset_usage_metrics()

        merged_state = {}
        if kwargs:
            warnings.warn("`**kwargs` parameter is deprecated, use `invocation_state` instead.", stacklevel=2)
            merged_state.update(kwargs)
            if invocation_state is not None:
                merged_state["invocation_state"] = invocation_state
        else:
            if invocation_state is not None:
                merged_state = invocation_state

        callback_handler = self.callback_handler
        if kwargs:
            callback_handler = kwargs.get("callback_handler", self.callback_handler)

        # Process input and get message to add (if any)
        messages = await self._convert_prompt_to_messages(prompt)

        self.trace_span = self._start_agent_trace_span(messages)

        with trace_api.use_span(self.trace_span):
            try:
                events = self._run_loop(messages, merged_state, structured_output_model, structured_output_prompt)

                async for event in events:
                    event.prepare(invocation_state=merged_state)

                    if event.is_callback_event:
                        as_dict = event.as_dict()
                        callback_handler(**as_dict)
                        yield as_dict

                result = AgentResult(*event["stop"])
                callback_handler(result=result)
                yield AgentResultEvent(result=result).as_dict()

                self._end_agent_trace_span(response=result)

            except Exception as e:
                self._end_agent_trace_span(error=e)
                raise

    finally:
        self._invocation_lock.release()
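The concurrency guard at the top of stream_async can be isolated into a small sketch: a `threading.Lock` acquired non-blockingly, so a second concurrent call fails fast instead of queueing. `Guarded` and its `invoke` method are illustrative stand-ins for the agent, and this `ConcurrencyException` is a local re-definition, not the one strands exports:

```python
import threading

class ConcurrencyException(Exception):
    """Raised when a second invocation starts while one is in progress."""

class Guarded:
    def __init__(self) -> None:
        self._invocation_lock = threading.Lock()

    def invoke(self) -> str:
        # Non-blocking acquire: returns False immediately if already held.
        if not self._invocation_lock.acquire(blocking=False):
            raise ConcurrencyException("Agent is already processing a request.")
        try:
            return "ok"  # the real work happens here
        finally:
            self._invocation_lock.release()
```

A `threading.Lock` is used rather than `asyncio.Lock` for the reason the source comments give: `run_async()` creates separate event loops in different threads, so an asyncio lock could not guard across them.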

structured_output(output_model, prompt=None)

This method allows you to get structured output from the agent.

If you pass in a prompt, it will be used temporarily without adding it to the conversation history. If you don't pass in a prompt, it will use only the existing conversation history to respond.

For smaller models, you may want to use the optional prompt to add additional instructions to explicitly instruct the model to output the structured data.

Parameters:

- output_model (type[T], required): The output model (a JSON schema written as a Pydantic BaseModel) that the agent will use when responding.
- prompt (AgentInput, default None): The prompt to use for the agent in various formats:
    - str: Simple text input
    - list[ContentBlock]: Multi-modal content blocks
    - list[Message]: Complete messages with roles
    - None: Use existing conversation history

Raises:

- ValueError: If no conversation history or prompt is provided.

Source code in strands/agent/agent.py
def structured_output(self, output_model: type[T], prompt: AgentInput = None) -> T:
    """This method allows you to get structured output from the agent.

    If you pass in a prompt, it will be used temporarily without adding it to the conversation history.
    If you don't pass in a prompt, it will use only the existing conversation history to respond.

    For smaller models, you may want to use the optional prompt to add additional instructions to explicitly
    instruct the model to output the structured data.

    Args:
        output_model: The output model (a JSON schema written as a Pydantic BaseModel)
            that the agent will use when responding.
        prompt: The prompt to use for the agent in various formats:
            - str: Simple text input
            - list[ContentBlock]: Multi-modal content blocks
            - list[Message]: Complete messages with roles
            - None: Use existing conversation history

    Raises:
        ValueError: If no conversation history or prompt is provided.
    """
    warnings.warn(
        "Agent.structured_output method is deprecated."
        " You should pass in `structured_output_model` directly into the agent invocation."
        " see: https://strandsagents.com/latest/documentation/docs/user-guide/concepts/agents/structured-output/",
        category=DeprecationWarning,
        stacklevel=2,
    )

    return run_async(lambda: self.structured_output_async(output_model, prompt))
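structured_output bridges synchronous callers to structured_output_async via run_async, which (per the comments elsewhere in this source) runs the coroutine on a fresh event loop in a separate thread. A hypothetical sketch of such a helper; the real strands `run_async` is not shown here and may differ, especially in error propagation:

```python
import asyncio
import threading
from typing import Any, Callable, Coroutine

def run_async(factory: Callable[[], Coroutine[Any, Any, Any]]) -> Any:
    """Run a coroutine to completion on a new event loop in another thread.

    Using a separate thread means this works even when the calling thread
    already has a running event loop (where asyncio.run would fail).
    """
    result: dict[str, Any] = {}

    def runner() -> None:
        result["value"] = asyncio.run(factory())

    thread = threading.Thread(target=runner)
    thread.start()
    thread.join()
    if "value" not in result:  # the coroutine raised inside the thread
        raise RuntimeError("coroutine did not complete")
    return result["value"]
```

The factory takes no arguments and returns a fresh coroutine, which is why the call site above passes `lambda: self.structured_output_async(...)` rather than the coroutine object itself.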

structured_output_async(output_model, prompt=None) async

This method allows you to get structured output from the agent.

If you pass in a prompt, it will be used temporarily without adding it to the conversation history. If you don't pass in a prompt, it will use only the existing conversation history to respond.

For smaller models, you may want to use the optional prompt to add additional instructions to explicitly instruct the model to output the structured data.

Parameters:

- output_model (type[T], required): The output model (a JSON schema written as a Pydantic BaseModel) that the agent will use when responding.
- prompt (AgentInput, default None): The prompt to use for the agent (will not be added to conversation history).

Raises:

- ValueError: If no conversation history or prompt is provided.

Source code in strands/agent/agent.py
async def structured_output_async(self, output_model: type[T], prompt: AgentInput = None) -> T:
    """This method allows you to get structured output from the agent.

    If you pass in a prompt, it will be used temporarily without adding it to the conversation history.
    If you don't pass in a prompt, it will use only the existing conversation history to respond.

    For smaller models, you may want to use the optional prompt to add additional instructions to explicitly
    instruct the model to output the structured data.

    Args:
        output_model: The output model (a JSON schema written as a Pydantic BaseModel)
            that the agent will use when responding.
        prompt: The prompt to use for the agent (will not be added to conversation history).

    Raises:
        ValueError: If no conversation history or prompt is provided.
    """
    if self._interrupt_state.activated:
        raise RuntimeError("cannot call structured output during interrupt")

    warnings.warn(
        "Agent.structured_output_async method is deprecated."
        " You should pass in `structured_output_model` directly into the agent invocation."
        " see: https://strandsagents.com/latest/documentation/docs/user-guide/concepts/agents/structured-output/",
        category=DeprecationWarning,
        stacklevel=2,
    )
    await self.hooks.invoke_callbacks_async(BeforeInvocationEvent(agent=self, invocation_state={}))
    with self.tracer.tracer.start_as_current_span(
        "execute_structured_output", kind=trace_api.SpanKind.CLIENT
    ) as structured_output_span:
        try:
            if not self.messages and not prompt:
                raise ValueError("No conversation history or prompt provided")

            temp_messages: Messages = self.messages + await self._convert_prompt_to_messages(prompt)

            structured_output_span.set_attributes(
                {
                    "gen_ai.system": "strands-agents",
                    "gen_ai.agent.name": self.name,
                    "gen_ai.agent.id": self.agent_id,
                    "gen_ai.operation.name": "execute_structured_output",
                }
            )
            if self.system_prompt:
                structured_output_span.add_event(
                    "gen_ai.system.message",
                    attributes={"role": "system", "content": serialize([{"text": self.system_prompt}])},
                )
            for message in temp_messages:
                structured_output_span.add_event(
                    f"gen_ai.{message['role']}.message",
                    attributes={"role": message["role"], "content": serialize(message["content"])},
                )
            events = self.model.structured_output(output_model, temp_messages, system_prompt=self.system_prompt)
            async for event in events:
                if isinstance(event, TypedEvent):
                    event.prepare(invocation_state={})
                    if event.is_callback_event:
                        self.callback_handler(**event.as_dict())

            structured_output_span.add_event(
                "gen_ai.choice", attributes={"message": serialize(event["output"].model_dump())}
            )
            return event["output"]

        finally:
            await self.hooks.invoke_callbacks_async(AfterInvocationEvent(agent=self, invocation_state={}))

BeforeModelCallEvent dataclass

Bases: HookEvent

Event triggered before the model is invoked.

This event is fired just before the agent calls the model for inference, allowing hook providers to inspect or modify the messages and configuration that will be sent to the model.

Note: This event is not fired for invocations to structured_output.

Attributes:

- invocation_state (dict[str, Any]): State and configuration passed through the agent invocation. This can include shared context for multi-agent coordination, request tracking, and dynamic configuration.

Source code in strands/hooks/events.py
@dataclass
class BeforeModelCallEvent(HookEvent):
    """Event triggered before the model is invoked.

    This event is fired just before the agent calls the model for inference,
    allowing hook providers to inspect or modify the messages and configuration
    that will be sent to the model.

    Note: This event is not fired for invocations to structured_output.

    Attributes:
        invocation_state: State and configuration passed through the agent invocation.
            This can include shared context for multi-agent coordination, request tracking,
            and dynamic configuration.
    """

    invocation_state: dict[str, Any] = field(default_factory=dict)

ContextWindowOverflowException

Bases: Exception

Exception raised when the context window is exceeded.

This exception is raised when the input to a model exceeds the maximum context window size that the model can handle. This typically occurs when the combined length of the conversation history, system prompt, and current message is too large for the model to process.

Source code in strands/types/exceptions.py
class ContextWindowOverflowException(Exception):
    """Exception raised when the context window is exceeded.

    This exception is raised when the input to a model exceeds the maximum context window size that the model can
    handle. This typically occurs when the combined length of the conversation history, system prompt, and current
    message is too large for the model to process.
    """

    pass

EventLoopException

Bases: Exception

Exception raised by the event loop.

Source code in strands/types/exceptions.py
class EventLoopException(Exception):
    """Exception raised by the event loop."""

    def __init__(self, original_exception: Exception, request_state: Any = None) -> None:
        """Initialize exception.

        Args:
            original_exception: The original exception that was raised.
            request_state: The state of the request at the time of the exception.
        """
        self.original_exception = original_exception
        self.request_state = request_state if request_state is not None else {}
        super().__init__(str(original_exception))

__init__(original_exception, request_state=None)

Initialize exception.

Parameters:

- original_exception (Exception, required): The original exception that was raised.
- request_state (Any, default None): The state of the request at the time of the exception.
Source code in strands/types/exceptions.py
def __init__(self, original_exception: Exception, request_state: Any = None) -> None:
    """Initialize exception.

    Args:
        original_exception: The original exception that was raised.
        request_state: The state of the request at the time of the exception.
    """
    self.original_exception = original_exception
    self.request_state = request_state if request_state is not None else {}
    super().__init__(str(original_exception))

EventLoopStopEvent

Bases: TypedEvent

Event emitted when the agent execution completes normally.

Source code in strands/types/_events.py
class EventLoopStopEvent(TypedEvent):
    """Event emitted when the agent execution completes normally."""

    def __init__(
        self,
        stop_reason: StopReason,
        message: Message,
        metrics: "EventLoopMetrics",
        request_state: Any,
        interrupts: Sequence[Interrupt] | None = None,
        structured_output: BaseModel | None = None,
    ) -> None:
        """Initialize with the final execution results.

        Args:
            stop_reason: Why the agent execution stopped
            message: Final message from the model
            metrics: Execution metrics and performance data
            request_state: Final state of the agent execution
            interrupts: Interrupts raised by user during agent execution.
            structured_output: Optional structured output result
        """
        super().__init__({"stop": (stop_reason, message, metrics, request_state, interrupts, structured_output)})

    @property
    @override
    def is_callback_event(self) -> bool:
        return False
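EventLoopStopEvent packs its results as a positional tuple under the "stop" key, which stream_async later rebuilds with `AgentResult(*event["stop"])`. A minimal sketch of that pack/unpack round trip; this `AgentResult` is an illustrative stand-in carrying only two of the real fields:

```python
from typing import Any, NamedTuple

class AgentResult(NamedTuple):
    # Illustrative stand-in; the real AgentResult has more fields,
    # matching the full tuple EventLoopStopEvent packs.
    stop_reason: str
    message: dict[str, Any]

# Pack the results as a positional tuple under a single "stop" key...
event = {"stop": ("end_turn", {"role": "assistant", "content": []})}

# ...and rebuild the result object by unpacking, as stream_async does.
result = AgentResult(*event["stop"])
```

Because the payload is positional, the tuple's element order must match the result type's field order exactly; adding a field to one without the other would break the unpacking.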

__init__(stop_reason, message, metrics, request_state, interrupts=None, structured_output=None)

Initialize with the final execution results.

Parameters:

- stop_reason (StopReason, required): Why the agent execution stopped.
- message (Message, required): Final message from the model.
- metrics (EventLoopMetrics, required): Execution metrics and performance data.
- request_state (Any, required): Final state of the agent execution.
- interrupts (Sequence[Interrupt] | None, default None): Interrupts raised by the user during agent execution.
- structured_output (BaseModel | None, default None): Optional structured output result.
Source code in strands/types/_events.py
def __init__(
    self,
    stop_reason: StopReason,
    message: Message,
    metrics: "EventLoopMetrics",
    request_state: Any,
    interrupts: Sequence[Interrupt] | None = None,
    structured_output: BaseModel | None = None,
) -> None:
    """Initialize with the final execution results.

    Args:
        stop_reason: Why the agent execution stopped
        message: Final message from the model
        metrics: Execution metrics and performance data
        request_state: Final state of the agent execution
        interrupts: Interrupts raised by user during agent execution.
        structured_output: Optional structured output result
    """
    super().__init__({"stop": (stop_reason, message, metrics, request_state, interrupts, structured_output)})

ForceStopEvent

Bases: TypedEvent

Event emitted when the agent execution is forcibly stopped, either by a tool or by an exception.

Source code in strands/types/_events.py
class ForceStopEvent(TypedEvent):
    """Event emitted when the agent execution is forcibly stopped, either by a tool or by an exception."""

    def __init__(self, reason: str | Exception) -> None:
        """Initialize with the reason for forced stop.

        Args:
            reason: String description or exception that caused the forced stop
        """
        super().__init__(
            {
                "force_stop": True,
                "force_stop_reason": str(reason),
            }
        )

__init__(reason)

Initialize with the reason for forced stop.

Parameters:

- reason (str | Exception, required): String description or exception that caused the forced stop.
Source code in strands/types/_events.py
def __init__(self, reason: str | Exception) -> None:
    """Initialize with the reason for forced stop.

    Args:
        reason: String description or exception that caused the forced stop
    """
    super().__init__(
        {
            "force_stop": True,
            "force_stop_reason": str(reason),
        }
    )

MaxTokensReachedException

Bases: Exception

Exception raised when the model reaches its maximum token generation limit.

This exception is raised when the model stops generating tokens because it has reached the maximum number of tokens allowed for output generation. This can occur when the model's max_tokens parameter is set too low for the complexity of the response, or when the model naturally reaches its configured output limit during generation.

Source code in strands/types/exceptions.py
class MaxTokensReachedException(Exception):
    """Exception raised when the model reaches its maximum token generation limit.

    This exception is raised when the model stops generating tokens because it has reached the maximum number of
    tokens allowed for output generation. This can occur when the model's max_tokens parameter is set too low for
    the complexity of the response, or when the model naturally reaches its configured output limit during generation.
    """

    def __init__(self, message: str):
        """Initialize the exception with an error message and the incomplete message object.

        Args:
            message: The error message describing the token limit issue
        """
        super().__init__(message)

__init__(message)

Initialize the exception with an error message and the incomplete message object.

Parameters:

Name Type Description Default
message str

The error message describing the token limit issue

required
Source code in strands/types/exceptions.py
def __init__(self, message: str):
    """Initialize the exception with an error message and the incomplete message object.

    Args:
        message: The error message describing the token limit issue
    """
    super().__init__(message)

Message

Bases: TypedDict

A message in a conversation with the agent.

Attributes:

Name Type Description
content list[ContentBlock]

The message content.

role Role

The role of the message sender.

Source code in strands/types/content.py
class Message(TypedDict):
    """A message in a conversation with the agent.

    Attributes:
        content: The message content.
        role: The role of the message sender.
    """

    content: list[ContentBlock]
    role: Role

MessageAddedEvent dataclass

Bases: HookEvent

Event triggered when a message is added to the agent's conversation.

This event is fired whenever the agent adds a new message to its internal message history, including user messages, assistant responses, and tool results. Hook providers can use this event for logging, monitoring, or implementing custom message processing logic.

Note: This event is only triggered for messages added by the framework itself, not for messages manually added by tools or external code.

Attributes:

Name Type Description
message Message

The message that was added to the conversation history.

Source code in strands/hooks/events.py
@dataclass
class MessageAddedEvent(HookEvent):
    """Event triggered when a message is added to the agent's conversation.

    This event is fired whenever the agent adds a new message to its internal
    message history, including user messages, assistant responses, and tool
    results. Hook providers can use this event for logging, monitoring, or
    implementing custom message processing logic.

    Note: This event is only triggered for messages added by the framework
    itself, not for messages manually added by tools or external code.

    Attributes:
        message: The message that was added to the conversation history.
    """

    message: Message

ModelMessageEvent

Bases: TypedEvent

Event emitted when the model invocation has completed.

This event is fired whenever the model generates a response message that gets added to the conversation history.

Source code in strands/types/_events.py
class ModelMessageEvent(TypedEvent):
    """Event emitted when the model invocation has completed.

    This event is fired whenever the model generates a response message that
    gets added to the conversation history.
    """

    def __init__(self, message: Message) -> None:
        """Initialize with the model-generated message.

        Args:
            message: The response message from the model
        """
        super().__init__({"message": message})

__init__(message)

Initialize with the model-generated message.

Parameters:

Name Type Description Default
message Message

The response message from the model

required
Source code in strands/types/_events.py
def __init__(self, message: Message) -> None:
    """Initialize with the model-generated message.

    Args:
        message: The response message from the model
    """
    super().__init__({"message": message})

ModelRetryStrategy

Bases: HookProvider

Default retry strategy for model throttling with exponential backoff.

Retries model calls on ModelThrottledException using exponential backoff. Delay doubles after each attempt: initial_delay, initial_delay*2, initial_delay*4, etc., capped at max_delay. State resets after successful calls.

With defaults (initial_delay=4, max_delay=240, max_attempts=6), delays are: 4s → 8s → 16s → 32s → 64s (5 retries before giving up on the 6th attempt).

Parameters:

Name Type Description Default
max_attempts int

Total model attempts before re-raising the exception.

6
initial_delay int

Base delay in seconds; used for first two retries, then doubles.

4
max_delay int

Upper bound in seconds for the exponential backoff.

240
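The schedule above can be reproduced outside the framework; this sketch mirrors the `_calculate_delay` logic (delay doubles per 0-indexed attempt, capped at `max_delay`):

```python
# Sketch of the exponential backoff schedule used by ModelRetryStrategy.
# Mirrors _calculate_delay: delay = initial_delay * 2**attempt, capped at max_delay.
def calculate_delay(attempt: int, initial_delay: int = 4, max_delay: int = 240) -> int:
    """Return the retry delay in seconds for a 0-indexed attempt."""
    return min(initial_delay * (2 ** attempt), max_delay)

# With the defaults (initial_delay=4, max_delay=240, max_attempts=6),
# the five retries before giving up wait:
print([calculate_delay(a) for a in range(5)])  # [4, 8, 16, 32, 64]
```

Note that the cap only kicks in from attempt 6 onward (4 * 2**6 = 256 > 240), so with the default max_attempts the cap is never reached.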
Source code in strands/event_loop/_retry.py
class ModelRetryStrategy(HookProvider):
    """Default retry strategy for model throttling with exponential backoff.

    Retries model calls on ModelThrottledException using exponential backoff.
    Delay doubles after each attempt: initial_delay, initial_delay*2, initial_delay*4,
    etc., capped at max_delay. State resets after successful calls.

    With defaults (initial_delay=4, max_delay=240, max_attempts=6), delays are:
    4s → 8s → 16s → 32s → 64s (5 retries before giving up on the 6th attempt).

    Args:
        max_attempts: Total model attempts before re-raising the exception.
        initial_delay: Base delay in seconds; used for first two retries, then doubles.
        max_delay: Upper bound in seconds for the exponential backoff.
    """

    def __init__(
        self,
        *,
        max_attempts: int = 6,
        initial_delay: int = 4,
        max_delay: int = 240,
    ):
        """Initialize the retry strategy.

        Args:
            max_attempts: Total model attempts before re-raising the exception. Defaults to 6.
            initial_delay: Base delay in seconds; used for first two retries, then doubles.
                Defaults to 4.
            max_delay: Upper bound in seconds for the exponential backoff. Defaults to 240.
        """
        self._max_attempts = max_attempts
        self._initial_delay = initial_delay
        self._max_delay = max_delay
        self._current_attempt = 0
        self._backwards_compatible_event_to_yield: TypedEvent | None = None

    def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
        """Register callbacks for AfterModelCallEvent and AfterInvocationEvent.

        Args:
            registry: The hook registry to register callbacks with.
            **kwargs: Additional keyword arguments for future extensibility.
        """
        registry.add_callback(AfterModelCallEvent, self._handle_after_model_call)
        registry.add_callback(AfterInvocationEvent, self._handle_after_invocation)

    def _calculate_delay(self, attempt: int) -> int:
        """Calculate retry delay using exponential backoff.

        Args:
            attempt: The attempt number (0-indexed) to calculate delay for.

        Returns:
            Delay in seconds for the given attempt.
        """
        delay: int = self._initial_delay * (2**attempt)
        return min(delay, self._max_delay)

    def _reset_retry_state(self) -> None:
        """Reset retry state to initial values."""
        self._current_attempt = 0

    async def _handle_after_invocation(self, event: AfterInvocationEvent) -> None:
        """Reset retry state after invocation completes.

        Args:
            event: The AfterInvocationEvent signaling invocation completion.
        """
        self._reset_retry_state()

    async def _handle_after_model_call(self, event: AfterModelCallEvent) -> None:
        """Handle model call completion and determine if retry is needed.

        This callback is invoked after each model call. If the call failed with
        a ModelThrottledException and we haven't exceeded max_attempts, it sets
        event.retry to True and sleeps for the current delay before returning.

        On successful calls, it resets the retry state to prepare for future calls.

        Args:
            event: The AfterModelCallEvent containing call results or exception.
        """
        delay = self._calculate_delay(self._current_attempt)

        self._backwards_compatible_event_to_yield = None

        # If already retrying, skip processing (another hook may have triggered retry)
        if event.retry:
            return

        # If model call succeeded, reset retry state
        if event.stop_response is not None:
            logger.debug(
                "stop_reason=<%s> | model call succeeded, resetting retry state",
                event.stop_response.stop_reason,
            )
            self._reset_retry_state()
            return

        # Check if we have an exception and reset state if no exception
        if event.exception is None:
            self._reset_retry_state()
            return

        # Only retry on ModelThrottledException
        if not isinstance(event.exception, ModelThrottledException):
            return

        # Increment attempt counter first
        self._current_attempt += 1

        # Check if we've exceeded max attempts
        if self._current_attempt >= self._max_attempts:
            logger.debug(
                "current_attempt=<%d>, max_attempts=<%d> | max retry attempts reached, not retrying",
                self._current_attempt,
                self._max_attempts,
            )
            return

        self._backwards_compatible_event_to_yield = EventLoopThrottleEvent(delay=delay)

        # Retry the model call
        logger.debug(
            "retry_delay_seconds=<%s>, max_attempts=<%s>, current_attempt=<%s> "
            "| throttling exception encountered | delaying before next retry",
            delay,
            self._max_attempts,
            self._current_attempt,
        )

        # Sleep for current delay
        await asyncio.sleep(delay)

        # Set retry flag and track that this strategy triggered it
        event.retry = True

__init__(*, max_attempts=6, initial_delay=4, max_delay=240)

Initialize the retry strategy.

Parameters:

Name Type Description Default
max_attempts int

Total model attempts before re-raising the exception. Defaults to 6.

6
initial_delay int

Base delay in seconds; used for first two retries, then doubles. Defaults to 4.

4
max_delay int

Upper bound in seconds for the exponential backoff. Defaults to 240.

240
Source code in strands/event_loop/_retry.py
def __init__(
    self,
    *,
    max_attempts: int = 6,
    initial_delay: int = 4,
    max_delay: int = 240,
):
    """Initialize the retry strategy.

    Args:
        max_attempts: Total model attempts before re-raising the exception. Defaults to 6.
        initial_delay: Base delay in seconds; used for first two retries, then doubles.
            Defaults to 4.
        max_delay: Upper bound in seconds for the exponential backoff. Defaults to 240.
    """
    self._max_attempts = max_attempts
    self._initial_delay = initial_delay
    self._max_delay = max_delay
    self._current_attempt = 0
    self._backwards_compatible_event_to_yield: TypedEvent | None = None

register_hooks(registry, **kwargs)

Register callbacks for AfterModelCallEvent and AfterInvocationEvent.

Parameters:

Name Type Description Default
registry HookRegistry

The hook registry to register callbacks with.

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}
Source code in strands/event_loop/_retry.py
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
    """Register callbacks for AfterModelCallEvent and AfterInvocationEvent.

    Args:
        registry: The hook registry to register callbacks with.
        **kwargs: Additional keyword arguments for future extensibility.
    """
    registry.add_callback(AfterModelCallEvent, self._handle_after_model_call)
    registry.add_callback(AfterInvocationEvent, self._handle_after_invocation)

ModelStopReason

Bases: TypedEvent

Event emitted when the model stops generating, reporting the stop reason and final response.

Source code in strands/types/_events.py
class ModelStopReason(TypedEvent):
    """Event emitted when the model stops generating, reporting the stop reason and final response."""

    def __init__(
        self,
        stop_reason: StopReason,
        message: Message,
        usage: Usage,
        metrics: Metrics,
    ) -> None:
        """Initialize with the final execution results.

        Args:
            stop_reason: Why the agent execution stopped
            message: Final message from the model
            usage: Usage information from the model
            metrics: Execution metrics and performance data
        """
        super().__init__({"stop": (stop_reason, message, usage, metrics)})

    @property
    @override
    def is_callback_event(self) -> bool:
        return False

__init__(stop_reason, message, usage, metrics)

Initialize with the final execution results.

Parameters:

Name Type Description Default
stop_reason StopReason

Why the agent execution stopped

required
message Message

Final message from the model

required
usage Usage

Usage information from the model

required
metrics Metrics

Execution metrics and performance data

required
Source code in strands/types/_events.py
def __init__(
    self,
    stop_reason: StopReason,
    message: Message,
    usage: Usage,
    metrics: Metrics,
) -> None:
    """Initialize with the final execution results.

    Args:
        stop_reason: Why the agent execution stopped
        message: Final message from the model
        usage: Usage information from the model
        metrics: Execution metrics and performance data
    """
    super().__init__({"stop": (stop_reason, message, usage, metrics)})

StartEvent

Bases: TypedEvent

Event emitted at the start of each event loop cycle.

!!deprecated!! Use StartEventLoopEvent instead.

This event marks the beginning of a new processing cycle within the agent's event loop. It's fired before model invocation and tool execution begin.

Source code in strands/types/_events.py
class StartEvent(TypedEvent):
    """Event emitted at the start of each event loop cycle.

    !!deprecated!!
        Use StartEventLoopEvent instead.

    This event marks the beginning of a new processing cycle within the agent's
    event loop. It's fired before model invocation and tool execution begin.
    """

    def __init__(self) -> None:
        """Initialize the event loop start event."""
        super().__init__({"start": True})

__init__()

Initialize the event loop start event.

Source code in strands/types/_events.py
def __init__(self) -> None:
    """Initialize the event loop start event."""
    super().__init__({"start": True})

StartEventLoopEvent

Bases: TypedEvent

Event emitted when the event loop cycle begins processing.

This event is fired after StartEvent and indicates that the event loop has begun its core processing logic, including model invocation preparation.

Source code in strands/types/_events.py
class StartEventLoopEvent(TypedEvent):
    """Event emitted when the event loop cycle begins processing.

    This event is fired after StartEvent and indicates that the event loop
    has begun its core processing logic, including model invocation preparation.
    """

    def __init__(self) -> None:
        """Initialize the event loop processing start event."""
        super().__init__({"start_event_loop": True})

__init__()

Initialize the event loop processing start event.

Source code in strands/types/_events.py
def __init__(self) -> None:
    """Initialize the event loop processing start event."""
    super().__init__({"start_event_loop": True})

StructuredOutputContext

Per-invocation context for structured output execution.
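The store/extract lifecycle can be sketched without the strands imports: results are keyed by `toolUseId`, and extraction pops the entry whose tool use matches the expected tool name. `Weather` below is a hypothetical output model (a dataclass standing in for the Pydantic `BaseModel` the real class expects):

```python
# Hedged stand-in for the StructuredOutputContext store/extract lifecycle.
from dataclasses import dataclass

@dataclass
class Weather:  # hypothetical structured output model
    city: str
    temp_c: float

results: dict[str, Weather] = {}
expected_tool_name = "Weather"  # derived from the model in the real class

# store_result: keyed by the tool use id
results["tooluse-1"] = Weather(city="Seattle", temp_c=12.5)

# extract_result: find the matching tool use and pop its stored result
tool_uses = [{"name": "Weather", "toolUseId": "tooluse-1"}]
extracted = None
for tool_use in tool_uses:
    if tool_use.get("name") == expected_tool_name:
        extracted = results.pop(str(tool_use.get("toolUseId", "")), None)

print(extracted)  # Weather(city='Seattle', temp_c=12.5)
```

Because extraction pops the entry, each validated result is returned to the caller at most once.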

Source code in strands/tools/structured_output/_structured_output_context.py
class StructuredOutputContext:
    """Per-invocation context for structured output execution."""

    def __init__(
        self,
        structured_output_model: type[BaseModel] | None = None,
        structured_output_prompt: str | None = None,
    ):
        """Initialize a new structured output context.

        Args:
            structured_output_model: Optional Pydantic model type for structured output.
            structured_output_prompt: Optional custom prompt message to use when forcing structured output.
                Defaults to "You must format the previous response as structured output."
        """
        self.results: dict[str, BaseModel] = {}
        self.structured_output_model: type[BaseModel] | None = structured_output_model
        self.structured_output_tool: StructuredOutputTool | None = None
        self.forced_mode: bool = False
        self.force_attempted: bool = False
        self.tool_choice: ToolChoice | None = None
        self.stop_loop: bool = False
        self.expected_tool_name: str | None = None
        self.structured_output_prompt: str = structured_output_prompt or DEFAULT_STRUCTURED_OUTPUT_PROMPT

        if structured_output_model:
            self.structured_output_tool = StructuredOutputTool(structured_output_model)
            self.expected_tool_name = self.structured_output_tool.tool_name

    @property
    def is_enabled(self) -> bool:
        """Check if structured output is enabled for this context.

        Returns:
            True if a structured output model is configured, False otherwise.
        """
        return self.structured_output_model is not None

    def store_result(self, tool_use_id: str, result: BaseModel) -> None:
        """Store a validated structured output result.

        Args:
            tool_use_id: Unique identifier for the tool use.
            result: Validated Pydantic model instance.
        """
        self.results[tool_use_id] = result

    def get_result(self, tool_use_id: str) -> BaseModel | None:
        """Retrieve a stored structured output result.

        Args:
            tool_use_id: Unique identifier for the tool use.

        Returns:
            The validated Pydantic model instance, or None if not found.
        """
        return self.results.get(tool_use_id)

    def set_forced_mode(self, tool_choice: dict | None = None) -> None:
        """Mark this context as being in forced structured output mode.

        Args:
            tool_choice: Optional tool choice configuration.
        """
        if not self.is_enabled:
            return
        self.forced_mode = True
        self.force_attempted = True
        self.tool_choice = tool_choice or {"any": {}}

    def has_structured_output_tool(self, tool_uses: list[ToolUse]) -> bool:
        """Check if any tool uses are for the structured output tool.

        Args:
            tool_uses: List of tool use dictionaries to check.

        Returns:
            True if any tool use matches the expected structured output tool name,
            False if no structured output tool is present or expected.
        """
        if not self.expected_tool_name:
            return False
        return any(tool_use.get("name") == self.expected_tool_name for tool_use in tool_uses)

    def get_tool_spec(self) -> ToolSpec | None:
        """Get the tool specification for structured output.

        Returns:
            Tool specification, or None if no structured output model.
        """
        if self.structured_output_tool:
            return self.structured_output_tool.tool_spec
        return None

    def extract_result(self, tool_uses: list[ToolUse]) -> BaseModel | None:
        """Extract and remove structured output result from stored results.

        Args:
            tool_uses: List of tool use dictionaries from the current execution cycle.

        Returns:
            The structured output result if found, or None if no result available.
        """
        if not self.has_structured_output_tool(tool_uses):
            return None

        for tool_use in tool_uses:
            if tool_use.get("name") == self.expected_tool_name:
                tool_use_id = str(tool_use.get("toolUseId", ""))
                result = self.results.pop(tool_use_id, None)
                if result is not None:
                    logger.debug("Extracted structured output for %s", tool_use.get("name"))
                    return result
        return None

    def register_tool(self, registry: "ToolRegistry") -> None:
        """Register the structured output tool with the registry.

        Args:
            registry: The tool registry to register the tool with.
        """
        if self.structured_output_tool and self.structured_output_tool.tool_name not in registry.dynamic_tools:
            registry.register_dynamic_tool(self.structured_output_tool)
            logger.debug("Registered structured output tool: %s", self.structured_output_tool.tool_name)

    def cleanup(self, registry: "ToolRegistry") -> None:
        """Clean up the registered structured output tool from the registry.

        Args:
            registry: The tool registry to clean up the tool from.
        """
        if self.structured_output_tool and self.structured_output_tool.tool_name in registry.dynamic_tools:
            del registry.dynamic_tools[self.structured_output_tool.tool_name]
            logger.debug("Cleaned up structured output tool: %s", self.structured_output_tool.tool_name)

is_enabled property

Check if structured output is enabled for this context.

Returns:

Type Description
bool

True if a structured output model is configured, False otherwise.

__init__(structured_output_model=None, structured_output_prompt=None)

Initialize a new structured output context.

Parameters:

Name Type Description Default
structured_output_model type[BaseModel] | None

Optional Pydantic model type for structured output.

None
structured_output_prompt str | None

Optional custom prompt message to use when forcing structured output. Defaults to "You must format the previous response as structured output."

None
Source code in strands/tools/structured_output/_structured_output_context.py
def __init__(
    self,
    structured_output_model: type[BaseModel] | None = None,
    structured_output_prompt: str | None = None,
):
    """Initialize a new structured output context.

    Args:
        structured_output_model: Optional Pydantic model type for structured output.
        structured_output_prompt: Optional custom prompt message to use when forcing structured output.
            Defaults to "You must format the previous response as structured output."
    """
    self.results: dict[str, BaseModel] = {}
    self.structured_output_model: type[BaseModel] | None = structured_output_model
    self.structured_output_tool: StructuredOutputTool | None = None
    self.forced_mode: bool = False
    self.force_attempted: bool = False
    self.tool_choice: ToolChoice | None = None
    self.stop_loop: bool = False
    self.expected_tool_name: str | None = None
    self.structured_output_prompt: str = structured_output_prompt or DEFAULT_STRUCTURED_OUTPUT_PROMPT

    if structured_output_model:
        self.structured_output_tool = StructuredOutputTool(structured_output_model)
        self.expected_tool_name = self.structured_output_tool.tool_name

cleanup(registry)

Clean up the registered structured output tool from the registry.

Parameters:

Name Type Description Default
registry ToolRegistry

The tool registry to clean up the tool from.

required
Source code in strands/tools/structured_output/_structured_output_context.py
def cleanup(self, registry: "ToolRegistry") -> None:
    """Clean up the registered structured output tool from the registry.

    Args:
        registry: The tool registry to clean up the tool from.
    """
    if self.structured_output_tool and self.structured_output_tool.tool_name in registry.dynamic_tools:
        del registry.dynamic_tools[self.structured_output_tool.tool_name]
        logger.debug("Cleaned up structured output tool: %s", self.structured_output_tool.tool_name)

extract_result(tool_uses)

Extract and remove structured output result from stored results.

Parameters:

Name Type Description Default
tool_uses list[ToolUse]

List of tool use dictionaries from the current execution cycle.

required

Returns:

Type Description
BaseModel | None

The structured output result if found, or None if no result available.

Source code in strands/tools/structured_output/_structured_output_context.py
def extract_result(self, tool_uses: list[ToolUse]) -> BaseModel | None:
    """Extract and remove structured output result from stored results.

    Args:
        tool_uses: List of tool use dictionaries from the current execution cycle.

    Returns:
        The structured output result if found, or None if no result available.
    """
    if not self.has_structured_output_tool(tool_uses):
        return None

    for tool_use in tool_uses:
        if tool_use.get("name") == self.expected_tool_name:
            tool_use_id = str(tool_use.get("toolUseId", ""))
            result = self.results.pop(tool_use_id, None)
            if result is not None:
                logger.debug("Extracted structured output for %s", tool_use.get("name"))
                return result
    return None

get_result(tool_use_id)

Retrieve a stored structured output result.

Parameters:

Name Type Description Default
tool_use_id str

Unique identifier for the tool use.

required

Returns:

Type Description
BaseModel | None

The validated Pydantic model instance, or None if not found.

Source code in strands/tools/structured_output/_structured_output_context.py
def get_result(self, tool_use_id: str) -> BaseModel | None:
    """Retrieve a stored structured output result.

    Args:
        tool_use_id: Unique identifier for the tool use.

    Returns:
        The validated Pydantic model instance, or None if not found.
    """
    return self.results.get(tool_use_id)

get_tool_spec()

Get the tool specification for structured output.

Returns:

Type Description
ToolSpec | None

Tool specification, or None if no structured output model.

Source code in strands/tools/structured_output/_structured_output_context.py
def get_tool_spec(self) -> ToolSpec | None:
    """Get the tool specification for structured output.

    Returns:
        Tool specification, or None if no structured output model.
    """
    if self.structured_output_tool:
        return self.structured_output_tool.tool_spec
    return None

has_structured_output_tool(tool_uses)

Check if any tool uses are for the structured output tool.

Parameters:

Name Type Description Default
tool_uses list[ToolUse]

List of tool use dictionaries to check.

required

Returns:

Type Description
bool

True if any tool use matches the expected structured output tool name,

bool

False if no structured output tool is present or expected.

Source code in strands/tools/structured_output/_structured_output_context.py
def has_structured_output_tool(self, tool_uses: list[ToolUse]) -> bool:
    """Check if any tool uses are for the structured output tool.

    Args:
        tool_uses: List of tool use dictionaries to check.

    Returns:
        True if any tool use matches the expected structured output tool name,
        False if no structured output tool is present or expected.
    """
    if not self.expected_tool_name:
        return False
    return any(tool_use.get("name") == self.expected_tool_name for tool_use in tool_uses)

register_tool(registry)

Register the structured output tool with the registry.

Parameters:

Name Type Description Default
registry ToolRegistry

The tool registry to register the tool with.

required
Source code in strands/tools/structured_output/_structured_output_context.py
def register_tool(self, registry: "ToolRegistry") -> None:
    """Register the structured output tool with the registry.

    Args:
        registry: The tool registry to register the tool with.
    """
    if self.structured_output_tool and self.structured_output_tool.tool_name not in registry.dynamic_tools:
        registry.register_dynamic_tool(self.structured_output_tool)
        logger.debug("Registered structured output tool: %s", self.structured_output_tool.tool_name)

set_forced_mode(tool_choice=None)

Mark this context as being in forced structured output mode.

Parameters:

Name Type Description Default
tool_choice dict | None

Optional tool choice configuration.

None
Source code in strands/tools/structured_output/_structured_output_context.py
def set_forced_mode(self, tool_choice: dict | None = None) -> None:
    """Mark this context as being in forced structured output mode.

    Args:
        tool_choice: Optional tool choice configuration.
    """
    if not self.is_enabled:
        return
    self.forced_mode = True
    self.force_attempted = True
    self.tool_choice = tool_choice or {"any": {}}

store_result(tool_use_id, result)

Store a validated structured output result.

Parameters:

Name Type Description Default
tool_use_id str

Unique identifier for the tool use.

required
result BaseModel

Validated Pydantic model instance.

required
Source code in strands/tools/structured_output/_structured_output_context.py
def store_result(self, tool_use_id: str, result: BaseModel) -> None:
    """Store a validated structured output result.

    Args:
        tool_use_id: Unique identifier for the tool use.
        result: Validated Pydantic model instance.
    """
    self.results[tool_use_id] = result

StructuredOutputEvent

Bases: TypedEvent

Event emitted when structured output is detected and processed.

Source code in strands/types/_events.py
class StructuredOutputEvent(TypedEvent):
    """Event emitted when structured output is detected and processed."""

    def __init__(self, structured_output: BaseModel) -> None:
        """Initialize with the structured output result.

        Args:
            structured_output: The parsed structured output instance
        """
        super().__init__({"structured_output": structured_output})

__init__(structured_output)

Initialize with the structured output result.

Parameters:

Name Type Description Default
structured_output BaseModel

The parsed structured output instance

required
Source code in strands/types/_events.py
def __init__(self, structured_output: BaseModel) -> None:
    """Initialize with the structured output result.

    Args:
        structured_output: The parsed structured output instance
    """
    super().__init__({"structured_output": structured_output})
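A sketch of the resulting payload, assuming `TypedEvent` behaves like a dict of its payload (inferred from the initializer above); `WeatherReport` is a stand-in for a validated Pydantic model, used here so the example runs without Pydantic installed.

```python
# Assumed dict-like TypedEvent base, mirroring the initializer shown above.
class TypedEvent(dict):
    pass

class StructuredOutputEvent(TypedEvent):
    def __init__(self, structured_output) -> None:
        # The parsed model instance is stored under "structured_output".
        super().__init__({"structured_output": structured_output})

class WeatherReport:  # stand-in for a validated Pydantic model instance
    def __init__(self, city: str, temp_c: float) -> None:
        self.city, self.temp_c = city, temp_c

event = StructuredOutputEvent(WeatherReport("Lisbon", 21.5))
print(event["structured_output"].city)  # Lisbon
```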

StructuredOutputException

Bases: Exception

Exception raised when structured output validation fails after maximum retry attempts.

Source code in strands/types/exceptions.py
class StructuredOutputException(Exception):
    """Exception raised when structured output validation fails after maximum retry attempts."""

    def __init__(self, message: str):
        """Initialize the exception with details about the failure.

        Args:
            message: The error message describing the structured output failure
        """
        self.message = message
        super().__init__(message)

__init__(message)

Initialize the exception with details about the failure.

Parameters:

Name Type Description Default
message str

The error message describing the structured output failure

required
Source code in strands/types/exceptions.py
def __init__(self, message: str):
    """Initialize the exception with details about the failure.

    Args:
        message: The error message describing the structured output failure
    """
    self.message = message
    super().__init__(message)
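Catching the exception gives access to the failure message both via `str()` and the `message` attribute; the class is reproduced verbatim from the listing above so the pattern runs on its own.

```python
# Reproduced from the source shown above.
class StructuredOutputException(Exception):
    """Exception raised when structured output validation fails after maximum retry attempts."""

    def __init__(self, message: str):
        self.message = message
        super().__init__(message)

try:
    raise StructuredOutputException("validation failed after 3 attempts")
except StructuredOutputException as exc:
    print(exc.message)  # validation failed after 3 attempts
```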

ToolInterruptEvent

Bases: TypedEvent

Event emitted when a tool is interrupted.

Source code in strands/types/_events.py
class ToolInterruptEvent(TypedEvent):
    """Event emitted when a tool is interrupted."""

    def __init__(self, tool_use: ToolUse, interrupts: list[Interrupt]) -> None:
        """Set interrupt in the event payload."""
        super().__init__({"tool_interrupt_event": {"tool_use": tool_use, "interrupts": interrupts}})

    @property
    def tool_use_id(self) -> str:
        """The id of the tool interrupted."""
        return cast(ToolUse, cast(dict, self.get("tool_interrupt_event")).get("tool_use"))["toolUseId"]

    @property
    def interrupts(self) -> list[Interrupt]:
        """The interrupt instances."""
        return cast(list[Interrupt], self["tool_interrupt_event"]["interrupts"])

interrupts property

The interrupt instances.

tool_use_id property

The id of the tool interrupted.

__init__(tool_use, interrupts)

Set interrupt in the event payload.

Source code in strands/types/_events.py
def __init__(self, tool_use: ToolUse, interrupts: list[Interrupt]) -> None:
    """Set interrupt in the event payload."""
    super().__init__({"tool_interrupt_event": {"tool_use": tool_use, "interrupts": interrupts}})
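The payload layout and the two properties can be mirrored with plain dicts (a sketch: `TypedEvent` is assumed dict-like, `ToolUse` is a plain dict, and the interrupt entries are opaque stand-ins for `Interrupt` instances):

```python
# Assumed dict-like TypedEvent base; mirrors the payload shown above.
class TypedEvent(dict):
    pass

class ToolInterruptEvent(TypedEvent):
    def __init__(self, tool_use: dict, interrupts: list) -> None:
        super().__init__({"tool_interrupt_event": {"tool_use": tool_use, "interrupts": interrupts}})

    @property
    def tool_use_id(self) -> str:
        # Same lookup path as the property shown above.
        return self["tool_interrupt_event"]["tool_use"]["toolUseId"]

    @property
    def interrupts(self) -> list:
        return self["tool_interrupt_event"]["interrupts"]

event = ToolInterruptEvent(
    {"toolUseId": "tu-1", "name": "ask_human", "input": {}},
    interrupts=[object()],  # opaque stand-in for an Interrupt instance
)
print(event.tool_use_id)  # tu-1
```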

ToolResult

Bases: TypedDict

Result of a tool execution.

Attributes:

Name Type Description
content list[ToolResultContent]

List of result content returned by the tool.

status ToolResultStatus

The status of the tool execution ("success" or "error").

toolUseId str

The unique identifier of the tool use request that produced this result.

Source code in strands/types/tools.py
class ToolResult(TypedDict):
    """Result of a tool execution.

    Attributes:
        content: List of result content returned by the tool.
        status: The status of the tool execution ("success" or "error").
        toolUseId: The unique identifier of the tool use request that produced this result.
    """

    content: list[ToolResultContent]
    status: ToolResultStatus
    toolUseId: str

ToolResultMessageEvent

Bases: TypedEvent

Event emitted when tool results are formatted as a message.

This event is fired when tool execution results are converted into a message format to be added to the conversation history. It provides access to the formatted message containing tool results.

Source code in strands/types/_events.py
class ToolResultMessageEvent(TypedEvent):
    """Event emitted when tool results are formatted as a message.

    This event is fired when tool execution results are converted into a
    message format to be added to the conversation history. It provides
    access to the formatted message containing tool results.
    """

    def __init__(self, message: Any) -> None:
        """Initialize with the model-generated message.

        Args:
            message: Message containing tool results for conversation history
        """
        super().__init__({"message": message})

__init__(message)

Initialize with the model-generated message.

Parameters:

Name Type Description Default
message Any

Message containing tool results for conversation history

required
Source code in strands/types/_events.py
def __init__(self, message: Any) -> None:
    """Initialize with the model-generated message.

    Args:
        message: Message containing tool results for conversation history
    """
    super().__init__({"message": message})

ToolUse

Bases: TypedDict

A request from the model to use a specific tool with the provided input.

Attributes:

Name Type Description
input Any

The input parameters for the tool. Can be any JSON-serializable type.

name str

The name of the tool to invoke.

toolUseId str

A unique identifier for this specific tool use request.

Source code in strands/types/tools.py
class ToolUse(TypedDict):
    """A request from the model to use a specific tool with the provided input.

    Attributes:
        input: The input parameters for the tool.
            Can be any JSON-serializable type.
        name: The name of the tool to invoke.
        toolUseId: A unique identifier for this specific tool use request.
    """

    input: Any
    name: str
    toolUseId: str
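A `ToolResult` answers a `ToolUse` by echoing its `toolUseId`. The sketch below uses local mirrors of the two TypedDicts documented above (`ToolResultContent` is simplified to a plain dict here):

```python
from typing import Any, Literal, TypedDict

# Local mirrors of the documented TypedDicts, for illustration only.
class ToolUse(TypedDict):
    input: Any
    name: str
    toolUseId: str

class ToolResult(TypedDict):
    content: list[dict]  # simplified stand-in for list[ToolResultContent]
    status: Literal["success", "error"]
    toolUseId: str

request: ToolUse = {"name": "get_time", "input": {"tz": "UTC"}, "toolUseId": "tu-42"}
result: ToolResult = {
    "content": [{"text": "2025-01-01T00:00:00Z"}],
    "status": "success",
    "toolUseId": request["toolUseId"],  # links the result back to the request
}
print(result["toolUseId"] == request["toolUseId"])  # True
```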

Trace

A trace representing a single operation or step in the execution flow.

Source code in strands/telemetry/metrics.py
class Trace:
    """A trace representing a single operation or step in the execution flow."""

    def __init__(
        self,
        name: str,
        parent_id: str | None = None,
        start_time: float | None = None,
        raw_name: str | None = None,
        metadata: dict[str, Any] | None = None,
        message: Message | None = None,
    ) -> None:
        """Initialize a new trace.

        Args:
            name: Human-readable name of the operation being traced.
            parent_id: ID of the parent trace, if this is a child operation.
            start_time: Timestamp when the trace started.
                If not provided, the current time will be used.
            raw_name: System level name.
            metadata: Additional contextual information about the trace.
            message: Message associated with the trace.
        """
        self.id: str = str(uuid.uuid4())
        self.name: str = name
        self.raw_name: str | None = raw_name
        self.parent_id: str | None = parent_id
        self.start_time: float = start_time if start_time is not None else time.time()
        self.end_time: float | None = None
        self.children: list[Trace] = []
        self.metadata: dict[str, Any] = metadata or {}
        self.message: Message | None = message

    def end(self, end_time: float | None = None) -> None:
        """Mark the trace as complete with the given or current timestamp.

        Args:
            end_time: Timestamp to use as the end time.
                If not provided, the current time will be used.
        """
        self.end_time = end_time if end_time is not None else time.time()

    def add_child(self, child: "Trace") -> None:
        """Add a child trace to this trace.

        Args:
            child: The child trace to add.
        """
        self.children.append(child)

    def duration(self) -> float | None:
        """Calculate the duration of this trace.

        Returns:
            The duration in seconds, or None if the trace hasn't ended yet.
        """
        return None if self.end_time is None else self.end_time - self.start_time

    def add_message(self, message: Message) -> None:
        """Add a message to the trace.

        Args:
            message: The message to add.
        """
        self.message = message

    def to_dict(self) -> dict[str, Any]:
        """Convert the trace to a dictionary representation.

        Returns:
            A dictionary containing all trace information, suitable for serialization.
        """
        return {
            "id": self.id,
            "name": self.name,
            "raw_name": self.raw_name,
            "parent_id": self.parent_id,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self.duration(),
            "children": [child.to_dict() for child in self.children],
            "metadata": self.metadata,
            "message": self.message,
        }

__init__(name, parent_id=None, start_time=None, raw_name=None, metadata=None, message=None)

Initialize a new trace.

Parameters:

Name Type Description Default
name str

Human-readable name of the operation being traced.

required
parent_id str | None

ID of the parent trace, if this is a child operation.

None
start_time float | None

Timestamp when the trace started. If not provided, the current time will be used.

None
raw_name str | None

System level name.

None
metadata dict[str, Any] | None

Additional contextual information about the trace.

None
message Message | None

Message associated with the trace.

None
Source code in strands/telemetry/metrics.py
def __init__(
    self,
    name: str,
    parent_id: str | None = None,
    start_time: float | None = None,
    raw_name: str | None = None,
    metadata: dict[str, Any] | None = None,
    message: Message | None = None,
) -> None:
    """Initialize a new trace.

    Args:
        name: Human-readable name of the operation being traced.
        parent_id: ID of the parent trace, if this is a child operation.
        start_time: Timestamp when the trace started.
            If not provided, the current time will be used.
        raw_name: System level name.
        metadata: Additional contextual information about the trace.
        message: Message associated with the trace.
    """
    self.id: str = str(uuid.uuid4())
    self.name: str = name
    self.raw_name: str | None = raw_name
    self.parent_id: str | None = parent_id
    self.start_time: float = start_time if start_time is not None else time.time()
    self.end_time: float | None = None
    self.children: list[Trace] = []
    self.metadata: dict[str, Any] = metadata or {}
    self.message: Message | None = message

add_child(child)

Add a child trace to this trace.

Parameters:

Name Type Description Default
child Trace

The child trace to add.

required
Source code in strands/telemetry/metrics.py
def add_child(self, child: "Trace") -> None:
    """Add a child trace to this trace.

    Args:
        child: The child trace to add.
    """
    self.children.append(child)

add_message(message)

Add a message to the trace.

Parameters:

Name Type Description Default
message Message

The message to add.

required
Source code in strands/telemetry/metrics.py
def add_message(self, message: Message) -> None:
    """Add a message to the trace.

    Args:
        message: The message to add.
    """
    self.message = message

duration()

Calculate the duration of this trace.

Returns:

Type Description
float | None

The duration in seconds, or None if the trace hasn't ended yet.

Source code in strands/telemetry/metrics.py
def duration(self) -> float | None:
    """Calculate the duration of this trace.

    Returns:
        The duration in seconds, or None if the trace hasn't ended yet.
    """
    return None if self.end_time is None else self.end_time - self.start_time

end(end_time=None)

Mark the trace as complete with the given or current timestamp.

Parameters:

Name Type Description Default
end_time float | None

Timestamp to use as the end time. If not provided, the current time will be used.

None
Source code in strands/telemetry/metrics.py
def end(self, end_time: float | None = None) -> None:
    """Mark the trace as complete with the given or current timestamp.

    Args:
        end_time: Timestamp to use as the end time.
            If not provided, the current time will be used.
    """
    self.end_time = end_time if end_time is not None else time.time()

to_dict()

Convert the trace to a dictionary representation.

Returns:

Type Description
dict[str, Any]

A dictionary containing all trace information, suitable for serialization.

Source code in strands/telemetry/metrics.py
def to_dict(self) -> dict[str, Any]:
    """Convert the trace to a dictionary representation.

    Returns:
        A dictionary containing all trace information, suitable for serialization.
    """
    return {
        "id": self.id,
        "name": self.name,
        "raw_name": self.raw_name,
        "parent_id": self.parent_id,
        "start_time": self.start_time,
        "end_time": self.end_time,
        "duration": self.duration(),
        "children": [child.to_dict() for child in self.children],
        "metadata": self.metadata,
        "message": self.message,
    }

Tracer

Handles OpenTelemetry tracing.

This class provides a simple interface for creating and managing traces, with support for sending to OTLP endpoints.

When the OTEL_EXPORTER_OTLP_ENDPOINT environment variable is set, traces are sent to the OTLP endpoint.

The use_latest_genai_conventions and _include_tool_definitions attributes are controlled by including "gen_ai_latest_experimental" or "gen_ai_tool_definitions", respectively, in the OTEL_SEMCONV_STABILITY_OPT_IN environment variable.
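The opt-in parsing can be sketched as a standalone function (same split-and-strip logic as `Tracer._parse_semconv_opt_in`), showing how the environment variable drives the two flags:

```python
import os

def parse_semconv_opt_in() -> set[str]:
    # Split the comma-separated env var and strip whitespace around each value,
    # matching Tracer._parse_semconv_opt_in.
    opt_in_env = os.getenv("OTEL_SEMCONV_STABILITY_OPT_IN", "")
    return {value.strip() for value in opt_in_env.split(",")}

os.environ["OTEL_SEMCONV_STABILITY_OPT_IN"] = "gen_ai_latest_experimental, gen_ai_tool_definitions"
values = parse_semconv_opt_in()
use_latest = "gen_ai_latest_experimental" in values
include_tool_defs = "gen_ai_tool_definitions" in values
print(use_latest, include_tool_defs)  # True True
```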

Source code in strands/telemetry/tracer.py
class Tracer:
    """Handles OpenTelemetry tracing.

    This class provides a simple interface for creating and managing traces,
    with support for sending to OTLP endpoints.

    When the OTEL_EXPORTER_OTLP_ENDPOINT environment variable is set, traces
    are sent to the OTLP endpoint.

    Both attributes are controlled by including "gen_ai_latest_experimental" or "gen_ai_tool_definitions",
    respectively, in the OTEL_SEMCONV_STABILITY_OPT_IN environment variable.
    """

    def __init__(self) -> None:
        """Initialize the tracer."""
        self.service_name = __name__
        self.tracer_provider: trace_api.TracerProvider | None = None
        self.tracer_provider = trace_api.get_tracer_provider()
        self.tracer = self.tracer_provider.get_tracer(self.service_name)
        ThreadingInstrumentor().instrument()

        # Read OTEL_SEMCONV_STABILITY_OPT_IN environment variable
        opt_in_values = self._parse_semconv_opt_in()
        ## To-do: should not set below attributes directly, use env var instead
        self.use_latest_genai_conventions = "gen_ai_latest_experimental" in opt_in_values
        self._include_tool_definitions = "gen_ai_tool_definitions" in opt_in_values

    def _parse_semconv_opt_in(self) -> set[str]:
        """Parse the OTEL_SEMCONV_STABILITY_OPT_IN environment variable.

        Returns:
            A set of opt-in values from the environment variable.
        """
        opt_in_env = os.getenv("OTEL_SEMCONV_STABILITY_OPT_IN", "")
        return {value.strip() for value in opt_in_env.split(",")}

    def _start_span(
        self,
        span_name: str,
        parent_span: Span | None = None,
        attributes: dict[str, AttributeValue] | None = None,
        span_kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL,
    ) -> Span:
        """Generic helper method to start a span with common attributes.

        Args:
            span_name: Name of the span to create
            parent_span: Optional parent span to link this span to
            attributes: Dictionary of attributes to set on the span
            span_kind: OpenTelemetry SpanKind enum value

        Returns:
            The created span
        """
        if not parent_span:
            parent_span = trace_api.get_current_span()

        context = None
        if parent_span and parent_span.is_recording() and parent_span != trace_api.INVALID_SPAN:
            context = trace_api.set_span_in_context(parent_span)

        span = self.tracer.start_span(name=span_name, context=context, kind=span_kind)

        # Set start time as a common attribute
        span.set_attribute("gen_ai.event.start_time", datetime.now(timezone.utc).isoformat())

        # Add all provided attributes
        if attributes:
            self._set_attributes(span, attributes)

        return span

    def _set_attributes(self, span: Span, attributes: dict[str, AttributeValue]) -> None:
        """Set attributes on a span, handling different value types appropriately.

        Args:
            span: The span to set attributes on
            attributes: Dictionary of attributes to set
        """
        if not span:
            return

        for key, value in attributes.items():
            span.set_attribute(key, value)

    def _add_optional_usage_and_metrics_attributes(
        self, attributes: dict[str, AttributeValue], usage: Usage, metrics: Metrics
    ) -> None:
        """Add optional usage and metrics attributes if they have values.

        Args:
            attributes: Dictionary to add attributes to
            usage: Token usage information from the model call
            metrics: Metrics from the model call
        """
        if "cacheReadInputTokens" in usage:
            attributes["gen_ai.usage.cache_read_input_tokens"] = usage["cacheReadInputTokens"]

        if "cacheWriteInputTokens" in usage:
            attributes["gen_ai.usage.cache_write_input_tokens"] = usage["cacheWriteInputTokens"]

        if metrics.get("timeToFirstByteMs", 0) > 0:
            attributes["gen_ai.server.time_to_first_token"] = metrics["timeToFirstByteMs"]

        if metrics.get("latencyMs", 0) > 0:
            attributes["gen_ai.server.request.duration"] = metrics["latencyMs"]

    def _end_span(
        self,
        span: Span,
        attributes: dict[str, AttributeValue] | None = None,
        error: Exception | None = None,
    ) -> None:
        """Generic helper method to end a span.

        Args:
            span: The span to end
            attributes: Optional attributes to set before ending the span
            error: Optional exception if an error occurred
        """
        if not span:
            return

        try:
            # Set end time as a common attribute
            span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())

            # Add any additional attributes
            if attributes:
                self._set_attributes(span, attributes)

            # Handle error if present
            if error:
                span.set_status(StatusCode.ERROR, str(error))
                span.record_exception(error)
            else:
                span.set_status(StatusCode.OK)
        except Exception as e:
            logger.warning("error=<%s> | error while ending span", e, exc_info=True)
        finally:
            span.end()
            # Force flush to ensure spans are exported
            if self.tracer_provider and hasattr(self.tracer_provider, "force_flush"):
                try:
                    self.tracer_provider.force_flush()
                except Exception as e:
                    logger.warning("error=<%s> | failed to force flush tracer provider", e)

    def end_span_with_error(self, span: Span, error_message: str, exception: Exception | None = None) -> None:
        """End a span with error status.

        Args:
            span: The span to end.
            error_message: Error message to set in the span status.
            exception: Optional exception to record in the span.
        """
        if not span:
            return

        error = exception or Exception(error_message)
        self._end_span(span, error=error)

    def _add_event(self, span: Span | None, event_name: str, event_attributes: Attributes) -> None:
        """Add an event with attributes to a span.

        Args:
            span: The span to add the event to
            event_name: Name of the event
            event_attributes: Dictionary of attributes to set on the event
        """
        if not span:
            return

        span.add_event(event_name, attributes=event_attributes)

    def _get_event_name_for_message(self, message: Message) -> str:
        """Determine the appropriate OpenTelemetry event name for a message.

        According to OpenTelemetry semantic conventions v1.36.0, messages containing tool results
        should be labeled as 'gen_ai.tool.message' regardless of their role field.
        This ensures proper categorization of tool responses in traces.

        Note: The GenAI namespace is experimental and may change in future versions.

        Reference: https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/gen-ai/gen-ai-events.md#event-gen_aitoolmessage

        Args:
            message: The message to determine the event name for

        Returns:
            The OpenTelemetry event name (e.g., 'gen_ai.user.message', 'gen_ai.tool.message')
        """
        # Check if the message contains a tool result
        for content_block in message.get("content", []):
            if "toolResult" in content_block:
                return "gen_ai.tool.message"

        return f"gen_ai.{message['role']}.message"

    def start_model_invoke_span(
        self,
        messages: Messages,
        parent_span: Span | None = None,
        model_id: str | None = None,
        custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
        **kwargs: Any,
    ) -> Span:
        """Start a new span for a model invocation.

        Args:
            messages: Messages being sent to the model.
            parent_span: Optional parent span to link this span to.
            model_id: Optional identifier for the model being invoked.
            custom_trace_attributes: Optional mapping of custom trace attributes to include in the span.
            **kwargs: Additional attributes to add to the span.

        Returns:
            The created span.
        """
        attributes: dict[str, AttributeValue] = self._get_common_attributes(operation_name="chat")

        if custom_trace_attributes:
            attributes.update(custom_trace_attributes)

        if model_id:
            attributes["gen_ai.request.model"] = model_id

        # Add additional kwargs as attributes
        attributes.update({k: v for k, v in kwargs.items() if isinstance(v, (str, int, float, bool))})

        span = self._start_span("chat", parent_span, attributes=attributes, span_kind=trace_api.SpanKind.INTERNAL)
        self._add_event_messages(span, messages)

        return span

    def end_model_invoke_span(
        self,
        span: Span,
        message: Message,
        usage: Usage,
        metrics: Metrics,
        stop_reason: StopReason,
    ) -> None:
        """End a model invocation span with results and metrics.

        Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
        Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.

        Args:
            span: The span to set attributes on.
            message: The message response from the model.
            usage: Token usage information from the model call.
            metrics: Metrics from the model call.
            stop_reason: The reason the model stopped generating.
        """
        # Set end time attribute
        span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())

        attributes: dict[str, AttributeValue] = {
            "gen_ai.usage.prompt_tokens": usage["inputTokens"],
            "gen_ai.usage.input_tokens": usage["inputTokens"],
            "gen_ai.usage.completion_tokens": usage["outputTokens"],
            "gen_ai.usage.output_tokens": usage["outputTokens"],
            "gen_ai.usage.total_tokens": usage["totalTokens"],
        }

        # Add optional attributes if they have values
        self._add_optional_usage_and_metrics_attributes(attributes, usage, metrics)

        if self.use_latest_genai_conventions:
            self._add_event(
                span,
                "gen_ai.client.inference.operation.details",
                {
                    "gen_ai.output.messages": serialize(
                        [
                            {
                                "role": message["role"],
                                "parts": self._map_content_blocks_to_otel_parts(message["content"]),
                                "finish_reason": str(stop_reason),
                            }
                        ]
                    ),
                },
            )
        else:
            self._add_event(
                span,
                "gen_ai.choice",
                event_attributes={"finish_reason": str(stop_reason), "message": serialize(message["content"])},
            )

        self._set_attributes(span, attributes)

    def start_tool_call_span(
        self,
        tool: ToolUse,
        parent_span: Span | None = None,
        custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
        **kwargs: Any,
    ) -> Span:
        """Start a new span for a tool call.

        Args:
            tool: The tool being used.
            parent_span: Optional parent span to link this span to.
            custom_trace_attributes: Optional mapping of custom trace attributes to include in the span.
            **kwargs: Additional attributes to add to the span.

        Returns:
            The created span.
        """
        attributes: dict[str, AttributeValue] = self._get_common_attributes(operation_name="execute_tool")
        attributes.update(
            {
                "gen_ai.tool.name": tool["name"],
                "gen_ai.tool.call.id": tool["toolUseId"],
            }
        )

        if custom_trace_attributes:
            attributes.update(custom_trace_attributes)
        # Add additional kwargs as attributes
        attributes.update(kwargs)

        span_name = f"execute_tool {tool['name']}"
        span = self._start_span(span_name, parent_span, attributes=attributes, span_kind=trace_api.SpanKind.INTERNAL)

        if self.use_latest_genai_conventions:
            self._add_event(
                span,
                "gen_ai.client.inference.operation.details",
                {
                    "gen_ai.input.messages": serialize(
                        [
                            {
                                "role": "tool",
                                "parts": [
                                    {
                                        "type": "tool_call",
                                        "name": tool["name"],
                                        "id": tool["toolUseId"],
                                        "arguments": tool["input"],
                                    }
                                ],
                            }
                        ]
                    )
                },
            )
        else:
            self._add_event(
                span,
                "gen_ai.tool.message",
                event_attributes={
                    "role": "tool",
                    "content": serialize(tool["input"]),
                    "id": tool["toolUseId"],
                },
            )

        return span

    def end_tool_call_span(self, span: Span, tool_result: ToolResult | None, error: Exception | None = None) -> None:
        """End a tool call span with results.

        Args:
            span: The span to end.
            tool_result: The result from the tool execution.
            error: Optional exception if the tool call failed.
        """
        attributes: dict[str, AttributeValue] = {}
        if tool_result is not None:
            status = tool_result.get("status")
            status_str = str(status) if status is not None else ""

            attributes.update(
                {
                    "gen_ai.tool.status": status_str,
                }
            )

            if self.use_latest_genai_conventions:
                self._add_event(
                    span,
                    "gen_ai.client.inference.operation.details",
                    {
                        "gen_ai.output.messages": serialize(
                            [
                                {
                                    "role": "tool",
                                    "parts": [
                                        {
                                            "type": "tool_call_response",
                                            "id": tool_result.get("toolUseId", ""),
                                            "response": tool_result.get("content"),
                                        }
                                    ],
                                }
                            ]
                        )
                    },
                )
            else:
                self._add_event(
                    span,
                    "gen_ai.choice",
                    event_attributes={
                        "message": serialize(tool_result.get("content")),
                        "id": tool_result.get("toolUseId", ""),
                    },
                )

        self._end_span(span, attributes, error)

    def start_event_loop_cycle_span(
        self,
        invocation_state: Any,
        messages: Messages,
        parent_span: Span | None = None,
        custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
        **kwargs: Any,
    ) -> Span:
        """Start a new span for an event loop cycle.

        Args:
            invocation_state: Arguments for the event loop cycle.
            messages: Messages being processed in this cycle.
            parent_span: Optional parent span to link this span to.
            custom_trace_attributes: Optional mapping of custom trace attributes to include in the span.
            **kwargs: Additional attributes to add to the span.

        Returns:
            The created span.
        """
        event_loop_cycle_id = str(invocation_state.get("event_loop_cycle_id"))
        parent_span = parent_span if parent_span else invocation_state.get("event_loop_parent_span")

        attributes: dict[str, AttributeValue] = {
            "event_loop.cycle_id": event_loop_cycle_id,
        }

        if custom_trace_attributes:
            attributes.update(custom_trace_attributes)

        if "event_loop_parent_cycle_id" in invocation_state:
            attributes["event_loop.parent_cycle_id"] = str(invocation_state["event_loop_parent_cycle_id"])

        # Add additional kwargs as attributes
        attributes.update({k: v for k, v in kwargs.items() if isinstance(v, (str, int, float, bool))})

        span_name = "execute_event_loop_cycle"
        span = self._start_span(span_name, parent_span, attributes)
        self._add_event_messages(span, messages)

        return span

    def end_event_loop_cycle_span(
        self,
        span: Span,
        message: Message,
        tool_result_message: Message | None = None,
    ) -> None:
        """End an event loop cycle span with results.

        Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
        Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.

        Args:
            span: The span to set attributes on.
            message: The message response from this cycle.
            tool_result_message: Optional tool result message if a tool was called.
        """
        if not span:
            return

        # Set end time attribute
        span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())

        event_attributes: dict[str, AttributeValue] = {"message": serialize(message["content"])}

        if tool_result_message:
            event_attributes["tool.result"] = serialize(tool_result_message["content"])

            if self.use_latest_genai_conventions:
                self._add_event(
                    span,
                    "gen_ai.client.inference.operation.details",
                    {
                        "gen_ai.output.messages": serialize(
                            [
                                {
                                    "role": tool_result_message["role"],
                                    "parts": self._map_content_blocks_to_otel_parts(tool_result_message["content"]),
                                }
                            ]
                        )
                    },
                )
            else:
                self._add_event(span, "gen_ai.choice", event_attributes=event_attributes)

    def start_agent_span(
        self,
        messages: Messages,
        agent_name: str,
        model_id: str | None = None,
        tools: list | None = None,
        custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
        tools_config: dict | None = None,
        **kwargs: Any,
    ) -> Span:
        """Start a new span for an agent invocation.

        Args:
            messages: List of messages being sent to the agent.
            agent_name: Name of the agent.
            model_id: Optional model identifier.
            tools: Optional list of tools being used.
            custom_trace_attributes: Optional mapping of custom trace attributes to include in the span.
            tools_config: Optional dictionary of tool configurations.
            **kwargs: Additional attributes to add to the span.

        Returns:
            The created span.
        """
        attributes: dict[str, AttributeValue] = self._get_common_attributes(operation_name="invoke_agent")
        attributes.update(
            {
                "gen_ai.agent.name": agent_name,
            }
        )

        if model_id:
            attributes["gen_ai.request.model"] = model_id

        if tools:
            attributes["gen_ai.agent.tools"] = serialize(tools)

        if self._include_tool_definitions and tools_config:
            try:
                tool_definitions = self._construct_tool_definitions(tools_config)
                attributes["gen_ai.tool.definitions"] = serialize(tool_definitions)
            except Exception:
                # A failure in telemetry should not crash the agent
                logger.warning("failed to attach tool metadata to agent span", exc_info=True)

        # Add custom trace attributes if provided
        if custom_trace_attributes:
            attributes.update(custom_trace_attributes)

        # Add additional kwargs as attributes
        attributes.update({k: v for k, v in kwargs.items() if isinstance(v, (str, int, float, bool))})

        span = self._start_span(
            f"invoke_agent {agent_name}", attributes=attributes, span_kind=trace_api.SpanKind.INTERNAL
        )
        self._add_event_messages(span, messages)

        return span

    def end_agent_span(
        self,
        span: Span,
        response: AgentResult | None = None,
        error: Exception | None = None,
    ) -> None:
        """End an agent span with results and metrics.

        Args:
            span: The span to end.
            response: The response from the agent.
            error: Any error that occurred.
        """
        attributes: dict[str, AttributeValue] = {}

        if response:
            if self.use_latest_genai_conventions:
                self._add_event(
                    span,
                    "gen_ai.client.inference.operation.details",
                    {
                        "gen_ai.output.messages": serialize(
                            [
                                {
                                    "role": "assistant",
                                    "parts": [{"type": "text", "content": str(response)}],
                                    "finish_reason": str(response.stop_reason),
                                }
                            ]
                        )
                    },
                )
            else:
                self._add_event(
                    span,
                    "gen_ai.choice",
                    event_attributes={"message": str(response), "finish_reason": str(response.stop_reason)},
                )

            if hasattr(response, "metrics") and hasattr(response.metrics, "accumulated_usage"):
                if "langfuse" in os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") or "langfuse" in os.getenv(
                    "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", ""
                ):
                    attributes.update({"langfuse.observation.type": "span"})
                accumulated_usage = response.metrics.accumulated_usage
                attributes.update(
                    {
                        "gen_ai.usage.prompt_tokens": accumulated_usage["inputTokens"],
                        "gen_ai.usage.completion_tokens": accumulated_usage["outputTokens"],
                        "gen_ai.usage.input_tokens": accumulated_usage["inputTokens"],
                        "gen_ai.usage.output_tokens": accumulated_usage["outputTokens"],
                        "gen_ai.usage.total_tokens": accumulated_usage["totalTokens"],
                        "gen_ai.usage.cache_read_input_tokens": accumulated_usage.get("cacheReadInputTokens", 0),
                        "gen_ai.usage.cache_write_input_tokens": accumulated_usage.get("cacheWriteInputTokens", 0),
                    }
                )

        self._end_span(span, attributes, error)

    def _construct_tool_definitions(self, tools_config: dict) -> list[dict[str, Any]]:
        """Constructs a list of tool definitions from the provided tools_config."""
        return [
            {
                "name": name,
                "description": spec.get("description"),
                "inputSchema": spec.get("inputSchema"),
                "outputSchema": spec.get("outputSchema"),
            }
            for name, spec in tools_config.items()
        ]

    def start_multiagent_span(
        self,
        task: MultiAgentInput,
        instance: str,
        custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
    ) -> Span:
        """Start a new span for swarm invocation."""
        operation = f"invoke_{instance}"
        attributes: dict[str, AttributeValue] = self._get_common_attributes(operation)
        attributes.update(
            {
                "gen_ai.agent.name": instance,
            }
        )

        if custom_trace_attributes:
            attributes.update(custom_trace_attributes)

        span = self._start_span(operation, attributes=attributes, span_kind=trace_api.SpanKind.CLIENT)

        if self.use_latest_genai_conventions:
            parts: list[dict[str, Any]] = []
            if isinstance(task, list):
                parts = self._map_content_blocks_to_otel_parts(task)
            else:
                parts = [{"type": "text", "content": task}]
            self._add_event(
                span,
                "gen_ai.client.inference.operation.details",
                {"gen_ai.input.messages": serialize([{"role": "user", "parts": parts}])},
            )
        else:
            self._add_event(
                span,
                "gen_ai.user.message",
                event_attributes={"content": serialize(task) if isinstance(task, list) else task},
            )

        return span

    def end_swarm_span(
        self,
        span: Span,
        result: str | None = None,
    ) -> None:
        """End a swarm span with results."""
        if result:
            if self.use_latest_genai_conventions:
                self._add_event(
                    span,
                    "gen_ai.client.inference.operation.details",
                    {
                        "gen_ai.output.messages": serialize(
                            [
                                {
                                    "role": "assistant",
                                    "parts": [{"type": "text", "content": result}],
                                }
                            ]
                        )
                    },
                )
            else:
                self._add_event(
                    span,
                    "gen_ai.choice",
                    event_attributes={"message": result},
                )

    def _get_common_attributes(
        self,
        operation_name: str,
    ) -> dict[str, AttributeValue]:
        """Returns a dictionary of common attributes based on the convention version used.

        Args:
            operation_name: The name of the operation.

        Returns:
            A dictionary of attributes following the appropriate GenAI conventions.
        """
        common_attributes = {"gen_ai.operation.name": operation_name}
        if self.use_latest_genai_conventions:
            common_attributes.update(
                {
                    "gen_ai.provider.name": "strands-agents",
                }
            )
        else:
            common_attributes.update(
                {
                    "gen_ai.system": "strands-agents",
                }
            )
        return dict(common_attributes)

    def _add_event_messages(self, span: Span, messages: Messages) -> None:
        """Adds messages as events to the provided span, based on the current GenAI conventions.

        Args:
            span: The span to which events will be added.
            messages: List of messages being sent to the agent.
        """
        if self.use_latest_genai_conventions:
            input_messages: list = []
            for message in messages:
                input_messages.append(
                    {"role": message["role"], "parts": self._map_content_blocks_to_otel_parts(message["content"])}
                )
            self._add_event(
                span, "gen_ai.client.inference.operation.details", {"gen_ai.input.messages": serialize(input_messages)}
            )
        else:
            for message in messages:
                self._add_event(
                    span,
                    self._get_event_name_for_message(message),
                    {"content": serialize(message["content"])},
                )

    def _map_content_blocks_to_otel_parts(
        self, content_blocks: list[ContentBlock] | list[InterruptResponseContent]
    ) -> list[dict[str, Any]]:
        """Map content blocks to OpenTelemetry parts format."""
        parts: list[dict[str, Any]] = []

        for block in cast(list[dict[str, Any]], content_blocks):
            if "interruptResponse" in block:
                interrupt_response = block["interruptResponse"]
                parts.append(
                    {
                        "type": "interrupt_response",
                        "id": interrupt_response["interruptId"],
                        "response": interrupt_response["response"],
                    },
                )
            elif "text" in block:
                # Standard TextPart
                parts.append({"type": "text", "content": block["text"]})
            elif "toolUse" in block:
                # Standard ToolCallRequestPart
                tool_use = block["toolUse"]
                parts.append(
                    {
                        "type": "tool_call",
                        "name": tool_use["name"],
                        "id": tool_use["toolUseId"],
                        "arguments": tool_use["input"],
                    }
                )
            elif "toolResult" in block:
                # Standard ToolCallResponsePart
                tool_result = block["toolResult"]
                parts.append(
                    {
                        "type": "tool_call_response",
                        "id": tool_result["toolUseId"],
                        "response": tool_result["content"],
                    }
                )
            else:
                # For all other ContentBlock types, use the key as type and value as content
                for key, value in block.items():
                    parts.append({"type": key, "content": value})
        return parts
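
As an illustration only (not the library function itself), the content-block mapping above can be exercised standalone. `map_parts` below is a simplified re-implementation covering the `text`, `toolUse`, and `toolResult` branches plus the generic fallback:

```python
from typing import Any


def map_parts(content_blocks: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Simplified sketch of the content-block-to-OTel-parts mapping above."""
    parts: list[dict[str, Any]] = []
    for block in content_blocks:
        if "text" in block:
            parts.append({"type": "text", "content": block["text"]})
        elif "toolUse" in block:
            tool_use = block["toolUse"]
            parts.append(
                {
                    "type": "tool_call",
                    "name": tool_use["name"],
                    "id": tool_use["toolUseId"],
                    "arguments": tool_use["input"],
                }
            )
        elif "toolResult" in block:
            tool_result = block["toolResult"]
            parts.append(
                {
                    "type": "tool_call_response",
                    "id": tool_result["toolUseId"],
                    "response": tool_result["content"],
                }
            )
        else:
            # Fallback branch: each block key becomes the part type
            for key, value in block.items():
                parts.append({"type": key, "content": value})
    return parts


blocks = [
    {"text": "hello"},
    {"toolUse": {"name": "calculator", "toolUseId": "t-1", "input": {"x": 1}}},
]
print(map_parts(blocks))
```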

__init__()

Initialize the tracer.

Source code in strands/telemetry/tracer.py
def __init__(self) -> None:
    """Initialize the tracer."""
    self.service_name = __name__
    self.tracer_provider: trace_api.TracerProvider | None = None
    self.tracer_provider = trace_api.get_tracer_provider()
    self.tracer = self.tracer_provider.get_tracer(self.service_name)
    ThreadingInstrumentor().instrument()

    # Read OTEL_SEMCONV_STABILITY_OPT_IN environment variable
    opt_in_values = self._parse_semconv_opt_in()
    ## To-do: should not set below attributes directly, use env var instead
    self.use_latest_genai_conventions = "gen_ai_latest_experimental" in opt_in_values
    self._include_tool_definitions = "gen_ai_tool_definitions" in opt_in_values
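
The two convention flags come from the `OTEL_SEMCONV_STABILITY_OPT_IN` environment variable. `_parse_semconv_opt_in` is not shown in this excerpt; the sketch below assumes the standard comma-separated parsing that variable uses elsewhere in OpenTelemetry:

```python
def parse_opt_in(raw: str) -> set[str]:
    # Assumed behavior: OTEL_SEMCONV_STABILITY_OPT_IN is a comma-separated
    # list of opt-in tokens; surrounding whitespace is ignored.
    return {value.strip() for value in raw.split(",") if value.strip()}


opt_in = parse_opt_in("gen_ai_latest_experimental, gen_ai_tool_definitions")
use_latest_genai_conventions = "gen_ai_latest_experimental" in opt_in
include_tool_definitions = "gen_ai_tool_definitions" in opt_in
print(use_latest_genai_conventions, include_tool_definitions)
```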

end_agent_span(span, response=None, error=None)

End an agent span with results and metrics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `span` | `Span` | The span to end. | *required* |
| `response` | `AgentResult \| None` | The response from the agent. | `None` |
| `error` | `Exception \| None` | Any error that occurred. | `None` |
Source code in strands/telemetry/tracer.py
def end_agent_span(
    self,
    span: Span,
    response: AgentResult | None = None,
    error: Exception | None = None,
) -> None:
    """End an agent span with results and metrics.

    Args:
        span: The span to end.
        response: The response from the agent.
        error: Any error that occurred.
    """
    attributes: dict[str, AttributeValue] = {}

    if response:
        if self.use_latest_genai_conventions:
            self._add_event(
                span,
                "gen_ai.client.inference.operation.details",
                {
                    "gen_ai.output.messages": serialize(
                        [
                            {
                                "role": "assistant",
                                "parts": [{"type": "text", "content": str(response)}],
                                "finish_reason": str(response.stop_reason),
                            }
                        ]
                    )
                },
            )
        else:
            self._add_event(
                span,
                "gen_ai.choice",
                event_attributes={"message": str(response), "finish_reason": str(response.stop_reason)},
            )

        if hasattr(response, "metrics") and hasattr(response.metrics, "accumulated_usage"):
            if "langfuse" in os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") or "langfuse" in os.getenv(
                "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", ""
            ):
                attributes.update({"langfuse.observation.type": "span"})
            accumulated_usage = response.metrics.accumulated_usage
            attributes.update(
                {
                    "gen_ai.usage.prompt_tokens": accumulated_usage["inputTokens"],
                    "gen_ai.usage.completion_tokens": accumulated_usage["outputTokens"],
                    "gen_ai.usage.input_tokens": accumulated_usage["inputTokens"],
                    "gen_ai.usage.output_tokens": accumulated_usage["outputTokens"],
                    "gen_ai.usage.total_tokens": accumulated_usage["totalTokens"],
                    "gen_ai.usage.cache_read_input_tokens": accumulated_usage.get("cacheReadInputTokens", 0),
                    "gen_ai.usage.cache_write_input_tokens": accumulated_usage.get("cacheWriteInputTokens", 0),
                }
            )

    self._end_span(span, attributes, error)
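
For illustration, the usage-to-attribute mapping above can be extracted into a standalone helper (an explanatory sketch, not a library API). Note that both the legacy (`prompt_tokens`/`completion_tokens`) and current (`input_tokens`/`output_tokens`) key pairs are emitted for backend compatibility:

```python
from typing import Any


def usage_attributes(accumulated_usage: dict[str, Any]) -> dict[str, Any]:
    """Map accumulated token usage to gen_ai.usage.* span attributes."""
    return {
        "gen_ai.usage.prompt_tokens": accumulated_usage["inputTokens"],
        "gen_ai.usage.completion_tokens": accumulated_usage["outputTokens"],
        "gen_ai.usage.input_tokens": accumulated_usage["inputTokens"],
        "gen_ai.usage.output_tokens": accumulated_usage["outputTokens"],
        "gen_ai.usage.total_tokens": accumulated_usage["totalTokens"],
        # Cache counters are optional in the usage payload, so default to 0
        "gen_ai.usage.cache_read_input_tokens": accumulated_usage.get("cacheReadInputTokens", 0),
        "gen_ai.usage.cache_write_input_tokens": accumulated_usage.get("cacheWriteInputTokens", 0),
    }


attrs = usage_attributes({"inputTokens": 10, "outputTokens": 5, "totalTokens": 15})
print(attrs["gen_ai.usage.total_tokens"])
```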

end_event_loop_cycle_span(span, message, tool_result_message=None)

End an event loop cycle span with results.

Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes. Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `span` | `Span` | The span to set attributes on. | *required* |
| `message` | `Message` | The message response from this cycle. | *required* |
| `tool_result_message` | `Message \| None` | Optional tool result message if a tool was called. | `None` |
Source code in strands/telemetry/tracer.py
def end_event_loop_cycle_span(
    self,
    span: Span,
    message: Message,
    tool_result_message: Message | None = None,
) -> None:
    """End an event loop cycle span with results.

    Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
    Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.

    Args:
        span: The span to set attributes on.
        message: The message response from this cycle.
        tool_result_message: Optional tool result message if a tool was called.
    """
    if not span:
        return

    # Set end time attribute
    span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())

    event_attributes: dict[str, AttributeValue] = {"message": serialize(message["content"])}

    if tool_result_message:
        event_attributes["tool.result"] = serialize(tool_result_message["content"])

        if self.use_latest_genai_conventions:
            self._add_event(
                span,
                "gen_ai.client.inference.operation.details",
                {
                    "gen_ai.output.messages": serialize(
                        [
                            {
                                "role": tool_result_message["role"],
                                "parts": self._map_content_blocks_to_otel_parts(tool_result_message["content"]),
                            }
                        ]
                    )
                },
            )
        else:
            self._add_event(span, "gen_ai.choice", event_attributes=event_attributes)

end_model_invoke_span(span, message, usage, metrics, stop_reason)

End a model invocation span with results and metrics.

Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes. Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `span` | `Span` | The span to set attributes on. | *required* |
| `message` | `Message` | The message response from the model. | *required* |
| `usage` | `Usage` | Token usage information from the model call. | *required* |
| `metrics` | `Metrics` | Metrics from the model call. | *required* |
| `stop_reason` | `StopReason` | The reason the model stopped generating. | *required* |
Source code in strands/telemetry/tracer.py
def end_model_invoke_span(
    self,
    span: Span,
    message: Message,
    usage: Usage,
    metrics: Metrics,
    stop_reason: StopReason,
) -> None:
    """End a model invocation span with results and metrics.

    Note: The span is automatically closed and exceptions recorded. This method just sets the necessary attributes.
    Status in the span is automatically set to UNSET (OK) on success or ERROR on exception.

    Args:
        span: The span to set attributes on.
        message: The message response from the model.
        usage: Token usage information from the model call.
        metrics: Metrics from the model call.
        stop_reason: The reason the model stopped generating.
    """
    # Set end time attribute
    span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())

    attributes: dict[str, AttributeValue] = {
        "gen_ai.usage.prompt_tokens": usage["inputTokens"],
        "gen_ai.usage.input_tokens": usage["inputTokens"],
        "gen_ai.usage.completion_tokens": usage["outputTokens"],
        "gen_ai.usage.output_tokens": usage["outputTokens"],
        "gen_ai.usage.total_tokens": usage["totalTokens"],
    }

    # Add optional attributes if they have values
    self._add_optional_usage_and_metrics_attributes(attributes, usage, metrics)

    if self.use_latest_genai_conventions:
        self._add_event(
            span,
            "gen_ai.client.inference.operation.details",
            {
                "gen_ai.output.messages": serialize(
                    [
                        {
                            "role": message["role"],
                            "parts": self._map_content_blocks_to_otel_parts(message["content"]),
                            "finish_reason": str(stop_reason),
                        }
                    ]
                ),
            },
        )
    else:
        self._add_event(
            span,
            "gen_ai.choice",
            event_attributes={"finish_reason": str(stop_reason), "message": serialize(message["content"])},
        )

    self._set_attributes(span, attributes)

end_span_with_error(span, error_message, exception=None)

End a span with error status.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `span` | `Span` | The span to end. | *required* |
| `error_message` | `str` | Error message to set in the span status. | *required* |
| `exception` | `Exception \| None` | Optional exception to record in the span. | `None` |
Source code in strands/telemetry/tracer.py
def end_span_with_error(self, span: Span, error_message: str, exception: Exception | None = None) -> None:
    """End a span with error status.

    Args:
        span: The span to end.
        error_message: Error message to set in the span status.
        exception: Optional exception to record in the span.
    """
    if not span:
        return

    error = exception or Exception(error_message)
    self._end_span(span, error=error)

end_swarm_span(span, result=None)

End a swarm span with results.

Source code in strands/telemetry/tracer.py
def end_swarm_span(
    self,
    span: Span,
    result: str | None = None,
) -> None:
    """End a swarm span with results."""
    if result:
        if self.use_latest_genai_conventions:
            self._add_event(
                span,
                "gen_ai.client.inference.operation.details",
                {
                    "gen_ai.output.messages": serialize(
                        [
                            {
                                "role": "assistant",
                                "parts": [{"type": "text", "content": result}],
                            }
                        ]
                    )
                },
            )
        else:
            self._add_event(
                span,
                "gen_ai.choice",
                event_attributes={"message": result},
            )

end_tool_call_span(span, tool_result, error=None)

End a tool call span with results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `span` | `Span` | The span to end. | *required* |
| `tool_result` | `ToolResult \| None` | The result from the tool execution. | *required* |
| `error` | `Exception \| None` | Optional exception if the tool call failed. | `None` |
Source code in strands/telemetry/tracer.py
def end_tool_call_span(self, span: Span, tool_result: ToolResult | None, error: Exception | None = None) -> None:
    """End a tool call span with results.

    Args:
        span: The span to end.
        tool_result: The result from the tool execution.
        error: Optional exception if the tool call failed.
    """
    attributes: dict[str, AttributeValue] = {}
    if tool_result is not None:
        status = tool_result.get("status")
        status_str = str(status) if status is not None else ""

        attributes.update(
            {
                "gen_ai.tool.status": status_str,
            }
        )

        if self.use_latest_genai_conventions:
            self._add_event(
                span,
                "gen_ai.client.inference.operation.details",
                {
                    "gen_ai.output.messages": serialize(
                        [
                            {
                                "role": "tool",
                                "parts": [
                                    {
                                        "type": "tool_call_response",
                                        "id": tool_result.get("toolUseId", ""),
                                        "response": tool_result.get("content"),
                                    }
                                ],
                            }
                        ]
                    )
                },
            )
        else:
            self._add_event(
                span,
                "gen_ai.choice",
                event_attributes={
                    "message": serialize(tool_result.get("content")),
                    "id": tool_result.get("toolUseId", ""),
                },
            )

    self._end_span(span, attributes, error)

start_agent_span(messages, agent_name, model_id=None, tools=None, custom_trace_attributes=None, tools_config=None, **kwargs)

Start a new span for an agent invocation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `messages` | `Messages` | List of messages being sent to the agent. | *required* |
| `agent_name` | `str` | Name of the agent. | *required* |
| `model_id` | `str \| None` | Optional model identifier. | `None` |
| `tools` | `list \| None` | Optional list of tools being used. | `None` |
| `custom_trace_attributes` | `Mapping[str, AttributeValue] \| None` | Optional mapping of custom trace attributes to include in the span. | `None` |
| `tools_config` | `dict \| None` | Optional dictionary of tool configurations. | `None` |
| `**kwargs` | `Any` | Additional attributes to add to the span. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Span` | The created span. |

Source code in strands/telemetry/tracer.py
def start_agent_span(
    self,
    messages: Messages,
    agent_name: str,
    model_id: str | None = None,
    tools: list | None = None,
    custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
    tools_config: dict | None = None,
    **kwargs: Any,
) -> Span:
    """Start a new span for an agent invocation.

    Args:
        messages: List of messages being sent to the agent.
        agent_name: Name of the agent.
        model_id: Optional model identifier.
        tools: Optional list of tools being used.
        custom_trace_attributes: Optional mapping of custom trace attributes to include in the span.
        tools_config: Optional dictionary of tool configurations.
        **kwargs: Additional attributes to add to the span.

    Returns:
        The created span, or None if tracing is not enabled.
    """
    attributes: dict[str, AttributeValue] = self._get_common_attributes(operation_name="invoke_agent")
    attributes.update(
        {
            "gen_ai.agent.name": agent_name,
        }
    )

    if model_id:
        attributes["gen_ai.request.model"] = model_id

    if tools:
        attributes["gen_ai.agent.tools"] = serialize(tools)

    if self._include_tool_definitions and tools_config:
        try:
            tool_definitions = self._construct_tool_definitions(tools_config)
            attributes["gen_ai.tool.definitions"] = serialize(tool_definitions)
        except Exception:
            # A failure in telemetry should not crash the agent
            logger.warning("failed to attach tool metadata to agent span", exc_info=True)

    # Add custom trace attributes if provided
    if custom_trace_attributes:
        attributes.update(custom_trace_attributes)

    # Add additional kwargs as attributes
    attributes.update({k: v for k, v in kwargs.items() if isinstance(v, (str, int, float, bool))})

    span = self._start_span(
        f"invoke_agent {agent_name}", attributes=attributes, span_kind=trace_api.SpanKind.INTERNAL
    )
    self._add_event_messages(span, messages)

    return span
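
The span-start helpers above all fold `**kwargs` into span attributes through the same `isinstance` filter. A minimal standalone sketch of that pattern (not the strands `Tracer` itself; `collect_span_attributes` is a stand-in name for illustration):

```python
from typing import Any


def collect_span_attributes(base: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
    """Merge keyword arguments into span attributes, keeping only primitives."""
    attributes = dict(base)
    # Mirror of the filter used in start_agent_span and the other span starters:
    # only str, int, float, and bool values survive; complex objects are dropped.
    attributes.update({k: v for k, v in kwargs.items() if isinstance(v, (str, int, float, bool))})
    return attributes


attrs = collect_span_attributes(
    {"gen_ai.agent.name": "helper"},
    session_id="abc123",
    retries=3,
    payload={"not": "allowed"},  # dict value is filtered out
)
# attrs == {"gen_ai.agent.name": "helper", "session_id": "abc123", "retries": 3}
```

Because OpenTelemetry attribute values must be primitives (or sequences of primitives), filtering rather than serializing arbitrary kwargs keeps the span exporter from rejecting the attribute set.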

start_event_loop_cycle_span(invocation_state, messages, parent_span=None, custom_trace_attributes=None, **kwargs)

Start a new span for an event loop cycle.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `invocation_state` | `Any` | Arguments for the event loop cycle. | *required* |
| `parent_span` | `Span \| None` | Optional parent span to link this span to. | `None` |
| `messages` | `Messages` | Messages being processed in this cycle. | *required* |
| `custom_trace_attributes` | `Mapping[str, AttributeValue] \| None` | Optional mapping of custom trace attributes to include in the span. | `None` |
| `**kwargs` | `Any` | Additional attributes to add to the span. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Span` | The created span, or `None` if tracing is not enabled. |

Source code in strands/telemetry/tracer.py
def start_event_loop_cycle_span(
    self,
    invocation_state: Any,
    messages: Messages,
    parent_span: Span | None = None,
    custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
    **kwargs: Any,
) -> Span:
    """Start a new span for an event loop cycle.

    Args:
        invocation_state: Arguments for the event loop cycle.
        parent_span: Optional parent span to link this span to.
        messages:  Messages being processed in this cycle.
        custom_trace_attributes: Optional mapping of custom trace attributes to include in the span.
        **kwargs: Additional attributes to add to the span.

    Returns:
        The created span, or None if tracing is not enabled.
    """
    event_loop_cycle_id = str(invocation_state.get("event_loop_cycle_id"))
    parent_span = parent_span if parent_span else invocation_state.get("event_loop_parent_span")

    attributes: dict[str, AttributeValue] = {
        "event_loop.cycle_id": event_loop_cycle_id,
    }

    if custom_trace_attributes:
        attributes.update(custom_trace_attributes)

    if "event_loop_parent_cycle_id" in invocation_state:
        attributes["event_loop.parent_cycle_id"] = str(invocation_state["event_loop_parent_cycle_id"])

    # Add additional kwargs as attributes
    attributes.update({k: v for k, v in kwargs.items() if isinstance(v, (str, int, float, bool))})

    span_name = "execute_event_loop_cycle"
    span = self._start_span(span_name, parent_span, attributes)
    self._add_event_messages(span, messages)

    return span

start_model_invoke_span(messages, parent_span=None, model_id=None, custom_trace_attributes=None, **kwargs)

Start a new span for a model invocation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `messages` | `Messages` | Messages being sent to the model. | *required* |
| `parent_span` | `Span \| None` | Optional parent span to link this span to. | `None` |
| `model_id` | `str \| None` | Optional identifier for the model being invoked. | `None` |
| `custom_trace_attributes` | `Mapping[str, AttributeValue] \| None` | Optional mapping of custom trace attributes to include in the span. | `None` |
| `**kwargs` | `Any` | Additional attributes to add to the span. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Span` | The created span, or `None` if tracing is not enabled. |

Source code in strands/telemetry/tracer.py
def start_model_invoke_span(
    self,
    messages: Messages,
    parent_span: Span | None = None,
    model_id: str | None = None,
    custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
    **kwargs: Any,
) -> Span:
    """Start a new span for a model invocation.

    Args:
        messages: Messages being sent to the model.
        parent_span: Optional parent span to link this span to.
        model_id: Optional identifier for the model being invoked.
        custom_trace_attributes: Optional mapping of custom trace attributes to include in the span.
        **kwargs: Additional attributes to add to the span.

    Returns:
        The created span, or None if tracing is not enabled.
    """
    attributes: dict[str, AttributeValue] = self._get_common_attributes(operation_name="chat")

    if custom_trace_attributes:
        attributes.update(custom_trace_attributes)

    if model_id:
        attributes["gen_ai.request.model"] = model_id

    # Add additional kwargs as attributes
    attributes.update({k: v for k, v in kwargs.items() if isinstance(v, (str, int, float, bool))})

    span = self._start_span("chat", parent_span, attributes=attributes, span_kind=trace_api.SpanKind.INTERNAL)
    self._add_event_messages(span, messages)

    return span

start_multiagent_span(task, instance, custom_trace_attributes=None)

Start a new span for swarm invocation.

Source code in strands/telemetry/tracer.py
def start_multiagent_span(
    self,
    task: MultiAgentInput,
    instance: str,
    custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
) -> Span:
    """Start a new span for swarm invocation."""
    operation = f"invoke_{instance}"
    attributes: dict[str, AttributeValue] = self._get_common_attributes(operation)
    attributes.update(
        {
            "gen_ai.agent.name": instance,
        }
    )

    if custom_trace_attributes:
        attributes.update(custom_trace_attributes)

    span = self._start_span(operation, attributes=attributes, span_kind=trace_api.SpanKind.CLIENT)

    if self.use_latest_genai_conventions:
        parts: list[dict[str, Any]] = []
        if isinstance(task, list):
            parts = self._map_content_blocks_to_otel_parts(task)
        else:
            parts = [{"type": "text", "content": task}]
        self._add_event(
            span,
            "gen_ai.client.inference.operation.details",
            {"gen_ai.input.messages": serialize([{"role": "user", "parts": parts}])},
        )
    else:
        self._add_event(
            span,
            "gen_ai.user.message",
            event_attributes={"content": serialize(task) if isinstance(task, list) else task},
        )

    return span
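
The input-message branching above (a plain-string task versus a list of content blocks) can be sketched standalone; `task_to_parts` is a stand-in name, and the list mapping here is a simplified substitute for `_map_content_blocks_to_otel_parts`:

```python
def task_to_parts(task):
    """Convert a multi-agent task into OTEL-style message parts."""
    if isinstance(task, list):
        # Simplified stand-in for _map_content_blocks_to_otel_parts:
        # map each content block to a text part.
        return [{"type": "text", "content": block.get("text", "")} for block in task]
    # A bare string task becomes a single text part.
    return [{"type": "text", "content": task}]


assert task_to_parts("summarize this") == [{"type": "text", "content": "summarize this"}]
assert task_to_parts([{"text": "a"}, {"text": "b"}]) == [
    {"type": "text", "content": "a"},
    {"type": "text", "content": "b"},
]
```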

start_tool_call_span(tool, parent_span=None, custom_trace_attributes=None, **kwargs)

Start a new span for a tool call.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tool` | `ToolUse` | The tool being used. | *required* |
| `parent_span` | `Span \| None` | Optional parent span to link this span to. | `None` |
| `custom_trace_attributes` | `Mapping[str, AttributeValue] \| None` | Optional mapping of custom trace attributes to include in the span. | `None` |
| `**kwargs` | `Any` | Additional attributes to add to the span. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Span` | The created span, or `None` if tracing is not enabled. |

Source code in strands/telemetry/tracer.py
def start_tool_call_span(
    self,
    tool: ToolUse,
    parent_span: Span | None = None,
    custom_trace_attributes: Mapping[str, AttributeValue] | None = None,
    **kwargs: Any,
) -> Span:
    """Start a new span for a tool call.

    Args:
        tool: The tool being used.
        parent_span: Optional parent span to link this span to.
        custom_trace_attributes: Optional mapping of custom trace attributes to include in the span.
        **kwargs: Additional attributes to add to the span.

    Returns:
        The created span, or None if tracing is not enabled.
    """
    attributes: dict[str, AttributeValue] = self._get_common_attributes(operation_name="execute_tool")
    attributes.update(
        {
            "gen_ai.tool.name": tool["name"],
            "gen_ai.tool.call.id": tool["toolUseId"],
        }
    )

    if custom_trace_attributes:
        attributes.update(custom_trace_attributes)
    # Add additional kwargs as attributes
    attributes.update(kwargs)

    span_name = f"execute_tool {tool['name']}"
    span = self._start_span(span_name, parent_span, attributes=attributes, span_kind=trace_api.SpanKind.INTERNAL)

    if self.use_latest_genai_conventions:
        self._add_event(
            span,
            "gen_ai.client.inference.operation.details",
            {
                "gen_ai.input.messages": serialize(
                    [
                        {
                            "role": "tool",
                            "parts": [
                                {
                                    "type": "tool_call",
                                    "name": tool["name"],
                                    "id": tool["toolUseId"],
                                    "arguments": tool["input"],
                                }
                            ],
                        }
                    ]
                )
            },
        )
    else:
        self._add_event(
            span,
            "gen_ai.tool.message",
            event_attributes={
                "role": "tool",
                "content": serialize(tool["input"]),
                "id": tool["toolUseId"],
            },
        )

    return span

TypedEvent

Bases: dict

Base class for all typed events in the agent system.

Source code in strands/types/_events.py
class TypedEvent(dict):
    """Base class for all typed events in the agent system."""

    def __init__(self, data: dict[str, Any] | None = None) -> None:
        """Initialize the typed event with optional data.

        Args:
            data: Optional dictionary of event data to initialize with
        """
        super().__init__(data or {})

    @property
    def is_callback_event(self) -> bool:
        """True if this event should trigger the callback_handler to fire."""
        return True

    def as_dict(self) -> dict:
        """Convert this event to a raw dictionary for emitting purposes."""
        return {**self}

    def prepare(self, invocation_state: dict) -> None:
        """Prepare the event for emission by adding invocation state.

        This allows a subset of events to merge with the invocation_state without needing to
        pass around the invocation_state throughout the system.
        """
        ...

is_callback_event property

True if this event should trigger the callback_handler to fire.

__init__(data=None)

Initialize the typed event with optional data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `dict[str, Any] \| None` | Optional dictionary of event data to initialize with. | `None` |
Source code in strands/types/_events.py
def __init__(self, data: dict[str, Any] | None = None) -> None:
    """Initialize the typed event with optional data.

    Args:
        data: Optional dictionary of event data to initialize with
    """
    super().__init__(data or {})

as_dict()

Convert this event to a raw dictionary for emitting purposes.

Source code in strands/types/_events.py
def as_dict(self) -> dict:
    """Convert this event to a raw dictionary for emitting purposes."""
    return {**self}

prepare(invocation_state)

Prepare the event for emission by adding invocation state.

This allows a subset of events to merge with the invocation_state without needing to pass around the invocation_state throughout the system.

Source code in strands/types/_events.py
def prepare(self, invocation_state: dict) -> None:
    """Prepare the event for emission by adding invocation state.

    This allows a subset of events to merge with the invocation_state without needing to
    pass around the invocation_state throughout the system.
    """
    ...

_handle_model_execution(agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context) async

Handle model execution with retry logic for throttling exceptions.

Executes the model inference with automatic retry handling for throttling exceptions. Manages tracing, hooks, and metrics collection throughout the process.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `agent` | `Agent` | The agent executing the model. | *required* |
| `cycle_span` | `Any` | Span object for tracing the cycle. | *required* |
| `cycle_trace` | `Trace` | Trace object for the current event loop cycle. | *required* |
| `invocation_state` | `dict[str, Any]` | State maintained across cycles. | *required* |
| `tracer` | `Tracer` | Tracer instance for span management. | *required* |
| `structured_output_context` | `StructuredOutputContext` | Context for structured output management. | *required* |

Yields:

| Type | Description |
| --- | --- |
| `AsyncGenerator[TypedEvent, None]` | Model stream events and throttle events during retries. |

Raises:

| Type | Description |
| --- | --- |
| `ModelThrottledException` | If max retry attempts are exceeded. |
| `Exception` | Any other model execution errors. |

Source code in strands/event_loop/event_loop.py
async def _handle_model_execution(
    agent: "Agent",
    cycle_span: Any,
    cycle_trace: Trace,
    invocation_state: dict[str, Any],
    tracer: Tracer,
    structured_output_context: StructuredOutputContext,
) -> AsyncGenerator[TypedEvent, None]:
    """Handle model execution with retry logic for throttling exceptions.

    Executes the model inference with automatic retry handling for throttling exceptions.
    Manages tracing, hooks, and metrics collection throughout the process.

    Args:
        agent: The agent executing the model.
        cycle_span: Span object for tracing the cycle.
        cycle_trace: Trace object for the current event loop cycle.
        invocation_state: State maintained across cycles.
        tracer: Tracer instance for span management.
        structured_output_context: Context for structured output management.

    Yields:
        Model stream events and throttle events during retries.

    Raises:
        ModelThrottledException: If max retry attempts are exceeded.
        Exception: Any other model execution errors.
    """
    # Create a trace for the stream_messages call
    stream_trace = Trace("stream_messages", parent_id=cycle_trace.id)
    cycle_trace.add_child(stream_trace)

    # Retry loop - actual retry logic is handled by retry_strategy hook
    # Hooks control when to stop retrying via the event.retry flag
    while True:
        model_id = agent.model.config.get("model_id") if hasattr(agent.model, "config") else None
        model_invoke_span = tracer.start_model_invoke_span(
            messages=agent.messages,
            parent_span=cycle_span,
            model_id=model_id,
            custom_trace_attributes=agent.trace_attributes,
        )
        with trace_api.use_span(model_invoke_span, end_on_exit=True):
            await agent.hooks.invoke_callbacks_async(
                BeforeModelCallEvent(
                    agent=agent,
                    invocation_state=invocation_state,
                )
            )

            if structured_output_context.forced_mode:
                tool_spec = structured_output_context.get_tool_spec()
                tool_specs = [tool_spec] if tool_spec else []
            else:
                tool_specs = agent.tool_registry.get_all_tool_specs()
            try:
                async for event in stream_messages(
                    agent.model,
                    agent.system_prompt,
                    agent.messages,
                    tool_specs,
                    system_prompt_content=agent._system_prompt_content,
                    tool_choice=structured_output_context.tool_choice,
                    invocation_state=invocation_state,
                ):
                    yield event

                stop_reason, message, usage, metrics = event["stop"]
                invocation_state.setdefault("request_state", {})

                after_model_call_event = AfterModelCallEvent(
                    agent=agent,
                    invocation_state=invocation_state,
                    stop_response=AfterModelCallEvent.ModelStopResponse(
                        stop_reason=stop_reason,
                        message=message,
                    ),
                )

                await agent.hooks.invoke_callbacks_async(after_model_call_event)

                # Check if hooks want to retry the model call
                if after_model_call_event.retry:
                    logger.debug(
                        "stop_reason=<%s>, retry_requested=<True> | hook requested model retry",
                        stop_reason,
                    )
                    continue  # Retry the model call

                if stop_reason == "max_tokens":
                    message = recover_message_on_max_tokens_reached(message)

                # Set attributes before span auto-closes
                tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
                break  # Success! Break out of retry loop

            except Exception as e:
                # Exception is automatically recorded by use_span with end_on_exit=True
                after_model_call_event = AfterModelCallEvent(
                    agent=agent,
                    invocation_state=invocation_state,
                    exception=e,
                )
                await agent.hooks.invoke_callbacks_async(after_model_call_event)

                # Emit backwards-compatible events if retry strategy supports it
                # (prior to making the retry strategy configurable, this is what we emitted)

                if (
                    isinstance(agent._retry_strategy, ModelRetryStrategy)
                    and agent._retry_strategy._backwards_compatible_event_to_yield
                ):
                    yield agent._retry_strategy._backwards_compatible_event_to_yield

                # Check if hooks want to retry the model call
                if after_model_call_event.retry:
                    logger.debug(
                        "exception=<%s>, retry_requested=<True> | hook requested model retry",
                        type(e).__name__,
                    )

                    continue  # Retry the model call

                # No retry requested, raise the exception
                yield ForceStopEvent(reason=e)
                raise e

    try:
        # Add message in trace and mark the end of the stream messages trace
        stream_trace.add_message(message)
        stream_trace.end()

        # Add the response message to the conversation
        agent.messages.append(message)
        await agent.hooks.invoke_callbacks_async(MessageAddedEvent(agent=agent, message=message))

        # Update metrics
        agent.event_loop_metrics.update_usage(usage)
        agent.event_loop_metrics.update_metrics(metrics)

    except Exception as e:
        yield ForceStopEvent(reason=e)
        logger.exception("cycle failed")
        raise EventLoopException(e, invocation_state["request_state"]) from e

_handle_tool_execution(stop_reason, message, agent, cycle_trace, cycle_span, cycle_start_time, invocation_state, tracer, structured_output_context) async

Handles the execution of tools requested by the model during an event loop cycle.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stop_reason` | `StopReason` | The reason the model stopped generating. | *required* |
| `message` | `Message` | The message from the model that may contain tool use requests. | *required* |
| `agent` | `Agent` | Agent for which tools are being executed. | *required* |
| `cycle_trace` | `Trace` | Trace object for the current event loop cycle. | *required* |
| `cycle_span` | `Any` | Span object for tracing the cycle (type may vary). | *required* |
| `cycle_start_time` | `float` | Start time of the current cycle. | *required* |
| `invocation_state` | `dict[str, Any]` | Additional keyword arguments, including request state. | *required* |
| `tracer` | `Tracer` | Tracer instance for span management. | *required* |
| `structured_output_context` | `StructuredOutputContext` | Optional context for structured output management. | *required* |

Yields:

| Type | Description |
| --- | --- |
| `AsyncGenerator[TypedEvent, None]` | Tool stream events along with events yielded from a recursive call to the event loop. The last event is a tuple containing the stop reason, the updated message, the updated event loop metrics, and the updated request state. |
Source code in strands/event_loop/event_loop.py
async def _handle_tool_execution(
    stop_reason: StopReason,
    message: Message,
    agent: "Agent",
    cycle_trace: Trace,
    cycle_span: Any,
    cycle_start_time: float,
    invocation_state: dict[str, Any],
    tracer: Tracer,
    structured_output_context: StructuredOutputContext,
) -> AsyncGenerator[TypedEvent, None]:
    """Handles the execution of tools requested by the model during an event loop cycle.

    Args:
        stop_reason: The reason the model stopped generating.
        message: The message from the model that may contain tool use requests.
        agent: Agent for which tools are being executed.
        cycle_trace: Trace object for the current event loop cycle.
        cycle_span: Span object for tracing the cycle (type may vary).
        cycle_start_time: Start time of the current cycle.
        invocation_state: Additional keyword arguments, including request state.
        tracer: Tracer instance for span management.
        structured_output_context: Optional context for structured output management.

    Yields:
        Tool stream events along with events yielded from a recursive call to the event loop. The last event is a tuple
        containing:
            - The stop reason,
            - The updated message,
            - The updated event loop metrics,
            - The updated request state.
    """
    tool_uses: list[ToolUse] = []
    tool_results: list[ToolResult] = []
    invalid_tool_use_ids: list[str] = []

    validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids)
    tool_uses = [tool_use for tool_use in tool_uses if tool_use.get("toolUseId") not in invalid_tool_use_ids]

    if agent._interrupt_state.activated:
        tool_results.extend(agent._interrupt_state.context["tool_results"])

        # Filter to only the interrupted tools when resuming from interrupt (tool uses without results)
        tool_use_ids = {tool_result["toolUseId"] for tool_result in tool_results}
        tool_uses = [tool_use for tool_use in tool_uses if tool_use["toolUseId"] not in tool_use_ids]

    interrupts = []
    tool_events = agent.tool_executor._execute(
        agent, tool_uses, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context
    )
    async for tool_event in tool_events:
        if isinstance(tool_event, ToolInterruptEvent):
            interrupts.extend(tool_event["tool_interrupt_event"]["interrupts"])

        yield tool_event

    structured_output_result = None
    if structured_output_context.is_enabled:
        if structured_output_result := structured_output_context.extract_result(tool_uses):
            yield StructuredOutputEvent(structured_output=structured_output_result)
            structured_output_context.stop_loop = True

    invocation_state["event_loop_parent_cycle_id"] = invocation_state["event_loop_cycle_id"]

    if interrupts:
        # Session state stored on AfterInvocationEvent.
        agent._interrupt_state.context = {"tool_use_message": message, "tool_results": tool_results}
        agent._interrupt_state.activate()

        agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
        yield EventLoopStopEvent(
            "interrupt",
            message,
            agent.event_loop_metrics,
            invocation_state["request_state"],
            interrupts,
            structured_output=structured_output_result,
        )
        # Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
        if cycle_span:
            tracer.end_event_loop_cycle_span(span=cycle_span, message=message)

        return

    agent._interrupt_state.deactivate()

    tool_result_message: Message = {
        "role": "user",
        "content": [{"toolResult": result} for result in tool_results],
    }

    agent.messages.append(tool_result_message)
    await agent.hooks.invoke_callbacks_async(MessageAddedEvent(agent=agent, message=tool_result_message))

    yield ToolResultMessageEvent(message=tool_result_message)

    # Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
    if cycle_span:
        tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message)

    if invocation_state["request_state"].get("stop_event_loop", False) or structured_output_context.stop_loop:
        agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
        yield EventLoopStopEvent(
            stop_reason,
            message,
            agent.event_loop_metrics,
            invocation_state["request_state"],
            structured_output=structured_output_result,
        )
        return

    events = recurse_event_loop(
        agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
    )
    async for event in events:
        yield event
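
Two steps of the function above (dropping invalid tool uses, then packaging tool results into one user message) are plain dict manipulation and can be sketched standalone; the dict shapes mirror the source, while the result contents are illustrative:

```python
tool_uses = [
    {"toolUseId": "t1", "name": "calculator", "input": {"expr": "1+1"}},
    {"toolUseId": "bad", "name": "unknown", "input": {}},
]
invalid_tool_use_ids = ["bad"]

# Filter out tool uses that failed validation, as in _handle_tool_execution.
tool_uses = [tu for tu in tool_uses if tu.get("toolUseId") not in invalid_tool_use_ids]

# Illustrative results, one per surviving tool use.
tool_results = [
    {"toolUseId": tu["toolUseId"], "status": "success", "content": [{"text": "2"}]}
    for tu in tool_uses
]

# All tool results are returned to the model in a single user-role message.
tool_result_message = {
    "role": "user",
    "content": [{"toolResult": result} for result in tool_results],
}
assert len(tool_result_message["content"]) == 1
```

Batching every result into one message keeps the conversation alternating model turn / user turn, which is the shape most chat model APIs require when returning tool results.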

_has_tool_use_in_latest_message(messages)

Check if the latest message contains any ToolUse content blocks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `messages` | `Messages` | List of messages in the conversation. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the latest message contains at least one ToolUse content block, False otherwise. |

Source code in strands/event_loop/event_loop.py
def _has_tool_use_in_latest_message(messages: "Messages") -> bool:
    """Check if the latest message contains any ToolUse content blocks.

    Args:
        messages: List of messages in the conversation.

    Returns:
        True if the latest message contains at least one ToolUse content block, False otherwise.
    """
    if len(messages) > 0:
        latest_message = messages[-1]
        content_blocks = latest_message.get("content", [])

        for content_block in content_blocks:
            if "toolUse" in content_block:
                return True

    return False

event_loop_cycle(agent, invocation_state, structured_output_context=None) async

Execute a single cycle of the event loop.

This core function processes a single conversation turn, handling model inference, tool execution, and error recovery. It manages the entire lifecycle of a conversation turn, including:

  1. Initializing cycle state and metrics
  2. Checking execution limits
  3. Processing messages with the model
  4. Handling tool execution requests
  5. Managing recursive calls for multi-turn tool interactions
  6. Collecting and reporting metrics
  7. Error handling and recovery

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `agent` | `Agent` | The agent for which the cycle is being executed. | *required* |
| `invocation_state` | `dict[str, Any]` | Additional arguments, including `request_state` (state maintained across cycles), `event_loop_cycle_id` (unique ID for this cycle), and `event_loop_cycle_span` (current tracing Span for this cycle). | *required* |
| `structured_output_context` | `StructuredOutputContext \| None` | Optional context for structured output management. | `None` |

Yields:

| Type | Description |
| --- | --- |
| `AsyncGenerator[TypedEvent, None]` | Model and tool stream events. The last event is a tuple containing the stop reason (`StopReason`, e.g. `"tool_use"`), the generated `Message` from the model, the updated `EventLoopMetrics`, and the updated request state (`Any`). |

Raises:

| Type | Description |
| --- | --- |
| `EventLoopException` | If an error occurs during execution. |
| `ContextWindowOverflowException` | If the input is too large for the model. |

Source code in strands/event_loop/event_loop.py
async def event_loop_cycle(
    agent: "Agent",
    invocation_state: dict[str, Any],
    structured_output_context: StructuredOutputContext | None = None,
) -> AsyncGenerator[TypedEvent, None]:
    """Execute a single cycle of the event loop.

    This core function processes a single conversation turn, handling model inference, tool execution, and error
    recovery. It manages the entire lifecycle of a conversation turn, including:

    1. Initializing cycle state and metrics
    2. Checking execution limits
    3. Processing messages with the model
    4. Handling tool execution requests
    5. Managing recursive calls for multi-turn tool interactions
    6. Collecting and reporting metrics
    7. Error handling and recovery

    Args:
        agent: The agent for which the cycle is being executed.
        invocation_state: Additional arguments including:

            - request_state: State maintained across cycles
            - event_loop_cycle_id: Unique ID for this cycle
            - event_loop_cycle_span: Current tracing Span for this cycle
        structured_output_context: Optional context for structured output management.

    Yields:
        Model and tool stream events. The last event is a tuple containing:

            - StopReason: Reason the model stopped generating (e.g., "tool_use")
            - Message: The generated message from the model
            - EventLoopMetrics: Updated metrics for the event loop
            - Any: Updated request state

    Raises:
        EventLoopException: If an error occurs during execution
        ContextWindowOverflowException: If the input is too large for the model
    """
    structured_output_context = structured_output_context or StructuredOutputContext()

    # Initialize cycle state
    invocation_state["event_loop_cycle_id"] = uuid.uuid4()

    # Initialize state and get cycle trace
    if "request_state" not in invocation_state:
        invocation_state["request_state"] = {}
    attributes = {"event_loop_cycle_id": str(invocation_state.get("event_loop_cycle_id"))}
    cycle_start_time, cycle_trace = agent.event_loop_metrics.start_cycle(attributes=attributes)
    invocation_state["event_loop_cycle_trace"] = cycle_trace

    yield StartEvent()
    yield StartEventLoopEvent()

    # Create tracer span for this event loop cycle
    tracer = get_tracer()
    cycle_span = tracer.start_event_loop_cycle_span(
        invocation_state=invocation_state,
        messages=agent.messages,
        parent_span=agent.trace_span,
        custom_trace_attributes=agent.trace_attributes,
    )
    invocation_state["event_loop_cycle_span"] = cycle_span

    with trace_api.use_span(cycle_span, end_on_exit=True):
        # Skip model invocation if in interrupt state, as interrupts are currently only supported for tool calls.
        if agent._interrupt_state.activated:
            stop_reason: StopReason = "tool_use"
            message = agent._interrupt_state.context["tool_use_message"]
        # Skip model invocation if the latest message contains ToolUse
        elif _has_tool_use_in_latest_message(agent.messages):
            stop_reason = "tool_use"
            message = agent.messages[-1]
        else:
            model_events = _handle_model_execution(
                agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
            )
            async for model_event in model_events:
                if not isinstance(model_event, ModelStopReason):
                    yield model_event

            stop_reason, message, *_ = model_event["stop"]
            yield ModelMessageEvent(message=message)

        try:
            if stop_reason == "max_tokens":
                """
                Handle max_tokens limit reached by the model.

                When the model reaches its maximum token limit, this represents a potentially unrecoverable
                state where the model's response was truncated. By default, Strands fails hard with a
                MaxTokensReachedException to maintain consistency with other failure types.
                """
                raise MaxTokensReachedException(
                    message=(
                        "Agent has reached an unrecoverable state due to max_tokens limit. "
                        "For more information see: "
                        "https://strandsagents.com/latest/user-guide/concepts/agents/agent-loop/#maxtokensreachedexception"
                    )
                )

            if stop_reason == "tool_use":
                # Handle tool execution
                tool_events = _handle_tool_execution(
                    stop_reason,
                    message,
                    agent=agent,
                    cycle_trace=cycle_trace,
                    cycle_span=cycle_span,
                    cycle_start_time=cycle_start_time,
                    invocation_state=invocation_state,
                    tracer=tracer,
                    structured_output_context=structured_output_context,
                )
                async for tool_event in tool_events:
                    yield tool_event

                return

            # End the cycle and return results
            agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
            # Set attributes before span auto-closes
            tracer.end_event_loop_cycle_span(cycle_span, message)
        except EventLoopException:
            # Don't yield or log the exception - we already did it when we
            # raised the exception and we don't need that duplication.
            raise
        except (ContextWindowOverflowException, MaxTokensReachedException) as e:
            # Special cased exceptions which we want to bubble up rather than get wrapped in an EventLoopException
            raise e
        except Exception as e:
            # Handle any other exceptions
            yield ForceStopEvent(reason=e)
            logger.exception("cycle failed")
            raise EventLoopException(e, invocation_state["request_state"]) from e

        # Force structured output tool call if LLM didn't use it automatically
        if structured_output_context.is_enabled and stop_reason == "end_turn":
            if structured_output_context.force_attempted:
                raise StructuredOutputException(
                    "The model failed to invoke the structured output tool even after it was forced."
                )
            structured_output_context.set_forced_mode()
            logger.debug("Forcing structured output tool")
            await agent._append_messages(
                {"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
            )

            events = recurse_event_loop(
                agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
            )
            async for typed_event in events:
                yield typed_event
            return

        yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
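The cycle above streams intermediate events and finishes with a terminal stop event carrying the stop reason and final message. The following is a minimal, self-contained sketch of that consumption pattern; `TextEvent`, `StopEvent`, and `fake_cycle` are hypothetical stand-ins, not the library's actual event classes.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator

# Hypothetical stand-ins for the library's TypedEvent classes.
@dataclass
class TextEvent:
    text: str

@dataclass
class StopEvent:
    stop_reason: str
    message: dict[str, Any]

async def fake_cycle() -> AsyncGenerator[Any, None]:
    """Mimics event_loop_cycle: stream events, then a terminal stop event."""
    yield TextEvent("Hello")
    yield TextEvent(", world")
    yield StopEvent("end_turn", {"role": "assistant", "content": [{"text": "Hello, world"}]})

async def consume() -> StopEvent:
    final = None
    async for event in fake_cycle():
        if isinstance(event, StopEvent):
            final = event              # terminal event carries stop_reason and message
        else:
            print(event.text, end="")  # intermediate streaming events
    assert final is not None
    return final

stop = asyncio.run(consume())
```

Callers that retry on hooks (see AfterModelCallEvent above) should keep in mind that intermediate events may have been emitted for a response that is later discarded.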

get_tracer()

Get or create the global tracer.

Returns:

Type Description
Tracer

The global tracer instance.

Source code in strands/telemetry/tracer.py
def get_tracer() -> Tracer:
    """Get or create the global tracer.

    Returns:
        The global tracer instance.
    """
    global _tracer_instance

    if not _tracer_instance:
        _tracer_instance = Tracer()

    return _tracer_instance

recover_message_on_max_tokens_reached(message)

Recover and clean up messages when max token limits are reached.

When a model response is truncated due to maximum token limits, all tool use blocks should be replaced with informative error messages since they may be incomplete or unreliable. This function inspects the message content and:

  1. Identifies all tool use blocks (regardless of validity)
  2. Replaces all tool uses with informative error messages
  3. Preserves all non-tool content blocks (text, images, etc.)
  4. Returns a cleaned message suitable for conversation history

This recovery mechanism ensures that the conversation can continue gracefully even when model responses are truncated, providing clear feedback about what happened and preventing potentially incomplete or corrupted tool executions.

Parameters:

Name Type Description Default
message Message

The potentially incomplete message from the model that was truncated due to max token limits.

required

Returns:

Type Description
Message

A cleaned Message with all tool uses replaced by explanatory text content. The returned message maintains the same role as the input message.

Example

If a message contains any tool use (complete or incomplete):

{"toolUse": {"name": "calculator", "input": {"expression": "2+2"}, "toolUseId": "123"}}

It will be replaced with:

{"text": "The selected tool calculator's tool use was incomplete due to maximum token limits being reached."}

Source code in strands/event_loop/_recover_message_on_max_tokens_reached.py
def recover_message_on_max_tokens_reached(message: Message) -> Message:
    """Recover and clean up messages when max token limits are reached.

    When a model response is truncated due to maximum token limits, all tool use blocks
    should be replaced with informative error messages since they may be incomplete or
    unreliable. This function inspects the message content and:

    1. Identifies all tool use blocks (regardless of validity)
    2. Replaces all tool uses with informative error messages
    3. Preserves all non-tool content blocks (text, images, etc.)
    4. Returns a cleaned message suitable for conversation history

    This recovery mechanism ensures that the conversation can continue gracefully even when
    model responses are truncated, providing clear feedback about what happened and preventing
    potentially incomplete or corrupted tool executions.

    Args:
        message: The potentially incomplete message from the model that was truncated
                due to max token limits.

    Returns:
        A cleaned Message with all tool uses replaced by explanatory text content.
        The returned message maintains the same role as the input message.

    Example:
        If a message contains any tool use (complete or incomplete):
        ```
        {"toolUse": {"name": "calculator", "input": {"expression": "2+2"}, "toolUseId": "123"}}
        ```

        It will be replaced with:
        ```
        {"text": "The selected tool calculator's tool use was incomplete due to maximum token limits being reached."}
        ```
    """
    logger.info("handling max_tokens stop reason - replacing all tool uses with error messages")

    valid_content: list[ContentBlock] = []
    for content in message["content"] or []:
        tool_use: ToolUse | None = content.get("toolUse")
        if not tool_use:
            valid_content.append(content)
            continue

        # Replace all tool uses with error messages when max_tokens is reached
        display_name = tool_use.get("name") or "<unknown>"
        logger.warning("tool_name=<%s> | replacing with error message due to max_tokens truncation.", display_name)

        valid_content.append(
            {
                "text": f"The selected tool {display_name}'s tool use was incomplete due "
                f"to maximum token limits being reached."
            }
        )

    return {"content": valid_content, "role": message["role"]}

recurse_event_loop(agent, invocation_state, structured_output_context=None) async

Make a recursive call to event_loop_cycle with the current state.

This function is used when the event loop needs to continue processing after tool execution.

Parameters:

Name Type Description Default
agent Agent

Agent for which the recursive call is being made.

required
invocation_state dict[str, Any]

Arguments to pass through event_loop_cycle

required
structured_output_context StructuredOutputContext | None

Optional context for structured output management.

None

Yields:

Type Description
AsyncGenerator[TypedEvent, None]

Results from event_loop_cycle where the last result contains:

  • StopReason: Reason the model stopped generating
  • Message: The generated message from the model
  • EventLoopMetrics: Updated metrics for the event loop
  • Any: Updated request state
Source code in strands/event_loop/event_loop.py
async def recurse_event_loop(
    agent: "Agent",
    invocation_state: dict[str, Any],
    structured_output_context: StructuredOutputContext | None = None,
) -> AsyncGenerator[TypedEvent, None]:
    """Make a recursive call to event_loop_cycle with the current state.

    This function is used when the event loop needs to continue processing after tool execution.

    Args:
        agent: Agent for which the recursive call is being made.
        invocation_state: Arguments to pass through event_loop_cycle
        structured_output_context: Optional context for structured output management.

    Yields:
        Results from event_loop_cycle where the last result contains:

            - StopReason: Reason the model stopped generating
            - Message: The generated message from the model
            - EventLoopMetrics: Updated metrics for the event loop
            - Any: Updated request state
    """
    cycle_trace = invocation_state["event_loop_cycle_trace"]

    # Recursive call trace
    recursive_trace = Trace("Recursive call", parent_id=cycle_trace.id)
    cycle_trace.add_child(recursive_trace)

    yield StartEvent()

    events = event_loop_cycle(
        agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
    )
    async for event in events:
        yield event

    recursive_trace.end()

stream_messages(model, system_prompt, messages, tool_specs, *, tool_choice=None, system_prompt_content=None, invocation_state=None, **kwargs) async

Streams messages to the model and processes the response.

Parameters:

Name Type Description Default
model Model

Model provider.

required
system_prompt str | None

The system prompt string, used for backwards compatibility with models that expect it.

required
messages Messages

List of messages to send.

required
tool_specs list[ToolSpec]

The list of tool specs.

required
tool_choice Any | None

Optional tool choice constraint for forcing specific tool usage.

None
system_prompt_content list[SystemContentBlock] | None

The authoritative system prompt content blocks that always contains the system prompt data.

None
invocation_state dict[str, Any] | None

Caller-provided state/context that was passed to the agent when it was invoked.

None
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Yields:

Type Description
AsyncGenerator[TypedEvent, None]

The reason for stopping, the final message, and the usage metrics

Source code in strands/event_loop/streaming.py
async def stream_messages(
    model: Model,
    system_prompt: str | None,
    messages: Messages,
    tool_specs: list[ToolSpec],
    *,
    tool_choice: Any | None = None,
    system_prompt_content: list[SystemContentBlock] | None = None,
    invocation_state: dict[str, Any] | None = None,
    **kwargs: Any,
) -> AsyncGenerator[TypedEvent, None]:
    """Streams messages to the model and processes the response.

    Args:
        model: Model provider.
        system_prompt: The system prompt string, used for backwards compatibility with models that expect it.
        messages: List of messages to send.
        tool_specs: The list of tool specs.
        tool_choice: Optional tool choice constraint for forcing specific tool usage.
        system_prompt_content: The authoritative system prompt content blocks that always contains the
            system prompt data.
        invocation_state: Caller-provided state/context that was passed to the agent when it was invoked.
        **kwargs: Additional keyword arguments for future extensibility.

    Yields:
        The reason for stopping, the final message, and the usage metrics
    """
    logger.debug("model=<%s> | streaming messages", model)

    messages = _normalize_messages(messages)
    start_time = time.time()

    chunks = model.stream(
        messages,
        tool_specs if tool_specs else None,
        system_prompt,
        tool_choice=tool_choice,
        system_prompt_content=system_prompt_content,
        invocation_state=invocation_state,
    )

    async for event in process_stream(chunks, start_time):
        yield event

validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids)

Validate tool uses and prepare them for execution.

Parameters:

Name Type Description Default
message Message

Current message.

required
tool_uses list[ToolUse]

List to populate with tool uses.

required
tool_results list[ToolResult]

List to populate with tool results for invalid tools.

required
invalid_tool_use_ids list[str]

List to populate with invalid tool use IDs.

required
Source code in strands/tools/_validator.py
def validate_and_prepare_tools(
    message: Message,
    tool_uses: list[ToolUse],
    tool_results: list[ToolResult],
    invalid_tool_use_ids: list[str],
) -> None:
    """Validate tool uses and prepare them for execution.

    Args:
        message: Current message.
        tool_uses: List to populate with tool uses.
        tool_results: List to populate with tool results for invalid tools.
        invalid_tool_use_ids: List to populate with invalid tool use IDs.
    """
    # Extract tool uses from message
    for content in message["content"]:
        if isinstance(content, dict) and "toolUse" in content:
            tool_uses.append(content["toolUse"])

    # Validate tool uses
    # Avoid modifying original `tool_uses` variable during iteration
    tool_uses_copy = tool_uses.copy()
    for tool in tool_uses_copy:
        try:
            validate_tool_use(tool)
        except InvalidToolUseNameException as e:
            # Return invalid name error as ToolResult to the LLM as context
            # The replacement of the tool name to INVALID_TOOL_NAME happens in streaming.py now
            tool_uses.remove(tool)
            invalid_tool_use_ids.append(tool["toolUseId"])
            tool_uses.append(tool)
            tool_results.append(
                {
                    "toolUseId": tool["toolUseId"],
                    "status": "error",
                    "content": [{"text": f"Error: {str(e)}"}],
                }
            )
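The function populates its list arguments in place rather than returning values. The sketch below illustrates that out-parameter pattern with a toy `validate` stand-in (the real check lives in strands' `validate_tool_use`; the identifier rule here is purely illustrative): valid tools stay in order, while an invalid tool is moved to the end, its ID recorded, and an error ToolResult produced for the model.

```python
# Illustrative sketch of validate_and_prepare_tools' out-parameter pattern.
# `validate` is a toy stand-in for strands' validate_tool_use, not the real check.
def validate(tool: dict) -> None:
    if not tool.get("name", "").isidentifier():  # toy rule for illustration
        raise ValueError(f"invalid tool name: {tool.get('name')!r}")

def validate_and_prepare(message, tool_uses, tool_results, invalid_ids):
    # Extract tool uses from the message content.
    for content in message["content"]:
        if isinstance(content, dict) and "toolUse" in content:
            tool_uses.append(content["toolUse"])

    # Iterate over a copy to avoid mutating the list mid-iteration.
    for tool in tool_uses.copy():
        try:
            validate(tool)
        except ValueError as e:
            # Move the invalid tool to the end and surface the error as a ToolResult.
            tool_uses.remove(tool)
            invalid_ids.append(tool["toolUseId"])
            tool_uses.append(tool)
            tool_results.append(
                {
                    "toolUseId": tool["toolUseId"],
                    "status": "error",
                    "content": [{"text": f"Error: {e}"}],
                }
            )

message = {"content": [
    {"toolUse": {"name": "calculator", "toolUseId": "1", "input": {}}},
    {"toolUse": {"name": "bad name!", "toolUseId": "2", "input": {}}},
]}
uses, results, invalid = [], [], []
validate_and_prepare(message, uses, results, invalid)
```

After the call, `uses` holds both tool uses (invalid one last), `invalid` holds `["2"]`, and `results` carries the error result that will be fed back to the model as context.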