core.inference.apis._llm_base
Internal building blocks for the Megatron inference high-level API.
This module hosts private helpers shared by MegatronLLM and
MegatronAsyncLLM: _EventLoopManager, _CoordinatorRuntime, and
_MegatronLLMBase. The public sync/async wrappers live on the subclasses;
this base only exposes shared engine state, runtime spawn, validation
helpers, and the private _<method>_impl coroutines.
Module Contents
Classes
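| Class | Description |
|---|---|
| _EventLoopManager | Per-instance background daemon thread + persistent asyncio event loop. |
| _CoordinatorRuntime | Owns the dynamic-inference coordinator and InferenceClient lifecycle. |
| _MegatronLLMBase | Private base shared by MegatronLLM and MegatronAsyncLLM. |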
API
- class core.inference.apis._llm_base._EventLoopManager
Per-instance background daemon thread + persistent asyncio event loop.
Bridges sync and async user-thread callers to coroutines that run on the background loop via asyncio.run_coroutine_threadsafe.
- start() -> None
Spawn the daemon thread and start the event loop. Idempotent.
- property loop: asyncio.AbstractEventLoop
The background asyncio loop. Raises if start() has not been called.
- submit(coro: Coroutine) -> concurrent.futures.Future
Schedule coro on the background loop and return its future. The caller decides how to wait on the returned future (e.g. .result() for a blocking sync wait, asyncio.wrap_future(...) for awaiting from another loop).
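The two waiting styles can be sketched with the standard library alone. This is an illustrative stand-in, not the class itself: a plain loop on a daemon thread plays the role of the manager's background loop, and add() is a hypothetical coroutine. The same concurrent.futures.Future type that submit() returns is consumed once with a blocking .result() and once via asyncio.wrap_future from a separate loop.

```python
import asyncio
import concurrent.futures
import threading


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


# Background loop on a daemon thread (stand-in for _EventLoopManager's loop).
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# What submit() does: schedule on the background loop, get a concurrent.futures.Future.
fut = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
assert isinstance(fut, concurrent.futures.Future)

# Style 1: a sync caller blocks its own thread on .result().
sync_result = fut.result(timeout=5)


# Style 2: a coroutine on a *different* loop awaits the wrapped future.
async def caller() -> int:
    other = asyncio.run_coroutine_threadsafe(add(3, 4), loop)
    return await asyncio.wrap_future(other)


async_result = asyncio.run(caller())

# Stop the background loop and join its thread.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
print(sync_result, async_result)  # 3 7
```

Style 1 suits the sync wrapper; style 2 lets an async caller keep its own loop running while the work executes on the background loop.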
- run_sync(coro: Coroutine)
Schedule coro on the background loop and block until its result is available. Must not be called from a coroutine running on self._loop itself: that would deadlock, because the only loop that could dispatch coro is the one already blocked waiting for the caller. Calling from a different loop (e.g., the user's main-thread asyncio loop) is allowed: coro runs on the background loop while the caller's loop is stalled until .result() returns.
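The deadlock rule above can be made explicit with a guard. The sketch below is hypothetical: LoopRunner and its error message are illustrative names, and the source does not say whether the real run_sync performs such a check. It compares asyncio.get_running_loop() with the background loop and raises instead of hanging, while still allowing sync callers and callers on other loops.

```python
import asyncio
import threading
from typing import Any, Coroutine


class LoopRunner:
    """Hypothetical stand-in for _EventLoopManager with a deadlock guard in run_sync."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run_sync(self, coro: Coroutine[Any, Any, Any]) -> Any:
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None  # plain synchronous caller: no loop in this thread
        if running is self._loop:
            coro.close()  # discard the coroutine instead of leaving it un-awaited
            raise RuntimeError("run_sync() called on the background loop; this would deadlock")
        # Sync thread or a *different* loop: safe; the calling thread just blocks.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def stop(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def double(x: int) -> int:
    return 2 * x


async def from_other_loop(runner: LoopRunner) -> int:
    # Allowed: this main-thread loop is stalled until the result comes back.
    return runner.run_sync(double(5))


async def from_own_loop(runner: LoopRunner) -> str:
    try:
        runner.run_sync(double(1))
    except RuntimeError as exc:
        return str(exc)
    return "no error"


runner = LoopRunner()
sync_value = runner.run_sync(double(21))                 # 42
other_loop_value = asyncio.run(from_other_loop(runner))  # 10
own_loop_msg = asyncio.run_coroutine_threadsafe(from_own_loop(runner), runner._loop).result()
runner.stop()
print(sync_value, other_loop_value, own_loop_msg)
```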
- async run_async(coro: Coroutine)
Schedule coro on the background loop and await it from any loop.
- stop() -> None
Stop the event loop and join the background thread. Idempotent.
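An idempotent start()/stop() pair and a loop property that refuses access before start() can be sketched as follows. This is a minimal illustration under assumptions: BackgroundLoop is a hypothetical name, RuntimeError is an assumed exception type (the source only says the property "raises"), and the threading.Event handshake is one possible way to make start() return only once the loop is running.

```python
import asyncio
import threading
from typing import Optional


class BackgroundLoop:
    """Hypothetical sketch of the idempotent start()/stop() lifecycle."""

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        if self._thread is not None:
            return  # already running: second call is a no-op
        loop = asyncio.new_event_loop()
        running = threading.Event()

        def _run() -> None:
            asyncio.set_event_loop(loop)
            loop.call_soon(running.set)  # fires once run_forever() is actually looping
            loop.run_forever()

        self._loop = loop
        self._thread = threading.Thread(target=_run, daemon=True)
        self._thread.start()
        running.wait()

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        if self._loop is None:
            raise RuntimeError("start() has not been called")  # exception type assumed
        return self._loop

    def stop(self) -> None:
        if self._thread is None or self._loop is None:
            return  # never started, or already stopped: no-op
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
        self._loop, self._thread = None, None


mgr = BackgroundLoop()
try:
    mgr.loop
    raised_before_start = False
except RuntimeError:
    raised_before_start = True

mgr.start()
first_loop = mgr.loop
mgr.start()  # idempotent: same loop, same thread
same_loop = mgr.loop is first_loop
was_running = first_loop.is_running()
worker = mgr._thread
mgr.stop()
mgr.stop()  # idempotent: nothing left to stop
```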
- class core.inference.apis._llm_base._CoordinatorRuntime(
    engine: megatron.core.inference.engines.dynamic_engine.DynamicInferenceEngine,
    *,
    is_primary: bool,
    coordinator_host: Optional[str],
    coordinator_port: Optional[int],
  )
Owns the dynamic-inference coordinator and InferenceClient lifecycle.
Async-native: setup() and teardown() are coroutines meant to run on a background loop owned by _EventLoopManager. The primary rank additionally holds an InferenceClient used by the high-level API to submit requests and send control signals.
- async setup(*, loop: asyncio.AbstractEventLoop) -> None
Bring up the coordinator and, on the primary rank, the InferenceClient.
Calls engine.start_listening_to_data_parallel_coordinator(loop=loop) on every rank. Only the host/port kwargs that the caller actually supplied are forwarded, so the engine can auto-bind when both are None.
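The reason for forwarding only supplied kwargs is that an omitted keyword lets the callee's own defaults take effect, while an explicit None overrides them. The sketch below illustrates this with hypothetical names: build_listen_kwargs, FakeEngine.listen, and the host/port keyword names are placeholders, not the real signature of start_listening_to_data_parallel_coordinator.

```python
from typing import Any, Dict, Optional


def build_listen_kwargs(host: Optional[str], port: Optional[int]) -> Dict[str, Any]:
    """Hypothetical helper: forward only the host/port values the caller actually supplied."""
    kwargs: Dict[str, Any] = {}
    if host is not None:
        kwargs["host"] = host  # kwarg name is an assumption, not the engine's real API
    if port is not None:
        kwargs["port"] = port
    return kwargs


class FakeEngine:
    """Stand-in engine whose defaults play the role of auto-binding."""

    def listen(self, *, loop: Any = None, host: str = "0.0.0.0", port: int = 0) -> str:
        return f"tcp://{host}:{port}"


engine = FakeEngine()
auto = engine.listen(loop=None, **build_listen_kwargs(None, None))  # engine defaults apply
partial = engine.listen(loop=None, **build_listen_kwargs("10.0.0.1", None))
# Naive forwarding of None would clobber the defaults instead:
explicit_none = engine.listen(loop=None, host=None, port=None)  # type: ignore[arg-type]
print(auto, partial, explicit_none)
```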
- async teardown() -> None
Idempotent, best-effort shutdown of the coordinator and client.
Safe to call from a partial-setup state (e.g., when setup() raised after the coordinator subprocess spawned but before the client opened). It is always a no-op on worker ranks; their engine_loop_task is awaited by _MegatronLLMBase._shutdown_impl() after the primary has issued the STOP signal.
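One way to get "idempotent, best-effort, safe from partial setup" is to keep each resource optional, clear the attribute before closing it, and record rather than propagate close errors. The sketch below is hypothetical: RuntimeSketch, FakeResource, and the client-before-coordinator close order are assumptions, not the real _CoordinatorRuntime internals. Only the worker no-op rule comes from the text above.

```python
import asyncio
from typing import List, Optional


class FakeResource:
    """Hypothetical closable resource that logs closes and can fail."""

    def __init__(self, name: str, log: List[str], fail: bool = False) -> None:
        self.name, self.log, self.fail = name, log, fail

    async def close(self) -> None:
        self.log.append(f"close {self.name}")
        if self.fail:
            raise OSError(f"{self.name} already gone")


class RuntimeSketch:
    """Hypothetical: each resource is optional; teardown closes whatever exists."""

    def __init__(self, is_primary: bool) -> None:
        self.is_primary = is_primary
        self.coordinator: Optional[FakeResource] = None
        self.client: Optional[FakeResource] = None
        self.errors: List[BaseException] = []

    async def teardown(self) -> None:
        if not self.is_primary:
            return  # worker ranks: always a no-op
        # Assumed order: close the client first, then the coordinator it talks to.
        for attr in ("client", "coordinator"):
            res = getattr(self, attr)
            if res is None:
                continue  # never created (partial setup) or already torn down
            setattr(self, attr, None)  # clear first so a repeated call is a no-op
            try:
                await res.close()
            except Exception as exc:  # best effort: record and keep going
                self.errors.append(exc)


log: List[str] = []
rt = RuntimeSketch(is_primary=True)
rt.coordinator = FakeResource("coordinator", log, fail=True)  # setup raised before the client opened
asyncio.run(rt.teardown())
asyncio.run(rt.teardown())  # idempotent: nothing left to close

worker_log: List[str] = []
worker = RuntimeSketch(is_primary=False)
worker.client = FakeResource("client", worker_log)
asyncio.run(worker.teardown())
print(log, len(rt.errors), worker_log)
```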
- property client: InferenceClient | None
The InferenceClient on the primary rank; None on workers.
- property coord_addr: Optional[str]
Address returned by start_listening_to_data_parallel_coordinator.
- class core.inference.apis._llm_base._MegatronLLMBase(
    *,
    model,
    tokenizer,
    inference_config: Optional[megatron.core.inference.config.InferenceConfig] = None,
    use_coordinator: bool = False,
    coordinator_host: Optional[str] = None,
    coordinator_port: Optional[int] = None,
  )
Private base shared by MegatronLLM and MegatronAsyncLLM.
This base intentionally exposes no public generate or lifecycle methods; those live on the subclasses, which call into the private _<method>_impl coroutines defined here. The base owns:
  - the engine pipeline (engine, context, controller);
  - the per-instance background runtime (_loop_manager, _coord_runtime) when use_coordinator=True;
  - validation helpers (_assert_primary, _assert_coordinator) and the input shape helper (_normalize_prompts).
Two execution modes are supported:
  - Direct mode (use_coordinator=False): every rank is treated as primary, and generate runs the engine synchronously (offloaded to a thread when called from an event loop). Lifecycle methods are invalid and raise RuntimeError via _assert_coordinator.
  - Coordinator mode (use_coordinator=True): a background event loop hosts the engine pipeline and an InferenceClient (on global rank 0). Only the primary rank may submit requests via generate.
model must be in eval mode before construction; this class does not modify the model state.
- property is_primary_rank: bool
Whether generate may be called on this rank.
- property engine: megatron.core.inference.engines.dynamic_engine.DynamicInferenceEngine
The underlying DynamicInferenceEngine.
- property context: megatron.core.inference.contexts.dynamic_context.DynamicInferenceContext
The underlying DynamicInferenceContext.
- property controller: megatron.core.inference.text_generation_controllers.text_generation_controller.TextGenerationController
The underlying TextGenerationController.
- _assert_primary() -> None
- _assert_coordinator() -> None
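The two execution modes determine what these helpers check. The sketch below encodes the rules stated in the class description: every rank is primary in direct mode, only global rank 0 in coordinator mode, and lifecycle methods raise RuntimeError without a coordinator. ModeSketch is hypothetical. The rank is passed in explicitly rather than read from torch.distributed, and RuntimeError for _assert_primary is an assumed exception type.

```python
class ModeSketch:
    """Hypothetical sketch of the primary-rank rule and the two validation helpers."""

    def __init__(self, use_coordinator: bool, rank: int) -> None:
        self.use_coordinator = use_coordinator
        self.rank = rank  # assumption: passed in instead of queried from torch.distributed

    @property
    def is_primary_rank(self) -> bool:
        # Direct mode: every rank is primary. Coordinator mode: only global rank 0.
        return (not self.use_coordinator) or self.rank == 0

    def _assert_primary(self) -> None:
        if not self.is_primary_rank:
            # Exception type is an assumption; the source does not name it.
            raise RuntimeError(f"generate() may only be called on the primary rank, not rank {self.rank}")

    def _assert_coordinator(self) -> None:
        if not self.use_coordinator:
            raise RuntimeError("lifecycle methods require use_coordinator=True")


def raises(fn) -> bool:
    try:
        fn()
    except RuntimeError:
        return True
    return False


direct_rank3 = ModeSketch(use_coordinator=False, rank=3)
coord_rank0 = ModeSketch(use_coordinator=True, rank=0)
coord_rank1 = ModeSketch(use_coordinator=True, rank=1)
print(direct_rank3.is_primary_rank, coord_rank0.is_primary_rank, coord_rank1.is_primary_rank)
```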
- _normalize_prompts(
    prompts: Union[str, List[int], List[str], List[List[int]]],
  )
Return (normalized_list, is_batch_input):
  - "abc" -> (["abc"], False)
  - [1, 2, 3] -> ([[1, 2, 3]], False) (single token-id prompt)
  - ["abc", "def"] -> (["abc", "def"], True)
  - [[1, 2], [3, 4]] -> ([[1, 2], [3, 4]], True)
  - [] -> ([], True)
Only the first element is inspected to distinguish a single prompt from a batch; per-element type validation is left to the engine.
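A function that reproduces the documented input/output pairs fits in a few lines. It is a sketch, not the actual method body: normalize_prompts is a free-function stand-in, and whether the real helper copies its input list is not stated. Like the description, it inspects only the first element.

```python
from typing import List, Tuple, Union

Prompt = Union[str, List[int]]


def normalize_prompts(
    prompts: Union[str, List[int], List[str], List[List[int]]],
) -> Tuple[List[Prompt], bool]:
    """Sketch reproducing the documented mapping; returns (normalized_list, is_batch_input)."""
    if isinstance(prompts, str):
        return [prompts], False  # a single text prompt
    if len(prompts) == 0:
        return [], True  # empty input is treated as an (empty) batch
    # Only the first element is inspected; per-element validation is left to the engine.
    if isinstance(prompts[0], int):
        return [list(prompts)], False  # a single token-id prompt
    return list(prompts), True


for example in ("abc", [1, 2, 3], ["abc", "def"], [[1, 2], [3, 4]], []):
    print(repr(example), "->", normalize_prompts(example))
```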
- async _generate_impl(
    prompts: Union[List[str], List[List[int]]],
    sp: megatron.core.inference.sampling_params.SamplingParams,
  )
Run inference for a non-empty list of prompts; returns a list of results in input order.
Coordinator mode: must run on the runtime loop (via _loop_manager.run_async); enqueues requests through client.add_request and gathers all futures.
Direct mode: runs on the caller's event loop; offloads the synchronous engine.generate to a thread.
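The two paths can be illustrated with fakes. FakeEngine and FakeClient are hypothetical stand-ins: the real engine.generate and client.add_request take sampling parameters and return richer result objects. The direct path uses asyncio.to_thread (Python 3.9+) so a blocking call does not stall the caller's loop. The coordinator path enqueues every request and then relies on asyncio.gather, which returns results in argument order even when later requests finish first.

```python
import asyncio
import time
from typing import List


class FakeEngine:
    """Hypothetical stand-in for a synchronous engine.generate (blocks the calling thread)."""

    def generate(self, prompts: List[str]) -> List[str]:
        time.sleep(0.01)
        return [p.upper() for p in prompts]


class FakeClient:
    """Hypothetical stand-in for a client whose add_request returns a future."""

    def __init__(self) -> None:
        self._n = 0

    async def _complete(self, prompt: str, delay: float) -> str:
        await asyncio.sleep(delay)
        return prompt.upper()

    def add_request(self, prompt: str) -> "asyncio.Future[str]":
        delay = max(0.0, 0.03 - 0.01 * self._n)  # later requests finish sooner
        self._n += 1
        return asyncio.ensure_future(self._complete(prompt, delay))


async def generate_direct(engine: FakeEngine, prompts: List[str]) -> List[str]:
    # Direct mode: run the blocking call in a worker thread, keeping this loop responsive.
    return await asyncio.to_thread(engine.generate, prompts)


async def generate_coordinated(client: FakeClient, prompts: List[str]) -> List[str]:
    # Coordinator mode: enqueue every request first, then gather (results keep input order).
    futures = [client.add_request(p) for p in prompts]
    return list(await asyncio.gather(*futures))


prompts = ["a", "b", "c"]
direct = asyncio.run(generate_direct(FakeEngine(), prompts))
coordinated = asyncio.run(generate_coordinated(FakeClient(), prompts))
print(direct, coordinated)
```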
- async _pause_impl() -> None
- async _unpause_impl() -> None
- async _suspend_impl() -> None
- async _resume_impl() -> None
- async _shutdown_impl() -> None
- async _wait_for_shutdown_impl() -> None