Message Adapter#

FAPI message receiver and processor converting FAPI PDUs to PUSCH pipeline inputs.

Overview#

The Message Adapter receives FAPI messages via NVIPC and processes them through pipelines:

  • FAPI Reception - Receives FAPI messages via NVIPC in dedicated thread

  • Message Processing - Routes messages through pipeline implementations

  • FAPI to PUSCH Conversion - Converts FAPI PDUs (UL_TTI_REQUEST) to PUSCH input parameters

  • Slot Accumulation - Accumulates messages per slot and cell before processing

  • Pipeline Interfaces - Provides slot indication, slot info, and pipeline executor interfaces

API Reference#

using ran::message_adapter::GraphScheduleCallback = std::function<void(ran::fapi::SlotInfo slot)>#

Callback invoked when uplink graph should be scheduled

Param slot:

[in] Slot information (sfn and slot) for which to schedule the graph

using ran::message_adapter::PipelinePtr = std::unique_ptr<PipelineInterface>#

Smart pointer type for pipeline components

using ran::message_adapter::PipelineFactory = std::function<PipelinePtr(const std::string&)>#

Factory function type for creating pipeline components

ran::message_adapter::DECLARE_LOG_COMPONENT(
Sample5gPipelines,
Core,
Fapi5g,
)#

Logging component for 5G FAPI sample pipelines.

ran::message_adapter::DECLARE_LOG_EVENT(
FapiEvent,
ConfigRequest,
ConfigResponse,
StartRequest,
StopRequest,
UlTtiRequest,
DlTtiRequest,
SlotIndication,
SlotResponse,
ErrorIndication,
RcvdMessage,
CrcIndication,
RxDataIndication,
PuschNoiseVarIndication,
)#

FAPI event logging identifiers for message processing.

ran::message_adapter::DECLARE_LOG_EVENT(
ErrorEvent,
InvalidParam,
InvalidState,
InvalidConfig,
InvalidCellid,
InvalidData,
InvalidMessage,
)#

FAPI error event logging identifiers

template<typename T>
static scf_fapi_body_header_t *ran::message_adapter::add_scf_fapi_hdr(
nv_ipc_msg_t &msg,
int msg_id,
uint16_t cell_id,
bool data,
)#
ran::message_adapter::DECLARE_LOG_COMPONENT(
MessageAdapterComponent,
MessageAdapterCore,
Nvipc,
)#
ran::message_adapter::DECLARE_LOG_EVENT(
MessageAdapterEvent,
NvipcInitComplete,
NvipcInitError,
MESSAGE_LOOP_START,
MESSAGE_LOOP_START_ERROR,
MESSAGE_LOOP_STOP,
NvipcDestroyComplete,
)#
ran::message_adapter::DECLARE_LOG_EVENT(
NvIpcEvent,
NvipcRxMsg,
NvipcTxMsg,
NvipcRxError,
NvipcTxError,
)#

NvIPC event logging identifiers

class IFapiMessageProcessor#
#include <ifapi_message_processor.hpp>

Interface for processing FAPI messages

This interface defines the contract for FAPI message processing. Implementations handle incoming FAPI messages (CONFIG_REQUEST, START_REQUEST, UL_TTI_REQUEST, SLOT_RESPONSE, etc.) from the FAPI RX task.

Thread safety: Implementations must be thread-safe if called from multiple threads, though typically called from single FAPI RX task.

Subclassed by ran::message_adapter::Sample5GPipeline

Public Functions

virtual ~IFapiMessageProcessor() = default#

Virtual destructor for proper cleanup

virtual void process_msg(nv_ipc_msg_t &msg) = 0#

Process incoming FAPI message

Handles the incoming message based on its type (msg_id) and updates internal state accordingly. The processor owns the nvIPC endpoint and manages message responses internally.

Parameters:

msg[inout] FAPI message to process

IFapiMessageProcessor(const IFapiMessageProcessor&) = delete#

Copy constructor (disabled for abstract base)

IFapiMessageProcessor &operator=(
const IFapiMessageProcessor&,
) = delete#

Assignment operator (disabled for abstract base)

