core.inference.engines.async_zmq_communicator#
Module Contents#
Classes#
An asyncio-friendly communicator abstraction using ZMQ. Can be used to implement collective operations like all-reduce, and bcast which are asyncio friendly on top of ZMQ sockets. Only to be used with small amounts of data (e.g., 1 integer) on the CPU. |
API#
- class core.inference.engines.async_zmq_communicator.AsyncZMQCommunicator(
- zmq_context: zmq.Context,
- process_group: torch.distributed.ProcessGroup,
- hostname: str | None = None,
An asyncio-friendly communicator abstraction using ZMQ. Can be used to implement collective operations like all-reduce, and bcast which are asyncio friendly on top of ZMQ sockets. Only to be used with small amounts of data (e.g., 1 integer) on the CPU.
Initialization
Constructor for AsyncZMQCommunicator. Sets up ZMQ sockets for communication among ranks in the given process group.
- Parameters:
zmq_context (zmq.Context) – ZMQ context to create sockets.
process_group (dist.ProcessGroup) – Process group for communication.
hostname (str | None) – Hostname or IP address to use for ZMQ socket binding. If None, defaults to socket.gethostname().
- async all_reduce_max(
- *local_vals: int,
- async_op=True,
Element-wise all-reduce max of one or more integers.
Packs all values into a single message so the communication cost is independent of the number of values.
Returns a single int when called with one argument, otherwise a tuple.
- sync_all_reduce_max(*local_vals: int) int | tuple[int, ...]#
Synchronous (non-asyncio) variant of all_reduce_max.
Uses blocking ZMQ sends/recvs so it can be called from synchronous call sites that need a CPU-only MAX reduction across the process group. Intended for tiny payloads (e.g. a few integers) that would otherwise force a NCCL AllReduce kernel on the compute stream.
Note: when called from inside a running asyncio event loop, the blocking recv will pause other coroutines on this rank until all peers respond. This is acceptable here because every rank reaches the call simultaneously and the message size is trivial.
Returns a single int when called with one argument, otherwise a tuple.
- close()#
Close the ZMQ sockets.