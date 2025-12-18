The flow of the application is broken down into key functions / steps:

    target_rdma_app app{parse_target_rdma_app_cli_args(argc, argv)};
    storage::install_ctrl_c_handler([&app]() {
        app.abort("User requested abort");
    });
    app.wait_for_client_connection();
    app.wait_for_and_process_query_storage();
    app.wait_for_and_process_init_storage();
    app.wait_for_and_process_create_rdma_connections();
    app.wait_for_and_process_start_storage();
    app.wait_for_and_process_stop_storage();
    app.wait_for_and_process_shutdown();
    app.display_stats();
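The storage::install_ctrl_c_handler helper is not shown in this section. As a rough idea of how such a hook could work, the sketch below sets an atomic flag from a SIGINT handler. The names install_abort_flag_handler and abort_requested are hypothetical, and this is an illustration under those assumptions rather than the sample's actual implementation.

```cpp
#include <atomic>
#include <csignal>

// Hypothetical flag that the application's wait loops would poll.
static std::atomic<bool> g_abort_requested{false};

// Touching an atomic from a signal handler is only safe when it is lock-free.
static_assert(std::atomic<bool>::is_always_lock_free, "flag must be lock-free");

// Keep the handler minimal: it only records that an abort was requested.
static void on_sigint(int /*signal_number*/)
{
    g_abort_requested.store(true);
}

// Hypothetical stand-in for storage::install_ctrl_c_handler.
// Returns false if the handler could not be installed.
static bool install_abort_flag_handler()
{
    return std::signal(SIGINT, on_sigint) != SIG_ERR;
}

// True once Ctrl+C (SIGINT) has been received.
static bool abort_requested()
{
    return g_abort_requested.load();
}
```

Setting a flag and letting the main thread react keeps the handler itself trivial; the real helper accepts a callable instead, which this sketch does not attempt to reproduce.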

    target_rdma_app app{parse_target_rdma_app_cli_args(argc, argv)};

Parse CLI arguments and use these to create the application instance. Initial resources are also created at this stage:

- Open a doca_dev as specified by the CLI argument -d or --device.

    m_storage_block_count = m_cfg.block_count;
    m_storage_block_size = m_cfg.block_size;
    auto const page_size = storage::get_system_page_size();
    m_local_io_region_size = uint64_t{m_storage_block_count} * m_storage_block_size;
    m_local_io_region = static_cast<uint8_t *>(storage::aligned_alloc(page_size, m_local_io_region_size));

- Allocate storage IO blocks memory.

    if (!m_cfg.content.empty()) {
        std::copy(std::begin(m_cfg.content), std::end(m_cfg.content), m_local_io_region);
    }

- Copy user-defined content into the storage IO blocks memory if it was provided.

    m_control_channel = storage::control::make_tcp_server_control_channel(m_cfg.listen_port);

- Create a TCP server control channel using the listen port specified by the CLI argument --listen-port. (Control channel objects provide a unified API so that a TCP client, TCP server, doca_comch_client, and doca_comch_server all have a consistent API.)

Info: See storage_common/control_channel.hpp for more information about the control channel abstraction.

    storage::install_ctrl_c_handler([&app]() {
        app.abort("User requested abort");
    });

Set a signal handler for Ctrl+C keyboard input so the app can shut down gracefully.

    app.wait_for_client_connection();

Wait for a TCP client (the storage service) to connect.

    app.wait_for_and_process_query_storage();

Wait for the storage service to send a query_storage_request control message and then perform the required actions to fulfill the request:

- Populate a query_storage_response with the storage capacity (block count * block size) and the block size.
- Send the response to the storage service.
    app.wait_for_and_process_init_storage();

Wait for the storage service to send an init_storage_request control message and then perform the required actions to fulfill the request:

- Use the init_storage_payload data to:
  - Set the core count (m_core_count) to the number of cores requested by the initiator (the number of --cpu arguments provided to the initiator), or fail if this is more than the number of --cpu arguments provided to the service.
  - Set the number of transactions per core (m_num_transactions).
  - Create a doca_mmap object from the provided export blob.
- Create m_core_count worker threads.
- Perform the first stages of worker thread initialization. These steps are carried out for each thread, but only one thread performs them at any time. This simplifies the sending and receiving of control messages; the flow could be modified to execute in parallel if desired.
  - Create a thread bound to the Nth CPU provided to the service via the --cpu CLI arguments.

    m_workers[ii].execute_control_command(
        worker_create_objects_control_command{
            m_dev, m_transaction_count, m_remote_io_mmap, m_local_io_mmap}
    );

  - Initialize the thread context (asynchronously).
- Send a response to the storage service: an init_storage_response message upon success or an error_response message if anything failed.

    app.wait_for_and_process_create_rdma_connections();

Wait for the storage service to send a create_rdma_connection_request control message and then perform the required actions to fulfill the request:

    void const *blob = nullptr;
    size_t blob_size = 0;
    doca_rdma_export(rdma_ctx, &blob, &blob_size, &rdma_conn);

- Create the local side of the RDMA connection by exporting the RDMA context, and store the created connection blob.

    doca_rdma_connect(rdma_ctx, cmd.import_blob.data(), cmd.import_blob.size(), rdma_conn);

- Connect to the remote side of the RDMA connection.

Info: This RDMA connection activity is performed twice per worker thread.
This allows the IO control messages to be transferred over one connection while the other is used for data transfer. The reason is to reduce latency: with a single connection, doca_rdma_task_send, doca_rdma_task_read, and doca_rdma_task_write tasks execute in submission order. If, for example, 32 IO requests arrive in close succession and the corresponding 32 read / write tasks are submitted, then once the first operation completes its response is blocked until the remaining 31 data transfers complete. This can significantly delay the response to the service / initiator and greatly increase the round-trip latency of a storage request. With two connections, when the first of the N tasks completes its IO response can be sent immediately while the remaining N-1 read / write operations continue to execute.

- Send a response to the storage service: a create_rdma_connection_response message upon success or an error_response message if anything failed.

    app.wait_for_and_process_start_storage();

Wait for the storage service to send a start_storage_request control message and then perform the required actions to fulfill the request:

    doca_error_t result;
    doca_ctx_states ctx_state;
    result = doca_ctx_get_state(doca_rdma_as_ctx(rdma_ctx), &ctx_state);
    if (result == DOCA_SUCCESS && ctx_state == DOCA_CTX_STATE_RUNNING) {
        // This RDMA context is ready for use.
    }

- Poll all worker connections until the RDMA connections are fully established.
- Signal all worker threads to begin data path operation.
- Send a response to the storage service: a start_storage_response message upon success or an error_response message if anything failed.

Data path execution now takes place until either the user aborts the program or a stop message is received.
    app.wait_for_and_process_stop_storage();

Wait for the storage service to send a stop_storage_request control message and then perform the required actions to fulfill the request:

- Join all worker threads.
- Send a response to the storage service: a stop_storage_response message upon success or an error_response message if anything failed.

    app.wait_for_and_process_shutdown();

Wait for the storage service to send a shutdown_request control message and then perform the required actions to fulfill the request:

- Collect runtime stats.
- Destroy workers.
- Send a response to the storage service: a shutdown_response message upon success or an error_response message if anything failed.

    app.display_stats();

Display runtime statistics.

Application destructor:

- Destroy the TCP control channel.
- Destroy doca_mmap objects.
- Destroy IO blocks memory.
- Close doca_dev.

Application exits.
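The latency argument for using two RDMA connections per worker can be made concrete with a small timing model. The sketch below is a simplification and not part of the sample: it assumes every task on a connection runs strictly one after another at a fixed cost, that all N transfers are submitted at time 0, and that each response is submitted the moment its transfer completes. The function names and cost parameters are hypothetical.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// One connection: responses share the in-order queue with the transfers, so
// response i is queued behind every transfer that was already submitted.
// Returns the time at which each response has been sent.
inline std::vector<uint64_t> single_connection_response_times(uint32_t n,
                                                              uint64_t transfer_cost,
                                                              uint64_t send_cost)
{
    std::vector<uint64_t> done(n);
    uint64_t const all_transfers_done = uint64_t{n} * transfer_cost;
    for (uint32_t i = 0; i != n; ++i) {
        done[i] = all_transfers_done + (uint64_t{i} + 1) * send_cost;
    }
    return done;
}

// Two connections: transfers run on the data connection, responses on the
// control connection, so a response only waits for its own transfer and for
// earlier responses.
inline std::vector<uint64_t> dual_connection_response_times(uint32_t n,
                                                            uint64_t transfer_cost,
                                                            uint64_t send_cost)
{
    std::vector<uint64_t> done(n);
    uint64_t control_free_at = 0;
    for (uint32_t i = 0; i != n; ++i) {
        uint64_t const transfer_done = (uint64_t{i} + 1) * transfer_cost;
        uint64_t const send_start = std::max(transfer_done, control_free_at);
        control_free_at = send_start + send_cost;
        done[i] = control_free_at;
    }
    return done;
}
```

With 32 requests, a transfer cost of 10 time units and a send cost of 1, this model sends the first response at time 321 on a single connection and at time 11 with two connections, which is the effect the text describes.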

The worker thread proc executes in two phases: a control / configuration phase, followed by a data path phase in which read and write operations take place.

    void target_rdma_worker::thread_proc(target_rdma_worker *self, uint16_t core_idx) noexcept

The worker starts by executing a loop of:

- Lock the mutex.
- If the message pointer is not null:
  - Process the configuration message.
  - Set the operation result.
- Unlock the mutex.
- Yield.
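The loop above can be sketched with standard C++ threading primitives. This is a minimal illustration and not the sample's code: config_worker, control_command, and the "square" command are hypothetical stand-ins, and the real worker processes the configuration commands listed in this section.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

enum class command_kind { square, start_data_path };

// Hypothetical command object owned by the control thread.
struct control_command {
    command_kind kind;
    int argument;
    int result;
    bool completed;
};

class config_worker {
public:
    config_worker() : m_thread{[this]() { thread_proc(); }} {}

    ~config_worker()
    {
        if (!m_left_config_loop)
            execute(command_kind::start_data_path, 0);
        m_thread.join();
    }

    // Post one command and wait (yielding) until the worker has processed it.
    int execute(command_kind kind, int argument)
    {
        assert(!m_left_config_loop); // nobody polls for commands after that
        control_command cmd{kind, argument, 0, false};
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_pending = &cmd;
        }
        for (;;) {
            {
                std::lock_guard<std::mutex> lock{m_mutex};
                if (cmd.completed)
                    break;
            }
            std::this_thread::yield();
        }
        if (kind == command_kind::start_data_path)
            m_left_config_loop = true;
        return cmd.result;
    }

private:
    void thread_proc()
    {
        bool keep_configuring = true;
        while (keep_configuring) {
            {
                std::lock_guard<std::mutex> lock{m_mutex}; // lock the mutex
                if (m_pending != nullptr) {                // message pointer set?
                    if (m_pending->kind == command_kind::square)
                        m_pending->result = m_pending->argument * m_pending->argument;
                    else
                        keep_configuring = false;          // leave the config loop
                    m_pending->completed = true;           // set the operation result
                    m_pending = nullptr;
                }
            }                                              // mutex unlocked here
            std::this_thread::yield();
        }
        // The data path loop would start here; the mutex is no longer needed.
    }

    std::mutex m_mutex;
    control_command *m_pending = nullptr;
    bool m_left_config_loop = false; // touched only by the control thread
    std::thread m_thread;            // declared last so the other members exist first
};
```

A control thread would call, for example, worker.execute(command_kind::square, 7) and receive 49 once the worker has picked the command up. Polling with a yield keeps the worker free of condition variables, at the cost of some wasted cycles during configuration.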

The following configuration operations can be performed by the worker thread:

    void target_rdma_worker::create_worker_objects(worker_create_objects_control_command const &cmd)

Create the objects required by the worker to carry out data path operations:

- Create IO messages memory.
- Create IO messages doca_mmap (to allow the messages to be accessed by RDMA).
- Create doca_buf_inventory.
- Create doca_pe to drive the DOCA contexts.
- Create two doca_rdma contexts.
- Configure and start RDMA contexts.

    void target_rdma_worker::create_rdma_connection(worker_create_rdma_connection_command &cmd)

Create an RDMA connection from one of the two RDMA contexts held by the worker (which one to use is specified by the command):

- Call doca_rdma_export to create the connection object and connection blob.
- Call doca_rdma_connect to start connecting to the remote side of the connection.
- Store the local connection blob in the command object for use by the control thread to respond to the service.

    void target_rdma_worker::are_contexts_ready(worker_are_contexts_ready_control_command &cmd) const noexcept

Check each RDMA context and report whether both are ready for use.

    void target_rdma_worker::prepare_tasks(void)

Final preparations for data path execution:

- Cache frequently used static values into hot data (a single cache line).
- Allocate transaction objects.
- Allocate doca_buf objects.
- Allocate doca_task objects.
- Set task user data.
- Set task pointers into the transaction context.

    void target_rdma_worker::start_data_path()

Break out of the wait-for-configuration event loop and start the data path loop.
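To illustrate what "hot data (a single cache line)" can look like, here is a hedged sketch. The member names are the ones referenced by the code excerpts in this section, but the member types, their order, the void pointer standing in for the doca_pe pointer, and the 64-byte cache line size are all assumptions; the sample's real structure may differ.

```cpp
#include <cstddef>
#include <cstdint>

// Assumption: 64-byte cache lines, which is common but not universal.
constexpr std::size_t cache_line_size = 64;

// Hypothetical layout of per-worker data touched on every poll iteration.
struct alignas(cache_line_size) hot_data_sketch {
    char *remote_memory_start_addr;       // base of the requester's IO region
    char *local_memory_start_addr;        // base of the storage IO region
    void *pe;                             // stand-in for the doca_pe pointer
    uint64_t pe_hit_count;                // polls that processed a completion
    uint64_t pe_miss_count;               // polls that found nothing to do
    uint64_t completed_transaction_count;
    uint32_t in_flight_transaction_count;
    uint16_t core_idx;
    bool run_flag;
    bool error_flag;
};

// Because of alignas, sizeof is a multiple of the cache line size, so this
// check fails to compile as soon as the members spill into a second line.
static_assert(sizeof(hot_data_sketch) == cache_line_size,
              "hot data must fit in a single cache line");
```

Keeping these values together means the polling loop and the task callbacks touch one cache line instead of chasing pointers through the larger worker object.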

Info: After the configuration phase the mutex is not used again.

After breaking out of the initial configuration loop, the thread submits receive tasks (RDMA recv tasks) and then enters the data path function run_data_path_ops.

    void target_rdma_worker::run_data_path_ops(target_rdma_worker::hot_data &hot_data)
    {
        DOCA_LOG_INFO("Core: %u running", hot_data.core_idx);

        // Poll while the worker has been told to run.
        while (hot_data.run_flag) {
            doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
        }

        // Drain any transactions that are still in flight, unless an error occurred.
        while (hot_data.error_flag == false && hot_data.in_flight_transaction_count != 0) {
            doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
        }
    }

During the data path phase the thread simply polls the doca_pe as quickly as possible to check for a task completion from one of the RDMA contexts. The interesting work is done in the callbacks of these tasks. The flow always starts with an RDMA recv task completion, which is the reception of an IO message from the storage service.
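The poll-and-count pattern can be tried without any hardware by replacing the progress engine with a mock. In the sketch below, mock_progress_engine, poll_stats, and drain are hypothetical names; the mock only mimics the behaviour the loop relies on (a non-zero return when a completion callback ran), and the max_polls bound exists purely so the example always terminates.

```cpp
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for a progress engine: a queue of ready completions.
struct mock_progress_engine {
    std::deque<std::function<void()>> completions;

    // Runs at most one completion callback; returns 1 if one ran, 0 otherwise.
    uint8_t progress()
    {
        if (completions.empty())
            return 0;
        auto callback = std::move(completions.front());
        completions.pop_front();
        callback();
        return 1;
    }
};

struct poll_stats {
    uint64_t hit_count;
    uint64_t miss_count;
};

// Polls until no transaction is in flight or the poll budget is used up.
// Callbacks are expected to update in_flight, as the task callbacks do.
inline poll_stats drain(mock_progress_engine &pe, uint32_t &in_flight, uint64_t max_polls)
{
    poll_stats stats{0, 0};
    while (in_flight != 0 && stats.hit_count + stats.miss_count < max_polls) {
        pe.progress() ? ++stats.hit_count : ++stats.miss_count;
    }
    return stats;
}
```

The hit and miss counters give a cheap view of how busy a core was: a high miss ratio means the thread mostly spun without finding work.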

    void target_rdma_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task, doca_data task_user_data, doca_data ctx_user_data) noexcept

This callback is the main driver of the storage target. It carries out the following steps:

    auto *const io_message = storage::get_buffer_bytes(doca_rdma_task_receive_get_dst_buf(task));
    auto const message_type = storage::io_message_view::get_type(io_message);

Interpret the type of IO request that has been received.

    char *const remote_addr = hot_data->remote_memory_start_addr + storage::io_message_view::get_requester_offset(io_message);
    char *const local_addr = hot_data->local_memory_start_addr + storage::io_message_view::get_storage_offset(io_message);
    uint32_t const transfer_size = storage::io_message_view::get_io_size(io_message);

Extract IO addresses and transfer size from the IO request.

    auto *const transfer_ctx = static_cast<transfer_context *>(task_user_data.ptr);

Get a reference to the transaction object from the task user data.

    switch (message_type) {
    case storage::io_message_type::read: {
        doca_buf_inventory_buf_reuse_by_data(transfer_ctx->storage_buf, local_addr, transfer_size);
        doca_buf_inventory_buf_reuse_by_addr(transfer_ctx->host_buf, remote_addr, transfer_size);
        doca_task_submit(doca_rdma_task_write_as_task(transfer_ctx->write_task));
    } break;
    case storage::io_message_type::write: {
        doca_buf_inventory_buf_reuse_by_data(transfer_ctx->host_buf, remote_addr, transfer_size);
        doca_buf_inventory_buf_reuse_by_addr(transfer_ctx->storage_buf, local_addr, transfer_size);
        doca_task_submit(doca_rdma_task_read_as_task(transfer_ctx->read_task));
    } break;
    }

Submit the appropriate RDMA task:

- An IO read moves data from the storage IO blocks to the remote IO blocks, so from the storage point of view it's a write.
- An IO write moves data from the remote IO blocks to the storage IO blocks, so from the storage point of view it's a read.

If any action in this process fails, send an IO error response to the caller; otherwise wait for the RDMA read / write task to complete.
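The address arithmetic and the read / write inversion described above can be isolated in a small pure function. This is a sketch under assumptions: io_kind, rdma_direction, transfer_plan, and plan_transfer are hypothetical names, addresses are modelled as plain 64-bit integers, and the bounds check against the local region size is an extra defensive step that the excerpts in this section do not show.

```cpp
#include <cstdint>

enum class io_kind { read, write, result };
enum class rdma_direction { none, write_to_requester, read_from_requester };

struct transfer_plan {
    rdma_direction direction;
    uint64_t local_addr;  // address inside the storage IO region
    uint64_t remote_addr; // address inside the requester's IO region
    uint32_t size;
};

// Maps an IO request onto the RDMA operation the storage side must issue.
// Returns direction == none for non-IO messages or out-of-range requests.
inline transfer_plan plan_transfer(io_kind kind,
                                   uint64_t local_base,
                                   uint64_t local_region_size,
                                   uint64_t remote_base,
                                   uint64_t storage_offset,
                                   uint64_t requester_offset,
                                   uint32_t io_size)
{
    transfer_plan plan{rdma_direction::none, 0, 0, 0};

    // Assumed extra check: the transfer must stay inside the local region.
    if (storage_offset > local_region_size || io_size > local_region_size - storage_offset)
        return plan;

    if (kind == io_kind::read)
        plan.direction = rdma_direction::write_to_requester;  // IO read => RDMA write
    else if (kind == io_kind::write)
        plan.direction = rdma_direction::read_from_requester; // IO write => RDMA read
    else
        return plan;

    plan.local_addr = local_base + storage_offset;
    plan.remote_addr = remote_base + requester_offset;
    plan.size = io_size;
    return plan;
}
```

For example, an IO read of 1024 bytes at storage offset 512 and requester offset 128, with bases 0x1000 and 0x9000, yields an RDMA write from 0x1200 to 0x9080.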

    void target_rdma_worker::on_transfer_complete(doca_task *task, doca_data task_user_data, doca_data ctx_user_data) noexcept
    {
        auto *const hot_data = static_cast<target_rdma_worker::hot_data *>(ctx_user_data.ptr);
        auto *const response_task = static_cast<doca_rdma_task_send *>(task_user_data.ptr);
        auto *const io_message = storage::get_buffer_bytes(
            const_cast<doca_buf *>(doca_rdma_task_send_get_src_buf(response_task)));

        ++(hot_data->completed_transaction_count);

        // Turn the request message into a success response.
        storage::io_message_view::set_type(storage::io_message_type::result, io_message);
        storage::io_message_view::set_result(DOCA_SUCCESS, io_message);

        auto const ret = doca_task_submit(doca_rdma_task_send_as_task(response_task));
        if (ret != DOCA_SUCCESS) {
            DOCA_LOG_ERR("Failed submit response task: %s", doca_error_get_name(ret));
        }
    }

This callback is reused for both doca_rdma_task_read and doca_rdma_task_write completions. The work is the same regardless of which task type completed, so more detailed task type information is not required. The callback updates the IO message to change its type to a response and sets the result to success. The IO message is then sent back to the storage service.

    void target_rdma_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task, doca_data task_user_data, doca_data ctx_user_data) noexcept
    {
        auto *const request_task = static_cast<doca_rdma_task_receive *>(task_user_data.ptr);

        // Make the receive buffer empty again before reusing it.
        doca_buf_reset_data_len(doca_rdma_task_receive_get_dst_buf(request_task));

        auto const ret = doca_task_submit(doca_rdma_task_receive_as_task(request_task));
        if (ret != DOCA_SUCCESS) {
            DOCA_LOG_ERR("Failed re-submit request task: %s", doca_error_get_name(ret));
        }

        auto *const hot_data = static_cast<target_rdma_worker::hot_data *>(ctx_user_data.ptr);
        --(hot_data->in_flight_transaction_count);
    }

The RDMA send task callback resets the data length of the receive buffer and resubmits the recv task, allowing the transaction to be reused.
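Taken together, the three callbacks move each transaction around a fixed cycle: request received, data transferred, response sent, then back to waiting for the next request. The sketch below models that cycle as a tiny state machine. All names are hypothetical, and the place where the in-flight counter is incremented is an assumption, since the excerpts only show it being decremented in the send callback.

```cpp
#include <cstdint>

enum class txn_state { awaiting_request, transferring, responding };

struct worker_counters {
    uint32_t in_flight = 0;
    uint64_t completed = 0;
};

struct transaction_sketch {
    txn_state state = txn_state::awaiting_request;
};

// Receive callback: an IO request arrived, so a read / write task is submitted.
inline bool on_request_received(transaction_sketch &txn, worker_counters &counters)
{
    if (txn.state != txn_state::awaiting_request)
        return false;
    ++counters.in_flight; // assumption: counted when the request is accepted
    txn.state = txn_state::transferring;
    return true;
}

// Transfer callback: data moved, so the response message is submitted.
inline bool on_transfer_complete(transaction_sketch &txn, worker_counters &counters)
{
    if (txn.state != txn_state::transferring)
        return false;
    ++counters.completed;
    txn.state = txn_state::responding;
    return true;
}

// Send callback: response delivered, so the recv task is resubmitted.
inline bool on_response_sent(transaction_sketch &txn, worker_counters &counters)
{
    if (txn.state != txn_state::responding)
        return false;
    --counters.in_flight;
    txn.state = txn_state::awaiting_request;
    return true;
}
```

Each handler returns false when called out of order, which makes the expected sequence explicit; the real callbacks rely on the task chaining itself rather than on a state field.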