IFapiMessageProcessor(IFapiMessageProcessor&&) = delete#

Move constructor (disabled for abstract base)

IFapiMessageProcessor &operator=(IFapiMessageProcessor&&) = delete#

Move assignment operator (disabled for abstract base)

class IFapiSlotInfoProvider#
#include <ifapi_slot_info_provider.hpp>

Interface for accessing slot information and captured FAPI messages

This interface provides read-only access to:

  • Current slot information (SFN and slot number)

  • Accumulated FAPI messages (specifically UL-TTI-REQUEST messages) for a slot

Used by C-plane and U-plane tasks to retrieve slot state and messages after graph scheduling.

Thread safety: The provider implementation must ensure thread-safe access to the slot information and message collection. Typically, the graph scheduling mechanism provides happens-before guarantees that make the data safe to read.

Subclassed by ran::message_adapter::Sample5GPipeline

Public Functions

virtual ~IFapiSlotInfoProvider() = default#

Virtual destructor for proper cleanup

virtual ran::fapi::SlotInfo get_current_slot() const = 0#

Get current slot information (SFN and slot number)

Returns the current SFN and slot being processed by the Message Adapter. Used by C-plane to calculate absolute slot counter.

Thread-safe: Uses atomic operations internally.

Returns:

ran::fapi::SlotInfo with sfn in [0, 1023] and slot in [0, 19] for 30kHz SCS

virtual std::span<const ran::fapi::CapturedFapiMessage> get_accumulated_ul_tti_msgs(
std::uint16_t slot,
) const = 0#

Get accumulated UL-TTI messages for current slot

Returns a non-owning view of the accumulated messages. The span remains valid until the next slot’s messages are accumulated. Messages are in the order they were received.

Parameters:

slot[in] Slot number (0-19 for 30kHz SCS) to get accumulated messages for

Returns:

Span of captured FAPI messages (may be empty if no messages)

virtual std::uint64_t get_current_absolute_slot(
ran::fapi::SlotInfo slot_info,
) const noexcept = 0#

Get absolute slot number for given slot info

Calculates absolute slot number accounting for SFN wrap-arounds. The absolute slot is a monotonic counter that never wraps.

Parameters:

slot_info[in] Slot information containing SFN and slot

Returns:

Absolute slot number since initialization

IFapiSlotInfoProvider(const IFapiSlotInfoProvider&) = delete#

Copy constructor (disabled for abstract base)

IFapiSlotInfoProvider &operator=(
const IFapiSlotInfoProvider&,
) = delete#

Assignment operator (disabled for abstract base)

IFapiSlotInfoProvider(IFapiSlotInfoProvider&&) = delete#

Move constructor (disabled for abstract base)

IFapiSlotInfoProvider &operator=(IFapiSlotInfoProvider&&) = delete#

Move assignment operator (disabled for abstract base)

class IPipelineExecutor#
#include <ipipeline_executor.hpp>

Interface for executing PUSCH pipelines

This interface provides access to pipeline execution functionality, allowing PUSCH RX tasks to trigger pipeline processing for a given slot.

Used by PUSCH task to launch pipeline execution after U-Plane processing completes.

Thread safety: The provider implementation must ensure thread-safe access to the pipeline execution mechanism.

Subclassed by ran::message_adapter::Sample5GPipeline

Public Functions

virtual ~IPipelineExecutor() = default#

Virtual destructor for proper cleanup

virtual void launch_pipelines(std::size_t slot) = 0#

Launch PUSCH pipelines for the given slot

Triggers execution of the PUSCH pipeline(s) for the specified slot. This should be called by the PUSCH RX task after U-Plane processing has prepared the I/Q data.

Thread-safe: Implementation uses appropriate synchronization.

Parameters:

slot[in] Slot number to process (0-19 for 30kHz SCS)

IPipelineExecutor(const IPipelineExecutor&) = delete#

Copy constructor (disabled for abstract base)

IPipelineExecutor &operator=(const IPipelineExecutor&) = delete#

Assignment operator (disabled for abstract base)

IPipelineExecutor(IPipelineExecutor&&) = delete#

