DOCA Documentation v3.2.0

DOCA Storage Comch to RDMA GGA Offload Application Guide

The doca_storage_comch_to_rdma_gga_offload application acts as a bridge between the initiator and three storage targets. It actively participates in data transfer and leverages doca_libs to accelerate certain data operations transparently to both the initiator and the targets.

The doca_storage_comch_to_rdma_gga_offload application performs the following key functions:

  • Orchestrates communication between the initiator and the three storage instances

  • Periodically triggers data recovery flows to simulate data loss

  • Performs data recovery using erasure coding

  • Performs inline data (de)compression

  • Performs creation of erasure coding redundancy data

To accomplish these tasks, the application establishes TCP connections to three storage targets and listens for an incoming connection from a single initiator using doca_comch_server.

The doca_storage_comch_to_rdma_gga_offload application is divided into two main functional areas:

  • Control-time and shared resources

  • Per-thread data path resources

[Figure: GGA offload application objects]

The application execution follows two primary phases:

  • Control phase

  • Data path phase

Control Phase

This phase begins with establishing connections to the required storage targets, followed by awaiting a client connection. Once all connections are in place, the application waits for specific control commands:

  • Query storage

  • Init storage

  • Start storage

Each control command is processed using the following sequence:

  1. Relay the command to each connected storage target.

  2. Wait for responses from all storage targets.

  3. Perform required post-processing and consistency checks on the responses.

  4. Send a response back to the client.
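The four-step sequence above can be sketched as follows. This is a minimal illustration, not the application's code: `control_response`, `storage_target`, and `process_control_command` are hypothetical names, and each target is modelled as a synchronous callable, whereas the real application sends the request to every target first and then waits for the responses.

```cpp
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for a control message response.
struct control_response {
    bool ok;
    std::string detail;
};

// A target is modelled as a callable that receives a command name and returns
// a response, or std::nullopt if it did not answer in time.
using storage_target = std::function<std::optional<control_response>(std::string const &)>;

// Relay one command to every target, collect all responses, check them, and
// build the single response that would be returned to the client.
inline control_response process_control_command(std::string const &command,
                                                std::vector<storage_target> const &targets)
{
    std::vector<control_response> responses;
    for (auto const &target : targets) { // steps 1 and 2: relay and wait
        auto response = target(command);
        if (!response)
            return {false, command + ": target did not respond"};
        responses.push_back(*response);
    }

    for (auto const &response : responses) { // step 3: check the responses
        if (!response.ok)
            return {false, command + ": " + response.detail};
    }

    return {true, command + ": ok"}; // step 4: respond to the client
}
```

A single failing or silent target turns the whole command into an error response, which mirrors the all-or-nothing behaviour described in the steps.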

Issuing the start storage command initiates the data path phase. While the data threads begin execution, the main thread continues to wait for final control commands to complete the application's lifecycle:

  • Stop storage

  • Shutdown

Data Path Phase

This phase is executed per thread and involves each thread performing I/O operations requested by the client. For a read I/O operation, one of two flows is used:

  • Regular read (no recovery)

  • Recovery read

By default, only the regular read flow is executed. However, periodic recovery operations can be enabled by the user. See the "Command-line Flags" section for details.

Regular Read Data Flow

The regular read flow consists of the stages detailed in the following subsections.

Initiator Request

  1. The initiator sends an I/O request to the GGA offload application.

  2. The GGA offload application generates two read requests: one for data_1 and one for data_2. Since each storage target only stores a half block, these requests are resized to be half of the initiator's original block size. The output address for each request is set to place the resulting half-block of data into the correct slot (the data_1 or data_2 location) inside the service's larger temporary storage block.
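The request splitting described above can be sketched as follows. The structure and function names are hypothetical, and the layout is an assumption: data_1 is taken to land in the first half of the temporary block and data_2 in the second half, with each target storing its half blocks contiguously.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical request description; the real application uses its own
// I/O message format.
struct half_read_request {
    std::uint64_t target_offset; // byte offset within the storage target
    std::uint32_t length;        // bytes to read (half of the initiator block)
    std::uint32_t output_offset; // landing offset inside the temporary block
};

// Split one initiator read into a data_1 request and a data_2 request.
inline std::pair<half_read_request, half_read_request> split_read(std::uint64_t initiator_offset,
                                                                  std::uint32_t block_size)
{
    assert(block_size % 2 == 0 && initiator_offset % block_size == 0);

    std::uint32_t const half = block_size / 2;
    // Block N on the initiator maps to half block N on each target.
    std::uint64_t const target_offset = (initiator_offset / block_size) * half;

    return {half_read_request{target_offset, half, 0},     // data_1 slot
            half_read_request{target_offset, half, half}}; // data_2 slot
}
```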

[Figure: Regular read, I/O request]

RDMA Transfers

Each storage target performs an RDMA write into the GGA offload memory block at the designated offsets.

[Figure: Regular read, RDMA transfers]

Target Responses

  1. Both storage targets send I/O responses upon completing the RDMA transfers.

  2. The GGA offload application waits until both responses are received.

[Figure: Regular read, RDMA I/O responses]

Decompression and Final Response

  1. The application performs decompression using the received data blocks. Decompression output is written directly to the initiator memory using the doca_decompress task.

    [Figure: Regular read, decompression]

  2. The application sends an I/O response to the initiator, completing the operation.

    [Figure: Regular read, I/O response after decompression]

Recovery Read Data Flow

This flow is similar to the regular read, but with added steps for error correction using erasure coding. The process involves the stages detailed in the following subsections.

Initiator Request

  1. The initiator sends an I/O request to the GGA offload application.

  2. Given that recovery is required, the application:

    1. Translates the I/O address normally.

    2. Modifies one of the two storage requests to target the parity partition instead of a standard data partition.

    3. Adds an output offset field to redirect the parity data into a reserved region of GGA offload memory.

[Figure: Recovery read, I/O request]

RDMA Transfers

  1. Two RDMA transfers are issued:

    • One to the surviving data partition.

    • One to the parity partition.

  2. The transferred blocks are written into the GGA offload memory with appropriate alignment.

[Figure: Recovery read, RDMA transfers]

Target Responses

  1. Both storage targets reply with I/O responses.

  2. The GGA offload application waits for both to complete before proceeding.

[Figure: Recovery read, I/O responses]

Data Recovery

  1. The application performs recovery using the available half block and parity data to reconstruct the missing block.

    [Figure: Recovery read, erasure coding recovery]
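To make the idea of rebuilding a missing half block concrete, the sketch below uses byte-wise XOR parity over two halves. This is a deliberately simplified stand-in: the application itself performs redundancy creation and recovery with doca_ec using a Cauchy or Vandermonde matrix, which this example does not reproduce. All names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using half_block = std::vector<std::uint8_t>;

// Byte-wise XOR of two equally sized half blocks.
inline half_block xor_halves(half_block const &a, half_block const &b)
{
    assert(a.size() == b.size());
    half_block out(a.size());
    for (std::size_t i = 0; i < a.size(); ++i)
        out[i] = static_cast<std::uint8_t>(a[i] ^ b[i]);
    return out;
}

// Under XOR, parity creation and recovery are the same operation:
//   parity  = data_1 ^ data_2
//   missing = surviving ^ parity
inline half_block make_parity(half_block const &data_1, half_block const &data_2)
{
    return xor_halves(data_1, data_2);
}

inline half_block recover_missing(half_block const &surviving, half_block const &parity)
{
    return xor_halves(surviving, parity);
}
```

With two data halves and one parity half, either data half can be rebuilt from the other half plus the parity, which is the property the recovery read flow relies on.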

Decompression and Final Response

  1. The recovered data is then decompressed. As with the regular read, decompression output is written directly to initiator memory.

    [Figure: Decompression]

  2. An I/O response is sent to the initiator, completing the operation.

    [Figure: I/O response after decompression]

Write Data Flow

Note

The write path is only available when the service is compiled with the liblz4 dev libraries.

The write data flow consists of the stages detailed in the following subsections.

Initiator Request

The initiator sends an I/O request to the GGA offload application.

[Figure: Write, I/O request]

Service Fetches Data

The service fetches the data from the initiator as a single DMA read.

[Figure: Write, DMA fetch of data]

Data Compression

  1. The service uses liblz4 to compress the data from the data 1 and data 2 blocks into the temporary storage space.

    [Figure: Write, compress data]

  2. The compressed bytes are copied back to the data 1 and data 2 locations, with a metadata header and trailer added.

    [Figure: Write, add metadata]

Parity Block Creation

A half block's worth of EC redundancy bytes is created from the compressed data (including its metadata).

[Figure: Write, erasure coding create]

IO Requests Sent to Storage Targets

Each of the storage targets is sent an IO write request to store the appropriate half block of data (data 1, data 2, or parity data).

[Figure: Write, RDMA transfers]

IO Responses

  1. All three storage targets reply with I/O responses.

  2. The GGA offload application waits for all three to complete.

  3. The GGA offload application sends an IO response to the initiator.

    [Figure: Write, I/O response]

This application leverages the following DOCA libraries:

  • DOCA Comch

  • DOCA Compress

  • DOCA DMA

  • DOCA Erasure Coding

  • DOCA RDMA

This application is compiled as part of the set of storage applications. For compilation instructions, refer to the DOCA Storage Applications page.

Application Execution

Warning

This application can only run within the NVIDIA® BlueField® DPU.

Info

DOCA Storage Comch to RDMA GGA Offload is provided in source form. Therefore, compilation is required before the application can be executed.

  • Application usage instructions:

    Usage: doca_storage_comch_to_rdma_gga_offload [DOCA Flags] [Program Flags]

    DOCA Flags:
      -h, --help                        Print a help synopsis
      -v, --version                     Print program version information
      -l, --log-level                   Set the (numeric) log level for the program <10=DISABLE, 20=CRITICAL, 30=ERROR, 40=WARNING, 50=INFO, 60=DEBUG, 70=TRACE>
      --sdk-log-level                   Set the SDK (numeric) log level for the program <10=DISABLE, 20=CRITICAL, 30=ERROR, 40=WARNING, 50=INFO, 60=DEBUG, 70=TRACE>
      -j, --json <path>                 Parse command line flags from an input json file

    Program Flags:
      -d, --device                      Device identifier
      -r, --representor                 Device host side representor identifier
      --cpu                             CPU core to which the process affinity can be set
      --data-1-storage                  Storage server addresses in <ip_addr>:<port> format
      --data-2-storage                  Storage server addresses in <ip_addr>:<port> format
      --data-p-storage                  Storage server addresses in <ip_addr>:<port> format
      --matrix-type                     Type of matrix to use. One of: cauchy, vandermonde Default: vandermonde
      --command-channel-name            Name of the channel used by the doca_comch_client. Default: "doca_storage_comch"
      --control-timeout                 Time (in seconds) to wait while performing control operations. Default: 5
      --trigger-recovery-read-every-n   Trigger a recovery read flow every N th request. Default: 0 (disabled)

    Info

    This usage printout can be printed to the command line using the -h (or --help) options:

    ./doca_storage_comch_to_rdma_gga_offload -h

    For additional information, refer to section "Command-line Flags".

  • CLI example for running the application on the BlueField:

    ./doca_storage_comch_to_rdma_gga_offload -d 03:00.0 -r 3b:00.0 --data-1-storage 172.17.0.1:12345 --data-2-storage 172.17.0.2:12345 --data-p-storage 172.17.0.3:12345 --cpu 0

    Note

    Both the DOCA Comch device PCIe address (03:00.0) and the DOCA Comch device representor PCIe address (3b:00.0) should match the addresses of the desired PCIe devices.

    Note

    Storage target IP address:port tuples should be updated to refer to the running storage target applications.

Command-line Flags

| Flag Type | Short Flag | Long Flag/JSON Key | Description |
| --- | --- | --- | --- |
| General flags | h | help | Print a help synopsis |
| | v | version | Print program version information |
| | l | log-level | Set the log level for the application: DISABLE=10, CRITICAL=20, ERROR=30, WARNING=40, INFO=50, DEBUG=60, TRACE=70 (requires compilation with TRACE log level support) |
| | N/A | sdk-log-level | Set the SDK log level for the program: DISABLE=10, CRITICAL=20, ERROR=30, WARNING=40, INFO=50, DEBUG=60, TRACE=70 |
| | j | json | Parse command line flags from an input JSON file as well as from the CLI (if provided) |
| Program flags | d | device | DOCA device identifier. One of: PCIe address (3b:00.0), InfiniBand name (mlx5_0), network interface name (en3f0pf0sf0). Note: This flag is mandatory. |
| | r | representor | DOCA Comch device representor PCIe address. Note: This flag is mandatory. |
| | N/A | --cpu | Index of CPU to use. One data path thread is spawned per CPU. Index starts at 0. Note: The user can specify this argument multiple times to create more threads. Note: This flag is mandatory. |
| | N/A | --data-1-storage | IP address and port to use to establish the control TCP connection to the target. Note: This flag is mandatory. |
| | N/A | --data-2-storage | IP address and port to use to establish the control TCP connection to the target. Note: This flag is mandatory. |
| | N/A | --data-p-storage | IP address and port to use to establish the control TCP connection to the target. Note: This flag is mandatory. |
| | N/A | --matrix-type | Type of matrix to use. One of: cauchy, vandermonde |
| | N/A | --command-channel-name | Allows customizing the server name used for this application instance if multiple comch servers exist on the same device. |
| | N/A | --control-timeout | Time, in seconds, to wait while performing control operations |
| | N/A | --trigger-recovery-read-every-n | Trigger a recovery read flow every Nth request. Set to 0 to disable recovery reads entirely. For example, a value of 1 triggers a recovery read for every request, while a value of 10 triggers a recovery read for one out of every ten requests. |


Troubleshooting

Refer to the NVIDIA BlueField Platform Software Troubleshooting Guide for any issue encountered with the installation or execution of the DOCA applications.

The flow of the application is broken down into key functions and steps:

gga_offload_app app{parse_cli_args(argc, argv)};

storage::install_ctrl_c_handler([&app]() {
    app.abort("User requested abort");
});

app.connect_to_storage();
app.wait_for_comch_client_connection();
app.wait_for_and_process_query_storage();
app.wait_for_and_process_init_storage();
app.wait_for_and_process_start_storage();
app.wait_for_and_process_stop_storage();
app.wait_for_and_process_shutdown();
app.display_stats();

Main/Control Thread Flow

  1. gga_offload_app app{parse_cli_args(argc, argv)};

    Parse CLI arguments and use these to create the application instance. Initial resources are also created at this stage:

    1.
      DOCA_LOG_INFO("Open doca_dev: %s", m_cfg.device_id.c_str());
      m_dev = storage::open_device(m_cfg.device_id);

      Open a doca_dev as specified by the CLI argument: -d or --device

    2.
      DOCA_LOG_INFO("Open doca_dev_rep: %s", m_cfg.representor_id.c_str());
      m_dev_rep = storage::open_representor(m_dev, m_cfg.representor_id);

      Open a doca_dev_rep as specified by the CLI argument: -r or --representor

    3.
      m_all_ctrl_channels[connection_role::data_1] = storage::control::make_tcp_client_control_channel(
          m_cfg.storage_server_address[connection_role::data_1]);

      m_all_ctrl_channels[connection_role::data_2] = storage::control::make_tcp_client_control_channel(
          m_cfg.storage_server_address[connection_role::data_2]);

      m_all_ctrl_channels[connection_role::data_p] = storage::control::make_tcp_client_control_channel(
          m_cfg.storage_server_address[connection_role::data_p]);

      Create three TCP client control channels (control channel objects provide a unified API so that a TCP client, TCP server, doca_comch_client, and doca_comch_server all have a consistent API)

      Info

      See storage_common/control_channel.hpp for more information about the control channel abstraction.

    4.
      m_all_ctrl_channels[connection_role::client] = storage::control::make_comch_server_control_channel(
          m_dev,
          m_dev_rep,
          m_cfg.command_channel_name.c_str(),
          this,
          new_comch_consumer_callback,
          expired_comch_consumer_callback);

      Create a comch server control channel (containing a doca_comch_server instance) using the device, representor, and channel name as specified by the CLI argument --command-channel-name, or the default value if none was specified.

  2. storage::install_ctrl_c_handler([&app]() { app.abort("User requested abort"); });

    Set a signal handler for Ctrl+C keyboard input so the app can shut down gracefully.

  3. app.connect_to_storage();

    Connect to the TCP server hosted by each of the three storage targets as defined by the CLI arguments: --data-1-storage, --data-2-storage, and --data-p-storage

    1.
      void gga_offload_app::connect_to_storage(void)
      {
          for (auto *storage_channel : m_storage_ctrl_channels) {
              DOCA_LOG_DBG("Connect control channel...");
              for (;;) {
                  if (m_abort_flag) {
                      throw storage::runtime_error{DOCA_ERROR_CONNECTION_ABORTED,
                                                   "Aborted while connecting to storage"};
                  }

                  if (storage_channel->is_connected())
                      break;
              }
          }
      }

      For each storage control channel (the TCP contexts), poll the context in a loop until either it connects or the user aborts the application. The servers should have been started before the service, so no delay is needed between polls of the control context.

  4. app.wait_for_comch_client_connection();

    Wait for a doca_comch_client to connect.

    1.
      void gga_offload_app::wait_for_comch_client_connection(void)
      {
          while (!m_all_ctrl_channels[connection_role::client]->is_connected()) {
              std::this_thread::sleep_for(std::chrono::milliseconds{100});
              if (m_abort_flag) {
                  throw storage::runtime_error{DOCA_ERROR_CONNECTION_ABORTED,
                                               "Aborted while connecting to client"};
              }
          }
      }

      Poll the Comch server control channel until a doca_comch_client has connected or the user aborts the application. If any further Comch client attempts to connect to the server, it is automatically rejected by the control channel, which is designed for a 1:1 relationship between clients and servers. A sleep is placed in this loop because it may take the user or operator a few seconds to start the client, so there is nothing to gain from polling any faster.

  5. app.wait_for_and_process_query_storage();

    Wait for the initiator to send a query_storage_request control message and then perform the required actions to fulfill the request:

    1.
      for (auto *storage_ctrl : m_storage_ctrl_channels) {
          auto storage_request = storage::control::message{
              storage::control::message_type::query_storage_request,
              storage::control::message_id{m_message_id_counter++},
              client_request.correlation_id,
              {},
          };

          msg_ids.push_back(storage_request.message_id);
          storage_ctrl->send_message(storage_request);
      }

      Send a query storage request to each storage target server.

    2. wait_for_responses(msg_ids, default_control_timeout_seconds);

      Wait for storage targets to respond.

    3. Check the response from each storage target.

      1. If they all report the same dimensions (block size and block count), respond to the initiator with a query_storage_response message reporting double the values returned by each storage target (as each storage target holds half blocks, not full blocks). For example, if each storage target reports "64K capacity, 2K block size", report "128K capacity, 4K block size".

      2. If the dimensions do not match, or any other error occurs, respond to the initiator with an error_response message detailing the error that occurred.
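The response check in the query storage step can be sketched as follows. The types and the function name are hypothetical, and the doubling follows the worked example in the text (capacity and block size are both doubled); the real payload fields may differ.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical view of what one storage target reports.
struct storage_dimensions {
    std::uint64_t capacity;   // total bytes
    std::uint32_t block_size; // bytes per (half) block

    bool operator==(storage_dimensions const &other) const
    {
        return capacity == other.capacity && block_size == other.block_size;
    }
};

// Returns the dimensions to report to the initiator, or std::nullopt when an
// error_response should be sent instead.
inline std::optional<storage_dimensions> combine_query_responses(std::vector<storage_dimensions> const &targets)
{
    if (targets.empty())
        return std::nullopt;

    for (auto const &target : targets) {
        if (!(target == targets.front()))
            return std::nullopt; // targets disagree: report an error
    }

    // Each target holds half blocks, so the initiator sees double the values.
    return storage_dimensions{targets.front().capacity * 2, targets.front().block_size * 2};
}
```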

  6. app.wait_for_and_process_init_storage();

    Wait for the initiator to send an init_storage_request control message and then perform the required actions to fulfill the request:

    1. Use the init_storage_payload data to:

      1. Set the core count (m_core_count) to the number of cores requested by the initiator (the number of --cpu arguments provided to the initiator), or fail if this is more than the number of --cpu arguments provided to the service.

      2. Set the number of transactions per core (m_num_transactions) to double the number of transactions requested by the initiator. The doubling allows for batched task submission and avoids a race condition: the initiator can see the response to a transaction and try to resubmit it before the server has received the associated comch producer event callback. In that case the initiator repeatedly retries sending the task, degrading performance until the service catches up and resubmits the consumer task. This should be uncommon, but allocating double the transaction count guarantees that, even if every transaction on the initiator hits this issue, a full second set of transactions is ready on the service side to receive the tasks without contention. Users can experiment with reducing this value to save memory.

      3. Calculate the per transaction storage block size (m_per_transaction_chunk_size) as: block size * 2.5 if write flow is enabled or block size * 1.5 when the write flow is disabled.

      4. Allocate storage memory blocks (m_local_io_region) as an array of m_core_count * m_num_transactions * m_per_transaction_chunk_size bytes.

      5.
        m_remote_io_mmap = storage::make_mmap(m_dev,
                                              init_storage_details->mmap_export_blob.data(),
                                              init_storage_details->mmap_export_blob.size(),
                                              storage::thread_safety::yes);

        Import the initiator mmap so that its IO blocks can be accessed by the service.

      6.
        m_local_io_mmap = storage::make_mmap(m_dev,
                                             reinterpret_cast<char *>(m_local_io_region),
                                             local_storage_size,
                                             DOCA_ACCESS_FLAG_LOCAL_READ_WRITE | DOCA_ACCESS_FLAG_PCI_READ_WRITE |
                                                 DOCA_ACCESS_FLAG_RDMA_WRITE | DOCA_ACCESS_FLAG_RDMA_READ,
                                             storage::thread_safety::yes);

        std::vector<uint8_t> mmap_export_blob = [this]() {
            uint8_t const *reexport_blob = nullptr;
            size_t reexport_blob_size = 0;
            auto const ret = doca_mmap_export_rdma(m_local_io_mmap,
                                                   m_dev,
                                                   reinterpret_cast<void const **>(&reexport_blob),
                                                   &reexport_blob_size);
            if (ret != DOCA_SUCCESS) {
                throw storage::runtime_error{ret, "Failed to re-export host mmap for rdma"};
            }

            return std::vector<uint8_t>{reexport_blob, reexport_blob + reexport_blob_size};
        }();

        Create and export the service IO blocks memory region. This will be shared with the storage targets so they can access storage service IO blocks memory for RDMA read / write operations.

      7.
        for (auto *storage_ctrl : m_storage_ctrl_channels) {
            auto storage_request = storage::control::message{
                storage::control::message_type::init_storage_request,
                storage::control::message_id{m_message_id_counter++},
                client_request.correlation_id,
                std::make_unique<storage::control::init_storage_payload>(m_num_transactions,
                                                                         init_storage_details->core_count,
                                                                         mmap_export_blob),
            };
            storage_ctrl->send_message(storage_request);
        }

        Send an init storage request to each storage target using:

        1. The service transaction count (double the initiator value).

        2. The initiator core count.

        3. The service IO blocks mmap.

      8. Wait for storage targets to respond.

      9. Send a response to the initiator:

        1. Send an init_storage_response message upon success or an error_response message if anything failed.

      10. Perform the first stages of worker thread initialization. These steps are carried out for each thread, but only one thread performs them at any time. This simplifies the sending and receiving of control messages; users could modify this flow to execute in parallel if desired.

        1. Create thread bound to the Nth CPU provided to the service via the --cpu CLI arguments.

        2.
          m_workers[ii].execute_control_command(
              worker_create_objects_control_command{
                  m_dev,
                  comch_channel->get_comch_connection(),
                  m_local_io_mmap,
                  m_remote_io_mmap,
                  m_num_transactions,
                  m_storage_block_size,
                  m_cfg.ec_matrix_type,
                  m_cfg.recover_freq});

          Initialize thread context (asynchronously).

        3. connect_rdma(ii, storage::control::rdma_connection_role::io_data, cid);

          Create RDMA data connections (asynchronously). The thread connects to each of the three storage targets and creates an RDMA context which is idle from the service's point of view but is used by the storage target to perform RDMA read / write operations. See the DOCA Storage Target RDMA Application Guide for an explanation of why there are two RDMA contexts per thread.

        4. connect_rdma(ii, storage::control::rdma_connection_role::io_control, cid);

          Create RDMA control connections (asynchronously). The thread connects to each of the three storage targets and creates an RDMA context which is used to exchange IO requests and responses using RDMA send / recv tasks. See the DOCA Storage Target RDMA Application Guide for an explanation of why there are two RDMA contexts per thread.
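The sizing rules from the init storage step (doubled transaction count, 1.5 or 2.5 blocks per transaction, one region covering all cores) can be sketched as follows. `io_region_layout` and `plan_io_region` are hypothetical names used for illustration; only the arithmetic is taken from the text.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical summary of the values computed during init storage.
struct io_region_layout {
    std::uint32_t num_transactions;         // per core, already doubled
    std::size_t per_transaction_chunk_size; // bytes
    std::size_t region_size;                // bytes, all cores together
};

inline io_region_layout plan_io_region(std::uint32_t core_count,
                                       std::uint32_t initiator_transactions,
                                       std::uint32_t block_size,
                                       bool write_flow_enabled)
{
    // Work in half blocks to avoid floating point:
    // 1.5 blocks = 3 halves, 2.5 blocks = 5 halves.
    std::uint32_t const half = block_size / 2;
    std::size_t const chunk = static_cast<std::size_t>(half) * (write_flow_enabled ? 5 : 3);

    // The service allocates double the initiator's transaction count.
    std::uint32_t const num_transactions = initiator_transactions * 2;

    return {num_transactions, chunk, static_cast<std::size_t>(core_count) * num_transactions * chunk};
}
```

For example, with 2 cores, 64 initiator transactions, and a 4096-byte block, the read-only configuration needs 2 * 128 * 6144 = 1572864 bytes, and the write-enabled configuration needs 2 * 128 * 10240 = 2621440 bytes.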

  7. app.wait_for_and_process_start_storage();

    Wait for the initiator to send a start_storage_request control message and then perform the required actions to fulfill the request:

    1. Forward the start storage request to each of the storage targets.

    2. Wait for storage targets to respond.

    3. Signal all work threads to begin data path operation.

    4. Send a response to the initiator:

      1. Send a start_storage_response message upon success or an error_response message if anything failed.

  8. Data path execution now takes place until either the user aborts the program or a stop message is received.

  9. app.wait_for_and_process_stop_storage();

    Wait for the initiator to send a stop_storage_request control message and then perform the required actions to fulfill the request:

    1. Forward the stop storage request to each of the storage targets.

    2. Wait for storage targets to respond.

    3. Signal all work threads to stop data path operation.

    4. Collect run time stats.

    5. Send a response to the initiator:

      1. Send a stop_storage_response message upon success or an error_response message if anything failed.

  10. app.wait_for_and_process_shutdown();

    Wait for the initiator to send a shutdown_request control message and then perform the required actions to fulfill the request:

    1. Forward the shutdown request to each of the storage targets.

    2. Wait for storage targets to respond.

    3. Destroy worker thread objects.

    4. Send a response to the initiator:

      1. Send a shutdown_response message upon success or an error_response message if anything failed.

  11. app.display_stats();

    Display runtime statistics.

  12. Application destructor is triggered:

    1. Destroy control channels.

    2. Destroy storage IO blocks doca_mmap.

    3. Destroy storage IO block memory.

    4. Destroy initiator IO blocks doca_mmap.

    5. Close doca_dev_rep.

    6. Close doca_dev.

  13. Program exits.

Worker/Data Path Thread Flow

The worker thread procedure executes in two phases: a control/configuration phase, followed by a data path phase in which read, write, and recovery operations take place.

Worker Init Process

void gga_offload_app_worker::thread_proc(gga_offload_app_worker *self, uint16_t core_idx) noexcept

The worker starts by executing a loop of:

  1. Lock mutex

  2. If message pointer is not null:

    1. Process the configuration message.

    2. Set the operation result.

  3. Unlock the mutex.

  4. Yield.
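The lock, check, process, unlock, yield loop above can be sketched with a small mailbox shared between the main thread and a worker. This is an illustrative sketch only: `worker_mailbox` and its members are hypothetical, and a command is modelled as a callable returning an integer result rather than the application's control command types.

```cpp
#include <functional>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>

// Hypothetical mailbox shared between the main thread and one worker.
class worker_mailbox {
public:
    // Main thread: post a command for the worker to run.
    void post(std::function<int()> command)
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        m_command = std::move(command);
        m_result.reset();
    }

    // Main thread: read the result once the worker has produced it.
    std::optional<int> result()
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        return m_result;
    }

    // Worker thread: one iteration of the configuration loop.
    // Returns true if a command was processed.
    bool poll_once()
    {
        bool processed = false;
        {
            std::lock_guard<std::mutex> lock{m_mutex}; // 1. lock the mutex
            if (m_command) {                           // 2. message present?
                m_result = m_command();                // process it and set the result
                m_command = nullptr;
                processed = true;
            }
        } // 3. unlock the mutex
        std::this_thread::yield(); // 4. yield
        return processed;
    }

private:
    std::mutex m_mutex;
    std::function<int()> m_command;
    std::optional<int> m_result;
};
```

A worker would call `poll_once()` repeatedly until it receives the command that starts the data path, after which the mutex is no longer needed.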

The following configuration operations can be performed by the worker thread:

  1. void gga_offload_app_worker::create_worker_objects(worker_create_objects_control_command const &cmd)

    Create general worker objects:

    1. Create LZ4 software library context for compression operations (when enabled).

    2. Create N transaction objects.

    3. Create IO message memory.

    4. Create IO message doca_mmap (to allow the messages to be accessed by comch and RDMA).

    5. Create doca_buf_inventory.

    6. Create doca_pe to drive the DOCA contexts (doca_rdma, doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, doca_dma).

    7. Create doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, and doca_dma contexts.

    8. Initialize and start contexts.

  2. void gga_offload_app_worker::export_local_rdma_connection_blob(worker_export_local_rdma_connection_command &cmd)

    Export RDMA context connection binary blob.

  3. void gga_offload_app_worker::import_remote_rdma_connection_blob(worker_import_local_rdma_connection_command const &cmd)

    Import remote RDMA context connection binary blob.

  4. void gga_offload_app_worker::are_contexts_ready(worker_are_contexts_ready_control_command &cmd) const noexcept

    Poll all contexts to check they are ready to perform data path operations.

  5. void gga_offload_app_worker::prepare_tasks(worker_prepare_tasks_control_command const &cmd)

    Prepare transaction contexts by:

    1. Allocating doca_buf objects.

    2. Allocating doca_task objects.

    3. Setting task user data.

    4. Setting task pointers into the transaction context.

    5. Setting per transaction IO block addresses.

  6. void gga_offload_app_worker::start_data_path(void)

    Break out of the wait for configuration event loop and start the data path loop.

Info

After the configuration phase, the mutex is not used again.

After breaking out of the initial configuration loop, the thread submits receive tasks (Comch consumer tasks, and RDMA recv tasks) then enters the data path function: run_data_path_ops.

Worker Data Path Process

void gga_offload_app_worker::run_data_path_ops(gga_offload_app_worker::hot_data &hot_data)
{
    DOCA_LOG_INFO("Core: %u running", hot_data.core_idx);

    while (hot_data.run_flag) {
        doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
    }

    while (hot_data.error_flag == false && hot_data.in_flight_transaction_count != 0) {
        doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
    }
}

During the data path phase, the thread simply polls the doca_pe as quickly as possible to check for a task completion from one of the thread's DOCA contexts (doca_rdma, doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, doca_dma). The interesting work is done in the callbacks of these tasks. The flow always starts with a consumer task completion, which is the reception of the IO message from the initiator.

doca_comch_consumer_task_post_recv_cb

void gga_offload_app_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
                                                                   doca_data task_user_data,
                                                                   doca_data ctx_user_data) noexcept
{
    static_cast<void>(task_user_data);

    auto const transaction_idx = doca_task_get_user_data(doca_comch_consumer_task_post_recv_as_task(task)).u64;
    auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);

    auto const ret = hot_data->start_transaction(hot_data->transactions[transaction_idx]);
    if (ret != DOCA_SUCCESS) {
        DOCA_LOG_ERR("Failed to start transaction: %s", doca_error_get_name(ret));
    }
}

This callback simply interprets the task callback parameters to call the start_transaction function. The start transaction flow performs the following steps:

  1. Check transaction request type

  2. Set transaction action flags based on the transaction type (read, write)

  3. Call the progress_transaction function to execute as many of the pending actions as possible. Some actions, such as sending IO requests to the storage targets, decompressing data, and creating erasure coding blocks, are themselves asynchronous, so processing stops until a future task callback resumes it.

The transaction results in a read or write operation being performed. A read operation may be a regular read or a recovery read, depending on the value provided by the --trigger-recovery-read-every-n CLI argument. When the value is non-zero, every Nth read performs the recovery read flow.
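One way to picture the "every Nth read" rule is a simple counter, as in the sketch below. The type recovery_read_selector and its members are hypothetical, and whether the application counts from the first read or resets the counter in exactly this way is an assumption.

```cpp
#include <cstdint>

// Hypothetical helper, not part of the application: decides whether the next
// read should take the recovery flow.
struct recovery_read_selector {
    std::uint32_t every_n;        // value of --trigger-recovery-read-every-n (0 disables)
    std::uint32_t reads_seen = 0; // reads counted since the last recovery read

    explicit recovery_read_selector(std::uint32_t n) : every_n(n) {}

    bool next_read_is_recovery() {
        if (every_n == 0) return false;           // feature disabled
        if (++reads_seen < every_n) return false; // regular read
        reads_seen = 0;                           // Nth read: recover, then restart the count
        return true;
    }
};
```

With every_n set to 3, the third, sixth, ninth (and so on) reads are recovery reads in this model. With every_n set to 0, no read ever is.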

The flows of these operations are:

  • Write flow

    • Fetch data from initiator memory using DMA

    • Compress data

    • Generate EC redundancy blocks

    • Store data_1, data_2, and data_p to the respective storage targets

    • Respond with IO response

  • Regular read flow

    • Fetch data_1 and data_2 from the respective storage targets

    • Decompress the data into initiator memory

    • Respond with IO response

  • Recovery read flow

    • Fetch data_1 OR data_2 from the respective storage target

    • Fetch data_p from the data_p storage target

    • Recover the missing data half using the existing data and the redundancy data

    • Decompress the data into initiator memory

    • Respond with IO response
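The relationship between data_1, data_2, and data_p in the write and recovery read flows can be illustrated with plain XOR parity, the simplest erasure code with a single redundancy block. This is a swapped-in technique chosen for readability: the application offloads the work to doca_ec, whose coding matrix is not shown in this guide and may differ. The names block, stored_halves, make_halves, and recover_half are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using block = std::vector<std::uint8_t>;

// XOR two equally sized blocks byte by byte.
block xor_blocks(const block &a, const block &b) {
    assert(a.size() == b.size());
    block out(a.size());
    for (std::size_t i = 0; i < a.size(); ++i) {
        out[i] = static_cast<std::uint8_t>(a[i] ^ b[i]);
    }
    return out;
}

struct stored_halves {
    block data_1; // first half of the (compressed) block
    block data_2; // second half of the (compressed) block
    block data_p; // redundancy half (XOR parity in this sketch)
};

// Write flow stand-in: split the block into two halves and derive redundancy.
stored_halves make_halves(const block &compressed) {
    assert(compressed.size() % 2 == 0);
    auto const mid = compressed.begin() + static_cast<std::ptrdiff_t>(compressed.size() / 2);
    stored_halves h{block(compressed.begin(), mid), block(mid, compressed.end()), {}};
    h.data_p = xor_blocks(h.data_1, h.data_2);
    return h;
}

// Recovery read stand-in: rebuild the missing half from the surviving half
// and the redundancy half.
block recover_half(const block &surviving, const block &data_p) {
    return xor_blocks(surviving, data_p);
}
```

In this model, recover_half(h.data_2, h.data_p) reproduces h.data_1 and recover_half(h.data_1, h.data_p) reproduces h.data_2, which is why a recovery read needs to fetch only one data half plus data_p.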

doca_comch_producer_task_send_cb

void gga_offload_app_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
    doca_data task_user_data,
    doca_data ctx_user_data) noexcept
{
    static_cast<void>(task);

    auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
    auto &transaction = hot_data->transactions[task_user_data.u64];
    transaction.pending_actions = 0;
    --(hot_data->in_flight_transaction_count);
    ++(hot_data->completed_transaction_count);

    static_cast<void>(
        doca_buf_reset_data_len(doca_comch_consumer_task_post_recv_get_buf(transaction.host_request_task)));

    auto const ret = doca_task_submit(doca_comch_consumer_task_post_recv_as_task(transaction.host_request_task));
    if (ret != DOCA_SUCCESS) {
        DOCA_LOG_ERR("Failed to submit doca_comch_consumer_task_post_recv: %s", doca_error_get_name(ret));
        hot_data->error_flag = true;
        hot_data->run_flag = false;
    }
}

The producer task completion callback signals that the transaction response has been delivered to the initiator. The transaction can now be re-used by re-submitting the transaction consumer task.

doca_rdma_task_send_cb

void gga_offload_app_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
    doca_data task_user_data,
    doca_data ctx_user_data) noexcept
{
    static_cast<void>(task);

    auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
    auto &transaction = hot_data->transactions[task_user_data.u64];

    --(transaction.pending_storage_response_count);
    if (transaction.pending_storage_response_count == 0) {
        transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::wait_for_storage_completion);
        hot_data->progress_transaction(transaction);
    }
}

The RDMA send task callback signals that an IO request has been delivered to one of the storage target machines. A counter is decremented to track when all IO requests have been delivered and all IO responses have been received, so that the next step of processing the transaction can be carried out. For a read IO operation, the next step is either data recovery or decompression. For a write operation, the transaction is complete and the IO response can be sent to the initiator.

doca_rdma_task_receive_cb

void gga_offload_app_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
    doca_data task_user_data,
    doca_data ctx_user_data) noexcept
{
    static_cast<void>(task_user_data);

    auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);

    auto *const rdma_io_message = storage::get_buffer_bytes(doca_rdma_task_receive_get_dst_buf(task));
    auto const transaction_idx = storage::io_message_view::get_user_data(rdma_io_message).u64;

    auto &transaction = hot_data->transactions[transaction_idx];

    auto const io_result = storage::io_message_view::get_result(rdma_io_message);
    if (io_result != DOCA_SUCCESS) {
        // store error
        storage::io_message_view::set_result(io_result, transaction.initiator_io_message);
    }

    --(transaction.pending_storage_response_count);
    if (transaction.pending_storage_response_count == 0) {
        transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::wait_for_storage_completion);
        hot_data->progress_transaction(transaction);
    }

    if (hot_data->run_flag) {
        static_cast<void>(doca_buf_reset_data_len(doca_rdma_task_receive_get_dst_buf(task)));
        auto const ret = doca_task_submit(doca_rdma_task_receive_as_task(task));
        if (ret != DOCA_SUCCESS) {
            DOCA_LOG_ERR("Failed to resubmit doca_rdma_task_receive");
            hot_data->run_flag = false;
            hot_data->error_flag = true;
        }
    }
}

The RDMA receive task callback signals that an IO response has been received from one of the storage targets. As with a send task completion, the counter is decremented to determine whether processing can continue. In addition, the IO response status is inspected: if the storage operation failed, the transaction result is marked as a failure. While the worker is still running, the receive task is then re-submitted so that it is ready to receive another message from that storage target in a future transaction.
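The shared counter used by the send and receive callbacks can be modelled as below. This is a minimal sketch under stated assumptions: the counter is assumed to start at two events per storage target (one send completion and one response), and the flag value, the io_success constant, and all type and function names are hypothetical stand-ins rather than the application's definitions.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint32_t wait_for_storage_completion = 1u << 0; // hypothetical bit value
constexpr int io_success = 0;                                  // stand-in for DOCA_SUCCESS

struct storage_wait_sketch {
    std::uint32_t pending_actions = 0;
    std::uint32_t pending_storage_response_count = 0;
    int result = io_success; // result later reported to the initiator
    bool resumed = false;    // set where progress_transaction() would be called
};

// Assumption: each target contributes one send completion and one response.
void begin_storage_io(storage_wait_sketch &t, std::uint32_t target_count) {
    assert(target_count > 0);
    t.pending_actions |= wait_for_storage_completion;
    t.pending_storage_response_count = 2 * target_count;
    t.result = io_success;
    t.resumed = false;
}

// Shared tail of both callbacks: the last event clears the wait flag.
void count_down(storage_wait_sketch &t) {
    assert(t.pending_storage_response_count > 0);
    if (--t.pending_storage_response_count == 0) {
        t.pending_actions ^= wait_for_storage_completion; // flag is set, so XOR clears it
        t.resumed = true;
    }
}

// Stand-in for the RDMA send completion callback.
void on_send_complete(storage_wait_sketch &t) { count_down(t); }

// Stand-in for the RDMA receive completion callback: record a failure first.
void on_response(storage_wait_sketch &t, int io_result) {
    if (io_result != io_success) t.result = io_result;
    count_down(t);
}
```

Because both callbacks decrement the same counter, the order in which send completions and responses arrive does not matter in this model: only the final event resumes the transaction, and any failed response is preserved in the result.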

doca_ec_task_recover_cb

void gga_offload_app_worker::doca_ec_task_recover_cb(doca_ec_task_recover *task,
    doca_data task_user_data,
    doca_data ctx_user_data) noexcept
{
    static_cast<void>(task);
    auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
    auto &transaction = hot_data->transactions[task_user_data.u64];

    transaction.pending_actions &= ~(static_cast<uint32_t>(transaction_action::recover_a) |
                                     static_cast<uint32_t>(transaction_action::recover_b));
    hot_data->progress_transaction(transaction);
}

The EC recover task callback signals the completion of data recovery using erasure coding blocks. The transaction can now proceed to the next step, data decompression, to continue the read process.

doca_compress_task_decompress_lz4_stream_cb

void gga_offload_app_worker::doca_compress_task_decompress_lz4_stream_cb(doca_compress_task_decompress_lz4_stream *task,
    doca_data task_user_data,
    doca_data ctx_user_data) noexcept
{
    static_cast<void>(task);
    static_cast<void>(task_user_data);

    auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
    auto &transaction = hot_data->transactions[task_user_data.u64];

    transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::decompress);
    hot_data->progress_transaction(transaction);
}

The decompress LZ4 stream task callback signals the completion of data decompression. Because the decompressed data is written directly into initiator memory, the transaction can continue by sending the IO response to the initiator.

doca_ec_task_create_cb

void gga_offload_app_worker::doca_ec_task_create_cb(doca_ec_task_create *task,
    doca_data task_user_data,
    doca_data ctx_user_data) noexcept
{
    static_cast<void>(task);
    static_cast<void>(task_user_data);

    auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
    auto &transaction = hot_data->transactions[task_user_data.u64];

    transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::produce_ec_blocks);
    hot_data->start_commit_to_storage(transaction);
}

The EC create task callback signals that the generation of EC redundancy data is complete. Transaction processing can now continue by sending IO requests to all three storage targets to store the three half blocks (data_1, data_2, and data_p) in the respective storage targets' IO blocks.

doca_dma_task_memcpy_cb

void gga_offload_app_worker::doca_dma_task_memcpy_cb(doca_dma_task_memcpy *task,
    doca_data task_user_data,
    doca_data ctx_user_data) noexcept
{
    static_cast<void>(task);
    static_cast<void>(task_user_data);

    auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
    auto &transaction = hot_data->transactions[task_user_data.u64];

    transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::fetch_from_initiator);

    auto const ret = hot_data->compress_data(transaction);
    if (ret != DOCA_SUCCESS) {
        transaction.set_error(ret);
    }

    hot_data->progress_transaction(transaction);
}

The DMA memcpy task callback signals that the fetch of initiator data into service memory is complete. Software compression of the data is then executed immediately and runs to completion within the callback, as the software LZ4 library does not use events to signal completion. Transaction processing then continues, which starts the creation of EC redundancy data.

  • /opt/mellanox/doca/applications/storage/

© Copyright 2025, NVIDIA. Last updated on Nov 20, 2025