bridge.data.energon.task_encoder_utils#
Shared utilities for Energon-based VLM task encoders.
Contains helpers extracted from the Qwen-VL task encoder so they can be
reused by the generic HFEncoderVLMTaskEncoder and any future
model-specific encoders.
Module Contents#
Classes#
| Class | Summary |
|---|---|
| `ChatMLSample` | Multi-turn complex samples with images and videos. |
| `videohandler` | Webdataset decoder handler for video fields stored as pickled frame lists. |
| `ChatMLWebdataset` | Webdataset factory for multi-turn ChatML samples with multimodal support. |
Functions#
| Function | Summary |
|---|---|
| `get_ltor_masks_and_position_ids` | Build masks and position ids for a left-to-right model. |
| `find_pattern_indices` | Find the `[start, end)` indices of the first occurrence of a pattern in a sequence. |
| `_tensor_to_pil` | Convert a `[C, H, W]` float tensor in `[0, 1]` to a PIL Image. |
| `_images_to_pil` | Convert WDS tensor images to PIL to match the HF flow input format. |
| `_videos_to_pil` | Convert WDS video frame tensors to PIL to match the HF flow input format. |
| `cook_chatml_sample` | Normalize a ChatML conversation to `[{"role": ..., "content": ...}, ...]`. |
Data#
API#
- bridge.data.energon.task_encoder_utils.IGNORE_INDEX#
None
- bridge.data.energon.task_encoder_utils.get_ltor_masks_and_position_ids(
- data: torch.Tensor,
- eod_token: int,
- eod_mask_loss: bool,
- reset_attention_mask: bool,
- reset_position_ids: bool,
- compute_attention_mask: bool = True,
- )#
Build masks and position ids for a left-to-right model.
- Parameters:
  - data – Input token ids of shape `[b, s]`.
  - eod_token – End-of-document token id.
  - eod_mask_loss – If True, zero out loss at EOD positions.
  - reset_attention_mask – If True, block cross-document attention.
  - reset_position_ids – If True, restart position ids after each EOD.
  - compute_attention_mask – If False, skip attention mask computation and return `None` for the mask.
- Returns:
  - attention_mask – `[att_mask_batch, 1, s, s]` boolean mask (`True` = masked / blocked), or `None` when compute_attention_mask is False.
  - loss_mask – `[b, s]` float mask (1.0 = keep, 0.0 = drop).
  - position_ids – `[b, s]` position indices.
- Return type:
  Tuple of `(attention_mask, loss_mask, position_ids)`, where `attention_mask` may be `None`.
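The loss-mask and position-id semantics above can be sketched in pure Python. This is a standalone illustration, not the module's implementation: the real helper operates on `[b, s]` torch tensors and also builds the `[s, s]` attention mask, which is omitted here, and `EOD_TOKEN = 0` is an assumed value.

```python
# Minimal sketch of the documented semantics for a single sequence.
# EOD_TOKEN = 0 is an illustrative assumption, not the library's value.
EOD_TOKEN = 0

def ltor_loss_mask_and_position_ids(ids, eod_mask_loss=True, reset_position_ids=True):
    """Return (loss_mask, position_ids) for one sequence of token ids."""
    # Drop the loss at EOD positions when eod_mask_loss is set.
    loss_mask = [0.0 if (eod_mask_loss and t == EOD_TOKEN) else 1.0 for t in ids]
    position_ids, pos = [], 0
    for t in ids:
        position_ids.append(pos)
        pos += 1
        if reset_position_ids and t == EOD_TOKEN:
            pos = 0  # restart numbering for the next document
    return loss_mask, position_ids
```

For example, `ids = [5, 7, EOD_TOKEN, 9, 11]` yields position ids `[0, 1, 2, 0, 1]` and loss mask `[1.0, 1.0, 0.0, 1.0, 1.0]`: positions restart after the EOD token and the loss is dropped at the EOD position itself.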
- bridge.data.energon.task_encoder_utils.find_pattern_indices(sequence: numpy.ndarray, pattern, start: int = 0)#
Find the `[start, end)` indices of the first occurrence of pattern in sequence.
- Parameters:
  - sequence – 1-D array (or list) to search in.
  - pattern – Sub-sequence to look for.
  - start – Index in sequence at which to begin searching.
- Returns:
  `(start_idx, end_idx)` of the first match, or `(-1, -1)` if not found.
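The documented contract can be sketched in pure Python. The name mirrors the helper above, but this is a standalone reimplementation that accepts any sequence rather than a `numpy.ndarray`:

```python
# Standalone sketch of the documented contract.
def find_pattern_indices(sequence, pattern, start=0):
    """Return the [start, end) indices of the first match, or (-1, -1)."""
    n, m = len(sequence), len(pattern)
    for i in range(start, n - m + 1):
        if list(sequence[i:i + m]) == list(pattern):
            return i, i + m
    return -1, -1
```

A typical use in a task encoder is locating an answer's token ids inside the full tokenized conversation so that only that span contributes to the loss; `start` lets the caller resume the search past earlier turns.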
- bridge.data.energon.task_encoder_utils._tensor_to_pil(t)#
Convert a `[C, H, W]` float tensor in `[0, 1]` to a PIL Image (uint8 `[0, 255]`).
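The numeric conversion involved is a scale-and-clamp plus a channels-first to channels-last transpose. A sketch on nested lists keeps the example self-contained; the real helper uses torch and returns an actual `PIL.Image`:

```python
# Sketch of the [C, H, W] float -> [H, W, C] uint8 conversion on nested lists.
# PIL.Image.fromarray expects channels-last uint8 data, hence the transpose.
def chw_float_to_hwc_uint8(t):
    """Scale floats in [0, 1] to [0, 255], clamp, and reorder CHW -> HWC."""
    C, H, W = len(t), len(t[0]), len(t[0][0])
    return [[[max(0, min(255, round(t[c][h][w] * 255))) for c in range(C)]
             for w in range(W)]
            for h in range(H)]
```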
- bridge.data.energon.task_encoder_utils._images_to_pil(imgs)#
Convert WDS tensor images to PIL to match the HF flow input format.
WDS `imagehandler` decodes JPEG to float tensors in `[0, 1]`. The HF flow passes PIL images (uint8 `[0, 255]`) to the processor. Converting to PIL here ensures the processor applies identical rescaling and normalization in both flows.
- Parameters:
  - imgs – A single `[C, H, W]` tensor, a `[N, C, H, W]` batch tensor, or a list of tensors / PIL images.
- Returns:
  A list of PIL images, or the input unchanged if it is not a tensor.
- bridge.data.energon.task_encoder_utils._videos_to_pil(videos)#
Convert WDS video frame tensors to PIL to match the HF flow input format.
- Parameters:
  - videos – A list of videos, where each video is either a list of frame tensors or a `[N, C, H, W]` batch tensor. `None` is passed through unchanged.
- Returns:
  A nested list `[[PIL.Image, ...], ...]` with one sub-list per video, or `None` if videos is `None`.
- class bridge.data.energon.task_encoder_utils.ChatMLSample#
Bases: `megatron.energon.flavors.base_dataset.Sample`
Multi-turn complex samples with images and videos.
- conversation: str#
None
- imgs: Optional[List[torch.Tensor]]#
None
- videos: Optional[List[List[torch.Tensor]]]#
None
- class bridge.data.energon.task_encoder_utils.videohandler(imagespec)#
Webdataset decoder handler for video fields stored as pickled frame lists.
Initialization
- __call__(key, data)#
Decode pickled video data into lists of image tensors.
- class bridge.data.energon.task_encoder_utils.ChatMLWebdataset(
- path: megatron.energon.epathlib.epath.EPath,
- *,
- auto_decode: bool = True,
- image_decode_spec: Optional[str] = None,
- **kwargs,
- )#
Bases: `megatron.energon.flavors.webdataset.DefaultDecoderWebdatasetFactory[bridge.data.energon.task_encoder_utils.ChatMLSample]`
Webdataset factory for multi-turn ChatML samples with multimodal support.
Extends `DefaultDecoderWebdatasetFactory` to decode webdataset shards into `ChatMLSample` instances, using custom handlers for image and video fields.
- Parameters:
  - path – Root path of the webdataset shards.
  - auto_decode – Whether to install custom image/video decoders. Passed through to the parent class.
  - image_decode_spec – Decode spec forwarded to `imagehandler` / `videohandler` (e.g. `"torchrgb"`). When `None` (the default), falls back to the parent's `image_decode` attribute for backward compatibility with callers that set it via `**kwargs`, and ultimately defaults to `"torchrgb"`.
  - **kwargs – Forwarded to `DefaultDecoderWebdatasetFactory.__init__`. A `decoder` key, if present, is silently dropped because this class installs its own decoder.
Initialization
- __sample_type__#
None
- bridge.data.energon.task_encoder_utils.cook_chatml_sample(conversation) -> List[Dict]#
Normalize a ChatML conversation to `[{"role": ..., "content": ...}, ...]`.
Accepts both `from`/`value` (GPT-style) and `role`/`content` (OpenAI-style) formats, with an optional leading system turn when the total number of turns is odd.
- Parameters:
  - conversation – A JSON string, bytes, list of turn dicts, or a dict with a `"conversations"` key.
- Returns:
  A list of dicts with `role` in `{"system", "user", "assistant"}` and `content` as a plain string.
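The normalization above can be sketched as follows. This is a standalone illustration of the documented contract, not the module's code; in particular, the `ROLE_MAP` translating GPT-style `from` values (`"human"`, `"gpt"`) to ChatML roles is an assumption for the example.

```python
import json

# Assumed mapping from GPT-style "from" values to ChatML roles.
ROLE_MAP = {"human": "user", "gpt": "assistant", "system": "system"}

def cook_chatml(conversation):
    """Normalize a conversation to [{"role": ..., "content": ...}, ...]."""
    if isinstance(conversation, bytes):
        conversation = conversation.decode("utf-8")
    if isinstance(conversation, str):
        conversation = json.loads(conversation)       # JSON-encoded turn list
    if isinstance(conversation, dict):
        conversation = conversation["conversations"]  # {"conversations": [...]}
    turns = []
    for turn in conversation:
        if "from" in turn:  # GPT-style from/value
            role = ROLE_MAP.get(str(turn["from"]).lower(), turn["from"])
            turns.append({"role": role, "content": str(turn["value"])})
        else:               # OpenAI-style role/content
            turns.append({"role": turn["role"], "content": str(turn["content"])})
    return turns
```

With a GPT-style JSON string such as `'[{"from": "human", "value": "hi"}, {"from": "gpt", "value": "hello"}]'`, this yields `[{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]`.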