Move constructor (disabled for abstract base)

IPipelineExecutor &operator=(IPipelineExecutor&&) = delete#

Move assignment operator (disabled for abstract base)

class ISlotIndicationSender#
#include <islot_indication_sender.hpp>

Interface for sending slot indications

This interface abstracts the slot indication mechanism, allowing the TimedTrigger to send periodic slot indications without depending on the concrete implementation. Implementations should send slot indications to all active cells.

Thread safety: Implementations must be thread-safe as this may be called from a timer thread.

Subclassed by ran::message_adapter::Sample5GPipeline

Public Functions

virtual ~ISlotIndicationSender() = default#

Virtual destructor for proper cleanup

virtual void send_slot_indications() = 0#

Send slot indications to all active cells

Called periodically (typically every 500µs) to advance the slot counter and send slot indication messages to all cells in running state.

ISlotIndicationSender(const ISlotIndicationSender&) = delete#

Copy constructor (disabled for abstract base)

ISlotIndicationSender &operator=(
const ISlotIndicationSender&,
) = delete#

Assignment operator (disabled for abstract base)

ISlotIndicationSender(ISlotIndicationSender&&) = delete#

Move constructor (disabled for abstract base)

ISlotIndicationSender &operator=(ISlotIndicationSender&&) = delete#

Move assignment operator (disabled for abstract base)

class MessageAdapter#
#include <message_adapter.hpp>

MessageAdapter class for handling message processing in a separate thread

This class creates a dedicated thread for message processing and provides thread-safe operations for starting, stopping, and managing the message loop.

Public Functions

explicit MessageAdapter(const std::string &config_file)#

Constructor that creates a thread for message processing

The thread is automatically joined when the object is destroyed due to the use of std::jthread.

Parameters:

config_file[in] Path to the configuration file

~MessageAdapter()#

Destructor - ensures proper cleanup of resources

MessageAdapter(const MessageAdapter&) = delete#
MessageAdapter &operator=(const MessageAdapter&) = delete#
MessageAdapter(MessageAdapter&&) = delete#
MessageAdapter &operator=(MessageAdapter&&) = delete#
bool start()#

Start the message processing thread

Returns:

true if started successfully, false otherwise

void stop()#

Stop the message processing thread

bool is_running() const noexcept#

Check if the thread is currently running

Returns:

true if running, false otherwise

inline int get_execution_mode() const noexcept#

Get the execution mode from configuration

Returns:

Execution mode (0 = Stream, 1 = Graph)

inline nv_ipc_t *get_ipc() const noexcept#

Get the Ipc interface pointer

Returns:

Pointer to the NV Ipc interface

nv_ipc_t *init_nv_ipc_interface(nv_ipc_config_t *config)#

Initialize NV Ipc interface

This function initializes the NV Ipc interface with the configuration specified in the config file.

Parameters:

config[inout] Pointer to the nv_ipc_config_t structure to be configured

Returns:

Pointer to the initialized NV Ipc interface, or nullptr if initialization fails

inline void add_pipeline(PipelinePtr new_pipeline)#

Add a pipeline for message processing

This function adds a new pipeline to the processing chain. The pipeline will be used to process incoming messages in the message loop.

Parameters:

new_pipeline[in] Smart pointer to the new pipeline object

inline void set_pipeline(size_t index, PipelinePtr new_pipeline)#

Set the pipeline at a specific index

This function sets the pipeline at the specified index. If the index is out of bounds, the pipeline is added to the end.

Parameters:
  • index[in] Index where to set the pipeline

  • new_pipeline[in] Smart pointer to the new pipeline object

inline void process_message(nv_ipc_msg_t &msg)#

Process a single message using all pipelines

This function delegates message processing to all pipelines in the chain. If no pipelines are set, the message is ignored.

Parameters:

msg[inout] Message to be processed

struct PhyMacMsgDesc : public nv_ipc_msg_t#
#include <pipeline_interface.hpp>

PHY-MAC message descriptor

Extends nv_ipc_msg_t to provide additional functionality for PHY-MAC layer message handling with reset capability.

Public Functions

