# ignore_header_test
""""""
"""
SRResNet model. This code was modified from,
https://github.com/sgrvinod/a-PyTorch-Tutorial-to-Super-Resolution
The following license is provided from their source,
MIT License
Copyright (c) 2020 Sagar Vinodababu
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import torch
from torch import nn
import math
from typing import Union, List, Any
from dataclasses import dataclass
import modulus
from ..meta import ModelMetaData
from ..module import Module
Tensor = torch.Tensor
[docs]class SRResNet(Module):
"""3D convolutional super-resolution network
Parameters
----------
in_channels : int
Number of input channels
out_channels: int
Number of outout channels
large_kernel_size : int, optional
convolutional kernel size for first and last convolution, by default 7
small_kernel_size : int, optional
convolutional kernel size for internal convolutions, by default 3
conv_layer_size : int, optional
Latent channel size, by default 32
n_resid_blocks : int, optional
Number of residual blocks before , by default 8
scaling_factor : int, optional
Scaling factor to increase the output feature size
compared to the input (2, 4, or 8), by default 8
activation_fn : Activation, optional
Activation function, by default Activation.PRELU
Example
-------
>>> #3D convolutional encoder decoder
>>> model = modulus.models.srrn.SRResNet(
... in_channels=1,
... out_channels=2,
... conv_layer_size=4,
... scaling_factor=2)
>>> input = torch.randn(4, 1, 8, 8, 8) #(N, C, D, H, W)
>>> output = model(input)
>>> output.size()
torch.Size([4, 2, 16, 16, 16])
Note
----
Based on the implementation:
https://github.com/sgrvinod/a-PyTorch-Tutorial-to-Super-Resolution
"""
def __init__(
self,
in_channels: int,
out_channels: int,
large_kernel_size: int = 7,
small_kernel_size: int = 3,
conv_layer_size: int = 32,
n_resid_blocks: int = 8,
scaling_factor: int = 8,
activation_fn: nn.Module = nn.PReLU(),
):
super().__init__(meta=MetaData())
self.var_dim = 1
# Scaling factor must be 2, 4, or 8
scaling_factor = int(scaling_factor)
assert scaling_factor in {2, 4, 8}, "The scaling factor must be 2, 4, or 8!"
# The first convolutional block
self.conv_block1 = ConvolutionalBlock3d(
in_channels=in_channels,
out_channels=conv_layer_size,
kernel_size=large_kernel_size,
batch_norm=False,
activation_fn=activation_fn,
)
# A sequence of n_resid_blocks residual blocks,
# each containing a skip-connection across the block
self.residual_blocks = nn.Sequential(
*[
ResidualConvBlock3d(
n_layers=2,
kernel_size=small_kernel_size,
conv_layer_size=conv_layer_size,
activation_fn=activation_fn,
)
for i in range(n_resid_blocks)
]
)
# Another convolutional block
self.conv_block2 = ConvolutionalBlock3d(
in_channels=conv_layer_size,
out_channels=conv_layer_size,
kernel_size=small_kernel_size,
batch_norm=True,
)
# Upscaling is done by sub-pixel convolution,
# with each such block upscaling by a factor of 2
n_subpixel_convolution_blocks = int(math.log2(scaling_factor))
self.subpixel_convolutional_blocks = nn.Sequential(
*[
SubPixel_ConvolutionalBlock3d(
kernel_size=small_kernel_size,
conv_layer_size=conv_layer_size,
scaling_factor=2,
)
for i in range(n_subpixel_convolution_blocks)
]
)
# The last convolutional block
self.conv_block3 = ConvolutionalBlock3d(
in_channels=conv_layer_size,
out_channels=out_channels,
kernel_size=large_kernel_size,
batch_norm=False,
)
[docs] def forward(self, in_vars: Tensor) -> Tensor:
output = self.conv_block1(in_vars) # (N, 3, w, h)
residual = output # (N, n_channels, w, h)
output = self.residual_blocks(output) # (N, n_channels, w, h)
output = self.conv_block2(output) # (N, n_channels, w, h)
output = output + residual # (N, n_channels, w, h)
output = self.subpixel_convolutional_blocks(
output
) # (N, n_channels, w * scaling factor, h * scaling factor)
output = self.conv_block3(
output
) # (N, 3, w * scaling factor, h * scaling factor)
return output
[docs]class ConvolutionalBlock3d(nn.Module):
"""3D convolutional block
Parameters
----------
in_channels : int
Input channels
out_channels : int
Output channels
kernel_size : int
Kernel size
stride : int, optional
Convolutional stride, by default 1
batch_norm : bool, optional
Use batchnorm, by default False
"""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: int,
stride: int = 1,
batch_norm: bool = False, # TODO set the train/eval model context
activation_fn: nn.Module = nn.Identity(),
):
super().__init__()
# A container that will hold the layers in this convolutional block
layers = list()
# A convolutional layer
layers.append(
nn.Conv3d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=kernel_size // 2,
)
)
# A batch normalization (BN) layer, if wanted
if batch_norm is True:
layers.append(nn.BatchNorm3d(num_features=out_channels))
self.activation_fn = activation_fn
# Put together the convolutional block as a sequence of the layers
self.conv_block = nn.Sequential(*layers)
[docs] def forward(self, input: Tensor) -> Tensor:
output = self.activation_fn(self.conv_block(input))
return output # (N, out_channels, w, h)
[docs]class PixelShuffle3d(nn.Module):
"""3D pixel-shuffle operation
Parameters
----------
scale : int
Factor to downscale channel count by
Note
----
Reference: http://www.multisilicon.com/blog/a25332339.html
"""
def __init__(self, scale: int):
super().__init__()
self.scale = scale
[docs] def forward(self, input: Tensor) -> Tensor:
batch_size, channels, in_depth, in_height, in_width = input.size()
nOut = int(channels // self.scale**3)
out_depth = in_depth * self.scale
out_height = in_height * self.scale
out_width = in_width * self.scale
input_view = input.contiguous().view(
batch_size,
nOut,
self.scale,
self.scale,
self.scale,
in_depth,
in_height,
in_width,
)
output = input_view.permute(0, 1, 5, 2, 6, 3, 7, 4).contiguous()
return output.view(batch_size, nOut, out_depth, out_height, out_width)
[docs]class SubPixel_ConvolutionalBlock3d(nn.Module):
"""Convolutional block with Pixel Shuffle operation
Parameters
----------
kernel_size : int, optional
Kernel size, by default 3
conv_layer_size : int, optional
Latent channel size, by default 64
scaling_factor : int, optional
Pixel shuffle scaling factor, by default 2
"""
def __init__(
self, kernel_size: int = 3, conv_layer_size: int = 64, scaling_factor: int = 2
):
super().__init__()
# A convolutional layer that increases the number of channels
# by scaling factor^2, followed by pixel shuffle and PReLU
self.conv = nn.Conv3d(
in_channels=conv_layer_size,
out_channels=conv_layer_size * (scaling_factor**3),
kernel_size=kernel_size,
padding=kernel_size // 2,
)
# These additional channels are shuffled to form additional pixels,
# upscaling each dimension by the scaling factor
self.pixel_shuffle = PixelShuffle3d(scaling_factor)
self.prelu = nn.PReLU()
[docs] def forward(self, input: Tensor) -> Tensor:
output = self.conv(input) # (N, n_channels * scaling factor^2, w, h)
output = self.pixel_shuffle(
output
) # (N, n_channels, w * scaling factor, h * scaling factor)
output = self.prelu(
output
) # (N, n_channels, w * scaling factor, h * scaling factor)
return output
[docs]class ResidualConvBlock3d(nn.Module):
"""3D ResNet block
Parameters
----------
n_layers : int, optional
Number of convolutional layers, by default 1
kernel_size : int, optional
Kernel size, by default 3
conv_layer_size : int, optional
Latent channel size, by default 64
activation_fn : nn.Module, optional
Activation function, by default nn.Identity()
"""
def __init__(
self,
n_layers: int = 1,
kernel_size: int = 3,
conv_layer_size: int = 64,
activation_fn: nn.Module = nn.Identity(),
):
super().__init__()
layers = []
for i in range(n_layers - 1):
layers.append(
ConvolutionalBlock3d(
in_channels=conv_layer_size,
out_channels=conv_layer_size,
kernel_size=kernel_size,
batch_norm=True,
activation_fn=activation_fn,
)
)
# The final convolutional block with no activation
layers.append(
ConvolutionalBlock3d(
in_channels=conv_layer_size,
out_channels=conv_layer_size,
kernel_size=kernel_size,
batch_norm=True,
)
)
self.conv_layers = nn.Sequential(*layers)
[docs] def forward(self, input: Tensor) -> Tensor:
residual = input # (N, n_channels, w, h)
output = self.conv_layers(input) # (N, n_channels, w, h)
output = output + residual # (N, n_channels, w, h)
return output