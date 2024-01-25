NVIDIA Modulus Sym v1.2.0
Source code for modulus.sym.graph

# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Helper functions for unrolling computational graph
"""

from copy import copy
import torch
import logging
from typing import Dict, List, Optional

from .models.arch import Arch, FuncArch
from .node import Node
from .key import Key
from .constants import diff_str
from .eq.derivatives import Derivative
from .manager import JitManager, GraphManager

logger = logging.getLogger(__name__)



[docs]class Graph(torch.nn.Module):
    """
    Torch Module that is constructed by unrolling a computational graph given
    desired inputs, outputs, and evaluatable nodes.

    Examples
    ========
    Here is a simple example of using `Graph` to unroll a two node graph.
    >>> import torch
    >>> from sympy import Symbol
    >>> from modulus.sym.node import Node
    >>> from modulus.sym.key import Key
    >>> from modulus.sym.graph import Graph
    >>> node_1 = Node.from_sympy(Symbol('x') + Symbol('y'), 'u')
    >>> node_2 = Node.from_sympy(Symbol('u') + 1.0, 'v')
    >>> graph = Graph([node_1, node_2], [Key('x'), Key('y')], [Key('v')])
    >>> graph.forward({'x': torch.tensor([1.0]), 'y': torch.tensor([2.0])})
    {'v': tensor([4.])}

    Parameters
    ----------
    nodes : List[Node]
        List of Modulus Nodes to unroll graph with.
    invar : List[Key]
        List of inputs to graph.
    req_names : List[Key]
        List of required outputs of graph.
    diff_nodes : List[Node]
        List of specialty nodes to compute derivatives.
        By default this is not needed.
    func_arch : bool, Optional
        If True, find the computable derivatives that are part of the Jacobian and
        Hessian of the neural network. They will be calculated during the forward
        pass using FuncArch.
        If None (default), will use the GraphManager to get the global flag
        (default is False), which could be configured in the hydra config with key
        `graph.func_arch`.
    func_arch_allow_partial_hessian : bool, Optional
        If True, allow evaluating partial hessian to save some unnecessary computations.
        For example, when the input is x, outputs are [u, p], and the needed derivatives
        are `[u__x, p__x, u__x__x]`, func_arch needs to evaluate the full hessian rows
        to be able to extract jacobian `p__x`. When this flag is on, func_arch will
        only output `[u__x, u__x__x]`, and `p__x` will be evaluated later by the autograd.
        If None (default), will use the GraphManager to get the global flag
        (default is True), which could be configured in the hydra config with key
        `graph.func_arch_allow_partial_hessian`.
    """

    def __init__(
        self,
        nodes: List[Node],
        invar: List[Key],
        req_names: List[Key],
        diff_nodes: List[Node] = [],
        func_arch: Optional[bool] = None,
        func_arch_allow_partial_hessian: Optional[bool] = None,
    ):
        super().__init__()

        # get configs from the graph manager
        graph_manager = GraphManager()
        func_arch = func_arch if func_arch is not None else graph_manager.func_arch
        func_arch_allow_partial_hessian = (
            func_arch_allow_partial_hessian
            if func_arch_allow_partial_hessian is not None
            else graph_manager.func_arch_allow_partial_hessian
        )

        self.req_names = req_names
        self.computable_names = set(_computable_names(nodes, invar))

        # check if graph can be computed
        req_names_no_diff = [Key(x.name) for x in req_names]
        if not set(req_names_no_diff).issubset(self.computable_names):
            _print_graph_unroll_error(nodes, invar, req_names)
            raise RuntimeError("Failed Unrolling Graph")

        # compute only necessary nodes for req_names
        # Walk backwards from the output nodes in the graph and keep adding required inputs
        # until all inputs are available in invar
        nodes = copy(nodes)
        necessary_nodes = []
        needed_names = [Key(x.name, derivatives=x.derivatives) for x in req_names] + [
            Key(x.name) for x in req_names
        ]
        while True:
            finished = True
            for i, node in enumerate(nodes):
                if not set(node.outputs).isdisjoint(set(needed_names)):
                    # Make needed names include derivatives!
                    needed_names += (
                        node.inputs
                        + [
                            Key(x.name, derivatives=x.derivatives)
                            for x in node.derivatives
                        ]
                        + [Key(x.name) for x in node.derivatives]
                    )
                    # needed_names.update(node.inputs() + [Key(x.name) for x in node.derivatives()])
                    necessary_nodes.append(node)
                    nodes.pop(i)
                    finished = False
            if finished:
                break

        # Convert arch node intto func_arch node if we find computable derivatives and the Arch
        # instance has supports_func_arch == True
        needed_names = set(needed_names)
        if func_arch:
            for i, node in enumerate(necessary_nodes):
                # `jit_mode_arch` is forced to be `only_activation` when func_arch is enabled,
                # so all Arch instances will not be `RecursiveScriptModules` and we are good
                # to transform it into FuncArch
                if isinstance(node.evaluate, Arch):
                    if node.evaluate.supports_func_arch:
                        computable_derivatives = (
                            node.evaluate._find_computable_deriv_with_func_arch(
                                needed_names, func_arch_allow_partial_hessian
                            )
                        )
                        if len(computable_derivatives):
                            node_name = necessary_nodes[i].name
                            necessary_nodes[i] = FuncArch(
                                node.evaluate, computable_derivatives
                            ).make_node(node_name)
                            logger.info(
                                f"{node_name} has been converted to a FuncArch node."
                            )
                    else:
                        logger.warning(
                            f"Arch {type(node.evaluate)} currently does not support FuncArch"
                        )

        # unroll graph with only necessary nodes
        # Store node evaluation order to use at runtime
        self.node_evaluation_order = []
        outvar = copy(invar)
        while True:
            # compute all nodes that don't need derivative calls
            while True:
                finished = True
                for i, node in enumerate(necessary_nodes):
                    if set(node.inputs + node.derivatives).issubset(set(outvar)):
                        self.node_evaluation_order.append(node)
                        outvar += node.outputs
                        necessary_nodes.pop(i)
                        finished = False
                if finished:
                    break
            # compute derivative calls all at once
            needed_derivatives = []
            for node in necessary_nodes:
                needed_derivatives += node.derivatives
            needed_derivatives += [x for x in req_names if x.derivatives]
            needed_derivatives = [
                diff for diff in needed_derivatives if diff not in outvar
            ]  # remove already computed diffs
            if len(needed_derivatives) > 0:
                # check if solution in diff nodes
                try_auto_diff = True
                for dn in diff_nodes:
                    if (not set(dn.outputs).isdisjoint(set(needed_derivatives))) and (
                        set(dn.inputs).issubset(set(outvar))
                    ):
                        # input_variables = Variables.subset(outvar, dn.inputs())
                        # outvar.update(dn.evaluate(input_variables))
                        self.node_evaluation_order.append(dn)
                        outvar += dn.outputs
                        try_auto_diff = False

                # compute first derivatives only
                if try_auto_diff:
                    # Variables.differentiate(outvar, outvar, needed_derivatives)
                    dnode = Derivative.make_node(
                        outvar,
                        needed_derivatives,
                        jit=(JitManager().enabled and JitManager().autograd_nodes),
                    )
                    self.node_evaluation_order.append(dnode)
                    outvar += dnode.outputs

            # check if finished
            if set(req_names).issubset(set(outvar)):
                # return Variables({key: value for key, value in outvar.items() if key in req_names})
                break

        self.evaluation_order = torch.nn.ModuleList(
            [n.evaluate for n in self.node_evaluation_order]
        )
        self.node_names: List[str] = [n.name for n in self.node_evaluation_order]
        self.optimizer_list = torch.nn.ModuleList(
            [n.evaluate for n in self.node_evaluation_order if n.optimize]
        )

        if graph_manager.debug:
            print(self)


[docs]    def forward(self, invar: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        outvar = invar
        for i, e in enumerate(self.evaluation_order):
            torch.cuda.nvtx.range_push(self.node_names[i])
            outvar.update(e(outvar))
            torch.cuda.nvtx.range_pop()
        outvar = {
            key: value for key, value in outvar.items() if Key(key) in self.req_names
        }
        return outvar
def __str__(self):
        repr = "=" * 100 + "\n"
        for node in self.node_evaluation_order:
            repr += "-" * 50 + "\n"
            repr += str(node) + "\n"
        return repr


    
def _print_graph_unroll_error(nodes, invar, req_names):
    print("####################################")
    print("could not unroll graph!")
    print(
        "This is probably because you are asking to compute a value that is not an output of any node"
    )
    print("####################################")
    print("invar: " + str(list(invar)))
    print("requested var: " + str(req_names))
    print("computable var: " + str(_computable_names(nodes, invar)))
    print("####################################")
    print("Nodes in graph: ")
    for node in nodes:
        print(node)
    print("####################################")


def _computable_names(nodes, invar):
    nodes = copy(nodes)
    computable_names = copy(invar)
    while True:
        finished = True
        for i, node in enumerate(nodes):
            if set(node.inputs).issubset(set(computable_names)):
                computable_names += node.outputs
                nodes.pop(i)
                finished = False
        if finished:
            return computable_names
