nemo_gym.server_utils#

Module Contents#

Classes#

Functions#

get_global_aiohttp_client

set_global_aiohttp_client

is_global_aiohttp_client_setup

global_aiohttp_client_exit

request

raise_for_status

get_response_json

initialize_ray

Initialize a Ray cluster in a process. We store the Ray address in the global config dict so that child processes can connect to it. This avoids the need to start a new Ray cluster in each child process. Note: This function modifies the global config dict by updating ray_head_node_address.

is_nemo_gym_fastapi_worker

set_is_nemo_gym_fastapi_worker

Data#

API#

nemo_gym.server_utils._GLOBAL_AIOHTTP_CLIENT: Union[None, aiohttp.ClientSession]#

None

nemo_gym.server_utils._GLOBAL_AIOHTTP_CLIENT_REQUEST_DEBUG: bool#

False

class nemo_gym.server_utils.GlobalAIOHTTPAsyncClientConfig(/, **data: typing.Any)[source]#

Bases: pydantic.BaseModel

global_aiohttp_connector_limit: int#

None

global_aiohttp_connector_limit_per_host: int#

1024

global_aiohttp_client_request_debug: bool#

False

nemo_gym.server_utils.get_global_aiohttp_client(
global_config_dict_parser_config: Optional[nemo_gym.global_config.GlobalConfigDictParserConfig] = None,
global_config_dict_parser_cls: Type[nemo_gym.global_config.GlobalConfigDictParser] = GlobalConfigDictParser,
) aiohttp.ClientSession[source]#
nemo_gym.server_utils.set_global_aiohttp_client(
cfg: nemo_gym.server_utils.GlobalAIOHTTPAsyncClientConfig,
) aiohttp.ClientSession[source]#
nemo_gym.server_utils.is_global_aiohttp_client_setup() bool[source]#
nemo_gym.server_utils.global_aiohttp_client_exit()[source]#
nemo_gym.server_utils.MAX_NUM_TRIES#

3

async nemo_gym.server_utils.request(
method: str,
url: str,
_internal: bool = False,
**kwargs: Unpack[aiohttp.client._RequestOptions],
) aiohttp.ClientResponse[source]#
async nemo_gym.server_utils.raise_for_status(response: aiohttp.ClientResponse) None[source]#
async nemo_gym.server_utils.get_response_json(response: aiohttp.ClientResponse) Any[source]#
nemo_gym.server_utils.DEFAULT_HEAD_SERVER_PORT#

11000

nemo_gym.server_utils.ServerStatus#

None

class nemo_gym.server_utils.ServerClient(/, **data: typing.Any)[source]#

Bases: pydantic.BaseModel

head_server_config: nemo_gym.config_types.BaseServerConfig#

None

global_config_dict: omegaconf.DictConfig#

None

model_config#

‘ConfigDict(…)’

classmethod load_head_server_config() nemo_gym.config_types.BaseServerConfig[source]#
classmethod load_from_global_config(
head_server_config: Optional[nemo_gym.config_types.BaseServerConfig] = None,
) nemo_gym.server_utils.ServerClient[source]#
_build_server_base_url(server_config_dict: omegaconf.OmegaConf) str[source]#
async request(
server_name: str,
url_path: str,
method: str,
**kwargs: Unpack[aiohttp.client._RequestOptions],
) aiohttp.ClientResponse[source]#
async get(
server_name: str,
url_path: str,
**kwargs: Unpack[aiohttp.client._RequestOptions],
) aiohttp.ClientResponse[source]#
Parameters:
  • server_name (str) – The name of the server you are trying to call.

  • url_path (str) – The URL path in the server you are trying to call, e.g. “/v1/responses”.

async post(
server_name: str,
url_path: str,
**kwargs: Unpack[aiohttp.client._RequestOptions],
) aiohttp.ClientResponse[source]#
Parameters:
  • server_name (str) – The name of the server you are trying to call.

  • url_path (str) – The URL path in the server you are trying to call, e.g. “/v1/responses”.

