Skip to content

strands.models.sagemaker

Amazon SageMaker model provider.

Messages = List[Message] module-attribute

A list of messages representing a conversation.

T = TypeVar('T', bound=BaseModel) module-attribute

ToolChoice = Union[ToolChoiceAutoDict, ToolChoiceAnyDict, ToolChoiceToolDict] module-attribute

Configuration for how the model should choose tools.

  • "auto": The model decides whether to use tools based on the context
  • "any": The model must use at least one tool (any tool)
  • "tool": The model must use the specified tool

logger = logging.getLogger(__name__) module-attribute

ContentBlock

Bases: TypedDict

A block of content for a message that you pass to, or receive from, a model.

Attributes:

Name Type Description
cachePoint CachePoint

A cache point configuration to optimize conversation history.

document DocumentContent

A document to include in the message.

guardContent GuardContent

Contains the content to assess with the guardrail.

image ImageContent

Image to include in the message.

reasoningContent ReasoningContentBlock

Contains content regarding the reasoning that is carried out by the model.

text str

Text to include in the message.

toolResult ToolResult

The result for a tool request that a model makes.

toolUse ToolUse

Information about a tool use request from a model.

video VideoContent

Video to include in the message.

citationsContent CitationsContentBlock

Contains the citations for a document.

Source code in strands/types/content.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class ContentBlock(TypedDict, total=False):
    """A block of content for a message that you pass to, or receive from, a model.

    Declared with ``total=False``, so type checkers treat every key listed below as optional.

    Attributes:
        cachePoint: A cache point configuration to optimize conversation history.
        document: A document to include in the message.
        guardContent: Contains the content to assess with the guardrail.
        image: Image to include in the message.
        reasoningContent: Contains content regarding the reasoning that is carried out by the model.
        text: Text to include in the message.
        toolResult: The result for a tool request that a model makes.
        toolUse: Information about a tool use request from a model.
        video: Video to include in the message.
        citationsContent: Contains the citations for a document.
    """

    cachePoint: CachePoint
    document: DocumentContent
    guardContent: GuardContent
    image: ImageContent
    reasoningContent: ReasoningContentBlock
    text: str
    toolResult: ToolResult
    toolUse: ToolUse
    video: VideoContent
    citationsContent: CitationsContentBlock

FunctionCall dataclass

Function call for the model.

Attributes:

Name Type Description
name Union[str, dict[Any, Any]]

Name of the function to call

arguments Union[str, dict[Any, Any]]

Arguments to pass to the function

Source code in strands/models/sagemaker.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@dataclass
class FunctionCall:
    """Function call for the model.

    The explicit ``__init__`` below is used instead of a dataclass-generated one, so instances are
    built from keyword arguments only: missing keys fall back to an empty string and keys other
    than ``name`` / ``arguments`` are ignored.

    Attributes:
        name: Name of the function to call
        arguments: Arguments to pass to the function
    """

    name: Union[str, dict[Any, Any]]
    arguments: Union[str, dict[Any, Any]]

    def __init__(self, **kwargs: Union[str, dict[Any, Any]]):
        """Initialize function call.

        Args:
            **kwargs: Keyword arguments for the function call. Only ``name`` and ``arguments``
                are read; each defaults to ``""`` when absent.
        """
        # The annotation describes each keyword *value*, so it mirrors the field types above.
        self.name = kwargs.get("name", "")
        self.arguments = kwargs.get("arguments", "")

__init__(**kwargs)

Initialize function call.

Parameters:

Name Type Description Default
**kwargs dict[str, str]

Keyword arguments for the function call.

{}
Source code in strands/models/sagemaker.py
55
56
57
58
59
60
61
62
def __init__(self, **kwargs: Union[str, dict[Any, Any]]):
    """Initialize function call.

    Args:
        **kwargs: Keyword arguments for the function call. Only ``name`` and ``arguments``
            are read; each defaults to ``""`` when absent.
    """
    # The annotation describes each keyword *value*: a string or a dict, like the fields it fills.
    self.name = kwargs.get("name", "")
    self.arguments = kwargs.get("arguments", "")

OpenAIModel

Bases: Model

OpenAI model provider implementation.

