Aerial CUDA-Accelerated RAN
Aerial CUDA-Accelerated RAN 24-2.1

Using pyAerial for PUSCH decoding on Aerial Data Lake data

This example shows how to use the pyAerial bindings to run cuPHY GPU accelerated PUSCH decoding for 5G NR PUSCH. The 5G NR PUSCH data is read from an example over the air captured PUSCH dataset collected and stored using Aerial Data Lake. Building a PUSCH receiver using pyAerial is demonstrated in two ways, first by using a fully fused, complete, PUSCH receiver called from Python using just a single function call. The same is then achieved by building the complete PUSCH receiver using individual separate Python function calls to individual PUSCH receiver components.

Note: This example requires that the clickhouse server is running and that the example data has been stored in the database. Refer to the Aerial Data Lake documentation on how to do this.

Copy
Copied!
            

[1]:

Copy
Copied!
            

# Check platform. import platform if platform.machine() not in ['x86_64', 'aarch64']: raise SystemExit("Unsupported platform!")

Copy
Copied!
            

[2]:

Copy
Copied!
            

import math import os os.environ["CUDA_VISIBLE_DEVICES"] = "0" import numpy as np import pandas as pd from IPython.display import Markdown from IPython.display import display # Connecting to clickhouse on remote server import clickhouse_connect # Plotting with Bokeh. import matplotlib.pyplot as plt # pyAerial imports from aerial.phy5g.algorithms import ChannelEstimator from aerial.phy5g.algorithms import ChannelEqualizer from aerial.phy5g.algorithms import NoiseIntfEstimator from aerial.phy5g.algorithms import Demapper from aerial.phy5g.ldpc import LdpcDeRateMatch from aerial.phy5g.ldpc import LdpcDecoder from aerial.phy5g.ldpc import code_block_desegment from aerial.phy5g.pusch import PuschRx from aerial.util.cuda import get_cuda_stream from aerial.util.fapi import dmrs_fapi_to_bit_array # Hide log10(10) warning _ = np.seterr(divide='ignore', invalid='ignore')

This is a PUSCH receiver pipeline made up of separately called pyAerial PUSCH receiver components.

Copy
Copied!
            

[3]:

Copy
Copied!
            

