nemo_automodel._diffusers.auto_diffusion_pipeline#
NeMo Auto Diffusion Pipeline - Unified pipeline wrapper for all diffusion models.
This module provides a single pipeline class that handles:
- Loading from pretrained weights (finetuning) via DiffusionPipeline auto-detection
- Loading from config with random weights (pretraining) via YAML-specified transformer class
- FSDP2/DDP parallelization for distributed training
- Gradient checkpointing for memory efficiency
Usage:
# Finetuning (from_pretrained) - no pipeline_spec needed
pipe, managers = NeMoAutoDiffusionPipeline.from_pretrained(
"black-forest-labs/FLUX.1-dev",
load_for_training=True,
parallel_scheme={"transformer": manager_args},
)
# Pretraining (from_config) - pipeline_spec required in YAML
pipe, managers = NeMoAutoDiffusionPipeline.from_config(
"black-forest-labs/FLUX.1-dev",
pipeline_spec={
"transformer_cls": "FluxTransformer2DModel",
"subfolder": "transformer",
},
parallel_scheme={"transformer": manager_args},
)
Module Contents#
Classes#
PipelineSpec: YAML-driven specification for loading a diffusion pipeline.
NeMoAutoDiffusionPipeline: Unified diffusion pipeline wrapper for all model types.
Functions#
_import_diffusers_class: Dynamically import a class from diffusers by name.
_init_parallelizer: Register custom parallelization strategies.
_choose_device: Choose device, defaulting to CUDA with LOCAL_RANK if available.
_iter_pipeline_modules: Iterate over nn.Module components in a pipeline.
_move_module_to_device: Move module to device with specified dtype.
_ensure_params_trainable: Ensure that all parameters in the given module are trainable.
_create_parallel_manager: Factory function to create the appropriate parallel manager based on config.
_apply_parallelization: Apply FSDP2/DDP parallelization to pipeline components.
Data#
API#
- nemo_automodel._diffusers.auto_diffusion_pipeline.logger#
'getLogger(…)'
- nemo_automodel._diffusers.auto_diffusion_pipeline.ParallelManager#
None
- class nemo_automodel._diffusers.auto_diffusion_pipeline.PipelineSpec#
YAML-driven specification for loading a diffusion pipeline.
This is required for from_config (pretraining with random weights). Not needed for from_pretrained (finetuning).
Example YAML:
pipeline_spec:
  transformer_cls: "FluxTransformer2DModel"
  pipeline_cls: "FluxPipeline"  # Optional
  subfolder: "transformer"
  load_full_pipeline: false
  enable_gradient_checkpointing: true
- transformer_cls: str = <Multiline-String>#
- pipeline_cls: Optional[str]#
None
- subfolder: str#
'transformer'
- load_full_pipeline: bool#
False
- enable_gradient_checkpointing: bool#
True
- low_cpu_mem_usage: bool#
True
- classmethod from_dict(
- d: Optional[Dict[str, Any]],
Create PipelineSpec from YAML dict.
- validate_for_from_config()#
Validate spec has required fields for from_config.
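The minimal requirement stated above is a `transformer_cls` entry. As an illustration only (a hypothetical `check_spec` helper, not the library's implementation), the validation amounts to rejecting a spec without that field:

```python
from typing import Any, Dict

# Hypothetical stand-in for PipelineSpec.validate_for_from_config:
# from_config needs at least a non-empty transformer_cls.
def check_spec(spec: Dict[str, Any]) -> None:
    if not spec.get("transformer_cls"):
        raise ValueError("pipeline_spec.transformer_cls is required for from_config")

# Passes: transformer_cls is present.
check_spec({"transformer_cls": "FluxTransformer2DModel", "subfolder": "transformer"})
```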
- nemo_automodel._diffusers.auto_diffusion_pipeline._import_diffusers_class(class_name: str)#
Dynamically import a class from diffusers by name.
- nemo_automodel._diffusers.auto_diffusion_pipeline._init_parallelizer()#
Register custom parallelization strategies.
- nemo_automodel._diffusers.auto_diffusion_pipeline._choose_device(device: Optional[torch.device]) torch.device#
Choose device, defaulting to CUDA with LOCAL_RANK if available.
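The selection rule described above can be sketched as follows. This is an illustrative reimplementation, not the library's code, and it returns a device string instead of a `torch.device` so the sketch stays self-contained; the `cuda_available` flag stands in for `torch.cuda.is_available()`:

```python
import os
from typing import Optional

def choose_device(device: Optional[str] = None, cuda_available: bool = True) -> str:
    # An explicitly requested device always wins.
    if device is not None:
        return device
    # Otherwise default to the CUDA device indexed by LOCAL_RANK
    # (set by torchrun), falling back to CPU when CUDA is absent.
    if cuda_available:
        return f"cuda:{int(os.environ.get('LOCAL_RANK', '0'))}"
    return "cpu"
```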
- nemo_automodel._diffusers.auto_diffusion_pipeline._iter_pipeline_modules(
- pipe,
Iterate over nn.Module components in a pipeline.
- nemo_automodel._diffusers.auto_diffusion_pipeline._move_module_to_device(
- module: torch.nn.Module,
- device: torch.device,
- torch_dtype: Any,
Move module to device with specified dtype.
- nemo_automodel._diffusers.auto_diffusion_pipeline._ensure_params_trainable(
- module: torch.nn.Module,
- module_name: Optional[str] = None,
Ensure that all parameters in the given module are trainable.
Returns the number of parameters marked trainable.
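The helper's behavior can be sketched as below. The real function walks `module.parameters()` on a `torch.nn.Module`; a tiny stand-in parameter class is used here so the sketch runs without torch:

```python
# Minimal stand-in for torch.nn.Parameter, carrying only requires_grad.
class FakeParam:
    def __init__(self, requires_grad: bool) -> None:
        self.requires_grad = requires_grad

# Illustrative sketch: flip every parameter to trainable and return
# how many parameters were marked.
def ensure_params_trainable(params) -> int:
    count = 0
    for p in params:
        p.requires_grad = True
        count += 1
    return count
```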
- nemo_automodel._diffusers.auto_diffusion_pipeline._create_parallel_manager(
- manager_args: Dict[str, Any],
Factory function to create the appropriate parallel manager based on config.
Constructs the proper config objects (FSDP2Config / DDPConfig) and, for FSDP2, creates the required device mesh before instantiating the manager. This mirrors the pattern used by _instantiate_distributed in the transformers infrastructure.
The manager type is determined by the _manager_type key in manager_args:
'ddp': Creates DDPConfig + DDPManager
'fsdp2' (default): Creates FSDP2Config, builds a DeviceMesh via create_device_mesh, then creates FSDP2Manager
- Parameters:
manager_args –
Flat dictionary of arguments. Recognised keys:
Common:
_manager_type (str): 'fsdp2' or 'ddp'. activation_checkpointing (bool): Enable activation checkpointing. backend (str): Distributed backend (default 'nccl').
FSDP2-specific (mesh creation):
world_size (int): Total number of processes. dp_size, dp_replicate_size, tp_size, cp_size, pp_size, ep_size (int): Parallelism dimensions.
FSDP2-specific (config):
mp_policy: MixedPrecisionPolicy instance. sequence_parallel (bool), tp_plan (dict), offload_policy, defer_fsdp_grad_sync (bool).
- Returns:
Either an FSDP2Manager or DDPManager instance.
- Raises:
ValueError – If an unknown manager type is specified.
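The dispatch on `_manager_type` can be sketched as follows. This is an illustrative stand-in for the type-selection step only (config construction and mesh creation are omitted), not the factory itself:

```python
from typing import Any, Dict

# Hypothetical sketch of the _manager_type dispatch: default to
# 'fsdp2' when the key is absent, and reject unknown values.
def select_manager_type(manager_args: Dict[str, Any]) -> str:
    manager_type = manager_args.get("_manager_type", "fsdp2")
    if manager_type not in ("fsdp2", "ddp"):
        raise ValueError(f"Unknown manager type: {manager_type!r}")
    return manager_type
```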
- nemo_automodel._diffusers.auto_diffusion_pipeline._apply_parallelization(
- pipe,
- parallel_scheme: Optional[Dict[str, Dict[str, Any]]],
Apply FSDP2/DDP parallelization to pipeline components.
- class nemo_automodel._diffusers.auto_diffusion_pipeline.NeMoAutoDiffusionPipeline(transformer=None, **components)#
Unified diffusion pipeline wrapper for all model types.
This class serves dual purposes:
- Provides class methods (from_pretrained, from_config) for loading pipelines
- Acts as a minimal wrapper when load_full_pipeline=False (transformer-only mode)
Two loading paths:
- from_pretrained: Uses DiffusionPipeline auto-detection (for finetuning). No pipeline_spec needed; the pipeline type is auto-detected from model_index.json.
- from_config: Uses the YAML-specified transformer class (for pretraining). Requires pipeline_spec with transformer_cls in the YAML config.
Features:
- Accepts a per-component mapping from component name to parallel manager init args
- Moves all nn.Module components to the chosen device/dtype
- Parallelizes only components present in the mapping by constructing a manager per component
- Supports both FSDP2Manager and DDPManager via the '_manager_type' key in config
- Gradient checkpointing support for memory efficiency
parallel_scheme:
Dict[str, Dict[str, Any]]: component name -> kwargs for parallel manager
Each component's kwargs should include '_manager_type': 'fsdp2' or 'ddp' (defaults to 'fsdp2')
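A parallel_scheme mapping might look like the following. The keys shown come from the manager_args documented above; the concrete values (world size, parallelism dimensions) are placeholders, not recommendations:

```python
# Illustrative parallel_scheme: component name -> parallel manager kwargs.
parallel_scheme = {
    "transformer": {
        "_manager_type": "fsdp2",      # default if omitted
        "world_size": 8,               # placeholder values
        "dp_size": 8,
        "activation_checkpointing": True,
    },
    "text_encoder": {
        "_manager_type": "ddp",
        "backend": "nccl",
    },
}
```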
Initialization
Initialize NeMoAutoDiffusionPipeline.
- Parameters:
transformer – The transformer model instance
**components – Additional pipeline components (vae, text_encoder, etc.)
- property components: Dict[str, Any]#
Return components dict for compatibility.
- classmethod from_pretrained(
- pretrained_model_name_or_path: str,
- *model_args,
- parallel_scheme: Optional[Dict[str, Dict[str, Any]]] = None,
- device: Optional[torch.device] = None,
- torch_dtype: Any = torch.bfloat16,
- move_to_device: bool = True,
- load_for_training: bool = False,
- components_to_load: Optional[Iterable[str]] = None,
- enable_gradient_checkpointing: bool = True,
- **kwargs,
Load pipeline from pretrained weights using DiffusionPipeline auto-detection.
This method auto-detects the pipeline type from model_index.json and loads all components. Use this for finetuning existing models.
No pipeline_spec is needed - the pipeline type is determined automatically.
- Parameters:
pretrained_model_name_or_path – HuggingFace model ID or local path
parallel_scheme – Dict mapping component names to parallel manager kwargs. Each component's kwargs should include '_manager_type': 'fsdp2' or 'ddp'
device – Device to load model to
torch_dtype – Data type for model parameters
move_to_device – Whether to move modules to device
load_for_training – Whether to make parameters trainable
components_to_load – Which components to process (default: all)
enable_gradient_checkpointing – Enable gradient checkpointing for transformer
**kwargs – Additional arguments passed to DiffusionPipeline.from_pretrained
- Returns:
Tuple of (DiffusionPipeline, Dict[str, ParallelManager])
- classmethod from_config(
- model_id: str,
- pipeline_spec: Dict[str, Any],
- torch_dtype: torch.dtype = torch.bfloat16,
- device: Optional[torch.device] = None,
- parallel_scheme: Optional[Dict[str, Dict[str, Any]]] = None,
- move_to_device: bool = True,
- components_to_load: Optional[Iterable[str]] = None,
- **kwargs,
Initialize pipeline with random weights using YAML-specified transformer class.
This method uses the transformer_cls from pipeline_spec to create a model with random weights. Use this for pretraining from scratch.
Requires pipeline_spec in the YAML config with at least:
pipeline_spec:
  transformer_cls: "FluxTransformer2DModel"  # or WanTransformer3DModel, etc.
  subfolder: "transformer"
- Parameters:
model_id – HuggingFace model ID or local path (for loading config)
pipeline_spec – Dict from YAML config with transformer_cls, subfolder, etc.
torch_dtype – Data type for model parameters
device – Device to load model to
parallel_scheme – Dict mapping component names to parallel manager kwargs
move_to_device – Whether to move modules to device
components_to_load – Which components to process (default: all)
**kwargs – Additional arguments
- Returns:
Tuple of (NeMoAutoDiffusionPipeline or DiffusionPipeline, Dict[str, ParallelManager])