Source code for nemo_rl.data.interfaces

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Protocol, TypedDict, Union

import torch

# OpenAI-API-like message log, but every messsage may contain associated tensors (i.e. tokenized strings and logprobs) in addition to the original "content" string
LLMMessageLogType = List[Dict[str, Union[str, torch.Tensor]]]

# Flattened message log where all tensors and data are concatenated together for a conversation
# Converts a conversation from list-of-turns format to key-value format with concatenated tensors
FlatMessagesType = Dict[str, Union[List[str], torch.Tensor]]


[docs] class DatumSpec(TypedDict): message_log: LLMMessageLogType length: int # total (concatenated) length of the message tensors extra_env_info: Dict[str, Any] loss_multiplier: float # multiplier for the loss for this datum. 0 to mask out (say the sample is invalid) idx: int task_name: Optional[str] = "default" stop_strings: Optional[List[str]] = None # Optional stop strings for generation __extra__: Any # This allows additional fields of any type
[docs] @dataclass class TaskDataSpec: task_name: Optional[str] = None # prompt prompt_file: Optional[os.PathLike] = None system_prompt_file: Optional[Union[str, os.PathLike]] = None
[docs] def __post_init__(self): def load_prompt_file( prompt_file: Optional[os.PathLike], ) -> Optional[str]: """Load prompt from file if it exists, otherwise return as is.""" if prompt_file is None: return None if os.path.exists(prompt_file): with open(prompt_file, "r", encoding="utf-8") as f: return f.read() else: raise FileNotFoundError(f"Prompt file {prompt_file} not found") # Load prompts from files if they exist self.system_prompt = load_prompt_file(self.system_prompt_file) self.prompt = load_prompt_file(self.prompt_file)
[docs] def copy_defaults(self, from_spec: "TaskDataSpec"): """Apply default values from another Task instance for any None attributes.""" default_attrs = { "system_prompt": from_spec.system_prompt, "prompt": from_spec.prompt, } for attr_name, default_value in default_attrs.items(): if getattr(self, attr_name) is None: setattr(self, attr_name, default_value)
[docs] class TaskDataProcessFnCallable(Protocol): """A callable that processes a loaded datum dictionary into a DatumSpec."""
[docs] def __call__( self, datum_dict: Dict[str, Any], task_data_spec: TaskDataSpec, tokenizer, max_seq_length: int, idx: int, ) -> DatumSpec: raise NotImplementedError("Task data process not implemented")