nat.plugins.customizer#
NeMo Customizer plugin for NAT finetuning.
This plugin provides trajectory builders and trainer adapters for finetuning workflows using the NeMo Customizer backend.
Available components:

- DPO Trajectory Builder: Collects preference pairs from scored TTC candidates
- NeMo Customizer TrainerAdapter: Submits DPO/SFT jobs to NeMo Customizer
Submodules#

- nat.plugins.customizer.dpo
Classes#
- DPOSpecificHyperparameters: DPO-specific hyperparameters for NeMo Customizer.
- DPOTrajectoryBuilder: Trajectory builder for DPO (Direct Preference Optimization) training.
- DPOTrajectoryBuilderConfig: Configuration for the DPO (Direct Preference Optimization) Trajectory Builder.
- NeMoCustomizerHyperparameters: Hyperparameters for NeMo Customizer training jobs.
- NeMoCustomizerTrainerAdapter: TrainerAdapter for NeMo Customizer backend.
- NeMoCustomizerTrainerAdapterConfig: Configuration for the NeMo Customizer TrainerAdapter.
- NIMDeploymentConfig: Configuration for NIM deployment after training.
Package Contents#
- class DPOSpecificHyperparameters(/, **data: Any)#
Bases: pydantic.BaseModel

DPO-specific hyperparameters for NeMo Customizer.

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.
- class DPOTrajectoryBuilder(trajectory_builder_config: nat.plugins.customizer.dpo.config.DPOTrajectoryBuilderConfig)#

Bases: nat.finetuning.interfaces.trajectory_builder.TrajectoryBuilder

Trajectory builder for DPO (Direct Preference Optimization) training.

This builder collects preference pairs from workflows that produce TTC_END intermediate steps with TTCEventData. It uses the structured data model to extract turn_id, candidate_index, score, input (prompt), and output.

Key features:

- Uses the TTCEventData model directly (no brittle dictionary key configuration)
- Supports prompts as strings or lists of OpenAIMessage
- Exhaustive or best-vs-worst pair generation modes
- Configurable score difference filtering
- Grouping by example for curriculum learning
- Builds trajectories with DPOItem episodes
Example workflow integration:
```yaml
trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move
    exhaustive_pairs: true
    min_score_diff: 0.05
```
Initialize the DPO Trajectory Builder.
- Args:
trajectory_builder_config: Configuration for the builder.
- evaluation_runs: dict[str, asyncio.Task[nat.eval.config.EvaluationRunOutput]]#
- async start_run(run_id: str, meta: dict | None = None) → None#
Start a single evaluation run to collect intermediate steps.
- Args:
run_id: Unique identifier for this run.
meta: Optional metadata for the run.
- Raises:
ValueError: If a run with this ID is already in progress.
- async finalize(run_id: str, meta: dict | None = None) → nat.data_models.finetuning.TrajectoryCollection#

Wait for evaluation, collect TTC steps, and build DPO trajectories.

This method:

1. Waits for the evaluation run to complete
2. Collects and groups candidates by turn_id using TTCEventData
3. Generates preference pairs
4. Builds trajectories with DPOItem episodes
5. Groups trajectories by example for curriculum learning
- Args:
run_id: Unique identifier for the run. meta: Optional metadata for the run.
- Returns:
TrajectoryCollection with DPO preference trajectories.
- Raises:
ValueError: If no run with this ID exists.
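A minimal usage sketch of the start_run/finalize lifecycle described above, assuming a builder already constructed from a DPOTrajectoryBuilderConfig; the run_id and meta values are illustrative:

```python
async def collect_trajectories(builder):
    # builder: a configured DPOTrajectoryBuilder instance.
    await builder.start_run(run_id="dpo-run-001", meta={"experiment": "ttc"})
    # finalize blocks until the evaluation run completes, then builds
    # DPO preference trajectories from the collected TTC candidates.
    return await builder.finalize(run_id="dpo-run-001")
```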
- log_progress(run_id: str, metrics: dict, output_dir: str | None = None) → None#
Log trajectory building progress.
- Args:
run_id: The training run ID.
metrics: Dictionary of metrics to log.
output_dir: Optional output directory override.
- _collect_candidates(eval_result: nat.eval.config.EvaluationRunOutput)#

Extract TTC_END intermediate steps and group by turn_id.

This method:

1. Iterates through all evaluation input items
2. Filters for TTC_END steps with the configured name
3. Extracts data from the TTCEventData model directly
4. Groups candidates by (example_id, turn_id)
- Args:
eval_result: The evaluation run output.
- Returns:
Dictionary mapping turn keys to lists of candidates.
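A self-contained sketch of the grouping in step 4 above; the Cand dataclass is a stand-in for CandidateStep, and its fields mirror the ones documented on this page:

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class Cand:               # stand-in for CandidateStep (fields assumed)
    turn_id: str
    candidate_index: int
    score: float

def group_by_turn(parsed):
    # parsed: iterable of (example_id, Cand) pairs already extracted from
    # TTC_END steps. Bucket them under a composite (example_id, turn_id) key.
    by_turn = defaultdict(list)
    for example_id, cand in parsed:
        by_turn[(example_id, cand.turn_id)].append(cand)
    return by_turn
```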
- _is_target_step(step: nat.data_models.intermediate_step.IntermediateStep) → bool#
Check if an intermediate step is a target TTC step.
- Args:
step: The intermediate step to check.
- Returns:
True if this is a TTC_END step with the configured name.
- _parse_candidate(example_id: str, step: nat.data_models.intermediate_step.IntermediateStep)#

Parse a CandidateStep from a TTC intermediate step using TTCEventData.
- Args:
example_id: The example ID this step belongs to.
step: The intermediate step to parse.
- Returns:
CandidateStep if parsing succeeds, None otherwise.
- _extract_prompt(input_data: Any) → PromptType#
Extract prompt from TTCEventData.input.
Handles both string prompts and list of OpenAIMessage.
- Args:
input_data: The input field from TTCEventData.
- Returns:
String prompt or list of OpenAIMessage.
- _generate_preference_pairs(candidates_by_turn: dict[str, list[CandidateStep]])#

Generate preference pairs from grouped candidates.

- If exhaustive_pairs=True: for candidates [A, B, C] with scores [0.9, 0.7, 0.5], the pairs are (A>B), (A>C), (B>C) - all pairwise comparisons.
- If exhaustive_pairs=False: for candidates [A, B, C] with scores [0.9, 0.7, 0.5], the single pair is (A>C) - best vs worst.
- Args:
candidates_by_turn: Dictionary mapping turn keys to candidate lists.
- Returns:
List of preference pairs.
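The two modes as a minimal self-contained sketch. Only the score attribute and the pairing rules come from this page; the Cand stand-in and everything else are illustrative:

```python
from dataclasses import dataclass
from itertools import combinations

@dataclass
class Cand:               # stand-in for CandidateStep
    name: str
    score: float

def exhaustive_pairs(cands):
    # All (chosen, rejected) pairs with a strict score gap, ordered by the
    # gap size, largest first (matching _generate_exhaustive_pairs below).
    ranked = sorted(cands, key=lambda c: c.score, reverse=True)
    pairs = [(a, b) for a, b in combinations(ranked, 2) if a.score > b.score]
    return sorted(pairs, key=lambda p: p[0].score - p[1].score, reverse=True)

def best_vs_worst(cands):
    # At most one pair: highest score vs lowest score.
    ranked = sorted(cands, key=lambda c: c.score, reverse=True)
    if len(ranked) < 2 or ranked[0].score <= ranked[-1].score:
        return []
    return [(ranked[0], ranked[-1])]

cands = [Cand("A", 0.9), Cand("B", 0.7), Cand("C", 0.5)]
# exhaustive_pairs -> (A,C), (A,B), (B,C); best_vs_worst -> (A,C) only
```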
- _generate_exhaustive_pairs(sorted_candidates: list[CandidateStep])#

Generate all pairwise comparisons where score(chosen) > score(rejected).
- Args:
sorted_candidates: Candidates sorted by score (descending).
- Returns:
List of preference pairs, sorted by score difference (descending).
- _generate_best_vs_worst_pair(sorted_candidates: list[CandidateStep])#

Generate a single pair: best candidate vs worst candidate.
- Args:
sorted_candidates: Candidates sorted by score (descending).
- Returns:
List with at most one preference pair.
- _build_trajectories(pairs: list[PreferencePair])#

Convert preference pairs to Trajectory format with DPOItem episodes.

Each trajectory contains:

- episode: [DPOItem] with prompt, chosen_response, rejected_response
- reward: score_diff (if reward_from_score_diff) or chosen_score
- metadata: Contains pair information for tracking
- Args:
pairs: List of preference pairs.
- Returns:
List of trajectories with DPOItem episodes.
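A hedged sketch of that conversion. The DPOItem field names come from the docstring above, but the constructor signatures, the DPOItem import location, and the pair attribute names (prompt, chosen, rejected, .output, .score) are assumptions:

```python
# Trajectory is referenced as nat.data_models.finetuning.Trajectory on this
# page; the DPOItem location is an assumption.
from nat.data_models.finetuning import DPOItem, Trajectory

def build_trajectory(pair, reward_from_score_diff=True):
    item = DPOItem(
        prompt=pair.prompt,
        chosen_response=pair.chosen.output,
        rejected_response=pair.rejected.output,
    )
    # Reward is the score gap, or the chosen score, per the contract above.
    reward = (pair.chosen.score - pair.rejected.score
              if reward_from_score_diff else pair.chosen.score)
    return Trajectory(episode=[item], reward=reward,
                      metadata={"chosen_index": pair.chosen.candidate_index})
```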
- _group_by_example(trajectories: list[nat.data_models.finetuning.Trajectory])#

Group trajectories by example ID for curriculum learning.

This grouping enables:

- Filtering by average reward per example
- Expansion from easy to hard examples
- Args:
trajectories: List of trajectories to group.
- Returns:
List of trajectory lists, where each inner list contains trajectories for one example.
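The same grouping, sketched with a defaultdict; reading the example ID from trajectory metadata is an assumption (this page documents only the grouping itself):

```python
from collections import defaultdict

def group_by_example(trajectories):
    # Bucket trajectories by example ID so curriculum logic can filter or
    # order whole examples at once.
    groups = defaultdict(list)
    for traj in trajectories:
        groups[traj.metadata["example_id"]].append(traj)
    return list(groups.values())
```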
- class DPOTrajectoryBuilderConfig(/, **data: Any)#
Bases: nat.data_models.finetuning.TrajectoryBuilderConfig

Configuration for the DPO (Direct Preference Optimization) Trajectory Builder.
This builder collects preference pairs from workflows that produce TTC_END intermediate steps with TTCEventData. It uses the structured TTCEventData model to extract turn_id, candidate_index, score, input (prompt), and output (response) - no dictionary key configuration needed.
The builder groups candidates by turn_id and creates preference pairs based on score differences.
Example YAML configuration:
```yaml
trajectory_builders:
  dpo_builder:
    _type: dpo_traj_builder
    ttc_step_name: dpo_candidate_move
    exhaustive_pairs: true
    min_score_diff: 0.05
    max_pairs_per_turn: 5
```
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

- validate_config() → DPOTrajectoryBuilderConfig#
Validate configuration consistency.
- class NeMoCustomizerHyperparameters(/, **data: Any)#
Bases: pydantic.BaseModel

Hyperparameters for NeMo Customizer training jobs.

These map to the hyperparameters argument in client.customization.jobs.create().

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

- training_type: Literal['sft', 'dpo'] = None#
- finetuning_type: Literal['lora', 'all_weights'] = None#
- dpo: DPOSpecificHyperparameters = None#
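A construction sketch using only the fields listed above; other hyperparameters such as epochs and batch_size appear in the adapter's YAML example further down and are assumed to be accepted as well:

```python
# Assumption: DPOSpecificHyperparameters can be constructed with defaults,
# since its fields are not enumerated on this page.
hparams = NeMoCustomizerHyperparameters(
    training_type="dpo",
    finetuning_type="lora",
    dpo=DPOSpecificHyperparameters(),
)
```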
- class NeMoCustomizerTrainerAdapter(adapter_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerAdapterConfig)#

Bases: nat.finetuning.interfaces.trainer_adapter.TrainerAdapter

TrainerAdapter for NeMo Customizer backend.

This adapter:

1. Converts trajectories to JSONL format for DPO training
2. Uploads datasets to NeMo Datastore via the HuggingFace Hub API
3. Submits customization jobs to NeMo Customizer
4. Monitors job progress and status
5. Optionally deploys trained models
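An end-to-end usage sketch of those steps from the caller's side. The method names come from this page; the single-argument status(ref) call and the surrounding orchestration are assumptions:

```python
async def train(adapter, run_config, trajectories):
    # adapter: a NeMoCustomizerTrainerAdapter built from its config.
    await adapter.initialize(run_config)       # set up clients and namespaces
    ref = await adapter.submit(trajectories)   # upload JSONL, create the job
    print(await adapter.status(ref))           # one-off status check
    await adapter.wait_until_complete(ref)     # poll until the job finishes
```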
- property entity_client: nemo_microservices.NeMoMicroservices#
Lazy initialization of NeMo Microservices client.
- property hf_api: huggingface_hub.HfApi#
Lazy initialization of HuggingFace API client.
- async initialize(run_config: nat.data_models.finetuning.FinetuneConfig)#

Initialize the trainer adapter.
- async _ensure_namespaces_exist() None#
Create namespaces in entity store and datastore if they don’t exist.
- _format_prompt(prompt: list[nat.data_models.finetuning.OpenAIMessage] | str)#

Format prompt based on configuration.
- Args:
prompt: Original prompt (string or list of OpenAI messages)
- Returns:
Formatted prompt based on use_full_message_history setting
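A hedged sketch of the use_full_message_history switch. The setting name comes from the YAML example below; the exact truncation rule (keeping only the final message) is an assumption:

```python
def format_prompt(prompt, use_full_message_history):
    if isinstance(prompt, str):
        return prompt          # plain string prompts pass through unchanged
    if use_full_message_history:
        return prompt          # keep the entire chat history
    return prompt[-1:]         # assumption: keep only the last message
```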
- _trajectory_to_dpo_jsonl(trajectories: nat.data_models.finetuning.TrajectoryCollection)#

Convert trajectory collection to JSONL format for DPO training.
- Returns:
Tuple of (training_jsonl, validation_jsonl) content strings
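A serialization sketch of the JSONL output; the record field names below are assumptions for illustration (consult the NeMo Customizer dataset documentation for the authoritative DPO schema):

```python
import json

def to_jsonl(records):
    # One JSON object per line, the shape expected for dataset uploads.
    return "\n".join(json.dumps(r) for r in records)

train_jsonl = to_jsonl([{
    "prompt": "2+2=",              # field names assumed, not verified
    "chosen_response": "4",
    "rejected_response": "5",
}])
```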
- async _setup_dataset(run_id: str, training_jsonl: str, validation_jsonl: str) → str#
Create dataset repository and upload JSONL files.
- Args:
run_id: Unique identifier for this training run
training_jsonl: Training data in JSONL format
validation_jsonl: Validation data in JSONL format
- Returns:
Repository ID for the created dataset
- async submit(trajectories: nat.data_models.finetuning.TrajectoryCollection)#

Submit trajectories for training.
- Args:
trajectories: Collection of trajectories containing DPO items
- Returns:
Reference to the submitted training job
- async status(ref: nat.data_models.finetuning.TrainingJobRef) → nat.data_models.finetuning.TrainingJobStatus#
Get the status of a training job.
- async wait_until_complete(ref: nat.data_models.finetuning.TrainingJobRef, poll_interval: float | None = None)#

Wait for training job to complete.
- async _deploy_model(ref: nat.data_models.finetuning.TrainingJobRef) None#
Deploy the trained model and wait until deployment is ready.
- async _wait_for_deployment_ready(namespace: str, deployment_name: str, poll_interval: float | None = None, timeout: float | None = None)#

Wait for a model deployment to become ready.
- Args:
namespace: Namespace of the deployment
deployment_name: Name of the deployment
poll_interval: Seconds between status checks (default: adapter config poll_interval_seconds)
timeout: Maximum seconds to wait (default: adapter config deployment_timeout_seconds)
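A generic polling sketch of this kind of wait loop, not the adapter's actual code; the readiness predicate and the default values are illustrative:

```python
import asyncio
import time

async def wait_ready(is_ready, poll_interval=10.0, timeout=1800.0):
    # Poll an async readiness check until it succeeds or time runs out.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if await is_ready():
            return
        await asyncio.sleep(poll_interval)
    raise TimeoutError("deployment did not become ready before the timeout")
```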
- class NeMoCustomizerTrainerAdapterConfig(/, **data: Any)#
Bases: nat.data_models.finetuning.TrainerAdapterConfig

Configuration for the NeMo Customizer TrainerAdapter.
This adapter submits DPO/SFT training jobs to NeMo Customizer and optionally deploys the trained model.
Example YAML configuration:
```yaml
trainer_adapters:
  nemo_customizer:
    _type: nemo_customizer_trainer_adapter
    entity_host: https://nmp.example.com
    datastore_host: https://datastore.example.com
    namespace: my-project
    customization_config: meta/llama-3.2-1b-instruct@v1.0.0+A100
    hyperparameters:
      training_type: dpo
      epochs: 5
      batch_size: 8
    use_full_message_history: true
    deploy_on_completion: true
```
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

- hyperparameters: NeMoCustomizerHyperparameters = None#
- deployment_config: NIMDeploymentConfig = None#
- validate_config() → NeMoCustomizerTrainerAdapterConfig#
Validate configuration consistency.
- class NIMDeploymentConfig(/, **data: Any)#
Bases: pydantic.BaseModel

Configuration for NIM deployment after training.

These settings are used when deploy_on_completion is True.

Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.