bridge.data.vlm_processing#

Shared VLM processing helpers for Energon and HF dataset paths.

Module Contents#

Classes#

NormalizedVLMSample

Source-normalized VLM sample consumed by shared processing.

Functions#

normalize_energon_vlm_sample

Normalize an Energon ChatMLSample into the shared VLM sample contract.

normalize_hf_vlm_example

Normalize a HF-style VLM dataset example into the shared sample contract.

normalized_vlm_sample_to_hf_example

Convert a normalized VLM sample into the HF-style collate example schema.

get_processor_tokenizer

Return the tokenizer attached to a processor, or the object itself.

find_token_span

Find the first [start, end) token span matching pattern.

gather_assistant_text_segments

Extract assistant text segments from a structured VLM conversation.

tokenize_text_without_special_tokens

Tokenize text using a HF-like tokenizer without adding special tokens.

_assistant_text_variants

build_assistant_loss_mask

Build an unshifted assistant-only loss mask using the current token-search behavior.

build_shifted_labels_and_loss_mask

Build next-token labels and shifted loss mask for Megatron training.

apply_assistant_labels_to_batch

Attach labels and loss_mask to a collated HF VLM batch.

ensure_position_ids

Ensure a collated batch has 2D position IDs.

pop_generic_visual_inputs

Move processor visual tensor keys into GenericVisualInputs.

API#

class bridge.data.vlm_processing.NormalizedVLMSample#

Source-normalized VLM sample consumed by shared processing.

Expected input format: Instances are produced by source adapters such as ``normalize_energon_vlm_sample`` and ``normalize_hf_vlm_example``. The ``conversation`` field must be a structured list of chat turns in HF processor format, for example::

    [
        {"role": "user", "content": "describe <image>"},
        {"role": "assistant", "content": "a red car"},
    ]

``content`` may also be a multimodal content list accepted by
``processor.apply_chat_template``, for example::

    [{"type": "image", "image": image_obj}, {"type": "text", "text": "describe"}]

``images`` and ``videos`` are optional processor-ready modality payloads.
Energon adapters convert WDS tensors to PIL objects before populating
these fields; HF adapters may leave them ``None`` when media already lives
inline in ``conversation`` content.

Output format: Shared processing treats this as the single boundary contract before model-specific tokenization and vision preprocessing. It does not contain batched tensors; model-specific collators convert it into model input tensors.
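
As an illustration of the contract described above, the following is a minimal sketch of a container with the four documented fields. The class name ``NormalizedVLMSampleSketch``, the use of a dataclass, and the ``None`` defaults on the optional fields are assumptions made for this example; the actual class definition is not shown on this page.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class NormalizedVLMSampleSketch:
    """Hypothetical stand-in that mirrors the four documented fields."""

    conversation: list[dict[str, Any]]
    images: list[Any] | None = None  # processor-ready images, if any
    videos: list[Any] | None = None  # processor-ready videos, if any
    audio: Any | None = None         # optional audio payload


# One unbatched sample: structured turns plus an optional media payload.
sample = NormalizedVLMSampleSketch(
    conversation=[
        {"role": "user", "content": "describe <image>"},
        {"role": "assistant", "content": "a red car"},
    ],
    images=["<placeholder for a PIL image>"],
)
```

The sample holds no batched tensors, which matches the description of it as the boundary before model-specific tokenization.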

conversation: list[dict[str, Any]]#

None

images: list[Any] | None#

None

videos: list[Any] | None#

None

audio: Any | None#

None

bridge.data.vlm_processing.normalize_energon_vlm_sample(
sample: Any,
) bridge.data.vlm_processing.NormalizedVLMSample#

Normalize an Energon ChatMLSample into the shared VLM sample contract.

Expected input format: sample is expected to expose the Energon ChatMLSample fields:

- ``conversation``: JSON string accepted by ``cook_chatml_sample``.  The
  JSON may use either ``{"role": ..., "content": ...}`` turns or
  ``{"from": ..., "value": ...}`` turns.
- ``imgs``: optional WDS decoded image tensor/list payload.
- ``videos``: optional WDS decoded video tensor/list payload.
- ``audio``: optional audio payload, passed through unchanged.

Output format: Returns NormalizedVLMSample where conversation is a list of {"role": str, "content": str | list[dict]} turns, images are PIL/list processor inputs or None, videos are nested PIL/list processor inputs or None, and audio is copied from the source sample when present.
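
The two accepted turn layouts can be reconciled roughly as sketched below. This is only an illustration of the key renaming; ``turns_from_json`` is a hypothetical helper, and it does not reproduce whatever else ``cook_chatml_sample`` does (for example any role-name mapping or media conversion).

```python
import json
from typing import Any


def turns_from_json(conversation_json: str) -> list[dict[str, Any]]:
    """Parse a JSON conversation and emit {"role", "content"} turns."""
    normalized = []
    for turn in json.loads(conversation_json):
        if "role" in turn:
            # Already in {"role": ..., "content": ...} form.
            normalized.append({"role": turn["role"], "content": turn["content"]})
        else:
            # {"from": ..., "value": ...} form: rename the keys only.
            normalized.append({"role": turn["from"], "content": turn["value"]})
    return normalized


raw = json.dumps(
    [
        {"from": "user", "value": "describe <image>"},
        {"role": "assistant", "content": "a red car"},
    ]
)
turns = turns_from_json(raw)
```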

bridge.data.vlm_processing.normalize_hf_vlm_example(
example: collections.abc.Mapping[str, Any],
) bridge.data.vlm_processing.NormalizedVLMSample#

Normalize a HF-style VLM dataset example into the shared sample contract.

Expected input format: example must contain "conversation" as a structured list of chat turns already produced by an HF dataset maker, for example::

    {
        "conversation": [
            {"role": "user", "content": [{"type": "image", "image": img}, {"type": "text", "text": "Q"}]},
            {"role": "assistant", "content": [{"type": "text", "text": "A"}]},
        ],
        "audio": optional_audio,
    }

Optional top-level ``images``/``image`` and ``videos``/``video`` fields
are accepted for maker variants that do not embed media inline in the
conversation.

Output format: Returns NormalizedVLMSample with a deep-copied structured conversation list, optional list-valued images and videos payloads, and optional audio. The adapter does not call cook_chatml_sample because HF makers have already normalized the chat schema.

Raises:

ValueError – If example["conversation"] is missing or is not a list.

bridge.data.vlm_processing.normalized_vlm_sample_to_hf_example(
sample: bridge.data.vlm_processing.NormalizedVLMSample,
*,
media_first: bool = False,
) dict[str, Any]#

Convert a normalized VLM sample into the HF-style collate example schema.

Expected input format: sample follows NormalizedVLMSample: conversation is a list of chat turns, and optional images/videos contain processor-ready media payloads such as PIL images or decoded video frame lists. Text turns may contain literal <image> / <video> placeholders.

Output format: Returns a dictionary suitable for VLM HF collate functions::

    {
        "conversation": [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image_obj},
                    {"type": "text", "text": "describe"},
                ],
            },
            {"role": "assistant", "content": [{"type": "text", "text": "answer"}]},
        ],
        "images": [image_obj],   # present when sample.images is not None
        "videos": [video_obj],   # present when sample.videos is not None
        "audio": audio_obj,      # present when sample.audio is not None
    }

Inline media parts are populated from ``sample.images`` and
``sample.videos`` in placeholder order.  When ``media_first=True``,
media parts are moved before text parts within each turn to preserve
Qwen Energon's legacy media-before-text ordering while still using the
shared HF collate function.
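
A rough sketch of placeholder expansion and the ``media_first`` reordering for a single text turn is shown below. ``expand_turn_content`` is a hypothetical helper; stripping whitespace around text pieces and passing media as iterators are assumptions of this example, not documented behavior.

```python
import re
from collections.abc import Iterator
from typing import Any

# The capturing group makes re.split keep the placeholders in its output.
_PLACEHOLDER = re.compile(r"(<image>|<video>)")


def expand_turn_content(
    text: str,
    images: Iterator[Any],
    videos: Iterator[Any],
    media_first: bool = False,
) -> list[dict[str, Any]]:
    """Turn 'describe <image>' into a list of typed content parts."""
    parts: list[dict[str, Any]] = []
    for piece in _PLACEHOLDER.split(text):
        if piece == "<image>":
            parts.append({"type": "image", "image": next(images)})
        elif piece == "<video>":
            parts.append({"type": "video", "video": next(videos)})
        elif piece.strip():
            parts.append({"type": "text", "text": piece.strip()})
    if media_first:
        # Stable sort: media parts keep their relative order, text follows.
        parts.sort(key=lambda part: part["type"] == "text")
    return parts


default_parts = expand_turn_content("describe <image>", iter(["img0"]), iter([]))
media_first_parts = expand_turn_content(
    "describe <image>", iter(["img0"]), iter([]), media_first=True
)
```

Sharing one iterator per modality across all turns of a conversation is one way to consume media in placeholder order.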

bridge.data.vlm_processing.get_processor_tokenizer(processor: Any) Any#

Return the tokenizer attached to a processor, or the object itself.
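
One plausible way to express this fallback is a single ``getattr`` call, sketched below. The attribute name ``tokenizer`` is an assumption based on how HF processors commonly expose their tokenizer; ``get_tokenizer_sketch`` is not the library function.

```python
from types import SimpleNamespace
from typing import Any


def get_tokenizer_sketch(processor: Any) -> Any:
    """Return processor.tokenizer when present, else the object itself."""
    return getattr(processor, "tokenizer", processor)


bare_tokenizer = object()
wrapped = SimpleNamespace(tokenizer=bare_tokenizer)
```

With this shape, callers can pass either a processor or a bare tokenizer without branching.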

bridge.data.vlm_processing.find_token_span(
sequence: collections.abc.Sequence[int] | torch.Tensor,
pattern: collections.abc.Sequence[int],
start: int = 0,
) tuple[int, int]#

Find the first [start, end) token span matching pattern.

Parameters:
  • sequence – Token id sequence to search.

  • pattern – Token id pattern to locate.

  • start – Index to begin searching from.

Returns:

(start, end) for the first match, or (-1, -1) when no match exists.
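
The search described above can be sketched as a plain sliding-window comparison. ``find_token_span_sketch`` is an illustrative reimplementation over Python sequences of ints; returning ``(-1, -1)`` for an empty pattern is an assumption, and tensor inputs would first need converting, for example with ``.tolist()``.

```python
from collections.abc import Sequence


def find_token_span_sketch(
    sequence: Sequence[int],
    pattern: Sequence[int],
    start: int = 0,
) -> tuple[int, int]:
    """Return the first half-open [start, end) span where pattern occurs."""
    seq, pat = list(sequence), list(pattern)
    if not pat:
        return (-1, -1)  # assumption: an empty pattern never matches
    for i in range(max(start, 0), len(seq) - len(pat) + 1):
        if seq[i : i + len(pat)] == pat:
            return (i, i + len(pat))
    return (-1, -1)


first = find_token_span_sketch([5, 1, 2, 3, 1, 2], [1, 2])
second = find_token_span_sketch([5, 1, 2, 3, 1, 2], [1, 2], start=2)
missing = find_token_span_sketch([5, 1, 2], [9])
```

Passing the previous match's ``end`` as ``start`` lets a caller walk through repeated occurrences in order.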

bridge.data.vlm_processing.gather_assistant_text_segments(
example_or_conversation: collections.abc.Mapping[str, Any] | collections.abc.Sequence[collections.abc.Mapping[str, Any]],
) list[str]#

Extract assistant text segments from a structured VLM conversation.

bridge.data.vlm_processing.tokenize_text_without_special_tokens(
tokenizer: Any,
text: str,
) list[int]#

Tokenize text using a HF-like tokenizer without adding special tokens.
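
For a tokenizer that follows the HF call convention, this likely amounts to passing ``add_special_tokens=False``. The sketch below uses a toy tokenizer so it runs without any model download; ``ToyTokenizer`` and ``tokenize_plain`` are hypothetical names.

```python
class ToyTokenizer:
    """Stand-in that mimics the HF call shape: tokenizer(text, ...)['input_ids']."""

    bos_id = 1

    def __call__(self, text: str, add_special_tokens: bool = True) -> dict:
        ids = [10 + len(word) for word in text.split()]  # fake ids by word length
        if add_special_tokens:
            ids = [self.bos_id] + ids
        return {"input_ids": ids}


def tokenize_plain(tokenizer, text: str) -> list[int]:
    """Tokenize without BOS/EOS or other special tokens."""
    return list(tokenizer(text, add_special_tokens=False)["input_ids"])


plain_ids = tokenize_plain(ToyTokenizer(), "a red car")
```

Leaving out special tokens matters when the ids are used as a search pattern inside a longer, already templated sequence.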

bridge.data.vlm_processing._assistant_text_variants(
text: str,
*,
include_search_variants: bool,
) list[str]#

bridge.data.vlm_processing.build_assistant_loss_mask(
example_or_conversation: collections.abc.Mapping[str, Any] | collections.abc.Sequence[collections.abc.Mapping[str, Any]],
input_ids: collections.abc.Sequence[int] | torch.Tensor,
processor: Any,
skipped_tokens: torch.Tensor | None = None,
*,
include_search_variants: bool = True,
require_matches: bool = False,
warn_on_all_masked: bool = True,
) torch.Tensor#

Build an unshifted assistant-only loss mask using the current token-search behavior.

This intentionally preserves the existing text-search masking strategy. Step 3 of issue #4041 can replace this helper with a generation-tag or boundary-token implementation without touching every VLM collate/task encoder again.
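
The text-search strategy can be pictured with the list-based sketch below: each assistant segment, already tokenized, is located in ``input_ids`` and its span is marked. ``assistant_mask_sketch`` is hypothetical. Advancing a cursor past each match is an assumption, and search variants, ``skipped_tokens``, ``require_matches``, and ``warn_on_all_masked`` are not modeled.

```python
def assistant_mask_sketch(
    input_ids: list[int],
    assistant_token_patterns: list[list[int]],
) -> list[int]:
    """Mark positions covered by assistant token patterns with 1, others with 0."""
    mask = [0] * len(input_ids)
    cursor = 0  # assumption: segments are searched for in conversation order
    for pattern in assistant_token_patterns:
        width = len(pattern)
        if width == 0:
            continue
        for i in range(cursor, len(input_ids) - width + 1):
            if input_ids[i : i + width] == pattern:
                for j in range(i, i + width):
                    mask[j] = 1
                cursor = i + width
                break
    return mask


ids = [1, 7, 8, 2, 9, 9, 2, 7, 8]
single = assistant_mask_sketch(ids, [[9, 9]])
repeated = assistant_mask_sketch(ids, [[7, 8], [7, 8]])
```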

bridge.data.vlm_processing.build_shifted_labels_and_loss_mask(
input_ids: torch.Tensor,
assistant_loss_mask: torch.Tensor,
skipped_tokens: torch.Tensor | None = None,
*,
ignore_index: int = IGNORE_INDEX,
) tuple[torch.Tensor, torch.Tensor]#

Build next-token labels and shifted loss mask for Megatron training.
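
Next-token shifting can be illustrated on plain lists, as sketched below. The documented function operates on tensors; ``shift_labels_sketch`` is a hypothetical list-based analogue, the value ``-100`` for ``IGNORE_INDEX`` is an assumption (it is not stated on this page), and ``skipped_tokens`` handling is omitted.

```python
IGNORE_INDEX = -100  # assumed value; the page only names the constant


def shift_labels_sketch(
    input_ids: list[int],
    assistant_loss_mask: list[int],
    ignore_index: int = IGNORE_INDEX,
) -> tuple[list[int], list[int]]:
    """Position t predicts token t+1, so labels and mask both shift left by one."""
    next_tokens = list(input_ids[1:]) + [ignore_index]
    shifted_mask = list(assistant_loss_mask[1:]) + [0]
    labels = [
        token if keep else ignore_index
        for token, keep in zip(next_tokens, shifted_mask)
    ]
    return labels, shifted_mask


labels, loss_mask = shift_labels_sketch([1, 7, 8, 9, 2], [0, 0, 0, 1, 1])
```

In this example the assistant tokens ``9`` and ``2`` become the targets at positions 2 and 3, and the final position has no target.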

bridge.data.vlm_processing.apply_assistant_labels_to_batch(
batch: collections.abc.MutableMapping[str, Any],
examples: collections.abc.Sequence[collections.abc.Mapping[str, Any]],
processor: Any,
skipped_tokens: torch.Tensor,
*,
unmask_last_token: bool = False,
) None#

Attach labels and loss_mask to a collated HF VLM batch.

bridge.data.vlm_processing.ensure_position_ids(
batch: collections.abc.MutableMapping[str, Any],
) None#

Ensure a collated batch has 2D position IDs.
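
One reading of this helper is sketched below with nested lists standing in for ``[batch, sequence]`` tensors. ``ensure_position_ids_sketch`` is hypothetical. The key names ``input_ids`` and ``position_ids``, and the choice to leave existing position IDs untouched, are assumptions.

```python
from collections.abc import MutableMapping
from typing import Any


def ensure_position_ids_sketch(batch: MutableMapping[str, Any]) -> None:
    """Add per-row 0..L-1 position IDs in place when none are present."""
    if batch.get("position_ids") is not None:
        return  # assumption: existing position IDs are kept as they are
    batch["position_ids"] = [list(range(len(row))) for row in batch["input_ids"]]


batch = {"input_ids": [[5, 6, 7], [8, 9, 10]]}
ensure_position_ids_sketch(batch)
```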

bridge.data.vlm_processing.pop_generic_visual_inputs(
batch: collections.abc.MutableMapping[str, Any],
visual_keys: collections.abc.Sequence[str],
) megatron.bridge.training.utils.visual_inputs.GenericVisualInputs | None#

Move processor visual tensor keys into GenericVisualInputs.