# SPDX-FileCopyrightText: Copyright (c) 2023 - 2026 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from dataclasses import dataclass
import numpy as np
import torch
from jaxtyping import Float
from physicsnemo.core.meta import ModelMetaData
from physicsnemo.core.module import Module
from physicsnemo.nn import DownSample3D, FuserLayer, UpSample3D
from physicsnemo.nn.module.utils import (
PatchEmbed2D,
PatchEmbed3D,
PatchRecovery2D,
PatchRecovery3D,
)
@dataclass
class MetaData(ModelMetaData):
# Optimization
jit: bool = False # ONNX Ops Conflict
cuda_graphs: bool = True
amp: bool = True
# Inference
onnx_cpu: bool = False # No FFT op on CPU
onnx_gpu: bool = True
onnx_runtime: bool = True
# Physics informed
var_dim: int = 1
func_torch: bool = False
auto_grad: bool = False
[docs]
class Pangu(Module):
r"""
Pangu weather forecasting model.
This implementation follows `Pangu-Weather: A 3D High-Resolution Model for
Fast and Accurate Global Weather Forecast
<https://arxiv.org/abs/2211.02556>`_.
Parameters
----------
img_size : tuple[int, int], optional, default=(721, 1440)
Spatial resolution :math:`(H, W)` of the latitude-longitude grid.
patch_size : tuple[int, int, int], optional, default=(2, 4, 4)
Patch size :math:`(p_l, p_h, p_w)` for pressure-level and spatial axes.
embed_dim : int, optional, default=192
Embedding channel size used throughout the transformer hierarchy.
num_heads : tuple[int, int, int, int], optional, default=(6, 12, 12, 6)
Number of attention heads used at each stage.
window_size : tuple[int, int, int], optional, default=(2, 6, 12)
Window size used by the transformer blocks.
Forward
-------
x : torch.Tensor
Input tensor of shape :math:`(B, 72, H, W)` where channels are arranged
as ``surface(7) + upper_air(5*13)``.
Outputs
-------
tuple[torch.Tensor, torch.Tensor]
Tuple ``(surface, upper_air)`` where ``surface`` has shape
:math:`(B, 4, H, W)` and ``upper_air`` has shape
:math:`(B, 5, 13, H, W)`.
"""
def __init__(
self,
img_size: tuple[int, int] = (721, 1440),
patch_size: tuple[int, int, int] = (2, 4, 4),
embed_dim: int = 192,
num_heads: tuple[int, int, int, int] = (6, 12, 12, 6),
window_size: tuple[int, int, int] = (2, 6, 12),
) -> None:
super().__init__(meta=MetaData())
self.img_size = tuple(img_size)
self.patch_size = tuple(patch_size)
self.embed_dim = embed_dim
self.surface_channels = 4
self.surface_mask_channels = 3
self.upper_air_channels = 5
self.upper_air_levels = 13
self.surface_input_channels = self.surface_channels + self.surface_mask_channels
self.in_channels = (
self.surface_input_channels
+ self.upper_air_channels * self.upper_air_levels
)
drop_path = np.linspace(0, 0.2, 8).tolist()
# In addition, three constant masks(the topography mask, land-sea mask and soil type mask)
self.patchembed2d = PatchEmbed2D(
img_size=img_size,
patch_size=patch_size[1:],
in_chans=4 + 3, # add
embed_dim=embed_dim,
)
self.patchembed3d = PatchEmbed3D(
img_size=(13, img_size[0], img_size[1]),
patch_size=patch_size,
in_chans=5,
embed_dim=embed_dim,
)
patched_inp_shape = (
8,
math.ceil(img_size[0] / patch_size[1]),
math.ceil(img_size[1] / patch_size[2]),
)
self.layer1 = FuserLayer(
dim=embed_dim,
input_resolution=patched_inp_shape,
depth=2,
num_heads=num_heads[0],
window_size=window_size,
drop_path=drop_path[:2],
)
patched_inp_shape_downsample = (
8,
math.ceil(patched_inp_shape[1] / 2),
math.ceil(patched_inp_shape[2] / 2),
)
self.downsample = DownSample3D(
in_dim=embed_dim,
input_resolution=patched_inp_shape,
output_resolution=patched_inp_shape_downsample,
)
self.layer2 = FuserLayer(
dim=embed_dim * 2,
input_resolution=patched_inp_shape_downsample,
depth=6,
num_heads=num_heads[1],
window_size=window_size,
drop_path=drop_path[2:],
)
self.layer3 = FuserLayer(
dim=embed_dim * 2,
input_resolution=patched_inp_shape_downsample,
depth=6,
num_heads=num_heads[2],
window_size=window_size,
drop_path=drop_path[2:],
)
self.upsample = UpSample3D(
embed_dim * 2, embed_dim, patched_inp_shape_downsample, patched_inp_shape
)
self.layer4 = FuserLayer(
dim=embed_dim,
input_resolution=patched_inp_shape,
depth=2,
num_heads=num_heads[3],
window_size=window_size,
drop_path=drop_path[:2],
)
# The outputs of the 2nd encoder layer and the 7th decoder layer are concatenated along the channel dimension.
self.patchrecovery2d = PatchRecovery2D(
img_size, patch_size[1:], 2 * embed_dim, 4
)
self.patchrecovery3d = PatchRecovery3D(
(13, img_size[0], img_size[1]), patch_size, 2 * embed_dim, 5
)
[docs]
def forward(
self,
x: Float[torch.Tensor, "batch channels lat lon"],
) -> tuple[
Float[torch.Tensor, "batch c_surface lat lon"],
Float[torch.Tensor, "batch c_upper levels lat lon"],
]:
r"""
Run Pangu forward prediction.
Parameters
----------
x : torch.Tensor
Concatenated input tensor of shape :math:`(B, 72, H, W)`.
Returns
-------
tuple[torch.Tensor, torch.Tensor]
Output tuple ``(surface, upper_air)`` with shapes
:math:`(B, 4, H, W)` and :math:`(B, 5, 13, H, W)`.
"""
if not torch.compiler.is_compiling():
if x.ndim != 4:
raise ValueError(
f"Expected 'x' to be a 4D tensor, got {x.ndim}D tensor with shape {tuple(x.shape)}"
)
if x.shape[1] != self.in_channels:
raise ValueError(
f"Expected 'x' to have {self.in_channels} channels, got tensor with shape {tuple(x.shape)}"
)
if x.shape[2:] != self.img_size:
raise ValueError(
f"Expected 'x' spatial shape {self.img_size}, got tensor with shape {tuple(x.shape)}"
)
surface = x[:, : self.surface_input_channels, :, :]
upper_air = x[:, self.surface_input_channels :, :, :].reshape(
x.shape[0],
self.upper_air_channels,
self.upper_air_levels,
x.shape[2],
x.shape[3],
)
surface = self.patchembed2d(surface)
upper_air = self.patchembed3d(upper_air)
x = torch.concat([surface.unsqueeze(2), upper_air], dim=2)
B, C, Pl, Lat, Lon = x.shape
x = x.reshape(B, C, -1).transpose(1, 2)
x = self.layer1(x)
skip = x
x = self.downsample(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.upsample(x)
x = self.layer4(x)
output = torch.concat([x, skip], dim=-1)
output = output.transpose(1, 2).reshape(B, -1, Pl, Lat, Lon)
output_surface = output[:, :, 0, :, :]
output_upper_air = output[:, :, 1:, :, :]
output_surface = self.patchrecovery2d(output_surface)
output_upper_air = self.patchrecovery3d(output_upper_air)
return output_surface, output_upper_air