
Copyright © 2026, NVIDIA Corporation.


AIPerf Code Patterns


Code examples for common development tasks. Referenced from CLAUDE.md.

CLI Command Pattern

Commands live in src/aiperf/cli_commands/, one file per command. They are lazily loaded via import strings in aiperf.cli — modules are only imported when their command is invoked:

# aiperf/cli.py - register with lazy import strings
app.command("aiperf.cli_commands.profile:app", name="profile")

# aiperf/cli_commands/profile.py - thin command definition
from cyclopts import App
from aiperf.config.flags import CLIConfig

app = App(name="profile")

@app.default
def profile(*, cli_config: CLIConfig) -> None:
    """Run the Profile subcommand."""
    from aiperf.cli_utils import exit_on_error
    from aiperf.config.loader.errors import ConfigurationError

    with exit_on_error(title="Error Running AIPerf System", show_traceback=False):
        from aiperf.config.flags.resolver import resolve_config
        from aiperf.config.loader import build_benchmark_plan

        config = resolve_config(cli_config, cli_config.config_file)
        plan = build_benchmark_plan(config)

    with exit_on_error(
        title="Error Running AIPerf System",
        quiet_for=(ConfigurationError,),
    ):
        from aiperf.cli_runner import run_benchmark  # heavy import deferred

        run_benchmark(plan)

Conventions:

  • Export a single App named app.
  • Hyphenate multi-word commands: App(name="analyze-trace").
  • Keep module-level imports minimal; heavy deps go inside the function body.
  • Heavy implementation logic lives in a cli.py inside the owning domain package (e.g. aiperf/plugin/cli.py), lazily imported at call time.
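As a rough illustration of what a lazy import string buys, the sketch below resolves a "module:attr" string with the standard library only when a command is invoked. cyclopts performs its own resolution; resolve_import_string, COMMANDS, and invoke are hypothetical names, and colorsys merely stands in for a heavy command module.

```python
import importlib
from typing import Any


def resolve_import_string(spec: str) -> Any:
    """Resolve "package.module:attr.path", importing the module on demand."""
    module_name, sep, attr_path = spec.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError(f"expected 'module:attr', got {spec!r}")
    obj: Any = importlib.import_module(module_name)  # the import happens here, not at registration
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    return obj


# Registration stores strings only, so building the command table imports nothing.
COMMANDS: dict[str, str] = {"rgb-to-hsv": "colorsys:rgb_to_hsv"}


def invoke(name: str, *args: Any) -> Any:
    """Import the command's module on first use, then call the resolved object."""
    return resolve_import_string(COMMANDS[name])(*args)
```

Because the module is imported inside invoke, commands that are never run never pay for their imports.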

Adding a New CLI Flag

CLIConfig is a flat DTO: every CLI flag is a top-level field on CLIConfig with an Annotated[...] annotation that carries both the Pydantic metadata and the cyclopts CLI binding. Never add a new nested config class. Disambiguate name collisions with a section prefix (e.g. image_batch_size vs audio_batch_size).

# src/aiperf/config/flags/cli_config.py - add the field in its section block
my_new_flag: Annotated[
    int | None,
    Field(
        ge=1,
        description="One-line user-facing description rendered in --help "
        "and docs/cli-options.md. Mention units, defaults, and obvious "
        "interactions with other flags.",
    ),
    CLIParameter(
        name=("--my-new-flag",),  # CLI flag name (independent of attr name)
        group=Groups.LOAD_GENERATOR,  # cyclopts --help group; pick from list below
    ),
] = None

Pick a Groups.X from src/aiperf/config/cli_parameter.py:

ENDPOINT, INPUT, FIXED_SCHEDULE, GOODPUT, OUTPUT, HTTP_TRACE, TOKENIZER, LOAD_GENERATOR, WARMUP, USER_CENTRIC, REQUEST_CANCELLATION, CONVERSATION_INPUT, ISL, OSL, PROMPT, PREFIX_PROMPT, RANKINGS, SYNTHESIS, AUDIO_INPUT, IMAGE_INPUT, VIDEO_INPUT, SERVICE, SERVER_METRICS, GPU_TELEMETRY, UI, WORKERS, ZMQ_COMMUNICATION, ACCURACY, MULTI_RUN.

If none fit, prefer adding a new Groups.X constant in src/aiperf/config/cli_parameter.py over reusing an unrelated group.

Then:

  1. Add the attr name to the appropriate <SECTION>_FIELDS frozenset in src/aiperf/config/flags/_section_fields.py so the resolver/converter can scope cli.model_fields_set & <SECTION>_FIELDS queries.
  2. If the flag maps to an existing AIPerfConfig key, add an entry to that section’s field map (e.g. _ENDPOINT_FIELD_MAP in _converter_endpoint.py). Otherwise, read it directly in the relevant _converter_*.py builder.
  3. Run make generate-cli-docs to regen docs/cli-options.md. Run make generate-env-vars-docs if you also added a corresponding env var.
  4. Add a unit test under tests/unit/config/ constructing CLIConfig(my_new_flag=...) and asserting the converter emits the right AIPerfConfig shape.
  5. The disjointness invariant in tests/unit/config/v1/test_section_fields.py catches any cross-section name collision automatically.
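Steps 1 and 5 both rest on plain set algebra. The sketch below uses hypothetical frozensets (not the real contents of _section_fields.py) and a plain set standing in for cli.model_fields_set, to show the per-section scoping query and the pairwise disjointness check.

```python
from itertools import combinations

# Hypothetical section field sets standing in for _section_fields.py.
ENDPOINT_FIELDS = frozenset({"model_names", "url", "streaming"})
LOAD_GENERATOR_FIELDS = frozenset({"concurrency", "request_rate", "my_new_flag"})
IMAGE_INPUT_FIELDS = frozenset({"image_batch_size", "image_width_mean"})

SECTIONS: dict[str, frozenset[str]] = {
    "endpoint": ENDPOINT_FIELDS,
    "load_generator": LOAD_GENERATOR_FIELDS,
    "image_input": IMAGE_INPUT_FIELDS,
}


def section_collisions(
    sections: dict[str, frozenset[str]],
) -> list[tuple[str, str, frozenset[str]]]:
    """Return every pair of sections that share a field name (should be empty)."""
    return [
        (a, b, sections[a] & sections[b])
        for a, b in combinations(sorted(sections), 2)
        if sections[a] & sections[b]
    ]


def user_set_fields(fields_set: set[str], section: frozenset[str]) -> set[str]:
    """Flags the user actually passed that belong to one section,
    mirroring `cli.model_fields_set & <SECTION>_FIELDS`."""
    return set(fields_set & section)
```

Adding an "audio" section that reuses "url" would make section_collisions return that pair, which is the failure the invariant test reports.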

CLI flag DTO charter (enforced):

  • No validators on CLIConfig fields. BeforeValidator(parse_str_or_list) for CLI input coercion is fine; domain validation (range checks across fields, cross-field constraints) lives on AIPerfConfig, not CLIConfig.
  • The CLI-to-envelope converter is the only module outside cli_commands/ that may read CLIConfig attributes.

Service Pattern

Services run in separate processes via bootstrap.py:

class MyService(BaseComponentService):
    @on_message(MessageType.MY_MSG)
    async def _handle(self, msg: MyMsg) -> None:
        await self.publish(ResponseMsg(data=msg.data))

Register in plugins.yaml:

service:
  my_service:
    class: aiperf.my_module.my_service:MyService
    description: My custom service
    metadata:
      required: true
      auto_start: true

Config types:

  • CLIConfig: unified CLI input DTO carrying both benchmark params (endpoints, loadgen) and service-runtime knobs (ZMQ ports, logging level)

Model Pattern

Use AIPerfBaseModel for data, BaseConfig for configuration:

from pydantic import Field
from aiperf.common.models import AIPerfBaseModel

class Record(AIPerfBaseModel):
    ts_ns: int = Field(description="Timestamp in nanoseconds")
    value: float = Field(description="Measured value")

Message Pattern

Messages require a message_type field; services handle them with the @on_message decorator:

from pydantic import Field
from aiperf.common.messages import Message, MessageTypeT
from aiperf.common.hooks import on_message

class MyMsg(Message):
    message_type: MessageTypeT = MessageType.MY_MSG
    data: list[Record] = Field(description="Records to process")

# In the service class:
@on_message(MessageType.MY_MSG)
async def _handle(self, msg: MyMsg) -> None:
    await self.publish(OtherMsg(data=msg.data))

Handlers decorated with @on_message are subscribed automatically during the @on_init phase.
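One way such auto-subscription can work is a decorator that only tags methods, plus an init step that scans the class for tags. The sketch below is hypothetical (on_message_sketch, ServiceSketch, and string message types are stand-ins, not AIPerf's hook machinery) and uses only asyncio.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Handler = Callable[[Any, Any], Awaitable[None]]


def on_message_sketch(*message_types: str) -> Callable[[Handler], Handler]:
    """Tag a coroutine method with the message types it handles; no subscription yet."""
    def decorator(func: Handler) -> Handler:
        func._message_types = message_types  # type: ignore[attr-defined]
        return func
    return decorator


class ServiceSketch:
    def __init__(self) -> None:
        self._subscriptions: dict[str, list[Callable[[Any], Awaitable[None]]]] = {}

    async def initialize(self) -> None:
        # Stand-in for the @on_init phase: find tagged methods on the class
        # and subscribe the bound method to each of its message types.
        for name in dir(type(self)):
            attr = getattr(type(self), name)
            for message_type in getattr(attr, "_message_types", ()):
                self._subscriptions.setdefault(message_type, []).append(getattr(self, name))

    async def deliver(self, message_type: str, message: Any) -> None:
        for handler in self._subscriptions.get(message_type, []):
            await handler(message)


class EchoService(ServiceSketch):
    def __init__(self) -> None:
        super().__init__()
        self.received: list[Any] = []

    @on_message_sketch("MY_MSG")
    async def _handle(self, msg: Any) -> None:
        self.received.append(msg)
```

Deferring subscription to initialization means subclasses inherit decorated handlers without any registration code of their own.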

Plugin System Pattern

YAML-based registry with lazy loading:

# plugins.yaml
endpoint:
  chat:
    class: aiperf.endpoints.openai_chat:ChatEndpoint
    description: OpenAI Chat Completions endpoint
    metadata:
      endpoint_path: /v1/chat/completions
      supports_streaming: true
      produces_tokens: true
      tokenizes_input: true
      supports_audio: true
      supports_images: true
      supports_videos: true
      metrics_title: LLM Metrics

Local GPU telemetry collectors declare themselves with the is_local metadata flag. Each collector class implements validate_environment() to surface missing native bindings before the benchmark starts; for DCGM it is a passthrough no-op.

# plugins.yaml
gpu_telemetry_collector:
  my_local_gpu:
    class: my_package.gpu:MyLocalGPUCollector
    description: Local GPU telemetry collector using vendor Python bindings.
    metadata:
      is_local: true

Registered classes are resolved by plugin type and name:

from aiperf.plugin import plugins
from aiperf.plugin.enums import PluginType

EndpointClass = plugins.get_class(PluginType.ENDPOINT, "chat")
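Conceptually, a lazy registry keeps the parsed YAML as plain data and imports a class only when it is first requested. The sketch below is a hypothetical PluginRegistrySketch, not aiperf.plugin.plugins; a dict stands in for the parsed plugins.yaml and standard-library classes stand in for plugin classes.

```python
import importlib
from typing import Any

# Stand-in for the parsed plugins.yaml: category -> name -> entry.
REGISTRY_DATA: dict[str, dict[str, dict[str, Any]]] = {
    "codec": {
        "json": {"class": "json:JSONDecoder", "metadata": {"is_local": True}},
        "csv": {"class": "csv:DictReader", "metadata": {"is_local": False}},
    },
}


class PluginRegistrySketch:
    def __init__(self, data: dict[str, dict[str, dict[str, Any]]]) -> None:
        self._data = data
        self._cache: dict[tuple[str, str], type] = {}

    def get_class(self, category: str, name: str) -> type:
        key = (category, name)
        if key not in self._cache:
            module_name, _, class_name = self._data[category][name]["class"].partition(":")
            # Import only when the class is first requested, then cache it.
            self._cache[key] = getattr(importlib.import_module(module_name), class_name)
        return self._cache[key]

    def names_where(self, category: str, **metadata: Any) -> list[str]:
        """Filter entries by metadata without importing any plugin module."""
        return [
            name
            for name, entry in self._data[category].items()
            if all(entry.get("metadata", {}).get(k) == v for k, v in metadata.items())
        ]
```

Metadata queries such as "which collectors are local" can then run without importing any vendor bindings.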

Error Handling Pattern

Log errors and publish ErrorDetails in messages:

try:
    await risky_operation()
except Exception as e:
    self.error(f"Operation failed: {e!r}")
    await self.publish(ResultMsg(error=ErrorDetails.from_exception(e)))

Logging Pattern

Pass a lambda for expensive log messages to defer building the string:

# Expensive: the lambda defers evaluation
self.debug(lambda: f"Processing {len(self._items())} items")

# Cheap: a direct string is fine
self.info("Starting service")
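Roughly, the deferral works like the sketch below: a hypothetical LazyLogger wrapper over the standard logging module calls the lambda only when the level is enabled. AIPerf's own logging mixin may implement this differently.

```python
from __future__ import annotations

import logging
from typing import Callable


class LazyLogger:
    """Hypothetical wrapper: callables are invoked only if the level is enabled."""

    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger

    def _log(self, level: int, message: str | Callable[[], str]) -> None:
        if not self._logger.isEnabledFor(level):
            return  # the lambda is never called, so its cost is never paid
        text = message() if callable(message) else message
        self._logger.log(level, text)

    def debug(self, message: str | Callable[[], str]) -> None:
        self._log(logging.DEBUG, message)

    def info(self, message: str | Callable[[], str]) -> None:
        self._log(logging.INFO, message)
```

With the logger at INFO, log.debug(lambda: expensive()) costs one level check; plain strings still work for cheap messages.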

NaN/Inf Discipline Pattern

NaN/+inf/-inf in metric data corrupts downstream artifacts in three ways:

  • orjson.dumps (and Pydantic model_dump_json) silently coerce them to JSON null, which is indistinguishable from “metric was missing”.
  • CSV writers emit literal "nan"/"inf" strings that pandas and duckdb parse inconsistently.
  • np.mean, np.std, and polyfit silently poison downstream decision logic (Pareto fronts, BO acquisition maxima, plateau detectors) without raising.

The aiperf.common.finite module centralizes the discipline as four primitives. Use them at every numeric boundary.

FiniteFloat for Pydantic metric fields

from pydantic import Field
from aiperf.common.finite import FiniteFloat
from aiperf.common.models import AIPerfBaseModel

class MetricSummary(AIPerfBaseModel):
    mean: FiniteFloat = Field(description="Sample mean (must be finite)")
    std: FiniteFloat | None = Field(
        default=None,
        description="Sample stddev; None means insufficient samples",
    )
    p99: FiniteFloat | None = Field(
        default=None,
        description="99th percentile latency in ms; None means no samples",
    )

FiniteFloat's AfterValidator rejects NaN/+inf/-inf at config-load and model_validate time with a debuggable message. For finite-or-explicitly-missing semantics, use FiniteFloat | None; the validator only fires when a non-None value is provided.

scrub_non_finite before every JSON exporter

from pathlib import Path

import orjson
from aiperf.common.finite import scrub_non_finite

def export_records_json(records: list[Record], out_path: Path) -> None:
    payload = {"records": [r.model_dump() for r in records]}
    out_path.write_bytes(orjson.dumps(scrub_non_finite(payload)))

scrub_non_finite recursively walks dict/list/tuple containers and rewrites non-finite numeric values to None. It leaves str/bytes/bool alone and handles numpy scalar types correctly (numpy.float32, numpy.float64).
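A minimal stdlib-only sketch of the recursive walk described above; it is not the aiperf.common.finite implementation. It relies on numpy registering its floating types with numbers.Real, so numpy.float32/numpy.float64 scalars would take the same branch without importing numpy here.

```python
import math
import numbers
from typing import Any


def scrub_non_finite_sketch(value: Any) -> Any:
    """Recursively replace NaN/+inf/-inf with None inside dict/list/tuple containers."""
    if isinstance(value, dict):
        return {key: scrub_non_finite_sketch(item) for key, item in value.items()}
    if isinstance(value, list):
        return [scrub_non_finite_sketch(item) for item in value]
    if isinstance(value, tuple):
        return tuple(scrub_non_finite_sketch(item) for item in value)
    # Integral covers bool and int, which are always finite; str/bytes are not Real.
    if isinstance(value, numbers.Real) and not isinstance(value, numbers.Integral):
        return value if math.isfinite(value) else None
    return value
```

A scrubbed payload passes json.dumps(..., allow_nan=False), which is a cheap way to assert in tests that no non-finite value reached an exporter.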

is_finite_value for the canonical finiteness check

from aiperf.common.finite import is_finite_value

# Method on a class that provides the logging mixin (self.warning) and self._records.
def maybe_record_throughput(self, value: float) -> None:
    if not is_finite_value(value):
        self.warning(lambda: f"Skipping non-finite throughput: {value!r}")
        return
    self._records.append(value)

Use is_finite_value instead of math.isfinite or not math.isnan: hand-rolled isinstance(x, float) guards miss numpy scalar types such as numpy.float32, and math.isfinite raises TypeError on non-numeric inputs.

nan_safe_mean / nan_safe_std for aggregation

from aiperf.common.finite import nan_safe_mean, nan_safe_std

# Partial-failure samples may contain NaN; np.mean would propagate it.
samples = [r.latency_ms for r in records]  # may contain NaN
mean = nan_safe_mean(samples)  # None if no finite values
std = nan_safe_std(samples, ddof=1)  # None if < 2 finite values

Both functions return None (not NaN) when the input has too few finite values, so callers can distinguish “no data” from “data averaged to NaN”.
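The contract can be sketched with the standard library alone: drop non-finite samples, then return None when too few remain. These are hypothetical stand-ins, not the aiperf.common.finite functions; the ddof default of 0 mirrors numpy's convention and is an assumption.

```python
from __future__ import annotations

import math
from collections.abc import Iterable


def _finite_only(values: Iterable[float]) -> list[float]:
    return [float(v) for v in values if math.isfinite(v)]


def nan_safe_mean_sketch(values: Iterable[float]) -> float | None:
    """Mean of the finite values, or None when there are none."""
    finite = _finite_only(values)
    if not finite:
        return None
    return math.fsum(finite) / len(finite)


def nan_safe_std_sketch(values: Iterable[float], ddof: int = 0) -> float | None:
    """Std of the finite values with numpy-style ddof; None if n - ddof <= 0."""
    finite = _finite_only(values)
    n = len(finite)
    if n == 0 or n - ddof <= 0:
        return None
    mean = math.fsum(finite) / n
    variance = math.fsum((x - mean) ** 2 for x in finite) / (n - ddof)
    return math.sqrt(variance)
```

With ddof=1, a single finite sample yields None rather than a division by zero, matching the "< 2 finite values" rule above.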

Don’t: the bug pattern these primitives prevent

# WRONG: raw float field accepts NaN silently
class BadSummary(AIPerfBaseModel):
    p99: float = Field(description="99th percentile latency")  # accepts NaN

# WRONG: orjson silently coerces NaN/inf to JSON null
out_path.write_bytes(orjson.dumps({"p99": float("nan")}))
# Result on disk: {"p99": null} -- indistinguishable from "missing"

# WRONG: np.mean propagates NaN through Pareto/BO downstream
import numpy as np
mean = float(np.mean([1.0, 2.0, float("nan")]))  # NaN, poisons callers

Mechanical CI invariants in tests/unit/property/test_finite_invariants.py reject all three patterns for new code; see /aiperf/dev/architecture-internals/global-property-test-invariants for the full contract and the baseline-ratchet mechanism.

Safe Filesystem Reads Pattern

User-supplied filesystem paths reaching AIPerf (e.g. --extra-inputs payload_template=<path>, endpoint.template.body in a YAML config) must go through aiperf.common.path_safety.safe_read_template_path rather than inline Path(...).read_text() / open(...).read(). The helper is the canonical CWE-22 path-traversal sanitizer recognized by SAST tools — every inline read regenerates that finding.

What the helper does

from aiperf.common.path_safety import safe_read_template_path

body = safe_read_template_path(user_string)
if body is None:
    # Safety check failed; the caller picks the fallback semantic.
    body = user_string  # template "path or inline" idiom
    # or: raise ValueError(f"Template file not readable: {user_string!r}")

Sanitizer chain (in the order SAST engines walk it):

  1. Path(ts).expanduser() — catches TypeError / ValueError / RuntimeError (the last fires on unresolvable ~user prefixes).
  2. Reject if path or any component in path.parents is a symlink. resolve() alone is insufficient because it follows symlinked parent directories silently.
  3. path.resolve(strict=True) — the canonical sanitizer that Snyk/CodeQL/Semgrep recognize; raises on missing paths.
  4. Require resolved.is_file() — rejects directories, devices, fifos.
  5. read_text(encoding="utf-8") — explicit decode; no platform default. Catches UnicodeError alongside OSError so non-UTF-8 files fall back to the literal-string branch rather than crashing config conversion.

Returning None on any failure preserves the existing “treat as a literal value” fallback that both call sites (_converter_endpoint and TemplateEndpoint.__init__) already implement.
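The five-step chain maps onto pathlib almost one call per step. The sketch below is a hypothetical re-creation for reading purposes, not aiperf.common.path_safety; in particular it does not claim to match how SAST tools pattern-match the real helper.

```python
from __future__ import annotations

from pathlib import Path


def safe_read_template_path_sketch(ts: str) -> str | None:
    """Return the UTF-8 text of a regular, non-symlinked file, or None on any failure."""
    # 1. Expand ~; unresolvable ~user prefixes raise RuntimeError.
    try:
        path = Path(ts).expanduser()
    except (TypeError, ValueError, RuntimeError):
        return None
    try:
        # 2. Reject a symlink on the path itself or on any of its parents.
        if path.is_symlink() or any(parent.is_symlink() for parent in path.parents):
            return None
        # 3. Canonicalize; strict=True raises if the path does not exist.
        resolved = path.resolve(strict=True)
        # 4. Only regular files: no directories, devices, or FIFOs.
        if not resolved.is_file():
            return None
        # 5. Explicit UTF-8 decode; undecodable files fall back like missing ones.
        return resolved.read_text(encoding="utf-8")
    except (OSError, UnicodeError, RuntimeError, ValueError):
        return None
```

Callers should test the result with `is None` rather than truthiness, because an empty template file legitimately returns "".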

When this pattern does NOT apply

  • Path joining of trusted strings — Path(__file__).parent / "data.yaml", artifact_dir / "inputs.json". These never resolve untrusted input; no sanitizer needed.
  • Binary reads — open(p, "rb") for parquet/orjson/etc. The helper is UTF-8-text only. If a hardened binary variant is needed, add it to aiperf.common.path_safety alongside the existing helper rather than inlining read_bytes().
  • Reads where missing-file should hard-fail rather than fall back — the helper still works (returns None); the caller is responsible for raising instead of substituting a literal.

Testing Pattern

import pytest
from aiperf.plugin import plugins
from aiperf.plugin.enums import PluginType
from tests.harness import mock_plugin

@pytest.mark.asyncio
async def test_async_operation():
    result = await some_async_func()
    assert result.status == "ok"

@pytest.mark.parametrize("input,expected",
    [
        ("a", 1),
        ("b", 2),
    ]
)  # fmt: skip
def test_with_params(input, expected):
    assert process(input) == expected

def test_with_mock_plugin():
    with mock_plugin(PluginType.ENDPOINT, "test", MockClass):
        assert plugins.get_class(PluginType.ENDPOINT, "test") == MockClass

Auto-fixtures (always active): asyncio.sleep returns instantly, the RNG is seeded with 42, and singletons are reset.

Console Exporter Pattern

Console exporters subclass ConsoleMetricsExporter and configure rendering via class attributes — no method overrides required for the common case. The base class handles filtering, grouping, table construction, and printing; subclasses just declare what to show and when to run.

# src/aiperf/exporters/internal_metrics_console_exporter.py - gated single-table
class ConsoleInternalMetricsExporter(ConsoleMetricsExporter):
    """Console exporter for INTERNAL framework metrics, gated on dev mode."""

    title = "[yellow]NVIDIA AIPerf | Internal Metrics[/yellow]"
    require_flags = MetricFlags.INTERNAL  # records must have this flag
    exclude_flags = MetricFlags.ERROR_ONLY  # records with this flag are hidden
    console_groups = None  # single combined table; ignore groups

    def _check_enabled(self, exporter_config: ExporterConfig) -> None:
        if not (Environment.DEV.MODE and Environment.DEV.SHOW_INTERNAL_METRICS):
            raise ConsoleExporterDisabled("Internal metrics are not enabled, ...")
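
Class attributes:

  • title (str | None): table title; the example above uses Rich markup such as [yellow]…[/yellow].
  • require_flags (MetricFlags): records must have ALL of these. Default MetricFlags.NONE (no requirement).
  • exclude_flags (MetricFlags): records with ANY of these are hidden. Default ERROR_ONLY | …
  • console_groups (tuple[MetricConsoleGroup, …] | None): groups to render; None renders a single combined table and ignores groups.
  • split_by_group (bool): True → one table per non-empty group; False → single combined table.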

Override _check_enabled(self, exporter_config) to raise ConsoleExporterDisabled when the exporter shouldn't run (env var, user-config flag, dev mode). The base implementation is a no-op, so exporters are enabled by default. The flag-driven sibling exporters (ConsoleInternalMetricsExporter, ConsoleExperimentalMetricsExporter, HttpTraceConsoleExporter) follow this pattern verbatim; copy one of them as a starting point.
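The require_flags/exclude_flags semantics reduce to two bitwise checks on an enum.Flag. A sketch using a hypothetical MetricFlagsSketch in place of MetricFlags:

```python
from enum import Flag


class MetricFlagsSketch(Flag):
    """Hypothetical stand-in for MetricFlags."""

    NONE = 0
    INTERNAL = 1
    EXPERIMENTAL = 2
    ERROR_ONLY = 4


def is_shown(
    record_flags: MetricFlagsSketch,
    require: MetricFlagsSketch,
    exclude: MetricFlagsSketch,
) -> bool:
    """ALL of `require` must be present and NONE of `exclude` may be present."""
    has_all_required = (record_flags & require) == require
    has_any_excluded = bool(record_flags & exclude)
    return has_all_required and not has_any_excluded
```

Because `x & NONE == NONE` holds for every flag value, the default require_flags of NONE imposes no requirement.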

Uncertainty Plot Pattern

The latency-throughput uncertainty plot uses a one-data-contract, three-renderers architecture.

Data Contract

from aiperf.plot.models.uncertainty import BenchmarkPoint, LatencyThroughputUncertaintyData

point = BenchmarkPoint(
    x_mean=10.0, y_mean=100.0,
    x_ci_low=8.0, x_ci_high=12.0,
    y_ci_low=90.0, y_ci_high=110.0,
    cov_xy=5.0,  # enables rotated ellipses; None for axis-aligned
    label="concurrency=4",
)
data = LatencyThroughputUncertaintyData(
    points=[point],
    confidence_level=0.95,
    title="Latency vs Throughput",
    x_label="Latency (ms)",
    y_label="Throughput (tok/s)",
)

Multi-Series Data Contract

from aiperf.plot.models.uncertainty import (
    BenchmarkPoint, LatencyThroughputUncertaintyData, UncertaintySeries,
)

# One series per experiment variant (e.g., request_count=20 vs 50).
# When `series` is non-empty it overrides `points`; see get_series().
data = LatencyThroughputUncertaintyData(
    series=[
        UncertaintySeries(name="request_count=20", points=[
            BenchmarkPoint(x_mean=5.0, y_mean=50.0, x_ci_low=4.0, x_ci_high=6.0,
                           y_ci_low=45.0, y_ci_high=55.0, label="c=2", n_runs=10),
            BenchmarkPoint(x_mean=15.0, y_mean=120.0, x_ci_low=13.0, x_ci_high=17.0,
                           y_ci_low=110.0, y_ci_high=130.0, label="c=10", n_runs=8),
        ]),
        UncertaintySeries(name="request_count=50", points=[
            BenchmarkPoint(x_mean=6.0, y_mean=48.0, x_ci_low=4.5, x_ci_high=7.5,
                           y_ci_low=42.0, y_ci_high=54.0, label="c=2", n_runs=10),
            BenchmarkPoint(x_mean=18.0, y_mean=110.0, x_ci_low=15.0, x_ci_high=21.0,
                           y_ci_low=100.0, y_ci_high=120.0, label="c=10", n_runs=10),
        ]),
    ],
    confidence_level=0.95,
    title="Latency vs Throughput by Request Count",
    x_label="Latency (ms)",
    y_label="Throughput (tok/s)",
)

Plotly Renderer (interactive + Kaleido PNG)

from aiperf.plot.core.plot_generator import PlotGenerator

pg = PlotGenerator()
fig = pg.create_uncertainty_plot(data)
fig.write_image("output.png")  # Kaleido export

Matplotlib Renderer (code-gen reports)

from pathlib import Path
from aiperf.plot.exporters import export_uncertainty_matplotlib

export_uncertainty_matplotlib(data, Path("output.png"))

Ellipse Geometry Utility

import numpy as np
from aiperf.plot.geometry import compute_ellipse_vertices, compute_axis_aligned_ellipse_vertices

cov = np.array([[4.0, 1.0], [1.0, 9.0]])
vertices = compute_ellipse_vertices(cov, center=(10.0, 100.0), confidence_level=0.95)
# Returns a list of (x, y) tuples forming a closed polygon
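The geometry behind a rotated confidence ellipse is an eigen-decomposition of the 2x2 covariance. The stdlib-only sketch below assumes the ellipse is the chi-square contour with 2 degrees of freedom, for which the quantile has the closed form -2 ln(1 - p). It is not compute_ellipse_vertices itself, whose scaling may differ (for example, if it works from confidence intervals of a mean).

```python
import math


def ellipse_vertices_sketch(
    cov: list[list[float]],
    center: tuple[float, float],
    confidence_level: float = 0.95,
    n_points: int = 64,
) -> list[tuple[float, float]]:
    """Closed polygon on {p : (p - c)^T cov^-1 (p - c) = chi2_2(confidence_level)}."""
    (a, b), (_, d) = cov  # symmetric 2x2: [[a, b], [b, d]]
    # Chi-square quantile with 2 degrees of freedom.
    scale = -2.0 * math.log(1.0 - confidence_level)
    # Eigenvalues of a symmetric 2x2 matrix.
    half_trace = (a + d) / 2.0
    spread = math.sqrt(max(half_trace**2 - (a * d - b * b), 0.0))
    major, minor = half_trace + spread, half_trace - spread
    # Angle of the eigenvector belonging to the major eigenvalue.
    theta = 0.5 * math.atan2(2.0 * b, a - d)
    r_major = math.sqrt(scale * major)
    r_minor = math.sqrt(scale * max(minor, 0.0))
    cos_t, sin_t = math.cos(theta), math.sin(theta)
    cx, cy = center
    vertices = []
    for i in range(n_points):
        t = 2.0 * math.pi * i / n_points
        u, v = r_major * math.cos(t), r_minor * math.sin(t)
        # Rotate from the ellipse frame into data coordinates, then translate.
        vertices.append((cx + u * cos_t - v * sin_t, cy + u * sin_t + v * cos_t))
    vertices.append(vertices[0])  # repeat the first vertex to close the polygon
    return vertices
```

A nonzero off-diagonal term (cov_xy in BenchmarkPoint) is what tilts the ellipse; with b = 0 the angle collapses to 0 or 90 degrees and the result is axis-aligned.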

Plot Envelope (plot:)

AIPerfConfig accepts an optional top-level plot: key that fully describes which plots are rendered after the run. Two forms are supported:

# Form A: bare-string path reference (resolved relative to the AIPerf YAML's directory)
plot: ./plots/baseline.yaml

# Form B: inline mapping (mirrors src/aiperf/plot/default_plot_config.yaml)
plot:
  visualization:
    multi_run_defaults: [pareto_curve_throughput_per_gpu_vs_latency]
    single_run_defaults: [ttft_over_time]
    multi_run_plots:
      pareto_curve_throughput_per_gpu_vs_latency:
        type: pareto
        x: {metric: request_latency, stat: avg}
        y: {metric: output_token_throughput_per_gpu, stat: avg}
        labels: [concurrency]
        groups: [model]
    single_run_plots:
      ttft_over_time:
        type: scatter
        x: request_number
        y: time_to_first_token
  settings:
    server_metrics_downsampling:
      enabled: true
      window_size_seconds: 5.0
      aggregation_method: mean
  experiment_classification:
    baselines: ["*baseline*"]
    treatments: ["*treatment*"]

When plot: is set, ~/.aiperf/plot_config.yaml is ignored and artifacts.auto_plot becomes True unless it is explicitly set to false. The auto-plot callback writes the resolved envelope to <artifact_dir>/.aiperf-plot-config.yaml as a reproducibility receipt, so a later aiperf plot <run> picks it up automatically without needing the original AIPerf YAML. The Pydantic models live in src/aiperf/config/plot.py.
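The two forms and the auto_plot rule can be summarized as one small decision function. resolve_plot_envelope below is a hypothetical sketch (it is not the src/aiperf/config/plot.py code); auto_plot=None models "not set by the user".

```python
from __future__ import annotations

from pathlib import Path
from typing import Any


def resolve_plot_envelope(
    plot: str | dict[str, Any] | None,
    config_path: Path,
    auto_plot: bool | None,
) -> tuple[Path | dict[str, Any] | None, bool | None]:
    """Return (envelope source, effective artifacts.auto_plot)."""
    if plot is None:
        # No plot: key, so the user-level ~/.aiperf/plot_config.yaml would apply.
        return None, auto_plot
    if isinstance(plot, str):
        # Form A: a relative path resolves against the AIPerf YAML's directory.
        source: Path | dict[str, Any] = config_path.parent / plot
    elif isinstance(plot, dict):
        source = plot  # Form B: inline mapping
    else:
        raise TypeError(f"plot: must be a path string or a mapping, got {type(plot).__name__}")
    # Setting plot: turns auto-plot on unless the user explicitly disabled it.
    return source, True if auto_plot is None else auto_plot
```

Resolving Form A against the config file's directory, rather than the current working directory, keeps a config and its plot file portable as a pair.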

Validator Pattern

Per-feature load-time validators (e.g. BranchOrchestrator v1) run at the end of dataset loading. Unsupported constructs raise NotImplementedError with a <loc>: <reason> message, where <loc> identifies the offending conversation and turn, so misconfigurations surface before any credit is issued:

# src/aiperf/common/validators/orchestrator_v1.py - gate convention
raise NotImplementedError(
    f"conversation '{conv.conversation_id}' turn {idx}: "
    f"prerequisite kind '{prereq.kind}' not supported by v1 orchestrator"
)
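A minimal sketch of a v1-style gate that walks every conversation and turn and raises using the <loc>: <reason> convention above. The dataclasses and SUPPORTED_PREREQ_KINDS are hypothetical; the real validator's supported kinds are not listed in this page.

```python
from __future__ import annotations

from dataclasses import dataclass, field

SUPPORTED_PREREQ_KINDS = frozenset({"turn_complete"})  # hypothetical


@dataclass
class PrereqSketch:
    kind: str


@dataclass
class TurnSketch:
    prerequisites: list[PrereqSketch] = field(default_factory=list)


@dataclass
class ConversationSketch:
    conversation_id: str
    turns: list[TurnSketch] = field(default_factory=list)


def validate_v1(conversations: list[ConversationSketch]) -> None:
    """Raise on the first unsupported construct, before any credit is issued."""
    for conv in conversations:
        for idx, turn in enumerate(conv.turns):
            for prereq in turn.prerequisites:
                if prereq.kind not in SUPPORTED_PREREQ_KINDS:
                    raise NotImplementedError(
                        f"conversation '{conv.conversation_id}' turn {idx}: "
                        f"prerequisite kind '{prereq.kind}' not supported by v1 orchestrator"
                    )
```

Running the gate as the loader's last step means the error names a concrete conversation and turn instead of surfacing as a stall mid-benchmark.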

Endpoint Mixin Pattern

Reusable response-parsing behavior lives in mixins applied to endpoint classes:

# src/aiperf/endpoints/raw_endpoint.py - composing a mixin
class RawEndpoint(JMESPathResponseMixin, BaseEndpoint):
    def __init__(self, model_endpoint: ModelEndpointInfo, **kwargs: Any) -> None:
        super().__init__(model_endpoint, **kwargs)
        self._init_response_parser()

The mixin in src/aiperf/endpoints/response_mixin.py compiles an optional endpoint.extra.response_field JMESPath query at construction time, with auto-detect fallback when the query fails or no JSON body is present.

Per-turn dataset extra

Custom dataset rows use extra for non-native request-body fields. Loaders map that user-facing field into internal Turn.extra_body. Every endpoint formatter that builds a JSON request body shallow-merges Turn.extra_body into the wire body at the very end of payload construction, AFTER model_endpoint.endpoint.extra. The merge is shallow dict.update; user-provided keys win on collision.

Rules new formatters and loaders must follow:

  • Dispatch-turn scoping. Endpoint formatters read turn.extra_body, turn.max_tokens, and turn.model from request_info.turns[-1] only. Parent turns earlier in the conversation history must never leak these request-control fields into a child payload, so DAG/FORK children stay clean of parent vendor knobs, limits, or model overrides.
  • Tools-as-system-prompt. Only raw_tools walks request_info.turns from the end via BaseEndpoint._latest_turn_attr. Tool definitions behave like a system prompt and persist across a multi-turn or FORK conversation when the dispatching turn does not redeclare them.
  • Dataset user-facing field is extra. Custom dataset row schemas (SingleTurn, inner MultiTurn turns, MooncakeTrace, DagTurn) declare a per-turn extra: dict[str, Any] | None. Loaders translate row.extra into Turn.extra_body at construction time. DagTurn uses Pydantic’s extra="forbid" so a typo’d extra_body is rejected at load time; the other dataset schemas are extra="allow" so an unrecognized extra_body is silently ignored — author the supported field instead.
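The merge order and the two scoping rules fit in a few lines. DatasetTurn, latest_turn_attr, and build_body are hypothetical simplifications (every message is sent with the user role). The points they illustrate are that request-control fields come from turns[-1], that raw_tools walks backward, and that extra_body is applied last with a shallow dict.update.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class DatasetTurn:
    """Hypothetical stand-in for Turn with only the fields this sketch needs."""
    text: str
    extra_body: dict[str, Any] | None = None
    max_tokens: int | None = None
    raw_tools: list[dict[str, Any]] | None = None


def latest_turn_attr(turns: list[DatasetTurn], name: str) -> Any:
    """Walk back from the dispatching turn; used only for raw_tools."""
    for turn in reversed(turns):
        value = getattr(turn, name)
        if value is not None:
            return value
    return None


def build_body(model: str, turns: list[DatasetTurn], endpoint_extra: dict[str, Any] | None) -> dict[str, Any]:
    dispatch = turns[-1]  # request-control fields come from this turn only
    body: dict[str, Any] = {
        "model": model,
        "messages": [{"role": "user", "content": t.text} for t in turns],
    }
    if dispatch.max_tokens is not None:
        body["max_tokens"] = dispatch.max_tokens
    tools = latest_turn_attr(turns, "raw_tools")
    if tools is not None:
        body["tools"] = tools
    # Shallow merges at the very end: endpoint extras first, then the turn's extra_body.
    body.update(endpoint_extra or {})
    body.update(dispatch.extra_body or {})
    return body
```

Note the consequence of a shallow merge: a nested object in extra_body replaces the endpoint's object of the same key wholesale rather than being merged key by key.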

Coverage:

  • Chat-style formatters with full history flattening (openai_chat, chat_embeddings via inheritance, openai_responses).
  • Single-turn formatters (openai_completions, openai_embeddings and nim_embeddings, openai_image_generation, openai_video_generation, openai_image_edit, nim_image_retrieval, huggingface_generate, solido_rag, the rankings family via BaseRankingsEndpoint, and template_endpoint).

huggingface_generate deliberately merges extra_body at the TOP level of the wire body (not nested under parameters).

openai_image_edit filters reserved keys (prompt, image, url, mask) out of both endpoint extras and extra_body to protect the multipart upload contract.

raw_endpoint intentionally skips this merge — it ships the user-authored Turn.raw_payload verbatim.

Strategy Protocol Pattern

The OTel results processor uses a strategy protocol to dispatch incoming data to specialised handlers. Each strategy declares what data it supports and processes matching records independently:

from typing import Protocol, runtime_checkable

from aiperf.common.messages.inference_messages import MetricRecordsData
from aiperf.common.models import CreditPhaseStats

OTelResultData = MetricRecordsData | CreditPhaseStats


@runtime_checkable
class OTelResultsStrategyProtocol(Protocol):
    """Public extension point for new streamed OTel result domains.

    A strategy owns exactly one ``OTelResultData`` variant and emits its
    telemetry via ``OTelStrategyContextProtocol``. Strategies MUST NOT touch
    OTel instruments, the fanout queue, or the MLflow client directly; the
    context owns instrument lifecycle and cross-strategy state so fanout
    stays consistent across strategies.
    """

    def supports(self, record_data: OTelResultData) -> bool:
        """Return True iff ``record_data`` is the variant this strategy consumes.

        Implementations use ``isinstance`` against a single concrete type;
        strategies are mutually exclusive by record type.
        """
        ...

    async def process(self, record_data: OTelResultData) -> None:
        """Emit telemetry for ``record_data`` without blocking the hot path.

        Instrument access goes through the context's ``get_or_create_*``
        factories, which enqueue fanout events rather than touching the OTel
        SDK inline. Raising is permitted; the processor is best-effort, so
        the records manager logs and swallows the failure.
        """
        ...

Concrete strategies accept a context object at construction time and implement the two-method interface:

from aiperf.common.messages.inference_messages import MetricRecordsData
from aiperf.common.models import CreditPhaseStats
from aiperf.post_processors.strategies.core import (
    OTelResultData,
    OTelResultsStrategyProtocol,
    OTelStrategyContextProtocol,
)


class MetricResultsStrategy(OTelResultsStrategyProtocol):
    """Streams per-request metric records as histogram observations."""

    def __init__(self, context: OTelStrategyContextProtocol) -> None:
        self._context = context

    def supports(self, record_data: OTelResultData) -> bool:
        return isinstance(record_data, MetricRecordsData)

    async def process(self, record_data: OTelResultData) -> None:
        # Emit histogram observations for each metric in the record.
        ...


class TimingResultsStrategy(OTelResultsStrategyProtocol):
    """Streams phase-level timing snapshots using counters and gauges."""

    def __init__(self, context: OTelStrategyContextProtocol) -> None:
        self._context = context

    def supports(self, record_data: OTelResultData) -> bool:
        return isinstance(record_data, CreditPhaseStats)

    async def process(self, record_data: OTelResultData) -> None:
        # Emit counter deltas and gauge snapshots for timing data.
        ...

The processor iterates registered strategies on each incoming record:

for strategy in self._strategies:
    if strategy.supports(record_data):
        await strategy.process(record_data)

Conventions:

  • One strategy class per file under post_processors/strategies/.
  • supports() uses isinstance checks — no dynamic dispatch tables.
  • OTelStrategyContextProtocol exposes instrument factories (get_or_create_histogram, etc.) so strategies never construct OTel instruments directly.
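To see the dispatch end to end without OTel, the sketch below uses hypothetical stub record types and a context that only records events. The strategies do not inherit from the protocol; runtime_checkable lets isinstance confirm that they satisfy it structurally.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Protocol, Union, runtime_checkable


@dataclass
class MetricRecordsStub:
    """Hypothetical stand-in for MetricRecordsData."""
    values: dict[str, float]


@dataclass
class PhaseStatsStub:
    """Hypothetical stand-in for CreditPhaseStats."""
    completed: int


RecordData = Union[MetricRecordsStub, PhaseStatsStub]


@runtime_checkable
class StrategySketch(Protocol):
    def supports(self, record_data: RecordData) -> bool: ...

    async def process(self, record_data: RecordData) -> None: ...


@dataclass
class RecordingContext:
    """Hypothetical context that records emitted events instead of touching OTel."""
    events: list[tuple[str, object]] = field(default_factory=list)

    def emit(self, kind: str, payload: object) -> None:
        self.events.append((kind, payload))


class MetricStrategy:
    def __init__(self, context: RecordingContext) -> None:
        self._context = context

    def supports(self, record_data: RecordData) -> bool:
        return isinstance(record_data, MetricRecordsStub)

    async def process(self, record_data: RecordData) -> None:
        for name, value in record_data.values.items():
            self._context.emit("histogram", (name, value))


class TimingStrategy:
    def __init__(self, context: RecordingContext) -> None:
        self._context = context

    def supports(self, record_data: RecordData) -> bool:
        return isinstance(record_data, PhaseStatsStub)

    async def process(self, record_data: RecordData) -> None:
        self._context.emit("counter", record_data.completed)


async def dispatch(strategies: list[StrategySketch], record_data: RecordData) -> None:
    """Same loop as the processor: every supporting strategy handles the record."""
    for strategy in strategies:
        if strategy.supports(record_data):
            await strategy.process(record_data)
```

Swapping RecordingContext for a fake is also a convenient way to unit-test a real strategy without an OTel SDK.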

Drop-Oldest Fanout Queue

OTelMetricsResultsProcessor fans out metric events to a dedicated child process via a bounded multiprocessing.Queue. The queue uses drop-oldest semantics so the hot path (the main benchmark loop) is never blocked by a slow downstream consumer.

Queue sizing:

import multiprocessing as mp
from aiperf.common.environment import Environment

event_queue = mp.Queue(maxsize=Environment.OTEL.MAX_BUFFERED_RECORDS)  # default 10 000

Backpressure algorithm:

  1. Attempt queue.put_nowait(event).
  2. On queue.Full, call queue.get_nowait() to discard the oldest event.
  3. Retry queue.put_nowait(event) once.
  4. If the retry also fails, increment _fanout_dropped_events and log at thresholds (1, 100, 1 000 drops).
from queue import Empty, Full
from typing import Any

def _queue_fanout_event(self, event_type: str, payload: dict[str, Any]) -> None:
    """Enqueue streaming event for the fanout process without blocking the event loop."""
    if self._fanout_queue is None:
        return

    event = {"type": event_type, "payload": payload}
    try:
        self._fanout_queue.put_nowait(event)
        self._fanout_sent_events += 1
    except Full:
        if self._drop_oldest_fanout_event():
            try:
                self._fanout_queue.put_nowait(event)
                self._fanout_sent_events += 1
                return
            except Full:
                pass
        self._record_fanout_drop(
            "OTel fanout queue remained full; dropping newest event"
        )
    except Exception as exc:
        self.warning(f"Failed to enqueue OTel fanout event: {exc!r}")

Design rationale:

  • The benchmark hot path must never block on telemetry I/O.
  • Dropping the oldest event (rather than the newest) preserves the most recent state, which is more useful for live dashboards.
  • The counter _fanout_dropped_events is reported at shutdown so operators can tune AIPERF_OTEL_MAX_BUFFERED_RECORDS if drops are frequent.
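The drop-oldest policy can be exercised in isolation. This sketch (DropOldestFanout is a hypothetical name) uses queue.Queue instead of multiprocessing.Queue. Both raise queue.Full/queue.Empty from their *_nowait methods, but multiprocessing.Queue hands items to a background feeder thread, so get_nowait() can briefly raise Empty right after a put, which would make a demo nondeterministic. Whether evictions of the oldest event are counted separately from dropped newest events is an assumption here.

```python
from __future__ import annotations

import queue
from typing import Any

LOG_THRESHOLDS = (1, 100, 1_000)  # drop counts at which a warning would be logged


class DropOldestFanout:
    """In-process sketch of the drop-oldest policy; never blocks the caller."""

    def __init__(self, maxsize: int) -> None:
        self.queue: queue.Queue[dict[str, Any]] = queue.Queue(maxsize=maxsize)
        self.sent = 0
        self.evicted_oldest = 0
        self.dropped = 0
        self.logged_at: list[int] = []

    def _drop_oldest(self) -> bool:
        try:
            self.queue.get_nowait()
        except queue.Empty:
            return False
        self.evicted_oldest += 1
        return True

    def put(self, event: dict[str, Any]) -> None:
        try:
            self.queue.put_nowait(event)
            self.sent += 1
            return
        except queue.Full:
            pass
        # Full: evict one old event and retry exactly once.
        if self._drop_oldest():
            try:
                self.queue.put_nowait(event)
                self.sent += 1
                return
            except queue.Full:
                pass  # another producer refilled the slot first
        self.dropped += 1
        if self.dropped in LOG_THRESHOLDS:
            self.logged_at.append(self.dropped)
```

In a single-threaded demo the retry always succeeds after an eviction; the newest-event drop path only triggers when concurrent producers race for the freed slot.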