nemo_automodel._diffusers.auto_diffusion_pipeline#

NeMo Auto Diffusion Pipeline - Unified pipeline wrapper for all diffusion models.

This module provides a single pipeline class that handles:

  • Loading from pretrained weights (finetuning) via DiffusionPipeline auto-detection

  • Loading from config with random weights (pretraining) via YAML-specified transformer class

  • FSDP2/DDP parallelization for distributed training

  • Gradient checkpointing for memory efficiency

Usage:

# Finetuning (from_pretrained) - no pipeline_spec needed
pipe, managers = NeMoAutoDiffusionPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    load_for_training=True,
    parallel_scheme={"transformer": manager_args},
)

# Pretraining (from_config) - pipeline_spec required in YAML
pipe, managers = NeMoAutoDiffusionPipeline.from_config(
    "black-forest-labs/FLUX.1-dev",
    pipeline_spec={
        "transformer_cls": "FluxTransformer2DModel",
        "subfolder": "transformer",
    },
    parallel_scheme={"transformer": manager_args},
)

Module Contents#

Classes#

PipelineSpec

YAML-driven specification for loading a diffusion pipeline.

NeMoAutoDiffusionPipeline

Unified diffusion pipeline wrapper for all model types.

Functions#

_import_diffusers_class

Dynamically import a class from diffusers by name.

_init_parallelizer

Register custom parallelization strategies.

_choose_device

Choose device, defaulting to CUDA with LOCAL_RANK if available.

_iter_pipeline_modules

Iterate over nn.Module components in a pipeline.

_move_module_to_device

Move module to device with specified dtype.

_ensure_params_trainable

Ensure that all parameters in the given module are trainable.

_create_parallel_manager

Factory function to create the appropriate parallel manager based on config.

_apply_parallelization

Apply FSDP2/DDP parallelization to pipeline components.

Data#

API#

nemo_automodel._diffusers.auto_diffusion_pipeline.logger#

‘getLogger(…)’

nemo_automodel._diffusers.auto_diffusion_pipeline.ParallelManager#

None

class nemo_automodel._diffusers.auto_diffusion_pipeline.PipelineSpec#

YAML-driven specification for loading a diffusion pipeline.

This is required for from_config (pretraining with random weights). Not needed for from_pretrained (finetuning).

Example YAML:

pipeline_spec:
  transformer_cls: "FluxTransformer2DModel"
  pipeline_cls: "FluxPipeline"  # Optional
  subfolder: "transformer"
  load_full_pipeline: false
  enable_gradient_checkpointing: true

transformer_cls: str = <Multiline-String>#
pipeline_cls: Optional[str]#

None

subfolder: str#

‘transformer’

load_full_pipeline: bool#

False

enable_gradient_checkpointing: bool#

True

low_cpu_mem_usage: bool#

True

classmethod from_dict(
d: Optional[Dict[str, Any]],
) nemo_automodel._diffusers.auto_diffusion_pipeline.PipelineSpec#

Create PipelineSpec from YAML dict.

validate_for_from_config()#

Validate spec has required fields for from_config.
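To make the field defaults and the two methods concrete, here is a minimal, self-contained sketch that mirrors the documented PipelineSpec fields and behavior. PipelineSpecSketch is a hypothetical stand-in, not the real class; the empty-string default for transformer_cls is an assumption standing in for the rendered `<Multiline-String>` default.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class PipelineSpecSketch:
    """Hypothetical stand-in mirroring the documented PipelineSpec fields."""

    transformer_cls: str = ""
    pipeline_cls: Optional[str] = None
    subfolder: str = "transformer"
    load_full_pipeline: bool = False
    enable_gradient_checkpointing: bool = True
    low_cpu_mem_usage: bool = True

    @classmethod
    def from_dict(cls, d: Optional[Dict[str, Any]]) -> "PipelineSpecSketch":
        # Missing keys fall back to the defaults above; unknown keys are dropped.
        d = d or {}
        known = set(cls.__dataclass_fields__)
        return cls(**{k: v for k, v in d.items() if k in known})

    def validate_for_from_config(self) -> None:
        # from_config builds the transformer from scratch, so the class name is mandatory.
        if not self.transformer_cls:
            raise ValueError("pipeline_spec.transformer_cls is required for from_config")


spec = PipelineSpecSketch.from_dict({"transformer_cls": "FluxTransformer2DModel"})
spec.validate_for_from_config()  # passes: transformer_cls is set
```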

nemo_automodel._diffusers.auto_diffusion_pipeline._import_diffusers_class(class_name: str)#

Dynamically import a class from diffusers by name.

nemo_automodel._diffusers.auto_diffusion_pipeline._init_parallelizer()#

Register custom parallelization strategies.

nemo_automodel._diffusers.auto_diffusion_pipeline._choose_device(device: Optional[torch.device]) torch.device#

Choose device, defaulting to CUDA with LOCAL_RANK if available.
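The documented selection order can be sketched as follows. This is an illustrative reimplementation, not the real helper: devices are plain strings here, whereas `_choose_device` returns `torch.device`, and the `cuda_available` flag stands in for `torch.cuda.is_available()`.

```python
import os
from typing import Optional


def choose_device_sketch(device: Optional[str] = None,
                         cuda_available: bool = False) -> str:
    """Sketch: an explicit device wins; otherwise pick cuda:<LOCAL_RANK>
    when CUDA is available, falling back to cpu."""
    if device is not None:
        return device
    if cuda_available:
        # LOCAL_RANK is set per-process by torchrun-style launchers.
        local_rank = int(os.environ.get("LOCAL_RANK", "0"))
        return f"cuda:{local_rank}"
    return "cpu"
```

With `LOCAL_RANK=3` and CUDA available this yields `"cuda:3"`, so each rank lands on its own GPU.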

nemo_automodel._diffusers.auto_diffusion_pipeline._iter_pipeline_modules(
pipe,
) Iterable[Tuple[str, torch.nn.Module]]#

Iterate over nn.Module components in a pipeline.
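A plausible sketch of this iteration, duck-typed so it runs without torch (the real helper works on `nn.Module` instances found in `pipe.components`):

```python
def iter_pipeline_modules_sketch(pipe):
    """Sketch: walk pipe.components and yield only entries that look like
    nn.Modules (anything exposing .parameters and .named_parameters).
    Non-module components such as schedulers and None entries are skipped."""
    for name, component in getattr(pipe, "components", {}).items():
        if component is None:
            continue
        if hasattr(component, "parameters") and hasattr(component, "named_parameters"):
            yield name, component
```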

nemo_automodel._diffusers.auto_diffusion_pipeline._move_module_to_device(
module: torch.nn.Module,
device: torch.device,
torch_dtype: Any,
) None#

Move module to device with specified dtype.

nemo_automodel._diffusers.auto_diffusion_pipeline._ensure_params_trainable(
module: torch.nn.Module,
module_name: Optional[str] = None,
) int#

Ensure that all parameters in the given module are trainable.

Returns the number of parameters marked trainable.
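One plausible reading of this contract, sketched with duck typing so it runs without torch (the real helper operates on `nn.Module` parameters; counting only the parameters that were actually flipped is an assumption):

```python
def ensure_params_trainable_sketch(module, module_name=None) -> int:
    """Sketch: set requires_grad on every frozen parameter and return
    how many were marked trainable. module_name is accepted only for
    parity with the documented signature (e.g. for logging)."""
    count = 0
    for _, param in module.named_parameters():
        if not param.requires_grad:
            param.requires_grad = True
            count += 1
    return count
```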

nemo_automodel._diffusers.auto_diffusion_pipeline._create_parallel_manager(
manager_args: Dict[str, Any],
) nemo_automodel._diffusers.auto_diffusion_pipeline.ParallelManager#

Factory function to create the appropriate parallel manager based on config.

Constructs the proper config objects (FSDP2Config / DDPConfig) and, for FSDP2, creates the required device mesh before instantiating the manager. This mirrors the pattern used by _instantiate_distributed in the transformers infrastructure.

The manager type is determined by the _manager_type key in manager_args:

  • 'ddp': Creates a DDPConfig and a DDPManager

  • 'fsdp2' (default): Creates an FSDP2Config, builds a DeviceMesh via create_device_mesh, then creates an FSDP2Manager

Parameters:

manager_args

Flat dictionary of arguments. Recognised keys:

Common keys:

  • _manager_type (str): 'fsdp2' or 'ddp'.

  • activation_checkpointing (bool): Enable activation checkpointing.

  • backend (str): Distributed backend (default 'nccl').

FSDP2-specific (mesh creation):

  • world_size (int): Total number of processes.

  • dp_size, dp_replicate_size, tp_size, cp_size, pp_size, ep_size (int): Parallelism dimensions.

FSDP2-specific (config):

  • mp_policy: MixedPrecisionPolicy instance.

  • sequence_parallel (bool), tp_plan (dict), offload_policy, defer_fsdp_grad_sync (bool).

Returns:

Either an FSDP2Manager or DDPManager instance.

Raises:

ValueError – If an unknown manager type is specified.
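Putting the recognised keys together, a manager_args dict for each strategy might look like the following. The key names follow the documentation above; the concrete values are illustrative placeholders, and mp_policy / offload_policy are omitted for brevity.

```python
# Illustrative manager_args for each strategy (values are placeholders).
fsdp2_args = {
    "_manager_type": "fsdp2",      # default; selects FSDP2Config + FSDP2Manager
    "backend": "nccl",
    "activation_checkpointing": True,
    # Mesh creation: world_size should equal the product of the parallel dims.
    "world_size": 8,
    "dp_size": 8,
    "tp_size": 1,
}

ddp_args = {
    "_manager_type": "ddp",        # selects DDPConfig + DDPManager
    "backend": "nccl",
}

# Per-component mapping as consumed by _apply_parallelization:
parallel_scheme = {"transformer": fsdp2_args}
```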

nemo_automodel._diffusers.auto_diffusion_pipeline._apply_parallelization(
pipe,
parallel_scheme: Optional[Dict[str, Dict[str, Any]]],
) Dict[str, nemo_automodel._diffusers.auto_diffusion_pipeline.ParallelManager]#

Apply FSDP2/DDP parallelization to pipeline components.

class nemo_automodel._diffusers.auto_diffusion_pipeline.NeMoAutoDiffusionPipeline(transformer=None, **components)#

Unified diffusion pipeline wrapper for all model types.

This class serves dual purposes:

  1. Provides class methods (from_pretrained, from_config) for loading pipelines

  2. Acts as a minimal wrapper when load_full_pipeline=False (transformer-only mode)

Two loading paths:

  • from_pretrained: Uses DiffusionPipeline auto-detection (for finetuning). No pipeline_spec is needed - the pipeline type is auto-detected from model_index.json

  • from_config: Uses the YAML-specified transformer class (for pretraining). Requires a pipeline_spec with transformer_cls in the YAML config

Features:

  • Accepts a per-component mapping from component name to parallel manager init args

  • Moves all nn.Module components to the chosen device/dtype

  • Parallelizes only components present in the mapping by constructing a manager per component

  • Supports both FSDP2Manager and DDPManager via the '_manager_type' key in config

  • Gradient checkpointing support for memory efficiency

parallel_scheme:

  • Dict[str, Dict[str, Any]]: component name -> kwargs for parallel manager

  • Each component's kwargs should include '_manager_type': 'fsdp2' or 'ddp' (defaults to 'fsdp2')

Initialization

Initialize NeMoAutoDiffusionPipeline.

Parameters:
  • transformer – The transformer model instance

  • **components – Additional pipeline components (vae, text_encoder, etc.)

property components: Dict[str, Any]#

Return components dict for compatibility.

classmethod from_pretrained(
pretrained_model_name_or_path: str,
*model_args,
parallel_scheme: Optional[Dict[str, Dict[str, Any]]] = None,
device: Optional[torch.device] = None,
torch_dtype: Any = torch.bfloat16,
move_to_device: bool = True,
load_for_training: bool = False,
components_to_load: Optional[Iterable[str]] = None,
enable_gradient_checkpointing: bool = True,
**kwargs,
) Tuple[diffusers.DiffusionPipeline, Dict[str, nemo_automodel._diffusers.auto_diffusion_pipeline.ParallelManager]]#

Load pipeline from pretrained weights using DiffusionPipeline auto-detection.

This method auto-detects the pipeline type from model_index.json and loads all components. Use this for finetuning existing models.

No pipeline_spec is needed - the pipeline type is determined automatically.

Parameters:
  • pretrained_model_name_or_path – HuggingFace model ID or local path

  • parallel_scheme – Dict mapping component names to parallel manager kwargs. Each component's kwargs should include '_manager_type': 'fsdp2' or 'ddp'

  • device – Device to load model to

  • torch_dtype – Data type for model parameters

  • move_to_device – Whether to move modules to device

  • load_for_training – Whether to make parameters trainable

  • components_to_load – Which components to process (default: all)

  • enable_gradient_checkpointing – Enable gradient checkpointing for transformer

  • **kwargs – Additional arguments passed to DiffusionPipeline.from_pretrained

Returns:

Tuple of (DiffusionPipeline, Dict[str, ParallelManager])

classmethod from_config(
model_id: str,
pipeline_spec: Dict[str, Any],
torch_dtype: torch.dtype = torch.bfloat16,
device: Optional[torch.device] = None,
parallel_scheme: Optional[Dict[str, Dict[str, Any]]] = None,
move_to_device: bool = True,
components_to_load: Optional[Iterable[str]] = None,
**kwargs,
) Tuple[nemo_automodel._diffusers.auto_diffusion_pipeline.NeMoAutoDiffusionPipeline, Dict[str, nemo_automodel._diffusers.auto_diffusion_pipeline.ParallelManager]]#

Initialize pipeline with random weights using YAML-specified transformer class.

This method uses the transformer_cls from pipeline_spec to create a model with random weights. Use this for pretraining from scratch.

Requires pipeline_spec in the YAML config with at least:

pipeline_spec:
  transformer_cls: "FluxTransformer2DModel"  # or WanTransformer3DModel, etc.
  subfolder: "transformer"

Parameters:
  • model_id – HuggingFace model ID or local path (for loading config)

  • pipeline_spec – Dict from YAML config with transformer_cls, subfolder, etc.

  • torch_dtype – Data type for model parameters

  • device – Device to load model to

  • parallel_scheme – Dict mapping component names to parallel manager kwargs

  • move_to_device – Whether to move modules to device

  • components_to_load – Which components to process (default: all)

  • **kwargs – Additional arguments

Returns:

Tuple of (NeMoAutoDiffusionPipeline or DiffusionPipeline, Dict[str, ParallelManager])