Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "protein_analysis_tools"]
path = protein_analysis_tools
url = https://github.com/Rmadeye/protein_analysis_tools
66 changes: 66 additions & 0 deletions lbs/af2/af2_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import json
import os
from statistics import mean
import re

import numpy as np
import pandas as pd
from typing import Union, List

class AF2Analysis:
    """Parse AF2/ColabFold score JSON files into a tidy pandas DataFrame."""

    def __init__(self, json_output: Union[str, List]) -> None:
        """
        Initialize the AF2Analysis object.

        Parameters:
            json_output (Union[str, List]): Either the path to a single AF2
                JSON output file or a list of such paths.

        Returns:
            None
        """
        # Normalize to a list so read_json can always iterate uniformly.
        if isinstance(json_output, str):
            self.json_output = [json_output]
        else:
            self.json_output = json_output

    def read_json(self) -> pd.DataFrame:
        """
        Read the AF2 JSON output file(s).

        Returns:
            pd.DataFrame: One row per JSON file with columns:
                'json' (input path), 'plddt' (mean pLDDT), 'ptm' (float),
                'pae' (symmetrized NxN numpy array), 'pdb_file' (derived
                structure path), 'rank' (first '_rank_NNN_' match from the
                file name as a string, or None if absent).
        """
        frames = []
        for _json in self.json_output:
            with open(_json) as f:
                parsed = json.load(f)

            plddt = mean(parsed['plddt'])
            ptm = float(parsed['ptm'])
            # PAE is not guaranteed symmetric; average with its transpose.
            # (The original computed this twice; done once here.)
            pae = np.asarray(parsed['pae'], dtype='float')
            pae = (pae + pae.T) / 2
            # ColabFold names the structure file like the score file, with
            # 'scores' -> 'unrelaxed' and a .pdb extension.
            pdb_file = _json.replace('.json', '.pdb').replace('scores', 'unrelaxed')
            # Extract the model rank from e.g. '..._rank_001_...'.
            # Bug fix: the original '[rank][0]' stored the whole findall list;
            # take the first match instead.
            rank_matches = re.findall(r'_rank_(\d{3})_', _json)
            rank = rank_matches[0] if rank_matches else None

            frames.append(pd.DataFrame({
                'json': [_json],
                'plddt': [plddt],
                'ptm': [ptm],
                'pae': [pae],
                'pdb_file': [pdb_file],
                'rank': [rank],
            }))

        # Concatenate once at the end instead of repeatedly inside the loop.
        if not frames:
            return pd.DataFrame()
        return pd.concat(frames, axis=0, ignore_index=True)
158 changes: 158 additions & 0 deletions lbs/af2/af2_input_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import os
from typing import List, Tuple, Union
import math
import re

class AF2InputGenerator:
    """Prepare ColabFold/AF2 input files and a SLURM execution script."""

    def __init__(
        self, input_fasta: Union[str, List], output_dir: Union[os.PathLike, str]
    ) -> None:
        """
        Initialize the AF2InputGenerator object.

        Parameters:
            input_fasta (Union[str, List]): Path to an input FASTA/A3M file,
                a plain sequence string, or a list of chain sequences.
            output_dir (Union[os.PathLike, str]): Directory where the AF2
                input files will be written.

        Returns:
            None
        """
        self.output_dir = output_dir
        # A list of chains is joined with ':' (ColabFold's multimer separator).
        if isinstance(input_fasta, list):
            self.input_fasta = ":".join(input_fasta)
        else:
            self.input_fasta = input_fasta

    def check_sequence(self) -> None:
        """
        Check that the input sequence is a valid protein sequence.

        Raises:
            ValueError: If the sequence is empty or contains characters other
                than the 20 canonical amino acids and the ':' chain separator.
        """
        sequence = self.input_fasta.replace(" ", "")
        # Bug fix: check emptiness first. The regex below also rejects "",
        # but with the misleading "invalid characters" message.
        if not sequence:
            raise ValueError("Invalid protein sequence. It is empty.")
        valid_chars = re.compile(r"^[ACDEFGHIKLMNPQRSTVWY:]+$")
        if not valid_chars.match(sequence):
            raise ValueError("Invalid protein sequence. It contains invalid characters.")

    def write_af2_inputs(self, oligo_state=1) -> str:
        """
        Write AF2 input files into ``output_dir``.

        Parameters:
            oligo_state (int): Number of copies of the sequence to model
                (homo-oligomer state); 1 means monomer.

        Returns:
            str: A short message describing what was written.
        """
        os.makedirs(self.output_dir, exist_ok=True)

        if self.input_fasta.endswith(".a3m"):
            # Pre-computed MSA: copy it into the output directory.
            # shutil.copy instead of os.system("cp ...") — no shell involved.
            import shutil
            shutil.copy(self.input_fasta, self.output_dir)
            return "a3m detected, copying file to output dir"

        self.check_sequence()

        # Use the last path component of output_dir as the job/sequence id.
        output_name = (
            self.output_dir.split("/")[-1]
            if "/" in self.output_dir
            else self.output_dir
        )
        if oligo_state > 1:
            # Homo-oligomer: repeat the sequence, chains separated by ':'.
            self.input_fasta = ":".join([self.input_fasta] * oligo_state)

        with open(os.path.join(self.output_dir, "af2input_fasta.csv"), "w") as fout:
            fout.write("id,sequence\n")
            fout.write(f"{output_name},{self.input_fasta}")

        return "fasta detected, writing af2_input.fasta"

    def write_af2_exec(
        self,
        n_cores=7,
        amber=True,
        num_models=5,
        num_recycles=5,
        nodes_excluded=[0, 1, 2, 3],
        memory=16,
        use_dropout=False,
        num_seeds=1,
        max_seqs=False,
        max_extra_seqs=False
    ) -> None:
        """
        Write the SLURM execution script (af2_run.sh) for colabfold_batch.

        Parameters:
            n_cores (int): Number of CPU cores to request.
            amber (bool): Whether to use Amber relaxation.
            num_models (int): Number of models to generate.
            num_recycles (int): Number of recycle steps.
            nodes_excluded (List[int]): Node numbers to exclude. (Default is
                a mutable list, but it is only read, never mutated.)
            memory (int): Memory (in GB) required for the execution.
            use_dropout (bool): Whether to use dropout for models.
            num_seeds (int): Number of random seeds to use.
            max_seqs (int|bool): Maximum number of sequences (must be a power
                of 2), or False to use the colabfold default.
            max_extra_seqs (int|bool): Maximum number of extra sequences
                (must be a power of 2), or False for the default.

        Raises:
            ValueError: If max_seqs or max_extra_seqs is not a power of 2.

        Returns:
            None
        """
        amber = " --amber" if amber else ""
        use_dropout = " --use-dropout" if use_dropout else ""

        if max_seqs:
            if not math.log2(max_seqs).is_integer():
                raise ValueError("max_seq must be a power of 2.")
            max_seqs = f" --max-seqs {max_seqs}"
        else:
            max_seqs = ""

        if max_extra_seqs:
            # Bug fix: validate max_extra_seqs itself. The original called
            # math.log2(max_seqs) here, which at this point is already a
            # string and crashed with TypeError for any truthy max_extra_seqs.
            if not math.log2(max_extra_seqs).is_integer():
                raise ValueError("max_extra_seqs must be a power of 2.")
            max_extra_seqs = f" --max-extra-seq {max_extra_seqs}"
        else:
            max_extra_seqs = ""

        # SLURM --exclude bracket syntax, e.g. edi0[0,1,2,3]; strip the
        # spaces that str() inserts after commas.
        nodes_excluded = str(nodes_excluded).replace(" ", "")

        with open(os.path.join(self.output_dir, "af2_run.sh"), "w") as af2exec:
            # SLURM header plus the conda environment activation AF2 needs.
            af2exec.write(
                f"#!/bin/bash \n\n#SBATCH -p gpu\n#SBATCH -n {n_cores}\n#SBATCH -N 1\n#SBATCH --gres=gpu:1\n#SBATCH --exclude=edi0{nodes_excluded}\n#SBATCH --mem={memory}GB\n#SBATCH -J {self.output_dir}\nsource /opt/miniconda3/bin/activate cf_1.5\n"
            )  # necessary imports for AF2 to run
            if self.input_fasta.endswith(".a3m"):
                af2exec.write(
                    f"colabfold_batch *.a3m . --num-models {num_models} --num-recycle {num_recycles} {amber} {use_dropout} --num-seeds {num_seeds} {max_seqs} {max_extra_seqs}"
                )
            else:
                af2exec.write(
                    f"colabfold_batch af2input_fasta.csv . --num-models {num_models} --num-recycle {num_recycles} {amber} {use_dropout} --num-seeds {num_seeds} {max_seqs} {max_extra_seqs}"
                )
77 changes: 77 additions & 0 deletions lbs/amber/process_amber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os
import pandas as pd


class ProcessAmber:
    """
    Class to process Amber simulation data from a CSV file.

    Parameters:
        input_csv (str): Path to the whitespace-separated file containing
            Amber simulation data (first column named '#Frame').
        sim_length (int): Length of the simulation in nanoseconds (ns).
        number_of_reps (int): Number of simulation replicates.
        trajectory_dump_frequency (int, optional): Frequency at which
            trajectory frames were saved (default 5000).
        timestep (float, optional): Simulation timestep (default 0.002).
            NOTE(review): original docs said femtoseconds, but 0.002 is the
            typical Amber timestep in picoseconds — confirm units.

    Raises:
        AssertionError: If the input CSV file does not exist, has fewer than
            2 columns, the number of rows is not divisible by the number of
            replicates, or the row count does not match the expected number
            of frames.

    Returns:
        None
    """

    def __init__(
        self,
        input_csv: str,
        sim_length: int,
        number_of_reps: int,
        trajectory_dump_frequency: int = 5000,
        timestep: float = 0.002,
    ) -> None:
        # Bug fix: check existence BEFORE reading. In the original this
        # assert ran after pd.read_csv, so it could never fire (read_csv
        # would already have raised FileNotFoundError).
        assert os.path.exists(input_csv), "Input csv does not exist"
        self.sim_length = sim_length
        self.number_of_reps = number_of_reps
        # Raw string for the regex separator (avoids invalid-escape warning).
        self.df = pd.read_csv(input_csv, sep=r"\s+")
        # Sanity checks on the loaded table.
        assert self.df.shape[1] >= 2, "Number of columns in csv is less than 2"
        assert (
            self.df.shape[0] % self.number_of_reps == 0
        ), "Number of rows in csv is not divisible by number of reps, check sim length"
        # Expected frame count: (sim_length / timestep * 1000) steps, dumped
        # every trajectory_dump_frequency steps, per replicate, plus 10.
        # NOTE(review): the original comment claimed "+ number_of_reps" but
        # the code adds 10 — behavior kept; confirm which is intended.
        frame_number_expected = (
            self.sim_length / timestep
        ) * 1000 / trajectory_dump_frequency * self.number_of_reps + 10
        assert (
            frame_number_expected == self.df.shape[0]
        ), f"Unexpected number of rows in {input_csv}: expected {frame_number_expected}, got {self.df.shape[0]}"

    def read_dataframe(self) -> pd.DataFrame:
        """
        Get the DataFrame containing Amber simulation data.

        Returns:
            pd.DataFrame: The DataFrame containing Amber simulation data.
        """
        return self.df

    def obtain_statistics_from_df(self) -> pd.DataFrame:
        """
        Obtain per-column mean and standard deviation.

        Returns:
            pd.DataFrame: Indexed by data column name, with 'mean' and 'std'
                columns.
        """
        # Bug fix: work on a copy without '#Frame' instead of dropping it
        # from self.df in place — the original made a second call raise
        # KeyError and corrupted the data returned by read_dataframe().
        data = self.df.drop("#Frame", axis=1)
        self.df_std = data.std(axis=0)
        # Combine mean and std into one two-column DataFrame.
        stats = pd.concat([data.mean(axis=0), self.df_std], axis=1)
        stats.columns = ["mean", "std"]
        # Keep the attribute for backward compatibility with callers that
        # read self.df_mean after calling this method.
        self.df_mean = stats

        return stats
Loading