inline PhyMacMsgDesc()#

Default constructor - initializes and resets all fields

inline explicit PhyMacMsgDesc(nv_ipc_msg_t msg)#

Construct from existing Ipc message

Parameters:

msg[in] Source Ipc message to copy

inline void reset()#

Reset all message fields to default values

class PhyStats#
#include <phy_stats.hpp>

PHY statistics collector.

Thread-safe statistics collection using lock-free atomics with relaxed memory ordering. Per-cell granularity for multi-cell debugging and validation.

Performance: ~1-2 CPU cycles per counter increment using relaxed atomics. Memory: 8 bytes per cell (std::atomic<uint64_t>)

Thread Safety: All methods are thread-safe and lock-free.

Public Functions

explicit PhyStats(std::size_t max_cells)#

Construct statistics collector for specified number of cells.

Parameters:

max_cells[in] Maximum number of cells to track

void record_crc_failure(std::uint32_t cell_id) noexcept#

Record a CRC failure for the specified cell.

Parameters:

cell_id[in] Cell identifier (must be < max_cells)

std::vector<std::uint64_t> get_stats() const#

Get the statistics.

Returns:

Statistics

class PipelineInterface#
#include <pipeline_interface.hpp>

Abstract interface for pipeline components

This interface defines the contract for pipeline components that can be managed by the message adapter. Derived classes should implement the specific pipeline functionality.

Subclassed by Sample5GPipelineAdapter

Public Functions

PipelineInterface() = default#

Public constructor for abstract base

virtual ~PipelineInterface() = default#

Virtual destructor for proper cleanup

virtual void process_msg(nv_ipc_msg_t &msg, nv_ipc_t *ipc) = 0#

Process incoming message

Parameters:
  • msg[inout] Input message to be processed

  • ipc[in] Pointer to Ipc interface

PipelineInterface(const PipelineInterface&) = delete#

Copy constructor (disabled for abstract base)

PipelineInterface &operator=(const PipelineInterface&) = delete#

Assignment operator (disabled for abstract base)

PipelineInterface(PipelineInterface&&) = delete#

Move constructor (disabled for abstract base)

PipelineInterface &operator=(PipelineInterface&&) = delete#

Move assignment operator (disabled for abstract base)

class Sample5GPipeline : public ran::message_adapter::IFapiMessageProcessor, public ran::message_adapter::ISlotIndicationSender, public ran::message_adapter::IFapiSlotInfoProvider, public ran::message_adapter::IPipelineExecutor#
#include <sample_5g_pipelines.hpp>

Sample 5G pipeline implementation with thread-safe operation

This class implements FAPI message processing for 5G networks with atomic operations for thread safety. Supports multiple cells with independent state machines.

Thread-safety: Uses lock-free atomics for sfn/slot counter, cell bitmap, and cell states. Safe for concurrent access from FAPI RX thread and slot indication timer thread.

Implements:

Subclassed by Sample5GPipelineAdapter

Public Functions

explicit Sample5GPipeline(const InitParams &params)#

Constructor for the sample 5G pipeline

Parameters:

params[in] Initialization parameters

Throws:

std::invalid_argument – if ipc is nullptr

~Sample5GPipeline() override = default#

Destructor

Sample5GPipeline(const Sample5GPipeline&) = delete#
Sample5GPipeline &operator=(const Sample5GPipeline&) = delete#
Sample5GPipeline(Sample5GPipeline&&) = delete#
Sample5GPipeline &operator=(Sample5GPipeline&&) = delete#
std::string get_status() const#

Get 5G pipeline status information

Returns:

Status string containing pipeline state and statistics

std::size_t get_num_cells_running() const#

Get number of currently running cells

Thread-safe: Uses atomic operations to read active_cell_bitmap.

Returns:

Count of cells in RUNNING state

inline const PhyStats &get_stats() const noexcept#

Get PHY phy statistics.

Safe to call after all worker threads have joined. Returns const reference to atomic statistics - caller uses .load() to read values.

Returns:

Const reference to statistics

virtual void process_msg(nv_ipc_msg_t &msg) override#

Process incoming FAPI message

