aistore.sdk.obj.content_iterator.parallel


Parallel object download using forked workers and shared memory.

Provides two download modes:

  • read_all(): workers write directly into a single full-size ParallelBuffer (shared memory). The caller receives a memoryview with no additional copy on the return path.

  • create_iter(): workers write into a fixed-size ring buffer in shared memory; the main process yields chunks in order. See the create_iter docstring for the detailed data flow.

Module Contents

Classes

Name | Description
--- | ---
ParallelContentIterProvider | Parallel content iterator backed by a shared-memory ring buffer.
WorkerState | Per-worker state populated by _init_worker before any task is dispatched.

Functions

Name | Description
--- | ---
_fetch_chunk | Download byte range [start, end) and write into worker_state.shm at offset.
_init_worker | ProcessPoolExecutor initializer hook; runs once per worker after forking.

Data

worker_state

API

class aistore.sdk.obj.content_iterator.parallel.ParallelContentIterProvider(
client: aistore.sdk.obj.object_client.ObjectClient,
chunk_size: typing.Optional[int],
num_workers: int
)

Bases: BaseContentIterProvider

Parallel content iterator backed by a shared-memory ring buffer.

Splits the object into fixed-size byte ranges and fetches them in parallel using ProcessPoolExecutor. Workers write directly into a bounded ring buffer of num_workers slots. Chunks are yielded to the caller in order.

Parameters:

client
ObjectClient

Client for accessing contents of an individual object.

chunk_size
Optional[int]

Size of each chunk of data yielded. If None, the provider attempts to use the optimal chunk size from the HeadObjectV2 response.

num_workers
int

Number of concurrent workers for fetching chunks.

_object_size
= attrs.size

num_workers
int

Get the number of concurrent workers.

aistore.sdk.obj.content_iterator.parallel.ParallelContentIterProvider._build_chunk_ranges(
offset: int
) -> typing.List[typing.Tuple[int, int]]

Return a list of (start, end) byte ranges covering the object from offset.
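To make the range layout concrete, here is a minimal sketch of how such ranges could be computed. The function build_chunk_ranges is hypothetical: unlike the real method it takes the object size and chunk size as arguments, and it assumes that ranges start exactly at offset, are half-open, and that the last range is truncated at the end of the object.

```python
from typing import List, Tuple


def build_chunk_ranges(object_size: int, chunk_size: int, offset: int = 0) -> List[Tuple[int, int]]:
    """Hypothetical stand-in for _build_chunk_ranges.

    Splits [offset, object_size) into consecutive half-open (start, end) ranges
    of at most chunk_size bytes each (assumed layout, not the SDK's code).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    ranges = []
    start = offset
    while start < object_size:
        # The final range is truncated at the object boundary.
        end = min(start + chunk_size, object_size)
        ranges.append((start, end))
        start = end
    return ranges


print(build_chunk_ranges(object_size=10, chunk_size=4))
# → [(0, 4), (4, 8), (8, 10)]
print(build_chunk_ranges(object_size=10, chunk_size=4, offset=3))
# → [(3, 7), (7, 10)]
```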

aistore.sdk.obj.content_iterator.parallel.ParallelContentIterProvider._fill_shm(
dst: aistore.sdk.obj.content_iterator.buffer.ParallelBuffer
) -> None

Download the entire object into dst (direct-to-destination mode).

Each worker owns a disjoint byte range within dst, so no synchronization is needed beyond waiting for all futures.

Parameters:

dst
ParallelBuffer

A ParallelBuffer of at least _object_size bytes, typically created and owned by read_all().
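The direct-to-destination pattern (module-level worker state, a per-worker initializer that attaches to the segment by name, and tasks that write each range at its own offset) can be sketched with the standard library alone. All names here (fake_range_read, DirectState, init_direct_worker, fetch_direct, fill_direct) are hypothetical; fake_range_read stands in for an ObjectClient range read, and a plain SharedMemory segment stands in for ParallelBuffer. The sketch requires a platform with the "fork" start method.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from multiprocessing.shared_memory import SharedMemory
from typing import Optional


def fake_range_read(start: int, end: int) -> bytes:
    # Hypothetical stand-in for an HTTP range read of bytes [start, end).
    return bytes(i % 256 for i in range(start, end))


@dataclass
class DirectState:
    # Hypothetical analogue of WorkerState, reduced to what direct mode needs.
    shm: Optional[SharedMemory] = None


# Module scope, so every forked worker inherits its own copy.
direct_state = DirectState()


def init_direct_worker(shm_name: str) -> None:
    # Initializer: runs once per worker and attaches to the segment by name.
    direct_state.shm = SharedMemory(name=shm_name)


def fetch_direct(start: int, end: int, offset: int) -> int:
    data = fake_range_read(start, end)
    direct_state.shm.buf[offset : offset + len(data)] = data
    return len(data)


def fill_direct(object_size: int, chunk_size: int, num_workers: int) -> bytes:
    ctx = mp.get_context("fork")  # raises ValueError where fork is unavailable
    shm = SharedMemory(create=True, size=object_size)
    try:
        with ProcessPoolExecutor(
            max_workers=num_workers,
            mp_context=ctx,
            initializer=init_direct_worker,
            initargs=(shm.name,),
        ) as pool:
            futures = [
                # Direct mode: offset == start, so writes never overlap.
                pool.submit(fetch_direct, start, min(start + chunk_size, object_size), start)
                for start in range(0, object_size, chunk_size)
            ]
            for f in futures:
                f.result()  # waits for completion and re-raises worker errors
        return bytes(shm.buf[:object_size])
    finally:
        shm.close()
        shm.unlink()
```

Returning bytes here is only for easy checking; the point of the real read_all() path is to hand out a view of the segment instead of copying it.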

aistore.sdk.obj.content_iterator.parallel.ParallelContentIterProvider._get_fork_context()
staticmethod

Return the ‘fork’ multiprocessing context, or raise RuntimeError on unsupported platforms.
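A minimal sketch of that check, assuming it is based on the start methods the interpreter reports; get_fork_context and its error message are hypothetical, while multiprocessing.get_all_start_methods() and multiprocessing.get_context() are standard-library calls.

```python
import multiprocessing as mp
from multiprocessing.context import BaseContext


def get_fork_context() -> BaseContext:
    """Hypothetical analogue of _get_fork_context."""
    if "fork" not in mp.get_all_start_methods():
        # For example, Windows only supports the "spawn" start method.
        raise RuntimeError("parallel download requires the 'fork' start method")
    return mp.get_context("fork")
```

Requesting the fork context explicitly, rather than relying on the platform default, matters because the workers depend on inheriting module-level state through fork.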

aistore.sdk.obj.content_iterator.parallel.ParallelContentIterProvider.create_iter(
offset: int = 0
) -> typing.Generator[bytes, None, None]

Yield object content in order using a sliding-window ring buffer.

Keeps at most num_workers range-reads in flight at any time. The ring buffer lives in /dev/shm:

    /dev/shm
    ┌──────────┬──────────┬──────────┐
    │ [slot 0] │ [slot 1] │ [slot 2] │   num_slots x slot_size bytes
    └──────────┴──────────┴──────────┘

    chunks: [ chunk 0 ][ chunk 1 ][ chunk 2 ][ chunk 3 ] ..
    slot = chunk_idx % num_slots

    Main Process                 Forked Workers
    ─────────────                ──────────────────────────────
                                 [slot 0]    [slot 1]    [slot 2]
    submit chunk 0 ────────────> write slot 0
    submit chunk 1 ────────────────────────> write slot 1
    submit chunk 2 ────────────────────────────────────> write slot 2
    future[0].result()  ← done
    wait_slot(0)        ← event set
    yield read_slot(0)
    submit chunk 3 ────────────> write slot 0 (reuse)
    …

Parameters:

offset
int, defaults to 0

Starting byte offset. Defaults to 0.
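The sliding-window data flow can be sketched with the standard library. Everything below is hypothetical (fake_range_read, RingState, init_ring_worker, fetch_into_slot, ring_iter): it mirrors the documented design of one ring slot per worker, per-slot events created before forking, and slot = chunk_idx % num_slots, but it is not the SDK's implementation. It assumes the "fork" start method and always reads from offset 0.

```python
import multiprocessing as mp
from concurrent.futures import Future, ProcessPoolExecutor
from dataclasses import dataclass, field
from multiprocessing.shared_memory import SharedMemory
from typing import Dict, Iterator, List, Optional


def fake_range_read(start: int, end: int) -> bytes:
    # Hypothetical stand-in for an HTTP range read of bytes [start, end).
    return bytes(i % 256 for i in range(start, end))


@dataclass
class RingState:
    # Hypothetical analogue of WorkerState for ring-buffer mode.
    shm: Optional[SharedMemory] = None
    slot_ready: List = field(default_factory=list)


ring_state = RingState()  # module scope: inherited by each forked worker


def init_ring_worker(shm_name: str, slot_ready: List) -> None:
    ring_state.shm = SharedMemory(name=shm_name)
    ring_state.slot_ready = slot_ready


def fetch_into_slot(start: int, end: int, offset: int, slot_idx: int) -> int:
    data = fake_range_read(start, end)
    ring_state.shm.buf[offset : offset + len(data)] = data
    ring_state.slot_ready[slot_idx].set()  # signal: slot holds a complete chunk
    return len(data)


def ring_iter(object_size: int, chunk_size: int, num_slots: int) -> Iterator[bytes]:
    ctx = mp.get_context("fork")
    ranges = [(s, min(s + chunk_size, object_size)) for s in range(0, object_size, chunk_size)]
    shm = SharedMemory(create=True, size=num_slots * chunk_size)
    slot_ready = [ctx.Event() for _ in range(num_slots)]  # must exist before fork
    try:
        with ProcessPoolExecutor(
            max_workers=num_slots,
            mp_context=ctx,
            initializer=init_ring_worker,
            initargs=(shm.name, slot_ready),
        ) as pool:
            in_flight: Dict[int, Future] = {}

            def submit(idx: int) -> None:
                slot = idx % num_slots
                start, end = ranges[idx]
                slot_ready[slot].clear()
                in_flight[idx] = pool.submit(fetch_into_slot, start, end, slot * chunk_size, slot)

            # Prime the window with up to num_slots range reads.
            for idx in range(min(num_slots, len(ranges))):
                submit(idx)

            for idx, (start, end) in enumerate(ranges):
                slot = idx % num_slots
                in_flight.pop(idx).result()  # re-raises worker errors
                slot_ready[slot].wait()
                base = slot * chunk_size
                chunk = bytes(shm.buf[base : base + (end - start)])  # copy out of the slot
                if idx + num_slots < len(ranges):
                    submit(idx + num_slots)  # the slot is free again: reuse it
                yield chunk
    finally:
        shm.close()
        shm.unlink()
```

One design choice differs slightly from the diagram: this sketch copies a chunk out of its slot and resubmits the slot before yielding, so the next range read overlaps with whatever the consumer does with the current chunk.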

aistore.sdk.obj.content_iterator.parallel.ParallelContentIterProvider.read_all() -> aistore.sdk.obj.content_iterator.buffer.ParallelBuffer

Download the entire object into shared memory and return a ParallelBuffer.

Workers write directly into a SharedMemory segment; the caller receives a memoryview of that segment with no extra allocation or copy. The caller must call result.close() (or use it as a context manager) to release the shared memory when done.
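The ownership contract (a zero-copy view that must be released before the segment is closed and unlinked) can be illustrated with a small hypothetical wrapper. SharedBuffer is not ParallelBuffer's actual API; it only shows why close() or a with block is required.

```python
from multiprocessing.shared_memory import SharedMemory


class SharedBuffer:
    """Hypothetical stand-in for ParallelBuffer: owns a SharedMemory segment
    and exposes a zero-copy memoryview of its first `size` bytes."""

    def __init__(self, size: int) -> None:
        self._shm = SharedMemory(create=True, size=size)
        self._view = self._shm.buf[:size]  # a view onto shared memory, no copy

    @property
    def view(self) -> memoryview:
        return self._view

    def close(self) -> None:
        if self._shm is None:
            return  # already closed
        # Outstanding views must be released first, or close() raises BufferError.
        self._view.release()
        self._shm.close()
        self._shm.unlink()
        self._shm = None

    def __enter__(self) -> "SharedBuffer":
        return self

    def __exit__(self, *exc) -> None:
        self.close()


with SharedBuffer(5) as buf:
    buf.view[:] = b"hello"  # in read_all(), forked workers would fill this region
    print(bytes(buf.view))  # → b'hello'
```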

class aistore.sdk.obj.content_iterator.parallel.WorkerState(
client: typing.Optional[aistore.sdk.obj.object_client.ObjectClient] = None,
shm: typing.Optional[multiprocessing.shared_memory.SharedMemory] = None,
slot_ready: typing.List[multiprocessing.Event] = list()
)
Dataclass

Per-worker state populated by _init_worker before any task is dispatched.

A single instance (worker_state) lives at module scope, so each forked worker inherits its own copy through fork's memory copy. _init_worker populates that copy once per worker; _fetch_chunk reads it on every invocation, so the client and shared-memory handle are never re-serialized per task.

client
Optional[ObjectClient] = None
shm
Optional[SharedMemory] = None
slot_ready
List[Event] = field(default_factory=list)

aistore.sdk.obj.content_iterator.parallel._fetch_chunk(
start: int,
end: int,
offset: int,
slot_idx: int = -1
) -> int

Download byte range [start, end) and write into worker_state.shm at offset.

Parameters:

start
int

First byte of the range to fetch from the object.

end
int

One past the last byte (exclusive).

offset
int

Byte offset in the shm segment to write into. Ring-buffer callers pass slot * chunk_size; direct callers pass start.

slot_idx
int, defaults to -1

If >= 0, signals slot_ready[slot_idx] on completion (ring-buffer mode). Pass -1 to skip (direct mode).
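The two calling conventions can be made concrete with a hypothetical helper, fetch_args (not part of the SDK), that computes the (start, end, offset, slot_idx) arguments each mode would pass. It assumes ranges are enumerated in order and that ring-buffer slots are assigned as chunk_idx % num_slots, as in the create_iter diagram.

```python
from typing import List, Tuple


def fetch_args(
    ranges: List[Tuple[int, int]], chunk_size: int, num_slots: int, ring: bool
) -> List[Tuple[int, int, int, int]]:
    """Return (start, end, offset, slot_idx) tuples for _fetch_chunk-style calls."""
    args = []
    for idx, (start, end) in enumerate(ranges):
        if ring:
            # Ring-buffer mode: write into the chunk's slot and signal that slot.
            slot = idx % num_slots
            args.append((start, end, slot * chunk_size, slot))
        else:
            # Direct mode: write at the object's own offset; -1 skips signaling.
            args.append((start, end, start, -1))
    return args


ranges = [(0, 4), (4, 8), (8, 10)]
print(fetch_args(ranges, chunk_size=4, num_slots=2, ring=True))
# → [(0, 4, 0, 0), (4, 8, 4, 1), (8, 10, 0, 0)]
print(fetch_args(ranges, chunk_size=4, num_slots=2, ring=False))
# → [(0, 4, 0, -1), (4, 8, 4, -1), (8, 10, 8, -1)]
```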

aistore.sdk.obj.content_iterator.parallel._init_worker(
client: aistore.sdk.obj.object_client.ObjectClient,
shm_name: str,
slot_ready: typing.Optional[typing.List[multiprocessing.Event]] = None
) -> None

ProcessPoolExecutor initializer hook that runs once per worker after forking.

Populates worker_state before any _fetch_chunk task is dispatched, so every task can rely on the client and shared-memory handle already being set.

Parameters:

client
ObjectClient

Client the worker uses to download chunks.

shm_name
str

Name of the shared memory segment to attach to.

slot_ready
Optional[List[mp.Event]], defaults to None

Per-slot events for ring-buffer mode. Omit for direct-to-destination callers.

aistore.sdk.obj.content_iterator.parallel.worker_state = WorkerState()