nat.plugins.customizer.dpo#

DPO (Direct Preference Optimization) components for NAT.

This module provides:

- DPO Trajectory Builder: Collects preference data from scored TTC intermediate steps
- NeMo Customizer TrainerAdapter: Submits DPO training jobs to NeMo Customizer
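Taken together, a typical finetuning configuration wires all three pieces. A minimal sketch, assuming the same top-level keys used in the per-class examples below (values are illustrative):

trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move

trainer_adapters:
  nemo_customizer:
    _type: nemo_customizer_trainer_adapter
    entity_host: https://nmp.example.com
    datastore_host: https://datastore.example.com
    namespace: my-project
    customization_config: meta/llama-3.2-1b-instruct@v1.0.0+A100
    hyperparameters:
      training_type: dpo

trainers:
  nemo_dpo:
    _type: nemo_customizer_trainer
    num_runs: 5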

Submodules#

Classes#

DPOSpecificHyperparameters

DPO-specific hyperparameters for NeMo Customizer.

DPOTrajectoryBuilderConfig

Configuration for the DPO (Direct Preference Optimization) Trajectory Builder.

NeMoCustomizerHyperparameters

Hyperparameters for NeMo Customizer training jobs.

NeMoCustomizerTrainerAdapterConfig

Configuration for the NeMo Customizer TrainerAdapter.

NeMoCustomizerTrainerConfig

Configuration for the NeMo Customizer Trainer.

NIMDeploymentConfig

Configuration for NIM deployment after training.

NeMoCustomizerTrainer

Trainer for NeMo Customizer DPO/SFT finetuning.

NeMoCustomizerTrainerAdapter

TrainerAdapter for NeMo Customizer backend.

DPOTrajectoryBuilder

Trajectory builder for DPO (Direct Preference Optimization) training.

Package Contents#

class DPOSpecificHyperparameters(/, **data: Any)#

Bases: pydantic.BaseModel

DPO-specific hyperparameters for NeMo Customizer.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

ref_policy_kl_penalty: float = None#
preference_loss_weight: float = None#
preference_average_log_probs: bool = None#
sft_loss_weight: float = None#
class DPOTrajectoryBuilderConfig(/, **data: Any)#

Bases: nat.data_models.finetuning.TrajectoryBuilderConfig

Configuration for the DPO (Direct Preference Optimization) Trajectory Builder.

This builder collects preference pairs from workflows that produce TTC_END intermediate steps with TTCEventData. It uses the structured TTCEventData model to extract turn_id, candidate_index, score, input (prompt), and output (response); no dictionary key configuration is needed.

The builder groups candidates by turn_id and creates preference pairs based on score differences.

Example YAML configuration:

trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move
    exhaustive_pairs: true
    min_score_diff: 0.05
    max_pairs_per_turn: 5

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

ttc_step_name: str = None#
exhaustive_pairs: bool = None#
min_score_diff: float = None#
max_pairs_per_turn: int | None = None#
reward_from_score_diff: bool = None#
require_multiple_candidates: bool = None#
validate_config() → DPOTrajectoryBuilderConfig#

Validate configuration consistency.

class NeMoCustomizerHyperparameters(/, **data: Any)#

Bases: pydantic.BaseModel

Hyperparameters for NeMo Customizer training jobs.

These map to the hyperparameters argument in client.customization.jobs.create().
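For orientation, these fields are passed through as the hyperparameters payload of the customization job. A hedged sketch of that call (only the hyperparameters mapping is documented here; the client constructor argument and the config/dataset keyword names are assumptions):

from nemo_microservices import NeMoMicroservices

client = NeMoMicroservices(base_url="https://nmp.example.com")  # assumed constructor argument
job = client.customization.jobs.create(
    config="meta/llama-3.2-1b-instruct@v1.0.0+A100",  # assumed keyword name
    dataset={"name": "my-dataset", "namespace": "my-project"},  # assumed keyword/shape
    hyperparameters={
        "training_type": "dpo",
        "finetuning_type": "lora",
        "epochs": 5,
        "batch_size": 8,
        "learning_rate": 1e-4,
        "dpo": {"ref_policy_kl_penalty": 0.1},
    },
)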

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

training_type: Literal['sft', 'dpo'] = None#
finetuning_type: Literal['lora', 'all_weights'] = None#
epochs: int = None#
batch_size: int = None#
learning_rate: float = None#
dpo: DPOSpecificHyperparameters = None#
class NeMoCustomizerTrainerAdapterConfig(/, **data: Any)#

Bases: nat.data_models.finetuning.TrainerAdapterConfig

Configuration for the NeMo Customizer TrainerAdapter.

This adapter submits DPO/SFT training jobs to NeMo Customizer and optionally deploys the trained model.

Example YAML configuration:

trainer_adapters:
  nemo_customizer:
    _type: nemo_customizer_trainer_adapter
    entity_host: https://nmp.example.com
    datastore_host: https://datastore.example.com
    namespace: my-project
    customization_config: meta/llama-3.2-1b-instruct@v1.0.0+A100
    hyperparameters:
      training_type: dpo
      epochs: 5
      batch_size: 8
    use_full_message_history: true
    deploy_on_completion: true

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

entity_host: str = None#
datastore_host: str = None#
hf_token: str = None#
namespace: str = None#
dataset_name: str = None#
dataset_output_dir: str | None = None#
create_namespace_if_missing: bool = None#
customization_config: str = None#
hyperparameters: NeMoCustomizerHyperparameters = None#
use_full_message_history: bool = None#
deploy_on_completion: bool = None#
deployment_config: NIMDeploymentConfig = None#
poll_interval_seconds: float = None#
deployment_timeout_seconds: float = None#
max_consecutive_status_failures: int = None#
validate_config() → NeMoCustomizerTrainerAdapterConfig#

Validate configuration consistency.

class NeMoCustomizerTrainerConfig(/, **data: Any)#

Bases: nat.data_models.finetuning.TrainerConfig

Configuration for the NeMo Customizer Trainer.

This trainer orchestrates DPO data collection and training job submission. Unlike epoch-based trainers, it runs the trajectory builder multiple times to collect data, then submits a single training job to NeMo Customizer.

Example YAML configuration:

trainers:
  nemo_dpo:
    _type: nemo_customizer_trainer
    num_runs: 5
    wait_for_completion: true
    deduplicate_pairs: true
    max_pairs: 10000

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

num_runs: int = None#
continue_on_collection_error: bool = None#
deduplicate_pairs: bool = None#
max_pairs: int | None = None#
wait_for_completion: bool = None#
class NIMDeploymentConfig(/, **data: Any)#

Bases: pydantic.BaseModel

Configuration for NIM deployment after training.

These settings are used when deploy_on_completion is True.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

image_name: str = None#
image_tag: str = None#
gpu: int = None#
deployment_name: str | None = None#
description: str = None#
class NeMoCustomizerTrainer(
trainer_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerConfig,
**kwargs,
)#

Bases: nat.finetuning.interfaces.finetuning_runner.Trainer

Trainer for NeMo Customizer DPO/SFT finetuning.

Unlike epoch-based trainers, this trainer:

1. Runs the trajectory builder multiple times (num_runs) to collect data
2. Aggregates all trajectories into a single dataset
3. Submits the dataset to NeMo Customizer for training
4. Monitors the training job until completion

The actual training epochs are handled by NeMo Customizer via hyperparameters.
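A minimal usage sketch under those semantics, inside an async context (the trainer is normally driven by the finetuning harness; trainer_config and run_config are assumed to be built elsewhere):

trainer = NeMoCustomizerTrainer(trainer_config=trainer_config)
await trainer.initialize(run_config)        # initializes the trainer and its components
statuses = await trainer.run(num_epochs=0)  # num_epochs is ignored; num_runs comes from the config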

Initialize the NeMo Customizer Trainer.

Args:

trainer_config: Configuration for the trainer

trainer_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerConfig#
_job_ref: nat.data_models.finetuning.TrainingJobRef | None = None#
_run_id: str | None = None#
_all_trajectories: list[list[nat.data_models.finetuning.Trajectory]] = []#
_run_metrics: list[dict[str, Any]] = []#
_collection_history: list[dict[str, Any]] = []#
async initialize(
run_config: nat.data_models.finetuning.FinetuneConfig,
) → None#

Initialize the trainer and its components.

Note: Curriculum learning is not supported for DPO training.

async run_epoch(
epoch: int,
run_id: str,
) → nat.data_models.finetuning.TrainingJobRef | None#

Run a single data collection run.

For NeMo Customizer, this collects trajectories without submitting to training. The actual submission happens in run().

Args:

epoch: The current run number (0-indexed).
run_id: Unique identifier for this training run.

Returns:

None (trajectories are accumulated, not submitted per-run)

async run(
num_epochs: int,
) → list[nat.data_models.finetuning.TrainingJobStatus]#

Run the complete DPO data collection and training workflow.

Args:

num_epochs: Ignored for NeMo Customizer (uses trainer_config.num_runs)

Returns:

list[TrainingJobStatus]: Status of the training job

_deduplicate_trajectories(
collection: nat.data_models.finetuning.TrajectoryCollection,
) → nat.data_models.finetuning.TrajectoryCollection#

Remove duplicate DPO pairs based on prompt+responses.
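A hedged sketch of the rule described above, keying each pair on its prompt plus both responses (the exact key construction in the implementation may differ):

def dedup_pairs(items):
    # items: DPO records exposing prompt, chosen_response, rejected_response
    seen: set[tuple[str, str, str]] = set()
    unique = []
    for item in items:
        key = (str(item.prompt), item.chosen_response, item.rejected_response)
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique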

_sample_trajectories(
collection: nat.data_models.finetuning.TrajectoryCollection,
max_pairs: int,
) → nat.data_models.finetuning.TrajectoryCollection#

Sample trajectories to limit dataset size.

async get_metrics(run_id: str) → dict[str, Any]#

Get training metrics for the run.

async cleanup() → None#

Clean up resources.

log_progress(
epoch: int,
metrics: dict[str, Any],
output_dir: str | None = None,
) → None#

Log data collection progress.

_log_final_metrics(
final_status: nat.data_models.finetuning.TrainingJobStatus,
) → None#

Log final training metrics.

class NeMoCustomizerTrainerAdapter(
adapter_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerAdapterConfig,
)#

Bases: nat.finetuning.interfaces.trainer_adapter.TrainerAdapter

TrainerAdapter for NeMo Customizer backend.

This adapter:

1. Converts trajectories to JSONL format for DPO training
2. Uploads datasets to NeMo Datastore via HuggingFace Hub API
3. Submits customization jobs to NeMo Customizer
4. Monitors job progress and status
5. Optionally deploys trained models
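In terms of the public surface, a minimal call sequence looks like this (a sketch, inside an async context; run_config and trajectories are assumed to be supplied by the trainer):

adapter = NeMoCustomizerTrainerAdapter(adapter_config=adapter_config)
await adapter.initialize(run_config)
if await adapter.is_healthy():
    ref = await adapter.submit(trajectories)         # converts, uploads, creates the job
    status = await adapter.wait_until_complete(ref)  # polls until a terminal state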

adapter_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerAdapterConfig#
_entity_client: nemo_microservices.NeMoMicroservices | None = None#
_hf_api: huggingface_hub.HfApi | None = None#
_active_jobs: dict[str, str]#
_job_output_models: dict[str, str]#
property entity_client: nemo_microservices.NeMoMicroservices#

Lazy initialization of NeMo Microservices client.

property hf_api: huggingface_hub.HfApi#

Lazy initialization of HuggingFace API client.

async initialize(
run_config: nat.data_models.finetuning.FinetuneConfig,
) → None#

Initialize the trainer adapter.

async _ensure_namespaces_exist() → None#

Create namespaces in entity store and datastore if they don’t exist.

async is_healthy() → bool#

Check if NeMo Customizer services are reachable.

_format_prompt(
prompt: list[nat.data_models.finetuning.OpenAIMessage] | str,
) → list[dict[str, str]] | str#

Format prompt based on configuration.

Args:

prompt: Original prompt (string or list of OpenAI messages)

Returns:

Formatted prompt based on use_full_message_history setting
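A hedged sketch of the two modes (only the use_full_message_history switch is documented; what the non-history mode collapses to is an assumption):

def format_prompt(prompt, use_full_message_history: bool):
    if isinstance(prompt, str):
        return prompt
    if use_full_message_history:
        return [{"role": m.role, "content": m.content} for m in prompt]  # full history
    return prompt[-1].content  # assumption: keep only the final message's content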

_trajectory_to_dpo_jsonl(
trajectories: nat.data_models.finetuning.TrajectoryCollection,
) → tuple[str, str]#

Convert trajectory collection to JSONL format for DPO training.

Returns:

Tuple of (training_jsonl, validation_jsonl) content strings
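For orientation, each JSONL line carries one preference pair. An illustrative record, assuming the field names mirror DPOItem (prompt, chosen_response, rejected_response); the exact schema NeMo Customizer expects may differ:

{"prompt": [{"role": "user", "content": "Pick the best next move."}], "chosen_response": "Move A", "rejected_response": "Move C"}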

async _setup_dataset(
run_id: str,
training_jsonl: str,
validation_jsonl: str,
) → str#

Create dataset repository and upload JSONL files.

Args:

run_id: Unique identifier for this training run.
training_jsonl: Training data in JSONL format.
validation_jsonl: Validation data in JSONL format.

Returns:

Repository ID for the created dataset

async submit(
trajectories: nat.data_models.finetuning.TrajectoryCollection,
) → nat.data_models.finetuning.TrainingJobRef#

Submit trajectories for training.

Args:

trajectories: Collection of trajectories containing DPO items

Returns:

Reference to the submitted training job

async status(
ref: nat.data_models.finetuning.TrainingJobRef,
) → nat.data_models.finetuning.TrainingJobStatus#

Get the status of a training job.

async wait_until_complete(
ref: nat.data_models.finetuning.TrainingJobRef,
poll_interval: float | None = None,
) → nat.data_models.finetuning.TrainingJobStatus#

Wait for training job to complete.

async _deploy_model(ref: nat.data_models.finetuning.TrainingJobRef) → None#

Deploy the trained model and wait until deployment is ready.

async _wait_for_deployment_ready(
namespace: str,
deployment_name: str,
poll_interval: float | None = None,
timeout: float | None = None,
) → None#

Wait for a model deployment to become ready.

Args:

namespace: Namespace of the deployment.
deployment_name: Name of the deployment.
poll_interval: Seconds between status checks (default: adapter config poll_interval_seconds).
timeout: Maximum seconds to wait (default: adapter config deployment_timeout_seconds).

log_progress(
ref: nat.data_models.finetuning.TrainingJobRef,
metrics: dict[str, Any],
output_dir: str | None = None,
) → None#

Log training progress to file.

class DPOTrajectoryBuilder(
trajectory_builder_config: nat.plugins.customizer.dpo.config.DPOTrajectoryBuilderConfig,
)#

Bases: nat.finetuning.interfaces.trajectory_builder.TrajectoryBuilder

Trajectory builder for DPO (Direct Preference Optimization) training.

This builder collects preference pairs from workflows that produce TTC_END intermediate steps with TTCEventData. It uses the structured data model to extract turn_id, candidate_index, score, input (prompt), and output.

Key features:

- Uses the TTCEventData model directly (no brittle dictionary key configuration)
- Supports prompts as strings or lists of OpenAIMessage
- Exhaustive or best-vs-worst pair generation modes
- Configurable score difference filtering
- Grouping by example for curriculum learning
- Builds trajectories with DPOItem episodes

Example workflow integration:

trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move
    exhaustive_pairs: true
    min_score_diff: 0.05

Initialize the DPO Trajectory Builder.

Args:

trajectory_builder_config: Configuration for the builder.

config: nat.plugins.customizer.dpo.config.DPOTrajectoryBuilderConfig#
evaluation_runs: dict[str, asyncio.Task[nat.eval.config.EvaluationRunOutput]]#
_metrics: dict[str, Any]#
async start_run(run_id: str, meta: dict | None = None) → None#

Start a single evaluation run to collect intermediate steps.

Args:

run_id: Unique identifier for this run.
meta: Optional metadata for the run.

Raises:

ValueError: If a run with this ID is already in progress.

async finalize(
run_id: str,
meta: dict | None = None,
) → nat.data_models.finetuning.TrajectoryCollection#

Wait for evaluation, collect TTC steps, and build DPO trajectories.

This method:

1. Waits for the evaluation run to complete
2. Collects and groups candidates by turn_id using TTCEventData
3. Generates preference pairs
4. Builds trajectories with DPOItem episodes
5. Groups trajectories by example for curriculum learning

Args:

run_id: Unique identifier for the run.
meta: Optional metadata for the run.

Returns:

TrajectoryCollection with DPO preference trajectories.

Raises:

ValueError: If no run with this ID exists.
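Taken together, a single collection cycle is a start_run/finalize pair. A minimal sketch, inside an async context (builder_config is assumed to be built from the YAML example above):

builder = DPOTrajectoryBuilder(trajectory_builder_config=builder_config)
await builder.start_run("run-1")
collection = await builder.finalize("run-1")  # waits for the eval run, then builds DPO trajectories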

log_progress(
run_id: str,
metrics: dict[str, Any],
output_dir: str | None = None,
) → None#

Log trajectory building progress.

Args:

run_id: The training run ID.
metrics: Dictionary of metrics to log.
output_dir: Optional output directory override.

_collect_candidates(
eval_result: nat.eval.config.EvaluationRunOutput,
) → dict[str, list[CandidateStep]]#

Extract TTC_END intermediate steps and group by turn_id.

This method:

1. Iterates through all evaluation input items
2. Filters for TTC_END steps with the configured name
3. Extracts data from TTCEventData model directly
4. Groups candidates by (example_id, turn_id)

Args:

eval_result: The evaluation run output.

Returns:

Dictionary mapping turn keys to lists of candidates.

_is_target_step(
step: nat.data_models.intermediate_step.IntermediateStep,
) → bool#

Check if an intermediate step is a target TTC step.

Args:

step: The intermediate step to check.

Returns:

True if this is a TTC_END step with the configured name.

_parse_candidate(
example_id: str,
step: nat.data_models.intermediate_step.IntermediateStep,
) → CandidateStep | None#

Parse a CandidateStep from a TTC intermediate step using TTCEventData.

Args:

example_id: The example ID this step belongs to.
step: The intermediate step to parse.

Returns:

CandidateStep if parsing succeeds, None otherwise.

_extract_prompt(input_data: Any) → PromptType#

Extract prompt from TTCEventData.input.

Handles both string prompts and list of OpenAIMessage.

Args:

input_data: The input field from TTCEventData.

Returns:

String prompt or list of OpenAIMessage.

_generate_preference_pairs(
candidates_by_turn: dict[str, list[CandidateStep]],
) → list[PreferencePair]#

Generate preference pairs from grouped candidates.

If exhaustive_pairs=True:

For candidates [A, B, C] with scores [0.9, 0.7, 0.5], the pairs are (A>B), (A>C), (B>C): all pairwise comparisons.

If exhaustive_pairs=False:

For candidates [A, B, C] with scores [0.9, 0.7, 0.5], the only pair is (A>C): best vs. worst.

Args:

candidates_by_turn: Dictionary mapping turn keys to candidate lists.

Returns:

List of preference pairs.
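A hedged sketch of the exhaustive mode under the rules above (CandidateStep is assumed to expose a score attribute; whether max_pairs_per_turn truncates before or after sorting is an assumption):

def exhaustive_pairs(candidates, min_score_diff, max_pairs_per_turn=None):
    ranked = sorted(candidates, key=lambda c: c.score, reverse=True)
    pairs = []
    for i, chosen in enumerate(ranked):
        for rejected in ranked[i + 1:]:
            diff = chosen.score - rejected.score
            if diff >= min_score_diff:
                pairs.append((chosen, rejected, diff))
    pairs.sort(key=lambda p: p[2], reverse=True)  # largest score gap first
    return pairs[:max_pairs_per_turn] if max_pairs_per_turn else pairs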

_generate_exhaustive_pairs(
sorted_candidates: list[CandidateStep],
) → list[PreferencePair]#

Generate all pairwise comparisons where score(chosen) > score(rejected).

Args:

sorted_candidates: Candidates sorted by score (descending).

Returns:

List of preference pairs, sorted by score difference (descending).

_generate_best_vs_worst_pair(
sorted_candidates: list[CandidateStep],
) → list[PreferencePair]#

Generate a single pair: best candidate vs worst candidate.

Args:

sorted_candidates: Candidates sorted by score (descending).

Returns:

List with at most one preference pair.

_build_trajectories(
pairs: list[PreferencePair],
) → list[nat.data_models.finetuning.Trajectory]#

Convert preference pairs to Trajectory format with DPOItem episodes.

Each trajectory contains:

- episode: [DPOItem] with prompt, chosen_response, rejected_response
- reward: score_diff (if reward_from_score_diff) or chosen_score
- metadata: Contains pair information for tracking

Args:

pairs: List of preference pairs.

Returns:

List of trajectories with DPOItem episodes.

_group_by_example(
trajectories: list[nat.data_models.finetuning.Trajectory],
) → list[list[nat.data_models.finetuning.Trajectory]]#

Group trajectories by example ID for curriculum learning.

This grouping enables:

- Filtering by average reward per example
- Expansion from easy to hard examples

Args:

trajectories: List of trajectories to group.

Returns:

List of trajectory lists, where each inner list contains trajectories for one example.
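A minimal sketch of the grouping (looking up the example ID via trajectory metadata is an assumption; the implementation may track it differently):

from collections import defaultdict

def group_by_example(trajectories):
    buckets = defaultdict(list)
    for traj in trajectories:
        buckets[traj.metadata["example_id"]].append(traj)  # assumed metadata key
    return list(buckets.values())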