core.inference.data_parallel_inference_coordinator
Module Contents
Classes
DataParallelInferenceCoordinator | Coordinates inference requests between clients and distributed model engines.
API
- class core.inference.data_parallel_inference_coordinator.DataParallelInferenceCoordinator(
- pipe_connection: multiprocessing.connection.Connection,
- data_parallel_size: int,
- tokenizer,
- inference_coordinator_port: int | None = None,
- deterministic_mode: bool = False,
- )
Coordinates inference requests between clients and distributed model engines.
This class acts as a central server. It uses a ZMQ ROUTER socket to manage communication flows between multiple clients and multiple data parallel ranks.
The coordinator’s main responsibilities are:
Worker Registration: It waits for a specified number of data parallel ranks (representing distributed model instances) to connect and register themselves.
Client Connection: It accepts connections from external clients, such as InferenceClient, and performs a simple handshake.
Request Forwarding: It receives inference requests from clients, assigns a unique server-side request ID, tokenizes the prompt, and forwards the request to one of the available data parallel ranks using a round-robin scheduling strategy.
Response Routing: It receives completed results from the data parallel ranks and routes them back to the original client that made the request.
Control Signal Broadcasting: It relays control signals (e.g., PAUSE, STOP) from a client to all connected data parallel ranks.
.. attribute:: router_socket
The central ZMQ ROUTER socket for all communication.
- Type:
zmq.Socket
.. attribute:: data_parallel_size
The number of data parallel workers to expect.
- Type:
int
.. attribute:: identities_of_data_parallel_ranks
A deque holding the ZMQ identities of connected TP-coordinators, used for round-robin scheduling.
- Type:
deque
.. attribute:: request_id_to_client_id
Maps server-side request IDs to the ZMQ identity of the client that initiated the request.
- Type:
dict
.. attribute:: request_id_to_client_request_id
Maps server-side request IDs to the original request ID provided by the client.
- Type:
dict
.. attribute:: next_request_id
A counter for generating unique server-side request IDs.
- Type:
int
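The request-tracking attributes above can be illustrated with a small, self-contained sketch. This is not the coordinator's actual code: the `RequestBookkeeping` class and its two methods are hypothetical names, and removing entries once a reply is resolved is an assumption made to keep the example tidy. It only shows how a counter plus two dictionaries can map an engine reply back to the originating client and to that client's own request ID.

```python
class RequestBookkeeping:
    """Hypothetical stand-in for the coordinator's request-tracking state."""

    def __init__(self):
        self.request_id_to_client_id = {}
        self.request_id_to_client_request_id = {}
        self.next_request_id = 0

    def register_request(self, client_id: bytes, client_request_id: int) -> int:
        """Assign a server-side ID and remember which client asked."""
        request_id = self.next_request_id
        self.next_request_id += 1
        self.request_id_to_client_id[request_id] = client_id
        self.request_id_to_client_request_id[request_id] = client_request_id
        return request_id

    def resolve_reply(self, request_id: int):
        """Look up (and, by assumption, forget) the owner of a finished request."""
        client_id = self.request_id_to_client_id.pop(request_id)
        client_request_id = self.request_id_to_client_request_id.pop(request_id)
        return client_id, client_request_id


state = RequestBookkeeping()
# Two clients may reuse the same client-side ID; server-side IDs stay unique.
first = state.register_request(b"client-A", client_request_id=7)
second = state.register_request(b"client-B", client_request_id=7)
```

Keeping a separate server-side ID is what lets two clients submit requests with colliding client-side IDs without their results being confused.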
Initialization
Initializes the inference coordinator.
This sets up the ZMQ context and a ROUTER socket, binding it to the given port. It then enters a blocking loop to wait for all expected data parallel ranks to connect before proceeding.
- Parameters:
pipe_connection (Connection) – A connecting pipe to the parent process.
data_parallel_size (int) – The number of TP-coordinator workers that are expected to connect.
tokenizer – The tokenizer to use for prompt tokenization and detokenization.
inference_coordinator_port (Optional[int]) – The TCP port number to bind the server to.
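The blocking registration step described above can be sketched without a real socket. In this illustration, `wait_for_ranks` is a hypothetical helper, the iterator stands in for blocking receives on the ROUTER socket, and ignoring duplicate registrations is an assumption rather than documented behavior.

```python
def wait_for_ranks(incoming_identities, data_parallel_size: int) -> list:
    """Collect identities until the expected number of ranks has registered."""
    registered = []
    for identity in incoming_identities:  # stands in for blocking socket receives
        if identity not in registered:    # assumption: repeated registrations are ignored
            registered.append(identity)
        if len(registered) == data_parallel_size:
            return registered
    raise RuntimeError("message stream ended before all ranks registered")


# Simulated arrivals: rank-1 registers twice before rank-0 arrives.
ranks = wait_for_ranks(iter([b"rank-1", b"rank-1", b"rank-0", b"rank-2"]), 2)
```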
- get_next_data_parallel_rank()
Selects the next data parallel rank using round-robin scheduling.
- Returns:
The ZMQ identity of the next data parallel rank to receive a request.
- Return type:
bytes
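One common way to implement round-robin selection over a deque is to read the front element and rotate it to the back. The sketch below assumes that approach; the function name `get_next_rank` and the identity values are illustrative, and the real method may differ in detail.

```python
from collections import deque

# Illustrative ZMQ identities; real identities are assigned at registration.
identities = deque([b"rank-0", b"rank-1", b"rank-2"])


def get_next_rank(ranks: deque) -> bytes:
    """Return the rank at the front, then rotate it to the back."""
    identity = ranks[0]
    ranks.rotate(-1)  # the front element moves to the end
    return identity


# Four picks over three ranks wrap around to the first rank again.
picked = [get_next_rank(identities) for _ in range(4)]
```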
- start()
Starts the main event loop for the coordinator.
This method runs an infinite loop, continuously listening for incoming messages on the ZMQ ROUTER socket. It parses the message header to determine the message type and takes appropriate action, such as handling new client connections, forwarding requests, broadcasting control signals, or processing replies from the engines.
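The header-based dispatch described above might look roughly like the following. The `Header` enum and its member names are hypothetical, since the actual header values are not documented here, and the `dispatch` function only records which action would be taken instead of touching a socket.

```python
from enum import Enum, auto


class Header(Enum):
    """Hypothetical message types; the real header values are not shown in this page."""
    CONNECT = auto()
    SUBMIT_REQUEST = auto()
    CONTROL = auto()
    ENGINE_REPLY = auto()


def dispatch(sender: bytes, header: Header, payload, log: list) -> None:
    """Record the action a coordinator-like loop would take for one message."""
    if header is Header.CONNECT:
        log.append(("ack", sender))               # handshake with a new client
    elif header is Header.SUBMIT_REQUEST:
        log.append(("forward_to_rank", payload))  # tokenize and forward
    elif header is Header.CONTROL:
        log.append(("broadcast", payload))        # relay to every rank
    elif header is Header.ENGINE_REPLY:
        log.append(("route_to_client", payload))  # send result back
    else:
        raise ValueError(f"unknown header: {header!r}")


actions = []
incoming = [
    (b"client-A", Header.CONNECT, None),
    (b"client-A", Header.SUBMIT_REQUEST, "Hello"),
    (b"rank-0", Header.ENGINE_REPLY, {"request_id": 0}),
    (b"client-A", Header.CONTROL, "PAUSE"),
]
for sender, header, payload in incoming:  # a real loop would block on the socket
    dispatch(sender, header, payload, actions)
```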
- detokenize(finished_request_record)
Detokenizes the generated tokens in the finished request record.
This method uses the coordinator’s tokenizer to convert the list of generated token IDs back into human-readable text.
- Parameters:
finished_request_record (dict) – The record containing the generated tokens to be detokenized. It is modified in place.
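As a rough illustration of in-place detokenization, the sketch below uses a toy tokenizer. The record keys `generated_tokens` and `generated_text`, the `ToyTokenizer` class, and the `detokenize_record` helper are all assumptions for the example; the actual record layout and tokenizer interface are not specified in this page.

```python
class ToyTokenizer:
    """Hypothetical tokenizer; only a detokenize(ids) -> str method is assumed."""

    vocab = {0: "Hello", 1: ",", 2: " world"}

    def detokenize(self, token_ids):
        return "".join(self.vocab[i] for i in token_ids)


def detokenize_record(tokenizer, finished_request_record: dict) -> None:
    """Add decoded text to the record in place (key names are assumptions)."""
    tokens = finished_request_record["generated_tokens"]
    finished_request_record["generated_text"] = tokenizer.detokenize(tokens)


record = {"generated_tokens": [0, 1, 2]}
detokenize_record(ToyTokenizer(), record)  # returns None; the dict is mutated
```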
- classmethod entrypoint(
- pipe_connection: multiprocessing.connection.Connection,
- ready_event: multiprocessing.Event,
- data_parallel_size: int,
- tokenizer,
- inference_coordinator_port: int | None = None,
- deterministic_mode: bool = False,
- )
Class method to instantiate and run the coordinator, for use in a separate process.
This method initializes the coordinator, signals a ready_event to indicate that it is fully initialized and listening, and then starts the main event loop.
- Parameters:
pipe_connection (Connection) – A connecting pipe to the parent process.
ready_event (Event) – A threading or multiprocessing event object that is set() once the coordinator is ready to accept connections.
inference_coordinator_port (Optional[int]) – The port to bind to.
data_parallel_size (int) – The number of expected TP-coordinators.
deterministic_mode (bool) – Whether to enable deterministic scheduling.
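The ready-event handshake can be sketched with a stand-in function. To keep the example runnable anywhere, it uses `threading.Thread` and `threading.Event`; a real launch would more likely pass a multiprocessing event to `entrypoint` in a separate process. The `entrypoint_stub` function and its log messages are hypothetical and only mirror the documented order: initialize, set the event, then run the loop.

```python
import threading


def entrypoint_stub(ready_event, log: list) -> None:
    """Stand-in for entrypoint: initialize, signal readiness, then 'run'."""
    log.append("initialized")         # constructor work would happen here
    ready_event.set()                 # tell the launcher we are listening
    log.append("event loop started")  # start() would block here in practice


ready = threading.Event()
log = []
worker = threading.Thread(target=entrypoint_stub, args=(ready, log))
worker.start()
became_ready = ready.wait(timeout=5.0)  # launcher blocks until ready or timeout
worker.join()
```

Waiting on the event before sending any traffic avoids a race in which clients or ranks try to connect before the coordinator has bound its socket.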
- stop()
Stops the inference coordinator, performing any necessary cleanup operations.