Tuning an augmentation loop#
This notebook demonstrates the use of some dynamic mode control features in the context of a small audio augmentation.
[1]:
import os
import statistics
from timeit import default_timer as timer
import numpy as np
import nvidia.dali.experimental.dynamic as ndd
AUDIO_DIR = os.path.join(os.environ["DALI_EXTRA_PATH"], "db", "audio", "wav")
BATCH_SIZE = 8
SAMPLE_RATE = 16000
CROP_DURATION = 1 # seconds
CROP_SAMPLES = SAMPLE_RATE * CROP_DURATION
Building the augmentation#
mel_batches reads WAV files from the dataset, takes a random 1s crop, applies a random gain, and yields a log-mel spectrogram batch for each batch of a configurable number of epochs. It accepts reader-sharding arguments so multiple workers can split the dataset between them.
[2]:
def make_reader(*, shard_id: int = 0, num_shards: int = 1):
return ndd.readers.File(
file_list=os.path.join(AUDIO_DIR, "file_list.txt"),
max_batch_size=BATCH_SIZE,
random_shuffle=False,
shard_id=shard_id,
num_shards=num_shards,
stick_to_shard=True,
)
def mel_epoch(reader, rng: ndd.random.RNG | None):
for encoded, _ in reader.next_epoch(batch_size=BATCH_SIZE):
audio, _ = ndd.decoders.audio(encoded, dtype=ndd.float32, downmix=True)
audio = audio.gpu()
start = ndd.random.uniform(
batch_size=BATCH_SIZE, range=(0.0, 1.0), rng=rng
)
gain = ndd.random.uniform(
batch_size=BATCH_SIZE, range=(0.7, 1.3), rng=rng, device="gpu"
)
cropped = ndd.slice(
audio,
rel_start=start,
shape=CROP_SAMPLES,
axes=0,
out_of_bounds_policy="pad",
)
scaled = cropped * gain
spec = ndd.spectrogram(scaled)
mel = ndd.mel_filter_bank(spec, sample_rate=SAMPLE_RATE)
log_mel = ndd.to_decibels(mel, multiplier=10.0, cutoff_db=-80)
yield ndd.normalize(log_mel, axes=[0, 1])
def mel_batches(
*,
num_epochs: int = 1,
shard_id: int = 0,
num_shards: int = 1,
rng: ndd.random.RNG | None = None,
):
"""Yield log-mel spectrogram batches for `num_epochs` epochs of the dataset."""
reader = make_reader(shard_id=shard_id, num_shards=num_shards)
for _ in range(num_epochs):
yield from mel_epoch(reader, rng)
def collect(gen):
return [sample for batch in gen for sample in batch.cpu()]
EPOCH_SIZE = make_reader().get_metadata(batch_size=BATCH_SIZE)["epoch_size"]
Developing#
Asynchronous execution (the default, EvalMode.eager) lets the Python preparation of each op overlap with its native execution. Synchronous modes run the two sequentially, which is slower but more practical during development because exceptions raise at the call site and debuggers stop where expected. Let’s measure the throughput difference on this augmentation:
[3]:
def measure_throughput(epochs_per_trial=200, warmup_epochs=20, trials=7):
stream = ndd.get_current_stream()
for _ in mel_batches(num_epochs=warmup_epochs):
pass
stream.synchronize()
throughputs = []
for _ in range(trials):
t0 = timer()
for _ in mel_batches(num_epochs=epochs_per_trial):
pass
stream.synchronize()
throughputs.append(EPOCH_SIZE * epochs_per_trial / (timer() - t0))
return statistics.median(throughputs)
with ndd.EvalMode.sync_cpu:
sync = measure_throughput()
with ndd.EvalMode.eager:
async_ = measure_throughput()
print(f"sync: {sync:>7,.0f} clips/s")
print(f"async: {async_:>7,.0f} clips/s ({async_ / sync - 1:+.0%})")
sync: 5,024 clips/s
async: 6,354 clips/s (+26%)
Subsequent sections run under the default EvalMode.eager. When developing, it is however recommended to use EvalMode.sync_cpu to catch exceptions early and make it easier to use a debugger.
Note
This notebook was run with free-threaded Python. Although DALI releases the GIL in native code, the async-mode benefit narrows under a GIL-enabled Python.
Reproducing#
Seeding the default RNG with ndd.random.set_seed makes every draw deterministic. Two epochs from the same seed produce bit-identical outputs.
[4]:
ndd.random.set_seed(42)
first = collect(mel_batches())
ndd.random.set_seed(42)
second = collect(mel_batches())
match = all(np.array_equal(a, b) for a, b in zip(first, second, strict=True))
print("Epoch outputs match:", match)
Epoch outputs match: True
Scaling#
Let’s try using more threads to exhaust the dataset faster. We split the dataset between workers via reader sharding (shard_id, num_shards), and each worker owns its own RNG seeded from the rank, so every shard gets its own augmentation stream.
Per-thread CUDA streams
Each worker thread should submit its GPU work on its own CUDA stream; otherwise submissions from different threads serialize on the GPU.
Dynamic mode’s thread-local default EvalContext already takes care of this: each thread automatically gets its own stream, so no explicit stream management is required in the worker body.
[5]:
import threading
NUM_WORKERS = 2
SEED = 42
[6]:
def measure_multi(num_epochs: int):
def worker(rank):
rng = ndd.random.RNG(seed=SEED + rank)
for _ in mel_batches(
num_epochs=num_epochs,
shard_id=rank,
num_shards=NUM_WORKERS,
rng=rng,
):
pass
ndd.get_current_stream().synchronize()
threads = [
threading.Thread(target=worker, args=(rank,))
for rank in range(NUM_WORKERS)
]
t0 = timer()
for t in threads:
t.start()
for t in threads:
t.join()
return timer() - t0
# Warmup
measure_multi(num_epochs=20)
num_epochs = 500
elapsed = measure_multi(num_epochs=num_epochs)
multi = EPOCH_SIZE * num_epochs / elapsed
print(f"Single-thread: {async_:>7,.0f} clips/s")
print(
f"{NUM_WORKERS} threads: {multi:>7,.0f} clips/s ({multi / async_:.1f}x)"
)
Single-thread: 6,354 clips/s
2 threads: 9,957 clips/s (1.6x)