Handles ConfigRequest, StartRequest, StopRequest, UlTtiRequest, DlTtiRequest, and SlotResponse messages. Updates internal state atomically for thread-safe operation.

Parameters:

msg[inout] FAPI message to process

virtual void send_slot_indications() override#

Send slot indications to all active cells

Called periodically (typically every 500µs) to advance the slot counter atomically and send slot indication messages via nvIPC to all running cells.

Thread-safety: Safe to call concurrently with process_msg(). Uses atomic operations for sfn/slot counter and active cell bitmap.

virtual ran::fapi::SlotInfo get_current_slot() const override#

Get current slot information (SFN and slot number)

Returns the current SFN and slot being processed. Uses atomic operations to safely read the packed sfn/slot counter.

Thread-safe: Uses atomic load operation.

Returns:

ran::fapi::SlotInfo with sfn in [0, 1023] and slot in [0, 19] for 30kHz SCS

virtual std::span<const ran::fapi::CapturedFapiMessage> get_accumulated_ul_tti_msgs(
std::uint16_t slot,
) const override#

Get accumulated UL-TTI messages for current slot

Returns non-owning view of accumulated UL-TTI-REQUEST messages. Messages remain valid until next slot’s accumulation begins.

Parameters:

slot[in] Slot number (0-19 for 30kHz SCS) to get accumulated messages for

Returns:

Span of captured FAPI messages

virtual std::uint64_t get_current_absolute_slot(
ran::fapi::SlotInfo slot_info,
) const noexcept override#

Get absolute slot number for given slot info

Calculates absolute slot number accounting for SFN wrap-arounds. The absolute slot is a monotonic counter that never wraps. Thread-safe: Uses atomic load with acquire semantics.

Parameters:

slot_info[in] Slot information containing SFN and slot

Returns:

Absolute slot number since initialization

virtual void launch_pipelines(std::size_t slot) override#

Launch PUSCH pipelines for the given slot

Triggers execution of the PUSCH pipeline(s) for the specified slot. This should be called by the PUSCH RX task after U-Plane processing has prepared the I/Q data.

Thread-safe: Delegates to Driver which uses appropriate synchronization.

Parameters:

slot[in] Slot number to process (0-19 for 30kHz SCS)

struct InitParams#
#include <sample_5g_pipelines.hpp>

Initialization parameters for Sample5GPipeline

Public Members

nv_ipc_t *ipc = {nullptr}#

nvIPC endpoint (non-owning)

std::size_t max_cells = {common::NUM_CELLS_SUPPORTED}#

Maximum supported cells.

GraphScheduleCallback on_graph_schedule#

Callback to schedule graph.

framework::pipeline::IPipelineOutputProvider *output_provider{nullptr}#

Pipeline output provider (non-owning)

enum class ran::fapi_5g::FapiStateT : std::uint8_t#

FAPI state machine states for each cell

Values:

enumerator FapiStateIdle#

Cell not configured.

enumerator FapiStateConfigured#

Cell configured but not running.

enumerator FapiStateRunning#

Cell running and processing slots.

enumerator FapiStateStopped#

Cell stopped after running.

struct Cell#
#include <cell.hpp>

Cell

Represents a single cell in the 5G network. Manages cell-specific configuration and state.

Thread-safety: The fapi_state member is atomic and can be safely accessed from multiple threads. Other members should only be modified during configuration before cells become active.

Public Functions

inline Cell(
const std::uint16_t in_phy_cell_id,
const ran::common::PhyParams &in_phy_params,
)#

Constructor

Parameters:
  • in_phy_cell_id[in] Physical cell ID

  • in_phy_params[in] Physical layer parameters

~Cell() = default#

Destructor

Cell(const Cell&) = delete#
Cell &operator=(const Cell&) = delete#
Cell(Cell&&) = delete#
Cell &operator=(Cell&&) = delete#

Public Members

std::uint16_t phy_cell_id = {}#

Physical cell ID.

ran::common::PhyParams ul_phy_params#

Physical layer parameters.

std::atomic<FapiStateT> fapi_state#

FAPI state (atomic for thread safety)