DOCA Storage Comch to RDMA GGA Offload Application Guide
The doca_storage_comch_to_rdma_gga_offload application acts as a bridge between the initiator and three storage targets. It actively participates in data transfer and leverages DOCA libraries to accelerate certain data operations transparently to both the initiator and the targets.
The doca_storage_comch_to_rdma_gga_offload application performs the following key functions:
Orchestrates communication between the initiator and the three storage instances
Periodically triggers data recovery flows to simulate data loss
Performs data recovery using erasure coding
Performs inline data (de)compression
Performs creation of erasure coding redundancy data
To accomplish these tasks, the application establishes TCP connections to three storage targets and listens for an incoming connection from a single initiator using doca_comch_server.
The doca_storage_comch_to_rdma_gga_offload application is divided into two main functional areas:
Control-time and shared resources
Per-thread data path resources
The application execution follows two primary phases:
Control phase
Data path phase
Control Phase
This phase begins with establishing connections to the required storage targets, followed by awaiting a client connection. Once all connections are in place, the application waits for specific control commands:
Query storage
Init storage
Start storage
Each control command is processed using the following sequence:
Relay the command to each connected storage target.
Wait for responses from all storage targets.
Perform required post-processing and consistency checks on the responses.
Send a response back to the client.
Issuing the start storage command initiates the data path phase. While the data threads begin execution, the main thread continues to wait for final control commands to complete the application's lifecycle:
Stop storage
Shutdown
Data Path Phase
This phase is executed per thread and involves each thread performing I/O operations requested by the client. For a read I/O operation, one of two flows is used:
Regular read (no recovery)
Recovery read
By default, only the regular read flow is executed. However, periodic recovery operations can be enabled by the user; see the "Command-line Flags" section for details.
Regular Read Data Flow
The regular read flow consists of the stages detailed in the following subsections.
Initiator Request
The initiator sends an I/O request to the GGA offload application.
The GGA offload application generates two read requests: one for data_1 and one for data_2. Since each storage target only stores a half block, these requests are resized to half of the initiator's original block size. The output address for each request is set to place the resulting half block of data into the correct slot (the data_1 or data_2 location) inside the service's larger temporary storage block.
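The mapping between the two half-block requests and the service's temporary block can be pictured with the short sketch below. All names are illustrative only and are not identifiers from the application source; the sketch just shows that each request is half the initiator block size and lands in its own slot of the temporary block.
// Illustrative sketch of how the two half-block read requests map into the
// service's temporary storage block. Names are assumptions, not identifiers
// taken from the application source.
#include <cstdint>

struct half_block_read_request {
    uint32_t size;            // half of the initiator's original block size
    uint64_t output_address;  // where the storage target's RDMA write lands
};

void build_half_block_requests(uint32_t initiator_block_size,
                               uint64_t temp_block_base,
                               half_block_read_request &req_data_1,
                               half_block_read_request &req_data_2)
{
    uint32_t const half = initiator_block_size / 2;
    req_data_1 = {half, temp_block_base};         // data_1 slot
    req_data_2 = {half, temp_block_base + half};  // data_2 slot, adjacent to data_1
}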
RDMA Transfers
Each storage target performs an RDMA write into the GGA offload memory block at the designated offsets.
Target Responses
Both storage targets send I/O responses upon completing the RDMA transfers.
The GGA offload application waits until both responses are received.
Decompression and Final Response
The application performs decompression using the received data blocks. Decompression output is written directly to initiator memory using the doca_compress decompress task.
The application sends an I/O response to the initiator, completing the operation.
Recovery Read Data Flow
This flow is similar to the regular read, but with added steps for error correction using erasure coding. The process involves the stages detailed in the following subsections.
Initiator Request
The initiator sends an I/O request to the GGA offload application.
Given that recovery is required, the application:
Translates the I/O address normally.
Modifies one of the two storage requests to target the parity partition instead of a standard data partition.
Adds an output offset field to redirect the parity data into a reserved region of GGA offload memory.
RDMA Transfers
Two RDMA transfers are issued:
One to the surviving data partition.
One to the parity partition.
The transferred blocks are written into the GGA offload memory with appropriate alignment.
Target Responses
Both storage targets reply with I/O responses.
The GGA offload application waits for both to complete before proceeding.
Data Recovery
The application performs recovery using the available data half block and the parity data to reconstruct the missing half block.
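To illustrate the principle only (this is not the application's implementation, which performs recovery with a doca_ec recover task and a Cauchy or Vandermonde coding matrix), a deliberately simplified single-parity scheme reconstructs the missing half block byte-wise from the surviving half block and the redundancy data:
// Simplified, XOR-based single-parity recovery sketch. The application itself
// uses doca_ec_task_recover with a Cauchy or Vandermonde matrix; this only
// illustrates that the missing half block is rebuilt from the surviving half
// block plus the redundancy data.
#include <cstddef>
#include <cstdint>

void recover_missing_half(const uint8_t *surviving_half,
                          const uint8_t *parity_half,
                          uint8_t *recovered_half,
                          size_t half_block_size)
{
    for (size_t i = 0; i < half_block_size; ++i)
        recovered_half[i] = surviving_half[i] ^ parity_half[i];
}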
Decompression and Final Response
The recovered data is then decompressed. As with the regular read, decompression output is written directly to initiator memory.
An I/O response is sent to the initiator, completing the operation.
Write Data Flow
The write path is only available when the service is compiled with the liblz4 development library.
The write data flow consists of the stages detailed in the following subsections.
Initiator Request
The initiator sends an I/O request to the GGA offload application.
Service Fetches Data
The service fetches the data from the initiator as a single DMA read.
Data Compression
The service uses liblz4 to compress the data from the data_1 and data_2 blocks into the temporary storage space.
The compressed bytes are then copied back to the data_1 and data_2 locations with a metadata header and trailer added.
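A minimal sketch of the software compression step, assuming the liblz4 block API (LZ4_compressBound / LZ4_compress_default); the helper name is illustrative and the service's metadata header and trailer format is not shown here.
// Minimal sketch of the software compression step using the liblz4 block API.
// The metadata header/trailer the service adds around the compressed bytes is
// its own format and is not shown; the caller is assumed to add it afterwards.
#include <lz4.h>
#include <stdexcept>
#include <vector>

std::vector<char> compress_half_block(const char *src, int src_size)
{
    std::vector<char> out(LZ4_compressBound(src_size));
    int const n = LZ4_compress_default(src, out.data(), src_size, static_cast<int>(out.size()));
    if (n <= 0)
        throw std::runtime_error("LZ4 compression failed");
    out.resize(n);
    return out; // caller prepends its metadata header and appends its trailer
}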
Parity Block Creation
A half block's worth of EC redundancy bytes is created from the compressed data (including its metadata).
I/O Requests Sent to Storage Targets
Each storage target is sent an I/O write request to store the appropriate half block of data (data_1, data_2, or parity data).
I/O Responses
All three storage targets reply with I/O responses.
The GGA offload application waits for all three to complete.
The GGA offload application then sends an I/O response to the initiator.
This application leverages the following DOCA libraries:
DOCA Comch
DOCA RDMA
DOCA Compress
DOCA Erasure Coding (EC)
DOCA DMA
This application is compiled as part of the set of storage applications. For compilation instructions, refer to the DOCA Storage Applications page.
Application Execution
This application can only run on the NVIDIA® BlueField® DPU.
DOCA Storage Comch to RDMA GGA Offload is provided in source form. Therefore, compilation is required before the application can be executed.
Application usage instructions:
Usage: doca_storage_comch_to_rdma_gga_offload [DOCA Flags] [Program Flags]

DOCA Flags:
  -h, --help                        Print a help synopsis
  -v, --version                     Print program version information
  -l, --log-level                   Set the (numeric) log level for the program <10=DISABLE, 20=CRITICAL, 30=ERROR, 40=WARNING, 50=INFO, 60=DEBUG, 70=TRACE>
  --sdk-log-level                   Set the SDK (numeric) log level for the program <10=DISABLE, 20=CRITICAL, 30=ERROR, 40=WARNING, 50=INFO, 60=DEBUG, 70=TRACE>
  -j, --json <path>                 Parse command line flags from an input json file

Program Flags:
  -d, --device                      Device identifier
  -r, --representor                 Device host side representor identifier
  --cpu                             CPU core to which the process affinity can be set
  --data-1-storage                  Storage server addresses in <ip_addr>:<port> format
  --data-2-storage                  Storage server addresses in <ip_addr>:<port> format
  --data-p-storage                  Storage server addresses in <ip_addr>:<port> format
  --matrix-type                     Type of matrix to use. One of: cauchy, vandermonde. Default: vandermonde
  --command-channel-name            Name of the channel used by the doca_comch_client. Default: "doca_storage_comch"
  --control-timeout                 Time (in seconds) to wait while performing control operations. Default: 5
  --trigger-recovery-read-every-n   Trigger a recovery read flow every Nth request. Default: 0 (disabled)
Info: This usage printout can be printed to the command line using the -h (or --help) option:
./doca_storage_comch_to_rdma_gga_offload -h
For additional information, refer to section "Command-line Flags".
CLI example for running the application on the BlueField:
./doca_storage_comch_to_rdma_gga_offload -d 03:00.0 -r 3b:00.0 --data-1-storage 172.17.0.1:12345 --data-2-storage 172.17.0.2:12345 --data-p-storage 172.17.0.3:12345 --cpu 0
Note: Both the DOCA Comch device PCIe address (03:00.0) and the DOCA Comch device representor PCIe address (3b:00.0) should match the addresses of the desired PCIe devices.
Note: Storage target IP address:port tuples should be updated to refer to the running storage target applications.
Command-line Flags
Flag Type | Short Flag | Long Flag/JSON Key | Description
--- | --- | --- | ---
General flags | -h | --help | Print a help synopsis
General flags | -v | --version | Print program version information
General flags | -l | --log-level | Set the log level for the application: 10=DISABLE, 20=CRITICAL, 30=ERROR, 40=WARNING, 50=INFO, 60=DEBUG, 70=TRACE
General flags | N/A | --sdk-log-level | Set the SDK log level for the program: 10=DISABLE, 20=CRITICAL, 30=ERROR, 40=WARNING, 50=INFO, 60=DEBUG, 70=TRACE
General flags | -j | --json | Parse command line flags from an input JSON file as well as from the CLI (if provided)
Program flags | -d | --device | DOCA device identifier (e.g., PCIe address). Note: this flag is mandatory.
Program flags | -r | --representor | DOCA Comch device representor PCIe address. Note: this flag is mandatory.
Program flags | N/A | --cpu | Index of CPU to use. One data path thread is spawned per CPU. Index starts at 0. The argument can be specified multiple times to create more threads. Note: this flag is mandatory.
Program flags | N/A | --data-1-storage | IP address and port used to establish the control TCP connection to the data_1 storage target. Note: this flag is mandatory.
Program flags | N/A | --data-2-storage | IP address and port used to establish the control TCP connection to the data_2 storage target. Note: this flag is mandatory.
Program flags | N/A | --data-p-storage | IP address and port used to establish the control TCP connection to the data_p (parity) storage target. Note: this flag is mandatory.
Program flags | N/A | --matrix-type | Type of matrix to use. One of: cauchy, vandermonde. Default: vandermonde
Program flags | N/A | --command-channel-name | Allows customizing the comch server name used for this application instance if multiple comch servers exist on the same device. Default: "doca_storage_comch"
Program flags | N/A | --control-timeout | Time, in seconds, to wait while performing control operations. Default: 5
Program flags | N/A | --trigger-recovery-read-every-n | Trigger a recovery read flow every Nth request. Set to 0 to disable recovery reads entirely. For example, a value of 1 triggers a recovery read for every request, while a value of 10 triggers one recovery read per ten requests. Default: 0 (disabled)
Troubleshooting
Refer to the NVIDIA BlueField Platform Software Troubleshooting Guide for any issue encountered with the installation or execution of the DOCA applications.
The flow of the application is broken down into key functions and steps:
gga_offload_app app{parse_cli_args(argc, argv)};
storage::install_ctrl_c_handler([&app]() {
app.abort("User requested abort");
});
app.connect_to_storage();
app.wait_for_comch_client_connection();
app.wait_for_and_process_query_storage();
app.wait_for_and_process_init_storage();
app.wait_for_and_process_start_storage();
app.wait_for_and_process_stop_storage();
app.wait_for_and_process_shutdown();
app.display_stats();
Main/Control Thread Flow
- gga_offload_app app{parse_cli_args(argc, argv)}
  Parse CLI arguments and use these to create the application instance. Initial resources are also created at this stage:
  - DOCA_LOG_INFO("Open doca_dev: %s", m_cfg.device_id.c_str());
    m_dev = storage::open_device(m_cfg.device_id);
    Open a doca_dev as specified by the CLI argument: -d or --device.
  - DOCA_LOG_INFO("Open doca_dev_rep: %s", m_cfg.representor_id.c_str());
    m_dev_rep = storage::open_representor(m_dev, m_cfg.representor_id);
    Open a doca_dev_rep as specified by the CLI argument: -r or --representor.
  - m_all_ctrl_channels[connection_role::data_1] = storage::control::make_tcp_client_control_channel(
        m_cfg.storage_server_address[connection_role::data_1]);
    m_all_ctrl_channels[connection_role::data_2] = storage::control::make_tcp_client_control_channel(
        m_cfg.storage_server_address[connection_role::data_2]);
    m_all_ctrl_channels[connection_role::data_p] = storage::control::make_tcp_client_control_channel(
        m_cfg.storage_server_address[connection_role::data_p]);
    Create three TCP client control channels. (Control channel objects provide a unified API so that a TCP client, TCP server, doca_comch_client, and doca_comch_server all have a consistent API.)
    Info: See storage_common/control_channel.hpp for more information about the control channel abstraction.
  - m_all_ctrl_channels[connection_role::client] = storage::control::make_comch_server_control_channel(
        m_dev, m_dev_rep, m_cfg.command_channel_name.c_str(), this, new_comch_consumer_callback, expired_comch_consumer_callback);
    Create a comch server control channel (containing a doca_comch_server instance) using the device, representor, and channel name as specified by the CLI argument --command-channel-name, or the default value if none was specified.
- storage::install_ctrl_c_handler([&app]() { app.abort("User requested abort"); });
  Set a signal handler for Ctrl+C keyboard input so the app can shut down gracefully.
- app.connect_to_storage();
  Connect to the TCP server hosted by each of the three storage targets as defined by the CLI arguments: --data-1-storage, --data-2-storage, and --data-p-storage.
  - void gga_offload_app::connect_to_storage(void)
    {
        for (auto *storage_channel : m_storage_ctrl_channels) {
            DOCA_LOG_DBG("Connect control channel...");
            for (;;) {
                if (m_abort_flag) {
                    throw storage::runtime_error{DOCA_ERROR_CONNECTION_ABORTED, "Aborted while connecting to storage"};
                }
                if (storage_channel->is_connected())
                    break;
            }
        }
    }
    For each storage control channel (the TCP contexts), poll the context in an infinite loop until either it connects or the user aborts the application. (The servers should have been started before the service, so no delay is desired between polls of the control context.)
- app.wait_for_comch_client_connection();
  Wait for a doca_comch_client to connect.
  - void gga_offload_app::wait_for_comch_client_connection(void)
    {
        while (!m_all_ctrl_channels[connection_role::client]->is_connected()) {
            std::this_thread::sleep_for(std::chrono::milliseconds{100});
            if (m_abort_flag) {
                throw storage::runtime_error{DOCA_ERROR_CONNECTION_ABORTED, "Aborted while connecting to client"};
            }
        }
    }
    Poll the Comch server control channel until a doca_comch_client has connected or the user aborts the application. If any further Comch client attempts to connect to the server, it is automatically rejected by the control channel, which is designed for a 1:1 relationship between clients and servers. A sleep is placed in this loop because it may take the user/operator a few seconds to start the client, so there is no gain from polling any faster.
- app.wait_for_and_process_query_storage();
  Wait for the initiator to send a query_storage_request control message and then perform the required actions to fulfill the request:
  - for (auto *storage_ctrl : m_storage_ctrl_channels) {
        auto storage_request = storage::control::message{
            storage::control::message_type::query_storage_request,
            storage::control::message_id{m_message_id_counter++},
            client_request.correlation_id,
            {},
        };
        msg_ids.push_back(storage_request.message_id);
        storage_ctrl->send_message(storage_request);
    }
    Send a query storage request to each storage target server.
  - wait_for_responses(msg_ids, default_control_timeout_seconds);
    Wait for the storage targets to respond.
  - Check the response from each storage target.
    If they all report the same dimensions (block size and block count), respond to the initiator with a query_storage_response message reporting double the values returned by each storage target (as each storage target holds half blocks, not full blocks). For example, if each storage target reports "64K capacity, 2K block size", report "128K capacity, 4K block size".
    If the dimensions do not match, or any other error occurs, respond to the initiator with an error_response message detailing the error that occurred.
- app.wait_for_and_process_init_storage();
  Wait for the initiator to send an init_storage_request control message and then perform the required actions to fulfill the request:
  - Use the init_storage_payload data to:
    - Set the core count (m_core_count) to the number of cores requested by the initiator (the number of --cpu arguments provided to the initiator), or fail if this is more than the number of --cpu arguments provided to the service.
    - Set the number of transactions per core (m_num_transactions) to double the number of transactions requested by the initiator. Doubling allows for batched task submission and avoids a race condition where the initiator sees a response to a transaction and tries to re-submit it before the associated comch producer event callback is received by the server; in that case the initiator would continually retry sending the task and degrade performance until the service catches up and re-submits the consumer task. This should be uncommon, but to make sure it can never happen, double the transaction count is allocated: even if every single transaction on the initiator hit this issue, a full second set of transactions is ready on the service side to receive the tasks and avoid any contention. A user can experiment with reducing this value to save memory if desired.
    - Calculate the per-transaction storage block size (m_per_transaction_chunk_size) as block size * 2.5 if the write flow is enabled, or block size * 1.5 when the write flow is disabled.
    - Allocate storage memory blocks (m_local_io_region) as an array of m_core_count * m_num_transactions * m_per_transaction_chunk_size bytes.
  - m_remote_io_mmap = storage::make_mmap(m_dev,
                                          init_storage_details->mmap_export_blob.data(),
                                          init_storage_details->mmap_export_blob.size(),
                                          storage::thread_safety::yes);
    Import the initiator mmap so that its IO blocks can be accessed by the service.
  - m_local_io_mmap = storage::make_mmap(m_dev,
                                         reinterpret_cast<char *>(m_local_io_region),
                                         local_storage_size,
                                         DOCA_ACCESS_FLAG_LOCAL_READ_WRITE | DOCA_ACCESS_FLAG_PCI_READ_WRITE |
                                             DOCA_ACCESS_FLAG_RDMA_WRITE | DOCA_ACCESS_FLAG_RDMA_READ,
                                         storage::thread_safety::yes);
    std::vector<uint8_t> mmap_export_blob = [this]() {
        uint8_t const *reexport_blob = nullptr;
        size_t reexport_blob_size = 0;
        auto const ret = doca_mmap_export_rdma(m_local_io_mmap,
                                               m_dev,
                                               reinterpret_cast<void const **>(&reexport_blob),
                                               &reexport_blob_size);
        if (ret != DOCA_SUCCESS) {
            throw storage::runtime_error{ret, "Failed to re-export host mmap for rdma"};
        }
        return std::vector<uint8_t>{reexport_blob, reexport_blob + reexport_blob_size};
    }();
    Create and export the service IO blocks memory region. This is shared with the storage targets so they can access the service IO blocks memory for RDMA read/write operations.
  - for (auto *storage_ctrl : m_storage_ctrl_channels) {
        auto storage_request = storage::control::message{
            storage::control::message_type::init_storage_request,
            storage::control::message_id{m_message_id_counter++},
            client_request.correlation_id,
            std::make_unique<storage::control::init_storage_payload>(m_num_transactions,
                                                                     init_storage_details->core_count,
                                                                     mmap_export_blob),
        };
        storage_ctrl->send_message(storage_request);
    }
    Send an init storage request to each storage target using:
    - The service transaction count (double the initiator value).
    - The initiator core count.
    - The service IO blocks mmap.
  - Wait for the storage targets to respond.
  - Send a response to the initiator: an init_storage_response message upon success, or an error_response message if anything failed.
  - Perform the first stages of the worker threads' initialization. These steps are carried out for each thread, but only one thread performs the steps at any one time; this simplifies the sending and receiving of control messages. The user could modify this flow to execute in parallel if desired.
    - Create a thread bound to the Nth CPU provided to the service via the --cpu CLI arguments.
    - m_workers[ii].execute_control_command(worker_create_objects_control_command{m_dev,
                                                                                  comch_channel->get_comch_connection(),
                                                                                  m_local_io_mmap,
                                                                                  m_remote_io_mmap,
                                                                                  m_num_transactions,
                                                                                  m_storage_block_size,
                                                                                  m_cfg.ec_matrix_type,
                                                                                  m_cfg.recover_freq});
      Initialize the thread context (asynchronously).
    - connect_rdma(ii, storage::control::rdma_connection_role::io_data, cid);
      Create RDMA data connections (asynchronously). The thread connects to each of the three storage targets and creates an RDMA context which is idle from the service's point of view, but is used by the storage target to perform RDMA read/write operations. See the DOCA Storage Target RDMA Application Guide for an explanation of why there are two RDMA contexts per thread.
    - connect_rdma(ii, storage::control::rdma_connection_role::io_control, cid);
      Create RDMA control connections (asynchronously). The thread connects to each of the three storage targets and creates an RDMA context which is used to exchange IO requests and responses using RDMA send/recv tasks. See the DOCA Storage Target RDMA Application Guide for an explanation of why there are two RDMA contexts per thread.
- app.wait_for_and_process_start_storage();
  Wait for the initiator to send a start_storage_request control message and then perform the required actions to fulfill the request:
  - Forward the start storage request to each of the storage targets.
  - Wait for the storage targets to respond.
  - Signal all worker threads to begin data path operation.
  - Send a response to the initiator: a start_storage_response message upon success, or an error_response message if anything failed.
  Data path execution takes place from this point until either the user aborts the program or a stop message is received.
- app.wait_for_and_process_stop_storage();
  Wait for the initiator to send a stop_storage_request control message and then perform the required actions to fulfill the request:
  - Forward the stop storage request to each of the storage targets.
  - Wait for the storage targets to respond.
  - Signal all worker threads to stop data path operation.
  - Collect runtime stats.
  - Send a response to the initiator: a stop_storage_response message upon success, or an error_response message if anything failed.
- app.wait_for_and_process_shutdown();
  Wait for the initiator to send a shutdown_request control message and then perform the required actions to fulfill the request:
  - Forward the shutdown request to each of the storage targets.
  - Wait for the storage targets to respond.
  - Destroy worker thread objects.
  - Send a response to the initiator: a shutdown_response message upon success, or an error_response message if anything failed.
- app.display_stats()
  Display runtime statistics.
- Application destructor is triggered:
  - Destroy the control channels.
  - Destroy the storage IO blocks doca_mmap.
  - Destroy the storage IO block memory.
  - Destroy the initiator IO blocks doca_mmap.
  - Close the doca_dev_rep.
  - Close the doca_dev.
- Program exits.
Worker/Data Path Thread Flow
The worker thread procedure executes in two phases: a control/configuration phase, followed by a data path phase where read, write, and recovery operations take place.
Worker Init Process
void gga_offload_app_worker::thread_proc(gga_offload_app_worker *self, uint16_t core_idx) noexcept
The worker starts by executing a loop of the following steps (a sketch follows the list):
Lock mutex
If message pointer is not null:
Process the configuration message.
Set the operation result.
Unlock the mutex.
Yield.
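A minimal sketch of this configuration loop is shown below. The member and type names (control_command, process, the flag and counter names) are assumptions for illustration and are not taken from the application source.
// Illustrative sketch of the worker configuration loop described above.
// Types and names here are stand-ins; the real application processes commands
// such as worker_create_objects_control_command via execute_control_command.
#include <mutex>
#include <thread>

struct control_command { int id; };
static int process(control_command const &cmd) { return cmd.id; }

void worker_config_loop_sketch(std::mutex &mtx,
                               control_command *&pending_command,
                               int &command_result,
                               bool const &start_data_path_requested)
{
    while (!start_data_path_requested) {
        {
            std::lock_guard<std::mutex> lock{mtx};              // Lock mutex
            if (pending_command != nullptr) {                   // If message pointer is not null:
                command_result = process(*pending_command);     //   Process the configuration message
                pending_command = nullptr;                      //   Set the operation result
            }
        }                                                       // Unlock the mutex (lock_guard goes out of scope)
        std::this_thread::yield();                              // Yield
    }
}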
The following configuration operations can be performed by the worker thread:
- void gga_offload_app_worker::create_worker_objects(worker_create_objects_control_command const &cmd)
  Create general worker objects:
  - Create the LZ4 software library context for compression operations (when enabled).
  - Create N transaction objects.
  - Create IO message memory.
  - Create the IO message doca_mmap (to allow the messages to be accessed by comch and RDMA).
  - Create the doca_buf_inventory.
  - Create the doca_pe to drive the DOCA contexts (doca_rdma, doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, doca_dma).
  - Create the doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, and doca_dma contexts.
  - Initialize and start the contexts.
- void gga_offload_app_worker::export_local_rdma_connection_blob(worker_export_local_rdma_connection_command &cmd)
  Export the RDMA context connection binary blob.
- void gga_offload_app_worker::import_remote_rdma_connection_blob(worker_import_local_rdma_connection_command const &cmd)
  Import the remote RDMA context connection binary blob.
- void gga_offload_app_worker::are_contexts_ready(worker_are_contexts_ready_control_command &cmd) const noexcept
  Poll all contexts to check they are ready to perform data path operations.
- void gga_offload_app_worker::prepare_tasks(worker_prepare_tasks_control_command const &cmd)
  Prepare transaction contexts by:
  - Allocating doca_buf objects.
  - Allocating doca_task objects.
  - Setting task user data.
  - Setting task pointers into the transaction context.
  - Setting per-transaction IO block addresses.
- void gga_offload_app_worker::start_data_path(void)
  Break out of the wait-for-configuration event loop and start the data path loop.
After the configuration phase, the mutex is not used again.
After breaking out of the initial configuration loop, the thread submits receive tasks (Comch consumer tasks and RDMA receive tasks), then enters the data path function: run_data_path_ops.
Worker Data Path Process
void gga_offload_app_worker::run_data_path_ops(gga_offload_app_worker::hot_data &hot_data)
{
DOCA_LOG_INFO("Core: %u running", hot_data.core_idx);
while (hot_data.run_flag) {
doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
}
while (hot_data.error_flag == false && hot_data.in_flight_transaction_count != 0) {
doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
}
}
During the data path phase, the thread simply polls the doca_pe as quickly as possible to check for a task completion from one of the thread's DOCA contexts (doca_rdma, doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, doca_dma). The interesting work is done in the callbacks of these tasks. The flow always starts with a consumer task completion: the reception of the IO message from the initiator.
doca_comch_consumer_task_post_recv_cb
void gga_offload_app_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
{
static_cast<void>(task_user_data);
auto const transaction_idx = doca_task_get_user_data(doca_comch_consumer_task_post_recv_as_task(task)).u64;
auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
auto const ret = hot_data->start_transaction(hot_data->transactions[transaction_idx]);
if (ret != DOCA_SUCCESS) {
DOCA_LOG_ERR("Failed to start transaction: %s", doca_error_get_name(ret));
}
}
This callback simply interprets the task callback parameters to call the start_transaction function. The start transaction flow performs the following steps:
- Check the transaction request type.
- Set the transaction action flags based on the transaction type (read, write); a sketch of such an action bitmask follows this list.
- Call the progress_transaction function to execute as many of the pending actions as possible. Some actions, such as sending IO requests to the storage targets, decompressing data, and creating erasure coding blocks, are themselves asynchronous, so processing stops until a future task callback resumes it.
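The pending actions can be pictured as a per-transaction bitmask. The flag names below mirror the transaction_action values referenced by the callbacks later in this section, but the exact enum values and helper functions are assumptions for illustration, not the application's definitions.
// Illustrative pending-actions bitmask. Flag names mirror the transaction_action
// values used by the callbacks below; values and helpers here are assumptions.
#include <cstdint>

enum class transaction_action : uint32_t {
    fetch_from_initiator        = 1u << 0,
    produce_ec_blocks           = 1u << 1,
    wait_for_storage_completion = 1u << 2,
    recover_a                   = 1u << 3,
    recover_b                   = 1u << 4,
    decompress                  = 1u << 5,
};

struct transaction_sketch {
    uint32_t pending_actions = 0;
};

// Mark an action as pending, e.g. when a request arrives.
inline void set_pending(transaction_sketch &t, transaction_action a)
{
    t.pending_actions |= static_cast<uint32_t>(a);
}

// Clear an action when its task completes; when pending_actions reaches zero
// the IO response can be sent back to the initiator.
inline void clear_pending(transaction_sketch &t, transaction_action a)
{
    t.pending_actions &= ~static_cast<uint32_t>(a);
}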
The transaction results in a read or write operation being performed. A read operation may result in a regular read or a recovery read, depending on the value provided by the --trigger-recovery-read-every-n CLI argument. When non-zero, every Nth read performs the recovery read flow, as in the sketch below.
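A minimal sketch of such a trigger check, assuming a simple per-thread request counter (the counter name is illustrative, not taken from the application source):
// Illustrative check for triggering a recovery read every Nth request.
// recover_freq corresponds to --trigger-recovery-read-every-n; request_count is
// an assumed per-thread counter.
#include <cstdint>

bool use_recovery_read(uint32_t recover_freq, uint64_t &request_count)
{
    ++request_count;
    return recover_freq != 0 && (request_count % recover_freq) == 0;
}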
The flow of each of these operations is as follows:
Write flow
- Fetch data from initiator memory using DMA.
- Compress the data.
- Generate EC redundancy blocks.
- Store data_1, data_2, and data_p to the respective storage targets.
- Respond with an IO response.
Regular read flow
- Fetch data_1 and data_2 from the respective storage targets.
- Decompress the data into initiator memory.
- Respond with an IO response.
Recovery read flow
- Fetch data_1 OR data_2 from the respective storage target.
- Fetch data_p from the data_p storage target.
- Recover the missing data half using the existing data and the redundancy data.
- Decompress the data into initiator memory.
- Respond with an IO response.
doca_comch_producer_task_send_cb
void gga_offload_app_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
{
static_cast<void>(task);
auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
auto &transaction = hot_data->transactions[task_user_data.u64];
transaction.pending_actions = 0;
--(hot_data->in_flight_transaction_count);
++(hot_data->completed_transaction_count);
static_cast<void>(
doca_buf_reset_data_len(doca_comch_consumer_task_post_recv_get_buf(transaction.host_request_task)));
auto const ret = doca_task_submit(doca_comch_consumer_task_post_recv_as_task(transaction.host_request_task));
if (ret != DOCA_SUCCESS) {
DOCA_LOG_ERR("Failed to submit doca_comch_consumer_task_post_recv: %s", doca_error_get_name(ret));
hot_data->error_flag = true;
hot_data->run_flag = false;
}
}
The producer task completion callback signals that the transaction response has been delivered to the initiator. The transaction can now be re-used by re-submitting the transaction consumer task.
doca_rdma_task_send_cb
void gga_offload_app_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
{
static_cast<void>(task);
auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
auto &transaction = hot_data->transactions[task_user_data.u64];
--(transaction.pending_storage_response_count);
if (transaction.pending_storage_response_count == 0) {
transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::wait_for_storage_completion);
hot_data->progress_transaction(transaction);
}
}
The RDMA send task callback signals that an IO request has been delivered to one of the storage target machines. A counter is decremented to track when all IO requests and IO responses have completed so that the next step of processing the transaction can be carried out. For a read IO operation, the next step could be data recovery or decompression; for a write operation, the transaction is complete and the IO response to the initiator can be sent.
doca_rdma_task_receive_cb
void gga_offload_app_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
{
static_cast<void>(task_user_data);
auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
auto *const rdma_io_message = storage::get_buffer_bytes(doca_rdma_task_receive_get_dst_buf(task));
auto const transaction_idx = storage::io_message_view::get_user_data(rdma_io_message).u64;
auto &transaction = hot_data->transactions[transaction_idx];
auto const io_result = storage::io_message_view::get_result(rdma_io_message);
if (io_result != DOCA_SUCCESS) {
// store error
storage::io_message_view::set_result(io_result, transaction.initiator_io_message);
}
--(transaction.pending_storage_response_count);
if (transaction.pending_storage_response_count == 0) {
transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::wait_for_storage_completion);
hot_data->progress_transaction(transaction);
}
if (hot_data->run_flag) {
static_cast<void>(doca_buf_reset_data_len(doca_rdma_task_receive_get_dst_buf(task)));
auto const ret = doca_task_submit(doca_rdma_task_receive_as_task(task));
if (ret != DOCA_SUCCESS) {
DOCA_LOG_ERR("Failed to resubmit doca_rdma_task_receive");
hot_data->run_flag = false;
hot_data->error_flag = true;
}
}
}
The RDMA receive task callback signals that an IO response has been received from one of the storage targets. As with a send task completion, the counter is decremented to determine whether processing can continue. In addition, the IO response status is inspected: if the storage operation failed, the transaction result is marked as a failure. The receive task is then re-submitted so it is ready to receive another message from that storage target in a future transaction.
doca_ec_task_recover_cb
void gga_offload_app_worker::doca_ec_task_recover_cb(doca_ec_task_recover *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
{
static_cast<void>(task);
auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
auto &transaction = hot_data->transactions[task_user_data.u64];
transaction.pending_actions &= ~(static_cast<uint32_t>(transaction_action::recover_a) |
static_cast<uint32_t>(transaction_action::recover_b));
hot_data->progress_transaction(transaction);
}
The EC recover task callback signals the completion of data recovery using erasure coding blocks. The transaction can now proceed to the next step, which is data decompression to continue the read process.
doca_compress_task_decompress_lz4_stream_cb
void gga_offload_app_worker::doca_compress_task_decompress_lz4_stream_cb(doca_compress_task_decompress_lz4_stream *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
{
static_cast<void>(task);
static_cast<void>(task_user_data);
auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
auto &transaction = hot_data->transactions[task_user_data.u64];
transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::decompress);
hot_data->progress_transaction(transaction);
}
The decompress LZ4 stream task callback signals the completion of data decompression. As the decompressed data is stored directly into initiator memory the transaction can continue to send the IO response to the initiator.
doca_ec_task_create_cb
void gga_offload_app_worker::doca_ec_task_create_cb(doca_ec_task_create *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
{
static_cast<void>(task);
static_cast<void>(task_user_data);
auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
auto &transaction = hot_data->transactions[task_user_data.u64];
transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::produce_ec_blocks);
hot_data->start_commit_to_storage(transaction);
}
The EC create task callback signals the completion of the generation of EC redundancy data. Transaction processing can now continue by sending IO requests to all three storage targets to store the three half blocks (data_1, data_2, and data_p) in the respective storage targets' IO blocks.
doca_dma_task_memcpy_cb
void gga_offload_app_worker::doca_dma_task_memcpy_cb(doca_dma_task_memcpy *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
{
static_cast<void>(task);
static_cast<void>(task_user_data);
auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
auto &transaction = hot_data->transactions[task_user_data.u64];
transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::fetch_from_initiator);
auto const ret = hot_data->compress_data(transaction);
if (ret != DOCA_SUCCESS) {
transaction.set_error(ret);
}
hot_data->progress_transaction(transaction);
}
The DMA memcpy task callback signals that the fetch of initiator data into service memory is complete. The software compression of the data is then immediately executed to completion, as the software LZ4 library does not use events to signal completion. Transaction processing then continues, which starts the creation of the EC redundancy data.
/opt/mellanox/doca/applications/storage/