core.inference.data_parallel_inference_coordinator#
Module Contents#
Classes#
DataParallelInferenceCoordinator: Coordinates inference requests between clients and distributed model engines.
API#
- class core.inference.data_parallel_inference_coordinator.DataParallelInferenceCoordinator(
- pipe_connection: multiprocessing.connection.Connection,
- data_parallel_size: int,
- tokenizer,
- inference_coordinator_port: int | None = None,
- deterministic_mode: bool = False,
- block_size_tokens: int | None = None,
- enable_prefix_caching: bool = False,
- prefix_caching_coordinator_policy: megatron.core.inference.config.PrefixCachingCoordinatorPolicy = PrefixCachingCoordinatorPolicy.FIRST_PREFIX_BLOCK,
- schedule_output_path: str | None = None,
- )
Coordinates inference requests between clients and distributed model engines.
This class acts as a central server. It uses a ZMQ ROUTER socket to manage communication flows between multiple clients and multiple data parallel ranks.
The coordinator’s main responsibilities are:
- Worker Registration: It waits for a specified number of data parallel ranks (representing distributed model instances) to connect and register themselves.
- Client Connection: It accepts connections from external clients, like InferenceClient, and performs a simple handshake.
- Request Forwarding: It receives inference requests from clients, assigns a unique server-side request ID, tokenizes the prompt, and forwards the request to one of the available data parallel ranks using a round-robin scheduling strategy.
- Response Routing: It receives completed results from the data parallel ranks and routes them back to the original client that made the request.
- Control Signal Broadcasting: It relays control signals (e.g., PAUSE, STOP) from a client to all connected data parallel ranks.
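The request-forwarding and response-routing duties above rest on a round-robin worker pool plus two ID-mapping tables. A minimal pure-Python sketch of that routing state (ZMQ transport omitted; class and method names here are illustrative, not the actual implementation):

```python
from collections import deque

class CoordinatorSketch:
    """Illustrative model of the coordinator's routing state (no ZMQ)."""

    def __init__(self, worker_identities):
        # Round-robin pool of data parallel rank identities.
        self.ranks = deque(worker_identities)
        # Server-side request ID -> client identity / client's own request ID.
        self.request_id_to_client_id = {}
        self.request_id_to_client_request_id = {}
        self.next_request_id = 0

    def forward_request(self, client_id, client_request_id, prompt):
        """Assign a server-side ID and pick a rank round-robin."""
        request_id = self.next_request_id
        self.next_request_id += 1
        self.request_id_to_client_id[request_id] = client_id
        self.request_id_to_client_request_id[request_id] = client_request_id
        rank = self.ranks[0]
        self.ranks.rotate(-1)  # next request goes to the next rank
        return rank, request_id

    def route_reply(self, request_id):
        """Look up which client receives a finished request."""
        client_id = self.request_id_to_client_id.pop(request_id)
        client_request_id = self.request_id_to_client_request_id.pop(request_id)
        return client_id, client_request_id
```

In the real coordinator the identities are ZMQ ROUTER frames, so `route_reply` translates directly into prefixing the reply with the stored client identity.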
.. attribute:: router_socket
The central ZMQ ROUTER socket for all communication.
- Type:
zmq.Socket
.. attribute:: data_parallel_size
The number of data parallel workers to expect.
- Type:
int
.. attribute:: identities_of_data_parallel_ranks
A deque holding the ZMQ identities of connected TP-coordinators, used for round-robin scheduling.
- Type:
deque
.. attribute:: request_id_to_client_id
Maps server-side request IDs to the ZMQ identity of the client that initiated the request.
- Type:
dict
.. attribute:: request_id_to_client_request_id
Maps server-side request IDs to the original request ID provided by the client.
- Type:
dict
.. attribute:: next_request_id
A counter for generating unique server-side request IDs.
- Type:
int
Initialization
Initializes the inference coordinator.
This sets up the ZMQ context and a ROUTER socket, binding it to the given port. It then enters a blocking loop to wait for all expected data parallel ranks to connect before proceeding.
- Parameters:
pipe_connection (Connection) – A pipe connected to the parent process.
data_parallel_size (int) – The number of TP-coordinator workers that are expected to connect.
tokenizer – The tokenizer to use for prompt tokenization and detokenization.
inference_coordinator_port (Optional[int]) – The TCP port number to bind the server to.
- class CoordinatorState(*args, **kwds)#
Bases:
enum.Enum
State machine for the coordinator.
Initialization
- RUNNING#
‘auto(…)’
- PAUSED#
‘auto(…)’
- SUSPENDED#
‘auto(…)’
- STOPPING#
‘auto(…)’
- get_next_data_parallel_rank()#
Selects the next data parallel rank using round-robin scheduling.
- Returns:
The ZMQ identity of the next data parallel rank to receive a request.
- Return type:
bytes
- _remove_engine(identity)#
Remove a disconnected engine from the routing pool.
- _send_to_engine(identity, payload)#
Send payload to an engine, removing it from the pool if unreachable.
- Returns:
True if the send succeeded, False if the engine was unreachable and removed.
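The send-or-evict behavior can be sketched independently of ZMQ. Here `transport_send` is a hypothetical stand-in for a socket send that raises on an unreachable peer; the real method operates on the coordinator's ROUTER socket:

```python
from collections import deque

def send_to_engine(ranks, identity, payload, transport_send):
    """Try to deliver payload; drop the engine from the pool on failure.

    ranks: deque of engine identities used for scheduling.
    transport_send: any callable that raises OSError when the peer
    is unreachable (stand-in for a ZMQ send with errors enabled).
    """
    try:
        transport_send(identity, payload)
        return True
    except OSError:
        # Engine is gone: stop scheduling requests to it.
        if identity in ranks:
            ranks.remove(identity)
        return False
```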
- compute_request_hashes(prompt)#
Compute block hashes for a prompt on CPU.
- Parameters:
prompt – Either a string (to be tokenized) or a list of token IDs.
- Returns:
List of integer block hashes, or empty list if prefix caching is disabled.
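The parent-chaining property relied on by the routing policy (finding hash[i] implies hashes 0..i-1 also match) can be sketched as follows. This is an illustrative scheme, not the coordinator's actual hash function:

```python
def compute_block_hashes(token_ids, block_size):
    """Parent-chained block hashes: hash[i] covers blocks 0..i.

    Only full blocks are hashed; a trailing partial block is ignored.
    Illustrative sketch; the real hashing scheme may differ.
    """
    hashes = []
    parent = 0
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        block = tuple(token_ids[start:start + block_size])
        # Fold the parent hash in, so equal hash[i] implies equal prefix.
        parent = hash((parent, block))
        hashes.append(parent)
    return hashes
```

Because each hash folds in its parent, two prompts that share a token prefix produce identical leading hashes, which is what the affinity lookup below exploits.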
- get_best_data_parallel_rank(request_hashes)#
Select the best DP rank based on prefix cache affinity.
Iterates request hashes in reverse order and picks the rank that cached the longest matching prefix (the furthest hash found). Since hashes are parent-chained, finding hash[i] in a rank guarantees hash[0..i-1] are also present. Among ranks that share the longest match, the most recently assigned rank (highest timestamp) is preferred. Falls back to round-robin when no rank matches.
- Parameters:
request_hashes – List of block hashes for the request.
- Returns:
The ZMQ identity of the selected data parallel rank.
- Return type:
bytes
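The selection rule described above (longest cached prefix wins, ties broken by most recent assignment, round-robin fallback) can be sketched with plain dictionaries standing in for the coordinator's internal bookkeeping; the names here are illustrative:

```python
def get_best_rank(request_hashes, rank_to_hashes, rank_timestamps, fallback):
    """Pick the rank caching the longest matching prefix.

    rank_to_hashes: rank identity -> set of block hashes it has cached.
    rank_timestamps: rank identity -> last assignment time (higher = newer).
    fallback: zero-arg callable implementing round-robin selection.
    Illustrative sketch of the policy described above, not the real code.
    """
    # Walk hashes from longest prefix to shortest; the first hit wins,
    # because parent-chaining means hash[i] implies hash[0..i-1].
    for h in reversed(request_hashes):
        candidates = [r for r, hs in rank_to_hashes.items() if h in hs]
        if candidates:
            # Tie-break on the most recently assigned rank.
            return max(candidates, key=lambda r: rank_timestamps.get(r, 0))
    return fallback()  # no cached prefix anywhere: fall back to round-robin
```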
- _update_rank_hashes(rank_identity, request_hashes)#
Record that a rank owns the given hashes.
- Parameters:
rank_identity – ZMQ identity of the target rank.
request_hashes – List of block hashes assigned to this rank.
- start()#
Starts the main event loop for the coordinator.
This method runs an infinite loop, continuously listening for incoming messages on the ZMQ ROUTER socket. It parses the message header to determine the message type and takes appropriate action, such as handling new client connections, forwarding requests, broadcasting control signals, or processing replies from the engines.
- detokenize(finished_request)#
Detokenizes the generated tokens in the finished request.
This method uses the coordinator’s tokenizer to convert the list of generated token IDs back into human-readable text.
- Parameters:
finished_request (dict) – The serialized merged request containing the generated tokens to be detokenized. It is modified in place.
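The in-place detokenization step amounts to decoding one field of the serialized request dict. A hedged sketch, where the dict keys and the `decode` callable are assumptions standing in for the coordinator's tokenizer interface:

```python
def detokenize_in_place(finished_request, decode):
    """Convert generated token IDs to text, modifying the dict in place.

    decode: stand-in for the coordinator tokenizer's decode method
    (the exact method name and dict keys are assumptions).
    """
    tokens = finished_request.get("generated_tokens", [])
    finished_request["generated_text"] = decode(tokens)
```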
- classmethod entrypoint(
- pipe_connection: multiprocessing.connection.Connection,
- ready_event: multiprocessing.Event,
- data_parallel_size: int,
- tokenizer,
- inference_coordinator_port: int | None = None,
- deterministic_mode: bool = False,
- block_size_tokens: int | None = None,
- enable_prefix_caching: bool = False,
- prefix_caching_coordinator_policy: megatron.core.inference.config.PrefixCachingCoordinatorPolicy = PrefixCachingCoordinatorPolicy.FIRST_PREFIX_BLOCK,
- schedule_output_path: str | None = None,
- )
Class method to instantiate and run the coordinator, for use in a separate process.
This method initializes the coordinator, signals a ready_event to indicate that it is fully initialized and listening, and then starts the main event loop.
- Parameters:
pipe_connection (Connection) – A pipe connected to the parent process.
ready_event (Event) – A threading or multiprocessing event object that is set() once the coordinator is ready to accept connections.
inference_coordinator_port (int) – The port to bind to.
data_parallel_size (int) – The number of expected TP-coordinators.
deterministic_mode (bool) – Whether to enable deterministic scheduling.
block_size_tokens (Optional[int]) – Token block size for prefix caching hashing.
enable_prefix_caching (bool) – Whether prefix caching is enabled.
prefix_caching_coordinator_policy (PrefixCachingCoordinatorPolicy) – Routing policy.
schedule_output_path (Optional[str]) – Path to write scheduling decisions JSON.
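The ready_event handshake lets the parent block until the coordinator is actually listening instead of polling the port. A minimal sketch of the pattern using threading (a multiprocessing.Event works the same way; `serve` is a hypothetical placeholder for the coordinator's start() loop):

```python
import threading

def entrypoint_sketch(ready_event, serve):
    """Run a server loop in a worker, signalling readiness first.

    ready_event: threading/multiprocessing Event set once listening.
    serve: stand-in for the coordinator's blocking event loop.
    """
    # ... bind sockets, wait for data parallel ranks to register ...
    ready_event.set()   # parent may now connect safely
    serve()             # blocks until stopped

started = []
ready = threading.Event()
worker = threading.Thread(target=entrypoint_sketch,
                          args=(ready, lambda: started.append(True)))
worker.start()
ready.wait(timeout=5)   # parent blocks here until the coordinator is ready
worker.join()
```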
- stop()#
Stops the inference coordinator, performing any necessary cleanup operations.