---
title: Deploy Image Curation on Slurm
description: ''
---
This workflow covers the full image curation pipeline on Slurm, including model download, embedding generation, classification, filtering, and deduplication.
For details on image container environments and Slurm environment variables, see [Container Environments](/reference/infra/container-environments).
## Prerequisites
* Create required directories for AWS credentials, NeMo Curator configuration, and local workspace:
```bash
mkdir -p $HOME/.aws
mkdir -p $HOME/.config/nemo_curator
mkdir -p $HOME/nemo_curator_local_workspace
```
**Prepare configuration files:**

* `$HOME/.aws/credentials` (for S3 access). See the example configuration files in the repository.
* `$HOME/.config/nemo_curator/config.yaml` (for the Hugging Face API key). See the example configuration files in the repository.
* `$HOME/nemo_curator_local_workspace/image_config.yaml` (image processing parameters):
```yaml
# Image curation configuration
embedding:
  model_name: "vit_large_patch14_clip_quickgelu_224.openai"
  batch_size: 1024
  num_threads_per_worker: 16
  normalize_embeddings: true
  autocast: false
aesthetic_classifier:
  score_threshold: 6.0
  batch_size: 512
nsfw_classifier:
  score_threshold: 0.2
  batch_size: 512
semantic_deduplication:
  max_iter: 100
  n_clusters: 50000
  random_state: 42
  eps_thresholds: [0.9, 0.95, 0.99]
  eps_to_extract: 0.95
  which_to_keep: "hard"
filtering:
  min_resolution: 256
  max_aspect_ratio: 3.0
  min_aesthetic_score: 6.0
  max_nsfw_score: 0.2
```
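The AWS credentials file follows the standard shared-credentials format. A minimal sketch with placeholder values (substitute your own keys; the profile name `default` is assumed):

```ini
# $HOME/.aws/credentials
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
```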
## Model Download
1. Copy the following script, which downloads all required image processing models, onto the Slurm cluster.
```bash
#!/bin/bash
#SBATCH --job-name=download_image_models
#SBATCH -p defq
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive
#SBATCH --gres=gpu:1
# Update Me!
#SBATCH --output=/home//logs/%x_%j.log
USER_DIR="/home/${USER}"
CONTAINER_IMAGE="${USER_DIR}/path-to/curator.sqsh"
#
LOCAL_WORKSPACE="${USER_DIR}/nemo_curator_local_workspace"
LOCAL_WORKSPACE_MOUNT="${LOCAL_WORKSPACE}:/config"
NEMO_CONFIG_MOUNT="${HOME}/.config/nemo_curator/config.yaml:/nemo_curator/config/nemo_curator.yaml"
CONTAINER_MOUNTS="${LOCAL_WORKSPACE_MOUNT},${NEMO_CONFIG_MOUNT}"
export NEMO_CURATOR_RAY_SLURM_JOB=1
export NEMO_CURATOR_LOCAL_DOCKER_JOB=1
# Download Image Processing Models
srun \
--mpi=none \
--container-writable \
--no-container-remap-root \
--export=NEMO_CURATOR_RAY_SLURM_JOB,NEMO_CURATOR_LOCAL_DOCKER_JOB \
--container-image "${CONTAINER_IMAGE}" \
--container-mounts "${CONTAINER_MOUNTS}" \
-- python3 -c "
import timm
from nemo_curator.image.embedders import TimmImageEmbedder
from nemo_curator.image.classifiers import AestheticClassifier, NsfwClassifier
# Download and cache CLIP model
embedder = TimmImageEmbedder('vit_large_patch14_clip_quickgelu_224.openai', pretrained=True)
# Download aesthetic and NSFW classifiers
aesthetic = AestheticClassifier()
nsfw = NsfwClassifier()
print('Image models downloaded successfully')
"
```
2. Update the `SBATCH` parameters and paths to match your username and environment.
3. Run the script.
```bash
sbatch 1_curator_download_image_models.sh
```
## Image Processing Pipeline
The workflow consists of three main Slurm scripts, to be run in order:
1. `curator_image_embed.sh`: Generates embeddings and applies classifications to images.
2. `curator_image_filter.sh`: Filters images based on quality, aesthetic, and NSFW scores.
3. `curator_image_dedup.sh`: Performs semantic deduplication using image embeddings.
`curator_image_embed.sh` - Generates embeddings and applies classifications to images.
```bash
#!/bin/bash
#SBATCH --job-name=image-embed
#SBATCH -p defq
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive
#SBATCH --gres=gpu:4
#SBATCH --time=08:00:00
# Update Me!
#SBATCH --output=/home//logs/%x_%j.log
#SBATCH --error=/home//logs/%x_%j.log
USER_DIR="/home/${USER}"
CONTAINER_IMAGE="${USER_DIR}/path-to/curator.sqsh"
INPUT_TAR_PATH="s3://your-bucket/raw-images/{00000..00999}.tar"
OUTPUT_DATA_PATH="s3://your-bucket/embedded-images/"
#
LOCAL_WORKSPACE="${USER_DIR}/nemo_curator_local_workspace"
LOCAL_WORKSPACE_MOUNT="${LOCAL_WORKSPACE}:/config"
NEMO_CONFIG_MOUNT="${HOME}/.config/nemo_curator/config.yaml:/nemo_curator/config/nemo_curator.yaml"
AWS_MOUNT="${HOME}/.aws:/root/.aws"
CONTAINER_MOUNTS="${LOCAL_WORKSPACE_MOUNT},${NEMO_CONFIG_MOUNT},${AWS_MOUNT}"
export NEMO_CURATOR_RAY_SLURM_JOB=1
srun \
--mpi=none \
--container-writable \
--no-container-remap-root \
--export=NEMO_CURATOR_RAY_SLURM_JOB \
--container-image "${CONTAINER_IMAGE}" \
--container-mounts "${CONTAINER_MOUNTS}" \
-- python3 -c "
from nemo_curator.datasets import ImageTextPairDataset
from nemo_curator.image.embedders import TimmImageEmbedder
from nemo_curator.image.classifiers import AestheticClassifier, NsfwClassifier
from nemo_curator.utils.distributed_utils import get_client
# Initialize distributed client
client = get_client(cluster_type='gpu')
# Load dataset
dataset = ImageTextPairDataset.from_webdataset('${INPUT_TAR_PATH}', id_col='key')
# Generate embeddings
embedder = TimmImageEmbedder(
    'vit_large_patch14_clip_quickgelu_224.openai',
    pretrained=True,
    batch_size=1024,
    num_threads_per_worker=16,
    normalize_embeddings=True,
    autocast=False
)
dataset = embedder(dataset)
# Apply aesthetic classification
aesthetic_classifier = AestheticClassifier()
dataset = aesthetic_classifier(dataset)
# Apply NSFW classification
nsfw_classifier = NsfwClassifier()
dataset = nsfw_classifier(dataset)
# Save results
dataset.to_webdataset('${OUTPUT_DATA_PATH}')
client.close()
"
```
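The `normalize_embeddings=True` setting L2-normalizes each embedding vector, which is what lets the later deduplication stage treat cosine similarity as a plain dot product. A minimal NumPy sketch of that property (random vectors stand in for real CLIP embeddings):

```python
import numpy as np

rng = np.random.default_rng(0)
emb = rng.normal(size=(4, 8))  # 4 fake image embeddings

# L2-normalize each row, as normalize_embeddings=True does
norm = emb / np.linalg.norm(emb, axis=1, keepdims=True)

# For unit vectors, cosine similarity is just the dot product
cos = norm @ norm.T
assert np.allclose(np.diag(cos), 1.0)  # each vector is maximally similar to itself
```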
`curator_image_filter.sh` - Filters images based on quality, aesthetic, and NSFW scores.
```bash
#!/bin/bash
#SBATCH --job-name=image-filter
#SBATCH -p defq
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive
#SBATCH --gres=gpu:2
#SBATCH --time=04:00:00
# Update Me!
#SBATCH --output=/home//logs/%x_%j.log
#SBATCH --error=/home//logs/%x_%j.log
USER_DIR="/home/${USER}"
CONTAINER_IMAGE="${USER_DIR}/path-to/curator.sqsh"
INPUT_DATA_PATH="s3://your-bucket/embedded-images/"
OUTPUT_DATA_PATH="s3://your-bucket/filtered-images/"
#
LOCAL_WORKSPACE="${USER_DIR}/nemo_curator_local_workspace"
LOCAL_WORKSPACE_MOUNT="${LOCAL_WORKSPACE}:/config"
NEMO_CONFIG_MOUNT="${HOME}/.config/nemo_curator/config.yaml:/nemo_curator/config/nemo_curator.yaml"
AWS_MOUNT="${HOME}/.aws:/root/.aws"
CONTAINER_MOUNTS="${LOCAL_WORKSPACE_MOUNT},${NEMO_CONFIG_MOUNT},${AWS_MOUNT}"
export NEMO_CURATOR_RAY_SLURM_JOB=1
srun \
--mpi=none \
--container-writable \
--no-container-remap-root \
--export=NEMO_CURATOR_RAY_SLURM_JOB \
--container-image "${CONTAINER_IMAGE}" \
--container-mounts "${CONTAINER_MOUNTS}" \
-- python3 -c "
from nemo_curator.datasets import ImageTextPairDataset
from nemo_curator.utils.distributed_utils import get_client
import yaml
# Initialize distributed client
client = get_client(cluster_type='gpu')
# Load configuration
with open('/config/image_config.yaml', 'r') as f:
    config = yaml.safe_load(f)
# Load dataset
dataset = ImageTextPairDataset.from_webdataset('${INPUT_DATA_PATH}', id_col='key')
# Apply filters based on configuration
filter_params = config['filtering']
filters = []
# Aesthetic score filter
if 'min_aesthetic_score' in filter_params:
    filters.append(f\"aesthetic_score >= {filter_params['min_aesthetic_score']}\")
# NSFW score filter
if 'max_nsfw_score' in filter_params:
    filters.append(f\"nsfw_score <= {filter_params['max_nsfw_score']}\")
# Apply combined filter
if filters:
    filter_expression = ' and '.join(filters)
    dataset.metadata['passes_filter'] = dataset.metadata.eval(filter_expression)
    # Save filtered dataset
    dataset.to_webdataset('${OUTPUT_DATA_PATH}', filter_column='passes_filter')
else:
    dataset.to_webdataset('${OUTPUT_DATA_PATH}')
print(f'Filtering completed. Applied filters: {filter_expression if filters else \"None\"}')
client.close()
"
```
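The filter step builds a boolean expression from the config thresholds and evaluates it against the metadata table. The same pattern works on a plain pandas DataFrame; a sketch with toy scores (not real classifier output):

```python
import pandas as pd

# Toy metadata resembling the classifier output columns
meta = pd.DataFrame({
    "key": ["a", "b", "c"],
    "aesthetic_score": [6.5, 4.2, 7.1],
    "nsfw_score": [0.05, 0.01, 0.6],
})

filter_params = {"min_aesthetic_score": 6.0, "max_nsfw_score": 0.2}
filters = []
if "min_aesthetic_score" in filter_params:
    filters.append(f"aesthetic_score >= {filter_params['min_aesthetic_score']}")
if "max_nsfw_score" in filter_params:
    filters.append(f"nsfw_score <= {filter_params['max_nsfw_score']}")

# DataFrame.eval accepts 'and' with its default pandas parser
expression = " and ".join(filters)
meta["passes_filter"] = meta.eval(expression)
print(meta[meta["passes_filter"]]["key"].tolist())  # ['a'] passes both thresholds
```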
`curator_image_dedup.sh` - Performs semantic deduplication using image embeddings.
```bash
#!/bin/bash
#SBATCH --job-name=image-dedup
#SBATCH -p defq
#SBATCH --nodes=8
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive
#SBATCH --gres=gpu:8
#SBATCH --time=12:00:00
# Update Me!
#SBATCH --output=/home//logs/%x_%j.log
#SBATCH --error=/home//logs/%x_%j.log
USER_DIR="/home/${USER}"
CONTAINER_IMAGE="${USER_DIR}/path-to/curator.sqsh"
INPUT_DATA_PATH="s3://your-bucket/filtered-images/"
OUTPUT_DATA_PATH="s3://your-bucket/deduplicated-images/"
# Cache must be a filesystem path visible inside the container (os.makedirs cannot create s3:// paths); /config maps to the local workspace mount
CACHE_DIR="/config/image-dedup-cache"
#
LOCAL_WORKSPACE="${USER_DIR}/nemo_curator_local_workspace"
LOCAL_WORKSPACE_MOUNT="${LOCAL_WORKSPACE}:/config"
NEMO_CONFIG_MOUNT="${HOME}/.config/nemo_curator/config.yaml:/nemo_curator/config/nemo_curator.yaml"
AWS_MOUNT="${HOME}/.aws:/root/.aws"
CONTAINER_MOUNTS="${LOCAL_WORKSPACE_MOUNT},${NEMO_CONFIG_MOUNT},${AWS_MOUNT}"
export NEMO_CURATOR_RAY_SLURM_JOB=1
srun \
--mpi=none \
--container-writable \
--no-container-remap-root \
--export=NEMO_CURATOR_RAY_SLURM_JOB \
--container-image "${CONTAINER_IMAGE}" \
--container-mounts "${CONTAINER_MOUNTS}" \
-- python3 -c "
from nemo_curator.datasets import ImageTextPairDataset, DocumentDataset
from nemo_curator import ClusteringModel, SemanticClusterLevelDedup
from nemo_curator.utils.distributed_utils import get_client
import yaml
import os
# Initialize distributed client
client = get_client(cluster_type='gpu')
# Load configuration
with open('/config/image_config.yaml', 'r') as f:
    config = yaml.safe_load(f)
# Load dataset
dataset = ImageTextPairDataset.from_webdataset('${INPUT_DATA_PATH}', id_col='key')
embeddings_dataset = DocumentDataset(dataset.metadata)
# Semantic deduplication parameters
dedup_config = config['semantic_deduplication']
clustering_output = '${CACHE_DIR}/cluster_output'
os.makedirs(clustering_output, exist_ok=True)
# Run clustering
clustering_model = ClusteringModel(
    id_column='key',
    embedding_column='image_embedding',
    max_iter=dedup_config['max_iter'],
    n_clusters=dedup_config['n_clusters'],
    random_state=dedup_config['random_state'],
    clustering_output_dir=clustering_output,
)
clustered_dataset = clustering_model(embeddings_dataset)
if clustered_dataset:
    print('Clustering completed successfully')
    # Run cluster-level deduplication
    emb_by_cluster_output = os.path.join(clustering_output, 'embs_by_nearest_center')
    duplicate_output = '${CACHE_DIR}/duplicates'
    semantic_dedup = SemanticClusterLevelDedup(
        n_clusters=dedup_config['n_clusters'],
        emb_by_clust_dir=emb_by_cluster_output,
        id_column='key',
        which_to_keep=dedup_config['which_to_keep'],
        embedding_column='image_embedding',
        batched_cosine_similarity=1024,
        output_dir=duplicate_output,
    )
    semantic_dedup.compute_semantic_match_dfs()
    deduplicated_dataset_ids = semantic_dedup.extract_dedup_data(
        eps_to_extract=dedup_config['eps_to_extract']
    )
    # Mark unique images and save
    dataset.metadata['is_unique'] = dataset.metadata['key'].isin(
        deduplicated_dataset_ids.df['key'].compute()
    )
    dataset.to_webdataset('${OUTPUT_DATA_PATH}', filter_column='is_unique')
    print('Deduplication completed. Unique images saved to ${OUTPUT_DATA_PATH}')
else:
    print('Clustering failed')
client.close()
"
```
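Within each cluster, `SemanticClusterLevelDedup` computes pairwise cosine similarities among member embeddings and keeps one representative per group of near-duplicates. A toy NumPy sketch of that idea (the exact interpretation of `eps_to_extract` depends on your NeMo Curator version; here a pair counts as a duplicate when its cosine similarity exceeds a threshold, and the greedy keep-first policy is illustrative only):

```python
import numpy as np

def greedy_dedup(embeddings, sim_threshold=0.95):
    """Keep the first item of every near-duplicate group (greedy, O(n^2))."""
    unit = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    keep = []
    for i in range(len(unit)):
        # Drop item i if it is too similar to an already-kept item
        if all(unit[i] @ unit[j] < sim_threshold for j in keep):
            keep.append(i)
    return keep

vecs = np.array([
    [1.0, 0.0],
    [0.999, 0.01],   # near-duplicate of the first vector
    [0.0, 1.0],
])
print(greedy_dedup(vecs))  # [0, 2] -- the near-duplicate is dropped
```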
1. **Update** all `# Update Me!` sections in the scripts for your environment (paths, usernames, S3 buckets, and so on).
2. Submit each job with `sbatch`:
```sh
sbatch curator_image_embed.sh
sbatch curator_image_filter.sh
sbatch curator_image_dedup.sh
```
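Because each stage consumes the previous stage's output, you can let Slurm enforce the ordering instead of waiting between submissions. A sketch using `--dependency=afterok`, which holds each job until the previous one exits successfully (assumes your `sbatch` supports `--parsable`, which prints only the job ID):

```shell
embed_id=$(sbatch --parsable curator_image_embed.sh)
filter_id=$(sbatch --parsable --dependency=afterok:${embed_id} curator_image_filter.sh)
sbatch --dependency=afterok:${filter_id} curator_image_dedup.sh
```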
## Monitoring and Logs
1. Check job status:
```bash
squeue -u $USER
```
2. View logs:
```bash
tail -f /home/<username>/logs/<job-name>_<job-id>.log
```
## Performance Considerations
* **GPU Memory**: Image processing requires significant GPU memory. Consider using nodes with high-memory GPUs (40GB+ VRAM) for large batch sizes.
* **Tar Archive Format**: Ensure your input data is in tar archive format (`.tar` files containing JPEG images).
* **Network I/O**: Image data can be large. Consider local caching or high-bandwidth storage for better performance.
* **Clustering Scale**: For datasets with millions of images, increase `n_clusters` to 50,000+ to improve deduplication performance.
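The embed stage expects WebDataset-style shards: tar files whose members share a key prefix per sample (for example `000000000.jpg` plus `000000000.txt` for the caption). A minimal sketch that packs image/caption pairs into one shard using only the standard library (the member naming follows the general WebDataset convention; verify the exact fields your NeMo Curator version expects):

```python
import io
import json
import tarfile

def write_shard(samples, shard_path):
    """Pack (key, jpeg_bytes, caption) samples into a WebDataset-style tar shard."""
    with tarfile.open(shard_path, "w") as tar:
        for key, jpeg_bytes, caption in samples:
            for suffix, payload in [
                (".jpg", jpeg_bytes),
                (".txt", caption.encode("utf-8")),
                (".json", json.dumps({"caption": caption}).encode("utf-8")),
            ]:
                info = tarfile.TarInfo(name=key + suffix)
                info.size = len(payload)
                tar.addfile(info, io.BytesIO(payload))

# Toy sample: placeholder bytes stand in for a real JPEG
samples = [("000000000", b"\xff\xd8fake-jpeg\xff\xd9", "a photo of a cat")]
write_shard(samples, "00000.tar")
with tarfile.open("00000.tar") as tar:
    print(tar.getnames())  # ['000000000.jpg', '000000000.txt', '000000000.json']
```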