Add User-defined Stages to Your NeMo Retriever Extraction Pipeline

This documentation demonstrates how to add user-defined stages to your NeMo Retriever extraction pipeline. You can pass a function object directly or reference it by a string module path; in both cases the pipeline validates the function signature. By following these steps, your Lambda stages are signature-validated, plug into your RayPipeline without extra wrapping code, and operate on a well-defined DataFrame payload and metadata structure.

Note

NeMo Retriever extraction is also known as NVIDIA Ingest and nv-ingest.

To add user-defined stages to your pipeline, you need the following:

  • A callable function — Your function must have the following exact signature. For more information, refer to Create a Lambda Function and Config.

    def my_fn(control_message: IngestControlMessage, stage_config: MyConfig) -> IngestControlMessage:
    
  • A DataFrame payload — The payload of control_message, accessed with control_message.payload(), must be a pandas.DataFrame. For more information, refer to Create a DataFrame Payload.

  • Valid metadata — The metadata field must conform to the nv-ingest metadata schema. For more information, refer to Update and Validate Metadata.

Create a Lambda Function and Config

Your function must have the following exact signature.

def my_fn(control_message: IngestControlMessage, stage_config: MyConfig) -> IngestControlMessage:
  ...
  • The first parameter is named control_message and is an IngestControlMessage.
  • The second parameter is named stage_config and must be a subclass of pydantic.BaseModel.
  • The return value is an IngestControlMessage.

The following example demonstrates how to create a valid Lambda function and configuration.

import pandas as pd
from pydantic import BaseModel
from nv_ingest_api.internal.primitives.ingest_control_message import IngestControlMessage
from nv_ingest_api.internal.schemas.meta.metadata_schema import validate_metadata

# Config schema for your stage
class MyToyConfig(BaseModel):
  set_processed: bool = True

def toy_stage_fn(control_message: IngestControlMessage, stage_config: MyToyConfig) -> IngestControlMessage:
  df = control_message.payload()

  # Set 'processed' flag in the allowed 'content_metadata' dict (if present)
  def update_metadata(meta):
    meta = dict(meta)

    # Only update if 'content_metadata' exists and is a dict
    if "content_metadata" in meta and isinstance(meta["content_metadata"], dict):
      meta["content_metadata"] = dict(meta["content_metadata"])  # ensure copy
      meta["content_metadata"]["processed"] = stage_config.set_processed
    validate_metadata(meta)
    return meta

  df["metadata"] = df["metadata"].apply(update_metadata)
  control_message.payload(df)
  return control_message

Create a DataFrame Payload

The payload of control_message, returned by control_message.payload(), must be a pandas.DataFrame with the following columns.

  • document_type
  • source_id
  • job_id
  • metadata

The following example demonstrates an input payload (before the stage runs) as a DataFrame with valid metadata.

import pandas as pd

df = pd.DataFrame([
  {
    "document_type": "invoice",
    "source_id": "A123",
    "job_id": "job-001",
    "metadata": {
      "content": "example",
      "content_metadata": {"type": "pdf"},
      "source_metadata": {"source_id": "A123", "source_type": "pdf"},
    },
  },
  {
    "document_type": "report",
    "source_id": "B456",
    "job_id": "job-002",
    "metadata": {
      "content": "another",
      "content_metadata": {"type": "pdf"},
      "source_metadata": {"source_id": "B456", "source_type": "pdf"},
    }
  }
])
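
If you want to exercise a stage function outside the pipeline, you can attach this DataFrame to a control message and call the function directly. The following is a minimal sketch; it assumes IngestControlMessage can be instantiated with no arguments, which might not match how your deployment creates control messages.

from nv_ingest_api.internal.primitives.ingest_control_message import IngestControlMessage

# Sketch only: in a running pipeline the control message is created for you,
# so constructing IngestControlMessage() directly here is an assumption.
control_message = IngestControlMessage()
control_message.payload(df)  # attach the DataFrame as the payload

# Run the stage function from the earlier example against this payload.
result = toy_stage_fn(control_message, MyToyConfig(set_processed=True))
print(result.payload()["metadata"].iloc[0]["content_metadata"]["processed"])  # expected: True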

Add a Stage (Function is Imported)

If your function is already imported in Python, use the following code to add your user-defined stage to the pipeline.

config = MyToyConfig(set_processed=True)

pipeline.add_stage(
  name="toy_stage",
  stage_actor=toy_stage_fn,
  config=config,
  min_replicas=1,
  max_replicas=2,
)

Add a Stage (Function is Defined in a Module)

If your function is defined in a module, use the following code to add your user-defined stage to the pipeline. In this example, the function toy_stage_fn is defined in the module my_project.stages, so it is referenced by the string path my_project.stages:toy_stage_fn.

config = MyToyConfig(set_processed=True)

pipeline.add_stage(
  name="toy_stage",
  stage_actor="my_project.stages:toy_stage_fn",
  config=config,
  min_replicas=1,
  max_replicas=2,
)

When the pipeline runs, it does the following:

  • Imports and validates the function (using resolve_callable_from_path), as sketched below.
  • Automatically wraps it as a Ray stage.
  • Enforces the signature and parameter naming rules.
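
The following sketch is illustrative only: it shows how a "module:function" string path can be resolved to a callable. The pipeline's own resolve_callable_from_path may differ in its details and error handling.

import importlib

def resolve_callable_sketch(path: str):
  # Illustrative sketch only; not the pipeline's actual implementation.
  module_name, _, attr = path.partition(":")
  module = importlib.import_module(module_name)  # import the module by name
  fn = getattr(module, attr)                     # look up the named attribute
  if not callable(fn):
    raise TypeError(f"{path} does not resolve to a callable")
  return fn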

Update and Validate Metadata

The metadata column in each row is a dictionary (JSON object), and must conform to the nv-ingest metadata schema.

After you change any metadata, you can validate it by using the validate_metadata function as demonstrated in the following code example.

from nv_ingest_api.internal.schemas.meta.metadata_schema import validate_metadata

def edit_metadata(control_message: IngestControlMessage, stage_config: MyToyConfig) -> IngestControlMessage:
  df = control_message.payload()

  def ensure_valid(meta):
    meta = dict(meta)
    # Only update an allowed nested metadata field
    if "content_metadata" in meta and isinstance(meta["content_metadata"], dict):
      meta["content_metadata"] = dict(meta["content_metadata"])
      meta["content_metadata"]["checked"] = True
    validate_metadata(meta)
    return meta

  df["metadata"] = df["metadata"].apply(ensure_valid)
  control_message.payload(df)
  return control_message

Troubleshoot Validation Failures

The following are common reasons that Lambda functions fail validation.

Wrong parameter names

The following Lambda function fails validation because the parameter names are incorrect. You should see a TypeError with a message that names the expected parameters, control_message and stage_config.

# Incorrect example, do not use
def bad_fn(msg: IngestControlMessage, cfg: MyToyConfig) -> IngestControlMessage:
  ...

Missing type annotations

The following Lambda function fails validation because the parameter and return type annotations are missing. You should see a TypeError.

# Incorrect example, do not use
def bad_fn(control_message, stage_config):
  ...
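
For contrast, the following signature passes validation because it uses the required parameter names and explicit type annotations.

# Correct example
def good_fn(control_message: IngestControlMessage, stage_config: MyToyConfig) -> IngestControlMessage:
  ...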

Best Practices

Use the following best practices to avoid validation failures.

  • Always use explicit type annotations and the required parameter names (control_message, stage_config).
  • Your config can be any subclass of pydantic.BaseModel.
  • Any errors in signature validation are raised with a clear message during pipeline construction.
  • You can use validate_metadata(meta) to assert compliance after metadata changes, as shown in the following example.
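
The following snippet is a minimal sketch of validating an edited metadata dictionary on its own. It assumes that validate_metadata raises an exception for non-conforming metadata, and catches a broad Exception because the exact exception type is not specified here.

from nv_ingest_api.internal.schemas.meta.metadata_schema import validate_metadata

meta = {
  "content": "example",
  "content_metadata": {"type": "pdf", "processed": True},
  "source_metadata": {"source_id": "A123", "source_type": "pdf"},
}

try:
  validate_metadata(meta)  # assumed to raise if the metadata does not conform
except Exception as err:   # broad catch; the exact exception type is not specified here
  print(f"Metadata failed validation: {err}")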

Minimal Complete Example

The following example creates a user-defined stage and adds it to your NeMo Retriever extraction pipeline.

  1. The following code creates a function for a user-defined stage.

    # my_pipeline/stages.py
    from pydantic import BaseModel
    from nv_ingest_api.internal.primitives.ingest_control_message import IngestControlMessage
    from nv_ingest_api.internal.schemas.meta.metadata_schema import validate_metadata

    class DoubleConfig(BaseModel):
      multiply_by: int = 2

    def double_amount(control_message: IngestControlMessage, stage_config: DoubleConfig) -> IngestControlMessage:
      df = control_message.payload()

      # Suppose the metadata for each row includes 'amount' under 'content_metadata'
      def double_meta(meta):
        meta = dict(meta)
        if "content_metadata" in meta and isinstance(meta["content_metadata"], dict):
          cm = dict(meta["content_metadata"])
          if "amount" in cm and isinstance(cm["amount"], (int, float)):
            cm["amount"] *= stage_config.multiply_by
          meta["content_metadata"] = cm
        validate_metadata(meta)
        return meta

      df["metadata"] = df["metadata"].apply(double_meta)
      control_message.payload(df)
      return control_message
    
  2. The following code adds the user-defined stage to the pipeline.

    • (Option 1) For a function that is defined in the module my_pipeline.stages.

      # The function is referenced by its module path; only the config class needs to be imported.
      from my_pipeline.stages import DoubleConfig

      pipeline.add_stage(
        name="doubler",
        stage_actor="my_pipeline.stages:double_amount",
        config=DoubleConfig(multiply_by=3),
        min_replicas=1,
        max_replicas=2,
      )
      
    • (Option 2) For a function that you have already imported.

      pipeline.add_stage(
        name="doubler",
        stage_actor=double_amount,
        config=DoubleConfig(multiply_by=3),
        min_replicas=1,
        max_replicas=2,
      )