Skip to content

strands.agent.a2a_agent

A2A Agent client for Strands Agents.

This module provides the A2AAgent class, which acts as a client wrapper for remote A2A agents, allowing them to be used standalone or as part of multi-agent patterns.

A2AAgent can be used to get the Agent Card and interact with the agent.

A2AResponse = tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message | Any module-attribute

AgentInput = str | list[ContentBlock] | list[InterruptResponseContent] | Messages | None module-attribute

_DEFAULT_TIMEOUT = 300 module-attribute

logger = logging.getLogger(__name__) module-attribute

A2AAgent

Bases: AgentBase

Client wrapper for remote A2A agents.

Source code in strands/agent/a2a_agent.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class A2AAgent(AgentBase):
    """Client-side proxy for an agent served over the A2A protocol."""

    def __init__(
        self,
        endpoint: str,
        *,
        name: str | None = None,
        description: str | None = None,
        timeout: int = _DEFAULT_TIMEOUT,
        a2a_client_factory: ClientFactory | None = None,
    ):
        """Store connection settings; no network traffic happens here.

        Args:
            endpoint: Base URL of the remote A2A agent.
            name: Agent name. When omitted it is filled in from the agent card
                the first time the card is fetched.
            description: Agent description. When omitted it is filled in from
                the agent card the first time the card is fetched.
            timeout: Timeout in seconds for HTTP operations (defaults to 300).
            a2a_client_factory: Optional pre-configured A2A ClientFactory used
                to build the A2A client once the agent card is known. When a
                custom factory is supplied, the caller owns the lifecycle of
                any httpx client that factory uses.
        """
        # The card is resolved lazily by get_agent_card() and cached here.
        self._agent_card: AgentCard | None = None
        self._a2a_client_factory: ClientFactory | None = a2a_client_factory
        self.endpoint = endpoint
        self.timeout = timeout
        self.name = name
        self.description = description

    def __call__(
        self,
        prompt: AgentInput = None,
        **kwargs: Any,
    ) -> AgentResult:
        """Invoke the remote A2A agent and block until the result is ready.

        Args:
            prompt: Input to the agent (string, message list, or content blocks).
            **kwargs: Passed through to invoke_async(); not otherwise used by
                this agent.

        Returns:
            AgentResult containing the agent's response.

        Raises:
            ValueError: If prompt is None.
            RuntimeError: If no response received from agent.
        """

        def _start_invocation() -> Any:
            # run_async expects a zero-argument callable that produces the awaitable.
            return self.invoke_async(prompt, **kwargs)

        return run_async(_start_invocation)

    async def invoke_async(
        self,
        prompt: AgentInput = None,
        **kwargs: Any,
    ) -> AgentResult:
        """Invoke the remote A2A agent and await its final result.

        Drains stream_async() and keeps the payload of the last event that
        carries a "result" key.

        Args:
            prompt: Input to the agent (string, message list, or content blocks).
            **kwargs: Passed through to stream_async(); not otherwise used by
                this agent.

        Returns:
            AgentResult containing the agent's response.

        Raises:
            ValueError: If prompt is None.
            RuntimeError: If no response received from agent.
        """
        final: AgentResult | None = None
        async for item in self.stream_async(prompt, **kwargs):
            if "result" not in item:
                continue
            final = item["result"]

        if final is not None:
            return final

        raise RuntimeError("No response received from A2A agent")

    async def stream_async(
        self,
        prompt: AgentInput = None,
        **kwargs: Any,
    ) -> AsyncIterator[Any]:
        """Stream the remote agent's A2A protocol events.

        Unlike Agent.stream_async(), which yields text deltas and tool events,
        this method yields the raw A2A protocol events, each wrapped in an
        A2AStreamEvent dictionary.

        Args:
            prompt: Input to the agent (string, message list, or content blocks).
            **kwargs: Additional arguments (ignored).

        Yields:
            Event dictionaries:
                - A2AStreamEvent: {"type": "a2a_stream", "event": <A2A object>}
                  where the A2A object can be a Message, or a tuple of
                  (Task, TaskStatusUpdateEvent) or (Task, TaskArtifactUpdateEvent).
                - AgentResultEvent: {"result": AgentResult} - emitted last,
                  provided the remote agent produced at least one event.

        Raises:
            ValueError: If prompt is None.

        Example:
            ```python
            async for event in a2a_agent.stream_async("Hello"):
                if event.get("type") == "a2a_stream":
                    print(f"A2A event: {event['event']}")
                elif "result" in event:
                    print(f"Final result: {event['result'].message}")
            ```
        """
        latest = None
        latest_complete = None

        async for a2a_event in self._send_message(prompt):
            latest = a2a_event
            if self._is_complete_event(a2a_event):
                latest_complete = a2a_event
            yield A2AStreamEvent(a2a_event)

        # Prefer the most recent complete event; otherwise settle for whatever came last.
        chosen = latest_complete if latest_complete else latest
        if chosen is None:
            return

        yield AgentResultEvent(convert_response_to_agent_result(chosen))

    async def get_agent_card(self) -> AgentCard:
        """Return the remote agent's card, fetching it on first use.

        The first call resolves the card from the remote endpoint and caches it;
        later calls return the cached card. After a fetch, name and description
        are copied from the card when they were not set explicitly.

        Returns:
            The remote agent's AgentCard containing name, description, capabilities, skills, etc.
        """
        cached = self._agent_card
        if cached is not None:
            return cached

        # A short-lived HTTP client is enough: it is only needed for card discovery.
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            self._agent_card = await A2ACardResolver(httpx_client=http, base_url=self.endpoint).get_agent_card()

        card = self._agent_card

        # Explicitly provided values win; the card only fills the gaps.
        if self.name is None and card.name:
            self.name = card.name
        if self.description is None and card.description:
            self.description = card.description

        logger.debug("agent=<%s>, endpoint=<%s> | discovered agent card", self.name, self.endpoint)
        return card

    @asynccontextmanager
    async def _get_a2a_client(self) -> AsyncIterator[Any]:
        """Provide an A2A client for the duration of one message exchange.

        With a custom factory the client comes from that factory and the caller
        manages the underlying httpx lifecycle. Otherwise an httpx client is
        created for this call and closed when the context exits.

        Yields:
            Configured A2A client instance.
        """
        card = await self.get_agent_card()
        factory = self._a2a_client_factory

        if factory is None:
            async with httpx.AsyncClient(timeout=self.timeout) as http:
                yield ClientFactory(ClientConfig(httpx_client=http, streaming=True)).create(card)
        else:
            yield factory.create(card)

    async def _send_message(self, prompt: AgentInput) -> AsyncIterator[A2AResponse]:
        """Convert the prompt to an A2A message, send it, and relay the replies.

        Args:
            prompt: Input to send to the agent.

        Yields:
            A2A response events.

        Raises:
            ValueError: If prompt is None.
        """
        if prompt is None:
            raise ValueError("prompt is required for A2AAgent")

        outgoing = convert_input_to_message(prompt)
        logger.debug("agent=<%s>, endpoint=<%s> | sending message", self.name, self.endpoint)

        async with self._get_a2a_client() as remote:
            async for reply in remote.send_message(outgoing):
                yield reply

    def _is_complete_event(self, event: A2AResponse) -> bool:
        """Decide whether an A2A event represents a complete response.

        Args:
            event: A2A event.

        Returns:
            True if the event represents a complete response.
        """
        # A bare Message is a complete response by itself.
        if isinstance(event, Message):
            return True

        # Everything else must be a (Task, UpdateEvent | None) pair.
        if not (isinstance(event, tuple) and len(event) == 2):
            return False

        _task, update = event

        # No update attached: this is the initial task response.
        if update is None:
            return True

        # Artifact updates report completeness through their last_chunk flag.
        if isinstance(update, TaskArtifactUpdateEvent):
            chunk_flag = getattr(update, "last_chunk", None)
            return False if chunk_flag is None else chunk_flag

        # Status updates are complete once the task state is "completed".
        if isinstance(update, TaskStatusUpdateEvent):
            status = update.status
            if status and hasattr(status, "state"):
                return status.state == TaskState.completed

        return False

