---
title: Deploy Image Curation on Slurm
description: ''
---

This workflow covers the full image curation pipeline on Slurm, including model download, embedding generation, classification, filtering, and deduplication.

<Note>
  For details on image container environments and Slurm environment variables, see [Container Environments](/reference/infra/container-environments).
</Note>

## Prerequisites

* Create required directories for AWS credentials, NeMo Curator configuration, and local workspace:

  ```bash
  mkdir -p $HOME/.aws
  mkdir -p $HOME/.config/nemo_curator
  mkdir -p $HOME/nemo_curator_local_workspace
  ```

**Prepare configuration files:**

<Tabs>
  <Tab title="AWS Credentials">
    `$HOME/.aws/credentials` (for S3 access)

    See configuration files in the repository.
  </Tab>

  <Tab title="NeMo Curator Configuration">
    `$HOME/.config/nemo_curator/config.yaml` (for HuggingFace API key)

    See configuration files in the repository.
  </Tab>

  <Tab title="Image Processing Configuration">
    `$HOME/nemo_curator_local_workspace/image_config.yaml` (image processing parameters)

    ```yaml
    # Image curation configuration
    embedding:
      model_name: "vit_large_patch14_clip_quickgelu_224.openai"
      batch_size: 1024
      num_threads_per_worker: 16
      normalize_embeddings: true
      autocast: false

    aesthetic_classifier:
      score_threshold: 6.0
      batch_size: 512

    nsfw_classifier:
      score_threshold: 0.2
      batch_size: 512

    semantic_deduplication:
      max_iter: 100
      n_clusters: 50000
      random_state: 42
      eps_thresholds: [0.9, 0.95, 0.99]
      eps_to_extract: 0.95
      which_to_keep: "hard"

    filtering:
      min_resolution: 256
      max_aspect_ratio: 3.0
      min_aesthetic_score: 6.0
      max_nsfw_score: 0.2
    ```
  </Tab>
</Tabs>
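
Several values in `image_config.yaml` must stay consistent with each other: the `filtering` thresholds should mirror the classifier sections, and `eps_to_extract` must be one of `eps_thresholds`. A stdlib-only sanity check, with values copied from the example config above (the `check_config` helper is illustrative):

```python
# Values copied from the example image_config.yaml above
config = {
    "aesthetic_classifier": {"score_threshold": 6.0},
    "nsfw_classifier": {"score_threshold": 0.2},
    "semantic_deduplication": {
        "eps_thresholds": [0.9, 0.95, 0.99],
        "eps_to_extract": 0.95,
    },
    "filtering": {"min_aesthetic_score": 6.0, "max_nsfw_score": 0.2},
}

def check_config(cfg):
    """Raise AssertionError if the config sections disagree with each other."""
    assert cfg["filtering"]["min_aesthetic_score"] == cfg["aesthetic_classifier"]["score_threshold"]
    assert cfg["filtering"]["max_nsfw_score"] == cfg["nsfw_classifier"]["score_threshold"]
    dedup = cfg["semantic_deduplication"]
    assert dedup["eps_to_extract"] in dedup["eps_thresholds"]
    return True

print(check_config(config))
```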

***

## Model Download

1. Copy the following script, which downloads all required image processing models, onto the Slurm cluster.

   ```bash
   #!/bin/bash

   #SBATCH --job-name=download_image_models
   #SBATCH -p defq
   #SBATCH --nodes=1
   #SBATCH --ntasks-per-node=1
   #SBATCH --exclusive
   #SBATCH --gres=gpu:1

   # Update Me!
   #SBATCH --output=/home/<username>/logs/%x_%j.log
   USER_DIR="/home/${USER}"
   CONTAINER_IMAGE="${USER_DIR}/path-to/curator.sqsh"
   #

   LOCAL_WORKSPACE="${USER_DIR}/nemo_curator_local_workspace"
   LOCAL_WORKSPACE_MOUNT="${LOCAL_WORKSPACE}:/config"
   NEMO_CONFIG_MOUNT="${HOME}/.config/nemo_curator/config.yaml:/nemo_curator/config/nemo_curator.yaml"
   CONTAINER_MOUNTS="${LOCAL_WORKSPACE_MOUNT},${NEMO_CONFIG_MOUNT}"

   export NEMO_CURATOR_RAY_SLURM_JOB=1
   export NEMO_CURATOR_LOCAL_DOCKER_JOB=1

   # Download Image Processing Models
   srun \
     --mpi=none \
     --container-writable \
     --no-container-remap-root \
     --export=NEMO_CURATOR_RAY_SLURM_JOB,NEMO_CURATOR_LOCAL_DOCKER_JOB \
     --container-image "${CONTAINER_IMAGE}" \
     --container-mounts "${CONTAINER_MOUNTS}" \
       -- python3 -c "
   import timm
   from nemo_curator.image.embedders import TimmImageEmbedder
   from nemo_curator.image.classifiers import AestheticClassifier, NsfwClassifier

   # Download and cache CLIP model
   embedder = TimmImageEmbedder('vit_large_patch14_clip_quickgelu_224.openai', pretrained=True)

   # Download aesthetic and NSFW classifiers
   aesthetic = AestheticClassifier()
   nsfw = NsfwClassifier()

   print('Image models downloaded successfully')
   "
   ```

2. Update the `SBATCH` parameters and paths to match your username and environment.

3. Run the script.

   ```bash
   sbatch 1_curator_download_image_models.sh
   ```

## Image Processing Pipeline

The workflow consists of three main Slurm scripts, to be run in order:

1. `curator_image_embed.sh`: Generates embeddings and applies classifications to images.
2. `curator_image_filter.sh`: Filters images based on quality, aesthetic, and NSFW scores.
3. `curator_image_dedup.sh`: Performs semantic deduplication using image embeddings.

<Tabs>
  <Tab title="1. Embedding">
    `curator_image_embed.sh` - Generates embeddings and applies classifications to images.

    ```bash
    #!/bin/bash

    #SBATCH --job-name=image-embed
    #SBATCH -p defq
    #SBATCH --nodes=4
    #SBATCH --ntasks-per-node=1
    #SBATCH --exclusive
    #SBATCH --gres=gpu:4
    #SBATCH --time=08:00:00

    # Update Me!
    #SBATCH --output=/home/<username>/logs/%x_%j.log
    #SBATCH --error=/home/<username>/logs/%x_%j.log
    USER_DIR="/home/${USER}"
    CONTAINER_IMAGE="${USER_DIR}/path-to/curator.sqsh"
    INPUT_TAR_PATH="s3://your-bucket/raw-images/{00000..00999}.tar"
    OUTPUT_DATA_PATH="s3://your-bucket/embedded-images/"
    #

    LOCAL_WORKSPACE="${USER_DIR}/nemo_curator_local_workspace"
    LOCAL_WORKSPACE_MOUNT="${LOCAL_WORKSPACE}:/config"
    NEMO_CONFIG_MOUNT="${HOME}/.config/nemo_curator/config.yaml:/nemo_curator/config/nemo_curator.yaml"
    AWS_MOUNT="${HOME}/.aws:/root/.aws"
    CONTAINER_MOUNTS="${LOCAL_WORKSPACE_MOUNT},${NEMO_CONFIG_MOUNT},${AWS_MOUNT}"

    export NEMO_CURATOR_RAY_SLURM_JOB=1

    srun \
      --mpi=none \
      --container-writable \
      --no-container-remap-root \
      --export=NEMO_CURATOR_RAY_SLURM_JOB \
      --container-image "${CONTAINER_IMAGE}" \
      --container-mounts "${CONTAINER_MOUNTS}" \
        -- python3 -c "
    from nemo_curator.datasets import ImageTextPairDataset
    from nemo_curator.image.embedders import TimmImageEmbedder
    from nemo_curator.image.classifiers import AestheticClassifier, NsfwClassifier
    from nemo_curator.utils.distributed_utils import get_client

    # Initialize distributed client
    client = get_client(cluster_type='gpu')

    # Load dataset
    dataset = ImageTextPairDataset.from_webdataset('${INPUT_TAR_PATH}', id_col='key')

    # Generate embeddings
    embedder = TimmImageEmbedder(
        'vit_large_patch14_clip_quickgelu_224.openai',
        pretrained=True,
        batch_size=1024,
        num_threads_per_worker=16,
        normalize_embeddings=True,
        autocast=False
    )
    dataset = embedder(dataset)

    # Apply aesthetic classification
    aesthetic_classifier = AestheticClassifier()
    dataset = aesthetic_classifier(dataset)

    # Apply NSFW classification
    nsfw_classifier = NsfwClassifier()
    dataset = nsfw_classifier(dataset)

    # Save results
    dataset.to_webdataset('${OUTPUT_DATA_PATH}')
    client.close()
    "
    ```
  </Tab>

  <Tab title="2. Filtering">
    `curator_image_filter.sh` - Filters images based on quality, aesthetic, and NSFW scores.

    ```bash
    #!/bin/bash

    #SBATCH --job-name=image-filter
    #SBATCH -p defq
    #SBATCH --nodes=2
    #SBATCH --ntasks-per-node=1
    #SBATCH --exclusive
    #SBATCH --gres=gpu:2
    #SBATCH --time=04:00:00

    # Update Me!
    #SBATCH --output=/home/<username>/logs/%x_%j.log
    #SBATCH --error=/home/<username>/logs/%x_%j.log
    USER_DIR="/home/${USER}"
    CONTAINER_IMAGE="${USER_DIR}/path-to/curator.sqsh"
    INPUT_DATA_PATH="s3://your-bucket/embedded-images/"
    OUTPUT_DATA_PATH="s3://your-bucket/filtered-images/"
    #

    LOCAL_WORKSPACE="${USER_DIR}/nemo_curator_local_workspace"
    LOCAL_WORKSPACE_MOUNT="${LOCAL_WORKSPACE}:/config"
    NEMO_CONFIG_MOUNT="${HOME}/.config/nemo_curator/config.yaml:/nemo_curator/config/nemo_curator.yaml"
    AWS_MOUNT="${HOME}/.aws:/root/.aws"
    CONTAINER_MOUNTS="${LOCAL_WORKSPACE_MOUNT},${NEMO_CONFIG_MOUNT},${AWS_MOUNT}"

    export NEMO_CURATOR_RAY_SLURM_JOB=1

    srun \
      --mpi=none \
      --container-writable \
      --no-container-remap-root \
      --export=NEMO_CURATOR_RAY_SLURM_JOB \
      --container-image "${CONTAINER_IMAGE}" \
      --container-mounts "${CONTAINER_MOUNTS}" \
        -- python3 -c "
    from nemo_curator.datasets import ImageTextPairDataset
    from nemo_curator.utils.distributed_utils import get_client
    import yaml

    # Initialize distributed client
    client = get_client(cluster_type='gpu')

    # Load configuration
    with open('/config/image_config.yaml', 'r') as f:
        config = yaml.safe_load(f)

    # Load dataset
    dataset = ImageTextPairDataset.from_webdataset('${INPUT_DATA_PATH}', id_col='key')

    # Apply filters based on configuration
    filter_params = config['filtering']
    filters = []

    # Aesthetic score filter
    if 'min_aesthetic_score' in filter_params:
        filters.append(f\"aesthetic_score >= {filter_params['min_aesthetic_score']}\")

    # NSFW score filter
    if 'max_nsfw_score' in filter_params:
        filters.append(f\"nsfw_score <= {filter_params['max_nsfw_score']}\")

    # Apply combined filter
    if filters:
        filter_expression = ' and '.join(filters)
        dataset.metadata['passes_filter'] = dataset.metadata.eval(filter_expression)
        
        # Save filtered dataset
        dataset.to_webdataset('${OUTPUT_DATA_PATH}', filter_column='passes_filter')
    else:
        dataset.to_webdataset('${OUTPUT_DATA_PATH}')

    print(f'Filtering completed. Applied filters: {filter_expression if filters else \"None\"}')
    client.close()
    "
    ```
  </Tab>

  <Tab title="3. Deduplication">
    `curator_image_dedup.sh` - Performs semantic deduplication using image embeddings.

    ```bash
    #!/bin/bash

    #SBATCH --job-name=image-dedup
    #SBATCH -p defq
    #SBATCH --nodes=8
    #SBATCH --ntasks-per-node=1
    #SBATCH --exclusive
    #SBATCH --gres=gpu:8
    #SBATCH --time=12:00:00

    # Update Me!
    #SBATCH --output=/home/<username>/logs/%x_%j.log
    #SBATCH --error=/home/<username>/logs/%x_%j.log
    USER_DIR="/home/${USER}"
    CONTAINER_IMAGE="${USER_DIR}/path-to/curator.sqsh"
    INPUT_DATA_PATH="s3://your-bucket/filtered-images/"
    OUTPUT_DATA_PATH="s3://your-bucket/deduplicated-images/"
    CACHE_DIR="s3://your-bucket/image-dedup-cache/"
    #

    LOCAL_WORKSPACE="${USER_DIR}/nemo_curator_local_workspace"
    LOCAL_WORKSPACE_MOUNT="${LOCAL_WORKSPACE}:/config"
    NEMO_CONFIG_MOUNT="${HOME}/.config/nemo_curator/config.yaml:/nemo_curator/config/nemo_curator.yaml"
    AWS_MOUNT="${HOME}/.aws:/root/.aws"
    CONTAINER_MOUNTS="${LOCAL_WORKSPACE_MOUNT},${NEMO_CONFIG_MOUNT},${AWS_MOUNT}"

    export NEMO_CURATOR_RAY_SLURM_JOB=1

    srun \
      --mpi=none \
      --container-writable \
      --no-container-remap-root \
      --export=NEMO_CURATOR_RAY_SLURM_JOB \
      --container-image "${CONTAINER_IMAGE}" \
      --container-mounts "${CONTAINER_MOUNTS}" \
        -- python3 -c "
    from nemo_curator.datasets import ImageTextPairDataset, DocumentDataset
    from nemo_curator import ClusteringModel, SemanticClusterLevelDedup
    from nemo_curator.utils.distributed_utils import get_client
    import yaml
    import os

    # Initialize distributed client
    client = get_client(cluster_type='gpu')

    # Load configuration
    with open('/config/image_config.yaml', 'r') as f:
        config = yaml.safe_load(f)

    # Load dataset
    dataset = ImageTextPairDataset.from_webdataset('${INPUT_DATA_PATH}', id_col='key')
    embeddings_dataset = DocumentDataset(dataset.metadata)

    # Semantic deduplication parameters
    dedup_config = config['semantic_deduplication']
    clustering_output = '${CACHE_DIR}/cluster_output'
    # makedirs only applies to local paths; object-store prefixes need no mkdir
    if not clustering_output.startswith('s3://'):
        os.makedirs(clustering_output, exist_ok=True)

    # Run clustering
    clustering_model = ClusteringModel(
        id_column='key',
        embedding_column='image_embedding',
        max_iter=dedup_config['max_iter'],
        n_clusters=dedup_config['n_clusters'],
        random_state=dedup_config['random_state'],
        clustering_output_dir=clustering_output,
    )
    clustered_dataset = clustering_model(embeddings_dataset)

    if clustered_dataset:
        print('Clustering completed successfully')
        
        # Run cluster-level deduplication
        emb_by_cluster_output = os.path.join(clustering_output, 'embs_by_nearest_center')
        duplicate_output = '${CACHE_DIR}/duplicates'
        
        semantic_dedup = SemanticClusterLevelDedup(
            n_clusters=dedup_config['n_clusters'],
            emb_by_clust_dir=emb_by_cluster_output,
            id_column='key',
            which_to_keep=dedup_config['which_to_keep'],
            embedding_column='image_embedding',
            batched_cosine_similarity=1024,
            output_dir=duplicate_output,
        )
        
        semantic_dedup.compute_semantic_match_dfs()
        deduplicated_dataset_ids = semantic_dedup.extract_dedup_data(
            eps_to_extract=dedup_config['eps_to_extract']
        )
        
        # Mark unique images and save
        dataset.metadata['is_unique'] = dataset.metadata['key'].isin(
            deduplicated_dataset_ids.df['key'].compute()
        )
        dataset.to_webdataset('${OUTPUT_DATA_PATH}', filter_column='is_unique')
        
        print(f'Deduplication completed. Unique images saved to ${OUTPUT_DATA_PATH}')
    else:
        print('Clustering failed')

    client.close()
    "
    ```
  </Tab>
</Tabs>

1. **Update** all `# Update Me!` sections in each script to match your environment (paths, usernames, S3 buckets, and so on).
2. Submit each job with `sbatch`:

```sh
sbatch curator_image_embed.sh
sbatch curator_image_filter.sh
sbatch curator_image_dedup.sh
```
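
The filtering script builds a single pandas-style `eval` expression from the thresholds in `image_config.yaml`. The construction logic can be checked standalone (the `build_filter_expression` helper is a hypothetical extraction of the inline logic in `curator_image_filter.sh`):

```python
def build_filter_expression(filter_params):
    """Join per-column threshold clauses into one eval expression."""
    filters = []
    if "min_aesthetic_score" in filter_params:
        filters.append(f"aesthetic_score >= {filter_params['min_aesthetic_score']}")
    if "max_nsfw_score" in filter_params:
        filters.append(f"nsfw_score <= {filter_params['max_nsfw_score']}")
    return " and ".join(filters)

expr = build_filter_expression({"min_aesthetic_score": 6.0, "max_nsfw_score": 0.2})
print(expr)  # aesthetic_score >= 6.0 and nsfw_score <= 0.2
```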

## Monitoring and Logs

1. Check job status:

   ```bash
   squeue
   ```

2. View logs:

   ```bash
   tail -f /home/<username>/logs/<job-name>_<job-id>.log
   ```

## Performance Considerations

* **GPU Memory**: Image processing requires significant GPU memory. Consider using nodes with high-memory GPUs (40GB+ VRAM) for large batch sizes.
* **WebDataset Format**: Ensure your input data is in WebDataset format: `.tar` shards that pair each image with its caption and metadata entries.
* **Network I/O**: Image data can be large. Consider local caching or high-bandwidth storage for better performance.
* **Clustering Scale**: For datasets with millions of images, increase `n_clusters` to 50,000+ to improve deduplication performance.
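
The `INPUT_TAR_PATH` in the embedding script uses brace notation (`{00000..00999}.tar`) to enumerate shards. A stdlib sketch of how such a pattern expands, for estimating shard counts; this is illustrative only, as the actual expansion is handled by the WebDataset loader:

```python
import re

def expand_shard_pattern(pattern):
    """Expand a single zero-padded {lo..hi} brace range into the full shard list."""
    m = re.search(r"\{(\d+)\.\.(\d+)\}", pattern)
    if m is None:
        return [pattern]
    lo, hi = m.group(1), m.group(2)
    width = len(lo)  # preserve zero-padding width
    return [
        pattern[: m.start()] + str(i).zfill(width) + pattern[m.end():]
        for i in range(int(lo), int(hi) + 1)
    ]

shards = expand_shard_pattern("s3://your-bucket/raw-images/{00000..00999}.tar")
print(len(shards), shards[0], shards[-1])
# 1000 s3://your-bucket/raw-images/00000.tar s3://your-bucket/raw-images/00999.tar
```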