# Whether to plot intermediate results within the PUSCH pipeline, such as channel estimates and equalized symbols. plot_figures = True num_ues = 1 num_tx_ant = 2 # UE antennas num_rx_ant = 4 # gNB antennas cell_id = 41 # Physical cell ID enable_pusch_tdi = 0 # Enable time interpolation for equalizer coefficients eq_coeff_algo = 1 # Equalizer algorithm # The PUSCH receiver chain built from separately called pyAerial Python components is defined here. class PuschRxSeparate: """PUSCH receiver class. This class encapsulates the whole PUSCH receiver chain built using pyAerial components. """ def __init__(self, num_rx_ant, enable_pusch_tdi, eq_coeff_algo, plot_figures): """Initialize the PUSCH receiver.""" self.cuda_stream = get_cuda_stream() # Build the components of the receiver. self.channel_estimator = ChannelEstimator( num_rx_ant=num_rx_ant, cuda_stream=self.cuda_stream) self.channel_equalizer = ChannelEqualizer( num_rx_ant=num_rx_ant, enable_pusch_tdi=enable_pusch_tdi, eq_coeff_algo=eq_coeff_algo, cuda_stream=self.cuda_stream) self.noise_intf_estimator = NoiseIntfEstimator( num_rx_ant=num_rx_ant, eq_coeff_algo=eq_coeff_algo, cuda_stream=self.cuda_stream) self.derate_match = LdpcDeRateMatch( enable_scrambling=True, cuda_stream=self.cuda_stream) self.decoder = LdpcDecoder(cuda_stream=self.cuda_stream) # Whether to plot the intermediate results. self.plot_figures = plot_figures def run( self, rx_slot, num_ues, slot, num_dmrs_cdm_grps_no_data, dmrs_scrm_id, start_prb, num_prbs, dmrs_syms, dmrs_max_len, dmrs_add_ln_pos, start_sym, num_symbols, scids, layers, dmrs_ports, rntis, data_scids, code_rates, mod_orders, tb_sizes ): """Run the receiver.""" # Channel estimation. ch_est = self.channel_estimator.estimate( rx_slot=rx_slot, num_ues=num_ues, slot=slot, num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data, dmrs_scrm_id=dmrs_scrm_id, start_prb=start_prb, num_prbs=num_prbs, dmrs_syms=dmrs_syms, dmrs_max_len=dmrs_max_len, dmrs_add_ln_pos=dmrs_add_ln_pos, start_sym=start_sym, num_symbols=num_symbols, scids=scids, layers=layers, dmrs_ports=dmrs_ports ) # Noise and interference estimation. lw_inv, noise_var_pre_eq = self.noise_intf_estimator.estimate( rx_slot=rx_slot, channel_est=ch_est, num_ues=num_ues, slot=slot, num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data, dmrs_scrm_id=dmrs_scrm_id, start_prb=start_prb, num_prbs=num_prbs, dmrs_syms=dmrs_syms, dmrs_max_len=dmrs_max_len, dmrs_add_ln_pos=dmrs_add_ln_pos, start_sym=start_sym, num_symbols=num_symbols, scids=scids, layers=layers, dmrs_ports=dmrs_ports ) # Channel equalization and soft demapping. The first return value are the LLRs, # second are the equalized symbols. We only want the LLRs now. llrs, sym = self.channel_equalizer.equalize( rx_slot=rx_slot, channel_est=ch_est, lw_inv=lw_inv, noise_var_pre_eq=noise_var_pre_eq, num_ues=num_ues, num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data, start_prb=start_prb, num_prbs=num_prbs, dmrs_syms=dmrs_syms, dmrs_max_len=dmrs_max_len, dmrs_add_ln_pos=dmrs_add_ln_pos, start_sym=start_sym, num_symbols=num_symbols, layers=layers, mod_orders=mod_orders ) if self.plot_figures: fig, axs = plt.subplots(1,4) for ant in range(4): axs[ant].imshow(10*np.log10(np.abs(rx_slot[:, :, ant]**2)), aspect='auto') axs[ant].set_ylim([pusch_record.rbStart * 12, pusch_record.rbSize * 12]) axs[ant].set_title('Ant ' + str(ant)) axs[ant].set(xlabel='Symbol', ylabel='Resource Element') axs[ant].label_outer() fig.suptitle('Power in RU Antennas') fig, axs = plt.subplots(1,2) axs[0].scatter(rx_slot.reshape(-1).real, rx_slot.reshape(-1).imag) axs[0].set_title("Pre-Equalized samples") axs[0].set_aspect('equal') axs[1].scatter(np.array(sym).reshape(-1).real, np.array(sym).reshape(-1).imag) axs[1].set_title("Post-Equalized samples") axs[1].set_aspect('equal') fig, axs = plt.subplots(1) axs.set_title("Channel estimates from the PUSCH pipeline") for ant in range(4): axs.plot(np.abs(ch_est[0][ant, 0, :, 0])) axs.legend(["Rx antenna 0, estimate", "Rx antenna 1, estimate", "Rx antenna 2, estimate", "Rx antenna 3, estimate"]) axs.grid(True) plt.show() num_data_sym = (np.array(dmrs_syms[start_sym:start_sym + num_symbols]) == 0).sum() cinits = [(rntis[ue] << 15) + data_scids[ue] for ue in range(num_ues)] rate_match_lengths = [num_data_sym * mod_orders[ue] * num_prbs * 12 * layers[ue] for ue in range(num_ues)] tb_sizes = [s * 8 for s in tb_sizes] code_rates = [c / 1024. for c in code_rates] rvs = [0,] * num_ues ndis = [1,] * num_ues coded_blocks = self.derate_match.derate_match( input_llrs=llrs, tb_sizes=tb_sizes, code_rates=code_rates, rate_match_lengths=rate_match_lengths, mod_orders=mod_orders, num_layers=layers, redundancy_versions=rvs, ndis=ndis, cinits=cinits ) code_blocks = self.decoder.decode( input_llrs=coded_blocks, tb_sizes=tb_sizes, code_rates=code_rates, redundancy_versions=rvs, rate_match_lengths=rate_match_lengths ) # TODO: Use the CRC kernel here. decoded_tbs = [] for ue_idx in range(num_ues): # Combine the code blocks into a transport block. tb = code_block_desegment( code_blocks=code_blocks[ue_idx], tb_size=tb_sizes[ue_idx], code_rate=code_rates[ue_idx], return_bits=False, ) decoded_tbs.append(tb) return decoded_tbs pusch_rx_separate = PuschRxSeparate( num_rx_ant=num_rx_ant, enable_pusch_tdi=enable_pusch_tdi, eq_coeff_algo=eq_coeff_algo, plot_figures=plot_figures ) # This is the fully fused PUSCH receiver chain. pusch_rx = PuschRx( cell_id=cell_id, num_rx_ant=num_rx_ant, num_tx_ant=num_rx_ant, enable_pusch_tdi=enable_pusch_tdi, eq_coeff_algo=eq_coeff_algo )