Source code in strands/models/openai.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
class OpenAIModel(Model):
    """OpenAI model provider implementation."""

    client: Client

    class OpenAIConfig(TypedDict, total=False):
        """Configuration options for OpenAI models.

        Declared with ``total=False``, so type checkers treat both keys as optional.

        Attributes:
            model_id: Model ID (e.g., "gpt-4o").
                For a complete list of supported models, see https://platform.openai.com/docs/models.
            params: Model parameters (e.g., max_tokens).
                For a complete list of supported parameters, see
                https://platform.openai.com/docs/api-reference/chat/create.
        """

        model_id: str
        params: Optional[dict[str, Any]]

    def __init__(
        self,
        client: Optional[Client] = None,
        client_args: Optional[dict[str, Any]] = None,
        **model_config: Unpack[OpenAIConfig],
    ) -> None:
        """Set up the provider with either an injected client or arguments for building one.

        Args:
            client: Pre-configured OpenAI-compatible client to reuse across requests.
                The model reuses it for every request and never closes it, so the caller owns
                its lifecycle. Typical reasons to inject one:
                - Custom client wrappers (e.g., GuardrailsAsyncOpenAI)
                - Reusing connection pools within a single event loop/worker
                - Centralizing observability, retries, and networking policy
                - Pointing to custom model gateways
                Note: The client should not be shared across different asyncio event loops.
            client_args: Arguments for the OpenAI client (legacy approach).
                For a complete list of supported arguments, see https://pypi.org/project/openai/.
            **model_config: Configuration options for the OpenAI model.

        Raises:
            ValueError: If both `client` and a non-empty `client_args` are provided.
        """
        validate_config_keys(model_config, self.OpenAIConfig)
        self.config = dict(model_config)

        # The two client configuration styles are mutually exclusive; an empty client_args
        # mapping is tolerated next to an injected client.
        if client is not None:
            if client_args is not None and len(client_args) > 0:
                raise ValueError("Only one of 'client' or 'client_args' should be provided, not both.")

        self._custom_client = client
        self.client_args = client_args if client_args else {}

        logger.debug("config=<%s> | initializing", self.config)

    @override
    def update_config(self, **model_config: Unpack[OpenAIConfig]) -> None:  # type: ignore[override]
        """Merge configuration overrides into the stored OpenAI model configuration.

        The override keys are validated against ``OpenAIConfig`` before being applied.

        Args:
            **model_config: Configuration overrides.
        """
        validate_config_keys(model_config, self.OpenAIConfig)
        self.config.update(**model_config)

    @override
    def get_config(self) -> OpenAIConfig:
        """Return the stored OpenAI model configuration.

        Returns:
            The OpenAI model configuration (the same mapping object held by the model, not a copy).
        """
        current_config = cast(OpenAIModel.OpenAIConfig, self.config)
        return current_config

    @classmethod
    def format_request_message_content(cls, content: ContentBlock, **kwargs: Any) -> dict[str, Any]:
        """Format an OpenAI compatible content block.

        Documents and images are embedded as base64 ``data:`` URLs. Their MIME type is looked up
        from the block's ``format`` value and falls back to ``application/octet-stream`` when the
        extension is unknown.

        Args:
            content: Message content.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            OpenAI compatible content block.

        Raises:
            TypeError: If the content block type cannot be converted to an OpenAI-compatible format.
                This includes a content block with no keys at all.
        """
        if "document" in content:
            mime_type = mimetypes.types_map.get(f".{content['document']['format']}", "application/octet-stream")
            file_data = base64.b64encode(content["document"]["source"]["bytes"]).decode("utf-8")
            return {
                "file": {
                    "file_data": f"data:{mime_type};base64,{file_data}",
                    "filename": content["document"]["name"],
                },
                "type": "file",
            }

        if "image" in content:
            mime_type = mimetypes.types_map.get(f".{content['image']['format']}", "application/octet-stream")
            image_data = base64.b64encode(content["image"]["source"]["bytes"]).decode("utf-8")

            return {
                "image_url": {
                    "detail": "auto",
                    "format": mime_type,
                    "url": f"data:{mime_type};base64,{image_data}",
                },
                "type": "image_url",
            }

        if "text" in content:
            return {"text": content["text"], "type": "text"}

        # Report the first key as the offending type. The None default keeps an empty block from
        # escaping as StopIteration instead of the documented TypeError.
        raise TypeError(f"content_type=<{next(iter(content), None)}> | unsupported type")

    @classmethod
    def format_request_message_tool_call(cls, tool_use: ToolUse, **kwargs: Any) -> dict[str, Any]:
        """Convert a tool use request into an OpenAI ``function`` tool call entry.

        The tool input is serialized to a JSON string, as OpenAI carries arguments as text.

        Args:
            tool_use: Tool use requested by the model.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            OpenAI compatible tool call.
        """
        function_payload = {
            "arguments": json.dumps(tool_use["input"]),
            "name": tool_use["name"],
        }
        return {"function": function_payload, "id": tool_use["toolUseId"], "type": "function"}

    @classmethod
    def format_request_tool_message(cls, tool_result: ToolResult, **kwargs: Any) -> dict[str, Any]:
        """Convert a tool result into an OpenAI ``tool`` role message.

        ``json`` result blocks are first rewritten as text blocks holding the serialized JSON;
        every block is then run through ``format_request_message_content``.

        Args:
            tool_result: Tool result collected from a tool execution.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            OpenAI compatible tool message.
        """
        normalized: list[ContentBlock] = []
        for block in tool_result["content"]:
            if "json" in block:
                normalized.append(cast(ContentBlock, {"text": json.dumps(block["json"])}))
            else:
                normalized.append(cast(ContentBlock, block))

        tool_call_id = tool_result["toolUseId"]
        formatted_blocks = [cls.format_request_message_content(block) for block in normalized]

        return {"role": "tool", "tool_call_id": tool_call_id, "content": formatted_blocks}

    @classmethod
    def _split_tool_message_images(
        cls, tool_message: dict[str, Any]
    ) -> tuple[dict[str, Any], Optional[dict[str, Any]]]:
        """Move image parts of a tool message into a separate user message.

        OpenAI API restricts images to user role messages only, so any ``image_url`` parts are
        pulled out of the tool message and returned as their own user message. The remaining tool
        message gets an extra text part announcing that the image follows.

        Args:
            tool_message: A formatted tool message that may contain images.

        Returns:
            A tuple of (tool_message_without_images, user_message_with_images_or_None). Messages
            that are not tool messages, have non-list content, or carry no images are returned
            unchanged with ``None`` as the second element.
        """
        if tool_message.get("role") != "tool":
            return tool_message, None

        parts = tool_message.get("content", [])
        if not isinstance(parts, list):
            return tool_message, None

        def _is_image(part: Any) -> bool:
            return isinstance(part, dict) and part.get("type") == "image_url"

        image_parts = [part for part in parts if _is_image(part)]
        if not image_parts:
            # Nothing to relocate: hand the original message back untouched.
            return tool_message, None

        remaining_parts = [part for part in parts if not _is_image(part)]
        tool_call_id = tool_message["tool_call_id"]

        # Surface the rewrite, since the outgoing conversation differs from what the caller built.
        logger.warning(
            "tool_call_id=<%s> | Moving image from tool message to a new user message for OpenAI compatibility",
            tool_call_id,
        )

        # Tell the model where the image went so the tool result still reads coherently.
        remaining_parts.append(
            {
                "type": "text",
                "text": (
                    "Tool successfully returned an image. The image is being provided in the following user message."
                ),
            }
        )

        stripped_tool_message = {
            "role": "tool",
            "tool_call_id": tool_call_id,
            "content": remaining_parts,
        }
        image_user_message = {"role": "user", "content": image_parts}

        return stripped_tool_message, image_user_message

    @classmethod
    def _format_request_tool_choice(cls, tool_choice: ToolChoice | None) -> dict[str, Any]:
        """Format a tool choice for OpenAI compatibility.

        Args:
            tool_choice: Tool choice configuration in Bedrock format.

        Returns:
            OpenAI compatible tool choice format.
        """
        if not tool_choice:
            return {}

        match tool_choice:
            case {"auto": _}:
                return {"tool_choice": "auto"}  # OpenAI SDK doesn't define constants for these values
            case {"any": _}:
                return {"tool_choice": "required"}
            case {"tool": {"name": tool_name}}:
                return {"tool_choice": {"type": "function", "function": {"name": tool_name}}}
            case _:
                # This should not happen with proper typing, but handle gracefully
                return {"tool_choice": "auto"}

    @classmethod
    def _format_system_messages(
        cls,
        system_prompt: Optional[str] = None,
        *,
        system_prompt_content: Optional[list[SystemContentBlock]] = None,
        **kwargs: Any,
    ) -> list[dict[str, Any]]:
        """Build the ``system`` role messages for OpenAI-compatible providers.

        Args:
            system_prompt: System prompt to provide context to the model.
            system_prompt_content: System prompt content blocks to provide context to the model.
                When given, it takes precedence over ``system_prompt``.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            List of formatted system messages, one per content block that carries text.
        """
        blocks = system_prompt_content
        # Backward compatibility: a plain string prompt is wrapped as a single text block when no
        # content blocks were supplied.
        if blocks is None and system_prompt:
            blocks = [{"text": system_prompt}]

        # TODO: Handle caching blocks https://github.com/strands-agents/sdk-python/issues/1140
        system_messages: list[dict[str, Any]] = []
        for block in blocks or []:
            if "text" in block:
                system_messages.append({"role": "system", "content": block["text"]})

        return system_messages

    @classmethod
    def _format_regular_messages(cls, messages: Messages, **kwargs: Any) -> list[dict[str, Any]]:
        """Convert conversation messages into OpenAI-compatible message dicts.

        Each input message yields one message with its plain content (and ``tool_calls`` when it
        requested tools), followed by one ``tool`` message per tool result. Images inside tool
        results are moved to an additional user message.

        Args:
            messages: List of message objects to be processed by the model.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            List of formatted messages.
        """
        # Block types that are not sent as ordinary content parts.
        special_block_types = ("toolResult", "toolUse", "reasoningContent")
        converted: list[dict[str, Any]] = []

        for message in messages:
            blocks = message["content"]

            # reasoningContent is dropped below, so let the user know it will not reach the model
            if any("reasoningContent" in block for block in blocks):
                logger.warning(
                    "reasoningContent is not supported in multi-turn conversations with the Chat Completions API."
                )

            plain_parts = [
                cls.format_request_message_content(block)
                for block in blocks
                if all(block_type not in block for block_type in special_block_types)
            ]
            tool_calls = [cls.format_request_message_tool_call(block["toolUse"]) for block in blocks if "toolUse" in block]
            tool_messages = [
                cls.format_request_tool_message(block["toolResult"]) for block in blocks if "toolResult" in block
            ]

            entry: dict[str, Any] = {"role": message["role"], "content": plain_parts}
            if tool_calls:
                entry["tool_calls"] = tool_calls
            converted.append(entry)

            # OpenAI API requires images to be in user role messages only, so each tool message is
            # split into an image-free tool message plus an optional user message with the images.
            for tool_message in tool_messages:
                cleaned_tool_message, image_message = cls._split_tool_message_images(tool_message)
                converted.append(cleaned_tool_message)
                if image_message:
                    converted.append(image_message)

        return converted

    @classmethod
    def format_request_messages(
        cls,
        messages: Messages,
        system_prompt: Optional[str] = None,
        *,
        system_prompt_content: Optional[list[SystemContentBlock]] = None,
        **kwargs: Any,
    ) -> list[dict[str, Any]]:
        """Build the full OpenAI compatible messages array.

        System messages come first, followed by the converted conversation. Messages that end up
        with neither content nor tool calls are left out.

        Args:
            messages: List of message objects to be processed by the model.
            system_prompt: System prompt to provide context to the model.
            system_prompt_content: System prompt content blocks to provide context to the model.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            An OpenAI compatible messages array.
        """
        system_messages = cls._format_system_messages(system_prompt, system_prompt_content=system_prompt_content)
        conversation = cls._format_regular_messages(messages)

        kept: list[dict[str, Any]] = []
        for candidate in [*system_messages, *conversation]:
            if candidate["content"] or "tool_calls" in candidate:
                kept.append(candidate)

        return kept

    def format_request(
        self,
        messages: Messages,
        tool_specs: list[ToolSpec] | None = None,
        system_prompt: str | None = None,
        tool_choice: ToolChoice | None = None,
        *,
        system_prompt_content: list[SystemContentBlock] | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Format an OpenAI compatible chat streaming request.

        The configured ``params`` are spread last, so they override any of the keys built here.

        Args:
            messages: List of message objects to be processed by the model.
            tool_specs: List of tool specifications to make available to the model.
            system_prompt: System prompt to provide context to the model.
            tool_choice: Selection strategy for tool invocation.
            system_prompt_content: System prompt content blocks to provide context to the model.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            An OpenAI compatible chat streaming request.

        Raises:
            TypeError: If a message contains a content block type that cannot be converted to an OpenAI-compatible
                format.
            KeyError: If ``model_id`` is missing from the model configuration.
        """
        return {
            "messages": self.format_request_messages(
                messages, system_prompt, system_prompt_content=system_prompt_content
            ),
            "model": self.config["model_id"],
            "stream": True,
            "stream_options": {"include_usage": True},
            "tools": [
                {
                    "type": "function",
                    "function": {
                        "name": tool_spec["name"],
                        "description": tool_spec["description"],
                        "parameters": tool_spec["inputSchema"]["json"],
                    },
                }
                for tool_spec in tool_specs or []
            ],
            **(self._format_request_tool_choice(tool_choice)),
            # ``params`` is Optional in OpenAIConfig: an explicit None must behave like "no params"
            # rather than failing when unpacked.
            **cast(dict[str, Any], self.config.get("params") or {}),
        }

    def format_chunk(self, event: dict[str, Any], **kwargs: Any) -> StreamEvent:
        """Format an OpenAI response event into a standardized message chunk.

        Args:
            event: A response event from the OpenAI compatible model.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            The formatted chunk.

        Raises:
            RuntimeError: If chunk_type is not recognized.
                This error should never be encountered as chunk_type is controlled in the stream method.
        """
        match event["chunk_type"]:
            case "message_start":
                return {"messageStart": {"role": "assistant"}}

            case "content_start":
                if event["data_type"] == "tool":
                    return {
                        "contentBlockStart": {
                            "start": {
                                "toolUse": {
                                    "name": event["data"].function.name,
                                    "toolUseId": event["data"].id,
                                }
                            }
                        }
                    }

                return {"contentBlockStart": {"start": {}}}

            case "content_delta":
                if event["data_type"] == "tool":
                    return {
                        "contentBlockDelta": {"delta": {"toolUse": {"input": event["data"].function.arguments or ""}}}
                    }

                if event["data_type"] == "reasoning_content":
                    return {"contentBlockDelta": {"delta": {"reasoningContent": {"text": event["data"]}}}}

                return {"contentBlockDelta": {"delta": {"text": event["data"]}}}

            case "content_stop":
                return {"contentBlockStop": {}}

            case "message_stop":
                match event["data"]:
                    case "tool_calls":
                        return {"messageStop": {"stopReason": "tool_use"}}
                    case "length":
                        return {"messageStop": {"stopReason": "max_tokens"}}
                    case _:
                        return {"messageStop": {"stopReason": "end_turn"}}

            case "metadata":
                return {
                    "metadata": {
                        "usage": {
                            "inputTokens": event["data"].prompt_tokens,
                            "outputTokens": event["data"].completion_tokens,
                            "totalTokens": event["data"].total_tokens,
                        },
                        "metrics": {
                            "latencyMs": 0,  # TODO
                        },
                    },
                }

            case _:
                raise RuntimeError(f"chunk_type=<{event['chunk_type']} | unknown type")

    @asynccontextmanager
    async def _get_client(self) -> AsyncIterator[Any]:
        """Provide the OpenAI client to use for a single request.

        Lifecycle rules:
        - A client injected at initialization is yielded as-is and never closed here (the caller
          manages its lifecycle).
        - Otherwise a fresh AsyncOpenAI client is built from client_args and closed automatically
          when the context exits.

        Note: A new client is created per request to avoid connection sharing in the underlying
        httpx client, as the asyncio event loop does not allow connections to be shared.
        For more details, see https://github.com/encode/httpx/discussions/2959.

        Yields:
            Client: An OpenAI-compatible client instance.
        """
        injected_client = self._custom_client
        if injected_client is None:
            # No injected client: open a short-lived one so connections are not shared across
            # requests (see https://github.com/encode/httpx/discussions/2959).
            async with openai.AsyncOpenAI(**self.client_args) as fresh_client:
                yield fresh_client
        else:
            # Injected client: hand it over without taking ownership.
            yield injected_client

    @override
    async def stream(
        self,
        messages: Messages,
        tool_specs: Optional[list[ToolSpec]] = None,
        system_prompt: Optional[str] = None,
        *,
        tool_choice: ToolChoice | None = None,
        **kwargs: Any,
    ) -> AsyncGenerator[StreamEvent, None]:
        """Stream conversation with the OpenAI model.

        Chunks are emitted in this order: a message start, text / reasoning content as it arrives,
        one content block per buffered tool call, the message stop, and finally a metadata chunk
        when the last streamed event carries usage.

        Args:
            messages: List of message objects to be processed by the model.
            tool_specs: List of tool specifications to make available to the model.
            system_prompt: System prompt to provide context to the model.
            tool_choice: Selection strategy for tool invocation.
            **kwargs: Additional keyword arguments for future extensibility.

        Yields:
            Formatted message chunks from the model.

        Raises:
            ContextWindowOverflowException: If the input exceeds the model's context window.
            ModelThrottledException: If the request is throttled by OpenAI (rate limits).
        """
        logger.debug("formatting request")
        # NOTE(review): **kwargs (e.g. a system_prompt_content entry) is not forwarded to
        # format_request here - confirm whether that is intended.
        request = self.format_request(messages, tool_specs, system_prompt, tool_choice)
        logger.debug("formatted request=<%s>", request)

        logger.debug("invoking model")

        # _get_client yields the injected client when one was provided; otherwise it opens a new
        # client for this request so connections are not shared in the underlying httpx client.
        # For more details, please refer to https://github.com/encode/httpx/discussions/2959.
        async with self._get_client() as client:
            try:
                response = await client.chat.completions.create(**request)
            except openai.BadRequestError as e:
                # Check if this is a context length exceeded error
                if hasattr(e, "code") and e.code == "context_length_exceeded":
                    logger.warning("OpenAI threw context window overflow error")
                    raise ContextWindowOverflowException(str(e)) from e
                # Re-raise other BadRequestError exceptions
                raise
            except openai.RateLimitError as e:
                # All rate limit errors should be treated as throttling, not context overflow
                # Rate limits (including TPM) require waiting/retrying, not context reduction
                logger.warning("OpenAI threw rate limit error")
                raise ModelThrottledException(str(e)) from e

            logger.debug("got response from model")
            yield self.format_chunk({"chunk_type": "message_start"})
            # Tool call deltas are buffered per tool call index and replayed after the content loop.
            tool_calls: dict[int, list[Any]] = {}
            # Type of the content block currently open ("text" / "reasoning_content"), if any.
            data_type = None
            finish_reason = None  # Store finish_reason for later use
            event = None  # Initialize for scope safety

            async for event in response:
                # Defensive: skip events with empty or missing choices
                if not getattr(event, "choices", None):
                    continue
                choice = event.choices[0]

                if hasattr(choice.delta, "reasoning_content") and choice.delta.reasoning_content:
                    # Close the previous block / open a reasoning block if the content type changed.
                    chunks, data_type = self._stream_switch_content("reasoning_content", data_type)
                    for chunk in chunks:
                        yield chunk
                    yield self.format_chunk(
                        {
                            "chunk_type": "content_delta",
                            "data_type": data_type,
                            "data": choice.delta.reasoning_content,
                        }
                    )

                if choice.delta.content:
                    # Close the previous block / open a text block if the content type changed.
                    chunks, data_type = self._stream_switch_content("text", data_type)
                    for chunk in chunks:
                        yield chunk
                    yield self.format_chunk(
                        {"chunk_type": "content_delta", "data_type": data_type, "data": choice.delta.content}
                    )

                for tool_call in choice.delta.tool_calls or []:
                    tool_calls.setdefault(tool_call.index, []).append(tool_call)

                if choice.finish_reason:
                    finish_reason = choice.finish_reason  # Store for use outside loop
                    # Close the content block that is still open, then stop reading content events.
                    if data_type:
                        yield self.format_chunk({"chunk_type": "content_stop", "data_type": data_type})
                    break

            # Replay each buffered tool call as its own start / delta(s) / stop content block.
            for tool_deltas in tool_calls.values():
                yield self.format_chunk({"chunk_type": "content_start", "data_type": "tool", "data": tool_deltas[0]})

                for tool_delta in tool_deltas:
                    yield self.format_chunk({"chunk_type": "content_delta", "data_type": "tool", "data": tool_delta})

                yield self.format_chunk({"chunk_type": "content_stop", "data_type": "tool"})

            yield self.format_chunk({"chunk_type": "message_stop", "data": finish_reason or "end_turn"})

            # Skip remaining events as we don't have use for anything except the final usage payload.
            # After this loop, `event` holds the last event received from the stream.
            async for event in response:
                _ = event

            if event and hasattr(event, "usage") and event.usage:
                yield self.format_chunk({"chunk_type": "metadata", "data": event.usage})

        logger.debug("finished streaming response from model")

    def _stream_switch_content(self, data_type: str, prev_data_type: str | None) -> tuple[list[StreamEvent], str]:
        """Handle switching to a new content stream.

        Args:
            data_type: The next content data type.
            prev_data_type: The previous content data type.

        Returns:
            Tuple containing:
            - Stop block for previous content and the start block for the next content.
            - Next content data type.
        """
        chunks = []
        if data_type != prev_data_type:
            if prev_data_type is not None:
                chunks.append(self.format_chunk({"chunk_type": "content_stop", "data_type": prev_data_type}))
            chunks.append(self.format_chunk({"chunk_type": "content_start", "data_type": data_type}))

        return chunks, data_type

    @override
    async def structured_output(
        self, output_model: Type[T], prompt: Messages, system_prompt: Optional[str] = None, **kwargs: Any
    ) -> AsyncGenerator[dict[str, Union[T, Any]], None]:
        """Get structured output from the model.

        The request is sent through ``client.beta.chat.completions.parse`` with ``output_model`` as the
        response format, and the parsed instance from the single returned choice is yielded.

        Args:
            output_model: The output model to use for the agent.
            prompt: The prompt messages to use for the agent.
            system_prompt: System prompt to provide context to the model.
            **kwargs: Additional keyword arguments for future extensibility.

        Yields:
            Model events with the last being the structured output.

        Raises:
            ContextWindowOverflowException: If the input exceeds the model's context window.
            ModelThrottledException: If the request is throttled by OpenAI (rate limits).
            ValueError: If the response contains more than one choice, or no choice carries an instance of
                ``output_model``.
        """
        # We initialize an OpenAI context on every request so as to avoid connection sharing in the underlying httpx
        # client. The asyncio event loop does not allow connections to be shared. For more details, please refer to
        # https://github.com/encode/httpx/discussions/2959.
        async with self._get_client() as client:
            try:
                response: ParsedChatCompletion = await client.beta.chat.completions.parse(
                    model=self.get_config()["model_id"],
                    messages=self.format_request(prompt, system_prompt=system_prompt)["messages"],
                    response_format=output_model,
                )
            except openai.BadRequestError as e:
                # Check if this is a context length exceeded error
                if hasattr(e, "code") and e.code == "context_length_exceeded":
                    logger.warning("OpenAI threw context window overflow error")
                    raise ContextWindowOverflowException(str(e)) from e
                # Re-raise other BadRequestError exceptions
                raise
            except openai.RateLimitError as e:
                # All rate limit errors should be treated as throttling, not context overflow
                # Rate limits (including TPM) require waiting/retrying, not context reduction
                logger.warning("OpenAI threw rate limit error")
                raise ModelThrottledException(str(e)) from e

        if len(response.choices) > 1:
            raise ValueError("Multiple choices found in the OpenAI response.")

        # Pick the parsed payload of the (single) choice, if it is an instance of the requested model.
        parsed: T | None = None
        for choice in response.choices:
            if isinstance(choice.message.parsed, output_model):
                parsed = choice.message.parsed
                break

        # Compare against None explicitly: a valid parsed model may be falsy (e.g. it defines __len__ or
        # __bool__), and must not be mistaken for a missing result.
        if parsed is None:
            raise ValueError("No valid tool use or tool use input was found in the OpenAI response.")

        yield {"output": parsed}

OpenAIConfig

Bases: TypedDict

Configuration options for OpenAI models.

Attributes:

Name Type Description
model_id str

Model ID (e.g., "gpt-4o"). For a complete list of supported models, see https://platform.openai.com/docs/models.

params Optional[dict[str, Any]]

Model parameters (e.g., max_tokens). For a complete list of supported parameters, see https://platform.openai.com/docs/api-reference/chat/create.

Source code in strands/models/openai.py
45
46
47
48
49
50
51
52
53
54
55
56
57
class OpenAIConfig(TypedDict, total=False):
    """Configuration options for OpenAI models.

    The TypedDict is declared with ``total=False``, so either key may be omitted.

    Attributes:
        model_id: Model ID (e.g., "gpt-4o").
            For a complete list of supported models, see https://platform.openai.com/docs/models.
        params: Model parameters (e.g., max_tokens).
            For a complete list of supported parameters, see
            https://platform.openai.com/docs/api-reference/chat/create.
    """

    # Identifier of the model to call.
    model_id: str
    # Extra request parameters; the annotation also permits an explicit None.
    params: Optional[dict[str, Any]]

__init__(client=None, client_args=None, **model_config)

Initialize provider instance.

Parameters:

Name Type Description Default
client Optional[Client]

Pre-configured OpenAI-compatible client to reuse across requests. When provided, this client will be reused for all requests and will NOT be closed by the model; the caller is responsible for managing the client lifecycle. This is useful for: injecting custom client wrappers (e.g., GuardrailsAsyncOpenAI); reusing connection pools within a single event loop/worker; centralizing observability, retries, and networking policy; and pointing to custom model gateways. Note: The client should not be shared across different asyncio event loops.

None
client_args Optional[dict[str, Any]]

Arguments for the OpenAI client (legacy approach). For a complete list of supported arguments, see https://pypi.org/project/openai/.

None
**model_config Unpack[OpenAIConfig]

Configuration options for the OpenAI model.

{}

Raises:

Type Description
ValueError

If both client and client_args are provided.

Source code in strands/models/openai.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def __init__(
    self,
    client: Optional[Client] = None,
    client_args: Optional[dict[str, Any]] = None,
    **model_config: Unpack[OpenAIConfig],
) -> None:
    """Set up the provider with its model configuration and client source.

    Args:
        client: Pre-configured OpenAI-compatible client to reuse across requests.
            When provided, this client will be reused for all requests and will NOT be closed
            by the model. The caller is responsible for managing the client lifecycle.
            This is useful for:
            - Injecting custom client wrappers (e.g., GuardrailsAsyncOpenAI)
            - Reusing connection pools within a single event loop/worker
            - Centralizing observability, retries, and networking policy
            - Pointing to custom model gateways
            Note: The client should not be shared across different asyncio event loops.
        client_args: Arguments for the OpenAI client (legacy approach).
            For a complete list of supported arguments, see https://pypi.org/project/openai/.
        **model_config: Configuration options for the OpenAI model.

    Raises:
        ValueError: If both `client` and a non-empty `client_args` are provided.
    """
    validate_config_keys(model_config, self.OpenAIConfig)
    self.config = dict(model_config)

    # The two client configuration styles are mutually exclusive; an empty client_args counts as "not given".
    has_client = client is not None
    has_client_args = client_args is not None and len(client_args) > 0
    if has_client and has_client_args:
        raise ValueError("Only one of 'client' or 'client_args' should be provided, not both.")

    self._custom_client = client
    self.client_args = client_args if client_args else {}

    logger.debug("config=<%s> | initializing", self.config)

format_chunk(event, **kwargs)

Format an OpenAI response event into a standardized message chunk.

Parameters:

Name Type Description Default
event dict[str, Any]

A response event from the OpenAI compatible model.

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
StreamEvent

The formatted chunk.

Raises:

Type Description
RuntimeError

If chunk_type is not recognized. This error should never be encountered as chunk_type is controlled in the stream method.

Source code in strands/models/openai.py
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
def format_chunk(self, event: dict[str, Any], **kwargs: Any) -> StreamEvent:
    """Format an OpenAI response event into a standardized message chunk.

    Args:
        event: A response event from the OpenAI compatible model.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        The formatted chunk.

    Raises:
        RuntimeError: If chunk_type is not recognized.
            This error should never be encountered as chunk_type is controlled in the stream method.
    """
    match event["chunk_type"]:
        case "message_start":
            return {"messageStart": {"role": "assistant"}}

        case "content_start":
            if event["data_type"] == "tool":
                return {
                    "contentBlockStart": {
                        "start": {
                            "toolUse": {
                                "name": event["data"].function.name,
                                "toolUseId": event["data"].id,
                            }
                        }
                    }
                }

            return {"contentBlockStart": {"start": {}}}

        case "content_delta":
            if event["data_type"] == "tool":
                return {
                    "contentBlockDelta": {"delta": {"toolUse": {"input": event["data"].function.arguments or ""}}}
                }

            if event["data_type"] == "reasoning_content":
                return {"contentBlockDelta": {"delta": {"reasoningContent": {"text": event["data"]}}}}

            return {"contentBlockDelta": {"delta": {"text": event["data"]}}}

        case "content_stop":
            return {"contentBlockStop": {}}

        case "message_stop":
            match event["data"]:
                case "tool_calls":
                    return {"messageStop": {"stopReason": "tool_use"}}
                case "length":
                    return {"messageStop": {"stopReason": "max_tokens"}}
                case _:
                    return {"messageStop": {"stopReason": "end_turn"}}

        case "metadata":
            return {
                "metadata": {
                    "usage": {
                        "inputTokens": event["data"].prompt_tokens,
                        "outputTokens": event["data"].completion_tokens,
                        "totalTokens": event["data"].total_tokens,
                    },
                    "metrics": {
                        "latencyMs": 0,  # TODO
                    },
                },
            }

        case _:
            raise RuntimeError(f"chunk_type=<{event['chunk_type']} | unknown type")

format_request(messages, tool_specs=None, system_prompt=None, tool_choice=None, *, system_prompt_content=None, **kwargs)

Format an OpenAI compatible chat streaming request.

Parameters:

Name Type Description Default
messages Messages

List of message objects to be processed by the model.

required
tool_specs list[ToolSpec] | None

List of tool specifications to make available to the model.

None
system_prompt str | None

System prompt to provide context to the model.

None
tool_choice ToolChoice | None

Selection strategy for tool invocation.

None
system_prompt_content list[SystemContentBlock] | None

System prompt content blocks to provide context to the model.

None
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
dict[str, Any]

An OpenAI compatible chat streaming request.

Raises:

Type Description
TypeError

If a message contains a content block type that cannot be converted to an OpenAI-compatible format.

Source code in strands/models/openai.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
def format_request(
    self,
    messages: Messages,
    tool_specs: list[ToolSpec] | None = None,
    system_prompt: str | None = None,
    tool_choice: ToolChoice | None = None,
    *,
    system_prompt_content: list[SystemContentBlock] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Format an OpenAI compatible chat streaming request.

    The configured ``params`` are merged in last, so they override any earlier key of the same name.

    Args:
        messages: List of message objects to be processed by the model.
        tool_specs: List of tool specifications to make available to the model.
        system_prompt: System prompt to provide context to the model.
        tool_choice: Selection strategy for tool invocation.
        system_prompt_content: System prompt content blocks to provide context to the model.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        An OpenAI compatible chat streaming request.

    Raises:
        TypeError: If a message contains a content block type that cannot be converted to an OpenAI-compatible
            format.
    """
    return {
        "messages": self.format_request_messages(
            messages, system_prompt, system_prompt_content=system_prompt_content
        ),
        "model": self.config["model_id"],
        "stream": True,
        "stream_options": {"include_usage": True},
        "tools": [
            {
                "type": "function",
                "function": {
                    "name": tool_spec["name"],
                    "description": tool_spec["description"],
                    "parameters": tool_spec["inputSchema"]["json"],
                },
            }
            for tool_spec in tool_specs or []
        ],
        **(self._format_request_tool_choice(tool_choice)),
        # "params" is Optional in the config type: treat an explicit None like a missing key instead of
        # failing on ``**None``.
        **cast(dict[str, Any], self.config.get("params") or {}),
    }

format_request_message_content(content, **kwargs) classmethod

Format an OpenAI compatible content block.

Parameters:

Name Type Description Default
content ContentBlock

Message content.

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
dict[str, Any]

OpenAI compatible content block.

Raises:

Type Description
TypeError

If the content block type cannot be converted to an OpenAI-compatible format.

Source code in strands/models/openai.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
@classmethod
def format_request_message_content(cls, content: ContentBlock, **kwargs: Any) -> dict[str, Any]:
    """Convert a single content block into its OpenAI compatible representation.

    Documents become ``file`` parts and images become ``image_url`` parts, both carried as base64 data URLs
    whose MIME type is looked up from the block's ``format`` (falling back to ``application/octet-stream``).
    Text blocks become ``text`` parts.

    Args:
        content: Message content.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        OpenAI compatible content block.

    Raises:
        TypeError: If the content block type cannot be converted to an OpenAI-compatible format.
    """
    if "document" in content:
        document = content["document"]
        media_type = mimetypes.types_map.get(f".{document['format']}", "application/octet-stream")
        encoded = base64.b64encode(document["source"]["bytes"]).decode("utf-8")
        file_part = {
            "file_data": f"data:{media_type};base64,{encoded}",
            "filename": document["name"],
        }
        return {"file": file_part, "type": "file"}

    if "image" in content:
        image = content["image"]
        media_type = mimetypes.types_map.get(f".{image['format']}", "application/octet-stream")
        encoded = base64.b64encode(image["source"]["bytes"]).decode("utf-8")
        image_part = {
            "detail": "auto",
            "format": media_type,
            "url": f"data:{media_type};base64,{encoded}",
        }
        return {"image_url": image_part, "type": "image_url"}

    if "text" in content:
        return {"text": content["text"], "type": "text"}

    # Report the first key of the block as the offending content type.
    unsupported = next(iter(content))
    raise TypeError(f"content_type=<{unsupported}> | unsupported type")

format_request_message_tool_call(tool_use, **kwargs) classmethod

Format an OpenAI compatible tool call.

Parameters:

Name Type Description Default
tool_use ToolUse

Tool use requested by the model.

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
dict[str, Any]

OpenAI compatible tool call.

Source code in strands/models/openai.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@classmethod
def format_request_message_tool_call(cls, tool_use: ToolUse, **kwargs: Any) -> dict[str, Any]:
    """Convert a tool use request into an OpenAI compatible function tool call.

    The tool input is serialized to a JSON string, as the ``arguments`` field carries text.

    Args:
        tool_use: Tool use requested by the model.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        OpenAI compatible tool call.
    """
    function_payload = {
        "arguments": json.dumps(tool_use["input"]),
        "name": tool_use["name"],
    }
    return {"function": function_payload, "id": tool_use["toolUseId"], "type": "function"}

format_request_messages(messages, system_prompt=None, *, system_prompt_content=None, **kwargs) classmethod

Format an OpenAI compatible messages array.

Parameters:

Name Type Description Default
messages Messages

List of message objects to be processed by the model.

required
system_prompt Optional[str]

System prompt to provide context to the model.

None
system_prompt_content Optional[list[SystemContentBlock]]

System prompt content blocks to provide context to the model.

None
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
list[dict[str, Any]]

An OpenAI compatible messages array.

Source code in strands/models/openai.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
@classmethod
def format_request_messages(
    cls,
    messages: Messages,
    system_prompt: Optional[str] = None,
    *,
    system_prompt_content: Optional[list[SystemContentBlock]] = None,
    **kwargs: Any,
) -> list[dict[str, Any]]:
    """Build the OpenAI compatible messages array for a request.

    System messages come first, followed by the regular conversation messages. Messages that have neither
    content nor a ``tool_calls`` entry are dropped from the result.

    Args:
        messages: List of message objects to be processed by the model.
        system_prompt: System prompt to provide context to the model.
        system_prompt_content: System prompt content blocks to provide context to the model.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        An OpenAI compatible messages array.
    """
    candidates = cls._format_system_messages(system_prompt, system_prompt_content=system_prompt_content)
    candidates.extend(cls._format_regular_messages(messages))

    kept = []
    for entry in candidates:
        # Keep a message if it has content, or if it is a (possibly content-less) tool-call message.
        if entry["content"] or "tool_calls" in entry:
            kept.append(entry)

    return kept

format_request_tool_message(tool_result, **kwargs) classmethod

Format an OpenAI compatible tool message.

Parameters:

Name Type Description Default
tool_result ToolResult

Tool result collected from a tool execution.

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
dict[str, Any]

OpenAI compatible tool message.

Source code in strands/models/openai.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@classmethod
def format_request_tool_message(cls, tool_result: ToolResult, **kwargs: Any) -> dict[str, Any]:
    """Convert a tool result into an OpenAI compatible ``tool`` role message.

    Result entries that carry a ``json`` payload are first turned into text blocks holding the serialized
    JSON; every entry is then formatted with ``format_request_message_content``.

    Args:
        tool_result: Tool result collected from a tool execution.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        OpenAI compatible tool message.
    """
    normalized = []
    for block in tool_result["content"]:
        if "json" in block:
            normalized.append({"text": json.dumps(block["json"])})
        else:
            normalized.append(block)
    contents = cast(list[ContentBlock], normalized)

    formatted_parts = [cls.format_request_message_content(part) for part in contents]
    return {
        "role": "tool",
        "tool_call_id": tool_result["toolUseId"],
        "content": formatted_parts,
    }

get_config()

Get the OpenAI model configuration.

Returns:

Type Description
OpenAIConfig

The OpenAI model configuration.

Source code in strands/models/openai.py
106
107
108
109
110
111
112
113
@override
def get_config(self) -> OpenAIConfig:
    """Return the OpenAI model configuration held by this instance.

    Returns:
        The OpenAI model configuration.
    """
    # The cast only informs the type checker; the stored config object is returned as-is.
    current_config = cast(OpenAIModel.OpenAIConfig, self.config)
    return current_config

stream(messages, tool_specs=None, system_prompt=None, *, tool_choice=None, **kwargs) async

Stream conversation with the OpenAI model.

Parameters:

Name Type Description Default
messages Messages

List of message objects to be processed by the model.

required
tool_specs Optional[list[ToolSpec]]

List of tool specifications to make available to the model.

None
system_prompt Optional[str]

System prompt to provide context to the model.

None
tool_choice ToolChoice | None

Selection strategy for tool invocation.

None
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Yields:

Type Description
AsyncGenerator[StreamEvent, None]

Formatted message chunks from the model.

Raises:

Type Description
ContextWindowOverflowException

If the input exceeds the model's context window.

ModelThrottledException

If the request is throttled by OpenAI (rate limits).

Source code in strands/models/openai.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
@override
async def stream(
    self,
    messages: Messages,
    tool_specs: Optional[list[ToolSpec]] = None,
    system_prompt: Optional[str] = None,
    *,
    tool_choice: ToolChoice | None = None,
    **kwargs: Any,
) -> AsyncGenerator[StreamEvent, None]:
    """Stream conversation with the OpenAI model.

    Chunks are yielded in this order: a message start, the text / reasoning content blocks as they arrive,
    one content block per accumulated tool call, a message stop, and finally a metadata chunk when the last
    event of the response carries usage information.

    Args:
        messages: List of message objects to be processed by the model.
        tool_specs: List of tool specifications to make available to the model.
        system_prompt: System prompt to provide context to the model.
        tool_choice: Selection strategy for tool invocation.
        **kwargs: Additional keyword arguments for future extensibility.

    Yields:
        Formatted message chunks from the model.

    Raises:
        ContextWindowOverflowException: If the input exceeds the model's context window.
        ModelThrottledException: If the request is throttled by OpenAI (rate limits).
    """
    logger.debug("formatting request")
    request = self.format_request(messages, tool_specs, system_prompt, tool_choice)
    logger.debug("formatted request=<%s>", request)

    logger.debug("invoking model")

    # We initialize an OpenAI context on every request so as to avoid connection sharing in the underlying httpx
    # client. The asyncio event loop does not allow connections to be shared. For more details, please refer to
    # https://github.com/encode/httpx/discussions/2959.
    async with self._get_client() as client:
        try:
            response = await client.chat.completions.create(**request)
        except openai.BadRequestError as e:
            # Check if this is a context length exceeded error
            if hasattr(e, "code") and e.code == "context_length_exceeded":
                logger.warning("OpenAI threw context window overflow error")
                raise ContextWindowOverflowException(str(e)) from e
            # Re-raise other BadRequestError exceptions
            raise
        except openai.RateLimitError as e:
            # All rate limit errors should be treated as throttling, not context overflow
            # Rate limits (including TPM) require waiting/retrying, not context reduction
            logger.warning("OpenAI threw rate limit error")
            raise ModelThrottledException(str(e)) from e

        logger.debug("got response from model")
        yield self.format_chunk({"chunk_type": "message_start"})
        # Tool call deltas are buffered per tool_call.index and only emitted after the content loop ends.
        tool_calls: dict[int, list[Any]] = {}
        # Type of the content block currently open ("text" / "reasoning_content"), or None if none is open.
        data_type = None
        finish_reason = None  # Store finish_reason for later use
        event = None  # Initialize for scope safety

        async for event in response:
            # Defensive: skip events with empty or missing choices
            if not getattr(event, "choices", None):
                continue
            # Only the first choice of each event is consumed.
            choice = event.choices[0]

            # reasoning_content is not guaranteed to exist on the delta, hence the hasattr guard.
            if hasattr(choice.delta, "reasoning_content") and choice.delta.reasoning_content:
                # Emit stop/start chunks if the content type changed, then the delta itself.
                chunks, data_type = self._stream_switch_content("reasoning_content", data_type)
                for chunk in chunks:
                    yield chunk
                yield self.format_chunk(
                    {
                        "chunk_type": "content_delta",
                        "data_type": data_type,
                        "data": choice.delta.reasoning_content,
                    }
                )

            if choice.delta.content:
                chunks, data_type = self._stream_switch_content("text", data_type)
                for chunk in chunks:
                    yield chunk
                yield self.format_chunk(
                    {"chunk_type": "content_delta", "data_type": data_type, "data": choice.delta.content}
                )

            for tool_call in choice.delta.tool_calls or []:
                tool_calls.setdefault(tool_call.index, []).append(tool_call)

            if choice.finish_reason:
                finish_reason = choice.finish_reason  # Store for use outside loop
                # Close the text / reasoning block that is still open, if any.
                if data_type:
                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": data_type})
                break

        # Replay each buffered tool call as its own content block: start (from the first delta), every delta,
        # then stop.
        for tool_deltas in tool_calls.values():
            yield self.format_chunk({"chunk_type": "content_start", "data_type": "tool", "data": tool_deltas[0]})

            for tool_delta in tool_deltas:
                yield self.format_chunk({"chunk_type": "content_delta", "data_type": "tool", "data": tool_delta})

            yield self.format_chunk({"chunk_type": "content_stop", "data_type": "tool"})

        # If the stream ended without a finish reason, report a normal end of turn.
        yield self.format_chunk({"chunk_type": "message_stop", "data": finish_reason or "end_turn"})

        # Skip remaining events as we don't have use for anything except the final usage payload
        async for event in response:
            _ = event

        # `event` now refers to the last event seen (or None if the response produced no events).
        if event and hasattr(event, "usage") and event.usage:
            yield self.format_chunk({"chunk_type": "metadata", "data": event.usage})

    logger.debug("finished streaming response from model")

structured_output(output_model, prompt, system_prompt=None, **kwargs) async

Get structured output from the model.

Parameters:

Name Type Description Default
output_model Type[T]

The output model to use for the agent.

required
prompt Messages

The prompt messages to use for the agent.

required
system_prompt Optional[str]

System prompt to provide context to the model.

None
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Yields:

Type Description
AsyncGenerator[dict[str, Union[T, Any]], None]

Model events with the last being the structured output.

Raises:

Type Description
ContextWindowOverflowException

If the input exceeds the model's context window.

ModelThrottledException

If the request is throttled by OpenAI (rate limits).

Source code in strands/models/openai.py
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
@override
async def structured_output(
    self, output_model: Type[T], prompt: Messages, system_prompt: Optional[str] = None, **kwargs: Any
) -> AsyncGenerator[dict[str, Union[T, Any]], None]:
    """Get structured output from the model.

    The request is sent through ``client.beta.chat.completions.parse`` with ``output_model`` as the
    response format, and the parsed instance from the single returned choice is yielded.

    Args:
        output_model: The output model to use for the agent.
        prompt: The prompt messages to use for the agent.
        system_prompt: System prompt to provide context to the model.
        **kwargs: Additional keyword arguments for future extensibility.

    Yields:
        Model events with the last being the structured output.

    Raises:
        ContextWindowOverflowException: If the input exceeds the model's context window.
        ModelThrottledException: If the request is throttled by OpenAI (rate limits).
        ValueError: If the response contains more than one choice, or no choice carries an instance of
            ``output_model``.
    """
    # We initialize an OpenAI context on every request so as to avoid connection sharing in the underlying httpx
    # client. The asyncio event loop does not allow connections to be shared. For more details, please refer to
    # https://github.com/encode/httpx/discussions/2959.
    async with self._get_client() as client:
        try:
            response: ParsedChatCompletion = await client.beta.chat.completions.parse(
                model=self.get_config()["model_id"],
                messages=self.format_request(prompt, system_prompt=system_prompt)["messages"],
                response_format=output_model,
            )
        except openai.BadRequestError as e:
            # Check if this is a context length exceeded error
            if hasattr(e, "code") and e.code == "context_length_exceeded":
                logger.warning("OpenAI threw context window overflow error")
                raise ContextWindowOverflowException(str(e)) from e
            # Re-raise other BadRequestError exceptions
            raise
        except openai.RateLimitError as e:
            # All rate limit errors should be treated as throttling, not context overflow
            # Rate limits (including TPM) require waiting/retrying, not context reduction
            logger.warning("OpenAI threw rate limit error")
            raise ModelThrottledException(str(e)) from e

    if len(response.choices) > 1:
        raise ValueError("Multiple choices found in the OpenAI response.")

    # Pick the parsed payload of the (single) choice, if it is an instance of the requested model.
    parsed: T | None = None
    for choice in response.choices:
        if isinstance(choice.message.parsed, output_model):
            parsed = choice.message.parsed
            break

    # Compare against None explicitly: a valid parsed model may be falsy (e.g. it defines __len__ or
    # __bool__), and must not be mistaken for a missing result.
    if parsed is None:
        raise ValueError("No valid tool use or tool use input was found in the OpenAI response.")

    yield {"output": parsed}

update_config(**model_config)

Update the OpenAI model configuration with the provided arguments.

Parameters:

Name Type Description Default
**model_config Unpack[OpenAIConfig]

Configuration overrides.

{}
Source code in strands/models/openai.py
 96
 97
 98
 99
100
101
102
103
104
@override
def update_config(self, **model_config: Unpack[OpenAIConfig]) -> None:  # type: ignore[override]
    """Merge the given overrides into the OpenAI model configuration.

    The override keys are checked against ``OpenAIConfig`` before they are applied.

    Args:
        **model_config: Configuration overrides.
    """
    config_schema = self.OpenAIConfig
    validate_config_keys(model_config, config_schema)
    self.config.update(model_config)

SageMakerAIModel

Bases: OpenAIModel

Amazon SageMaker model provider implementation.

Source code in strands/models/sagemaker.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
class SageMakerAIModel(OpenAIModel):
    """Amazon SageMaker model provider implementation."""

    client: SageMakerRuntimeClient  # type: ignore[assignment]

    class SageMakerAIPayloadSchema(TypedDict, total=False):
        """Request-body options sent to the model hosted on Amazon SageMaker AI.

        All keys are optional at the type level (``total=False``).

        Attributes:
            max_tokens: Upper bound on the number of tokens generated for the completion.
            stream: Whether the response should be streamed.
            temperature: Sampling temperature (optional).
            top_p: Nucleus sampling parameter (optional).
            top_k: Top-k sampling parameter (optional).
            stop: Stop sequences for generation (optional).
            tool_results_as_user_messages: Convert tool results into user messages (optional).
            additional_args: Extra request parameters, as supported by https://bit.ly/djl-lmi-request-schema
        """

        # Generation controls
        max_tokens: int
        stream: bool
        temperature: Optional[float]
        top_p: Optional[float]
        top_k: Optional[int]
        stop: Optional[list[str]]
        # Provider-side behaviour switches and pass-through parameters
        tool_results_as_user_messages: Optional[bool]
        additional_args: Optional[dict[str, Any]]

    class SageMakerAIEndpointConfig(TypedDict, total=False):
        """Configuration options for SageMaker models.

        All keys are optional at the type level (``total=False``).

        Attributes:
            endpoint_name: The name of the SageMaker endpoint to invoke
            region_name: AWS region used to create the boto3 session when no session is supplied
            inference_component_name: The name of the inference component to use
            target_model: Value sent as the ``TargetModel`` request parameter (optional)
            target_variant: Value sent as the ``TargetVariant`` request parameter (optional)
            additional_args: Other request parameters, as supported by https://bit.ly/sagemaker-invoke-endpoint-params
        """

        endpoint_name: str
        region_name: str
        inference_component_name: Optional[str]
        target_model: Optional[str]
        target_variant: Optional[str]
        additional_args: Optional[dict[str, Any]]

    def __init__(
        self,
        endpoint_config: SageMakerAIEndpointConfig,
        payload_config: SageMakerAIPayloadSchema,
        boto_session: Optional[boto3.Session] = None,
        boto_client_config: Optional[BotocoreConfig] = None,
    ):
        """Initialize provider instance.

        The supplied configuration dicts are copied; defaults (``stream=True``,
        ``tool_results_as_user_messages=False``) are applied to the copy only, so the
        caller's ``payload_config`` is left untouched.

        Args:
            endpoint_config: Endpoint configuration for SageMaker.
            payload_config: Payload configuration for the model.
            boto_session: Boto Session to use when calling the SageMaker Runtime. When omitted, a new
                session is created for ``endpoint_config["region_name"]``, falling back to the
                ``AWS_REGION`` environment variable and then ``us-west-2``.
            boto_client_config: Configuration to use when creating the SageMaker-Runtime Boto Client.
        """
        validate_config_keys(endpoint_config, self.SageMakerAIEndpointConfig)
        validate_config_keys(payload_config, self.SageMakerAIPayloadSchema)

        # Copy first, then fill in defaults on the copy: calling setdefault on the argument
        # itself would silently modify a dict the caller may reuse elsewhere.
        self.endpoint_config = self.SageMakerAIEndpointConfig(**endpoint_config)
        self.payload_config = self.SageMakerAIPayloadSchema(**payload_config)
        self.payload_config.setdefault("stream", True)
        self.payload_config.setdefault("tool_results_as_user_messages", False)
        logger.debug(
            "endpoint_config=<%s> payload_config=<%s> | initializing", self.endpoint_config, self.payload_config
        )

        # The region is only consulted when we have to build the session ourselves.
        region = self.endpoint_config.get("region_name") or os.getenv("AWS_REGION") or "us-west-2"
        session = boto_session or boto3.Session(region_name=str(region))

        # Add strands-agents to the request user agent
        if boto_client_config:
            existing_user_agent = getattr(boto_client_config, "user_agent_extra", None)

            # Append 'strands-agents' to existing user_agent_extra or set it if not present
            new_user_agent = f"{existing_user_agent} strands-agents" if existing_user_agent else "strands-agents"

            client_config = boto_client_config.merge(BotocoreConfig(user_agent_extra=new_user_agent))
        else:
            client_config = BotocoreConfig(user_agent_extra="strands-agents")

        self.client = session.client(
            service_name="sagemaker-runtime",
            config=client_config,
        )

    @override
    def update_config(self, **endpoint_config: Unpack[SageMakerAIEndpointConfig]) -> None:  # type: ignore[override]
        """Apply overrides to the Amazon SageMaker endpoint configuration.

        Only ``self.endpoint_config`` is updated here; the payload configuration is not touched.

        Args:
            **endpoint_config: Configuration overrides.
        """
        overrides = endpoint_config
        validate_config_keys(overrides, self.SageMakerAIEndpointConfig)
        self.endpoint_config.update(overrides)

    @override
    def get_config(self) -> "SageMakerAIModel.SageMakerAIEndpointConfig":  # type: ignore[override]
        """Get the Amazon SageMaker model configuration.

        Returns:
            The Amazon SageMaker model configuration.
        """
        return self.endpoint_config

    @override
    def format_request(
        self,
        messages: Messages,
        tool_specs: Optional[list[ToolSpec]] = None,
        system_prompt: Optional[str] = None,
        tool_choice: ToolChoice | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Format an Amazon SageMaker chat streaming request.

        The JSON body is assembled from the formatted messages, the tool specifications and the
        payload configuration (with ``additional_args`` merged in at the top level), and is then
        wrapped in the keyword arguments passed to the SageMaker Runtime invoke call.

        Args:
            messages: List of message objects to be processed by the model.
            tool_specs: List of tool specifications to make available to the model.
            system_prompt: System prompt to provide context to the model.
            tool_choice: Selection strategy for tool invocation. **Note: This parameter is accepted for
                interface consistency but is currently ignored for this model provider.**
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            An Amazon SageMaker chat streaming request.
        """
        formatted_messages = self.format_request_messages(messages, system_prompt)

        payload = {
            "messages": formatted_messages,
            "tools": [
                {
                    "type": "function",
                    "function": {
                        "name": tool_spec["name"],
                        "description": tool_spec["description"],
                        "parameters": tool_spec["inputSchema"]["json"],
                    },
                }
                for tool_spec in tool_specs or []
            ],
            # Add payload configuration parameters
            **{
                k: v
                for k, v in self.payload_config.items()
                if k not in ["additional_args", "tool_results_as_user_messages"]
            },
        }

        payload_additional_args = self.payload_config.get("additional_args")
        if payload_additional_args:
            payload.update(payload_additional_args)

        # Remove tools and tool_choice if tools = []
        if not payload["tools"]:
            payload.pop("tools")
            payload.pop("tool_choice", None)
        else:
            # Ensure the model can use tools when available
            payload["tool_choice"] = "auto"

        convert_tool_results = self.payload_config.get("tool_results_as_user_messages", False)
        payload_messages = payload["messages"]  # type: ignore
        for index, message in enumerate(payload_messages):  # type: ignore
            # Assistant message must have either content or tool_calls, but not both
            if message.get("role", "") == "assistant" and message.get("tool_calls", []) != []:
                message.pop("content", None)
            if message.get("role") == "tool" and convert_tool_results:
                # Convert tool message to user message
                tool_call_id = message.get("tool_call_id", "ABCDEF")
                content = message.get("content", "")
                message = {"role": "user", "content": f"Tool call ID '{tool_call_id}' returned: {content}"}
                # Rebinding the loop variable alone would leave the original tool message in the
                # payload; write the replacement back so it is actually sent.
                payload_messages[index] = message  # type: ignore
            # Cannot have both reasoning_text and text - if "text", content becomes an array of content["text"]
            for c in message.get("content", []):
                if "text" in c:
                    message["content"] = [c]
                    break

        logger.info("payload=<%s>", json.dumps(payload, indent=2))
        # Format the request according to the SageMaker Runtime API requirements
        request = {
            "EndpointName": self.endpoint_config["endpoint_name"],
            "Body": json.dumps(payload),
            "ContentType": "application/json",
            "Accept": "application/json",
        }

        # Add optional SageMaker parameters if provided
        inf_component_name = self.endpoint_config.get("inference_component_name")
        if inf_component_name:
            request["InferenceComponentName"] = inf_component_name
        target_model = self.endpoint_config.get("target_model")
        if target_model:
            request["TargetModel"] = target_model
        target_variant = self.endpoint_config.get("target_variant")
        if target_variant:
            request["TargetVariant"] = target_variant

        # Add additional request args if provided
        additional_args = self.endpoint_config.get("additional_args")
        if additional_args:
            request.update(additional_args)

        return request

    @override
    async def stream(
        self,
        messages: Messages,
        tool_specs: Optional[list[ToolSpec]] = None,
        system_prompt: Optional[str] = None,
        *,
        tool_choice: ToolChoice | None = None,
        **kwargs: Any,
    ) -> AsyncGenerator[StreamEvent, None]:
        """Stream conversation with the SageMaker model.

        When ``payload_config["stream"]`` is truthy the endpoint is invoked with a response stream
        and chunks are emitted as they are decoded; otherwise the endpoint is invoked once and the
        complete response is replayed as the same sequence of chunk types.

        Args:
            messages: List of message objects to be processed by the model.
            tool_specs: List of tool specifications to make available to the model.
            system_prompt: System prompt to provide context to the model.
            tool_choice: Selection strategy for tool invocation. **Note: This parameter is accepted for
                interface consistency but is currently ignored for this model provider.**
            **kwargs: Additional keyword arguments for future extensibility.

        Yields:
            Formatted message chunks from the model.

        Raises:
            Exception: If a streamed tool call does not carry a tool name.
            The SageMaker Runtime client errors listed in the ``except`` clause are logged and re-raised.
        """
        warn_on_tool_choice_not_supported(tool_choice)

        logger.debug("formatting request")
        request = self.format_request(messages, tool_specs, system_prompt)
        logger.debug("formatted request=<%s>", request)

        logger.debug("invoking model")

        try:
            if self.payload_config.get("stream", True):
                response = self.client.invoke_endpoint_with_response_stream(**request)

                # Message start
                yield self.format_chunk({"chunk_type": "message_start"})

                # Parse the content
                finish_reason = ""
                partial_content = ""
                tool_calls: dict[int, list[Any]] = {}
                has_text_content = False
                text_content_started = False
                reasoning_content_started = False

                for event in response["Body"]:
                    chunk = event["PayloadPart"]["Bytes"].decode("utf-8")
                    partial_content += chunk[6:] if chunk.startswith("data: ") else chunk  # TGI fix
                    logger.info("chunk=<%s>", partial_content)
                    try:
                        content = json.loads(partial_content)
                        partial_content = ""
                        choice = content["choices"][0]
                        logger.info("choice=<%s>", json.dumps(choice, indent=2))

                        # Handle text content
                        if choice["delta"].get("content"):
                            if not text_content_started:
                                yield self.format_chunk({"chunk_type": "content_start", "data_type": "text"})
                                text_content_started = True
                            has_text_content = True
                            yield self.format_chunk(
                                {
                                    "chunk_type": "content_delta",
                                    "data_type": "text",
                                    "data": choice["delta"]["content"],
                                }
                            )

                        # Handle reasoning content
                        if choice["delta"].get("reasoning_content"):
                            if not reasoning_content_started:
                                yield self.format_chunk(
                                    {"chunk_type": "content_start", "data_type": "reasoning_content"}
                                )
                                reasoning_content_started = True
                            yield self.format_chunk(
                                {
                                    "chunk_type": "content_delta",
                                    "data_type": "reasoning_content",
                                    "data": choice["delta"]["reasoning_content"],
                                }
                            )

                        # Handle tool calls; they are buffered per index and emitted after the loop
                        generated_tool_calls = choice["delta"].get("tool_calls", [])
                        if not isinstance(generated_tool_calls, list):
                            generated_tool_calls = [generated_tool_calls]
                        for tool_call in generated_tool_calls:
                            tool_calls.setdefault(tool_call["index"], []).append(tool_call)

                        # Use .get so a chunk without a "finish_reason" key is treated as "not finished"
                        # instead of aborting the stream with a KeyError.
                        if choice.get("finish_reason") is not None:
                            finish_reason = choice["finish_reason"]
                            break

                        # NOTE(review): this check is skipped for the chunk that carries finish_reason
                        # because of the break above - confirm whether usage can arrive on that chunk.
                        if choice.get("usage"):
                            yield self.format_chunk(
                                {"chunk_type": "metadata", "data": UsageMetadata(**choice["usage"])}
                            )

                    except json.JSONDecodeError:
                        # Continue accumulating content until we have valid JSON
                        continue

                # Close reasoning content if it was started
                if reasoning_content_started:
                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": "reasoning_content"})

                # Close text content if it was started
                if text_content_started:
                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": "text"})

                # Handle tool calling
                logger.info("tool_calls=<%s>", json.dumps(tool_calls, indent=2))
                for tool_deltas in tool_calls.values():
                    if not tool_deltas[0]["function"].get("name"):
                        raise Exception("The model did not provide a tool name.")
                    yield self.format_chunk(
                        {"chunk_type": "content_start", "data_type": "tool", "data": ToolCall(**tool_deltas[0])}
                    )
                    for tool_delta in tool_deltas:
                        yield self.format_chunk(
                            {"chunk_type": "content_delta", "data_type": "tool", "data": ToolCall(**tool_delta)}
                        )
                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": "tool"})

                # If no content was generated at all, ensure we have empty text content
                if not has_text_content and not tool_calls:
                    yield self.format_chunk({"chunk_type": "content_start", "data_type": "text"})
                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": "text"})

                # Message close
                yield self.format_chunk({"chunk_type": "message_stop", "data": finish_reason})

            else:
                # Not all SageMaker AI models support streaming!
                response = self.client.invoke_endpoint(**request)  # type: ignore[assignment]
                final_response_json = json.loads(response["Body"].read().decode("utf-8"))  # type: ignore[attr-defined]
                logger.info("response=<%s>", json.dumps(final_response_json, indent=2))

                # Obtain the key elements from the response
                message = final_response_json["choices"][0]["message"]
                message_stop_reason = final_response_json["choices"][0]["finish_reason"]

                # Message start
                yield self.format_chunk({"chunk_type": "message_start"})

                # Handle text
                if message.get("content", ""):
                    yield self.format_chunk({"chunk_type": "content_start", "data_type": "text"})
                    yield self.format_chunk(
                        {"chunk_type": "content_delta", "data_type": "text", "data": message["content"]}
                    )
                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": "text"})

                # Handle reasoning content
                if message.get("reasoning_content"):
                    yield self.format_chunk({"chunk_type": "content_start", "data_type": "reasoning_content"})
                    yield self.format_chunk(
                        {
                            "chunk_type": "content_delta",
                            "data_type": "reasoning_content",
                            "data": message["reasoning_content"],
                        }
                    )
                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": "reasoning_content"})

                # Handle the tool calling, if any
                if message.get("tool_calls") or message_stop_reason == "tool_calls":
                    # finish_reason may say "tool_calls" while the message has no (or a null)
                    # "tool_calls" entry; treat that as an empty list rather than failing.
                    response_tool_calls = message.get("tool_calls") or []
                    if not isinstance(response_tool_calls, list):
                        response_tool_calls = [response_tool_calls]
                    for tool_call in response_tool_calls:
                        # if arguments of tool_call is not str, cast it
                        if not isinstance(tool_call["function"]["arguments"], str):
                            tool_call["function"]["arguments"] = json.dumps(tool_call["function"]["arguments"])
                        yield self.format_chunk(
                            {"chunk_type": "content_start", "data_type": "tool", "data": ToolCall(**tool_call)}
                        )
                        yield self.format_chunk(
                            {"chunk_type": "content_delta", "data_type": "tool", "data": ToolCall(**tool_call)}
                        )
                        yield self.format_chunk({"chunk_type": "content_stop", "data_type": "tool"})
                    message_stop_reason = "tool_calls"

                # Message close
                yield self.format_chunk({"chunk_type": "message_stop", "data": message_stop_reason})
                # Handle usage metadata
                if final_response_json.get("usage"):
                    yield self.format_chunk(
                        {"chunk_type": "metadata", "data": UsageMetadata(**final_response_json.get("usage"))}
                    )
        except (
            self.client.exceptions.InternalFailure,
            self.client.exceptions.ServiceUnavailable,
            self.client.exceptions.ValidationError,
            self.client.exceptions.ModelError,
            self.client.exceptions.InternalDependencyException,
            self.client.exceptions.ModelNotReadyException,
        ) as e:
            logger.error("SageMaker error: %s", str(e))
            raise

        logger.debug("finished streaming response from model")

    @override
    @classmethod
    def format_request_tool_message(cls, tool_result: ToolResult, **kwargs: Any) -> dict[str, Any]:
        """Format a SageMaker compatible tool message.

        Args:
            tool_result: Tool result collected from a tool execution.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            SageMaker compatible tool message with content as a string.
        """
        # Convert content blocks to a simple string for SageMaker compatibility
        content_parts = []
        for content in tool_result["content"]:
            if "json" in content:
                content_parts.append(json.dumps(content["json"]))
            elif "text" in content:
                content_parts.append(content["text"])
            else:
                # Handle other content types by converting to string
                content_parts.append(str(content))

        content_string = " ".join(content_parts)

        return {
            "role": "tool",
            "tool_call_id": tool_result["toolUseId"],
            "content": content_string,  # String instead of list
        }

    @override
    @classmethod
    def format_request_message_content(cls, content: ContentBlock, **kwargs: Any) -> dict[str, Any]:
        """Format a content block.

        Args:
            content: Message content.
            **kwargs: Additional keyword arguments for future extensibility.

        Returns:
            Formatted content block.

        Raises:
            TypeError: If the content block type cannot be converted to a SageMaker-compatible format.
        """
        # if "text" in content and not isinstance(content["text"], str):
        #     return {"type": "text", "text": str(content["text"])}

        if "reasoningContent" in content and content["reasoningContent"]:
            return {
                "signature": content["reasoningContent"].get("reasoningText", {}).get("signature", ""),
                "thinking": content["reasoningContent"].get("reasoningText", {}).get("text", ""),
                "type": "thinking",
            }
        elif not content.get("reasoningContent"):
            content.pop("reasoningContent", None)

        if "video" in content:
            return {
                "type": "video_url",
                "video_url": {
                    "detail": "auto",
                    "url": content["video"]["source"]["bytes"],
                },
            }

        return super().format_request_message_content(content)

    @override
    async def structured_output(
        self, output_model: Type[T], prompt: Messages, system_prompt: Optional[str] = None, **kwargs: Any
    ) -> AsyncGenerator[dict[str, Union[T, Any]], None]:
        """Get structured output from the model.

        Args:
            output_model: The output model to use for the agent.
            prompt: The prompt messages to use for the agent.
            system_prompt: System prompt to provide context to the model.
            **kwargs: Additional keyword arguments for future extensibility.

        Yields:
            Model events with the last being the structured output.
        """
        # Format the request for structured output
        request = self.format_request(prompt, system_prompt=system_prompt)

        # Parse the payload to add response format
        payload = json.loads(request["Body"])
        payload["response_format"] = {
            "type": "json_schema",
            "json_schema": {"name": output_model.__name__, "schema": output_model.model_json_schema(), "strict": True},
        }
        request["Body"] = json.dumps(payload)

        try:
            # Use non-streaming mode for structured output
            response = self.client.invoke_endpoint(**request)
            final_response_json = json.loads(response["Body"].read().decode("utf-8"))

            # Extract the structured content
            message = final_response_json["choices"][0]["message"]

            if message.get("content"):
                try:
                    # Parse the JSON content and create the output model instance
                    content_data = json.loads(message["content"])
                    parsed_output = output_model(**content_data)
                    yield {"output": parsed_output}
                except (json.JSONDecodeError, TypeError, ValueError) as e:
                    raise ValueError(f"Failed to parse structured output: {e}") from e
            else:
                raise ValueError("No content found in SageMaker response")

        except (
            self.client.exceptions.InternalFailure,
            self.client.exceptions.ServiceUnavailable,
            self.client.exceptions.ValidationError,
            self.client.exceptions.ModelError,
            self.client.exceptions.InternalDependencyException,
            self.client.exceptions.ModelNotReadyException,
        ) as e:
            logger.error("SageMaker structured output error: %s", str(e))
            raise ValueError(f"SageMaker structured output error: {str(e)}") from e

SageMakerAIEndpointConfig

Bases: TypedDict

Configuration options for SageMaker models.

Attributes:

Name Type Description
endpoint_name str

The name of the SageMaker endpoint to invoke

inference_component_name Union[str, None]

The name of the inference component to use

additional_args Optional[dict[str, Any]]

Other request parameters, as supported by https://bit.ly/sagemaker-invoke-endpoint-params

Source code in strands/models/sagemaker.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class SageMakerAIEndpointConfig(TypedDict, total=False):
    """Configuration options for SageMaker models.

    All keys are optional at the type level (``total=False``).

    Attributes:
        endpoint_name: The name of the SageMaker endpoint to invoke
        region_name: AWS region used to create the boto3 session when no session is supplied
        inference_component_name: The name of the inference component to use
        target_model: Value sent as the ``TargetModel`` request parameter (optional)
        target_variant: Value sent as the ``TargetVariant`` request parameter (optional)
        additional_args: Other request parameters, as supported by https://bit.ly/sagemaker-invoke-endpoint-params
    """

    endpoint_name: str
    region_name: str
    inference_component_name: Optional[str]
    target_model: Optional[str]
    target_variant: Optional[str]
    additional_args: Optional[dict[str, Any]]

SageMakerAIPayloadSchema

Bases: TypedDict

Payload schema for the Amazon SageMaker AI model.

Attributes:

Name Type Description
max_tokens int

Maximum number of tokens to generate in the completion

stream bool

Whether to stream the response

temperature Optional[float]

Sampling temperature to use for the model (optional)

top_p Optional[float]

Nucleus sampling parameter (optional)

top_k Optional[int]

Top-k sampling parameter (optional)

stop Optional[list[str]]

List of stop sequences to use for the model (optional)

tool_results_as_user_messages Optional[bool]

Convert tool result to user messages (optional)

additional_args Optional[dict[str, Any]]

Additional request parameters, as supported by https://bit.ly/djl-lmi-request-schema

Source code in strands/models/sagemaker.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class SageMakerAIPayloadSchema(TypedDict, total=False):
    """Request-body options sent to the model hosted on Amazon SageMaker AI.

    All keys are optional at the type level (``total=False``).

    Attributes:
        max_tokens: Upper bound on the number of tokens generated for the completion.
        stream: Whether the response should be streamed.
        temperature: Sampling temperature (optional).
        top_p: Nucleus sampling parameter (optional).
        top_k: Top-k sampling parameter (optional).
        stop: Stop sequences for generation (optional).
        tool_results_as_user_messages: Convert tool results into user messages (optional).
        additional_args: Extra request parameters, as supported by https://bit.ly/djl-lmi-request-schema
    """

    # Generation controls
    max_tokens: int
    stream: bool
    temperature: Optional[float]
    top_p: Optional[float]
    top_k: Optional[int]
    stop: Optional[list[str]]
    # Provider-side behaviour switches and pass-through parameters
    tool_results_as_user_messages: Optional[bool]
    additional_args: Optional[dict[str, Any]]

__init__(endpoint_config, payload_config, boto_session=None, boto_client_config=None)

Initialize provider instance.

Parameters:

Name Type Description Default
endpoint_config SageMakerAIEndpointConfig

Endpoint configuration for SageMaker.

required
payload_config SageMakerAIPayloadSchema

Payload configuration for the model.

required
boto_session Optional[Session]

Boto Session to use when calling the SageMaker Runtime.

None
boto_client_config Optional[Config]

Configuration to use when creating the SageMaker-Runtime Boto Client.

None
Source code in strands/models/sagemaker.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def __init__(
    self,
    endpoint_config: SageMakerAIEndpointConfig,
    payload_config: SageMakerAIPayloadSchema,
    boto_session: Optional[boto3.Session] = None,
    boto_client_config: Optional[BotocoreConfig] = None,
):
    """Initialize provider instance.

    Both configuration mappings are validated and then copied; the defaults ``stream=True`` and
    ``tool_results_as_user_messages=False`` are applied to the copy held by the instance, so the
    dictionaries passed in by the caller are left untouched.

    Args:
        endpoint_config: Endpoint configuration for SageMaker.
        payload_config: Payload configuration for the model.
        boto_session: Boto Session to use when calling the SageMaker Runtime.
        boto_client_config: Configuration to use when creating the SageMaker-Runtime Boto Client.
    """
    validate_config_keys(endpoint_config, self.SageMakerAIEndpointConfig)
    validate_config_keys(payload_config, self.SageMakerAIPayloadSchema)
    self.endpoint_config = self.SageMakerAIEndpointConfig(**endpoint_config)
    # Copy first and fill in defaults on the copy: calling setdefault on the argument itself
    # would silently modify the caller's dictionary.
    self.payload_config = self.SageMakerAIPayloadSchema(**payload_config)
    self.payload_config.setdefault("stream", True)
    self.payload_config.setdefault("tool_results_as_user_messages", False)
    logger.debug(
        "endpoint_config=<%s> payload_config=<%s> | initializing", self.endpoint_config, self.payload_config
    )

    # Region precedence: explicit endpoint config, then the AWS_REGION environment variable,
    # then a fixed fallback. The region is only used when no session was supplied.
    region = self.endpoint_config.get("region_name") or os.getenv("AWS_REGION") or "us-west-2"
    session = boto_session or boto3.Session(region_name=str(region))

    # Add strands-agents to the request user agent
    if boto_client_config:
        existing_user_agent = getattr(boto_client_config, "user_agent_extra", None)

        # Append 'strands-agents' to existing user_agent_extra or set it if not present
        new_user_agent = f"{existing_user_agent} strands-agents" if existing_user_agent else "strands-agents"

        client_config = boto_client_config.merge(BotocoreConfig(user_agent_extra=new_user_agent))
    else:
        client_config = BotocoreConfig(user_agent_extra="strands-agents")

    self.client = session.client(
        service_name="sagemaker-runtime",
        config=client_config,
    )

format_request(messages, tool_specs=None, system_prompt=None, tool_choice=None, **kwargs)

Format an Amazon SageMaker chat streaming request.

Parameters:

Name Type Description Default
messages Messages

List of message objects to be processed by the model.

required
tool_specs Optional[list[ToolSpec]]

List of tool specifications to make available to the model.

None
system_prompt Optional[str]

System prompt to provide context to the model.

None
tool_choice ToolChoice | None

Selection strategy for tool invocation. Note: This parameter is accepted for interface consistency but is currently ignored for this model provider.

None
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
dict[str, Any]

An Amazon SageMaker chat streaming request.

Source code in strands/models/sagemaker.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
@override
def format_request(
    self,
    messages: Messages,
    tool_specs: Optional[list[ToolSpec]] = None,
    system_prompt: Optional[str] = None,
    tool_choice: ToolChoice | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Format an Amazon SageMaker chat streaming request.

    The JSON body is assembled from the formatted messages, the tool specifications and the payload
    configuration (``additional_args`` entries are merged into the body; ``tool_results_as_user_messages``
    only steers formatting and is never sent). The body is then wrapped in the keyword arguments
    expected by the SageMaker Runtime invoke calls.

    Args:
        messages: List of message objects to be processed by the model.
        tool_specs: List of tool specifications to make available to the model.
        system_prompt: System prompt to provide context to the model.
        tool_choice: Selection strategy for tool invocation. **Note: This parameter is accepted for
            interface consistency but is currently ignored for this model provider.**
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        An Amazon SageMaker chat streaming request.
    """
    formatted_messages = self.format_request_messages(messages, system_prompt)

    payload = {
        "messages": formatted_messages,
        "tools": [
            {
                "type": "function",
                "function": {
                    "name": tool_spec["name"],
                    "description": tool_spec["description"],
                    "parameters": tool_spec["inputSchema"]["json"],
                },
            }
            for tool_spec in tool_specs or []
        ],
        # Add payload configuration parameters
        **{
            k: v
            for k, v in self.payload_config.items()
            if k not in ["additional_args", "tool_results_as_user_messages"]
        },
    }

    payload_additional_args = self.payload_config.get("additional_args")
    if payload_additional_args:
        payload.update(payload_additional_args)

    # Remove tools and tool_choice if tools = []
    if not payload["tools"]:
        payload.pop("tools")
        payload.pop("tool_choice", None)
    else:
        # Ensure the model can use tools when available
        payload["tool_choice"] = "auto"

    tool_results_as_user = self.payload_config.get("tool_results_as_user_messages", False)
    for index, message in enumerate(payload["messages"]):  # type: ignore
        # Assistant message must have either content or tool_calls, but not both
        if message.get("role", "") == "assistant" and message.get("tool_calls", []) != []:
            message.pop("content", None)
        if message.get("role") == "tool" and tool_results_as_user:
            # Convert tool message to user message. The replacement has to be stored back into
            # the list: rebinding the loop variable alone would leave the payload unchanged.
            tool_call_id = message.get("tool_call_id", "ABCDEF")
            content = message.get("content", "")
            message = {"role": "user", "content": f"Tool call ID '{tool_call_id}' returned: {content}"}
            payload["messages"][index] = message  # type: ignore
        # Cannot have both reasoning_text and text - if "text", content becomes an array of content["text"]
        # Only list content is scanned; string content has no parts to select from and missing/None
        # content is skipped instead of raising.
        message_content = message.get("content")
        if isinstance(message_content, list):
            for c in message_content:
                if "text" in c:
                    message["content"] = [c]
                    break

    logger.info("payload=<%s>", json.dumps(payload, indent=2))
    # Format the request according to the SageMaker Runtime API requirements
    request = {
        "EndpointName": self.endpoint_config["endpoint_name"],
        "Body": json.dumps(payload),
        "ContentType": "application/json",
        "Accept": "application/json",
    }

    # Add optional SageMaker parameters if provided
    inf_component_name = self.endpoint_config.get("inference_component_name")
    if inf_component_name:
        request["InferenceComponentName"] = inf_component_name
    target_model = self.endpoint_config.get("target_model")
    if target_model:
        request["TargetModel"] = target_model
    target_variant = self.endpoint_config.get("target_variant")
    if target_variant:
        request["TargetVariant"] = target_variant

    # Add additional request args if provided
    additional_args = self.endpoint_config.get("additional_args")
    if additional_args:
        request.update(additional_args)

    return request

format_request_message_content(content, **kwargs) classmethod

Format a content block.

Parameters:

Name Type Description Default
content ContentBlock

Message content.

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
dict[str, Any]

Formatted content block.

Raises:

Type Description
TypeError

If the content block type cannot be converted to a SageMaker-compatible format.

Source code in strands/models/sagemaker.py
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
@override
@classmethod
def format_request_message_content(cls, content: ContentBlock, **kwargs: Any) -> dict[str, Any]:
    """Convert one content block into its request representation.

    A non-empty ``reasoningContent`` entry becomes a ``thinking`` part and a ``video`` entry becomes a
    ``video_url`` part; every other block is delegated to the parent implementation. An empty
    ``reasoningContent`` key is removed from ``content`` (in place) before delegating.

    Args:
        content: Message content.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        Formatted content block.

    Raises:
        TypeError: If the content block type cannot be converted to a SageMaker-compatible format.
    """
    reasoning = content.get("reasoningContent")
    if reasoning:
        reasoning_text = reasoning.get("reasoningText", {})
        return {
            "signature": reasoning_text.get("signature", ""),
            "thinking": reasoning_text.get("text", ""),
            "type": "thinking",
        }

    # Nothing useful is stored under the key at this point (absent or empty), so drop it
    # before the block is inspected any further.
    content.pop("reasoningContent", None)

    if "video" not in content:
        return super().format_request_message_content(content)

    video_source = content["video"]["source"]["bytes"]
    return {
        "type": "video_url",
        "video_url": {
            "detail": "auto",
            "url": video_source,
        },
    }

format_request_tool_message(tool_result, **kwargs) classmethod

Format a SageMaker compatible tool message.

Parameters:

Name Type Description Default
tool_result ToolResult

Tool result collected from a tool execution.

required
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Returns:

Type Description
dict[str, Any]

SageMaker compatible tool message with content as a string.

Source code in strands/models/sagemaker.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
@override
@classmethod
def format_request_tool_message(cls, tool_result: ToolResult, **kwargs: Any) -> dict[str, Any]:
    """Build the ``tool`` role message for a finished tool execution.

    Each result block is reduced to text (``json`` blocks are serialized with ``json.dumps``, ``text``
    blocks are used as-is, anything else goes through ``str``) and the pieces are joined with a
    single space.

    Args:
        tool_result: Tool result collected from a tool execution.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        SageMaker compatible tool message with content as a string.
    """

    def _as_text(block: Any) -> Any:
        # "json" takes precedence over "text" when a block carries both keys.
        if "json" in block:
            return json.dumps(block["json"])
        if "text" in block:
            return block["text"]
        return str(block)

    flattened = " ".join([_as_text(block) for block in tool_result["content"]])

    return {
        "role": "tool",
        "tool_call_id": tool_result["toolUseId"],
        "content": flattened,  # plain string rather than a list of parts
    }

get_config()

Get the Amazon SageMaker model configuration.

Returns:

Type Description
SageMakerAIEndpointConfig

The Amazon SageMaker model configuration.

Source code in strands/models/sagemaker.py
189
190
191
192
193
194
195
196
@override
def get_config(self) -> "SageMakerAIModel.SageMakerAIEndpointConfig":  # type: ignore[override]
    """Get the Amazon SageMaker model configuration.

    Returns:
        The Amazon SageMaker model configuration. This is the ``endpoint_config`` object held by
        the instance itself, not a copy.
    """
    return self.endpoint_config

stream(messages, tool_specs=None, system_prompt=None, *, tool_choice=None, **kwargs) async

Stream conversation with the SageMaker model.

Parameters:

Name Type Description Default
messages Messages

List of message objects to be processed by the model.

required
tool_specs Optional[list[ToolSpec]]

List of tool specifications to make available to the model.

None
system_prompt Optional[str]

System prompt to provide context to the model.

None
tool_choice ToolChoice | None

Selection strategy for tool invocation. Note: This parameter is accepted for interface consistency but is currently ignored for this model provider.

None
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Yields:

Type Description
AsyncGenerator[StreamEvent, None]

Formatted message chunks from the model.

Source code in strands/models/sagemaker.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
@override
async def stream(
    self,
    messages: Messages,
    tool_specs: Optional[list[ToolSpec]] = None,
    system_prompt: Optional[str] = None,
    *,
    tool_choice: ToolChoice | None = None,
    **kwargs: Any,
) -> AsyncGenerator[StreamEvent, None]:
    """Stream conversation with the SageMaker model.

    When the ``stream`` payload setting is truthy (the default used here when the key is absent) the
    endpoint is invoked with a response stream and chunks are forwarded as they are parsed. Otherwise
    a single blocking invocation is made and its response is replayed as the same kind of chunk
    sequence: message start, content blocks, message stop and, when present, usage metadata.

    Args:
        messages: List of message objects to be processed by the model.
        tool_specs: List of tool specifications to make available to the model.
        system_prompt: System prompt to provide context to the model.
        tool_choice: Selection strategy for tool invocation. **Note: This parameter is accepted for
            interface consistency but is currently ignored for this model provider.**
        **kwargs: Additional keyword arguments for future extensibility.

    Yields:
        Formatted message chunks from the model.

    Raises:
        Exception: If a streamed tool call does not carry a function name in its first delta.
            SageMaker Runtime client errors named in the ``except`` clause are logged and re-raised.
    """
    warn_on_tool_choice_not_supported(tool_choice)

    logger.debug("formatting request")
    request = self.format_request(messages, tool_specs, system_prompt)
    logger.debug("formatted request=<%s>", request)

    logger.debug("invoking model")

    try:
        if self.payload_config.get("stream", True):
            response = self.client.invoke_endpoint_with_response_stream(**request)

            # Message start
            yield self.format_chunk({"chunk_type": "message_start"})

            # Parse the content
            finish_reason = ""
            partial_content = ""
            tool_calls: dict[int, list[Any]] = {}
            has_text_content = False
            text_content_started = False
            reasoning_content_started = False

            for event in response["Body"]:
                chunk = event["PayloadPart"]["Bytes"].decode("utf-8")
                # A JSON document may be split across payload parts, so parts are accumulated
                # until the buffer parses; a leading "data: " prefix is dropped from each part.
                partial_content += chunk[6:] if chunk.startswith("data: ") else chunk  # TGI fix
                logger.info("chunk=<%s>", partial_content)
                try:
                    content = json.loads(partial_content)
                    partial_content = ""
                    choice = content["choices"][0]
                    logger.info("choice=<%s>", json.dumps(choice, indent=2))

                    # Handle text content
                    if choice["delta"].get("content"):
                        if not text_content_started:
                            yield self.format_chunk({"chunk_type": "content_start", "data_type": "text"})
                            text_content_started = True
                        has_text_content = True
                        yield self.format_chunk(
                            {
                                "chunk_type": "content_delta",
                                "data_type": "text",
                                "data": choice["delta"]["content"],
                            }
                        )

                    # Handle reasoning content
                    if choice["delta"].get("reasoning_content"):
                        if not reasoning_content_started:
                            yield self.format_chunk(
                                {"chunk_type": "content_start", "data_type": "reasoning_content"}
                            )
                            reasoning_content_started = True
                        yield self.format_chunk(
                            {
                                "chunk_type": "content_delta",
                                "data_type": "reasoning_content",
                                "data": choice["delta"]["reasoning_content"],
                            }
                        )

                    # Handle tool calls
                    generated_tool_calls = choice["delta"].get("tool_calls", [])
                    if not isinstance(generated_tool_calls, list):
                        generated_tool_calls = [generated_tool_calls]
                    for tool_call in generated_tool_calls:
                        tool_calls.setdefault(tool_call["index"], []).append(tool_call)

                    # Usage has to be handled before the finish_reason check: that check leaves
                    # the loop, so usage attached to the final chunk would otherwise be dropped.
                    if choice.get("usage"):
                        yield self.format_chunk(
                            {"chunk_type": "metadata", "data": UsageMetadata(**choice["usage"])}
                        )

                    if choice.get("finish_reason") is not None:
                        finish_reason = choice["finish_reason"]
                        break

                except json.JSONDecodeError:
                    # Continue accumulating content until we have valid JSON
                    continue

            # Close reasoning content if it was started
            if reasoning_content_started:
                yield self.format_chunk({"chunk_type": "content_stop", "data_type": "reasoning_content"})

            # Close text content if it was started
            if text_content_started:
                yield self.format_chunk({"chunk_type": "content_stop", "data_type": "text"})

            # Handle tool calling
            logger.info("tool_calls=<%s>", json.dumps(tool_calls, indent=2))
            for tool_deltas in tool_calls.values():
                if not tool_deltas[0]["function"].get("name"):
                    raise Exception("The model did not provide a tool name.")
                yield self.format_chunk(
                    {"chunk_type": "content_start", "data_type": "tool", "data": ToolCall(**tool_deltas[0])}
                )
                for tool_delta in tool_deltas:
                    yield self.format_chunk(
                        {"chunk_type": "content_delta", "data_type": "tool", "data": ToolCall(**tool_delta)}
                    )
                yield self.format_chunk({"chunk_type": "content_stop", "data_type": "tool"})

            # If no content was generated at all, ensure we have empty text content
            if not has_text_content and not tool_calls:
                yield self.format_chunk({"chunk_type": "content_start", "data_type": "text"})
                yield self.format_chunk({"chunk_type": "content_stop", "data_type": "text"})

            # Message close
            yield self.format_chunk({"chunk_type": "message_stop", "data": finish_reason})

        else:
            # Not all SageMaker AI models support streaming!
            response = self.client.invoke_endpoint(**request)  # type: ignore[assignment]
            final_response_json = json.loads(response["Body"].read().decode("utf-8"))  # type: ignore[attr-defined]
            logger.info("response=<%s>", json.dumps(final_response_json, indent=2))

            # Obtain the key elements from the response
            message = final_response_json["choices"][0]["message"]
            message_stop_reason = final_response_json["choices"][0]["finish_reason"]

            # Message start
            yield self.format_chunk({"chunk_type": "message_start"})

            # Handle text
            if message.get("content", ""):
                yield self.format_chunk({"chunk_type": "content_start", "data_type": "text"})
                yield self.format_chunk(
                    {"chunk_type": "content_delta", "data_type": "text", "data": message["content"]}
                )
                yield self.format_chunk({"chunk_type": "content_stop", "data_type": "text"})

            # Handle reasoning content
            if message.get("reasoning_content"):
                yield self.format_chunk({"chunk_type": "content_start", "data_type": "reasoning_content"})
                yield self.format_chunk(
                    {
                        "chunk_type": "content_delta",
                        "data_type": "reasoning_content",
                        "data": message["reasoning_content"],
                    }
                )
                yield self.format_chunk({"chunk_type": "content_stop", "data_type": "reasoning_content"})

            # Handle the tool calling, if any. The stop reason alone can announce tool calls while
            # the "tool_calls" key is missing or None, so the value is read defensively.
            response_tool_calls = message.get("tool_calls") or []
            if response_tool_calls or message_stop_reason == "tool_calls":
                if not isinstance(response_tool_calls, list):
                    response_tool_calls = [response_tool_calls]
                for tool_call in response_tool_calls:
                    # if arguments of tool_call is not str, cast it
                    if not isinstance(tool_call["function"]["arguments"], str):
                        tool_call["function"]["arguments"] = json.dumps(tool_call["function"]["arguments"])
                    yield self.format_chunk(
                        {"chunk_type": "content_start", "data_type": "tool", "data": ToolCall(**tool_call)}
                    )
                    yield self.format_chunk(
                        {"chunk_type": "content_delta", "data_type": "tool", "data": ToolCall(**tool_call)}
                    )
                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": "tool"})
                message_stop_reason = "tool_calls"

            # Message close
            yield self.format_chunk({"chunk_type": "message_stop", "data": message_stop_reason})
            # Handle usage metadata
            if final_response_json.get("usage"):
                yield self.format_chunk(
                    {"chunk_type": "metadata", "data": UsageMetadata(**final_response_json.get("usage"))}
                )
    except (
        self.client.exceptions.InternalFailure,
        self.client.exceptions.ServiceUnavailable,
        self.client.exceptions.ValidationError,
        self.client.exceptions.ModelError,
        self.client.exceptions.InternalDependencyException,
        self.client.exceptions.ModelNotReadyException,
    ) as e:
        logger.error("SageMaker error: %s", str(e))
        # Bare raise re-raises the active exception with its original traceback.
        raise

    logger.debug("finished streaming response from model")

structured_output(output_model, prompt, system_prompt=None, **kwargs) async

Get structured output from the model.

Parameters:

Name Type Description Default
output_model Type[T]

The output model to use for the agent.

required
prompt Messages

The prompt messages to use for the agent.

required
system_prompt Optional[str]

System prompt to provide context to the model.

None
**kwargs Any

Additional keyword arguments for future extensibility.

{}

Yields:

Type Description
AsyncGenerator[dict[str, Union[T, Any]], None]

Model events with the last being the structured output.

Source code in strands/models/sagemaker.py
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
@override
async def structured_output(
    self, output_model: Type[T], prompt: Messages, system_prompt: Optional[str] = None, **kwargs: Any
) -> AsyncGenerator[dict[str, Union[T, Any]], None]:
    """Get structured output from the model.

    The regular chat request is built first, then a ``response_format`` entry carrying the JSON schema
    of ``output_model`` is added to its body. The endpoint is invoked without response streaming and
    the returned message content is parsed into an ``output_model`` instance.

    Args:
        output_model: The output model to use for the agent.
        prompt: The prompt messages to use for the agent.
        system_prompt: System prompt to provide context to the model.
        **kwargs: Additional keyword arguments for future extensibility.

    Yields:
        Model events with the last being the structured output.

    Raises:
        ValueError: If the response has no content, if the content cannot be parsed into
            ``output_model``, or if the SageMaker Runtime client reports one of the handled errors.
    """
    # Format the request for structured output
    request = self.format_request(prompt, system_prompt=system_prompt)

    # Parse the payload to add response format
    payload = json.loads(request["Body"])
    payload["response_format"] = {
        "type": "json_schema",
        "json_schema": {"name": output_model.__name__, "schema": output_model.model_json_schema(), "strict": True},
    }
    # The body inherits "stream" from the payload configuration, but the call below is the
    # non-streaming invoke and its response is parsed as one JSON document, so force it off.
    payload["stream"] = False
    request["Body"] = json.dumps(payload)

    try:
        # Use non-streaming mode for structured output
        response = self.client.invoke_endpoint(**request)
        final_response_json = json.loads(response["Body"].read().decode("utf-8"))

        # Extract the structured content
        message = final_response_json["choices"][0]["message"]

        if message.get("content"):
            try:
                # Parse the JSON content and create the output model instance
                content_data = json.loads(message["content"])
                parsed_output = output_model(**content_data)
                yield {"output": parsed_output}
            except (json.JSONDecodeError, TypeError, ValueError) as e:
                raise ValueError(f"Failed to parse structured output: {e}") from e
        else:
            raise ValueError("No content found in SageMaker response")

    except (
        self.client.exceptions.InternalFailure,
        self.client.exceptions.ServiceUnavailable,
        self.client.exceptions.ValidationError,
        self.client.exceptions.ModelError,
        self.client.exceptions.InternalDependencyException,
        self.client.exceptions.ModelNotReadyException,
    ) as e:
        logger.error("SageMaker structured output error: %s", str(e))
        raise ValueError(f"SageMaker structured output error: {str(e)}") from e

update_config(**endpoint_config)

Update the Amazon SageMaker model configuration with the provided arguments.

Parameters:

Name Type Description Default
**endpoint_config Unpack[SageMakerAIEndpointConfig]

Configuration overrides.

{}
Source code in strands/models/sagemaker.py
179
180
181
182
183
184
185
186
187
@override
def update_config(self, **endpoint_config: Unpack[SageMakerAIEndpointConfig]) -> None:  # type: ignore[override]
    """Update the Amazon SageMaker model configuration with the provided arguments.

    The supplied keys are passed to ``validate_config_keys`` and then merged into
    ``self.endpoint_config`` in place. Only the endpoint configuration is modified here;
    ``self.payload_config`` is not touched.

    Args:
        **endpoint_config: Configuration overrides.
    """
    validate_config_keys(endpoint_config, self.SageMakerAIEndpointConfig)
    self.endpoint_config.update(endpoint_config)

StreamEvent

Bases: TypedDict

The messages output stream.

Attributes:

Name Type Description
contentBlockDelta ContentBlockDeltaEvent

Delta content for a content block.

contentBlockStart ContentBlockStartEvent

Start of a content block.

contentBlockStop ContentBlockStopEvent

End of a content block.

internalServerException ExceptionEvent

Internal server error information.

messageStart MessageStartEvent

Start of a message.

messageStop MessageStopEvent

End of a message.

metadata MetadataEvent

Metadata about the streaming response.

modelStreamErrorException ModelStreamErrorEvent

Model streaming error information.

serviceUnavailableException ExceptionEvent

Service unavailable error information.

throttlingException ExceptionEvent

Throttling error information.

validationException ExceptionEvent

Validation error information.

Source code in strands/types/streaming.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
class StreamEvent(TypedDict, total=False):
    """The messages output stream.

    Declared with ``total=False``, so any of the keys below may be absent from a given event.

    Attributes:
        contentBlockDelta: Delta content for a content block.
        contentBlockStart: Start of a content block.
        contentBlockStop: End of a content block.
        internalServerException: Internal server error information.
        messageStart: Start of a message.
        messageStop: End of a message.
        metadata: Metadata about the streaming response.
        redactContent: Content redaction information (typed as ``RedactContentEvent``).
        modelStreamErrorException: Model streaming error information.
        serviceUnavailableException: Service unavailable error information.
        throttlingException: Throttling error information.
        validationException: Validation error information.
    """

    contentBlockDelta: ContentBlockDeltaEvent
    contentBlockStart: ContentBlockStartEvent
    contentBlockStop: ContentBlockStopEvent
    internalServerException: ExceptionEvent
    messageStart: MessageStartEvent
    messageStop: MessageStopEvent
    metadata: MetadataEvent
    redactContent: RedactContentEvent
    modelStreamErrorException: ModelStreamErrorEvent
    serviceUnavailableException: ExceptionEvent
    throttlingException: ExceptionEvent
    validationException: ExceptionEvent

ToolCall dataclass

Tool call for the model object.

Attributes:

Name Type Description
id str

Tool call ID

type Literal['function']

Tool call type

function FunctionCall

Tool call function

Source code in strands/models/sagemaker.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@dataclass
class ToolCall:
    """A single tool invocation requested by the model.

    Attributes:
        id: Tool call ID
        type: Tool call type
        function: Tool call function
    """

    id: str
    type: Literal["function"]
    function: FunctionCall

    def __init__(self, **kwargs: dict):
        """Build the tool call from loosely structured keyword data.

        Only the ``id`` and ``function`` keywords are read; ``type`` is always set to
        ``"function"`` regardless of what was passed.

        Args:
            **kwargs: Keyword arguments for the tool call.
        """
        empty_function = {"name": "", "arguments": ""}
        function_fields = kwargs.get("function", empty_function)
        raw_id = kwargs.get("id", "")

        self.id = str(raw_id)
        self.type = "function"
        self.function = FunctionCall(**function_fields)

__init__(**kwargs)

Initialize tool call object.

Parameters:

Name Type Description Default
**kwargs dict

Keyword arguments for the tool call.

{}
Source code in strands/models/sagemaker.py
79
80
81
82
83
84
85
86
87
def __init__(self, **kwargs: dict):
    """Build the tool call from loosely structured keyword data.

    Only the ``id`` and ``function`` keywords are read; ``type`` is always set to
    ``"function"`` regardless of what was passed.

    Args:
        **kwargs: Keyword arguments for the tool call.
    """
    empty_function = {"name": "", "arguments": ""}
    function_fields = kwargs.get("function", empty_function)
    raw_id = kwargs.get("id", "")

    self.id = str(raw_id)
    self.type = "function"
    self.function = FunctionCall(**function_fields)

ToolResult

Bases: TypedDict

Result of a tool execution.

Attributes:

Name Type Description
content list[ToolResultContent]

List of result content returned by the tool.

status ToolResultStatus

The status of the tool execution ("success" or "error").

toolUseId str

The unique identifier of the tool use request that produced this result.

Source code in strands/types/tools.py
87
88
89
90
91
92
93
94
95
96
97
98
class ToolResult(TypedDict):
    """Result of a tool execution.

    The TypedDict is declared without ``total=False``, so all three keys are required.

    Attributes:
        content: List of result content returned by the tool.
        status: The status of the tool execution ("success" or "error").
        toolUseId: The unique identifier of the tool use request that produced this result.
    """

    content: list[ToolResultContent]
    status: ToolResultStatus
    toolUseId: str

ToolSpec

Bases: TypedDict

Specification for a tool that can be used by an agent.

Attributes:

Name Type Description
description str

A human-readable description of what the tool does.

inputSchema JSONSchema

JSON Schema defining the expected input parameters.

name str

The unique name of the tool.

outputSchema NotRequired[JSONSchema]

Optional JSON Schema defining the expected output format. Note: Not all model providers support this field. Providers that don't support it should filter it out before sending to their API.

Source code in strands/types/tools.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ToolSpec(TypedDict):
    """Specification for a tool that can be used by an agent.

    ``description``, ``inputSchema`` and ``name`` are required keys; ``outputSchema``
    is the only key that may be omitted.

    Attributes:
        description: A human-readable description of what the tool does.
        inputSchema: JSON Schema defining the expected input parameters.
        name: The unique name of the tool.
        outputSchema: Optional JSON Schema defining the expected output format.
            Note: Not all model providers support this field. Providers that don't
            support it should filter it out before sending to their API.
    """

    description: str
    inputSchema: JSONSchema
    name: str
    # Marked NotRequired so existing specs without an output schema remain valid.
    outputSchema: NotRequired[JSONSchema]

UsageMetadata dataclass

Usage metadata for the model.

Attributes:

Name Type Description
total_tokens int

Total number of tokens used in the request

completion_tokens int

Number of tokens used in the completion

prompt_tokens int

Number of tokens used in the prompt

prompt_tokens_details Optional[int]

Additional information about the prompt tokens (optional)

Source code in strands/models/sagemaker.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@dataclass
class UsageMetadata:
    """Usage metadata for the model.

    The three token counts have no defaults and must be supplied when constructing
    an instance; ``prompt_tokens_details`` may be omitted.

    Attributes:
        total_tokens: Total number of tokens used in the request
        completion_tokens: Number of tokens used in the completion
        prompt_tokens: Number of tokens used in the prompt
        prompt_tokens_details: Additional information about the prompt tokens (optional)
    """

    total_tokens: int
    completion_tokens: int
    prompt_tokens: int
    # Typed as Optional, but the default when omitted is 0 rather than None.
    prompt_tokens_details: Optional[int] = 0

validate_config_keys(config_dict, config_class)

Validate that config keys match the TypedDict fields.

Parameters:

Name Type Description Default
config_dict Mapping[str, Any]

Dictionary of configuration parameters

required
config_class Type

TypedDict class to validate against

required
Source code in strands/models/_validation.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def validate_config_keys(config_dict: Mapping[str, Any], config_class: Type) -> None:
    """Warn about configuration keys that the TypedDict does not declare.

    The check is advisory only: unknown keys produce a single warning listing the
    offending and the accepted keys (both sorted); nothing is raised or removed.

    Args:
        config_dict: Dictionary of configuration parameters
        config_class: TypedDict class to validate against
    """
    accepted = set(get_type_hints(config_class))
    unknown = {key for key in config_dict.keys() if key not in accepted}

    # Nothing unexpected was passed, so stay silent.
    if not unknown:
        return

    message = (
        f"Invalid configuration parameters: {sorted(unknown)}."
        f"\nValid parameters are: {sorted(accepted)}."
        f"\n"
        f"\nSee https://github.com/strands-agents/sdk-python/issues/815"
    )
    warnings.warn(message, stacklevel=4)

warn_on_tool_choice_not_supported(tool_choice)

Emits a warning if a tool choice is provided but not supported by the provider.

Parameters:

Name Type Description Default
tool_choice ToolChoice | None

The tool_choice provided to the provider.

required
Source code in strands/models/_validation.py
32
33
34
35
36
37
38
39
40
41
42
def warn_on_tool_choice_not_supported(tool_choice: ToolChoice | None) -> None:
    """Warn that a supplied tool choice will be ignored by this provider.

    A falsy ``tool_choice`` (for example ``None``) produces no warning.

    Args:
        tool_choice: the tool_choice provided to the provider
    """
    # No tool choice requested, so there is nothing to warn about.
    if not tool_choice:
        return

    warnings.warn(
        "A ToolChoice was provided to this provider but is not supported and will be ignored",
        stacklevel=4,
    )