nat.plugins.customizer.dpo#
DPO (Direct Preference Optimization) components for NAT.
This module provides:

- DPO Trajectory Builder: Collects preference data from scored TTC intermediate steps
- NeMo Customizer TrainerAdapter: Submits DPO training jobs to NeMo Customizer
Submodules#
Classes#
| Class | Description |
|---|---|
| DPOSpecificHyperparameters | DPO-specific hyperparameters for NeMo Customizer. |
| DPOTrajectoryBuilderConfig | Configuration for the DPO (Direct Preference Optimization) Trajectory Builder. |
| NeMoCustomizerHyperparameters | Hyperparameters for NeMo Customizer training jobs. |
| NeMoCustomizerTrainerAdapterConfig | Configuration for the NeMo Customizer TrainerAdapter. |
| NeMoCustomizerTrainerConfig | Configuration for the NeMo Customizer Trainer. |
| NIMDeploymentConfig | Configuration for NIM deployment after training. |
| NeMoCustomizerTrainer | Trainer for NeMo Customizer DPO/SFT finetuning. |
| NeMoCustomizerTrainerAdapter | TrainerAdapter for NeMo Customizer backend. |
| DPOTrajectoryBuilder | Trajectory builder for DPO (Direct Preference Optimization) training. |
Package Contents#
- class DPOSpecificHyperparameters(/, **data: Any)#
Bases: `pydantic.BaseModel`

DPO-specific hyperparameters for NeMo Customizer.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.
- class DPOTrajectoryBuilderConfig(/, **data: Any)#
Bases: `nat.data_models.finetuning.TrajectoryBuilderConfig`

Configuration for the DPO (Direct Preference Optimization) Trajectory Builder.
This builder collects preference pairs from workflows that produce TTC_END intermediate steps with TTCEventData. It uses the structured TTCEventData model to extract turn_id, candidate_index, score, input (prompt), and output (response) - no dictionary key configuration needed.
The builder groups candidates by turn_id and creates preference pairs based on score differences.
Example YAML configuration:
```yaml
trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move
    exhaustive_pairs: true
    min_score_diff: 0.05
    max_pairs_per_turn: 5
```
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

- validate_config() → DPOTrajectoryBuilderConfig#
Validate configuration consistency.
- class NeMoCustomizerHyperparameters(/, **data: Any)#
Bases: `pydantic.BaseModel`

Hyperparameters for NeMo Customizer training jobs.

These map to the `hyperparameters` argument in `client.customization.jobs.create()`.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

- training_type: Literal['sft', 'dpo'] = None#
- finetuning_type: Literal['lora', 'all_weights'] = None#
- dpo: DPOSpecificHyperparameters = None#
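As an illustration of how these fields flow into a job submission, here is a minimal sketch; the client construction and all keyword arguments other than `hyperparameters` are assumptions for illustration, not the adapter's exact call:

```python
# Illustrative sketch only: kwargs besides `hyperparameters` are assumptions.
from nemo_microservices import NeMoMicroservices

client = NeMoMicroservices(base_url="https://nmp.example.com")  # entity host

job = client.customization.jobs.create(
    config="meta/llama-3.2-1b-instruct@v1.0.0+A100",             # customization config
    dataset={"name": "dpo-dataset", "namespace": "my-project"},  # assumed shape
    hyperparameters={
        "training_type": "dpo",     # maps from training_type above
        "finetuning_type": "lora",  # maps from finetuning_type above
        "epochs": 5,
        "batch_size": 8,
    },
)
```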
- class NeMoCustomizerTrainerAdapterConfig(/, **data: Any)#
Bases: `nat.data_models.finetuning.TrainerAdapterConfig`

Configuration for the NeMo Customizer TrainerAdapter.
This adapter submits DPO/SFT training jobs to NeMo Customizer and optionally deploys the trained model.
Example YAML configuration:
```yaml
trainer_adapters:
  nemo_customizer:
    _type: nemo_customizer_trainer_adapter
    entity_host: https://nmp.example.com
    datastore_host: https://datastore.example.com
    namespace: my-project
    customization_config: meta/llama-3.2-1b-instruct@v1.0.0+A100
    hyperparameters:
      training_type: dpo
      epochs: 5
      batch_size: 8
    use_full_message_history: true
    deploy_on_completion: true
```
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

- hyperparameters: NeMoCustomizerHyperparameters = None#
- deployment_config: NIMDeploymentConfig = None#
- validate_config() → NeMoCustomizerTrainerAdapterConfig#
Validate configuration consistency.
- class NeMoCustomizerTrainerConfig(/, **data: Any)#
Bases: `nat.data_models.finetuning.TrainerConfig`

Configuration for the NeMo Customizer Trainer.
This trainer orchestrates DPO data collection and training job submission. Unlike epoch-based trainers, it runs the trajectory builder multiple times to collect data, then submits a single training job to NeMo Customizer.
Example YAML configuration:
```yaml
trainers:
  nemo_dpo:
    _type: nemo_customizer_trainer
    num_runs: 5
    wait_for_completion: true
    deduplicate_pairs: true
    max_pairs: 10000
```
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.
- class NIMDeploymentConfig(/, **data: Any)#
Bases: `pydantic.BaseModel`

Configuration for NIM deployment after training.

These settings are used when `deploy_on_completion` is True.

Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.
- class NeMoCustomizerTrainer(trainer_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerConfig, **kwargs)#
Bases: `nat.finetuning.interfaces.finetuning_runner.Trainer`

Trainer for NeMo Customizer DPO/SFT finetuning.
Unlike epoch-based trainers, this trainer:

1. Runs the trajectory builder multiple times (num_runs) to collect data
2. Aggregates all trajectories into a single dataset
3. Submits the dataset to NeMo Customizer for training
4. Monitors the training job until completion
The actual training epochs are handled by NeMo Customizer via hyperparameters.
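A conceptual sketch of that collect-then-submit flow, using the builder and adapter interfaces documented below (merge_collections and the run-ID scheme are hypothetical):

```python
# Conceptual sketch; `merge_collections` is hypothetical, not the trainer's code.
async def collect_and_train(config, builder, adapter):
    collections = []
    for i in range(config.num_runs):                  # data-collection runs, not epochs
        run_id = f"dpo-run-{i}"                       # hypothetical run-ID scheme
        await builder.start_run(run_id)               # launch one evaluation run
        collections.append(await builder.finalize(run_id))  # build DPO trajectories
    ref = await adapter.submit(merge_collections(collections))  # single training job
    return await adapter.wait_until_complete(ref)     # monitor until done
```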
Initialize the NeMo Customizer Trainer.
- Args:
trainer_config: Configuration for the trainer
- trainer_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerConfig#
- _job_ref: nat.data_models.finetuning.TrainingJobRef | None = None#
- _all_trajectories: list[list[nat.data_models.finetuning.Trajectory]] = []#
- async initialize(run_config: nat.data_models.finetuning.FinetuneConfig)#
Initialize the trainer and its components.
Note: Curriculum learning is not supported for DPO training.
- async run_epoch( ) → nat.data_models.finetuning.TrainingJobRef | None#
Run a single data collection run.
For NeMo Customizer, this collects trajectories without submitting to training. The actual submission happens in run().
- Args:
epoch: The current run number (0-indexed)
run_id: Unique identifier for this training run
- Returns:
None (trajectories are accumulated, not submitted per-run)
- async run(num_epochs: int)#
Run the complete DPO data collection and training workflow.
- Args:
num_epochs: Ignored for NeMo Customizer (uses trainer_config.num_runs)
- Returns:
list[TrainingJobStatus]: Status of the training job
- _deduplicate_trajectories(collection: nat.data_models.finetuning.TrajectoryCollection)#
Remove duplicate DPO pairs based on prompt+responses.
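A minimal sketch of the idea, keying each pair on its prompt and both responses (the attribute names follow the DPOItem description in _build_trajectories below and are otherwise assumed):

```python
# Sketch: keep the first occurrence of each (prompt, chosen, rejected) triple.
def deduplicate(items):
    seen: set[tuple[str, str, str]] = set()
    unique = []
    for item in items:  # items carry prompt, chosen_response, rejected_response
        key = (str(item.prompt), item.chosen_response, item.rejected_response)
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique
```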
- _sample_trajectories(collection: nat.data_models.finetuning.TrajectoryCollection, max_pairs: int)#
Sample trajectories to limit dataset size.
- _log_final_metrics(final_status: nat.data_models.finetuning.TrainingJobStatus)#
Log final training metrics.
- class NeMoCustomizerTrainerAdapter(adapter_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerAdapterConfig)#
Bases: `nat.finetuning.interfaces.trainer_adapter.TrainerAdapter`

TrainerAdapter for NeMo Customizer backend.
This adapter:

1. Converts trajectories to JSONL format for DPO training
2. Uploads datasets to NeMo Datastore via the HuggingFace Hub API
3. Submits customization jobs to NeMo Customizer
4. Monitors job progress and status
5. Optionally deploys trained models
- property entity_client: nemo_microservices.NeMoMicroservices#
Lazy initialization of NeMo Microservices client.
- property hf_api: huggingface_hub.HfApi#
Lazy initialization of HuggingFace API client.
- async initialize(run_config: nat.data_models.finetuning.FinetuneConfig)#
Initialize the trainer adapter.
- async _ensure_namespaces_exist() None#
Create namespaces in entity store and datastore if they don’t exist.
- _format_prompt(prompt: list[nat.data_models.finetuning.OpenAIMessage] | str)#
Format prompt based on configuration.
- Args:
prompt: Original prompt (string or list of OpenAI messages)
- Returns:
Formatted prompt based on use_full_message_history setting
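A sketch of plausible formatting logic; the behavior when use_full_message_history is False (keep only the last user message) is an assumption, as are the role/content attributes on OpenAIMessage:

```python
# Sketch only: the False branch is an assumed behavior, not confirmed.
def format_prompt(prompt, use_full_message_history: bool):
    if isinstance(prompt, str):
        return prompt                  # string prompts pass through unchanged
    if use_full_message_history:
        return prompt                  # keep the whole OpenAI-style message list
    user_msgs = [m for m in prompt if m.role == "user"]  # attrs assumed
    return user_msgs[-1].content if user_msgs else ""
```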
- _trajectory_to_dpo_jsonl(trajectories: nat.data_models.finetuning.TrajectoryCollection)#
Convert trajectory collection to JSONL format for DPO training.
- Returns:
Tuple of (training_jsonl, validation_jsonl) content strings
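For orientation, one JSONL line holds one preference record along these lines; the exact key names are an assumption based on common NeMo Customizer DPO examples and may differ by service version:

```python
import json

# Hypothetical record shape; verify key names against your Customizer version.
record = {
    "prompt": "What is DPO?",
    "chosen_response": "Direct Preference Optimization trains on preference pairs.",
    "rejected_response": "DPO is a kind of database.",
}
line = json.dumps(record)  # one such line per preference pair in the JSONL file
```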
- async _setup_dataset( ) → str#
Create dataset repository and upload JSONL files.
- Args:
run_id: Unique identifier for this training run
training_jsonl: Training data in JSONL format
validation_jsonl: Validation data in JSONL format
- Returns:
Repository ID for the created dataset
- async submit(trajectories: nat.data_models.finetuning.TrajectoryCollection)#
Submit trajectories for training.
- Args:
trajectories: Collection of trajectories containing DPO items
- Returns:
Reference to the submitted training job
- async status( ) → nat.data_models.finetuning.TrainingJobStatus#
Get the status of a training job.
- async wait_until_complete(ref: nat.data_models.finetuning.TrainingJobRef, poll_interval: float | None = None)#
Wait for training job to complete.
- async _deploy_model(ref: nat.data_models.finetuning.TrainingJobRef) → None#
Deploy the trained model and wait until deployment is ready.
- async _wait_for_deployment_ready(namespace: str, deployment_name: str, poll_interval: float | None = None, timeout: float | None = None)#
Wait for a model deployment to become ready.
- Args:
namespace: Namespace of the deployment
deployment_name: Name of the deployment
poll_interval: Seconds between status checks (default: adapter config poll_interval_seconds)
timeout: Maximum seconds to wait (default: adapter config deployment_timeout_seconds)
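The method amounts to a poll-until-ready loop with a deadline, roughly like this self-contained sketch (the check callable stands in for the real deployment-status query):

```python
import asyncio
import time
from typing import Awaitable, Callable

# Generic poll-with-timeout sketch; `check` stands in for the status query.
async def wait_ready(check: Callable[[], Awaitable[bool]],
                     poll_interval: float, timeout: float) -> None:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if await check():                    # e.g. "is the NIM deployment ready?"
            return
        await asyncio.sleep(poll_interval)   # back off between status checks
    raise TimeoutError(f"deployment not ready within {timeout}s")
```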
- class DPOTrajectoryBuilder(trajectory_builder_config: nat.plugins.customizer.dpo.config.DPOTrajectoryBuilderConfig)#
Bases: `nat.finetuning.interfaces.trajectory_builder.TrajectoryBuilder`

Trajectory builder for DPO (Direct Preference Optimization) training.
This builder collects preference pairs from workflows that produce TTC_END intermediate steps with TTCEventData. It uses the structured data model to extract turn_id, candidate_index, score, input (prompt), and output.
Key features:

- Uses the TTCEventData model directly (no brittle dictionary key configuration)
- Supports prompts as strings or lists of OpenAIMessage
- Exhaustive or best-vs-worst pair generation modes
- Configurable score difference filtering
- Grouping by example for curriculum learning
- Builds trajectories with DPOItem episodes
Example workflow integration:
```yaml
trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move
    exhaustive_pairs: true
    min_score_diff: 0.05
```
Initialize the DPO Trajectory Builder.
- Args:
trajectory_builder_config: Configuration for the builder.
- evaluation_runs: dict[str, asyncio.Task[nat.eval.config.EvaluationRunOutput]]#
- async start_run(run_id: str, meta: dict | None = None) → None#
Start a single evaluation run to collect intermediate steps.
- Args:
run_id: Unique identifier for this run.
meta: Optional metadata for the run.
- Raises:
ValueError: If a run with this ID is already in progress.
- async finalize( ) → nat.data_models.finetuning.TrajectoryCollection#
Wait for evaluation, collect TTC steps, and build DPO trajectories.
This method:

1. Waits for the evaluation run to complete
2. Collects and groups candidates by turn_id using TTCEventData
3. Generates preference pairs
4. Builds trajectories with DPOItem episodes
5. Groups trajectories by example for curriculum learning
- Args:
run_id: Unique identifier for the run.
meta: Optional metadata for the run.
- Returns:
TrajectoryCollection with DPO preference trajectories.
- Raises:
ValueError: If no run with this ID exists.
- log_progress( ) → None#
Log trajectory building progress.
- Args:
run_id: The training run ID.
metrics: Dictionary of metrics to log.
output_dir: Optional output directory override.
- _collect_candidates(eval_result: nat.eval.config.EvaluationRunOutput)#
Extract TTC_END intermediate steps and group by turn_id.
This method:

1. Iterates through all evaluation input items
2. Filters for TTC_END steps with the configured name
3. Extracts data from the TTCEventData model directly
4. Groups candidates by (example_id, turn_id)
- Args:
eval_result: The evaluation run output.
- Returns:
Dictionary mapping turn keys to lists of candidates.
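The grouping step reduces to bucketing parsed candidates under a composite key, roughly as follows (parse stands in for _parse_candidate below):

```python
from collections import defaultdict

# Sketch: bucket candidates by (example_id, turn_id); `parse` stands in for
# _parse_candidate below and returns None when a step cannot be parsed.
def collect_candidates(steps, parse):
    groups = defaultdict(list)
    for example_id, step in steps:          # TTC_END steps per evaluation item
        candidate = parse(example_id, step)
        if candidate is not None:           # skip unparseable steps
            groups[(example_id, candidate.turn_id)].append(candidate)
    return groups
```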
- _is_target_step( ) → bool#
Check if an intermediate step is a target TTC step.
- Args:
step: The intermediate step to check.
- Returns:
True if this is a TTC_END step with the configured name.
- _parse_candidate(example_id: str, step: nat.data_models.intermediate_step.IntermediateStep)#
Parse a CandidateStep from a TTC intermediate step using TTCEventData.
- Args:
example_id: The example ID this step belongs to.
step: The intermediate step to parse.
- Returns:
CandidateStep if parsing succeeds, None otherwise.
- _extract_prompt(input_data: Any) → PromptType#
Extract prompt from TTCEventData.input.
Handles both string prompts and lists of OpenAIMessage.
- Args:
input_data: The input field from TTCEventData.
- Returns:
String prompt or list of OpenAIMessage.
- _generate_preference_pairs(candidates_by_turn: dict[str, list[CandidateStep]])#
Generate preference pairs from grouped candidates.
- If exhaustive_pairs=True:
  For candidates [A, B, C] with scores [0.9, 0.7, 0.5], pairs are (A>B), (A>C), (B>C) - all pairwise comparisons.
- If exhaustive_pairs=False:
  For candidates [A, B, C] with scores [0.9, 0.7, 0.5], the only pair is (A>C) - best vs worst.
- Args:
candidates_by_turn: Dictionary mapping turn keys to candidate lists.
- Returns:
List of preference pairs.
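Both modes reduce to simple index arithmetic over score-sorted candidates; here is a sketch with the min_score_diff filter applied (max_pairs_per_turn truncation omitted):

```python
# Sketch of the two modes over candidates sorted by score (descending).
def generate_pairs(sorted_candidates, exhaustive: bool, min_score_diff: float):
    pairs = []
    if exhaustive:
        # All (chosen, rejected) combinations where chosen outscores rejected.
        for i, chosen in enumerate(sorted_candidates):
            for rejected in sorted_candidates[i + 1:]:
                if chosen.score - rejected.score >= min_score_diff:
                    pairs.append((chosen, rejected))
    elif len(sorted_candidates) >= 2:
        best, worst = sorted_candidates[0], sorted_candidates[-1]
        if best.score - worst.score >= min_score_diff:
            pairs.append((best, worst))     # single best-vs-worst pair
    return pairs
```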
- _generate_exhaustive_pairs(sorted_candidates: list[CandidateStep])#
Generate all pairwise comparisons where score(chosen) > score(rejected).
- Args:
sorted_candidates: Candidates sorted by score (descending).
- Returns:
List of preference pairs, sorted by score difference (descending).
- _generate_best_vs_worst_pair(sorted_candidates: list[CandidateStep])#
Generate a single pair: best candidate vs worst candidate.
- Args:
sorted_candidates: Candidates sorted by score (descending).
- Returns:
List with at most one preference pair.
- _build_trajectories(pairs: list[PreferencePair])#
Convert preference pairs to Trajectory format with DPOItem episodes.
Each trajectory contains:

- episode: [DPOItem] with prompt, chosen_response, rejected_response
- reward: score_diff (if reward_from_score_diff) or chosen_score
- metadata: pair information for tracking
- Args:
pairs: List of preference pairs.
- Returns:
List of trajectories with DPOItem episodes.
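A sketch of the conversion; the real code builds DPOItem and Trajectory models, so the plain dicts here are stand-ins for illustration only:

```python
# Sketch: one trajectory per preference pair; dicts stand in for the real
# DPOItem and Trajectory models.
def build_trajectories(pairs, reward_from_score_diff: bool):
    trajectories = []
    for chosen, rejected in pairs:
        episode = [{                        # stands in for a DPOItem
            "prompt": chosen.prompt,
            "chosen_response": chosen.output,
            "rejected_response": rejected.output,
        }]
        reward = (chosen.score - rejected.score
                  if reward_from_score_diff else chosen.score)
        trajectories.append({"episode": episode, "reward": reward,
                             "metadata": {"turn_id": chosen.turn_id}})
    return trajectories
```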
- _group_by_example(trajectories: list[nat.data_models.finetuning.Trajectory])#
Group trajectories by example ID for curriculum learning.
This grouping enables:

- Filtering by average reward per example
- Expansion from easy to hard examples
- Args:
trajectories: List of trajectories to group.
- Returns:
List of trajectory lists, where each inner list contains trajectories for one example.
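The grouping itself is a bucket-then-list operation, sketched here; example_id_of stands in for however the real code reads the example ID (e.g. from trajectory metadata):

```python
from collections import defaultdict

# Sketch: one inner list of trajectories per example; `example_id_of` is a
# stand-in for the real example-ID lookup.
def group_by_example(trajectories, example_id_of):
    buckets = defaultdict(list)
    for traj in trajectories:
        buckets[example_id_of(traj)].append(traj)
    return list(buckets.values())
```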