nemo_rl.models.generation.vllm.vllm_worker#

Module Contents#

Classes#

API#

class nemo_rl.models.generation.vllm.vllm_worker.BaseVllmGenerationWorker(
config: nemo_rl.models.generation.vllm.config.VllmConfig,
bundle_indices: Optional[list[int]] = None,
fraction_of_gpus: float = 1.0,
seed: Optional[int] = None,
)#

Initialization

Initialize a vLLM worker for distributed inference.

Parameters:
  • config – Configuration dictionary for the policy

  • bundle_indices – List of local bundle indices within a node for parallelism. Only needed for the first worker in each tied worker group.

  • fraction_of_gpus – Fraction of GPUs to use for this worker

  • seed – Random seed for initialization
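For illustration, constructing a worker might look like the following sketch. The config keys shown are assumptions for illustration, not the authoritative VllmConfig schema:

```python
# Hypothetical sketch of the constructor arguments. The config keys below
# are assumptions, not the real VllmConfig schema.
config = {
    "model_name": "meta-llama/Llama-3.1-8B-Instruct",  # assumed key
    "tensor_parallel_size": 2,                          # assumed key
}

# Only the first worker in each tied worker group passes bundle_indices;
# the remaining workers pass bundle_indices=None.
worker_kwargs = dict(
    config=config,
    bundle_indices=[0, 1],  # local bundle indices within the node
    fraction_of_gpus=0.5,   # e.g. when sharing GPUs with a colocated trainer
    seed=42,
)
```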

__repr__() str#

Customizes the actor’s prefix in the Ray logs.

This makes it easier to identify which worker is producing specific log messages.

static configure_worker(
num_gpus: int | float,
bundle_indices: Optional[tuple[int, list[int]]] = None,
) tuple[dict[str, Any], dict[str, str], dict[str, Any]]#

Provides complete worker configuration for vLLM tensor and pipeline parallelism.

This method configures the worker based on its role in tensor and pipeline parallelism, which is determined directly from the bundle_indices parameter.

Parameters:
  • num_gpus – Original GPU allocation for this worker based on the placement group

  • bundle_indices – Tuple of (node_idx, local_bundle_indices) for parallelism (if applicable)

Returns:

  • resources: Resource allocation (e.g., num_gpus)

  • env_vars: Environment variables to set for this worker

  • init_kwargs: Keyword arguments to pass to the worker's __init__

Return type:

Tuple of (resources, env_vars, init_kwargs) with the complete worker configuration
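The shape of the returned tuple can be sketched as follows. The concrete values and the environment-variable name are hypothetical stand-ins, not what the real implementation produces:

```python
# Illustrative shape of configure_worker's return value. The specific
# values and the env var name are assumptions, not the real implementation.
resources = {"num_gpus": 0.5}                 # Ray resource allocation
env_vars = {"VLLM_PORT": "12345"}             # hypothetical variable
init_kwargs = {
    "bundle_indices": [0, 1],                 # forwarded to __init__
    "fraction_of_gpus": 0.5,
}

worker_configuration = (resources, env_vars, init_kwargs)
```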

llm()#
is_alive()#

Check if the worker is alive.

_merge_stop_strings(batch_stop_strings)#
_build_sampling_params(
*,
greedy: bool,
stop_strings,
max_new_tokens: Optional[int] = None,
)#
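These two helpers are undocumented, but their names and signatures suggest the following behavior. This is a hypothetical re-implementation sketch, not the actual source; the temperature mapping assumes vLLM's convention that greedy decoding corresponds to temperature 0:

```python
from typing import Optional


def merge_stop_strings(batch_stop_strings, config_stop_strings):
    """Hypothetical sketch: union per-sample stop strings with the
    worker-level defaults, dropping duplicates."""
    merged = set(config_stop_strings or [])
    for stops in batch_stop_strings or []:
        merged.update(stops or [])
    return sorted(merged) or None


def build_sampling_params(*, greedy: bool, stop_strings,
                          max_new_tokens: Optional[int] = None) -> dict:
    """Hypothetical sketch: map the documented arguments onto a
    SamplingParams-like dict (greedy decoding -> temperature 0)."""
    params = {"stop": stop_strings}
    params["temperature"] = 0.0 if greedy else 1.0
    if max_new_tokens is not None:
        params["max_tokens"] = max_new_tokens
    return params
```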
start_gpu_profiling() None#

Start GPU profiling.

stop_gpu_profiling() None#

Stop GPU profiling.

class nemo_rl.models.generation.vllm.vllm_worker.VllmGenerationWorker(
config: nemo_rl.models.generation.vllm.config.VllmConfig,
bundle_indices: Optional[list[int]] = None,
fraction_of_gpus: float = 1.0,
seed: Optional[int] = None,
)#

Bases: nemo_rl.models.generation.vllm.vllm_worker.BaseVllmGenerationWorker

Initialization

Initialize a vLLM worker for distributed inference.

Parameters:
  • config – Configuration dictionary for the policy

  • bundle_indices – List of local bundle indices within a node for parallelism. Only needed for the first worker in each tied worker group.

  • fraction_of_gpus – Fraction of GPUs to use for this worker

  • seed – Random seed for initialization

_create_engine(llm_kwargs: dict[str, Any]) None#
post_init()#
init_collective(
rank_prefix: int,
ip: str,
port: int,
world_size: int,
) None#
generate(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[nemo_rl.models.generation.interfaces.GenerationDatumSpec],
greedy: bool = False,
) nemo_rl.distributed.batched_data_dict.BatchedDataDict[nemo_rl.models.generation.interfaces.GenerationOutputSpec]#

Generate a batch of data using vLLM generation.

Parameters:
  • data – BatchedDataDict containing input_ids and input_lengths tensors

  • greedy – Whether to use greedy decoding instead of sampling

Returns:

  • output_ids: input + generated token IDs with proper padding

  • logprobs: Log probabilities for tokens

  • generation_lengths: Lengths of each response

  • unpadded_sequence_lengths: Lengths of each input + generated sequence

Return type:

BatchedDataDict conforming to GenerationOutputSpec
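The output layout described above (input plus generated tokens, right-padded, with per-sequence unpadded lengths) can be illustrated with a toy example. Plain lists stand in for tensors, and pad token 0 is an assumption:

```python
# Toy illustration of output_ids / unpadded_sequence_lengths construction.
# Plain lists stand in for tensors; PAD = 0 is an assumption.
PAD = 0
input_ids = [[5, 6, 7], [8, 9, PAD]]   # second prompt is padded to length 3
input_lengths = [3, 2]
generated = [[11, 12], [13, 14, 15]]

rows, lengths = [], []
max_len = max(l + len(g) for l, g in zip(input_lengths, generated))
for prompt, l, gen in zip(input_ids, input_lengths, generated):
    seq = prompt[:l] + gen             # unpadded input + generated tokens
    lengths.append(len(seq))           # unpadded sequence length
    rows.append(seq + [PAD] * (max_len - len(seq)))  # right-pad the row

# rows    == [[5, 6, 7, 11, 12], [8, 9, 13, 14, 15]]
# lengths == [5, 5]
```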

generate_text(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[nemo_rl.models.generation.interfaces.GenerationDatumSpec],
greedy: bool = False,
) nemo_rl.distributed.batched_data_dict.BatchedDataDict[nemo_rl.models.generation.interfaces.GenerationOutputSpec]#

Generate text responses using vLLM generation.

Parameters:
  • data – BatchedDataDict containing prompts with text strings

  • greedy – Whether to use greedy decoding instead of sampling

Returns:

  • texts: List of generated text responses

Return type:

BatchedDataDict containing the generated texts

report_device_id() list[str]#

Report device ID from the vLLM worker.

prepare_refit_info(state_dict_info: dict[str, Any]) None#

Prepare the info for refit.

update_weights_from_ipc_handles(
ipc_handles: dict[str, Any],
) bool#

Update weights from IPC handles by delegating to the vLLM Worker implementation.

Parameters:

ipc_handles (dict) – Dictionary mapping device UUIDs (str) to parameter IPC handles.

Returns:

True if weights were successfully updated, False otherwise.

Return type:

bool
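The documented mapping can be sketched as a nested dict. The device UUID, parameter names, and opaque byte strings below are placeholders; real handles come from CUDA IPC (e.g., torch tensor reductions), not raw bytes:

```python
# Hypothetical shape of the ipc_handles argument: device UUIDs (str)
# mapping to per-parameter IPC handles. The UUID, parameter names, and
# bytes values are placeholders for illustration only.
ipc_handles = {
    "GPU-1b2c3d4e": {  # hypothetical device UUID
        "model.embed_tokens.weight": b"<opaque-ipc-handle>",
        "model.layers.0.self_attn.q_proj.weight": b"<opaque-ipc-handle>",
    },
}
```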

update_weights_from_collective() bool#

Update the model weights from collective communication.

reset_prefix_cache()#

Reset the prefix cache of vLLM engine.

sleep()#

Put the vLLM engine to sleep.

wake_up(**kwargs)#

Wake up the vLLM engine.

shutdown() bool#

Clean up vLLM resources.