GetBatch: Distributed Multi-Object Retrieval

Paper: GetBatch: Distributed Multi-Object Retrieval for ML Data Loading - implementation, benchmarks, and discussion.

GetBatch is AIStore’s high-performance API for retrieving multiple objects and/or archived files in a single request. Behind the scenes, GetBatch assembles the requested data from across the cluster and delivers the result as a continuous serialized stream.

Regardless of retrieval source (in-cluster objects, remote objects, or shard extractions), GetBatch always preserves the exact ordering of request entries in both streaming and buffered modes.

Other supported capabilities include:

  • Fetching thousands of objects in strict user-specified order
  • Extracting specific files from distributed shard archives (TAR, TAR.GZ, TAR.LZ4, ZIP)
  • Cross-bucket retrieval in a single request
  • Graceful handling of missing data
  • Streaming or buffered delivery

GetBatch is implemented as an eXtended Action (xaction, or job), internally codenamed x-moss. The respective API endpoint is /v1/ml/moss.

Note: buffered mode always returns both metadata (that describes the output) and the resulting serialized archive containing all requested data items.

Note: for TAR.GZ, both .tgz and .tar.gz are accepted (and interchangeable) aliases - both denote the same exact format.

A Note on HTTP Semantics

GetBatch uses HTTP GET with a JSON request body, which is:

  • Permitted by RFC 9110 - HTTP semantics allow message bodies in GET requests.
  • Necessary for this API, as the list of requested objects can contain thousands of entries that would exceed URL length limits.
  • Semantically correct - the operation is idempotent (pure data retrieval with no server-side state changes).
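
As a minimal sketch of what such a request looks like from a client, the snippet below builds (but does not send) a GET request with a JSON body using only the Python standard library. The gateway host and bucket name are placeholders, and the action/value envelope is copied from the illustrative curl examples in this document rather than from a formal specification.

```python
import json
import urllib.request

# Placeholder endpoint: the gateway host and bucket name are assumptions.
URL = "http://aistore-gateway/v1/ml/moss/my-bucket"


def build_getbatch_request(names, url=URL):
    """Build (without sending) a GET request that carries a JSON body."""
    body = {
        "action": "getbatch",
        "value": {
            "mime": ".tar",
            "in": [{"objname": name} for name in names],
            "strm": True,
        },
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="GET",  # explicit: urllib defaults to POST when data is set
    )


req = build_getbatch_request(["file-0001.bin", "file-0002.bin"])
print(req.get_method(), req.full_url, len(req.data), "body bytes")
```

Note that some HTTP clients and intermediaries drop or reject GET bodies, so it is worth confirming that the chosen client library actually transmits the payload.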

Supported APIs

GetBatch is typically called from:

  1. Go services via the api/ml.go client bindings, and
  2. Python via the AIStore SDK, and
  3. Third-party tooling such as Lhotse.

The respective Go and Python usage examples follow in the sections below.

Terminology

| Term | Description |
| --- | --- |
| Designated Target (DT) | Randomly selected AIS target that coordinates and assembles the get-batch response |
| Work Item (WI) | A single get-batch request being processed; implementation-wise, each request becomes one work item |
| Shard | An archive file (TAR/ZIP/TGZ/LZ4) stored in (or accessible by) the cluster and containing multiple files, typically used for dataset distribution |
| Soft Error | Recoverable error (missing object, transient network issue) that doesn’t fail the entire request when coer: true |
| Hard Error | Unrecoverable failure that terminates the work item (e.g., 429 rejection, fatal errors) |
| Get-From-Neighbor (GFN) | Fallback mechanism to retrieve data from peer targets when primary location fails |
| RxWait | Time spent waiting to receive data from peer targets (coordination/network delay) |

When to Use GetBatch

ML Training Pipelines

  • Loading training data in deterministic order (reproducible epochs)
  • Fetching 1000s-100Ks of objects per training batch
  • Consuming sharded datasets where each shard contains many samples

Distributed Shard Processing

  • Extracting specific files from archives distributed across the cluster
  • Processing subsets of large TAR/ZIP collections
  • Parallel extraction from 1000s of shards

Ordered Batch Retrieval

  • Any workflow requiring strict ordering guarantees
  • Sequential processing pipelines
  • Deterministic data sampling

When NOT to Use GetBatch

  • Single objects => Use regular GET
  • Small batches (<16 entries) => Overhead not justified; use parallel GETs
  • Order doesn’t matter => Use list-objects + concurrent GETs for better parallelism
  • Real-time random access => GetBatch optimizes for sequential streaming

Whether a batch counts as “small” ultimately depends on the workload and the cluster; treat any specific numbers in this document as rules of thumb rather than prescriptions or limits.


Go API Structure

Request: apc.MossReq

{
  "mime": ".tar",                    // Output format: .tar, .tgz, .zip, .tar.lz4
  "in": [                            // Array of items to retrieve
    {
      "objname": "shard-0000.tar",
      "archpath": "image_42.jpg"     // Optional: extract specific file from archive
    },
    {
      "objname": "dataset/file-0001.bin",
      "bucket": "other-bucket"       // Optional: override default bucket
    }
  ],
  "coer": true,                      // Continue on error (missing items => __404__/)
  "onob": false,                     // Name in TAR: false = bucket/object, true = object only
  "strm": true                       // Stream output (vs buffered multipart)
}

Field Details:

| Field | Type | Description |
| --- | --- | --- |
| mime | string | Output format: .tar (default), .tgz, .zip, .tar.lz4 |
| in | []apc.MossIn | List of objects/files to retrieve (order preserved) |
| coer | bool | Continue on error: true = include missing items under __404__/, false = fail on first missing |
| onob | bool | Output naming: false = bucket/object, true = object only |
| strm | bool | Streaming mode: true = stream as data arrives, false = buffer then send multipart response |

Request Entry: apc.MossIn

Each entry in the in array can specify:

| Field | Type | Description |
| --- | --- | --- |
| objname | string | Required. Object name to retrieve |
| bucket | string | Override default bucket (enables cross-bucket requests) |
| provider | string | Provider for this object (e.g., “s3”, “ais”, “gcp”). If omitted, defaults to the bucket’s. |
| uname | string | Fully-qualified bucket specification, including bucket name, provider and namespace. |
| archpath | string | Path to file within input archive (for TAR/ZIP/TGZ/LZ4 shards) |
| opaque | []byte | Opaque user identifier (passed through to response to implement client-side logic of any kind) |
| start | int64 | Range read start offset (future) |
| length | int64 | Range read length (future) |
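
To make the field layout concrete, here is a small sketch that assembles a request body from Python dataclasses. BatchEntry and BatchRequest are hypothetical local stand-ins, not the SDK's Pydantic models; the JSON keys are the ones listed in the tables above. The opaque value is base64-encoded because Go's encoding/json represents []byte fields that way.

```python
import base64
import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class BatchEntry:
    """Hypothetical stand-in for one entry of the "in" array."""
    objname: str
    bucket: Optional[str] = None
    provider: Optional[str] = None
    archpath: Optional[str] = None
    opaque: Optional[bytes] = None

    def to_dict(self):
        d = {"objname": self.objname}
        for key in ("bucket", "provider", "archpath"):
            value = getattr(self, key)
            if value is not None:  # omit unset optional fields
                d[key] = value
        if self.opaque is not None:
            # Go's encoding/json carries []byte as a base64 string.
            d["opaque"] = base64.b64encode(self.opaque).decode("ascii")
        return d


@dataclass
class BatchRequest:
    """Hypothetical stand-in for the request body."""
    entries: List[BatchEntry] = field(default_factory=list)
    mime: str = ".tar"
    coer: bool = False
    onob: bool = False
    strm: bool = True

    def to_json(self):
        return json.dumps({
            "mime": self.mime,
            "in": [e.to_dict() for e in self.entries],  # order is preserved
            "coer": self.coer,
            "onob": self.onob,
            "strm": self.strm,
        })


request = BatchRequest(
    entries=[
        BatchEntry("shard-0000.tar", archpath="image_42.jpg"),
        BatchEntry("dataset/file-0001.bin", bucket="other-bucket", opaque=b"id-42"),
    ],
    coer=True,
)
print(request.to_json())
```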

Response: apc.MossResp

Streaming mode (strm: true):

  • Single HTTP response body containing TAR stream
  • Files appear in exact order requested
  • No separate metadata structure
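
Because a streaming response carries no metadata part, a client typically pairs TAR members with request entries by position. The sketch below shows one way to consume such a stream with the standard tarfile module in forward-only mode; the in-memory archive built by make_demo_tar is a stand-in for a real response body, and the helper names are illustrative.

```python
import io
import tarfile


def iter_tar_stream(stream):
    """Yield (member_name, payload_bytes) in archive order from a TAR byte stream."""
    # Mode "r|" reads strictly forward, so it also works on non-seekable
    # sources such as an HTTP response body.
    with tarfile.open(fileobj=stream, mode="r|") as tar:
        for member in tar:
            if member.isfile():
                yield member.name, tar.extractfile(member).read()


def make_demo_tar(items):
    """Build a small in-memory TAR standing in for a GetBatch response."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, payload in items:
            info = tarfile.TarInfo(name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    buf.seek(0)
    return buf


requested = ["my-bucket/file-0001.bin", "my-bucket/file-0002.bin"]
demo = make_demo_tar([(name, name.encode()) for name in requested])
received = list(iter_tar_stream(demo))

# Pair each request entry with the member at the same position.
for want, (got, payload) in zip(requested, received):
    print(want == got, got, len(payload))
```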

Buffered mode (strm: false):

  • Multipart HTTP response with two parts:
    1. Metadata part (MossResp JSON)
    2. Data part (TAR archive)

{
  "uuid": "xaction-id",
  "out": [
    {
      "objname": "shard-0000.tar",
      "archpath": "image_42.jpg",
      "bucket": "training-data",
      "provider": "ais",
      "size": 524288,
      "err_msg": ""
    },
    {
      "objname": "missing-file.bin",
      "size": 0,
      "err_msg": "object does not exist"
    }
  ]
}

Response Entry: apc.MossOut

| Field | Type | Description |
| --- | --- | --- |
| objname | string | Object name (matches request) |
| archpath | string | Archive path if extracted from shard |
| bucket | string | Bucket name |
| provider | string | Provider |
| size | int64 | Actual bytes delivered (0 if missing) |
| err_msg | string | Error message if item failed (with coer: true) |
| opaque | []byte | User metadata from request |
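
As an illustration of how a client might use the buffered-mode metadata, the following sketch separates delivered entries from failed ones and totals the delivered bytes. The summarize helper is hypothetical, and the sample JSON is an abbreviated copy of the response shown above.

```python
import json


def summarize(metadata_json):
    """Split MossResp-style metadata into delivered and failed entries."""
    out = json.loads(metadata_json)["out"]
    delivered = [e for e in out if not e.get("err_msg")]
    failed = [(e["objname"], e["err_msg"]) for e in out if e.get("err_msg")]
    total_bytes = sum(e.get("size", 0) for e in delivered)
    return delivered, failed, total_bytes


sample = """{
  "uuid": "xaction-id",
  "out": [
    {"objname": "shard-0000.tar", "archpath": "image_42.jpg", "size": 524288, "err_msg": ""},
    {"objname": "missing-file.bin", "size": 0, "err_msg": "object does not exist"}
  ]
}"""

delivered, failed, total_bytes = summarize(sample)
print(len(delivered), failed, total_bytes)
```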

Python Integrations

GetBatch also offers robust integration with Python data pipelines, with official support in both the AIStore Python SDK and third-party libraries:

1. AIStore Python SDK

The Python SDK provides a Batch class that wraps the /v1/ml/moss endpoint with a Pythonic fluent API.

Key features:

  • Pydantic models (MossReq, MossIn, MossOut) that mirror Go structs exactly
  • Fluent interface for building batch requests
  • Automatic TAR stream extraction
  • Support for archpath (shard extraction), opaque metadata, and cross-bucket requests

The SDK mirrors Go structures, with minor naming conventions:

| Concept | Go API | Python SDK |
| --- | --- | --- |
| Output archive format | MossReq.mime | output_format=".tar" |
| Continue-on-error | MossReq.coer | cont_on_err=True |
| Streaming mode | MossReq.strm | streaming_get=True |
| Archive subpath | MossIn.archpath | archpath= |
| Opaque metadata | MossIn.opaque | opaque= |
| Object name | objname | obj_name |

This mapping helps translate examples between Go and Python.

Basic usage:

from aistore.sdk import Client

client = Client("http://ais-gateway")
bucket = client.bucket("training-data")

# Simple batch: list of object names
batch = client.batch(["file1.bin", "file2.bin"], bucket)
for obj_info, content in batch.get():
    print(f"Got {obj_info.obj_name}: {len(content)} bytes")

# Advanced: shard extraction with tracking
batch = client.batch(bucket=bucket)
# Extract specific files from archives
batch.add("shard-0000.tar", archpath="images/photo.jpg")
# Add opaque data for tracking/correlation
batch.add("shard-0001.tar", archpath="images/photo.jpg", opaque=b"batch-id-42")

# Stream results
for obj_info, content in batch.get():
    if not obj_info.err_msg:  # Check for errors
        process(content)

Batch class methods:

  • add(obj, archpath=None, opaque=None, start=None, length=None) - Add object with advanced parameters
  • get(raw=False, decode_as_stream=False, clear_batch=True) - Execute and return generator of (MossOut, bytes) tuples
  • clear() - Clear batch for reuse

Response structure:

  • Streaming mode (streaming_get=True, default): Returns TAR stream, MossOut metadata inferred from request
  • Multipart mode (streaming_get=False): Returns server-validated MossOut with actual sizes, errors

See Python SDK Batch API for complete documentation.

2. Lhotse Integration

Lhotse is a speech/audio data toolkit used by NVIDIA NeMo and other frameworks. It includes native AIStore support via AISBatchLoader.

How it works:

  1. CutSet Manifests - Lhotse manages audio/feature manifests with AIStore URLs
  2. Batch Loading - AISBatchLoader collects all URLs from a CutSet batch
  3. GetBatch Execution - Issues single batch request via AIStore Python SDK
  4. Archive Extraction - Automatically extracts files from sharded archives (TAR/TGZ)
  5. In-Memory Injection - Returns CutSet with data loaded into memory

Usage:

from lhotse.ais import AISBatchLoader

# Create AIStore BatchLoader
ais_batch_loader = AISBatchLoader()

# Load entire dev cutset from AIStore
loaded_cut_set = ais_batch_loader(cut_set)

# Pass the loaded cut-set to DataLoaders

See also:

A complete, runnable example for batch loading audios from AIStore with Lhotse is available in here:

Archive extraction example:

Lhotse URLs like ais://mybucket/shard-0000.tar.gz/audio/sample_42.wav automatically:

  • Split into object (shard-0000.tar.gz) + archpath (audio/sample_42.wav)
  • Send to GetBatch with archpath parameter
  • AIStore extracts the file server-side
  • Returns raw audio bytes directly to training loop
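
The URL-splitting step can be sketched as follows. This is not Lhotse's actual parsing code; split_archive_url is a hypothetical helper, and the list of archive extensions is taken from the formats named in this document.

```python
from urllib.parse import urlparse

# Extensions treated as archives here (assumption: the formats listed in this document).
ARCHIVE_EXTS = (".tar", ".tgz", ".tar.gz", ".tar.lz4", ".zip")


def split_archive_url(url):
    """Split provider://bucket/object[/archpath] into its parts."""
    parsed = urlparse(url)
    parts = parsed.path.lstrip("/").split("/")
    for i, part in enumerate(parts):
        # An archive component followed by more path means "file inside shard".
        if part.endswith(ARCHIVE_EXTS) and i < len(parts) - 1:
            return {
                "provider": parsed.scheme,
                "bucket": parsed.netloc,
                "objname": "/".join(parts[: i + 1]),
                "archpath": "/".join(parts[i + 1:]),
            }
    # Otherwise treat the whole path as a plain object name.
    return {
        "provider": parsed.scheme,
        "bucket": parsed.netloc,
        "objname": "/".join(parts),
        "archpath": None,
    }


entry = split_archive_url("ais://mybucket/shard-0000.tar.gz/audio/sample_42.wav")
print(entry)
```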

Architecture:

Lhotse CutSet => AISBatchLoader => AIStore Python SDK => GetBatch API => Training Loop

Key benefits:

  • Single batch request instead of N individual GETs
  • Server-side extraction from sharded archives
  • Deterministic ordering for reproducible training
  • Zero client-side decompression overhead

3. NeMo Framework Integration

NVIDIA NeMo is an end-to-end platform for building and training state-of-the-art AI models, including Automatic Speech Recognition (ASR), Natural Language Processing (NLP), and Text-to-Speech (TTS).

GetBatch is now integrated into the Lhotse-based ASR dataloader. Instead of fetching individual audio files from AIStore during training, the dataloader now batches all required samples for an epoch or mini-batch into a single GetBatch request. This reduces network overhead and improves data loading throughput for large-scale ASR training.

Enabling GetBatch:

$ export USE_AIS_GET_BATCH=true

A single environment variable activates batch loading for ASR training pipelines using Lhotse+AIStore. No code changes required.

How it works:

  1. Dataloader collects all audio file paths needed for the current batch
  2. Issues single GetBatch request to AIStore (replaces N individual GETs)
  3. AIStore returns TAR archive with all samples in order
  4. Dataloader extracts and feeds samples to the training loop

This same pattern can be integrated in other NeMo training pipelines (NLP, TTS, multimodal) where datasets are stored in AIStore, providing similar performance benefits for data-intensive workloads.

4. PyTorch Integration

The AIStore PyTorch Plugin provides AISBatchIterDataset, an iterable-style dataset that uses GetBatch API for efficient multi-worker data loading with automatic batching and streaming support.


Usage Examples

Note: curl examples in this section are purely illustrative - do not copy/paste.

Example 1: Retrieve Plain Objects

curl -L -X GET http://aistore-gateway/v1/ml/moss/my-bucket \
  -H "Content-Type: application/json" \
  -d '{
    "action": "getbatch",
    "value": {
      "mime": ".tar",
      "in": [
        {"objname": "file-0001.bin"},
        {"objname": "file-0002.bin"},
        {"objname": "file-0003.bin"}
      ],
      "strm": true
    }
  }' --output batch.tar

Result: batch.tar containing:

my-bucket/file-0001.bin
my-bucket/file-0002.bin
my-bucket/file-0003.bin

Example 2: Extract Files from Shards

# Extract image_42.jpg from 100 distributed shards
curl -L -X GET http://aistore-gateway/v1/ml/moss/shards \
  -H "Content-Type: application/json" \
  -d '{
    "action": "getbatch",
    "value": {
      "mime": ".tar",
      "in": [
        {"objname": "shard-0000.tar", "archpath": "image_42.jpg"},
        {"objname": "shard-0001.tar", "archpath": "image_42.jpg"},
        ...
        {"objname": "shard-0099.tar", "archpath": "image_42.jpg"}
      ],
      "onob": true,
      "strm": true
    }
  }' --output extracted.tar

Result: extracted.tar containing:

shard-0000.tar/image_42.jpg
shard-0001.tar/image_42.jpg
...
shard-0099.tar/image_42.jpg

Example 3: Cross-Bucket Retrieval

The request bucket (default-bucket in URL) is used when bucket is omitted in an in entry.

curl -L -X GET http://aistore-gateway/v1/ml/moss/default-bucket \
  -H "Content-Type: application/json" \
  -d '{
    "action": "getbatch",
    "value": {
      "in": [
        {"objname": "config.json", "bucket": "configs"},
        {"objname": "model.pt", "bucket": "models"},
        {"objname": "data.csv", "bucket": "datasets"}
      ],
      "strm": true
    }
  }' --output multi-bucket.tar

Result: TAR containing objects from three different buckets in one request.

Example 4: Handle Missing Data Gracefully

curl -L -X GET http://aistore-gateway/v1/ml/moss/training \
  -H "Content-Type: application/json" \
  -d '{
    "action": "getbatch",
    "value": {
      "in": [
        {"objname": "exists-1.bin"},
        {"objname": "missing.bin"},
        {"objname": "exists-2.bin"}
      ],
      "coer": true,
      "strm": false
    }
  }' --output result.tar

Response metadata shows which items failed:

{
  "out": [
    {"objname": "exists-1.bin", "size": 1048576, "err_msg": ""},
    {"objname": "missing.bin", "size": 0, "err_msg": "object does not exist"},
    {"objname": "exists-2.bin", "size": 2097152, "err_msg": ""}
  ]
}

TAR contains:

training/exists-1.bin (1 MB)
__404__/training/missing.bin (0 bytes)
training/exists-2.bin (2 MB)

Performance Characteristics

Throughput by Workload Type

Note: Actual throughput will vary significantly based on object sizes, network topology, storage backend, CPU capability, and cluster configuration. Numbers below are purely indicative ranges rather than guaranteed performance targets.

| Workload | Typical Throughput | Primary Bottleneck |
| --- | --- | --- |
| Plain objects (located at DT) | 50-150K objects/sec/target | Disk I/O, network |
| Plain objects (located in other storage nodes) | 10-50K objects/sec/target | Peer network, coordination |
| Archived files (located at DT) | 5-20K files/sec/target | CPU (extraction), I/O |
| Archived files (remote shards) | 2-10K files/sec/target | Peer extraction + network |

Note: Archived file extraction from compressed formats is CPU-intensive. A single target extracting from 1000s of TAR/ZIP shards will see significant CPU load.

Latency Components

First-byte latency: 50-500ms (can vary based on cluster size and load)

  • Designated Target (DT) selection
  • Peer coordination
  • First data arrival

Streaming throughput: Wire-speed after first byte

  • Limited by network bandwidth or disk I/O
  • No additional per-object overhead once streaming starts

Memory & Resource Usage

Memory: Bounded by DT capacity + load-based throttling

  • System monitors memory pressure
  • Automatically throttles or rejects (429) new requests under stress
  • See Monitoring GetBatch for observability
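
On the client side, a 429 rejection is usually a signal to back off and retry later. This document does not prescribe a retry policy; the sketch below shows one common approach (exponential backoff), with call_with_backoff, the attempt limit, and the delays all chosen for illustration. The send callable is a stand-in for whatever issues the actual request and returns its HTTP status.

```python
import time


def call_with_backoff(send, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call send() until it returns a status other than 429 or attempts run out."""
    status = None
    for attempt in range(max_attempts):
        status = send()
        if status != 429:
            return status, attempt + 1
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
    return status, max_attempts


# Demo with a fake sender: rejected twice, then accepted.
responses = iter([429, 429, 200])
delays = []  # record the delays instead of actually sleeping
status, attempts = call_with_backoff(lambda: next(responses), sleep=delays.append)
print(status, attempts, delays)
```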

CPU: Varies by workload

  • Plain objects: minimal CPU (file I/O bound)
  • Compressed archives: moderate CPU (decompression)
  • Many small files: higher CPU (archive parsing overhead)

Error Handling

Strict Mode (coer: false)

Behavior: First error aborts entire request

Use when:

  • Data completeness is critical
  • Prefer fail-fast over partial results
  • Small batches where retry cost is low

Error response: HTTP 4xx/5xx, no partial data

Continue-on-Error Mode (coer: true)

Behavior: Continue processing, mark missing items

Use when:

  • Large batches (1000s+ items)
  • Some missing data is acceptable
  • Want to maximize throughput despite occasional 404s

Missing items:

  • Appear in TAR under __404__/bucket/object with size=0
  • Metadata includes err_msg describing failure
  • Extracting the TAR shows all missing items grouped under __404__/
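
For example, given the member names of a returned archive, a client could separate delivered entries from missing ones by checking for the __404__/ prefix. The split_missing helper below is a hypothetical sketch; the sample names are the ones from Example 4.

```python
MISSING_PREFIX = "__404__/"


def split_missing(member_names):
    """Separate delivered entries from placeholders stored under __404__/."""
    present, missing = [], []
    for name in member_names:
        if name.startswith(MISSING_PREFIX):
            # Strip the prefix to recover the original bucket/object name.
            missing.append(name[len(MISSING_PREFIX):])
        else:
            present.append(name)
    return present, missing


names = [
    "training/exists-1.bin",
    "__404__/training/missing.bin",
    "training/exists-2.bin",
]
present, missing = split_missing(names)
print(present, missing)
```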

Soft error limit: Configurable per work item (default: 6)

  • Prevents cascading failures
  • Aborts work item after N transient errors
  • See Configuration section below

Configuration

get_batch.max_soft_errs (default: 6)

Maximum transient errors per work item before aborting.

When to increase:

  • Large batches with expected missing data
  • Tolerate more GFN (get-from-neighbor) fallbacks
  • Unstable network environments

When to decrease:

  • Strict availability requirements
  • Fail faster on systemic issues

ais config cluster get_batch.max_soft_errs=10

get_batch.warmup_workers (default: 2)

Pagecache warming pool size (best-effort read-ahead).

When to increase:

  • Fast NVMe storage
  • Reduce first-access latency
  • CPU/memory headroom available

When to disable (set to -1):

  • High memory pressure
  • Slow disks where read-ahead adds no value
  • CPU-constrained environments

ais config cluster get_batch.warmup_workers=4

To disable warmup/look-ahead operation:

ais config cluster get_batch.warmup_workers=-1

Output Formats

GetBatch supports multiple archive formats via the mime field:

| Format | Extension | Use Case |
| --- | --- | --- |
| TAR | .tar | Default, fastest, no compression |
| TAR+GZIP | .tgz or .tar.gz | Compressed, slower but smaller |
| ZIP | .zip | Windows-compatible, moderate compression |
| TAR+LZ4 | .tar.lz4 | Fast compression, good balance |

Recommendation: Use .tar for maximum throughput unless network bandwidth is constrained.


Naming Conventions

Default Naming (onob: false)

Files in output TAR include bucket prefix:

bucket-name/object-1
bucket-name/object-2
other-bucket/object-3

Object-Only Naming (onob: true)

Files in output TAR omit bucket:

object-1
object-2
object-3

Archived Files

When extracting from shards with archpath:

Default (onob: false):

bucket/shard-0000.tar/image_42.jpg
bucket/shard-0001.tar/image_42.jpg

Object-only (onob: true):

shard-0000.tar/image_42.jpg
shard-0001.tar/image_42.jpg
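
The naming rules above can be summarized in a few lines of Python. The output_name function is a hypothetical client-side helper that reproduces the listed examples, for instance to predict member names before reading the archive; it does not cover the __404__/ prefix used for missing items.

```python
def output_name(bucket, objname, archpath=None, onob=False):
    """Predict the archive member name for one request entry."""
    parts = [] if onob else [bucket]  # onob=True drops the bucket prefix
    parts.append(objname)
    if archpath:
        parts.append(archpath)  # extracted files are nested under the shard name
    return "/".join(parts)


print(output_name("bucket-name", "object-1"))
print(output_name("bucket", "shard-0000.tar", "image_42.jpg"))
print(output_name("bucket", "shard-0000.tar", "image_42.jpg", onob=True))
```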

Monitoring & Observability

GetBatch exposes Prometheus metrics for:

  • Throughput (objects vs archived files)
  • Resource pressure (throttling, RxWait stalls)
  • Error rates (soft vs hard errors)

See: Monitoring GetBatch for detailed metrics, PromQL queries, and operational guidance.

CLI monitoring example

$ ais show job --refresh 10

get-batch[NeEra51oM] (ctl:
    pending:(1,1) reqs:21849 objs: [1398300 2.00GiB] files: [391284 8.70GiB], bewarm:on avg-wait:2.3ms
    pending:(2,2) reqs:22188 objs: [1419972 2.03GiB] files: [395120 8.78GiB], bewarm:on avg-wait:2.2ms
    pending:(3,3) reqs:22210 objs: [1421380 2.03GiB] files: [395940 8.80GiB], bewarm:on avg-wait:2.1ms
)
NODE     ID         KIND       BUCKET     OBJECTS  BYTES    START     END  STATE
target1  NeEra51oM  get-batch  ais://nnn  1419972  2.03GiB  14:38:20  -    Running
target2  NeEra51oM  get-batch  ais://nnn  1421380  2.03GiB  14:38:20  -    Running
target3  NeEra51oM  get-batch  ais://nnn  1398300  2.00GiB  14:38:20  -    Running
                               Total:     4239652  6.06GiB  ✓
------------------------------------------------------------------------
get-batch[NeEra51oM] (ctl:
    pending:(1,1) reqs:25099 objs: [1606276 2.30GiB] files: [445880 9.95GiB], bewarm:on avg-wait:2.0ms
    pending:(2,2) reqs:25360 objs: [1622980 2.32GiB] files: [450110 10.05GiB], bewarm:on avg-wait:1.9ms
    pending:(2,2) reqs:25435 objs: [1627780 2.33GiB] files: [451090 10.07GiB], bewarm:on avg-wait:1.9ms
)
NODE     ID         KIND       BUCKET     OBJECTS  BYTES    START     END  STATE
target1  NeEra51oM  get-batch  ais://nnn  1627780  2.33GiB  14:38:20  -    Running
target2  NeEra51oM  get-batch  ais://nnn  1622980  2.32GiB  14:38:20  -    Running
target3  NeEra51oM  get-batch  ais://nnn  1606276  2.30GiB  14:38:20  -    Running
                               Total:     4857036  6.95GiB  ✓
------------------------------------------------------------------------
get-batch[NeEra51oM] (ctl:
    pending:(1,1) reqs:28729 objs: [1838584 2.63GiB] files: [512770 11.38GiB], bewarm:on avg-wait:1.8ms
    pending:(2,2) reqs:28540 objs: [1826488 2.61GiB] files: [510330 11.33GiB], bewarm:on avg-wait:1.8ms
    pending:(3,3) reqs:28343 objs: [1813892 2.59GiB] files: [507940 11.28GiB], bewarm:on avg-wait:1.7ms
)
NODE     ID         KIND       BUCKET     OBJECTS  BYTES    START     END  STATE
target1  NeEra51oM  get-batch  ais://nnn  1838584  2.63GiB  14:38:20  -    Running
target2  NeEra51oM  get-batch  ais://nnn  1826488  2.61GiB  14:38:20  -    Running
target3  NeEra51oM  get-batch  ais://nnn  1813892  2.59GiB  14:38:20  -    Running
                               Total:     5478964  7.84GiB  ✓

The CLI will render CtlMsg output on multiple lines when it includes multiple aggregated messages.


Advanced Use Cases

ML Training: Deterministic Epoch Loading

from aistore.sdk import Client

def load_epoch(epoch_num, num_shards=10000):
    """Load training data for specific epoch with deterministic ordering."""

    client = Client("http://aistore-gateway")
    bucket = client.bucket("training-data")

    # Build batch: extract same sample from 10K shards
    batch = client.batch(bucket=bucket, output_format=".tar", cont_on_err=True)
    for i in range(num_shards):
        batch.add(
            f"shard-{i:06d}.tar",
            archpath=f"samples/epoch_{epoch_num}.jpg"
        )

    # Stream results directly into training loop
    for moss_out, content in batch.get():
        if not moss_out.err_msg:  # Skip missing
            yield content

# Usage in training loop
# (num_epochs, decode_image, and train_step are supplied by the training code)
for epoch in range(num_epochs):
    for sample_bytes in load_epoch(epoch):
        image = decode_image(sample_bytes)
        train_step(image)

Distributed Processing: Scatter-Gather Pattern

from aistore.sdk import Client

# Partition dataset across workers
def get_worker_partition(worker_id, num_workers, total_objects):
    """Each worker fetches its partition via GetBatch."""

    client = Client("http://aistore-gateway")
    bucket = client.bucket("dataset")

    # Calculate this worker's range
    objects_per_worker = total_objects // num_workers
    start = worker_id * objects_per_worker
    end = start + objects_per_worker

    # Fetch partition in one batch
    batch = client.batch(bucket=bucket)
    for i in range(start, end):
        batch.add(f"object-{i:08d}.bin")

    return batch.get()

# Worker process (process is supplied by the application)
for moss_out, data in get_worker_partition(worker_id=0, num_workers=10, total_objects=1_000_000):
    process(data)
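
The range calculation in the example above uses integer division, which silently leaves out the trailing objects when total_objects is not a multiple of num_workers. As a sketch of one way to cover every object, the hypothetical partition_range helper below spreads the remainder over the first few workers.

```python
def partition_range(worker_id, num_workers, total_objects):
    """Return the half-open [start, end) index range for one worker."""
    base, extra = divmod(total_objects, num_workers)
    # The first `extra` workers each take one additional object.
    start = worker_id * base + min(worker_id, extra)
    end = start + base + (1 if worker_id < extra else 0)
    return start, end


ranges = [partition_range(w, 3, 10) for w in range(3)]
print(ranges)  # [(0, 4), (4, 7), (7, 10)]
```

The resulting (start, end) pair can replace the start and end values computed in get_worker_partition without any other change.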

Limitations & Future Work

Current limitations:

  • Range reads (start/length) not yet implemented
  • Shard extraction is sequential within each archive

Roadmap:

  • Range read support for partial object retrieval
  • Multi-file extraction from single shard (performance optimization)
  • Finer-grained work item abort controls

References