Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 89 additions & 12 deletions v03_pipeline/lib/misc/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ast
import functools
import hashlib
import os
Expand Down Expand Up @@ -289,6 +290,53 @@ def get_create_mv_statements(
)[0]


def normalize_partition(partition: str) -> tuple:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when fetching the partition ids from the system table, they come out as a string representation of a tuple OR just the raw string. It's unfortunate and I couldn't find a cleaner way to do this within clickhouse.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you do SELECT tuple(partition) does that force it to be consistent, or no?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't:

wmbbe-67c :) select tuple(partition), toTypeName(partition), toTypeName(tuple(partition)) FROM system.parts where table = 'my_table';

SELECT
    tuple(partition),
    toTypeName(partition),
    toTypeName(tuple(partition))
FROM system.parts
WHERE `table` = 'my_table'

Query id: eaea3ec0-b4ce-46f5-8411-f8e69133dfd3

   ┌─(partition)────┬─toTypeName(partition)─┬─toTypeName((partition))─┐
1. │ ('1')          │ String                │ Tuple(String)           │
2. │ ('1')          │ String                │ Tuple(String)           │
3. │ ('(\'90\',1)') │ String                │ Tuple(String)           │
4. │ ('(\'93\',3)') │ String                │ Tuple(String)           │
5. │ ('(\'91\',2)') │ String                │ Tuple(String)           │
   └────────────────┴───────────────────────┴─────────────────────────┘

5 rows in set. Elapsed: 0.001 sec.

And it actually gets worse.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😢

"""
Ensure a ClickHouse partition expression is always returned as a tuple.
'project_d' -> ('project_d',)
"('project_d', 0)" -> ('project_d', 0)
"""
if not isinstance(partition, str):
msg = f'Unsupported partition type: {type(partition)}'
raise TypeError(msg)
partition = partition.strip()
if partition.startswith('(') and partition.endswith(')'):
return ast.literal_eval(partition)
return (partition,)


def get_partitions_for_projects(
table_name_builder: TableNameBuilder,
clickhouse_table: ClickHouseTable,
project_guids: list[str],
staging=False,
):
rows = logged_query(
"""
SELECT DISTINCT partition
FROM system.parts
WHERE
database = %(database)s
AND table = %(table)s
AND multiSearchAny(partition, %(project_guids)s)
""",
{
'database': STAGING_CLICKHOUSE_DATABASE
if staging
else Env.CLICKHOUSE_DATABASE,
'table': (
table_name_builder.staging_dst_table(clickhouse_table)
if staging
else table_name_builder.dst_table(clickhouse_table)
)
.split('.')[1]
.replace('`', ''),
'project_guids': project_guids,
},
)
return [normalize_partition(row[0]) for row in rows]


