diff --git a/IsaacLab b/IsaacLab
new file mode 160000
index 0000000000..9d17b52453
--- /dev/null
+++ b/IsaacLab
@@ -0,0 +1 @@
+Subproject commit 9d17b524536b341da09987ef765a0ad0501c64ca
diff --git a/data_juicer/ops/filter/specified_field_filter.py b/data_juicer/ops/filter/specified_field_filter.py
index 4c75a138c7..24e217d42a 100644
--- a/data_juicer/ops/filter/specified_field_filter.py
+++ b/data_juicer/ops/filter/specified_field_filter.py
@@ -1,5 +1,3 @@
-from typing import List
-
 from data_juicer.utils.constant import Fields
 
 from ..base_op import OPERATORS, Filter
@@ -22,7 +20,7 @@ class SpecifiedFieldFilter(Filter):
     - Supports multi-level field keys, e.g., 'level1.level2'.
     - Converts non-list/tuple field values to a list for comparison."""
 
-    def __init__(self, field_key: str = "", target_value: List = [], *args, **kwargs):
+    def __init__(self, field_key: str = "", target_value: list = [], *args, **kwargs):
         """
         Initialization method.
diff --git a/data_juicer/ops/mapper/__init__.py b/data_juicer/ops/mapper/__init__.py
index 08cbe84a35..b46dc71c2d 100644
--- a/data_juicer/ops/mapper/__init__.py
+++ b/data_juicer/ops/mapper/__init__.py
@@ -1,3 +1,4 @@
+from .annotate_demos_mapper import AnnotateDemosMapper
 from .annotation.human_preference_annotation_mapper import (
     HumanPreferenceAnnotationMapper,
 )
@@ -12,6 +13,7 @@
 from .clean_html_mapper import CleanHtmlMapper
 from .clean_ip_mapper import CleanIpMapper
 from .clean_links_mapper import CleanLinksMapper
+from .convert_to_lerobot_mapper import ConvertToLeRobotMapper
 from .detect_character_attributes_mapper import DetectCharacterAttributesMapper
 from .detect_character_locations_mapper import DetectCharacterLocationsMapper
 from .detect_main_character_mapper import DetectMainCharacterMapper
@@ -29,6 +31,7 @@
 from .extract_support_text_mapper import ExtractSupportTextMapper
 from .extract_tables_from_html_mapper import ExtractTablesFromHtmlMapper
 from .fix_unicode_mapper import FixUnicodeMapper
+from .generate_dataset_mapper import GenerateDatasetMapper
 from .generate_qa_from_examples_mapper import GenerateQAFromExamplesMapper
 from .generate_qa_from_text_mapper import GenerateQAFromTextMapper
 from .image_blur_mapper import ImageBlurMapper
@@ -74,6 +77,7 @@
     RemoveWordsWithIncorrectSubstringsMapper,
 )
 from .replace_content_mapper import ReplaceContentMapper
+from .replay_demos_randomized_mapper import ReplayDemosRandomizedMapper
 from .sdxl_prompt2prompt_mapper import SDXLPrompt2PromptMapper
 from .sentence_augmentation_mapper import SentenceAugmentationMapper
 from .sentence_split_mapper import SentenceSplitMapper
@@ -103,6 +107,10 @@
 from .whitespace_normalization_mapper import WhitespaceNormalizationMapper
 
 __all__ = [
+    "AnnotateDemosMapper",
+    "ReplayDemosRandomizedMapper",
+    "GenerateDatasetMapper",
+    "ConvertToLeRobotMapper",
     "AudioAddGaussianNoiseMapper",
     "AudioFFmpegWrappedMapper",
     "CalibrateQAMapper",
diff --git a/data_juicer/ops/mapper/annotate_demos_mapper.py b/data_juicer/ops/mapper/annotate_demos_mapper.py
new file mode 100644
index 0000000000..bbec01af26
--- /dev/null
+++ b/data_juicer/ops/mapper/annotate_demos_mapper.py
@@ -0,0 +1,419 @@
+import os
+from typing import Any, Dict, List, Optional
+
+from loguru import logger
+
+from ..base_op import OPERATORS, UNFORKABLE, Mapper
+
+
+@OPERATORS.register_module("annotate_demos_mapper")
+@UNFORKABLE.register_module("annotate_demos_mapper")
+class AnnotateDemosMapper(Mapper):
+    """
+    Automatically annotate robot demonstration episodes using Isaac Lab.
+ + This mapper integrates Isaac Lab's automatic annotation pipeline to: + 1. Load episodes from HDF5 files + 2. Replay them in Isaac Sim environment + 3. Automatically detect subtask completions + 4. Export annotated episodes with subtask term signals + + Requires Isaac Lab environment to be properly installed. + """ + + _batched_op = True + # Mark this operator as CUDA-accelerated so the framework passes rank and sets proper mp start method + _accelerator = "cuda" + # Request actor restart after each task to ensure clean Isaac Sim state + _requires_actor_restart = True + + def __init__( + self, + # Task configuration + task_name: Optional[str] = None, + device: str = "cuda:auto", + # Input/Output keys in JSON metadata + input_file_key: str = "input_file", + output_file_key: str = "output_file", + # Isaac Sim options + headless: bool = True, + enable_cameras: bool = False, + enable_pinocchio: bool = False, + *args, + **kwargs, + ): + """ + Initialize the AnnotateDemosMapper. + + Each sample in the JSON should contain: + - input_file: Path to input HDF5 file (with all episodes) + - output_file: Path to output annotated HDF5 file + + For distributed processing, create a JSONL with multiple tasks: + {"text": "task1", "input_file": "file1.hdf5", "output_file": "file1_ann.hdf5"} + {"text": "task2", "input_file": "file2.hdf5", "output_file": "file2_ann.hdf5"} + ... + + Data-Juicer will distribute these tasks across workers. + + :param task_name: Isaac Lab task name (required) + :param device: Device to run on ('cuda:0', 'cpu', etc.) + :param input_file_key: Key in JSON containing input HDF5 path + :param output_file_key: Key in JSON containing output HDF5 path + :param headless: Run Isaac Sim in headless mode + :param enable_cameras: Enable cameras in Isaac Sim + :param enable_pinocchio: Enable Pinocchio for IK controllers + """ + kwargs["ray_execution_mode"] = "task" + super().__init__(*args, **kwargs) + # By default, Data-Juicer will cap runtime_np to GPU count when both + # mem_required and gpu_required are 0. For this operator we honor user-specified + # num_proc even if it exceeds GPU count (the user controls oversubscription), + # so we set a tiny non-zero mem_required to bypass that auto-cap unless the + # user explicitly provided mem_required/gpu_required. + if getattr(self, "mem_required", 0) == 0 and getattr(self, "gpu_required", 0) == 0: + # store in GB (consistent with base OP conversion), ~1KB + self.mem_required = 1e-6 + + if task_name is None: + raise ValueError( + "task_name is required. 
Please set it in your config, e.g.:\n" + "process:\n" + " - annotate_demos_mapper:\n" + " task_name: 'Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Mimic-v0'" + ) + self.task_name = task_name + self.device = device + self.input_file_key = input_file_key + self.output_file_key = output_file_key + self.headless = headless + self.enable_cameras = enable_cameras + self.enable_pinocchio = enable_pinocchio + + # Lazy initialization - will be set on first use + self._env = None + self._success_term = None + self._simulation_app = None + # file handle used to enable faulthandler safely + self._faulthandler_file = None + # original std streams to restore on cleanup + self._orig_streams = {} + self._isaac_initialized = False + # per-task recorder output overrides (set right before creating env) + self._recorder_export_dir = None + self._recorder_filename = None + + # Force batch_size=1 to ensure each actor processes exactly one task + self.batch_size = 1 + + logger.info( + f"Initialized AnnotateDemosMapper: task={task_name}, " + f"device={device}, headless={headless}, batch_size=1 (one task per actor)" + ) + + def _create_task_env(self): + """Create a fresh Isaac Lab env for a single task. + + Recorder export paths are taken from self._recorder_export_dir and + self._recorder_filename, which must be set by the caller before + invoking this method (keeps API consistent with other operator params). + """ + import gymnasium as gym + from isaaclab.envs.mdp.recorders.recorders_cfg import ( + ActionStateRecorderManagerCfg, + ) + from isaaclab.managers import RecorderTerm, RecorderTermCfg + from isaaclab.utils import configclass + from isaaclab_tasks.utils.parse_cfg import parse_env_cfg + + # Parse environment config for this task + env_cfg = parse_env_cfg(self.task_name, device=self.device, num_envs=1) + env_cfg.env_name = self.task_name + + # Record success termination term (if exists); disable all terminations + self._success_term = getattr(env_cfg.terminations, "success", None) + env_cfg.terminations = None + + # Setup recorder configuration and set export paths like the original script + env_cfg.recorders = self._create_recorder_config( + ActionStateRecorderManagerCfg, + RecorderTerm, + RecorderTermCfg, + configclass, + ) + env_cfg.recorders.dataset_export_dir_path = self._recorder_export_dir + env_cfg.recorders.dataset_filename = self._recorder_filename + + # Create a new env for this task + self._env = gym.make(self.task_name, cfg=env_cfg).unwrapped + self._env.reset() + return self._env + + def _create_recorder_config(self, ActionStateRecorderManagerCfg, RecorderTerm, RecorderTermCfg, configclass): + """Create recorder configuration for mimic annotations.""" + + # Define custom recorder terms + class PreStepDatagenInfoRecorder(RecorderTerm): + """Recorder term that records the datagen info data in each step.""" + + def record_pre_step(self): + eef_pose_dict = {} + for eef_name in self._env.cfg.subtask_configs.keys(): + eef_pose_dict[eef_name] = self._env.get_robot_eef_pose(eef_name=eef_name) + + datagen_info = { + "object_pose": self._env.get_object_poses(), + "eef_pose": eef_pose_dict, + "target_eef_pose": self._env.action_to_target_eef_pose(self._env.action_manager.action), + } + return "obs/datagen_info", datagen_info + + @configclass + class PreStepDatagenInfoRecorderCfg(RecorderTermCfg): + class_type = PreStepDatagenInfoRecorder + + class PreStepSubtaskTermsObservationsRecorder(RecorderTerm): + """Recorder term that records the subtask completion observations in each step.""" + + def 
record_pre_step(self): + return "obs/datagen_info/subtask_term_signals", self._env.get_subtask_term_signals() + + @configclass + class PreStepSubtaskTermsObservationsRecorderCfg(RecorderTermCfg): + class_type = PreStepSubtaskTermsObservationsRecorder + + @configclass + class MimicRecorderManagerCfg(ActionStateRecorderManagerCfg): + """Mimic specific recorder terms.""" + + record_pre_step_datagen_info = PreStepDatagenInfoRecorderCfg() + record_pre_step_subtask_term_signals = PreStepSubtaskTermsObservationsRecorderCfg() + + return MimicRecorderManagerCfg() + + def _annotate_file( + self, + input_file: str, + output_file: str, + ) -> Dict[str, Any]: + """ + Annotate entire HDF5 file with all episodes. + + :param input_file: Input HDF5 file path + :param output_file: Output HDF5 file path + :return: Annotation result + """ + # Ensure SimulationApp is running (only once), but recreate Isaac Lab env per task + from data_juicer.utils.isaac_utils import ensure_isaac_sim_app + + ensure_isaac_sim_app(self, mode="mimic") + + import torch + from isaaclab.utils.datasets import HDF5DatasetFileHandler + + # Load dataset + dataset_handler = HDF5DatasetFileHandler() + dataset_handler.open(input_file) + episode_count = dataset_handler.get_num_episodes() + + logger.info(f"Processing {episode_count} episodes from {input_file}") + + # Setup output path (use absolute path for consistency) + output_dir = os.path.dirname(output_file) + output_filename = os.path.splitext(os.path.basename(output_file))[0] + # Resolve to absolute directory (without coupling to Data-Juicer's work_dir) + output_dir_abs = os.path.abspath(output_dir) if output_dir else os.getcwd() + os.makedirs(output_dir_abs, exist_ok=True) + + # Create a fresh env for this task and configure recorder outputs + # Close any existing env to avoid cross-task contamination + if self._env is not None: + self._env.close() + self._env = None + + # Set recorder output targets for this task and create a fresh env + self._recorder_export_dir = output_dir_abs + self._recorder_filename = output_filename + self._create_task_env() + assert self._env is not None, "Environment should be created by _create_task_env()" + + # The actual output file path that Isaac will write to + output_file_abs = os.path.join(output_dir_abs, f"{output_filename}.hdf5") + + # Process each episode + success_count = 0 + for episode_index, episode_name in enumerate(dataset_handler.get_episode_names()): + logger.info(f"Annotating episode #{episode_index} ({episode_name})") + + episode = dataset_handler.load_episode(episode_name, self._env.device) + success = self._replay_and_annotate(episode) + + if success: + # Set success and export + self._env.recorder_manager.set_success_to_episodes( + None, torch.tensor([[True]], dtype=torch.bool, device=self._env.device) + ) + self._env.recorder_manager.export_episodes() + success_count += 1 + logger.info(f"\tExported annotated episode {episode_index}") + else: + logger.warning(f"\tSkipped episode {episode_index} due to annotation failure") + + dataset_handler.close() + + logger.info(f"Exported {success_count}/{episode_count} annotated episodes to {output_file_abs}") + + # NOTE: Do NOT close env here - keep it alive for subsequent batches + # The environment will be closed in cleanup() when the Actor shuts down + # Closing and recreating Isaac Sim environment causes deadlocks + + return { + "success": True, + "output_file": output_file_abs, + "total_episodes": episode_count, + "successful_episodes": success_count, + } + + def _replay_and_annotate(self, 
episode) -> bool: + """ + Replay episode in environment and annotate (automatic mode). + + :param episode: Episode data to replay + :return: True if annotation successful, False otherwise + """ + assert self._env is not None, "Environment must be initialized before replay" + import torch + + # Extract initial state and actions + initial_state = episode.data["initial_state"] + actions = episode.data["actions"] + + # Reset environment to initial state + self._env.sim.reset() + self._env.recorder_manager.reset() + self._env.reset_to(initial_state, None, is_relative=True) + + # Replay all actions + for action in actions: + action_tensor = torch.Tensor(action).reshape([1, action.shape[0]]) + self._env.step(action_tensor) + + # Check if task was completed successfully + if self._success_term is not None: + success_result = self._success_term.func(self._env, **self._success_term.params)[0] + if not bool(success_result): + logger.warning("Episode replay failed: task not completed") + return False + + # Verify all subtask term signals are annotated + annotated_episode = self._env.recorder_manager.get_episode(0) + subtask_signals = annotated_episode.data["obs"]["datagen_info"]["subtask_term_signals"] + + for signal_name, signal_flags in subtask_signals.items(): + if not torch.any(torch.tensor(signal_flags)): + logger.warning(f"Subtask '{signal_name}' not completed") + return False + + return True + + def process_batched(self, samples: Dict[str, Any], rank: Optional[int] = None) -> Dict[str, Any]: + """ + Process a single annotation task (batch_size=1). + + After processing, the actor will exit to ensure clean Isaac Sim state. + Ray will automatically create a new actor for the next task. + + Each sample should contain: + - input_file: Path to input HDF5 file + - output_file: Path to output annotated HDF5 file + + :param samples: Batch of samples (must contain exactly 1 sample) + :param rank: Ray actor rank for GPU assignment + :return: Processed batch with annotation results + """ + # Ensure we only process one task per actor + num_samples = len(samples[self.text_key]) + if num_samples != 1: + logger.warning( + f"AnnotateDemosMapper expects batch_size=1, got {num_samples}. " f"Processing only the first sample." 
+ ) + # Truncate to first sample only + samples = {k: [v[0]] for k, v in samples.items()} + num_samples = 1 + # Per-process GPU assignment: if device is 'cuda:auto', bind this process to a GPU by rank + try: + import os as _os + + import torch as _torch + + if isinstance(self.device, str) and self.device.startswith("cuda"): + # Honor explicit device like 'cuda:1'; only auto-assign when 'cuda' or 'cuda:auto' + if self.device in ("cuda", "cuda:auto"): + # If Ray sets CUDA_VISIBLE_DEVICES to a single GPU, pin to cuda:0 in the local view + visible = _os.environ.get("CUDA_VISIBLE_DEVICES", "") + if visible and len(visible.split(",")) == 1: + self.device = "cuda:0" + else: + # Default executor: spread by rank across all visible GPUs + # Rank will be provided by the framework when _accelerator == 'cuda' + # Fallback to 0 if rank or device count not available + def _assign_device_by_rank(_rank: Optional[int]): + if _torch.cuda.is_available(): + count = _torch.cuda.device_count() + if _rank is not None and count > 0: + return f"cuda:{_rank % count}" + if count > 0: + return "cuda:0" + return self.device + + self.device = _assign_device_by_rank(rank) + except Exception: + # Do not fail batch processing due to device binding issues; keep existing self.device + pass + + logger.info("Processing single annotation task (actor will exit after completion)") + + # Prepare results container for single task + results: List[Optional[Dict[str, Any]]] = [None] + + # Check required keys + if self.input_file_key not in samples or self.output_file_key not in samples: + logger.error( + f"Required keys '{self.input_file_key}' and/or '{self.output_file_key}' " f"not found in samples" + ) + return samples + + # Process the single task + input_file = samples[self.input_file_key][0] + output_file = samples[self.output_file_key][0] + + logger.info(f"Task: {input_file} -> {output_file}") + + try: + result = self._annotate_file(input_file, output_file) + results[0] = result + + if result.get("success"): + logger.info("✓ Task completed successfully") + else: + logger.warning(f"✗ Task failed: {result.get('error')}") + + samples["annotation_result"] = results + + finally: + # Cleanup after processing one task + # Note: Actor exit is handled by the framework layer, not here + logger.info("Task complete. Cleaning up resources...") + self.cleanup() + + return samples + + # Use shared cleanup logic + from data_juicer.utils.isaac_utils import cleanup_isaac_env as cleanup + + def __del__(self): + """Cleanup Isaac Sim environment on deletion.""" + try: + self.cleanup() + except Exception: + pass diff --git a/data_juicer/ops/mapper/convert_to_lerobot_mapper.py b/data_juicer/ops/mapper/convert_to_lerobot_mapper.py new file mode 100644 index 0000000000..43e164b990 --- /dev/null +++ b/data_juicer/ops/mapper/convert_to_lerobot_mapper.py @@ -0,0 +1,523 @@ +import json +import shutil +import subprocess +from functools import reduce +from pathlib import Path +from typing import Any, Dict, Optional + +import h5py +import numpy as np +import pandas as pd +import yaml +from loguru import logger +from tqdm import tqdm + +from ..base_op import OPERATORS, UNFORKABLE, Mapper + + +@OPERATORS.register_module("convert_to_lerobot_mapper") +@UNFORKABLE.register_module("convert_to_lerobot_mapper") +class ConvertToLeRobotMapper(Mapper): + """ + Convert HDF5 datasets (MimicGen/Isaac Lab format) to LeRobot dataset format. 
+ """ + + _batched_op = True + + def __init__( + self, + config_path: str = None, + input_file_key: str = "input_file", + output_dir_key: str = "output_dir", + video_dir_key: str = "video_dir", + config_path_key: str = "config_path", + *args, + **kwargs, + ): + kwargs["ray_execution_mode"] = "task" + super().__init__(*args, **kwargs) + + self.config_path = config_path + self.input_file_key = input_file_key + self.output_dir_key = output_dir_key + self.video_dir_key = video_dir_key + self.config_path_key = config_path_key + + # Force batch_size=1 to handle one dataset conversion per process + self.batch_size = 1 + + # Load configuration template if provided + if config_path: + self.base_config = self._load_config(config_path) + else: + self.base_config = None + + def _load_config(self, config_path): + if config_path is None: + raise ValueError("config_path is required.") + + config_path = Path(config_path) + if not config_path.exists(): + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + with open(config_path, "r") as f: + external_config = yaml.safe_load(f) + + # Create a config dict + config = {} + config["external_config"] = external_config + + # Defaults + config["chunks_size"] = 1000 + config["fps"] = 20 + config["data_path"] = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet" + config["video_path"] = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4" + config["modality_fname"] = "modality.json" + config["episodes_fname"] = "episodes.jsonl" + config["tasks_fname"] = "tasks.jsonl" + config["info_fname"] = "info.json" + config["task_index"] = 0 + config["total_episodes"] = 5000 + + # Override from YAML + if "dataset" in external_config: + ds_config = external_config["dataset"] + for field_name in ["robot_type", "fps", "chunks_size"]: + if field_name in ds_config: + config[field_name] = ds_config[field_name] + + # Handle templates + # Resolve relative paths relative to the config file location if possible, or workspace root + # Here we assume they are relative to workspace root if not absolute + if "modality_template_path" in external_config: + config["modality_template_path"] = Path(external_config["modality_template_path"]) + if "info_template_path" in external_config: + config["info_template_path"] = Path(external_config["info_template_path"]) + + if "tasks" in external_config: + config["tasks"] = external_config["tasks"] + + return config + + def _get_video_metadata(self, video_path: str) -> dict: + """Get video metadata using ffprobe.""" + cmd = [ + "ffprobe", + "-v", + "error", + "-select_streams", + "v:0", + "-show_entries", + "stream=height,width,codec_name,pix_fmt,r_frame_rate", + "-of", + "json", + str(video_path), + ] + + try: + output = subprocess.check_output(cmd).decode("utf-8") + probe_data = json.loads(output) + stream = probe_data["streams"][0] + + # Parse frame rate + if "/" in stream["r_frame_rate"]: + num, den = map(int, stream["r_frame_rate"].split("/")) + fps = num / den + else: + fps = float(stream["r_frame_rate"]) + + # Check for audio streams + audio_cmd = [ + "ffprobe", + "-v", + "error", + "-select_streams", + "a", + "-show_entries", + "stream=codec_type", + "-of", + "json", + str(video_path), + ] + audio_output = subprocess.check_output(audio_cmd).decode("utf-8") + audio_data = json.loads(audio_output) + has_audio = len(audio_data.get("streams", [])) > 0 + + metadata = { + "dtype": "video", + "shape": [stream["height"], stream["width"], 3], + "names": ["height", "width", "channel"], + 
"video_info": { + "video.fps": fps, + "video.codec": stream["codec_name"], + "video.pix_fmt": stream["pix_fmt"], + "video.is_depth_map": False, + "has_audio": has_audio, + }, + } + return metadata + + except (subprocess.CalledProcessError, json.JSONDecodeError, IndexError, KeyError) as e: + logger.warning(f"Error getting metadata for {video_path}: {e}") + return None + + def _get_feature_info(self, step_data: pd.DataFrame, video_paths: dict, config: dict) -> dict: + features = {} + for video_key, video_path in video_paths.items(): + video_metadata = self._get_video_metadata(video_path) + if video_metadata: + features[video_key] = video_metadata + + lerobot_keys = config["external_config"].get("lerobot_keys", {}) + state_key = lerobot_keys.get("state", "observation.state") + action_key = lerobot_keys.get("action", "action") + + for column in step_data.columns: + column_data = np.stack(step_data[column], axis=0) + shape = column_data.shape + if len(shape) == 1: + shape = (1,) + else: + shape = shape[1:] + features[column] = { + "dtype": column_data.dtype.name, + "shape": shape, + } + # State & action + if column in [state_key, action_key]: + dof = column_data.shape[1] + features[column]["names"] = [f"motor_{i}" for i in range(dof)] + + return features + + def _generate_info( + self, + total_episodes: int, + total_frames: int, + total_tasks: int, + total_videos: int, + total_chunks: int, + config: dict, + step_data: pd.DataFrame, + video_paths: dict, + ) -> dict: + with open(config["info_template_path"]) as fp: + info_template = json.load(fp) + + info_template["robot_type"] = config.get("robot_type") + info_template["total_episodes"] = total_episodes + info_template["total_frames"] = total_frames + info_template["total_tasks"] = total_tasks + info_template["total_videos"] = total_videos + info_template["total_chunks"] = total_chunks + info_template["chunks_size"] = config["chunks_size"] + info_template["fps"] = config["fps"] + + info_template["data_path"] = config["data_path"] + info_template["video_path"] = config["video_path"] + + features = self._get_feature_info(step_data, video_paths, config=config) + info_template["features"] = features + + return info_template + + def _parse_structured_slice(self, slice_config): + """ + Parses a structured slice configuration from YAML (list of dims) into a slice object or tuple. 
+ Example YAML: [[null, -1], 0] -> Python: (slice(None, -1), 0) + """ + if not isinstance(slice_config, list): + raise ValueError(f"Slice config must be a list of dimensions, got {type(slice_config)}") + + slices = [] + for dim in slice_config: + if isinstance(dim, int): + # Integer index + slices.append(dim) + elif isinstance(dim, str) and dim == "...": + # Ellipsis + slices.append(Ellipsis) + elif isinstance(dim, list): + # Slice definition [start, stop, step] + # YAML null becomes Python None + slices.append(slice(*dim)) + else: + raise ValueError(f"Invalid slice dimension format: {dim}") + + if len(slices) == 1: + return slices[0] + return tuple(slices) + + def _apply_transform(self, array: np.ndarray, transform_config: dict) -> np.ndarray: + if "slice" in transform_config: + slice_conf = transform_config["slice"] + try: + slice_obj = self._parse_structured_slice(slice_conf) + array = array[slice_obj] + except Exception as e: + logger.error(f"Error applying slice {slice_conf}: {e}") + raise + + if "reshape" in transform_config: + shape = transform_config["reshape"] + array = array.reshape(shape) + + return array + + def _convert_trajectory_to_df( + self, + trajectory: h5py.Group, + episode_index: int, + index_start: int, + config: dict, + ) -> dict: + return_dict = {} + data = {} + + mapping = config["external_config"].get("mapping", {}) + lerobot_keys = config["external_config"].get("lerobot_keys", {}) + + state_key = lerobot_keys.get("state", "observation.state") + action_key = lerobot_keys.get("action", "action") + annotation_keys = lerobot_keys.get( + "annotation", ["annotation.human.action.task_description", "annotation.human.action.valid"] + ) + + # 1. Get state and action + for key_type in ["state", "action"]: + if key_type not in mapping: + continue + + lerobot_key_name = state_key if key_type == "state" else action_key + + concatenated_list = [] + for source_config in mapping[key_type]: + hdf5_key = source_config["key"] + key_path = hdf5_key.split(".") + try: + array = reduce(lambda x, y: x[y], key_path, trajectory) + array = np.array(array).astype(np.float64) + array = self._apply_transform(array, source_config) + concatenated_list.append(array) + except KeyError: + logger.warning(f"Key {hdf5_key} not found in trajectory") + continue + + if concatenated_list: + concatenated = np.concatenate(concatenated_list, axis=1) + data[lerobot_key_name] = [row for row in concatenated] + + if action_key not in data or state_key not in data: + raise ValueError(f"Missing state or action data. Keys found: {list(data.keys())}") + + assert len(data[action_key]) == len(data[state_key]) + length = len(data[action_key]) + data["timestamp"] = np.arange(length).astype(np.float64) * (1.0 / config["fps"]) + + # 2. Get the annotation + data[annotation_keys[0]] = np.ones(length, dtype=int) * config["task_index"] + data[annotation_keys[1]] = np.ones(length, dtype=int) * 1 + + # 3. 
Other data + data["episode_index"] = np.ones(length, dtype=int) * episode_index + data["task_index"] = np.zeros(length, dtype=int) + data["index"] = np.arange(length, dtype=int) + index_start + + reward = np.zeros(length, dtype=np.float64) + reward[-1] = 1 + done = np.zeros(length, dtype=bool) + done[-1] = True + data["next.reward"] = reward + data["next.done"] = done + + dataframe = pd.DataFrame(data) + + return_dict["data"] = dataframe + return_dict["length"] = length + return_dict["annotation"] = set(data[annotation_keys[0]]) | set(data[annotation_keys[1]]) + return return_dict + + def _check_failed_videos(self, video_dir: Path) -> list: + if not video_dir.exists(): + logger.warning(f"Video directory not found: {video_dir}") + return [] + + video_files = list(video_dir.glob("*.mp4")) + failed_ids = [] + for video_file in video_files: + if "failed" in video_file.name: + try: + traj_id = video_file.name.split("_")[1] + if traj_id not in failed_ids: + failed_ids.append(traj_id) + except IndexError: + pass + return failed_ids + + def _convert_file(self, input_file: str, output_dir: str, video_dir: str = None, config_path: str = None): + # Determine config to use + if config_path: + config = self._load_config(config_path) + elif self.base_config: + config = self.base_config.copy() + else: + logger.error("No configuration provided for file conversion.") + return + + input_path = Path(input_file) + output_path = Path(output_dir) + + if not input_path.exists(): + logger.warning(f"Input file not found: {input_path}") + return + + # Resolve video_dir + if video_dir: + video_dir = Path(video_dir) + if not video_dir.is_absolute(): + # If relative, assume relative to input file's directory + video_dir = input_path.parent / video_dir + else: + # Fallback or assume it's relative to input file location + video_dir = input_path.parent / "videos" # Assumption + + # Validate templates + if "modality_template_path" not in config or not config["modality_template_path"].exists(): + logger.error(f"Modality template not found: {config.get('modality_template_path')}") + return + if "info_template_path" not in config or not config["info_template_path"].exists(): + logger.error(f"Info template not found: {config.get('info_template_path')}") + return + + hdf5_handler = h5py.File(input_path, "r") + hdf5_data = hdf5_handler["data"] + + # Prepare output directories + output_path.mkdir(parents=True, exist_ok=True) + lerobot_meta_dir = output_path / "meta" + lerobot_meta_dir.mkdir(parents=True, exist_ok=True) + + total_length = 0 + example_data = None + video_paths = {} + + trajectory_ids = sorted( + [k for k in hdf5_data.keys() if k.startswith("demo_")], key=lambda x: int(x.split("_")[1]) + ) + + failed_ids = self._check_failed_videos(video_dir) + + episodes_info = [] + logger.info(f"Processing {len(trajectory_ids)} trajectories from {input_file}...") + + for episode_index, trajectory_id in enumerate(tqdm(trajectory_ids)): + if trajectory_id in [f"demo_{failed_id}" for failed_id in failed_ids]: + continue + + trajectory = hdf5_data[trajectory_id] + + try: + df_ret_dict = self._convert_trajectory_to_df( + trajectory=trajectory, episode_index=episode_index, index_start=total_length, config=config + ) + except Exception as e: + logger.error(f"Failed to convert trajectory {trajectory_id}: {e}") + continue + + # Save episode data + dataframe = df_ret_dict["data"] + episode_chunk = episode_index // config["chunks_size"] + save_relpath = config["data_path"].format(episode_chunk=episode_chunk, episode_index=episode_index) + 
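+            # e.g. episode 12 in chunk 0 -> "data/chunk-000/episode_000012.parquet"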
save_path = output_path / save_relpath + save_path.parent.mkdir(parents=True, exist_ok=True) + dataframe.to_parquet(save_path) + + length = df_ret_dict["length"] + total_length += length + episodes_info.append( + { + "episode_index": episode_index, + "tasks": [config["tasks"][task_index] for task_index in df_ret_dict["annotation"]], + "length": length, + } + ) + + # Process videos + video_mapping = config["external_config"].get("video_mapping", {}) + for lerobot_key, view_suffix in video_mapping.items(): + try: + new_video_relpath = config["video_path"].format( + episode_chunk=episode_chunk, video_key=lerobot_key, episode_index=episode_index + ) + new_video_path = output_path / new_video_relpath + new_video_path.parent.mkdir(parents=True, exist_ok=True) + + original_video_path = video_dir / f"{trajectory_id}_{view_suffix}.mp4" + if original_video_path.exists(): + shutil.copy2(original_video_path, new_video_path) + if lerobot_key not in video_paths: + video_paths[lerobot_key] = new_video_path + else: + logger.warning(f"Video file not found: {original_video_path}") + except Exception as e: + logger.warning(f"Error processing video {lerobot_key} for {trajectory_id}: {e}") + + if example_data is None: + example_data = df_ret_dict + + if len(episodes_info) > config["total_episodes"] - 1: + break + + # Generate meta files + tasks_path = lerobot_meta_dir / config["tasks_fname"] + task_jsonlines = [{"task_index": task_index, "task": task} for task_index, task in config["tasks"].items()] + with open(tasks_path, "w") as f: + for item in task_jsonlines: + f.write(json.dumps(item) + "\n") + + episodes_path = lerobot_meta_dir / config["episodes_fname"] + with open(episodes_path, "w") as f: + for item in episodes_info: + f.write(json.dumps(item) + "\n") + + modality_path = lerobot_meta_dir / config["modality_fname"] + shutil.copy(config["modality_template_path"], modality_path) + + if example_data: + info_json = self._generate_info( + total_episodes=len(episodes_info), + total_frames=total_length, + total_tasks=len(config["tasks"]), + total_videos=len(episodes_info), + total_chunks=(episode_index + config["chunks_size"] - 1) // config["chunks_size"], + step_data=example_data["data"], + video_paths=video_paths, + config=config, + ) + with open(lerobot_meta_dir / "info.json", "w") as f: + json.dump(info_json, f, indent=4) + + hdf5_handler.close() + logger.info(f"Conversion completed for {input_file}") + + def process_batched(self, samples: Dict[str, Any], rank: Optional[int] = None) -> Dict[str, Any]: + input_files = samples[self.input_file_key] + output_dirs = samples[self.output_dir_key] + + if self.video_dir_key in samples: + video_dirs = samples[self.video_dir_key] + else: + video_dirs = [None] * len(input_files) + + if self.config_path_key in samples: + config_paths = samples[self.config_path_key] + else: + config_paths = [None] * len(input_files) + + for input_file, output_dir, video_dir, config_path in zip(input_files, output_dirs, video_dirs, config_paths): + try: + self._convert_file(input_file, output_dir, video_dir=video_dir, config_path=config_path) + except Exception as e: + logger.exception(f"Failed to convert {input_file}: {e}") + + return samples diff --git a/data_juicer/ops/mapper/generate_dataset_mapper.py b/data_juicer/ops/mapper/generate_dataset_mapper.py new file mode 100644 index 0000000000..3f0b0d1562 --- /dev/null +++ b/data_juicer/ops/mapper/generate_dataset_mapper.py @@ -0,0 +1,301 @@ +import asyncio +import inspect +import os +import random +from typing import Any, Dict, List, 
Optional
+
+import numpy as np
+from loguru import logger
+
+from ..base_op import OPERATORS, UNFORKABLE, Mapper
+
+
+@OPERATORS.register_module("generate_dataset_mapper")
+@UNFORKABLE.register_module("generate_dataset_mapper")
+class GenerateDatasetMapper(Mapper):
+    """
+    Generates a mimic dataset using Isaac Lab.
+
+    This mapper integrates Isaac Lab's dataset generation pipeline to:
+    1. Load annotated episodes from an HDF5 file.
+    2. Use them as a source to generate a new mimic dataset in a simulation environment.
+    3. Export the generated dataset to a new HDF5 file.
+
+    Requires Isaac Lab environment to be properly installed.
+    """
+
+    _batched_op = True
+    # Mark this operator as CUDA-accelerated
+    _accelerator = "cuda"
+    # Request actor restart after each task to ensure clean Isaac Sim state
+    _requires_actor_restart = True
+
+    def __init__(
+        self,
+        # Task configuration
+        task_name: Optional[str] = None,
+        num_envs: int = 8,
+        generation_num_trials: int = 1000,
+        device: str = "cuda:auto",
+        # Input/Output keys in JSON metadata
+        input_file_key: str = "input_file",
+        output_file_key: str = "output_file",
+        # Isaac Sim options
+        headless: bool = True,
+        enable_cameras: bool = False,
+        enable_pinocchio: bool = False,
+        pause_subtask: bool = False,
+        *args,
+        **kwargs,
+    ):
+        """
+        Initialize the GenerateDatasetMapper.
+
+        :param task_name: Isaac Lab task name (required).
+        :param num_envs: Number of parallel environments to use for generation.
+        :param generation_num_trials: Number of trials for dataset generation.
+        :param device: Device to run on ('cuda:0', 'cpu', etc.).
+        :param input_file_key: Key in JSON for the input annotated HDF5 path.
+        :param output_file_key: Key in JSON for the output generated HDF5 path.
+        :param headless: Run Isaac Sim in headless mode.
+        :param enable_cameras: Enable cameras in Isaac Sim.
+        :param enable_pinocchio: Enable Pinocchio support for IK controllers.
+        :param pause_subtask: Pause after every subtask during generation (debugging aid).
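+
+        A config sketch (the task name below is an illustrative assumption):
+
+            process:
+              - generate_dataset_mapper:
+                  task_name: 'Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0'
+                  num_envs: 8
+                  generation_num_trials: 1000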
+ """ + kwargs["ray_execution_mode"] = "task" + super().__init__(*args, **kwargs) + + if task_name is None: + raise ValueError("task_name is required.") + + self.task_name = task_name + self.num_envs = num_envs + self.generation_num_trials = generation_num_trials + # Honor user-provided runtime constraints; enable explicit num_proc handling like annotate mapper + if getattr(self, "mem_required", 0) == 0 and getattr(self, "gpu_required", 0) == 0: + self.mem_required = 1e-6 + + self.device = device + self.input_file_key = input_file_key + self.output_file_key = output_file_key + self.headless = headless + self.enable_cameras = enable_cameras + self.enable_pinocchio = enable_pinocchio + self.pause_subtask = pause_subtask + + # Lazy initialization + self._env = None + self._simulation_app = None + self._isaac_initialized = False + self._success_term = None + self._orig_streams: Dict[str, Optional[Any]] = {} + self._faulthandler_file = None + self._output_file_abs: Optional[str] = None + + # Force batch_size=1 to ensure each actor processes exactly one task + self.batch_size = 1 + + logger.info( + f"Initialized GenerateMimicDatasetMapper: task={self.task_name}, " + f"num_envs={self.num_envs}, batch_size=1 (one task per actor)" + ) + + def _resolve_env_name(self, input_file: str) -> str: + if self.task_name: + return self.task_name.split(":")[-1] + + from isaaclab_mimic.datagen.utils import get_env_name_from_dataset + + return get_env_name_from_dataset(input_file) + + def _create_task_env(self, input_file: str, output_file: str) -> Optional[Any]: + import gymnasium as gym + import omni + import torch + from isaaclab.envs import ManagerBasedRLMimicEnv + from isaaclab_mimic.datagen.generation import setup_env_config + from isaaclab_mimic.datagen.utils import setup_output_paths + + if self._env is not None: + self._env.close() + self._env = None + + env_name = self._resolve_env_name(input_file) + output_dir, output_filename = setup_output_paths(output_file) + os.makedirs(output_dir, exist_ok=True) + + logger.info( + "Preparing Isaac Lab environment for dataset generation: env=%s, num_envs=%d, trials=%d", + env_name, + self.num_envs, + self.generation_num_trials, + ) + + env_cfg, success_term = setup_env_config( + env_name=env_name, + output_dir=output_dir, + output_file_name=output_filename, + num_envs=self.num_envs, + device=self.device, + generation_num_trials=self.generation_num_trials, + ) + + self._env = gym.make(env_name, cfg=env_cfg).unwrapped + + if not isinstance(self._env, ManagerBasedRLMimicEnv): + raise ValueError("The environment should be derived from ManagerBasedRLMimicEnv") + + if "action_noise_dict" not in inspect.signature(self._env.target_eef_pose_to_action).parameters: + omni.log.warn( + f'The "noise" parameter in the "{env_name}" environment\'s mimic API ' + '"target_eef_pose_to_action" is deprecated. Please update the API to take ' + "action_noise_dict instead." 
+ ) + + if hasattr(self._env, "cfg") and hasattr(self._env.cfg, "datagen_config"): + seed = getattr(self._env.cfg.datagen_config, "seed", None) + if seed is not None: + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + + self._env.reset() + self._success_term = success_term + self._output_file_abs = os.path.join(output_dir, output_filename) + return success_term + + def _run_async_generation(self, input_file: str, success_term: Optional[Any]) -> None: + from isaaclab_mimic.datagen.generation import env_loop, setup_async_generation + + async_components = setup_async_generation( + env=self._env, + num_envs=self.num_envs, + input_file=input_file, + success_term=success_term, + pause_subtask=self.pause_subtask, + ) + + data_gen_tasks = asyncio.ensure_future(asyncio.gather(*async_components["tasks"])) + try: + env_loop( + self._env, + async_components["reset_queue"], + async_components["action_queue"], + async_components["info_pool"], + async_components["event_loop"], + ) + except asyncio.CancelledError: + logger.warning("Async generation tasks were cancelled early.") + finally: + data_gen_tasks.cancel() + try: + async_components["event_loop"].run_until_complete(data_gen_tasks) + except asyncio.CancelledError: + logger.info("Cancelled async tasks cleaned up successfully.") + except Exception as loop_exc: # pragma: no cover - best effort cleanup logging + logger.warning("Exception while finalizing async tasks: %s", loop_exc) + + def _generate_dataset_for_file(self, input_file: str, output_file: str) -> Dict[str, Any]: + """Generates mimic dataset from a single annotated source file.""" + from data_juicer.utils.isaac_utils import ensure_isaac_sim_app + + ensure_isaac_sim_app(self, mode="mimic") + success_term = self._create_task_env(input_file, output_file) + self._run_async_generation(input_file, success_term) + + output_path = self._output_file_abs or output_file + success = output_path is not None and os.path.exists(output_path) + + if success: + logger.info("Dataset generation completed. Output file located at %s", output_path) + else: + logger.warning( + "Expected output dataset %s was not found after generation. " + "Downstream steps should verify dataset export.", + output_path, + ) + + return { + "success": success, + "output_file": output_path, + "exported_episodes": None, + } + + def process_batched(self, samples: Dict[str, Any], rank: Optional[int] = None) -> Dict[str, Any]: + """ + Process a single dataset generation task (batch_size=1). + """ + num_samples = len(samples.get(self.text_key, [])) + if num_samples != 1: + logger.warning( + "GenerateMimicDatasetMapper expects batch_size=1, received %d. 
Only the first sample will be processed.", + num_samples, + ) + samples = {key: [values[0]] for key, values in samples.items() if values} + + try: + import os as _os + + import torch as _torch + + if isinstance(self.device, str) and self.device.startswith("cuda"): + if self.device in ("cuda", "cuda:auto"): + visible = _os.environ.get("CUDA_VISIBLE_DEVICES", "") + if visible and len(visible.split(",")) == 1: + self.device = "cuda:0" + elif _torch.cuda.is_available(): + count = _torch.cuda.device_count() + if count > 0: + idx = 0 if rank is None else rank % count + self.device = f"cuda:{idx}" + elif not _torch.cuda.is_available(): + logger.warning("Requested device %s but CUDA is unavailable; falling back to CPU", self.device) + self.device = "cpu" + except Exception: # pragma: no cover - defensive + pass + + logger.info("Processing single generation task on device %s", self.device) + + results: List[Optional[Dict[str, Any]]] = [None] + + if self.input_file_key not in samples or self.output_file_key not in samples: + logger.error( + "Required keys '%s' or '%s' not found in samples.", + self.input_file_key, + self.output_file_key, + ) + samples["generation_result"] = results + return samples + + input_file = samples[self.input_file_key][0] + output_file = samples[self.output_file_key][0] + + logger.info("Task: Generate from %s -> %s", input_file, output_file) + + try: + result = self._generate_dataset_for_file(input_file, output_file) + results[0] = result + + if result.get("success"): + logger.info("✓ Task completed successfully") + else: + logger.warning("✗ Task failed") + + except Exception as exc: + logger.error("An exception occurred during dataset generation: %s", exc, exc_info=True) + results[0] = {"success": False, "error": str(exc)} + finally: + logger.info("Task complete. Cleaning up resources...") + self.cleanup() + + samples["generation_result"] = results + return samples + + # Use shared cleanup logic + from data_juicer.utils.isaac_utils import cleanup_isaac_env as cleanup + + def __del__(self): + try: + self.cleanup() + except Exception: + pass diff --git a/data_juicer/ops/mapper/replay_demos_randomized_mapper.py b/data_juicer/ops/mapper/replay_demos_randomized_mapper.py new file mode 100644 index 0000000000..d54da22e59 --- /dev/null +++ b/data_juicer/ops/mapper/replay_demos_randomized_mapper.py @@ -0,0 +1,450 @@ +import os +import re +import time +from typing import Dict, List, Optional + +import cv2 +import torch +from isaaclab.utils.datasets import HDF5DatasetFileHandler +from loguru import logger + +from ..base_op import OPERATORS, UNFORKABLE, Mapper + + +@OPERATORS.register_module("replay_demos_randomized_mapper") +@UNFORKABLE.register_module("replay_demos_randomized_mapper") +class ReplayDemosRandomizedMapper(Mapper): + """ + Replay demonstrations with Isaac Lab environments and record videos. + """ + + _batched_op = True + # Mark this operator as CUDA-accelerated + _accelerator = "cuda" + # Each task requires a new, clean Isaac Sim instance. 
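+    # (SimulationApp cannot be relaunched within the same process, so restarting
+    # the actor is the safest way to guarantee a clean simulator state.)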
+ _requires_actor_restart = True + + def __init__( + self, + # Task configuration + task_name: Optional[str] = None, + select_episodes: Optional[List[int]] = None, + validate_states: bool = False, + enable_pinocchio: bool = False, + dual_arm: bool = False, + device: str = "cuda:auto", + randomize_visuals: bool = True, + visual_randomization_config: Optional[str] = None, + # Input/Output keys in JSON metadata + input_file_key: str = "dataset_file", + output_file_key: str = "output_file", + video_dir_key: str = "video_dir", + # Video recording options + video: bool = False, + camera_view_list: Optional[List[str]] = None, + save_depth: bool = False, + # Isaac Sim options + headless: bool = True, + *args, + **kwargs, + ): + """ + Initialize the ReplayDemosRandomizedMapper. + + :param task_name: Isaac Lab task name (e.g., 'Isaac-Stack-Cube-Franka-IK-Rel-v0'). + :param select_episodes: A list of episode indices to be replayed. + If None, replay all episodes. + :param validate_states: Whether to validate states during replay. + :param enable_pinocchio: Enable Pinocchio support. + :param dual_arm: Whether the robot is a dual-arm robot. + :param device: Device to run on ('cuda:0', 'cpu', etc.). + :param randomize_visuals: Whether to randomize visual appearance (lights, materials) during replay. + :param input_file_key: Key in the sample to find the input HDF5 path. + :param output_file_key: Key in the sample to store the output HDF5 path (if dumping). + :param video_dir_key: Key in the sample to store the output video directory. + :param video: Whether to record videos. + :param camera_view_list: A list of camera views to record. + :param save_depth: Whether to save depth images along with RGB. + :param headless: Run Isaac Sim in headless mode. + """ + kwargs["ray_execution_mode"] = "task" + super().__init__(*args, **kwargs) + + if video and not camera_view_list: + raise ValueError("`camera_view_list` must be provided when `video` is True.") + + self.task_name = task_name + self.select_episodes = select_episodes if select_episodes else [] + self.validate_states = validate_states + self.enable_pinocchio = enable_pinocchio + self.dual_arm = dual_arm + self.device = device + self.randomize_visuals = randomize_visuals + + self.visual_randomization_config = None + if visual_randomization_config: + import yaml + + if os.path.exists(visual_randomization_config): + with open(visual_randomization_config, "r") as f: + self.visual_randomization_config = yaml.safe_load(f) + else: + logger.warning(f"Visual randomization config file not found: {visual_randomization_config}") + + self.input_file_key = input_file_key + self.output_file_key = output_file_key + self.video_dir_key = video_dir_key + + self.video = video + self.camera_view_list = camera_view_list if camera_view_list else [] + self.save_depth = save_depth + self.headless = headless + + # Force batch_size=1 to ensure each actor processes exactly one task + self.batch_size = 1 + + # Lazy initialization for Isaac Sim + self._env = None + self._simulation_app = None + self._isaac_initialized = False + + logger.info(f"Initialized ReplayDemosRandomizedMapper for task={self.task_name}") + + def _inject_visual_randomization(self, env_cfg): + """Inject visual randomization terms into the environment configuration.""" + import isaaclab_tasks.manager_based.manipulation.stack.mdp.franka_stack_events as franka_stack_events + from isaaclab.managers import EventTermCfg as EventTerm + from isaaclab.managers import SceneEntityCfg + + from data_juicer.utils.isaac_utils 
import resolve_nucleus_paths + + if not self.visual_randomization_config: + return + + # Disable scene replication to allow USD-level randomization (materials) + if hasattr(env_cfg, "scene"): + env_cfg.scene.replicate_physics = False + + # Resolve paths in config + resolved_config = resolve_nucleus_paths(self.visual_randomization_config) + + for entry in resolved_config: + func = None + if entry["type"] == "light": + func = franka_stack_events.randomize_scene_lighting_domelight + elif entry["type"] == "asset_texture": + func = franka_stack_events.randomize_visual_texture_material + else: + logger.warning(f"Unknown randomization type: {entry['type']}") + continue + + params = entry.get("params", {}).copy() + + if "intensity_range" in params and isinstance(params["intensity_range"], list): + params["intensity_range"] = tuple(params["intensity_range"]) + if "default_color" in params and isinstance(params["default_color"], list): + params["default_color"] = tuple(params["default_color"]) + + # Convert asset_cfg dict to SceneEntityCfg object if present + if "asset_cfg" in params and isinstance(params["asset_cfg"], dict): + params["asset_cfg"] = SceneEntityCfg(**params["asset_cfg"]) + + logger.debug(f"params: {params}") + + setattr( + env_cfg.events, + entry["name"], + EventTerm( + func=func, + mode="reset", + params=params, + ), + ) + + def _create_env(self): + import gymnasium as gym + from isaaclab_tasks.utils.parse_cfg import parse_env_cfg + + # Build env config + env_cfg = parse_env_cfg(self.task_name, device=self.device, num_envs=1) + env_cfg.env_name = self.task_name + env_cfg.eval_mode = True + env_cfg.eval_type = "all" + + # Inject visual randomization if enabled + if self.randomize_visuals: + self._inject_visual_randomization(env_cfg) + + # Extract success checking function and disable timeouts + success_term = None + if hasattr(env_cfg.terminations, "success"): + success_term = env_cfg.terminations.success + env_cfg.terminations.success = None + if hasattr(env_cfg.terminations, "time_out"): + env_cfg.terminations.time_out = None + + # Some envs expect this + if hasattr(env_cfg, "observations") and hasattr(env_cfg.observations, "policy"): + env_cfg.observations.policy.concatenate_terms = False + + if hasattr(env_cfg, "sim") and hasattr(env_cfg.sim, "physx"): + env_cfg.sim.physx.enable_ccd = True + + # Create environment + env = gym.make(self.task_name, cfg=env_cfg).unwrapped + + self._env = env + return success_term + + def process_batched(self, samples, rank: Optional[int] = None): + """Process a single replay task (batch_size=1).""" + from data_juicer.utils.isaac_utils import create_video_from_images + + # Normalize device if auto and CUDA available + try: + if isinstance(self.device, str) and self.device.startswith("cuda"): + if self.device in ("cuda", "cuda:auto") and torch.cuda.is_available(): + count = torch.cuda.device_count() + if count > 0: + idx = 0 if rank is None else rank % count + self.device = f"cuda:{idx}" + elif not torch.cuda.is_available(): + logger.warning("CUDA requested but unavailable; falling back to CPU") + self.device = "cpu" + except Exception: + pass + + # Validate required input + if self.input_file_key not in samples: + logger.error("Missing required key '%s' in samples", self.input_file_key) + samples.setdefault("replay_result", [None]) + samples["replay_result"][0] = {"success": False, "error": f"missing key {self.input_file_key}"} + return samples + + # Only process first sample + dataset_file = samples[self.input_file_key][0] + # Optional overrides 
per-sample + camera_views = ( + samples.get("camera_view_list", [self.camera_view_list])[0] + if "camera_view_list" in samples + else self.camera_view_list + ) + save_depth = samples.get("save_depth", [self.save_depth])[0] if "save_depth" in samples else self.save_depth + video_enabled = samples.get("video", [self.video])[0] if "video" in samples else self.video + # Output base dir + base_video_dir = samples.get(self.video_dir_key, [None])[0] + if not base_video_dir: + base_video_dir = os.path.join(os.getcwd(), f"{self.task_name}_videos") + os.makedirs(base_video_dir, exist_ok=True) + # Always allocate a unique sub-directory per task to avoid collisions across parallel tasks + task_video_dir = os.path.join(base_video_dir, f"task_{os.getpid()}_{int(time.time()*1000)}") + os.makedirs(task_video_dir, exist_ok=True) + + logger.info( + "Replay task start: task=%s, dataset=%s, device=%s, video=%s, views=%s", + self.task_name, + dataset_file, + self.device, + video_enabled, + camera_views, + ) + + # Results + video_paths: List[str] = [] + failed_demo_ids: List[int] = [] + replayed_episode_count = 0 + + try: + # 1) Ensure SimulationApp + from data_juicer.utils.isaac_utils import ensure_isaac_sim_app + + ensure_isaac_sim_app(self, mode="tasks") + + # 2) Create env + success_term = self._create_env() + + # 3) Open dataset + dataset_handler = HDF5DatasetFileHandler() + if not os.path.exists(dataset_file): + raise FileNotFoundError(f"Dataset file not found: {dataset_file}") + dataset_handler.open(dataset_file) + episode_names = list(dataset_handler.get_episode_names()) + + # If select_episodes provided, map from names by extracting indices + if self.select_episodes: + name_by_index: Dict[int, str] = {} + for name in episode_names: + m = re.search(r"(\d+)", name) + if m: + name_by_index[int(m.group(1))] = name + ordered = [] + for idx in self.select_episodes: + if idx in name_by_index: + ordered.append(name_by_index[idx]) + episode_names = ordered + + if len(episode_names) == 0: + raise RuntimeError("No episodes found in dataset") + + env = self._env + + # Default camera view if none provided + if hasattr(env, "sim"): + try: + env.sim.set_camera_view(eye=[3.0, 0.0, 1.5], target=[0.0, 0.0, 1.0]) + except Exception: + pass + + # Reset env + env.reset() + + for name in episode_names: + # load episode (device-aware) + episode = dataset_handler.load_episode(name, env.device) + + # Reset to initial state if available + if "initial_state" in episode.data: + initial_state = episode.get_initial_state() + try: + env.sim.reset() + if hasattr(env, "recorder_manager"): + env.recorder_manager.reset() + except Exception: + pass + env.reset_to(initial_state, None, is_relative=True) + + # Prepare per-episode image save dir if video enabled + if video_enabled and camera_views: + demo_save_dir = os.path.join(task_video_dir, "images", f"demo_{replayed_episode_count}") + os.makedirs(demo_save_dir, exist_ok=True) + + step_index = 0 + # Iterate actions + while True: + next_action = episode.get_next_action() + if next_action is None: + break + + # Suction support: last dim controls gripper, remaining are actions + action_tensor = torch.tensor(next_action, device=env.device) + if isinstance(action_tensor, torch.Tensor) and action_tensor.ndim == 1: + action_tensor = action_tensor.reshape(1, -1) + + if "Suction" in self.task_name: + try: + if float(action_tensor[0, -1]) == 1.0: + env.open_suction_cup(0) + else: + env.close_suction_cup(0) + action_applied = action_tensor[:, :-1] + except Exception: + action_applied = 
action_tensor + else: + action_applied = action_tensor + + env.step(action_applied) + + # Save frames + if video_enabled and camera_views: + for view in camera_views: + try: + rgb_cam = env.scene.sensors[f"{view}_cam"].data.output["rgb"].cpu().numpy()[0] + + rgb_path = os.path.join( + demo_save_dir, + f"frame_{step_index:04d}_{view}_rgb.png", + ) + cv2.imwrite(rgb_path, cv2.cvtColor(rgb_cam, cv2.COLOR_RGB2BGR)) + + if save_depth: + depth_cam = ( + env.scene.sensors[f"{view}_cam"] + .data.output["distance_to_image_plane"] + .cpu() + .numpy()[0] + ) + depth_16bit = (depth_cam * 1000).astype("uint16") + depth_path = os.path.join( + demo_save_dir, + f"frame_{step_index:04d}_{view}_depth.png", + ) + cv2.imwrite(depth_path, depth_16bit) + except Exception as e: + logger.debug(f"Failed saving frame for view {view}: {e}") + + step_index += 1 + + # Check success if term provided + episode_success = True + try: + if success_term is not None: + result = success_term.func(env, **success_term.params)[0] + episode_success = bool(result) + except Exception: + pass + + # Create videos per view + if video_enabled and camera_views: + for view in camera_views: + # RGB video + input_pattern = os.path.join( + task_video_dir, "images", f"demo_{replayed_episode_count}", f"frame_%04d_{view}_rgb.png" + ) + output_video = os.path.join(task_video_dir, f"demo_{replayed_episode_count}_{view}_rgb.mp4") + ok = create_video_from_images(input_pattern, output_video) + if ok: + video_paths.append(output_video) + + if save_depth: + input_pattern = os.path.join( + task_video_dir, + "images", + f"demo_{replayed_episode_count}", + f"frame_%04d_{view}_depth.png", + ) + output_video = os.path.join( + task_video_dir, f"demo_{replayed_episode_count}_{view}_depth.mp4" + ) + ok = create_video_from_images(input_pattern, output_video) + if ok: + video_paths.append(output_video) + + # Record failure + if not episode_success: + failed_demo_ids.append(replayed_episode_count) + + replayed_episode_count += 1 + + # Done + result = { + "success": True, + "replayed_episode_count": replayed_episode_count, + "failed_demo_ids": failed_demo_ids, + "replay_video_paths": video_paths, + "video_dir": task_video_dir, + } + except Exception as exc: + logger.error("Replay task failed: %s", exc, exc_info=True) + result = {"success": False, "error": str(exc)} + finally: + # Always cleanup env resources + self.cleanup() + + # Populate standardized outputs for downstream aggregators + samples.setdefault("replay_result", [None]) + samples["replay_result"][0] = result + samples["replay_success"] = [bool(result.get("success", False))] + samples["replay_video_paths"] = [result.get("replay_video_paths", [])] + samples["replay_failed_demo_ids"] = [result.get("failed_demo_ids", [])] + samples["replay_video_dir"] = [result.get("video_dir", base_video_dir)] + samples["replay_failure_reason"] = [result.get("error", "") if not result.get("success", False) else ""] + return samples + + # Use shared cleanup logic + from data_juicer.utils.isaac_utils import cleanup_isaac_env as cleanup + + def __del__(self): + try: + self.cleanup() + except Exception: + pass diff --git a/data_juicer/utils/isaac_utils.py b/data_juicer/utils/isaac_utils.py new file mode 100644 index 0000000000..3aae6c4aaf --- /dev/null +++ b/data_juicer/utils/isaac_utils.py @@ -0,0 +1,272 @@ +import argparse +import faulthandler +import os +import sys +from contextlib import contextmanager +from typing import Any, Dict, Optional, Union + +import torch +from loguru import logger + +# 
============================================================================ +# Context Managers +# ============================================================================ + + +@contextmanager +def LazyStreamRedirector(): + """ + Context manager to temporarily redirect sys.stdin, sys.stdout, and sys.stderr + to the real OS streams (if available) and restore them afterwards. + + This is useful when launching applications like Isaac Sim's SimulationApp + that might interfere with wrapped streams (e.g., from Ray or other loggers). + """ + orig_streams: Dict[str, Optional[Any]] = {} + try: + # Swap wrapped IO streams with the real OS streams + for stream_name in ("stdin", "stdout", "stderr"): + orig_streams[stream_name] = getattr(sys, stream_name, None) + real_stream = getattr(sys, f"__{stream_name}__", None) + if real_stream is not None: + setattr(sys, stream_name, real_stream) + yield + finally: + # Restore wrapped streams + for stream_name, orig_stream in orig_streams.items(): + if orig_stream is not None: + setattr(sys, stream_name, orig_stream) + + +# ============================================================================ +# Core Initialization +# ============================================================================ + + +def init_isaac_sim_app( + headless: bool = True, + device: str = "cuda:auto", + enable_cameras: bool = False, + enable_pinocchio: bool = False, +) -> Any: + """ + Initialize Isaac Sim SimulationApp with common configurations. + + :param headless: Run Isaac Sim in headless mode. + :param device: Device to run on ('cuda:0', 'cpu', etc.). + :param enable_cameras: Enable cameras in Isaac Sim. + :param enable_pinocchio: Enable Pinocchio support. + :return: The initialized SimulationApp instance. + :raises RuntimeError: If CUDA is not available. + :raises ImportError: If Isaac Lab is not installed. + """ + # 1. CUDA checks and setup + if torch.cuda.is_initialized(): + logger.warning("CUDA was initialized before Isaac Sim. Clearing cached state...") + torch.cuda.empty_cache() + + if not torch.cuda.is_available(): + raise RuntimeError( + "CUDA is not available. Isaac Sim requires CUDA to run. " + "Please verify the GPU driver and CUDA installation." + ) + + os.environ["CUDA_LAUNCH_BLOCKING"] = "1" + + logger.info(f"Initializing Isaac Sim SimulationApp (device={device}, headless={headless})") + logger.info(f"CUDA device count detected by torch: {torch.cuda.device_count()}") + + # 2. Prepare AppLauncher arguments + try: + from isaaclab.app import AppLauncher + except ImportError: + raise ImportError("Could not import isaaclab.app.AppLauncher. " "Please ensure Isaac Lab is installed.") + + parser = argparse.ArgumentParser() + AppLauncher.add_app_launcher_args(parser) + # Parse empty args to get defaults, then override + args, _ = parser.parse_known_args([]) + args.headless = headless + args.device = device + args.enable_cameras = enable_cameras + args.enable_scene_lights = not headless + + # 3. Pinocchio setup (pre-launch) + if enable_pinocchio: + import pinocchio # noqa: F401 + + # 4. Faulthandler setup + # Redirect faulthandler to devnull to avoid noise + try: + faulthandler_file = open(os.devnull, "w") + faulthandler.enable(file=faulthandler_file) + # Note: We don't return the file handle here, relying on OS/GC to handle it eventually. + # If strict management is needed, this function should return (app, file_handle). + except Exception as exc: + logger.debug(f"Failed to enable faulthandler: {exc}") + + # 5. 
Launch SimulationApp with stream redirection + simulation_app = None + with LazyStreamRedirector(): + app_launcher = AppLauncher(args) + simulation_app = app_launcher.app + + # 6. Pinocchio setup (post-launch) + if enable_pinocchio: + import isaaclab.utils.math # noqa: F401 + import warp # noqa: F401 + + logger.info("Isaac Sim SimulationApp initialized successfully") + return simulation_app + + +# ============================================================================ +# Mapper Lifecycle Helpers +# ============================================================================ + + +def ensure_isaac_sim_app(instance: Any, mode: str = "mimic") -> None: + """ + Ensure Isaac Sim SimulationApp is initialized once and reused. + Handles common initialization and mode-specific imports. + + This function is designed to be called within a Mapper's processing method. + It checks `instance._isaac_initialized` to avoid re-initialization. + + :param instance: The mapper instance (self). Expected to have attributes: + headless, device, enable_cameras (or video), enable_pinocchio. + :param mode: 'mimic' for Isaac Lab Mimic, 'tasks' for Isaac Lab Tasks. + """ + if getattr(instance, "_isaac_initialized", False): + return + + # Determine enable_cameras + enable_cameras = False + if hasattr(instance, "enable_cameras"): + enable_cameras = instance.enable_cameras + elif hasattr(instance, "video"): + enable_cameras = bool(instance.video) + + instance._simulation_app = init_isaac_sim_app( + headless=instance.headless, + device=instance.device, + enable_cameras=enable_cameras, + enable_pinocchio=getattr(instance, "enable_pinocchio", False), + ) + + # Mode-specific imports + if mode == "mimic": + import isaaclab_mimic.envs # noqa: F401 + + if getattr(instance, "enable_pinocchio", False): + import isaaclab_mimic.envs.pinocchio_envs # noqa: F401 + + elif mode == "tasks": + import isaaclab_tasks # noqa: F401 + + if getattr(instance, "enable_pinocchio", False): + import isaaclab_tasks.manager_based.locomanipulation.pick_place # noqa: F401 + import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401 + + instance._isaac_initialized = True + + +def cleanup_isaac_env(instance: Any) -> Dict[str, str]: + """ + Safely close an Isaac Lab environment stored in a mapper instance. + Can be assigned directly to the `cleanup` method of a mapper class. + + :param instance: The mapper instance containing `_env`. + :return: A status dictionary (e.g., {"status": "cleaned"}). 
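+ Typical use is assignment at class scope (``cleanup = cleanup_isaac_env`` via an aliased import, as the replay mapper above does), so all Isaac-based mappers share one implementation.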
+ """ + logger.info(f"Cleaning up {instance.__class__.__name__} resources...") + + if hasattr(instance, "_env") and instance._env is not None: + try: + instance._env.close() + logger.info("Closed Isaac Lab environment.") + except Exception as e: + logger.warning(f"Error closing Isaac Lab environment: {e}") + finally: + instance._env = None + + # Optional: Handle faulthandler if it was stored + if hasattr(instance, "_faulthandler_file") and instance._faulthandler_file is not None: + try: + import faulthandler + + faulthandler.disable() + instance._faulthandler_file.close() + except Exception: + pass + finally: + instance._faulthandler_file = None + + logger.info("Cleanup complete (simulation_app left for Ray to manage).") + return {"status": "cleaned"} + + +# ============================================================================ +# Utility Functions +# ============================================================================ + + +def resolve_nucleus_paths(config: Union[Dict, list, str]) -> Union[Dict, list, str]: + """ + Recursively resolve Isaac Nucleus paths in a configuration dictionary or list. + Replaces placeholders like '{ISAAC_NUCLEUS_DIR}' with actual paths. + + :param config: Configuration object (dict, list, or string). + :return: Configuration object with resolved paths. + """ + from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR, NVIDIA_NUCLEUS_DIR + + def _replace_recursive(item): + if isinstance(item, str): + item = item.replace("{ISAAC_NUCLEUS_DIR}", ISAAC_NUCLEUS_DIR) + item = item.replace("{NVIDIA_NUCLEUS_DIR}", NVIDIA_NUCLEUS_DIR) + elif isinstance(item, list): + item = [_replace_recursive(i) for i in item] + elif isinstance(item, dict): + for k, v in item.items(): + item[k] = _replace_recursive(v) + return item + + return _replace_recursive(config) + + +def create_video_from_images(input_pattern: str, output_video: str, framerate: float = 20.0) -> bool: + """ + Create a video from a sequence of images using ffmpeg. + + :param input_pattern: Input file pattern (e.g., "frame_%04d.png"). + :param output_video: Output video file path. + :param framerate: Frame rate of the output video. + :return: True if successful, False otherwise. + """ + import subprocess + + cmd = [ + "ffmpeg", + "-y", + "-framerate", + str(framerate), + "-i", + input_pattern, + "-c:v", + "libx264", + "-pix_fmt", + "yuv420p", + "-crf", + "23", + "-preset", + "medium", + output_video, + ] + try: + subprocess.run(cmd, check=True, capture_output=True) + logger.info(f"Created video: {output_video}") + return True + except Exception as e: + logger.warning(f"ffmpeg failed to create video {output_video}: {e}") + return False diff --git a/demos/franka_mimicgen/config/gr00t_task_sapce_config.yaml b/demos/franka_mimicgen/config/gr00t_task_sapce_config.yaml new file mode 100644 index 0000000000..aa93888908 --- /dev/null +++ b/demos/franka_mimicgen/config/gr00t_task_sapce_config.yaml @@ -0,0 +1,52 @@ +dataset: + robot_type: "franka_task_space" + fps: 20 + chunks_size: 1000 + +modality_template_path: "./demos/franka_mimicgen/config/modality_task_space.json" +info_template_path: "./demos/franka_mimicgen/config/info.json" + +tasks: + 0: "Pick up the red cube and put it onto the blue cube. Pick up the green cube and place it onto the red cube." 
+ 1: "valid" + +# Mapping from HDF5 keys to LeRobot keys +# Each entry in 'state' and 'action' is a list of sources to be concatenated +mapping: + state: + # - key: "obs.eef_pos" + # slice: "[:-1]" + # - key: "obs.eef_quat" + # slice: "[:-1]" + # - key: "obs.gripper_pos" + # slice: "[:-1, 0]" + # reshape: [-1, 1] + - key: "obs.eef_pos" + slice: [[null, -1]] + - key: "obs.eef_quat" + slice: [[null, -1]] + - key: "obs.gripper_pos" + slice: [[null, -1], 0] + reshape: [-1, 1] + + action: + - key: "actions" + slice: [[null, -1], [null, -1]] + - key: "obs.gripper_pos" + slice: [[1, null], 0] + reshape: [-1, 1] + +# Mapping from LeRobot video keys to file suffixes +video_mapping: + # "observation.images.ego_view": "ego_rgb" + # "observation.images.left_view": "left_wrist_rgb" + # "observation.images.right_view": "right_wrist_rgb" + "observation.images.table_view": "table_rgb" + +# LeRobot specific keys +lerobot_keys: + state: "observation.state" + action: "action" + annotation: + - "annotation.human.action.task_description" + - "annotation.human.action.valid" \ No newline at end of file diff --git a/demos/franka_mimicgen/config/info.json b/demos/franka_mimicgen/config/info.json new file mode 100644 index 0000000000..cb991aca5f --- /dev/null +++ b/demos/franka_mimicgen/config/info.json @@ -0,0 +1,17 @@ +{ + "codebase_version": "v2.0", + "robot_type": null, + "total_episodes": null, + "total_frames": null, + "total_tasks": null, + "total_videos": null, + "total_chunks": null, + "chunks_size": 1000, + "fps": null, + "splits": { + "train": "0:100" + }, + "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet", + "video_path": "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4", + "features": null +} \ No newline at end of file diff --git a/demos/franka_mimicgen/config/modality_task_space.json b/demos/franka_mimicgen/config/modality_task_space.json new file mode 100644 index 0000000000..062f9a0bb5 --- /dev/null +++ b/demos/franka_mimicgen/config/modality_task_space.json @@ -0,0 +1,51 @@ +{ + "state": { + "franka_eef_pos": { + "original_key": "observation.state", + "start": 0, + "end": 3, + "dtype": "float32" + }, + "franka_eef_quat": { + "original_key": "observation.state", + "start": 3, + "end": 7, + "dtype": "float32", + "rotation_type": "quaternion" + }, + "franka_gripper_pos": { + "start": 7, + "end": 8, + "dtype": "float32" + } + }, + "action": { + "franka_eef_pos": { + "start": 0, + "end": 3, + "dtype": "float32" + }, + "franka_eef_quat": { + "start": 3, + "end": 6, + "dtype": "float32", + "rotation_type": "axis_angle" + }, + "franker_gripper_pos": { + "start": 6, + "end": 7, + "dtype": "float32" + } + }, + "video": { + "table_view": { + "original_key": "observation.images.table_view" + } + }, + "annotation": { + "human.action.task_description": { + }, + "human.validity": { + } + } +} \ No newline at end of file diff --git a/demos/franka_mimicgen/config/visual_randomization.yaml b/demos/franka_mimicgen/config/visual_randomization.yaml new file mode 100644 index 0000000000..79a57d74c1 --- /dev/null +++ b/demos/franka_mimicgen/config/visual_randomization.yaml @@ -0,0 +1,58 @@ +- name: "randomize_light" + type: "light" + params: + intensity_range: [1500.0, 10000.0] + color_variation: 0.4 + default_intensity: 3000.0 + default_color: [0.75, 0.75, 0.75] + default_texture: "" + textures: + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Cloudy/abandoned_parking_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Cloudy/evening_road_01_4k.hdr" + - 
"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Cloudy/lakeside_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/autoshop_01_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/carpentry_shop_01_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/hospital_room_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/hotel_room_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/old_bus_depot_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/small_empty_house_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/surgery_4k.hdr" + - "{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Studio/photo_studio_01_4k.hdr" +- name: "randomize_table_visual_material" + type: "asset_texture" + params: + asset_cfg: + name: "table" + default_texture: "{ISAAC_NUCLEUS_DIR}/Props/Mounts/SeattleLabTable/Materials/Textures/DemoTable_TableBase_BaseColor.png" + textures: + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Ash/Ash_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Bamboo_Planks/Bamboo_Planks_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Birch/Birch_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Cherry/Cherry_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Mahogany_Planks/Mahogany_Planks_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Oak/Oak_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Plywood/Plywood_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber/Timber_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber_Cladding/Timber_Cladding_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Walnut_Planks/Walnut_Planks_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Stone/Marble/Marble_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Steel_Stainless/Steel_Stainless_BaseColor.png" +- name: "randomize_robot_arm_visual_texture" + type: "asset_texture" + params: + asset_cfg: + name: "robot" + textures: + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Aluminum_Cast/Aluminum_Cast_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Aluminum_Polished/Aluminum_Polished_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Brass/Brass_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Bronze/Bronze_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Brushed_Antique_Copper/Brushed_Antique_Copper_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Cast_Metal_Silver_Vein/Cast_Metal_Silver_Vein_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Copper/Copper_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Gold/Gold_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Iron/Iron_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/RustedMetal/RustedMetal_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Silver/Silver_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Steel_Carbon/Steel_Carbon_BaseColor.png" + - "{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Steel_Stainless/Steel_Stainless_BaseColor.png" \ No newline at end of file diff --git a/demos/franka_mimicgen/data/annotation_tasks.jsonl b/demos/franka_mimicgen/data/annotation_tasks.jsonl new file mode 100644 index 0000000000..64be4eaa4b --- /dev/null +++ b/demos/franka_mimicgen/data/annotation_tasks.jsonl @@ -0,0 +1,2 @@ +{"text": "Annotate dataset 1", "input_file": "./demos/franka_mimicgen/data/dataset.hdf5", "output_file": "./outputs/demo/mimicgen/annotate/dataset_annotated.hdf5"} +{"text": "Annotate dataset 2", 
"input_file": "./demos/franka_mimicgen/data/dataset.hdf5", "output_file": "./outputs/demo/mimicgen/annotate/dataset_annotated2.hdf5"} \ No newline at end of file diff --git a/demos/franka_mimicgen/data/convert_tasks.jsonl b/demos/franka_mimicgen/data/convert_tasks.jsonl new file mode 100644 index 0000000000..46f957fe72 --- /dev/null +++ b/demos/franka_mimicgen/data/convert_tasks.jsonl @@ -0,0 +1,2 @@ +{"text": "convert dataset 1", "input_file": "./outputs/demo/mimicgen/generate/mimic_generated1.hdf5", "video_dir": "./outputs/demo/mimicgen/replay/videos_run1/task_xxx", "output_dir": "./outputs/demo/mimicgen/convert/mimic_generated1", "config_path": "./demos/franka_mimicgen/convert/gr00t_task_space_config.yaml"} +{"text": "convert dataset 2", "input_file": "./outputs/demo/mimicgen/generate/mimic_generated2.hdf5", "video_dir": "./outputs/demo/mimicgen/replay/videos_run2/task_xxx", "output_dir": "./outputs/demo/mimicgen/convert/mimic_generated2", "config_path": "./demos/franka_mimicgen/convert/gr00t_task_space_config.yaml"} \ No newline at end of file diff --git a/demos/franka_mimicgen/data/dataset.hdf5 b/demos/franka_mimicgen/data/dataset.hdf5 new file mode 100644 index 0000000000..31027f534b Binary files /dev/null and b/demos/franka_mimicgen/data/dataset.hdf5 differ diff --git a/demos/franka_mimicgen/data/generate_tasks.jsonl b/demos/franka_mimicgen/data/generate_tasks.jsonl new file mode 100644 index 0000000000..0327f473e1 --- /dev/null +++ b/demos/franka_mimicgen/data/generate_tasks.jsonl @@ -0,0 +1,2 @@ +{"text": "gen-task-1", "input_file": "./outputs/demo/mimicgen/annotate/dataset_annotated.hdf5", "output_file": "./outputs/demo/mimicgen/generate/mimic_generated1.hdf5"} +{"text": "gen-task-2", "input_file": "./outputs/demo/mimicgen/annotate/dataset_annotated2.hdf5", "output_file": "./outputs/demo/mimicgen/generate/mimic_generated2.hdf5"} \ No newline at end of file diff --git a/demos/franka_mimicgen/data/replay_tasks.jsonl b/demos/franka_mimicgen/data/replay_tasks.jsonl new file mode 100644 index 0000000000..17d8686868 --- /dev/null +++ b/demos/franka_mimicgen/data/replay_tasks.jsonl @@ -0,0 +1,3 @@ +{"text": "replay-1", "dataset_file": "./outputs/demo/mimicgen/generate/mimic_generated1.hdf5", "video_dir": "./outputs/demo/mimicgen/replay/videos_run1", "camera_view_list": ["table", "wrist"], "video": true, "save_depth": true} +{"text": "replay-2", "dataset_file": "./outputs/demo/mimicgen/generate/mimic_generated2.hdf5", "video_dir": "./outputs/demo/mimicgen/replay/videos_run2", "camera_view_list": ["table", "wrist"], "video": true, "save_depth": true} + diff --git a/demos/franka_mimicgen/run_annotate.py b/demos/franka_mimicgen/run_annotate.py new file mode 100644 index 0000000000..be6e2dc05d --- /dev/null +++ b/demos/franka_mimicgen/run_annotate.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 + +import os +import ray +from data_juicer.core.data.ray_dataset import RayDataset +from data_juicer.ops.mapper import AnnotateDemosMapper + +ray.init(address='local') + +# Updated path to the new data directory +ds = RayDataset(ray.data.read_json('./demos/franka_mimicgen/data/annotation_tasks.jsonl')) + +annotator = AnnotateDemosMapper( + task_name='Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Mimic-v0', + num_proc=2, + gpu_required=0.5, + accelerator='cuda', + headless=True, + enable_cameras=True, + enable_pinocchio=False, + batch_size=1, + num_cpus=1 +) + +ds = ds.process([annotator]) + +output_path = './outputs/demo/mimicgen/annotate' +os.makedirs(output_path, exist_ok=True) 
+ds.data.write_json(output_path, force_ascii=False) + +ray.shutdown() diff --git a/demos/franka_mimicgen/run_convert.py b/demos/franka_mimicgen/run_convert.py new file mode 100644 index 0000000000..df64ee3e3c --- /dev/null +++ b/demos/franka_mimicgen/run_convert.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 + +import os +import ray +from data_juicer.core.data.ray_dataset import RayDataset +from data_juicer.ops.mapper import ConvertToLeRobotMapper + +# NOTE: The replay stage writes each task's videos into a uniquely named sub-directory (task_<pid>_<millis>) under `./outputs/demo/mimicgen/replay/videos_run{x}`. Look up the actual folder names there and update the `video_dir` fields in convert_tasks.jsonl to match before running. + +ray.init(address='local') + +# One JSON line per conversion task +ds = RayDataset(ray.data.read_json('./demos/franka_mimicgen/data/convert_tasks.jsonl')) + +converter = ConvertToLeRobotMapper( + config_path_key='config_path', + input_file_key='input_file', + output_dir_key='output_dir', + video_dir_key='video_dir', + num_proc=2 +) + +ds = ds.process([converter]) + +output_path = './outputs/demo/mimicgen/convert' +os.makedirs(output_path, exist_ok=True) +ds.data.write_json(output_path, force_ascii=False) + +ray.shutdown() diff --git a/demos/franka_mimicgen/run_generate.py b/demos/franka_mimicgen/run_generate.py new file mode 100644 index 0000000000..e72a02d7d8 --- /dev/null +++ b/demos/franka_mimicgen/run_generate.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 + +import os +import ray +from data_juicer.core.data.ray_dataset import RayDataset +from data_juicer.ops.mapper import GenerateDatasetMapper + +ray.init(address='local') + +# One JSON line per generation task +ds = RayDataset(ray.data.read_json('./demos/franka_mimicgen/data/generate_tasks.jsonl')) + +generator = GenerateDatasetMapper( + task_name='Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0', + num_envs=8, + generation_num_trials=10, + device='cuda:auto', + headless=True, + enable_cameras=True, + input_file_key='input_file', + output_file_key='output_file', + num_proc=2, + gpu_required=0.5, + accelerator='cuda', + batch_size=1 +) + +ds = ds.process([generator]) + +output_path = './outputs/demo/mimicgen/generate' +os.makedirs(output_path, exist_ok=True) +ds.data.write_json(output_path, force_ascii=False) + +ray.shutdown() diff --git a/demos/franka_mimicgen/run_replay.py b/demos/franka_mimicgen/run_replay.py new file mode 100644 index 0000000000..f81da7e81a --- /dev/null +++ b/demos/franka_mimicgen/run_replay.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +import os +import ray +from data_juicer.core.data.ray_dataset import RayDataset +from data_juicer.ops.mapper import ReplayDemosRandomizedMapper + +ray.init(address='auto') + +# One JSON line per replay task +ds = RayDataset(ray.data.read_json('./demos/franka_mimicgen/data/replay_tasks.jsonl')) + +replayer = ReplayDemosRandomizedMapper( + task_name='Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-v0', + headless=True, + device='cpu', + video=True, + camera_view_list=['table', 'wrist'], + save_depth=False, + num_proc=2, + gpu_required=0.5, + input_file_key='dataset_file', + video_dir_key='video_dir', + # Visual randomization events (lights, table and robot textures) + visual_randomization_config='./demos/franka_mimicgen/config/visual_randomization.yaml', + accelerator='cuda', + batch_size=1 +) + +ds = ds.process([replayer]) + +output_path = './outputs/demo/mimicgen/replay' +os.makedirs(output_path, exist_ok=True) +ds.data.write_json(output_path, 
force_ascii=False) + +ray.shutdown() diff --git a/docs/Operators.md b/docs/Operators.md index 5772854cc4..f077bb2253 100644 --- a/docs/Operators.md +++ b/docs/Operators.md @@ -40,15 +40,15 @@ The operators in Data-Juicer are categorized into 8 types. Data-Juicer 中的算子分为以下 8 种类型。 | Type 类型 | Number 数量 | Description 描述 | -|------|:---------:|-------------| -| [aggregator](#aggregator) | 4 | Aggregate for batched samples, such as summary or conclusion. 对批量样本进行汇总,如得出总结或结论。 | -| [deduplicator](#deduplicator) | 10 | Detects and removes duplicate samples. 识别、删除重复样本。 | -| [filter](#filter) | 54 | Filters out low-quality samples. 过滤低质量样本。 | -| [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 | -| [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 | -| [mapper](#mapper) | 96 | Edits and transforms samples. 对数据样本进行编辑和转换。 | -| [pipeline](#pipeline) | 3 | Combines multiple operators into a data processing pipeline. 将多个算子组合成数据处理流水线。 | -| [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 | +|------|:------:|-------------| +| [aggregator](#aggregator) | 4 | Aggregate for batched samples, such as summary or conclusion. 对批量样本进行汇总,如得出总结或结论。 | +| [deduplicator](#deduplicator) | 10 | Detects and removes duplicate samples. 识别、删除重复样本。 | +| [filter](#filter) | 54 | Filters out low-quality samples. 过滤低质量样本。 | +| [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 | +| [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 | +| [mapper](#mapper) | 100 | Edits and transforms samples. 对数据样本进行编辑和转换。 | +| [pipeline](#pipeline) | 3 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 | +| [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 | All the specific operators are listed below, each featured with several capability tags. 下面列出所有具体算子,每种算子都通过多个标签来注明其主要功能。 @@ -178,6 +178,7 @@ All the specific operators are listed below, each featured with several capabili | Operator 算子 | Tags 标签 | Description 描述 | Details 详情 | Reference 参考 | |----------|------|-------------|-------------|-------------| +| annotate_demos_mapper | 🔤Text 🚀GPU 🟡Beta | Automatically annotate robot demonstration episodes using Isaac Lab. 使用Isaac Lab自动注释机器人演示片段。 | - | - | | audio_add_gaussian_noise_mapper | 📣Audio 💻CPU 🟡Beta | Mapper to add Gaussian noise to audio samples. 映射器将高斯噪声添加到音频样本。 | [info](operators/mapper/audio_add_gaussian_noise_mapper.md) | - | | audio_ffmpeg_wrapped_mapper | 📣Audio 💻CPU 🟢Stable | Wraps FFmpeg audio filters for processing audio files in a dataset. 包装FFmpeg音频过滤器,用于处理数据集中的音频文件。 | [info](operators/mapper/audio_ffmpeg_wrapped_mapper.md) | - | | calibrate_qa_mapper | 🔤Text 💻CPU 🔗API 🟢Stable | Calibrates question-answer pairs based on reference text using an API model. 使用API模型根据参考文本校准问答对。 | [info](operators/mapper/calibrate_qa_mapper.md) | - | @@ -189,6 +190,7 @@ All the specific operators are listed below, each featured with several capabili | clean_html_mapper | 🔤Text 💻CPU 🟢Stable | Cleans HTML code from text samples, converting HTML to plain text. 从文本示例中清除HTML代码,将HTML转换为纯文本。 | [info](operators/mapper/clean_html_mapper.md) | - | | clean_ip_mapper | 🔤Text 💻CPU 🟢Stable | Cleans IPv4 and IPv6 addresses from text samples. 从文本示例中清除IPv4和IPv6地址。 | [info](operators/mapper/clean_ip_mapper.md) | - | | clean_links_mapper | 🔤Text 💻CPU 🟢Stable | Mapper to clean links like http/https/ftp in text samples. 
映射器来清理链接,如文本示例中的http/https/ftp。 | [info](operators/mapper/clean_links_mapper.md) | - | +| convert_to_lerobot_mapper | 💻CPU 🟡Beta | Convert HDF5 datasets (MimicGen/Isaac Lab format) to LeRobot dataset format. 将HDF5数据集 (MimicGen/Isaac Lab格式) 转换为LeRobot数据集格式。 | - | - | | detect_character_attributes_mapper | 🚀GPU 🟡Beta | Takes an image, a caption, and main character names as input to extract the characters' attributes. 根据给定的图像、图像描述信息和(多个)角色名称,提取图像中主要角色的属性。 | [info](operators/mapper/detect_character_attributes_mapper.md) | [DetailMaster](https://arxiv.org/abs/2505.16915) | | detect_character_locations_mapper | 🚀GPU 🟡Beta | Given an image and a list of main character names, extract the bounding boxes for each present character. 给定一张图像和主要角色的名称列表,提取每个在场角色的边界框。(YOLOE + MLLM) | [info](operators/mapper/detect_character_locations_mapper.md) | [DetailMaster](https://arxiv.org/abs/2505.16915) | | detect_main_character_mapper | 🚀GPU 🟡Beta | Extract all main character names based on the given image and its caption. 根据给定的图像及其图像描述,提取所有主要角色的名字。 | [info](operators/mapper/detect_main_character_mapper.md) | [DetailMaster](https://arxiv.org/abs/2505.16915) | @@ -206,6 +208,7 @@ All the specific operators are listed below, each featured with several capabili | extract_support_text_mapper | 🔤Text 💻CPU 🔗API 🟢Stable | Extracts a supporting sub-text from the original text based on a given summary. 根据给定的摘要从原始文本中提取支持子文本。 | [info](operators/mapper/extract_support_text_mapper.md) | - | | extract_tables_from_html_mapper | 🔤Text 💻CPU 🟡Beta | Extracts tables from HTML content and stores them in a specified field. 从HTML内容中提取表并将其存储在指定字段中。 | [info](operators/mapper/extract_tables_from_html_mapper.md) | - | | fix_unicode_mapper | 🔤Text 💻CPU 🟢Stable | Fixes unicode errors in text samples. 修复文本示例中的unicode错误。 | [info](operators/mapper/fix_unicode_mapper.md) | - | +| generate_dataset_mapper | 🔤Text 🚀GPU 🟡Beta | Generates a mimic dataset using Isaac Lab. 使用Isaac Lab生成模拟数据集。 | - | - | | generate_qa_from_examples_mapper | 🚀GPU 🌊vLLM 🧩HF 🟢Stable | Generates question and answer pairs from examples using a Hugging Face model. 使用拥抱面部模型从示例生成问题和答案对。 | [info](operators/mapper/generate_qa_from_examples_mapper.md) | - | | generate_qa_from_text_mapper | 🔤Text 🚀GPU 🌊vLLM 🧩HF 🟢Stable | Generates question and answer pairs from text using a specified model. 使用指定的模型从文本生成问题和答案对。 | [info](operators/mapper/generate_qa_from_text_mapper.md) | - | | image_blur_mapper | 🏞Image 💻CPU 🟢Stable | Blurs images in the dataset with a specified probability and blur type. 使用指定的概率和模糊类型对数据集中的图像进行模糊处理。 | [info](operators/mapper/image_blur_mapper.md) | - | @@ -247,6 +250,7 @@ All the specific operators are listed below, each featured with several capabili | remove_table_text_mapper | 🔤Text 💻CPU 🟢Stable | Mapper to remove table texts from text samples. 映射器从文本样本中删除表文本。 | [info](operators/mapper/remove_table_text_mapper.md) | - | | remove_words_with_incorrect_substrings_mapper | 🔤Text 💻CPU 🟢Stable | Mapper to remove words containing specified incorrect substrings. 映射程序删除包含指定的不正确子字符串的单词。 | [info](operators/mapper/remove_words_with_incorrect_substrings_mapper.md) | - | | replace_content_mapper | 🔤Text 💻CPU 🟢Stable | Replaces content in the text that matches a specific regular expression pattern with a designated replacement string. 用指定的替换字符串替换与特定正则表达式模式匹配的文本中的内容。 | [info](operators/mapper/replace_content_mapper.md) | - | +| replay_demos_randomized_mapper | 🚀GPU 🟡Beta | Replay demonstrations with Isaac Lab environments and record videos. 
使用Isaac实验室环境重播演示并录制视频。 | - | - | | sdxl_prompt2prompt_mapper | 🔤Text 🚀GPU 🟢Stable | Generates pairs of similar images using the SDXL model. 使用SDXL模型生成成对的相似图像。 | [info](operators/mapper/sdxl_prompt2prompt_mapper.md) | - | | sentence_augmentation_mapper | 🔤Text 🚀GPU 🧩HF 🟢Stable | Augments sentences by generating enhanced versions using a Hugging Face model. 通过使用拥抱面部模型生成增强版本来增强句子。 | [info](operators/mapper/sentence_augmentation_mapper.md) | - | | sentence_split_mapper | 🔤Text 💻CPU 🟢Stable | Splits text samples into individual sentences based on the specified language. 根据指定的语言将文本样本拆分为单个句子。 | [info](operators/mapper/sentence_split_mapper.md) | - | diff --git a/pyproject.toml b/pyproject.toml index bfe779f577..11df5ab6c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -151,6 +151,11 @@ ai_services = [ "label-studio==1.17.0", # Data labeling ] +# Simulator +simulator = [ + "ray>=2.51.0", + "usd-core", +] # All dependencies (default + all optional) all = [ @@ -161,6 +166,7 @@ all = [ "py-data-juicer[distributed]", "py-data-juicer[dev]", "py-data-juicer[ai_services]", + "py-data-juicer[simulator]", ] [project.scripts] diff --git a/tests/ops/mapper/test_annotate_demos_mapper.py b/tests/ops/mapper/test_annotate_demos_mapper.py new file mode 100644 index 0000000000..15cdb1056a --- /dev/null +++ b/tests/ops/mapper/test_annotate_demos_mapper.py @@ -0,0 +1,119 @@ +import unittest +from unittest.mock import MagicMock, patch +import os +import sys +import numpy as np +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase + +@patch.dict(sys.modules, { + 'isaaclab': MagicMock(), + 'isaaclab.app': MagicMock(), + 'isaaclab.envs.mdp.recorders.recorders_cfg': MagicMock(), + 'isaaclab.managers': MagicMock(), + 'isaaclab.utils': MagicMock(), + 'isaaclab.utils.configclass': MagicMock(), + 'isaaclab_tasks.utils.parse_cfg': MagicMock(), + 'isaaclab.utils.datasets': MagicMock(), + 'gymnasium': MagicMock(), + 'torch': MagicMock(), +}) +class AnnotateDemosMapperTest(DataJuicerTestCaseBase): + + def setUp(self): + super().setUp() + self.tmp_dir = os.path.join(os.path.dirname(__file__), 'tmp_annotate') + os.makedirs(self.tmp_dir, exist_ok=True) + self.input_path = os.path.join(self.tmp_dir, 'input.hdf5') + self.output_path = os.path.join(self.tmp_dir, 'output.hdf5') + # Create dummy input file + with open(self.input_path, 'wb') as f: + f.write(b'dummy hdf5 content') + + def tearDown(self): + super().tearDown() + if os.path.exists(self.tmp_dir): + import shutil + shutil.rmtree(self.tmp_dir) + + def test_process_batched(self): + mock_torch = sys.modules['torch'] + # Mock torch cuda + mock_torch.cuda.is_available.return_value = True + mock_torch.cuda.device_count.return_value = 1 + + # Mock isaaclab components + mock_isaac_utils = sys.modules['isaaclab.utils.datasets'] + mock_handler = MagicMock() + mock_isaac_utils.HDF5DatasetFileHandler.return_value = mock_handler + mock_handler.get_num_episodes.return_value = 1 + mock_handler.get_episode_names.return_value = ['episode_0'] + + # Mock episode data + mock_episode = MagicMock() + # Create dummy actions: 5 steps, 7 dims + actions = np.zeros((5, 7)) + initial_state = np.zeros(10) + mock_episode.data = { + "initial_state": initial_state, + "actions": actions + } + mock_handler.load_episode.return_value = mock_episode + + # Mock gym environment + mock_gym = sys.modules['gymnasium'] + mock_env = MagicMock() + mock_gym.make.return_value.unwrapped = mock_env + + # Mock recorder manager output + # This simulates the annotated data returned by Isaac Lab + 
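# Assumed layout (mirroring Isaac Lab Mimic's datagen recorders): obs/datagen_info/subtask_term_signals/<term_name> is a per-step 0/1 series that flips when the corresponding subtask completes. + 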
mock_annotated_episode = MagicMock() + mock_annotated_episode.data = { + "obs": { + "datagen_info": { + "subtask_term_signals": { + "term_1": [0, 0, 1, 0, 0] # Simulated signal + } + } + } + } + mock_env.recorder_manager.get_episode.return_value = mock_annotated_episode + + from data_juicer.ops.mapper.annotate_demos_mapper import AnnotateDemosMapper + + op = AnnotateDemosMapper(task_name="Test-Task", headless=True) + + # Mock _create_task_env to return our mock_env and avoid importing real config classes + # We want to test _replay_and_annotate logic, so we let it run + def create_env_side_effect(): + op._env = mock_env + return mock_env + op._create_task_env = MagicMock(side_effect=create_env_side_effect) + + samples = { + 'text': ['dummy'], + 'input_file': [self.input_path], + 'output_file': [self.output_path] + } + + # Patch the function where it is imported in the module, or patch the module where it is defined + # Since ensure_isaac_sim_app is imported inside the method _annotate_file, we need to patch it in data_juicer.utils.isaac_utils + with patch('data_juicer.utils.isaac_utils.ensure_isaac_sim_app') as mock_ensure_app: + res = op.process_batched(samples) + mock_ensure_app.assert_called() + + self.assertEqual(len(res['input_file']), 1) + mock_handler.open.assert_called_with(self.input_path) + + # Verify Replay Logic + # 1. Verify reset was called + mock_env.reset_to.assert_called_with(initial_state, None, is_relative=True) + + # 2. Verify step was called for each action (5 times) + self.assertEqual(mock_env.step.call_count, 5) + + # 3. Verify recorder manager was queried + mock_env.recorder_manager.get_episode.assert_called_with(0) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/ops/mapper/test_convert_to_lerobot_mapper.py b/tests/ops/mapper/test_convert_to_lerobot_mapper.py new file mode 100644 index 0000000000..31ca71d926 --- /dev/null +++ b/tests/ops/mapper/test_convert_to_lerobot_mapper.py @@ -0,0 +1,83 @@ +import unittest +from unittest.mock import MagicMock, patch +import os +import yaml +import h5py +import numpy as np +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase + +class ConvertToLeRobotMapperTest(DataJuicerTestCaseBase): + + def setUp(self): + super().setUp() + self.tmp_dir = os.path.join(os.path.dirname(__file__), 'tmp_convert') + os.makedirs(self.tmp_dir, exist_ok=True) + self.input_path = os.path.join(self.tmp_dir, 'input.hdf5') + self.output_dir = os.path.join(self.tmp_dir, 'output') + self.config_path = os.path.join(self.tmp_dir, 'config.yaml') + self.modality_path = os.path.join(self.tmp_dir, 'modality.json') + self.info_path = os.path.join(self.tmp_dir, 'info.json') + + # Create dummy HDF5 + with h5py.File(self.input_path, 'w') as f: + data = f.create_group('data') + demo = data.create_group('demo_0') + demo.create_dataset('obs/state', data=np.random.rand(10, 7)) + demo.create_dataset('actions', data=np.random.rand(10, 7)) + demo.attrs['num_samples'] = 10 + + # Create dummy config + config = { + 'dataset': {'robot_type': 'franka', 'fps': 10}, + 'modality_template_path': self.modality_path, + 'info_template_path': self.info_path, + 'tasks': {'0': 'test task'}, + 'external_config': { + 'mapping': { + 'state': [{'key': 'obs/state'}], + 'action': [{'key': 'actions'}] + }, + 'lerobot_keys': { + 'state': 'observation.state', + 'action': 'action' + } + } + } + with open(self.config_path, 'w') as f: + yaml.dump(config, f) + + with open(self.modality_path, 'w') as f: + f.write('{}') + with open(self.info_path, 'w') as f: + 
f.write('{}') + + def tearDown(self): + super().tearDown() + if os.path.exists(self.tmp_dir): + import shutil + shutil.rmtree(self.tmp_dir) + + @patch('subprocess.check_output') + def test_process_batched(self, mock_subprocess): + # Mock ffprobe output + mock_subprocess.return_value = b'{"streams": [{"height": 100, "width": 100, "codec_name": "h264", "pix_fmt": "yuv420p", "r_frame_rate": "30/1"}]}' + + from data_juicer.ops.mapper.convert_to_lerobot_mapper import ConvertToLeRobotMapper + + op = ConvertToLeRobotMapper(config_path=self.config_path) + + samples = { + 'text': ['dummy'], + 'input_file': [self.input_path], + 'output_dir': [self.output_dir], + 'video_dir': [None], + 'config_path': [None] + } + + op.process_batched(samples) + + self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'meta/tasks.jsonl'))) + self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'meta/episodes.jsonl'))) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/ops/mapper/test_generate_dataset_mapper.py b/tests/ops/mapper/test_generate_dataset_mapper.py new file mode 100644 index 0000000000..fc14b7e9b8 --- /dev/null +++ b/tests/ops/mapper/test_generate_dataset_mapper.py @@ -0,0 +1,115 @@ +import unittest +from unittest.mock import MagicMock, patch +import os +import sys +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase + +@patch.dict(sys.modules, { + 'isaaclab': MagicMock(), + 'isaaclab.app': MagicMock(), + 'isaaclab.envs': MagicMock(), + 'isaaclab_mimic': MagicMock(), + 'isaaclab_mimic.datagen.utils': MagicMock(), + 'isaaclab_mimic.datagen.generation': MagicMock(), + 'gymnasium': MagicMock(), + 'torch': MagicMock(), + 'omni': MagicMock(), +}) +class GenerateDatasetMapperTest(DataJuicerTestCaseBase): + + def setUp(self): + super().setUp() + self.tmp_dir = os.path.join(os.path.dirname(__file__), 'tmp_generate') + os.makedirs(self.tmp_dir, exist_ok=True) + self.input_path = os.path.join(self.tmp_dir, 'input.hdf5') + self.output_path = os.path.join(self.tmp_dir, 'output.hdf5') + + def tearDown(self): + super().tearDown() + if os.path.exists(self.tmp_dir): + import shutil + shutil.rmtree(self.tmp_dir) + + def test_process_batched(self): + mock_torch = sys.modules['torch'] + mock_torch.cuda.is_available.return_value = True + + # Mock isaaclab utils + mock_mimic_utils = sys.modules['isaaclab_mimic.datagen.utils'] + mock_mimic_utils.get_env_name_from_dataset.return_value = 'TestEnv' + # Return real temp path to verify file existence check + mock_mimic_utils.setup_output_paths.return_value = (self.tmp_dir, 'output.hdf5') + + # Mock generation + mock_generation = sys.modules['isaaclab_mimic.datagen.generation'] + mock_generation.setup_env_config.return_value = (MagicMock(), MagicMock()) + mock_generation.setup_async_generation.return_value = { + 'tasks': [], + 'reset_queue': MagicMock(), + 'action_queue': MagicMock(), + 'info_pool': MagicMock(), + 'event_loop': MagicMock() + } + + # Mock ManagerBasedRLMimicEnv for isinstance check + class MockRLEnv: + def close(self): pass + def reset(self): pass + cfg = MagicMock() + def target_eef_pose_to_action(self): pass + sys.modules['isaaclab.envs'].ManagerBasedRLMimicEnv = MockRLEnv + + mock_gym = sys.modules['gymnasium'] + mock_env = MagicMock(spec=MockRLEnv) + # Fix inspect.signature check + def dummy_action_method(action_noise_dict=None): pass + mock_env.target_eef_pose_to_action = dummy_action_method + # Fix seed check + mock_env.cfg.datagen_config.seed = None + mock_gym.make.return_value.unwrapped = mock_env + + from 
data_juicer.ops.mapper.generate_dataset_mapper import GenerateDatasetMapper + + op = GenerateDatasetMapper(task_name="Test-Task", headless=True, num_envs=8, generation_num_trials=1000) + + # Case 1: Success (File created) + # Manually create the output file to simulate successful generation + with open(self.output_path, 'w') as f: + f.write('dummy content') + + samples = { + 'text': ['dummy'], + 'input_file': [self.input_path], + 'output_file': [self.output_path] + } + # Patch the function where it is imported in the module, or patch the module where it is defined + # Since ensure_isaac_sim_app is imported inside the method _generate_dataset, we need to patch it in data_juicer.utils.isaac_utils + with patch('data_juicer.utils.isaac_utils.ensure_isaac_sim_app') as mock_ensure_app: + res = op.process_batched(samples) + mock_ensure_app.assert_called() + + self.assertTrue(res['generation_result'][0]['success']) + + # Verify config passing + mock_generation.setup_env_config.assert_called_with( + env_name='Test-Task', + output_dir=self.tmp_dir, + output_file_name='output.hdf5', + num_envs=8, + device='cuda:auto', + generation_num_trials=1000 + ) + + # Case 2: Failure (File not created) + if os.path.exists(self.output_path): + os.remove(self.output_path) + + with patch('data_juicer.utils.isaac_utils.ensure_isaac_sim_app') as mock_ensure_app: + res = op.process_batched(samples) + mock_ensure_app.assert_called() + + self.assertFalse(res['generation_result'][0]['success']) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/ops/mapper/test_replay_demos_randomized_mapper.py b/tests/ops/mapper/test_replay_demos_randomized_mapper.py new file mode 100644 index 0000000000..81d35ff3c3 --- /dev/null +++ b/tests/ops/mapper/test_replay_demos_randomized_mapper.py @@ -0,0 +1,159 @@ +import unittest +from unittest.mock import MagicMock, patch +import os +import sys +import h5py +import numpy as np +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase + +@patch.dict(sys.modules, { + 'isaaclab': MagicMock(), + 'isaaclab.app': MagicMock(), + 'isaaclab_tasks': MagicMock(), + 'isaaclab_tasks.utils': MagicMock(), + 'isaaclab_tasks.utils.parse_cfg': MagicMock(), + 'isaaclab.managers': MagicMock(), + 'isaaclab.utils.assets': MagicMock(), + 'isaaclab.utils.datasets': MagicMock(), + 'gymnasium': MagicMock(), + 'torch': MagicMock(), + 'cv2': MagicMock(), +}) +class ReplayDemosRandomizedMapperTest(DataJuicerTestCaseBase): + + def setUp(self): + super().setUp() + self.tmp_dir = os.path.join(os.path.dirname(__file__), 'tmp_replay') + os.makedirs(self.tmp_dir, exist_ok=True) + self.input_path = os.path.join(self.tmp_dir, 'input.hdf5') + self.output_path = os.path.join(self.tmp_dir, 'output.hdf5') + self.video_dir = os.path.join(self.tmp_dir, 'videos') + + # Create dummy HDF5 + with h5py.File(self.input_path, 'w') as f: + data = f.create_group('data') + # Add required env_args attribute + data.attrs['env_args'] = '{"env_name": "Test-Task"}' + demo = data.create_group('demo_0') + demo.create_dataset('initial_state', data=np.zeros(10)) + demo.create_dataset('actions', data=np.zeros((10, 7))) + demo.attrs['num_samples'] = 10 + + def tearDown(self): + super().tearDown() + if os.path.exists(self.tmp_dir): + import shutil + shutil.rmtree(self.tmp_dir) + + def test_process_batched(self): + mock_torch = sys.modules['torch'] + mock_torch.cuda.is_available.return_value = True + + # Define a dummy class for torch.Tensor so isinstance checks work + class MockTensorType: + def reshape(self, *args, **kwargs): 
pass + def cpu(self): pass + def numpy(self): pass + def __getitem__(self, item): pass + ndim = 1 + shape = (1, 7) + + # Create a mock tensor instance that passes isinstance(x, MockTensorType) + mock_tensor = MagicMock(spec=MockTensorType) + mock_tensor.ndim = 1 + mock_tensor.shape = (1, 7) + mock_tensor.reshape.return_value = mock_tensor + mock_tensor.cpu.return_value = mock_tensor + mock_tensor.numpy.return_value = np.zeros((10, 10, 3), dtype=np.uint8) + mock_tensor.__getitem__.return_value = 0.0 + + # Set torch.Tensor to be the type itself + mock_torch.Tensor = MockTensorType + # Ensure torch.tensor() returns our mock instance + mock_torch.tensor.side_effect = lambda data, **kwargs: mock_tensor + + # Mock isaaclab datasets + mock_isaac_utils = sys.modules['isaaclab.utils.datasets'] + mock_handler = MagicMock() + mock_isaac_utils.HDF5DatasetFileHandler.return_value = mock_handler + mock_handler.get_episode_names.return_value = ['demo_0'] + mock_episode = MagicMock() + # Ensure both initial_state and actions are present in data + mock_episode.data = { + 'initial_state': MagicMock(), + 'actions': MagicMock() # Mock actions tensor/array + } + mock_episode.get_next_action.side_effect = [np.zeros(7), None] + + # Mock actions to be iterable (length 1) + mock_episode.data['actions'].__len__.return_value = 1 + mock_episode.data['actions'].__getitem__.return_value = np.zeros(7) + + mock_handler.load_episode.return_value = mock_episode + + # Mock gym + mock_gym = sys.modules['gymnasium'] + mock_env = MagicMock() + mock_env.reset.return_value = (MagicMock(), {}) + mock_env.step.return_value = (MagicMock(), 0.0, False, False, {}) + # Mock device to be a string or valid device object, not a MagicMock + mock_env.device = 'cpu' + mock_gym.make.return_value.unwrapped = mock_env + + # Mock parse_env_cfg to return an object we can inspect for randomization injection + mock_env_cfg = MagicMock() + mock_env_cfg.events = MagicMock() # Ensure events attribute exists + # Ensure other attributes checked by _create_env exist to avoid AttributeErrors + mock_env_cfg.terminations = MagicMock() + mock_env_cfg.observations = MagicMock() + mock_env_cfg.sim = MagicMock() + + mock_isaac_tasks_utils = sys.modules['isaaclab_tasks.utils.parse_cfg'] + mock_isaac_tasks_utils.parse_env_cfg.return_value = mock_env_cfg + + # Create dummy visual randomization config + config_path = os.path.join(self.tmp_dir, 'rand.yaml') + with open(config_path, 'w') as f: + f.write('test_event:\n func: test_func\n params: {}\n') + + from data_juicer.ops.mapper.replay_demos_randomized_mapper import ReplayDemosRandomizedMapper + + # Initialize with visual randomization config + op = ReplayDemosRandomizedMapper( + task_name="Test-Task", + headless=True, + video=True, + camera_view_list=['front'], + visual_randomization_config=config_path + ) + # Mock _inject_visual_randomization to avoid complex config resolution issues in test + op._inject_visual_randomization = MagicMock() + + # Mock subprocess to verify ffmpeg call + with patch('subprocess.run') as mock_run, \ + patch('data_juicer.utils.isaac_utils.ensure_isaac_sim_app') as mock_ensure_app: + samples = { + 'text': ['dummy'], + 'dataset_file': [self.input_path], + 'output_file': [self.output_path], + 'video_dir': [self.video_dir] + } + + res = op.process_batched(samples) + + self.assertTrue(res['replay_success'][0]) + mock_ensure_app.assert_called() + + # Verify ffmpeg was called + self.assertTrue(mock_run.called) + args, _ = mock_run.call_args + cmd = args[0] + self.assertIn('ffmpeg', cmd) + 
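# The default framerate (20.0) from create_video_from_images should be forwarded to ffmpeg via the -framerate argument: + 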
self.assertIn(str(20.0), cmd) # Default framerate + + # Verify visual randomization was injected (called) + op._inject_visual_randomization.assert_called() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/utils/test_isaac_utils.py b/tests/utils/test_isaac_utils.py new file mode 100644 index 0000000000..74989c4441 --- /dev/null +++ b/tests/utils/test_isaac_utils.py @@ -0,0 +1,157 @@ +import unittest +from unittest.mock import MagicMock, patch, mock_open +import sys +import os +from typing import Dict, List, Union, cast +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase +from data_juicer.utils.isaac_utils import ( + LazyStreamRedirector, + init_isaac_sim_app, + ensure_isaac_sim_app, + cleanup_isaac_env, + resolve_nucleus_paths, + create_video_from_images +) + +class TestIsaacUtils(DataJuicerTestCaseBase): + + def test_lazy_stream_redirector(self): + # Mock sys streams + with patch('sys.stdin', new_callable=MagicMock) as mock_stdin, \ + patch('sys.stdout', new_callable=MagicMock) as mock_stdout, \ + patch('sys.stderr', new_callable=MagicMock) as mock_stderr: + + # Set up real streams mocks + sys.__stdin__ = MagicMock() + sys.__stdout__ = MagicMock() + sys.__stderr__ = MagicMock() + + with LazyStreamRedirector(): + self.assertEqual(sys.stdin, sys.__stdin__) + self.assertEqual(sys.stdout, sys.__stdout__) + self.assertEqual(sys.stderr, sys.__stderr__) + + # Should be restored + self.assertEqual(sys.stdin, mock_stdin) + self.assertEqual(sys.stdout, mock_stdout) + self.assertEqual(sys.stderr, mock_stderr) + + # Clean up + del sys.__stdin__ + del sys.__stdout__ + del sys.__stderr__ + + @patch('data_juicer.utils.isaac_utils.torch') + @patch('data_juicer.utils.isaac_utils.logger') + def test_init_isaac_sim_app(self, mock_logger, mock_torch): + # Mock torch cuda availability + mock_torch.cuda.is_available.return_value = True + mock_torch.cuda.device_count.return_value = 1 + + # Mock isaaclab.app.AppLauncher + with patch.dict(sys.modules, {'isaaclab.app': MagicMock()}): + # We need to mock the import inside the function or ensure sys.modules is checked + # Since the function does `from isaaclab.app import AppLauncher`, + # and we patched sys.modules, it should work. 
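+ # Function-local imports are resolved through sys.modules at call time, so patching sys.modules is sufficient.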
+ + # We also need to mock AppLauncher class on the mocked module + mock_module = sys.modules['isaaclab.app'] + mock_launcher_class = MagicMock() + mock_module.AppLauncher = mock_launcher_class + + mock_launcher_instance = MagicMock() + mock_launcher_class.return_value = mock_launcher_instance + + # Mock faulthandler + with patch('data_juicer.utils.isaac_utils.faulthandler') as mock_faulthandler: + app = init_isaac_sim_app(headless=True, device="cuda:0") + + self.assertTrue(mock_torch.cuda.empty_cache.called) + self.assertTrue(mock_launcher_class.called) + self.assertEqual(app, mock_launcher_instance.app) + + @patch('data_juicer.utils.isaac_utils.init_isaac_sim_app') + def test_ensure_isaac_sim_app(self, mock_init): + class MockMapper: + def __init__(self): + self.headless = True + self.device = "cuda:0" + self.enable_cameras = False + self._isaac_initialized = False + self._simulation_app = None + + mapper = MockMapper() + + # Mock imports that happen inside ensure_isaac_sim_app + with patch.dict(sys.modules, { + 'isaaclab_mimic.envs': MagicMock(), + 'isaaclab_tasks': MagicMock() + }): + ensure_isaac_sim_app(mapper, mode='mimic') + + self.assertTrue(mock_init.called) + self.assertTrue(mapper._isaac_initialized) + self.assertIsNotNone(mapper._simulation_app) + + # Call again, should not init + mock_init.reset_mock() + ensure_isaac_sim_app(mapper, mode='mimic') + self.assertFalse(mock_init.called) + + def test_cleanup_isaac_env(self): + class MockMapper: + def __init__(self): + self._env = MagicMock() + self._faulthandler_file = MagicMock() + + mapper = MockMapper() + mock_env = mapper._env + mock_fh = mapper._faulthandler_file + + # Patch sys.modules to intercept the 'import faulthandler' inside the function + mock_faulthandler = MagicMock() + with patch.dict(sys.modules, {'faulthandler': mock_faulthandler}): + res = cleanup_isaac_env(mapper) + + self.assertTrue(mock_env.close.called) + self.assertIsNone(mapper._env) + self.assertTrue(mock_faulthandler.disable.called) + self.assertTrue(mock_fh.close.called) + self.assertIsNone(mapper._faulthandler_file) + self.assertEqual(res, {"status": "cleaned"}) + + def test_resolve_nucleus_paths(self): + # Mock isaaclab.utils.assets + mock_assets = MagicMock() + mock_assets.ISAAC_NUCLEUS_DIR = "/isaac/nucleus" + mock_assets.NVIDIA_NUCLEUS_DIR = "/nvidia/nucleus" + + with patch.dict(sys.modules, {'isaaclab.utils.assets': mock_assets}): + config = { + "path1": "{ISAAC_NUCLEUS_DIR}/item", + "path2": "{NVIDIA_NUCLEUS_DIR}/item", + "nested": ["{ISAAC_NUCLEUS_DIR}/list"] + } + + resolved = cast(Dict, resolve_nucleus_paths(config)) + + self.assertIsInstance(resolved, dict) + self.assertEqual(resolved["path1"], "/isaac/nucleus/item") + self.assertEqual(resolved["path2"], "/nvidia/nucleus/item") + self.assertEqual(resolved["nested"][0], "/isaac/nucleus/list") + + @patch('subprocess.run') + def test_create_video_from_images(self, mock_run): + mock_run.return_value = MagicMock(returncode=0) + + res = create_video_from_images("frame_%d.png", "out.mp4") + + self.assertTrue(res) + mock_run.assert_called_once() + args = mock_run.call_args[0][0] + self.assertIn("ffmpeg", args) + self.assertIn("frame_%d.png", args) + self.assertIn("out.mp4", args) + +if __name__ == '__main__': + unittest.main()
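
Reference sketch for the mapping config above: the list-form `slice` entries in gr00t_task_space_config.yaml appear to encode numpy-style indexing (the commented-out legacy entries used literal strings such as "[:-1, 0]"), with YAML `null` standing in for Python's `None` and `reshape` applied last. The helper below is a hypothetical illustration of that reading, not the actual ConvertToLeRobotMapper implementation:

```python
import numpy as np


def apply_mapping_entry(arr: np.ndarray, spec: dict) -> np.ndarray:
    """Interpret one mapping entry, e.g. {"slice": [[None, -1], 0], "reshape": [-1, 1]}.

    Each element of "slice" is either an int index or a [start, stop] pair
    (None meaning open-ended) applied to the corresponding axis; "reshape"
    is applied afterwards. Hypothetical sketch -- the real converter may differ.
    """
    index = tuple(
        slice(part[0], part[1]) if isinstance(part, list) else part
        for part in spec.get("slice", [])
    )
    out = arr[index] if index else arr
    if "reshape" in spec:
        out = out.reshape(spec["reshape"])
    return out


# 10 steps of 2-column gripper state: drop the final step, keep column 0,
# and return a (9, 1) column vector -- mirrors the "obs.gripper_pos" entry.
gripper = np.arange(20.0).reshape(10, 2)
part = apply_mapping_entry(gripper, {"slice": [[None, -1], 0], "reshape": [-1, 1]})
assert part.shape == (9, 1)
```

Under this reading, the action block concatenates `actions[:-1, :-1]` with the next-step gripper position `obs.gripper_pos[1:, 0]` reshaped to a column, so both sources contribute T-1 rows.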