Below shows how to connect to the clickhouse database and querying the data from it.

Copy
Copied!
            

[4]:

Copy
Copied!
            

# Connect to the local database client = clickhouse_connect.get_client(host='localhost') # Pick a packet from the database pusch_records = client.query_df('select * from fapi where mcsIndex != 0 order by TsTaiNs limit 10')

Copy
Copied!
            

[5]:

Copy
Copied!
            

for index, pusch_record in pusch_records.iterrows(): query = f"""select TsTaiNs,fhData from fh where TsTaiNs =={pusch_record.TsTaiNs.timestamp()} """ fh = client.query_df(query) display(Markdown("### Example{}- SFN.Slot{}.{}from time{}" .format(index + 1, pusch_record.SFN, pusch_record.Slot, pusch_record.TsTaiNs ))) # Make sure that the fronthaul database is complete for the SFN.Slot we've chosen if fh.index.size < 1: pusch_records = pusch_records.drop(index) continue; fh_samp = np.array(fh['fhData'][0], dtype=np.float32) rx_slot = np.swapaxes(fh_samp.view(np.complex64).reshape(4, 14, 273 * 12), 2, 0) # Extract all the needed parameters from the PUSCH record. slot = int(pusch_record.Slot) rntis = [pusch_record.rnti] layers = [pusch_record.nrOfLayers] start_prb = pusch_record.rbStart num_prbs = pusch_record.rbSize start_sym = pusch_record.StartSymbolIndex num_symbols = pusch_record.NrOfSymbols scids = [int(pusch_record.SCID)] data_scids = [pusch_record.dataScramblingId] dmrs_scrm_id = pusch_record.ulDmrsScramblingId num_dmrs_cdm_grps_no_data = pusch_record.numDmrsCdmGrpsNoData dmrs_syms = dmrs_fapi_to_bit_array(int(pusch_record.ulDmrsSymbPos)) dmrs_ports = [pusch_record.dmrsPorts] dmrs_max_len = 1 dmrs_add_ln_pos = 2 mcs_tables = [pusch_record.mcsTable] mcs_indices = [pusch_record.mcsIndex] coderates = [pusch_record.targetCodeRate / 10.] tb_sizes = [pusch_record.TBSize] mod_orders = [pusch_record.qamModOrder] tb_input = np.array(pusch_record.pduData) # Run the receiver built from separately called components. tbs = pusch_rx_separate.run( rx_slot=rx_slot, num_ues=num_ues, slot=slot, num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data, dmrs_scrm_id=dmrs_scrm_id, start_prb=start_prb, num_prbs=num_prbs, dmrs_syms=dmrs_syms, dmrs_max_len=dmrs_max_len, dmrs_add_ln_pos=dmrs_add_ln_pos, start_sym=start_sym, num_symbols=num_symbols, scids=scids, layers=layers, dmrs_ports=dmrs_ports, rntis=rntis, data_scids=data_scids, code_rates=coderates, mod_orders=mod_orders, tb_sizes=tb_sizes ) if np.array_equal(tbs[0][:tb_input.size], tb_input): display(Markdown("**Separated kernels PUSCH decoding success** for SFN.Slot{}.{}from time{}".format(pusch_record.SFN, pusch_record.Slot, pusch_record.TsTaiNs))) else: display(Markdown("**Separated kernels PUSCH decoding failure**")) print("Output bytes:") print(tbs[0][:tb_input.size]) print("Expected output:") print(tb_input) # Run the fused PUSCH receiver. # Note that this is where we set the dynamically changing parameters. tb_crcs, tbs = pusch_rx.run( rx_slot=rx_slot, num_ues=num_ues, slot=slot, num_dmrs_cdm_grps_no_data=num_dmrs_cdm_grps_no_data, dmrs_scrm_id=dmrs_scrm_id, start_prb=start_prb, num_prbs=num_prbs, dmrs_syms=dmrs_syms, dmrs_max_len=dmrs_max_len, dmrs_add_ln_pos=dmrs_add_ln_pos, start_sym=start_sym, num_symbols=num_symbols, scids=scids, layers=layers, dmrs_ports=dmrs_ports, rntis=rntis, data_scids=data_scids, code_rates=coderates, mod_orders=mod_orders, tb_sizes=tb_sizes ) if np.array_equal(tbs[0][:tb_input.size], tb_input): display(Markdown("**Fused PUSCH decoding success** for SFN.Slot{}.{}from time{}".format(pusch_record.SFN, pusch_record.Slot, pusch_record.TsTaiNs))) else: display(Markdown("**Fused PUSCH decoding failure**")) print("Output bytes:") print(tbs[0][:tb_input.size]) print("Expected output:") print(tb_input)


