Skip to content

strands.multiagent.base

Multi-Agent Base Class.

Provides minimal foundation for multi-agent patterns (Swarm, Graph).

AttributeValue = Union[str, bool, float, int, List[str], List[bool], List[float], List[int], Sequence[str], Sequence[bool], Sequence[int], Sequence[float]] module-attribute

MultiAgentInput = str | list[ContentBlock] | list[InterruptResponseContent] module-attribute

logger = logging.getLogger(__name__) module-attribute

AgentResult dataclass

Represents the last result of invoking an agent with a prompt.

Attributes:

Name Type Description
stop_reason StopReason

The reason why the agent's processing stopped.

message Message

The last message generated by the agent.

metrics EventLoopMetrics

Performance metrics collected during processing.

state Any

Additional state information from the event loop.

interrupts Sequence[Interrupt] | None

List of interrupts if raised by user.

structured_output BaseModel | None

Parsed structured output when structured_output_model was specified.

Source code in strands/agent/agent_result.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@dataclass
class AgentResult:
    """Represents the last result of invoking an agent with a prompt.

    Attributes:
        stop_reason: The reason why the agent's processing stopped.
        message: The last message generated by the agent.
        metrics: Performance metrics collected during processing.
        state: Additional state information from the event loop.
        interrupts: List of interrupts if raised by user.
        structured_output: Parsed structured output when structured_output_model was specified.
    """

    stop_reason: StopReason
    message: Message
    metrics: EventLoopMetrics
    state: Any
    interrupts: Sequence[Interrupt] | None = None
    structured_output: BaseModel | None = None

    def __str__(self) -> str:
        """Get the agent's last message as a string.

        This method extracts and concatenates all text content from the final message, ignoring any non-text content
        like images or structured data. If there's no text content but structured output is present, it serializes
        the structured output instead.

        Returns:
            The agent's last message as a string.
        """
        content_array = self.message.get("content", [])

        result = ""
        for item in content_array:
            if isinstance(item, dict) and "text" in item:
                result += item.get("text", "") + "\n"

        if not result and self.structured_output:
            result = self.structured_output.model_dump_json()

        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AgentResult":
        """Rehydrate an AgentResult from persisted JSON.

        Args:
            data: Dictionary containing the serialized AgentResult data
        Returns:
            AgentResult instance
        Raises:
            TypeError: If the data format is invalid@
        """
        if data.get("type") != "agent_result":
            raise TypeError(f"AgentResult.from_dict: unexpected type {data.get('type')!r}")

        message = cast(Message, data.get("message"))
        stop_reason = cast(StopReason, data.get("stop_reason"))

        return cls(message=message, stop_reason=stop_reason, metrics=EventLoopMetrics(), state={})

    def to_dict(self) -> dict[str, Any]:
        """Convert this AgentResult to JSON-serializable dictionary.

        Returns:
            Dictionary containing serialized AgentResult data
        """
        return {
            "type": "agent_result",
            "message": self.message,
            "stop_reason": self.stop_reason,
        }

__str__()

Get the agent's last message as a string.

This method extracts and concatenates all text content from the final message, ignoring any non-text content like images or structured data. If there's no text content but structured output is present, it serializes the structured output instead.

Returns:

Type Description
str

The agent's last message as a string.

Source code in strands/agent/agent_result.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def __str__(self) -> str:
    """Get the agent's last message as a string.

    This method extracts and concatenates all text content from the final message, ignoring any non-text content
    like images or structured data. If there's no text content but structured output is present, it serializes
    the structured output instead.

    Returns:
        The agent's last message as a string.
    """
    content_array = self.message.get("content", [])

    result = ""
    for item in content_array:
        if isinstance(item, dict) and "text" in item:
            result += item.get("text", "") + "\n"

    if not result and self.structured_output:
        result = self.structured_output.model_dump_json()

    return result

from_dict(data) classmethod

Rehydrate an AgentResult from persisted JSON.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary containing the serialized AgentResult data

required

Returns: AgentResult instance Raises: TypeError: If the data format is invalid@

