diff --git a/bench/python/dlslime_torch_dist_sendrecv_bench.py b/bench/python/dlslime_torch_dist_sendrecv_bench.py
index 5ce8b79..6075b87 100644
--- a/bench/python/dlslime_torch_dist_sendrecv_bench.py
+++ b/bench/python/dlslime_torch_dist_sendrecv_bench.py
@@ -132,7 +132,7 @@ def benchmark_send_recv(args):
     parser.add_argument(
         "--master-addr",
         type=str,
-        default="10.102.207.84",
+        default="127.0.0.1",
         help="Master address for distributed training",
     )
     parser.add_argument(
@@ -166,4 +166,5 @@ def benchmark_send_recv(args):
     os.environ["NCCL_P2P_DISABLE"] = "1"
     os.environ["NCCL_SHM_DISABLE"] = "1"
     import time
+
     benchmark_send_recv(args)
diff --git a/bench/python/endpoint_sendrecv_bench.py b/bench/python/endpoint_sendrecv_bench.py
index 45ac5e4..9aff246 100644
--- a/bench/python/endpoint_sendrecv_bench.py
+++ b/bench/python/endpoint_sendrecv_bench.py
@@ -16,30 +16,29 @@ def get_readable_size(size_in_bytes):
 def run_benchmark(device_type="cuda", num_qp=1, iterations=200):
     # Check the available RDMA devices
     nic_devices = available_nic()
-    if len(nic_devices) < 2:
-        print(
-            f"Warning: Only {len(nic_devices)} RDMA device found. Trying loopback on device 0 if possible, or script might fail."
-        )
-        dev0 = nic_devices[0]
-        dev1 = nic_devices[0] # Loopback
-    else:
-        dev0 = nic_devices[0]
-        dev1 = nic_devices[1]
-
-    print(f"Initializing Endpoints: Send[{dev0}] <-> Recv[{dev1}]")
+    dev = nic_devices[0]
+
+    print(f"Initializing Endpoints: Send[{dev}] <-> Recv[{dev}]")
     print(f"Tensor Device: {device_type.upper()}")
     # Initialize the Endpoints
-    send_endpoint = _slime_c.rdma_endpoint(dev0, 1, "RoCE", num_qp)
-    recv_endpoint = _slime_c.rdma_endpoint(dev1, 1, "RoCE", num_qp)
+
+    ctx = _slime_c.rdma_context()
+    ctx.init_rdma_context(dev, 1, "RoCE")
+
+    worker = _slime_c.rdma_worker(dev, 0)
+
+    send_endpoint = _slime_c.rdma_endpoint(ctx, num_qp)
+    recv_endpoint = _slime_c.rdma_endpoint(ctx, num_qp)
+
+    worker.add_endpoint(send_endpoint)
+    worker.add_endpoint(recv_endpoint)
     # Establish the connection
-    send_endpoint.context_connect(
-        recv_endpoint.get_data_context_info(), recv_endpoint.get_meta_context_info()
-    )
-    recv_endpoint.context_connect(
-        send_endpoint.get_data_context_info(), send_endpoint.get_meta_context_info()
-    )
+    send_endpoint.connect(recv_endpoint.endpoint_info())
+    recv_endpoint.connect(send_endpoint.endpoint_info())
+
+    worker.start()
     # Define the test sizes: 2KB to 128MB
     # 2KB = 2 * 1024
@@ -74,40 +73,26 @@ def run_benchmark(device_type="cuda", num_qp=1, iterations=200):
         )
         recv_tensor = torch.zeros((size,), dtype=torch.uint8, device="cpu")
-        # 2. Register the RDMA buffers (MR registration usually happens here)
-        send_buffer = _slime_c.rdma_buffer(
-            send_endpoint,
-            send_tensor.data_ptr(),
-            send_tensor.storage_offset(),
-            send_tensor.numel(),
-        )
-        recv_buffer = _slime_c.rdma_buffer(
-            recv_endpoint,
-            recv_tensor.data_ptr(),
-            recv_tensor.storage_offset(),
-            recv_tensor.numel(),
-        )
-
         # 3. Warmup
         # Let the MR get established and the TLB warm up, to remove the first-run slow start
-        warmup_iters = 10
+        warmup_iters = 1
         for _ in range(warmup_iters):
-            send_buffer = _slime_c.rdma_buffer(
-                send_endpoint,
+            send_slot = send_endpoint.send(
                 send_tensor.data_ptr(),
                 send_tensor.storage_offset(),
                 send_tensor.numel(),
+                None,
             )
-            recv_buffer = _slime_c.rdma_buffer(
-                recv_endpoint,
+
+            recv_slot = recv_endpoint.recv(
                 recv_tensor.data_ptr(),
                 recv_tensor.storage_offset(),
                 recv_tensor.numel(),
+                None,
             )
-            recv_buffer.recv(None)
-            send_buffer.send(None)
-            send_buffer.wait_send()
-            recv_buffer.wait_recv()
+
+            send_endpoint.wait_send(send_slot)
+            recv_endpoint.wait_recv(recv_slot)
         if device_type == "cuda":
             torch.cuda.synchronize()
@@ -116,26 +101,22 @@ def run_benchmark(device_type="cuda", num_qp=1, iterations=200):
         t_start = time.perf_counter()
         for _ in range(iterations):
-            send_buffer = _slime_c.rdma_buffer(
-                send_endpoint,
+            send_slot = send_endpoint.send(
                 send_tensor.data_ptr(),
                 send_tensor.storage_offset(),
                 send_tensor.numel(),
+                None,
             )
-            recv_buffer = _slime_c.rdma_buffer(
-                recv_endpoint,
+
+            recv_slot = recv_endpoint.recv(
                 recv_tensor.data_ptr(),
                 recv_tensor.storage_offset(),
                 recv_tensor.numel(),
+                None,
             )
-            # Standard RDMA flow: post the Recv first, then post the Send
-            recv_buffer.recv(None)
-            send_buffer.send(None)
-            # Wait for completion
-            # Note: this is Stop-and-Wait mode. Pipelining would give higher throughput,
-            # but here we measure the latency of a single operation
-            send_buffer.wait_send()
-            recv_buffer.wait_recv()
+
+            send_endpoint.wait_send(send_slot)
+            recv_endpoint.wait_recv(recv_slot)
         if device_type == "cuda":
             torch.cuda.synchronize()
diff --git a/csrc/device/host/host_only.cpp b/csrc/device/host/host_only.cpp
index cc264f7..006d4e8 100644
--- a/csrc/device/host/host_only.cpp
+++ b/csrc/device/host/host_only.cpp
@@ -9,7 +9,7 @@ namespace device {
 std::shared_ptr createSignal(bool bypass)
 {
-    SLIME_LOG_INFO("create signal cpu.");
+    SLIME_LOG_DEBUG("create signal cpu.");
     return std::make_shared();
 }
diff --git a/csrc/engine/assignment.cpp b/csrc/engine/assignment.cpp
index 6b1203a..bec1a92 100644
--- a/csrc/engine/assignment.cpp
+++ b/csrc/engine/assignment.cpp
@@ -8,9 +8,12 @@ namespace slime {
 json Assignment::dump() const
 {
-    return json{
-        "Assignment",
-        {{"mr_key", mr_key}, {"remote_mr_key", remote_mr_key}, {"target_offset", target_offset}, {"source_offset", source_offset}, {"length", length}}};
+    return json{"Assignment",
+                {{"mr_key", mr_key},
+                 {"remote_mr_key", remote_mr_key},
+                 {"target_offset", target_offset},
+                 {"source_offset", source_offset},
+                 {"length", length}}};
 }
 std::ostream& operator<<(std::ostream& os, const Assignment& assignment)
diff --git a/csrc/engine/rdma/CMakeLists.txt b/csrc/engine/rdma/CMakeLists.txt
index da3d2c3..2e3eceb 100644
--- a/csrc/engine/rdma/CMakeLists.txt
+++ b/csrc/engine/rdma/CMakeLists.txt
@@ -27,9 +27,10 @@ add_library(
     ibv_helper.cpp
     memory_pool.cpp
     rdma_assignment.cpp
+    rdma_channel.cpp
     rdma_context.cpp
     rdma_endpoint_v0.cpp
-    rdma_buffer.cpp
+    rdma_worker.cpp
 )
 target_link_libraries(_slime_rdma PUBLIC _slime_device _slime_engine numa ibverbs)
diff --git a/csrc/engine/rdma/rdma_assignment.cpp b/csrc/engine/rdma/rdma_assignment.cpp
index 3f0682d..2f82291 100644
--- a/csrc/engine/rdma/rdma_assignment.cpp
+++ b/csrc/engine/rdma/rdma_assignment.cpp
@@ -1,5 +1,6 @@
 #include "rdma_assignment.h"
+#include 
 #include 
 namespace slime {
@@ -14,11 +15,6 @@ void RDMAAssign::reset(OpCode opcode, size_t qpi, AssignmentBatch& batch, callba
     }
     else {
         callback_ = [this](int code, int imm_data) {
-            if (code != 0) {
-                for (int i = 0; i < batch_size_; ++i) {
-                    SLIME_LOG_ERROR("ERROR ASSIGNMENT: ", 
batch_[i].dump()); - } - } finished_.fetch_add(1, std::memory_order_release); }; } diff --git a/csrc/engine/rdma/rdma_assignment.h b/csrc/engine/rdma/rdma_assignment.h index b2047b1..29ddec8 100644 --- a/csrc/engine/rdma/rdma_assignment.h +++ b/csrc/engine/rdma/rdma_assignment.h @@ -43,6 +43,7 @@ static const std::map ASSIGN_OP_2_IBV_WR_OP = { struct alignas(64) RDMAAssign { static constexpr size_t MAX_ASSIGN_CAPACITY = 4096; friend class RDMAContext; + friend class RDMAChannel; friend std::ostream& operator<<(std::ostream& os, const RDMAAssign& assignment); public: diff --git a/csrc/engine/rdma/rdma_buffer.cpp b/csrc/engine/rdma/rdma_buffer.cpp deleted file mode 100644 index 0504aef..0000000 --- a/csrc/engine/rdma/rdma_buffer.cpp +++ /dev/null @@ -1,36 +0,0 @@ - -#include "engine/rdma/rdma_buffer.h" -#include "engine/assignment.h" -#include "engine/rdma/rdma_env.h" -#include "logging.h" -#include -#include - -namespace slime { - -void RDMABuffer::send(void* stream_handler) -{ - send_completed_.store(0, std::memory_order_release); - endpointv0_->addBuffer(OpCode::SEND, shared_from_this(), stream_handler); -} - -void RDMABuffer::recv(void* stream_handler) -{ - recv_completed_.store(0, std::memory_order_release); - endpointv0_->addBuffer(OpCode::RECV, shared_from_this(), stream_handler); -} - -bool RDMABuffer::waitSend() -{ - signal_->wait_comm_done_cpu((1 << num_pack_) - 1); - SLIME_LOG_DEBUG("complete to send the data."); - return true; -} - -bool RDMABuffer::waitRecv() -{ - signal_->wait_comm_done_cpu((1 << num_pack_) - 1); - SLIME_LOG_DEBUG("complete to recv the data."); - return true; -} -} // namespace slime diff --git a/csrc/engine/rdma/rdma_buffer.h b/csrc/engine/rdma/rdma_buffer.h deleted file mode 100644 index 24a0779..0000000 --- a/csrc/engine/rdma/rdma_buffer.h +++ /dev/null @@ -1,105 +0,0 @@ -#pragma once -#include "device/signal.h" -#include "engine/rdma/memory_pool.h" -#include "engine/rdma/rdma_context.h" -#include "engine/rdma/rdma_endpoint_v0.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "logging.h" -#include "rdma_common.h" - -namespace slime { - -class RDMAEndpoint; - -struct alignas(64) RDMABuffer: public std::enable_shared_from_this { - friend class RDMAEndpoint; - friend class RDMAEndpointV0; - -public: - RDMABuffer(std::shared_ptr endpoint, uintptr_t ptr, size_t offset, size_t data_size): - endpoint_(endpoint), - ptr_(ptr + offset), - offset_(0), - data_size_(data_size), - view_(storage_view_t{ptr + offset, 0, data_size}) - { - } - - RDMABuffer(std::shared_ptr endpoint, uintptr_t ptr, size_t offset, size_t data_size): - endpointv0_(endpoint), - ptr_(ptr + offset), - offset_(0), - data_size_(data_size), - view_(storage_view_t{ptr + offset, 0, data_size}) - { - } - - RDMABuffer(std::shared_ptr endpoint, storage_view_batch_t& batch): - endpoint_(endpoint), storage_view_batch_(std::move(batch)) - { - } - - ~RDMABuffer() = default; - - const size_t batch_size() - { - return storage_view_batch_.size(); - } - - const storage_view_batch_t& view_batch() - { - return storage_view_batch_; - } - - void send(void* stream_handler = nullptr); - void recv(void* stream_handler = nullptr); - - bool waitSend(); - bool waitRecv(); - -private: - std::shared_ptr endpoint_; - std::shared_ptr endpointv0_; - - uintptr_t ptr_; - size_t offset_; - size_t data_size_; - - storage_view_t view_; - - std::vector ptrs_batch_; - std::vector offset_batch_; - std::vector data_size_batch_; - - storage_view_batch_t storage_view_batch_; 
- - std::atomic send_pending_{0}; - std::atomic recv_pending_{0}; - - std::atomic send_completed_{0}; - std::atomic recv_completed_{0}; - - std::condition_variable send_cv_; - std::condition_variable recv_cv_; - - std::mutex send_mutex_; - std::mutex recv_mutex_; - - std::atomic slot_id_{0}; - - uint64_t num_pack_{1}; - std::shared_ptr signal_; -}; - -} // namespace slime diff --git a/csrc/engine/rdma/rdma_channel.cpp b/csrc/engine/rdma/rdma_channel.cpp new file mode 100644 index 0000000..dc7c099 --- /dev/null +++ b/csrc/engine/rdma/rdma_channel.cpp @@ -0,0 +1,313 @@ +#include "rdma_channel.h" + +namespace slime { +int32_t RDMAChannel::init(std::shared_ptr ctx, size_t num_qp, int32_t max_inline_data) +{ + reset(); + + ctx_ = ctx; + + qp_.resize(num_qp); + + send_wr_pool_.resize(num_qp); + recv_wr_pool_.resize(num_qp); + + send_sge_pool_.resize(num_qp); + recv_sge_pool_.resize(num_qp); + + local_rdma_info_.resize(num_qp); + remote_rdma_info_.resize(num_qp); + + for (int qpi = 0; qpi < num_qp; ++qpi) { + send_wr_pool_[qpi].resize(SLIME_MAX_SEND_WR); + recv_wr_pool_[qpi].resize(SLIME_MAX_RECV_WR); + + send_sge_pool_[qpi].resize(SLIME_MAX_SEND_WR); + recv_sge_pool_[qpi].resize(SLIME_MAX_RECV_WR); + + /* Create Queue Pair (QP) */ + struct ibv_qp_init_attr qp_init_attr = {}; + qp_init_attr.send_cq = ctx_->cq_; + qp_init_attr.recv_cq = ctx_->cq_; + qp_init_attr.qp_type = IBV_QPT_RC; // Reliable Connection + + if (max_inline_data == 0) { + qp_init_attr.cap.max_send_wr = SLIME_MAX_SEND_WR; + } + else { + SLIME_ASSERT(max_inline_data <= 4096, "inline data need to less than or equal to 4096"); + qp_init_attr.cap.max_send_wr = 4096; + qp_init_attr.cap.max_inline_data = max_inline_data; + } + + qp_init_attr.cap.max_recv_wr = SLIME_MAX_RECV_WR; + qp_init_attr.cap.max_send_sge = 1; + qp_init_attr.cap.max_recv_sge = 1; + qp_init_attr.sq_sig_all = false; + + qp_[qpi] = ibv_create_qp(ctx_->pd_, &qp_init_attr); + if (!qp_[qpi]) { + SLIME_LOG_ERROR( + "[" << ctx_->device_name_ << "] Failed to create QP " << qp_[qpi]->qp_num, ": ", strerror(errno)); + return -1; + } + + /* Modify QP to INIT state */ + struct ibv_qp_attr attr = {}; + attr.qp_state = IBV_QPS_INIT; + attr.port_num = ctx_->ib_port_; + attr.pkey_index = 0; + attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE; + + int flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; + + int ret = ibv_modify_qp(qp_[qpi], &attr, flags); + if (ret) { + SLIME_LOG_ERROR("Failed to modify QP to INIT"); + } + + /* Set Packet Sequence Number (PSN) */ + srand48(time(NULL)); + int32_t psn = lrand48() & 0xffffff; + + /* Get GID */ + if (ctx_->gidx_ != -1 && ibv_query_gid(ctx_->ib_ctx_, 1, ctx_->gidx_, &ctx_->gid_)) { + SLIME_LOG_ERROR("[" << ctx_->device_name_ << "] Failed to get GID"); + } + + /* Set Local RDMA Info */ + local_rdma_info_[qpi].gidx = ctx_->gidx_; + local_rdma_info_[qpi].qpn = qp_[qpi]->qp_num; + local_rdma_info_[qpi].psn = psn; + local_rdma_info_[qpi].gid = ctx_->gid_; + local_rdma_info_[qpi].lid = ctx_->lid_; + local_rdma_info_[qpi].mtu = (uint32_t)ctx_->active_mtu_; + } + SLIME_LOG_INFO("RDMA context initialized") + SLIME_LOG_DEBUG("RDMA context local configuration: ", channelInfo()); + + state = RDMAChannelState::Initialized; + + return 0; +} + +json RDMAChannel::channelInfo() const +{ + json local_info{}; + for (int qpi = 0; qpi < local_rdma_info_.size(); qpi++) + local_info[qpi] = local_rdma_info_[qpi].to_json(); + return local_info; +} + +int32_t RDMAChannel::connect(json 
remote_rdma_info_json) +{ + SLIME_ASSERT(state == RDMAChannelState::Initialized, "Not Initialized or already connected"); + SLIME_ASSERT_EQ(local_rdma_info_.size(), remote_rdma_info_json.size(), "Peer must have same QP Size."); + + // construct RDMAEndpoint connection + for (int qpi = 0; qpi < local_rdma_info_.size(); qpi++) { + remote_rdma_info_[qpi] = rdma_info_t(remote_rdma_info_json[qpi]); + } + + modify_qp_to_r2r(); + modify_qp_to_r2s(); + + state = RDMAChannelState::Connected; + if (ibv_req_notify_cq(ctx_->cq_, 0)) { + SLIME_ABORT("Failed to request notify for CQ"); + } + SLIME_LOG_INFO("RDMA exchange done"); + return 0; +} + +int32_t RDMAChannel::modify_qp_to_r2r() +{ + for (int qpi = 0; qpi < local_rdma_info_.size(); qpi++) { + int ret; + struct ibv_qp_attr attr = {}; + int flags; + struct ibv_qp* qp = qp_[qpi]; + // Modify QP to Ready to Receive (RTR) state + memset(&attr, 0, sizeof(attr)); + attr.qp_state = IBV_QPS_RTR; + attr.path_mtu = + (enum ibv_mtu)std::min((uint32_t)remote_rdma_info_[qpi].mtu, (uint32_t)local_rdma_info_[qpi].mtu); + + attr.dest_qp_num = remote_rdma_info_[qpi].qpn; + attr.rq_psn = remote_rdma_info_[qpi].psn; + attr.max_dest_rd_atomic = SLIME_MAX_DEST_RD_ATOMIC; + attr.min_rnr_timer = 0x16; + attr.ah_attr.dlid = remote_rdma_info_[qpi].lid; + attr.ah_attr.sl = SLIME_SERVICE_LEVEL; + attr.ah_attr.src_path_bits = 0; + attr.ah_attr.port_num = ctx_->ib_port_; + + attr.ah_attr.is_global = 0; + attr.ah_attr.dlid = 0; + + if (local_rdma_info_[qpi].gidx == -1) { + // IB + attr.ah_attr.dlid = local_rdma_info_[qpi].lid; + } + else { + // RoCE v2 + attr.ah_attr.is_global = 1; + attr.ah_attr.grh.dgid = remote_rdma_info_[qpi].gid; + attr.ah_attr.grh.sgid_index = local_rdma_info_[qpi].gidx; + attr.ah_attr.grh.hop_limit = 1; + attr.ah_attr.grh.flow_label = 0; + attr.ah_attr.grh.traffic_class = 0; + } + + flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC + | IBV_QP_MIN_RNR_TIMER; + + ret = ibv_modify_qp(qp, &attr, flags); + if (ret) { + SLIME_ABORT("Failed to modify QP to RTR: reason: " << strerror(ret)); + } + } + return 0; +} + +int32_t RDMAChannel::modify_qp_to_r2s() +{ + for (int qpi = 0; qpi < local_rdma_info_.size(); qpi++) { + int ret; + struct ibv_qp_attr attr = {}; + int flags; + struct ibv_qp* qp = qp_[qpi]; + // Modify QP to RTS state + memset(&attr, 0, sizeof(attr)); + attr.qp_state = IBV_QPS_RTS; + attr.timeout = 14; + attr.retry_cnt = 7; + attr.rnr_retry = 7; + attr.sq_psn = local_rdma_info_[qpi].psn; + attr.max_rd_atomic = SLIME_MAX_RD_ATOMIC; + + flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN + | IBV_QP_MAX_QP_RD_ATOMIC; + + ret = ibv_modify_qp(qp, &attr, flags); + if (ret) { + SLIME_ABORT("Failed to modify QP to RTS"); + } + } + return 0; +} + +int32_t RDMAChannel::reset() +{ + for (auto& qp : qp_) { + if (qp) + ibv_destroy_qp(qp); + } + return 0; +} + +int64_t RDMAChannel::post_send_batch(int qpi, RDMAAssign* assign) +{ + int ret = 0; + size_t batch_size = assign->batch_size(); + struct ibv_send_wr* bad_wr = nullptr; + struct ibv_send_wr* wr = send_wr_pool_[qpi].data(); + struct ibv_sge* sge = send_sge_pool_[qpi].data(); + for (size_t i = 0; i < batch_size; ++i) { + + Assignment& subassign = assign->batch_[i]; + struct ibv_mr* mr = ctx_->memory_pool_->get_mr(subassign.mr_key); + sge[i].addr = (uintptr_t)mr->addr + subassign.source_offset; + sge[i].length = subassign.length; + sge[i].lkey = mr->lkey; + wr[i].wr_id = (i == batch_size - 1) ? 
(uintptr_t)(assign) : 0; + wr[i].opcode = ASSIGN_OP_2_IBV_WR_OP.at(assign->opcode_); + wr[i].sg_list = &sge[i]; + wr[i].num_sge = 1; + wr[i].imm_data = (i == batch_size - 1) ? assign->imm_data_ : UNDEFINED_IMM_DATA; + wr[i].send_flags = (i == batch_size - 1) ? IBV_SEND_SIGNALED : 0; + if (assign->is_inline_) + wr[i].send_flags |= IBV_SEND_INLINE; + wr[i].next = (i == batch_size - 1) ? nullptr : &wr[i + 1]; + } + ret = ibv_post_send(qp_[qpi], wr, &bad_wr); + if (ret) { + return -1; + } + return 0; +} + +int64_t RDMAChannel::post_recv_batch(int qpi, RDMAAssign* assign) +{ + int64_t ret = 0; + size_t batch_size = assign->batch_size(); + struct ibv_recv_wr* bad_wr = nullptr; + struct ibv_recv_wr* wr = recv_wr_pool_[qpi].data(); + struct ibv_sge* sge = recv_sge_pool_[qpi].data(); + for (size_t i = 0; i < batch_size; ++i) { + + Assignment& subassign = assign->batch_[i]; + struct ibv_mr* mr = ctx_->memory_pool_->get_mr(subassign.mr_key); + sge[i].addr = (uintptr_t)mr->addr + subassign.source_offset; + sge[i].length = subassign.length; + sge[i].lkey = mr->lkey; + wr[i].wr_id = (i == batch_size - 1) ? (uintptr_t)(assign) : 0; + wr[i].sg_list = &sge[i]; + wr[i].num_sge = 1; + wr[i].next = (i == batch_size - 1) ? nullptr : &wr[i + 1]; + } + ret = ibv_post_recv(qp_[qpi], wr, &bad_wr); + if (ret) { + SLIME_LOG_ERROR("Failed to post RDMA send : " << strerror(ret)); + return -1; + } + + return 0; +} + +int64_t RDMAChannel::post_rc_oneside_batch(int qpi, RDMAAssign* assign) +{ + size_t batch_size = assign->batch_size(); + struct ibv_send_wr* bad_wr = NULL; + struct ibv_send_wr* wr = send_wr_pool_[qpi].data(); + struct ibv_sge* sge = send_sge_pool_[qpi].data(); + + for (size_t i = 0; i < batch_size; ++i) { + Assignment subassign = assign->batch_[i]; + struct ibv_mr* mr = ctx_->memory_pool_->get_mr(subassign.mr_key); + remote_mr_t remote_mr = ctx_->memory_pool_->get_remote_mr(subassign.remote_mr_key); + uint64_t remote_addr = remote_mr.addr; + uint32_t remote_rkey = remote_mr.rkey; + sge[i].addr = (uint64_t)mr->addr + subassign.source_offset; + sge[i].length = subassign.length; + sge[i].lkey = mr->lkey; + wr[i].wr_id = (i == batch_size - 1) ? (uintptr_t)(assign) : 0; + + wr[i].opcode = ASSIGN_OP_2_IBV_WR_OP.at(assign->opcode_); + if (wr[i].opcode == IBV_WR_RDMA_WRITE_WITH_IMM and (i != batch_size - 1)) { + wr[i].opcode = IBV_WR_RDMA_WRITE; + } + + wr[i].sg_list = &sge[i]; + wr[i].num_sge = 1; + wr[i].imm_data = (i == batch_size - 1) ? assign->imm_data_ : UNDEFINED_IMM_DATA; + wr[i].send_flags = (i == batch_size - 1) ? IBV_SEND_SIGNALED : 0; + if (assign->is_inline_) + wr[i].send_flags |= IBV_SEND_INLINE; + wr[i].wr.rdma.remote_addr = remote_addr + assign->batch_[i].target_offset; + wr[i].wr.rdma.rkey = remote_rkey; + wr[i].next = (i == batch_size - 1) ? 
NULL : &wr[i + 1]; + } + int ret = 0; + { + ret = ibv_post_send(qp_[qpi], wr, &bad_wr); + } + + if (ret) { + SLIME_LOG_ERROR("Failed to post RDMA send : " << strerror(ret)); + return -1; + } + return 0; +} + +} // namespace slime diff --git a/csrc/engine/rdma/rdma_channel.h b/csrc/engine/rdma/rdma_channel.h new file mode 100644 index 0000000..d5b277d --- /dev/null +++ b/csrc/engine/rdma/rdma_channel.h @@ -0,0 +1,66 @@ +#pragma once + +#include "rdma_context.h" + +#include "json.hpp" + +namespace slime { + +enum RDMAChannelState { + Initialized, + Connected, + Destoried +}; + +class RDMAChannel { + inline static constexpr int UNDEFINED_QPI = -1; + inline static constexpr uint32_t UNDEFINED_IMM_DATA = -1; + friend class RDMAEndpointV0; + +public: + RDMAChannel() = default; + + ~RDMAChannel() + { + reset(); + } + + int32_t init(std::shared_ptr ctx, size_t num_qp, int32_t inline_size); + int32_t connect(json channel_info); + + json channelInfo() const; + + /* Async RDMA SendRecv */ + int64_t post_send_batch(int qpi, RDMAAssign* assign); + int64_t post_recv_batch(int qpi, RDMAAssign* assign); + + /* Async RDMA Read */ + int64_t post_rc_oneside_batch(int qpi, RDMAAssign* assign); + + int32_t reset(); + + inline int32_t num_channel() { + return qp_.size(); + } + +private: + int32_t modify_qp_to_r2r(); + int32_t modify_qp_to_r2s(); + + std::vector qp_{}; + + /* RDMA Exchange Information */ + std::vector remote_rdma_info_; + std::vector local_rdma_info_; + + /* polling pool */ + std::vector> send_wr_pool_; + std::vector> recv_wr_pool_; + std::vector> send_sge_pool_; + std::vector> recv_sge_pool_; + + std::shared_ptr ctx_{}; + + RDMAChannelState state{RDMAChannelState::Destoried}; +}; +} // namespace slime diff --git a/csrc/engine/rdma/rdma_context.cpp b/csrc/engine/rdma/rdma_context.cpp index d527b6b..f6cff7c 100644 --- a/csrc/engine/rdma/rdma_context.cpp +++ b/csrc/engine/rdma/rdma_context.cpp @@ -22,6 +22,7 @@ #include "engine/rdma/ibv_helper.h" #include "engine/rdma/memory_pool.h" #include "engine/rdma/rdma_assignment.h" +#include "engine/rdma/rdma_channel.h" #include "engine/rdma/rdma_config.h" #include "engine/rdma/rdma_env.h" #include "engine/rdma/rdma_utils.h" @@ -32,25 +33,31 @@ namespace slime { +RDMAContext::~RDMAContext() +{ + stop_future(); + + if (cq_) + ibv_destroy_cq(cq_); + + if (pd_) + ibv_dealloc_pd(pd_); + + if (ib_ctx_) + ibv_close_device(ib_ctx_); + + SLIME_LOG_DEBUG("RDMAContext deconstructed") +} + int64_t RDMAContext::init(const std::string& dev_name, uint8_t ib_port, const std::string& link_type) { device_name_ = dev_name; - uint16_t lid; - enum ibv_mtu active_mtu; - union ibv_gid gid; - int64_t gidx; - uint32_t psn; SLIME_LOG_INFO("Initializing RDMA Context ..."); SLIME_LOG_DEBUG("device name: " << dev_name); SLIME_LOG_DEBUG("ib port: " << int{ib_port}); SLIME_LOG_DEBUG("link type: " << link_type); - if (initialized_) { - SLIME_LOG_ERROR("Already initialized."); - return -1; - } - /* Get RDMA Device Info */ struct ibv_device** dev_list; struct ibv_device* ib_dev; @@ -127,22 +134,22 @@ int64_t RDMAContext::init(const std::string& dev_name, uint8_t ib_port, const st } if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { - gidx = -1; + gidx_ = -1; } else { if (SLIME_GID_INDEX > 0) - gidx = SLIME_GID_INDEX; + gidx_ = SLIME_GID_INDEX; else - gidx = ibv_find_sgid_type(ib_ctx_, ib_port_, ibv_gid_type_custom::IBV_GID_TYPE_ROCE_V2, AF_INET); - if (gidx < 0) { + gidx_ = ibv_find_sgid_type(ib_ctx_, ib_port_, ibv_gid_type_custom::IBV_GID_TYPE_ROCE_V2, AF_INET); + if (gidx_ < 0) { 
SLIME_ABORT("Failed to find GID"); } } - SLIME_LOG_DEBUG("Set GID INDEX to " << gidx); + SLIME_LOG_DEBUG("Set GID INDEX to " << gidx_); - lid = port_attr.lid; - active_mtu = port_attr.active_mtu; + lid_ = port_attr.lid; + active_mtu_ = port_attr.active_mtu; /* Alloc Protected Domain (PD) */ pd_ = ibv_alloc_pd(ib_ctx_); @@ -158,177 +165,7 @@ int64_t RDMAContext::init(const std::string& dev_name, uint8_t ib_port, const st cq_ = ibv_create_cq(ib_ctx_, SLIME_MAX_CQ_DEPTH, NULL, comp_channel_, 0); SLIME_ASSERT(cq_, "create CQ failed"); - for (int qpi = 0; qpi < qp_list_len_; ++qpi) { - qp_management_t* qp_man = qp_management_[qpi]; - qp_man->send_wr_pool_.resize(SLIME_MAX_SEND_WR); - qp_man->send_sge_pool_.resize(SLIME_MAX_SEND_WR); - qp_man->recv_wr_pool_.resize(SLIME_MAX_RECV_WR); - qp_man->recv_sge_pool_.resize(SLIME_MAX_RECV_WR); - - if (posix_memalign((void**)(&qp_man->assign_pool_), 64, qp_man->poolSize() * sizeof(RDMAAssign)) != 0) { - SLIME_ABORT("Failed to allocate ring memory"); - } - memset(qp_man->assign_pool_, 0, qp_man->poolSize()); - - /* init ring */ - ssize_t mem_size = jring_get_buf_ring_size(sizeof(void*), BACKPRESSURE_BUFFER_SIZE); - if (posix_memalign(&qp_man->ring_memory_, 64, mem_size) != 0) { - SLIME_ABORT("Failed to allocate ring memory"); - } - - qp_man->overflow_ring_ = (struct jring*)qp_man->ring_memory_; - - if (jring_init(qp_man->overflow_ring_, BACKPRESSURE_BUFFER_SIZE, sizeof(void*), 1, 0) < 0) { - SLIME_ABORT("jring init failed"); - } - - /* Create Queue Pair (QP) */ - struct ibv_qp_init_attr qp_init_attr = {}; - qp_init_attr.send_cq = cq_; - qp_init_attr.recv_cq = cq_; - qp_init_attr.qp_type = IBV_QPT_RC; // Reliable Connection - - if (max_num_inline_data_ == 0) { - qp_init_attr.cap.max_send_wr = SLIME_MAX_SEND_WR; - } - else { - SLIME_ASSERT(max_num_inline_data_ <= 4096, "inline data need to less than or equal to 4096"); - qp_init_attr.cap.max_send_wr = 4096; - qp_init_attr.cap.max_inline_data = max_num_inline_data_; - } - - qp_init_attr.cap.max_recv_wr = SLIME_MAX_RECV_WR; - qp_init_attr.cap.max_send_sge = 1; - qp_init_attr.cap.max_recv_sge = 1; - qp_init_attr.sq_sig_all = false; - rdma_info_t& local_rdma_info = qp_man->local_rdma_info_; - qp_man->qp_ = ibv_create_qp(pd_, &qp_init_attr); - if (!qp_man->qp_) { - SLIME_LOG_ERROR("Failed to create QP " << qp_man->qp_->qp_num, ": ", strerror(errno)); - return -1; - } - - /* Modify QP to INIT state */ - struct ibv_qp_attr attr = {}; - attr.qp_state = IBV_QPS_INIT; - attr.port_num = ib_port_; - attr.pkey_index = 0; - attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE; - - int flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; - - int ret = ibv_modify_qp(qp_man->qp_, &attr, flags); - if (ret) { - SLIME_LOG_ERROR("Failed to modify QP to INIT"); - } - - /* Set Packet Sequence Number (PSN) */ - psn = lrand48() & 0xffffff; - - /* Get GID */ - if (gidx != -1 && ibv_query_gid(ib_ctx_, 1, gidx, &gid)) { - SLIME_LOG_ERROR("Failed to get GID"); - } - - /* Set Local RDMA Info */ - local_rdma_info.gidx = gidx; - local_rdma_info.qpn = qp_man->qp_->qp_num; - local_rdma_info.psn = psn; - local_rdma_info.gid = gid; - local_rdma_info.lid = lid; - local_rdma_info.mtu = (uint32_t)active_mtu; - } - SLIME_LOG_INFO("RDMA context initialized") - SLIME_LOG_DEBUG("RDMA context local configuration: ", endpoint_info()); - - initialized_ = true; - - return 0; -} - -int64_t RDMAContext::connect(const json& endpoint_info_json) -{ - SLIME_LOG_INFO("RDMA context remote 
connecting"); - SLIME_LOG_DEBUG("RDMA context remote configuration: ", endpoint_info_json); - // Register Remote Memory Region - for (auto& item : endpoint_info_json["mr_info"].items()) { - registerRemoteMemoryRegion(item.value()["mr_key"].get(), item.value()); - } - SLIME_ASSERT(!connected_, "Already connected!"); - SLIME_ASSERT_EQ(qp_list_len_, endpoint_info_json["rdma_info"].size(), "Peer must have same QP Size."); - - // construct RDMAEndpoint connection - for (int qpi = 0; qpi < qp_list_len_; qpi++) { - int ret; - struct ibv_qp_attr attr = {}; - int flags; - qp_management_t* qp_man = qp_management_[qpi]; - struct ibv_qp* qp = qp_man->qp_; - rdma_info_t& local_rdma_info = qp_man->local_rdma_info_; - rdma_info_t& remote_rdma_info = qp_man->remote_rdma_info_; - remote_rdma_info = rdma_info_t(endpoint_info_json["rdma_info"][qpi]); - - // Modify QP to Ready to Receive (RTR) state - memset(&attr, 0, sizeof(attr)); - attr.qp_state = IBV_QPS_RTR; - attr.path_mtu = (enum ibv_mtu)std::min((uint32_t)remote_rdma_info.mtu, (uint32_t)local_rdma_info.mtu); - attr.dest_qp_num = remote_rdma_info.qpn; - attr.rq_psn = remote_rdma_info.psn; - attr.max_dest_rd_atomic = SLIME_MAX_DEST_RD_ATOMIC; - attr.min_rnr_timer = 0x16; - attr.ah_attr.dlid = remote_rdma_info.lid; - attr.ah_attr.sl = SLIME_SERVICE_LEVEL; - attr.ah_attr.src_path_bits = 0; - attr.ah_attr.port_num = ib_port_; - - attr.ah_attr.is_global = 0; - attr.ah_attr.dlid = 0; - - if (local_rdma_info.gidx == -1) { - // IB - attr.ah_attr.dlid = local_rdma_info.lid; - } - else { - // RoCE v2 - attr.ah_attr.is_global = 1; - attr.ah_attr.grh.dgid = remote_rdma_info.gid; - attr.ah_attr.grh.sgid_index = local_rdma_info.gidx; - attr.ah_attr.grh.hop_limit = 1; - attr.ah_attr.grh.flow_label = 0; - attr.ah_attr.grh.traffic_class = 0; - } - - flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC - | IBV_QP_MIN_RNR_TIMER; - - ret = ibv_modify_qp(qp, &attr, flags); - if (ret) { - SLIME_ABORT("Failed to modify QP to RTR: reason: " << strerror(ret)); - } - - // Modify QP to RTS state - memset(&attr, 0, sizeof(attr)); - attr.qp_state = IBV_QPS_RTS; - attr.timeout = 14; - attr.retry_cnt = 7; - attr.rnr_retry = 7; - attr.sq_psn = local_rdma_info.psn; - attr.max_rd_atomic = SLIME_MAX_RD_ATOMIC; - - flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN - | IBV_QP_MAX_QP_RD_ATOMIC; - - ret = ibv_modify_qp(qp, &attr, flags); - if (ret) { - SLIME_ABORT("Failed to modify QP to RTS"); - } - SLIME_LOG_INFO("RDMA exchange done"); - connected_ = true; - - if (ibv_req_notify_cq(cq_, 0)) { - SLIME_ABORT("Failed to request notify for CQ"); - } - } + launch_future(); return 0; } @@ -345,288 +182,15 @@ void RDMAContext::stop_future() if (!stop_cq_thread_ && cq_thread_.joinable()) { stop_cq_thread_ = true; - // create fake wr to wake up cq thread - ibv_req_notify_cq(cq_, 0); - struct ibv_sge sge; - memset(&sge, 0, sizeof(sge)); - sge.addr = (uintptr_t)this; - sge.length = sizeof(*this); - sge.lkey = 0; - - struct ibv_send_wr send_wr; - memset(&send_wr, 0, sizeof(send_wr)); - // send_wr.wr_id = (uintptr_t)this; - send_wr.wr_id = 0; - send_wr.sg_list = &sge; - send_wr.num_sge = 1; - send_wr.opcode = IBV_WR_SEND; - send_wr.send_flags = IBV_SEND_SIGNALED; - - struct ibv_send_wr* bad_send_wr; - { - std::unique_lock lock(qp_management_[0]->rdma_post_send_mutex_); - ibv_post_send(qp_management_[0]->qp_, &send_wr, &bad_send_wr); - } // wait thread done cq_thread_.join(); } } -std::shared_ptr 
RDMAContext::submit( - OpCode opcode, AssignmentBatch& batch, callback_fn_t callback, int qpi, int32_t imm_data, bool is_inline) -{ - // Step 1: Split by max length - size_t length = SLIME_MAX_LENGTH_PER_ASSIGNMENT; - AssignmentBatch batch_split; - split_assign_by_max_length(opcode, batch, batch_split, length); - AssignmentBatch batch_after_agg_qp; - while (batch_split.size() < SLIME_AGG_QP_NUM) { - length = length / 2; - split_assign_by_max_length(opcode, batch_split, batch_after_agg_qp, length); - batch_split = std::move(batch_after_agg_qp); - } - batch_after_agg_qp = std::move(batch_after_agg_qp); - - std::vector agg_qpi_list; - if (qpi == UNDEFINED_QPI) { - agg_qpi_list = select_qpi(SLIME_AGG_QP_NUM); - } - else { - for (int i = 0; i < SLIME_AGG_QP_NUM; ++i) { - agg_qpi_list.push_back(qpi % qp_list_len_); - qpi += 1; - } - } - - SLIME_ASSERT(batch_split.size() >= SLIME_AGG_QP_NUM, "batch_split.size() < SLIME_AGG_QP_NUM"); - - std::vector qp_batch; - nsplit_assign_by_step(opcode, batch_split, qp_batch, SLIME_AGG_QP_NUM); - - std::vector assigns; - for (int agg_idx = 0; agg_idx < SLIME_AGG_QP_NUM; ++agg_idx) { - size_t agg_qpi = agg_qpi_list[agg_idx]; - std::vector batch_split_after_cq_depth; - split_assign_by_step(opcode, qp_batch[agg_idx], batch_split_after_cq_depth, SLIME_MAX_CQ_DEPTH / 2); - - size_t split_size_this_qp = batch_split_after_cq_depth.size(); - for (int i = 0; i < split_size_this_qp; ++i) { - qp_management_t* qp_man = qp_management_[agg_qpi]; - callback_fn_t split_callback = (i == split_size_this_qp - 1 ? callback : [](int, int) { return 0; }); - uint32_t raw_idx = qp_man->assign_slot_id_.fetch_add(1, std::memory_order_release); - uint32_t slot_id = raw_idx % qp_man->poolSize(); - RDMAAssign* assign_ptr = &(qp_man->assign_pool_[slot_id]); - assign_ptr->reset(opcode, agg_qpi, batch_split_after_cq_depth[i], split_callback, is_inline); - - assign_ptr->with_imm_data_ = (i == split_size_this_qp - 1) ? (imm_data != UNDEFINED_IMM_DATA) : false; - assign_ptr->imm_data_ = (i == split_size_this_qp - 1) ? 
imm_data : UNDEFINED_IMM_DATA; - auto& outstanding = qp_man->qp_outstanding_; - - bool is_fast_path = true; - if (with_backpressure_) { - bool ring_is_empty = jring_empty(qp_man->overflow_ring_); - int current_outstanding = outstanding.load(std::memory_order_relaxed); - is_fast_path = ring_is_empty && current_outstanding < SLIME_MAX_CQ_DEPTH - assign_ptr->batch_size(); - } - if (is_fast_path) { - switch (opcode) { - case OpCode::WRITE: - case OpCode::WRITE_WITH_IMM: - case OpCode::READ: { - post_rc_oneside_batch(agg_qpi, assign_ptr); - break; - } - case OpCode::SEND: - case OpCode::SEND_WITH_IMM: { - post_send_batch(agg_qpi, assign_ptr); - break; - } - case OpCode::RECV: { - post_recv_batch(agg_qpi, assign_ptr); - break; - } - default: - SLIME_ABORT("Unknown OpCode"); - } - assigns.push_back(assign_ptr); - continue; - } - unsigned int free_space; - int ret = jring_enqueue_burst(qp_man->overflow_ring_, - (const void*)&assign_ptr, - 1, - &free_space); - - if (ret != 1) { - do { - _mm_pause(); - ret = jring_enqueue_burst(qp_man->overflow_ring_, (const void*)&assign_ptr, 1, &free_space); - } while (ret != 1); - } - - assigns.push_back(assign_ptr); - } - } - return std::make_shared(assigns); -} - -int64_t RDMAContext::post_send_batch(int qpi, RDMAAssign* assign) -{ - int ret = 0; - qp_management_t* qp_man = qp_management_[qpi]; - size_t batch_size = assign->batch_size(); - struct ibv_send_wr* bad_wr = nullptr; - struct ibv_send_wr* wr = qp_man->send_wr_pool_.data(); - struct ibv_sge* sge = qp_man->send_sge_pool_.data(); - for (size_t i = 0; i < batch_size; ++i) { - - Assignment& subassign = assign->batch_[i]; - struct ibv_mr* mr = memory_pool_->get_mr(subassign.mr_key); - sge[i].addr = (uintptr_t)mr->addr + subassign.source_offset; - sge[i].length = subassign.length; - sge[i].lkey = mr->lkey; - wr[i].wr_id = (i == batch_size - 1) ? (uintptr_t)(assign) : 0; - wr[i].opcode = ASSIGN_OP_2_IBV_WR_OP.at(assign->opcode_); - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; - wr[i].imm_data = (i == batch_size - 1) ? assign->imm_data_ : UNDEFINED_IMM_DATA; - wr[i].send_flags = (i == batch_size - 1) ? IBV_SEND_SIGNALED : 0; - if (assign->is_inline_) - wr[i].send_flags |= IBV_SEND_INLINE; - wr[i].next = (i == batch_size - 1) ? nullptr : &wr[i + 1]; - } - { - std::unique_lock lock(qp_management_[qpi]->rdma_post_send_mutex_); - if (with_backpressure_) - qp_management_[qpi]->qp_outstanding_.fetch_add(batch_size, std::memory_order_release); - ret = ibv_post_send(qp_management_[qpi]->qp_, wr, &bad_wr); - } - if (ret) { - SLIME_LOG_ERROR("Failed to post RDMA send : " << strerror(ret)); - if (with_backpressure_) - qp_management_[qpi]->qp_outstanding_.fetch_sub(batch_size, std::memory_order_release); - return -1; - } - return 0; -} - -int64_t RDMAContext::post_recv_batch(int qpi, RDMAAssign* assign) -{ - int64_t ret = 0; - qp_management_t* qp_man = qp_management_[qpi]; - size_t batch_size = assign->batch_size(); - struct ibv_recv_wr* bad_wr = nullptr; - struct ibv_recv_wr* wr = qp_man->recv_wr_pool_.data(); - struct ibv_sge* sge = qp_man->recv_sge_pool_.data(); - for (size_t i = 0; i < batch_size; ++i) { - - Assignment& subassign = assign->batch_[i]; - struct ibv_mr* mr = memory_pool_->get_mr(subassign.mr_key); - sge[i].addr = (uintptr_t)mr->addr + subassign.source_offset; - sge[i].length = subassign.length; - sge[i].lkey = mr->lkey; - wr[i].wr_id = (i == batch_size - 1) ? (uintptr_t)(assign) : 0; - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; - wr[i].next = (i == batch_size - 1) ? 
nullptr : &wr[i + 1]; - } - { - std::unique_lock lock(qp_management_[qpi]->rdma_post_send_mutex_); - if (with_backpressure_) - qp_management_[qpi]->qp_outstanding_.fetch_add(batch_size, std::memory_order_relaxed); - ret = ibv_post_recv(qp_management_[qpi]->qp_, wr, &bad_wr); - } - if (ret) { - SLIME_LOG_ERROR("Failed to post RDMA send : " << strerror(ret)); - if (with_backpressure_) - qp_management_[qpi]->qp_outstanding_.fetch_sub(batch_size, std::memory_order_relaxed); - return -1; - } - - return 0; -} - -int64_t RDMAContext::post_rc_oneside_batch(int qpi, RDMAAssign* assign) -{ - qp_management_t* qp_man = qp_management_[qpi]; - size_t batch_size = assign->batch_size(); - struct ibv_send_wr* bad_wr = NULL; - struct ibv_send_wr* wr = qp_man->send_wr_pool_.data(); - struct ibv_sge* sge = qp_man->send_sge_pool_.data(); - - for (size_t i = 0; i < batch_size; ++i) { - Assignment subassign = assign->batch_[i]; - struct ibv_mr* mr = memory_pool_->get_mr(subassign.mr_key); - remote_mr_t remote_mr = memory_pool_->get_remote_mr(subassign.remote_mr_key); - uint64_t remote_addr = remote_mr.addr; - uint32_t remote_rkey = remote_mr.rkey; - sge[i].addr = (uint64_t)mr->addr + subassign.source_offset; - sge[i].length = subassign.length; - sge[i].lkey = mr->lkey; - wr[i].wr_id = (i == batch_size - 1) ? (uintptr_t)(assign) : 0; - - wr[i].opcode = ASSIGN_OP_2_IBV_WR_OP.at(assign->opcode_); - if (wr[i].opcode == IBV_WR_RDMA_WRITE_WITH_IMM and (i != batch_size - 1)) { - wr[i].opcode = IBV_WR_RDMA_WRITE; - } - - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; - wr[i].imm_data = (i == batch_size - 1) ? assign->imm_data_ : UNDEFINED_IMM_DATA; - wr[i].send_flags = (i == batch_size - 1) ? IBV_SEND_SIGNALED : 0; - if (assign->is_inline_) - wr[i].send_flags |= IBV_SEND_INLINE; - wr[i].wr.rdma.remote_addr = remote_addr + assign->batch_[i].target_offset; - wr[i].wr.rdma.rkey = remote_rkey; - wr[i].next = (i == batch_size - 1) ? 
NULL : &wr[i + 1]; - } - int ret = 0; - { - if (with_backpressure_) { - qp_management_[qpi]->qp_outstanding_.fetch_add(assign->batch_size(), std::memory_order_release); - } - ret = ibv_post_send(qp_management_[qpi]->qp_, wr, &bad_wr); - } - - if (ret) { - SLIME_LOG_ERROR("Failed to post RDMA send : " << strerror(ret)); - return -1; - } - return 0; -} - -void RDMAContext::drain_submission_queue(int qpi) -{ - qp_management_t* qp_man = qp_management_[qpi]; - - if (jring_empty(qp_man->overflow_ring_)) - return; - - const int BURST = 16; - RDMAAssign* burst_ptr[BURST]; - unsigned int available; - - while (qp_man->qp_outstanding_.load(std::memory_order_relaxed) < SLIME_MAX_CQ_DEPTH) { - int n = jring_dequeue_burst(qp_man->overflow_ring_, (void*)burst_ptr, BURST, &available); - - if (n == 0) - break; - - for (int i = 0; i < n; ++i) { - RDMAAssign* assign = burst_ptr[i]; - post_send_batch(qpi, assign); - } - } -} - int64_t RDMAContext::cq_poll_handle() { SLIME_LOG_INFO("Polling CQ"); - if (!connected_) { - SLIME_LOG_ERROR("Start CQ handle before connected, please construct first"); - return -1; - } if (comp_channel_ == NULL) SLIME_LOG_ERROR("comp_channel_ should be constructed"); while (!stop_cq_thread_) { @@ -638,19 +202,24 @@ int64_t RDMAContext::cq_poll_handle() RDMAAssign::CALLBACK_STATUS status_code = RDMAAssign::SUCCESS; if (wc[i].status != IBV_WC_SUCCESS) { status_code = RDMAAssign::FAILED; - SLIME_LOG_ERROR("WR failed with status: ", - ibv_wc_status_str(wc[i].status), - ", vi vendor err: ", - wc[i].vendor_err); + if (wc[i].status != IBV_WC_WR_FLUSH_ERR) { + if (wc[i].wr_id != 0) { + RDMAAssign* assign = reinterpret_cast(wc[i].wr_id); + for (int i = 0; i < assign->batch_size_; ++i) { + SLIME_LOG_ERROR("ERROR ASSIGNMENT: Batch: ", assign->batch_[i].dump()); + } + SLIME_LOG_ERROR("ERROR OpCode: , ", uint64_t(assign->opcode_)); + } + SLIME_LOG_ERROR("WR failed with status: ", + ibv_wc_status_str(wc[i].status), + ", vi vendor err: ", + wc[i].vendor_err); + } } if (wc[i].wr_id != 0) { RDMAAssign* assign = reinterpret_cast(wc[i].wr_id); assign->callback_(status_code, wc[i].imm_data); size_t batch_size = assign->batch_size_; - if (with_backpressure_) { - qp_management_[assign->qpi_]->qp_outstanding_.fetch_sub(batch_size, std::memory_order_release); - drain_submission_queue(assign->qpi_); - } } } } diff --git a/csrc/engine/rdma/rdma_context.h b/csrc/engine/rdma/rdma_context.h index 1e91349..b288f08 100644 --- a/csrc/engine/rdma/rdma_context.h +++ b/csrc/engine/rdma/rdma_context.h @@ -28,68 +28,24 @@ namespace slime { +class RDMAChannel; + using json = nlohmann::json; -class RDMAContext { +class RDMAContext: public std::enable_shared_from_this { friend class RDMAEndpoint; // RDMA Endpoint need to use the register memory pool in context friend class RDMAEndpointV0; + friend class RDMAChannel; public: /* A context of rdma QP. 
*/ - RDMAContext() - { - SLIME_LOG_DEBUG("Initializing qp management, num qp: " << SLIME_QP_NUM); - - qp_list_len_ = SLIME_QP_NUM; - qp_management_ = new qp_management_t*[qp_list_len_]; - for (int qpi = 0; qpi < qp_list_len_; qpi++) { - qp_management_[qpi] = new qp_management_t(); - } - - /* random initialization for psn configuration */ - srand48(time(NULL)); - } - - RDMAContext(size_t qp_num, size_t max_num_inline_data = 0, bool with_backpressure = true) - { - SLIME_LOG_DEBUG("Initializing qp management, num qp: " << qp_num); - qp_list_len_ = qp_num; - max_num_inline_data_ = max_num_inline_data; + RDMAContext() = default; - with_backpressure_ = with_backpressure; - - qp_management_ = new qp_management_t*[qp_list_len_]; - for (int qpi = 0; qpi < qp_list_len_; qpi++) { - qp_management_[qpi] = new qp_management_t(); - } - - /* random initialization for psn configuration */ - srand48(time(NULL)); - } - - ~RDMAContext() - { - stop_future(); - for (int qpi = 0; qpi < qp_list_len_; qpi++) { - delete qp_management_[qpi]; - } - delete[] qp_management_; - - if (cq_) - ibv_destroy_cq(cq_); - - if (pd_) - ibv_dealloc_pd(pd_); - - if (ib_ctx_) - ibv_close_device(ib_ctx_); - - SLIME_LOG_DEBUG("RDMAContext deconstructed") - } + ~RDMAContext(); struct ibv_mr* get_mr(const uintptr_t& mr_key) { @@ -105,19 +61,19 @@ class RDMAContext { int64_t init(const std::string& dev_name, uint8_t ib_port, const std::string& link_type); /* Memory Allocation */ - inline int64_t registerMemoryRegion(const uintptr_t& mr_key, uintptr_t data_ptr, size_t length) + inline int64_t registerOrAccessMemoryRegion(const uintptr_t& mr_key, uintptr_t data_ptr, size_t length) { memory_pool_->registerMemoryRegion(mr_key, data_ptr, length); return 0; } - inline int registerRemoteMemoryRegion(const uintptr_t& mr_key, uintptr_t addr, size_t length, uint32_t rkey) + inline int registerOrAccessRemoteMemoryRegion(const uintptr_t& mr_key, uintptr_t addr, size_t length, uint32_t rkey) { memory_pool_->registerRemoteMemoryRegion(mr_key, addr, length, rkey); return 0; } - inline int64_t registerRemoteMemoryRegion(const uintptr_t& mr_key, json mr_info) + inline int64_t registerOrAccessRemoteMemoryRegion(const uintptr_t& mr_key, json mr_info) { memory_pool_->registerRemoteMemoryRegion(mr_key, mr_info); return 0; @@ -135,41 +91,9 @@ class RDMAContext { return 0; } - /* RDMA Link Construction */ - int64_t connect(const json& endpoint_info_json); - /* Submit an assignment */ - std::shared_ptr submit(OpCode opcode, - AssignmentBatch& assignment, - callback_fn_t callback = nullptr, - int qpi = UNDEFINED_QPI, - int32_t imm_data = UNDEFINED_IMM_DATA, - bool is_inline = false); - void launch_future(); void stop_future(); - json local_rdma_info() const - { - json local_info{}; - for (int qpi = 0; qpi < qp_list_len_; qpi++) - local_info[qpi] = qp_management_[qpi]->local_rdma_info_.to_json(); - return local_info; - } - - json remote_rdma_info() const - { - json remote_info{}; - for (int qpi = 0; qpi < qp_list_len_; qpi++) - remote_info[qpi] = qp_management_[qpi]->remote_rdma_info_.to_json(); - return remote_info; - } - - json endpoint_info() const - { - json endpoint_info = json{{"rdma_info", local_rdma_info()}, {"mr_info", memory_pool_->mr_info()}}; - return endpoint_info; - } - std::string get_dev_ib() const { return "@" + device_name_ + "#" + std::to_string(ib_port_); @@ -182,8 +106,8 @@ class RDMAContext { } private: - inline static constexpr int UNDEFINED_QPI = -1; - inline static constexpr uint32_t UNDEFINED_IMM_DATA = -1; + inline static constexpr int 
UNDEFINED_QPI = -1; + inline static constexpr uint32_t UNDEFINED_IMM_DATA = -1; inline static constexpr uint32_t BACKPRESSURE_BUFFER_SIZE = 8192; std::string device_name_ = ""; @@ -195,55 +119,23 @@ class RDMAContext { struct ibv_cq* cq_ = nullptr; uint8_t ib_port_ = -1; size_t max_num_inline_data_{0}; + uint16_t lid_; + enum ibv_mtu active_mtu_; + union ibv_gid gid_; + int64_t gidx_; std::unique_ptr memory_pool_; - typedef struct qp_management { - /* queue peer list */ - struct ibv_qp* qp_{nullptr}; - - /* RDMA Exchange Information */ - rdma_info_t remote_rdma_info_; - rdma_info_t local_rdma_info_; - - /* Send Mutex */ - std::mutex rdma_post_send_mutex_; - - /* Assignment Queue */ - struct jring* overflow_ring_; - void* ring_memory_; - - std::atomic assign_slot_id_{0}; - RDMAAssign* assign_pool_; - std::atomic qp_outstanding_{0}; - /* polling pool */ - std::vector send_wr_pool_; - std::vector recv_wr_pool_; - std::vector send_sge_pool_; - std::vector recv_sge_pool_; - - ~qp_management() - { - if (qp_) - ibv_destroy_qp(qp_); - free(assign_pool_); - } - inline size_t poolSize() { - return BACKPRESSURE_BUFFER_SIZE + SLIME_MAX_CQ_DEPTH * 2; - } - } qp_management_t; - - size_t qp_list_len_{1}; - qp_management_t** qp_management_; + int32_t num_qp_; + int32_t last_qp_selection_{-1}; - int last_qp_selection_{-1}; std::vector select_qpi(int num) { std::vector agg_qpi; // Simplest round robin, we could enrich it in the future for (int i = 0; i < num; ++i) { - last_qp_selection_ = (last_qp_selection_ + 1) % qp_list_len_; + last_qp_selection_ = (last_qp_selection_ + 1) % num_qp_; agg_qpi.push_back(last_qp_selection_); } @@ -254,10 +146,6 @@ class RDMAContext { // TODO: multi cq handlers. } cq_management_t; - /* State Management */ - bool initialized_ = false; - bool connected_ = false; - /* async cq handler */ std::thread cq_thread_; std::atomic stop_cq_thread_{false}; @@ -265,17 +153,7 @@ class RDMAContext { /* Completion Queue Polling */ int64_t cq_poll_handle(); - /* Async RDMA SendRecv */ - int64_t post_send_batch(int qpi, RDMAAssign* assign); - int64_t post_recv_batch(int qpi, RDMAAssign* assign); - - /* Async RDMA Read */ - int64_t post_rc_oneside_batch(int qpi, RDMAAssign* assign); - int64_t service_level_{0}; - - bool with_backpressure_; - void drain_submission_queue(int qpi); }; } // namespace slime diff --git a/csrc/engine/rdma/rdma_endpoint_v0.cpp b/csrc/engine/rdma/rdma_endpoint_v0.cpp index c0b657c..2b14970 100644 --- a/csrc/engine/rdma/rdma_endpoint_v0.cpp +++ b/csrc/engine/rdma/rdma_endpoint_v0.cpp @@ -3,7 +3,7 @@ #include "device/device_api.h" #include "engine/assignment.h" #include "engine/rdma/rdma_assignment.h" -#include "engine/rdma/rdma_buffer.h" +#include "engine/rdma/rdma_channel.h" #include "engine/rdma/rdma_common.h" #include "engine/rdma/rdma_context.h" #include "engine/rdma/rdma_env.h" @@ -26,13 +26,15 @@ jring_t* RDMAEndpointV0::createRing(const char* name, size_t count) size_t ring_sz = jring_get_buf_ring_size(sizeof(void*), count); void* mem = nullptr; + // Align to 64 bytes to match cache line size, preventing false sharing. if (posix_memalign(&mem, 64, ring_sz) != 0) { throw std::runtime_error(std::string("Failed to allocate ring memory: ") + name); } jring_t* r = (jring_t*)mem; - // Initialize ring: MP=1 (Multi-Producer safe), MC=1 (Multi-Consumer safe) + // Initialize ring: MP=1 (Multi-Producer safe), MC=1 (Multi-Consumer safe). + // This allows multiple threads to enqueue requests if needed. 
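+    // jring_init arguments: ring, element count, element size (pointer-sized entries here), MP flag, MC flag.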
if (jring_init(r, count, sizeof(void*), 1, 1) < 0) { free(mem); throw std::runtime_error(std::string("Failed to init ring: ") + name); @@ -47,103 +49,65 @@ void RDMAEndpointV0::freeRing(jring_t* ring) } } -RDMAEndpointV0::RDMAEndpointV0(const std::string& dev_name, - size_t ib_port, - const std::string& link_type, - size_t qp_nums) +RDMAEndpointV0::RDMAEndpointV0(std::shared_ptr ctx, size_t num_qp): ctx_(ctx), num_qp_(num_qp) { SLIME_LOG_INFO("Init RDMAEndpointV0 Contexts and Devices..."); SLIME_LOG_INFO("bypass Signal: ", SLIME_BYPASS_DEVICE_SIGNAL); if (SLIME_BYPASS_DEVICE_SIGNAL) bypass_signal_ = true; - qp_nums_ = qp_nums; - // Initialize RDMA Contexts. - data_ctx_ = std::make_shared(qp_nums, 0, false); // qp_num, inline_size, bypass_dispatcher - meta_ctx_ = std::make_shared(1, 256, false); // qp_num, inline_size, bypass_dispatcher + num_qp_ = num_qp; + + // Aggregation logic is not supported in V0 Send/Recv mode. SLIME_ASSERT(1 == SLIME_AGG_QP_NUM, "cannot aggqp when sendrecv"); SLIME_ASSERT(64 > SLIME_QP_NUM, "QP NUM must less than 64"); - data_ctx_->init(dev_name, ib_port, link_type); - meta_ctx_->init(dev_name, ib_port, link_type); - - data_ctx_qp_num_ = data_ctx_->qp_list_len_; - meta_ctx_qp_num_ = meta_ctx_->qp_list_len_; - - SLIME_LOG_INFO("Data Plane QP Num: ", data_ctx_qp_num_); - SLIME_LOG_INFO("Control Plane QP Num: ", meta_ctx_qp_num_); - - size_t meta_buffer_size = sizeof(meta_info_t) * MAX_FIFO_DEPTH; - + // Allocate dummy buffer for Immediate Data payload or signaling. void* dummy_mem = nullptr; if (posix_memalign(&dummy_mem, 64, sizeof(int64_t)) != 0) throw std::runtime_error("dummy alloc fail"); dummy_ = (int64_t*)dummy_mem; - void* remote_raw_mem = nullptr; - if (posix_memalign(&remote_raw_mem, 64, meta_buffer_size) != 0) + // Allocate context pools aligned to cache lines. 
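+    // One SendContext/RecvContext per FIFO slot; send()/recv() pick slots round-robin modulo MAX_FIFO_DEPTH.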
+ void* raw_send_ctx = nullptr; + if (posix_memalign(&raw_send_ctx, 64, sizeof(SendContext) * MAX_FIFO_DEPTH) != 0) throw std::runtime_error("remote meta alloc fail"); - remote_meta_info_ = static_cast(remote_raw_mem); - - void* local_raw_mem = nullptr; - if (posix_memalign(&local_raw_mem, 64, meta_buffer_size) != 0) - throw std::runtime_error("local meta alloc fail"); - local_meta_info_ = static_cast(local_raw_mem); - - send_ctx_pool_.resize(MAX_FIFO_DEPTH); - recv_ctx_pool_.resize(MAX_FIFO_DEPTH); - - size_t assign_buffer_size = sizeof(RDMAAssign) * MAX_FIFO_DEPTH; - - void* data_send_assign_raw = nullptr; - if (posix_memalign(&data_send_assign_raw, 64, assign_buffer_size) != 0) - throw std::runtime_error("send data assign alloc fail"); - data_send_assign_ = static_cast(data_send_assign_raw); - - void* data_recv_assign_raw = nullptr; - if (posix_memalign(&data_recv_assign_raw, 64, assign_buffer_size) != 0) - throw std::runtime_error("send data assign alloc fail"); - data_recv_assign_ = static_cast(data_recv_assign_raw); - - void* meta_send_assign_raw = nullptr; - if (posix_memalign(&meta_send_assign_raw, 64, assign_buffer_size) != 0) - throw std::runtime_error("send data assign alloc fail"); - meta_send_assign_ = static_cast(meta_send_assign_raw); + send_ctx_pool_ = static_cast(raw_send_ctx); - void* meta_recv_assign_raw = nullptr; - if (posix_memalign(&meta_recv_assign_raw, 64, assign_buffer_size) != 0) - throw std::runtime_error("send data assign alloc fail"); - meta_recv_assign_ = static_cast(meta_recv_assign_raw); + void* raw_recv_ctx = nullptr; + if (posix_memalign(&raw_recv_ctx, 64, sizeof(RecvContext) * MAX_FIFO_DEPTH) != 0) + throw std::runtime_error("remote meta alloc fail"); + recv_ctx_pool_ = static_cast(raw_recv_ctx); for (int i = 0; i < MAX_FIFO_DEPTH; ++i) { send_ctx_pool_[i].signal = slime::device::createSignal(bypass_signal_); recv_ctx_pool_[i].signal = slime::device::createSignal(bypass_signal_); } - // Register Memory Regions (MR) - // Registering these upfront prevents expensive registration calls during runtime. - meta_ctx_->registerMemoryRegion( - reinterpret_cast(dummy_), reinterpret_cast(dummy_), sizeof(int64_t)); - meta_ctx_->registerMemoryRegion(reinterpret_cast(remote_meta_info_), - reinterpret_cast(remote_meta_info_), - meta_buffer_size); - meta_ctx_->registerMemoryRegion( - reinterpret_cast(local_meta_info_), reinterpret_cast(local_meta_info_), meta_buffer_size); - data_ctx_->registerMemoryRegion( + // Register Memory Regions (MR) upfront. + // Dynamic registration during the datapath is expensive and should be avoided. + ctx_->registerOrAccessMemoryRegion( reinterpret_cast(dummy_), reinterpret_cast(dummy_), sizeof(int64_t)); - SLIME_LOG_INFO("Memory Regions Registered."); - - // Initialize Scoreboards - // These atomic flags serve as signaling mechanism between RDMA callback thread and Proxy threads. - SLIME_LOG_DEBUG("Initializing scoreboards..."); - meta_arrived_scoreboard_ = new PaddedAtomicUint64[MAX_FIFO_DEPTH]; for (int i = 0; i < MAX_FIFO_DEPTH; ++i) { - meta_arrived_scoreboard_[i].val.store(0); + ctx_->registerOrAccessMemoryRegion(reinterpret_cast(&(send_ctx_pool_[i].remote_meta_info_)), + reinterpret_cast(&(send_ctx_pool_[i].remote_meta_info_)), + sizeof(meta_info_t)); + ctx_->registerOrAccessMemoryRegion(reinterpret_cast(&(recv_ctx_pool_[i].local_meta_info_)), + reinterpret_cast(&(recv_ctx_pool_[i].local_meta_info_)), + sizeof(meta_info_t)); } - // Initialize Rings - // Size is double the depth to handle potential overflow gracefully. 
+ SLIME_LOG_INFO("Memory Regions Registered."); + + data_channel_ = std::make_unique(); + meta_channel_ = std::make_unique(); + + // Meta channel uses 1 QP (latency sensitive), Data channel uses num_qp_ (throughput sensitive). + meta_channel_->init(ctx_, 1, 256); + data_channel_->init(ctx_, num_qp_, 0); + + // Initialize Rings. Size is double the depth to handle potential overflow gracefully. size_t ring_size = MAX_FIFO_DEPTH * 2; send_buffer_ring_ = createRing("send_buf", ring_size); recv_buffer_ring_ = createRing("recv_buf", ring_size); @@ -154,15 +118,10 @@ RDMAEndpointV0::RDMAEndpointV0(const std::string& dev_name, RDMAEndpointV0::~RDMAEndpointV0() { try { - proxyDestroy(); - data_ctx_->stop_future(); - meta_ctx_->stop_future(); free(dummy_); - free(local_meta_info_); - free(remote_meta_info_); - delete[] meta_arrived_scoreboard_; - + free(send_ctx_pool_); + free(recv_ctx_pool_); freeRing(send_buffer_ring_); freeRing(recv_buffer_ring_); @@ -173,252 +132,335 @@ RDMAEndpointV0::~RDMAEndpointV0() } } -void RDMAEndpointV0::proxyInit() +json RDMAEndpointV0::endpointInfo() const { - send_proxy_thread_ = std::thread([this]() { this->sendProxy(); }); - recv_proxy_thread_ = std::thread([this]() { this->recvProxy(); }); + json remote_meta_key = {}; - SLIME_LOG_INFO("RDMA Proxy Threads Started."); + for (int i = 0; i < MAX_FIFO_DEPTH; ++i) { + remote_meta_key.push_back((uintptr_t)(&(send_ctx_pool_[i].remote_meta_info_))); + } + json endpoint_info = json{{"mr_info", ctx_->memory_pool_->mr_info()}, + {"meta_channel_info", meta_channel_->channelInfo()}, + {"data_channel_info", data_channel_->channelInfo()}, + {"remote_meta_key", remote_meta_key}}; + return endpoint_info; } -void RDMAEndpointV0::proxyDestroy() +void RDMAEndpointV0::connect(const json& remote_endpoint_info) { - SLIME_LOG_INFO("Stopping RDMA proxy threads..."); - - stop_send_proxy_signal_.store(true, std::memory_order_release); - stop_recv_proxy_signal_.store(true, std::memory_order_release); + SLIME_LOG_INFO("Establishing RDMA Connection..."); - if (send_proxy_thread_.joinable()) - send_proxy_thread_.join(); - if (recv_proxy_thread_.joinable()) - recv_proxy_thread_.join(); + for (auto& item : remote_endpoint_info["mr_info"].items()) { + ctx_->registerOrAccessRemoteMemoryRegion(item.value()["mr_key"].get(), item.value()); + } - SLIME_LOG_INFO("RDMA Proxy Threads Stopped."); -} + meta_channel_->connect(remote_endpoint_info["meta_channel_info"]); + data_channel_->connect(remote_endpoint_info["data_channel_info"]); -void RDMAEndpointV0::connect(const json& data_ctx_info, const json& meta_ctx_info) -{ - SLIME_LOG_INFO("Establishing RDMA Connection..."); - data_ctx_->connect(data_ctx_info); - meta_ctx_->connect(meta_ctx_info); + SLIME_LOG_INFO("Connection Established. Pre-posting RECV requests..."); - remote_meta_key_ = meta_ctx_info["remote_meta_key"]; + SLIME_ASSERT_EQ(remote_endpoint_info["remote_meta_key"].size(), MAX_FIFO_DEPTH, "FIFO Depth mismatch"); - SLIME_LOG_INFO("Connection Established. Pre-posting RECV requests..."); + for (int i = 0; i < MAX_FIFO_DEPTH; ++i) { + recv_ctx_pool_[i].remote_meta_key_ = remote_endpoint_info["remote_meta_key"][i]; + } + // Pre-post RECV requests for Meta Channel to handle incoming handshake signals. 
for (int i = 0; i < MAX_FIFO_DEPTH; ++i) { + SendContext* send_ctx = &(send_ctx_pool_[i]); std::vector batch{Assignment(reinterpret_cast(dummy_), 0, 0, sizeof(int64_t))}; - meta_recv_assign_[i].reset(OpCode::RECV, 0, batch, [this, i](int32_t status, int32_t imm) { - meta_arrived_scoreboard_[i].val.store(1, std::memory_order_release); + send_ctx->meta_recv_assign_.reset(OpCode::RECV, 0, batch, [send_ctx](int32_t status, int32_t imm) { + send_ctx->meta_arrived_flag_.val.store(1, std::memory_order_release); }); - auto assign = meta_ctx_->post_recv_batch(0, &(meta_recv_assign_[i])); + meta_channel_->post_recv_batch(0, &(send_ctx->meta_recv_assign_)); } + // Pre-post RECV requests for Data Channel to handle completion signals (Imm Data). for (int i = 0; i < MAX_FIFO_DEPTH; ++i) { - auto signal = recv_ctx_pool_[i].signal; - for (size_t qpi = 0; qpi < data_ctx_qp_num_; ++qpi) { + RecvContext* recv_ctx = &(recv_ctx_pool_[i]); + for (size_t qpi = 0; qpi < num_qp_; ++qpi) { std::vector batch{Assignment(reinterpret_cast(dummy_), 0, 0, sizeof(int64_t))}; - data_recv_assign_[i].reset(OpCode::RECV, qpi, batch, [signal, qpi](int32_t status, int32_t imm) { + recv_ctx->data_recv_assign_.reset(OpCode::RECV, qpi, batch, [recv_ctx, qpi](int32_t status, int32_t imm) { if (status == 0) { - signal->set_comm_done(qpi); + recv_ctx->signal->set_comm_done(qpi); } else { SLIME_LOG_ERROR("Data Recv Failed during pre-post"); } }); - data_ctx_->post_recv_batch(qpi, &(data_recv_assign_[i])); + data_channel_->post_recv_batch(qpi, &(recv_ctx->data_recv_assign_)); } } - proxyInit(); - - data_ctx_->launch_future(); - meta_ctx_->launch_future(); SLIME_LOG_INFO("RDMA Contexts Launched."); } -int32_t RDMAEndpointV0::addBuffer(OpCode opcode, std::shared_ptr buffer, void* stream_handle) +int32_t RDMAEndpointV0::send(uintptr_t data_ptr, size_t offset, size_t length, void* stream_handle) { - auto buffer_mr = data_ctx_->get_mr(buffer->ptr_); - if (not(buffer_mr and buffer_mr->length == buffer->data_size_)) { - SLIME_LOG_DEBUG("Registering new MR for buffer: ", buffer->ptr_); - data_ctx_->registerMemoryRegion(buffer->ptr_, buffer->ptr_, buffer->data_size_); + // Fast path: check MR cache. + storage_view_t view{data_ptr, offset, length}; + auto buffer_mr = ctx_->get_mr(data_ptr); + if (not(buffer_mr and buffer_mr->length == length)) { + SLIME_LOG_DEBUG("Registering new MR for buffer: ", data_ptr); + ctx_->registerOrAccessMemoryRegion(data_ptr, data_ptr, length); } - uint32_t target_mask = (1 << qp_nums_) - 1; - buffer->num_pack_ = qp_nums_; + // Acquire a slot from the FIFO pool. + uint32_t target_mask = (1 << num_qp_) - 1; + uint64_t slot = send_slot_id_.fetch_add(1, std::memory_order_release) % MAX_FIFO_DEPTH; - if (OpCode::SEND == opcode) { - uint64_t slot = send_slot_id_.fetch_add(1, std::memory_order_release) % MAX_FIFO_DEPTH; - buffer->slot_id_ = slot; + SendContext* s_ctx = &(send_ctx_pool_[slot]); - SendContext* ctx = &send_ctx_pool_[slot]; + s_ctx->slot_id = slot; + s_ctx->local_meta_info_.view_ = {data_ptr, offset, length}; + s_ctx->expected_mask = target_mask; - ctx->slot_id = slot; - ctx->buffer = buffer; - ctx->expected_mask = target_mask; + // Reset signal and bind to the compute stream for synchronization. 
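+    // The send path polls is_gpu_ready() before posting, so the transfer waits for the bound stream.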
+ s_ctx->signal->reset_all(); + s_ctx->signal->bind_stream(stream_handle); + s_ctx->signal->record_gpu_ready(); - ctx->signal->reset_all(); - ctx->signal->bind_stream(stream_handle); - ctx->signal->record_gpu_ready(); - buffer->signal_ = ctx->signal; - - while (jring_enqueue_burst(send_buffer_ring_, (void**)&ctx, 1, nullptr) == 0) { - cpu_relax(); - } + // Enqueue to the ring (lock-free producer). + while (jring_enqueue_burst(send_buffer_ring_, (void**)&s_ctx, 1, nullptr) == 0) { + cpu_relax(); } - else if (OpCode::RECV == opcode) { - uint64_t slot = recv_slot_id_.fetch_add(1, std::memory_order_release) % MAX_FIFO_DEPTH; - buffer->slot_id_ = slot; - - RecvContext* ctx = &recv_ctx_pool_[slot]; - - ctx->slot_id = slot; - ctx->buffer = buffer; - ctx->expected_mask = target_mask; - - ctx->signal->reset_all(); - ctx->signal->bind_stream(stream_handle); - ctx->signal->record_gpu_ready(); - buffer->signal_ = ctx->signal; - local_meta_info_[slot].r_key_ = data_ctx_->get_mr(buffer->ptr_)->rkey; - local_meta_info_[slot].view_ = buffer->view_; - - while (jring_enqueue_burst(recv_buffer_ring_, (void**)&ctx, 1, nullptr) == 0) { - cpu_relax(); - } - } - return 0; + return slot; } -int32_t RDMAEndpointV0::sendProxy() +int32_t RDMAEndpointV0::recv(uintptr_t data_ptr, size_t offset, size_t length, void* stream_handle) { - bindToSocket(socketId(data_ctx_->device_name_)); - - void* buf_ptrs[BURST_SIZE]; + auto buffer_mr = ctx_->get_mr(data_ptr); + if (not(buffer_mr and buffer_mr->length == length)) { + SLIME_LOG_DEBUG("Registering new MR for buffer: ", data_ptr); + ctx_->registerOrAccessMemoryRegion(data_ptr, data_ptr, length); + } - while (!stop_send_proxy_signal_.load(std::memory_order_relaxed)) { + uint32_t target_mask = (1 << num_qp_) - 1; + uint64_t slot = recv_slot_id_.fetch_add(1, std::memory_order_release) % MAX_FIFO_DEPTH; - int n = jring_dequeue_burst(send_buffer_ring_, buf_ptrs, BURST_SIZE, nullptr); + RecvContext* r_ctx = &(recv_ctx_pool_[slot]); - if (n > 0) { - for (int i = 0; i < n; ++i) { - auto* s_ctx = (SendContext*)buf_ptrs[i]; - int slot = s_ctx->slot_id; + r_ctx->slot_id = slot; + r_ctx->view_ = {data_ptr, offset, length}; + r_ctx->expected_mask = target_mask; - while (!s_ctx->signal->is_gpu_ready()) { - cpu_relax(); - } + r_ctx->signal->reset_all(); + r_ctx->signal->bind_stream(stream_handle); + r_ctx->signal->record_gpu_ready(); - while (!meta_arrived_scoreboard_[slot].val.load(std::memory_order_acquire)) { - cpu_relax(); - } + r_ctx->local_meta_info_.r_key_ = ctx_->get_mr(data_ptr)->rkey; + r_ctx->local_meta_info_.view_ = {data_ptr, offset, length}; - meta_arrived_scoreboard_[slot].val.store(false, std::memory_order_relaxed); + while (jring_enqueue_burst(recv_buffer_ring_, (void**)&r_ctx, 1, nullptr) == 0) { + cpu_relax(); + } - auto meta = remote_meta_info_[slot]; + return slot; +} - data_ctx_->registerRemoteMemoryRegion( - meta.view_.data_ptr, meta.view_.data_ptr, meta.view_.length, meta.r_key_); +int32_t RDMAEndpointV0::waitSend(int32_t slot_id) +{ + // Blocking wait on CPU until the communication is marked done. 
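+    // Each data-channel QP reports its completion through set_comm_done(), so this wait
+    // covers every chunk of the QP-striped transfer.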
+ send_ctx_pool_[slot_id].signal->wait_comm_done_cpu((1 << send_ctx_pool_[slot_id].expected_mask) - 1); + return 0; +} - size_t total_len = s_ctx->buffer->data_size_; - size_t chunk_size = (total_len + qp_nums_ - 1) / qp_nums_; +int32_t RDMAEndpointV0::waitRecv(int32_t slot_id) +{ + recv_ctx_pool_[slot_id].signal->wait_comm_done_cpu((1 << recv_ctx_pool_[slot_id].expected_mask) - 1); + return 0; +} - for (size_t qpi = 0; qpi < qp_nums_; ++qpi) { - size_t offset = qpi * chunk_size; +// In rdma_endpoint_v0.cc - if (offset >= total_len) - break; - size_t current_len = std::min(chunk_size, total_len - offset); +// Returns: Number of tasks processed (0 indicates idle). +int32_t RDMAEndpointV0::sendProcess() +{ + int work_done = 0; + + // ============================================================ + // Stage 1: Ingest - Dequeue from Ring + // ============================================================ + // Attempt to dequeue a burst of tasks. + int n = jring_dequeue_burst(send_buffer_ring_, send_new_burst_buf_, BURST_SIZE, nullptr); + if (n > 0) { + work_done += n; + for (int i = 0; i < n; ++i) { + auto* s_ctx = (SendContext*)send_new_burst_buf_[i]; + pending_send_queue_.push_back(s_ctx); + } + } - Assignment assign(s_ctx->buffer->ptr_, - meta.view_.data_ptr, - offset, // target offset - offset, // source offset - current_len); + // ============================================================ + // Stage 2: State Machine Execution + // ============================================================ + auto it = pending_send_queue_.begin(); + + if (it != pending_send_queue_.end()) { + SendContext* s_ctx = *it; + bool task_completed = false; + + switch (s_ctx->state_) { + case SendContextState::WAIT_GPU_READY: + // Non-blocking check for GPU signal. + if (s_ctx->signal->is_gpu_ready()) { + s_ctx->state_ = SendContextState::WAIT_META; + goto CHECK_META_READY; + } + break; + + CHECK_META_READY: + case SendContextState::WAIT_META: + // Non-blocking check for remote meta signal (atomic load). + if (s_ctx->meta_arrived_flag_.val.load(std::memory_order_acquire)) { + s_ctx->meta_arrived_flag_.val.store(false, std::memory_order_release); + s_ctx->state_ = SendContextState::POST_DATA_SEND; + + // Update remote MR info. + ctx_->registerOrAccessRemoteMemoryRegion(s_ctx->remote_meta_info_.view_.data_ptr, + s_ctx->remote_meta_info_.view_.data_ptr, + s_ctx->remote_meta_info_.view_.length, + s_ctx->remote_meta_info_.r_key_); + + // Chunk data across QPs. + size_t total_len = s_ctx->remote_meta_info_.view_.length; + size_t chunk_size = (total_len + num_qp_ - 1) / num_qp_; + + for (size_t qpi = 0; qpi < num_qp_; ++qpi) { + size_t offset = qpi * chunk_size; + if (offset >= total_len) + break; + + size_t current_len = std::min(chunk_size, total_len - offset); + + Assignment assign(s_ctx->local_meta_info_.view_.data_ptr, + s_ctx->remote_meta_info_.view_.data_ptr, + offset, + offset, + current_len); + AssignmentBatch batch{assign}; + + s_ctx->data_send_assign_.reset( + OpCode::WRITE_WITH_IMM, + qpi, + batch, + [s_ctx, qpi](int32_t stat, int32_t imm_data) { s_ctx->signal->set_comm_done(qpi); }, + false); + + data_channel_->post_rc_oneside_batch(qpi, &(s_ctx->data_send_assign_)); + } + + // Prepare for next handshake (Post Recv). 
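+                    // Re-arm this slot: the meta RECV consumed by the handshake is posted
+                    // again so the peer's next meta write for this slot is not dropped.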
+ std::vector meta_batch{ + Assignment(reinterpret_cast(dummy_), 0, 0, sizeof(int64_t))}; - AssignmentBatch batch{assign}; + s_ctx->meta_recv_assign_.reset( + OpCode::RECV, 0, meta_batch, [this, s_ctx](int32_t status, int32_t imm) { + s_ctx->meta_arrived_flag_.val.store(1, std::memory_order_release); + }); + meta_channel_->post_recv_batch(0, &(s_ctx->meta_recv_assign_)); - data_send_assign_[slot].reset( - OpCode::WRITE_WITH_IMM, - qpi, - batch, - [s_ctx, qpi](int32_t stat, int32_t imm_data) { s_ctx->signal->set_comm_done(qpi); }, - false); - data_ctx_->post_rc_oneside_batch(qpi, &(data_send_assign_[slot])); + task_completed = true; } + break; - std::vector meta_batch{ - Assignment(reinterpret_cast(dummy_), 0, 0, sizeof(int64_t))}; + default: + break; + } - meta_recv_assign_[s_ctx->slot_id].reset( - OpCode::RECV, 0, meta_batch, [this, s_ctx](int32_t status, int32_t imm) { - meta_arrived_scoreboard_[s_ctx->slot_id].val.store(1, std::memory_order_release); - }); - auto assign = meta_ctx_->post_recv_batch(0, &(meta_recv_assign_[s_ctx->slot_id])); - } + if (task_completed) { + pending_send_queue_.pop_front(); + work_done++; } else { - cpu_relax(); + // Task is still pending, so we are "busy" waiting. + // Marking as work_done=1 prevents the worker from sleeping too aggressively + // if we are just waiting for a GPU signal or network packet. + // However, to save power, we might strictly return 0 here if no state transition occurred. + // For low latency, return 1. + // work_done++; } } - return 0; + + // Note: cpu_relax() is removed from here and handled by the Worker. + return work_done; } -int32_t RDMAEndpointV0::recvProxy() +// Returns: Number of tasks processed (0 indicates idle). +int32_t RDMAEndpointV0::recvProcess() { - bindToSocket(socketId(data_ctx_->device_name_)); - SLIME_LOG_INFO("RecvProxy Thread running on NUMA node."); - - void* buf_ptrs[BURST_SIZE]; - - while (!stop_recv_proxy_signal_.load(std::memory_order_relaxed)) { - - int n = jring_dequeue_burst(recv_buffer_ring_, buf_ptrs, BURST_SIZE, nullptr); + int work_done = 0; + + int n = jring_dequeue_burst(recv_buffer_ring_, recv_new_burst_buf_, BURST_SIZE, nullptr); + if (n > 0) { + work_done += n; + for (int i = 0; i < n; ++i) { + auto* r_ctx = (RecvContext*)recv_new_burst_buf_[i]; + r_ctx->state_ = RecvContextState::WAIT_GPU_BUF; + pending_recv_queue_.push_back(r_ctx); + } + } - if (n > 0) { - for (int i = 0; i < n; ++i) { - RecvContext* r_ctx = static_cast(buf_ptrs[i]); - int slot = r_ctx->slot_id; + auto it = pending_recv_queue_.begin(); + if (it != pending_recv_queue_.end()) { + RecvContext* r_ctx = *it; + bool task_completed = false; - Assignment assign(reinterpret_cast(local_meta_info_), - remote_meta_key_, - slot * sizeof(meta_info_t), - slot * sizeof(meta_info_t), - sizeof(meta_info_t)); - AssignmentBatch assign_batch{assign}; - meta_send_assign_[slot].reset(OpCode::WRITE_WITH_IMM, 0, assign_batch, nullptr, true); - meta_ctx_->post_rc_oneside_batch(0, &(meta_send_assign_[slot])); - - while (!r_ctx->signal->is_gpu_ready()) { - cpu_relax(); + switch (r_ctx->state_) { + case RecvContextState::WAIT_GPU_BUF: { + if (r_ctx->signal->is_gpu_ready()) { + r_ctx->state_ = RecvContextState::INIT_SEND_META; + goto SEND_META; } + break; + } - const uint32_t TARGET_MASK = r_ctx->expected_mask; - - for (size_t qpi = 0; qpi < data_ctx_qp_num_; ++qpi) { - std::vector batch{ - Assignment(reinterpret_cast(dummy_), 0, 0, sizeof(int64_t))}; - - data_recv_assign_[slot].reset(OpCode::RECV, qpi, batch, [r_ctx, qpi](int32_t status, int32_t imm) { + 
SEND_META: + case RecvContextState::INIT_SEND_META: { + // Step 1: Pre-post Recv WQEs (Correct Order). + for (size_t qpi = 0; qpi < num_qp_; ++qpi) { + std::vector batch{Assignment(reinterpret_cast(dummy_), 0, 0, 8)}; + r_ctx->data_recv_assign_.reset(OpCode::RECV, qpi, batch, [r_ctx, qpi](int32_t status, int32_t imm) { if (status == 0) { r_ctx->signal->set_comm_done(qpi); } else { - SLIME_LOG_ERROR("Data Recv Failed during pre-post"); + SLIME_LOG_ERROR("Data Recv Failed during completion"); } }); - data_ctx_->post_recv_batch(qpi, &(data_recv_assign_[slot])); + data_channel_->post_recv_batch(qpi, &(r_ctx->data_recv_assign_)); } + + // Step 2: Send Meta to notify sender. + int slot = r_ctx->slot_id; + Assignment assign(reinterpret_cast(&(r_ctx->local_meta_info_)), + r_ctx->remote_meta_key_, + 0, + 0, + sizeof(meta_info_t)); + AssignmentBatch assign_batch{assign}; + + r_ctx->meta_send_assign_.reset(OpCode::WRITE_WITH_IMM, 0, assign_batch, nullptr, true); + meta_channel_->post_rc_oneside_batch(0, &(r_ctx->meta_send_assign_)); + + r_ctx->state_ = RecvContextState::WAIT_GPU_BUF; + task_completed = true; + break; } + + default: + break; } - else { - cpu_relax(); + + if (task_completed) { + pending_recv_queue_.pop_front(); + work_done++; } } - return 0; + + return work_done; } -} // namespace slime \ No newline at end of file +} // namespace slime diff --git a/csrc/engine/rdma/rdma_endpoint_v0.h b/csrc/engine/rdma/rdma_endpoint_v0.h index 080d068..e42aa21 100644 --- a/csrc/engine/rdma/rdma_endpoint_v0.h +++ b/csrc/engine/rdma/rdma_endpoint_v0.h @@ -6,6 +6,7 @@ #include "engine/assignment.h" #include "engine/rdma/rdma_assignment.h" +#include "engine/rdma/rdma_channel.h" #include "jring.h" #include "json.hpp" #include "rdma_common.h" @@ -20,32 +21,37 @@ #include #include -#include - namespace slime { using json = nlohmann::json; -class RDMABuffer; - static const size_t MAX_FIFO_DEPTH = 4096; static const int BURST_SIZE = 128; -static inline void cpu_relax() -{ - _mm_pause(); -} +enum class SendContextState : uint8_t { + WAIT_GPU_READY, + WAIT_META, + POST_DATA_SEND, + DONE +}; + +enum class RecvContextState : uint8_t { + INIT_SEND_META, + WAIT_GPU_BUF, + POST_DATA_RECV, + DONE +}; /** * @brief Meta information exchanged between nodes. * Aligned to 64 bytes to match Cache Line size, preventing False Sharing. 
*/ -typedef struct alignas(64) ViewInfo { +typedef struct alignas(64) MetaInfo { uint32_t r_key_; storage_view_t view_; - ViewInfo(): r_key_(0), view_() {} // Default constructor - ViewInfo(uint32_t r_key, storage_view_t view): r_key_(r_key), view_(view) {} + MetaInfo(): r_key_(0), view_() {} // Default constructor + MetaInfo(uint32_t r_key, storage_view_t view): r_key_(r_key), view_(view) {} std::string dump() { @@ -62,10 +68,17 @@ struct alignas(64) PaddedAtomicUint64 { // Context for Send Operations struct alignas(64) SendContext { - int64_t slot_id; - std::shared_ptr buffer; + int64_t slot_id; + + PaddedAtomicUint64 meta_arrived_flag_; + + meta_info_t local_meta_info_; + meta_info_t remote_meta_info_; - // PaddedAtomicUint64 meta_arrived_; + RDMAAssign meta_recv_assign_; + RDMAAssign data_send_assign_; + + SendContextState state_; std::shared_ptr signal; @@ -73,15 +86,24 @@ struct alignas(64) SendContext { void reset() { - buffer = nullptr; expected_mask = 0; + state_ = SendContextState::WAIT_GPU_READY; } }; // Context for Recv Operations struct alignas(64) RecvContext { - int64_t slot_id; - std::shared_ptr buffer; + int64_t slot_id; + storage_view_t view_; + + meta_info_t local_meta_info_; + + RDMAAssign meta_send_assign_; + RDMAAssign data_recv_assign_; + + uintptr_t remote_meta_key_; + + RecvContextState state_; std::shared_ptr signal; @@ -89,96 +111,60 @@ struct alignas(64) RecvContext { void reset() { - buffer = nullptr; expected_mask = 0; + state_ = RecvContextState::INIT_SEND_META; } }; -// ========================================== -// Class Definition -// ========================================== - class RDMAEndpointV0: public std::enable_shared_from_this { - friend class RDMABuffer; + friend class RDMAWorker; public: - explicit RDMAEndpointV0(const std::string& dev_name, size_t ib_port, const std::string& link_type, size_t qp_nums); + explicit RDMAEndpointV0(std::shared_ptr ctx, size_t qp_nums); + ~RDMAEndpointV0(); - /** - * @brief Submit a buffer for Send or Recv operation. - * This method is thread-safe and non-blocking (uses lock-free ring). - */ - int32_t addBuffer(OpCode opcode, std::shared_ptr buffer, void* stream_handle = nullptr); + void connect(const json& remote_endpoint_info); - /** - * @brief Establish connection and start proxy threads. 
- */ - void connect(const json& data_ctx_info, const json& meta_ctx_info); + json endpointInfo() const; - inline json dataCtxInfo() const - { - return data_ctx_->endpoint_info(); - } + int32_t send(uintptr_t data_ptr, size_t offset, size_t length, void* stream_handler); - inline json metaCtxInfo() const - { - auto endpoint_info = meta_ctx_->endpoint_info(); - endpoint_info["remote_meta_key"] = uintptr_t(remote_meta_info_); - return endpoint_info; - } + int32_t recv(uintptr_t data_ptr, size_t offset, size_t length, void* stream_handler); - inline int32_t registerMemoryRegion(uintptr_t mr_key, uintptr_t ptr, size_t length) - { - return data_ctx_->registerMemoryRegion(mr_key, ptr, length); - } + int32_t waitSend(int32_t slot_id); + + int32_t waitRecv(int32_t slot_id); private: bool bypass_signal_{false}; - int64_t qp_nums_; + int64_t num_qp_; - std::shared_ptr data_ctx_; - std::shared_ptr meta_ctx_; - size_t data_ctx_qp_num_; - size_t meta_ctx_qp_num_; + std::shared_ptr ctx_; - // DMA-able memory regions for metadata exchange - meta_info_t* remote_meta_info_; - meta_info_t* local_meta_info_; + std::unique_ptr meta_channel_; + std::unique_ptr data_channel_; // --- jring_t* Lock-free Queues --- jring_t* send_buffer_ring_; jring_t* recv_buffer_ring_; // Context Pools to avoid dynamic allocation - std::vector send_ctx_pool_; - std::vector recv_ctx_pool_; + SendContext* send_ctx_pool_; + RecvContext* recv_ctx_pool_; - RDMAAssign* meta_send_assign_; - RDMAAssign* meta_recv_assign_; - RDMAAssign* data_send_assign_; - RDMAAssign* data_recv_assign_; - - uintptr_t remote_meta_key_; - - // Scoreboards for signaling completion between RDMA callbacks and Proxy threads - PaddedAtomicUint64* meta_arrived_scoreboard_; + std::deque pending_send_queue_; + std::deque pending_recv_queue_; std::atomic send_slot_id_{0}; std::atomic recv_slot_id_{0}; - std::atomic stop_send_proxy_signal_{false}; - std::atomic stop_recv_proxy_signal_{false}; - - std::thread send_proxy_thread_; - std::thread recv_proxy_thread_; - - void proxyInit(); - void proxyDestroy(); + void* send_new_burst_buf_[BURST_SIZE]; + void* recv_new_burst_buf_[BURST_SIZE]; - int32_t sendProxy(); - int32_t recvProxy(); + int32_t sendProcess(); + int32_t recvProcess(); jring_t* createRing(const char* name, size_t count); void freeRing(jring_t* ring); diff --git a/csrc/engine/rdma/rdma_io_endpoint.cpp b/csrc/engine/rdma/rdma_io_endpoint.cpp new file mode 100644 index 0000000..25bac9d --- /dev/null +++ b/csrc/engine/rdma/rdma_io_endpoint.cpp @@ -0,0 +1,19 @@ +#include "rdma_io_endpoint.h" + +namespace slime { +RDMAIOEndpoint::RDMAIOEndpoint(std::shared_ptr ctx, size_t qp_nums) {} + +void RDMAIOEndpoint::connect(const json& remote_endpoint_info){}; + +inline json RDMAIOEndpoint::endpointInfo() const {}; + +inline int32_t RDMAIOEndpoint::registerMemoryRegion(uintptr_t mr_key, uintptr_t ptr, size_t length){}; + +int32_t RDMAIOEndpoint::readBatch() const {}; + +int32_t RDMAIOEndpoint::writeBatch() const {}; + +int32_t RDMAIOEndpoint::writeWithImmBatch() const {}; + +int32_t RDMAIOEndpoint::RecvImm() const {}; +} // namespace slime diff --git a/csrc/engine/rdma/rdma_io_endpoint.h b/csrc/engine/rdma/rdma_io_endpoint.h new file mode 100644 index 0000000..eac2fe3 --- /dev/null +++ b/csrc/engine/rdma/rdma_io_endpoint.h @@ -0,0 +1,37 @@ +#pragma once + +#include "rdma_context.h" + +#include "json.hpp" + +namespace slime { + +using json = nlohmann::json; + +class RDMAIOEndpoint { +public: + RDMAIOEndpoint() = default; + ~RDMAIOEndpoint(); + + explicit 
RDMAIOEndpoint(std::shared_ptr ctx, size_t qp_nums); + + void connect(const json& remote_endpoint_info); + + inline json endpointInfo() const; + + inline int32_t registerMemoryRegion(uintptr_t mr_key, uintptr_t ptr, size_t length); + + int32_t readBatch() const; + + int32_t writeBatch() const; + + int32_t writeWithImmBatch() const; + + int32_t RecvImm() const; + + int32_t wait(int32_t slot) const; + +private: +}; + +} // namespace slime \ No newline at end of file diff --git a/csrc/engine/rdma/rdma_utils.h b/csrc/engine/rdma/rdma_utils.h index a3dcca7..0d7685b 100644 --- a/csrc/engine/rdma/rdma_utils.h +++ b/csrc/engine/rdma/rdma_utils.h @@ -2,13 +2,14 @@ #include #include +#include #include +#include #include #include #include #include -#include #include "engine/rdma/ibv_helper.h" #include "engine/rdma/rdma_env.h" @@ -16,6 +17,11 @@ namespace slime { +inline void cpu_relax() +{ + _mm_pause(); +} + inline std::vector available_nic() { int num_devices; diff --git a/csrc/engine/rdma/rdma_worker.cpp b/csrc/engine/rdma/rdma_worker.cpp new file mode 100644 index 0000000..2dcb384 --- /dev/null +++ b/csrc/engine/rdma/rdma_worker.cpp @@ -0,0 +1,90 @@ +#include "engine/rdma/rdma_worker.h" + +#include "engine/rdma/rdma_endpoint_v0.h" +#include "engine/rdma/rdma_utils.h" +#include "logging.h" + +#include // for _mm_pause + +namespace slime { + +RDMAWorker::RDMAWorker(std::string dev_name, int id) + : device_name_(std::move(dev_name)), worker_id_(id) {} + +RDMAWorker::~RDMAWorker() { + stop(); +} + +void RDMAWorker::start() { + bool expected = false; + if (running_.compare_exchange_strong(expected, true)) { + worker_thread_ = std::thread([this]() { this->workerLoop(); }); + } +} + +void RDMAWorker::stop() { + bool expected = true; + if (running_.compare_exchange_strong(expected, false)) { + if (worker_thread_.joinable()) { + worker_thread_.join(); + } + } +} + +void RDMAWorker::addEndpoint(std::shared_ptr endpoint) { + std::lock_guard lock(add_endpoint_mutex_); + tasks_.push_back(EndpointTask{std::move(endpoint)}); +} + +void RDMAWorker::workerLoop() { + // Bind the worker thread to the NUMA node associated with the RDMA device + // to minimize PCIe and memory latency. + bindToSocket(socketId(device_name_)); + SLIME_LOG_INFO("RDMA Worker Thread ", worker_id_, " started on ", device_name_); + + // Use a local copy of tasks to avoid locking in the hot path, + // assuming tasks are added rarely (control plane) but polled frequently (data plane). + // Note: If dynamic removal is needed, a more complex lock-free structure or RCU is required. + // For now, we take the lock only when updating the local list. + std::vector local_tasks; + + while (running_.load(std::memory_order_relaxed)) { + + // Sync with main task list if needed (omitted for performance in simple version, + // strictly speaking should check a dirty flag or use lock-free queue). + // Here we just check lock to see if we need to update. + { + // Optimization: Double-checked locking or atomic flag could be better here. + // Given addEndpoint is rare, std::unique_lock with try_lock is a low-overhead check. + std::unique_lock lock(add_endpoint_mutex_, std::try_to_lock); + if (lock.owns_lock() && local_tasks.size() != tasks_.size()) { + local_tasks = tasks_; + } + } + + if (local_tasks.empty()) { + // Avoid busy loop if no endpoints are registered. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + int total_work_done = 0; + + // Poll all endpoints. 
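+        // Both process() calls are non-blocking and report how much work they found;
+        // the totals drive the backoff decision below.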
+ for (auto& task : local_tasks) { + // process() functions now return the number of processed events/bytes + // or 1 if busy, 0 if idle. + total_work_done += task.endpoint->sendProcess(); + total_work_done += task.endpoint->recvProcess(); + } + + // Backoff Strategy: + // Only relax the CPU if ALL endpoints are idle. + // This prevents one idle endpoint from slowing down other active endpoints. + if (total_work_done == 0) { + cpu_relax(); // _mm_pause() + } + } +} + +} // namespace slime diff --git a/csrc/engine/rdma/rdma_worker.h b/csrc/engine/rdma/rdma_worker.h new file mode 100644 index 0000000..e0c7093 --- /dev/null +++ b/csrc/engine/rdma/rdma_worker.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace slime { + +class RDMAEndpointV0; // Forward declaration + +class RDMAWorker { +public: + RDMAWorker(std::string dev_name, int id); + ~RDMAWorker(); + + void start(); + void stop(); + + // Registers an endpoint to be polled by this worker. + // Thread-safe. + void addEndpoint(std::shared_ptr endpoint); + +private: + // Main loop function executed by the worker thread. + void workerLoop(); + + struct EndpointTask { + std::shared_ptr endpoint; + }; + + // Protects access to tasks_ during dynamic additions. + std::mutex add_endpoint_mutex_; + std::vector tasks_; + + std::thread worker_thread_; + std::atomic running_{false}; + + int worker_id_; + std::string device_name_; +}; + +} // namespace slime diff --git a/csrc/engine/rdma/utils.h b/csrc/engine/rdma/utils.h deleted file mode 100644 index 698e6b9..0000000 --- a/csrc/engine/rdma/utils.h +++ /dev/null @@ -1,65 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "engine/rdma/ibv_helper.h" -#include "engine/rdma/rdma_env.h" -#include "logging.h" - -namespace slime { - -inline std::vector available_nic() -{ - int num_devices; - struct ibv_device** dev_list; - - dev_list = ibv_get_device_list(&num_devices); - if (!dev_list) { - SLIME_LOG_DEBUG("No RDMA devices"); - return {}; - } - - std::vector available_devices; - for (int i = 0; i < num_devices; ++i) { - std::string dev_name = (char*)ibv_get_device_name(dev_list[i]); - if (SLIME_VISIBLE_DEVICES.empty() - || std::find(SLIME_VISIBLE_DEVICES.begin(), SLIME_VISIBLE_DEVICES.end(), dev_name) - != SLIME_VISIBLE_DEVICES.end()) - available_devices.push_back(dev_name); - } - return available_devices; -} - -inline int get_gid_index(std::string dev_name) -{ - int num_devices; - struct ibv_device** dev_list; - - dev_list = ibv_get_device_list(&num_devices); - if (!dev_list) { - SLIME_LOG_DEBUG("No RDMA devices"); - return {}; - } - - std::vector available_devices; - for (int i = 0; i < num_devices; ++i) { - std::string dev_name_i = (char*)ibv_get_device_name(dev_list[i]); - if (strcmp(dev_name_i.c_str(), dev_name.c_str()) == 0) { - struct ibv_context* ib_ctx = ibv_open_device(dev_list[i]); - int gidx = ibv_find_sgid_type(ib_ctx, 1, ibv_gid_type_custom::IBV_GID_TYPE_ROCE_V2, AF_INET); - ibv_close_device(ib_ctx); - return gidx; - } - } - return -1; -} - -} // namespace slime diff --git a/csrc/python/bind.cpp b/csrc/python/bind.cpp index 970aeb6..3a89d5a 100644 --- a/csrc/python/bind.cpp +++ b/csrc/python/bind.cpp @@ -23,10 +23,11 @@ #ifdef BUILD_RDMA #include "engine/rdma/rdma_assignment.h" -#include "engine/rdma/rdma_buffer.h" #include "engine/rdma/rdma_config.h" #include "engine/rdma/rdma_context.h" -#include "engine/rdma/utils.h" +#include "engine/rdma/rdma_endpoint_v0.h" 
+#include "engine/rdma/rdma_utils.h" +#include "engine/rdma/rdma_worker.h" #endif #if defined(BUILD_INTRA_OPS) || defined(BUILD_INTER_OPS) @@ -150,50 +151,31 @@ PYBIND11_MODULE(_slime_c, m) py::class_>(m, "rdma_context") .def(py::init<>()) - .def(py::init()) .def("init_rdma_context", &slime::RDMAContext::init) .def("register_memory_region", static_cast( - &slime::RDMAContext::registerMemoryRegion)) + &slime::RDMAContext::registerOrAccessMemoryRegion)) .def("register_remote_memory_region", static_cast( - &slime::RDMAContext::registerRemoteMemoryRegion)) + &slime::RDMAContext::registerOrAccessRemoteMemoryRegion)) .def("reload_memory_pool", &slime::RDMAContext::reloadMemoryPool) - .def("endpoint_info", &slime::RDMAContext::endpoint_info) - .def("connect", &slime::RDMAContext::connect) .def("launch_future", &slime::RDMAContext::launch_future) - .def("stop_future", &slime::RDMAContext::stop_future) - .def("submit", &slime::RDMAContext::submit, py::call_guard()) - .def( - "submit_by_vector", - [](slime::RDMAContext& self, - slime::OpCode opcode, - std::vector& mr_keys, - std::vector& toff, - std::vector& soff, - std::vector& length) { - std::vector batch; - int bs = mr_keys.size(); - for (int i = 0; i < bs; ++i) { - batch.emplace_back(slime::Assignment(mr_keys[i], toff[i], soff[i], length[i])); - } - return self.submit(opcode, batch); - }, - py::call_guard()); + .def("stop_future", &slime::RDMAContext::stop_future); py::class_>(m, "rdma_endpoint") - .def(py::init()) - .def("context_connect", &slime::RDMAEndpointV0::connect) - .def("get_data_context_info", &slime::RDMAEndpointV0::dataCtxInfo) - .def("get_meta_context_info", &slime::RDMAEndpointV0::metaCtxInfo) - .def("register_memory_region", &slime::RDMAEndpointV0::registerMemoryRegion); - - py::class_>(m, "rdma_buffer") - .def(py::init, uintptr_t, size_t, size_t>()) - .def("send", &slime::RDMABuffer::send) - .def("recv", &slime::RDMABuffer::recv) - .def("wait_send", &slime::RDMABuffer::waitSend) - .def("wait_recv", &slime::RDMABuffer::waitRecv); + .def(py::init, size_t>()) + .def("connect", &slime::RDMAEndpointV0::connect) + .def("endpoint_info", &slime::RDMAEndpointV0::endpointInfo) + .def("send", &slime::RDMAEndpointV0::send) + .def("recv", &slime::RDMAEndpointV0::recv) + .def("wait_send", &slime::RDMAEndpointV0::waitSend) + .def("wait_recv", &slime::RDMAEndpointV0::waitRecv); + + py::class_>(m, "rdma_worker") + .def(py::init(), py::arg("dev_name"), py::arg("id")) + .def("start", &slime::RDMAWorker::start) + .def("stop", &slime::RDMAWorker::stop) + .def("add_endpoint", &slime::RDMAWorker::addEndpoint, py::arg("endpoint")); m.def("available_nic", &slime::available_nic); #endif @@ -231,7 +213,7 @@ PYBIND11_MODULE(_slime_c, m) &slime::AllToAllIntraLLBuffer::allToAllLL2D, py::arg("x"), py::arg("is_transpose") = false, - py::arg("mask") = py::none(), + py::arg("mask") = py::none(), "AllGather with optional mask"); #endif diff --git a/csrc/torch/slime_backend.cpp b/csrc/torch/slime_backend.cpp index 55dde43..3ff7c77 100644 --- a/csrc/torch/slime_backend.cpp +++ b/csrc/torch/slime_backend.cpp @@ -1,7 +1,8 @@ #include "slime_backend.h" -#include "engine/rdma/rdma_buffer.h" +#include "engine/rdma/rdma_context.h" #include "engine/rdma/rdma_endpoint_v0.h" #include "engine/rdma/rdma_env.h" +#include "engine/rdma/rdma_worker.h" #include "logging.h" #ifdef SLIME_USE_CUDA @@ -46,8 +47,11 @@ static uint32_t checkTag(int32_t tag) return (uint32_t)tag; } -SendWork::SendWork(std::vector& tensor, std::shared_ptr<::slime::RDMABuffer> buffer, uint64_t seq): - 
Work(-1, ::c10d::OpType::SEND), tensor_(tensor), buffer_(std::move(buffer)), seq_(seq)
+    Work(-1, ::c10d::OpType::SEND), tensor_(tensor), endpoint_(endpoint), slot_id_(slot_id), seq_(seq)
 {
 }
 
@@ -57,10 +61,10 @@ bool SendWork::wait(std::chrono::milliseconds timeout)
     std::exception_ptr exception{nullptr};
     try {
         if (timeout == kNoTimeout) {
-            sendCompleted = buffer_->waitSend();
+            sendCompleted = endpoint_->waitSend(slot_id_);
         }
         else {
-            sendCompleted = buffer_->waitSend();
+            sendCompleted = endpoint_->waitSend(slot_id_);
         }
     }
     catch (...) {
@@ -71,8 +75,11 @@ bool SendWork::wait(std::chrono::milliseconds timeout)
     return sendCompleted;
 }
 
-RecvWork::RecvWork(std::vector<at::Tensor>& tensor, std::shared_ptr<::slime::RDMABuffer> buffer, uint64_t seq):
-    Work(-1, ::c10d::OpType::RECV), tensor_(tensor), buffer_(std::move(buffer)), srcRank_(-1), seq_(seq)
+RecvWork::RecvWork(std::vector<at::Tensor>& tensor,
+                   std::shared_ptr<RDMAEndpointV0> endpoint,
+                   int32_t slot_id,
+                   uint64_t seq):
+    Work(-1, ::c10d::OpType::RECV), tensor_(tensor), endpoint_(endpoint), slot_id_(slot_id), seq_(seq)
 {
 }
 
@@ -82,10 +89,10 @@ bool RecvWork::wait(std::chrono::milliseconds timeout)
     std::exception_ptr exception{nullptr};
     try {
         if (timeout == kNoTimeout) {
-            recvCompleted = buffer_->waitRecv();
+            recvCompleted = endpoint_->waitRecv(slot_id_);
         }
         else {
-            recvCompleted = buffer_->waitRecv();
+            recvCompleted = endpoint_->waitRecv(slot_id_);
         }
     }
     catch (...) {
@@ -119,14 +126,13 @@ c10::intrusive_ptr<::c10d::Work> slimeBackend::send(std::vector& ten
         stream_handle = nullptr;
     }
 
-    auto buf = std::make_shared<::slime::RDMABuffer>(
-        end_point_set_[mod_positive(dstRank - rank_, size_ - 1)], ptrs[0], offset[0], data_size[0]);
-    buf->send(stream_handle);
+    auto endpoint = end_point_set_[mod_positive(dstRank - rank_, size_ - 1)];
+    int32_t slot_id = endpoint->send(ptrs[0], offset[0], data_size[0], stream_handle);
 
     ++seq_;
     // The work captures the tensor to prevent it being deallocated and
     // the unbound buffer to synchronize on completion of the recv.
-    auto send_work = c10::make_intrusive<SendWork>(tensors, std::move(buf), seq_);
+    auto send_work = c10::make_intrusive<SendWork>(tensors, endpoint, slot_id, seq_);
     if (group_active_) {
         grouped_works_.emplace_back(send_work);
     }
@@ -156,14 +162,13 @@ c10::intrusive_ptr<::c10d::Work> slimeBackend::recv(std::vector& ten
         stream_handle = nullptr;
     }
 
-    auto buf = std::make_shared<::slime::RDMABuffer>(
-        end_point_set_[mod_positive(srcRank - rank_, size_ - 1)], ptrs[0], offset[0], data_size[0]);
-    buf->recv(stream_handle);
+    auto endpoint = end_point_set_[mod_positive(srcRank - rank_, size_ - 1)];
+    int32_t slot_id = endpoint->recv(ptrs[0], offset[0], data_size[0], stream_handle);
 
     ++seq_;
    // The work captures the tensor to prevent it being deallocated and
    // the unbound buffer to synchronize on completion of the send.
- auto recv_work = c10::make_intrusive(tensors, std::move(buf), seq_); + auto recv_work = c10::make_intrusive(tensors, endpoint, slot_id, seq_); if (group_active_) { grouped_works_.emplace_back(recv_work); } @@ -183,23 +188,28 @@ slimeBackend::slimeBackend(const c10::intrusive_ptr<::c10d::Store>& store, int r uint8_t ib_port = 1; size_t qp_num = SLIME_QP_NUM; + std::shared_ptr context = std::make_shared(); + rdma_worker_ = std::make_shared(dev_name, rank); + context->init(dev_name, ib_port, link_type); for (int i = 0; i < size - 1; ++i) { + auto endpoint = std::make_shared(context, qp_num); // TODO: the different end_point in the rank can use different RDMA dev to transmit the message. - end_point_set_.push_back(std::make_shared(dev_name, ib_port, link_type, qp_num)); + end_point_set_.push_back(endpoint); + rdma_worker_->addEndpoint(endpoint); json channel_info; - channel_info["data_channel"] = end_point_set_[i]->dataCtxInfo(); - channel_info["meta_channel"] = end_point_set_[i]->metaCtxInfo(); + channel_info = end_point_set_[i]->endpointInfo(); local_channel_info_.push_back(channel_info); } + rdma_worker_->start(); exchangeChannelInfo(); try { for (int i = 0; i < size_ - 1; ++i) { json cur_channel_info = global_channel_info_[mod_positive(rank_ + i + 1, size_)][size_ - 2 - i]; - end_point_set_[i]->connect(cur_channel_info["data_channel"], cur_channel_info["meta_channel"]); + end_point_set_[i]->connect(cur_channel_info); } } catch (const std::runtime_error& e) { diff --git a/csrc/torch/slime_backend.h b/csrc/torch/slime_backend.h index 3f756e1..8c5485f 100644 --- a/csrc/torch/slime_backend.h +++ b/csrc/torch/slime_backend.h @@ -11,7 +11,8 @@ #include #include "engine/rdma/rdma_endpoint_v0.h" -#include "engine/rdma/utils.h" +#include "engine/rdma/rdma_utils.h" +#include "engine/rdma/rdma_worker.h" namespace slime { namespace c10d { @@ -22,7 +23,10 @@ class TORCH_API SendWork: public ::c10d::Work { friend class slimeBackend; public: - explicit SendWork(std::vector& tensor, std::shared_ptr<::slime::RDMABuffer> buffer, uint64_t seq); + explicit SendWork(std::vector& tensor, + std::shared_ptr endpoint, + int32_t slot_id, + uint64_t seq); bool wait(std::chrono::milliseconds timeout = kNoTimeout) override; void abort() override { @@ -30,17 +34,21 @@ class TORCH_API SendWork: public ::c10d::Work { } protected: - std::vector tensor_; - std::shared_ptr<::slime::RDMABuffer> buffer_; - int dstRank_; - const uint64_t seq_; + std::vector tensor_; + std::shared_ptr endpoint_; + int32_t slot_id_; + int dstRank_; + const uint64_t seq_; }; class TORCH_API RecvWork: public ::c10d::Work { friend class slimeBackend; public: - explicit RecvWork(std::vector& tensor, std::shared_ptr<::slime::RDMABuffer> buffer, uint64_t seq); + explicit RecvWork(std::vector& tensor, + std::shared_ptr endpoint, + int32_t slot_id, + uint64_t seq); bool wait(std::chrono::milliseconds timeout = kNoTimeout) override; void abort() override { @@ -48,10 +56,11 @@ class TORCH_API RecvWork: public ::c10d::Work { } protected: - std::vector tensor_; - std::shared_ptr<::slime::RDMABuffer> buffer_; - int srcRank_; - const uint64_t seq_; + std::vector tensor_; + std::shared_ptr endpoint_; + int32_t slot_id_; + int dstRank_; + const uint64_t seq_; }; class GroupWork: public ::c10d::Work { @@ -199,12 +208,13 @@ class TORCH_API slimeBackend: public ::c10d::Backend { } private: - void exchangeChannelInfo(); - c10::intrusive_ptr<::c10d::Store> store_; + void exchangeChannelInfo(); + c10::intrusive_ptr<::c10d::Store> store_; + std::shared_ptr 
rdma_worker_; std::vector> end_point_set_; - std::vector local_channel_info_; - std::vector global_channel_info_; - uint64_t seq_{0}; + std::vector local_channel_info_; + std::vector global_channel_info_; + uint64_t seq_{0}; // for batched_isend_irecv bool group_active_{false};
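For orientation, a minimal C++ sketch of the wiring this patch moves to: one RDMAContext shared by several endpoints, one RDMAWorker polling them, and slot-based send/recv with explicit waits. It follows the slimeBackend constructor and send/recv paths above; the device name "mlx5_0", ib_port 1, the "RoCE" link type, the in-process loopback, and the omitted error handling are illustrative assumptions rather than anything prescribed by the patch.

// loopback_sketch.cpp: illustrative only; device name, port, link type and the
// single-process loopback are placeholder assumptions, error handling is omitted.
#include "engine/rdma/rdma_context.h"
#include "engine/rdma/rdma_endpoint_v0.h"
#include "engine/rdma/rdma_worker.h"

#include <cstdint>
#include <memory>
#include <vector>

int main()
{
    // One device context shared by both endpoints, as in the slimeBackend constructor.
    auto ctx = std::make_shared<slime::RDMAContext>();
    ctx->init("mlx5_0", /*ib_port=*/1, /*link_type=*/"RoCE");

    // A single worker thread drives sendProcess()/recvProcess() for every endpoint.
    auto worker = std::make_shared<slime::RDMAWorker>("mlx5_0", /*id=*/0);

    auto tx = std::make_shared<slime::RDMAEndpointV0>(ctx, /*qp_nums=*/1);
    auto rx = std::make_shared<slime::RDMAEndpointV0>(ctx, /*qp_nums=*/1);
    worker->addEndpoint(tx);
    worker->addEndpoint(rx);
    worker->start();

    // endpointInfo() would normally travel through an out-of-band store; here both
    // endpoints live in the same process, so they can be connected directly.
    tx->connect(rx->endpointInfo());
    rx->connect(tx->endpointInfo());

    std::vector<uint8_t> src(1 << 20, 0xAB), dst(1 << 20, 0x00);

    // Slot-based API: recv() publishes the sink buffer, send() is matched to it via the
    // meta-channel handshake, and wait*() blocks until every QP reports completion.
    int32_t r = rx->recv(reinterpret_cast<uintptr_t>(dst.data()), 0, dst.size(), nullptr);
    int32_t s = tx->send(reinterpret_cast<uintptr_t>(src.data()), 0, src.size(), nullptr);
    tx->waitSend(s);
    rx->waitRecv(r);

    worker->stop();
    return 0;
}

The recv is posted before the send so the receiver's meta write is already on its way when the sender's state machine starts waiting for it.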