From 6b343f046722b744b8fa79aa21acf61d32b23868 Mon Sep 17 00:00:00 2001 From: Andrey170170 Date: Sun, 16 Feb 2025 16:09:37 -0500 Subject: [PATCH 1/3] Added transfer_and_type_change tool --- .../transfer_and_type_change/__init__.py | 0 .../transfer_and_type_change/classes.py | 136 +++++++++ .../transfer_and_type_change/filter.py | 22 ++ src/DD_tools/transfer_and_type_change/main.py | 286 ++++++++++++++++++ .../transfer_and_type_change/runner.py | 27 ++ .../transfer_and_type_change/scheduler.py | 29 ++ .../tools_filter.slurm | 20 ++ .../tools_scheduler.slurm | 32 ++ .../transfer_and_type_change/tools_submit.sh | 55 ++++ .../tools_verifier.slurm | 31 ++ .../tools_worker.slurm | 32 ++ .../transfer_and_type_change/verification.py | 40 +++ 12 files changed, 710 insertions(+) create mode 100644 src/DD_tools/transfer_and_type_change/__init__.py create mode 100644 src/DD_tools/transfer_and_type_change/classes.py create mode 100644 src/DD_tools/transfer_and_type_change/filter.py create mode 100644 src/DD_tools/transfer_and_type_change/main.py create mode 100644 src/DD_tools/transfer_and_type_change/runner.py create mode 100644 src/DD_tools/transfer_and_type_change/scheduler.py create mode 100644 src/DD_tools/transfer_and_type_change/tools_filter.slurm create mode 100644 src/DD_tools/transfer_and_type_change/tools_scheduler.slurm create mode 100644 src/DD_tools/transfer_and_type_change/tools_submit.sh create mode 100644 src/DD_tools/transfer_and_type_change/tools_verifier.slurm create mode 100644 src/DD_tools/transfer_and_type_change/tools_worker.slurm create mode 100644 src/DD_tools/transfer_and_type_change/verification.py diff --git a/src/DD_tools/transfer_and_type_change/__init__.py b/src/DD_tools/transfer_and_type_change/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/DD_tools/transfer_and_type_change/classes.py b/src/DD_tools/transfer_and_type_change/classes.py new file mode 100644 index 0000000..1cf5ca5 --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/classes.py @@ -0,0 +1,136 @@ +import glob +import os +from typing import List + +import pandas as pd + +from DD_tools.main.config import Config +from DD_tools.main.filters import PythonFilterToolBase +from DD_tools.main.runners import MPIRunnerTool +from DD_tools.main.schedulers import DefaultScheduler + + +class Filter(PythonFilterToolBase): + def __init__(self, cfg: Config): + super().__init__(cfg) + + self.filter_name: str = "transfer_and_type_change" + + def run(self): + pass + + +class ScheduleCreation(DefaultScheduler): + def __init__(self, cfg: Config, seq_id: int): + super().__init__(cfg) + + self.filter_name: str = "transfer_and_type_change" + self.seq_id = seq_id + + def run(self): + assert self.filter_name is not None, ValueError("filter name is not set") + + filter_folder = os.path.join( + self.tools_path, self.filter_name, str(self.seq_id).zfill(4) + ) + filter_table_folder = os.path.join(filter_folder, "filter_table") + + all_files = glob.glob(os.path.join(filter_table_folder, "*.csv")) + df: pd.DataFrame = pd.concat( + (pd.read_csv(f) for f in all_files), ignore_index=True + ) + df = df[["source", "server", "file_name"]] + df = df.drop_duplicates(subset=["source", "server", "file_name"]).reset_index( + drop=True + ) + df["rank"] = df.index % self.total_workers + + df.to_csv(os.path.join(filter_folder, "schedule.csv"), header=True, index=False) + + +class Runner(MPIRunnerTool): + def __init__(self, cfg: Config, seq_id: int): + super().__init__(cfg) + + self.filter_name: str = 
"transfer_and_type_change" + self.data_scheme: List[str] = ["source", "server", "file_name"] + self.verification_scheme: List[str] = ["source", "server", "file_name"] + self.total_time = 150 + self.seq_id = seq_id + + self.src_path = self.config["src_path"] + self.dst_path = self.config["dst_path"] + + def ensure_folders_created(self): + assert self.filter_name is not None, ValueError("filter name is not set") + assert self.verification_scheme is not None, ValueError( + "verification scheme is not set" + ) + + self.filter_folder = os.path.join( + self.tools_path, self.filter_name, str(self.seq_id).zfill(4) + ) + self.filter_table_folder = os.path.join(self.filter_folder, "filter_table") + self.verification_folder = os.path.join(self.filter_folder, "verification") + + os.makedirs(self.verification_folder, exist_ok=True) + + def apply_filter_different( + self, filtering_df: pd.DataFrame, source: str, server: str, file_name: str + ) -> int: + self.is_enough_time() + + src_path = os.path.join( + self.src_path, f"source={source}", "data", f"server={server}", file_name + ) + dst_path = os.path.join( + self.dst_path, f"source={source}", f"server={server}", file_name + ) + os.makedirs( + os.path.join( + self.dst_path, + f"source={source}", + f"server={server}", + ), + exist_ok=True, + ) + + if not os.path.exists(src_path): + self.logger.info(f"Path doesn't exists: {src_path}") + return 0 + + renamed_parquet = pd.read_parquet(src_path) + + self.is_enough_time() + + renamed_parquet = renamed_parquet.astype({"source_id": "string"}) + + renamed_parquet.to_parquet( + dst_path, index=False, compression="zstd", compression_level=3 + ) + + os.remove(src_path) + + return len(renamed_parquet) + + def runner_fn(self, df_local: pd.DataFrame) -> int: + filtering_df = df_local.reset_index(drop=True) + source = filtering_df.iloc[0]["source"] + server = filtering_df.iloc[0]["server"] + file_name = filtering_df.iloc[0]["file_name"] + try: + filtered_parquet_length = self.apply_filter_different( + filtering_df, source, server, file_name + ) + except NotImplementedError: + raise NotImplementedError("Filter function wasn't implemented") + except Exception as e: + self.logger.exception(e) + self.logger.error(f"Error occurred: {e}") + return 0 + else: + print(f"{source},{server},{file_name}", end="\n", file=self.verification_IO) + self.logger.debug( + f"Completed filtering: {source}/{server}/{file_name} with {filtered_parquet_length}" + ) + return 1 diff --git a/src/DD_tools/transfer_and_type_change/filter.py b/src/DD_tools/transfer_and_type_change/filter.py new file mode 100644 index 0000000..df8af2e --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/filter.py @@ -0,0 +1,22 @@ +import os + +from DD_tools.main.config import Config +from DD_tools.main.utils import init_logger +from DD_tools.transfer_and_type_change.classes import Filter + +if __name__ == "__main__": + config_path = os.environ.get("CONFIG_PATH") + if config_path is None: + raise ValueError("CONFIG_PATH not set") + + config = Config.from_path(config_path, "tools") + logger = init_logger(__name__) + + tool_filter = Filter(config) + + logger.info("Starting filter") + tool_filter.run() + + logger.info("completed filtering") + + tool_filter = None diff --git a/src/DD_tools/transfer_and_type_change/main.py b/src/DD_tools/transfer_and_type_change/main.py new file mode 100644 index 0000000..0384c12 --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/main.py @@ -0,0 +1,286 @@ +import argparse +import os +from logging import Logger +from typing import 
Dict, List, Optional, TextIO, Tuple + +import pandas as pd +from attr import Factory, define, field + +from DD_tools.main.checkpoint import Checkpoint +from DD_tools.main.config import Config +from DD_tools.main.registry import ToolsRegistryBase +from DD_tools.main.utils import ( + init_logger, + truncate_paths, + ensure_created, + submit_job, + preprocess_dep_ids, +) + +division_df = pd.read_csv( + "/users/PAS2119/andreykopanev/distributed_downloader_test/data_move_division_big.csv" +) + + +@define +class Tools: + config: Config + tool_name: str + seq_id: int + + logger: Logger = field(default=Factory(lambda: init_logger(__name__))) + + tool_folder: Optional[str] = None + tool_job_history_path: Optional[str] = None + tool_checkpoint_path: Optional[str] = None + checkpoint_scheme: Optional[Dict[str, bool]] = None + + tool_checkpoint: Optional[Checkpoint] = None + _checkpoint_override: Optional[Dict[str, bool]] = None + tool_job_history: Optional[List[int]] = None + tool_job_history_io: Optional[TextIO] = None + + @classmethod + def from_path( + cls, + path: str, + tool_name: str, + set_id: int, + checkpoint_override: Optional[Dict[str, bool]] = None, + tool_name_override: Optional[bool] = False, + ) -> "Tools": + if ( + not tool_name_override + and tool_name not in ToolsRegistryBase.TOOLS_REGISTRY.keys() + ): + raise ValueError("unknown tool name") + + return cls( + config=Config.from_path(path, "tools"), + tool_name=tool_name, + checkpoint_override=checkpoint_override, + seq_id=set_id, + ) + + def __attrs_post_init__(self): + # noinspection PyTypeChecker + self.tool_folder: str = os.path.join( + self.config.get_folder("tools_folder"), + self.tool_name, + str(self.seq_id).zfill(4), + ) + self.tool_job_history_path: str = os.path.join( + self.tool_folder, "job_history.csv" + ) + self.tool_checkpoint_path: str = os.path.join( + self.tool_folder, "tool_checkpoint.yaml" + ) + + if not self.checkpoint_scheme: + self.checkpoint_scheme = { + "filtered": False, + "schedule_created": False, + "completed": False, + } + + self.__init_environment() + self.__init_file_structure() + + def __init_environment(self) -> None: + os.environ["CONFIG_PATH"] = self.config.config_path + + os.environ["ACCOUNT"] = self.config["account"] + os.environ["PATH_TO_INPUT"] = self.config["path_to_input"] + + os.environ["PATH_TO_OUTPUT"] = self.config["path_to_output_folder"] + for output_folder, output_path in self.config.folder_structure.items(): + os.environ["OUTPUT_" + output_folder.upper()] = output_path + os.environ["OUTPUT_TOOLS_LOGS_FOLDER"] = os.path.join(self.tool_folder, "logs") + + for downloader_var, downloader_value in self.config["tools_parameters"].items(): + os.environ["TOOLS_" + downloader_var.upper()] = str(downloader_value) + + self.logger.info("Environment initialized") + + def __init_file_structure(self): + ensure_created( + [ + self.tool_folder, + os.path.join(self.tool_folder, "filter_table"), + os.path.join(self.tool_folder, "verification"), + os.path.join(self.tool_folder, "logs"), + ] + ) + + self.tool_checkpoint = Checkpoint.from_path( + self.tool_checkpoint_path, self.checkpoint_scheme + ) + if self._checkpoint_override is not None: + for key, value in self._checkpoint_override.items(): + if key == "verification": + truncate_paths([os.path.join(self.tool_folder, "verification")]) + self.tool_checkpoint["completed"] = False + continue + if key not in self.checkpoint_scheme.keys(): + raise KeyError("Unknown key for override in checkpoint") + + self.tool_checkpoint[key] = value + + 
self.tool_job_history, self.tool_job_history_io = self.__load_job_history() + + def __load_job_history(self) -> Tuple[List[int], TextIO]: + job_ids = [] + + if os.path.exists(self.tool_job_history_path): + df = pd.read_csv(self.tool_job_history_path) + job_ids = df["job_ids"].to_list() + else: + with open(self.tool_job_history_path, "w") as f: + print("job_ids", file=f) + + job_io = open(self.tool_job_history_path, "a") + + return job_ids, job_io + + def __update_job_history(self, new_id: int) -> None: + self.tool_job_history.append(new_id) + print(new_id, file=self.tool_job_history_io) + + def __schedule_filtering(self) -> None: + self.logger.info("Scheduling filtering script") + + sub_division = division_df[division_df["division"] == self.seq_id] + sub_division.to_csv( + os.path.join(self.tool_folder, "filter_table", "division.csv"), + index=False, + header=True, + ) + + self.tool_checkpoint["filtered"] = True + self.logger.info("Scheduled filtering script") + + def __schedule_schedule_creation(self) -> None: + self.logger.info("Scheduling schedule creation script") + job_id = submit_job( + self.config.get_script("tools_submitter"), + self.config.get_script("tools_scheduling_script"), + self.tool_name, + str(self.seq_id), + *preprocess_dep_ids( + [self.tool_job_history[-1] if len(self.tool_job_history) != 0 else None] + ), + ) + self.__update_job_history(job_id) + self.tool_checkpoint["schedule_created"] = True + self.logger.info("Scheduled schedule creation script") + + def __schedule_workers(self) -> None: + self.logger.info("Scheduling workers script") + + for _ in range(self.config["tools_parameters"]["num_workers"]): + job_id = submit_job( + self.config.get_script("tools_submitter"), + self.config.get_script("tools_worker_script"), + self.tool_name, + str(self.seq_id), + *preprocess_dep_ids([self.tool_job_history[-1]]), + ) + self.__update_job_history(job_id) + + job_id = submit_job( + self.config.get_script("tools_submitter"), + self.config.get_script("tools_verification_script"), + self.tool_name, + str(self.seq_id), + *preprocess_dep_ids([self.tool_job_history[-1]]), + ) + self.__update_job_history(job_id) + + self.logger.info("Scheduled workers script") + + def apply_tool(self): + if not self.tool_checkpoint.get("filtered", False): + self.__schedule_filtering() + else: + self.logger.info("Skipping filtering script: table already created") + + if not self.tool_checkpoint.get("schedule_created", False): + self.__schedule_schedule_creation() + else: + self.logger.info( + "Skipping schedule creation script: schedule already created" + ) + + if not self.tool_checkpoint.get("completed", False): + self.__schedule_workers() + else: + self.logger.error("Tool completed its job") + + def __del__(self): + if self.tool_job_history_io is not None: + self.tool_job_history_io.close() + + +def main(): + parser = argparse.ArgumentParser(description="Tools") + parser.add_argument( + "config_path", + metavar="config_path", + type=str, + help="the name of the tool that is intended to be used", + ) + parser.add_argument( + "tool_name", + metavar="tool_name", + type=str, + help="the name of the tool that is intended to be used", + ) + parser.add_argument( + "--reset_filtering", + action="store_true", + help="Will reset filtering and scheduling steps", + ) + parser.add_argument( + "--reset_scheduling", action="store_true", help="Will reset scheduling step" + ) + parser.add_argument( + "--reset_runners", + action="store_true", + help="Will reset runners, making them to start over", + ) + 
parser.add_argument( + "--tool_name_override", + action="store_true", + help="Will override tool name check (allows for custom tool run)", + ) + _args = parser.parse_args() + + config_path = _args.config_path + tool_name = _args.tool_name + state_override = None + if _args.reset_filtering: + state_override = { + "filtered": False, + "schedule_created": False, + "verification": False, + } + elif _args.reset_scheduling: + state_override = {"schedule_created": False} + if _args.reset_runners: + state_override = {"verification": False} + + division_total = division_df["division"].unique().tolist() + + for division in division_total: + dd = Tools.from_path( + config_path, + tool_name, + division, + checkpoint_override=state_override, + tool_name_override=_args.tool_name_override, + ) + dd.apply_tool() + + +if __name__ == "__main__": + main() diff --git a/src/DD_tools/transfer_and_type_change/runner.py b/src/DD_tools/transfer_and_type_change/runner.py new file mode 100644 index 0000000..4253c54 --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/runner.py @@ -0,0 +1,27 @@ +import argparse +import os + +from DD_tools.main.config import Config +from DD_tools.main.utils import init_logger +from DD_tools.transfer_and_type_change.classes import Runner + +if __name__ == "__main__": + config_path = os.environ.get("CONFIG_PATH") + if config_path is None: + raise ValueError("CONFIG_PATH not set") + + config = Config.from_path(config_path, "tools") + logger = init_logger(__name__) + + parser = argparse.ArgumentParser(description='Running step of the Tool') + parser.add_argument("seq_id", metavar="seq_id", type=int, + help="the name of the tool that is intended to be used") + _args = parser.parse_args() + seq_id = _args.seq_id + + tool_filter = Runner(config, seq_id) + + logger.info("Starting runner") + tool_filter.run() + + logger.info("completed runner") diff --git a/src/DD_tools/transfer_and_type_change/scheduler.py b/src/DD_tools/transfer_and_type_change/scheduler.py new file mode 100644 index 0000000..aa5ab98 --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/scheduler.py @@ -0,0 +1,29 @@ +import argparse +import os +import pprint + +from DD_tools.main.config import Config +from DD_tools.main.utils import init_logger +from DD_tools.transfer_and_type_change.classes import ScheduleCreation + +if __name__ == "__main__": + config_path = os.environ.get("CONFIG_PATH") + if config_path is None: + raise ValueError("CONFIG_PATH not set") + + config = Config.from_path(config_path, "tools") + logger = init_logger(__name__) + + parser = argparse.ArgumentParser(description='Running step of the Tool') + parser.add_argument("seq_id", metavar="seq_id", type=int, + help="the name of the tool that is intended to be used") + _args = parser.parse_args() + logger.info(pprint.pformat(_args)) + seq_id = _args.seq_id + + tool_filter = ScheduleCreation(config, seq_id) + + logger.info("Starting scheduler") + tool_filter.run() + + logger.info("completed scheduler") diff --git a/src/DD_tools/transfer_and_type_change/tools_filter.slurm b/src/DD_tools/transfer_and_type_change/tools_filter.slurm new file mode 100644 index 0000000..ddb1c9d --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/tools_filter.slurm @@ -0,0 +1,20 @@ +#!/bin/bash +#SBATCH --job-name tool_filter +#SBATCH --mem=0 + +logs_dir="${OUTPUT_TOOLS_LOGS_FOLDER}" +mkdir -p "$logs_dir" + +driver_memory="110G" +executor_memory="64G" + +module load spark/3.4.1 +module load miniconda3/23.3.1-py310 +source "${REPO_ROOT}/.venv/bin/activate" +export 
PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" + +pbs-spark-submit \ + --driver-memory $driver_memory \ + --executor-memory $executor_memory \ + "/users/PAS2119/andreykopanev/distributed_downloader_test/column_name_change/filter.py" \ + > "${logs_dir}/tool_filter.log" diff --git a/src/DD_tools/transfer_and_type_change/tools_scheduler.slurm b/src/DD_tools/transfer_and_type_change/tools_scheduler.slurm new file mode 100644 index 0000000..72de5fc --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/tools_scheduler.slurm @@ -0,0 +1,32 @@ +#!/bin/bash +#SBATCH --job-name tool_scheduler +#SBATCH --mem=0 +#SBATCH --time=00:05:00 + +if [ "$#" -eq 0 ]; then + echo "Usage: $0 tool_name seq_id" + exit 1 +fi + +tool_name=$1 +seq_id=$2 + +logs_dir="${OUTPUT_TOOLS_LOGS_FOLDER}" +mkdir -p "$logs_dir" + +module load intel/2021.10.0 +module load intelmpi/2021.10 +module load miniconda3/23.3.1-py310 +source "${REPO_ROOT}/.venv/bin/activate" +export PYARROW_IGNORE_TIMEZONE=1 +export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 +export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" + +srun \ + --mpi=pmi2 \ + --nodes=1 \ + --ntasks-per-node=1 \ + --cpus-per-task=1 \ + --mem=0 \ + --output="${logs_dir}/tool_scheduler.log" \ + python "/users/PAS2119/andreykopanev/distributed_downloader_test/transfer_and_type_change/scheduler.py" "${seq_id}" diff --git a/src/DD_tools/transfer_and_type_change/tools_submit.sh b/src/DD_tools/transfer_and_type_change/tools_submit.sh new file mode 100644 index 0000000..16f0f40 --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/tools_submit.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +set -e + +SCRIPTS_DIR=$(dirname "$(realpath "$0")") +REPO_ROOT=$(dirname "$(realpath "${SCRIPTS_DIR}")") +export REPO_ROOT +logs_dir="${OUTPUT_TOOLS_LOGS_FOLDER}" +mkdir -p "${logs_dir}" + +# Check if any arguments were passed +if [ "$#" -eq 0 ]; then + echo "Usage: $0 script1 tool_name seq_id [dependency] [--spark]" + exit 1 +fi + +script=$1 +if [ ! 
-f "$script" ]; then + echo "Error: File '$script' not found" + exit 1 +fi + +filename=$(basename "$script") +ext="${filename##*.}" +base_filename=$(basename "${filename}" ."${ext}") +tool_name=$2 +seq_id=$3 +dependency="" +spark_flag="" + +if [ "$4" == "--spark" ]; then + spark_flag="--spark" + dependency="$5" +else + dependency="$4" + if [ "$5" == "--spark" ]; then + spark_flag="--spark" + fi +fi + +sbatch_cmd="sbatch --output=\"${logs_dir}/${base_filename}.out\" --error=\"${logs_dir}/${base_filename}.err\" --nodes=${TOOLS_MAX_NODES}" + +if [ -n "$dependency" ]; then + sbatch_cmd+=" --dependency=afterany:${dependency}" +fi + +if [ -z "$spark_flag" ]; then +# sbatch_cmd+=" --ntasks-per-node=${TOOLS_WORKERS_PER_NODE} --cpus-per-task=${TOOLS_CPU_PER_WORKER}" + sbatch_cmd+=" --begin=now+${seq_id}days" +fi + +sbatch_cmd+=" --account=${ACCOUNT} ${script} ${tool_name} ${seq_id}" + +#echo "$sbatch_cmd" > test.log +eval "$sbatch_cmd" diff --git a/src/DD_tools/transfer_and_type_change/tools_verifier.slurm b/src/DD_tools/transfer_and_type_change/tools_verifier.slurm new file mode 100644 index 0000000..0e279e5 --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/tools_verifier.slurm @@ -0,0 +1,31 @@ +#!/bin/bash +#SBATCH --job-name tool_verifier +#SBATCH --mem=0 +#SBATCH --time=00:05:00 + +if [ "$#" -eq 0 ]; then + echo "Usage: $0 tool_name seq_id" + exit 1 +fi + +tool_name=$1 +seq_id=$2 + +logs_dir="${OUTPUT_TOOLS_LOGS_FOLDER}" +mkdir -p "$logs_dir" + +module load intel/2021.10.0 +module load intelmpi/2021.10 +module load miniconda3/23.3.1-py310 +source "${REPO_ROOT}/.venv/bin/activate" +export PYARROW_IGNORE_TIMEZONE=1 +export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 + +srun \ + --mpi=pmi2 \ + --nodes=1 \ + --ntasks-per-node=1 \ + --cpus-per-task=1 \ + --mem=0 \ + --output="${logs_dir}/tool_verifier.log" \ + python "/users/PAS2119/andreykopanev/distributed_downloader_test/transfer_and_type_change/verification.py" "${seq_id}" diff --git a/src/DD_tools/transfer_and_type_change/tools_worker.slurm b/src/DD_tools/transfer_and_type_change/tools_worker.slurm new file mode 100644 index 0000000..013406a --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/tools_worker.slurm @@ -0,0 +1,32 @@ +#!/bin/bash +#SBATCH --job-name tool_worker +#SBATCH --mem=0 +#SBATCH --time=02:00:00 + +if [ "$#" -eq 0 ]; then + echo "Usage: $0 tool_name seq_id" + exit 1 +fi + +tool_name=$1 +seq_id=$2 + +logs_dir="${OUTPUT_TOOLS_LOGS_FOLDER}" +mkdir -p "$logs_dir" + +module load intel/2021.10.0 +module load intelmpi/2021.10 +module load miniconda3/23.3.1-py310 +source "${REPO_ROOT}/.venv/bin/activate" +export PYARROW_IGNORE_TIMEZONE=1 +export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 +export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" + +srun \ + --mpi=pmi2 \ + --nodes="$TOOLS_MAX_NODES" \ + --ntasks-per-node="$TOOLS_WORKERS_PER_NODE" \ + --cpus-per-task="$TOOLS_CPU_PER_WORKER" \ + --mem=0 \ + --output="${logs_dir}/tool_worker-%2t.log" \ + python "/users/PAS2119/andreykopanev/distributed_downloader_test/transfer_and_type_change/runner.py" "${seq_id}" diff --git a/src/DD_tools/transfer_and_type_change/verification.py b/src/DD_tools/transfer_and_type_change/verification.py new file mode 100644 index 0000000..9327d57 --- /dev/null +++ b/src/DD_tools/transfer_and_type_change/verification.py @@ -0,0 +1,40 @@ +import argparse +import os + +import pandas as pd + +from DD_tools.main.checkpoint import Checkpoint +from DD_tools.main.config import Config +from DD_tools.main.runners import 
MPIRunnerTool +from DD_tools.main.utils import init_logger + +if __name__ == "__main__": + config_path = os.environ.get("CONFIG_PATH") + if config_path is None: + raise ValueError("CONFIG_PATH not set") + + config = Config.from_path(config_path, "tools") + logger = init_logger(__name__) + + parser = argparse.ArgumentParser(description='Running step of the Tool') + parser.add_argument("seq_id", metavar="seq_id", type=int, + help="the name of the tool that is intended to be used") + _args = parser.parse_args() + tool_name = "transfer_and_type_change" + seq_id = _args.seq_id + + tool_folder = os.path.join(config.get_folder("tools_folder"), tool_name, str(seq_id).zfill(4)) + checkpoint = Checkpoint.from_path(os.path.join(tool_folder, "tool_checkpoint.yaml"), {"completed": False}) + schedule_df = pd.read_csv(os.path.join(tool_folder, "schedule.csv")) + verification_df = MPIRunnerTool.load_table(os.path.join(tool_folder, "verification"), + ["source", "server", "file_name"]) + + outer_join = schedule_df.merge(verification_df, how='outer', indicator=True, on=["source", "server", "file_name"]) + left = outer_join[(outer_join["_merge"] == 'left_only')].drop('_merge', axis=1) + + if len(left) == 0: + checkpoint["completed"] = True + + logger.info("Tool completed its job") + else: + logger.info(f"Tool needs more time, left to complete: {len(left)}") From 059287e018b03d16ed27a7037d4272f0b50f8c9e Mon Sep 17 00:00:00 2001 From: Andrey170170 <24122004@bk.ru> Date: Tue, 27 May 2025 02:19:37 -0400 Subject: [PATCH 2/3] Refactor transfer_and_type_change tool: update imports, enhance class documentation, and improve scheduling logic --- .../transfer_and_type_change/filter.py | 22 ---- .../tools_filter.slurm | 20 ---- src/TreeOfLife_toolbox/__init__.py | 2 +- .../transfer_and_type_change/README.md | 62 ++++++++++ .../transfer_and_type_change/__init__.py | 0 .../transfer_and_type_change/classes.py | 109 +++++++++++++++--- .../transfer_and_type_change/main.py | 89 ++++++++------ .../transfer_and_type_change/runner.py | 12 +- .../transfer_and_type_change/scheduler.py | 12 +- .../tools_scheduler.slurm | 2 +- .../transfer_and_type_change/tools_submit.sh | 1 - .../tools_verifier.slurm | 2 +- .../tools_worker.slurm | 3 +- .../transfer_and_type_change/verification.py | 14 ++- 14 files changed, 244 insertions(+), 106 deletions(-) delete mode 100644 src/DD_tools/transfer_and_type_change/filter.py delete mode 100644 src/DD_tools/transfer_and_type_change/tools_filter.slurm create mode 100644 src/TreeOfLife_toolbox/transfer_and_type_change/README.md rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/__init__.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/classes.py (51%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/main.py (77%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/runner.py (66%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/scheduler.py (67%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/tools_scheduler.slurm (85%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/tools_submit.sh (97%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/tools_verifier.slurm (83%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/tools_worker.slurm (76%) rename src/{DD_tools => TreeOfLife_toolbox}/transfer_and_type_change/verification.py (78%) diff --git a/src/DD_tools/transfer_and_type_change/filter.py 
b/src/DD_tools/transfer_and_type_change/filter.py deleted file mode 100644 index df8af2e..0000000 --- a/src/DD_tools/transfer_and_type_change/filter.py +++ /dev/null @@ -1,22 +0,0 @@ -import os - -from DD_tools.main.config import Config -from DD_tools.main.utils import init_logger -from DD_tools.transfer_and_type_change.classes import Filter - -if __name__ == "__main__": - config_path = os.environ.get("CONFIG_PATH") - if config_path is None: - raise ValueError("CONFIG_PATH not set") - - config = Config.from_path(config_path, "tools") - logger = init_logger(__name__) - - tool_filter = Filter(config) - - logger.info("Starting filter") - tool_filter.run() - - logger.info("completed filtering") - - tool_filter = None diff --git a/src/DD_tools/transfer_and_type_change/tools_filter.slurm b/src/DD_tools/transfer_and_type_change/tools_filter.slurm deleted file mode 100644 index ddb1c9d..0000000 --- a/src/DD_tools/transfer_and_type_change/tools_filter.slurm +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash -#SBATCH --job-name tool_filter -#SBATCH --mem=0 - -logs_dir="${OUTPUT_TOOLS_LOGS_FOLDER}" -mkdir -p "$logs_dir" - -driver_memory="110G" -executor_memory="64G" - -module load spark/3.4.1 -module load miniconda3/23.3.1-py310 -source "${REPO_ROOT}/.venv/bin/activate" -export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" - -pbs-spark-submit \ - --driver-memory $driver_memory \ - --executor-memory $executor_memory \ - "/users/PAS2119/andreykopanev/distributed_downloader_test/column_name_change/filter.py" \ - > "${logs_dir}/tool_filter.log" diff --git a/src/TreeOfLife_toolbox/__init__.py b/src/TreeOfLife_toolbox/__init__.py index c1a2a93..78abf93 100644 --- a/src/TreeOfLife_toolbox/__init__.py +++ b/src/TreeOfLife_toolbox/__init__.py @@ -1,2 +1,2 @@ -from TreeOfLife_toolbox import tol200m_fathom_net_crop, tol200m_bioscan_data_transfer, data_transfer +from TreeOfLife_toolbox import tol200m_fathom_net_crop, tol200m_bioscan_data_transfer, data_transfer, transfer_and_type_change diff --git a/src/TreeOfLife_toolbox/transfer_and_type_change/README.md b/src/TreeOfLife_toolbox/transfer_and_type_change/README.md new file mode 100644 index 0000000..6b68bf6 --- /dev/null +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/README.md @@ -0,0 +1,62 @@ +# Transfer and Type Change Tool + +## Summary + +This tool was specifically created for transferring data from one place to another on research storage and changing the column type in certain chunks. It performs two main operations: + +1. Transfers parquet files from a source location to a destination location +2. Converts the 'source_id' column from its original type to string type + +The tool operates in three phases: + +- **Filtering**: Processes the division CSV to identify which files need to be processed +- **Scheduling**: Creates a schedule for parallel processing by assigning files to worker ranks +- **Execution**: Runs multiple workers in parallel to read, convert, and write files + +## Configuration Parameters + +Required configuration fields: + +- `src_path` - Path to the source data (absolute path) +- `dst_path` - Path to the destination data (absolute path) + +## Division File + +Before running the tool, you need to create a `divisions.csv` file that has the following columns: + +- `division` - ID of the division (e.g. `0`, `1`, `2`, etc.) 
+- `source` - Source name of the data (is part of the path) +- `server` - Server name of the data (is part of the path) +- `file_name` - File name of the data (is part of the path) + +The goal of this file is to divide the data into manageable chunks (smaller than 10TB) for efficient processing. + +## Usage + +```bash +python -m TreeOfLife_toolbox.transfer_and_type_change.main transfer_and_type_change +``` + +## Pre-conditions + +- Source data should be in the following folder structure: + +``` +/source=/data/server=/ +``` + +- The source files must be in parquet format +- Each file must have a 'source_id' column +- The division CSV file must be properly formatted + +## Post-conditions + +- Destination data will be in the following folder structure: + +``` +/source=/server=/ +``` + +- The 'source_id' column in all files will be converted to string type +- Original files will be removed after successful transfer and conversion +- Verification records will be created to track which files have been processed diff --git a/src/DD_tools/transfer_and_type_change/__init__.py b/src/TreeOfLife_toolbox/transfer_and_type_change/__init__.py similarity index 100% rename from src/DD_tools/transfer_and_type_change/__init__.py rename to src/TreeOfLife_toolbox/transfer_and_type_change/__init__.py diff --git a/src/DD_tools/transfer_and_type_change/classes.py b/src/TreeOfLife_toolbox/transfer_and_type_change/classes.py similarity index 51% rename from src/DD_tools/transfer_and_type_change/classes.py rename to src/TreeOfLife_toolbox/transfer_and_type_change/classes.py index 1cf5ca5..03a1d2e 100644 --- a/src/DD_tools/transfer_and_type_change/classes.py +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/classes.py @@ -4,30 +4,48 @@ import pandas as pd -from DD_tools.main.config import Config -from DD_tools.main.filters import PythonFilterToolBase -from DD_tools.main.runners import MPIRunnerTool -from DD_tools.main.schedulers import DefaultScheduler - - -class Filter(PythonFilterToolBase): - def __init__(self, cfg: Config): - super().__init__(cfg) - - self.filter_name: str = "transfer_and_type_change" - - def run(self): - pass +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.runners import MPIRunnerTool +from TreeOfLife_toolbox.main.schedulers import DefaultScheduler class ScheduleCreation(DefaultScheduler): + """ + A scheduler class that creates the data transfer and type change schedule. + + This class is responsible for reading the filter table files that contain information + about which files need to be processed, and creating a schedule file that will be used + by runner instances to process the data in parallel. The schedule assigns each file to + a specific rank for balanced workload distribution. + """ def __init__(self, cfg: Config, seq_id: int): + """ + Initialize the ScheduleCreation scheduler. + + Args: + cfg (Config): Configuration object containing settings for the tool + seq_id (int): Sequence ID of the batch to process + """ super().__init__(cfg) self.filter_name: str = "transfer_and_type_change" self.seq_id = seq_id def run(self): + """ + Execute the scheduling process. + + This method: + 1. Reads all CSV files from the filter table directory for the current sequence ID + 2. Combines them into a single DataFrame + 3. Extracts the necessary columns (source, server, file_name) + 4. Removes duplicates + 5. Assigns a rank to each file for load balancing + 6. 
Saves the schedule to a CSV file + + Raises: + ValueError: If filter_name is not set + """ assert self.filter_name is not None, ValueError("filter name is not set") filter_folder = os.path.join( @@ -49,19 +67,47 @@ def run(self): class Runner(MPIRunnerTool): + """ + A runner class that executes the data transfer and type change operations. + + This class is responsible for processing files according to the schedule. + For each file, it: + 1. Reads the parquet file from the source location + 2. Converts the 'source_id' column from its original type to string + 3. Saves the modified file to the destination location + 4. Removes the original file to free up storage space + + The class uses MPI for parallel processing across multiple nodes and cores. + """ def __init__(self, cfg: Config, seq_id: int): + """ + Initialize the Runner. + + Args: + cfg (Config): Configuration object containing settings for the tool + seq_id (int): Sequence ID of the batch to process + """ super().__init__(cfg) self.filter_name: str = "transfer_and_type_change" self.data_scheme: List[str] = ["source", "server", "file_name"] self.verification_scheme: List[str] = ["source", "server", "file_name"] - self.total_time = 150 + self.total_time = 150 # Maximum execution time in seconds self.seq_id = seq_id self.src_path = self.config["src_path"] self.dst_path = self.config["dst_path"] def ensure_folders_created(self): + """ + Ensure that all necessary folders for processing exist. + + This method creates the verification folder and other required directories + if they don't already exist. + + Raises: + ValueError: If filter_name or verification_scheme is not set + """ assert self.filter_name is not None, ValueError("filter name is not set") assert self.verification_scheme is not None, ValueError( "verification scheme is not set" @@ -78,6 +124,26 @@ def ensure_folders_created(self): def apply_filter_different( self, filtering_df: pd.DataFrame, source: str, server: str, file_name: str ) -> int: + """ + Apply type conversion to a specific file and transfer it to the destination. + + This method: + 1. Constructs source and destination paths + 2. Creates destination directories if needed + 3. Reads the source parquet file + 4. Converts the 'source_id' column to string type + 5. Writes the modified data to the destination + 6. Removes the source file to free up space + + Args: + filtering_df (pd.DataFrame): DataFrame containing filter information + source (str): Source name (used to construct the path) + server (str): Server name (used to construct the path) + file_name (str): File name (used to construct the path) + + Returns: + int: Number of rows in the processed file, or 0 if the file doesn't exist + """ self.is_enough_time() src_path = os.path.join( @@ -114,6 +180,19 @@ def apply_filter_different( return len(renamed_parquet) def runner_fn(self, df_local: pd.DataFrame) -> int: + """ + Process a single file according to the schedule. + + This method extracts information about the file to process, + calls the apply_filter_different method to perform the actual processing, + and records the result in the verification file. 
+ + Args: + df_local (pd.DataFrame): DataFrame containing information about the file to process + + Returns: + int: 1 if processing was successful, 0 if an error occurred + """ filtering_df = df_local.reset_index(drop=True) source = filtering_df.iloc[0]["source"] server = filtering_df.iloc[0]["server"] diff --git a/src/DD_tools/transfer_and_type_change/main.py b/src/TreeOfLife_toolbox/transfer_and_type_change/main.py similarity index 77% rename from src/DD_tools/transfer_and_type_change/main.py rename to src/TreeOfLife_toolbox/transfer_and_type_change/main.py index 0384c12..117366b 100644 --- a/src/DD_tools/transfer_and_type_change/main.py +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/main.py @@ -1,15 +1,16 @@ import argparse import os from logging import Logger +from pathlib import Path from typing import Dict, List, Optional, TextIO, Tuple import pandas as pd from attr import Factory, define, field -from DD_tools.main.checkpoint import Checkpoint -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsRegistryBase -from DD_tools.main.utils import ( +from TreeOfLife_toolbox.main.checkpoint import Checkpoint +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsRegistryBase +from TreeOfLife_toolbox.main.utils import ( init_logger, truncate_paths, ensure_created, @@ -17,23 +18,26 @@ preprocess_dep_ids, ) -division_df = pd.read_csv( - "/users/PAS2119/andreykopanev/distributed_downloader_test/data_move_division_big.csv" -) - @define class Tools: config: Config tool_name: str seq_id: int + division_df: pd.DataFrame logger: Logger = field(default=Factory(lambda: init_logger(__name__))) tool_folder: Optional[str] = None tool_job_history_path: Optional[str] = None tool_checkpoint_path: Optional[str] = None - checkpoint_scheme: Optional[Dict[str, bool]] = None + checkpoint_scheme = { + "filtering_scheduled": False, + "filtering_completed": False, + "scheduling_scheduled": False, + "scheduling_completed": False, + "completed": False, + } tool_checkpoint: Optional[Checkpoint] = None _checkpoint_override: Optional[Dict[str, bool]] = None @@ -42,16 +46,17 @@ class Tools: @classmethod def from_path( - cls, - path: str, - tool_name: str, - set_id: int, - checkpoint_override: Optional[Dict[str, bool]] = None, - tool_name_override: Optional[bool] = False, + cls, + path: str, + tool_name: str, + set_id: int, + division_df: pd.DataFrame, + checkpoint_override: Optional[Dict[str, bool]] = None, + tool_name_override: Optional[bool] = False, ) -> "Tools": if ( - not tool_name_override - and tool_name not in ToolsRegistryBase.TOOLS_REGISTRY.keys() + not tool_name_override + and tool_name not in ToolsRegistryBase.TOOLS_REGISTRY.keys() ): raise ValueError("unknown tool name") @@ -60,6 +65,7 @@ def from_path( tool_name=tool_name, checkpoint_override=checkpoint_override, seq_id=set_id, + division_df=division_df, ) def __attrs_post_init__(self): @@ -76,18 +82,12 @@ def __attrs_post_init__(self): self.tool_folder, "tool_checkpoint.yaml" ) - if not self.checkpoint_scheme: - self.checkpoint_scheme = { - "filtered": False, - "schedule_created": False, - "completed": False, - } - self.__init_environment() self.__init_file_structure() def __init_environment(self) -> None: os.environ["CONFIG_PATH"] = self.config.config_path + os.environ["TOOLBOX_PATH"] = str(Path(__file__).parent.parent.resolve()) os.environ["ACCOUNT"] = self.config["account"] os.environ["PATH_TO_INPUT"] = self.config["path_to_input"] @@ -149,7 +149,7 @@ def 
__update_job_history(self, new_id: int) -> None: def __schedule_filtering(self) -> None: self.logger.info("Scheduling filtering script") - sub_division = division_df[division_df["division"] == self.seq_id] + sub_division = self.division_df[self.division_df["division"] == self.seq_id] sub_division.to_csv( os.path.join(self.tool_folder, "filter_table", "division.csv"), index=False, @@ -199,16 +199,24 @@ def __schedule_workers(self) -> None: self.logger.info("Scheduled workers script") def apply_tool(self): - if not self.tool_checkpoint.get("filtered", False): + if not ( + self.tool_checkpoint.get("filtering_scheduled", False) + or self.tool_checkpoint.get("filtering_completed", False) + ): self.__schedule_filtering() else: - self.logger.info("Skipping filtering script: table already created") + self.logger.info( + "Skipping filtering script: job is already scheduled or table has been already created" + ) - if not self.tool_checkpoint.get("schedule_created", False): + if not ( + self.tool_checkpoint.get("schedule_scheduled", False) + or self.tool_checkpoint.get("schedule_completed", False) + ): self.__schedule_schedule_creation() else: self.logger.info( - "Skipping schedule creation script: schedule already created" + "Skipping schedule creation script: job is already scheduled or schedule has been already created" ) if not self.tool_checkpoint.get("completed", False): @@ -235,6 +243,12 @@ def main(): type=str, help="the name of the tool that is intended to be used", ) + parser.add_argument( + "divisions_path", + metavar="divisions_path", + type=str, + help="the path to the CSV file with job divisions", + ) parser.add_argument( "--reset_filtering", action="store_true", @@ -260,15 +274,23 @@ def main(): state_override = None if _args.reset_filtering: state_override = { - "filtered": False, - "schedule_created": False, + "filtering_scheduled": False, + "filtering_completed": False, + "scheduling_scheduled": False, + "scheduling_completed": False, "verification": False, + "completed": False, } elif _args.reset_scheduling: - state_override = {"schedule_created": False} + state_override = { + "scheduling_scheduled": False, + "scheduling_completed": False, + "completed": False, + } if _args.reset_runners: - state_override = {"verification": False} + state_override = {"verification": False, "completed": False} + division_df = pd.read_csv(_args.divisions_path) division_total = division_df["division"].unique().tolist() for division in division_total: @@ -276,6 +298,7 @@ def main(): config_path, tool_name, division, + division_df=division_df, checkpoint_override=state_override, tool_name_override=_args.tool_name_override, ) diff --git a/src/DD_tools/transfer_and_type_change/runner.py b/src/TreeOfLife_toolbox/transfer_and_type_change/runner.py similarity index 66% rename from src/DD_tools/transfer_and_type_change/runner.py rename to src/TreeOfLife_toolbox/transfer_and_type_change/runner.py index 4253c54..c8296fc 100644 --- a/src/DD_tools/transfer_and_type_change/runner.py +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/runner.py @@ -1,9 +1,9 @@ import argparse import os -from DD_tools.main.config import Config -from DD_tools.main.utils import init_logger -from DD_tools.transfer_and_type_change.classes import Runner +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.utils import init_logger +from TreeOfLife_toolbox.transfer_and_type_change.classes import Runner if __name__ == "__main__": config_path = os.environ.get("CONFIG_PATH") @@ -14,6 +14,12 @@ logger = 
init_logger(__name__) parser = argparse.ArgumentParser(description='Running step of the Tool') + parser.add_argument( + "scheduler_name", + metavar="scheduler_name", + type=str, + help="the name of the tool that is intended to be used", + ) parser.add_argument("seq_id", metavar="seq_id", type=int, help="the name of the tool that is intended to be used") _args = parser.parse_args() diff --git a/src/DD_tools/transfer_and_type_change/scheduler.py b/src/TreeOfLife_toolbox/transfer_and_type_change/scheduler.py similarity index 67% rename from src/DD_tools/transfer_and_type_change/scheduler.py rename to src/TreeOfLife_toolbox/transfer_and_type_change/scheduler.py index aa5ab98..580eaed 100644 --- a/src/DD_tools/transfer_and_type_change/scheduler.py +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/scheduler.py @@ -2,9 +2,9 @@ import os import pprint -from DD_tools.main.config import Config -from DD_tools.main.utils import init_logger -from DD_tools.transfer_and_type_change.classes import ScheduleCreation +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.utils import init_logger +from TreeOfLife_toolbox.transfer_and_type_change.classes import ScheduleCreation if __name__ == "__main__": config_path = os.environ.get("CONFIG_PATH") @@ -15,6 +15,12 @@ logger = init_logger(__name__) parser = argparse.ArgumentParser(description='Running step of the Tool') + parser.add_argument( + "scheduler_name", + metavar="scheduler_name", + type=str, + help="the name of the tool that is intended to be used", + ) parser.add_argument("seq_id", metavar="seq_id", type=int, help="the name of the tool that is intended to be used") _args = parser.parse_args() diff --git a/src/DD_tools/transfer_and_type_change/tools_scheduler.slurm b/src/TreeOfLife_toolbox/transfer_and_type_change/tools_scheduler.slurm similarity index 85% rename from src/DD_tools/transfer_and_type_change/tools_scheduler.slurm rename to src/TreeOfLife_toolbox/transfer_and_type_change/tools_scheduler.slurm index 72de5fc..044ca11 100644 --- a/src/DD_tools/transfer_and_type_change/tools_scheduler.slurm +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/tools_scheduler.slurm @@ -29,4 +29,4 @@ srun \ --cpus-per-task=1 \ --mem=0 \ --output="${logs_dir}/tool_scheduler.log" \ - python "/users/PAS2119/andreykopanev/distributed_downloader_test/transfer_and_type_change/scheduler.py" "${seq_id}" + python "${TOOLBOX_PATH}/transfer_and_type_change/scheduler.py" "${tool_name}" "${seq_id}" diff --git a/src/DD_tools/transfer_and_type_change/tools_submit.sh b/src/TreeOfLife_toolbox/transfer_and_type_change/tools_submit.sh similarity index 97% rename from src/DD_tools/transfer_and_type_change/tools_submit.sh rename to src/TreeOfLife_toolbox/transfer_and_type_change/tools_submit.sh index 16f0f40..bfd444f 100644 --- a/src/DD_tools/transfer_and_type_change/tools_submit.sh +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/tools_submit.sh @@ -51,5 +51,4 @@ fi sbatch_cmd+=" --account=${ACCOUNT} ${script} ${tool_name} ${seq_id}" -#echo "$sbatch_cmd" > test.log eval "$sbatch_cmd" diff --git a/src/DD_tools/transfer_and_type_change/tools_verifier.slurm b/src/TreeOfLife_toolbox/transfer_and_type_change/tools_verifier.slurm similarity index 83% rename from src/DD_tools/transfer_and_type_change/tools_verifier.slurm rename to src/TreeOfLife_toolbox/transfer_and_type_change/tools_verifier.slurm index 0e279e5..383911e 100644 --- a/src/DD_tools/transfer_and_type_change/tools_verifier.slurm +++ 
b/src/TreeOfLife_toolbox/transfer_and_type_change/tools_verifier.slurm @@ -28,4 +28,4 @@ srun \ --cpus-per-task=1 \ --mem=0 \ --output="${logs_dir}/tool_verifier.log" \ - python "/users/PAS2119/andreykopanev/distributed_downloader_test/transfer_and_type_change/verification.py" "${seq_id}" + python "${TOOLBOX_PATH}/transfer_and_type_change/verification.py" "${tool_name}" "${seq_id}" diff --git a/src/DD_tools/transfer_and_type_change/tools_worker.slurm b/src/TreeOfLife_toolbox/transfer_and_type_change/tools_worker.slurm similarity index 76% rename from src/DD_tools/transfer_and_type_change/tools_worker.slurm rename to src/TreeOfLife_toolbox/transfer_and_type_change/tools_worker.slurm index 013406a..a16c4fb 100644 --- a/src/DD_tools/transfer_and_type_change/tools_worker.slurm +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/tools_worker.slurm @@ -20,7 +20,6 @@ module load miniconda3/23.3.1-py310 source "${REPO_ROOT}/.venv/bin/activate" export PYARROW_IGNORE_TIMEZONE=1 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 -export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" srun \ --mpi=pmi2 \ @@ -29,4 +28,4 @@ srun \ --cpus-per-task="$TOOLS_CPU_PER_WORKER" \ --mem=0 \ --output="${logs_dir}/tool_worker-%2t.log" \ - python "/users/PAS2119/andreykopanev/distributed_downloader_test/transfer_and_type_change/runner.py" "${seq_id}" + python "${TOOLBOX_PATH}/transfer_and_type_change/runner.py" "${tool_name}" "${seq_id}" diff --git a/src/DD_tools/transfer_and_type_change/verification.py b/src/TreeOfLife_toolbox/transfer_and_type_change/verification.py similarity index 78% rename from src/DD_tools/transfer_and_type_change/verification.py rename to src/TreeOfLife_toolbox/transfer_and_type_change/verification.py index 9327d57..d522292 100644 --- a/src/DD_tools/transfer_and_type_change/verification.py +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/verification.py @@ -3,10 +3,10 @@ import pandas as pd -from DD_tools.main.checkpoint import Checkpoint -from DD_tools.main.config import Config -from DD_tools.main.runners import MPIRunnerTool -from DD_tools.main.utils import init_logger +from TreeOfLife_toolbox.main.checkpoint import Checkpoint +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.runners import MPIRunnerTool +from TreeOfLife_toolbox.main.utils import init_logger if __name__ == "__main__": config_path = os.environ.get("CONFIG_PATH") @@ -17,6 +17,12 @@ logger = init_logger(__name__) parser = argparse.ArgumentParser(description='Running step of the Tool') + parser.add_argument( + "scheduler_name", + metavar="scheduler_name", + type=str, + help="the name of the tool that is intended to be used", + ) parser.add_argument("seq_id", metavar="seq_id", type=int, help="the name of the tool that is intended to be used") _args = parser.parse_args() From 11b7827fc6601fc19a241acabcc223e6a294aec2 Mon Sep 17 00:00:00 2001 From: Andrey170170 <24122004@bk.ru> Date: Tue, 27 May 2025 02:31:09 -0400 Subject: [PATCH 3/3] Update README.md: add detailed documentation for Slurm submission scripts and their architecture --- .../transfer_and_type_change/README.md | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/src/TreeOfLife_toolbox/transfer_and_type_change/README.md b/src/TreeOfLife_toolbox/transfer_and_type_change/README.md index 6b68bf6..69cbb96 100644 --- a/src/TreeOfLife_toolbox/transfer_and_type_change/README.md +++ b/src/TreeOfLife_toolbox/transfer_and_type_change/README.md @@ -60,3 +60,96 @@ python -m 
TreeOfLife_toolbox.transfer_and_type_change.main transfe - The 'source_id' column in all files will be converted to string type - Original files will be removed after successful transfer and conversion - Verification records will be created to track which files have been processed + +## Special Submission Scripts + +## tools_submit.sh + +### Purpose + +`tools_submit.sh` is designed for submitting tool-related jobs to Slurm with specific resource requirements for tooling +operations. It supports both regular and Spark-based tool submissions and handles job dependencies. + +### Usage + +```bash +./tools_submit.sh script tool_name seq_id [dependency] [--spark] +``` + +### Arguments + +1. `script`: The script file to submit to Slurm +2. `tool_name`: The name of the tool to be run +3. `seq_id`: The sequence ID for the job's division in the `divisions.csv` file +4. `dependency`: (Optional) The job ID that this job depends on +5. `--spark`: (Optional) Flag indicating this is a Spark-based job + +### Features + +- Sets up the repository root environment variable (`REPO_ROOT`) +- Creates the logs directory automatically +- Handles job dependencies (if provided) +- Special handling for Spark jobs, which have different resource requirements +- For non-Spark jobs, applies tool-specific resource configurations + +### Environment Variables Used + +- `OUTPUT_TOOLS_LOGS_FOLDER`: Directory to store tool log files +- `TOOLS_MAX_NODES`: Maximum number of nodes for tools +- `TOOLS_WORKERS_PER_NODE`: Number of tool workers per node +- `TOOLS_CPU_PER_WORKER`: Number of CPUs per tool worker +- `ACCOUNT`: Slurm account to charge the job to + +## Tools Slurm Script Architecture + +### tools_scheduler.slurm + +**Purpose**: Creates execution schedules for the tool workers based on filtered data. + +**Key Components**: + +- Runs on a single node +- Calls `transfer_and_type_change/scheduler.py` with the specified tool name and sequence ID +- Processes the CSV files produced by the filter step +- Assigns images to different worker processes to balance the load +- Typical run time is 5 minutes +- Creates schedule files that map partitions to worker ranks + +**Example**: +For a size-based filter tool, the scheduler might group images by server name and partition ID (which corresponds to a +single parquet file) and assign these groups to different MPI ranks (e.g., worker 1 processes partitions 1,2,3,4). + +### tools_worker.slurm + +**Purpose**: Executes the actual tool processing using MPI parallelism. + +**Key Components**: + +- Runs on the configured number of nodes with specified worker distribution +- Calls `transfer_and_type_change/runner.py` with the specified tool name and sequence ID +- Reads the schedule created by the scheduler and processes assigned partitions +- Uses all allocated nodes for maximum parallelism +- Configures memory settings for optimal performance +- Typical run time is 3 hours +- Creates output files specific to the tool (e.g., resized images) + +**Example**: +For an image resizing tool, each worker would load the images assigned to it from the schedule, resize them to the +specified dimensions, and save the results to the output location. + +### tools_verifier.slurm + +**Purpose**: Verifies the completion of the tool processing and updates status flags. 
+ +**Key Components**: + +- Runs on a single node +- Calls `transfer_and_type_change/verification.py` with the specified tool name and sequence ID +- Checks if all scheduled tasks have been completed +- Updates the completion status in the tool's checkpoint file +- Typical run time is 5 minutes +- Sets the "completed" flag when all processing is done + +**Example**: +For any tool, the verifier checks if all scheduled tasks have been processed successfully and marks the overall +operation as complete when verified.
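+
+### Putting It Together
+
+A minimal sketch of how the submission script and the Slurm scripts can be chained by hand, assuming the environment
+variables listed above are already exported and using a made-up dependency job ID (`1234567`); in normal operation
+`main.py` performs these submissions automatically:
+
+```bash
+# Submit the scheduler for division 3 of transfer_and_type_change,
+# waiting on a previously submitted job (hypothetical ID 1234567).
+./tools_submit.sh tools_scheduler.slurm transfer_and_type_change 3 1234567
+
+# Submit a worker for the same division, depending on the scheduler job
+# (replace <scheduler_job_id> with the ID printed by the previous sbatch call).
+./tools_submit.sh tools_worker.slurm transfer_and_type_change 3 <scheduler_job_id>
+```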