Source code for nemo_evaluator.adapters.adapter_config

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Submodule responsible for the configuration related specifically to adapters.

For the visibility reasons, we don't expose adapter configuration via CLI. All the
adaptor config comes from the framework configuration yaml under
```yaml
target:
  api_endpoint:
    adapter_config:
      discovery:
        modules: ["mod.a.b.c", ...]
        dirs: ["/some/path"]
      interceptors: []
      post_eval_hooks: []
      endpoint_type: "chat"  # default: "chat"
      caching_dir: "/some/dir"  # default: null
      generate_html_report: true  # default: true
      log_failed_requests: false  # default: false
      tracking_requests_stats: true  # default: true
      html_report_size: 5  # default: 5
```

This module merely takes such a dict and translates it into a typed dataclass.
"""

from typing import Any

from pydantic import BaseModel, Field


[docs] class DiscoveryConfig(BaseModel): """Configuration for discovering 3rd party modules and directories""" modules: list[str] = Field( description="List of module paths to discover", default_factory=list, ) dirs: list[str] = Field( description="List of directory paths to discover", default_factory=list, )
[docs] class InterceptorConfig(BaseModel): """Configuration for a single interceptor""" name: str = Field(description="Name of the interceptor to use") enabled: bool = Field( description="Whether this interceptor is enabled", default=True ) config: dict[str, Any] = Field( description="Configuration for the interceptor", default_factory=dict )
[docs] class PostEvalHookConfig(BaseModel): """Configuration for a single post-evaluation hook""" name: str = Field(description="Name of the post-evaluation hook to use") enabled: bool = Field( description="Whether this post-evaluation hook is enabled", default=True ) config: dict[str, Any] = Field( description="Configuration for the post-evaluation hook", default_factory=dict )
[docs] class Config: use_enum_values = True
[docs] class AdapterConfig(BaseModel): """Adapter configuration with registry-based interceptor support""" discovery: DiscoveryConfig = Field( description="Configuration for discovering 3rd party modules and directories", default_factory=DiscoveryConfig, ) interceptors: list[InterceptorConfig] = Field( description="List of interceptors to use with their configurations", default_factory=list, ) post_eval_hooks: list[PostEvalHookConfig] = Field( description="List of post-evaluation hooks to use with their configurations", default_factory=list, ) endpoint_type: str = Field( description="Type of the endpoint to run the adapter for", default="chat", ) caching_dir: str | None = Field( description="Directory for caching responses (legacy field)", default=None, ) generate_html_report: bool = Field( description="Whether to generate HTML report (legacy field)", default=True, ) log_failed_requests: bool = Field( description="Whether to log failed requests (legacy field)", default=False, ) tracking_requests_stats: bool = Field( description="Whether to enable request statistics tracking. When enabled, response statistics including token usage, status codes, finish reasons, tool calls, and latency metrics will be collected and added to eval_factory_metrics.json for comprehensive evaluation analysis.", default=True, ) html_report_size: int | None = Field( description="Number of request-response pairs to track in HTML report. If this is larger than max_saved_responses or max_saved_requests, it will override those values.", default=5, )
[docs] @classmethod def get_legacy_defaults(cls) -> dict[str, Any]: """Get default values for legacy configuration parameters.""" return { "generate_html_report": cls.model_fields["generate_html_report"].default, "html_report_size": cls.model_fields["html_report_size"].default, "tracking_requests_stats": cls.model_fields[ "tracking_requests_stats" ].default, "log_failed_requests": cls.model_fields["log_failed_requests"].default, "endpoint_type": cls.model_fields["endpoint_type"].default, # Boolean defaults for optional features "use_caching": True, "save_responses": False, "save_requests": False, "use_system_prompt": False, "use_omni_info": False, "use_request_logging": False, "use_nvcf": False, "use_response_logging": False, "use_reasoning": False, "use_progress_tracking": False, "use_raise_client_errors": False, "include_json": True, "custom_system_prompt": None, "caching_dir": None, "output_dir": None, "params_to_add": None, "params_to_remove": None, "params_to_rename": None, "max_logged_requests": None, "max_logged_responses": None, "max_saved_requests": None, "max_saved_responses": None, "start_reasoning_token": None, "include_if_reasoning_not_finished": None, "track_reasoning": None, "end_reasoning_token": "</think>", "progress_tracking_url": None, "progress_tracking_interval": 1, "logging_aggregated_stats_interval": 100, }
[docs] @classmethod def get_validated_config(cls, run_config: dict[str, Any]) -> "AdapterConfig": """Extract and validate adapter configuration from run_config. Args: run_config: The run configuration dictionary Returns: AdapterConfig instance if adapter_config is present in run_config, None otherwise Raises: ValueError: If adapter_config is present but invalid """ def merge_discovery( global_discovery: dict[str, Any], local_discovery: dict[str, Any] ) -> dict[str, Any]: """Merge global and local discovery configs.""" return { "modules": global_discovery.get("modules", []) + local_discovery.get("modules", []), "dirs": global_discovery.get("dirs", []) + local_discovery.get("dirs", []), } global_cfg = run_config.get("global_adapter_config", {}) local_cfg = ( run_config.get("target", {}).get("api_endpoint", {}).get("adapter_config") ) if not global_cfg and not local_cfg: # Create default adapter config with caching enabled by default return cls.from_legacy_config({}, run_config) merged = dict(global_cfg) if global_cfg else {} if local_cfg: local_discovery = local_cfg.get("discovery") global_discovery = merged.get("discovery") if local_discovery and global_discovery: merged["discovery"] = merge_discovery(global_discovery, local_discovery) # Add/override other local fields for k, v in local_cfg.items(): if k != "discovery": merged[k] = v else: merged.update(local_cfg) # Syntactic sugar, we allow `interceptors` list in non-typed (pre-validation) # `adapter_config` to contain also plain strings, which will be treated # as `name: <this string>` if isinstance(merged.get("interceptors"), list): merged["interceptors"] = [ {"name": s} if isinstance(s, str) else s for s in merged["interceptors"] ] # Syntactic sugar for post_eval_hooks as well if isinstance(merged.get("post_eval_hooks"), list): merged["post_eval_hooks"] = [ {"name": s} if isinstance(s, str) else s for s in merged["post_eval_hooks"] ] try: config = cls(**merged) # If no interceptors are configured, try to convert from legacy format if not config.interceptors: config = cls.from_legacy_config(merged, run_config) return config except Exception as e: raise ValueError(f"Invalid adapter configuration: {e}") from e
@staticmethod def _get_default_output_dir( legacy_config: dict[str, Any], run_config: dict[str, Any] | None = None ) -> str | None: """Get default output directory based on configuration priority. Args: legacy_config: Legacy configuration dictionary run_config: Full run configuration dictionary (optional) Returns: output directory path based on priority: legacy_config.output_dir > run_config.config.output_dir > None """ # First try legacy_config, but handle KeyError if not present output_dir = legacy_config.get("output_dir") if output_dir is None and run_config: output_dir = run_config.get("config", {}).get("output_dir") return output_dir @staticmethod def _get_default_cache_dir( legacy_config: dict[str, Any], run_config: dict[str, Any] | None = None, subdir: str = "cache", ) -> str: """Get default cache directory based on configuration priority. Args: legacy_config: Legacy configuration dictionary run_config: Full run configuration dictionary (optional) subdir: Subdirectory name to append to output_dir (default: "cache") Returns: cache directory path based on priority: caching_dir > output_dir/{subdir} > /tmp/{subdir} """ # First try caching_dir from legacy config cache_dir = legacy_config["caching_dir"] if cache_dir is not None: return f"{cache_dir}/{subdir}" # Fallback to output_dir/{subdir} output_dir = AdapterConfig._get_default_output_dir(legacy_config, run_config) if output_dir: return f"{output_dir}/{subdir}" # Final fallback to /tmp/{subdir} return f"/tmp/{subdir}"
[docs] @classmethod def from_legacy_config( cls, legacy_config: dict[str, Any], run_config: dict[str, Any] | None = None ) -> "AdapterConfig": """Convert legacy configuration to new interceptor-based format. Args: legacy_config: Legacy configuration dictionary run_config: Full run configuration dictionary (optional, used to extract output_dir) Returns: AdapterConfig instance with interceptors based on legacy config """ # Merge legacy config with defaults to avoid repeated .get() calls defaults = cls.get_legacy_defaults() legacy_config = {**defaults, **legacy_config} interceptors = [] post_eval_hooks = [] # Add system message interceptor if custom system prompt is specified (Request) if ( legacy_config["use_system_prompt"] and legacy_config["custom_system_prompt"] is not None ): interceptors.append( InterceptorConfig( name="system_message", enabled=True, config={ "system_message": legacy_config["custom_system_prompt"], }, ) ) # Add payload modifier interceptor if any payload modification parameters are specified (RequestToResponse) params_to_add = legacy_config["params_to_add"] params_to_remove = legacy_config["params_to_remove"] params_to_rename = legacy_config["params_to_rename"] if params_to_add or params_to_remove or params_to_rename: config = {} if params_to_add: config["params_to_add"] = params_to_add if params_to_remove: config["params_to_remove"] = params_to_remove if params_to_rename: config["params_to_rename"] = params_to_rename interceptors.append( InterceptorConfig( name="payload_modifier", enabled=True, config=config, ) ) # Add omni info interceptor if specified (Request) if legacy_config["use_omni_info"]: interceptors.append( InterceptorConfig( name="omni_info", enabled=True, config={ "output_dir": cls._get_default_output_dir( legacy_config, run_config ), }, ) ) # Convert legacy fields to interceptors (Request) if legacy_config["use_request_logging"]: config = { "output_dir": cls._get_default_output_dir(legacy_config, run_config) } if legacy_config["max_logged_requests"] is not None: config["max_requests"] = legacy_config["max_logged_requests"] interceptors.append( InterceptorConfig( name="request_logging", config=config, ) ) # Add caching interceptor (RequestToResponse) # Activate if ANY of these are set: reuse_cached, save_responses, save_requests, generate_html_report # For caching interceptor, use caching_dir directly if provided, otherwise use output_dir/cache if legacy_config["caching_dir"] is not None: cache_dir = legacy_config["caching_dir"] else: # Use output_dir/cache if output_dir exists, otherwise /tmp/cache output_dir = AdapterConfig._get_default_output_dir( legacy_config, run_config ) if output_dir: cache_dir = f"{output_dir}/cache" else: cache_dir = "/tmp/cache" # Values are now available directly from legacy_config (merged with defaults) generate_html_report = legacy_config["generate_html_report"] max_html_report_size = legacy_config["html_report_size"] # Check if caching should be activated should_activate = any( [ legacy_config["use_caching"], legacy_config["save_responses"], legacy_config["save_requests"], generate_html_report, ] ) if should_activate: # Determine save settings based on generate_html_report if generate_html_report: save_requests = True save_responses = True if max_html_report_size is not None: # Handle None values in max() by filtering them out max_saved_requests_values = [max_html_report_size] max_saved_responses_values = [max_html_report_size] if legacy_config["max_saved_requests"] is not None: max_saved_requests_values.append( legacy_config["max_saved_requests"] ) if legacy_config["max_saved_responses"] is not None: max_saved_responses_values.append( legacy_config["max_saved_responses"] ) max_saved_requests = max(max_saved_requests_values) max_saved_responses = max(max_saved_responses_values) else: max_saved_requests = legacy_config["max_saved_requests"] max_saved_responses = legacy_config["max_saved_responses"] else: save_requests = legacy_config["save_requests"] save_responses = legacy_config["save_responses"] max_saved_requests = legacy_config["max_saved_requests"] max_saved_responses = legacy_config["max_saved_responses"] config = { "cache_dir": cache_dir, "reuse_cached_responses": legacy_config["use_caching"], "save_requests": save_requests, "save_responses": save_responses, } if max_saved_requests is not None: config["max_saved_requests"] = max_saved_requests if max_saved_responses is not None: config["max_saved_responses"] = max_saved_responses interceptors.append( InterceptorConfig( name="caching", enabled=True, config=config, ) ) # Add the final request interceptor - either nvcf or endpoint if legacy_config["use_nvcf"]: interceptors.append( InterceptorConfig( name="nvcf", enabled=True, config={}, ) ) else: # Only add endpoint if nvcf is not used interceptors.append(InterceptorConfig(name="endpoint")) # Add response stats interceptor right after endpoint if tracking is enabled # Default to True if not explicitly set in legacy config if legacy_config["tracking_requests_stats"]: # Use caching_dir if provided, otherwise use output_dir/response_stats_cache cache_dir = cls._get_default_cache_dir( legacy_config, run_config, "response_stats_cache" ) config = { "cache_dir": cache_dir, "logging_aggregated_stats_interval": legacy_config[ "logging_aggregated_stats_interval" ], } interceptors.append( InterceptorConfig( name="response_stats", enabled=True, config=config, ) ) if legacy_config["use_response_logging"]: config = { "output_dir": cls._get_default_output_dir(legacy_config, run_config) } if legacy_config["max_logged_responses"] is not None: config["max_responses"] = legacy_config["max_logged_responses"] interceptors.append( InterceptorConfig( name="response_logging", config=config, ) ) if legacy_config["use_reasoning"]: config = { "end_reasoning_token": legacy_config["end_reasoning_token"], } if legacy_config["start_reasoning_token"] is not None: config["start_reasoning_token"] = legacy_config["start_reasoning_token"] if legacy_config["include_if_reasoning_not_finished"] is not None: config["include_if_not_finished"] = legacy_config[ "include_if_reasoning_not_finished" ] if legacy_config["track_reasoning"] is not None: config["enable_reasoning_tracking"] = legacy_config["track_reasoning"] # Enable caching for reasoning interceptor when tracking requests stats # Default to True if not explicitly set in legacy config if legacy_config["tracking_requests_stats"]: config["save_individuals"] = True # Use caching_dir if provided, otherwise use output_dir/reasoning_stats_cache cache_dir = cls._get_default_cache_dir( legacy_config, run_config, "reasoning_stats_cache" ) config["cache_dir"] = cache_dir # Add logging interval for aggregated stats config["logging_aggregated_stats_interval"] = legacy_config[ "logging_aggregated_stats_interval" ] interceptors.append( InterceptorConfig( name="reasoning", config=config, ) ) if legacy_config["use_progress_tracking"] or legacy_config["output_dir"]: config = { "progress_tracking_interval": legacy_config[ "progress_tracking_interval" ], "request_method": "POST", # Legacy method uses POST "output_dir": cls._get_default_output_dir(legacy_config, run_config), } if legacy_config["progress_tracking_url"] is not None: config["progress_tracking_url"] = legacy_config["progress_tracking_url"] interceptors.append( InterceptorConfig( name="progress_tracking", config=config, ) ) # Add raise client errors interceptor if specified (Response) if legacy_config["use_raise_client_errors"]: # Get default values from the interceptor's Params class from nemo_evaluator.adapters.interceptors.raise_client_error_interceptor import ( RaiseClientErrorInterceptor, ) from nemo_evaluator.logging import get_logger logger = get_logger(__name__) default_params = RaiseClientErrorInterceptor.Params() interceptors.append( InterceptorConfig( name="raise_client_errors", enabled=True, config={}, ) ) logger.warning( "RaiseClientErrorInterceptor configured with default values. " f"This will raise exceptions for 4xx status codes ({default_params.status_code_range_start}-{default_params.status_code_range_end}) " f"excluding {default_params.exclude_status_codes}. " "Consider explicitly configuring the interceptor parameters for your specific use case." ) # Convert legacy HTML report generation to post-eval hook # Value is now available directly from legacy_config (merged with defaults) generate_html_report = legacy_config["generate_html_report"] if generate_html_report: report_types = ["html"] if legacy_config["include_json"]: report_types.append("json") post_eval_hooks.append( PostEvalHookConfig( name="post_eval_report", enabled=True, config={ "report_types": report_types, "html_report_size": legacy_config["html_report_size"], }, ) ) return cls( interceptors=interceptors, post_eval_hooks=post_eval_hooks, endpoint_type=legacy_config["endpoint_type"], caching_dir=legacy_config["caching_dir"], generate_html_report=legacy_config["generate_html_report"], log_failed_requests=legacy_config["log_failed_requests"], tracking_requests_stats=legacy_config["tracking_requests_stats"], html_report_size=legacy_config["html_report_size"], )
[docs] def get_interceptor_configs(self) -> dict[str, dict[str, Any]]: """Get interceptor configurations as a dictionary""" return {ic.name: ic.config for ic in self.interceptors if ic.enabled}
[docs] def get_post_eval_hook_configs(self) -> dict[str, dict[str, Any]]: """Get post-evaluation hook configurations as a dictionary""" return {hook.name: hook.config for hook in self.post_eval_hooks if hook.enabled}