
# nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler

## Module Contents

### Classes

| Name                                                                                                                 | Description                                   |
| -------------------------------------------------------------------------------------------------------------------- | --------------------------------------------- |
| [`BulkRapidsMPFShuffler`](#nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler) | Class that performs a bulk shuffle operation. |

### API

<Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    class nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler(
        nranks: int,
        total_nparts: int,
        shuffle_on: list[str],
        output_path: str = './',
        rmm_pool_size: int | typing.Literal['auto'] | None = 'auto',
        spill_memory_limit: int | typing.Literal['auto'] | None = 'auto',
        enable_statistics: bool = False,
        read_kwargs: dict[str, typing.Any] | None = None,
        write_kwargs: dict[str, typing.Any] | None = None
    )
    ```
  </CodeBlock>
</Anchor>

<Indent>
  **Bases:** `BaseShufflingActor`

  Class that performs a bulk shuffle operation.
  It is designed to run in Ray actors that communicate with each other over UCXX.

  ## Parameters

  nranks
  Number of ranks in the communication group.
  total\_nparts
  Total number of output partitions.
  shuffle\_on
  List of column names to shuffle on.
  output\_path
  Path to write output files.
  rmm\_pool\_size
  Size of the RMM GPU memory pool in bytes.
  If "auto", the memory pool is set to 90% of the free GPU memory.
  If None, the memory pool starts at 50% of the free GPU memory and can expand if needed.
  spill\_memory\_limit
  Device memory limit in bytes for spilling to host.
  If "auto", the limit is set to 80% of the RMM pool size.
  If None, spilling is disabled.
  enable\_statistics
  Whether to collect shuffle statistics.
  read\_kwargs
  Keyword arguments passed to `cudf.read_parquet`.
  write\_kwargs
  Keyword arguments passed to `cudf.DataFrame.to_parquet`.

  <ParamField path="read_kwargs" type="= read_kwargs if read_kwargs is not None else {}" />

  <ParamField path="rmm_pool_size" type="= align_down_to_256(rmm_pool_size)" />

  <ParamField path="spill_memory_limit" type="= align_down_to_256(spill_memory_limit)" />

  <ParamField path="write_kwargs" type="= write_kwargs if write_kwargs is not None else {}" />
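
The sizing rules described for `rmm_pool_size` and `spill_memory_limit` come down to simple arithmetic, which the sketch below illustrates. It is not the library's implementation: `resolve_memory_limits` and its `free_bytes` argument are hypothetical, and `align_down_to_256` is assumed to round a byte count down to the nearest multiple of 256, as its name suggests.

```python
from __future__ import annotations

from typing import Literal


def align_down_to_256(nbytes: int) -> int:
    """Assumed behavior: round down to the nearest multiple of 256 bytes."""
    return nbytes - (nbytes % 256)


def resolve_memory_limits(
    free_bytes: int,
    rmm_pool_size: int | Literal["auto"] | None = "auto",
    spill_memory_limit: int | Literal["auto"] | None = "auto",
) -> tuple[int, int | None]:
    """Hypothetical helper: turn the documented options into byte counts."""
    if rmm_pool_size == "auto":
        pool = free_bytes * 9 // 10  # 90% of free GPU memory
    elif rmm_pool_size is None:
        pool = free_bytes // 2  # 50% initial size (the real pool may grow later)
    else:
        pool = rmm_pool_size
    pool = align_down_to_256(pool)

    if spill_memory_limit == "auto":
        # Assumption: 80% of the (initial) pool size computed above.
        spill = align_down_to_256(pool * 8 // 10)
    elif spill_memory_limit is None:
        spill = None  # spilling disabled
    else:
        spill = align_down_to_256(spill_memory_limit)
    return pool, spill
```

With `free_bytes` queried from the GPU at startup, both defaults end up as multiples of 256 bytes, and the spill limit stays below the pool size.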

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-cleanup">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.cleanup() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Clean up the UCXX communication and the shuffle operation.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-extract">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.extract() -> collections.abc.Iterator[tuple[int, pylibcudf.Table]]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Extract shuffled partitions as they become ready.

    ## Returns

    An iterator over the shuffled partitions.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-extract_and_write">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.extract_and_write(
          column_names: list[str]
      ) -> list[tuple[int, str]]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Extract and write shuffled partitions.

    ## Parameters

    column\_names
    The column names of the table.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-insert_chunk">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.insert_chunk(
          table: pylibcudf.Table | cudf.DataFrame,
          column_names: list[str]
      ) -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Insert a pylibcudf Table or cuDF DataFrame into the shuffler.

    ## Parameters

    table
    The table or DataFrame to insert.
    column\_names
    The column names of the table.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-insert_finished">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.insert_finished() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Signal to the shuffler that no more data will be inserted.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-read_and_insert">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.read_and_insert(
          paths: list[str],
          batchsize: int | None = None
      ) -> list[str]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Read the Parquet files in batches of `batchsize` files and insert each batch into the shuffler.

    ## Parameters

    paths
    List of file paths to the Parquet files.
    batchsize
    Number of files to read in each batch.

    ## Returns

    The column names of the table.
  </Indent>
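
The batching that `read_and_insert` describes can be pictured as slicing the path list into fixed-size groups. The following is a minimal sketch, not the library's code: `iter_batches` is a hypothetical helper, and treating `batchsize=None` as "read all files in one batch" is an assumption.

```python
from __future__ import annotations

from collections.abc import Iterator


def iter_batches(paths: list[str], batchsize: int | None = None) -> Iterator[list[str]]:
    """Yield consecutive groups of at most `batchsize` paths."""
    if batchsize is not None and batchsize < 1:
        raise ValueError("batchsize must be a positive integer")
    # Assumption: None means a single batch containing every path.
    step = batchsize if batchsize is not None else max(len(paths), 1)
    for start in range(0, len(paths), step):
        yield paths[start:start + step]
```

Each yielded group would correspond to one `read_batch` call followed by one `insert_chunk` call, so a smaller `batchsize` lowers peak memory at the cost of more read calls.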

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-read_batch">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.read_batch(
          paths: list[str]
      ) -> tuple[cudf.DataFrame | None, list[str]]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Read a single batch of Parquet files using cuDF.

    ## Parameters

    paths
    List of file paths to the Parquet files.

    ## Returns

    A tuple containing the DataFrame (or None if empty) and the column names.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-setup_worker">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.setup_worker(
          root_address_bytes: bytes
      ) -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Set up the UCXX communication and a shuffle operation.

    ## Parameters

    root\_address\_bytes
    Address of the root worker for UCXX initialization.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-shuffle_utils-rapidsmpf_shuffler-BulkRapidsMPFShuffler-write_table">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler.BulkRapidsMPFShuffler.write_table(
          table: pylibcudf.Table,
          output_path: str,
          partition_id: int | str,
          column_names: list[str]
      ) -> str
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Write a pylibcudf Table to a Parquet file using cuDF.

    ## Parameters

    table
    The table to write.
    output\_path
    The path to write the table to.
    partition\_id
    Partition id used for naming the output file.
    column\_names
    The column names of the table.
  </Indent>
</Indent>
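
The method descriptions above suggest a call order for one worker: set up, insert, signal completion, extract, clean up. The sketch below shows that order using a stand-in object, because the real class needs GPUs, Ray, and UCXX. `RecordingShuffler` and `run_shuffle` are hypothetical, the return values are placeholders, and the order itself is inferred from the descriptions rather than stated by the source. A real deployment would invoke these methods on Ray actors rather than on a local object.

```python
from __future__ import annotations


class RecordingShuffler:
    """Hypothetical stand-in: mirrors the documented method names and logs calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def setup_worker(self, root_address_bytes: bytes) -> None:
        self.calls.append("setup_worker")

    def read_and_insert(self, paths: list[str], batchsize: int | None = None) -> list[str]:
        self.calls.append("read_and_insert")
        return ["col_a", "col_b"]  # placeholder column names

    def insert_finished(self) -> None:
        self.calls.append("insert_finished")

    def extract_and_write(self, column_names: list[str]) -> list[tuple[int, str]]:
        self.calls.append("extract_and_write")
        return [(0, "<output file for partition 0>")]  # placeholder result

    def cleanup(self) -> None:
        self.calls.append("cleanup")


def run_shuffle(shuffler, root_address_bytes: bytes, paths: list[str],
                batchsize: int | None = None) -> list[tuple[int, str]]:
    """Drive one worker through the inferred lifecycle."""
    shuffler.setup_worker(root_address_bytes)
    try:
        column_names = shuffler.read_and_insert(paths, batchsize)
        shuffler.insert_finished()  # no more inserts after this point
        return shuffler.extract_and_write(column_names)
    finally:
        shuffler.cleanup()  # release communication resources even on failure
```

Wrapping the body in `try`/`finally` is a design choice of this sketch so that `cleanup` runs even if reading or writing fails.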