44from types import SimpleNamespace
55
66import count_nucleotides_and_seqs
7+ from pyfixtures import fixture
78from virtool_core .utils import compress_file , decompress_file , is_gzipped
89from virtool_workflow import hooks , step
9- from virtool_workflow .api .subtractions import SubtractionProvider
10+ from virtool_workflow .data .subtractions import WFNewSubtraction
1011from virtool_workflow .runtime .run_subprocess import RunSubprocess
1112
1213
@hooks.on_failure
async def delete_subtraction(new_subtraction: WFNewSubtraction) -> None:
    """Remove the in-progress subtraction when the workflow fails.

    Registered through ``hooks.on_failure``; it awaits
    ``new_subtraction.delete()`` and does nothing else.

    :param new_subtraction: the subtraction being created by this workflow
    """
    await new_subtraction.delete()
1718
1819
19- @step
@fixture
async def bowtie_index_path(work_path: Path) -> Path:
    """Create and return the output directory for the Bowtie2 index.

    The directory is ``work_path / "bowtie"``. It is created in a worker thread
    via ``asyncio.to_thread``. ``mkdir`` is called with its defaults, so an
    already-existing directory raises ``FileExistsError`` and a missing parent
    raises ``FileNotFoundError``.

    :param work_path: the directory the ``bowtie`` directory is created in
    :return: the path of the newly created ``bowtie`` directory
    """
    index_dir = work_path.joinpath("bowtie")
    await asyncio.to_thread(index_dir.mkdir)
    return index_dir
27+
28+
@fixture
async def decompressed_fasta_path(work_path: Path) -> Path:
    """Return the location of the uncompressed subtraction FASTA file.

    Only the path (``work_path / "subtraction.fa"``) is computed here; nothing
    is created on disk by this fixture.

    :param work_path: the directory the FASTA path is placed in
    :return: ``work_path / "subtraction.fa"``
    """
    return work_path.joinpath("subtraction.fa")
33+
34+
@fixture
def intermediate() -> SimpleNamespace:
    """Return an empty namespace for values passed between workflow steps.

    :return: a fresh ``SimpleNamespace`` with no attributes set
    """
    namespace = SimpleNamespace()
    return namespace
39+
40+
@step(name="Decompress FASTA")
async def decompress(
    decompressed_fasta_path: Path,
    new_subtraction: WFNewSubtraction,
    proc: int,
):
    """Place an uncompressed copy of the input FASTA at ``decompressed_fasta_path``.

    The file at ``new_subtraction.fasta_path`` is checked with ``is_gzipped``.
    A gzipped file is decompressed with ``decompress_file``; any other file is
    copied unchanged. All blocking file work runs in worker threads.

    :param decompressed_fasta_path: destination for the uncompressed FASTA
    :param new_subtraction: the subtraction whose ``fasta_path`` is the input
    :param proc: passed to ``decompress_file`` as ``processes``
    """
    source_path = new_subtraction.fasta_path

    # Already plain text: a straight copy is all that is needed.
    if not await asyncio.to_thread(is_gzipped, source_path):
        await asyncio.to_thread(
            shutil.copyfile,
            source_path,
            decompressed_fasta_path,
        )
        return

    await asyncio.to_thread(
        decompress_file,
        source_path,
        decompressed_fasta_path,
        processes=proc,
    )
3659
3760
38- @step (name = "Compute GC and count" )
39- async def compute_gc_and_count (fasta_path : Path , intermediate : SimpleNamespace ):
61+ @step (name = "Compute GC and Count" )
62+ async def compute_gc_and_count (
63+ decompressed_fasta_path : Path , intermediate : SimpleNamespace
64+ ):
4065 """Compute the GC and count."""
41- a , t , g , c , n , count = count_nucleotides_and_seqs .run (str (fasta_path ))
66+ a , t , g , c , n , count = count_nucleotides_and_seqs .run (str (decompressed_fasta_path ))
4267
4368 nucleotides = {
4469 "a" : int (a ),
@@ -59,50 +84,46 @@ async def compute_gc_and_count(fasta_path: Path, intermediate: SimpleNamespace):
5984
@step
async def build_index(
    bowtie_index_path: Path,
    decompressed_fasta_path: Path,
    proc: int,
    run_subprocess: RunSubprocess,
):
    """Build a Bowtie2 index from the decompressed subtraction FASTA.

    Runs ``bowtie2-build -f --threads <proc> <fasta> <prefix>`` through
    ``run_subprocess``. The index prefix is ``<bowtie_index_path>/subtraction``.

    :param bowtie_index_path: directory used as the parent of the index prefix
    :param decompressed_fasta_path: the FASTA file handed to ``bowtie2-build``
    :param proc: value passed to ``--threads``
    :param run_subprocess: awaitable callable that executes the command list
    """
    command = [
        "bowtie2-build",
        "-f",
        "--threads",
        str(proc),
        # Convert the path explicitly so the command is a uniform list of
        # strings, matching the other arguments; a raw ``Path`` relies on the
        # subprocess runner accepting path-like arguments.
        str(decompressed_fasta_path),
        f"{bowtie_index_path}/subtraction",
    ]

    await run_subprocess(command)
84103
85104
@step
async def finalize(
    bowtie_index_path: Path,
    decompressed_fasta_path: Path,
    intermediate: SimpleNamespace,
    new_subtraction: WFNewSubtraction,
    proc: int,
    work_path: Path,
):
    """Compress the FASTA, upload all subtraction files, and finalize.

    The decompressed FASTA is compressed to ``work_path / "subtraction.fa.gz"``
    in a worker thread and uploaded first. Every ``*.bt2`` file found directly
    in ``bowtie_index_path`` is uploaded next. Finally the subtraction is
    finalized with ``intermediate.gc`` and ``intermediate.count``.

    :param bowtie_index_path: directory searched for ``*.bt2`` index files
    :param decompressed_fasta_path: the uncompressed FASTA to compress
    :param intermediate: namespace that must already carry ``gc`` and ``count``
    :param new_subtraction: the subtraction that receives uploads and is finalized
    :param proc: passed to ``compress_file`` as ``processes``
    :param work_path: directory the compressed FASTA is written to
    """
    archive_path = work_path.joinpath("subtraction.fa.gz")

    await asyncio.to_thread(
        compress_file, decompressed_fasta_path, archive_path, processes=proc
    )

    await new_subtraction.upload(archive_path)

    for index_file in bowtie_index_path.glob("*.bt2"):
        await new_subtraction.upload(index_file)

    await new_subtraction.finalize(intermediate.gc, intermediate.count)
0 commit comments