> For clean Markdown of any page, append .md to the page URL.
> For a complete documentation index, see https://docs.nvidia.com/nemo/automodel/llms.txt.
> For AI client integration (Claude Code, Cursor, etc.), connect to the MCP server at https://docs.nvidia.com/nemo/automodel/_mcp/server.

# nemo_automodel.components.flow_matching.pipeline

FlowMatching Pipeline - Model-agnostic implementation with adapter pattern.

This module provides a unified FlowMatchingPipeline class that is completely
independent of specific model implementations through the ModelAdapter abstraction.

Features:

* Model-agnostic design via ModelAdapter protocol
* Various timestep sampling strategies (uniform, logit\_normal, mode, lognorm)
* Flow shift transformation
* Sigma clamping for finetuning
* Loss weighting
* Detailed training logging

## Module Contents

### Classes

| Name                                                                                                           | Description                                             |
| -------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------- |
| [`FlowMatchingPipeline`](#nemo_automodel-components-flow_matching-pipeline-FlowMatchingPipeline)               | Flow Matching Pipeline - Model-agnostic implementation. |
| [`LinearInterpolationSchedule`](#nemo_automodel-components-flow_matching-pipeline-LinearInterpolationSchedule) | Simple linear interpolation schedule for flow matching. |

### Functions

| Name                                                                                   | Description                                                    |
| -------------------------------------------------------------------------------------- | -------------------------------------------------------------- |
| [`create_adapter`](#nemo_automodel-components-flow_matching-pipeline-create_adapter)   | Factory function to create a model adapter by name.            |
| [`create_pipeline`](#nemo_automodel-components-flow_matching-pipeline-create_pipeline) | Factory function to create a pipeline with a specific adapter. |

### Data

[`logger`](#nemo_automodel-components-flow_matching-pipeline-logger)

### API

```python
class nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline(
    model_adapter: nemo_automodel.components.flow_matching.adapters.ModelAdapter,
    num_train_timesteps: int = 1000,
    timestep_sampling: str = 'logit_normal',
    flow_shift: float = 3.0,
    i2v_prob: float = 0.3,
    cfg_dropout_prob: float = 0.1,
    logit_mean: float = 0.0,
    logit_std: float = 1.0,
    mix_uniform_ratio: float = 0.1,
    use_sigma_noise: bool = True,
    sigma_min: float = 0.0,
    sigma_max: float = 1.0,
    use_loss_weighting: bool = True,
    loss_weighting_scheme: str = 'linear',
    log_interval: int = 100,
    summary_log_interval: int = 10,
    device: typing.Optional[torch.device] = None
)
```

Flow Matching Pipeline - Model-agnostic implementation.

This pipeline handles all flow matching training logic while delegating
model-specific operations to a ModelAdapter. This allows adding support
for new model architectures without modifying the pipeline code.

Features:

* Noise scheduling with linear interpolation
* Timestep sampling with various strategies
* Optional sigma-noise flow shift toggle
* Flow shift transformation
* Sigma clamping for finetuning
* Loss weighting
* Detailed training logging

```python
nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline._build_bsmntw_weights() -> torch.Tensor
```

Build Bell-Shaped Midpoint Noise Timestep Weighting table.

Returns a 1D tensor of length num\_train\_timesteps with weights
following a Gaussian bell curve centered at the midpoint (t=steps/2).

```python
nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline._log_detailed(
    global_step: int,
    sampling_method: str,
    batch_size: int,
    sigma: torch.Tensor,
    timesteps: torch.Tensor,
    latents: torch.Tensor,
    noise: torch.Tensor,
    noisy_latents: torch.Tensor
)
```

Log detailed training information.

```python
nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline._log_loss_detailed(
    global_step: int,
    model_pred: torch.Tensor,
    target: torch.Tensor,
    loss_weight: torch.Tensor,
    unweighted_loss: torch.Tensor,
    weighted_loss: torch.Tensor
)
```

Log detailed loss information.

```python
nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline._sample_from_distribution(
    batch_size: int,
    device: torch.device
) -> torch.Tensor
```

Sample u values from the configured distribution.

```python
nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline.compute_loss(
    model_pred: torch.Tensor,
    target: torch.Tensor,
    sigma: torch.Tensor,
    batch: typing.Optional[typing.Dict[str, typing.Any]] = None
) -> typing.Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, typing.Optional[torch.Tensor]]
```

Compute flow matching loss with optional weighting.

Loss weight: w = 1 + flow\_shift \* σ

**Parameters:**

Model prediction

Target (velocity = noise - clean)

Sigma values for each sample

Optional batch dictionary containing loss\_mask

**Returns:** `torch.Tensor`

Per-element weighted loss

```python
nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline.determine_task_type(
    data_type: str
) -> str
```

Determine task type based on data type and randomization.

```python
nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline.sample_timesteps(
    batch_size: int,
    device: typing.Optional[torch.device] = None
) -> typing.Tuple[torch.Tensor, torch.Tensor, str]
```

Sample timesteps and compute sigma values with flow shift.

Implements the flow shift transformation:
σ = shift / (shift + (1/u - 1))

**Parameters:**

Number of timesteps to sample

Device for tensor operations

**Returns:** `torch.Tensor`

Sigma values in \[sigma\_min, sigma\_max]

```python
nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline.step(
    model: torch.nn.Module,
    batch: typing.Dict[str, typing.Any],
    device: torch.device = torch.device('cuda'),
    dtype: torch.dtype = torch.bfloat16,
    global_step: int = 0,
    collect_metrics: bool = True,
    check_loss: bool = True
) -> typing.Tuple[torch.Tensor, torch.Tensor, typing.Optional[torch.Tensor], typing.Dict[str, typing.Any]]
```

Execute a single training step with flow matching.

Expected batch format:
\{
"video\_latents": torch.Tensor,  # \[B, C, F, H, W] for video
OR
"image\_latents": torch.Tensor,  # \[B, C, H, W] for image
"text\_embeddings": torch.Tensor,  # \[B, seq\_len, dim]
"data\_type": str,  # "video" or "image" (optional)

# ... additional model-specific keys handled by adapter

}

**Parameters:**

The model to train

Batch of training data

Device to use

Data type for operations

Current training step (for logging)

Whether to collect scalar diagnostics. Disable in
hot training paths to avoid host/device synchronizations.

Whether to run scalar loss explosion checks. Disable in
hot training paths to avoid host/device synchronizations.

**Returns:** `torch.Tensor`

Per-element weighted loss

```python
class nemo_automodel.components.flow_matching.pipeline.LinearInterpolationSchedule()
```

Simple linear interpolation schedule for flow matching.

```python
nemo_automodel.components.flow_matching.pipeline.LinearInterpolationSchedule.forward(
    x0: torch.Tensor,
    x1: torch.Tensor,
    sigma: torch.Tensor
) -> torch.Tensor
```

Linear interpolation: x\_t = (1 - σ) \* x\_0 + σ \* x\_1

**Parameters:**

Starting point (clean latents)

Ending point (noise)

Sigma values in \[0, 1]

**Returns:** `torch.Tensor`

Interpolated tensor at sigma

```python
nemo_automodel.components.flow_matching.pipeline.create_adapter(
    adapter_type: str,
    kwargs = {}
) -> nemo_automodel.components.flow_matching.adapters.ModelAdapter
```

Factory function to create a model adapter by name.

**Parameters:**

Type of adapter ("hunyuan", "simple", "flux", "flux2", "qwen\_image")

Additional arguments passed to the adapter constructor

**Returns:** `ModelAdapter`

ModelAdapter instance

```python
nemo_automodel.components.flow_matching.pipeline.create_pipeline(
    adapter_type: str,
    adapter_kwargs: typing.Optional[typing.Dict[str, typing.Any]] = None,
    pipeline_kwargs = {}
) -> nemo_automodel.components.flow_matching.pipeline.FlowMatchingPipeline
```

Factory function to create a pipeline with a specific adapter.

**Parameters:**

Type of adapter ("hunyuan", "simple")

Arguments for the adapter constructor

Arguments for the pipeline constructor

**Returns:** `FlowMatchingPipeline`

FlowMatchingPipeline instance

```python
nemo_automodel.components.flow_matching.pipeline.logger = logging.getLogger(__name__)
```