Skip to content
Merged
24 changes: 23 additions & 1 deletion v03_pipeline/api/app_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,28 @@ async def test_loading_pipeline_invalid_requests(self):
'callset_path must point to a file that exists' in log.output[0],
)

body = {
'callset_path': CALLSET_PATH,
'project_guids': ['project_a'],
'sample_type': SampleType.WGS.value,
'reference_genome': ReferenceGenome.GRCh38.value,
'dataset_type': DatasetType.SNV_INDEL.value,
'validations_to_skip': ['bad_validation'],
}
with self.assertLogs(level='ERROR') as log:
async with self.client.request(
'POST',
'/loading_pipeline_enqueue',
json=body,
) as resp:
self.assertEqual(
resp.status,
web_exceptions.HTTPBadRequest.status_code,
)
self.assertTrue(
"input_value='bad_validation" in log.output[0],
)

async def test_loading_pipeline_enqueue(self):
body = {
'callset_path': CALLSET_PATH,
Expand Down Expand Up @@ -96,9 +118,9 @@ async def test_loading_pipeline_enqueue(self):
'reference_genome': 'GRCh38',
'sample_type': 'WGS',
'skip_check_sex_and_relatedness': False,
'skip_validation': False,
'skip_expect_tdr_metrics': False,
'attempt_id': 0,
'validations_to_skip': [],
},
},
)
Expand Down
35 changes: 33 additions & 2 deletions v03_pipeline/api/model.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
from typing import Literal

import hailtop.fs as hfs
from pydantic import AliasChoices, BaseModel, Field, conint, field_validator
from pydantic import (
AliasChoices,
BaseModel,
Field,
conint,
field_validator,
root_validator,
)

from v03_pipeline.lib.core import DatasetType, ReferenceGenome, SampleType
from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS, SKIPPABLE_VALIDATIONS

MAX_LOADING_PIPELINE_ATTEMPTS = 3
STRINGIFIED_SKIPPABLE_VALIDATIONS = [f.__name__ for f in SKIPPABLE_VALIDATIONS]
VALID_FILE_TYPES = ['vcf', 'vcf.gz', 'vcf.bgz', 'mt']


Expand All @@ -26,16 +37,36 @@ class LoadingPipelineRequest(PipelineRunnerRequest):
sample_type: SampleType
reference_genome: ReferenceGenome
dataset_type: DatasetType
skip_validation: bool = False
skip_check_sex_and_relatedness: bool = False
skip_expect_tdr_metrics: bool = False
validations_to_skip: list[Literal[*STRINGIFIED_SKIPPABLE_VALIDATIONS]] = []

def incr_attempt(self):
    """Advance to the next attempt.

    Returns True after bumping ``attempt_id``; returns False (without
    incrementing) once the final allowed attempt has been reached.
    """
    last_attempt_id = MAX_LOADING_PIPELINE_ATTEMPTS - 1
    if self.attempt_id == last_attempt_id:
        return False
    self.attempt_id += 1
    return True

@field_validator('validations_to_skip')
@classmethod
def must_be_known_validation(cls, validations_to_skip):
    """Reject any entry that is not the name of a skippable validation.

    Largely redundant with the ``Literal`` annotation on the field, but
    kept as a defensive check. The known-names set is built once rather
    than on every loop iteration (the original rebuilt it per item).
    """
    known_validations = set(STRINGIFIED_SKIPPABLE_VALIDATIONS)
    for v in validations_to_skip:
        if v not in known_validations:
            msg = f'{v} is not a valid validator'
            raise ValueError(msg)
    return validations_to_skip

@root_validator(
    pre=True,
)  # the root validator runs before Pydantic parses or coerces field values.
@classmethod
def override_all_validations(cls, values):
    """Expand the ``'all'`` sentinel into every skippable validation.

    Generalized from the original exact-equality check (``== ['all']``):
    ``'all'`` appearing anywhere in the list now triggers the expansion,
    so ``['all', 'validate_sample_type']`` no longer falls through to a
    confusing ``Literal`` validation error. Non-list values are left
    untouched for downstream field validation to reject.
    """
    validations_to_skip = values.get('validations_to_skip')
    if (
        isinstance(validations_to_skip, list)
        and ALL_VALIDATIONS in validations_to_skip
    ):
        values['validations_to_skip'] = STRINGIFIED_SKIPPABLE_VALIDATIONS
    return values

@field_validator('callset_path')
@classmethod
def check_valid_callset_path(cls, callset_path: str) -> str:
Expand Down
29 changes: 29 additions & 0 deletions v03_pipeline/api/model_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,35 @@ def test_invalid_loading_pipeline_requests(self) -> None:
),
)