def create_staging_materialized_views(
table_name_builder: TableNameBuilder,
clickhouse_mvs: list[ClickHouseMaterializedView],
Expand Down Expand Up @@ -324,15 +372,19 @@ def stage_existing_project_partitions(
""",
)
continue
for project_guid in project_guids:
for partition in get_partitions_for_projects(
table_name_builder,
clickhouse_table,
project_guids,
):
# Note that ClickHouse successfully handles the case where the project
# does not already exist in the dst table. We simply attach an empty partition!
logged_query(
f"""
ALTER TABLE {table_name_builder.staging_dst_table(clickhouse_table)}
ATTACH PARTITION %(project_guid)s FROM {table_name_builder.dst_table(clickhouse_table)}
ATTACH PARTITION %(partition)s FROM {table_name_builder.dst_table(clickhouse_table)}
""",
{'project_guid': project_guid},
{'partition': partition},
)


Expand All @@ -343,7 +395,7 @@ def delete_existing_families_from_staging_entries(
logged_query(
f"""
INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
SELECT COLUMNS('.*') EXCEPT(sign), -1 as sign
SELECT COLUMNS('.*') EXCEPT(sign, n_partitions, partition_id), -1 as sign
FROM {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
WHERE family_guid in %(family_guids)s
""",
Expand All @@ -354,10 +406,25 @@ def delete_existing_families_from_staging_entries(
def insert_new_entries(
table_name_builder: TableNameBuilder,
) -> None:
dst_cols = [
r[0]
for r in logged_query(
f'DESCRIBE TABLE {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}',
)
]
src_cols = [
r[0]
for r in logged_query(
f'DESCRIBE TABLE {table_name_builder.src_table(ClickHouseTable.ENTRIES)}',
)
]
common = [c for c in dst_cols if c in src_cols]
dst_list = ', '.join(common)
src_list = ', '.join(common)
logged_query(
f"""
INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)}
SELECT *
INSERT INTO {table_name_builder.staging_dst_table(ClickHouseTable.ENTRIES)} ({dst_list})
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INSERT INTO SELECT * fails with columns misalignment. We really want INSERT EXCEPT (...)... which isn't supported though SELECT EXCEPT is.

SELECT {src_list}
FROM {table_name_builder.src_table(ClickHouseTable.ENTRIES)}
""",
)
Expand Down Expand Up @@ -488,13 +555,18 @@ def replace_project_partitions(
project_guids: list[str],
) -> None:
for clickhouse_table in clickhouse_tables:
for project_guid in project_guids:
for partition in get_partitions_for_projects(
table_name_builder,
clickhouse_table,
project_guids,
staging=True,
):
logged_query(
f"""
ALTER TABLE {table_name_builder.dst_table(clickhouse_table)}
REPLACE PARTITION %(project_guid)s FROM {table_name_builder.staging_dst_table(clickhouse_table)}
REPLACE PARTITION %(partition)s FROM {table_name_builder.staging_dst_table(clickhouse_table)}
""",
{'project_guid': project_guid},
{'partition': partition},
)


Expand Down Expand Up @@ -778,13 +850,18 @@ def rebuild_gt_stats(
dataset_type,
),
)
for project_guid in project_guids:
for partition in get_partitions_for_projects(
table_name_builder,
ClickHouseTable.PROJECT_GT_STATS,
project_guids,
staging=True,
):
logged_query(
f"""
ALTER TABLE {table_name_builder.staging_dst_table(ClickHouseTable.PROJECT_GT_STATS)}
DROP PARTITION %(project_guid)s
DROP PARTITION %(partition)s
""",
{'project_guid': project_guid},
{'partition': partition},
)
select_statement = get_create_mv_statements(
table_name_builder,
Expand Down
61 changes: 61 additions & 0 deletions v03_pipeline/lib/misc/clickhouse_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
insert_new_entries,
load_complete_run,
logged_query,
normalize_partition,
optimize_entries,
rebuild_gt_stats,
refresh_materialized_views,
Expand Down Expand Up @@ -427,6 +428,13 @@ def test_get_clickhouse_client(self):
result = client.execute('SELECT 1')
self.assertEqual(result[0][0], 1)

def test_normalize_partition(self):
self.assertEqual(normalize_partition('project_d'), ('project_d',))
self.assertEqual(
normalize_partition("('project_d', 0)"),
('project_d', 0),
)

def test_table_name_builder(self):
table_name_builder = TableNameBuilder(
ReferenceGenome.GRCh38,
Expand Down Expand Up @@ -1152,3 +1160,56 @@ def test_rebuild_gt_stats(self):
""",
)
self.assertCountEqual(gt_stats, [(1, 0)])

def test_repartitioned_entries_table(self):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the existing tests cover the existing partitioning structure.

client = get_clickhouse_client()
client.execute(
f"""
REPLACE TABLE {Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/entries` (
`key` UInt32,
`project_guid` LowCardinality(String),
`family_guid` String,
`xpos` UInt64 CODEC(Delta(8), ZSTD(1)),
`sample_type` Enum8('WES' = 0, 'WGS' = 1),
`is_annotated_in_any_gene` Boolean,
`geneId_ids` Array(UInt32),
`calls` Array(
Tuple(
sampleId String,
gt Nullable(Enum8('REF' = 0, 'HET' = 1, 'HOM' = 2)),
)
),
`sign` Int8,
`n_partitions` UInt8 MATERIALIZED 2,
`partition_id` UInt8 MATERIALIZED farmHash64(family_guid) % n_partitions,
PROJECTION xpos_projection
(
SELECT *
ORDER BY is_annotated_in_any_gene, xpos
)
)
ENGINE = CollapsingMergeTree(sign)
PARTITION BY (project_guid, partition_id)
ORDER BY (project_guid, family_guid, key)
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
""",
)
load_complete_run(
ReferenceGenome.GRCh38,
DatasetType.SNV_INDEL,
TEST_RUN_ID,
['project_d'],
['family_d1', 'family_d2', 'family_d3'],
)
project_gt_stats = client.execute(
f"""
SELECT project_guid, sum(het_samples), sum(hom_samples)
FROM
{Env.CLICKHOUSE_DATABASE}.`GRCh38/SNV_INDEL/project_gt_stats`
GROUP BY project_guid
""",
)
self.assertCountEqual(
project_gt_stats,
[('project_d', 1, 1)],
)