Message Adapter
FAPI message receiver and processor converting FAPI PDUs to PUSCH pipeline inputs.
Overview
The Message Adapter receives FAPI messages via nvIPC and processes them through pipelines:
FAPI Reception - Receives FAPI messages via nvIPC in a dedicated thread
Message Processing - Routes messages through pipeline implementations
FAPI to PUSCH Conversion - Converts FAPI PDUs (UL_TTI_REQUEST) to PUSCH input parameters
Slot Accumulation - Accumulates messages per slot and cell before processing
Pipeline Interfaces - Provides slot indication, slot info, and pipeline executor interfaces
API Reference
-
using ran::message_adapter::GraphScheduleCallback = std::function<void(ran::fapi::SlotInfo slot)>#
Callback invoked when uplink graph should be scheduled
- Param slot:
[in] Slot information (sfn and slot) for which to schedule the graph
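A minimal sketch of how a callback of this shape might be installed and invoked. The `SlotInfo` struct below is a stand-in for `ran::fapi::SlotInfo` (its real field types are not shown in this reference), and `ScheduleRecorder` and `notify_graph_schedule` are hypothetical helpers introduced only for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Stand-in for ran::fapi::SlotInfo; the real field names and types may differ.
struct SlotInfo {
    std::uint16_t sfn{};
    std::uint16_t slot{};
};

// Same shape as the documented alias.
using GraphScheduleCallback = std::function<void(SlotInfo slot)>;

// Hypothetical consumer that records every slot it was asked to schedule.
class ScheduleRecorder {
public:
    GraphScheduleCallback make_callback() {
        return [this](SlotInfo slot) { scheduled_.push_back(slot); };
    }
    const std::vector<SlotInfo> &scheduled() const { return scheduled_; }

private:
    std::vector<SlotInfo> scheduled_;
};

// Hypothetical producer side: invoke the callback only if one was installed,
// because calling an empty std::function throws std::bad_function_call.
inline void notify_graph_schedule(const GraphScheduleCallback &cb, SlotInfo slot) {
    if (cb) {
        cb(slot);
    }
}
```

The recorder must outlive the callback it hands out, because the lambda captures `this`.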
-
using ran::message_adapter::PipelinePtr = std::unique_ptr<PipelineInterface>#
Smart pointer type for pipeline components
-
using ran::message_adapter::PipelineFactory = std::function<PipelinePtr(const std::string&)>#
Factory function type for creating pipeline components
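The two aliases above can be combined as in the following sketch. The meaning of the factory's string argument is not documented here, so treating it as a pipeline name is an assumption. `Message`, `CountingPipeline`, and `make_sample_factory` are hypothetical, and `process_msg` is simplified to one argument.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

struct Message { int msg_id{}; };  // stand-in for nv_ipc_msg_t

// Simplified stand-in for PipelineInterface (the real process_msg also takes nv_ipc_t*).
class PipelineInterface {
public:
    PipelineInterface() = default;
    virtual ~PipelineInterface() = default;
    virtual void process_msg(Message &msg) = 0;
    PipelineInterface(const PipelineInterface &) = delete;
    PipelineInterface &operator=(const PipelineInterface &) = delete;
};

using PipelinePtr = std::unique_ptr<PipelineInterface>;
using PipelineFactory = std::function<PipelinePtr(const std::string &)>;

// Hypothetical pipeline that only counts the messages it sees.
class CountingPipeline final : public PipelineInterface {
public:
    void process_msg(Message &) override { ++count_; }
    int count() const { return count_; }

private:
    int count_{0};
};

// Hypothetical factory: maps a name to a pipeline, returns nullptr for unknown names.
inline PipelineFactory make_sample_factory() {
    return [](const std::string &name) -> PipelinePtr {
        if (name == "counting") {
            return std::make_unique<CountingPipeline>();
        }
        return nullptr;
    };
}
```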
- ran::message_adapter::DECLARE_LOG_COMPONENT(Sample5gPipelines, Core, Fapi5g)
Logging component for 5G FAPI sample pipelines.
- ran::message_adapter::DECLARE_LOG_EVENT(FapiEvent, ConfigRequest, ConfigResponse, StartRequest, StopRequest, UlTtiRequest, DlTtiRequest, SlotIndication, SlotResponse, ErrorIndication, RcvdMessage, CrcIndication, RxDataIndication, PuschNoiseVarIndication)
FAPI event logging identifiers for message processing.
- ran::message_adapter::DECLARE_LOG_EVENT(ErrorEvent, InvalidParam, InvalidState, InvalidConfig, InvalidCellid, InvalidData, InvalidMessage)
FAPI error event logging identifiers
-
template<typename T>
static scf_fapi_body_header_t *ran::message_adapter::add_scf_fapi_hdr(nv_ipc_msg_t &msg, int msg_id, uint16_t cell_id, bool data)
- ran::message_adapter::DECLARE_LOG_COMPONENT(MessageAdapterComponent, MessageAdapterCore, Nvipc)
- ran::message_adapter::DECLARE_LOG_EVENT(MessageAdapterEvent, NvipcInitComplete, NvipcInitError, MESSAGE_LOOP_START, MESSAGE_LOOP_START_ERROR, MESSAGE_LOOP_STOP, NvipcDestroyComplete)
- ran::message_adapter::DECLARE_LOG_EVENT(NvIpcEvent, NvipcRxMsg, NvipcTxMsg, NvipcRxError, NvipcTxError)
nvIPC event logging identifiers
-
class IFapiMessageProcessor#
- #include <ifapi_message_processor.hpp>
Interface for processing FAPI messages
This interface defines the contract for FAPI message processing. Implementations handle incoming FAPI messages (CONFIG_REQUEST, START_REQUEST, UL_TTI_REQUEST, SLOT_RESPONSE, etc.) from the FAPI RX task.
Thread safety: Implementations must be thread-safe if called from multiple threads, although they are typically called only from a single FAPI RX task.
Subclassed by ran::message_adapter::Sample5GPipeline
Public Functions
-
virtual ~IFapiMessageProcessor() = default#
Virtual destructor for proper cleanup
-
virtual void process_msg(nv_ipc_msg_t &msg) = 0#
Process incoming FAPI message
Handles the incoming message based on its type (msg_id) and updates internal state accordingly. The processor owns the nvIPC endpoint and manages message responses internally.
- Parameters:
msg – [inout] FAPI message to process
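As a rough illustration of dispatching on `msg_id`, the sketch below routes a message to a per-type handler. The `MsgId` values are placeholders rather than the real SCF FAPI message IDs. `Message` is a stand-in for `nv_ipc_msg_t`, and `CountingProcessor` is a hypothetical implementation that only counts what it receives.

```cpp
#include <cassert>
#include <cstdint>

// Placeholder IDs for illustration only; real values come from the SCF FAPI headers.
enum class MsgId : std::int32_t {
    ConfigRequest = 1,
    StartRequest,
    StopRequest,
    UlTtiRequest,
    SlotResponse
};

// Stand-in for nv_ipc_msg_t, reduced to the fields used here.
struct Message {
    std::int32_t msg_id{};
    std::int32_t cell_id{};
};

// Simplified stand-in for IFapiMessageProcessor.
class IMessageProcessor {
public:
    virtual ~IMessageProcessor() = default;
    virtual void process_msg(Message &msg) = 0;
};

// Hypothetical processor: one branch per message type, unknown IDs are counted separately.
class CountingProcessor final : public IMessageProcessor {
public:
    void process_msg(Message &msg) override {
        switch (static_cast<MsgId>(msg.msg_id)) {
        case MsgId::ConfigRequest: ++config_requests; break;
        case MsgId::StartRequest:  ++start_requests;  break;
        case MsgId::UlTtiRequest:  ++ul_tti_requests; break;
        case MsgId::StopRequest:
        case MsgId::SlotResponse:  ++other;           break;
        default:                   ++unknown;         break;
        }
    }

    int config_requests{0};
    int start_requests{0};
    int ul_tti_requests{0};
    int other{0};
    int unknown{0};
};
```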
-
IFapiMessageProcessor(const IFapiMessageProcessor&) = delete#
Copy constructor (disabled for abstract base)
- IFapiMessageProcessor &operator=(const IFapiMessageProcessor&) = delete
Assignment operator (disabled for abstract base)
-
IFapiMessageProcessor(IFapiMessageProcessor&&) = delete#
Move constructor (disabled for abstract base)
-
IFapiMessageProcessor &operator=(IFapiMessageProcessor&&) = delete#
Move assignment operator (disabled for abstract base)
-
class IFapiSlotInfoProvider#
- #include <ifapi_slot_info_provider.hpp>
Interface for accessing slot information and captured FAPI messages
This interface provides read-only access to:
Current slot information (SFN and slot number)
Accumulated FAPI messages (specifically UL-TTI-REQUEST messages) for a slot
Used by C-plane and U-plane tasks to retrieve slot state and messages after graph scheduling.
Thread safety: The provider implementation must ensure thread-safe access to the slot information and message collection. Typically, the graph scheduling mechanism provides happens-before guarantees that make the data safe to read.
Subclassed by ran::message_adapter::Sample5GPipeline
Public Functions
-
virtual ~IFapiSlotInfoProvider() = default#
Virtual destructor for proper cleanup
-
virtual ran::fapi::SlotInfo get_current_slot() const = 0#
Get current slot information (SFN and slot number)
Returns the current SFN and slot being processed by the Message Adapter. Used by C-plane to calculate absolute slot counter.
Thread-safe: Uses atomic operations internally.
- Returns:
ran::fapi::SlotInfo with sfn in [0, 1023] and slot in [0, 19] for 30kHz SCS
- virtual std::span<const ran::fapi::CapturedFapiMessage> get_accumulated_ul_tti_msgs(std::uint16_t slot)
Get accumulated UL-TTI messages for the given slot
Returns a non-owning view of the accumulated messages. The span remains valid until the next slot’s messages are accumulated. Messages are in the order they were received.
- Parameters:
slot – [in] Slot number (0-19 for 30kHz SCS) to get accumulated messages for
- Returns:
Span of captured FAPI messages (may be empty if no messages)
- virtual std::uint64_t get_current_absolute_slot(ran::fapi::SlotInfo slot_info)
Get absolute slot number for given slot info
Calculates absolute slot number accounting for SFN wrap-arounds. The absolute slot is a monotonic counter that never wraps.
- Parameters:
slot_info – [in] Slot information containing SFN and slot
- Returns:
Absolute slot number since initialization
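One way to obtain a monotonic counter from a wrapping SFN is to count wrap-arounds, as in the sketch below. It assumes 1024 SFN values and 20 slots per frame (30 kHz SCS), and that slots are reported in order. `AbsoluteSlotTracker` is hypothetical, and the real implementation may compute the value differently, for example by offsetting it so that it starts at zero.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for ran::fapi::SlotInfo.
struct SlotInfo {
    std::uint16_t sfn{};
    std::uint16_t slot{};
};

// Hypothetical tracker: detects SFN wrap-around (1023 -> 0) between consecutive updates.
class AbsoluteSlotTracker {
public:
    static constexpr std::uint64_t kSlotsPerFrame = 20;  // 30 kHz SCS
    static constexpr std::uint64_t kSfnCount = 1024;     // SFN range [0, 1023]

    // Call once per slot, in order. Returns the absolute slot for `s`.
    std::uint64_t update(SlotInfo s) noexcept {
        if (has_last_ && s.sfn < last_sfn_) {
            ++wraps_;  // SFN went backwards, so the frame counter wrapped
        }
        last_sfn_ = s.sfn;
        has_last_ = true;
        return (wraps_ * kSfnCount + s.sfn) * kSlotsPerFrame + s.slot;
    }

private:
    std::uint64_t wraps_{0};
    std::uint16_t last_sfn_{0};
    bool has_last_{false};
};
```

With these constants, SFN 1023 slot 19 maps to 20479 and the next slot (SFN 0 slot 0 after the wrap) maps to 20480, so the counter keeps increasing across the wrap.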
-
IFapiSlotInfoProvider(const IFapiSlotInfoProvider&) = delete#
Copy constructor (disabled for abstract base)
- IFapiSlotInfoProvider &operator=(const IFapiSlotInfoProvider&) = delete
Assignment operator (disabled for abstract base)
-
IFapiSlotInfoProvider(IFapiSlotInfoProvider&&) = delete#
Move constructor (disabled for abstract base)
-
IFapiSlotInfoProvider &operator=(IFapiSlotInfoProvider&&) = delete#
Move assignment operator (disabled for abstract base)
-
class IPipelineExecutor#
- #include <ipipeline_executor.hpp>
Interface for executing PUSCH pipelines
This interface provides access to pipeline execution functionality, allowing PUSCH RX tasks to trigger pipeline processing for a given slot.
Used by PUSCH task to launch pipeline execution after U-Plane processing completes.
Thread safety: The provider implementation must ensure thread-safe access to the pipeline execution mechanism.
Subclassed by ran::message_adapter::Sample5GPipeline
Public Functions
-
virtual ~IPipelineExecutor() = default#
Virtual destructor for proper cleanup
-
virtual void launch_pipelines(std::size_t slot) = 0#
Launch PUSCH pipelines for the given slot
Triggers execution of the PUSCH pipeline(s) for the specified slot. This should be called by the PUSCH RX task after U-Plane processing has prepared the I/Q data.
Thread-safe: Implementation uses appropriate synchronization.
- Parameters:
slot – [in] Slot number to process (0-19 for 30kHz SCS)
-
IPipelineExecutor(const IPipelineExecutor&) = delete#
Copy constructor (disabled for abstract base)
-
IPipelineExecutor &operator=(const IPipelineExecutor&) = delete#
Assignment operator (disabled for abstract base)
-
IPipelineExecutor(IPipelineExecutor&&) = delete#
Move constructor (disabled for abstract base)
-
IPipelineExecutor &operator=(IPipelineExecutor&&) = delete#
Move assignment operator (disabled for abstract base)
-
class ISlotIndicationSender#
- #include <islot_indication_sender.hpp>
Interface for sending slot indications
This interface abstracts the slot indication mechanism, allowing the TimedTrigger to send periodic slot indications without depending on the concrete implementation. Implementations should send slot indications to all active cells.
Thread safety: Implementations must be thread-safe as this may be called from a timer thread.
Subclassed by ran::message_adapter::Sample5GPipeline
Public Functions
-
virtual ~ISlotIndicationSender() = default#
Virtual destructor for proper cleanup
-
virtual void send_slot_indications() = 0#
Send slot indications to all active cells
Called periodically (typically every 500µs) to advance the slot counter and send slot indication messages to all cells in running state.
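The periodic advance can be pictured as follows. This sketch assumes 20 slots per frame and an SFN range of [0, 1023], and it advances the counter before sending, which is an ordering the reference does not specify. `next_slot` and `RecordingSender` are hypothetical, and "sending" is replaced by recording the (cell, slot) pair.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Stand-in for ran::fapi::SlotInfo.
struct SlotInfo {
    std::uint16_t sfn{};
    std::uint16_t slot{};
};

constexpr std::uint16_t kSlotsPerFrame = 20;  // 30 kHz SCS, one slot every 500 us
constexpr std::uint16_t kSfnCount = 1024;

// Advance by one slot, rolling the slot into the SFN and the SFN back to zero.
constexpr SlotInfo next_slot(SlotInfo cur) {
    SlotInfo next = cur;
    next.slot = static_cast<std::uint16_t>(next.slot + 1);
    if (next.slot == kSlotsPerFrame) {
        next.slot = 0;
        next.sfn = static_cast<std::uint16_t>((next.sfn + 1) % kSfnCount);
    }
    return next;
}

// Hypothetical sender: records one indication per running cell on every tick.
class RecordingSender {
public:
    explicit RecordingSender(std::vector<bool> running) : running_(std::move(running)) {}

    void send_slot_indications() {
        current_ = next_slot(current_);
        for (std::size_t cell = 0; cell < running_.size(); ++cell) {
            if (running_[cell]) {
                sent_.emplace_back(cell, current_);
            }
        }
    }

    const std::vector<std::pair<std::size_t, SlotInfo>> &sent() const { return sent_; }

private:
    std::vector<bool> running_;
    SlotInfo current_{};
    std::vector<std::pair<std::size_t, SlotInfo>> sent_;
};
```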
-
ISlotIndicationSender(const ISlotIndicationSender&) = delete#
Copy constructor (disabled for abstract base)
- ISlotIndicationSender &operator=(const ISlotIndicationSender&) = delete
Assignment operator (disabled for abstract base)
-
ISlotIndicationSender(ISlotIndicationSender&&) = delete#
Move constructor (disabled for abstract base)
-
ISlotIndicationSender &operator=(ISlotIndicationSender&&) = delete#
Move assignment operator (disabled for abstract base)
-
class MessageAdapter#
- #include <message_adapter.hpp>
MessageAdapter class for handling message processing in a separate thread
This class creates a dedicated thread for message processing and provides thread-safe operations for starting, stopping, and managing the message loop.
Public Functions
-
explicit MessageAdapter(const std::string &config_file)#
Constructor that creates a thread for message processing
The thread is automatically joined when the object is destroyed due to the use of std::jthread.
- Parameters:
config_file – [in] Path to the configuration file
-
~MessageAdapter()#
Destructor - ensures proper cleanup of resources
-
MessageAdapter(const MessageAdapter&) = delete#
-
MessageAdapter &operator=(const MessageAdapter&) = delete#
-
MessageAdapter(MessageAdapter&&) = delete#
-
MessageAdapter &operator=(MessageAdapter&&) = delete#
-
bool start()#
Start the message processing thread
- Returns:
true if started successfully, false otherwise
-
void stop()#
Stop the message processing thread
-
bool is_running() const noexcept#
Check if the thread is currently running
- Returns:
true if running, false otherwise
-
inline int get_execution_mode() const noexcept#
Get the execution mode from configuration
- Returns:
Execution mode (0 = Stream, 1 = Graph)
-
inline nv_ipc_t *get_ipc() const noexcept#
Get the nvIPC interface pointer
- Returns:
Pointer to the nvIPC interface
-
nv_ipc_t *init_nv_ipc_interface(nv_ipc_config_t *config)#
Initialize the nvIPC interface
This function initializes the nvIPC interface with the configuration specified in the config file.
- Parameters:
config – [inout] Pointer to the nv_ipc_config_t structure to be configured
- Returns:
Pointer to the initialized nvIPC interface, or nullptr if initialization fails
-
inline void add_pipeline(PipelinePtr new_pipeline)#
Add a pipeline for message processing
This function adds a new pipeline to the processing chain. The pipeline will be used to process incoming messages in the message loop.
- Parameters:
new_pipeline – [in] Smart pointer to the new pipeline object
-
inline void set_pipeline(size_t index, PipelinePtr new_pipeline)#
Set the pipeline at a specific index
This function sets the pipeline at the specified index. If the index is out of bounds, the pipeline is added to the end.
- Parameters:
index – [in] Index where to set the pipeline
new_pipeline – [in] Smart pointer to the new pipeline object
-
inline void process_message(nv_ipc_msg_t &msg)#
Process a single message using all pipelines
This function delegates message processing to all pipelines in the chain. If no pipelines are set, the message is ignored.
- Parameters:
msg – [inout] Message to be processed
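The documented behaviour of `add_pipeline`, `set_pipeline`, and `process_message` can be sketched with stand-in types as below. `PipelineChain`, `TagPipeline`, and `Message` are hypothetical and exist only to make the ordering and the out-of-bounds rule observable. The real `process_msg` also receives an `nv_ipc_t*`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Message { std::string trace; };  // stand-in for nv_ipc_msg_t

// Simplified stand-in for PipelineInterface.
class PipelineInterface {
public:
    PipelineInterface() = default;
    virtual ~PipelineInterface() = default;
    virtual void process_msg(Message &msg) = 0;
};
using PipelinePtr = std::unique_ptr<PipelineInterface>;

// Hypothetical pipeline that appends a tag so the call order is visible.
class TagPipeline final : public PipelineInterface {
public:
    explicit TagPipeline(char tag) : tag_(tag) {}
    void process_msg(Message &msg) override { msg.trace.push_back(tag_); }

private:
    char tag_;
};

// Hypothetical container mirroring the three documented operations.
class PipelineChain {
public:
    void add_pipeline(PipelinePtr p) { pipelines_.push_back(std::move(p)); }

    // In range: replace the pipeline. Out of bounds: append to the end.
    void set_pipeline(std::size_t index, PipelinePtr p) {
        if (index < pipelines_.size()) {
            pipelines_[index] = std::move(p);
        } else {
            pipelines_.push_back(std::move(p));
        }
    }

    // Every pipeline sees the message, in order; an empty chain leaves it untouched.
    void process_message(Message &msg) {
        for (auto &p : pipelines_) {
            if (p) {
                p->process_msg(msg);
            }
        }
    }

    std::size_t size() const { return pipelines_.size(); }

private:
    std::vector<PipelinePtr> pipelines_;
};
```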
-
struct PhyMacMsgDesc : public nv_ipc_msg_t#
- #include <pipeline_interface.hpp>
PHY-MAC message descriptor
Extends nv_ipc_msg_t to provide additional functionality for PHY-MAC layer message handling with reset capability.
-
class PhyStats#
- #include <phy_stats.hpp>
PHY statistics collector.
Thread-safe statistics collection using lock-free atomics with relaxed memory ordering. Per-cell granularity for multi-cell debugging and validation.
Performance: ~1-2 CPU cycles per counter increment using relaxed atomics.
Memory: 8 bytes per cell (std::atomic<uint64_t>).
Thread Safety: All methods are thread-safe and lock-free.
Public Functions
-
explicit PhyStats(std::size_t max_cells)#
Construct statistics collector for specified number of cells.
- Parameters:
max_cells – [in] Maximum number of cells to track
-
void record_crc_failure(std::uint32_t cell_id) noexcept#
Record a CRC failure for the specified cell.
- Parameters:
cell_id – [in] Cell identifier (must be < max_cells)
-
std::vector<std::uint64_t> get_stats() const#
Get the statistics.
- Returns:
Statistics
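A minimal sketch of per-cell counters using relaxed atomics, in the spirit of the description above. `CellCrcStats` is hypothetical. Silently ignoring an out-of-range `cell_id` is an assumption made here, since the reference only states that `cell_id` must be less than `max_cells`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical collector: one 64-bit atomic counter per cell.
class CellCrcStats {
public:
    // Elements are value-initialized, so every counter starts at zero.
    explicit CellCrcStats(std::size_t max_cells) : counters_(max_cells) {}

    // Relaxed ordering is enough for a counter that does not publish other data.
    void record_crc_failure(std::uint32_t cell_id) noexcept {
        if (cell_id < counters_.size()) {  // assumption: out-of-range IDs are ignored
            counters_[cell_id].fetch_add(1, std::memory_order_relaxed);
        }
    }

    // Snapshot of all counters; values from different cells are not read atomically together.
    std::vector<std::uint64_t> get_stats() const {
        std::vector<std::uint64_t> out;
        out.reserve(counters_.size());
        for (const auto &c : counters_) {
            out.push_back(c.load(std::memory_order_relaxed));
        }
        return out;
    }

private:
    std::vector<std::atomic<std::uint64_t>> counters_;
};
```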
-
class PipelineInterface#
- #include <pipeline_interface.hpp>
Abstract interface for pipeline components
This interface defines the contract for pipeline components that can be managed by the message adapter. Derived classes should implement the specific pipeline functionality.
Subclassed by Sample5GPipelineAdapter
Public Functions
-
PipelineInterface() = default#
Public constructor for abstract base
-
virtual ~PipelineInterface() = default#
Virtual destructor for proper cleanup
-
virtual void process_msg(nv_ipc_msg_t &msg, nv_ipc_t *ipc) = 0#
Process incoming message
- Parameters:
msg – [inout] Input message to be processed
ipc – [in] Pointer to the nvIPC interface
-
PipelineInterface(const PipelineInterface&) = delete#
Copy constructor (disabled for abstract base)
-
PipelineInterface &operator=(const PipelineInterface&) = delete#
Assignment operator (disabled for abstract base)
-
PipelineInterface(PipelineInterface&&) = delete#
Move constructor (disabled for abstract base)
-
PipelineInterface &operator=(PipelineInterface&&) = delete#
Move assignment operator (disabled for abstract base)
-
class Sample5GPipeline : public ran::message_adapter::IFapiMessageProcessor, public ran::message_adapter::ISlotIndicationSender, public ran::message_adapter::IFapiSlotInfoProvider, public ran::message_adapter::IPipelineExecutor#
- #include <sample_5g_pipelines.hpp>
Sample 5G pipeline implementation with thread-safe operation
This class implements FAPI message processing for 5G networks with atomic operations for thread safety. Supports multiple cells with independent state machines.
Thread-safety: Uses lock-free atomics for sfn/slot counter, cell bitmap, and cell states. Safe for concurrent access from FAPI RX thread and slot indication timer thread.
Implements:
IFapiMessageProcessor: Process incoming FAPI messages
ISlotIndicationSender: Send periodic slot indications
IFapiSlotInfoProvider: Provide slot info and accumulated messages
IPipelineExecutor: Execute PUSCH pipelines for a given slot
Subclassed by Sample5GPipelineAdapter
Public Functions
-
explicit Sample5GPipeline(const InitParams &params)#
Constructor for the sample 5G pipeline
- Parameters:
params – [in] Initialization parameters
- Throws:
std::invalid_argument – if ipc is nullptr
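The null check on the endpoint might look like the sketch below. `IpcEndpoint` stands in for `nv_ipc_t`, the default of 4 for `max_cells` is a placeholder for `common::NUM_CELLS_SUPPORTED`, and `SamplePipeline` and `construction_throws` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

struct IpcEndpoint {};  // stand-in for nv_ipc_t

// Reduced stand-in for Sample5GPipeline::InitParams.
struct InitParams {
    IpcEndpoint *ipc{nullptr};  // non-owning
    std::size_t max_cells{4};   // placeholder default
};

// Hypothetical pipeline that validates its parameters in the constructor.
class SamplePipeline {
public:
    explicit SamplePipeline(const InitParams &params)
        : ipc_(params.ipc), max_cells_(params.max_cells) {
        if (ipc_ == nullptr) {
            throw std::invalid_argument("ipc must not be nullptr");
        }
    }
    std::size_t max_cells() const noexcept { return max_cells_; }

private:
    IpcEndpoint *ipc_;
    std::size_t max_cells_;
};

// Helper for callers that prefer a boolean over exception handling at the call site.
inline bool construction_throws(const InitParams &params) {
    try {
        SamplePipeline pipeline{params};
        (void)pipeline;
        return false;
    } catch (const std::invalid_argument &) {
        return true;
    }
}
```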
-
~Sample5GPipeline() override = default#
Destructor
-
Sample5GPipeline(const Sample5GPipeline&) = delete#
-
Sample5GPipeline &operator=(const Sample5GPipeline&) = delete#
-
Sample5GPipeline(Sample5GPipeline&&) = delete#
-
Sample5GPipeline &operator=(Sample5GPipeline&&) = delete#
-
std::string get_status() const#
Get 5G pipeline status information
- Returns:
Status string containing pipeline state and statistics
-
std::size_t get_num_cells_running() const#
Get number of currently running cells
Thread-safe: Uses atomic operations to read active_cell_bitmap.
- Returns:
Count of cells in RUNNING state
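Counting running cells from an atomic bitmap can be done as in this sketch, which assumes one bit per cell and at most 64 cells. `ActiveCellBitmap` is hypothetical, and the memory orderings are a conservative choice rather than the ones used by the real class.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical bitmap: bit N set means cell N is running (N must be < 64).
class ActiveCellBitmap {
public:
    void set_running(unsigned cell, bool running) noexcept {
        const std::uint64_t mask = std::uint64_t{1} << cell;
        if (running) {
            bitmap_.fetch_or(mask, std::memory_order_release);
        } else {
            bitmap_.fetch_and(~mask, std::memory_order_release);
        }
    }

    // One atomic load, then a population count on the local copy.
    std::size_t num_cells_running() const noexcept {
        std::uint64_t bits = bitmap_.load(std::memory_order_acquire);
        std::size_t count = 0;
        while (bits != 0) {
            bits &= bits - 1;  // clear the lowest set bit
            ++count;
        }
        return count;
    }

private:
    std::atomic<std::uint64_t> bitmap_{0};
};
```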
-
inline const PhyStats &get_stats() const noexcept#
Get PHY statistics.
Safe to call after all worker threads have joined. Returns const reference to atomic statistics - caller uses .load() to read values.
- Returns:
Const reference to statistics
-
virtual void process_msg(nv_ipc_msg_t &msg) override#
Process incoming FAPI message
Handles ConfigRequest, StartRequest, StopRequest, UlTtiRequest, DlTtiRequest, and SlotResponse messages. Updates internal state atomically for thread-safe operation.
- Parameters:
msg – [inout] FAPI message to process
-
virtual void send_slot_indications() override#
Send slot indications to all active cells
Called periodically (typically every 500µs) to advance the slot counter atomically and send slot indication messages via nvIPC to all running cells.
Thread-safety: Safe to call concurrently with process_msg(). Uses atomic operations for sfn/slot counter and active cell bitmap.
-
virtual ran::fapi::SlotInfo get_current_slot() const override#
Get current slot information (SFN and slot number)
Returns the current SFN and slot being processed. Uses atomic operations to safely read the packed sfn/slot counter.
Thread-safe: Uses atomic load operation.
- Returns:
ran::fapi::SlotInfo with sfn in [0, 1023] and slot in [0, 19] for 30kHz SCS
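Packing SFN and slot into a single atomic word lets a reader obtain both fields from one load, so it never sees an SFN from one update paired with a slot from another. The layout below (SFN in the upper 16 bits, slot in the lower 16) is an assumption for illustration. `PackedSlotCounter` is hypothetical and `SlotInfo` is a stand-in.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Stand-in for ran::fapi::SlotInfo.
struct SlotInfo {
    std::uint16_t sfn{};
    std::uint16_t slot{};
};

// Hypothetical packed counter: both fields live in one 32-bit atomic.
class PackedSlotCounter {
public:
    static constexpr std::uint32_t pack(SlotInfo s) {
        return (std::uint32_t{s.sfn} << 16) | s.slot;
    }
    static constexpr SlotInfo unpack(std::uint32_t v) {
        return SlotInfo{static_cast<std::uint16_t>(v >> 16),
                        static_cast<std::uint16_t>(v & 0xFFFFu)};
    }

    void store(SlotInfo s) noexcept { packed_.store(pack(s), std::memory_order_release); }

    // A single load yields a consistent (sfn, slot) pair.
    SlotInfo load() const noexcept { return unpack(packed_.load(std::memory_order_acquire)); }

private:
    std::atomic<std::uint32_t> packed_{0};
};
```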
- virtual std::span<const ran::fapi::CapturedFapiMessage> get_accumulated_ul_tti_msgs(std::uint16_t slot)
Get accumulated UL-TTI messages for the given slot
Returns non-owning view of accumulated UL-TTI-REQUEST messages. Messages remain valid until next slot’s accumulation begins.
- Parameters:
slot – [in] Slot number (0-19 for 30kHz SCS) to get accumulated messages for
- Returns:
Span of captured FAPI messages
- virtual std::uint64_t get_current_absolute_slot(ran::fapi::SlotInfo slot_info)
Get absolute slot number for given slot info
Calculates absolute slot number accounting for SFN wrap-arounds. The absolute slot is a monotonic counter that never wraps. Thread-safe: Uses atomic load with acquire semantics.
- Parameters:
slot_info – [in] Slot information containing SFN and slot
- Returns:
Absolute slot number since initialization
-
virtual void launch_pipelines(std::size_t slot) override#
Launch PUSCH pipelines for the given slot
Triggers execution of the PUSCH pipeline(s) for the specified slot. This should be called by the PUSCH RX task after U-Plane processing has prepared the I/Q data.
Thread-safe: Delegates to Driver which uses appropriate synchronization.
- Parameters:
slot – [in] Slot number to process (0-19 for 30kHz SCS)
-
struct InitParams#
- #include <sample_5g_pipelines.hpp>
Initialization parameters for Sample5GPipeline
Public Members
-
nv_ipc_t *ipc = {nullptr}#
nvIPC endpoint (non-owning)
-
std::size_t max_cells = {common::NUM_CELLS_SUPPORTED}#
Maximum supported cells.
-
GraphScheduleCallback on_graph_schedule#
Callback to schedule graph.
-
framework::pipeline::IPipelineOutputProvider *output_provider{nullptr}#
Pipeline output provider (non-owning)
-
enum class ran::fapi_5g::FapiStateT : std::uint8_t#
FAPI state machine states for each cell
Values:
-
struct Cell#
- #include <cell.hpp>
Represents a single cell in the 5G network. Manages cell-specific configuration and state.
Thread-safety: The fapi_state member is atomic and can be safely accessed from multiple threads. Other members should only be modified during configuration before cells become active.
Public Functions
- inline Cell(const std::uint16_t in_phy_cell_id, const ran::common::PhyParams &in_phy_params)
Constructor
- Parameters:
in_phy_cell_id – [in] Physical cell ID
in_phy_params – [in] Physical layer parameters
-
~Cell() = default#
Destructor
Public Members
-
std::uint16_t phy_cell_id = {}#
Physical cell ID.
-
ran::common::PhyParams ul_phy_params#
Physical layer parameters.
-
std::atomic<FapiStateT> fapi_state#
FAPI state (atomic for thread safety)
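An atomic state member allows guarded transitions without a lock. The enumerators of `FapiStateT` are not listed in this reference, so the `Idle`, `Configured`, and `Running` states below are hypothetical. `Cell` is reduced to two members and `try_transition` is an illustrative helper, not part of the documented API.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical states; the real FapiStateT enumerators are not shown in this reference.
enum class FapiState : std::uint8_t { Idle, Configured, Running };

// Reduced stand-in for Cell.
struct Cell {
    explicit Cell(std::uint16_t id) : phy_cell_id(id) {}
    std::uint16_t phy_cell_id{};
    std::atomic<FapiState> fapi_state{FapiState::Idle};
};

// Move from `expected` to `desired` only if the cell is still in `expected`.
// Returns false, leaving the state unchanged, if another thread got there first.
inline bool try_transition(Cell &cell, FapiState expected, FapiState desired) noexcept {
    return cell.fapi_state.compare_exchange_strong(
        expected, desired, std::memory_order_acq_rel, std::memory_order_acquire);
}
```

A compare-and-exchange rejects a transition requested from a stale state, which a plain store would not detect.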