Skip to content
Open
15 changes: 7 additions & 8 deletions Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ checkpoint prescore:
prescored=temp("{file}.pre.tsv"),
log:
"{file}.prescore.log",
threads: workflow.cores,
params:
cadd=os.environ["CADD"],
shell:
Expand All @@ -85,20 +86,18 @@ checkpoint prescore:
echo '## Prescored variant file' > {output.prescored} 2> {log};
PRESCORED_FILES=`find -L {input.prescored} -maxdepth 1 -type f -name \\*.tsv.gz | wc -l`
cp {input.vcf} {input.vcf}.new
if [ ${{PRESCORED_FILES}} -gt 0 ];
then
for PRESCORED in $(ls {input.prescored}/*.tsv.gz)
do
if [ ${{PRESCORED_FILES}} -gt 0 ]; then
for PRESCORED in $(ls {input.prescored}/*.tsv.gz); do
cat {input.vcf}.new \
| python {params.cadd}/src/scripts/extract_scored.py --header \
-p $PRESCORED --found_out={output.prescored}.tmp \
-p $PRESCORED --found_out={output.prescored}.tmp --threads {threads} \
> {input.vcf}.tmp 2>> {log};
cat {output.prescored}.tmp >> {output.prescored}
mv {input.vcf}.tmp {input.vcf}.new &> {log};
mv {input.vcf}.tmp {input.vcf}.new 2>> {log};
done;
rm {output.prescored}.tmp &>> {log}
rm {output.prescored}.tmp 2>> {log}
fi
mv {input.vcf}.new {output.novel} &>> {log}
mv {input.vcf}.new {output.novel} 2>> {log}
"""


Expand Down
309 changes: 247 additions & 62 deletions src/scripts/extract_scored.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,254 @@
import os
import pysam
from optparse import OptionParser
import multiprocessing as mp

parser = OptionParser()
parser.add_option("-p", "--path", dest="path", help="Path to scored variants.")
parser.add_option("-i", "--input", dest="input", help="Read variants from vcf file (default stdin)", default=None)
parser.add_option("--found_out", dest="found_out", help="Write found variants to file (default: stdout)", default=None)
parser.add_option("--header", dest="header", help="Write full header to output (default none)",
default=False, action="store_true")
(options, args) = parser.parse_args()

if options.input:
stdin = open(options.input, 'r')
else:
stdin = sys.stdin

if options.found_out:
found_out = open(options.found_out, 'w')
else:
found_out = sys.stdout

fpos, fref, falt = 1, 2, 3
if os.path.exists(options.path) and os.path.exists(options.path+".tbi"):
filename = options.path
sys.stderr.write("Opening %s...\n" % (filename))
regionTabix = pysam.Tabixfile(filename, 'r')
header = list(regionTabix.header)
for line in header:
if options.header:
found_out.write(line+"\n")

def buffer_vcf_by_chromosome(input_stream):
"""Read VCF from input stream and buffer by chromosome"""
vcf_by_chrom = {}
header_lines = []

for line in input_stream:
if line.startswith('#'):
header_lines.append(line)
continue

fields = line.strip().split('\t')
chrom = fields[0]

if chrom not in vcf_by_chrom:
vcf_by_chrom[chrom] = []
vcf_by_chrom[chrom].append(line)

return header_lines, vcf_by_chrom


def setup_output_dir(output_base, chrom):
"""Create chromosome-specific output directory"""
chrom_dir = os.path.join(output_base, chrom)
try:
os.makedirs(chrom_dir)
except OSError:
if not os.path.isdir(chrom_dir):
raise
return chrom_dir


def extract_prescored_chromosome(input_file, output_base, chrom):
"""Extract records for a single chromosome from prescored TSV file"""
try:
# Setup output directory
chrom_dir = setup_output_dir(output_base, chrom)
input_file_name = os.path.basename(input_file)
input_file_name_base = input_file_name.replace(".tsv.gz", "")
assert input_file_name_base != input_file_name, "The input file name {0} is not valid".format(input_file_name)
output_file = os.path.join(chrom_dir, "{0}.{1}.tsv".format(input_file_name_base, chrom))
compressed_file = "{0}.gz".format(output_file)

# Check if extraction is needed
if os.path.exists(compressed_file):
if os.path.getmtime(compressed_file) > os.path.getmtime(input_file):
if os.path.exists(compressed_file + ".tbi"):
if os.path.getmtime(compressed_file + ".tbi") > os.path.getmtime(compressed_file):
sys.stderr.write("The prescored file {0} for chromosome {1} is up to date, skip the extraction\n".format(compressed_file, chrom))
return compressed_file
else:
tabix_only=True
else:
tabix_only=True

if tabix_only:
pysam.tabix_index(compressed_file,
preset=None,
force=True,
seq_col=0,
start_col=1,
end_col=1,
zerobased=False)
return compressed_file

# Extract records for this chromosome using tabix
tbx = pysam.TabixFile(input_file)
with open(output_file, 'w') as f:
for row in tbx.fetch(chrom):
f.write("{0}\n".format(row))

# Compress and index the output file
pysam.tabix_compress(output_file, compressed_file, force=True)
pysam.tabix_index(compressed_file,
preset=None,
force=True,
seq_col=0,
start_col=1,
end_col=1,
zerobased=False)

# Remove uncompressed file
os.remove(output_file)
sys.stderr.write("The prescored file {0} for chromosome {1} is extracted\n".format(compressed_file, chrom))
return compressed_file

except Exception as e:
raise Exception("Error extracting prescored chromosome {0}: {1}".format(chrom, str(e)))


def process_chromosome(args):
"""Process a single chromosome"""
chrom, vcf_lines, prescored_file, temp_dir, fpos, fref, falt = args
try:
# First extract prescored records for this chromosome
prescored_chrom_file = extract_prescored_chromosome(
prescored_file,
os.path.dirname(prescored_file),
chrom
)

# Setup output files for this chromosome
found_file = os.path.join(temp_dir, "matches", "found.{0}.tmp".format(chrom))
notfound_file = os.path.join(temp_dir, "matches", "notfound.{0}.tmp".format(chrom))
try:
os.makedirs(os.path.dirname(found_file))
except OSError:
if not os.path.isdir(os.path.dirname(found_file)):
raise

if prescored_chrom_file is None:
# Create empty found file and output all records to notfound file
with open(notfound_file, 'w') as f_notfound:
for line in vcf_lines:
f_notfound.write(line)
return chrom, True

# Open prescored tabix file
pre_tbx = pysam.TabixFile(prescored_chrom_file)

with open(found_file, 'w') as f_found, open(notfound_file, 'w') as f_notfound:
# Process each variant
for line in vcf_lines:
fields = line.strip().split('\t')
pos = int(fields[1])
lref, allele = fields[-2], fields[-1].strip()
found = False

# Look for matches in prescored file
for pre_line in pre_tbx.fetch(chrom, pos-1, pos):
vfields = pre_line.rstrip().split('\t')
if (vfields[fref] == lref) and (vfields[falt] == allele) and (vfields[fpos] == fields[1]):
f_found.write(pre_line + '\n')
found = True
break

if not found:
f_notfound.write(line)

return chrom, True
except Exception as e:
sys.stderr.write('Error processing chromosome {0}: {1}\n'.format(chrom, str(e)))
return chrom, False


def main():
parser = OptionParser()
parser.add_option("-p", "--path", dest="path", help="Path to scored variants.")
parser.add_option("-i", "--input", dest="input", help="Read variants from vcf file (default stdin)", default=None)
parser.add_option("--found_out", dest="found_out", help="Write found variants to file (default: stdout)", default=None)
parser.add_option("--header", dest="header", help="Write full header to output (default none)",
default=False, action="store_true")
parser.add_option("-t", "--threads", dest="threads", help="Number of threads to use (default: 1)", default=1)
(options, args) = parser.parse_args()

# Setup input stream
input_stream = sys.stdin
if options.input and options.input != "-":
try:
fref = line.split('\t').index('Ref')
falt = line.split('\t').index('Alt')
except ValueError:
pass
else:
raise IOError("No valid file with pre-scored variants.\n")

for line in stdin:
line = line.rstrip('\n\r')
if line.startswith('#'):
sys.stdout.write(line + '\n')
continue
input_stream = open(options.input, 'r')
except IOError as e:
sys.stderr.write("Error opening input file: {0}\n".format(str(e)))
sys.exit(1)

# Setup output stream
found_out = open(options.found_out, 'w') if options.found_out else sys.stdout

try:
fields = line.split('\t')
found = False
chrom = fields[0]
pos = int(fields[1])
lref, allele = fields[-2], fields[-1]
for regionHit in regionTabix.fetch(chrom, pos-1, pos):
vfields = regionHit.rstrip().split('\t')
if (vfields[fref] == lref) and (vfields[falt] == allele) and (vfields[fpos] == fields[1]):
found_out.write(regionHit+"\n")
found = True

if not found:
sys.stdout.write(line + '\n')

except ValueError:
sys.stderr.write('Encountered uncovered chromosome\n')
sys.stdout.write(line + '\n')

if options.input:
stdin.close()

if options.found_out:
found_out.close()
# Initialize column indices
fpos, fref, falt = 1, 2, 3

# Check prescored file
if not (os.path.exists(options.path) and os.path.exists(options.path+".tbi")):
raise IOError("No valid file with pre-scored variants.\n")

# Get header and column indices from prescored file
pre_tbx = pysam.TabixFile(options.path, 'r')
header = list(pre_tbx.header)

# Write headers to output files if requested
if options.header:
for line in header:
found_out.write(line+"\n")

# Get column indices from header
for line in header:
try:
fref = line.split('\t').index('Ref')
falt = line.split('\t').index('Alt')
except ValueError:
pass

# Buffer VCF data and get chromosomes
header_lines, vcf_by_chrom = buffer_vcf_by_chromosome(input_stream)
chromosomes = sorted(vcf_by_chrom.keys())
sys.stderr.write("The chromosomes are {0}\n".format(chromosomes))
sys.stderr.write("There are in total {} lines of records got from the buffer of the input VCF file\n".format(sum([len(vcf_by_chrom[chrom]) for chrom in chromosomes])))

# Write VCF headers to stdout
for line in header_lines:
sys.stdout.write(line)

# Get number of threads from Snakemake
threads = min(options.threads, len(chromosomes))
sys.stderr.write("Using {0} threads to extract the scored variants across all chromosomes\n".format(threads))

temp_dir = os.environ.get("TMPDIR", "/tmp")

# Setup parallel processing args
process_args = [
(chrom, vcf_by_chrom[chrom], options.path, temp_dir, fpos, fref, falt)
for chrom in chromosomes
]

# Process chromosomes in parallel
pool = mp.Pool(threads)
results = pool.map(process_chromosome, process_args)
pool.close()
pool.join()

# Combine results
for chrom, success in results:
if success:
found_file = os.path.join(temp_dir, "matches", "found.{0}.tmp".format(chrom))
notfound_file = os.path.join(temp_dir, "matches", "notfound.{0}.tmp".format(chrom))

if os.path.exists(found_file):
with open(found_file) as f:
for line in f:
found_out.write(line)
os.remove(found_file)

if os.path.exists(notfound_file):
with open(notfound_file) as f:
for line in f:
sys.stdout.write(line)
os.remove(notfound_file)

finally:
# Close input file if it's not stdin
if options.input and options.input != "-":
input_stream.close()

# Close output file if it's not stdout
if options.found_out:
found_out.close()

if __name__ == "__main__":
main()
Loading