bridge.data.energon.task_encoder_utils#

Shared utilities for Energon-based VLM task encoders.

Contains helpers extracted from the Qwen-VL task encoder so they can be reused by the generic HFEncoderVLMTaskEncoder and any future model-specific encoders.

Module Contents#

Classes#

ChatMLSample

Multi-turn complex samples with images and videos.

videohandler

Webdataset decoder handler for video fields stored as pickled frame lists.

ChatMLWebdataset

Webdataset factory for multi-turn ChatML samples with multimodal support.

Functions#

get_ltor_masks_and_position_ids

Build masks and position ids for a left-to-right model.

find_pattern_indices

Find the [start, end) indices of the first occurrence of pattern in sequence.

_tensor_to_pil

Convert a [C, H, W] float tensor in [0, 1] to a PIL Image (uint8 [0, 255]).

_images_to_pil

Convert WDS tensor images to PIL to match the HF flow input format.

_videos_to_pil

Convert WDS video frame tensors to PIL to match the HF flow input format.

cook_chatml_sample

Normalize a ChatML conversation to [{"role": ..., "content": ...}, ...].

Data#

API#

bridge.data.energon.task_encoder_utils.IGNORE_INDEX#

None

bridge.data.energon.task_encoder_utils.get_ltor_masks_and_position_ids(
data: torch.Tensor,
eod_token: int,
eod_mask_loss: bool,
reset_attention_mask: bool,
reset_position_ids: bool,
compute_attention_mask: bool = True,
)#

Build masks and position ids for a left-to-right model.

Parameters:
  • data – Input token ids of shape [b, s].

  • eod_token – End-of-document token id.

  • eod_mask_loss – If True, zero out loss at EOD positions.

  • reset_attention_mask – If True, block cross-document attention.

  • reset_position_ids – If True, restart position ids after each EOD.

  • compute_attention_mask – If False, skip attention mask computation and return None for the mask.

Returns:

  • attention_mask – [att_mask_batch, 1, s, s] boolean mask (True = masked / blocked) or None when compute_attention_mask is False.

  • loss_mask – [b, s] float mask (1.0 = keep, 0.0 = drop).

  • position_ids – [b, s] position indices.

Return type:

Tuple of (attention_mask, loss_mask, position_ids).
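A minimal sketch of the documented contract, using a hypothetical name `ltor_masks_sketch` and omitting the `reset_attention_mask` / `reset_position_ids` behavior, to illustrate the returned shapes and mask conventions:

```python
import torch


def ltor_masks_sketch(data, eod_token, eod_mask_loss, compute_attention_mask=True):
    """Simplified sketch of the documented contract (reset_* flags omitted)."""
    b, s = data.shape
    attention_mask = None
    if compute_attention_mask:
        # True = masked / blocked: each token attends only to itself and the past
        attention_mask = torch.triu(
            torch.ones(1, 1, s, s, dtype=torch.bool), diagonal=1
        )
    # 1.0 = keep, 0.0 = drop; optionally drop the loss at EOD positions
    loss_mask = torch.ones(b, s, dtype=torch.float)
    if eod_mask_loss:
        loss_mask[data == eod_token] = 0.0
    # Plain 0..s-1 positions (reset_position_ids would restart after each EOD)
    position_ids = torch.arange(s).unsqueeze(0).expand(b, s)
    return attention_mask, loss_mask, position_ids
```

The real helper additionally supports per-document attention blocking and position-id resets when the corresponding flags are set.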

bridge.data.energon.task_encoder_utils.find_pattern_indices(sequence: numpy.ndarray, pattern, start: int = 0)#

Find the [start, end) indices of the first occurrence of pattern in sequence.

Parameters:
  • sequence – 1-D array (or list) to search in.

  • pattern – Sub-sequence to look for.

  • start – Index in sequence at which to begin searching.

Returns:

(start_idx, end_idx) of the first match, or (-1, -1) if not found.
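A sketch of the documented search behavior, under the hypothetical name `find_pattern_indices_sketch`:

```python
import numpy as np


def find_pattern_indices_sketch(sequence, pattern, start=0):
    """Return [start, end) of the first match of pattern, or (-1, -1)."""
    sequence = np.asarray(sequence)
    pattern = np.asarray(pattern)
    n, m = len(sequence), len(pattern)
    # Scan each candidate window from `start` onward
    for i in range(start, n - m + 1):
        if np.array_equal(sequence[i:i + m], pattern):
            return i, i + m
    return -1, -1
```

This is typically used to locate the token ids of an assistant response inside a fully tokenized conversation, so the loss mask can be restricted to that span.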

bridge.data.energon.task_encoder_utils._tensor_to_pil(t)#

Convert a [C, H, W] float tensor in [0, 1] to a PIL Image (uint8 [0, 255]).
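The conversion amounts to rescaling, casting, and a channels-last permute; a sketch under the hypothetical name `tensor_to_pil_sketch`:

```python
import torch
from PIL import Image


def tensor_to_pil_sketch(t: torch.Tensor) -> Image.Image:
    """[C, H, W] float in [0, 1] -> PIL uint8 image in [0, 255]."""
    # Clamp, rescale to 0..255, cast, and move channels last for PIL
    arr = (t.clamp(0, 1) * 255).to(torch.uint8).permute(1, 2, 0).numpy()
    return Image.fromarray(arr)
```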

bridge.data.energon.task_encoder_utils._images_to_pil(imgs)#

Convert WDS tensor images to PIL to match the HF flow input format.

WDS imagehandler decodes JPEG to float tensors in [0, 1]. The HF flow passes PIL images (uint8 [0, 255]) to the processor. Converting to PIL here ensures the processor applies identical rescaling and normalization in both flows.

Parameters:

imgs – A single [C, H, W] tensor, a [N, C, H, W] batch tensor, or a list of tensors / PIL images.

Returns:

A list of PIL images, or the input unchanged if it is not a tensor.

bridge.data.energon.task_encoder_utils._videos_to_pil(videos)#

Convert WDS video frame tensors to PIL to match the HF flow input format.

Parameters:

videos – A list of videos, where each video is either a list of frame tensors or a [N, C, H, W] batch tensor. None is passed through unchanged.

Returns:

A nested list [[PIL.Image, ...], ...] with one sub-list per video, or None if videos is None.
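The nested conversion can be sketched as follows (hypothetical names; a `[N, C, H, W]` tensor iterates as `N` frames, so both input layouts fall through the same loop):

```python
import torch
from PIL import Image


def _frame_to_pil(t):
    # [C, H, W] float in [0, 1] -> uint8 PIL image
    return Image.fromarray((t.clamp(0, 1) * 255).to(torch.uint8).permute(1, 2, 0).numpy())


def videos_to_pil_sketch(videos):
    """Sketch: list of [N, C, H, W] tensors or frame lists -> nested PIL lists."""
    if videos is None:
        return None  # pass None through unchanged
    out = []
    for video in videos:
        # Iterating a [N, C, H, W] tensor yields N [C, H, W] frames
        out.append([_frame_to_pil(frame) for frame in video])
    return out
```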

class bridge.data.energon.task_encoder_utils.ChatMLSample#

Bases: megatron.energon.flavors.base_dataset.Sample

Multi-turn complex samples with images and videos.

conversation: str#

None

imgs: Optional[List[torch.Tensor]]#

None

videos: Optional[List[List[torch.Tensor]]]#

None

class bridge.data.energon.task_encoder_utils.videohandler(imagespec)#

Webdataset decoder handler for video fields stored as pickled frame lists.

Initialization

__call__(key, data)#

Decode pickled video data into lists of image tensors.

class bridge.data.energon.task_encoder_utils.ChatMLWebdataset(
path: megatron.energon.epathlib.epath.EPath,
*,
auto_decode: bool = True,
image_decode_spec: Optional[str] = None,
**kwargs,
)#

Bases: megatron.energon.flavors.webdataset.DefaultDecoderWebdatasetFactory[bridge.data.energon.task_encoder_utils.ChatMLSample]

Webdataset factory for multi-turn ChatML samples with multimodal support.

Extends DefaultDecoderWebdatasetFactory to decode webdataset shards into ChatMLSample instances, using custom handlers for image and video fields.

Parameters:
  • path – Root path of the webdataset shards.

  • auto_decode – Whether to install custom image/video decoders. Passed through to the parent class.

  • image_decode_spec – Decode spec forwarded to imagehandler / videohandler (e.g. "torchrgb"). When None (the default), falls back to the parent’s image_decode attribute for backward compatibility with callers that set it via **kwargs, and ultimately defaults to "torchrgb".

  • **kwargs – Forwarded to DefaultDecoderWebdatasetFactory.__init__. A decoder key, if present, is silently dropped because this class installs its own decoder.

Initialization

__sample_type__#

None

bridge.data.energon.task_encoder_utils.cook_chatml_sample(conversation) List[Dict]#

Normalize a ChatML conversation to [{"role": ..., "content": ...}, ...].

Accepts both from/value (GPT-style) and role/content (OpenAI-style) formats, with an optional leading system turn when the total number of turns is odd.

Parameters:

conversation – A JSON string, bytes, list of turn dicts, or a dict with a "conversations" key.

Returns:

A list of dicts with role in {"system", "user", "assistant"} and content as a plain string.
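A sketch of the documented normalization, using the hypothetical name `cook_chatml_sketch`; the real helper also handles the optional leading system turn described above:

```python
import json

# Assumed role mapping for GPT-style "from" keys
ROLE_MAP = {"human": "user", "gpt": "assistant", "system": "system"}


def cook_chatml_sketch(conversation):
    """Sketch: normalize GPT-style or OpenAI-style turns to role/content dicts."""
    if isinstance(conversation, bytes):
        conversation = conversation.decode("utf-8")
    if isinstance(conversation, str):
        conversation = json.loads(conversation)
    if isinstance(conversation, dict):
        conversation = conversation["conversations"]
    turns = []
    for turn in conversation:
        if "role" in turn:  # already OpenAI-style role/content
            turns.append({"role": turn["role"], "content": str(turn["content"])})
        else:  # GPT-style from/value
            turns.append({"role": ROLE_MAP[turn["from"]], "content": str(turn["value"])})
    return turns
```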