diff --git a/schema.yml b/schema.yml index 0c11cf2..93ad8b0 100644 --- a/schema.yml +++ b/schema.yml @@ -2352,7 +2352,7 @@ components: type: string description: > The agent or organization that created the phenopacket. - submitter_by: + submitted_by: type: string description: > Name of person who submitted the phenopacket diff --git a/src/api/phenopacket_operations.py b/src/api/phenopacket_operations.py index bef85e7..620c34f 100644 --- a/src/api/phenopacket_operations.py +++ b/src/api/phenopacket_operations.py @@ -57,42 +57,56 @@ async def get_by_id(dataset_id: str, id: int): async def get_medical_actions(person_id: int): + # TODO: need to group/link each + # response_to_treatments, treatment_intents, treatment_targets + # instead of getting only 1 ( response_to_treatments, + treatment_intents, + treatment_targets, treatment_agents, - procedure_codes, + procedures, radiation_therapies, ) = await asyncio.gather( - get_treatments(person_id), - get_drug_exposures(person_id), - get_procedure_codes(person_id), + get_treatment_info_by_field(person_id, "response_to_treatment"), + get_treatment_info_by_field(person_id, "treatment_intent"), + get_treatment_targets(person_id), + get_treatment_agents(person_id), + get_procedures(person_id), get_radiation_therapies(person_id), ) if not response_to_treatments: - if not (treatment_agents or procedure_codes or radiation_therapies): + if not (treatment_agents or procedures or radiation_therapies): return None return None - if not (treatment_agents or procedure_codes or radiation_therapies): + if not (treatment_agents or procedures or radiation_therapies): return None medical_actions = [] + # Use first target for now since we can't figure out the link + treatment_target = treatment_targets[0] if treatment_targets else None + # Create combinations of each treatment type with each response - for response in response_to_treatments: + for episode, response in response_to_treatments.items(): # Combine treatment agents with responses for agent in treatment_agents: medical_action = { - "action": {"agent": agent}, + "action": agent, + "treatment_target": treatment_target, + "treatment_intent": treatment_intents[episode], "response_to_treatment": response, } medical_actions.append(medical_action) # Combine procedures with responses - for procedure_code in procedure_codes: + for procedure in procedures: medical_action = { - "action": {"code": procedure_code}, + "action": procedure, + "treatment_target": treatment_target, + "treatment_intent": treatment_intents[episode], "response_to_treatment": response, } medical_actions.append(medical_action) @@ -101,6 +115,8 @@ async def get_medical_actions(person_id: int): for radiation in radiation_therapies: medical_action = { "action": radiation, + "treatment_target": treatment_target, + "treatment_intent": treatment_intents[episode], "response_to_treatment": response, } medical_actions.append(medical_action) @@ -111,23 +127,15 @@ async def get_medical_actions(person_id: int): async def get_disease_stages(person_id: int): raw_sql = text(f""" SELECT DISTINCT - m.value_as_concept_id as disease_stage_concept_id - FROM {settings.CDM_SCHEMA}.measurement AS m + m.{settings.MAPPING_JSON['diseases']['disease_stage']['concept_value_field']} as disease_stage_concept_id + FROM {settings.CDM_SCHEMA}.{settings.MAPPING_JSON['diseases']['disease_stage']['omop_object']} AS m WHERE m.person_id = :person_id - AND m.measurement_concept_id IN ( - SELECT descendant_concept_id - FROM {settings.CDM_SCHEMA}.concept_ancestor - WHERE ancestor_concept_id = ( - SELECT concept_id - FROM {settings.CDM_SCHEMA}.concept - WHERE vocabulary_id = 'SNOMED' - AND concept_code = '254292007' + AND ( + m.{settings.MAPPING_JSON['diseases']['disease_stage']['filtering_field']} IN ( + SELECT descendant_concept_id FROM {settings.CDM_SCHEMA}.concept_ancestor + WHERE ancestor_concept_id IN ({','.join([str(x) for x in settings.MAPPING_JSON['diseases']['disease_stage']['ancestor_ids']])})) + OR m.value_as_concept_id IN ({','.join([str(x) for x in settings.MAPPING_JSON['diseases']['disease_stage']['concept_ids']])}) ) - UNION - SELECT 37163866 - UNION - SELECT 607126 - ) """) async for session in get_db_session(): @@ -179,7 +187,7 @@ async def get_diseases(person_id: int): """) (clinical_tnm_finding_list, laterality_list, disease_stages) = await asyncio.gather( - get_tnm_findings(person_id, [4164336, 4164336, 4164466]), + get_tnm_findings(person_id, [4164336, 4164182, 4164466]), get_tnm_findings(person_id, [35918306]), get_disease_stages(person_id), ) @@ -229,7 +237,7 @@ async def get_diseases(person_id: int): async def get_tnm_findings(person_id: int, measurement_concept_ids: list[int]): """ - Get pathological TNM findings + Get TNM findings """ concept_ids_str = ",".join(map(str, measurement_concept_ids)) @@ -499,8 +507,12 @@ def get_timestamp(datetime_value): Convert OMOP datetime to Phenopacket Timestamp format. """ if isinstance(datetime_value, datetime): + if datetime_value.date() == date(1800, 1, 1): + return None return {"iso8601timestamp": datetime_value.isoformat()} elif isinstance(datetime_value, date): + if datetime_value == date(1800, 1, 1): + return None return {"iso8601timestamp": datetime_value.isoformat()} return None @@ -524,6 +536,9 @@ def get_survival_time(disease_first_occurrence_date, death_date): else: end_date = death_date + if start_date == date(1800, 1, 1) or end_date == date(1800, 1, 1): + return None + # Calculate difference in days delta = end_date - start_date return delta.days @@ -573,15 +588,16 @@ async def get_ontologies(concept_ids: list): return {} - -async def get_treatments(person_id: int): +async def get_treatment_info_by_field(person_id: int, field: str): + mapping = settings.MAPPING_JSON['medical_actions'][field] raw_sql = text(f""" SELECT DISTINCT - observation.value_as_concept_id as treatment_response_concept_id + {mapping['omop_object']}.{mapping['concept_value_field']} as treatment_info_concept_id, + {mapping['omop_object']}.{mapping['grouping_field']} as episode_id FROM {settings.CDM_SCHEMA}.observation observation - WHERE observation.person_id = :person_id - AND observation.observation_concept_id = 4082405 - AND observation.value_as_concept_id IS NOT NULL + WHERE {mapping['omop_object']}.person_id = :person_id + AND {mapping['omop_object']}.{mapping['filtering_field']} IN ({','.join([str(x) for x in mapping['concept_ids']])}) + AND {mapping['omop_object']}.{mapping['concept_value_field']} IS NOT NULL """) async for session in get_db_session(): @@ -590,28 +606,76 @@ async def get_treatments(person_id: int): rows = result.fetchall() # Batch fetch ontologies - concept_ids = [row.treatment_response_concept_id for row in rows] + concept_ids = [row.treatment_info_concept_id for row in rows] ontology_map = await get_ontologies(concept_ids) # Convert rows to list of OntologyClass objects - treatments = [ - ontology_map.get(row.treatment_response_concept_id) for row in rows - ] + treatment_info = {} + for row in rows: + treatment_info_ontology = ontology_map.get(row.treatment_info_concept_id) + if treatment_info_ontology: + treatment_info[row.episode_id] = treatment_info_ontology + else: + continue # Filter out None values if conversion failed - return [t for t in treatments if t is not None] + return treatment_info except Exception as e: logger.error(f"Database Error in get_treatments: {str(e)}") + return {} + + return {} + + +async def get_treatment_targets(person_id: int): + raw_sql = text(f""" + SELECT DISTINCT + condition_occurrence.condition_concept_id as treatment_target_concept_id + FROM {settings.CDM_SCHEMA}.episode episode + INNER JOIN {settings.CDM_SCHEMA}.episode_event episode_event + ON episode.episode_id = episode_event.episode_id + AND episode_event.episode_event_field_concept_id = 1147127 + INNER JOIN {settings.CDM_SCHEMA}.condition_occurrence condition_occurrence + ON episode_event.event_id = condition_occurrence.condition_occurrence_id + WHERE episode.person_id = :person_id + AND episode.episode_concept_id = 32528 + """) + + async for session in get_db_session(): + try: + result = await session.execute(raw_sql, {"person_id": person_id}) + rows = result.fetchall() + + # Batch fetch ontologies + concept_ids = [row.treatment_target_concept_id for row in rows] + ontology_map = await get_ontologies(concept_ids) + + # Convert rows to list of OntologyClass objects + targets = [ + ontology_map.get(row.treatment_target_concept_id) for row in rows + ] + + # Filter out None values if conversion failed + return [t for t in targets if t is not None] + + except Exception as e: + logger.error(f"Database Error in get_treatment_targets: {str(e)}") return [] return [] - -async def get_drug_exposures(person_id: int): +async def get_treatment_agents(person_id: int): raw_sql = text(f""" SELECT DISTINCT - drug_exposure.drug_concept_id + drug_exposure.drug_concept_id, + drug_exposure.dose_unit_source_value as quantity_unit, + drug_exposure.quantity as quantity_value, + drug_exposure.drug_exposure_end_date as dose_intervals_end, + drug_exposure.drug_exposure_start_date as dose_intervals_start, + drug_exposure.dose_unit_source_value as dose_intervals_quantity_unit, + drug_exposure.quantity as dose_intervals_quantity_value, + drug_exposure.route_concept_id as route_concept_id FROM {settings.CDM_SCHEMA}.episode episode INNER JOIN {settings.CDM_SCHEMA}.episode_event episode_event ON episode.episode_id = episode_event.episode_id @@ -621,6 +685,7 @@ async def get_drug_exposures(person_id: int): WHERE episode.person_id = :person_id AND episode.episode_concept_id = 32941 AND drug_exposure.drug_concept_id IS NOT NULL + AND drug_exposure.drug_type_concept_id = 32833 """) async for session in get_db_session(): @@ -628,15 +693,78 @@ async def get_drug_exposures(person_id: int): result = await session.execute(raw_sql, {"person_id": person_id}) rows = result.fetchall() - # Batch fetch ontologies + # Batch fetch ontologies for both drug_concept_id and route_concept_id concept_ids = [row.drug_concept_id for row in rows] + # Add route_concept_ids that are not None + concept_ids.extend([row.route_concept_id for row in rows if row.route_concept_id is not None]) ontology_map = await get_ontologies(concept_ids) - # Convert rows to list of OntologyClass objects - drug_exposures = [ontology_map.get(row.drug_concept_id) for row in rows] + # Convert rows to list of treatment agent objects + treatment_agents = [] + for row in rows: + agent = ontology_map.get(row.drug_concept_id) + if agent: + treatment_agent = { + "agent": agent, + "drug_type": "UNKNOWN_DRUG_TYPE" + } + + # Add route_of_administration + if row.route_concept_id is not None: + route = ontology_map.get(row.route_concept_id) + if route: + treatment_agent["route_of_administration"] = route + else: + treatment_agent["route_of_administration"] = { + "id": "SNOMED:261665006", + "label": "Unknown" + } + else: + treatment_agent["route_of_administration"] = { + "id": "SNOMED:261665006", + "label": "Unknown" + } + + # TODO: need to figureout quantity_unit ontology + # treatment_agent["cumulative_dose"] = { + # "value": row.quantity_value, + # "unit": row.quantity_unit + # } + + # TODO: need to figureout quantity_unit ontology + # Add dose_intervals if we have the necessary data + # if row.dose_intervals_start or row.dose_intervals_end or row.dose_intervals_quantity_value: + # dose_intervals = {} + # treatment_agent["schedule_frequency"] = { + # "id": "SNOMED:261665006", + # "label": "Unknown" + # } + + # # Add quantity if available + # if row.dose_intervals_quantity_value is not None: + # dose_intervals["quantity"] = { + # "value": row.dose_intervals_quantity_value, + # "unit": {} + # } + + + # # Add interval if start or end date is available + # if row.dose_intervals_start or row.dose_intervals_end: + # interval = {} + # if row.dose_intervals_start: + # interval["start"] = row.dose_intervals_start.isoformat() + # if row.dose_intervals_end: + # interval["end"] = row.dose_intervals_end.isoformat() + # dose_intervals["interval"] = interval + + # if dose_intervals: + # treatment_agent["dose_intervals"] = dose_intervals + + + treatment_agents.append(treatment_agent) + + return treatment_agents - # Filter out None values if conversion failed - return [d for d in drug_exposures if d is not None] except Exception as e: logger.error(f"Database Error in get_drug_exposure: {str(e)}") @@ -645,19 +773,24 @@ async def get_drug_exposures(person_id: int): return [] -async def get_procedure_codes(person_id: int): +async def get_procedures(person_id: int): raw_sql = text(f""" SELECT DISTINCT - procedure_occurrence.procedure_concept_id + procedure_occurrence.procedure_concept_id, + procedure_occurrence.procedure_date as performed, + observation.value_as_concept_id as body_site_concept_id FROM {settings.CDM_SCHEMA}.episode episode INNER JOIN {settings.CDM_SCHEMA}.episode_event episode_event ON episode.episode_id = episode_event.episode_id AND episode_event.episode_event_field_concept_id = 1147082 INNER JOIN {settings.CDM_SCHEMA}.procedure_occurrence procedure_occurrence ON episode_event.event_id = procedure_occurrence.procedure_occurrence_id + LEFT JOIN {settings.CDM_SCHEMA}.observation observation + ON observation.observation_event_id = episode.episode_id + AND observation.observation_concept_id = 4181646 + AND observation.obs_event_field_concept_id = 798885 WHERE episode.person_id = :person_id AND episode.episode_concept_id = 32939 - AND procedure_occurrence.procedure_concept_id IS NOT NULL """) async for session in get_db_session(): @@ -665,18 +798,31 @@ async def get_procedure_codes(person_id: int): result = await session.execute(raw_sql, {"person_id": person_id}) rows = result.fetchall() - # Batch fetch ontologies - concept_ids = [row.procedure_concept_id for row in rows] + # Batch fetch ontologies for both procedure and body_site + concept_ids = [] + for row in rows: + concept_ids.extend([row.procedure_concept_id, row.body_site_concept_id]) + ontology_map = await get_ontologies(concept_ids) - # Convert rows to list of OntologyClass objects - procedures = [ontology_map.get(row.procedure_concept_id) for row in rows] + # Convert rows to list of procedure objects + procedures = [] + for row in rows: + code = ontology_map.get(row.procedure_concept_id) + body_site = ontology_map.get(row.body_site_concept_id) + performed = get_timestamp(row.performed) + if code: + procedure = { + "code": code, + "body_site": body_site, + "performed": performed + } + procedures.append(procedure) - # Filter out None values if conversion failed - return [p for p in procedures if p is not None] + return procedures except Exception as e: - logger.error(f"Database Error in get_procedure_code: {str(e)}") + logger.error(f"Database Error in get_procedures: {str(e)}") return [] return [] @@ -741,47 +887,112 @@ async def get_radiation_therapies(person_id: int): async def get_measurements(person_id: int): - raw_sql = text(f""" - SELECT DISTINCT - observation.value_as_concept_id as measurement_value_concept_id - FROM {settings.CDM_SCHEMA}.observation observation - WHERE observation.person_id = :person_id - AND observation.observation_concept_id = 43054909 - """) - - async for session in get_db_session(): - try: - result = await session.execute(raw_sql, {"person_id": person_id}) - rows = result.fetchall() - - # Batch fetch ontologies - concept_ids = [row.measurement_value_concept_id for row in rows] - ontology_map = await get_ontologies(concept_ids) - - measurements = [] - for row in rows: - measurement_value = ontology_map.get(row.measurement_value_concept_id) - if measurement_value: - measurement = { - "assay": {"id": "LOINC:72166-2", "label": "Tobacco smoking status"}, - "measurement_value": measurement_value, - } - measurements.append(measurement) - - return measurements if measurements else None - - except Exception as e: - logger.error(f"Database Error in get_measurements: {str(e)}") - return None - - return None + measurements = [] + for mapping in settings.MAPPING_JSON['measurements']: + if mapping['omop_object'] == "observation": + raw_sql = text(f""" + SELECT DISTINCT + {mapping['omop_object']}.{mapping['concept_value_field']} as measurement_value_concept_id, + {mapping['omop_object']}.{mapping['number_value_field']} as measurement_value, + {mapping['omop_object']}.{mapping['filtering_field']} as measurement_type_concept_id, + {mapping['omop_object']}.{mapping['date_field']} as measurement_date, + {mapping['omop_object']}.{mapping['unit_field']} as measurement_unit_concept_id + FROM {settings.CDM_SCHEMA}.{mapping['omop_object']} + WHERE {mapping['omop_object']}.person_id = :person_id + AND {mapping['omop_object']}.{mapping['filtering_field']} + IN({','.join([str(x) for x in mapping['concept_ids']])}) + """) + + # async for session in get_db_session(): + # try: + # result = await session.execute(raw_sql, {"person_id": person_id}) + # rows = result.fetchall() + + # # Batch fetch ontologies + # concept_ids = [row.measurement_value_concept_id for row in rows] + [row.measurement_type_concept_id for row in rows] + # ontology_map = await get_ontologies(concept_ids) + + # for row in rows: + # measurement_value = ontology_map.get(row.measurement_value_concept_id) + # type_value = ontology_map.get(row.measurement_type_concept_id) + # date_value = row.measurement_date + # if measurement_value: + # measurement = { + # "assay": type_value, + # "measurement_value": measurement_value, + # "time_observed": get_timestamp(date_value) + # } + # measurements.append(measurement) + + # except Exception as e: + # logger.error(f"Database Error in get_measurements: {str(e)}") + # return None + elif mapping['omop_object'] == "measurement": + raw_sql = text(f""" + SELECT DISTINCT + {mapping['omop_object']}.{mapping['concept_value_field']} as measurement_value_concept_id, + {mapping['omop_object']}.{mapping['number_value_field']} as measurement_value, + {mapping['omop_object']}.{mapping['filtering_field']} as measurement_type_concept_id, + {mapping['omop_object']}.{mapping['date_field']} as measurement_date, + {mapping['omop_object']}.{mapping['unit_field']} as measurement_unit_concept_id + FROM {settings.CDM_SCHEMA}.{mapping['omop_object']} + WHERE {mapping['omop_object']}.person_id = :person_id + AND ({mapping['filtering_field']} IN ( + SELECT descendant_concept_id FROM {settings.CDM_SCHEMA}.concept_ancestor + WHERE ancestor_concept_id IN ({','.join([str(x) for x in mapping['ancestor_ids']])}))) + """) + + async for session in get_db_session(): + try: + result = await session.execute(raw_sql, {"person_id": person_id}) + rows = result.fetchall() + + # Batch fetch ontologies + concept_ids = ([row.measurement_value_concept_id for row in rows] + + [row.measurement_type_concept_id for row in rows] + + [row.measurement_unit_concept_id for row in rows] + [4129922]) + ontology_map = await get_ontologies(concept_ids) + + for row in rows: + if row.measurement_value_concept_id: + measurement_value = ontology_map.get(row.measurement_value_concept_id) + type_value = ontology_map.get(row.measurement_type_concept_id) + date_value = row.measurement_date + if measurement_value: + measurement = { + "assay": type_value, + "measurement_value": measurement_value, + "time_observed": get_timestamp(date_value) + } + measurements.append(measurement) + elif row.measurement_value: + type_value = ontology_map.get(row.measurement_type_concept_id) + unit_value = ontology_map.get(row.measurement_unit_concept_id) + if not unit_value: + unit_value = ontology_map.get(4129922) + measurement_value = row.measurement_value + if measurement_value: + measurement = { + "assay": type_value, + "measurement_value": { + "unit": unit_value, + "value": measurement_value + }, + "time_observed": get_timestamp(row.measurement_date) + } + measurements.append(measurement) + + except Exception as e: + logger.error(f"Database Error in get_measurements: {str(e)}") + return None + return measurements if measurements else None def get_meta_data(): return { "created": datetime.now(timezone.utc).isoformat(), "created_by": "DHDP", - "submitter_by": "DHDP", + "submitted_by": "DHDP", "phenopacket_schema_version": "2.0.0", "resources": [ { diff --git a/src/concept_mappings.json b/src/concept_mappings.json new file mode 100644 index 0000000..bb4f545 --- /dev/null +++ b/src/concept_mappings.json @@ -0,0 +1,113 @@ +{ + "medical_actions": { + "response_to_treatment": { + "omop_object": "observation", + "filtering_field": "observation_concept_id", + "concept_value_field": "value_as_concept_id", + "concept_ids": [ + 4082405 + ], + "grouping_field": "observation_event_id" + }, + "treatment_intent": { + "omop_object": "observation", + "filtering_field": "observation_concept_id", + "concept_value_field": "value_as_concept_id", + "concept_ids": [ + 4133895 + ], + "grouping_field": "observation_event_id" + } + + }, + "diseases": { + "disease_stage": { + "omop_object": "measurement", + "filtering_field": "value_as_concept_id", + "concept_value_field": "value_as_concept_id", + "concept_ids": [ + 4136272, + 4114652, + 4128936, + 45884284, + 45881605, + 45882496, + 45879332, + 45878383, + 45876317, + 45884292, + 45876326, + 45881861, + 45880980, + 45882495, + 45882494, + 1633775, + 45881607, + 45881606, + 45878652, + 46237073, + 45880979, + 37163968, + 45876316, + 46237999, + 46237478, + 45882497, + 45878382, + 46237084, + 45878643, + 45880978, + 37163997, + 37163998, + 45884285, + 45881608, + 46237081, + 46237479, + 45876327, + 45878395, + 46237998, + 46237085 + ], + "ancestor_ids":[ + 37163866, + 4130406, + 734333 + ] + } + }, + "biosamples": { + "measurements": { + "omop_object": "measurement", + "filtering_field": "measurement_concept_id", + "number_value_field": "value_as_number", + "concept_value_field": "value_as_concept_id", + "unit_field": "unit_concept_id", + "ancestor_ids": [4139794] + } + }, + "measurements": [ + { + "omop_object": "observation", + "filtering_field": "observation_concept_id", + "concept_value_field": "value_as_concept_id", + "number_value_field": "value_as_number", + "concept_ids": [ + 4203711, + 43054909, + 4151768, + 4117444, + 3375793 + ], + "date_field": "observation_date", + "unit_field": "unit_concept_id" + }, + { + "omop_object": "measurement", + "filtering_field": "measurement_concept_id", + "concept_value_field": "value_as_concept_id", + "number_value_field": "value_as_number", + "unit_field": "unit_concept_id", + "date_field": "measurement_date", + "ancestor_ids": [4326835] + } + ] +} \ No newline at end of file diff --git a/src/config.py b/src/config.py index 4ce8b68..887a617 100644 --- a/src/config.py +++ b/src/config.py @@ -5,6 +5,7 @@ """ import os +import json class Settings: @@ -25,6 +26,8 @@ def DB_PASSWORD(self) -> str: CDM_SCHEMA: str = os.getenv("CDM_SCHEMA", "omop") TO_INGEST_DIR = "upload/to_ingest" RESULTS_DIR = "upload/results" + with open("src/concept_mappings.json", "r") as f: + MAPPING_JSON = json.load(f) @property def DATABASE_URI(self) -> str: