The flow of the application is broken down into key functions and steps:

    gga_offload_app app{parse_cli_args(argc, argv)};
    storage::install_ctrl_c_handler([&app]() {
        app.abort("User requested abort");
    });
    app.connect_to_storage();
    app.wait_for_comch_client_connection();
    app.wait_for_and_process_query_storage();
    app.wait_for_and_process_init_storage();
    app.wait_for_and_process_start_storage();
    app.wait_for_and_process_stop_storage();
    app.wait_for_and_process_shutdown();
    app.display_stats();

gga_offload_app app{parse_cli_args(argc, argv)}

Parse the CLI arguments and use them to create the application instance. Initial resources are also created at this stage:

    DOCA_LOG_INFO("Open doca_dev: %s", m_cfg.device_id.c_str());
    m_dev = storage::open_device(m_cfg.device_id);

Open a doca_dev as specified by the CLI argument -d or --device.

    DOCA_LOG_INFO("Open doca_dev_rep: %s", m_cfg.representor_id.c_str());
    m_dev_rep = storage::open_representor(m_dev, m_cfg.representor_id);

Open a doca_dev_rep as specified by the CLI argument -r or --representor.

    m_all_ctrl_channels[connection_role::data_1] = storage::control::make_tcp_client_control_channel(
        m_cfg.storage_server_address[connection_role::data_1]);
    m_all_ctrl_channels[connection_role::data_2] = storage::control::make_tcp_client_control_channel(
        m_cfg.storage_server_address[connection_role::data_2]);
    m_all_ctrl_channels[connection_role::data_p] = storage::control::make_tcp_client_control_channel(
        m_cfg.storage_server_address[connection_role::data_p]);

Create three TCP client control channels. Control channel objects provide a unified API, so a TCP client, TCP server, doca_comch_client, and doca_comch_server all present a consistent interface.

Info: See storage_common/control_channel.hpp for more information about the control channel abstraction.

    m_all_ctrl_channels[connection_role::client] =
        storage::control::make_comch_server_control_channel(m_dev,
                                                            m_dev_rep,
                                                            m_cfg.command_channel_name.c_str(),
                                                            this,
                                                            new_comch_consumer_callback,
                                                            expired_comch_consumer_callback);

Create a Comch server control channel (containing a doca_comch_server instance) using the device, the representor, and the channel name specified by the CLI argument --command-channel-name, or the default value if none was specified.

storage::install_ctrl_c_handler([&app]() { app.abort("User requested abort"); });

Set a signal handler for Ctrl+C keyboard input so the application can shut down gracefully.
app.connect_to_storage();

Connect to the TCP server hosted by each of the three storage targets, as defined by the CLI arguments --data-1-storage, --data-2-storage, and --data-p-storage.

    void gga_offload_app::connect_to_storage(void)
    {
        for (auto *storage_channel : m_storage_ctrl_channels) {
            DOCA_LOG_DBG("Connect control channel...");
            for (;;) {
                if (m_abort_flag) {
                    throw storage::runtime_error{DOCA_ERROR_CONNECTION_ABORTED,
                                                 "Aborted while connecting to storage"};
                }

                if (storage_channel->is_connected())
                    break;
            }
        }
    }

For each storage control channel (the TCP contexts), poll the context in an infinite loop until either it connects or the user aborts the application. The servers should have been started before the service, so no delay is needed between polls of the control context.

app.wait_for_comch_client_connection();

Wait for a doca_comch_client to connect.

    void gga_offload_app::wait_for_comch_client_connection(void)
    {
        while (!m_all_ctrl_channels[connection_role::client]->is_connected()) {
            std::this_thread::sleep_for(std::chrono::milliseconds{100});
            if (m_abort_flag) {
                throw storage::runtime_error{DOCA_ERROR_CONNECTION_ABORTED,
                                             "Aborted while connecting to client"};
            }
        }
    }

Poll the Comch server control channel until a doca_comch_client has connected or the user aborts the application. If any further Comch client attempts to connect to the server, it is automatically rejected by the control channel, which is designed for a 1:1 relationship between clients and servers. A sleep is placed in this loop because it may take the user or operator a few seconds to start the client, so there is nothing to gain from polling any faster.
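Both connection loops above follow the same pattern: poll a connection predicate, and give up with an exception if the abort flag is raised. The sketch below captures that pattern in one place. It is illustrative only: `wait_until_connected` is a hypothetical helper, not part of the application, and it uses `std::runtime_error` in place of `storage::runtime_error`.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <stdexcept>
#include <thread>

// Hypothetical helper (not part of the application): poll `is_connected`
// until it returns true, throwing if `abort_flag` is raised first.
// A zero interval gives the busy poll used for the storage targets; a
// non-zero interval gives the relaxed poll used for the Comch client.
void wait_until_connected(std::function<bool()> const &is_connected,
                          std::atomic_bool const &abort_flag,
                          std::chrono::milliseconds poll_interval)
{
    while (!is_connected()) {
        if (abort_flag.load()) {
            throw std::runtime_error{"Aborted while connecting"};
        }

        if (poll_interval.count() > 0) {
            std::this_thread::sleep_for(poll_interval);
        }
    }
}
```

Passing the interval as a parameter keeps the trade-off visible at the call site: no delay when the peer is expected to be up already, a short sleep when a human has to start it.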
app.wait_for_and_process_query_storage();

Wait for the initiator to send a query_storage_request control message, then perform the actions required to fulfill the request:

    for (auto *storage_ctrl : m_storage_ctrl_channels) {
        auto storage_request = storage::control::message{
            storage::control::message_type::query_storage_request,
            storage::control::message_id{m_message_id_counter++},
            client_request.correlation_id,
            {},
        };
        msg_ids.push_back(storage_request.message_id);
        storage_ctrl->send_message(storage_request);
    }

Send a query storage request to each storage target server.

    wait_for_responses(msg_ids, default_control_timeout_seconds);

Wait for the storage targets to respond.

Check the response from each storage target. If they all report the same dimensions (block size and block count), respond to the initiator with a query_storage_response message reporting double the values returned by each storage target, because each storage target holds half blocks rather than full blocks. For example, if the storage targets report "64K capacity, 2K block size", report "128K capacity, 4K block size". If the dimensions do not match, or any other error occurs, respond to the initiator with an error_response message detailing the error.

app.wait_for_and_process_init_storage();

Wait for the initiator to send an init_storage_request control message, then perform the actions required to fulfill the request:

Use the init_storage_payload data to:
- Set the core count (m_core_count) to the number of cores requested by the initiator (the number of --cpu arguments provided to the initiator), or fail if this is more than the number of --cpu arguments provided to the service.
- Set the number of transactions per core (m_num_transactions) to double the number of transactions requested by the initiator.
  The count is doubled to allow for batched task submission and to avoid a race condition in which the initiator sees the response to a transaction and tries to re-submit it before the server has received the associated Comch producer event callback. In that case the initiator would repeatedly retry sending the task, degrading performance until the service catches up and re-submits the consumer task. This should be uncommon, but allocating double the transaction count ensures it can never happen: even if every transaction on the initiator hit this issue, a full second set of transactions is ready on the service side to receive the tasks and avoid contention. Users can experiment with reducing this value to save memory if desired.
- Calculate the per-transaction storage block size (m_per_transaction_chunk_size) as block size * 2.5 when the write flow is enabled, or block size * 1.5 when the write flow is disabled.
- Allocate storage memory blocks (m_local_io_region) as an array of m_core_count * m_num_transactions * m_per_transaction_chunk_size bytes.

    m_remote_io_mmap = storage::make_mmap(m_dev,
                                          init_storage_details->mmap_export_blob.data(),
                                          init_storage_details->mmap_export_blob.size(),
                                          storage::thread_safety::yes);

Import the initiator mmap so that its IO blocks can be accessed by the service.
    m_local_io_mmap = storage::make_mmap(m_dev,
                                         reinterpret_cast<char *>(m_local_io_region),
                                         local_storage_size,
                                         DOCA_ACCESS_FLAG_LOCAL_READ_WRITE | DOCA_ACCESS_FLAG_PCI_READ_WRITE |
                                             DOCA_ACCESS_FLAG_RDMA_WRITE | DOCA_ACCESS_FLAG_RDMA_READ,
                                         storage::thread_safety::yes);
    std::vector<uint8_t> mmap_export_blob = [this]() {
        uint8_t const *reexport_blob = nullptr;
        size_t reexport_blob_size = 0;
        auto const ret = doca_mmap_export_rdma(m_local_io_mmap,
                                               m_dev,
                                               reinterpret_cast<void const **>(&reexport_blob),
                                               &reexport_blob_size);
        if (ret != DOCA_SUCCESS) {
            throw storage::runtime_error{ret, "Failed to re-export host mmap for rdma"};
        }

        return std::vector<uint8_t>{reexport_blob, reexport_blob + reexport_blob_size};
    }();

Create and export the service IO blocks memory region. This is shared with the storage targets so they can access the storage service IO blocks memory for RDMA read/write operations.

    for (auto *storage_ctrl : m_storage_ctrl_channels) {
        auto storage_request = storage::control::message{
            storage::control::message_type::init_storage_request,
            storage::control::message_id{m_message_id_counter++},
            client_request.correlation_id,
            std::make_unique<storage::control::init_storage_payload>(m_num_transactions,
                                                                     init_storage_details->core_count,
                                                                     mmap_export_blob),
        };
        storage_ctrl->send_message(storage_request);
    }

Send an init storage request to each storage target using:
- The service transaction count (double the initiator value).
- The initiator core count.
- The service IO blocks mmap.

Wait for the storage targets to respond.

Send a response to the initiator: an init_storage_response message upon success, or an error_response message if anything failed.

Perform the first stages of worker thread initialization. These steps are carried out for each thread, but only one thread performs them at any given time, which simplifies the sending and receiving of control messages. Users can modify this flow to execute in parallel if desired.
- Create a thread bound to the Nth CPU provided to the service via the --cpu CLI arguments.

      m_workers[ii].execute_control_command(
          worker_create_objects_control_command{m_dev,
                                                comch_channel->get_comch_connection(),
                                                m_local_io_mmap,
                                                m_remote_io_mmap,
                                                m_num_transactions,
                                                m_storage_block_size,
                                                m_cfg.ec_matrix_type,
                                                m_cfg.recover_freq});

- Initialize the thread context (asynchronously).

      connect_rdma(ii, storage::control::rdma_connection_role::io_data, cid);

- Create RDMA data connections (asynchronously). The thread connects to each of the three storage targets and creates an RDMA context which is idle from the service's point of view, but is used by the storage target to perform RDMA read/write operations. See the DOCA Storage Target RDMA Application Guide for an explanation of why there are two RDMA contexts per thread.

      connect_rdma(ii, storage::control::rdma_connection_role::io_control, cid);

- Create RDMA control connections (asynchronously). The thread connects to each of the three storage targets and creates an RDMA context which is used to exchange IO requests and responses using RDMA send/recv tasks. See the DOCA Storage Target RDMA Application Guide for an explanation of why there are two RDMA contexts per thread.

app.wait_for_and_process_start_storage();

Wait for the initiator to send a start_storage_request control message, then perform the actions required to fulfill the request:
- Forward the start storage request to each of the storage targets.
- Wait for the storage targets to respond.
- Signal all worker threads to begin data path operation.
- Send a response to the initiator: a start_storage_response message upon success, or an error_response message if anything failed.

Data path execution now takes place until either the user aborts the program or a stop message is received.
app.wait_for_and_process_stop_storage();

Wait for the initiator to send a stop_storage_request control message, then perform the actions required to fulfill the request:
- Forward the stop storage request to each of the storage targets.
- Wait for the storage targets to respond.
- Signal all worker threads to stop data path operation.
- Collect run time stats.
- Send a response to the initiator: a stop_storage_response message upon success, or an error_response message if anything failed.

app.wait_for_and_process_shutdown();

Wait for the initiator to send a shutdown_request control message, then perform the actions required to fulfill the request:
- Forward the shutdown request to each of the storage targets.
- Wait for the storage targets to respond.
- Destroy the worker thread objects.
- Send a response to the initiator: a shutdown_response message upon success, or an error_response message if anything failed.

app.display_stats()

Display runtime statistics.

The application destructor is then triggered:
- Destroy the control channels.
- Destroy the storage IO blocks doca_mmap.
- Destroy the storage IO block memory.
- Destroy the initiator IO blocks doca_mmap.
- Close the doca_dev_rep.
- Close the doca_dev.

The program exits.
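The sizing rules from the query storage and init storage steps above are plain arithmetic, and can be sketched as follows. This is an illustrative sketch, not the application's code: the `storage_dimensions` struct and all function names are hypothetical, and the 2.5x / 1.5x factors are computed in integer arithmetic as 5/2 and 3/2.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical description of what one storage target reports.
struct storage_dimensions {
    std::uint64_t capacity;
    std::uint32_t block_size;
};

// All three targets must agree before the service answers the initiator.
constexpr bool dimensions_match(storage_dimensions const &a, storage_dimensions const &b)
{
    return a.capacity == b.capacity && a.block_size == b.block_size;
}

// Each target holds half blocks, so the initiator sees double the values.
constexpr storage_dimensions initiator_view(storage_dimensions const &target)
{
    return storage_dimensions{target.capacity * 2, target.block_size * 2};
}

// The service allocates twice the transactions the initiator asked for.
constexpr std::uint32_t service_transaction_count(std::uint32_t initiator_transactions)
{
    return initiator_transactions * 2;
}

// block size * 2.5 with the write flow enabled, block size * 1.5 without it.
constexpr std::size_t per_transaction_chunk_size(std::uint32_t block_size, bool write_flow_enabled)
{
    return (static_cast<std::size_t>(block_size) * (write_flow_enabled ? 5 : 3)) / 2;
}

// Total bytes needed for the service-side IO region.
constexpr std::size_t local_io_region_size(std::uint32_t core_count,
                                           std::uint32_t num_transactions,
                                           std::size_t chunk_size)
{
    return static_cast<std::size_t>(core_count) * num_transactions * chunk_size;
}
```

For instance, under these assumptions 2 cores, 32 initiator transactions, and a 4 KiB block size with the write flow enabled give 2 * 64 * 10240 = 1310720 bytes.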

The worker thread procedure executes in two phases: a control/configuration phase, followed by a data path phase in which read, write, and recovery operations take place.

void gga_offload_app_worker::thread_proc(gga_offload_app_worker *self, uint16_t core_idx) noexcept

The worker starts by executing a loop of:

1. Lock the mutex.
2. If the message pointer is not null:
   - Process the configuration message.
   - Set the operation result.
3. Unlock the mutex.
4. Yield.

The following configuration operations can be performed by the worker thread:

void gga_offload_app_worker::create_worker_objects(worker_create_objects_control_command const &cmd)

Create general worker objects:
- Create an LZ4 software library context for compression operations (when enabled).
- Create N transaction objects.
- Create IO message memory.
- Create an IO message doca_mmap (to allow the messages to be accessed by Comch and RDMA).
- Create a doca_buf_inventory.
- Create a doca_pe to drive the DOCA contexts (doca_rdma, doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, doca_dma).
- Create the doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, and doca_dma contexts.
- Initialize and start the contexts.

void gga_offload_app_worker::export_local_rdma_connection_blob(worker_export_local_rdma_connection_command &cmd)

Export the RDMA context connection binary blob.

void gga_offload_app_worker::import_remote_rdma_connection_blob(worker_import_local_rdma_connection_command const &cmd)

Import the remote RDMA context connection binary blob.

void gga_offload_app_worker::are_contexts_ready(worker_are_contexts_ready_control_command &cmd) const noexcept

Poll all contexts to check that they are ready to perform data path operations.

void gga_offload_app_worker::prepare_tasks(worker_prepare_tasks_control_command const &cmd)

Prepare the transaction contexts by:
- Allocating doca_buf objects.
- Allocating doca_task objects.
- Setting task user data.
- Setting task pointers in the transaction context.
- Setting per-transaction IO block addresses.

void gga_offload_app_worker::start_data_path(void)

Break out of the wait-for-configuration event loop and start the data path loop.

Info: After the configuration phase, the mutex is not used again.
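The configuration loop described above (lock, check for a message, process it, publish the result, unlock, yield) can be sketched with standard C++ primitives. Everything here is a hypothetical stand-in: `control_command`, `command_slot`, `worker_control_loop`, and `execute_control_command` are illustrative names, and an `int` result replaces whatever result type the application uses.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a worker control command.
struct control_command {
    std::function<int()> action; // work to perform on the worker thread
    bool start_data_path;        // true: leave the configuration loop afterwards
};

// Shared slot guarded by a mutex; used only during the configuration phase.
struct command_slot {
    std::mutex mutex;
    control_command const *pending = nullptr;
    bool done = false;
    int result = 0;
};

// Worker side: lock, process a pending command if any, unlock, yield.
void worker_control_loop(command_slot &slot)
{
    for (;;) {
        {
            std::lock_guard<std::mutex> lock{slot.mutex};
            if (slot.pending != nullptr) {
                slot.result = slot.pending->action();
                bool const leave = slot.pending->start_data_path;
                slot.pending = nullptr;
                slot.done = true;
                if (leave) {
                    return; // the data path loop would start here
                }
            }
        }
        std::this_thread::yield();
    }
}

// Control side: publish a command, then wait until the worker has handled it.
int execute_control_command(command_slot &slot, control_command const &cmd)
{
    {
        std::lock_guard<std::mutex> lock{slot.mutex};
        slot.pending = &cmd;
        slot.done = false;
    }
    for (;;) {
        {
            std::lock_guard<std::mutex> lock{slot.mutex};
            if (slot.done) {
                return slot.result;
            }
        }
        std::this_thread::yield();
    }
}
```

Because the worker returns from the loop once it is told to start the data path, the mutex never appears on the hot path, which matches the note above.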

After breaking out of the initial configuration loop, the thread submits receive tasks (Comch consumer tasks, and RDMA recv tasks) then enters the data path function: run_data_path_ops .

    void gga_offload_app_worker::run_data_path_ops(gga_offload_app_worker::hot_data &hot_data)
    {
        DOCA_LOG_INFO("Core: %u running", hot_data.core_idx);

        while (hot_data.run_flag) {
            doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
        }

        while (hot_data.error_flag == false && hot_data.in_flight_transaction_count != 0) {
            doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
        }
    }

During the data path phase, the thread simply polls the doca_pe as quickly as possible to check for a task completion from one of the thread's DOCA contexts (doca_rdma, doca_comch_consumer, doca_comch_producer, doca_compress, doca_ec, doca_dma). The interesting work is done in the callbacks of these tasks. The flow always starts with a consumer task completion, which is the reception of an IO message from the initiator.

    void gga_offload_app_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
                                                                       doca_data task_user_data,
                                                                       doca_data ctx_user_data) noexcept
    {
        static_cast<void>(task_user_data);

        auto const transaction_idx = doca_task_get_user_data(doca_comch_consumer_task_post_recv_as_task(task)).u64;
        auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);

        auto const ret = hot_data->start_transaction(hot_data->transactions[transaction_idx]);
        if (ret != DOCA_SUCCESS) {
            DOCA_LOG_ERR("Failed to start transaction: %s", doca_error_get_name(ret));
        }
    }

This callback simply interprets the task callback parameters to call the start_transaction function. The start transaction flow performs the following steps:

1. Check the transaction request type.
2. Set the transaction action flags based on the transaction type (read or write).
3. Call the progress_transaction function to execute as many of the pending actions as possible. Some actions, such as sending IO requests to the storage targets, decompressing data, and creating erasure coding blocks, are themselves asynchronous, so processing stops until a future task callback resumes it.
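The pattern in these steps (a bitmask of pending actions, drained in order until an asynchronous action is reached) can be sketched as follows. This is a simplified model under stated assumptions: the `sketch_action` enum, its bit values, and the `log` member are hypothetical, the EC and recovery steps are left out for brevity, and real task submission is replaced by a log entry.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical action bits; the application's own flag values are not shown here.
enum class sketch_action : std::uint32_t {
    fetch_from_initiator = 1u << 0, // asynchronous
    compress = 1u << 1,             // synchronous
    storage_io = 1u << 2,           // asynchronous
    decompress = 1u << 3,           // asynchronous
    respond = 1u << 4,              // final step
};

constexpr std::uint32_t bit(sketch_action a)
{
    return static_cast<std::uint32_t>(a);
}

struct sketch_transaction {
    std::uint32_t pending_actions = 0;
    std::vector<std::string> log; // records what the sketch "did"
};

// Run pending actions in order; stop at the first asynchronous one.
void progress_transaction(sketch_transaction &t)
{
    if (t.pending_actions & bit(sketch_action::fetch_from_initiator)) {
        t.log.push_back("submit fetch");
        return; // resumed later by on_async_complete()
    }
    if (t.pending_actions & bit(sketch_action::compress)) {
        t.log.push_back("compress");
        t.pending_actions &= ~bit(sketch_action::compress);
    }
    if (t.pending_actions & bit(sketch_action::storage_io)) {
        t.log.push_back("submit storage io");
        return;
    }
    if (t.pending_actions & bit(sketch_action::decompress)) {
        t.log.push_back("submit decompress");
        return;
    }
    if (t.pending_actions & bit(sketch_action::respond)) {
        t.log.push_back("respond");
        t.pending_actions &= ~bit(sketch_action::respond);
    }
}

// Set the action flags for the request type, then run as far as possible.
void start_transaction(sketch_transaction &t, bool is_write)
{
    t.pending_actions = is_write ? (bit(sketch_action::fetch_from_initiator) | bit(sketch_action::compress) |
                                    bit(sketch_action::storage_io) | bit(sketch_action::respond))
                                 : (bit(sketch_action::storage_io) | bit(sketch_action::decompress) |
                                    bit(sketch_action::respond));
    progress_transaction(t);
}

// What a task completion callback does: clear its bit, then resume.
void on_async_complete(sketch_transaction &t, sketch_action finished)
{
    t.pending_actions &= ~bit(finished);
    progress_transaction(t);
}
```

Keeping all state in the flags means each completion callback only needs to clear its own bit and call the same progress function.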

The transaction results in either a read or a write operation. A read operation may be a regular read or a recovery read, depending on the value provided by the --trigger-recovery-read-every-n CLI argument. When this value is non-zero, every Nth read performs the recovery read flow.

The flow of each of these operations is:

Write flow
1. Fetch data from initiator memory using DMA.
2. Compress the data.
3. Generate EC redundancy blocks.
4. Store data_1, data_2, and data_p to the respective storage targets.
5. Respond with an IO response.

Regular read flow
1. Fetch data_1 and data_2 from the respective storage targets.
2. Decompress the data into initiator memory.
3. Respond with an IO response.

Recovery read flow
1. Fetch data_1 OR data_2 from the respective storage target.
2. Fetch data_p from the data_p storage target.
3. Recover the missing data half using the existing data and the redundancy data.
4. Decompress the data into initiator memory.
5. Respond with an IO response.
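The recovery read flow relies on any two of data_1, data_2, and data_p being enough to rebuild the third. A minimal sketch of that idea follows, with two loudly stated assumptions: plain XOR parity is swapped in for whatever coding matrix doca_ec is actually configured with, and the read counter behind --trigger-recovery-read-every-n is modelled as a simple 1-based count. All names (`is_recovery_read`, `make_stripes`, `recover_half`) are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using bytes = std::vector<std::uint8_t>;

// Assumed trigger model: with every_n == 0 recovery is disabled; otherwise
// the Nth, 2Nth, 3Nth... read (1-based) takes the recovery path.
constexpr bool is_recovery_read(std::uint64_t read_number, std::uint32_t every_n)
{
    return every_n != 0 && (read_number % every_n) == 0;
}

struct stripes {
    bytes data_1; // first half of the block
    bytes data_2; // second half of the block
    bytes data_p; // redundancy half (XOR parity in this sketch only)
};

// Write side: split an even-sized block into halves and derive the parity.
stripes make_stripes(bytes const &block)
{
    auto const half = static_cast<std::ptrdiff_t>(block.size() / 2);
    stripes s{bytes(block.begin(), block.begin() + half),
              bytes(block.begin() + half, block.end()),
              bytes(static_cast<std::size_t>(half))};
    for (std::size_t i = 0; i != s.data_p.size(); ++i) {
        s.data_p[i] = static_cast<std::uint8_t>(s.data_1[i] ^ s.data_2[i]);
    }
    return s;
}

// Recovery side: rebuild the missing half from the surviving half and parity.
bytes recover_half(bytes const &surviving, bytes const &parity)
{
    bytes out(surviving.size());
    for (std::size_t i = 0; i != out.size(); ++i) {
        out[i] = static_cast<std::uint8_t>(surviving[i] ^ parity[i]);
    }
    return out;
}
```

XOR only works for a single redundancy block; it is used here because it makes the "two of three" property easy to see, not because the application computes redundancy this way.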



void gga_offload_app_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task, doca_data task

The producer task completion callback signals that the transaction response has been delivered to the initiator. The transaction can now be re-used by re-submitting the transaction consumer task.

    void gga_offload_app_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
                                                        doca_data task_user_data,
                                                        doca_data ctx_user_data) noexcept
    {
        static_cast<void>(task);

        auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
        auto &transaction = hot_data->transactions[task_user_data.u64];

        --(transaction.pending_storage_response_count);
        if (transaction.pending_storage_response_count == 0) {
            transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::wait_for_storage_completion);
            hot_data->progress_transaction(transaction);
        }
    }

The RDMA send task callback signals that an IO request has been delivered to one of the storage target machines. A counter is decremented to track when all IO requests have been delivered and all IO responses have been received, so that the next step of processing the transaction can be carried out. For a read IO operation, the next step is data recovery or decompression. For a write operation, the transaction is complete and the IO response can be sent to the initiator.

    void gga_offload_app_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
                                                           doca_data task_user_data,
                                                           doca_data ctx_user_data) noexcept
    {
        static_cast<void>(task_user_data);

        auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
        auto *const rdma_io_message = storage::get_buffer_bytes(doca_rdma_task_receive_get_dst_buf(task));
        auto const transaction_idx = storage::io_message_view::get_user_data(rdma_io_message).u64;
        auto &transaction = hot_data->transactions[transaction_idx];

        auto const io_result = storage::io_message_view::get_result(rdma_io_message);
        if (io_result != DOCA_SUCCESS) {
            storage::io_message_view::set_result(io_result, transaction.initiator_io_message);
        }

        --(transaction.pending_storage_response_count);
        if (transaction.pending_storage_response_count == 0) {
            transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::wait_for_storage_completion);
            hot_data->progress_transaction(transaction);
        }

        if (hot_data->run_flag) {
            static_cast<void>(doca_buf_reset_data_len(doca_rdma_task_receive_get_dst_buf(task)));
            auto const ret = doca_task_submit(doca_rdma_task_receive_as_task(task));
            if (ret != DOCA_SUCCESS) {
                DOCA_LOG_ERR("Failed to resubmit doca_rdma_task_receive");
                hot_data->run_flag = false;
                hot_data->error_flag = true;
            }
        }
    }

The RDMA receive task callback signals that an IO response has been received from one of the storage targets. As with a send task completion, the counter is decremented to determine whether processing can continue. In addition, the IO response status is inspected: if the storage operation failed, the transaction result is marked as a failure. The receive task is then re-submitted so it is ready to receive another message from that storage target in a future transaction.
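The counting scheme shared by the send and receive callbacks can be modelled in isolation. The sketch below assumes the counter starts at two per storage target involved (one send completion plus one response); that starting value, the bit value, and all names are assumptions made for illustration, not taken from the application.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical bit value for the "waiting on storage" action.
constexpr std::uint32_t wait_for_storage_completion_bit = 1u << 0;

struct io_transaction {
    std::uint32_t pending_storage_response_count = 0;
    std::uint32_t pending_actions = 0;
    bool failed = false;  // set if any storage target reported an error
    bool resumed = false; // stands in for calling progress_transaction()
};

// Assumed setup: one send completion and one response expected per target.
void begin_storage_io(io_transaction &t, std::uint32_t target_count)
{
    t.pending_storage_response_count = 2 * target_count;
    t.pending_actions |= wait_for_storage_completion_bit;
}

// Called for every send completion and every received response.
void on_storage_completion(io_transaction &t, bool success)
{
    if (!success) {
        t.failed = true;
    }

    --t.pending_storage_response_count;
    if (t.pending_storage_response_count == 0) {
        t.pending_actions &= ~wait_for_storage_completion_bit;
        t.resumed = true;
    }
}
```

The sketch clears the bit with `&= ~` rather than the `^=` used in the callbacks above. Both clear a bit that is known to be set, but `&= ~` stays correct even if the bit was already clear.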

    void gga_offload_app_worker::doca_ec_task_recover_cb(doca_ec_task_recover *task,
                                                         doca_data task_user_data,
                                                         doca_data ctx_user_data) noexcept
    {
        static_cast<void>(task);

        auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
        auto &transaction = hot_data->transactions[task_user_data.u64];

        transaction.pending_actions &= ~(static_cast<uint32_t>(transaction_action::recover_a) |
                                         static_cast<uint32_t>(transaction_action::recover_b));
        hot_data->progress_transaction(transaction);
    }

The EC recover task callback signals the completion of data recovery using erasure coding blocks. The transaction can now proceed to the next step, data decompression, to continue the read process.

    void gga_offload_app_worker::doca_compress_task_decompress_lz4_stream_cb(doca_compress_task_decompress_lz4_stream *task,
                                                                             doca_data task_user_data,
                                                                             doca_data ctx_user_data) noexcept
    {
        static_cast<void>(task);
        static_cast<void>(task_user_data);

        auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
        auto &transaction = hot_data->transactions[task_user_data.u64];

        transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::decompress);
        hot_data->progress_transaction(transaction);
    }

The decompress LZ4 stream task callback signals the completion of data decompression. As the decompressed data is stored directly into initiator memory the transaction can continue to send the IO response to the initiator.

    void gga_offload_app_worker::doca_ec_task_create_cb(doca_ec_task_create *task,
                                                        doca_data task_user_data,
                                                        doca_data ctx_user_data) noexcept
    {
        static_cast<void>(task);
        static_cast<void>(task_user_data);

        auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
        auto &transaction = hot_data->transactions[task_user_data.u64];

        transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::produce_ec_blocks);
        hot_data->start_commit_to_storage(transaction);
    }

The EC create task callback signals that the generation of EC redundancy data is complete. Transaction processing can now continue by sending IO requests to all three storage targets to store the three half blocks (data_1, data_2, and data_p) in the respective storage targets' IO blocks.

    void gga_offload_app_worker::doca_dma_task_memcpy_cb(doca_dma_task_memcpy *task,
                                                         doca_data task_user_data,
                                                         doca_data ctx_user_data) noexcept
    {
        static_cast<void>(task);
        static_cast<void>(task_user_data);

        auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
        auto &transaction = hot_data->transactions[task_user_data.u64];

        transaction.pending_actions ^= static_cast<uint32_t>(transaction_action::fetch_from_initiator);

        auto const ret = hot_data->compress_data(transaction);
        if (ret != DOCA_SUCCESS) {
            transaction.set_error(ret);
        }

        hot_data->progress_transaction(transaction);
    }

The DMA memcpy task callback signals that the fetch of initiator data into service memory is complete. Software compression of the data is executed to completion immediately, because the software LZ4 library does not use events to signal completion. Transaction processing then continues, which starts the creation of EC redundancy data.