@@ -35,18 +35,17 @@ Below we show an AllReduce example and then detail each integration option.
 ### Example: AllReduce in the MSCCL++ DSL
 The snippet defines an AllReduce that uses NVLS for intra-node reduce-scatter followed by broadcast.
 ```python
-def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
-    chunksperloop = 1
-    collective = AllReduce(gpu_size, chunksperloop, True)
-    with CollectiveProgram(
-        name,
-        collective,
+def allreduce_nvls(spec: mscclpp.AlgoSpec) -> CollectiveProgram:
+    gpu_size = spec.world_size
+    with CollectiveProgram(
+        spec.name,
+        spec.collective,
         gpu_size,
         instances=8,
-        protocol="Simple",
-        num_threads_per_block=num_threads_per_block,
-        min_message_size=min_message_size,
-        max_message_size=max_message_size,
+        protocol=spec.protocol,
+        num_threads_per_block=spec.num_threads_per_block,
+        min_message_size=spec.min_message_size,
+        max_message_size=spec.max_message_size,
     ) as program:
         # Creating Channels
         nvls_chan = SwitchChannel(rank_list=[gpu for gpu in range(gpu_size)], buffer_type=BufferType.input)
@@ -56,7 +55,7 @@ def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, m
                 if peer != gpu:
                     channels[(peer, gpu)] = MemoryChannel(peer, gpu)
 
-        # Synchronization to ensure all GPUs are ready
+        # Synchronization to ensure all the GPUs are ready
         for gpu in range(gpu_size):
             src_rank = gpu
             for peer in range(gpu_size):
@@ -67,8 +66,7 @@ def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, m
                 if peer != src_rank:
                     dst_rank = peer
                     channels[(dst_rank, src_rank)].wait(tb=0, relaxed=True, data_sync=SyncType.after)
-
-        # Reduce then broadcast one chunk per GPU
+        # Reduce and store the data
         for gpu in range(gpu_size):
             buffer_offset = gpu
             rank = Rank(gpu)
@@ -79,8 +77,7 @@ def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, m
             nvls_chan.at_rank(gpu).broadcast(
                 src_chunk=input_buffer[gpu : gpu + 1], buffer_offset=buffer_offset, size=1, tb=0
             )
-
-        # Synchronization to ensure all GPUs finish
+        # Synchronization to ensure all GPUs have finished
         for gpu in range(gpu_size):
             src_rank = gpu
             for peer in range(gpu_size):
@@ -97,91 +94,12 @@ def allreduce_example(name, gpu_size, num_threads_per_block, min_message_size, m
 
 ### Integrate with MSCCL++ customized communicator
 Use when you want a PyTorch‑compatible interface with fine‑grained control. You manage the communicator, compile/register DSL plans, and invoke collectives via a thin wrapper. The example below shows an AllReduce built on the MSCCL++ communicator and executor.
-
-```python
-class CustomizedComm:
-    """High-level MSCCL++ wrapper compatible with PyTorch-style collectives."""
-
-    def __init__(self, comm: mscclpp_comm.CommGroup):
-        self.comm = comm
-        self.rank = comm.my_rank
-        self.world_size = comm.nranks
-        self.local_rank = comm.my_rank % comm.nranks_per_node
-        self.n_ranks_per_node = comm.nranks_per_node
-
-        # Initialize MSCCL++ components
-        self.registry = mscclpp.ExecutionPlanRegistry()
-        self.executor = mscclpp.Executor(comm.communicator)
-
-    def all_reduce(self, tensor: torch.Tensor, op=torch.distributed.ReduceOp.SUM, stream: torch.cuda.Stream = None):
-        """Performs an AllReduce operation using a native MSCCL++ plan."""
-        assert op == torch.distributed.ReduceOp.SUM
-
-        # Select an appropriate execution plan
-        plan = self.registry.select(
-            collective="allreduce",
-            world_size=self.world_size,
-            n_ranks_per_node=self.n_ranks_per_node,
-            send_buffer=tensor.data_ptr(),
-            recv_buffer=tensor.data_ptr(),
-            message_size=tensor.numel() * tensor.element_size(),
-        )
-        if plan is None:
-            raise ValueError(
-                f"No suitable plan found for collective allreduce with message size {tensor.numel() * tensor.element_size()}"
-            )
-
-        # Execute the plan using the MSCCL++ executor
-        self.executor.execute(
-            self.rank,
-            tensor.data_ptr(),
-            tensor.data_ptr(),
-            tensor.numel() * tensor.element_size(),
-            tensor.numel() * tensor.element_size(),
-            dtype_to_mscclpp_dtype(tensor.dtype),
-            plan.plan,
-            stream.cuda_stream if stream is not None else 0,
-        )
+Example source directory:
 ```
-
-#### Usage Example
-
-```python
-from mscclpp.dsl import presets, jit
-import mscclpp
-
-# Step 1. Compile and register a DSL plan
-plan = jit.compile(
-    algo=allreduce_nvls,
-    name="allreduce_nvls",
-    collective="allreduce",
-    nranks_per_node=8,
-    world_size=world_size,
-    instances=2,
-    protocol="Simple",
-    num_threads_per_block=1024,
-    min_msg_size=1 << 20,
-    max_msg_size=48 << 30,
-    tags={"nvls"},
-)
-mscclpp.plan.register(plan)
-
-# Step 2. Define a plan selector (choose algorithm based on tags, message size, etc.)
-def selector(plans: Dict[str, mscclpp.PlanHandle], req: mscclpp.Request):
-    collective_plans = plans.get(req.collective)
-    nvls = [p for p in collective_plans if "nvls" in p.tags]
-    return nvls[0] if nvls else collective_plans[0]
-
-mscclpp.plan.set_selector(selector)
-
-# Step 3. Initialize communicator and high-level wrapper
-mscclpp_group = mscclpp.comm.CommGroup(interfaceIpPortTrio=ifIpPortTrio, rank=rank, size=world_size)
-comm = CustomizedComm(mscclpp_group)
-
-# Step 4. Perform the AllReduce operation
-x = torch.randn(12 << 20, dtype=torch.float16, device="cuda")
-comm.all_reduce(x, op=torch.distributed.ReduceOp.SUM)
+examples/torch-integration
 ```
+Key file: `customized_comm.py`.
+
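+For orientation, here is a condensed sketch of the flow that `customized_comm.py` walks through: compile and register a DSL plan, wrap the MSCCL++ communicator, then call the collective. Treat the import paths and helper signatures as illustrative and defer to the example file; `rank`, `world_size`, and `ifIpPortTrio` are assumed to come from your launcher setup, and `allreduce_nvls` is the DSL program shown above.
+```python
+import torch
+import torch.distributed as dist
+import mscclpp
+from mscclpp.dsl import jit
+from customized_comm import CustomizedComm  # wrapper class from the example file (illustrative import path)
+
+# Compile the DSL program into an execution plan and register it.
+plan = jit.compile(
+    algo=allreduce_nvls, name="allreduce_nvls", collective="allreduce",
+    nranks_per_node=8, world_size=world_size, instances=2, protocol="Simple",
+    num_threads_per_block=1024, min_msg_size=1 << 20, max_msg_size=48 << 30, tags={"nvls"},
+)
+mscclpp.plan.register(plan)
+
+# Wrap the MSCCL++ communicator and issue the collective PyTorch-style.
+group = mscclpp.comm.CommGroup(interfaceIpPortTrio=ifIpPortTrio, rank=rank, size=world_size)
+comm = CustomizedComm(group)
+x = torch.randn(12 << 20, dtype=torch.float16, device="cuda")
+comm.all_reduce(x, op=dist.ReduceOp.SUM)
+```
+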
 
 #### Launch (single node)
 ```bash
@@ -190,48 +108,16 @@ MSCCLPP_MASTER_ADDR=<master_ip> MSCCLPP_MASTER_PORT=<port> torchrun --nnodes=1 -
 
 ### Integrate via NCCL Interposition
 Keep your script as‑is: init PyTorch with backend="nccl"; MSCCL++ intercepts NCCL calls for drop‑in acceleration.
-
-```python
-import torch, torch.distributed as dist
-from mscclpp.dsl import presets, jit
-import mscclpp
-
-# Step 1. Initialize the PyTorch distributed process group using the NCCL backend
-dist.init_process_group(backend="nccl")
-
-# Step 2. Compile and register an MSCCL++ DSL plan
-plan = jit.compile(
-    algo=allreduce_nvls,
-    name="allreduce_nvls",
-    collective="allreduce",
-    nranks_per_node=8,
-    world_size=world_size,
-    instances=2,
-    protocol="Simple",
-    num_threads_per_block=1024,
-    min_msg_size=1 << 20,
-    max_msg_size=48 << 30,
-    tags={"nvls"},
-)
-mscclpp.plan.register(plan)
-
-# Step 3. Define and set a selector to choose the appropriate plan at runtime
-def selector(plans, req):
-    collective_plans = plans.get(req.collective)
-    nvls = [p for p in collective_plans if "nvls" in p.tags]
-    return nvls[0] if nvls else collective_plans[0]
-
-mscclpp.plan.set_selector(selector)
-
-# Step 4. Perform the AllReduce as usual
-x = torch.randn(12 << 20, dtype=torch.float16, device="cuda")
-dist.all_reduce(x, op=dist.ReduceOp.SUM)
+Example source directory:
+```
+examples/torch-integration
 ```
+Key file: `dsl_with_nccl_api.py`.
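+
+For orientation, here is a condensed sketch of what `dsl_with_nccl_api.py` adds on top of an otherwise unchanged PyTorch script: plan compilation, registration, and a runtime selector. Treat the helper signatures as illustrative and defer to the example file; `allreduce_nvls` is the DSL program shown above.
+```python
+import torch
+import torch.distributed as dist
+import mscclpp
+from mscclpp.dsl import jit
+
+# Standard PyTorch init; the preloaded MSCCL++ shim intercepts the NCCL calls underneath.
+dist.init_process_group(backend="nccl")
+
+# Compile and register a DSL plan.
+plan = jit.compile(
+    algo=allreduce_nvls, name="allreduce_nvls", collective="allreduce",
+    nranks_per_node=8, world_size=dist.get_world_size(), instances=2, protocol="Simple",
+    num_threads_per_block=1024, min_msg_size=1 << 20, max_msg_size=48 << 30, tags={"nvls"},
+)
+mscclpp.plan.register(plan)
+
+# At runtime, prefer the NVLS-tagged plan for the requested collective.
+def selector(plans, req):
+    candidates = plans.get(req.collective)
+    nvls = [p for p in candidates if "nvls" in p.tags]
+    return nvls[0] if nvls else candidates[0]
+mscclpp.plan.set_selector(selector)
+
+# The collective call itself stays plain PyTorch.
+x = torch.randn(12 << 20, dtype=torch.float16, device="cuda")
+dist.all_reduce(x, op=dist.ReduceOp.SUM)
+```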
 
 #### Launch with interposition
 To run with NCCL interposition, you preload the MSCCL++ shim so it transparently intercepts NCCL calls made by PyTorch’s nccl backend.
 ```bash
-LD_PRELOAD=<MSCCLPP_REPO>/build/apps/nccl/libmscclpp_nccl.so torchrun --nnodes=1 --nproc_per_node=8 dsl-torch-integration/dsl_with_nccl_api.py
+LD_PRELOAD=<MSCCLPP_REPO>/build/apps/nccl/libmscclpp_nccl.so torchrun --nnodes=1 --nproc_per_node=8 dsl_with_nccl_api.py
 ```
 ## Notices:
  - When using NCCL interposition, the algorithm selection order is: