strands.telemetry.metrics

Utilities for collecting and reporting performance metrics in the SDK.

logger = logging.getLogger(__name__) module-attribute

AgentInvocation dataclass

Metrics for a single agent invocation.

AgentInvocation contains all the event loop cycles and accumulated token usage for that invocation.

Attributes:

    cycles (list[EventLoopCycleMetric]): List of event loop cycles that occurred during this invocation.
    usage (Usage): Accumulated token usage for this invocation across all cycles.

Source code in strands/telemetry/metrics.py
@dataclass
class AgentInvocation:
    """Metrics for a single agent invocation.

    AgentInvocation contains all the event loop cycles and accumulated token usage for that invocation.

    Attributes:
        cycles: List of event loop cycles that occurred during this invocation.
        usage: Accumulated token usage for this invocation across all cycles.
    """

    cycles: list[EventLoopCycleMetric] = field(default_factory=list)
    usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0))

EventLoopCycleMetric dataclass

Aggregated metrics for a single event loop cycle.

Attributes:

    event_loop_cycle_id (str): ID of the current event loop cycle.
    usage (Usage): Total token usage for the entire cycle (from the successful model invocation; tool invocations are excluded).

Source code in strands/telemetry/metrics.py
@dataclass
class EventLoopCycleMetric:
    """Aggregated metrics for a single event loop cycle.

    Attributes:
        event_loop_cycle_id: Current eventLoop cycle id.
        usage: Total token usage for the entire cycle (succeeded model invocation, excluding tool invocations).
    """

    event_loop_cycle_id: str
    usage: Usage
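
These two dataclasses nest: an AgentInvocation collects the EventLoopCycleMetric entries produced while it is active. A minimal sketch, assuming the strands-agents package is installed and using an illustrative cycle id and token counts:

from strands.telemetry.metrics import AgentInvocation, EventLoopCycleMetric
from strands.types.event_loop import Usage

# A fresh invocation starts with no cycles and zeroed token usage.
invocation = AgentInvocation()
assert invocation.cycles == []
assert invocation.usage["totalTokens"] == 0

# Each event loop cycle is tracked separately, keyed by its cycle id.
cycle = EventLoopCycleMetric(
    event_loop_cycle_id="cycle-1",  # hypothetical id for illustration
    usage=Usage(inputTokens=120, outputTokens=45, totalTokens=165),
)
invocation.cycles.append(cycle)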

EventLoopMetrics dataclass

Aggregated metrics for an event loop's execution.

Attributes:

    cycle_count (int): Number of event loop cycles executed.
    tool_metrics (dict[str, ToolMetrics]): Metrics for each tool used, keyed by tool name.
    cycle_durations (list[float]): List of durations for each cycle, in seconds.
    agent_invocations (list[AgentInvocation]): Agent invocation metrics containing cycles and usage data.
    traces (list[Trace]): List of execution traces.
    accumulated_usage (Usage): Accumulated token usage across all model invocations (across all requests).
    accumulated_metrics (Metrics): Accumulated performance metrics across all model invocations.

Source code in strands/telemetry/metrics.py
@dataclass
class EventLoopMetrics:
    """Aggregated metrics for an event loop's execution.

    Attributes:
        cycle_count: Number of event loop cycles executed.
        tool_metrics: Metrics for each tool used, keyed by tool name.
        cycle_durations: List of durations for each cycle in seconds.
        agent_invocations: Agent invocation metrics containing cycles and usage data.
        traces: List of execution traces.
        accumulated_usage: Accumulated token usage across all model invocations (across all requests).
        accumulated_metrics: Accumulated performance metrics across all model invocations.
    """

    cycle_count: int = 0
    tool_metrics: dict[str, ToolMetrics] = field(default_factory=dict)
    cycle_durations: list[float] = field(default_factory=list)
    agent_invocations: list[AgentInvocation] = field(default_factory=list)
    traces: list[Trace] = field(default_factory=list)
    accumulated_usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0))
    accumulated_metrics: Metrics = field(default_factory=lambda: Metrics(latencyMs=0))

    @property
    def _metrics_client(self) -> "MetricsClient":
        """Get the singleton MetricsClient instance."""
        return MetricsClient()

    @property
    def latest_agent_invocation(self) -> Optional[AgentInvocation]:
        """Get the most recent agent invocation.

        Returns:
            The most recent AgentInvocation, or None if no invocations exist.
        """
        return self.agent_invocations[-1] if self.agent_invocations else None

    def start_cycle(
        self,
        attributes: Dict[str, Any],
    ) -> Tuple[float, Trace]:
        """Start a new event loop cycle and create a trace for it.

        Args:
            attributes: attributes of the metrics, including event_loop_cycle_id.

        Returns:
            A tuple containing the start time and the cycle trace object.
        """
        self._metrics_client.event_loop_cycle_count.add(1, attributes=attributes)
        self._metrics_client.event_loop_start_cycle.add(1, attributes=attributes)
        self.cycle_count += 1
        start_time = time.time()
        cycle_trace = Trace(f"Cycle {self.cycle_count}", start_time=start_time)
        self.traces.append(cycle_trace)

        self.agent_invocations[-1].cycles.append(
            EventLoopCycleMetric(
                event_loop_cycle_id=attributes["event_loop_cycle_id"],
                usage=Usage(inputTokens=0, outputTokens=0, totalTokens=0),
            )
        )

        return start_time, cycle_trace

    def end_cycle(self, start_time: float, cycle_trace: Trace, attributes: Optional[Dict[str, Any]] = None) -> None:
        """End the current event loop cycle and record its duration.

        Args:
            start_time: The timestamp when the cycle started.
            cycle_trace: The trace object for this cycle.
            attributes: attributes of the metrics.
        """
        self._metrics_client.event_loop_end_cycle.add(1, attributes)
        end_time = time.time()
        duration = end_time - start_time
        self._metrics_client.event_loop_cycle_duration.record(duration, attributes)
        self.cycle_durations.append(duration)
        cycle_trace.end(end_time)

    def add_tool_usage(
        self,
        tool: ToolUse,
        duration: float,
        tool_trace: Trace,
        success: bool,
        message: Message,
    ) -> None:
        """Record metrics for a tool invocation.

        Args:
            tool: The tool that was used.
            duration: How long the tool call took in seconds.
            tool_trace: The trace object for this tool call.
            success: Whether the tool call was successful.
            message: The message associated with the tool call.
        """
        tool_name = tool.get("name", "unknown_tool")
        tool_use_id = tool.get("toolUseId", "unknown")

        tool_trace.metadata.update(
            {
                "toolUseId": tool_use_id,
                "tool_name": tool_name,
            }
        )
        tool_trace.raw_name = f"{tool_name} - {tool_use_id}"
        tool_trace.add_message(message)

        self.tool_metrics.setdefault(tool_name, ToolMetrics(tool)).add_call(
            tool,
            duration,
            success,
            self._metrics_client,
            attributes={
                "tool_name": tool_name,
                "tool_use_id": tool_use_id,
            },
        )
        tool_trace.end()

    def _accumulate_usage(self, target: Usage, source: Usage) -> None:
        """Helper method to accumulate usage from source to target.

        Args:
            target: The Usage object to accumulate into.
            source: The Usage object to accumulate from.
        """
        target["inputTokens"] += source["inputTokens"]
        target["outputTokens"] += source["outputTokens"]
        target["totalTokens"] += source["totalTokens"]

        if "cacheReadInputTokens" in source:
            target["cacheReadInputTokens"] = target.get("cacheReadInputTokens", 0) + source["cacheReadInputTokens"]

        if "cacheWriteInputTokens" in source:
            target["cacheWriteInputTokens"] = target.get("cacheWriteInputTokens", 0) + source["cacheWriteInputTokens"]

    def update_usage(self, usage: Usage) -> None:
        """Update the accumulated token usage with new usage data.

        Args:
            usage: The usage data to add to the accumulated totals.
        """
        # Record metrics to OpenTelemetry
        self._metrics_client.event_loop_input_tokens.record(usage["inputTokens"])
        self._metrics_client.event_loop_output_tokens.record(usage["outputTokens"])

        # Handle optional cached token metrics for OpenTelemetry
        if "cacheReadInputTokens" in usage:
            self._metrics_client.event_loop_cache_read_input_tokens.record(usage["cacheReadInputTokens"])
        if "cacheWriteInputTokens" in usage:
            self._metrics_client.event_loop_cache_write_input_tokens.record(usage["cacheWriteInputTokens"])

        self._accumulate_usage(self.accumulated_usage, usage)
        self._accumulate_usage(self.agent_invocations[-1].usage, usage)

        if self.agent_invocations[-1].cycles:
            current_cycle = self.agent_invocations[-1].cycles[-1]
            self._accumulate_usage(current_cycle.usage, usage)

    def reset_usage_metrics(self) -> None:
        """Start a new agent invocation by creating a new AgentInvocation.

        This should be called at the start of a new request to begin tracking
        a new agent invocation with fresh usage and cycle data.
        """
        self.agent_invocations.append(AgentInvocation())

    def update_metrics(self, metrics: Metrics) -> None:
        """Update the accumulated performance metrics with new metrics data.

        Args:
            metrics: The metrics data to add to the accumulated totals.
        """
        self._metrics_client.event_loop_latency.record(metrics["latencyMs"])
        if metrics.get("timeToFirstByteMs") is not None:
            self._metrics_client.model_time_to_first_token.record(metrics["timeToFirstByteMs"])
        self.accumulated_metrics["latencyMs"] += metrics["latencyMs"]

    def get_summary(self) -> Dict[str, Any]:
        """Generate a comprehensive summary of all collected metrics.

        Returns:
            A dictionary containing summarized metrics data.
            This includes cycle statistics, tool usage, traces, and accumulated usage information.
        """
        summary = {
            "total_cycles": self.cycle_count,
            "total_duration": sum(self.cycle_durations),
            "average_cycle_time": (sum(self.cycle_durations) / self.cycle_count if self.cycle_count > 0 else 0),
            "tool_usage": {
                tool_name: {
                    "tool_info": {
                        "tool_use_id": metrics.tool.get("toolUseId", "N/A"),
                        "name": metrics.tool.get("name", "unknown"),
                        "input_params": metrics.tool.get("input", {}),
                    },
                    "execution_stats": {
                        "call_count": metrics.call_count,
                        "success_count": metrics.success_count,
                        "error_count": metrics.error_count,
                        "total_time": metrics.total_time,
                        "average_time": (metrics.total_time / metrics.call_count if metrics.call_count > 0 else 0),
                        "success_rate": (metrics.success_count / metrics.call_count if metrics.call_count > 0 else 0),
                    },
                }
                for tool_name, metrics in self.tool_metrics.items()
            },
            "traces": [trace.to_dict() for trace in self.traces],
            "accumulated_usage": self.accumulated_usage,
            "accumulated_metrics": self.accumulated_metrics,
            "agent_invocations": [
                {
                    "usage": invocation.usage,
                    "cycles": [
                        {"event_loop_cycle_id": cycle.event_loop_cycle_id, "usage": cycle.usage}
                        for cycle in invocation.cycles
                    ],
                }
                for invocation in self.agent_invocations
            ],
        }
        return summary
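
Taken together, these methods describe a per-request lifecycle: call reset_usage_metrics() when a new agent invocation begins, wrap each event loop cycle in start_cycle()/end_cycle(), and feed model responses through update_usage()/update_metrics(). A minimal sketch of that flow with illustrative attribute and token values; by default the OpenTelemetry API uses a no-op meter, so no exporter configuration is needed just to run it:

# hypothetical values throughout; only the call order reflects the class above
from strands.telemetry.metrics import EventLoopMetrics
from strands.types.event_loop import Metrics, Usage

loop_metrics = EventLoopMetrics()
loop_metrics.reset_usage_metrics()                       # begin a new agent invocation

attributes = {"event_loop_cycle_id": "cycle-1"}          # illustrative cycle id
start_time, cycle_trace = loop_metrics.start_cycle(attributes)

loop_metrics.update_usage(Usage(inputTokens=200, outputTokens=80, totalTokens=280))
loop_metrics.update_metrics(Metrics(latencyMs=950))

loop_metrics.end_cycle(start_time, cycle_trace, attributes)

summary = loop_metrics.get_summary()
print(summary["total_cycles"], summary["accumulated_usage"]["totalTokens"])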

latest_agent_invocation property

Get the most recent agent invocation.

Returns:

    Optional[AgentInvocation]: The most recent AgentInvocation, or None if no invocations exist.

add_tool_usage(tool, duration, tool_trace, success, message)

Record metrics for a tool invocation.

Parameters:

    tool (ToolUse): The tool that was used. Required.
    duration (float): How long the tool call took, in seconds. Required.
    tool_trace (Trace): The trace object for this tool call. Required.
    success (bool): Whether the tool call was successful. Required.
    message (Message): The message associated with the tool call. Required.
Source code in strands/telemetry/metrics.py
def add_tool_usage(
    self,
    tool: ToolUse,
    duration: float,
    tool_trace: Trace,
    success: bool,
    message: Message,
) -> None:
    """Record metrics for a tool invocation.

    Args:
        tool: The tool that was used.
        duration: How long the tool call took in seconds.
        tool_trace: The trace object for this tool call.
        success: Whether the tool call was successful.
        message: The message associated with the tool call.
    """
    tool_name = tool.get("name", "unknown_tool")
    tool_use_id = tool.get("toolUseId", "unknown")

    tool_trace.metadata.update(
        {
            "toolUseId": tool_use_id,
            "tool_name": tool_name,
        }
    )
    tool_trace.raw_name = f"{tool_name} - {tool_use_id}"
    tool_trace.add_message(message)

    self.tool_metrics.setdefault(tool_name, ToolMetrics(tool)).add_call(
        tool,
        duration,
        success,
        self._metrics_client,
        attributes={
            "tool_name": tool_name,
            "tool_use_id": tool_use_id,
        },
    )
    tool_trace.end()
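
For tool calls, the typical pattern is to create a child Trace under the current cycle trace and hand it to add_tool_usage once the tool finishes. A hedged sketch; the tool values and the shape of the result message are made up for illustration:

from strands.telemetry.metrics import EventLoopMetrics, Trace

loop_metrics = EventLoopMetrics()
loop_metrics.reset_usage_metrics()
start_time, cycle_trace = loop_metrics.start_cycle({"event_loop_cycle_id": "cycle-1"})

# Hypothetical tool invocation recorded under the current cycle.
tool_trace = Trace("calculator", parent_id=cycle_trace.id)
cycle_trace.add_child(tool_trace)

tool_use = {"toolUseId": "tooluse_example", "name": "calculator", "input": {"expression": "2 + 2"}}
result_message = {"role": "user", "content": [{"text": "4"}]}  # illustrative tool-result message

loop_metrics.add_tool_usage(tool_use, duration=0.12, tool_trace=tool_trace, success=True, message=result_message)
loop_metrics.end_cycle(start_time, cycle_trace)

print(loop_metrics.tool_metrics["calculator"].call_count)  # 1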

end_cycle(start_time, cycle_trace, attributes=None)

End the current event loop cycle and record its duration.

Parameters:

    start_time (float): The timestamp when the cycle started. Required.
    cycle_trace (Trace): The trace object for this cycle. Required.
    attributes (Optional[Dict[str, Any]]): Attributes to attach to the recorded metrics. Defaults to None.
Source code in strands/telemetry/metrics.py
def end_cycle(self, start_time: float, cycle_trace: Trace, attributes: Optional[Dict[str, Any]] = None) -> None:
    """End the current event loop cycle and record its duration.

    Args:
        start_time: The timestamp when the cycle started.
        cycle_trace: The trace object for this cycle.
        attributes: attributes of the metrics.
    """
    self._metrics_client.event_loop_end_cycle.add(1, attributes)
    end_time = time.time()
    duration = end_time - start_time
    self._metrics_client.event_loop_cycle_duration.record(duration, attributes)
    self.cycle_durations.append(duration)
    cycle_trace.end(end_time)

get_summary()

Generate a comprehensive summary of all collected metrics.

Returns:

    Dict[str, Any]: A dictionary containing summarized metrics data, including cycle statistics, tool usage, traces, and accumulated usage information.

Source code in strands/telemetry/metrics.py
def get_summary(self) -> Dict[str, Any]:
    """Generate a comprehensive summary of all collected metrics.

    Returns:
        A dictionary containing summarized metrics data.
        This includes cycle statistics, tool usage, traces, and accumulated usage information.
    """
    summary = {
        "total_cycles": self.cycle_count,
        "total_duration": sum(self.cycle_durations),
        "average_cycle_time": (sum(self.cycle_durations) / self.cycle_count if self.cycle_count > 0 else 0),
        "tool_usage": {
            tool_name: {
                "tool_info": {
                    "tool_use_id": metrics.tool.get("toolUseId", "N/A"),
                    "name": metrics.tool.get("name", "unknown"),
                    "input_params": metrics.tool.get("input", {}),
                },
                "execution_stats": {
                    "call_count": metrics.call_count,
                    "success_count": metrics.success_count,
                    "error_count": metrics.error_count,
                    "total_time": metrics.total_time,
                    "average_time": (metrics.total_time / metrics.call_count if metrics.call_count > 0 else 0),
                    "success_rate": (metrics.success_count / metrics.call_count if metrics.call_count > 0 else 0),
                },
            }
            for tool_name, metrics in self.tool_metrics.items()
        },
        "traces": [trace.to_dict() for trace in self.traces],
        "accumulated_usage": self.accumulated_usage,
        "accumulated_metrics": self.accumulated_metrics,
        "agent_invocations": [
            {
                "usage": invocation.usage,
                "cycles": [
                    {"event_loop_cycle_id": cycle.event_loop_cycle_id, "usage": cycle.usage}
                    for cycle in invocation.cycles
                ],
            }
            for invocation in self.agent_invocations
        ],
    }
    return summary

reset_usage_metrics()

Start a new agent invocation by creating a new AgentInvocation.

This should be called at the start of a new request to begin tracking a new agent invocation with fresh usage and cycle data.

Source code in strands/telemetry/metrics.py
def reset_usage_metrics(self) -> None:
    """Start a new agent invocation by creating a new AgentInvocation.

    This should be called at the start of a new request to begin tracking
    a new agent invocation with fresh usage and cycle data.
    """
    self.agent_invocations.append(AgentInvocation())

start_cycle(attributes)

Start a new event loop cycle and create a trace for it.

Parameters:

    attributes (Dict[str, Any]): Attributes to attach to the recorded metrics, including event_loop_cycle_id. Required.

Returns:

    Tuple[float, Trace]: A tuple containing the start time and the cycle trace object.

Source code in strands/telemetry/metrics.py
def start_cycle(
    self,
    attributes: Dict[str, Any],
) -> Tuple[float, Trace]:
    """Start a new event loop cycle and create a trace for it.

    Args:
        attributes: attributes of the metrics, including event_loop_cycle_id.

    Returns:
        A tuple containing the start time and the cycle trace object.
    """
    self._metrics_client.event_loop_cycle_count.add(1, attributes=attributes)
    self._metrics_client.event_loop_start_cycle.add(1, attributes=attributes)
    self.cycle_count += 1
    start_time = time.time()
    cycle_trace = Trace(f"Cycle {self.cycle_count}", start_time=start_time)
    self.traces.append(cycle_trace)

    self.agent_invocations[-1].cycles.append(
        EventLoopCycleMetric(
            event_loop_cycle_id=attributes["event_loop_cycle_id"],
            usage=Usage(inputTokens=0, outputTokens=0, totalTokens=0),
        )
    )

    return start_time, cycle_trace

update_metrics(metrics)

Update the accumulated performance metrics with new metrics data.

Parameters:

    metrics (Metrics): The metrics data to add to the accumulated totals. Required.
Source code in strands/telemetry/metrics.py
def update_metrics(self, metrics: Metrics) -> None:
    """Update the accumulated performance metrics with new metrics data.

    Args:
        metrics: The metrics data to add to the accumulated totals.
    """
    self._metrics_client.event_loop_latency.record(metrics["latencyMs"])
    if metrics.get("timeToFirstByteMs") is not None:
        self._metrics_client.model_time_to_first_token.record(metrics["timeToFirstByteMs"])
    self.accumulated_metrics["latencyMs"] += metrics["latencyMs"]

update_usage(usage)

Update the accumulated token usage with new usage data.

Parameters:

    usage (Usage): The usage data to add to the accumulated totals. Required.
Source code in strands/telemetry/metrics.py
def update_usage(self, usage: Usage) -> None:
    """Update the accumulated token usage with new usage data.

    Args:
        usage: The usage data to add to the accumulated totals.
    """
    # Record metrics to OpenTelemetry
    self._metrics_client.event_loop_input_tokens.record(usage["inputTokens"])
    self._metrics_client.event_loop_output_tokens.record(usage["outputTokens"])

    # Handle optional cached token metrics for OpenTelemetry
    if "cacheReadInputTokens" in usage:
        self._metrics_client.event_loop_cache_read_input_tokens.record(usage["cacheReadInputTokens"])
    if "cacheWriteInputTokens" in usage:
        self._metrics_client.event_loop_cache_write_input_tokens.record(usage["cacheWriteInputTokens"])

    self._accumulate_usage(self.accumulated_usage, usage)
    self._accumulate_usage(self.agent_invocations[-1].usage, usage)

    if self.agent_invocations[-1].cycles:
        current_cycle = self.agent_invocations[-1].cycles[-1]
        self._accumulate_usage(current_cycle.usage, usage)

Message

Bases: TypedDict

A message in a conversation with the agent.

Attributes:

    content (List[ContentBlock]): The message content.
    role (Role): The role of the message sender.

Source code in strands/types/content.py
class Message(TypedDict):
    """A message in a conversation with the agent.

    Attributes:
        content: The message content.
        role: The role of the message sender.
    """

    content: List[ContentBlock]
    role: Role

Metrics

Bases: TypedDict

Performance metrics for model interactions.

Attributes:

    latencyMs (int): Latency of the model request in milliseconds.
    timeToFirstByteMs (int): Latency from sending the model request to the first content chunk (contentBlockDelta or contentBlockStart) from the model, in milliseconds.

Source code in strands/types/event_loop.py
class Metrics(TypedDict, total=False):
    """Performance metrics for model interactions.

    Attributes:
        latencyMs (int): Latency of the model request in milliseconds.
        timeToFirstByteMs (int): Latency from sending model request to first
            content chunk (contentBlockDelta or contentBlockStart) from the model in milliseconds.
    """

    latencyMs: Required[int]
    timeToFirstByteMs: int
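
Because Metrics is a TypedDict declared with total=False, only latencyMs is required; timeToFirstByteMs may be omitted when it is not known. A small sketch with illustrative values:

from strands.types.event_loop import Metrics

with_ttfb = Metrics(latencyMs=950, timeToFirstByteMs=140)
latency_only = Metrics(latencyMs=950)

# EventLoopMetrics.update_metrics only records time-to-first-token when the key is present.
print("timeToFirstByteMs" in latency_only)  # False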

MetricsClient

Singleton client for managing OpenTelemetry metrics instruments.

The actual metrics export destination (console, OTLP endpoint, etc.) is configured through OpenTelemetry SDK configuration by users, not by this client.

Source code in strands/telemetry/metrics.py
class MetricsClient:
    """Singleton client for managing OpenTelemetry metrics instruments.

    The actual metrics export destination (console, OTLP endpoint, etc.) is configured
    through OpenTelemetry SDK configuration by users, not by this client.
    """

    _instance: Optional["MetricsClient"] = None
    meter: Meter
    event_loop_cycle_count: Counter
    event_loop_start_cycle: Counter
    event_loop_end_cycle: Counter
    event_loop_cycle_duration: Histogram
    event_loop_latency: Histogram
    event_loop_input_tokens: Histogram
    event_loop_output_tokens: Histogram
    event_loop_cache_read_input_tokens: Histogram
    event_loop_cache_write_input_tokens: Histogram
    model_time_to_first_token: Histogram
    tool_call_count: Counter
    tool_success_count: Counter
    tool_error_count: Counter
    tool_duration: Histogram

    def __new__(cls) -> "MetricsClient":
        """Create or return the singleton instance of MetricsClient.

        Returns:
            The single MetricsClient instance.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        """Initialize the MetricsClient.

        This method only runs once due to the singleton pattern.
        Sets up the OpenTelemetry meter and creates metric instruments.
        """
        if hasattr(self, "meter"):
            return

        logger.info("Creating Strands MetricsClient")
        meter_provider: metrics_api.MeterProvider = metrics_api.get_meter_provider()
        self.meter = meter_provider.get_meter(__name__)
        self.create_instruments()

    def create_instruments(self) -> None:
        """Create and initialize all OpenTelemetry metric instruments."""
        self.event_loop_cycle_count = self.meter.create_counter(
            name=constants.STRANDS_EVENT_LOOP_CYCLE_COUNT, unit="Count"
        )
        self.event_loop_start_cycle = self.meter.create_counter(
            name=constants.STRANDS_EVENT_LOOP_START_CYCLE, unit="Count"
        )
        self.event_loop_end_cycle = self.meter.create_counter(name=constants.STRANDS_EVENT_LOOP_END_CYCLE, unit="Count")
        self.event_loop_cycle_duration = self.meter.create_histogram(
            name=constants.STRANDS_EVENT_LOOP_CYCLE_DURATION, unit="s"
        )
        self.event_loop_latency = self.meter.create_histogram(name=constants.STRANDS_EVENT_LOOP_LATENCY, unit="ms")
        self.tool_call_count = self.meter.create_counter(name=constants.STRANDS_TOOL_CALL_COUNT, unit="Count")
        self.tool_success_count = self.meter.create_counter(name=constants.STRANDS_TOOL_SUCCESS_COUNT, unit="Count")
        self.tool_error_count = self.meter.create_counter(name=constants.STRANDS_TOOL_ERROR_COUNT, unit="Count")
        self.tool_duration = self.meter.create_histogram(name=constants.STRANDS_TOOL_DURATION, unit="s")
        self.event_loop_input_tokens = self.meter.create_histogram(
            name=constants.STRANDS_EVENT_LOOP_INPUT_TOKENS, unit="token"
        )
        self.event_loop_output_tokens = self.meter.create_histogram(
            name=constants.STRANDS_EVENT_LOOP_OUTPUT_TOKENS, unit="token"
        )
        self.event_loop_cache_read_input_tokens = self.meter.create_histogram(
            name=constants.STRANDS_EVENT_LOOP_CACHE_READ_INPUT_TOKENS, unit="token"
        )
        self.event_loop_cache_write_input_tokens = self.meter.create_histogram(
            name=constants.STRANDS_EVENT_LOOP_CACHE_WRITE_INPUT_TOKENS, unit="token"
        )
        self.model_time_to_first_token = self.meter.create_histogram(
            name=constants.STRANDS_MODEL_TIME_TO_FIRST_TOKEN, unit="ms"
        )
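
MetricsClient creates its instruments against whatever global MeterProvider is registered; by default that is the OpenTelemetry no-op provider, so nothing is exported. A sketch of wiring it to the OpenTelemetry SDK's console exporter before first use; this assumes the opentelemetry-sdk package is installed, and the exporter choice is illustrative rather than something this module prescribes:

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader

# Register a real provider before the singleton is first constructed.
reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
metrics_api.set_meter_provider(MeterProvider(metric_readers=[reader]))

from strands.telemetry.metrics import MetricsClient

client = MetricsClient()
assert client is MetricsClient()  # same singleton instance on every call
client.tool_call_count.add(1, attributes={"tool_name": "calculator"})  # illustrative attribute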

__init__()

Initialize the MetricsClient.

This method only runs once due to the singleton pattern. Sets up the OpenTelemetry meter and creates metric instruments.

Source code in strands/telemetry/metrics.py
def __init__(self) -> None:
    """Initialize the MetricsClient.

    This method only runs once due to the singleton pattern.
    Sets up the OpenTelemetry meter and creates metric instruments.
    """
    if hasattr(self, "meter"):
        return

    logger.info("Creating Strands MetricsClient")
    meter_provider: metrics_api.MeterProvider = metrics_api.get_meter_provider()
    self.meter = meter_provider.get_meter(__name__)
    self.create_instruments()

__new__()

Create or return the singleton instance of MetricsClient.

Returns:

    MetricsClient: The single MetricsClient instance.

Source code in strands/telemetry/metrics.py
def __new__(cls) -> "MetricsClient":
    """Create or return the singleton instance of MetricsClient.

    Returns:
        The single MetricsClient instance.
    """
    if cls._instance is None:
        cls._instance = super().__new__(cls)
    return cls._instance

create_instruments()

Create and initialize all OpenTelemetry metric instruments.

Source code in strands/telemetry/metrics.py
def create_instruments(self) -> None:
    """Create and initialize all OpenTelemetry metric instruments."""
    self.event_loop_cycle_count = self.meter.create_counter(
        name=constants.STRANDS_EVENT_LOOP_CYCLE_COUNT, unit="Count"
    )
    self.event_loop_start_cycle = self.meter.create_counter(
        name=constants.STRANDS_EVENT_LOOP_START_CYCLE, unit="Count"
    )
    self.event_loop_end_cycle = self.meter.create_counter(name=constants.STRANDS_EVENT_LOOP_END_CYCLE, unit="Count")
    self.event_loop_cycle_duration = self.meter.create_histogram(
        name=constants.STRANDS_EVENT_LOOP_CYCLE_DURATION, unit="s"
    )
    self.event_loop_latency = self.meter.create_histogram(name=constants.STRANDS_EVENT_LOOP_LATENCY, unit="ms")
    self.tool_call_count = self.meter.create_counter(name=constants.STRANDS_TOOL_CALL_COUNT, unit="Count")
    self.tool_success_count = self.meter.create_counter(name=constants.STRANDS_TOOL_SUCCESS_COUNT, unit="Count")
    self.tool_error_count = self.meter.create_counter(name=constants.STRANDS_TOOL_ERROR_COUNT, unit="Count")
    self.tool_duration = self.meter.create_histogram(name=constants.STRANDS_TOOL_DURATION, unit="s")
    self.event_loop_input_tokens = self.meter.create_histogram(
        name=constants.STRANDS_EVENT_LOOP_INPUT_TOKENS, unit="token"
    )
    self.event_loop_output_tokens = self.meter.create_histogram(
        name=constants.STRANDS_EVENT_LOOP_OUTPUT_TOKENS, unit="token"
    )
    self.event_loop_cache_read_input_tokens = self.meter.create_histogram(
        name=constants.STRANDS_EVENT_LOOP_CACHE_READ_INPUT_TOKENS, unit="token"
    )
    self.event_loop_cache_write_input_tokens = self.meter.create_histogram(
        name=constants.STRANDS_EVENT_LOOP_CACHE_WRITE_INPUT_TOKENS, unit="token"
    )
    self.model_time_to_first_token = self.meter.create_histogram(
        name=constants.STRANDS_MODEL_TIME_TO_FIRST_TOKEN, unit="ms"
    )

ToolMetrics dataclass

Metrics for a specific tool's usage.

Attributes:

    tool (ToolUse): The tool being tracked.
    call_count (int): Number of times the tool has been called.
    success_count (int): Number of successful tool calls.
    error_count (int): Number of failed tool calls.
    total_time (float): Total execution time across all calls, in seconds.

Source code in strands/telemetry/metrics.py
@dataclass
class ToolMetrics:
    """Metrics for a specific tool's usage.

    Attributes:
        tool: The tool being tracked.
        call_count: Number of times the tool has been called.
        success_count: Number of successful tool calls.
        error_count: Number of failed tool calls.
        total_time: Total execution time across all calls in seconds.
    """

    tool: ToolUse
    call_count: int = 0
    success_count: int = 0
    error_count: int = 0
    total_time: float = 0.0

    def add_call(
        self,
        tool: ToolUse,
        duration: float,
        success: bool,
        metrics_client: "MetricsClient",
        attributes: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Record a new tool call with its outcome.

        Args:
            tool: The tool that was called.
            duration: How long the call took in seconds.
            success: Whether the call was successful.
            metrics_client: The metrics client for recording the metrics.
            attributes: attributes of the metrics.
        """
        self.tool = tool  # Update with latest tool state
        self.call_count += 1
        self.total_time += duration
        metrics_client.tool_call_count.add(1, attributes=attributes)
        metrics_client.tool_duration.record(duration, attributes=attributes)
        if success:
            self.success_count += 1
            metrics_client.tool_success_count.add(1, attributes=attributes)
        else:
            self.error_count += 1
            metrics_client.tool_error_count.add(1, attributes=attributes)
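
A short sketch of recording one successful call against a ToolMetrics entry; the tool values are made up, and the attributes mirror what EventLoopMetrics.add_tool_usage passes:

from strands.telemetry.metrics import MetricsClient, ToolMetrics

tool_use = {"toolUseId": "tooluse_example", "name": "calculator", "input": {"expression": "2 + 2"}}
tool_metrics = ToolMetrics(tool=tool_use)

tool_metrics.add_call(
    tool_use,
    duration=0.42,
    success=True,
    metrics_client=MetricsClient(),
    attributes={"tool_name": "calculator", "tool_use_id": "tooluse_example"},
)

assert tool_metrics.call_count == 1 and tool_metrics.success_count == 1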

add_call(tool, duration, success, metrics_client, attributes=None)

Record a new tool call with its outcome.

Parameters:

    tool (ToolUse): The tool that was called. Required.
    duration (float): How long the call took, in seconds. Required.
    success (bool): Whether the call was successful. Required.
    metrics_client (MetricsClient): The metrics client for recording the metrics. Required.
    attributes (Optional[Dict[str, Any]]): Attributes to attach to the recorded metrics. Defaults to None.
Source code in strands/telemetry/metrics.py
def add_call(
    self,
    tool: ToolUse,
    duration: float,
    success: bool,
    metrics_client: "MetricsClient",
    attributes: Optional[Dict[str, Any]] = None,
) -> None:
    """Record a new tool call with its outcome.

    Args:
        tool: The tool that was called.
        duration: How long the call took in seconds.
        success: Whether the call was successful.
        metrics_client: The metrics client for recording the metrics.
        attributes: attributes of the metrics.
    """
    self.tool = tool  # Update with latest tool state
    self.call_count += 1
    self.total_time += duration
    metrics_client.tool_call_count.add(1, attributes=attributes)
    metrics_client.tool_duration.record(duration, attributes=attributes)
    if success:
        self.success_count += 1
        metrics_client.tool_success_count.add(1, attributes=attributes)
    else:
        self.error_count += 1
        metrics_client.tool_error_count.add(1, attributes=attributes)

ToolUse

Bases: TypedDict

A request from the model to use a specific tool with the provided input.

Attributes:

    input (Any): The input parameters for the tool. Can be any JSON-serializable type.
    name (str): The name of the tool to invoke.
    toolUseId (str): A unique identifier for this specific tool use request.

Source code in strands/types/tools.py
class ToolUse(TypedDict):
    """A request from the model to use a specific tool with the provided input.

    Attributes:
        input: The input parameters for the tool.
            Can be any JSON-serializable type.
        name: The name of the tool to invoke.
        toolUseId: A unique identifier for this specific tool use request.
    """

    input: Any
    name: str
    toolUseId: str

Trace

A trace representing a single operation or step in the execution flow.

Source code in strands/telemetry/metrics.py
class Trace:
    """A trace representing a single operation or step in the execution flow."""

    def __init__(
        self,
        name: str,
        parent_id: Optional[str] = None,
        start_time: Optional[float] = None,
        raw_name: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        message: Optional[Message] = None,
    ) -> None:
        """Initialize a new trace.

        Args:
            name: Human-readable name of the operation being traced.
            parent_id: ID of the parent trace, if this is a child operation.
            start_time: Timestamp when the trace started.
                If not provided, the current time will be used.
            raw_name: System level name.
            metadata: Additional contextual information about the trace.
            message: Message associated with the trace.
        """
        self.id: str = str(uuid.uuid4())
        self.name: str = name
        self.raw_name: Optional[str] = raw_name
        self.parent_id: Optional[str] = parent_id
        self.start_time: float = start_time if start_time is not None else time.time()
        self.end_time: Optional[float] = None
        self.children: List["Trace"] = []
        self.metadata: Dict[str, Any] = metadata or {}
        self.message: Optional[Message] = message

    def end(self, end_time: Optional[float] = None) -> None:
        """Mark the trace as complete with the given or current timestamp.

        Args:
            end_time: Timestamp to use as the end time.
                If not provided, the current time will be used.
        """
        self.end_time = end_time if end_time is not None else time.time()

    def add_child(self, child: "Trace") -> None:
        """Add a child trace to this trace.

        Args:
            child: The child trace to add.
        """
        self.children.append(child)

    def duration(self) -> Optional[float]:
        """Calculate the duration of this trace.

        Returns:
            The duration in seconds, or None if the trace hasn't ended yet.
        """
        return None if self.end_time is None else self.end_time - self.start_time

    def add_message(self, message: Message) -> None:
        """Add a message to the trace.

        Args:
            message: The message to add.
        """
        self.message = message

    def to_dict(self) -> Dict[str, Any]:
        """Convert the trace to a dictionary representation.

        Returns:
            A dictionary containing all trace information, suitable for serialization.
        """
        return {
            "id": self.id,
            "name": self.name,
            "raw_name": self.raw_name,
            "parent_id": self.parent_id,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self.duration(),
            "children": [child.to_dict() for child in self.children],
            "metadata": self.metadata,
            "message": self.message,
        }
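
Traces form a simple parent/child tree and are plain Python objects, so they can be built and inspected directly. A brief sketch with illustrative names:

from strands.telemetry.metrics import Trace

cycle_trace = Trace("Cycle 1")
tool_trace = Trace("calculator tool", parent_id=cycle_trace.id)
cycle_trace.add_child(tool_trace)

tool_trace.end()   # uses the current time
cycle_trace.end()

print(cycle_trace.duration())                         # seconds, or None if not yet ended
print(cycle_trace.to_dict()["children"][0]["name"])   # "calculator tool"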

__init__(name, parent_id=None, start_time=None, raw_name=None, metadata=None, message=None)

Initialize a new trace.

Parameters:

    name (str): Human-readable name of the operation being traced. Required.
    parent_id (Optional[str]): ID of the parent trace, if this is a child operation. Defaults to None.
    start_time (Optional[float]): Timestamp when the trace started. If not provided, the current time is used. Defaults to None.
    raw_name (Optional[str]): System-level name. Defaults to None.
    metadata (Optional[Dict[str, Any]]): Additional contextual information about the trace. Defaults to None.
    message (Optional[Message]): Message associated with the trace. Defaults to None.
Source code in strands/telemetry/metrics.py
def __init__(
    self,
    name: str,
    parent_id: Optional[str] = None,
    start_time: Optional[float] = None,
    raw_name: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    message: Optional[Message] = None,
) -> None:
    """Initialize a new trace.

    Args:
        name: Human-readable name of the operation being traced.
        parent_id: ID of the parent trace, if this is a child operation.
        start_time: Timestamp when the trace started.
            If not provided, the current time will be used.
        raw_name: System level name.
        metadata: Additional contextual information about the trace.
        message: Message associated with the trace.
    """
    self.id: str = str(uuid.uuid4())
    self.name: str = name
    self.raw_name: Optional[str] = raw_name
    self.parent_id: Optional[str] = parent_id
    self.start_time: float = start_time if start_time is not None else time.time()
    self.end_time: Optional[float] = None
    self.children: List["Trace"] = []
    self.metadata: Dict[str, Any] = metadata or {}
    self.message: Optional[Message] = message

add_child(child)

Add a child trace to this trace.

Parameters:

    child (Trace): The child trace to add. Required.
Source code in strands/telemetry/metrics.py
def add_child(self, child: "Trace") -> None:
    """Add a child trace to this trace.

    Args:
        child: The child trace to add.
    """
    self.children.append(child)

add_message(message)

Add a message to the trace.

Parameters:

    message (Message): The message to add. Required.
Source code in strands/telemetry/metrics.py
def add_message(self, message: Message) -> None:
    """Add a message to the trace.

    Args:
        message: The message to add.
    """
    self.message = message

duration()

Calculate the duration of this trace.

Returns:

    Optional[float]: The duration in seconds, or None if the trace hasn't ended yet.

Source code in strands/telemetry/metrics.py
def duration(self) -> Optional[float]:
    """Calculate the duration of this trace.

    Returns:
        The duration in seconds, or None if the trace hasn't ended yet.
    """
    return None if self.end_time is None else self.end_time - self.start_time

end(end_time=None)

Mark the trace as complete with the given or current timestamp.

Parameters:

    end_time (Optional[float]): Timestamp to use as the end time. If not provided, the current time is used. Defaults to None.
Source code in strands/telemetry/metrics.py
def end(self, end_time: Optional[float] = None) -> None:
    """Mark the trace as complete with the given or current timestamp.

    Args:
        end_time: Timestamp to use as the end time.
            If not provided, the current time will be used.
    """
    self.end_time = end_time if end_time is not None else time.time()

to_dict()

Convert the trace to a dictionary representation.

Returns:

    Dict[str, Any]: A dictionary containing all trace information, suitable for serialization.

Source code in strands/telemetry/metrics.py
def to_dict(self) -> Dict[str, Any]:
    """Convert the trace to a dictionary representation.

    Returns:
        A dictionary containing all trace information, suitable for serialization.
    """
    return {
        "id": self.id,
        "name": self.name,
        "raw_name": self.raw_name,
        "parent_id": self.parent_id,
        "start_time": self.start_time,
        "end_time": self.end_time,
        "duration": self.duration(),
        "children": [child.to_dict() for child in self.children],
        "metadata": self.metadata,
        "message": self.message,
    }

Usage

Bases: TypedDict

Token usage information for model interactions.

Attributes:

    inputTokens (Required[int]): Number of tokens sent in the request to the model.
    outputTokens (Required[int]): Number of tokens that the model generated for the request.
    totalTokens (Required[int]): Total number of tokens (input + output).
    cacheReadInputTokens (int): Number of tokens read from cache (optional).
    cacheWriteInputTokens (int): Number of tokens written to cache (optional).

Source code in strands/types/event_loop.py
class Usage(TypedDict, total=False):
    """Token usage information for model interactions.

    Attributes:
        inputTokens: Number of tokens sent in the request to the model.
        outputTokens: Number of tokens that the model generated for the request.
        totalTokens: Total number of tokens (input + output).
        cacheReadInputTokens: Number of tokens read from cache (optional).
        cacheWriteInputTokens: Number of tokens written to cache (optional).
    """

    inputTokens: Required[int]
    outputTokens: Required[int]
    totalTokens: Required[int]
    cacheReadInputTokens: int
    cacheWriteInputTokens: int
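
The cache fields are optional, and EventLoopMetrics only folds them into the accumulated totals when they are present on the incoming Usage. A small sketch with illustrative numbers:

from strands.telemetry.metrics import EventLoopMetrics
from strands.types.event_loop import Usage

loop_metrics = EventLoopMetrics()
loop_metrics.reset_usage_metrics()

# Cached-token fields are included only when the model response reports them.
loop_metrics.update_usage(
    Usage(inputTokens=100, outputTokens=40, totalTokens=140, cacheReadInputTokens=80)
)

print(loop_metrics.accumulated_usage["totalTokens"])                # 140
print(loop_metrics.accumulated_usage.get("cacheReadInputTokens"))   # 80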

_metrics_summary_to_lines(event_loop_metrics, allowed_names)

Convert event loop metrics to a series of formatted text lines.

Parameters:

    event_loop_metrics (EventLoopMetrics): The metrics to format. Required.
    allowed_names (Set[str]): Set of names that are allowed to be displayed unmodified. Required.

Returns:

    Iterable[str]: An iterable of formatted text lines representing the metrics.

Source code in strands/telemetry/metrics.py
def _metrics_summary_to_lines(event_loop_metrics: EventLoopMetrics, allowed_names: Set[str]) -> Iterable[str]:
    """Convert event loop metrics to a series of formatted text lines.

    Args:
        event_loop_metrics: The metrics to format.
        allowed_names: Set of names that are allowed to be displayed unmodified.

    Returns:
        An iterable of formatted text lines representing the metrics.
    """
    summary = event_loop_metrics.get_summary()
    yield "Event Loop Metrics Summary:"
    yield (
        f"├─ Cycles: total={summary['total_cycles']}, avg_time={summary['average_cycle_time']:.3f}s, "
        f"total_time={summary['total_duration']:.3f}s"
    )

    # Build token display with optional cached tokens
    token_parts = [
        f"in={summary['accumulated_usage']['inputTokens']}",
        f"out={summary['accumulated_usage']['outputTokens']}",
        f"total={summary['accumulated_usage']['totalTokens']}",
    ]

    # Add cached token info if present
    if summary["accumulated_usage"].get("cacheReadInputTokens"):
        token_parts.append(f"cache_read_input_tokens={summary['accumulated_usage']['cacheReadInputTokens']}")
    if summary["accumulated_usage"].get("cacheWriteInputTokens"):
        token_parts.append(f"cache_write_input_tokens={summary['accumulated_usage']['cacheWriteInputTokens']}")

    yield f"├─ Tokens: {', '.join(token_parts)}"
    yield f"├─ Bedrock Latency: {summary['accumulated_metrics']['latencyMs']}ms"

    yield "├─ Tool Usage:"
    for tool_name, tool_data in summary.get("tool_usage", {}).items():
        # tool_info = tool_data["tool_info"]
        exec_stats = tool_data["execution_stats"]

        # Tool header - show just name for multi-call case
        yield f"   └─ {tool_name}:"
        # Execution stats
        yield f"      ├─ Stats: calls={exec_stats['call_count']}, success={exec_stats['success_count']}"
        yield f"      │         errors={exec_stats['error_count']}, success_rate={exec_stats['success_rate']:.1%}"
        yield f"      ├─ Timing: avg={exec_stats['average_time']:.3f}s, total={exec_stats['total_time']:.3f}s"
        # All tool calls with their inputs
        yield "      └─ Tool Calls:"
        # Show tool use ID and input for each call from the traces
        for trace in event_loop_metrics.traces:
            for child in trace.children:
                if child.metadata.get("tool_name") == tool_name:
                    tool_use_id = child.metadata.get("toolUseId", "unknown")
                    # tool_input = child.metadata.get('tool_input', {})
                    yield f"         ├─ {tool_use_id}: {tool_name}"
                    # yield f"         │  └─ Input: {json.dumps(tool_input, sort_keys=True)}"

    yield "├─ Execution Trace:"

    for trace in event_loop_metrics.traces:
        yield from _trace_to_lines(trace.to_dict(), allowed_names=allowed_names, indent=1)

_trace_to_lines(trace, allowed_names, indent)

Convert a trace to a series of formatted text lines.

Parameters:

    trace (Dict): The trace dictionary to format. Required.
    allowed_names (Set[str]): Set of names that are allowed to be displayed unmodified. Required.
    indent (int): The indentation level for the output lines. Required.

Returns:

    Iterable[str]: An iterable of formatted text lines representing the trace.

Source code in strands/telemetry/metrics.py
def _trace_to_lines(trace: Dict, allowed_names: Set[str], indent: int) -> Iterable[str]:
    """Convert a trace to a series of formatted text lines.

    Args:
        trace: The trace dictionary to format.
        allowed_names: Set of names that are allowed to be displayed unmodified.
        indent: The indentation level for the output lines.

    Returns:
        An iterable of formatted text lines representing the trace.
    """
    duration = trace.get("duration", "N/A")
    duration_str = f"{duration:.4f}s" if isinstance(duration, (int, float)) else str(duration)

    safe_name = trace.get("raw_name", trace.get("name"))

    tool_use_id = ""
    # Check if this trace contains tool info with toolUseId
    if trace.get("raw_name") and isinstance(safe_name, str) and " - tooluse_" in safe_name:
        # Already includes toolUseId, use as is
        yield f"{'   ' * indent}└─ {safe_name} - Duration: {duration_str}"
    else:
        # Extract toolUseId if it exists in metadata
        metadata = trace.get("metadata", {})
        if isinstance(metadata, dict) and metadata.get("toolUseId"):
            tool_use_id = f" - {metadata['toolUseId']}"
        yield f"{'   ' * indent}└─ {safe_name}{tool_use_id} - Duration: {duration_str}"

    for child in trace.get("children", []):
        yield from _trace_to_lines(child, allowed_names, indent + 1)

metrics_to_string(event_loop_metrics, allowed_names=None)

Convert event loop metrics to a human-readable string representation.

Parameters:

    event_loop_metrics (EventLoopMetrics): The metrics to format. Required.
    allowed_names (Optional[Set[str]]): Set of names that are allowed to be displayed unmodified. Defaults to None.

Returns:

    str: A formatted string representation of the metrics.

Source code in strands/telemetry/metrics.py
def metrics_to_string(event_loop_metrics: EventLoopMetrics, allowed_names: Optional[Set[str]] = None) -> str:
    """Convert event loop metrics to a human-readable string representation.

    Args:
        event_loop_metrics: The metrics to format.
        allowed_names: Set of names that are allowed to be displayed unmodified.

    Returns:
        A formatted string representation of the metrics.
    """
    return "\n".join(_metrics_summary_to_lines(event_loop_metrics, allowed_names or set()))
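
metrics_to_string is the usual entry point for human-readable output; it simply joins the lines produced by _metrics_summary_to_lines. A minimal sketch, with an illustrative cycle id and token counts:

from strands.telemetry.metrics import EventLoopMetrics, metrics_to_string
from strands.types.event_loop import Usage

loop_metrics = EventLoopMetrics()
loop_metrics.reset_usage_metrics()
start_time, cycle_trace = loop_metrics.start_cycle({"event_loop_cycle_id": "cycle-1"})
loop_metrics.update_usage(Usage(inputTokens=50, outputTokens=20, totalTokens=70))
loop_metrics.end_cycle(start_time, cycle_trace)

print(metrics_to_string(loop_metrics))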