nat.plugins.customizer#

NeMo Customizer plugin for NAT finetuning.

This plugin provides trajectory builders and trainer adapters for finetuning workflows using the NeMo Customizer backend.

Available components:

- DPO Trajectory Builder: Collects preference pairs from scored TTC candidates
- NeMo Customizer TrainerAdapter: Submits DPO/SFT jobs to NeMo Customizer

Submodules#

Classes#

DPOSpecificHyperparameters

DPO-specific hyperparameters for NeMo Customizer.

DPOTrajectoryBuilder

Trajectory builder for DPO (Direct Preference Optimization) training.

DPOTrajectoryBuilderConfig

Configuration for the DPO (Direct Preference Optimization) Trajectory Builder.

NeMoCustomizerHyperparameters

Hyperparameters for NeMo Customizer training jobs.

NeMoCustomizerTrainerAdapter

TrainerAdapter for NeMo Customizer backend.

NeMoCustomizerTrainerAdapterConfig

Configuration for the NeMo Customizer TrainerAdapter.

NIMDeploymentConfig

Configuration for NIM deployment after training.

Package Contents#

class DPOSpecificHyperparameters(/, **data: Any)#

Bases: pydantic.BaseModel

DPO-specific hyperparameters for NeMo Customizer.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

ref_policy_kl_penalty: float = None#
preference_loss_weight: float = None#
preference_average_log_probs: bool = None#
sft_loss_weight: float = None#
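
A minimal sketch of constructing this model in Python, assuming the package re-exports it as listed under Package Contents; the field values are illustrative assumptions, not recommended defaults.

from nat.plugins.customizer import DPOSpecificHyperparameters

# Illustrative values only; consult NeMo Customizer guidance for tuning.
dpo_hparams = DPOSpecificHyperparameters(
    ref_policy_kl_penalty=0.05,         # KL penalty against the reference policy
    preference_loss_weight=1.0,         # weight on the DPO preference loss term
    preference_average_log_probs=False,
    sft_loss_weight=0.1,                # auxiliary SFT loss mixed into the objective
)
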
class DPOTrajectoryBuilder(
trajectory_builder_config: nat.plugins.customizer.dpo.config.DPOTrajectoryBuilderConfig,
)#

Bases: nat.finetuning.interfaces.trajectory_builder.TrajectoryBuilder

Trajectory builder for DPO (Direct Preference Optimization) training.

This builder collects preference pairs from workflows that produce TTC_END intermediate steps with TTCEventData. It uses the structured data model to extract turn_id, candidate_index, score, input (prompt), and output.

Key features:

- Uses TTCEventData model directly (no brittle dictionary key configuration)
- Supports prompts as strings or lists of OpenAIMessage
- Exhaustive or best-vs-worst pair generation modes
- Configurable score difference filtering
- Grouping by example for curriculum learning
- Builds trajectories with DPOItem episodes

Example workflow integration:

trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move
    exhaustive_pairs: true
    min_score_diff: 0.05

Initialize the DPO Trajectory Builder.

Args:

trajectory_builder_config: Configuration for the builder.

config: nat.plugins.customizer.dpo.config.DPOTrajectoryBuilderConfig#
evaluation_runs: dict[str, asyncio.Task[nat.eval.config.EvaluationRunOutput]]#
_metrics: dict[str, Any]#
async start_run(run_id: str, meta: dict | None = None) → None#

Start a single evaluation run to collect intermediate steps.

Args:

run_id: Unique identifier for this run.
meta: Optional metadata for the run.

Raises:

ValueError: If a run with this ID is already in progress.

async finalize(
run_id: str,
meta: dict | None = None,
) → nat.data_models.finetuning.TrajectoryCollection#

Wait for evaluation, collect TTC steps, and build DPO trajectories.

This method:

1. Waits for the evaluation run to complete
2. Collects and groups candidates by turn_id using TTCEventData
3. Generates preference pairs
4. Builds trajectories with DPOItem episodes
5. Groups trajectories by example for curriculum learning

Args:

run_id: Unique identifier for the run.
meta: Optional metadata for the run.

Returns:

TrajectoryCollection with DPO preference trajectories.

Raises:

ValueError: If no run with this ID exists.
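
A minimal async sketch of the run lifecycle described above, assuming the builder can be constructed directly from its config and that the fields shown in the YAML example above suffice; the run ID is arbitrary.

import asyncio

from nat.plugins.customizer import DPOTrajectoryBuilder, DPOTrajectoryBuilderConfig

async def collect():
    # Values mirror the YAML example above.
    config = DPOTrajectoryBuilderConfig(
        ttc_step_name="dpo_candidate_move",
        exhaustive_pairs=True,
        min_score_diff=0.05,
    )
    builder = DPOTrajectoryBuilder(trajectory_builder_config=config)

    await builder.start_run("run-001")        # start the evaluation run
    return await builder.finalize("run-001")  # wait, pair, and build trajectories

trajectories = asyncio.run(collect())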

log_progress(
run_id: str,
metrics: dict[str, Any],
output_dir: str | None = None,
) → None#

Log trajectory building progress.

Args:

run_id: The training run ID.
metrics: Dictionary of metrics to log.
output_dir: Optional output directory override.

_collect_candidates(
eval_result: nat.eval.config.EvaluationRunOutput,
) → dict[str, list[CandidateStep]]#

Extract TTC_END intermediate steps and group by turn_id.

This method:

1. Iterates through all evaluation input items
2. Filters for TTC_END steps with the configured name
3. Extracts data from TTCEventData model directly
4. Groups candidates by (example_id, turn_id)

Args:

eval_result: The evaluation run output.

Returns:

Dictionary mapping turn keys to lists of candidates.

_is_target_step(
step: nat.data_models.intermediate_step.IntermediateStep,
) → bool#

Check if an intermediate step is a target TTC step.

Args:

step: The intermediate step to check.

Returns:

True if this is a TTC_END step with the configured name.

_parse_candidate(
example_id: str,
step: nat.data_models.intermediate_step.IntermediateStep,
) → CandidateStep | None#

Parse a CandidateStep from a TTC intermediate step using TTCEventData.

Args:

example_id: The example ID this step belongs to.
step: The intermediate step to parse.

Returns:

CandidateStep if parsing succeeds, None otherwise.

_extract_prompt(input_data: Any) → PromptType#

Extract prompt from TTCEventData.input.

Handles both string prompts and lists of OpenAIMessage.

Args:

input_data: The input field from TTCEventData.

Returns:

String prompt or list of OpenAIMessage.

_generate_preference_pairs(
candidates_by_turn: dict[str, list[CandidateStep]],
) → list[PreferencePair]#

Generate preference pairs from grouped candidates.

If exhaustive_pairs=True:

For candidates [A, B, C] with scores [0.9, 0.7, 0.5]:
Pairs: (A>B), (A>C), (B>C) - all pairwise comparisons

If exhaustive_pairs=False:

For candidates [A, B, C] with scores [0.9, 0.7, 0.5]:
Pairs: (A>C) only - best vs worst

Args:

candidates_by_turn: Dictionary mapping turn keys to candidate lists.

Returns:

List of preference pairs.
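
The two pairing modes can be illustrated standalone; this sketch mirrors the behavior described above using a hypothetical Candidate tuple in place of the internal CandidateStep, and folds in the min_score_diff filter from the config.

from itertools import combinations
from typing import NamedTuple

class Candidate(NamedTuple):
    # Hypothetical stand-in for the internal CandidateStep type.
    output: str
    score: float

def preference_pairs(candidates: list[Candidate], exhaustive: bool,
                     min_score_diff: float = 0.0) -> list[tuple[Candidate, Candidate]]:
    # Sort by score, best first, then pair per the configured mode.
    ranked = sorted(candidates, key=lambda c: c.score, reverse=True)
    if exhaustive:
        pairs = [(a, b) for a, b in combinations(ranked, 2)
                 if a.score > b.score and a.score - b.score >= min_score_diff]
        # Largest score gap first, matching the ordering described below.
        pairs.sort(key=lambda p: p[0].score - p[1].score, reverse=True)
        return pairs
    if len(ranked) >= 2 and ranked[0].score - ranked[-1].score >= min_score_diff:
        return [(ranked[0], ranked[-1])]    # single best-vs-worst pair
    return []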

_generate_exhaustive_pairs(
sorted_candidates: list[CandidateStep],
) → list[PreferencePair]#

Generate all pairwise comparisons where score(chosen) > score(rejected).

Args:

sorted_candidates: Candidates sorted by score (descending).

Returns:

List of preference pairs, sorted by score difference (descending).

_generate_best_vs_worst_pair(
sorted_candidates: list[CandidateStep],
) → list[PreferencePair]#

Generate a single pair: best candidate vs worst candidate.

Args:

sorted_candidates: Candidates sorted by score (descending).

Returns:

List with at most one preference pair.

_build_trajectories(
pairs: list[PreferencePair],
) → list[nat.data_models.finetuning.Trajectory]#

Convert preference pairs to Trajectory format with DPOItem episodes.

Each trajectory contains:

- episode: [DPOItem] with prompt, chosen_response, rejected_response
- reward: score_diff (if reward_from_score_diff) or chosen_score
- metadata: Contains pair information for tracking

Args:

pairs: List of preference pairs.

Returns:

List of trajectories with DPOItem episodes.
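
The shape of one built trajectory, sketched as plain data; the DPOItem field names follow the description above, while the concrete values and metadata keys are illustrative assumptions.

# Chosen scored 0.9, rejected scored 0.5 in this illustration.
trajectory = {
    "episode": [{
        "prompt": "What is the best opening move?",
        "chosen_response": "e4, controlling the center.",
        "rejected_response": "a3, a waiting move.",
    }],
    "reward": 0.9 - 0.5,  # score_diff mode; otherwise the chosen score (0.9)
    "metadata": {"example_id": "ex-1", "turn_id": "t-0"},  # pair tracking info
}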

_group_by_example(
trajectories: list[nat.data_models.finetuning.Trajectory],
) → list[list[nat.data_models.finetuning.Trajectory]]#

Group trajectories by example ID for curriculum learning.

This grouping enables:

- Filtering by average reward per example
- Expansion from easy to hard examples

Args:

trajectories: List of trajectories to group.

Returns:

List of trajectory lists, where each inner list contains trajectories for one example.
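
A sketch of the grouping step, assuming each trajectory carries its example ID in metadata (an assumption about the internal layout):

from collections import defaultdict

def group_by_example(trajectories: list[dict]) -> list[list[dict]]:
    groups: dict[str, list[dict]] = defaultdict(list)
    for traj in trajectories:
        # Assumes the example ID is tracked in each trajectory's metadata.
        groups[traj["metadata"]["example_id"]].append(traj)
    # Each inner list holds one example's trajectories, enabling per-example
    # reward filtering and easy-to-hard curriculum expansion.
    return list(groups.values())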

class DPOTrajectoryBuilderConfig(/, **data: Any)#

Bases: nat.data_models.finetuning.TrajectoryBuilderConfig

Configuration for the DPO (Direct Preference Optimization) Trajectory Builder.

This builder collects preference pairs from workflows that produce TTC_END intermediate steps with TTCEventData. It uses the structured TTCEventData model to extract turn_id, candidate_index, score, input (prompt), and output (response), so no dictionary key configuration is needed.

The builder groups candidates by turn_id and creates preference pairs based on score differences.

Example YAML configuration:

trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move
    exhaustive_pairs: true
    min_score_diff: 0.05
    max_pairs_per_turn: 5

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

ttc_step_name: str = None#
exhaustive_pairs: bool = None#
min_score_diff: float = None#
max_pairs_per_turn: int | None = None#
reward_from_score_diff: bool = None#
require_multiple_candidates: bool = None#
validate_config() → DPOTrajectoryBuilderConfig#

Validate configuration consistency.
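
A Python counterpart to the YAML above, exercising the remaining fields; the values are illustrative, and calling validate_config() explicitly is an assumption (it may already run as a Pydantic validator).

from nat.plugins.customizer import DPOTrajectoryBuilderConfig

config = DPOTrajectoryBuilderConfig(
    ttc_step_name="dpo_candidate_move",
    exhaustive_pairs=True,
    min_score_diff=0.05,
    max_pairs_per_turn=5,            # cap on pairs generated per turn
    reward_from_score_diff=True,     # trajectory reward = score difference
    require_multiple_candidates=True,
).validate_config()                  # returns the validated config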

class NeMoCustomizerHyperparameters(/, **data: Any)#

Bases: pydantic.BaseModel

Hyperparameters for NeMo Customizer training jobs.

These map to the hyperparameters argument in client.customization.jobs.create().

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

training_type: Literal['sft', 'dpo'] = None#
finetuning_type: Literal['lora', 'all_weights'] = None#
epochs: int = None#
batch_size: int = None#
learning_rate: float = None#
dpo: DPOSpecificHyperparameters = None#
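
A sketch composing the full hyperparameter set, including the nested DPO block; the values are illustrative assumptions.

from nat.plugins.customizer import (
    DPOSpecificHyperparameters,
    NeMoCustomizerHyperparameters,
)

# Illustrative values; the resulting model maps onto the hyperparameters
# argument of client.customization.jobs.create(), as noted above.
hparams = NeMoCustomizerHyperparameters(
    training_type="dpo",
    finetuning_type="lora",
    epochs=5,
    batch_size=8,
    learning_rate=1e-5,
    dpo=DPOSpecificHyperparameters(ref_policy_kl_penalty=0.05),
)
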
class NeMoCustomizerTrainerAdapter(
adapter_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerAdapterConfig,
)#

Bases: nat.finetuning.interfaces.trainer_adapter.TrainerAdapter

TrainerAdapter for NeMo Customizer backend.

This adapter:

1. Converts trajectories to JSONL format for DPO training
2. Uploads datasets to NeMo Datastore via the HuggingFace Hub API
3. Submits customization jobs to NeMo Customizer
4. Monitors job progress and status
5. Optionally deploys trained models

adapter_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerAdapterConfig#
_entity_client: nemo_microservices.NeMoMicroservices | None = None#
_hf_api: huggingface_hub.HfApi | None = None#
_active_jobs: dict[str, str]#
_job_output_models: dict[str, str]#
property entity_client: nemo_microservices.NeMoMicroservices#

Lazy initialization of NeMo Microservices client.

property hf_api: huggingface_hub.HfApi#

Lazy initialization of HuggingFace API client.

async initialize(
run_config: nat.data_models.finetuning.FinetuneConfig,
) → None#

Initialize the trainer adapter.

async _ensure_namespaces_exist() → None#

Create namespaces in entity store and datastore if they don’t exist.

async is_healthy() → bool#

Check if NeMo Customizer services are reachable.

_format_prompt(
prompt: list[nat.data_models.finetuning.OpenAIMessage] | str,
) → list[dict[str, str]] | str#

Format prompt based on configuration.

Args:

prompt: Original prompt (string or list of OpenAI messages)

Returns:

Formatted prompt based on use_full_message_history setting

_trajectory_to_dpo_jsonl(
trajectories: nat.data_models.finetuning.TrajectoryCollection,
) → tuple[str, str]#

Convert trajectory collection to JSONL format for DPO training.

Returns:

Tuple of (training_jsonl, validation_jsonl) content strings
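
A sketch of one converted record; the field names follow the DPOItem description earlier on this page, but the exact JSONL schema NeMo Customizer expects is an assumption to verify against its dataset documentation.

import json

# One JSONL line per preference pair (schema assumed; see lead-in).
record = {
    "prompt": [{"role": "user", "content": "What is the best opening move?"}],
    "chosen_response": "e4, controlling the center.",
    "rejected_response": "a3, a waiting move.",
}
training_jsonl = json.dumps(record) + "\n"  # one line per pair, concatenated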

async _setup_dataset(
run_id: str,
training_jsonl: str,
validation_jsonl: str,
) → str#

Create dataset repository and upload JSONL files.

Args:

run_id: Unique identifier for this training run
training_jsonl: Training data in JSONL format
validation_jsonl: Validation data in JSONL format

Returns:

Repository ID for the created dataset

async submit(
trajectories: nat.data_models.finetuning.TrajectoryCollection,
) → nat.data_models.finetuning.TrainingJobRef#

Submit trajectories for training.

Args:

trajectories: Collection of trajectories containing DPO items

Returns:

Reference to the submitted training job

async status(
ref: nat.data_models.finetuning.TrainingJobRef,
) → nat.data_models.finetuning.TrainingJobStatus#

Get the status of a training job.

async wait_until_complete(
ref: nat.data_models.finetuning.TrainingJobRef,
poll_interval: float | None = None,
) → nat.data_models.finetuning.TrainingJobStatus#

Wait for training job to complete.
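
A minimal sketch of the submit-and-wait flow using only the methods documented here; adapter, run_config, and trajectories are assumed to come from a NeMoCustomizerTrainerAdapter, a FinetuneConfig, and DPOTrajectoryBuilder.finalize() respectively.

async def train(adapter, run_config, trajectories):
    await adapter.initialize(run_config)
    if not await adapter.is_healthy():
        raise RuntimeError("NeMo Customizer services are unreachable")

    ref = await adapter.submit(trajectories)         # upload dataset, create job
    status = await adapter.wait_until_complete(ref)  # poll until terminal state
    return status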

async _deploy_model(ref: nat.data_models.finetuning.TrainingJobRef) → None#

Deploy the trained model and wait until deployment is ready.

async _wait_for_deployment_ready(
namespace: str,
deployment_name: str,
poll_interval: float | None = None,
timeout: float | None = None,
) → None#

Wait for a model deployment to become ready.

Args:

namespace: Namespace of the deployment
deployment_name: Name of the deployment
poll_interval: Seconds between status checks (default: adapter config poll_interval_seconds)
timeout: Maximum seconds to wait (default: adapter config deployment_timeout_seconds)

log_progress(
ref: nat.data_models.finetuning.TrainingJobRef,
metrics: dict[str, Any],
output_dir: str | None = None,
) → None#

Log training progress to file.

class NeMoCustomizerTrainerAdapterConfig(/, **data: Any)#

Bases: nat.data_models.finetuning.TrainerAdapterConfig

Configuration for the NeMo Customizer TrainerAdapter.

This adapter submits DPO/SFT training jobs to NeMo Customizer and optionally deploys the trained model.

Example YAML configuration:

trainer_adapters:
  nemo_customizer:
    _type: nemo_customizer_trainer_adapter
    entity_host: https://nmp.example.com
    datastore_host: https://datastore.example.com
    namespace: my-project
    customization_config: meta/llama-3.2-1b-instruct@v1.0.0+A100
    hyperparameters:
      training_type: dpo
      epochs: 5
      batch_size: 8
    use_full_message_history: true
    deploy_on_completion: true

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

entity_host: str = None#
datastore_host: str = None#
hf_token: str = None#
namespace: str = None#
dataset_name: str = None#
dataset_output_dir: str | None = None#
create_namespace_if_missing: bool = None#
customization_config: str = None#
hyperparameters: NeMoCustomizerHyperparameters = None#
use_full_message_history: bool = None#
deploy_on_completion: bool = None#
deployment_config: NIMDeploymentConfig = None#
poll_interval_seconds: float = None#
deployment_timeout_seconds: float = None#
max_consecutive_status_failures: int = None#
validate_config() → NeMoCustomizerTrainerAdapterConfig#

Validate configuration consistency.

class NIMDeploymentConfig(/, **data: Any)#

Bases: pydantic.BaseModel

Configuration for NIM deployment after training.

These settings are used when deploy_on_completion is True.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

image_name: str = None#
image_tag: str = None#
gpu: int = None#
deployment_name: str | None = None#
description: str = None#
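
A sketch of the deployment settings; the image name, tag, and GPU count are placeholder assumptions to replace with the NIM image matching your base model.

from nat.plugins.customizer import NIMDeploymentConfig

# Placeholder values; used as deployment_config on the
# NeMoCustomizerTrainerAdapterConfig when deploy_on_completion=True.
deployment = NIMDeploymentConfig(
    image_name="nvcr.io/nim/meta/llama-3.2-1b-instruct",
    image_tag="1.0.0",
    gpu=1,
    deployment_name="dpo-finetuned-llama",
    description="DPO-finetuned model trained via NAT",
)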