Source code in strands/agent/agent_result.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AgentResult":
    """Rehydrate an AgentResult from persisted JSON.

    Args:
        data: Dictionary containing the serialized AgentResult data
    Returns:
        AgentResult instance
    Raises:
        TypeError: If the data format is invalid@
    """
    if data.get("type") != "agent_result":
        raise TypeError(f"AgentResult.from_dict: unexpected type {data.get('type')!r}")

    message = cast(Message, data.get("message"))
    stop_reason = cast(StopReason, data.get("stop_reason"))

    return cls(message=message, stop_reason=stop_reason, metrics=EventLoopMetrics(), state={})

to_dict()

Convert this AgentResult to JSON-serializable dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary containing serialized AgentResult data

Source code in strands/agent/agent_result.py
78
79
80
81
82
83
84
85
86
87
88
def to_dict(self) -> dict[str, Any]:
    """Convert this AgentResult to JSON-serializable dictionary.

    Returns:
        Dictionary containing serialized AgentResult data
    """
    return {
        "type": "agent_result",
        "message": self.message,
        "stop_reason": self.stop_reason,
    }

Interrupt dataclass

Represents an interrupt that can pause agent execution for human-in-the-loop workflows.

Attributes:

Name Type Description
id str

Unique identifier.

name str

User defined name.

reason Any

User provided reason for raising the interrupt.

response Any

Human response provided when resuming the agent after an interrupt.

