From 194b1b2ba9df888be5b25c68177f15bcf1c61ab4 Mon Sep 17 00:00:00 2001 From: Son Chau Date: Mon, 26 Jan 2026 10:50:05 -0800 Subject: [PATCH 1/9] update remaining fields phenopackets --- src/api/phenopacket_operations.py | 229 ++++++++++++++++++++++++++---- 1 file changed, 202 insertions(+), 27 deletions(-) diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index bef85e7..4aacf67 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -57,42 +57,57 @@ async def get_by_id(dataset_id: str, id: int): async def get_medical_actions(person_id: int): + # TODO: need to group/link each + # response_to_treatments, treatment_intents, treatment_targets + # instead of getting only 1 ( response_to_treatments, + treatment_intents, + treatment_targets, treatment_agents, - procedure_codes, + procedures, radiation_therapies, ) = await asyncio.gather( - get_treatments(person_id), - get_drug_exposures(person_id), - get_procedure_codes(person_id), + get_treatment_responses(person_id), + get_treatment_intents(person_id), + get_treatment_targets(person_id), + get_treatment_agents(person_id), + get_procedures(person_id), get_radiation_therapies(person_id), ) if not response_to_treatments: - if not (treatment_agents or procedure_codes or radiation_therapies): + if not (treatment_agents or procedures or radiation_therapies): return None return None - if not (treatment_agents or procedure_codes or radiation_therapies): + if not (treatment_agents or procedures or radiation_therapies): return None medical_actions = [] + # Use first intent/target for now since we can't figure out the link + treatment_intent = treatment_intents[0] if treatment_intents else None + treatment_target = treatment_targets[0] if treatment_targets else None + # Create combinations of each treatment type with each response for response in response_to_treatments: # Combine treatment agents with responses for agent in treatment_agents: medical_action = { - "action": {"agent": agent}, + "action": agent, + "treatment_target": treatment_target, + "treatment_intent": treatment_intent, "response_to_treatment": response, } medical_actions.append(medical_action) # Combine procedures with responses - for procedure_code in procedure_codes: + for procedure in procedures: medical_action = { - "action": {"code": procedure_code}, + "action": procedure, + "treatment_target": treatment_target, + "treatment_intent": treatment_intent, "response_to_treatment": response, } medical_actions.append(medical_action) @@ -101,6 +116,8 @@ async def get_medical_actions(person_id: int): for radiation in radiation_therapies: medical_action = { "action": radiation, + "treatment_target": treatment_target, + "treatment_intent": treatment_intent, "response_to_treatment": response, } medical_actions.append(medical_action) @@ -574,7 +591,7 @@ async def get_ontologies(concept_ids: list): return {} -async def get_treatments(person_id: int): +async def get_treatment_responses(person_id: int): raw_sql = text(f""" SELECT DISTINCT observation.value_as_concept_id as treatment_response_concept_id @@ -607,11 +624,87 @@ async def get_treatments(person_id: int): return [] +async def get_treatment_intents(person_id: int): + raw_sql = text(f""" + SELECT DISTINCT + observation.value_as_concept_id as treatment_intent_concept_id + FROM {settings.CDM_SCHEMA}.observation observation + WHERE observation.person_id = :person_id + AND observation.observation_concept_id = 4133895 + AND observation.value_as_concept_id IS NOT NULL + """) + + async for session in get_db_session(): + try: + result = await session.execute(raw_sql, {"person_id": person_id}) + rows = result.fetchall() + + # Batch fetch ontologies + concept_ids = [row.treatment_intent_concept_id for row in rows] + ontology_map = await get_ontologies(concept_ids) + + # Convert rows to list of OntologyClass objects + intents = [ + ontology_map.get(row.treatment_intent_concept_id) for row in rows + ] + + # Filter out None values if conversion failed + return [i for i in intents if i is not None] + + except Exception as e: + logger.error(f"Database Error in get_treatment_intents: {str(e)}") + return [] + + return [] + +async def get_treatment_targets(person_id: int): + raw_sql = text(f""" + SELECT DISTINCT + condition_occurrence.condition_concept_id as treatment_target_concept_id + FROM {settings.CDM_SCHEMA}.episode episode + INNER JOIN {settings.CDM_SCHEMA}.episode_event episode_event + ON episode.episode_id = episode_event.episode_id + AND episode_event.episode_event_field_concept_id = 1147127 + INNER JOIN {settings.CDM_SCHEMA}.condition_occurrence condition_occurrence + ON episode_event.event_id = condition_occurrence.condition_occurrence_id + WHERE episode.person_id = :person_id + AND episode.episode_concept_id = 32528 + """) + + async for session in get_db_session(): + try: + result = await session.execute(raw_sql, {"person_id": person_id}) + rows = result.fetchall() + + # Batch fetch ontologies + concept_ids = [row.treatment_target_concept_id for row in rows] + ontology_map = await get_ontologies(concept_ids) + + # Convert rows to list of OntologyClass objects + targets = [ + ontology_map.get(row.treatment_target_concept_id) for row in rows + ] + + # Filter out None values if conversion failed + return [t for t in targets if t is not None] + + except Exception as e: + logger.error(f"Database Error in get_treatment_targets: {str(e)}") + return [] -async def get_drug_exposures(person_id: int): + return [] + +async def get_treatment_agents(person_id: int): raw_sql = text(f""" SELECT DISTINCT - drug_exposure.drug_concept_id + drug_exposure.drug_concept_id, + drug_exposure.dose_unit_source_value as quantity_unit, + drug_exposure.quantity as quantity_value, + drug_exposure.drug_exposure_end_date as dose_intervals_end, + drug_exposure.drug_exposure_start_date as dose_intervals_start, + drug_exposure.dose_unit_source_value as dose_intervals_quantity_unit, + drug_exposure.quantity as dose_intervals_quantity_value, + drug_exposure.route_concept_id as route_concept_id FROM {settings.CDM_SCHEMA}.episode episode INNER JOIN {settings.CDM_SCHEMA}.episode_event episode_event ON episode.episode_id = episode_event.episode_id @@ -621,6 +714,7 @@ async def get_drug_exposures(person_id: int): WHERE episode.person_id = :person_id AND episode.episode_concept_id = 32941 AND drug_exposure.drug_concept_id IS NOT NULL + AND drug_exposure.drug_type_concept_id = 32833 """) async for session in get_db_session(): @@ -628,15 +722,78 @@ async def get_drug_exposures(person_id: int): result = await session.execute(raw_sql, {"person_id": person_id}) rows = result.fetchall() - # Batch fetch ontologies + # Batch fetch ontologies for both drug_concept_id and route_concept_id concept_ids = [row.drug_concept_id for row in rows] + # Add route_concept_ids that are not None + concept_ids.extend([row.route_concept_id for row in rows if row.route_concept_id is not None]) ontology_map = await get_ontologies(concept_ids) - # Convert rows to list of OntologyClass objects - drug_exposures = [ontology_map.get(row.drug_concept_id) for row in rows] + # Convert rows to list of treatment agent objects + treatment_agents = [] + for row in rows: + agent = ontology_map.get(row.drug_concept_id) + if agent: + treatment_agent = { + "agent": agent, + "drug_type": "UNKNOWN_DRUG_TYPE" + } + + # Add route_of_administration + if row.route_concept_id is not None: + route = ontology_map.get(row.route_concept_id) + if route: + treatment_agent["route_of_administration"] = route + else: + treatment_agent["route_of_administration"] = { + "id": "SNOMED:261665006", + "label": "Unknown" + } + else: + treatment_agent["route_of_administration"] = { + "id": "SNOMED:261665006", + "label": "Unknown" + } + + # TODO: need to figureout quantity_unit ontology + # treatment_agent["cumulative_dose"] = { + # "value": row.quantity_value, + # "unit": row.quantity_unit + # } + + # TODO: need to figureout quantity_unit ontology + # Add dose_intervals if we have the necessary data + # if row.dose_intervals_start or row.dose_intervals_end or row.dose_intervals_quantity_value: + # dose_intervals = {} + # treatment_agent["schedule_frequency"] = { + # "id": "SNOMED:261665006", + # "label": "Unknown" + # } + + # # Add quantity if available + # if row.dose_intervals_quantity_value is not None: + # dose_intervals["quantity"] = { + # "value": row.dose_intervals_quantity_value, + # "unit": {} + # } + + + # # Add interval if start or end date is available + # if row.dose_intervals_start or row.dose_intervals_end: + # interval = {} + # if row.dose_intervals_start: + # interval["start"] = row.dose_intervals_start.isoformat() + # if row.dose_intervals_end: + # interval["end"] = row.dose_intervals_end.isoformat() + # dose_intervals["interval"] = interval + + # if dose_intervals: + # treatment_agent["dose_intervals"] = dose_intervals + + + treatment_agents.append(treatment_agent) + + return treatment_agents - # Filter out None values if conversion failed - return [d for d in drug_exposures if d is not None] except Exception as e: logger.error(f"Database Error in get_drug_exposure: {str(e)}") @@ -645,19 +802,24 @@ async def get_drug_exposures(person_id: int): return [] -async def get_procedure_codes(person_id: int): +async def get_procedures(person_id: int): raw_sql = text(f""" SELECT DISTINCT - procedure_occurrence.procedure_concept_id + procedure_occurrence.procedure_concept_id, + procedure_occurrence.procedure_date as performed, + observation.value_as_concept_id as body_site_concept_id FROM {settings.CDM_SCHEMA}.episode episode INNER JOIN {settings.CDM_SCHEMA}.episode_event episode_event ON episode.episode_id = episode_event.episode_id AND episode_event.episode_event_field_concept_id = 1147082 INNER JOIN {settings.CDM_SCHEMA}.procedure_occurrence procedure_occurrence ON episode_event.event_id = procedure_occurrence.procedure_occurrence_id + LEFT JOIN {settings.CDM_SCHEMA}.observation observation + ON observation.observation_event_id = episode.episode_id + AND observation.observation_concept_id = 4181646 + AND observation.obs_event_field_concept_id = 798885 WHERE episode.person_id = :person_id AND episode.episode_concept_id = 32939 - AND procedure_occurrence.procedure_concept_id IS NOT NULL """) async for session in get_db_session(): @@ -665,18 +827,31 @@ async def get_procedure_codes(person_id: int): result = await session.execute(raw_sql, {"person_id": person_id}) rows = result.fetchall() - # Batch fetch ontologies - concept_ids = [row.procedure_concept_id for row in rows] + # Batch fetch ontologies for both procedure and body_site + concept_ids = [] + for row in rows: + concept_ids.extend([row.procedure_concept_id, row.body_site_concept_id]) + ontology_map = await get_ontologies(concept_ids) - # Convert rows to list of OntologyClass objects - procedures = [ontology_map.get(row.procedure_concept_id) for row in rows] + # Convert rows to list of procedure objects + procedures = [] + for row in rows: + code = ontology_map.get(row.procedure_concept_id) + body_site = ontology_map.get(row.body_site_concept_id) + performed = get_timestamp(row.performed) + if code: + procedure = { + "code": code, + "body_site": body_site, + "performed": performed + } + procedures.append(procedure) - # Filter out None values if conversion failed - return [p for p in procedures if p is not None] + return procedures except Exception as e: - logger.error(f"Database Error in get_procedure_code: {str(e)}") + logger.error(f"Database Error in get_procedures: {str(e)}") return [] return [] From 7bfdf05a0661803614d735f832716572b1902d22 Mon Sep 17 00:00:00 2001 From: Son Chau Date: Fri, 30 Jan 2026 10:23:53 -0800 Subject: [PATCH 2/9] 1800-01-01 date should be treated as None --- src/api/phenopacket_operations.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index 4aacf67..b8c307d 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -516,8 +516,12 @@ def get_timestamp(datetime_value): Convert OMOP datetime to Phenopacket Timestamp format. """ if isinstance(datetime_value, datetime): + if datetime_value.date() == date(1800, 1, 1): + return None return {"iso8601timestamp": datetime_value.isoformat()} elif isinstance(datetime_value, date): + if datetime_value == date(1800, 1, 1): + return None return {"iso8601timestamp": datetime_value.isoformat()} return None @@ -541,6 +545,9 @@ def get_survival_time(disease_first_occurrence_date, death_date): else: end_date = death_date + if start_date == date(1800, 1, 1) or end_date == date(1800, 1, 1): + return None + # Calculate difference in days delta = end_date - start_date return delta.days From 73456020647fc62640d13006b4a2322988e304fe Mon Sep 17 00:00:00 2001 From: Marion Date: Fri, 30 Jan 2026 12:05:58 -0800 Subject: [PATCH 3/9] fix typo --- schema.yml | 2 +- src/api/phenopacket_operations.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/schema.yml b/schema.yml index 0c11cf2..93ad8b0 100644 --- a/schema.yml +++ b/schema.yml @@ -2352,7 +2352,7 @@ components: type: string description: > The agent or organization that created the phenopacket. - submitter_by: + submitted_by: type: string description: > Name of person who submitted the phenopacket diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index b8c307d..14cfce4 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -963,7 +963,7 @@ def get_meta_data(): return { "created": datetime.now(timezone.utc).isoformat(), "created_by": "DHDP", - "submitter_by": "DHDP", + "submitted_by": "DHDP", "phenopacket_schema_version": "2.0.0", "resources": [ { From 59c4af184dd98682a74aaec80231f0643e5a078f Mon Sep 17 00:00:00 2001 From: Marion Date: Mon, 2 Feb 2026 11:14:31 -0800 Subject: [PATCH 4/9] fix disease stages --- src/api/phenopacket_operations.py | 22 +++---- src/concept_mappings.json | 101 ++++++++++++++++++++++++++++++ src/config.py | 3 + 3 files changed, 111 insertions(+), 15 deletions(-) create mode 100644 src/concept_mappings.json diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index 14cfce4..c10d3e4 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -128,23 +128,15 @@ async def get_medical_actions(person_id: int): async def get_disease_stages(person_id: int): raw_sql = text(f""" SELECT DISTINCT - m.value_as_concept_id as disease_stage_concept_id - FROM {settings.CDM_SCHEMA}.measurement AS m + m.{settings.MAPPING_JSON['diseases']['disease_stage']['filtering_field']} as disease_stage_concept_id + FROM {settings.CDM_SCHEMA}.{settings.MAPPING_JSON['diseases']['disease_stage']['omop_object']} AS m WHERE m.person_id = :person_id - AND m.measurement_concept_id IN ( - SELECT descendant_concept_id - FROM {settings.CDM_SCHEMA}.concept_ancestor - WHERE ancestor_concept_id = ( - SELECT concept_id - FROM {settings.CDM_SCHEMA}.concept - WHERE vocabulary_id = 'SNOMED' - AND concept_code = '254292007' + AND ( + m.{settings.MAPPING_JSON['diseases']['disease_stage']['filtering_field']} IN ( + SELECT descendant_concept_id FROM {settings.CDM_SCHEMA}.concept_ancestor + WHERE ancestor_concept_id IN ({','.join([str(x) for x in settings.MAPPING_JSON['diseases']['disease_stage']['ancestor_ids']])})) + OR m.value_as_concept_id IN ({','.join([str(x) for x in settings.MAPPING_JSON['diseases']['disease_stage']['concept_ids']])}) ) - UNION - SELECT 37163866 - UNION - SELECT 607126 - ) """) async for session in get_db_session(): diff --git a/src/concept_mappings.json b/src/concept_mappings.json new file mode 100644 index 0000000..4d48ac0 --- /dev/null +++ b/src/concept_mappings.json @@ -0,0 +1,101 @@ +{ + "diseases":{ + "disease_stage": { + "omop_object": "measurement", + "filtering_field": "value_as_concept_id", + "concept_ids": [ + 4136272, + 4114652, + 4128936, + 45884284, + 45881605, + 45882496, + 45879332, + 45878383, + 45876317, + 45884292, + 45876326, + 45881861, + 45880980, + 45878383, + 45882495, + 45882494, + 1633775, + 45878383, + 45878383, + 45878383, + 45878383, + 45876317, + 45881607, + 45881606, + 45876317, + 45876317, + 45876317, + 45878652, + 46237073, + 46237073, + 46237073, + 46237073, + 45880979, + 37163968, + 45876316, + 46237999, + 46237478, + 45876316, + 45876316, + 45876316, + 45882497, + 45882497, + 45882497, + 45882497, + 45878382, + 46237084, + 46237084, + 46237084, + 46237084, + 45878643, + 45880978, + 37163997, + 37163998, + 45880978, + 45880978, + 45880978, + 45884285, + 45884285, + 45884285, + 45884285, + 45881608, + 46237081, + 46237479, + 45876327, + 45878395, + 45878395, + 46237998, + 46237085, + 45884283, + 45880109, + 45880108, + 46238000, + 46237068, + 45880108, + 45880108, + 45880108, + 45876315, + 45876315, + 45876315, + 45876315, + 45876314, + 46238005, + 46238005, + 45880986 + ], + "ancestor_ids":[ + 37163866, + 4130406, + 734333 + ] + } + + } + +} \ No newline at end of file diff --git a/src/config.py b/src/config.py index 4ce8b68..887a617 100644 --- a/src/config.py +++ b/src/config.py @@ -5,6 +5,7 @@ """ import os +import json class Settings: @@ -25,6 +26,8 @@ def DB_PASSWORD(self) -> str: CDM_SCHEMA: str = os.getenv("CDM_SCHEMA", "omop") TO_INGEST_DIR = "upload/to_ingest" RESULTS_DIR = "upload/results" + with open("src/concept_mappings.json", "r") as f: + MAPPING_JSON = json.load(f) @property def DATABASE_URI(self) -> str: From 9b98cdc79f299c3313fdbb41f2eba465f3547ede Mon Sep 17 00:00:00 2001 From: Marion Date: Mon, 2 Feb 2026 12:32:35 -0800 Subject: [PATCH 5/9] add followup status --- src/api/phenopacket_operations.py | 26 +++++++++----- src/concept_mappings.json | 60 +++++++------------------------ 2 files changed, 30 insertions(+), 56 deletions(-) diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index c10d3e4..298253a 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -128,7 +128,7 @@ async def get_medical_actions(person_id: int): async def get_disease_stages(person_id: int): raw_sql = text(f""" SELECT DISTINCT - m.{settings.MAPPING_JSON['diseases']['disease_stage']['filtering_field']} as disease_stage_concept_id + m.{settings.MAPPING_JSON['diseases']['disease_stage']['value_field']} as disease_stage_concept_id FROM {settings.CDM_SCHEMA}.{settings.MAPPING_JSON['diseases']['disease_stage']['omop_object']} AS m WHERE m.person_id = :person_id AND ( @@ -917,10 +917,13 @@ async def get_radiation_therapies(person_id: int): async def get_measurements(person_id: int): raw_sql = text(f""" SELECT DISTINCT - observation.value_as_concept_id as measurement_value_concept_id - FROM {settings.CDM_SCHEMA}.observation observation - WHERE observation.person_id = :person_id - AND observation.observation_concept_id = 43054909 + {settings.MAPPING_JSON['measurements']['omop_object']}.{settings.MAPPING_JSON['measurements']['value_field']} as measurement_value_concept_id, + {settings.MAPPING_JSON['measurements']['omop_object']}.{settings.MAPPING_JSON['measurements']['filtering_field']} as measurement_type_concept_id, + {settings.MAPPING_JSON['measurements']['omop_object']}.{settings.MAPPING_JSON['measurements']['date_field']} as measurement_date + FROM {settings.CDM_SCHEMA}.{settings.MAPPING_JSON['measurements']['omop_object']} observation + WHERE {settings.MAPPING_JSON['measurements']['omop_object']}.person_id = :person_id + AND {settings.MAPPING_JSON['measurements']['omop_object']}.{settings.MAPPING_JSON['measurements']['filtering_field']} + IN({','.join([str(x) for x in settings.MAPPING_JSON['measurements']['concept_ids']])}) """) async for session in get_db_session(): @@ -929,16 +932,21 @@ async def get_measurements(person_id: int): rows = result.fetchall() # Batch fetch ontologies - concept_ids = [row.measurement_value_concept_id for row in rows] - ontology_map = await get_ontologies(concept_ids) + value_concept_ids = [row.measurement_value_concept_id for row in rows] + value_ontology_map = await get_ontologies(value_concept_ids) + type_concept_ids = [row.measurement_type_concept_id for row in rows] + type_concept_map = await get_ontologies(type_concept_ids) measurements = [] for row in rows: - measurement_value = ontology_map.get(row.measurement_value_concept_id) + measurement_value = value_ontology_map.get(row.measurement_value_concept_id) + type_value = type_concept_map.get(row.measurement_type_concept_id) + date_value = row.measurement_date if measurement_value: measurement = { - "assay": {"id": "LOINC:72166-2", "label": "Tobacco smoking status"}, + "assay": type_value, "measurement_value": measurement_value, + "time_observed":get_timestamp(date_value) } measurements.append(measurement) diff --git a/src/concept_mappings.json b/src/concept_mappings.json index 4d48ac0..d5af730 100644 --- a/src/concept_mappings.json +++ b/src/concept_mappings.json @@ -1,8 +1,9 @@ { - "diseases":{ + "diseases": { "disease_stage": { "omop_object": "measurement", "filtering_field": "value_as_concept_id", + "value_field": "value_as_concept_id", "concept_ids": [ 4136272, 4114652, @@ -17,77 +18,33 @@ 45876326, 45881861, 45880980, - 45878383, 45882495, 45882494, 1633775, - 45878383, - 45878383, - 45878383, - 45878383, - 45876317, 45881607, 45881606, - 45876317, - 45876317, - 45876317, 45878652, 46237073, - 46237073, - 46237073, - 46237073, 45880979, 37163968, 45876316, 46237999, 46237478, - 45876316, - 45876316, - 45876316, - 45882497, - 45882497, - 45882497, 45882497, 45878382, 46237084, - 46237084, - 46237084, - 46237084, 45878643, 45880978, 37163997, 37163998, - 45880978, - 45880978, - 45880978, - 45884285, - 45884285, - 45884285, 45884285, 45881608, 46237081, 46237479, 45876327, 45878395, - 45878395, 46237998, - 46237085, - 45884283, - 45880109, - 45880108, - 46238000, - 46237068, - 45880108, - 45880108, - 45880108, - 45876315, - 45876315, - 45876315, - 45876315, - 45876314, - 46238005, - 46238005, - 45880986 + 46237085 ], "ancestor_ids":[ 37163866, @@ -95,7 +52,16 @@ 734333 ] } - + }, + "measurements": { + "omop_object": "observation", + "filtering_field": "observation_concept_id", + "value_field": "value_as_concept_id", + "concept_ids": [ + 4203711, + 43054909 + ], + "date_field": "observation_date" } } \ No newline at end of file From 5519a7e442a39381191a18fe4d1fa2fdaa967329 Mon Sep 17 00:00:00 2001 From: Marion Date: Mon, 2 Feb 2026 12:36:36 -0800 Subject: [PATCH 6/9] combine ontology lookip --- src/api/phenopacket_operations.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index 298253a..66c122d 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -932,15 +932,13 @@ async def get_measurements(person_id: int): rows = result.fetchall() # Batch fetch ontologies - value_concept_ids = [row.measurement_value_concept_id for row in rows] - value_ontology_map = await get_ontologies(value_concept_ids) - type_concept_ids = [row.measurement_type_concept_id for row in rows] - type_concept_map = await get_ontologies(type_concept_ids) + concept_ids = [row.measurement_value_concept_id for row in rows] + [row.measurement_type_concept_id for row in rows] + ontology_map = await get_ontologies(concept_ids) measurements = [] for row in rows: - measurement_value = value_ontology_map.get(row.measurement_value_concept_id) - type_value = type_concept_map.get(row.measurement_type_concept_id) + measurement_value = ontology_map.get(row.measurement_value_concept_id) + type_value = ontology_map.get(row.measurement_type_concept_id) date_value = row.measurement_date if measurement_value: measurement = { From 24fb95c3396e0c39e765e5f95b9ed3ab2c46a4d9 Mon Sep 17 00:00:00 2001 From: Marion Date: Mon, 2 Feb 2026 15:55:23 -0800 Subject: [PATCH 7/9] add biomarkers to measurements --- src/api/phenopacket_operations.py | 138 +++++++++++++++++++++--------- src/concept_mappings.json | 35 +++++--- 2 files changed, 120 insertions(+), 53 deletions(-) diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index 66c122d..4021dff 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -128,7 +128,7 @@ async def get_medical_actions(person_id: int): async def get_disease_stages(person_id: int): raw_sql = text(f""" SELECT DISTINCT - m.{settings.MAPPING_JSON['diseases']['disease_stage']['value_field']} as disease_stage_concept_id + m.{settings.MAPPING_JSON['diseases']['disease_stage']['concept_value_field']} as disease_stage_concept_id FROM {settings.CDM_SCHEMA}.{settings.MAPPING_JSON['diseases']['disease_stage']['omop_object']} AS m WHERE m.person_id = :person_id AND ( @@ -915,46 +915,102 @@ async def get_radiation_therapies(person_id: int): async def get_measurements(person_id: int): - raw_sql = text(f""" - SELECT DISTINCT - {settings.MAPPING_JSON['measurements']['omop_object']}.{settings.MAPPING_JSON['measurements']['value_field']} as measurement_value_concept_id, - {settings.MAPPING_JSON['measurements']['omop_object']}.{settings.MAPPING_JSON['measurements']['filtering_field']} as measurement_type_concept_id, - {settings.MAPPING_JSON['measurements']['omop_object']}.{settings.MAPPING_JSON['measurements']['date_field']} as measurement_date - FROM {settings.CDM_SCHEMA}.{settings.MAPPING_JSON['measurements']['omop_object']} observation - WHERE {settings.MAPPING_JSON['measurements']['omop_object']}.person_id = :person_id - AND {settings.MAPPING_JSON['measurements']['omop_object']}.{settings.MAPPING_JSON['measurements']['filtering_field']} - IN({','.join([str(x) for x in settings.MAPPING_JSON['measurements']['concept_ids']])}) - """) - - async for session in get_db_session(): - try: - result = await session.execute(raw_sql, {"person_id": person_id}) - rows = result.fetchall() - - # Batch fetch ontologies - concept_ids = [row.measurement_value_concept_id for row in rows] + [row.measurement_type_concept_id for row in rows] - ontology_map = await get_ontologies(concept_ids) - - measurements = [] - for row in rows: - measurement_value = ontology_map.get(row.measurement_value_concept_id) - type_value = ontology_map.get(row.measurement_type_concept_id) - date_value = row.measurement_date - if measurement_value: - measurement = { - "assay": type_value, - "measurement_value": measurement_value, - "time_observed":get_timestamp(date_value) - } - measurements.append(measurement) - - return measurements if measurements else None - - except Exception as e: - logger.error(f"Database Error in get_measurements: {str(e)}") - return None - - return None + measurements = [] + for mapping in settings.MAPPING_JSON['measurements']: + if mapping['omop_object'] == "observation": + raw_sql = text(f""" + SELECT DISTINCT + {mapping['omop_object']}.{mapping['concept_value_field']} as measurement_value_concept_id, + {mapping['omop_object']}.{mapping['filtering_field']} as measurement_type_concept_id, + {mapping['omop_object']}.{mapping['date_field']} as measurement_date + FROM {settings.CDM_SCHEMA}.{mapping['omop_object']} + WHERE {mapping['omop_object']}.person_id = :person_id + AND {mapping['omop_object']}.{mapping['filtering_field']} + IN({','.join([str(x) for x in mapping['concept_ids']])}) + """) + + async for session in get_db_session(): + try: + result = await session.execute(raw_sql, {"person_id": person_id}) + rows = result.fetchall() + + # Batch fetch ontologies + concept_ids = [row.measurement_value_concept_id for row in rows] + [row.measurement_type_concept_id for row in rows] + ontology_map = await get_ontologies(concept_ids) + + for row in rows: + measurement_value = ontology_map.get(row.measurement_value_concept_id) + type_value = ontology_map.get(row.measurement_type_concept_id) + date_value = row.measurement_date + if measurement_value: + measurement = { + "assay": type_value, + "measurement_value": measurement_value, + "time_observed": get_timestamp(date_value) + } + measurements.append(measurement) + + except Exception as e: + logger.error(f"Database Error in get_measurements: {str(e)}") + return None + if mapping['omop_object'] == "measurement": + raw_sql = text(f""" + SELECT DISTINCT + {mapping['omop_object']}.{mapping['concept_value_field']} as measurement_value_concept_id, + {mapping['omop_object']}.{mapping['number_value_field']} as measurement_value, + {mapping['omop_object']}.{mapping['filtering_field']} as measurement_type_concept_id, + {mapping['omop_object']}.{mapping['date_field']} as measurement_date, + {mapping['omop_object']}.{mapping['unit_field']} as measurement_unit_concept_id + FROM {settings.CDM_SCHEMA}.{mapping['omop_object']} + WHERE {mapping['omop_object']}.person_id = :person_id + AND ({mapping['filtering_field']} IN ( + SELECT descendant_concept_id FROM {settings.CDM_SCHEMA}.concept_ancestor + WHERE ancestor_concept_id IN ({','.join([str(x) for x in mapping['ancestor_ids']])}))) + """) + + async for session in get_db_session(): + try: + result = await session.execute(raw_sql, {"person_id": person_id}) + rows = result.fetchall() + + # Batch fetch ontologies + concept_ids = ([row.measurement_value_concept_id for row in rows] + + [row.measurement_type_concept_id for row in rows] + + [row.measurement_unit_concept_id for row in rows]) + ontology_map = await get_ontologies(concept_ids) + + for row in rows: + if row.measurement_value_concept_id: + measurement_value = ontology_map.get(row.measurement_value_concept_id) + type_value = ontology_map.get(row.measurement_type_concept_id) + date_value = row.measurement_date + if measurement_value: + measurement = { + "assay": type_value, + "measurement_value": measurement_value, + "time_observed": get_timestamp(date_value) + } + measurements.append(measurement) + elif row.measurement_value: + type_value = ontology_map.get(row.measurement_type_concept_id) + unit_value = ontology_map.get(row.measurement_unit_concept_id) + measurement_value = row.measurement_value + if measurement_value: + measurement = { + "assay": type_value, + "measurement_value": { + "unit": unit_value, + "value": measurement_value + }, + "time_observed": get_timestamp(row.measurement_date) + } + measurements.append(measurement) + + except Exception as e: + logger.error(f"Database Error in get_measurements: {str(e)}") + return None + + return measurements if measurements else None def get_meta_data(): diff --git a/src/concept_mappings.json b/src/concept_mappings.json index d5af730..c8086b7 100644 --- a/src/concept_mappings.json +++ b/src/concept_mappings.json @@ -3,7 +3,7 @@ "disease_stage": { "omop_object": "measurement", "filtering_field": "value_as_concept_id", - "value_field": "value_as_concept_id", + "concept_value_field": "value_as_concept_id", "concept_ids": [ 4136272, 4114652, @@ -53,15 +53,26 @@ ] } }, - "measurements": { - "omop_object": "observation", - "filtering_field": "observation_concept_id", - "value_field": "value_as_concept_id", - "concept_ids": [ - 4203711, - 43054909 - ], - "date_field": "observation_date" - } - + "measurements": [ + { + "omop_object": "observation", + "filtering_field": "observation_concept_id", + "concept_value_field": "value_as_concept_id", + "concept_ids": [ + 4203711, + 43054909 + ], + "date_field": "observation_date" + }, + { + "omop_object": "measurement", + "filtering_field": "measurement_concept_id", + "concept_value_field": "value_as_concept_id", + "number_value_field": "value_as_number", + "unit_field": "unit_concept_id", + "date_field": "measurement_date", + "concept_ids": [], + "ancestor_ids": [4326835] + } + ] } \ No newline at end of file From 71ee131618532544de63e18cd839ef30845065d3 Mon Sep 17 00:00:00 2001 From: Marion Date: Tue, 3 Feb 2026 12:06:15 -0800 Subject: [PATCH 8/9] add pack years, relapse, fix cM mapping --- src/api/phenopacket_operations.py | 143 +++++++++++++++--------------- src/concept_mappings.json | 20 ++++- 2 files changed, 90 insertions(+), 73 deletions(-) diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index 4021dff..99cad2a 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -188,7 +188,7 @@ async def get_diseases(person_id: int): """) (clinical_tnm_finding_list, laterality_list, disease_stages) = await asyncio.gather( - get_tnm_findings(person_id, [4164336, 4164336, 4164466]), + get_tnm_findings(person_id, [4164336, 4164182, 4164466]), get_tnm_findings(person_id, [35918306]), get_disease_stages(person_id), ) @@ -238,7 +238,7 @@ async def get_diseases(person_id: int): async def get_tnm_findings(person_id: int, measurement_concept_ids: list[int]): """ - Get pathological TNM findings + Get TNM findings """ concept_ids_str = ",".join(map(str, measurement_concept_ids)) @@ -921,39 +921,41 @@ async def get_measurements(person_id: int): raw_sql = text(f""" SELECT DISTINCT {mapping['omop_object']}.{mapping['concept_value_field']} as measurement_value_concept_id, + {mapping['omop_object']}.{mapping['number_value_field']} as measurement_value, {mapping['omop_object']}.{mapping['filtering_field']} as measurement_type_concept_id, - {mapping['omop_object']}.{mapping['date_field']} as measurement_date + {mapping['omop_object']}.{mapping['date_field']} as measurement_date, + {mapping['omop_object']}.{mapping['unit_field']} as measurement_unit_concept_id FROM {settings.CDM_SCHEMA}.{mapping['omop_object']} WHERE {mapping['omop_object']}.person_id = :person_id AND {mapping['omop_object']}.{mapping['filtering_field']} IN({','.join([str(x) for x in mapping['concept_ids']])}) """) - async for session in get_db_session(): - try: - result = await session.execute(raw_sql, {"person_id": person_id}) - rows = result.fetchall() - - # Batch fetch ontologies - concept_ids = [row.measurement_value_concept_id for row in rows] + [row.measurement_type_concept_id for row in rows] - ontology_map = await get_ontologies(concept_ids) - - for row in rows: - measurement_value = ontology_map.get(row.measurement_value_concept_id) - type_value = ontology_map.get(row.measurement_type_concept_id) - date_value = row.measurement_date - if measurement_value: - measurement = { - "assay": type_value, - "measurement_value": measurement_value, - "time_observed": get_timestamp(date_value) - } - measurements.append(measurement) - - except Exception as e: - logger.error(f"Database Error in get_measurements: {str(e)}") - return None - if mapping['omop_object'] == "measurement": + # async for session in get_db_session(): + # try: + # result = await session.execute(raw_sql, {"person_id": person_id}) + # rows = result.fetchall() + + # # Batch fetch ontologies + # concept_ids = [row.measurement_value_concept_id for row in rows] + [row.measurement_type_concept_id for row in rows] + # ontology_map = await get_ontologies(concept_ids) + + # for row in rows: + # measurement_value = ontology_map.get(row.measurement_value_concept_id) + # type_value = ontology_map.get(row.measurement_type_concept_id) + # date_value = row.measurement_date + # if measurement_value: + # measurement = { + # "assay": type_value, + # "measurement_value": measurement_value, + # "time_observed": get_timestamp(date_value) + # } + # measurements.append(measurement) + + # except Exception as e: + # logger.error(f"Database Error in get_measurements: {str(e)}") + # return None + elif mapping['omop_object'] == "measurement": raw_sql = text(f""" SELECT DISTINCT {mapping['omop_object']}.{mapping['concept_value_field']} as measurement_value_concept_id, @@ -968,48 +970,49 @@ async def get_measurements(person_id: int): WHERE ancestor_concept_id IN ({','.join([str(x) for x in mapping['ancestor_ids']])}))) """) - async for session in get_db_session(): - try: - result = await session.execute(raw_sql, {"person_id": person_id}) - rows = result.fetchall() - - # Batch fetch ontologies - concept_ids = ([row.measurement_value_concept_id for row in rows] + - [row.measurement_type_concept_id for row in rows] + - [row.measurement_unit_concept_id for row in rows]) - ontology_map = await get_ontologies(concept_ids) - - for row in rows: - if row.measurement_value_concept_id: - measurement_value = ontology_map.get(row.measurement_value_concept_id) - type_value = ontology_map.get(row.measurement_type_concept_id) - date_value = row.measurement_date - if measurement_value: - measurement = { - "assay": type_value, - "measurement_value": measurement_value, - "time_observed": get_timestamp(date_value) - } - measurements.append(measurement) - elif row.measurement_value: - type_value = ontology_map.get(row.measurement_type_concept_id) - unit_value = ontology_map.get(row.measurement_unit_concept_id) - measurement_value = row.measurement_value - if measurement_value: - measurement = { - "assay": type_value, - "measurement_value": { - "unit": unit_value, - "value": measurement_value - }, - "time_observed": get_timestamp(row.measurement_date) - } - measurements.append(measurement) - - except Exception as e: - logger.error(f"Database Error in get_measurements: {str(e)}") - return None - + async for session in get_db_session(): + try: + result = await session.execute(raw_sql, {"person_id": person_id}) + rows = result.fetchall() + + # Batch fetch ontologies + concept_ids = ([row.measurement_value_concept_id for row in rows] + + [row.measurement_type_concept_id for row in rows] + + [row.measurement_unit_concept_id for row in rows] + [4129922]) + ontology_map = await get_ontologies(concept_ids) + + for row in rows: + if row.measurement_value_concept_id: + measurement_value = ontology_map.get(row.measurement_value_concept_id) + type_value = ontology_map.get(row.measurement_type_concept_id) + date_value = row.measurement_date + if measurement_value: + measurement = { + "assay": type_value, + "measurement_value": measurement_value, + "time_observed": get_timestamp(date_value) + } + measurements.append(measurement) + elif row.measurement_value: + type_value = ontology_map.get(row.measurement_type_concept_id) + unit_value = ontology_map.get(row.measurement_unit_concept_id) + if not unit_value: + unit_value = ontology_map.get(4129922) + measurement_value = row.measurement_value + if measurement_value: + measurement = { + "assay": type_value, + "measurement_value": { + "unit": unit_value, + "value": measurement_value + }, + "time_observed": get_timestamp(row.measurement_date) + } + measurements.append(measurement) + + except Exception as e: + logger.error(f"Database Error in get_measurements: {str(e)}") + return None return measurements if measurements else None diff --git a/src/concept_mappings.json b/src/concept_mappings.json index c8086b7..7b1306a 100644 --- a/src/concept_mappings.json +++ b/src/concept_mappings.json @@ -53,16 +53,31 @@ ] } }, + "biosamples": { + "measurements": { + "omop_object": "measurement", + "filtering_field": "measurement_concept_id", + "number_value_field": "value_as_number", + "concept_value_field": "value_as_concept_id", + "unit_field": "unit_concept_id", + "ancestor_ids": [4139794] + } + }, "measurements": [ { "omop_object": "observation", "filtering_field": "observation_concept_id", "concept_value_field": "value_as_concept_id", + "number_value_field": "value_as_number", "concept_ids": [ 4203711, - 43054909 + 43054909, + 4151768, + 4117444, + 3375793 ], - "date_field": "observation_date" + "date_field": "observation_date", + "unit_field": "unit_concept_id" }, { "omop_object": "measurement", @@ -71,7 +86,6 @@ "number_value_field": "value_as_number", "unit_field": "unit_concept_id", "date_field": "measurement_date", - "concept_ids": [], "ancestor_ids": [4326835] } ] From a638cf84bd850b928b02dbf56eefd53e2a37eccc Mon Sep 17 00:00:00 2001 From: Marion Date: Tue, 3 Feb 2026 15:49:41 -0800 Subject: [PATCH 9/9] group treatment info, condense method --- src/api/phenopacket_operations.py | 78 ++++++++++--------------------- src/concept_mappings.json | 21 +++++++++ 2 files changed, 46 insertions(+), 53 deletions(-) diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index 99cad2a..620c34f 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -68,8 +68,8 @@ async def get_medical_actions(person_id: int): procedures, radiation_therapies, ) = await asyncio.gather( - get_treatment_responses(person_id), - get_treatment_intents(person_id), + get_treatment_info_by_field(person_id, "response_to_treatment"), + get_treatment_info_by_field(person_id, "treatment_intent"), get_treatment_targets(person_id), get_treatment_agents(person_id), get_procedures(person_id), @@ -86,18 +86,17 @@ async def get_medical_actions(person_id: int): medical_actions = [] - # Use first intent/target for now since we can't figure out the link - treatment_intent = treatment_intents[0] if treatment_intents else None + # Use first target for now since we can't figure out the link treatment_target = treatment_targets[0] if treatment_targets else None # Create combinations of each treatment type with each response - for response in response_to_treatments: + for episode, response in response_to_treatments.items(): # Combine treatment agents with responses for agent in treatment_agents: medical_action = { "action": agent, "treatment_target": treatment_target, - "treatment_intent": treatment_intent, + "treatment_intent": treatment_intents[episode], "response_to_treatment": response, } medical_actions.append(medical_action) @@ -107,7 +106,7 @@ async def get_medical_actions(person_id: int): medical_action = { "action": procedure, "treatment_target": treatment_target, - "treatment_intent": treatment_intent, + "treatment_intent": treatment_intents[episode], "response_to_treatment": response, } medical_actions.append(medical_action) @@ -117,7 +116,7 @@ async def get_medical_actions(person_id: int): medical_action = { "action": radiation, "treatment_target": treatment_target, - "treatment_intent": treatment_intent, + "treatment_intent": treatment_intents[episode], "response_to_treatment": response, } medical_actions.append(medical_action) @@ -589,15 +588,16 @@ async def get_ontologies(concept_ids: list): return {} - -async def get_treatment_responses(person_id: int): +async def get_treatment_info_by_field(person_id: int, field: str): + mapping = settings.MAPPING_JSON['medical_actions'][field] raw_sql = text(f""" SELECT DISTINCT - observation.value_as_concept_id as treatment_response_concept_id + {mapping['omop_object']}.{mapping['concept_value_field']} as treatment_info_concept_id, + {mapping['omop_object']}.{mapping['grouping_field']} as episode_id FROM {settings.CDM_SCHEMA}.observation observation - WHERE observation.person_id = :person_id - AND observation.observation_concept_id = 4082405 - AND observation.value_as_concept_id IS NOT NULL + WHERE {mapping['omop_object']}.person_id = :person_id + AND {mapping['omop_object']}.{mapping['filtering_field']} IN ({','.join([str(x) for x in mapping['concept_ids']])}) + AND {mapping['omop_object']}.{mapping['concept_value_field']} IS NOT NULL """) async for session in get_db_session(): @@ -606,55 +606,27 @@ async def get_treatment_responses(person_id: int): rows = result.fetchall() # Batch fetch ontologies - concept_ids = [row.treatment_response_concept_id for row in rows] + concept_ids = [row.treatment_info_concept_id for row in rows] ontology_map = await get_ontologies(concept_ids) # Convert rows to list of OntologyClass objects - treatments = [ - ontology_map.get(row.treatment_response_concept_id) for row in rows - ] + treatment_info = {} + for row in rows: + treatment_info_ontology = ontology_map.get(row.treatment_info_concept_id) + if treatment_info_ontology: + treatment_info[row.episode_id] = treatment_info_ontology + else: + continue # Filter out None values if conversion failed - return [t for t in treatments if t is not None] + return treatment_info except Exception as e: logger.error(f"Database Error in get_treatments: {str(e)}") - return [] - - return [] - -async def get_treatment_intents(person_id: int): - raw_sql = text(f""" - SELECT DISTINCT - observation.value_as_concept_id as treatment_intent_concept_id - FROM {settings.CDM_SCHEMA}.observation observation - WHERE observation.person_id = :person_id - AND observation.observation_concept_id = 4133895 - AND observation.value_as_concept_id IS NOT NULL - """) - - async for session in get_db_session(): - try: - result = await session.execute(raw_sql, {"person_id": person_id}) - rows = result.fetchall() - - # Batch fetch ontologies - concept_ids = [row.treatment_intent_concept_id for row in rows] - ontology_map = await get_ontologies(concept_ids) - - # Convert rows to list of OntologyClass objects - intents = [ - ontology_map.get(row.treatment_intent_concept_id) for row in rows - ] - - # Filter out None values if conversion failed - return [i for i in intents if i is not None] + return {} - except Exception as e: - logger.error(f"Database Error in get_treatment_intents: {str(e)}") - return [] + return {} - return [] async def get_treatment_targets(person_id: int): raw_sql = text(f""" diff --git a/src/concept_mappings.json b/src/concept_mappings.json index 7b1306a..bb4f545 100644 --- a/src/concept_mappings.json +++ b/src/concept_mappings.json @@ -1,4 +1,25 @@ { + "medical_actions": { + "response_to_treatment": { + "omop_object": "observation", + "filtering_field": "observation_concept_id", + "concept_value_field": "value_as_concept_id", + "concept_ids": [ + 4082405 + ], + "grouping_field": "observation_event_id" + }, + "treatment_intent": { + "omop_object": "observation", + "filtering_field": "observation_concept_id", + "concept_value_field": "value_as_concept_id", + "concept_ids": [ + 4133895 + ], + "grouping_field": "observation_event_id" + } + + }, "diseases": { "disease_stage": { "omop_object": "measurement",