nat.plugins.customizer.dpo.trainer_adapter#

NeMo Customizer TrainerAdapter for DPO/SFT training.

This module provides a TrainerAdapter implementation that interfaces with NeMo Customizer for submitting and monitoring training jobs.

Attributes#

Classes#

NeMoCustomizerTrainerAdapter

TrainerAdapter for NeMo Customizer backend.

Module Contents#

logger#
class NeMoCustomizerTrainerAdapter(
adapter_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerAdapterConfig,
)#

Bases: nat.finetuning.interfaces.trainer_adapter.TrainerAdapter

TrainerAdapter for NeMo Customizer backend.

This adapter:

1. Converts trajectories to JSONL format for DPO training
2. Uploads datasets to NeMo Datastore via HuggingFace Hub API
3. Submits customization jobs to NeMo Customizer
4. Monitors job progress and status
5. Optionally deploys trained models

adapter_config: nat.plugins.customizer.dpo.config.NeMoCustomizerTrainerAdapterConfig#
_entity_client: nemo_microservices.NeMoMicroservices | None = None#
_hf_api: huggingface_hub.HfApi | None = None#
_active_jobs: dict[str, str]#
_job_output_models: dict[str, str]#
property entity_client: nemo_microservices.NeMoMicroservices#

Lazy initialization of NeMo Microservices client.

property hf_api: huggingface_hub.HfApi#

Lazy initialization of HuggingFace API client.

async initialize(
run_config: nat.data_models.finetuning.FinetuneConfig,
) → None#

Initialize the trainer adapter.

async _ensure_namespaces_exist() → None#

Create namespaces in entity store and datastore if they don’t exist.

async is_healthy() → bool#

Check if NeMo Customizer services are reachable.

_format_prompt(
prompt: list[nat.data_models.finetuning.OpenAIMessage] | str,
) → list[dict[str, str]] | str#

Format prompt based on configuration.

Args:

prompt: Original prompt (string or list of OpenAI messages)

Returns:

Formatted prompt based on use_full_message_history setting

_trajectory_to_dpo_jsonl(
trajectories: nat.data_models.finetuning.TrajectoryCollection,
) → tuple[str, str]#

Convert trajectory collection to JSONL format for DPO training.

Returns:

Tuple of (training_jsonl, validation_jsonl) content strings

async _setup_dataset(
run_id: str,
training_jsonl: str,
validation_jsonl: str,
) → str#

Create dataset repository and upload JSONL files.

Args:

run_id: Unique identifier for this training run

training_jsonl: Training data in JSONL format

validation_jsonl: Validation data in JSONL format

Returns:

Repository ID for the created dataset

async submit(
trajectories: nat.data_models.finetuning.TrajectoryCollection,
) → nat.data_models.finetuning.TrainingJobRef#

Submit trajectories for training.

Args:

trajectories: Collection of trajectories containing DPO items

Returns:

Reference to the submitted training job

async status(
ref: nat.data_models.finetuning.TrainingJobRef,
) → nat.data_models.finetuning.TrainingJobStatus#

Get the status of a training job.

async wait_until_complete(
ref: nat.data_models.finetuning.TrainingJobRef,
poll_interval: float | None = None,
) → nat.data_models.finetuning.TrainingJobStatus#

Wait for training job to complete.

async _deploy_model(ref: nat.data_models.finetuning.TrainingJobRef) → None#

Deploy the trained model and wait until deployment is ready.

async _wait_for_deployment_ready(
namespace: str,
deployment_name: str,
poll_interval: float | None = None,
timeout: float | None = None,
) → None#

Wait for a model deployment to become ready.

Args:

namespace: Namespace of the deployment

deployment_name: Name of the deployment

poll_interval: Seconds between status checks (default: adapter config poll_interval_seconds)

timeout: Maximum seconds to wait (default: adapter config deployment_timeout_seconds)

log_progress(
ref: nat.data_models.finetuning.TrainingJobRef,
metrics: dict[str, Any],
output_dir: str | None = None,
) → None#

Log training progress to file.