Source code for nemo_rl.data.interfaces
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Protocol, TypedDict, Union
import torch
# OpenAI-API-like message log, but every messsage may contain associated tensors (i.e. tokenized strings and logprobs) in addition to the original "content" string
LLMMessageLogType = List[Dict[str, Union[str, torch.Tensor]]]
# Flattened message log where all tensors and data are concatenated together for a conversation
# Converts a conversation from list-of-turns format to key-value format with concatenated tensors
FlatMessagesType = Dict[str, Union[List[str], torch.Tensor]]
[docs]
class DatumSpec(TypedDict):
message_log: LLMMessageLogType
length: int # total (concatenated) length of the message tensors
extra_env_info: Dict[str, Any]
loss_multiplier: float # multiplier for the loss for this datum. 0 to mask out (say the sample is invalid)
idx: int
task_name: Optional[str] = "default"
stop_strings: Optional[List[str]] = None # Optional stop strings for generation
__extra__: Any # This allows additional fields of any type
[docs]
@dataclass
class TaskDataSpec:
task_name: Optional[str] = None
# prompt
prompt_file: Optional[os.PathLike] = None
system_prompt_file: Optional[Union[str, os.PathLike]] = None
[docs]
def __post_init__(self):
def load_prompt_file(
prompt_file: Optional[os.PathLike],
) -> Optional[str]:
"""Load prompt from file if it exists, otherwise return as is."""
if prompt_file is None:
return None
if os.path.exists(prompt_file):
with open(prompt_file, "r", encoding="utf-8") as f:
return f.read()
else:
raise FileNotFoundError(f"Prompt file {prompt_file} not found")
# Load prompts from files if they exist
self.system_prompt = load_prompt_file(self.system_prompt_file)
self.prompt = load_prompt_file(self.prompt_file)
[docs]
def copy_defaults(self, from_spec: "TaskDataSpec"):
"""Apply default values from another Task instance for any None attributes."""
default_attrs = {
"system_prompt": from_spec.system_prompt,
"prompt": from_spec.prompt,
}
for attr_name, default_value in default_attrs.items():
if getattr(self, attr_name) is None:
setattr(self, attr_name, default_value)
[docs]
class TaskDataProcessFnCallable(Protocol):
"""A callable that processes a loaded datum dictionary into a DatumSpec."""
[docs]
def __call__(
self,
datum_dict: Dict[str, Any],
task_data_spec: TaskDataSpec,
tokenizer,
max_seq_length: int,
idx: int,
) -> DatumSpec:
raise NotImplementedError("Task data process not implemented")