__call__(prompt=None, **kwargs)

Synchronously invoke the remote A2A agent.

Parameters:

Name Type Description Default
prompt AgentInput

Input to the agent (string, message list, or content blocks).

None
**kwargs Any

Additional arguments (ignored).

{}

Returns:

Type Description
AgentResult

AgentResult containing the agent's response.

Raises:

Type Description
ValueError

If prompt is None.

RuntimeError

If no response received from agent.

Source code in strands/agent/a2a_agent.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def __call__(
    self,
    prompt: AgentInput = None,
    **kwargs: Any,
) -> AgentResult:
    """Invoke the remote A2A agent and block until the result is ready.

    Args:
        prompt: Input to the agent (string, message list, or content blocks).
        **kwargs: Passed through to invoke_async().

    Returns:
        AgentResult containing the agent's response.

    Raises:
        ValueError: If prompt is None.
        RuntimeError: If no response received from agent.
    """

    def _start_invocation() -> Any:
        # run_async expects a zero-argument callable that produces the awaitable.
        return self.invoke_async(prompt, **kwargs)

    return run_async(_start_invocation)

__init__(endpoint, *, name=None, description=None, timeout=_DEFAULT_TIMEOUT, a2a_client_factory=None)

Initialize A2A agent.

Parameters:

Name Type Description Default
endpoint str