poll_for_status(
server_name: str,
) nemo_gym.server_utils.ServerStatus[source]#
nemo_gym.server_utils.SESSION_ID_KEY#

‘session_id’

class nemo_gym.server_utils.BaseServer(/, **data: typing.Any)[source]#

Bases: pydantic.BaseModel

All instances of BaseServer are queryable using ServerClient.

Initialization

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

config: nemo_gym.config_types.BaseRunServerInstanceConfig#

None

classmethod load_config_from_global_config() nemo_gym.config_types.BaseRunServerInstanceConfig[source]#
class nemo_gym.server_utils.ProfilingMiddlewareInputConfig(/, **data: typing.Any)[source]#

Bases: pydantic.BaseModel

profiling_results_dirpath: Optional[str]#

None

class nemo_gym.server_utils.ProfilingMiddlewareConfig(/, **data: typing.Any)[source]#

Bases: nemo_gym.server_utils.ProfilingMiddlewareInputConfig

profiling_enabled: bool#

False

class nemo_gym.server_utils.UvicornLoggingConfig(/, **data: typing.Any)[source]#

Bases: pydantic.BaseModel

uvicorn_logging_show_200_ok: bool#

False

nemo_gym.server_utils.initialize_ray() None[source]#

Initialize a Ray cluster in a process. We store the Ray address in the global config dict so that child processes can connect to it. This avoids the need to start a new Ray cluster in each child process. Note: This function modifies the global config dict by updating ray_head_node_address.

nemo_gym.server_utils.IS_NEMO_GYM_FASTAPI_WORKER_KEY_NAME#

‘IS_NEMO_GYM_FASTAPI_WORKER’

nemo_gym.server_utils.is_nemo_gym_fastapi_worker() bool[source]#
nemo_gym.server_utils.set_is_nemo_gym_fastapi_worker() None[source]#
class nemo_gym.server_utils.SimpleServer(/, **data: typing.Any)[source]#

Bases: nemo_gym.server_utils.BaseServer

server_client: nemo_gym.server_utils.ServerClient#

None

abstractmethod setup_webserver() fastapi.FastAPI[source]#
get_session_middleware_key() str[source]#
setup_session_middleware(app: fastapi.FastAPI) None[source]#
setup_exception_middleware(app: fastapi.FastAPI) None[source]#
setup_profiling(
app: fastapi.FastAPI,
profiling_config: nemo_gym.server_utils.ProfilingMiddlewareConfig,
) None[source]#
set_ulimit(target_soft_limit: int = 65535)[source]#
prefix_server_logs() None[source]#
classmethod run_webserver() fastapi.FastAPI[source]#
class nemo_gym.server_utils.HeadServer(/, **data: typing.Any)[source]#

Bases: nemo_gym.server_utils.BaseServer

config: nemo_gym.config_types.BaseServerConfig#

None

_server_instances: List[dict]#

[]

setup_webserver() fastapi.FastAPI[source]#
get_server_instances() List[dict][source]#
set_server_instances(instances: List) None[source]#
classmethod run_webserver() Tuple[uvicorn.Server, threading.Thread, nemo_gym.server_utils.HeadServer][source]#
async global_config_dict_yaml() str[source]#
class nemo_gym.server_utils.ServerInstanceDisplayConfig(/, **data: typing.Any)[source]#

Bases: pydantic.BaseModel

config_path: Optional[str]#

None

dir_path: Optional[pathlib.Path]#

None

entrypoint: Optional[str]#

None

host: Optional[str]#

None

name: Optional[str]#

None

pid: Optional[int]#

None

port: Optional[int]#

None

process_name: Optional[str]#

None

server_type: Optional[str]#

None

start_time: Optional[float]#

None

status: Optional[nemo_gym.server_utils.ServerStatus]#

None

uptime_seconds: Optional[float]#

None

url: Optional[str]#

None