def test_validations_to_skip(self) -> None:
    """Exercise the 'all' sentinel, a single known name, and an unknown name."""
    base_request = {
        'callset_path': CALLSET_PATH,
        'projects_to_run': ['project_a'],
        'sample_type': SampleType.WGS.value,
        'reference_genome': ReferenceGenome.GRCh38.value,
        'dataset_type': DatasetType.SNV_INDEL.value,
    }

    # 'all' expands into the full list of skippable validations.
    lpr = LoadingPipelineRequest.model_validate(
        {**base_request, 'validations_to_skip': ['all']},
    )
    self.assertGreater(len(lpr.validations_to_skip), 2)

    # A single known validation name is accepted unchanged.
    lpr = LoadingPipelineRequest.model_validate(
        {**base_request, 'validations_to_skip': ['validate_sample_type']},
    )
    self.assertEqual(len(lpr.validations_to_skip), 1)

    # An unknown validation name is rejected.
    with self.assertRaises(ValueError):
        LoadingPipelineRequest.model_validate(
            {**base_request, 'validations_to_skip': ['validate_blended_exome']},
        )

def test_delete_families_request(self) -> None:
raw_request = {'project_guid': 'project_a', 'family_guids': []}
with self.assertRaises(ValueError):
Expand Down
6 changes: 3 additions & 3 deletions v03_pipeline/bin/pipeline_worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def test_process_queue_integration_test(
'sample_type': SampleType.WGS.value,
'reference_genome': ReferenceGenome.GRCh38.value,
'dataset_type': DatasetType.SNV_INDEL.value,
'skip_validation': True,
'validations_to_skip': ['all'],
}
os.makedirs(
loading_pipeline_queue_dir(),
Expand All @@ -192,7 +192,7 @@ def test_process_queue_integration_test(
json.dump(raw_request, f)
process_queue(local_scheduler=True)
mock_safe_post_to_slack.assert_called_once_with(
':white_check_mark: Pipeline Runner Request Success! :white_check_mark:\nRun ID: 20250916-200704-123456\n```{\n "attempt_id": 0,\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "R0113_test_project"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "skip_validation": true\n}```',
':white_check_mark: Pipeline Runner Request Success! :white_check_mark:\nRun ID: 20250916-200704-123456\n```{\n "attempt_id": 0,\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "R0113_test_project"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "validations_to_skip": [\n "validate_allele_depth_length",\n "validate_allele_type",\n "validate_expected_contig_frequency",\n "validate_no_duplicate_variants",\n "validate_sample_type"\n ]\n}```',
)
with hfs.open(
clickhouse_load_success_file_path(
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_process_failure(
process_queue(local_scheduler=True)
process_queue(local_scheduler=True)
mock_safe_post_to_slack.assert_called_once_with(
':failed: Pipeline Runner Request Failed :failed:\nRun ID: 20250918-200704-123456\n```{\n "attempt_id": 2,\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "project_a"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "skip_validation": false\n}```\nReason: there were failed tasks',
':failed: Pipeline Runner Request Failed :failed:\nRun ID: 20250918-200704-123456\n```{\n "attempt_id": 2,\n "callset_path": "v03_pipeline/var/test/callsets/1kg_30variants.vcf",\n "dataset_type": "SNV_INDEL",\n "project_guids": [\n "project_a"\n ],\n "reference_genome": "GRCh38",\n "request_type": "LoadingPipelineRequest",\n "sample_type": "WGS",\n "skip_check_sex_and_relatedness": false,\n "skip_expect_tdr_metrics": false,\n "validations_to_skip": []\n}```\nReason: there were failed tasks',
)
self.assertEqual(len(os.listdir(loading_pipeline_queue_dir())), 0)
with open(
Expand Down
4 changes: 2 additions & 2 deletions v03_pipeline/lib/misc/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def compute_hail_n_partitions(file_size_b: int) -> int:
)
def split_multi_hts(
mt: hl.MatrixTable,
skip_validation: bool,
skip_validate_no_duplicate_variants: bool,
max_samples_split_multi_shuffle=MAX_SAMPLES_SPLIT_MULTI_SHUFFLE,
) -> hl.MatrixTable:
bi = mt.filter_rows(hl.len(mt.alleles) == BIALLELIC)
Expand All @@ -98,7 +98,7 @@ def split_multi_hts(
mt = split.union_rows(bi)
# If we've disabled the no-duplicate-variants validation (which is expected
# to throw an exception for duplicate variants), de-duplicate rows here instead.
if skip_validation:
if skip_validate_no_duplicate_variants:
return mt.distinct_by_row()
return mt

Expand Down
10 changes: 10 additions & 0 deletions v03_pipeline/lib/misc/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
SampleType,
)

ALL_VALIDATIONS = 'all'
AMBIGUOUS_THRESHOLD_PERC: float = 0.01 # Fraction of samples identified as "ambiguous_sex" above which an error will be thrown.
MIN_ROWS_PER_CONTIG = 100
SAMPLE_TYPE_MATCH_THRESHOLD = 0.3
Expand Down Expand Up @@ -191,3 +192,12 @@ def validate_sample_type(
if has_noncoding and has_coding and sample_type != SampleType.WGS:
msg = 'Sample type validation error: dataset sample-type is specified as WES but appears to be WGS because it contains many common non-coding variants'
raise SeqrValidationError(msg)


# Validation functions a loading request may opt out of individually (by
# function name) via `validations_to_skip`; the ALL_VALIDATIONS sentinel
# ('all') expands to this entire list.
SKIPPABLE_VALIDATIONS = [
    validate_allele_depth_length,
    validate_allele_type,
    validate_expected_contig_frequency,
    validate_no_duplicate_variants,
    validate_sample_type,
]
5 changes: 1 addition & 4 deletions v03_pipeline/lib/tasks/base/base_loading_run_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ class BaseLoadingRunParams(luigi.Task):
default=False,
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
)
skip_validation = luigi.BoolParameter(
default=False,
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
)
validations_to_skip = luigi.ListParameter(default=[])
is_new_gcnv_joint_call = luigi.BoolParameter(
default=False,
parsing=luigi.BoolParameter.EXPLICIT_PARSING,
Expand Down
4 changes: 2 additions & 2 deletions v03_pipeline/lib/tasks/dataproc/misc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def test_to_kebab_str_args(self, _: Mock):
'False',
'--skip-expect-tdr-metrics',
'False',
'--skip-validation',
'False',
'--validations-to-skip',
'[]',
'--is-new-gcnv-joint-call',
'False',
'--run-id',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ReferenceGenome,
SampleType,
)
from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS
from v03_pipeline.lib.paths import (
db_id_to_gene_id_path,
new_entries_parquet_path,
Expand Down Expand Up @@ -114,7 +115,7 @@ def test_write_new_entries_parquet(self):
sample_type=SampleType.WGS,
callset_path=TEST_SNV_INDEL_VCF,
project_guids=['R0113_test_project', 'R0114_project4'],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down Expand Up @@ -214,7 +215,7 @@ def test_mito_write_new_entries_parquet(self):
sample_type=SampleType.WGS,
callset_path=TEST_MITO_CALLSET,
project_guids=['R0116_test_project3'],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down Expand Up @@ -269,7 +270,7 @@ def test_sv_write_new_entries_parquet(self):
sample_type=SampleType.WGS,
callset_path=TEST_SV_VCF_2,
project_guids=['R0115_test_project2'],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down Expand Up @@ -352,7 +353,7 @@ def test_gcnv_write_new_entries_parquet(self):
sample_type=SampleType.WES,
callset_path=TEST_GCNV_BED_FILE,
project_guids=['R0115_test_project2'],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
ReferenceGenome,
SampleType,
)
from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS
from v03_pipeline.lib.paths import (
new_transcripts_parquet_path,
new_variants_table_path,
Expand Down Expand Up @@ -87,7 +88,7 @@ def test_write_new_transcripts_parquet_test(
project_guids=[
'fake_project',
],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down Expand Up @@ -187,7 +188,7 @@ def test_grch37_write_new_transcripts_parquet_test(
project_guids=[
'fake_project',
],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ReferenceGenome,
SampleType,
)
from v03_pipeline.lib.misc.validation import ALL_VALIDATIONS
from v03_pipeline.lib.paths import (
new_variants_parquet_path,
new_variants_table_path,
Expand Down Expand Up @@ -107,7 +108,7 @@ def test_write_new_variants_parquet_test(
project_guids=[
'fake_project',
],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down Expand Up @@ -231,7 +232,7 @@ def test_grch37_write_new_variants_parquet_test(
project_guids=[
'fake_project',
],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down Expand Up @@ -342,7 +343,7 @@ def test_mito_write_new_variants_parquet_test(
project_guids=[
'fake_project',
],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down Expand Up @@ -436,7 +437,7 @@ def test_sv_write_new_variants_parquet_test(
project_guids=[
'fake_project',
],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down Expand Up @@ -513,7 +514,7 @@ def test_gcnv_write_new_variants_parquet_test(
project_guids=[
'fake_project',
],
skip_validation=True,
validations_to_skip=[ALL_VALIDATIONS],
run_id=TEST_RUN_ID,
)
worker.add(task)
Expand Down
Loading