Source code in strands/interrupt.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@dataclass
class Interrupt:
    """Represents an interrupt that can pause agent execution for human-in-the-loop workflows.

    Attributes:
        id: Unique identifier.
        name: User defined name.
        reason: User provided reason for raising the interrupt.
        response: Human response provided when resuming the agent after an interrupt.
    """

    id: str
    name: str
    reason: Any = None
    response: Any = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to dict for session management."""
        return asdict(self)

to_dict()

Serialize to dict for session management.

Source code in strands/interrupt.py
27
28
29
def to_dict(self) -> dict[str, Any]:
    """Serialize to dict for session management."""
    return asdict(self)

Metrics

Bases: TypedDict

Performance metrics for model interactions.

Attributes:

Name Type Description
latencyMs int

Latency of the model request in milliseconds.

timeToFirstByteMs int

Latency from sending model request to first content chunk (contentBlockDelta or contentBlockStart) from the model in milliseconds.

Source code in strands/types/event_loop.py
26
27
28
29
30
31
32
33
34
35
36
class Metrics(TypedDict, total=False):
    """Performance metrics for model interactions.

    Attributes:
        latencyMs (int): Latency of the model request in milliseconds.
        timeToFirstByteMs (int): Latency from sending model request to first
            content chunk (contentBlockDelta or contentBlockStart) from the model in milliseconds.
    """

    latencyMs: Required[int]
    timeToFirstByteMs: int

MultiAgentBase

Bases: ABC

Base class for multi-agent helpers.

This class integrates with existing Strands Agent instances and provides multi-agent orchestration capabilities.

Attributes:

Name Type Description
id str

Unique MultiAgent id for session management,etc.

Source code in strands/multiagent/base.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
class MultiAgentBase(ABC):
    """Base class for multi-agent helpers.

    This class integrates with existing Strands Agent instances and provides
    multi-agent orchestration capabilities.

    Attributes:
        id: Unique MultiAgent id for session management,etc.
    """

    id: str

    @abstractmethod
    async def invoke_async(
        self, task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any
    ) -> MultiAgentResult:
        """Invoke asynchronously.

        Args:
            task: The task to execute
            invocation_state: Additional state/context passed to underlying agents.
                Defaults to None to avoid mutable default argument issues.
            **kwargs: Additional keyword arguments passed to underlying agents.
        """
        raise NotImplementedError("invoke_async not implemented")

    async def stream_async(
        self, task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any
    ) -> AsyncIterator[dict[str, Any]]:
        """Stream events during multi-agent execution.

        Default implementation executes invoke_async and yields the result as a single event.
        Subclasses can override this method to provide true streaming capabilities.

        Args:
            task: The task to execute
            invocation_state: Additional state/context passed to underlying agents.
                Defaults to None to avoid mutable default argument issues.
            **kwargs: Additional keyword arguments passed to underlying agents.

        Yields:
            Dictionary events containing multi-agent execution information including:
            - Multi-agent coordination events (node start/complete, handoffs)
            - Forwarded single-agent events with node context
            - Final result event
        """
        # Default implementation for backward compatibility
        # Execute invoke_async and yield the result as a single event
        result = await self.invoke_async(task, invocation_state, **kwargs)
        yield {"result": result}

    def __call__(
        self, task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any
    ) -> MultiAgentResult:
        """Invoke synchronously.

        Args:
            task: The task to execute
            invocation_state: Additional state/context passed to underlying agents.
                Defaults to None to avoid mutable default argument issues.
            **kwargs: Additional keyword arguments passed to underlying agents.
        """
        if invocation_state is None:
            invocation_state = {}

        if kwargs:
            invocation_state.update(kwargs)
            warnings.warn("`**kwargs` parameter is deprecating, use `invocation_state` instead.", stacklevel=2)

        return run_async(lambda: self.invoke_async(task, invocation_state))

    def serialize_state(self) -> dict[str, Any]:
        """Return a JSON-serializable snapshot of the orchestrator state."""
        raise NotImplementedError

    def deserialize_state(self, payload: dict[str, Any]) -> None:
        """Restore orchestrator state from a session dict."""
        raise NotImplementedError

    def _parse_trace_attributes(
        self, attributes: Mapping[str, AttributeValue] | None = None
    ) -> dict[str, AttributeValue]:
        trace_attributes: dict[str, AttributeValue] = {}
        if attributes:
            for k, v in attributes.items():
                if isinstance(v, (str, int, float, bool)) or (
                    isinstance(v, list) and all(isinstance(x, (str, int, float, bool)) for x in v)
                ):
                    trace_attributes[k] = v
        return trace_attributes

__call__(task, invocation_state=None, **kwargs)

Invoke synchronously.

Parameters:

Name Type Description Default
task MultiAgentInput

The task to execute

required
invocation_state dict[str, Any] | None

Additional state/context passed to underlying agents. Defaults to None to avoid mutable default argument issues.

None
**kwargs Any

Additional keyword arguments passed to underlying agents.

{}
Source code in strands/multiagent/base.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def __call__(
    self, task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any
) -> MultiAgentResult:
    """Invoke synchronously.

    Args:
        task: The task to execute
        invocation_state: Additional state/context passed to underlying agents.
            Defaults to None to avoid mutable default argument issues.
        **kwargs: Additional keyword arguments passed to underlying agents.
    """
    if invocation_state is None:
        invocation_state = {}

    if kwargs:
        invocation_state.update(kwargs)
        warnings.warn("`**kwargs` parameter is deprecating, use `invocation_state` instead.", stacklevel=2)

    return run_async(lambda: self.invoke_async(task, invocation_state))

deserialize_state(payload)

Restore orchestrator state from a session dict.

Source code in strands/multiagent/base.py
252
253
254
def deserialize_state(self, payload: dict[str, Any]) -> None:
    """Restore orchestrator state from a session dict."""
    raise NotImplementedError

invoke_async(task, invocation_state=None, **kwargs) abstractmethod async

Invoke asynchronously.

Parameters:

Name Type Description Default
task MultiAgentInput

The task to execute

required
invocation_state dict[str, Any] | None

Additional state/context passed to underlying agents. Defaults to None to avoid mutable default argument issues.

None
**kwargs Any

Additional keyword arguments passed to underlying agents.

{}
Source code in strands/multiagent/base.py
189
190
191
192
193
194
195
196
197
198
199
200
201
@abstractmethod
async def invoke_async(
    self, task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any
) -> MultiAgentResult:
    """Invoke asynchronously.

    Args:
        task: The task to execute
        invocation_state: Additional state/context passed to underlying agents.
            Defaults to None to avoid mutable default argument issues.
        **kwargs: Additional keyword arguments passed to underlying agents.
    """
    raise NotImplementedError("invoke_async not implemented")

serialize_state()

Return a JSON-serializable snapshot of the orchestrator state.

Source code in strands/multiagent/base.py
248
249
250
def serialize_state(self) -> dict[str, Any]:
    """Return a JSON-serializable snapshot of the orchestrator state."""
    raise NotImplementedError

stream_async(task, invocation_state=None, **kwargs) async

Stream events during multi-agent execution.

Default implementation executes invoke_async and yields the result as a single event. Subclasses can override this method to provide true streaming capabilities.

Parameters:

Name Type Description Default
task MultiAgentInput

The task to execute

required
invocation_state dict[str, Any] | None

Additional state/context passed to underlying agents. Defaults to None to avoid mutable default argument issues.

None
**kwargs Any

Additional keyword arguments passed to underlying agents.

{}

Yields:

Type Description
AsyncIterator[dict[str, Any]]

Dictionary events containing multi-agent execution information including:

AsyncIterator[dict[str, Any]]
  • Multi-agent coordination events (node start/complete, handoffs)
AsyncIterator[dict[str, Any]]
  • Forwarded single-agent events with node context
AsyncIterator[dict[str, Any]]
  • Final result event
Source code in strands/multiagent/base.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def stream_async(
    self, task: MultiAgentInput, invocation_state: dict[str, Any] | None = None, **kwargs: Any
) -> AsyncIterator[dict[str, Any]]:
    """Stream events during multi-agent execution.

    Default implementation executes invoke_async and yields the result as a single event.
    Subclasses can override this method to provide true streaming capabilities.

    Args:
        task: The task to execute
        invocation_state: Additional state/context passed to underlying agents.
            Defaults to None to avoid mutable default argument issues.
        **kwargs: Additional keyword arguments passed to underlying agents.

    Yields:
        Dictionary events containing multi-agent execution information including:
        - Multi-agent coordination events (node start/complete, handoffs)
        - Forwarded single-agent events with node context
        - Final result event
    """
    # Default implementation for backward compatibility
    # Execute invoke_async and yield the result as a single event
    result = await self.invoke_async(task, invocation_state, **kwargs)
    yield {"result": result}

MultiAgentResult dataclass

Result from multi-agent execution with accumulated metrics.

Source code in strands/multiagent/base.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@dataclass
class MultiAgentResult:
    """Result from multi-agent execution with accumulated metrics."""

    status: Status = Status.PENDING
    results: dict[str, NodeResult] = field(default_factory=lambda: {})
    accumulated_usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0))
    accumulated_metrics: Metrics = field(default_factory=lambda: Metrics(latencyMs=0))
    execution_count: int = 0
    execution_time: int = 0
    interrupts: list[Interrupt] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MultiAgentResult":
        """Rehydrate a MultiAgentResult from persisted JSON."""
        if data.get("type") != "multiagent_result":
            raise TypeError(f"MultiAgentResult.from_dict: unexpected type {data.get('type')!r}")

        results = {k: NodeResult.from_dict(v) for k, v in data.get("results", {}).items()}
        usage = _parse_usage(data.get("accumulated_usage", {}))
        metrics = _parse_metrics(data.get("accumulated_metrics", {}))

        interrupts = []
        for interrupt_data in data.get("interrupts", []):
            interrupts.append(Interrupt(**interrupt_data))

        multiagent_result = cls(
            status=Status(data["status"]),
            results=results,
            accumulated_usage=usage,
            accumulated_metrics=metrics,
            execution_count=int(data.get("execution_count", 0)),
            execution_time=int(data.get("execution_time", 0)),
            interrupts=interrupts,
        )
        return multiagent_result

    def to_dict(self) -> dict[str, Any]:
        """Convert MultiAgentResult to JSON-serializable dict."""
        return {
            "type": "multiagent_result",
            "status": self.status.value,
            "results": {k: v.to_dict() for k, v in self.results.items()},
            "accumulated_usage": self.accumulated_usage,
            "accumulated_metrics": self.accumulated_metrics,
            "execution_count": self.execution_count,
            "execution_time": self.execution_time,
            "interrupts": [interrupt.to_dict() for interrupt in self.interrupts],
        }

from_dict(data) classmethod

Rehydrate a MultiAgentResult from persisted JSON.

Source code in strands/multiagent/base.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MultiAgentResult":
    """Rehydrate a MultiAgentResult from persisted JSON."""
    if data.get("type") != "multiagent_result":
        raise TypeError(f"MultiAgentResult.from_dict: unexpected type {data.get('type')!r}")

    results = {k: NodeResult.from_dict(v) for k, v in data.get("results", {}).items()}
    usage = _parse_usage(data.get("accumulated_usage", {}))
    metrics = _parse_metrics(data.get("accumulated_metrics", {}))

    interrupts = []
    for interrupt_data in data.get("interrupts", []):
        interrupts.append(Interrupt(**interrupt_data))

    multiagent_result = cls(
        status=Status(data["status"]),
        results=results,
        accumulated_usage=usage,
        accumulated_metrics=metrics,
        execution_count=int(data.get("execution_count", 0)),
        execution_time=int(data.get("execution_time", 0)),
        interrupts=interrupts,
    )
    return multiagent_result

to_dict()

Convert MultiAgentResult to JSON-serializable dict.

Source code in strands/multiagent/base.py
163
164
165
166
167
168
169
170
171
172
173
174
def to_dict(self) -> dict[str, Any]:
    """Convert MultiAgentResult to JSON-serializable dict."""
    return {
        "type": "multiagent_result",
        "status": self.status.value,
        "results": {k: v.to_dict() for k, v in self.results.items()},
        "accumulated_usage": self.accumulated_usage,
        "accumulated_metrics": self.accumulated_metrics,
        "execution_count": self.execution_count,
        "execution_time": self.execution_time,
        "interrupts": [interrupt.to_dict() for interrupt in self.interrupts],
    }

NodeResult dataclass

Unified result from node execution - handles both Agent and nested MultiAgentBase results.

Source code in strands/multiagent/base.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@dataclass
class NodeResult:
    """Unified result from node execution - handles both Agent and nested MultiAgentBase results."""

    # Core result data - single AgentResult, nested MultiAgentResult, or Exception
    result: Union[AgentResult, "MultiAgentResult", Exception]

    # Execution metadata
    execution_time: int = 0
    status: Status = Status.PENDING

    # Accumulated metrics from this node and all children
    accumulated_usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0))
    accumulated_metrics: Metrics = field(default_factory=lambda: Metrics(latencyMs=0))
    execution_count: int = 0
    interrupts: list[Interrupt] = field(default_factory=list)

    def get_agent_results(self) -> list[AgentResult]:
        """Get all AgentResult objects from this node, flattened if nested."""
        if isinstance(self.result, Exception):
            return []  # No agent results for exceptions
        elif isinstance(self.result, AgentResult):
            return [self.result]
        else:
            # Flatten nested results from MultiAgentResult
            flattened = []
            for nested_node_result in self.result.results.values():
                flattened.extend(nested_node_result.get_agent_results())
            return flattened

    def to_dict(self) -> dict[str, Any]:
        """Convert NodeResult to JSON-serializable dict, ignoring state field."""
        if isinstance(self.result, Exception):
            result_data: dict[str, Any] = {"type": "exception", "message": str(self.result)}
        elif isinstance(self.result, AgentResult):
            result_data = self.result.to_dict()
        else:
            # MultiAgentResult case
            result_data = self.result.to_dict()

        return {
            "result": result_data,
            "execution_time": self.execution_time,
            "status": self.status.value,
            "accumulated_usage": self.accumulated_usage,
            "accumulated_metrics": self.accumulated_metrics,
            "execution_count": self.execution_count,
            "interrupts": [interrupt.to_dict() for interrupt in self.interrupts],
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "NodeResult":
        """Rehydrate a NodeResult from persisted JSON."""
        if "result" not in data:
            raise TypeError("NodeResult.from_dict: missing 'result'")
        raw = data["result"]

        result: Union[AgentResult, "MultiAgentResult", Exception]
        if isinstance(raw, dict) and raw.get("type") == "agent_result":
            result = AgentResult.from_dict(raw)
        elif isinstance(raw, dict) and raw.get("type") == "exception":
            result = Exception(str(raw.get("message", "node failed")))
        elif isinstance(raw, dict) and raw.get("type") == "multiagent_result":
            result = MultiAgentResult.from_dict(raw)
        else:
            raise TypeError(f"NodeResult.from_dict: unsupported result payload: {raw!r}")

        usage = _parse_usage(data.get("accumulated_usage", {}))
        metrics = _parse_metrics(data.get("accumulated_metrics", {}))

        interrupts = []
        for interrupt_data in data.get("interrupts", []):
            interrupts.append(Interrupt(**interrupt_data))

        return cls(
            result=result,
            execution_time=int(data.get("execution_time", 0)),
            status=Status(data.get("status", "pending")),
            accumulated_usage=usage,
            accumulated_metrics=metrics,
            execution_count=int(data.get("execution_count", 0)),
            interrupts=interrupts,
        )

from_dict(data) classmethod

Rehydrate a NodeResult from persisted JSON.

Source code in strands/multiagent/base.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "NodeResult":
    """Rehydrate a NodeResult from persisted JSON."""
    if "result" not in data:
        raise TypeError("NodeResult.from_dict: missing 'result'")
    raw = data["result"]

    result: Union[AgentResult, "MultiAgentResult", Exception]
    if isinstance(raw, dict) and raw.get("type") == "agent_result":
        result = AgentResult.from_dict(raw)
    elif isinstance(raw, dict) and raw.get("type") == "exception":
        result = Exception(str(raw.get("message", "node failed")))
    elif isinstance(raw, dict) and raw.get("type") == "multiagent_result":
        result = MultiAgentResult.from_dict(raw)
    else:
        raise TypeError(f"NodeResult.from_dict: unsupported result payload: {raw!r}")

    usage = _parse_usage(data.get("accumulated_usage", {}))
    metrics = _parse_metrics(data.get("accumulated_metrics", {}))

    interrupts = []
    for interrupt_data in data.get("interrupts", []):
        interrupts.append(Interrupt(**interrupt_data))

    return cls(
        result=result,
        execution_time=int(data.get("execution_time", 0)),
        status=Status(data.get("status", "pending")),
        accumulated_usage=usage,
        accumulated_metrics=metrics,
        execution_count=int(data.get("execution_count", 0)),
        interrupts=interrupts,
    )

get_agent_results()

Get all AgentResult objects from this node, flattened if nested.

Source code in strands/multiagent/base.py
58
59
60
61
62
63
64
65
66
67
68
69
def get_agent_results(self) -> list[AgentResult]:
    """Get all AgentResult objects from this node, flattened if nested."""
    if isinstance(self.result, Exception):
        return []  # No agent results for exceptions
    elif isinstance(self.result, AgentResult):
        return [self.result]
    else:
        # Flatten nested results from MultiAgentResult
        flattened = []
        for nested_node_result in self.result.results.values():
            flattened.extend(nested_node_result.get_agent_results())
        return flattened

to_dict()

Convert NodeResult to JSON-serializable dict, ignoring state field.

Source code in strands/multiagent/base.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def to_dict(self) -> dict[str, Any]:
    """Convert NodeResult to JSON-serializable dict, ignoring state field."""
    if isinstance(self.result, Exception):
        result_data: dict[str, Any] = {"type": "exception", "message": str(self.result)}
    elif isinstance(self.result, AgentResult):
        result_data = self.result.to_dict()
    else:
        # MultiAgentResult case
        result_data = self.result.to_dict()

    return {
        "result": result_data,
        "execution_time": self.execution_time,
        "status": self.status.value,
        "accumulated_usage": self.accumulated_usage,
        "accumulated_metrics": self.accumulated_metrics,
        "execution_count": self.execution_count,
        "interrupts": [interrupt.to_dict() for interrupt in self.interrupts],
    }

Status

Bases: Enum

Execution status for both graphs and nodes.

Attributes:

Name Type Description
PENDING

Task has not started execution yet.

EXECUTING

Task is currently running.

COMPLETED

Task finished successfully.

FAILED

Task encountered an error and could not complete.

INTERRUPTED

Task was interrupted by user.

Source code in strands/multiagent/base.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Status(Enum):
    """Execution status for both graphs and nodes.

    Attributes:
        PENDING: Task has not started execution yet.
        EXECUTING: Task is currently running.
        COMPLETED: Task finished successfully.
        FAILED: Task encountered an error and could not complete.
        INTERRUPTED: Task was interrupted by user.
    """

    PENDING = "pending"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    INTERRUPTED = "interrupted"

Usage

Bases: TypedDict

Token usage information for model interactions.

Attributes:

Name Type Description
inputTokens Required[int]

Number of tokens sent in the request to the model.

outputTokens Required[int]

Number of tokens that the model generated for the request.

totalTokens Required[int]

Total number of tokens (input + output).

cacheReadInputTokens int

Number of tokens read from cache (optional).

cacheWriteInputTokens int

Number of tokens written to cache (optional).

Source code in strands/types/event_loop.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Usage(TypedDict, total=False):
    """Token usage information for model interactions.

    Attributes:
        inputTokens: Number of tokens sent in the request to the model.
        outputTokens: Number of tokens that the model generated for the request.
        totalTokens: Total number of tokens (input + output).
        cacheReadInputTokens: Number of tokens read from cache (optional).
        cacheWriteInputTokens: Number of tokens written to cache (optional).
    """

    inputTokens: Required[int]
    outputTokens: Required[int]
    totalTokens: Required[int]
    cacheReadInputTokens: int
    cacheWriteInputTokens: int

_parse_metrics(metrics_data)

Parse Metrics from dict data.

Source code in strands/multiagent/base.py
287
288
289
def _parse_metrics(metrics_data: dict[str, Any]) -> Metrics:
    """Parse Metrics from dict data."""
    return Metrics(latencyMs=metrics_data.get("latencyMs", 0))

_parse_usage(usage_data)

Parse Usage from dict data.

Source code in strands/multiagent/base.py
272
273
274
275
276
277
278
279
280
281
282
283
284
def _parse_usage(usage_data: dict[str, Any]) -> Usage:
    """Parse Usage from dict data."""
    usage = Usage(
        inputTokens=usage_data.get("inputTokens", 0),
        outputTokens=usage_data.get("outputTokens", 0),
        totalTokens=usage_data.get("totalTokens", 0),
    )
    # Add optional fields if they exist
    if "cacheReadInputTokens" in usage_data:
        usage["cacheReadInputTokens"] = usage_data["cacheReadInputTokens"]
    if "cacheWriteInputTokens" in usage_data:
        usage["cacheWriteInputTokens"] = usage_data["cacheWriteInputTokens"]
    return usage

run_async(async_func)

Run an async function in a separate thread to avoid event loop conflicts.

This utility handles the common pattern of running async code from sync contexts by using ThreadPoolExecutor to isolate the async execution.

Parameters:

Name Type Description Default
async_func Callable[[], Awaitable[T]]

A callable that returns an awaitable

required

Returns:

Type Description
T

The result of the async function

Source code in strands/_async.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def run_async(async_func: Callable[[], Awaitable[T]]) -> T:
    """Run an async function in a separate thread to avoid event loop conflicts.

    This utility handles the common pattern of running async code from sync contexts
    by using ThreadPoolExecutor to isolate the async execution.

    Args:
        async_func: A callable that returns an awaitable

    Returns:
        The result of the async function
    """

    async def execute_async() -> T:
        return await async_func()

    def execute() -> T:
        return asyncio.run(execute_async())

    with ThreadPoolExecutor() as executor:
        context = contextvars.copy_context()
        future = executor.submit(context.run, execute)
        return future.result()