The base URL of the remote A2A agent.

required
name str | None

Agent name. If not provided, will be populated from agent card.

None
description str | None

Agent description. If not provided, will be populated from agent card.

None
timeout int

Timeout for HTTP operations in seconds (defaults to 300).

_DEFAULT_TIMEOUT
a2a_client_factory ClientFactory | None

Optional pre-configured A2A ClientFactory. If provided, it will be used to create the A2A client after discovering the agent card. Note: When providing a custom factory, you are responsible for managing the lifecycle of any httpx client it uses.

None
Source code in strands/agent/a2a_agent.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    endpoint: str,
    *,
    name: str | None = None,
    description: str | None = None,
    timeout: int = _DEFAULT_TIMEOUT,
    a2a_client_factory: ClientFactory | None = None,
):
    """Store connection settings; no network traffic happens here.

    Args:
        endpoint: Base URL of the remote A2A agent.
        name: Agent name. If not provided, will be populated from agent card.
        description: Agent description. If not provided, will be populated from agent card.
        timeout: Timeout in seconds for HTTP operations (defaults to 300).
        a2a_client_factory: Optional pre-configured A2A ClientFactory used to
            build the A2A client once the agent card is known. When a custom
            factory is supplied, the caller owns the lifecycle of any httpx
            client that factory uses.
    """
    # The agent card starts out unset; it is stored on this attribute once known.
    self._agent_card: AgentCard | None = None
    self._a2a_client_factory: ClientFactory | None = a2a_client_factory
    self.endpoint = endpoint
    self.timeout = timeout
    self.name = name
    self.description = description

get_agent_card() async

Fetch and return the remote agent's card.

This method eagerly fetches the agent card from the remote endpoint, populating name and description if not already set. The card is cached after the first fetch.

Returns:

Type Description
AgentCard

The remote agent's AgentCard containing name, description, capabilities, skills, etc.

Source code in strands/agent/a2a_agent.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def get_agent_card(self) -> AgentCard:
    """Return the remote agent's card, fetching it on first use.

    The first call resolves the card from the remote endpoint and caches it;
    later calls return the cached card. After a fetch, name and description
    are copied from the card when they were not set explicitly.

    Returns:
        The remote agent's AgentCard containing name, description, capabilities, skills, etc.
    """
    cached = self._agent_card
    if cached is not None:
        return cached

    # A short-lived HTTP client is enough: it is only needed for card discovery.
    async with httpx.AsyncClient(timeout=self.timeout) as http:
        self._agent_card = await A2ACardResolver(httpx_client=http, base_url=self.endpoint).get_agent_card()

    card = self._agent_card

    # Explicitly provided values win; the card only fills the gaps.
    if self.name is None and card.name:
        self.name = card.name
    if self.description is None and card.description:
        self.description = card.description

    logger.debug("agent=<%s>, endpoint=<%s> | discovered agent card", self.name, self.endpoint)
    return card

invoke_async(prompt=None, **kwargs) async

Asynchronously invoke the remote A2A agent.

Parameters:

Name Type Description Default
prompt AgentInput

Input to the agent (string, message list, or content blocks).

None
**kwargs Any

Additional arguments (ignored).

{}

Returns:

Type Description
AgentResult

AgentResult containing the agent's response.

