diff --git a/.gitignore b/.gitignore index 2eabda3..c66861e 100644 --- a/.gitignore +++ b/.gitignore @@ -162,5 +162,12 @@ cython_debug/ # VS code .vscode -# data heavy folders -data/ \ No newline at end of file +# data files +*.csv +*.xlsx +*.RData +*.DAT +*.R + +# DS_Store files +.DS_Store \ No newline at end of file diff --git a/dagster-dev.yaml b/dagster-dev.yaml deleted file mode 100644 index c89a742..0000000 --- a/dagster-dev.yaml +++ /dev/null @@ -1,27 +0,0 @@ -storage: -# update for BigQuery - postgres: - postgres_db: - username: - env: DAGSTER_PG_USERNAME - password: - env: DAGSTER_PG_PASSWORD - hostname: - env: DAGSTER_PG_HOST - db_name: - env: DAGSTER_PG_DB - port: 5432 - -# update for gcp -compute_logs: - module: dagster_aws.s3.compute_log_manager - class: S3ComputeLogManager - config: - bucket: "mycorp-dagster-compute-logs" - prefix: "dagster-test-" - -local_artifact_storage: - module: dagster.core.storage.root - class: LocalArtifactStorage - config: - base_dir: "/opt/dagster/local/" \ No newline at end of file diff --git a/dagster.yaml b/dagster.yaml deleted file mode 100644 index fe8226c..0000000 --- a/dagster.yaml +++ /dev/null @@ -1,27 +0,0 @@ -storage: - # update for bigQuery - postgres: - postgres_db: - username: - env: DAGSTER_PG_USERNAME - password: - env: DAGSTER_PG_PASSWORD - hostname: - env: DAGSTER_PG_HOST - db_name: - env: DAGSTER_PG_DB - port: 5432 - -compute_logs: - # update for gcp - module: dagster_aws.s3.compute_log_manager - class: S3ComputeLogManager - config: - bucket: "mycorp-dagster-compute-logs" - prefix: "dagster-test-" - -local_artifact_storage: - module: dagster.core.storage.root - class: LocalArtifactStorage - config: - base_dir: "/opt/dagster/local/" \ No newline at end of file diff --git a/data/Census/Census_raw/README.md b/data/Census/Census_raw/README.md new file mode 100644 index 0000000..dd6c70d --- /dev/null +++ b/data/Census/Census_raw/README.md @@ -0,0 +1,3 @@ +# Census raw files README + +TK \ No newline at end of 
file diff --git a/data/Census/README.md b/data/Census/README.md new file mode 100644 index 0000000..2f57e97 --- /dev/null +++ b/data/Census/README.md @@ -0,0 +1,3 @@ +# Census files README + +TK \ No newline at end of file diff --git a/data/NIH_child/NIH_conv/README.md b/data/NIH_child/NIH_conv/README.md new file mode 100644 index 0000000..05e35d3 --- /dev/null +++ b/data/NIH_child/NIH_conv/README.md @@ -0,0 +1,3 @@ +# NIH conv files README + +TK \ No newline at end of file diff --git a/data/NIH_child/NIH_conv/notes.md b/data/NIH_child/NIH_conv/notes.md new file mode 100644 index 0000000..56b94e1 --- /dev/null +++ b/data/NIH_child/NIH_conv/notes.md @@ -0,0 +1,18 @@ +## extract -- folder +- pulling from .DAT +- using R to convert to CSV + +## additional data +- public health clinics +-- vaccination rates going down? +-- people's proximity to access changed over time? +- what has happened to quantity of public health clinics in each state over time? +- census data by state +- quantity of health insurance options by state? +- healthdata.gov: locations of hospitals by states, perhaps? + + +## dagster data +- stream of measles cases?! +- stream of covid cases?! +- list of clinics?? 
\ No newline at end of file diff --git a/data/NIH_child/README.md b/data/NIH_child/README.md new file mode 100644 index 0000000..83d51f2 --- /dev/null +++ b/data/NIH_child/README.md @@ -0,0 +1,3 @@ +# NIH files README + +TK \ No newline at end of file diff --git a/data/README.md b/data/README.md new file mode 100644 index 0000000..9db2f56 --- /dev/null +++ b/data/README.md @@ -0,0 +1,3 @@ +# Data folder README + +TK \ No newline at end of file diff --git a/vaccines/pyproject.toml b/pyproject.toml similarity index 100% rename from vaccines/pyproject.toml rename to pyproject.toml diff --git a/requirements.txt b/requirements.txt index cb1127a..e47ea0c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,15 @@ -dagster=1.3.6 -dagit=1.3.6 -dagster-docker -dagster-gcp=0.19.6 -polars=0.17.5 -petl=1.7.12 -pyarrow=11.0.0 -xlsx2csv=0.8.1 -connectorx=0.3.2a5 -SQLAlchemy=2.0.9 -requests=2.31.0 -google-cloud-bigquery=2.26.0 \ No newline at end of file +dagster == 1.3.7 +dagit == 1.3.7 +dagster-docker == 0.19.7 +dagster-duckdb == 0.19.7 +dagster-duckdb-pandas == 0.19.7 +dagster-gcp == 0.19.7 +openpyxl == 3.1.2 +polars == 0.17.5 +petl == 1.7.12 +pyarrow == 11.0.0 +xlsx2csv == 0.8.1 +connectorx == 0.3.2a5 +SQLAlchemy >= 1.0, < 2.0.0 +requests == 2.31.0 +google-cloud-bigquery == 3.11.0 \ No newline at end of file diff --git a/vaccines/setup.cfg b/setup.cfg similarity index 100% rename from vaccines/setup.cfg rename to setup.cfg diff --git a/vaccines/setup.py b/setup.py similarity index 100% rename from vaccines/setup.py rename to setup.py diff --git a/vaccines/__init__.py b/vaccines/__init__.py new file mode 100644 index 0000000..a61bde7 --- /dev/null +++ b/vaccines/__init__.py @@ -0,0 +1,32 @@ +# import os + +from dagster import Definitions + +from .assets import vaccine_assets +# from .jobs import TK +# from .resources import RESOURCES_LOCAL, RESOURCES_PROD, RESOURCES_STAGING +# from .sensors import make_slack_on_failure_sensor + +all_assets = [*vaccine_assets] + +''' 
+resources_by_deployment_name = { + "prod": RESOURCES_PROD, + "staging": RESOURCES_STAGING, + "local": RESOURCES_LOCAL, +} +''' + +# deployment_name = os.environ.get("DAGSTER_DEPLOYMENT", "local") + +# all_sensors = [activity_analytics_assets_sensor, recommender_assets_sensor] +''' +if deployment_name in ["prod", "staging"]: + all_sensors.append(make_slack_on_failure_sensor(base_url="my_dagit_url")) +''' + +defs = Definitions( + assets=all_assets, + # resources=resources_by_deployment_name[deployment_name], + # sensors=all_sensors, +) diff --git a/vaccines/assets/__init__.py b/vaccines/assets/__init__.py new file mode 100644 index 0000000..19ba959 --- /dev/null +++ b/vaccines/assets/__init__.py @@ -0,0 +1,7 @@ +from dagster import load_assets_from_package_module + +from . import vaccine_assets + +VACCINE_ASSETS = "vaccine_assets" + +vaccine_assets = load_assets_from_package_module(package_module=vaccine_assets) diff --git a/vaccines/assets/vaccine_assets/__init__.py b/vaccines/assets/vaccine_assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vaccines/vaccines/assets.py b/vaccines/assets/vaccine_assets/assets.py similarity index 100% rename from vaccines/vaccines/assets.py rename to vaccines/assets/vaccine_assets/assets.py diff --git a/vaccines/vaccines/__init__.py b/vaccines/vaccines/__init__.py deleted file mode 100644 index a36556a..0000000 --- a/vaccines/vaccines/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from dagster import Definitions, load_assets_from_modules - -from . 
import assets - -all_assets = load_assets_from_modules([assets]) - -defs = Definitions( - assets=all_assets, -) diff --git a/vaccines/vaccines_test/__init__.py b/vaccines/vaccines_test/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/vaccines/vaccines_test/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/vaccines/vaccines_test/test_assets.py b/vaccines/vaccines_test/test_assets.py deleted file mode 100644 index 8b13789..0000000 --- a/vaccines/vaccines_test/test_assets.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/vaccines_test/README.md b/vaccines_test/README.md new file mode 100644 index 0000000..67191d8 --- /dev/null +++ b/vaccines_test/README.md @@ -0,0 +1,48 @@ +# vaccines + +This is a [Dagster](https://dagster.io/) project scaffolded with [`dagster project scaffold`](https://docs.dagster.io/getting-started/create-new-project). + +## Getting started + +First, install your Dagster code location as a Python package. By using the --editable flag, pip will install your Python package in ["editable mode"](https://pip.pypa.io/en/latest/topics/local-project-installs/#editable-installs) so that as you develop, local code changes will automatically apply. + +```bash +pip install -e ".[dev]" +``` + +Then, start the Dagster UI web server: + +```bash +dagster dev +``` + +Open http://localhost:3000 with your browser to see the project. + +You can start writing assets in `vaccines/assets.py`. The assets are automatically loaded into the Dagster code location as you define them. + +## Development + + +### Adding new Python dependencies + +You can specify new Python dependencies in `setup.py`. 
+ +### Unit testing + +Tests are in the `vaccines_tests` directory and you can run tests using `pytest`: + +```bash +pytest vaccines_tests +``` + +### Schedules and sensors + +If you want to enable Dagster [Schedules](https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules) or [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors) for your jobs, the [Dagster Daemon](https://docs.dagster.io/deployment/dagster-daemon) process must be running. This is done automatically when you run `dagster dev`. + +Once your Dagster Daemon is running, you can start turning on schedules and sensors for your jobs. + +## Deploy on Dagster Cloud + +The easiest way to deploy your Dagster project is to use Dagster Cloud. + +Check out the [Dagster Cloud Documentation](https://docs.dagster.cloud) to learn more. diff --git a/vaccines_test/__init__.py b/vaccines_test/__init__.py new file mode 100644 index 0000000..a61bde7 --- /dev/null +++ b/vaccines_test/__init__.py @@ -0,0 +1,32 @@ +# import os + +from dagster import Definitions + +from .assets import vaccine_assets +# from .jobs import TK +# from .resources import RESOURCES_LOCAL, RESOURCES_PROD, RESOURCES_STAGING +# from .sensors import make_slack_on_failure_sensor + +all_assets = [*vaccine_assets] + +''' +resources_by_deployment_name = { + "prod": RESOURCES_PROD, + "staging": RESOURCES_STAGING, + "local": RESOURCES_LOCAL, +} +''' + +# deployment_name = os.environ.get("DAGSTER_DEPLOYMENT", "local") + +# all_sensors = [activity_analytics_assets_sensor, recommender_assets_sensor] +''' +if deployment_name in ["prod", "staging"]: + all_sensors.append(make_slack_on_failure_sensor(base_url="my_dagit_url")) +''' + +defs = Definitions( + assets=all_assets, + # resources=resources_by_deployment_name[deployment_name], + # sensors=all_sensors, +) diff --git a/vaccines_test/assets/__init__.py b/vaccines_test/assets/__init__.py new file mode 100644 index 0000000..19ba959 --- /dev/null +++ 
b/vaccines_test/assets/__init__.py
@@ -0,0 +1,7 @@
+from dagster import load_assets_from_package_module
+
+from . import vaccine_assets
+
+VACCINE_ASSETS = "vaccine_assets"
+
+vaccine_assets = load_assets_from_package_module(package_module=vaccine_assets)
diff --git a/vaccines_test/assets/vaccine_assets/__init__.py b/vaccines_test/assets/vaccine_assets/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/vaccines_test/assets/vaccine_assets/assets.py b/vaccines_test/assets/vaccine_assets/assets.py
new file mode 100644
index 0000000..6372b08
--- /dev/null
+++ b/vaccines_test/assets/vaccine_assets/assets.py
@@ -0,0 +1,167 @@
+import polars as pl
+import pandas as pd
+import os
+import requests
+from dagster import asset, Output, get_dagster_logger
+from collections import defaultdict
+
+logger = get_dagster_logger()
+
+# seconds to wait for any Census HTTP response before giving up
+REQUEST_TIMEOUT = 30
+
+# environment variable that supplies the Census API key
+CENSUS_API_KEY_ENV = 'CENSUS_API_KEY'
+
+# place names to look up in the Census geocode files
+@asset(
+    group_name='utils'
+)
+def cities() -> tuple:
+    cities = (
+        'Chicago city',
+        'New York city')
+    return cities
+
+# determine years to pull data for
+# (lists the .DAT files in the raw NIH folder; years are derived from their names)
+@asset(
+    group_name='FIPS',
+)
+def files() -> list:
+    path = 'data/NIH_child/NIH_raw' # update from handcoded
+    files = os.listdir(path)
+    files = [file for file in files if file.endswith('.DAT')]
+    return files
+
+# turn each file name into a four-digit year string
+# NOTE(review): assumes characters 6-7 of the file stem hold a two-digit
+# year in the 2000s -- confirm against the actual NIH file names
+@asset(
+    group_name='utils'
+)
+def years(files:list) -> list:
+    years = [file.split('.')[0] for file in files]
+    years = [f'20{year[6:8]}' for year in years]
+    return years
+
+# pull FIPS data from census for each year
+# (builds the state and place geocode workbook URLs; nothing is downloaded here)
+@asset(
+    group_name='FIPS',
+)
+def urls(years:list) -> Output[dict]:
+    urls = defaultdict(dict)
+    for year in years:
+        state = f'https://www2.census.gov/programs-surveys/popest/geographies/{year}/state-geocodes-v{year}.xlsx'
+        place = f'https://www2.census.gov/programs-surveys/popest/geographies/{year}/all-geocodes-v{year}.xlsx'
+        urls.update({year: {'state': state, 'place': place}})
+    return Output(
+        value=urls,
+        metadata={
+            year: urls[year] for year in years
+        }
+    )
+
+# pull FIPS Codes in excel format for each year
+# (each place workbook is saved under Census_raw and converted to CSV)
+@asset(
+    group_name='FIPS',
+)
+def fipsFiles(urls:dict) -> list:
+    path = 'data/Census/Census_raw' # update from handcoded
+    # create the folder up front so the listing below works with no urls
+    os.makedirs(path, exist_ok=True)
+    for year, url in urls.items():
+        place_url = url['place']
+        response = requests.get(place_url, timeout=REQUEST_TIMEOUT)
+        # stop on an HTTP error instead of saving the error page as a workbook
+        response.raise_for_status()
+        excel_path = f'{path}/{year}_FIPS_place.xlsx'
+        with open(excel_path, 'wb') as file:
+            file.write(response.content)
+        pd.read_excel(
+            excel_path,
+            skiprows=4,
+            engine='openpyxl',
+        ).to_csv(f'{path}/{year}_FIPS_place.csv', index=False)
+    files = os.listdir(path)
+    # update to more specific search criteria
+    return [f'{path}/{file}' for file in files if file.startswith('20')]
+
+# identify FIPS for Chicago, IL and New York, NY
+# fipsFiles is declared as a dependency because it writes the CSVs read here
+# NOTE(review): the select keeps the matched 'Area Name' value, so each entry
+# looks like the place name rather than a FIPS code -- confirm intent
+@asset(
+    group_name='FIPS',
+    non_argument_deps={'fipsFiles'},
+)
+def fipsLocation(years:list, cities:tuple) -> dict:
+    fips = defaultdict(list)
+    for year in years:
+        file = f'data/Census/Census_raw/{year}_FIPS_place.csv' # update from handcoded
+        # read each year's file once rather than once per city
+        places = pl.read_csv(file)
+        for city in cities:
+            fips[year].append(str(places
+                .select(name=pl.col('^Area Name.*$')
+                    .where(pl.col('^Area Name.*$') == city)
+                    .limit(1)).to_series()[0]))
+    return fips
+
+# pull American Community Survey variables for each year
+# (a year whose request fails is logged and left out of the result)
+@asset(
+    group_name='ACS',
+)
+def acsVars(years:list) -> dict:
+    variables = {}
+    for year in years:
+        url = f'https://api.census.gov/data/{year}/acs/acs1/subject/variables.json'
+        logger.info(f'Requesting {url}')
+        response = requests.get(url, timeout=REQUEST_TIMEOUT)
+        if response.status_code == 200:
+            variables[year] = response.json()['variables']
+        else:
+            logger.error(f'Error with {year}: {response.status_code}')
+    return variables
+
+
+# build the American Community Survey query URLs for each year and city
+# NOTE(review): variable names come from acs1/subject but the query targets
+# acs1/cprofile -- confirm the two datasets are meant to line up
+@asset(
+    group_name='ACS',
+)
+def acsURLs(years:list, fipsLocation:dict, acsVars:dict) -> dict:
+    # read the key from the environment so it is never committed; the key that
+    # was hardcoded here before should be treated as exposed and revoked
+    key = os.environ.get(CENSUS_API_KEY_ENV)
+    if not key:
+        logger.warning(f'{CENSUS_API_KEY_ENV} is not set; building URLs without a key')
+    urls = defaultdict(list)
+    for year in acsVars.keys():
+        # the variable list does not depend on the city, so join it once per year
+        variables = ','.join(acsVars[year].keys())
+        for city in fipsLocation[year]:
+            url = f'https://api.census.gov/data/{year}/acs/acs1/cprofile?get={variables}&for=place:{city}'
+            if key:
+                url = f'{url}&key={key}'
+            url = url.replace(' ', '%20')
+            urls[year].append(url)
+    return urls
+
+# pull American Community Survey data for each year
+@asset(
+    group_name='ACS',
+)
+def acsData(acsURLs:dict) -> dict:
+    data = defaultdict(list)
+    for year, urls in acsURLs.items():
+        for url in urls:
+            response = requests.get(url, timeout=REQUEST_TIMEOUT)
+            # report HTTP errors directly rather than as a JSON decode failure
+            response.raise_for_status()
+            data[year].append(response.json())
+    return data
\ No newline at end of file
diff --git a/vaccines_test/jobs.py b/vaccines_test/jobs.py
new file mode 100644
index 0000000..e3214b2
--- /dev/null
+++ b/vaccines_test/jobs.py
@@ -0,0 +1,16 @@
+from dagster import job
+from ops.extract import (
+    getFiles,
+    getYears,
+    setURLs,
+    pullFIPS,
+)
+
+
+# extract FIPS files
+@job
+def extractFIPS():
+    files = getFiles()
+    years = getYears(files)
+    urls = setURLs(years)
+    pullFIPS(urls)
\ No newline at end of file
diff --git a/workspace.yaml b/workspace.yaml
deleted file mode 100644
index e6839b6..0000000
--- a/workspace.yaml
+++ /dev/null
@@ -1,3 +0,0 @@
-load_from:
-  # References the file copied into your Dockerfile
-  - python_file: repo.py
\ No newline at end of file