From 82b8011a616e7b7e9f56bc6f34f154517d26fa86 Mon Sep 17 00:00:00 2001 From: UsernameFull Date: Wed, 4 Feb 2026 10:42:28 +0800 Subject: [PATCH 1/8] fix: add sft support on npu # Conflicts: # roll/pipeline/sft/sft_pipeline.py --- roll/distributed/strategy/megatron_strategy.py | 2 +- roll/pipeline/sft/sft_pipeline.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/roll/distributed/strategy/megatron_strategy.py b/roll/distributed/strategy/megatron_strategy.py index 890527077..3b95c4928 100644 --- a/roll/distributed/strategy/megatron_strategy.py +++ b/roll/distributed/strategy/megatron_strategy.py @@ -1263,7 +1263,7 @@ def save_checkpoint(self, save_dir, global_step, ckpt_id, tag="checkpoint", loca validate_access_integrity=self._validate_access_integrity, ) self._validate_access_integrity = False - elif not dist.is_initialized() or mpu.get_data_modulo_expert_parallel_rank() == 0: + elif not dist.is_initialized() or mpu.get_expert_data_parallel_rank() == 0: torch.save(self.optimizer.state_dict(), os.path.join(checkpoint_dir, OPTIMIZER_NAME)) logger.info(f"Saving optimizer state to {os.path.join(checkpoint_dir, OPTIMIZER_NAME)}") diff --git a/roll/pipeline/sft/sft_pipeline.py b/roll/pipeline/sft/sft_pipeline.py index ed21b9553..415bdbebf 100644 --- a/roll/pipeline/sft/sft_pipeline.py +++ b/roll/pipeline/sft/sft_pipeline.py @@ -4,9 +4,9 @@ import numpy as np import ray import torch -from tqdm import tqdm from codetiming import Timer from torch.utils.data import DataLoader +from tqdm import tqdm from roll.datasets.chat_template import get_chat_template from roll.datasets.collator import DataCollatorForSFT @@ -165,8 +165,8 @@ def __init__(self, pipeline_config: SFTConfig): self.pipeline_config.sequence_length, encode_function, num_proc=self.pipeline_config.sft_train.data_args.preprocessing_num_workers) - - global_val_batch_size = dp_size * ga_steps * self.pipeline_config.sft_train.infer_batch_size + + global_val_batch_size = dp_size * self.pipeline_config.sft_train.infer_batch_size self.val_dataloader = DataLoader( dataset=self.val_dataset, batch_size=global_val_batch_size, From fd9c3929066d993f0d091b78ca10349f533e3819 Mon Sep 17 00:00:00 2001 From: jiaqiw09 Date: Fri, 6 Feb 2026 16:04:33 +0800 Subject: [PATCH 2/8] feat: add npu mindspeed --- .../dpo_config.yaml | 106 ++++++++++++++++++ .../run_dpo_pipeline.sh | 5 + .../src/mcore_adapter/models/model_config.py | 70 +++++++++++- .../src/mcore_adapter/training_args.py | 6 + .../distributed/strategy/megatron_strategy.py | 7 ++ roll/platforms/__init__.py | 21 ++-- 6 files changed, 205 insertions(+), 10 deletions(-) create mode 100644 examples/qwen3-4B-dpo_megatron_ascend/dpo_config.yaml create mode 100644 examples/qwen3-4B-dpo_megatron_ascend/run_dpo_pipeline.sh diff --git a/examples/qwen3-4B-dpo_megatron_ascend/dpo_config.yaml b/examples/qwen3-4B-dpo_megatron_ascend/dpo_config.yaml new file mode 100644 index 000000000..1a45cd338 --- /dev/null +++ b/examples/qwen3-4B-dpo_megatron_ascend/dpo_config.yaml @@ -0,0 +1,106 @@ +defaults: + - ../config/deepspeed_zero@_here_ + - ../config/deepspeed_zero2@_here_ + - ../config/deepspeed_zero3@_here_ + - ../config/deepspeed_zero3_cpuoffload@_here_ + +hydra: + run: + dir: . 
+ output_subdir: null + +exp_name: "qwen2.5-3B-dpo-config" +seed: 42 +logging_dir: ./output/logs +output_dir: ./output +system_envs: + USE_MODELSCOPE: '1' + +checkpoint_config: + type: file_system + output_dir: /data/cpfs_0/rl_examples/models/${exp_name} + + +track_name: None + + +max_steps: 500 +save_steps: 5000000 +logging_steps: 1 +eval_steps: 100 +resume_from_checkpoint: false + +sequence_length: 512 +train_batch_size: 64 +val_batch_size: 64 + +# local_rank: -1 +num_nodes: 1 +num_gpus_per_node: 4 + +pretrain: /home/wjq/Qwen3-4B + +ipo: false +beta: 0.1 +label_smoothing: 0.0 + +chosen_key: chosen +rejected_key: rejected + +validation: + data_args: + template: qwen2_5 + file_name: data/comparison_gpt4_data_zh.json + +actor_train: + model_args: + disable_gradient_checkpointing: false + dtype: bf16 + model_type: ~ + training_args: + lr_scheduler_type: constant + learning_rate: 1.0e-6 + weight_decay: 0 + per_device_train_batch_size: 16 + gradient_accumulation_steps: 1 + warmup_steps: 20 + num_train_epochs: 10 + data_args: + template: qwen2_5_dpo + file_name: + - data/comparison_gpt4_data_zh.json + dataset_dir: data + preprocessing_num_workers: 1 + strategy_args: + strategy_name: megatron_train + strategy_config: + tensor_model_parallel_size: 1 + pipeline_model_parallel_size: 1 + expert_model_parallel_size: 1 + use_distributed_optimizer: true + recompute_granularity: full + additional_configs: + mindspeed_args: + attention_mask_type: general + device_mapping: list(range(0,2)) + infer_batch_size: 16 + + +reference: + model_args: + disable_gradient_checkpointing: true + dtype: bf16 + model_type: ~ + data_args: + template: qwen2_5 + strategy_args: + strategy_name: megatron_infer + strategy_config: + tensor_model_parallel_size: 1 + pipeline_model_parallel_size: 1 + expert_model_parallel_size: 1 + additional_configs: + mindspeed_args: + attention_mask_type: general + device_mapping: list(range(2,4)) + infer_batch_size: 16 \ No newline at end of file diff --git a/examples/qwen3-4B-dpo_megatron_ascend/run_dpo_pipeline.sh b/examples/qwen3-4B-dpo_megatron_ascend/run_dpo_pipeline.sh new file mode 100644 index 000000000..d9223c28c --- /dev/null +++ b/examples/qwen3-4B-dpo_megatron_ascend/run_dpo_pipeline.sh @@ -0,0 +1,5 @@ +#!/bin/bash +set +x + +CONFIG_PATH=$(basename $(dirname $0)) +python examples/start_dpo_pipeline.py --config_path $CONFIG_PATH --config_name dpo_config diff --git a/mcore_adapter/src/mcore_adapter/models/model_config.py b/mcore_adapter/src/mcore_adapter/models/model_config.py index 511487e73..c69950c9b 100644 --- a/mcore_adapter/src/mcore_adapter/models/model_config.py +++ b/mcore_adapter/src/mcore_adapter/models/model_config.py @@ -6,7 +6,7 @@ import os import shutil from dataclasses import dataclass, field, fields -from typing import TYPE_CHECKING, Literal, Optional +from typing import TYPE_CHECKING, Literal, Optional, Union import torch import torch.nn.functional as F @@ -18,6 +18,7 @@ from ..constants import HUGGINGFACE_AUTOMAP_CACHE, MCA_CONFIG_NAME from ..initialize import initialize_megatron from ..training_args import DistributingParallelArguments, TrainingArguments +from ..platforms import current_platform from ..utils import get_logger from .converter.template import get_template from .model_utils import check_and_get_attention_backend_by_env @@ -295,8 +296,14 @@ class McaModelConfig(TransformerConfig, PretrainedConfig): "choices": ["local", "transformer_engine"], }, ) + mindspeed_args: Optional[Union[dict, list, str]] = field( + default=None, + metadata={"help": "Extra 
MindSpeed args as dict, list, or JSON string/path."}, + ) def __post_init__(self): + self._augment_mindspeed_defaults() + if self.virtual_pipeline_model_parallel_size is None and self.overlap_p2p_comm: self.overlap_p2p_comm = False logger.warning("Non-interleaved pipeline parallelism does not support overlapping p2p communication!") @@ -406,10 +413,69 @@ def distribute_config_match(self, other: "McaModelConfig"): ] ) + def _build_mindspeed_argv(self): + if self.mindspeed_args is None: + return [] + if isinstance(self.mindspeed_args, dict): + argv = [] + for key, value in self.mindspeed_args.items(): + flag = key if key.startswith("-") else f"--{key.replace('_', '-')}" + if isinstance(value, bool): + if value: + argv.append(flag) + continue + if value is None: + continue + if isinstance(value, (list, tuple)): + argv.append(flag) + argv.extend([str(v) for v in value]) + else: + argv.extend([flag, str(value)]) + return argv + if isinstance(self.mindspeed_args, (list, tuple)): + return [str(v) for v in self.mindspeed_args] + return [] + + def _augment_mindspeed_defaults(self): + if not current_platform.is_npu(): + return + if getattr(McaModelConfig, "_mindspeed_defaults_cache", None) is None: + McaModelConfig._mindspeed_defaults_cache = {} + argv = self._build_mindspeed_argv() + cache_key = tuple(argv) + if cache_key not in McaModelConfig._mindspeed_defaults_cache: + defaults = {} + try: + from mindspeed.arguments import process_args + from argparse import ArgumentParser + import mindspeed.features_manager as mfm + + original_features = list(mfm.FEATURES_LIST) + full_features = mfm.create_features_list() + mfm.FEATURES_LIST.clear() + mfm.FEATURES_LIST.extend(full_features) + try: + parser = ArgumentParser() + process_args(parser) + args, _ = parser.parse_known_args(argv) + defaults = vars(args) + finally: + mfm.FEATURES_LIST.clear() + mfm.FEATURES_LIST.extend(original_features) + except Exception: + defaults = {} + McaModelConfig._mindspeed_defaults_cache[cache_key] = defaults + mindspeed_defaults = McaModelConfig._mindspeed_defaults_cache.get(cache_key, {}) + if mindspeed_defaults: + for name, value in mindspeed_defaults.items(): + normalized_name = name.replace("-", "_") + if not hasattr(self, normalized_name): + setattr(self, normalized_name, value) + @dataclass class MLAMcaModelConfig(McaModelConfig, MLATransformerConfig): multi_latent_attention: Optional[bool] = field(default=True, metadata={"help": "Whether use mla"}) def __post_init__(self): - super().__post_init__() + super().__post_init__() \ No newline at end of file diff --git a/mcore_adapter/src/mcore_adapter/training_args.py b/mcore_adapter/src/mcore_adapter/training_args.py index 412322d11..794d10cc7 100644 --- a/mcore_adapter/src/mcore_adapter/training_args.py +++ b/mcore_adapter/src/mcore_adapter/training_args.py @@ -2,6 +2,12 @@ from dataclasses import dataclass, field, fields from typing import Literal, Optional, Union +try: + # NPU patch + import mindspeed.megatron_adaptor +except ImportError: + pass + from megatron.core.transformer.pipeline_parallel_layer_layout import PipelineParallelLayerLayout from transformers import Seq2SeqTrainingArguments as HFSeq2SeqTrainingArguments from transformers import TrainingArguments as HFTrainingArguments diff --git a/roll/distributed/strategy/megatron_strategy.py b/roll/distributed/strategy/megatron_strategy.py index 3b95c4928..18c435fc0 100644 --- a/roll/distributed/strategy/megatron_strategy.py +++ b/roll/distributed/strategy/megatron_strategy.py @@ -416,6 +416,13 @@ def 
inner_forward_step(self, loss_func, data_iterator: Iterator[DataProto], mode else: input_ids = self._get_feature_on_this_cp_rank(input_ids, "input_ids") attention_mask = self._get_feature_on_this_cp_rank(attention_mask, "attention_mask") + + if hasattr(torch, "npu") and torch.npu.is_available() and attention_mask is not None: + attention_mask = attention_mask.bool() + B, S = attention_mask.shape + attention_mask = attention_mask[:, None, None, :] # [B,1,1,S] + attention_mask = attention_mask.expand(B, 1, S, S) # [B,1,S,S] + if labels is not None: labels = self._get_feature_on_this_cp_rank(labels, "labels") position_ids = None diff --git a/roll/platforms/__init__.py b/roll/platforms/__init__.py index 6869621f4..c9dff3f15 100644 --- a/roll/platforms/__init__.py +++ b/roll/platforms/__init__.py @@ -25,26 +25,31 @@ def _init_platform() -> Platform: Returns: An instance of a subclass of Platform corresponding to the detected hardware. """ + try: + import torch_npu # noqa: F401 + + if hasattr(torch, "npu") and torch.npu.is_available(): + logger.debug("Detected NPU (torch_npu). Initializing NPU platform.") + return NpuPlatform() + except ImportError: + pass + if torch.cuda.is_available(): device_name = torch.cuda.get_device_name().upper() logger.debug(f"Detected CUDA device: {device_name}") + if "NVIDIA" in device_name: logger.debug("Initializing CUDA platform (NVIDIA).") return CudaPlatform() elif "AMD" in device_name: logger.debug("Initializing ROCm platform (AMD).") return RocmPlatform() + logger.warning("Unrecognized CUDA device. Falling back to UnknownPlatform.") return UnknownPlatform() - else: - try: - import torch_npu # noqa: F401 - logger.debug("Detected torch_npu. Initializing NPU platform.") - return NpuPlatform() - except ImportError: - logger.debug("No supported accelerator detected. Initializing CPU platform.") - return CpuPlatform() + logger.debug("No supported accelerator detected. Initializing CPU platform.") + return CpuPlatform() # Global singleton representing the current platform in use. 
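Note on the mindspeed_args plumbing this patch introduces: the field accepts a dict, a list, or a JSON string, and _build_mindspeed_argv flattens the dict form into MindSpeed-style CLI flags before parse_known_args consumes them. A minimal standalone sketch of that flattening; the function name and sample values below are illustrative only, not part of the patch:

    # Standalone sketch of the dict -> argv flattening performed by
    # McaModelConfig._build_mindspeed_argv; names and sample values are
    # illustrative, not taken from the patch.
    def build_argv(mindspeed_args: dict) -> list:
        argv = []
        for key, value in mindspeed_args.items():
            # Keys already spelled as flags pass through unchanged; bare
            # names become GNU-style long options.
            flag = key if key.startswith("-") else f"--{key.replace('_', '-')}"
            if isinstance(value, bool):
                if value:  # True -> bare flag; False -> omitted entirely
                    argv.append(flag)
                continue
            if value is None:  # None means "flag not set"
                continue
            if isinstance(value, (list, tuple)):
                argv.append(flag)
                argv.extend(str(v) for v in value)
            else:
                argv.extend([flag, str(value)])
        return argv

    # The reference worker's YAML above ({attention_mask_type: general})
    # flattens to ['--attention-mask-type', 'general']; booleans and Nones
    # behave as annotated:
    print(build_argv({"attention_mask_type": "general",
                      "use_flash_attn": True, "seed": None}))
    # -> ['--attention-mask-type', 'general', '--use-flash-attn']

Worth noting: _augment_mindspeed_defaults only fills attributes the config does not already define (the hasattr guard before setattr), so parsed MindSpeed defaults never override values set explicitly in strategy_config.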
From f7759ee6e66dccef2cbe4d7ff460eff2735f0c0b Mon Sep 17 00:00:00 2001 From: UsernameFull Date: Tue, 10 Feb 2026 17:02:03 +0800 Subject: [PATCH 3/8] feat: add NPU (Ascend) support for FSDP2, vLLM, model update, and platform detection --- .../src/mcore_adapter/models/model_config.py | 2 +- .../src/mcore_adapter/platforms/__init__.py | 21 +++++++++++-------- roll/distributed/strategy/fsdp2_strategy.py | 21 ++++++++++++++----- roll/pipeline/sft/sft_pipeline.py | 2 +- roll/third_party/fsdp2/model_update.py | 10 +++++++++ roll/third_party/vllm/__init__.py | 7 ++++++- 6 files changed, 46 insertions(+), 17 deletions(-) diff --git a/mcore_adapter/src/mcore_adapter/models/model_config.py b/mcore_adapter/src/mcore_adapter/models/model_config.py index c69950c9b..0d8674202 100644 --- a/mcore_adapter/src/mcore_adapter/models/model_config.py +++ b/mcore_adapter/src/mcore_adapter/models/model_config.py @@ -478,4 +478,4 @@ class MLAMcaModelConfig(McaModelConfig, MLATransformerConfig): multi_latent_attention: Optional[bool] = field(default=True, metadata={"help": "Whether use mla"}) def __post_init__(self): - super().__post_init__() \ No newline at end of file + super().__post_init__() diff --git a/mcore_adapter/src/mcore_adapter/platforms/__init__.py b/mcore_adapter/src/mcore_adapter/platforms/__init__.py index ca92058d6..0a99237cf 100644 --- a/mcore_adapter/src/mcore_adapter/platforms/__init__.py +++ b/mcore_adapter/src/mcore_adapter/platforms/__init__.py @@ -24,26 +24,29 @@ def _init_platform() -> Platform: Returns: An instance of a subclass of Platform corresponding to the detected hardware. """ + try: + if hasattr(torch, "npu") and torch.npu.is_available(): + logger.debug("Detected NPU (torch_npu). Initializing NPU platform.") + return NpuPlatform() + except ImportError: + pass + if torch.cuda.is_available(): device_name = torch.cuda.get_device_name().upper() logger.debug(f"Detected CUDA device: {device_name}") + if "NVIDIA" in device_name: logger.debug("Initializing CUDA platform (NVIDIA).") return CudaPlatform() elif "AMD" in device_name: logger.debug("Initializing ROCm platform (AMD).") return RocmPlatform() + logger.warning("Unrecognized CUDA device. Falling back to UnknownPlatform.") return UnknownPlatform() - else: - try: - import torch_npu # noqa: F401 - - logger.debug("Detected torch_npu. Initializing NPU platform.") - return NpuPlatform() - except ImportError: - logger.debug("No supported accelerator detected. Initializing CPU platform.") - return CpuPlatform() + + logger.debug("No supported accelerator detected. Initializing CPU platform.") + return CpuPlatform() # Global singleton representing the current platform in use. 
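Both platform modules now share the same probe order: NPU first, then CUDA/ROCm, then CPU, presumably so that an Ascend host with torch_npu installed is never routed through the CUDA branch. A condensed sketch of that order, returning strings where the real code returns Platform instances:

    # Condensed sketch of the detection order the two modules now share;
    # it returns strings where the real code returns Platform instances.
    import torch

    def detect_platform() -> str:
        try:
            # torch.npu only exists once torch_npu has been imported, so
            # the hasattr check doubles as an availability probe.
            if hasattr(torch, "npu") and torch.npu.is_available():
                return "npu"
        except ImportError:
            pass
        if torch.cuda.is_available():
            device_name = torch.cuda.get_device_name().upper()
            if "NVIDIA" in device_name:
                return "cuda"
            if "AMD" in device_name:
                return "rocm"
            return "unknown"
        return "cpu"

One difference between the two copies is worth flagging: roll/platforms/__init__.py (patch 2) still imports torch_npu explicitly inside the try block before probing, while this mcore_adapter copy relies on torch.npu already having been registered by an earlier import.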
diff --git a/roll/distributed/strategy/fsdp2_strategy.py b/roll/distributed/strategy/fsdp2_strategy.py index 92429fa1a..ea438f3da 100644 --- a/roll/distributed/strategy/fsdp2_strategy.py +++ b/roll/distributed/strategy/fsdp2_strategy.py @@ -11,7 +11,6 @@ import torch.distributed as dist import torch.distributed.checkpoint as dcp from codetiming import Timer -from flash_attn.bert_padding import index_first_axis, pad_input, rearrange, unpad_input from torch import optim from torch.distributed.checkpoint.state_dict import StateDictOptions, get_model_state_dict from torch.distributed.device_mesh import init_device_mesh @@ -438,16 +437,23 @@ def load_checkpoint(self, load_dir, tag="checkpoint", **kwargs): def get_rng_state(): rng_state = { "cpu": torch.get_rng_state(), - "cuda": torch.cuda.get_rng_state(), "numpy": np.random.get_state(), "random": random.getstate(), } + if current_platform.device_type == "cuda": + rng_state["device"] = torch.cuda.get_rng_state() + elif current_platform.device_type == "npu": + rng_state["device"] = torch.npu.get_rng_state() return rng_state @staticmethod def load_rng_state(rng_state): torch.set_rng_state(rng_state["cpu"]) - torch.cuda.set_rng_state(rng_state["cuda"]) + if "device" in rng_state: + if current_platform.device_type == "cuda": + torch.cuda.set_rng_state(rng_state["device"]) + elif current_platform.device_type == "npu": + torch.npu.set_rng_state(rng_state["device"]) np.random.set_state(rng_state["numpy"]) random.setstate(rng_state["random"]) @@ -484,7 +490,7 @@ def _gather_full_tensor(self, param: torch.nn.Parameter) -> torch.Tensor: tensor = param.data if hasattr(param, "data") else param if isinstance(tensor, DTensor): original_device = tensor.device - if original_device.type == "cpu" and current_platform.device_type == "cuda" and torch.cuda.is_available(): + if original_device.type == "cpu" and current_platform.device_type != "cpu": tensor = tensor.to(current_platform.device_type) tensor = tensor.full_tensor() if original_device.type == "cpu": @@ -704,6 +710,8 @@ def offload_states(self, include=None, non_blocking=False): self.model.to("cpu", non_blocking=non_blocking) if current_platform.device_type == "cuda": torch.cuda.empty_cache() + elif current_platform.device_type == "npu": + torch.npu.empty_cache() # When cpu_offload is disabled, optimizer states should stay on GPU # Only offload optimizer states if cpu_offload is enabled else: @@ -1268,7 +1276,10 @@ def train_step( self.scheduler.step() self.optimizer.zero_grad(set_to_none=True) - torch.cuda.empty_cache() + if current_platform.device_type == "cuda": + torch.cuda.empty_cache() + elif current_platform.device_type == "npu": + torch.npu.empty_cache() return metrics def setup_model_update(self, infer_cluster, model_update_name: str): diff --git a/roll/pipeline/sft/sft_pipeline.py b/roll/pipeline/sft/sft_pipeline.py index 415bdbebf..97bd7b6a3 100644 --- a/roll/pipeline/sft/sft_pipeline.py +++ b/roll/pipeline/sft/sft_pipeline.py @@ -4,9 +4,9 @@ import numpy as np import ray import torch +from tqdm import tqdm from codetiming import Timer from torch.utils.data import DataLoader -from tqdm import tqdm from roll.datasets.chat_template import get_chat_template from roll.datasets.collator import DataCollatorForSFT diff --git a/roll/third_party/fsdp2/model_update.py b/roll/third_party/fsdp2/model_update.py index f575ef82d..1e6b3095e 100644 --- a/roll/third_party/fsdp2/model_update.py +++ b/roll/third_party/fsdp2/model_update.py @@ -8,6 +8,7 @@ from roll.configs.base_config import PPOConfig from 
roll.configs.worker_config import is_actor_infer_overlapping_with_any_cluster +from roll.platforms import current_platform from roll.utils.collective import collective from roll.utils.logging import get_logger from roll.utils.network_utils import collect_free_port, get_node_ip @@ -295,7 +296,16 @@ def _broadcast_to_infer_workers(self, named_weights) -> list[ray.ObjectRef]: for worker in self._broadcast_workers ] handles = [] + # Keep references to tensors moved to device to prevent premature deallocation + device_tensors = [] + for _, weight in named_weights: + # Ensure weight is on the correct device (e.g. NPU) if using HCCL/NCCL + if weight.device.type == "cpu" and current_platform.device_type != "cpu": + weight_device = weight.to(current_platform.device_type) + device_tensors.append(weight_device) + weight = weight_device + handles.append( collective.broadcast(tensor=weight, src_rank=0, group_name=self.model_update_group_name, async_op=True) ) diff --git a/roll/third_party/vllm/__init__.py b/roll/third_party/vllm/__init__.py index 5fc9b504d..4977091a1 100644 --- a/roll/third_party/vllm/__init__.py +++ b/roll/third_party/vllm/__init__.py @@ -58,7 +58,12 @@ async def create_async_llm(resource_placement_groups: List[Dict], **kwargs): os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "" # torch.cuda may already init, explicitly disable expandable_segments # here (only matters when VLLM_USE_RAY_SPMD_WORKER=0) - torch.cuda.memory._set_allocator_settings("expandable_segments:False") + from roll.platforms import current_platform + if current_platform.is_npu(): + # use env variable to disable expandable_segments + os.environ["PYTORCH_NPU_ALLOC_CONF"] = "expandable_segments:False" + else: + torch.cuda.memory._set_allocator_settings("expandable_segments:False") os.environ["VLLM_CACHE_ROOT"] = os.path.join(get_default_cache_root(), "vllm", os.environ.get("WORKER_NAME", "")) From 9e216c3d66ee087d2e0728e76927439c4a0f3126 Mon Sep 17 00:00:00 2001 From: UsernameFull Date: Wed, 11 Feb 2026 10:37:00 +0800 Subject: [PATCH 4/8] Revert "adapt mindspeed" This reverts commit 8880ed0a2f3e388f71ae06c56dbc441dab5b5cef. --- .../dpo_config.yaml | 106 ------------------ .../run_dpo_pipeline.sh | 5 - .../src/mcore_adapter/models/model_config.py | 68 +---------- .../src/mcore_adapter/training_args.py | 6 - .../distributed/strategy/megatron_strategy.py | 7 -- roll/platforms/__init__.py | 21 ++-- 6 files changed, 9 insertions(+), 204 deletions(-) delete mode 100644 examples/qwen3-4B-dpo_megatron_ascend/dpo_config.yaml delete mode 100644 examples/qwen3-4B-dpo_megatron_ascend/run_dpo_pipeline.sh diff --git a/examples/qwen3-4B-dpo_megatron_ascend/dpo_config.yaml b/examples/qwen3-4B-dpo_megatron_ascend/dpo_config.yaml deleted file mode 100644 index 1a45cd338..000000000 --- a/examples/qwen3-4B-dpo_megatron_ascend/dpo_config.yaml +++ /dev/null @@ -1,106 +0,0 @@ -defaults: - - ../config/deepspeed_zero@_here_ - - ../config/deepspeed_zero2@_here_ - - ../config/deepspeed_zero3@_here_ - - ../config/deepspeed_zero3_cpuoffload@_here_ - -hydra: - run: - dir: . 
- output_subdir: null - -exp_name: "qwen2.5-3B-dpo-config" -seed: 42 -logging_dir: ./output/logs -output_dir: ./output -system_envs: - USE_MODELSCOPE: '1' - -checkpoint_config: - type: file_system - output_dir: /data/cpfs_0/rl_examples/models/${exp_name} - - -track_name: None - - -max_steps: 500 -save_steps: 5000000 -logging_steps: 1 -eval_steps: 100 -resume_from_checkpoint: false - -sequence_length: 512 -train_batch_size: 64 -val_batch_size: 64 - -# local_rank: -1 -num_nodes: 1 -num_gpus_per_node: 4 - -pretrain: /home/wjq/Qwen3-4B - -ipo: false -beta: 0.1 -label_smoothing: 0.0 - -chosen_key: chosen -rejected_key: rejected - -validation: - data_args: - template: qwen2_5 - file_name: data/comparison_gpt4_data_zh.json - -actor_train: - model_args: - disable_gradient_checkpointing: false - dtype: bf16 - model_type: ~ - training_args: - lr_scheduler_type: constant - learning_rate: 1.0e-6 - weight_decay: 0 - per_device_train_batch_size: 16 - gradient_accumulation_steps: 1 - warmup_steps: 20 - num_train_epochs: 10 - data_args: - template: qwen2_5_dpo - file_name: - - data/comparison_gpt4_data_zh.json - dataset_dir: data - preprocessing_num_workers: 1 - strategy_args: - strategy_name: megatron_train - strategy_config: - tensor_model_parallel_size: 1 - pipeline_model_parallel_size: 1 - expert_model_parallel_size: 1 - use_distributed_optimizer: true - recompute_granularity: full - additional_configs: - mindspeed_args: - attention_mask_type: general - device_mapping: list(range(0,2)) - infer_batch_size: 16 - - -reference: - model_args: - disable_gradient_checkpointing: true - dtype: bf16 - model_type: ~ - data_args: - template: qwen2_5 - strategy_args: - strategy_name: megatron_infer - strategy_config: - tensor_model_parallel_size: 1 - pipeline_model_parallel_size: 1 - expert_model_parallel_size: 1 - additional_configs: - mindspeed_args: - attention_mask_type: general - device_mapping: list(range(2,4)) - infer_batch_size: 16 \ No newline at end of file diff --git a/examples/qwen3-4B-dpo_megatron_ascend/run_dpo_pipeline.sh b/examples/qwen3-4B-dpo_megatron_ascend/run_dpo_pipeline.sh deleted file mode 100644 index d9223c28c..000000000 --- a/examples/qwen3-4B-dpo_megatron_ascend/run_dpo_pipeline.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash -set +x - -CONFIG_PATH=$(basename $(dirname $0)) -python examples/start_dpo_pipeline.py --config_path $CONFIG_PATH --config_name dpo_config diff --git a/mcore_adapter/src/mcore_adapter/models/model_config.py b/mcore_adapter/src/mcore_adapter/models/model_config.py index 0d8674202..511487e73 100644 --- a/mcore_adapter/src/mcore_adapter/models/model_config.py +++ b/mcore_adapter/src/mcore_adapter/models/model_config.py @@ -6,7 +6,7 @@ import os import shutil from dataclasses import dataclass, field, fields -from typing import TYPE_CHECKING, Literal, Optional, Union +from typing import TYPE_CHECKING, Literal, Optional import torch import torch.nn.functional as F @@ -18,7 +18,6 @@ from ..constants import HUGGINGFACE_AUTOMAP_CACHE, MCA_CONFIG_NAME from ..initialize import initialize_megatron from ..training_args import DistributingParallelArguments, TrainingArguments -from ..platforms import current_platform from ..utils import get_logger from .converter.template import get_template from .model_utils import check_and_get_attention_backend_by_env @@ -296,14 +295,8 @@ class McaModelConfig(TransformerConfig, PretrainedConfig): "choices": ["local", "transformer_engine"], }, ) - mindspeed_args: Optional[Union[dict, list, str]] = field( - default=None, - metadata={"help": 
"Extra MindSpeed args as dict, list, or JSON string/path."}, - ) def __post_init__(self): - self._augment_mindspeed_defaults() - if self.virtual_pipeline_model_parallel_size is None and self.overlap_p2p_comm: self.overlap_p2p_comm = False logger.warning("Non-interleaved pipeline parallelism does not support overlapping p2p communication!") @@ -413,65 +406,6 @@ def distribute_config_match(self, other: "McaModelConfig"): ] ) - def _build_mindspeed_argv(self): - if self.mindspeed_args is None: - return [] - if isinstance(self.mindspeed_args, dict): - argv = [] - for key, value in self.mindspeed_args.items(): - flag = key if key.startswith("-") else f"--{key.replace('_', '-')}" - if isinstance(value, bool): - if value: - argv.append(flag) - continue - if value is None: - continue - if isinstance(value, (list, tuple)): - argv.append(flag) - argv.extend([str(v) for v in value]) - else: - argv.extend([flag, str(value)]) - return argv - if isinstance(self.mindspeed_args, (list, tuple)): - return [str(v) for v in self.mindspeed_args] - return [] - - def _augment_mindspeed_defaults(self): - if not current_platform.is_npu(): - return - if getattr(McaModelConfig, "_mindspeed_defaults_cache", None) is None: - McaModelConfig._mindspeed_defaults_cache = {} - argv = self._build_mindspeed_argv() - cache_key = tuple(argv) - if cache_key not in McaModelConfig._mindspeed_defaults_cache: - defaults = {} - try: - from mindspeed.arguments import process_args - from argparse import ArgumentParser - import mindspeed.features_manager as mfm - - original_features = list(mfm.FEATURES_LIST) - full_features = mfm.create_features_list() - mfm.FEATURES_LIST.clear() - mfm.FEATURES_LIST.extend(full_features) - try: - parser = ArgumentParser() - process_args(parser) - args, _ = parser.parse_known_args(argv) - defaults = vars(args) - finally: - mfm.FEATURES_LIST.clear() - mfm.FEATURES_LIST.extend(original_features) - except Exception: - defaults = {} - McaModelConfig._mindspeed_defaults_cache[cache_key] = defaults - mindspeed_defaults = McaModelConfig._mindspeed_defaults_cache.get(cache_key, {}) - if mindspeed_defaults: - for name, value in mindspeed_defaults.items(): - normalized_name = name.replace("-", "_") - if not hasattr(self, normalized_name): - setattr(self, normalized_name, value) - @dataclass class MLAMcaModelConfig(McaModelConfig, MLATransformerConfig): diff --git a/mcore_adapter/src/mcore_adapter/training_args.py b/mcore_adapter/src/mcore_adapter/training_args.py index 794d10cc7..412322d11 100644 --- a/mcore_adapter/src/mcore_adapter/training_args.py +++ b/mcore_adapter/src/mcore_adapter/training_args.py @@ -2,12 +2,6 @@ from dataclasses import dataclass, field, fields from typing import Literal, Optional, Union -try: - # NPU patch - import mindspeed.megatron_adaptor -except ImportError: - pass - from megatron.core.transformer.pipeline_parallel_layer_layout import PipelineParallelLayerLayout from transformers import Seq2SeqTrainingArguments as HFSeq2SeqTrainingArguments from transformers import TrainingArguments as HFTrainingArguments diff --git a/roll/distributed/strategy/megatron_strategy.py b/roll/distributed/strategy/megatron_strategy.py index 18c435fc0..3b95c4928 100644 --- a/roll/distributed/strategy/megatron_strategy.py +++ b/roll/distributed/strategy/megatron_strategy.py @@ -416,13 +416,6 @@ def inner_forward_step(self, loss_func, data_iterator: Iterator[DataProto], mode else: input_ids = self._get_feature_on_this_cp_rank(input_ids, "input_ids") attention_mask = 
self._get_feature_on_this_cp_rank(attention_mask, "attention_mask") - - if hasattr(torch, "npu") and torch.npu.is_available() and attention_mask is not None: - attention_mask = attention_mask.bool() - B, S = attention_mask.shape - attention_mask = attention_mask[:, None, None, :] # [B,1,1,S] - attention_mask = attention_mask.expand(B, 1, S, S) # [B,1,S,S] - if labels is not None: labels = self._get_feature_on_this_cp_rank(labels, "labels") position_ids = None diff --git a/roll/platforms/__init__.py b/roll/platforms/__init__.py index c9dff3f15..6869621f4 100644 --- a/roll/platforms/__init__.py +++ b/roll/platforms/__init__.py @@ -25,31 +25,26 @@ def _init_platform() -> Platform: Returns: An instance of a subclass of Platform corresponding to the detected hardware. """ - try: - import torch_npu # noqa: F401 - - if hasattr(torch, "npu") and torch.npu.is_available(): - logger.debug("Detected NPU (torch_npu). Initializing NPU platform.") - return NpuPlatform() - except ImportError: - pass - if torch.cuda.is_available(): device_name = torch.cuda.get_device_name().upper() logger.debug(f"Detected CUDA device: {device_name}") - if "NVIDIA" in device_name: logger.debug("Initializing CUDA platform (NVIDIA).") return CudaPlatform() elif "AMD" in device_name: logger.debug("Initializing ROCm platform (AMD).") return RocmPlatform() - logger.warning("Unrecognized CUDA device. Falling back to UnknownPlatform.") return UnknownPlatform() + else: + try: + import torch_npu # noqa: F401 - logger.debug("No supported accelerator detected. Initializing CPU platform.") - return CpuPlatform() + logger.debug("Detected torch_npu. Initializing NPU platform.") + return NpuPlatform() + except ImportError: + logger.debug("No supported accelerator detected. Initializing CPU platform.") + return CpuPlatform() # Global singleton representing the current platform in use. 
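Among the changes this revert removes is the NPU-only attention-mask reshaping in megatron_strategy.py, which expanded a [B, S] token-level padding mask into the broadcastable [B, 1, S, S] boolean form. A toy reproduction of the reverted transform, with a made-up input for illustration:

    # Toy reproduction of the reverted mask expansion; the sample mask is
    # illustrative only.
    import torch

    attention_mask = torch.tensor([[1, 1, 1, 0]])  # B=1, S=4; 0 marks padding
    attention_mask = attention_mask.bool()
    B, S = attention_mask.shape
    attention_mask = attention_mask[:, None, None, :]   # [B, 1, 1, S]
    attention_mask = attention_mask.expand(B, 1, S, S)  # [B, 1, S, S]
    print(attention_mask.shape)  # torch.Size([1, 1, 4, 4])

Because the expansion only repeats the key-padding row S times, every query row in the [S, S] slice carries the same key-padding pattern; any causal structure would have to come from elsewhere.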
From ea5ab5bb80b1b086d791124784b7122f6f72072d Mon Sep 17 00:00:00 2001 From: UsernameFull Date: Wed, 11 Feb 2026 11:19:52 +0800 Subject: [PATCH 5/8] fix: rng_state on npu --- roll/distributed/strategy/fsdp2_strategy.py | 6 ++++-- roll/platforms/npu.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/roll/distributed/strategy/fsdp2_strategy.py b/roll/distributed/strategy/fsdp2_strategy.py index ea438f3da..d10bf98cd 100644 --- a/roll/distributed/strategy/fsdp2_strategy.py +++ b/roll/distributed/strategy/fsdp2_strategy.py @@ -443,7 +443,8 @@ def get_rng_state(): if current_platform.device_type == "cuda": rng_state["device"] = torch.cuda.get_rng_state() elif current_platform.device_type == "npu": - rng_state["device"] = torch.npu.get_rng_state() + import torch_npu + rng_state["device"] = torch_npu.npu.get_rng_state() return rng_state @staticmethod @@ -453,7 +454,8 @@ def load_rng_state(rng_state): if current_platform.device_type == "cuda": torch.cuda.set_rng_state(rng_state["device"]) elif current_platform.device_type == "npu": - torch.npu.set_rng_state(rng_state["device"]) + import torch_npu + torch_npu.npu.set_rng_state(rng_state["device"]) np.random.set_state(rng_state["numpy"]) random.setstate(rng_state["random"]) diff --git a/roll/platforms/npu.py b/roll/platforms/npu.py index dbc9d1f00..30a74cc03 100644 --- a/roll/platforms/npu.py +++ b/roll/platforms/npu.py @@ -80,6 +80,6 @@ def apply_ulysses_patch(cls) -> None: return @classmethod - def device_memory_used(cls) -> None: + def device_memory_used(cls) -> int: free, total = torch.npu.mem_get_info() return total - free \ No newline at end of file From dde4ca4061b432bc668afc84c9a386f0fb7c57d3 Mon Sep 17 00:00:00 2001 From: UsernameFull Date: Tue, 3 Mar 2026 16:03:04 +0800 Subject: [PATCH 6/8] fix: DeepSpeedEngine.load_checkpoint method doesn't take an is_last_step argument --- roll/distributed/strategy/deepspeed_strategy.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/roll/distributed/strategy/deepspeed_strategy.py b/roll/distributed/strategy/deepspeed_strategy.py index 064071589..51cf1b21d 100644 --- a/roll/distributed/strategy/deepspeed_strategy.py +++ b/roll/distributed/strategy/deepspeed_strategy.py @@ -538,6 +538,8 @@ def save_checkpoint(self, save_dir, global_step, ckpt_id, tag="checkpoint", loca if getattr(self, "processor", None): self.processor.save_pretrained(save_dir) # save tokenizer + # DeepSpeedEngine.load_checkpoint method doesn't take an is_last_step argument + kwargs.pop("is_last_step", None) self.model.save_checkpoint(save_dir, tag=tag, **kwargs) if self.worker_config.checkpoint_config.get("async_upload", True) and not is_last_step: From 734138ffb5a020dedd7aba437f1630be104b8218 Mon Sep 17 00:00:00 2001 From: UsernameFull Date: Thu, 5 Mar 2026 12:15:27 +0800 Subject: [PATCH 7/8] platform add empty_cache get_rng_state set_rng_state --- roll/distributed/strategy/fsdp2_strategy.py | 23 ++++----------------- roll/platforms/npu.py | 2 +- 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/roll/distributed/strategy/fsdp2_strategy.py b/roll/distributed/strategy/fsdp2_strategy.py index d10bf98cd..d184c58d3 100644 --- a/roll/distributed/strategy/fsdp2_strategy.py +++ b/roll/distributed/strategy/fsdp2_strategy.py @@ -437,25 +437,16 @@ def load_checkpoint(self, load_dir, tag="checkpoint", **kwargs): def get_rng_state(): rng_state = { "cpu": torch.get_rng_state(), + "device": current_platform.get_rng_state(), "numpy": np.random.get_state(), "random": random.getstate(), } - if 
current_platform.device_type == "cuda": - rng_state["device"] = torch.cuda.get_rng_state() - elif current_platform.device_type == "npu": - import torch_npu - rng_state["device"] = torch_npu.npu.get_rng_state() return rng_state @staticmethod def load_rng_state(rng_state): torch.set_rng_state(rng_state["cpu"]) - if "device" in rng_state: - if current_platform.device_type == "cuda": - torch.cuda.set_rng_state(rng_state["device"]) - elif current_platform.device_type == "npu": - import torch_npu - torch_npu.npu.set_rng_state(rng_state["device"]) + current_platform.set_rng_state(rng_state["device"]) np.random.set_state(rng_state["numpy"]) random.setstate(rng_state["random"]) @@ -710,10 +701,7 @@ def offload_states(self, include=None, non_blocking=False): if not self.cpu_offload_enabled: if include is None or OffloadStateType.model_params in include: self.model.to("cpu", non_blocking=non_blocking) - if current_platform.device_type == "cuda": - torch.cuda.empty_cache() - elif current_platform.device_type == "npu": - torch.npu.empty_cache() + current_platform.empty_cache() # When cpu_offload is disabled, optimizer states should stay on GPU # Only offload optimizer states if cpu_offload is enabled else: @@ -1278,10 +1266,7 @@ def train_step( self.scheduler.step() self.optimizer.zero_grad(set_to_none=True) - if current_platform.device_type == "cuda": - torch.cuda.empty_cache() - elif current_platform.device_type == "npu": - torch.npu.empty_cache() + current_platform.empty_cache() return metrics def setup_model_update(self, infer_cluster, model_update_name: str): diff --git a/roll/platforms/npu.py b/roll/platforms/npu.py index 30a74cc03..591edf0f6 100644 --- a/roll/platforms/npu.py +++ b/roll/platforms/npu.py @@ -82,4 +82,4 @@ def apply_ulysses_patch(cls) -> None: @classmethod def device_memory_used(cls) -> int: free, total = torch.npu.mem_get_info() - return total - free \ No newline at end of file + return total - free From 7a0f950198835c8fe867b8e8e676aa7fc31dfb9f Mon Sep 17 00:00:00 2001 From: UsernameFull Date: Mon, 9 Mar 2026 20:28:42 +0800 Subject: [PATCH 8/8] fix: support _set_allocator_settings in NPU --- roll/third_party/vllm/__init__.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/roll/third_party/vllm/__init__.py b/roll/third_party/vllm/__init__.py index 4977091a1..77f67cbbb 100644 --- a/roll/third_party/vllm/__init__.py +++ b/roll/third_party/vllm/__init__.py @@ -10,6 +10,7 @@ from vllm.envs import get_default_cache_root from vllm.usage.usage_lib import UsageContext +from roll.platforms import current_platform import roll.third_party.vllm.fp8 as fp8 from roll.utils.import_utils import safe_import_class from roll.utils.logging import get_logger @@ -58,12 +59,7 @@ async def create_async_llm(resource_placement_groups: List[Dict], **kwargs): os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "" # torch.cuda may already init, explicitly disable expandable_segments # here (only matters when VLLM_USE_RAY_SPMD_WORKER=0) - from roll.platforms import current_platform - if current_platform.is_npu(): - # use env variable to disable expandable_segments - os.environ["PYTORCH_NPU_ALLOC_CONF"] = "expandable_segments:False" - else: - torch.cuda.memory._set_allocator_settings("expandable_segments:False") + current_platform.memory._set_allocator_settings("expandable_segments:False") os.environ["VLLM_CACHE_ROOT"] = os.path.join(get_default_cache_root(), "vllm", os.environ.get("WORKER_NAME", ""))
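Patches 7 and 8 converge on the same pattern: device-specific calls move behind current_platform instead of branching on device_type at every call site. The Platform interface itself is not shown in this series, so the following is only an assumed shape for the NPU side, inferred from the call sites current_platform.empty_cache(), current_platform.get_rng_state(), current_platform.set_rng_state(...), and current_platform.memory._set_allocator_settings(...):

    # Assumed shape of the NPU-side platform methods; the real
    # roll.platforms interface may differ.
    import torch
    import torch_npu  # registers the torch.npu namespace

    class NpuPlatform:
        device_type = "npu"

        @property
        def memory(self):
            # Patch 8 calls current_platform.memory._set_allocator_settings();
            # on NPU the natural target is assumed to be torch.npu.memory.
            return torch.npu.memory

        @classmethod
        def empty_cache(cls) -> None:
            torch.npu.empty_cache()

        @classmethod
        def get_rng_state(cls) -> torch.Tensor:
            # Same accessor patch 5 used directly in fsdp2_strategy.py.
            return torch_npu.npu.get_rng_state()

        @classmethod
        def set_rng_state(cls, state: torch.Tensor) -> None:
            torch_npu.npu.set_rng_state(state)

The payoff is visible in the fsdp2_strategy.py diff above: three if/elif device chains collapse into single calls, and supporting a new accelerator means implementing one Platform subclass rather than touching every strategy.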