Raises:

Type Description
ValueError

If prompt is None.

RuntimeError

If no response received from agent.

Source code in strands/agent/a2a_agent.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
async def invoke_async(
    self,
    prompt: AgentInput = None,
    **kwargs: Any,
) -> AgentResult:
    """Invoke the remote A2A agent and await its final result.

    Drains stream_async() and keeps the payload of the last event that
    carries a "result" key.

    Args:
        prompt: Input to the agent (string, message list, or content blocks).
        **kwargs: Passed through to stream_async().

    Returns:
        AgentResult containing the agent's response.

    Raises:
        ValueError: If prompt is None.
        RuntimeError: If no response received from agent.
    """
    final: AgentResult | None = None
    async for item in self.stream_async(prompt, **kwargs):
        if "result" not in item:
            continue
        final = item["result"]

    if final is not None:
        return final

    raise RuntimeError("No response received from A2A agent")

stream_async(prompt=None, **kwargs) async

Stream remote agent execution asynchronously.

This method provides an asynchronous interface for streaming A2A protocol events. Unlike Agent.stream_async() which yields text deltas and tool events, this method yields raw A2A protocol events wrapped in A2AStreamEvent dictionaries.

Parameters:

Name Type Description Default
prompt AgentInput

Input to the agent (string, message list, or content blocks).

None
**kwargs Any

Additional arguments (ignored).

{}

Yields:

Type Description
AsyncIterator[Any]

An async iterator that yields events. Each event is a dictionary: - A2AStreamEvent: {"type": "a2a_stream", "event": <A2A object>} where the A2A object can be a Message, or a tuple of (Task, TaskStatusUpdateEvent) or (Task, TaskArtifactUpdateEvent). - AgentResultEvent: {"result": AgentResult} - always emitted last.

Raises:

Type Description
ValueError

If prompt is None.

Example
async for event in a2a_agent.stream_async("Hello"):
    if event.get("type") == "a2a_stream":
        print(f"A2A event: {event['event']}")
    elif "result" in event:
        print(f"Final result: {event['result'].message}")
Source code in strands/agent/a2a_agent.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
async def stream_async(
    self,
    prompt: AgentInput = None,
    **kwargs: Any,
) -> AsyncIterator[Any]:
    """Stream the remote agent's A2A protocol events.

    Unlike Agent.stream_async(), which yields text deltas and tool events,
    this method yields the raw A2A protocol events, each wrapped in an
    A2AStreamEvent dictionary.

    Args:
        prompt: Input to the agent (string, message list, or content blocks).
        **kwargs: Additional arguments (ignored).

    Yields:
        Event dictionaries:
            - A2AStreamEvent: {"type": "a2a_stream", "event": <A2A object>}
              where the A2A object can be a Message, or a tuple of
              (Task, TaskStatusUpdateEvent) or (Task, TaskArtifactUpdateEvent).
            - AgentResultEvent: {"result": AgentResult} - emitted last,
              provided the remote agent produced at least one event.

    Raises:
        ValueError: If prompt is None.

    Example:
        ```python
        async for event in a2a_agent.stream_async("Hello"):
            if event.get("type") == "a2a_stream":
                print(f"A2A event: {event['event']}")
            elif "result" in event:
                print(f"Final result: {event['result'].message}")
        ```
    """
    latest = None
    latest_complete = None

    async for a2a_event in self._send_message(prompt):
        latest = a2a_event
        if self._is_complete_event(a2a_event):
            latest_complete = a2a_event
        yield A2AStreamEvent(a2a_event)

    # Prefer the most recent complete event; otherwise settle for whatever came last.
    chosen = latest_complete if latest_complete else latest
    if chosen is None:
        return

    yield AgentResultEvent(convert_response_to_agent_result(chosen))

A2AStreamEvent

Bases: TypedEvent

Event emitted for every update received from the remote A2A server.

This event wraps all A2A response types during streaming, including: - Partial task updates (TaskArtifactUpdateEvent) - Status updates (TaskStatusUpdateEvent) - Complete messages (Message) - Final task completions

The event is emitted for EVERY update from the server, regardless of whether it represents a complete or partial response. When streaming completes, an AgentResultEvent containing the final AgentResult is also emitted after all A2AStreamEvents.

