This repository contains the course project for Fundamentals of Data Engineering (CS 6830). The goal is to build automated batch and real-time pipelines around Utah Transit Authority's General Transit Feed Specification (GTFS) data and surface the results in dashboards.
- Transit site: https://www.rideuta.com/
- GTFS schedule endpoint: https://www.rideuta.com/-/media/ride-uta/Files/GTFS/GTFS.zip
- GTFS real-time vehicle endpoint: https://apps.rideuta.com/tms/gtfs/Vehicle
- Recommended pull cadence for the real-time API: every 1–30 seconds
- Public GitHub repository with thorough documentation to showcase the pipelines as part of a portfolio
- Real-time tooling: the uta-realtime CLI streams decoded vehicles into Amazon Kinesis
- src/uta_data_pipeline/: Python package that implements the batch ingestion, transform, and quality commands
- docs/: supplemental documentation, including ERD diagrams and data discovery notes
- dags/: reference Airflow DAGs that orchestrate the batch pipeline
- screenshots/: UI captures for dashboards or verification
- pyproject.toml: packaging metadata used to install the CLI in editable mode during development
- Create and activate a virtual environment.
- Install the package in editable mode with optional extras as needed:

  python -m pip install --upgrade pip
  python -m pip install -e .

- Run the batch CLI locally to download and load the GTFS schedule into DuckDB:

  uta-batch download
  uta-batch load data/gtfs.zip --database data/gtfs.duckdb

- Execute the unit tests to validate the downloader, loader, and quality checks:

  python -m pip install duckdb pytest
  python -m pytest
The project builds two complementary data paths around GTFS:
- Batch pipeline for slowly changing data such as schedules.
- Real-time pipeline for rapidly changing vehicle positions.
Combined outputs feed dashboards that display current vehicle positions, estimated arrival times, and on-time performance.
UTA publishes GTFS data in two forms:
- GTFS Schedule: a downloadable ZIP containing CSV tables.
- GTFS Realtime (GTFS-RT): a protobuf API at https://apps.rideuta.com/tms/gtfs/Vehicle. Libraries decode the protobuf payload into structured vehicle entities.
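For reference, the sketch below decodes the vehicle feed with the open-source gtfs-realtime-bindings and requests packages. These dependencies and the field-access pattern are assumptions for illustration only; the project's own fetch_vehicle_feed helper (described later) may work differently.

```python
# Minimal sketch: decode the UTA vehicle feed with gtfs-realtime-bindings.
# pip install gtfs-realtime-bindings requests  (assumed dependencies)
import requests
from google.transit import gtfs_realtime_pb2

resp = requests.get("https://apps.rideuta.com/tms/gtfs/Vehicle", timeout=30)
resp.raise_for_status()

feed = gtfs_realtime_pb2.FeedMessage()
feed.ParseFromString(resp.content)  # protobuf payload -> structured entities

print("feed timestamp:", feed.header.timestamp)
for entity in feed.entity[:5]:
    if entity.HasField("vehicle"):
        vp = entity.vehicle
        print(vp.vehicle.id, vp.trip.route_id, vp.position.latitude, vp.position.longitude)
```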
The assignment spans four weekly milestones: batch ingestion and discovery; real-time ingestion; combining batch and real-time data; and dashboards.
- Data discovery: Verify the stable GTFS Schedule URL and document it in docs/data_discovery.md.
- Data description: Create and maintain the ER diagram in docs/ERD.md.
- Automated download + ingestion: uta_data_pipeline.batch downloads the schedule archive, stages it locally or in S3, and loads all tables into DuckDB/SQLite.
- Batch transformation + quality: The transform command materializes typed analytics tables and the quality command enforces row-count guardrails (a standalone sketch of the guardrail check follows this list).
- Orchestration: The reference Apache Airflow DAG in dags/week1_batch_dag.py schedules the download → load → transform → quality steps daily.
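To illustrate the guardrail idea, the following standalone sketch counts rows per table in DuckDB and exits non-zero on failure. The table list and schema name are assumptions; the real check lives in the quality command documented below.

```python
# Illustrative row-count guardrail, assuming GTFS tables loaded into a "gtfs"
# schema in DuckDB. The actual check is implemented by the quality command in
# uta_data_pipeline.batch; this only shows the shape of the logic.
import sys
import duckdb

MIN_ROWS = 1
TABLES = ["routes", "trips", "stops", "stop_times"]  # assumed table names

con = duckdb.connect("data/gtfs.duckdb")
failures = []
for table in TABLES:
    (count,) = con.execute(f"SELECT COUNT(*) FROM gtfs.{table}").fetchone()
    print(f"{table}: {count} rows")
    if count < MIN_ROWS:
        failures.append(table)

if failures:
    print("quality check failed for:", ", ".join(failures))
    sys.exit(1)  # non-zero exit mirrors the CLI's failure behavior
```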
All commands share the same CLI entry point:
# Download the latest schedule locally and optionally upload it to S3
python -m uta_data_pipeline.batch download --destination data/gtfs.zip --s3-bucket my-bucket --s3-prefix uta/gtfs
# Load and transform the data into DuckDB
python -m uta_data_pipeline.batch load data/gtfs.zip --database data/gtfs.duckdb
python -m uta_data_pipeline.batch transform data/gtfs.duckdb --target-schema gtfs_clean
# Run minimum row-count quality checks (for example inside CI/CD)
python -m uta_data_pipeline.batch quality data/gtfs.duckdb --schema gtfs --min-rows 1

The commands emit deterministic, parseable output and raise non-zero exit codes when network, ingestion, or quality failures occur. The helpers can also be imported directly inside Airflow tasks or Dagster ops. When DuckDB is unavailable (for example in restricted CI environments), the loader automatically falls back to SQLite while preserving the same function signatures.
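A minimal sketch of that fallback pattern, assuming a DB-API-style connection is all the loader needs; the actual loader code may be structured differently:

```python
# Sketch of the described DuckDB-to-SQLite fallback. The real loader in
# uta_data_pipeline.batch keeps the same function signatures either way.
import sqlite3


def connect_warehouse(path: str):
    """Return a database connection, preferring DuckDB when it is installed."""
    try:
        import duckdb  # optional dependency; may be absent in restricted CI
        return duckdb.connect(path)
    except ImportError:
        return sqlite3.connect(path)


con = connect_warehouse("data/gtfs.duckdb")
con.execute("CREATE TABLE IF NOT EXISTS load_audit (loaded_at TEXT)")
```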
The real-time layer adds polling and streaming features on top of the batch pipeline:
- Data description: GTFS-RT vehicle entity fields and header semantics documented in docs/week2.md.
- Polling: uta_data_pipeline.realtime.fetch_vehicle_feed retrieves the protobuf payload; the uta-realtime poll CLI runs every 30 seconds by default.
- Streaming: publish_vehicles_to_kinesis batches JSON records (one per vehicle) into Amazon Kinesis, using vehicle IDs as partition keys (see the Kinesis sketch after this list).
- Testing: tests/test_realtime.py validates feed decoding and Kinesis batching behavior.
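For orientation, here is a hedged sketch of the Kinesis batching approach using boto3. The record shape, the vehicle_id field name, and the default stream name are assumptions; the project's publish_vehicles_to_kinesis may be implemented differently.

```python
# Illustrative Kinesis publisher: one JSON record per vehicle, keyed by
# vehicle ID so records for the same vehicle land on the same shard.
import json
import boto3


def publish_vehicles(vehicles, stream_name="uta-realtime"):
    kinesis = boto3.client("kinesis")
    records = [
        {
            "Data": json.dumps(vehicle).encode("utf-8"),
            "PartitionKey": str(vehicle["vehicle_id"]),  # assumed field name
        }
        for vehicle in vehicles
    ]
    # put_records accepts at most 500 records per call, so chunk larger batches.
    for start in range(0, len(records), 500):
        kinesis.put_records(StreamName=stream_name, Records=records[start:start + 500])
```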
The uta-realtime console script polls the vehicle endpoint and streams decoded records into Amazon Kinesis. A describe subcommand is available for quick inspection of the protobuf payload without pushing data:
uta-realtime describe
uta-realtime poll --stream uta-realtime --interval 30 --iterations 2

Install the project in editable mode to iterate locally:

python -m pip install --upgrade pip
python -m pip install -e .

Optional extras are provided for AWS S3 upload helpers (pip install "uta-data-pipeline[s3]"), running the reference Airflow DAG locally (pip install "uta-data-pipeline[airflow]"), and the GTFS Realtime poller (pip install "uta-data-pipeline[realtime]").
dags/week1_batch_dag.py defines an Apache Airflow DAG that runs daily at 05:00 Mountain Time. The DAG stores intermediate files inside the Airflow worker (or EFS on MWAA), calls the download → load → transform → quality helpers sequentially, and surfaces row-count metadata through task logs/XCom. Deploy the DAG by copying the file into your Airflow dags/ directory and configuring the UTA_DATA_DIR environment variable if you need to override the default /opt/airflow/data staging path.
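For readers who have not opened the DAG file, a rough sketch of its shape follows. The helper function names and call signatures are assumptions; the authoritative definition remains dags/week1_batch_dag.py.

```python
# Rough shape of the daily batch DAG (05:00 Mountain Time). Helper names and
# signatures below are assumptions used only to illustrate the task chain.
import os

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

from uta_data_pipeline import batch  # batch helpers referenced in the README

DATA_DIR = os.environ.get("UTA_DATA_DIR", "/opt/airflow/data")

with DAG(
    dag_id="uta_gtfs_batch",
    schedule="0 5 * * *",  # daily at 05:00; older Airflow uses schedule_interval
    start_date=pendulum.datetime(2024, 1, 1, tz="America/Denver"),
    catchup=False,
) as dag:
    download = PythonOperator(
        task_id="download",
        python_callable=lambda: batch.download(destination=f"{DATA_DIR}/gtfs.zip"),
    )
    load = PythonOperator(
        task_id="load",
        python_callable=lambda: batch.load(f"{DATA_DIR}/gtfs.zip", database=f"{DATA_DIR}/gtfs.duckdb"),
    )
    transform = PythonOperator(
        task_id="transform",
        python_callable=lambda: batch.transform(f"{DATA_DIR}/gtfs.duckdb", target_schema="gtfs_clean"),
    )
    quality = PythonOperator(
        task_id="quality",
        python_callable=lambda: batch.quality(f"{DATA_DIR}/gtfs.duckdb", schema="gtfs", min_rows=1),
    )

    download >> load >> transform >> quality
```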
Create a virtual environment (optional), install the lightweight dependencies, and execute the unit tests:
python -m pip install --upgrade pip
python -m pip install duckdb pytest
python -m pytest

The tests exercise the downloader, DuckDB loader, and data-quality guardrails end-to-end using synthetic GTFS archives, so no external network connectivity is required.
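As a rough illustration of the synthetic-archive approach (not the actual test code), a test along these lines builds a tiny GTFS zip and loads one table into DuckDB:

```python
# Sketch of a synthetic-archive test in the spirit of the existing suite.
# It does not call the project's helpers; it only shows the fixture pattern.
import zipfile

import duckdb


def make_synthetic_gtfs(tmp_path):
    """Write a tiny GTFS archive containing a single routes.txt table."""
    archive = tmp_path / "gtfs.zip"
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("routes.txt", "route_id,route_short_name\n1,FrontRunner\n")
    return archive


def test_routes_table_loads(tmp_path):
    archive = make_synthetic_gtfs(tmp_path)

    # Extract the table, load it into DuckDB, and assert the row count.
    routes_csv = tmp_path / "routes.txt"
    with zipfile.ZipFile(archive) as zf:
        routes_csv.write_bytes(zf.read("routes.txt"))

    con = duckdb.connect(str(tmp_path / "gtfs.duckdb"))
    con.execute(f"CREATE TABLE routes AS SELECT * FROM read_csv_auto('{routes_csv}')")
    assert con.execute("SELECT COUNT(*) FROM routes").fetchone()[0] == 1
```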








