3 changes: 2 additions & 1 deletion bench/python/dlslime_torch_dist_sendrecv_bench.py
@@ -132,7 +132,7 @@ def benchmark_send_recv(args):
parser.add_argument(
"--master-addr",
type=str,
default="10.102.207.84",
default="127.0.0.1",
help="Master address for distributed training",
)
parser.add_argument(
@@ -166,4 +166,5 @@ def benchmark_send_recv(args):
os.environ["NCCL_P2P_DISABLE"] = "1"
os.environ["NCCL_SHM_DISABLE"] = "1"
import time

benchmark_send_recv(args)
89 changes: 35 additions & 54 deletions bench/python/endpoint_sendrecv_bench.py
@@ -16,30 +16,29 @@ def get_readable_size(size_in_bytes):
def run_benchmark(device_type="cuda", num_qp=1, iterations=200):
# Check available RDMA devices
nic_devices = available_nic()
if len(nic_devices) < 2:
print(
f"Warning: Only {len(nic_devices)} RDMA device found. Trying loopback on device 0 if possible, or script might fail."
)
dev0 = nic_devices[0]
dev1 = nic_devices[0] # Loopback
else:
dev0 = nic_devices[0]
dev1 = nic_devices[1]

print(f"Initializing Endpoints: Send[{dev0}] <-> Recv[{dev1}]")
dev = nic_devices[0]

print(f"Initializing Endpoints: Send[{dev}] <-> Recv[{dev}]")
print(f"Tensor Device: {device_type.upper()}")

# Initialize the endpoints
send_endpoint = _slime_c.rdma_endpoint(dev0, 1, "RoCE", num_qp)
recv_endpoint = _slime_c.rdma_endpoint(dev1, 1, "RoCE", num_qp)

ctx = _slime_c.rdma_context()
ctx.init_rdma_context(dev, 1, "RoCE")

worker = _slime_c.rdma_worker(dev, 0)

send_endpoint = _slime_c.rdma_endpoint(ctx, num_qp)
recv_endpoint = _slime_c.rdma_endpoint(ctx, num_qp)

worker.add_endpoint(send_endpoint)
worker.add_endpoint(recv_endpoint)

# Establish the connection between the two endpoints
send_endpoint.context_connect(
recv_endpoint.get_data_context_info(), recv_endpoint.get_meta_context_info()
)
recv_endpoint.context_connect(
send_endpoint.get_data_context_info(), send_endpoint.get_meta_context_info()
)
send_endpoint.connect(recv_endpoint.endpoint_info())
recv_endpoint.connect(send_endpoint.endpoint_info())

worker.start()

# Define test sizes: 2 KB up to 128 MB
# 2KB = 2 * 1024
@@ -74,40 +73,26 @@ def run_benchmark(device_type="cuda", num_qp=1, iterations=200):
)
recv_tensor = torch.zeros((size,), dtype=torch.uint8, device="cpu")

# 2. Register the RDMA buffers (MR registration typically happens here)
send_buffer = _slime_c.rdma_buffer(
send_endpoint,
send_tensor.data_ptr(),
send_tensor.storage_offset(),
send_tensor.numel(),
)
recv_buffer = _slime_c.rdma_buffer(
recv_endpoint,
recv_tensor.data_ptr(),
recv_tensor.storage_offset(),
recv_tensor.numel(),
)

# 3. Warmup
# Let the MRs get registered and the TLB warm up, eliminating the first-run slow start
warmup_iters = 10
warmup_iters = 1
for _ in range(warmup_iters):
send_buffer = _slime_c.rdma_buffer(
send_endpoint,
send_slot = send_endpoint.send(
send_tensor.data_ptr(),
send_tensor.storage_offset(),
send_tensor.numel(),
None,
)
recv_buffer = _slime_c.rdma_buffer(
recv_endpoint,

recv_slot = recv_endpoint.recv(
recv_tensor.data_ptr(),
recv_tensor.storage_offset(),
recv_tensor.numel(),
None,
)
recv_buffer.recv(None)
send_buffer.send(None)
send_buffer.wait_send()
recv_buffer.wait_recv()

send_endpoint.wait_send(send_slot)
recv_endpoint.wait_recv(recv_slot)

if device_type == "cuda":
torch.cuda.synchronize()
@@ -116,26 +101,22 @@ def run_benchmark(device_type="cuda", num_qp=1, iterations=200):
t_start = time.perf_counter()

for _ in range(iterations):
send_buffer = _slime_c.rdma_buffer(
send_endpoint,
send_slot = send_endpoint.send(
send_tensor.data_ptr(),
send_tensor.storage_offset(),
send_tensor.numel(),
None,
)
recv_buffer = _slime_c.rdma_buffer(
recv_endpoint,

recv_slot = recv_endpoint.recv(
recv_tensor.data_ptr(),
recv_tensor.storage_offset(),
recv_tensor.numel(),
None,
)
# Standard RDMA flow: post the recv first, then post the send
recv_buffer.recv(None)
send_buffer.send(None)
# Wait for completion
# Note: stop-and-wait mode. A pipelined mode would achieve higher throughput,
# but here we measure the latency of a single operation
send_buffer.wait_send()
recv_buffer.wait_recv()

send_endpoint.wait_send(send_slot)
recv_endpoint.wait_recv(recv_slot)

if device_type == "cuda":
torch.cuda.synchronize()
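For reference, the new flow exercised by this benchmark reduces to the minimal sketch below. It is an illustration only, not part of the PR: the dlslime import path is an assumption (the file's imports sit outside the hunks shown here), while every other call mirrors the API used in the diff above. Like the benchmark loop, it posts the recv before the send and waits on both slots (stop-and-wait).

import torch
from dlslime import _slime_c, available_nic  # assumed import path, not visible in this diff

dev = available_nic()[0]  # loopback over a single NIC

# One shared context and one worker driving both endpoints
ctx = _slime_c.rdma_context()
ctx.init_rdma_context(dev, 1, "RoCE")
worker = _slime_c.rdma_worker(dev, 0)

send_ep = _slime_c.rdma_endpoint(ctx, 1)  # 1 queue pair
recv_ep = _slime_c.rdma_endpoint(ctx, 1)
worker.add_endpoint(send_ep)
worker.add_endpoint(recv_ep)

# Pair the endpoints by exchanging endpoint info in both directions, then start the worker
send_ep.connect(recv_ep.endpoint_info())
recv_ep.connect(send_ep.endpoint_info())
worker.start()

src = torch.full((4096,), 7, dtype=torch.uint8)
dst = torch.zeros((4096,), dtype=torch.uint8)

# Post the recv first, then the send; each call returns a slot to wait on
recv_slot = recv_ep.recv(dst.data_ptr(), dst.storage_offset(), dst.numel(), None)
send_slot = send_ep.send(src.data_ptr(), src.storage_offset(), src.numel(), None)
send_ep.wait_send(send_slot)
recv_ep.wait_recv(recv_slot)
print("payload received intact:", torch.equal(src, dst))
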
2 changes: 1 addition & 1 deletion csrc/device/host/host_only.cpp
@@ -9,7 +9,7 @@ namespace device {

std::shared_ptr<DeviceSignal> createSignal(bool bypass)
{
SLIME_LOG_INFO("create signal cpu.");
SLIME_LOG_DEBUG("create signal cpu.");
return std::make_shared<HostOnlySignal>();
}

9 changes: 6 additions & 3 deletions csrc/engine/assignment.cpp
@@ -8,9 +8,12 @@
namespace slime {
json Assignment::dump() const
{
return json{
"Assignment",
{{"mr_key", mr_key}, {"remote_mr_key", remote_mr_key}, {"target_offset", target_offset}, {"source_offset", source_offset}, {"length", length}}};
return json{"Assignment",
{{"mr_key", mr_key},
{"remote_mr_key", remote_mr_key},
{"target_offset", target_offset},
{"source_offset", source_offset},
{"length", length}}};
}

std::ostream& operator<<(std::ostream& os, const Assignment& assignment)
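A side note on the reformatted dump() above: assuming the json type follows nlohmann-style initializer-list construction, the value it builds is a two-element array, the tag string followed by the field object, roughly the shape shown below (values are illustrative; only the field names come from the diff):

["Assignment", {"mr_key": "mr_src", "remote_mr_key": "mr_dst", "target_offset": 0, "source_offset": 0, "length": 4096}]
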
3 changes: 2 additions & 1 deletion csrc/engine/rdma/CMakeLists.txt
@@ -27,9 +27,10 @@ add_library(
ibv_helper.cpp
memory_pool.cpp
rdma_assignment.cpp
rdma_channel.cpp
rdma_context.cpp
rdma_endpoint_v0.cpp
rdma_buffer.cpp
rdma_worker.cpp
)

target_link_libraries(_slime_rdma PUBLIC _slime_device _slime_engine numa ibverbs)
6 changes: 1 addition & 5 deletions csrc/engine/rdma/rdma_assignment.cpp
@@ -1,5 +1,6 @@
#include "rdma_assignment.h"

#include <cstdint>
#include <stdexcept>

namespace slime {
@@ -14,11 +15,6 @@ void RDMAAssign::reset(OpCode opcode, size_t qpi, AssignmentBatch& batch, callba
}
else {
callback_ = [this](int code, int imm_data) {
if (code != 0) {
for (int i = 0; i < batch_size_; ++i) {
SLIME_LOG_ERROR("ERROR ASSIGNMENT: ", batch_[i].dump());
}
}
finished_.fetch_add(1, std::memory_order_release);
};
}
1 change: 1 addition & 0 deletions csrc/engine/rdma/rdma_assignment.h
@@ -43,6 +43,7 @@ static const std::map<OpCode, ibv_wr_opcode> ASSIGN_OP_2_IBV_WR_OP = {
struct alignas(64) RDMAAssign {
static constexpr size_t MAX_ASSIGN_CAPACITY = 4096;
friend class RDMAContext;
friend class RDMAChannel;
friend std::ostream& operator<<(std::ostream& os, const RDMAAssign& assignment);

public:
36 changes: 0 additions & 36 deletions csrc/engine/rdma/rdma_buffer.cpp

This file was deleted.

105 changes: 0 additions & 105 deletions csrc/engine/rdma/rdma_buffer.h

This file was deleted.
