From f801d791837e65b0bafd8c5ddde8d6422fa85566 Mon Sep 17 00:00:00 2001 From: rmadeye Date: Thu, 10 Aug 2023 11:29:08 +0200 Subject: [PATCH 1/2] rmadeye tools added --- .gitmodules | 3 +++ protein_analysis_tools | 1 + 2 files changed, 4 insertions(+) create mode 100644 .gitmodules create mode 160000 protein_analysis_tools diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..b0f41c5 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "protein_analysis_tools"] + path = protein_analysis_tools + url = https://github.com/Rmadeye/protein_analysis_tools diff --git a/protein_analysis_tools b/protein_analysis_tools new file mode 160000 index 0000000..dbb813b --- /dev/null +++ b/protein_analysis_tools @@ -0,0 +1 @@ +Subproject commit dbb813be6ce9e25493d98e2033d6335461825d35 From 8a170c2e7d5a627b06782a26e891468b5eb2aa9c Mon Sep 17 00:00:00 2001 From: rmadeye Date: Thu, 10 Aug 2023 11:36:06 +0200 Subject: [PATCH 2/2] reordered tools --- lbs/af2/af2_analysis.py | 66 ++++++ lbs/af2/af2_input_generator.py | 158 +++++++++++++++ lbs/amber/process_amber.py | 77 +++++++ lbs/amber/process_multiple_amber_outputs.py | 212 ++++++++++++++++++++ lbs/pdb_utils/pdb_utils.py | 95 +++++++++ lbs/rosetta/rosetta_parser.py | 125 ++++++++++++ lbs/rosetta/run_rosetta.py | 80 ++++++++ lbs/usalign/run_usalign.py | 142 +++++++++++++ lbs/usalign/usalign_parser.py | 93 +++++++++ protein_analysis_tools | 1 - 10 files changed, 1048 insertions(+), 1 deletion(-) create mode 100644 lbs/af2/af2_analysis.py create mode 100644 lbs/af2/af2_input_generator.py create mode 100644 lbs/amber/process_amber.py create mode 100644 lbs/amber/process_multiple_amber_outputs.py create mode 100644 lbs/pdb_utils/pdb_utils.py create mode 100644 lbs/rosetta/rosetta_parser.py create mode 100644 lbs/rosetta/run_rosetta.py create mode 100644 lbs/usalign/run_usalign.py create mode 100644 lbs/usalign/usalign_parser.py delete mode 160000 protein_analysis_tools diff --git a/lbs/af2/af2_analysis.py b/lbs/af2/af2_analysis.py new file mode 100644 index 0000000..6d767bb --- /dev/null +++ b/lbs/af2/af2_analysis.py @@ -0,0 +1,66 @@ +import json +import os +from statistics import mean +import re + +import numpy as np +import pandas as pd +from typing import Union, List + +class AF2Analysis: + + + def __init__(self, json_output: Union[str, List]) -> None: + """ + Initialize the AF2_analysis object. + + Parameters: + json_output (Union[str, List]): Either the path to the AF2 JSON output file or a list of JSON files. + + Returns: + None + """ + self.json_output = json_output + if isinstance(self.json_output, str): + self.json_output = [json_output] + elif isinstance(self.json_output, list): + self.json_output = json_output + + + def read_json(self) -> pd.DataFrame: + """ + Read the AF2 JSON output file. + + Parameters: + None + + Returns: + pd.DataFrame: The DataFrame containing the AF2 JSON output data. 
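+
+        Example:
+            A minimal usage sketch; the JSON name below is an illustrative
+            ColabFold-style score file, not a file shipped with this patch.
+
+            >>> af2 = AF2Analysis("scores_rank_001_model_1.json")
+            >>> df = af2.read_json()
+            >>> df[["plddt", "ptm", "rank"]].head()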
+ """ + df = pd.DataFrame() + for _json in self.json_output: + + with open(_json) as f: + _file = f.read() + parsed = json.loads(_file) + + plddt = mean(parsed['plddt']) + ptm = float(parsed['ptm']) + pae = np.asarray(parsed['pae'], dtype='float') + pae = (pae + pae.T) / 2 + pdb_file = _json.replace('.json', '.pdb').replace('scores','unrelaxed') + rank = re.findall('_rank_(\d{3})_',_json) + pae = np.asarray(parsed['pae'], dtype='float') + pae = (pae + pae.T) / 2 + + j_df = pd.DataFrame({ + 'json': [_json], + 'plddt': [plddt], + 'ptm': [ptm], + 'pae': [pae], + 'pdb_file': [pdb_file], + 'rank': [rank][0] + }) + df = pd.concat([df,j_df], axis=0, ignore_index=True) + + return df diff --git a/lbs/af2/af2_input_generator.py b/lbs/af2/af2_input_generator.py new file mode 100644 index 0000000..3b23b4a --- /dev/null +++ b/lbs/af2/af2_input_generator.py @@ -0,0 +1,158 @@ +import os +from typing import List, Tuple, Union +import math +import re + +class AF2InputGenerator: + def __init__( + self, input_fasta: Union[str, List], output_dir: Union[os.PathLike, str] + ) -> None: + """ + Initialize the AF2_input_generator object. + + Parameters: + input_fasta (Union[str, List]): Either the path to the input FASTA, A3M file or a list of FASTA sequences. + output_dir (Union[os.PathLike, str]): The directory where the AF2 input files will be written. + + Returns: + None + """ + + + self.input_fasta = input_fasta + self.output_dir = output_dir + if isinstance(self.input_fasta, str): + self.input_fasta = input_fasta + elif isinstance(self.input_fasta, list): + self.input_fasta = ":".join(self.input_fasta) + + def check_sequence(self) -> None: + """ + Check if the input sequence is a valid protein sequence. + + Parameters: + None + + Returns: + None + + Raises: + ValueError: If the input sequence contains invalid characters or is empty. + """ + sequence = self.input_fasta.replace(" ", "") + valid_chars = re.compile(r"^[ACDEFGHIKLMNPQRSTVWY:]+$") + if not valid_chars.match(sequence): + raise ValueError("Invalid protein sequence. It contains invalid characters.") + if not sequence: + raise ValueError("Invalid protein sequence. It is empty.") + + def write_af2_inputs(self, oligo_state=1) -> str: + """ + Write AF2 input files. + + Parameters: + oligo_state (int): The number of oligo states. + + Returns: + None + """ + os.makedirs(self.output_dir, exist_ok=True) + + if self.input_fasta.endswith(".a3m"): + # copy file to output dir + os.system(f"cp {self.input_fasta} {self.output_dir}") + + return "a3m detected, copying file to output dir" + + else: + self.check_sequence() + + output_name = ( + self.output_dir.split("/")[-1] + if "/" in self.output_dir + else self.output_dir + ) + if oligo_state > 1: + self.input_fasta = ":".join([self.input_fasta] * oligo_state) + + with open(os.path.join(self.output_dir, "af2input_fasta.csv"), "w") as fout: + fout.write("id,sequence\n") + fout.write(f"{output_name},{self.input_fasta}") + + return "fasta detected, writing af2_input.fasta" + + def write_af2_exec( + self, + n_cores=7, + amber=True, + num_models=5, + num_recycles=5, + nodes_excluded=[0,1,2,3], + memory=16, + use_dropout=False, + num_seeds=1, + max_seqs=False, + max_extra_seqs=False + ) -> None: + """ + Write the AF2 execution script. + + Parameters: + n_cores (int): The number of CPU cores to use. + amber (bool): Whether to use Amber relaxation (True) or not (False). + num_models (int): The number of models to generate. + num_recycles (int): The number of recycle steps. 
+            nodes_excluded (List[int]): List of node numbers to exclude.
+            memory (int): The memory (in GB) required for the execution.
+            use_dropout (bool): Whether to use dropout for models (True) or not (False).
+            num_seeds (int): The number of random seeds to use.
+            max_seqs (int): The maximum number of sequences (should be a power of 2).
+            max_extra_seqs (int): The maximum number of extra sequences (should be a power of 2).
+
+        Returns:
+            None
+        """
+        if amber:
+            amber = " --amber"
+        else:
+            amber = ""
+        if use_dropout:
+            use_dropout = " --use-dropout"
+        else:
+            use_dropout = ""
+        if max_seqs:
+            if not math.log2(max_seqs).is_integer():
+                raise ValueError("max_seqs must be a power of 2.")
+            max_seqs = f" --max-seqs {max_seqs}"
+        else:
+            max_seqs = ""
+        if max_extra_seqs:
+            if not math.log2(max_extra_seqs).is_integer():
+                raise ValueError("max_extra_seqs must be a power of 2.")
+            max_extra_seqs = f" --max-extra-seq {max_extra_seqs}"
+        else:
+            max_extra_seqs = ""
+
+        # strip whitespace so the node list renders as edi0[0,1,2,3] in the SBATCH header
+        nodes_excluded = str(nodes_excluded).replace(" ", "")
+
+        with open(os.path.join(self.output_dir, "af2_run.sh"), "w") as af2exec:
+            af2exec.write(
+                f"#!/bin/bash \n\n#SBATCH -p gpu\n#SBATCH -n {n_cores}\n#SBATCH -N 1\n#SBATCH --gres=gpu:1\n#SBATCH --exclude=edi0{nodes_excluded}\n#SBATCH --mem={memory}GB\n#SBATCH -J {self.output_dir}\nsource /opt/miniconda3/bin/activate cf_1.5\n"
+            )  # SBATCH header plus conda environment activation required for AF2 to run
+            if self.input_fasta.endswith(".a3m"):
+                af2exec.write(
+                    f"colabfold_batch *.a3m . --num-models {num_models} --num-recycle {num_recycles} {amber} {use_dropout} --num-seeds {num_seeds} {max_seqs} {max_extra_seqs}"
+                )
+            else:
+                af2exec.write(
+                    f"colabfold_batch af2input_fasta.csv . --num-models {num_models} --num-recycle {num_recycles} {amber} {use_dropout} --num-seeds {num_seeds} {max_seqs} {max_extra_seqs}"
+                )
diff --git a/lbs/amber/process_amber.py b/lbs/amber/process_amber.py
new file mode 100644
index 0000000..62e942f
--- /dev/null
+++ b/lbs/amber/process_amber.py
@@ -0,0 +1,77 @@
+import os
+import pandas as pd
+
+
+class ProcessAmber:
+    """
+    Class to process Amber simulation data from a CSV file.
+
+    Parameters:
+        input_csv (str): The path to the CSV file containing Amber simulation data.
+        sim_length (int): The length of the simulation in nanoseconds (ns).
+        number_of_reps (int): The number of simulation replicates.
+        trajectory_dump_frequency (int, optional): The frequency (in steps) at which trajectory frames were saved (default is 5000).
+        timestep (float, optional): The timestep used in the simulation in picoseconds (ps) (default is 0.002, i.e. 2 fs).
+
+    Raises:
+        AssertionError: If the input CSV file does not exist, has fewer than 2 columns,
+            or the number of rows is not divisible by the number of replicates,
+            or if the number of rows in the CSV is not consistent with the simulation parameters.
+
+    Returns:
+        None
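+
+    Example:
+        A minimal usage sketch; 'rmsd.csv' is an illustrative cpptraj-style
+        summary table, not a file shipped with this patch.
+
+        >>> amber = ProcessAmber("rmsd.csv", sim_length=30, number_of_reps=10)
+        >>> amber.obtain_statistics_from_df()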
+    """
+
+    def __init__(
+        self,
+        input_csv: str,
+        sim_length: int,
+        number_of_reps: int,
+        trajectory_dump_frequency: int = 5000,
+        timestep: float = 0.002,
+    ) -> None:
+        self.sim_length = sim_length
+        self.number_of_reps = number_of_reps
+        # assert input csv exists before trying to read it
+        assert os.path.exists(input_csv), "Input csv does not exist"
+        self.df = pd.read_csv(input_csv, sep=r"\s+")
+        # assert that number of columns in csv is at least 2
+        assert self.df.shape[1] >= 2, "Number of columns in csv is less than 2"
+        # assert that number of rows in csv divided by number of reps is an integer
+        assert (
+            self.df.shape[0] % self.number_of_reps == 0
+        ), "Number of rows in csv is not divisible by number of reps, check sim length"
+        # expected frames: sim_length / timestep * 1000 / trajectory_dump_frequency * number_of_reps + 10
+        frame_number_expected = (
+            self.sim_length / timestep
+        ) * 1000 / trajectory_dump_frequency * self.number_of_reps + 10
+        assert (
+            frame_number_expected == self.df.shape[0]
+        ), f"Number of rows in {input_csv} does not match the expected frame count: {frame_number_expected} != {self.df.shape[0]}"
+
+    def read_dataframe(self) -> pd.DataFrame:
+        """
+        Get the DataFrame containing Amber simulation data.
+
+        Returns:
+            pd.DataFrame: The DataFrame containing Amber simulation data.
+        """
+        return self.df
+
+    def obtain_statistics_from_df(self) -> pd.DataFrame:
+        """
+        Obtain statistics from the DataFrame.
+
+        Returns:
+            pd.DataFrame: A DataFrame containing the mean and standard deviation of each column in the original DataFrame.
+        """
+        # Drop the #Frame column before aggregating
+        self.df = self.df.drop("#Frame", axis=1)
+        # Column-wise mean and standard deviation
+        self.df_mean = self.df.mean(axis=0)
+        self.df_std = self.df.std(axis=0)
+        # Combine both Series into one DataFrame
+        self.df_mean = pd.concat([self.df_mean, self.df_std], axis=1)
+        self.df_mean.columns = ["mean", "std"]
+
+        return self.df_mean
diff --git a/lbs/amber/process_multiple_amber_outputs.py b/lbs/amber/process_multiple_amber_outputs.py
new file mode 100644
index 0000000..b36912f
--- /dev/null
+++ b/lbs/amber/process_multiple_amber_outputs.py
@@ -0,0 +1,212 @@
+import glob
+
+import numpy as np
+import pandas as pd
+import seaborn as sns
+from matplotlib import pyplot as plt
+
+import lbs.amber.process_amber as ac
+
+
+class ProcessMultipleAmberOutputs:
+    """
+    Class to process multiple Amber simulation outputs and visualize the results.
+
+    Parameters:
+        input_csvs (list): List of paths to the CSV files containing Amber simulation data.
+        sim_length (int): The length of the simulation in nanoseconds (ns).
+        number_of_reps (int): The number of simulation replicates.
+        trajectory_dump_frequency (int, optional): The frequency (in steps) at which trajectory frames were saved (default is 5000).
+        timestep (float, optional): The timestep used in the simulation in picoseconds (ps) (default is 0.002, i.e. 2 fs).
+        trajectory_names (list, optional): List of names for each trajectory (default is None).
+
+    Raises:
+        AssertionError: If any of the input CSV files do not exist, have fewer than 2 columns,
+            or the number of rows is not divisible by the number of replicates,
+            or if the number of rows in any CSV is not consistent with the simulation parameters.
+
+    Returns:
+        None
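+
+    Example:
+        A minimal usage sketch mirroring the module-level call at the bottom of
+        this file; the glob pattern and feature name are illustrative.
+
+        >>> runs = ProcessMultipleAmberOutputs(
+        ...     input_csvs=glob.glob("./files/amber_csvs/*.csv"),
+        ...     sim_length=30,
+        ...     number_of_reps=10,
+        ...     trajectory_dump_frequency=5000,
+        ...     timestep=0.002,
+        ... )
+        >>> runs.analyze_feature("RMSD")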
+    """
+
+    def __init__(
+        self,
+        input_csvs: list,
+        sim_length: int,
+        number_of_reps: int,
+        trajectory_dump_frequency: int = 5000,
+        timestep: float = 0.002,
+        trajectory_names: list = None,
+    ) -> None:
+        self.input_csvs = input_csvs
+        self.sim_length = sim_length
+        self.number_of_reps = number_of_reps
+        self.trajectory_dump_frequency = trajectory_dump_frequency
+        self.timestep = timestep
+
+        if trajectory_names is None:
+            self.trajectory_names = [csv.split("/")[-1] for csv in self.input_csvs]
+        else:
+            self.trajectory_names = trajectory_names
+            assert len(self.trajectory_names) == len(
+                self.input_csvs
+            ), f"Number of trajectory names does not match number of input csvs ({len(self.trajectory_names)} != {len(self.input_csvs)})"
+
+        # Concatenate every replicate's data, labelled with its trajectory name
+        self.df = pd.DataFrame()
+        for csv, name in zip(self.input_csvs, self.trajectory_names):
+            single_df = ac.ProcessAmber(
+                csv,
+                self.sim_length,
+                self.number_of_reps,
+                self.trajectory_dump_frequency,
+                self.timestep,
+            ).read_dataframe()
+            single_df["structure"] = name
+            self.df = pd.concat([self.df, single_df], axis=0, ignore_index=True)
+
+        self.x_axis_points_number = int(
+            (self.sim_length / timestep)
+            * 1000
+            / trajectory_dump_frequency
+            * self.number_of_reps
+            + 10
+        )
+
+    def read_dataframe(self) -> pd.DataFrame:
+        """
+        Get the DataFrame containing Amber simulation data.
+
+        Returns:
+            pd.DataFrame: The DataFrame containing Amber simulation data.
+        """
+        return self.df
+
+    def melt_dataframe(self) -> pd.DataFrame:
+        """
+        Melt the DataFrame so that it uses "#Frame" and "structure" as id vars and the rest as value vars.
+
+        Returns:
+            pd.DataFrame: The melted DataFrame.
+        """
+        return pd.melt(
+            self.df,
+            id_vars=["#Frame", "structure"],
+            var_name="name",
+            value_name="value",
+        )
+
+    def analyze_feature(self, feature: str) -> pd.DataFrame:
+        """
+        Analyze the specified feature from the DataFrame.
+
+        Parameters:
+            feature (str): The name of the feature to analyze.
+
+        Returns:
+            pd.DataFrame: A DataFrame containing the mean and standard deviation of the specified feature for each trajectory.
+        """
+        melted = self.melt_dataframe()
+        df = (
+            melted.loc[melted["name"] == feature]
+            .groupby("structure")["value"]
+            .agg(["mean", "std"])
+            .round(2)
+        )
+        return df
+
+    def vertical_lines(self, color):
+        """
+        Add vertical lines separating the replicates to the plot.
+
+        Parameters:
+            color: Accepted for compatibility with FacetGrid.map; the lines themselves are drawn in white and black.
+
+        Returns:
+            None
+        """
+        line_position = [
+            x
+            for x in range(
+                0,
+                self.x_axis_points_number,
+                int(self.x_axis_points_number / self.number_of_reps),
+            )
+        ]
+        for x in line_position:
+            plt.axvline(x, color="w", linewidth=3)
+            plt.axvline(x, color="black", linestyle=":")
+
+    def plot_feature(
+        self,
+        feature: str,
+        custom_ticks: list = None,
+        vertical_lines: bool = False,
+        png: bool = False,
+        col_wrap: int = 2,
+        height: int = 4,
+        aspect: float = 1.5,
+    ):
+        """
+        Plot the specified feature for each trajectory.
+
+        Parameters:
+            feature (str): The name of the feature to plot.
+            custom_ticks (list, optional): Custom labels for x-axis ticks (default is None).
+            vertical_lines (bool, optional): Whether to add vertical lines to the plot (default is False).
+            png (bool, optional): Whether to save the figure as a PNG file (default is False).
+            col_wrap (int, optional): Number of facet columns (default is 2).
+            height (int, optional): Height of each facet in inches (default is 4).
+            aspect (float, optional): Aspect ratio of each facet (default is 1.5).
+ + Returns: + str: A message indicating the figure has been saved. + """ + if custom_ticks is None: + custom_ticks = [str(x) for x in range(1, self.number_of_reps + 1)] + + feature_df = self.melt_dataframe().loc[self.melt_dataframe()["name"] == feature] + g = sns.FacetGrid( + feature_df, + col="structure", + col_wrap=col_wrap, + height=height, + aspect=aspect, + ) + g.map(sns.lineplot, "#Frame", "value") + sns.set(font_scale=1.5, style="whitegrid") + + if vertical_lines: + g.map(self.vertical_lines) + + g.set( + xlabel="Simulation", + ylabel=feature, + xticks=np.arange( + (self.x_axis_points_number / self.number_of_reps) / 2, + self.x_axis_points_number + + (self.x_axis_points_number / self.number_of_reps / 2), + self.x_axis_points_number / self.number_of_reps, + ), + xticklabels=custom_ticks, + ) + if png: + g.savefig(f"{feature}.png") + return f"Figure {feature}.png saved to the current directory" + else: + return "Done!" + + +t=ProcessMultipleAmberOutputs( input_csvs = glob.glob('./files/amber_csvs/*.csv'), + sim_length=30, + number_of_reps=10, + trajectory_dump_frequency=5000, + timestep=0.002) \ No newline at end of file diff --git a/lbs/pdb_utils/pdb_utils.py b/lbs/pdb_utils/pdb_utils.py new file mode 100644 index 0000000..8d83d10 --- /dev/null +++ b/lbs/pdb_utils/pdb_utils.py @@ -0,0 +1,95 @@ +import os +from typing import Union + +from biopandas.pdb import PandasPdb +import numpy as np + + +class PDBUtils: + def __init__(self, pdb_file: Union[os.PathLike, str]) -> None: + self.pdb_file = pdb_file + assert os.path.exists(pdb_file), f"{pdb_file} does not exist." + + def calculate_distance_between_residues( + self, + residue1: int, + residue2: int, + ligand=False, + residue1_chain: str = "A", + residue2_chain: str = "B", + ) -> float: + """ + Calculate the distance between two residues. + + Parameters: + residue1 (str): The first residue. + residue2 (str): The second residue or ligand + + Returns: + float: The distance between the two residues. + """ + pdb = PandasPdb().read_pdb(self.pdb_file) + residue1 = pdb.df["ATOM"][(pdb.df["ATOM"]["residue_number"] == residue1) & (pdb.df["ATOM"]["chain_id"] == residue1_chain)] + if ligand: + residue2 = pdb.df["HETATM"][pdb.df["HETATM"]["residue_name"] == residue2] + else: + residue2 = pdb.df["ATOM"][(pdb.df["ATOM"]["residue_number"] == residue2) & (pdb.df["ATOM"]["chain_id"] == residue2_chain)] + distance = np.linalg.norm(residue1[['x_coord', 'y_coord', 'z_coord']].values - residue2[['x_coord', 'y_coord', 'z_coord']].values) + return round(distance,1) + + def calculate_distance_between_atoms( + self, atom1: int, atom2: int, ligand=False + ) -> float: + """ + Calculate the distance between two atoms. + + Parameters: + atom1 (str): The first atom. + atom2 (str): The second atom of protein or ligand. + + Returns: + float: The distance between the two atoms. + """ + pdb = PandasPdb().read_pdb(self.pdb_file) + atom1 = pdb.df["ATOM"][pdb.df["ATOM"]["atom_number"] == atom1] + if ligand: + atom2 = pdb.df["HETATM"][pdb.df["HETATM"]["atom_number"] == atom2] + else: + atom2 = pdb.df["ATOM"][pdb.df["ATOM"]["atom_number"] == atom2] + distance = np.linalg.norm(atom1[['x_coord', 'y_coord', 'z_coord']].values - atom2[['x_coord', 'y_coord', 'z_coord']].values) + return round(distance,1) + + def truncate_structure_to_backbone(self): + """ + Truncate the structure to only include the backbone atoms. + + Returns: + PandasPdb: The truncated structure. 
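+
+        Example:
+            A minimal usage sketch; the PDB file names are illustrative placeholders.
+
+            >>> utils = PDBUtils("model.pdb")
+            >>> backbone = utils.truncate_structure_to_backbone()
+            >>> backbone.to_pdb("model_backbone.pdb")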
+ """ + pdb = PandasPdb().read_pdb(self.pdb_file) + pdb.df["ATOM"] = pdb.df["ATOM"][ + pdb.df["ATOM"]["atom_name"].isin(["N", "CA", "C", "O"]) + ] + return pdb + + def truncate_structure_by_residue_indices( + self, end: int, start: int = 1, chain: str = "A" + ): + """ + Truncate the structure to only include the selected residues. + + Parameters: + start (int): The start residue index. + end (int): The end residue index. + chain (str): The chain ID. + + Returns: + PandasPdb: The truncated structure. + """ + pdb = PandasPdb().read_pdb(self.pdb_file) + pdb.df["ATOM"] = pdb.df["ATOM"][ + (pdb.df["ATOM"]["residue_number"] >= start) + & (pdb.df["ATOM"]["residue_number"] <= end) + & (pdb.df["ATOM"]["chain_id"] == chain) + ] + return pdb diff --git a/lbs/rosetta/rosetta_parser.py b/lbs/rosetta/rosetta_parser.py new file mode 100644 index 0000000..55b7880 --- /dev/null +++ b/lbs/rosetta/rosetta_parser.py @@ -0,0 +1,125 @@ +import os +import typing + +import pandas as pd +import matplotlib.pyplot as plt +import seaborn as sns + + +import os +import pandas as pd +import typing +import seaborn as sns +import matplotlib.pyplot as plt + + +class RosettaParser: + """ + Class to parse Rosetta output files. + + Parameters: + user_input (Union[os.PathLike, pd.DataFrame]): Either the path to the Rosetta output file (CSV format) + or a pandas DataFrame containing Rosetta results. + + Returns: + None + """ + + def __init__(self, user_input: typing.Union[os.PathLike, pd.DataFrame]): + if isinstance(user_input, pd.DataFrame): + self.df = user_input + + elif isinstance(user_input, str): + assert os.path.exists(user_input), "Invalid path or file does not exist" + + with open(user_input, "r") as f: + if f.readline().split()[0] == "SEQUENCE:": + self.df = pd.read_csv(user_input, sep="\s+", skiprows=1) + else: + self.df = pd.read_csv(user_input, sep="\s+") + assert self.df.shape[0] > 0, "Empty dataframe" + + def read_dataframe(self): + """ + Get the parsed DataFrame. + + Returns: + pd.DataFrame: The parsed DataFrame containing the Rosetta results. + """ + return self.df + + def get_top_n_by_selected_column(self, column: str, n: int = 10): + """ + Get the top 'n' rows based on the values in the specified 'column'. + + Parameters: + column (str): The column name based on which to select the top 'n' rows. + n (int): The number of rows to select (default: 10). + + Returns: + pd.DataFrame: A DataFrame containing the top 'n' rows based on the specified 'column'. + """ + return self.df.sort_values(by=column, ascending=False).head(n) + + def add_column_to_dataframe( + self, column: str, values: typing.Union[typing.List, str, float, int] + ): + """ + Add a new column to the DataFrame. + + Parameters: + column (str): The name of the new column. + values (Union[List, str, float, int]): The values to be added to the new column. + + Returns: + None + """ + self.df[column] = values + + def change_strings_in_selected_column_by_user_input( + self, column: str, original: typing.Union[str, int, float], replacement: str + ): + """ + Replace specific strings in the selected column with the given replacement. + + Parameters: + column (str): The column name where replacement will be performed. + original (Union[str, int, float]): The string or value to be replaced. + replacement (str): The string to replace the original values. 
+ + Returns: + None + """ + if original == "all": + self.df[column] = replacement + else: + self.df[column] = self.df[column].str.replace(str(original), replacement) + + def plot_top_n_by_selected_column(self, column: str, n: int = 10): + """ + Plot a bar chart of the top 'n' rows based on the values in the specified 'column'. + + Parameters: + column (str): The column name based on which to select the top 'n' rows. + n (int): The number of rows to select (default: 10). + + Returns: + None + """ + top_n = self.get_top_n_by_selected_column(column, n) + sns.barplot(x=top_n["description"], y=top_n[column]) + plt.xticks(rotation=90) + plt.show() + + def plot_histogram(self, column: str): + """ + Plot a histogram for the values in the specified column. + + Parameters: + column (str): The column name for which to plot the histogram. + + Returns: + None + """ + sns.histplot(self.df[column]) + plt.show() diff --git a/lbs/rosetta/run_rosetta.py b/lbs/rosetta/run_rosetta.py new file mode 100644 index 0000000..c1d40a3 --- /dev/null +++ b/lbs/rosetta/run_rosetta.py @@ -0,0 +1,80 @@ +# if for some reason one cannot use pyrosetta... + +import os +import subprocess +from typing import List + + +class RunRosetta: + def __init__(self, rosetta_bin_directory: str) -> None: + self.bin_dir = rosetta_bin_directory + assert os.path.isdir( + self.bin_dir + ), f"Rosetta bin directory {self.bin_dir} does not exist" + + def run_fast_relax( + self, input_file: str, out_dir: str, extra_flags: str = "", silent_file: bool = False, + ) -> None: + """ + Run minimisation on a input file, output in the same directory where the input_file is + """ + os.makedirs(out_dir, exist_ok=True) + outpath = out_dir + assert os.path.isfile(input_file), f"File {input_file} does not exist" + + if not silent_file: + command = f"{self.bin_dir}/minimize.linuxgccrelease -s {input_file} -out:path:all {outpath} -out:file:scorefile 'score_minim.sc' -ignore_zero_occupancy false \ + -relax:constrain_relax_to_start_coords -relax:default_repeats 5 -relax:ramp_constraints false -in:file:fullatom -ex1 -ex2 -nstruct 1 {extra_flags}" + + subprocess.run(command, shell=True, check=True) + else: + command = f"{self.bin_dir}/minimize.linuxgccrelease -in:file:silent {input_file} -out:path:all {outpath} -out:file:scorefile 'score_minim.sc' -ignore_zero_occupancy false \ + -relax:constrain_relax_to_start_coords -relax:default_repeats 5 -relax:ramp_constraints false -in:file:fullatom -ex1 -ex2 -nstruct 1 {extra_flags}" + + subprocess.run(command, shell=True, check=True) + + + def run_interface_analysis( + self, + input_file: str, + out_file: str, + interchain_interface: list, + extra_flags: str = "", + silent_file: bool = False, + ) -> None: + """ + Run interface analysis on a pdb file + """ + assert os.path.isfile(input_file), f"File {input_file} does not exist" + + + if not silent_file: + command = f"{self.bin_dir}/InterfaceAnalyzer.linuxgccrelease -s {input_file} -interface {'_'.join(interchain_interface)} -pack_separated {extra_flags} -out:file:score_only -out:file:scorefile {out_file}" + subprocess.run(command, shell=True, check=True) + else: + command = f"{self.bin_dir}/InterfaceAnalyzer.linuxgccrelease -in:file:silent {input_file} -interface {'_'.join(interchain_interface)} -pack_separated {extra_flags} -out:file:score_only -out:file:scorefile {out_file}" + subprocess.run(command, shell=True, check=True) + + def run_residue_energy_breakdown( + self, input_file: str, out_file: str, extra_flags: str, silent_file: bool = False, + ) -> None: + """ + Run 
decomposition on a input_file + """ + assert os.path.isfile(input_file), f"File {input_file} does not exist" + if not silent_file: + command = f"{self.bin_dir}/residue_energy_breakdown.linuxgccrelease -s {input_file} -out:file:silent {out_file} {extra_flags}" + subprocess.run(command, shell=True, check=True) + else: + command = f"{self.bin_dir}/residue_energy_breakdown.linuxgccrelease -in:file:silent {input_file} -out:file:silent {out_file} {extra_flags}" + subprocess.run(command, shell=True, check=True) + + + def extract_pdbs_by_ids(self, input_silent_file, tags: List[str], out_dir: str) -> None: + """ + Extract pdbs from a silent file by tags + """ + assert os.path.isfile(input_silent_file), f"File {input_silent_file} does not exist" + os.makedirs(out_dir, exist_ok=True) + command = f"{self.bin_dir}/extract_pdbs.linuxgccrelease -in:file:silent {input_silent_file} -tags {' '.join(tags)} -out:prefix {out_dir}/" + subprocess.run(command, shell=True, check=True) \ No newline at end of file diff --git a/lbs/usalign/run_usalign.py b/lbs/usalign/run_usalign.py new file mode 100644 index 0000000..ba46d47 --- /dev/null +++ b/lbs/usalign/run_usalign.py @@ -0,0 +1,142 @@ +import subprocess +import os +from typing import Union + +import os +import subprocess +from typing import Union + + +class RunUSAlign: + def __init__(self, usalign_binary: Union[os.PathLike, str]) -> None: + """ + Initialize the RunUSAlign object. + + Parameters: + usalign_binary (Union[os.PathLike, str]): The path to the USAlign binary executable. + + Returns: + None + """ + self.usalign_binary = usalign_binary + + def run_usalign( + self, + reference_pdb: Union[os.PathLike, str], + target_pdb: Union[os.PathLike, str], + output_filename: str, + mm=1, + ter=0, + outfmt=2, + pymol=False, + matrix=False, + pdb_rank=None, + ) -> int: + """ + Run the USAlign program with the given parameters. + + Parameters: + reference_pdb (Union[os.PathLike, str]): The path to the reference PDB file. + target_pdb (Union[os.PathLike, str]): The path to the target PDB file. + output_filename (str): The name of the output file to save the USAlign results. + mm (int): The mm option for USAlign (default: 1). + ter (int): The ter option for USAlign (default: 0). + outfmt (int): The output format for USAlign results (default: 2). + pymol (bool): Whether to generate a PyMOL session file (default: False). + matrix (bool): Whether to generate a rotation matrix file (default: False). + pdb_rank (Union[None, int]): The rank of the PDB file (default: None). + + Returns: + int: 0 if the analysis ran successfully, 1 otherwise. 
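+
+        Example:
+            A minimal usage sketch; the binary location and PDB names are
+            illustrative placeholders.
+
+            >>> aligner = RunUSAlign("/opt/USalign/USalign")
+            >>> aligner.run_usalign("reference.pdb", "model.pdb",
+            ...                     output_filename="usalign_scores.txt", pdb_rank=1)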
+        """
+        print(f"Processing {reference_pdb} with {target_pdb}...")
+
+        target_pdb_parent_directory = os.path.basename(target_pdb)
+
+        assert isinstance(reference_pdb, str) and isinstance(
+            target_pdb, str
+        ), "Invalid path or file does not exist"
+
+        # Base command: structural superposition of the target onto the reference
+        command = (
+            f"{self.usalign_binary} {target_pdb} {reference_pdb} -mm {mm} -ter {ter} -outfmt {outfmt} -o "
+            f"{os.path.join(target_pdb_parent_directory, f'usalign_{pdb_rank}.txt')}"
+        )
+        if pymol:
+            # also request a PyMOL session file next to the alignment output
+            command += f" {os.path.join(target_pdb_parent_directory, f'usalign_pymol_{pdb_rank}.pse')}"
+        if matrix:
+            # also dump the rotation/translation matrix
+            command += f" -m {os.path.join(target_pdb_parent_directory, f'matrix_{pdb_rank}.dat')}"
+        command += f" >> {output_filename}"
+
+        result = subprocess.run(command, shell=True)
+        print(command)
+        if result.returncode == 0:
+            print("Analysis ran successfully")
+            return 0
+        else:
+            with open("usalign_errors.txt", "a+") as fout:
+                print("Analysis encountered an error")
+                fout.write(f"{reference_pdb} FAILED WITH {target_pdb}\n")
+            return 1
+
+    def generate_slurm_job_input(self,
+                                 reference_pdb: Union[os.PathLike, str],
+                                 target_pdb_paths: Union[os.PathLike, str, list],
+                                 input_filelist: str,
+                                 input_batch_file: str,
+                                 output_filename: str,
+                                 mm=1,
+                                 ter=0,
+                                 outfmt=2,
+                                 nodes_excluded=[0,1,2,3],
+                                 memory=8,
+                                 ) -> str:
+        """
+        Write a SLURM batch script (and, if it does not exist yet, the PDB file list)
+        that runs USAlign against the reference for every PDB listed in '{input_filelist}.txt'.
+        """
+        assert isinstance(reference_pdb, str), "Invalid path or file does not exist"
+
+        command = (
+            f"for pdb in $(cat {input_filelist}.txt); do "
+            f"{self.usalign_binary} $pdb {reference_pdb} -mm {mm} -ter {ter} -outfmt {outfmt} >> {output_filename}; "
+            f"done"
+        )
+
+        with open(input_batch_file, 'w+') as batch_in:
+            batch_in.write(
+                f"#!/bin/bash \n\n#SBATCH -p cpu\n#SBATCH -n 1\n#SBATCH -N 1\n#SBATCH --exclude=edi0{nodes_excluded}\n#SBATCH --mem={memory}GB\n#SBATCH -J {output_filename}\n\n"
            )
+            batch_in.write(command)
+
+        if os.path.exists(input_filelist + '.txt'):
+            return f"{input_batch_file} generated"
+
+        with open(input_filelist + '.txt', 'w') as flist:
+            for pdb in target_pdb_paths:
+                flist.write(pdb + '\n')
+
+        return f"{input_batch_file} and {input_filelist}.txt generated"
+
+# RunUSAlign("/home/nfs/rmadaj/bins/usalign/USalign").generate_slurm_job_input('test.pdb', ['test1.pdb', 'test2.pdb'], 'test', 'test.sh', 'test.txt')
\ No newline at end of file
diff --git a/lbs/usalign/usalign_parser.py b/lbs/usalign/usalign_parser.py
new file mode 100644
index 0000000..3fd87d3
--- /dev/null
+++ b/lbs/usalign/usalign_parser.py
@@ -0,0 +1,93 @@
+import os
+from typing import Union
+
+import pandas as pd
+
+
+class USalign_parser:
+    def __init__(self, user_input: Union[str, pd.DataFrame]) -> None:
+        """
+        Initialize the USalign_parser object.
+
+        Parameters:
+            user_input (Union[str, pd.DataFrame]): Either the path to the USAlign output file (CSV format)
+                or a pandas DataFrame containing USAlign results.
+
+        Returns:
+            None
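+
+        Example:
+            A minimal usage sketch; 'usalign_scores.txt' stands in for a file
+            produced by appending USAlign -outfmt 2 results.
+
+            >>> parser = USalign_parser("usalign_scores.txt")
+            >>> parser.get_top_n_by_selected_column("tm1", n=5)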
+        """
+        if isinstance(user_input, str):
+            assert os.path.exists(user_input), "Invalid path or file does not exist"
+            # load the whitespace-separated table, dropping all comment lines starting with #
+            self.df = pd.read_csv(
+                user_input,
+                sep=r"\s+",
+                comment="#",
+                skip_blank_lines=True,
+                names=[
+                    "target",
+                    "template",
+                    "tm1",
+                    "tm2",
+                    "rmsd",
+                    "id1",
+                    "id2",
+                    "idali",
+                    "docked_seqlength",
+                    "template_seqlength",
+                    "aligned_length",
+                ],
+            ).reset_index(drop=True)
+            assert self.df.shape[0] > 0, "Empty dataframe"
+        elif isinstance(user_input, pd.DataFrame):
+            self.df = user_input
+
+    def read_usalign_output(self):
+        """
+        Process the USAlign output DataFrame and extract relevant information.
+
+        Returns:
+            pd.DataFrame: The processed DataFrame with additional columns.
+        """
+        self.df["target_path"] = self.df["target"].apply(lambda x: x.split(':')[0])
+        self.df["target"] = (
+            self.df["target"].str.split("/").str[-1].str.split(".").str[0]
+        )
+
+        return self.df
+
+    def get_top_n_by_selected_column(
+        self, column: str, n: int = 10, ascending: bool = False
+    ):
+        """
+        Get the 'n' highest- or lowest-scoring rows based on the values in the specified 'column'.
+
+        Parameters:
+            column (str): The column name based on which to select the rows.
+            n (int): The number of rows to select (default: 10).
+            ascending (bool): If False (default), return the 'n' rows with the highest values; if True, the lowest.
+
+        Returns:
+            pd.DataFrame: A DataFrame containing the selected 'n' rows sorted by the specified 'column'.
+        """
+        return self.df.sort_values(by=column, ascending=ascending).head(n)
+
+    def add_column(self, column: str, value: Union[str, int, float]):
+        """
+        Add a column to the DataFrame with the specified value.
+
+        Parameters:
+            column (str): The name of the column to add.
+            value (Union[str, int, float]): The value to add to the column.
+
+        Returns:
+            None
+        """
+        self.df[column] = value
\ No newline at end of file
diff --git a/protein_analysis_tools b/protein_analysis_tools
deleted file mode 160000
index dbb813b..0000000
--- a/protein_analysis_tools
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit dbb813be6ce9e25493d98e2033d6335461825d35