deeplearning/modulus/modulus-v2209/_modules/modulus/hydra/utils.html
Source code for modulus.hydra.utils
import functools
import hydra
import os
import torch
import logging
import copy
import pprint
from termcolor import colored
from pathlib import Path
from omegaconf import DictConfig, OmegaConf, MISSING
from typing import Optional, Any, Union, List
from hydra._internal.utils import _run_hydra, get_args_parser
from hydra.core.hydra_config import HydraConfig
from hydra.utils import get_original_cwd
from modulus.key import Key
from modulus.models.arch import Arch
from modulus.distributed import DistributedManager
from modulus.models.utils import ModulusModels
from modulus.models.layers import Activation
from .arch import ModelConf
from .config import register_modulus_configs, ModulusConfig
from .hydra import register_hydra_configs
from .loss import register_loss_configs
from .metric import register_metric_configs
from .arch import register_arch_configs
from .optimizer import register_optimizer_configs
from .pde import register_pde_configs
from .profiler import register_profiler_configs
from .scheduler import register_scheduler_configs
from .training import register_training_configs
from .callbacks import register_callbacks_configs
from .graph import register_graph_configs
logger = logging.getLogger(__name__)
[docs]def main(config_path: str, config_name: str = "config"):
"""Modified decorator for loading hydra configs in modulus
See: https://github.com/facebookresearch/hydra/blob/main/hydra/main.py
"""
def register_decorator(func):
@functools.wraps(func)
def func_decorated(cfg_passthrough: Optional[DictConfig] = None) -> Any:
# Register all modulus groups before calling hydra main
register_hydra_configs()
register_callbacks_configs()
register_loss_configs()
register_metric_configs()
register_arch_configs()
register_optimizer_configs()
register_pde_configs()
register_profiler_configs()
register_scheduler_configs()
register_training_configs()
register_modulus_configs()
register_graph_configs()
# Set number of intraop torch CPU threads
torch.set_num_threads(1) # TODO: define this as a hydra config somehow
# Setup distributed process config
DistributedManager.initialize()
# Create model parallel process group
model_parallel_size = os.getenv(
"MODEL_PARALLEL_SIZE"
) # TODO: get this from config instead
if model_parallel_size:
# Create model parallel process group
DistributedManager.create_process_subgroup(
"model_parallel", int(model_parallel_size), verbose=True
)
# Create data parallel process group for DDP allreduce
DistributedManager.create_orthogonal_process_group(
"data_parallel", "model_parallel", verbose=True
)
# Pass through dict config
if cfg_passthrough is not None:
return func(cfg_passthrough)
else:
args_parser = get_args_parser()
args = args_parser.parse_args()
# multiple times (--multirun)
# Needs update for 1.2.0
_run_hydra(
args_parser=args_parser,
task_function=func,
config_path=config_path,
config_name=config_name,
)
return func_decorated
return register_decorator
[docs]def compose(
config_name: Optional[str] = None,
config_path: Optional[str] = None,
overrides: List[str] = [],
return_hydra_config: bool = False,
job_name: Optional[str] = "app",
caller_stack_depth: int = 2,
) -> DictConfig:
"""Internal Modulus config initializer and compose function.
This is an alternative for initializing a Hydra config which should be used
as a last ditch effort in cases where @modulus.main() cannot work. For more info
see: https://hydra.cc/docs/advanced/compose_api/
Parameters
----------
config_name : str
Modulus config name
config_path : str
Path to config file relative to the caller at location caller_stack_depth
overrides : list of strings
List of overrides
return_hydra_config : bool
Return the hydra options in the dict config
job_name : string
Name of program run instance
caller_stack_depth : int
Stack depth of this function call (needed for finding config relative to python).
"""
# Clear already initialized hydra
hydra.core.global_hydra.GlobalHydra.instance().clear()
hydra.initialize(
config_path,
job_name,
caller_stack_depth,
)
register_hydra_configs()
register_callbacks_configs()
register_loss_configs()
register_metric_configs()
register_arch_configs()
register_optimizer_configs()
register_pde_configs()
register_profiler_configs()
register_scheduler_configs()
register_training_configs()
register_modulus_configs()
register_graph_configs()
cfg = hydra.compose(
config_name=config_name,
overrides=overrides,
return_hydra_config=return_hydra_config,
)
return cfgdef instantiate_arch(
cfg: ModelConf,
input_keys: Union[List[Key], None] = None,
output_keys: Union[List[Key], None] = None,
detach_keys: Union[List[Key], None] = None,
verbose: bool = False,
**kwargs,
) -> Arch:
# Function for instantiating a modulus architecture with hydra
assert hasattr(
cfg, "arch_type"
), "Model configs are required to have an arch_type defined. \
Improper architecture supplied, please make sure config \
provided is a single arch config NOT the full hydra config!"
try:
# Convert to python dictionary
model_cfg = OmegaConf.to_container(cfg, resolve=True)
# Get model class beased on arch type
modulus_models = ModulusModels()
model_arch = modulus_models[model_cfg["arch_type"]]
del model_cfg["arch_type"]
# Add keys if present
if not input_keys is None:
model_cfg["input_keys"] = input_keys
if not output_keys is None:
model_cfg["output_keys"] = output_keys
if not detach_keys is None:
model_cfg["detach_keys"] = detach_keys
# Add any additional kwargs
for key, value in kwargs.items():
model_cfg[key] = value
# Init model from config dictionary
model, param = model_arch.from_config(model_cfg)
# Verbose printing
if verbose:
pp = pprint.PrettyPrinter(indent=4)
logger.info(f"Initialized models with parameters: \n")
pp.pprint(param)
except Exception as e:
fail = colored(f"Failed to initialize architecture.\n {model_cfg}", "red")
raise Exception(fail) from e
return model
def instantiate_optim(
cfg: DictConfig, model: torch.nn.Module, verbose: bool = False
) -> torch.optim.Optimizer:
# Function for instantiating an optimizer with hydra
# Remove custom parameters used internally in modulus
optim_cfg = copy.deepcopy(cfg.optimizer)
del optim_cfg._params_
try:
optimizer = hydra.utils.instantiate(optim_cfg, params=model.parameters())
except Exception as e:
fail = colored("Failed to initialize optimizer: \n", "red")
logger.error(fail + to_yaml(optim_cfg))
raise Exception(fail) from e
if verbose:
pp = pprint.PrettyPrinter(indent=4)
logger.info(f"Initialized optimizer: \n")
pp.pprint(optimizer)
return optimizer
def instantiate_sched(
cfg: DictConfig, optimizer: torch.optim
) -> torch.optim.lr_scheduler:
# Function for instantiating a scheduler with hydra
sched_cfg = copy.deepcopy(cfg.scheduler)
# Default is no scheduler, so just make fixed LR
if sched_cfg is MISSING:
sched_cfg = {
"_target_": "torch.optim.lr_scheduler.ConstantLR",
"factor": 1.0,
}
# Handle custom cases
if sched_cfg._target_ == "custom":
if "tf.ExponentialLR" in sched_cfg._name_:
sched_cfg = {
"_target_": "torch.optim.lr_scheduler.ExponentialLR",
"gamma": sched_cfg.decay_rate ** (1.0 / sched_cfg.decay_steps),
}
else:
logger.warn("Detected unsupported custom scheduler", sched_cfg)
try:
scheduler = hydra.utils.instantiate(sched_cfg, optimizer=optimizer)
except Exception as e:
fail = colored("Failed to initialize scheduler: \n", "red")
logger.error(fail + to_yaml(sched_cfg))
raise Exception(fail) from e
return scheduler
def instantiate_agg(cfg: DictConfig, model: torch.nn.Module, num_losses: int = 1):
# Function for instantiating a loss aggregator with hydra
try:
aggregator = hydra.utils.instantiate(
cfg.loss,
model,
num_losses,
_convert_="all",
)
except Exception as e:
fail = colored("Failed to initialize loss aggregator: \n", "red")
logger.error(fail + to_yaml(cfg.loss))
raise Exception(fail) from e
return aggregator
[docs]def to_yaml(cfg: DictConfig):
"""Converges dict config into a YML string"""
return OmegaConf.to_yaml(cfg)
[docs]def add_hydra_run_path(path: Union[str, Path]) -> Path:
"""Prepends current hydra run path"""
working_dir = Path(os.getcwd())
# Working directory only present with @modulus.main()
if HydraConfig.initialized():
org_dir = Path(get_original_cwd())
hydra_dir = working_dir.relative_to(org_dir) / Path(path)
else:
hydra_dir = working_dir / Path(path)
if isinstance(path, str):
hydra_dir = str(hydra_dir)
return hydra_dir
[docs]def to_absolute_path(*args: Union[str, Path]):
"""Converts file path to absolute path based on run file location
Modified from: https://github.com/facebookresearch/hydra/blob/main/hydra/utils.py
"""
out = ()
for path in args:
p = Path(path)
if not HydraConfig.initialized():
base = Path(os.getcwd())
else:
ret = HydraConfig.get().runtime.cwd
base = Path(ret)
if p.is_absolute():
ret = p
else:
ret = base / p
if isinstance(path, str):
out = out + (str(ret),)
else:
out = out + (ret,)
if len(args) == 1:
out = out[0]
return out