bridge.data.energon.task_encoder_utils#

Shared utilities for Energon-based VLM task encoders.

Contains helpers extracted from the Qwen-VL task encoder so they can be reused by the generic HFEncoderVLMTaskEncoder and any future model-specific encoders.

Module Contents#

Classes#

ChatMLSample

Multi-turn complex samples with images, videos, and audio.

videohandler

Webdataset decoder handler for video fields stored as pickled frame lists.

audiohandler

Webdataset decoder handler for audio fields stored as raw WAV/FLAC bytes.

ChatMLWebdataset

Webdataset factory for multi-turn ChatML samples with multimodal support.

Functions#

get_ltor_masks_and_position_ids

Build masks and position ids for a left-to-right model.

find_pattern_indices

Find the [start, end) indices of the first occurrence of pattern in sequence.

_tensor_to_pil

Convert a [C, H, W] float tensor in [0, 1] to a PIL Image (uint8 [0, 255]).

_images_to_pil

Convert WDS tensor images to PIL to match the HF flow input format.

_videos_to_pil

Convert WDS video frame tensors to PIL to match the HF flow input format.

cook_chatml_sample

Normalize a ChatML conversation to [{"role": ..., "content": ...}, ...].

Data#

IGNORE_INDEX

API#

bridge.data.energon.task_encoder_utils.IGNORE_INDEX#

None

bridge.data.energon.task_encoder_utils.get_ltor_masks_and_position_ids(
data: torch.Tensor,
eod_token: int,
eod_mask_loss: bool,
reset_attention_mask: bool,
reset_position_ids: bool,
compute_attention_mask: bool = True,
)#

Build masks and position ids for a left-to-right model.

Parameters:
  • data – Input token ids of shape [b, s].

  • eod_token – End-of-document token id.

  • eod_mask_loss – If True, zero out loss at EOD positions.

  • reset_attention_mask – If True, block cross-document attention.

  • reset_position_ids – If True, restart position ids after each EOD.

  • compute_attention_mask – If False, skip attention mask computation and return None for the mask.

Returns:

Tuple of (attention_mask, loss_mask, position_ids), where:

  • attention_mask – [att_mask_batch, 1, s, s] boolean mask (True = masked / blocked), or None when compute_attention_mask is False.

  • loss_mask – [b, s] float mask (1.0 = keep, 0.0 = drop).

  • position_ids – [b, s] position indices.
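
The effect of the three flags is easiest to see on a single packed sequence. The sketch below is a hypothetical NumPy illustration of the documented semantics, not the library function: the name sketch_ltor_masks is invented, it handles one 1-D sequence rather than a [b, s] batch, and it returns an [s, s] mask instead of [att_mask_batch, 1, s, s].

```python
import numpy as np


def sketch_ltor_masks(tokens, eod_token, eod_mask_loss=True,
                      reset_attention_mask=True, reset_position_ids=True):
    """Hypothetical single-sequence illustration; not the library function."""
    tokens = np.asarray(tokens)
    s = tokens.shape[0]

    # Causal visibility: position i may attend to positions j <= i.
    visible = np.tril(np.ones((s, s), dtype=bool))

    # Loss is kept everywhere except (optionally) at EOD positions.
    loss_mask = np.ones(s, dtype=np.float32)
    if eod_mask_loss:
        loss_mask[tokens == eod_token] = 0.0

    position_ids = np.arange(s)
    prev_start = 0
    for i in np.flatnonzero(tokens == eod_token):
        if reset_attention_mask:
            # Tokens after this EOD cannot see anything up to and including it.
            visible[i + 1:, :i + 1] = False
        if reset_position_ids:
            # Restart counting from 0 for the document that follows the EOD.
            position_ids[i + 1:] -= (i + 1 - prev_start)
            prev_start = i + 1

    attention_mask = ~visible  # True = masked / blocked, as documented
    return attention_mask, loss_mask, position_ids


# Two documents packed into one sequence, separated by EOD token 0.
attention_mask, loss_mask, position_ids = sketch_ltor_masks([5, 6, 0, 7, 8], eod_token=0)
print(position_ids.tolist())  # [0, 1, 2, 0, 1]
print(loss_mask.tolist())     # [1.0, 1.0, 0.0, 1.0, 1.0]
```

In this example the token at index 3 is blocked from attending to index 0 because an EOD sits between them, while it can still attend to itself.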

bridge.data.energon.task_encoder_utils.find_pattern_indices(sequence: numpy.ndarray, pattern, start: int = 0)#

Find the [start, end) indices of the first occurrence of pattern in sequence.

Parameters:
  • sequence – 1-D array (or list) to search in.

  • pattern – Sub-sequence to look for.

  • start – Index in sequence at which to begin searching.

Returns:

(start_idx, end_idx) of the first match, or (-1, -1) if not found.
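
The half-open [start, end) convention and the role of the start argument can be illustrated with a minimal pure-Python sketch. The function name sketch_find_pattern is hypothetical and the linear scan is an assumption about one way to satisfy the documented contract; the actual implementation may differ.

```python
def sketch_find_pattern(sequence, pattern, start=0):
    """Hypothetical re-statement of the documented contract using a linear scan."""
    n, m = len(sequence), len(pattern)
    for i in range(start, n - m + 1):
        if list(sequence[i:i + m]) == list(pattern):
            return i, i + m  # half-open interval [start_idx, end_idx)
    return -1, -1


tokens = [1, 7, 8, 2, 7, 8, 3]
first = sketch_find_pattern(tokens, [7, 8])
# Resume the search right after the previous match to find the next one.
second = sketch_find_pattern(tokens, [7, 8], start=first[1])
missing = sketch_find_pattern(tokens, [9])
print(first, second, missing)  # (1, 3) (4, 6) (-1, -1)
```

Because end_idx is exclusive, sequence[start_idx:end_idx] is the matched span, and passing end_idx back as start continues the search without overlapping the previous match.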

bridge.data.energon.task_encoder_utils._tensor_to_pil(t)#

Convert a [C, H, W] float tensor in [0, 1] to a PIL Image (uint8 [0, 255]).
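
The value and layout change involved can be sketched with NumPy alone. This is a hypothetical illustration, not the function's code: the helper name is invented, and both the clamping of out-of-range values and round-to-nearest are assumptions about details the description does not state.

```python
import numpy as np


def sketch_chw_float_to_hwc_uint8(t):
    """Hypothetical helper: [C, H, W] floats in [0, 1] -> [H, W, C] uint8."""
    arr = np.asarray(t, dtype=np.float32)
    arr = np.clip(arr, 0.0, 1.0)                 # assumption: clamp out-of-range values
    arr = np.rint(arr * 255.0).astype(np.uint8)  # assumption: round to nearest
    return np.transpose(arr, (1, 2, 0))          # channels-first -> channels-last


chw = np.zeros((3, 2, 4), dtype=np.float32)
chw[0] = 1.0  # saturate the first (red) channel
hwc = sketch_chw_float_to_hwc_uint8(chw)
print(hwc.shape, hwc.dtype)  # (2, 4, 3) uint8
```

An [H, W, 3] uint8 array in this layout is the form that PIL.Image.fromarray accepts for RGB images.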

bridge.data.energon.task_encoder_utils._images_to_pil(imgs)#

Convert WDS tensor images to PIL to match the HF flow input format.

WDS imagehandler decodes JPEG to float tensors in [0, 1]. The HF flow passes PIL images (uint8 [0, 255]) to the processor. Converting to PIL here ensures the processor applies identical rescaling and normalization in both flows.

Parameters:

imgs – A single [C, H, W] tensor, a [N, C, H, W] batch tensor, or a list of tensors / PIL images.

Returns:

A list of PIL images, or the input unchanged if it is not a tensor.

bridge.data.energon.task_encoder_utils._videos_to_pil(videos)#

Convert WDS video frame tensors to PIL to match the HF flow input format.

Parameters:

videos – A list of videos, where each video is either a list of frame tensors or a [N, C, H, W] batch tensor. None is passed through unchanged.

Returns:

A nested list [[PIL.Image, ...], ...] with one sub-list per video, or None if videos is None.

class bridge.data.energon.task_encoder_utils.ChatMLSample#

Bases: megatron.energon.flavors.base_dataset.Sample

Multi-turn complex samples with images, videos, and audio.

conversation: str#

None

imgs: Optional[List[torch.Tensor]]#

None

videos: Optional[List[List[torch.Tensor]]]#

None

audio: Optional[torch.Tensor]#

None

class bridge.data.energon.task_encoder_utils.videohandler(imagespec)#

Webdataset decoder handler for video fields stored as pickled frame lists.

Initialization

__call__(key, data)#

Decode pickled video data into lists of image tensors.
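
Webdataset decoder handlers are callables that receive a field key and its raw bytes, and return None for fields they do not handle so that another handler can try. The sketch below shows that shape under stated assumptions: the class name, the "videos" extension, the pickled payload layout (a list of videos, each a list of per-frame bytes), and the injected frame decoder are all hypothetical and are not taken from the actual videohandler.

```python
import pickle
import re


class SketchVideoHandler:
    """Hypothetical stand-in; extension names and payload layout are assumptions."""

    EXTENSIONS = {"videos"}  # assumption: illustrative extension only

    def __init__(self, decode_frame):
        # decode_frame: callable that turns one frame's bytes into a decoded frame.
        self.decode_frame = decode_frame

    def __call__(self, key, data):
        extension = re.sub(r".*[.]", "", key).lower()
        if extension not in self.EXTENSIONS:
            return None  # not our field; let another handler decode it
        videos = pickle.loads(data)  # assumption: list of videos, each a list of frame bytes
        return [[self.decode_frame(frame) for frame in frames] for frames in videos]


# len() stands in for a real image decoder so the example has no extra dependencies.
handler = SketchVideoHandler(decode_frame=len)
payload = pickle.dumps([[b"ab", b"cde"], [b"f"]])
decoded = handler("sample0.videos", payload)
skipped = handler("sample0.jpg", payload)
print(decoded, skipped)  # [[2, 3], [1]] None
```

Since pickle.loads can execute arbitrary code, a handler of this kind should only be pointed at shards from a trusted source.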

class bridge.data.energon.task_encoder_utils.audiohandler#

Webdataset decoder handler for audio fields stored as raw WAV/FLAC bytes.

EXTENSIONS#

None

__call__(key, data)#

class bridge.data.energon.task_encoder_utils.ChatMLWebdataset(
path: megatron.energon.epathlib.epath.EPath,
*,
auto_decode: bool = True,
image_decode_spec: Optional[str] = None,
**kwargs,
)#

Bases: megatron.energon.flavors.webdataset.DefaultDecoderWebdatasetFactory[bridge.data.energon.task_encoder_utils.ChatMLSample]

Webdataset factory for multi-turn ChatML samples with multimodal support.

Extends DefaultDecoderWebdatasetFactory to decode webdataset shards into ChatMLSample instances, using custom handlers for image and video fields.

Parameters:
  • path – Root path of the webdataset shards.

  • auto_decode – Whether to install custom image/video decoders. Passed through to the parent class.

  • image_decode_spec – Decode spec forwarded to imagehandler / videohandler (e.g. "torchrgb"). When None (the default), falls back to the parent’s image_decode attribute for backward compatibility with callers that set it via **kwargs, and ultimately defaults to "torchrgb".

  • **kwargs – Forwarded to DefaultDecoderWebdatasetFactory.__init__. A decoder key, if present, is silently dropped because this class installs its own decoder.

Initialization

__sample_type__#

None

bridge.data.energon.task_encoder_utils.cook_chatml_sample(conversation) → List[Dict]#

Normalize a ChatML conversation to [{"role": ..., "content": ...}, ...].

Accepts both from/value (GPT-style) and role/content (OpenAI-style) formats, with an optional leading system turn when the total number of turns is odd.

Parameters:

conversation – A JSON string, bytes, list of turn dicts, or a dict with a "conversations" key.

Returns:

A list of dicts with role in {"system", "user", "assistant"} and content as a plain string.
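
The input and output shapes described above can be illustrated with a small pure-Python sketch. It is a hypothetical normalizer, not the library function: the name sketch_cook_chatml, the human/gpt alias table, and the reading that an odd turn count means the first turn is the system prompt are all assumptions made for illustration.

```python
import json

# Assumption: ShareGPT-style speaker names; the real alias table may differ.
ROLE_ALIASES = {"human": "user", "gpt": "assistant"}


def _role(turn):
    return turn["role"] if "role" in turn else turn["from"]


def _content(turn):
    return turn["content"] if "content" in turn else turn["value"]


def sketch_cook_chatml(conversation):
    """Hypothetical normalizer following the documented input/output contract."""
    if isinstance(conversation, (bytes, bytearray)):
        conversation = conversation.decode("utf-8")
    if isinstance(conversation, str):
        conversation = json.loads(conversation)
    if isinstance(conversation, dict):
        conversation = conversation["conversations"]

    turns = list(conversation)
    cooked = []
    # Assumption: an odd turn count means the first turn is the system prompt.
    if len(turns) % 2 == 1:
        cooked.append({"role": "system", "content": str(_content(turns[0]))})
        turns = turns[1:]
    for turn in turns:
        role = ROLE_ALIASES.get(_role(turn), _role(turn))
        cooked.append({"role": role, "content": str(_content(turn))})
    return cooked


from_value_json = json.dumps({"conversations": [
    {"from": "system", "value": "Be brief."},
    {"from": "human", "value": "What is in the image?"},
    {"from": "gpt", "value": "A cat."},
]})
role_content_list = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello"},
]
cooked_a = sketch_cook_chatml(from_value_json)
cooked_b = sketch_cook_chatml(role_content_list)
print([t["role"] for t in cooked_a])  # ['system', 'user', 'assistant']
```

Both inputs end up in the same role/content shape, so downstream code such as a chat template only has to handle one format.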