Source code in strands/types/a2a.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class A2AStreamEvent(TypedEvent):
    """Wrapper event for each update received from the remote A2A server.

    Any A2A response type seen while streaming is wrapped this way:
    - Partial task updates (TaskArtifactUpdateEvent)
    - Status updates (TaskStatusUpdateEvent)
    - Complete messages (Message)
    - Final task completions

    One event is produced per server update, whether that update is a partial
    or a complete response. Once streaming finishes, an AgentResultEvent with
    the final AgentResult follows the A2AStreamEvents.
    """

    def __init__(self, a2a_event: A2AResponse) -> None:
        """Wrap an A2A event in the stream-event payload.

        Args:
            a2a_event: The original A2A event (Task tuple or Message)
        """
        # The A2A event is nested under its own key to avoid field conflicts.
        payload = {"type": "a2a_stream", "event": a2a_event}
        super().__init__(payload)

__init__(a2a_event)

Initialize with A2A event.

Parameters:

Name Type Description Default
a2a_event A2AResponse

The original A2A event (Task tuple or Message)

required
Source code in strands/types/a2a.py
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(self, a2a_event: A2AResponse) -> None:
    """Wrap an A2A event in the stream-event payload.

    Args:
        a2a_event: The original A2A event (Task tuple or Message)
    """
    # The A2A event is nested under its own key to avoid field conflicts.
    payload = {"type": "a2a_stream", "event": a2a_event}
    super().__init__(payload)

AgentBase

Bases: Protocol

Protocol defining the interface for all agent types in Strands.

This protocol defines the minimal contract that all agent implementations must satisfy.

Source code in strands/agent/base.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@runtime_checkable
class AgentBase(Protocol):
    """Structural interface shared by every agent type in Strands.

    An agent implementation satisfies this protocol by providing the three
    entry points below: async invocation, sync invocation, and streaming.
    """

    async def invoke_async(self, prompt: AgentInput = None, **kwargs: Any) -> AgentResult:
        """Invoke the agent with the given prompt and await the result.

        Args:
            prompt: Input to the agent.
            **kwargs: Additional arguments.

        Returns:
            AgentResult containing the agent's response.
        """
        ...

    def __call__(self, prompt: AgentInput = None, **kwargs: Any) -> AgentResult:
        """Invoke the agent with the given prompt, blocking until it finishes.

        Args:
            prompt: Input to the agent.
            **kwargs: Additional arguments.

        Returns:
            AgentResult containing the agent's response.
        """
        ...

    def stream_async(self, prompt: AgentInput = None, **kwargs: Any) -> AsyncIterator[Any]:
        """Stream the agent's execution as an asynchronous sequence of events.

        Args:
            prompt: Input to the agent.
            **kwargs: Additional arguments.

        Yields:
            Events representing the streaming execution.
        """
        ...

__call__(prompt=None, **kwargs)

Synchronously invoke the agent with the given prompt.

Parameters:

Name Type Description Default
prompt AgentInput

Input to the agent.

None
**kwargs Any

Additional arguments.

{}

Returns:

Type Description
AgentResult

AgentResult containing the agent's response.

Source code in strands/agent/base.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __call__(self, prompt: AgentInput = None, **kwargs: Any) -> AgentResult:
    """Invoke the agent with the given prompt, blocking until it finishes.

    This is an interface stub; the body is intentionally empty.

    Args:
        prompt: Input to the agent.
        **kwargs: Additional arguments.

    Returns:
        AgentResult containing the agent's response.
    """
    ...

invoke_async(prompt=None, **kwargs) async

Asynchronously invoke the agent with the given prompt.

Parameters:

Name Type Description Default
prompt AgentInput

Input to the agent.

None
**kwargs Any

Additional arguments.

{}

Returns:

Type Description
AgentResult

AgentResult containing the agent's response.

Source code in strands/agent/base.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
async def invoke_async(self, prompt: AgentInput = None, **kwargs: Any) -> AgentResult:
    """Invoke the agent with the given prompt and await the result.

    This is an interface stub; the body is intentionally empty.

    Args:
        prompt: Input to the agent.
        **kwargs: Additional arguments.

    Returns:
        AgentResult containing the agent's response.
    """
    ...

stream_async(prompt=None, **kwargs)

Stream agent execution asynchronously.