content_notebooks_datalake_pusch_decoding_9_1.png


content_notebooks_datalake_pusch_decoding_9_2.png


content_notebooks_datalake_pusch_decoding_9_3.png


Separated kernels PUSCH decoding success for SFN.Slot 192.4 from time 2024-03-21 12:18:39.162000


Fused PUSCH decoding success for SFN.Slot 192.4 from time 2024-03-21 12:18:39.162000


content_notebooks_datalake_pusch_decoding_9_7.png


content_notebooks_datalake_pusch_decoding_9_8.png


content_notebooks_datalake_pusch_decoding_9_9.png


Separated kernels PUSCH decoding success for SFN.Slot 194.14 from time 2024-03-21 12:18:39.187000


Fused PUSCH decoding success for SFN.Slot 194.14 from time 2024-03-21 12:18:39.187000


content_notebooks_datalake_pusch_decoding_9_13.png


content_notebooks_datalake_pusch_decoding_9_14.png


content_notebooks_datalake_pusch_decoding_9_15.png


Separated kernels PUSCH decoding success for SFN.Slot 195.4 from time 2024-03-21 12:18:39.192000


Fused PUSCH decoding success for SFN.Slot 195.4 from time 2024-03-21 12:18:39.192000


content_notebooks_datalake_pusch_decoding_9_19.png


content_notebooks_datalake_pusch_decoding_9_20.png


content_notebooks_datalake_pusch_decoding_9_21.png


Separated kernels PUSCH decoding success for SFN.Slot 201.4 from time 2024-03-21 12:18:39.252000


Fused PUSCH decoding success for SFN.Slot 201.4 from time 2024-03-21 12:18:39.252000


content_notebooks_datalake_pusch_decoding_9_25.png


content_notebooks_datalake_pusch_decoding_9_26.png


content_notebooks_datalake_pusch_decoding_9_27.png


Separated kernels PUSCH decoding success for SFN.Slot 209.4 from time 2024-03-21 12:18:39.332000


Fused PUSCH decoding success for SFN.Slot 209.4 from time 2024-03-21 12:18:39.332000

Previous Using pyAerial for channel estimation on Aerial Data Lake data
Next API Reference
© Copyright 2024, NVIDIA. Last updated on Oct 7, 2024.