Parameters:

Name Type Description Default
prompt AgentInput

Input to the agent.

None
**kwargs Any

Additional arguments.

{}

Yields:

Type Description
AsyncIterator[Any]

Events representing the streaming execution.

Source code in strands/agent/base.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def stream_async(self, prompt: AgentInput = None, **kwargs: Any) -> AsyncIterator[Any]:
    """Stream the agent's execution as an asynchronous sequence of events.

    This is an interface stub; the body is intentionally empty.

    Args:
        prompt: Input to the agent.
        **kwargs: Additional arguments.

    Yields:
        Events representing the streaming execution.
    """
    ...

AgentResult dataclass

Represents the last result of invoking an agent with a prompt.

Attributes:

Name Type Description
stop_reason StopReason

The reason why the agent's processing stopped.

message Message

The last message generated by the agent.

metrics EventLoopMetrics

Performance metrics collected during processing.

state Any

Additional state information from the event loop.

interrupts Sequence[Interrupt] | None

List of interrupts if raised by user.

structured_output BaseModel | None

Parsed structured output when structured_output_model was specified.

Source code in strands/agent/agent_result.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@dataclass
class AgentResult:
    """Outcome of the most recent agent invocation for a prompt.

    Attributes:
        stop_reason: The reason why the agent's processing stopped.
        message: The last message generated by the agent.
        metrics: Performance metrics collected during processing.
        state: Additional state information from the event loop.
        interrupts: List of interrupts if raised by user.
        structured_output: Parsed structured output when structured_output_model was specified.
    """

    stop_reason: StopReason
    message: Message
    metrics: EventLoopMetrics
    state: Any
    interrupts: Sequence[Interrupt] | None = None
    structured_output: BaseModel | None = None

    def __str__(self) -> str:
        """Render the result as text.

        The first applicable source wins:
        1. Interrupts (if present) → stringified list of interrupt dicts
        2. Structured output (if present) → JSON string
        3. Text content from message → concatenated text blocks

        Returns:
            String representation based on the priority order above.
        """
        if self.interrupts:
            return str([interrupt.to_dict() for interrupt in self.interrupts])

        if self.structured_output:
            return self.structured_output.model_dump_json()

        # Gather newline-terminated fragments, then join them in one pass.
        fragments = []
        for block in self.message.get("content", []):
            if not isinstance(block, dict):
                continue
            if "text" in block:
                fragments.append(block.get("text", "") + "\n")
            elif "citationsContent" in block:
                citations = block["citationsContent"]
                if "content" in citations:
                    fragments.extend(
                        part.get("text", "") + "\n"
                        for part in citations["content"]
                        if isinstance(part, dict) and "text" in part
                    )

        return "".join(fragments)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AgentResult":
        """Rebuild an AgentResult from its persisted dictionary form.

        Only the message and stop reason are restored; metrics are set to a
        fresh EventLoopMetrics and state to an empty dict.

        Args:
            data: Dictionary containing the serialized AgentResult data

        Returns:
            AgentResult instance

        Raises:
            TypeError: If the "type" entry of data is not "agent_result".
        """
        kind = data.get("type")
        if kind != "agent_result":
            raise TypeError(f"AgentResult.from_dict: unexpected type {kind!r}")

        return cls(
            message=cast(Message, data.get("message")),
            stop_reason=cast(StopReason, data.get("stop_reason")),
            metrics=EventLoopMetrics(),
            state={},
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize this AgentResult to a dictionary.

        Only the message and stop reason are included, together with a
        "type" marker that from_dict() checks.

        Returns:
            Dictionary containing serialized AgentResult data
        """
        payload: dict[str, Any] = {"type": "agent_result"}
        payload["message"] = self.message
        payload["stop_reason"] = self.stop_reason
        return payload

__str__()

Return a string representation of the agent result.

Priority order: 1. Interrupts (if present) → stringified list of interrupt dicts 2. Structured output (if present) → JSON string 3. Text content from message → concatenated text blocks

Returns:

Type Description
str

String representation based on the priority order above.

Source code in strands/agent/agent_result.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def __str__(self) -> str:
    """Render the result as text.

    The first applicable source wins:
    1. Interrupts (if present) → stringified list of interrupt dicts
    2. Structured output (if present) → JSON string
    3. Text content from message → concatenated text blocks

    Returns:
        String representation based on the priority order above.
    """
    if self.interrupts:
        return str([interrupt.to_dict() for interrupt in self.interrupts])

    if self.structured_output:
        return self.structured_output.model_dump_json()

    # Gather newline-terminated fragments, then join them in one pass.
    fragments = []
    for block in self.message.get("content", []):
        if not isinstance(block, dict):
            continue
        if "text" in block:
            fragments.append(block.get("text", "") + "\n")
        elif "citationsContent" in block:
            citations = block["citationsContent"]
            if "content" in citations:
                fragments.extend(
                    part.get("text", "") + "\n"
                    for part in citations["content"]
                    if isinstance(part, dict) and "text" in part
                )

    return "".join(fragments)

from_dict(data) classmethod

Rehydrate an AgentResult from persisted JSON.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary containing the serialized AgentResult data

required

Returns: AgentResult instance. Raises: TypeError: If the data format is invalid.

Source code in strands/agent/agent_result.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AgentResult":
    """Rebuild an AgentResult from its persisted dictionary form.

    Only the message and stop reason are restored; metrics are set to a
    fresh EventLoopMetrics and state to an empty dict.

    Args:
        data: Dictionary containing the serialized AgentResult data

    Returns:
        AgentResult instance

    Raises:
        TypeError: If the "type" entry of data is not "agent_result".
    """
    kind = data.get("type")
    if kind != "agent_result":
        raise TypeError(f"AgentResult.from_dict: unexpected type {kind!r}")

    return cls(
        message=cast(Message, data.get("message")),
        stop_reason=cast(StopReason, data.get("stop_reason")),
        metrics=EventLoopMetrics(),
        state={},
    )

to_dict()

Convert this AgentResult to JSON-serializable dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary containing serialized AgentResult data

Source code in strands/agent/agent_result.py
89
90
91
92
93
94
95
96
97
98
99
def to_dict(self) -> dict[str, Any]:
    """Serialize this AgentResult into a plain dictionary.

    Only the message and stop reason are included, tagged with a ``"type"``
    entry of ``"agent_result"``.

    Returns:
        Dictionary containing the "type", "message" and "stop_reason" entries.
    """
    serialized: dict[str, Any] = {"type": "agent_result"}
    serialized["message"] = self.message
    serialized["stop_reason"] = self.stop_reason
    return serialized

AgentResultEvent

Bases: TypedEvent

Source code in strands/types/_events.py
412
413
414
class AgentResultEvent(TypedEvent):
    """Typed event that carries an agent result under the "result" key."""

    def __init__(self, result: "AgentResult"):
        """Initialize the event.

        Args:
            result: The AgentResult to attach to the event.
        """
        payload = {"result": result}
        super().__init__(payload)

convert_input_to_message(prompt)

Convert AgentInput to A2A Message.

Parameters:

Name Type Description Default
prompt AgentInput

Input in various formats (string, message list, or content blocks).

required

Returns:

Type Description
Message

A2AMessage ready to send to the remote agent.

Raises:

Type Description
ValueError

If prompt format is unsupported.

Source code in strands/multiagent/a2a/_converters.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def convert_input_to_message(prompt: AgentInput) -> A2AMessage:
    """Build the A2A user message for an agent prompt.

    A string becomes a single text part. A list whose first entry has a
    "role" key is treated as a message list, and only its most recent "user"
    message is converted. Any other non-empty list of dicts is converted as
    content blocks.

    Args:
        prompt: Input in various formats (string, message list, or content blocks).

    Returns:
        A2AMessage ready to send to the remote agent.

    Raises:
        ValueError: If the prompt is an interrupt response, a message list
            without a user message, or any other unsupported format.
    """
    message_id = uuid4().hex
    parts: list[Part]

    if isinstance(prompt, str):
        parts = [Part(TextPart(kind="text", text=prompt))]
    elif isinstance(prompt, list) and prompt and isinstance(prompt[0], dict):
        first_entry = prompt[0]

        # Check for interrupt responses - not supported in A2A
        if "interruptResponse" in first_entry:
            raise ValueError("InterruptResponseContent is not supported for A2AAgent")

        if "role" not in first_entry:
            # A bare list of content blocks is sent as-is.
            blocks = cast(list[ContentBlock], prompt)
        else:
            # Message list: only the latest user message is forwarded.
            latest_user = next((msg for msg in reversed(prompt) if msg.get("role") == "user"), None)
            if latest_user is None:
                raise ValueError(f"Unsupported input type: {type(prompt)}")
            blocks = cast(list[ContentBlock], latest_user.get("content", []))

        parts = convert_content_blocks_to_parts(blocks)
    else:
        raise ValueError(f"Unsupported input type: {type(prompt)}")

    return A2AMessage(
        kind="message",
        role=Role.user,
        parts=parts,
        message_id=message_id,
    )

convert_response_to_agent_result(response)

Convert A2A response to AgentResult.

Parameters:

Name Type Description Default
response A2AResponse

A2A response (either A2AMessage or tuple of task and update event).

required

Returns:

Type Description
AgentResult

AgentResult with extracted content and metadata.

Source code in strands/multiagent/a2a/_converters.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def convert_response_to_agent_result(response: A2AResponse) -> AgentResult:
    """Turn an A2A response into an AgentResult holding its text parts.

    For a ``(task, update_event)`` tuple the parts come from the artifact of
    an artifact update, from the status message of a status update, or, when
    there is no update event, from every artifact on the task. For an
    A2AMessage the parts come from the message itself. Only parts exposing
    ``root.text`` contribute content; any other response yields empty content.

    Args:
        response: A2A response (either A2AMessage or tuple of task and update event).

    Returns:
        AgentResult with an assistant message built from the extracted text,
        an "end_turn" stop reason, fresh metrics and empty state.
    """
    # Sequences of A2A parts to scan for text, in the order they were found.
    part_groups = []

    if isinstance(response, tuple) and len(response) == 2:
        task, update_event = response

        if isinstance(update_event, TaskArtifactUpdateEvent):
            # Handle artifact updates
            artifact = update_event.artifact
            if artifact and hasattr(artifact, "parts"):
                part_groups.append(artifact.parts)
        elif isinstance(update_event, TaskStatusUpdateEvent):
            # Handle status updates with messages
            status = update_event.status
            if status and hasattr(status, "message") and status.message:
                part_groups.append(status.message.parts)
        elif update_event is None and task and hasattr(task, "artifacts") and task.artifacts is not None:
            # Handle initial task or task without update event
            for artifact in task.artifacts:
                if hasattr(artifact, "parts"):
                    part_groups.append(artifact.parts)
    elif isinstance(response, A2AMessage):
        part_groups.append(response.parts)

    content: list[ContentBlock] = []
    for parts in part_groups:
        for part in parts:
            # Skip parts that do not expose text through their root.
            if not hasattr(part, "root") or not hasattr(part.root, "text"):
                continue
            content.append({"text": part.root.text})

    message: Message = {"role": "assistant", "content": content}

    return AgentResult(
        stop_reason="end_turn",
        message=message,
        metrics=EventLoopMetrics(),
        state={},
    )

run_async(async_func)

Run an async function in a separate thread to avoid event loop conflicts.

This utility handles the common pattern of running async code from sync contexts by using ThreadPoolExecutor to isolate the async execution.

Parameters:

Name Type Description Default
async_func Callable[[], Awaitable[T]]

A callable that returns an awaitable

required

Returns:

Type Description
T

The result of the async function

Source code in strands/_async.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def run_async(async_func: Callable[[], Awaitable[T]]) -> T:
    """Run an async callable to completion from synchronous code.

    The callable is invoked and awaited inside ``asyncio.run`` on a worker
    thread from a ThreadPoolExecutor, so it does not conflict with an event
    loop in the calling thread. A copy of the caller's contextvars context is
    used for the call, and the caller blocks until the result is available.

    Args:
        async_func: A callable that returns an awaitable

    Returns:
        The result of the async function
    """

    async def _await_result() -> T:
        # asyncio.run needs a coroutine, so wrap whatever awaitable async_func returns.
        return await async_func()

    def _run_in_new_loop() -> T:
        return asyncio.run(_await_result())

    # Snapshot the caller's context so the worker thread sees the same context variables.
    caller_context = contextvars.copy_context()
    with ThreadPoolExecutor() as pool:
        return pool.submit(caller_context.run, _run_in_new_loop).result()