From 28c44f99eea07a3dd6bbc99ade094ae3ce8c117f Mon Sep 17 00:00:00 2001 From: aycz Date: Sun, 27 Apr 2025 21:03:43 -0500 Subject: [PATCH 1/5] Adds untested reports 1-5; TODO: Report 6 --- .gitignore | 11 +- Exercises/Exercise-6/Dockerfile | 2 +- Exercises/Exercise-6/docker-build-log.txt | 44 ++++ Exercises/Exercise-6/main.py | 245 ++++++++++++++++++++++ Exercises/Exercise-6/requirements.txt | 3 +- 5 files changed, 300 insertions(+), 5 deletions(-) create mode 100644 Exercises/Exercise-6/docker-build-log.txt diff --git a/.gitignore b/.gitignore index c9a12169..3a867c6c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ -*.idea* -*.DS_Store* -*postgres-data* \ No newline at end of file +# virtual environment files... +**/.venv/* +**/.virtual/* + +# caching files... +**/__pycache__/* +*.pyc +*.DS_Store diff --git a/Exercises/Exercise-6/Dockerfile b/Exercises/Exercise-6/Dockerfile index 149bbecb..8c7a7cb9 100644 --- a/Exercises/Exercise-6/Dockerfile +++ b/Exercises/Exercise-6/Dockerfile @@ -1,4 +1,4 @@ -FROM ubuntu:18.04 +FROM python:latest RUN apt-get update && \ apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && \ diff --git a/Exercises/Exercise-6/docker-build-log.txt b/Exercises/Exercise-6/docker-build-log.txt new file mode 100644 index 00000000..04f4aff9 --- /dev/null +++ b/Exercises/Exercise-6/docker-build-log.txt @@ -0,0 +1,44 @@ +[+] Building 63.3s (6/10) docker:desktop-linux + => [internal] load build definition from Dockerfile 0.0s + => => transferring dockerfile: 1.13kB 0.0s + => [internal] load metadata for docker.io/library/python:latest 1.7s + => [internal] load .dockerignore 0.0s + => => transferring context: 2B 0.0s + => CACHED [1/6] FROM docker.io/library/python:latest@sha256:1f7ef1e8f35bc8629b05f4df943175f2851ba05f4a 0.0s + => => resolve docker.io/library/python:latest@sha256:1f7ef1e8f35bc8629b05f4df943175f2851ba05f4a509f723 0.0s + => [internal] load build context 0.0s + => => transferring context: 581B 0.0s + => ERROR [2/6] RUN apt-get update && apt-get install -y default-jdk scala wget vim software-prope 61.5s +------ + > [2/6] RUN apt-get update && apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && apt-get clean: +0.847 Get:1 http://deb.debian.org/debian bookworm InRelease [151 kB] +1.551 Get:2 http://deb.debian.org/debian bookworm-updates InRelease [55.4 kB] +2.071 Get:3 http://deb.debian.org/debian-security bookworm-security InRelease [48.0 kB] +2.319 Get:4 http://deb.debian.org/debian bookworm/main amd64 Packages [8792 kB] +58.52 Get:5 http://deb.debian.org/debian bookworm-updates/main amd64 Packages [512 B] +58.61 Get:6 http://deb.debian.org/debian-security bookworm-security/main amd64 Packages [252 kB] +59.94 Fetched 9300 kB in 59s (156 kB/s) +59.94 Reading package lists... +60.65 Reading package lists... +61.31 Building dependency tree... +61.46 Reading state information... +61.46 Package python3.8 is not available, but is referred to by another package. 
+61.46 This may mean that the package is missing, has been obsoleted, or +61.46 is only available from another source +61.46 +61.46 E: Package 'python3.8' has no installation candidate +------ + + 1 warning found (use docker --debug to expand): + - WorkdirRelativePath: Relative workdir "app" can have unexpected results if the base image changes (line 17) +Dockerfile:3 +-------------------- + 2 | + 3 | >>> RUN apt-get update && \ + 4 | >>> apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && \ + 5 | >>> apt-get clean + 6 | +-------------------- +ERROR: failed to solve: process "/bin/sh -c apt-get update && apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && apt-get clean" did not complete successfully: exit code: 100 + +View build details: docker-desktop://dashboard/build/desktop-linux/desktop-linux/qo0cekwxvrdlyagw4c1x1f09r diff --git a/Exercises/Exercise-6/main.py b/Exercises/Exercise-6/main.py index aa8f7efd..5e17c261 100644 --- a/Exercises/Exercise-6/main.py +++ b/Exercises/Exercise-6/main.py @@ -1,10 +1,255 @@ +import os +from zipfile import ZipFile + +import pandas as pd from pyspark.sql import SparkSession +from pyspark.sql.functions as F +from pyspark.sql.types as T +from pyspark.sql.window import Window + +REPORTS = { + 1: "daily_trip_dur.csv", + 2: "daily_num_trips.csv", + 3: "monthly_top_stations.csv", + 4: "two_week_top_three_stations.csv", + 5: "gender_trip_length.csv", + 6: "top_ten_ages_long_short_trips.csv" +} + +''' +WT: 90+90+45+47+72 + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +https://community.databricks.com/t5/data-governance/i-have-to-read-zipped-csv-file-using-spark-without-unzipping-it/td-p/17156 + +i can use pandas to read in csv's that are zipped. caveats: + "still there is one disclaimer: "If using ‘zip’ or ‘tar’, the ZIP file must contain only one data file to be read in." + + and there is also obvious trade-off: using pandas means no distribution, + no scalability and exposure to OOM errors - but maybe in your specific case it is acceptable." + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +six Qs and their report names: + +REPORTS = { + 1: "daily_trip_dur.csv", + 2: "daily_num_trips.csv", + 3: "monthly_top_stations.csv", + 4: "two_week_top_three_stations.csv", + 5: "gender_trip_length.csv", + 6: "top_ten_ages_long_short_trips.csv" +} + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +from pyspark.sql.functions -> can import `col`, `expr` e.g. col("colName") or expr("colName") can also use a string literal i.e. ("colName") +common aggregations including avg, sum, max, and min from pyspark.sql.functions + +df_children_with_schema = spark.createDataFrame( + data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)], + schema = StructType([ + StructField('name', StringType(), True), + StructField('age', IntegerType(), True) + ]) +) +display(df_children_with_schema) + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +chaining actions... 
+df_chained = ( + df_order.filter(col("o_orderstatus") == "F") + .groupBy(col("o_orderpriority")) + .agg(count(col("o_orderkey")).alias("n_orders")) + .sort(col("n_orders").desc()) +) + +display(df_chained) + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Assign this variable your file path +file_path = "" + +(df_joined.write + .format("csv") + .mode("overwrite") + .write(file_path) +) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# import org.apache.spark.sql.functions._ +# df.withColumn("modified",date_format(to_date(col("modified"), "MM/dd/yy"), "yyyy-MM-dd")) +# .withColumn("created",to_utc_timestamp(to_timestamp(col("created"), "MM/dd/yy HH:mm"), "UTC")) +# +''' def main(): spark = SparkSession.builder.appName("Exercise6").enableHiveSupport().getOrCreate() # your code here + spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true') + + cwd = os.getcwd() + path_0, path_1 = os.path.join(cwd, 'Divvy_Trips_2019_Q4.zip'), os.path.join(cwd, 'Divvy_Trips_2020_Q1.zip') + + sdf_0, sdf_1 = None, None + + # headers: + # ride_id <-> trip_id, + # rideable_type, + # started_at <-> start_time, + # ended_at <-> end_time, + # start_station_name <-> from_station_name, + # start_station_id <-> from_station_id, + # end_station_name <-> to_station_name, + # end_station_id <-> to_station_id, + # start_lat, + # start_lng, + # end_lat, + # end_lng, + # member_casual, + # bikeid, + # tripduration, + # usertype, + # gender, + # birthyear + # 2020 file has no `tripduration` -> must calculate in seconds to match 2019 column. + FILE_IMPORT_SCHEMA = T.StructType([ + T.StructField('ride_id', T.StringType(), True), + T.StructField('trip_id', T.StringType(), True), + T.StructField('rideable_type', T.StringType(), True), + T.StructField('started_at', T.TimestampType(), True), + T.StructField('start_time', T.TimestampType(), True), + T.StructField('ended_at', T.TimestampType(), True), + T.StructField('end_time', T.TimestampType(), True), + T.StructField('start_station_name', T.StringType(), True), + T.StructField('from_station_name', T.StringType(), True), + T.StructField('start_station_id', T.StringType(), True), + T.StructField('from_station_id', T.StringType(), True), + T.StructField('end_station_name', T.StringType(), True), + T.StructField('to_station_name', T.StringType(), True), + T.StructField('end_station_id', T.StringType(), True), + T.StructField('to_station_id', T.StringType(), True), + T.StructField('start_lat', T.FloatType(), True), + T.StructField('start_lng', T.FloatType(), True), + T.StructField('end_lat', T.FloatType(), True), + T.StructField('end_lng', T.FloatType(), True), + T.StructField('member_casual', T.StringType(), True), + T.StructField('bikeid', T.StringType(), True), + T.StructField('tripduration', T.FloatType(), True), + T.StructField('usertype', T.StringType(), True), + T.StructField('gender', T.StringType(), True), + T.StructField('birthyear', T.DateType(), True), + + ]) + + RENAME_MAP = { + 'ride_id': 'trip_id', + 'started_at': 'start_time', + 'ended_at': 'end_time', + 'from_station_name': 'start_station_name', + 'from_station_id': 'start_station_id', + 'to_station_name': 'end_station_name', + 'to_station_id': 'end_station_id' + } + + # helper for renaming columns for a seamless join + def rename_columns(df, renameMap): + for name, rename in renameMap.items(): + df = df.withColumnRenamed(name, rename) + return df + + + # unzip + with ZipFile(path_0) as czip: + with czip.open('Divvy_Trips_2019_Q4.csv') as csv_0: + sdf_0 = ( + spark + .read + 
.format('csv') + .option('header', True) + .schema(FILE_IMPORT_SCHEMA) + .load(csv_0) + .transform(lambda sdf: rename_columns(sdf, RENAME_MAP)) + ) + + with ZipFile(path_1) as czip: + with czip.open('Divvy_Trips_2020_Q1.csv') as csv_1: + sdf_1 = ( + spark + .read + .format('csv') + .option('header', True) + .schema(FILE_IMPORT_SCHEMA) + .load(csv_1) + .transform(lambda sdf: rename_columns(sdf, RENAME_MAP)) + ) + + # join the df's' + sdf = sdf_0.join(sdf_1, how='outer') + + # generate report 1 + # what is the avg trip duration per day? + # find all null tripduration values, and attempt to calulcate. then join that df to complete df on trip_id + emptyTripDur_sdf = sdf.filter(sdf.tripduration.isNull()) + # cast as a `double` to increase precision past integer values. + calculatedTripDur_sdf = emptyTripDur_sdf.withColumn('tripduration', + F.col('end_time').cast(T.DoubleType()) - F.col('start_time').cast(T.DoubleType())) + # join the dataframes on trip id + sdf = sdf.join(calculatedTripDur_sdf, on = (sdf.trip_id) == calculatedTripDur_sdf.trip_id, how = 'outer') + # make a date column + sdf = sdf.withColumn('end_date', F.col('end_time').cast(T.DateType())) + + avgTripDurationDay_sdf = (sdf.groupBy('end_date').agg(F.avg(sdf.tripduration)).select(F.col('end_date'), F.col('trip_duration'))) # output csv... + ################################################################################################################### + + # generate report 2 + countTripsByDay_sdf = sdf.groupBy('end_date').withColumn('trip_count', F.count('trip_id')).select(F.col('end_date'), F.col('trip_count')) # output csv... + + ################################################################################################################### + + # generate report 3 + # make a month-year column to aggregate for popular station + # use windows to go with dense_rank() + month_station_sdf = sdf.withColumn('start_month_date', F.to_date(sdf.start_time, 'MM/yyyy')) + # popularStartStation_sdf = sdf.groupBy('start_month_date', 'start_station_name').agg(F.count().alias('station_count')).sort(F.col('station_count').desc()).limit(1).select('month_date', 'start_station_name') + station_count_sdf = month_station_sdf.groupBy('start_month_date', 'start_station_id').agg(F.count().alias('month_visit_count')) + window = Window.partitionBy('start_month_date').orderBy(F.col('month_visit_count').desc()) + + top_station_monthly_sdf = station_count_sdf.withColumn('rank', F.dense_rank().over(window)).filter(F.col('rank') == 1).drop('rank') + + ################################################################################################################### + + # generate report 4 + # + # find final timestamp and backtrack two weeks. use methodology of above, but limit 3 instead and group by day. 
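    # ------------------------------------------------------------------
    # Aside on the ZipFile/read block and the join above (an illustrative
    # sketch only, untested and not wired into this patch): spark.read
    # ... .load() expects a path, not the file object ZipFile.open()
    # returns, and a key-less outer join will not stack the two quarters.
    # Per the pandas note in the header docstring, one workable shape is
    # to let pandas read each single-file zip, hand the result to Spark,
    # and combine the quarters with a union:
    #
    #   pdf_2019 = pd.read_csv(path_0)   # pandas unpacks a one-file zip itself
    #   pdf_2020 = pd.read_csv(path_1)
    #   q4_2019 = rename_columns(spark.createDataFrame(pdf_2019), RENAME_MAP)
    #   q1_2020 = rename_columns(spark.createDataFrame(pdf_2020), RENAME_MAP)
    #   sdf = q4_2019.unionByName(q1_2020, allowMissingColumns=True)  # needs Spark >= 3.1
    #
    # The two-week report below would then work off the combined frame.
    # ------------------------------------------------------------------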
+ sorted_sdf = sdf.sort(F.col('start_time').desc()) + sorted_sdf = sorted_sdf.withColumn('last_day', F.lit(sorted_sdf.first()['start_time'])) + last_2w_sdf = sorted_sdf.filter(F.datediff(sorted_sdf.last_day - sorted_sdf.start_time) < 15) + + last_2w_sdf = last_2w_sdf.withColumn('start_date', F.to_date(last_2w_sdf.start_time)) + last_2w_sdf = last_2w_sdf.groupBy('start_date', 'start_station_id').agg(F.count().alias('day_visit_count')) + window = Window.partitionBy('start_date').orderBy(F.col('day_visit_count').desc()) + + top_three_daily_station_sdf = last_2w_sdf.withColumn('rank', F.dense_rank().over(window)).filter(F.col('rank') < 4).drop('rank') + + # sdf = sdf.na.fill('0', subset=['tripduration']) # fill in empty durations + # sdf = sdf.na.drop('all', subset=['end_time']) # drop any missing timestamps; needed for aggregation + # + ################################################################################################################### + + # generate report 5 + # do males or females take longer trips... + # drop any NA gender valued rows + # group by male and agg avg their trip duration and alias new col + # group by female and agg avg their trip duration and alias new col + # rank the counts and output + + valid_sdf = sdf.filter(sdf.gender.isNotNull()) + gender_avg_trip_sdf = valid_sdf.groupBy('gender').agg(F.avg(valid_sdf.trip_duration).alias('avg_gender_trip_duration')) + + + ################################################################################################################### + # generate report 6 if __name__ == "__main__": main() diff --git a/Exercises/Exercise-6/requirements.txt b/Exercises/Exercise-6/requirements.txt index 3a5974da..f9a5fe35 100644 --- a/Exercises/Exercise-6/requirements.txt +++ b/Exercises/Exercise-6/requirements.txt @@ -1,2 +1,3 @@ +pandas pytest -pyspark \ No newline at end of file +pyspark From 1fe22ff12fd31271ab1b03f824576419847dac58 Mon Sep 17 00:00:00 2001 From: aycz Date: Thu, 15 May 2025 13:44:11 -0500 Subject: [PATCH 2/5] Adds logic for report 6; Need CSV output. --- Exercises/Exercise-6/main.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/Exercises/Exercise-6/main.py b/Exercises/Exercise-6/main.py index 5e17c261..93110ecf 100644 --- a/Exercises/Exercise-6/main.py +++ b/Exercises/Exercise-6/main.py @@ -244,12 +244,28 @@ def rename_columns(df, renameMap): # rank the counts and output valid_sdf = sdf.filter(sdf.gender.isNotNull()) - gender_avg_trip_sdf = valid_sdf.groupBy('gender').agg(F.avg(valid_sdf.trip_duration).alias('avg_gender_trip_duration')) + gender_avg_trip_sdf = valid_sdf.groupBy('gender').agg(F.avg(valid_sdf.trip_duration).alias('gender_avg_trip_duration')) ################################################################################################################### # generate report 6 + # ? top 10 ages: longest trips, shortest trips + # sort by trip duration, desc. + # take top 10, and final 10 + # get ages column + # merge into one df, sorted on trip duration, desc + # or or + # we can group by age, avg agg respectively on their trip durations, and take top + bottom 10 of ages when sorted desc on avg trip duration. 
+ # + valid_sdf = sdf.filter(sdf.birthyear.isNotNull()) + ages_sdf = sdf.withColumn('age', F.floor(F.months_between(F.current_date() - F.col('birthyear')) / 12).cast(T.IntegerType())) + ages_sdf = ages_sdf.groupBy('age').agg(F.avg(ages_sdf.trip_duration).alias('age_avg_trip_duration')) + + top_ten_sdf = ages_sdf.orderBy(F.col('age_avg_trip_duration').desc()).limit(10) + bottom_ten_sdf = ages_sdf.orderBy(F.col('age_avg_trip_duration').asc()).limit(10) + top_bottom_ten_sdf = bottom_ten_sdf.union(top_ten_sdf) + if __name__ == "__main__": main() From ba35ca3036182144c3d32a45f3cf5cb0f24e96ae Mon Sep 17 00:00:00 2001 From: aycz Date: Thu, 15 May 2025 13:44:40 -0500 Subject: [PATCH 3/5] Refreshes dockerfile and requirements. --- Exercises/Exercise-6/Dockerfile | 40 +++++++++++++++++++-------- Exercises/Exercise-6/requirements.txt | 4 +++ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/Exercises/Exercise-6/Dockerfile b/Exercises/Exercise-6/Dockerfile index 8c7a7cb9..7ec7ce8f 100644 --- a/Exercises/Exercise-6/Dockerfile +++ b/Exercises/Exercise-6/Dockerfile @@ -1,23 +1,39 @@ -FROM python:latest +#FROM python:latest +FROM python:3.11-slim + +# RUN apt-get update && \ +# apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && \ +# apt-get clean RUN apt-get update && \ - apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && \ - apt-get clean + apt-get install -y default-jdk curl wget unzip && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +ENV SPARK_VERSION=3.0.1 +ENV HADOOP_VERSION=3.2 +ENV SPARK_HOME=/usr/local/spark +ENV PATH=$SPARK_HOME/bin:$PATH + +# RUN wget https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz && \ +# tar xvf spark-3.0.1-bin-hadoop3.2.tgz && \ +# mv spark-3.0.1-bin-hadoop3.2/ /usr/local/spark && \ +# ln -s /usr/local/spark spark && \ +# wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.890/aws-java-sdk-bundle-1.11.890.jar && \ +# mv aws-java-sdk-bundle-1.11.890.jar /spark/jars && \ +# wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar && \ +# mv hadoop-aws-3.2.0.jar /spark/jars -RUN wget https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz && \ - tar xvf spark-3.0.1-bin-hadoop3.2.tgz && \ - mv spark-3.0.1-bin-hadoop3.2/ /usr/local/spark && \ - ln -s /usr/local/spark spark && \ - wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.890/aws-java-sdk-bundle-1.11.890.jar && \ - mv aws-java-sdk-bundle-1.11.890.jar /spark/jars && \ - wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar && \ - mv hadoop-aws-3.2.0.jar /spark/jars +RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz && \ + tar xvf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz && \ + mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} ${SPARK_HOME} && \ + rm spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz WORKDIR app COPY . 
/app -RUN pip3 install markupsafe==1.1.1 cryptography==3.3.2 cython==0.29.21 numpy==1.18.5 && pip3 install -r requirements.txt +RUN pip3 install --no-cache-dir -r requirements.txt ENV PYSPARK_PYTHON=python3 ENV PYSPARK_SUBMIT_ARGS='--packages io.delta:delta-core_2.12:0.8.0 pyspark-shell' diff --git a/Exercises/Exercise-6/requirements.txt b/Exercises/Exercise-6/requirements.txt index f9a5fe35..3d22cf3a 100644 --- a/Exercises/Exercise-6/requirements.txt +++ b/Exercises/Exercise-6/requirements.txt @@ -1,3 +1,7 @@ +cython==0.29.21 +cryptography==3.3.2 +markupsafe==1.1.1 +numpy pandas pytest pyspark From bd49aaea9b3e82e3565f4392b301b24fdb8a7e0a Mon Sep 17 00:00:00 2001 From: aycz Date: Thu, 21 Aug 2025 12:39:49 -0500 Subject: [PATCH 4/5] WIP... --- Exercises/Exercise-6/main.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Exercises/Exercise-6/main.py b/Exercises/Exercise-6/main.py index 93110ecf..f1a1834d 100644 --- a/Exercises/Exercise-6/main.py +++ b/Exercises/Exercise-6/main.py @@ -198,7 +198,10 @@ def rename_columns(df, renameMap): # make a date column sdf = sdf.withColumn('end_date', F.col('end_time').cast(T.DateType())) - avgTripDurationDay_sdf = (sdf.groupBy('end_date').agg(F.avg(sdf.tripduration)).select(F.col('end_date'), F.col('trip_duration'))) # output csv... + avgTripDurationDay_sdf = (sdf.groupBy('end_date').agg(F.avg(sdf.tripduration).alias('trip_duration')).select(F.col('end_date'), F.col('trip_duration'))) # output csv... + + avgTripDurationDay_sdf.coalesce(1).write.mode('overwrite').option('header', 'true').csv('/reports/report_1') + return ################################################################################################################### # generate report 2 @@ -212,7 +215,7 @@ def rename_columns(df, renameMap): month_station_sdf = sdf.withColumn('start_month_date', F.to_date(sdf.start_time, 'MM/yyyy')) # popularStartStation_sdf = sdf.groupBy('start_month_date', 'start_station_name').agg(F.count().alias('station_count')).sort(F.col('station_count').desc()).limit(1).select('month_date', 'start_station_name') station_count_sdf = month_station_sdf.groupBy('start_month_date', 'start_station_id').agg(F.count().alias('month_visit_count')) - window = Window.partitionBy('start_month_date').orderBy(F.col('month_visit_count').desc()) + window = Window.partitionBy('start_month_date').orderBy(F.col('month_visit_count').desc()) # could add additional arg describing to order by alphabet to have a definite first. top_station_monthly_sdf = station_count_sdf.withColumn('rank', F.dense_rank().over(window)).filter(F.col('rank') == 1).drop('rank') From 0746d3f00c2f5f3a86c3f6a5ce467660eb2d3e4f Mon Sep 17 00:00:00 2001 From: aycz Date: Thu, 4 Dec 2025 15:59:50 -0600 Subject: [PATCH 5/5] WIP: Contain all report logics into respective functions; output reports to csv files. 
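A possible shape for the remaining CSV output (a sketch only, not part of this
diff; it leans on the REPORTS map and the `os` import already at the top of
main.py):

    def write_report(sdf, report_number, out_dir='reports'):
        # Spark writes a directory of part files, so the REPORTS entry
        # becomes a directory name rather than a single flat file.
        target = os.path.join(out_dir, REPORTS[report_number])
        (sdf.coalesce(1)
            .write
            .mode('overwrite')
            .option('header', 'true')
            .csv(target))

Each report function would then end with something like
write_report(avgTripDurationDay_sdf, 1) in place of the hard-coded
'/reports/report_1' path.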
--- Exercises/Exercise-6/docker-build-log.txt | 44 ---- Exercises/Exercise-6/main.py | 245 ++++++++-------------- 2 files changed, 88 insertions(+), 201 deletions(-) delete mode 100644 Exercises/Exercise-6/docker-build-log.txt diff --git a/Exercises/Exercise-6/docker-build-log.txt b/Exercises/Exercise-6/docker-build-log.txt deleted file mode 100644 index 04f4aff9..00000000 --- a/Exercises/Exercise-6/docker-build-log.txt +++ /dev/null @@ -1,44 +0,0 @@ -[+] Building 63.3s (6/10) docker:desktop-linux - => [internal] load build definition from Dockerfile 0.0s - => => transferring dockerfile: 1.13kB 0.0s - => [internal] load metadata for docker.io/library/python:latest 1.7s - => [internal] load .dockerignore 0.0s - => => transferring context: 2B 0.0s - => CACHED [1/6] FROM docker.io/library/python:latest@sha256:1f7ef1e8f35bc8629b05f4df943175f2851ba05f4a 0.0s - => => resolve docker.io/library/python:latest@sha256:1f7ef1e8f35bc8629b05f4df943175f2851ba05f4a509f723 0.0s - => [internal] load build context 0.0s - => => transferring context: 581B 0.0s - => ERROR [2/6] RUN apt-get update && apt-get install -y default-jdk scala wget vim software-prope 61.5s ------- - > [2/6] RUN apt-get update && apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && apt-get clean: -0.847 Get:1 http://deb.debian.org/debian bookworm InRelease [151 kB] -1.551 Get:2 http://deb.debian.org/debian bookworm-updates InRelease [55.4 kB] -2.071 Get:3 http://deb.debian.org/debian-security bookworm-security InRelease [48.0 kB] -2.319 Get:4 http://deb.debian.org/debian bookworm/main amd64 Packages [8792 kB] -58.52 Get:5 http://deb.debian.org/debian bookworm-updates/main amd64 Packages [512 B] -58.61 Get:6 http://deb.debian.org/debian-security bookworm-security/main amd64 Packages [252 kB] -59.94 Fetched 9300 kB in 59s (156 kB/s) -59.94 Reading package lists... -60.65 Reading package lists... -61.31 Building dependency tree... -61.46 Reading state information... -61.46 Package python3.8 is not available, but is referred to by another package. 
-61.46 This may mean that the package is missing, has been obsoleted, or -61.46 is only available from another source -61.46 -61.46 E: Package 'python3.8' has no installation candidate ------- - - 1 warning found (use docker --debug to expand): - - WorkdirRelativePath: Relative workdir "app" can have unexpected results if the base image changes (line 17) -Dockerfile:3 --------------------- - 2 | - 3 | >>> RUN apt-get update && \ - 4 | >>> apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && \ - 5 | >>> apt-get clean - 6 | --------------------- -ERROR: failed to solve: process "/bin/sh -c apt-get update && apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && apt-get clean" did not complete successfully: exit code: 100 - -View build details: docker-desktop://dashboard/build/desktop-linux/desktop-linux/qo0cekwxvrdlyagw4c1x1f09r diff --git a/Exercises/Exercise-6/main.py b/Exercises/Exercise-6/main.py index f1a1834d..84eec8e9 100644 --- a/Exercises/Exercise-6/main.py +++ b/Exercises/Exercise-6/main.py @@ -16,75 +16,96 @@ 6: "top_ten_ages_long_short_trips.csv" } -''' -WT: 90+90+45+47+72 -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -https://community.databricks.com/t5/data-governance/i-have-to-read-zipped-csv-file-using-spark-without-unzipping-it/td-p/17156 +def top_ten_ages(sdf): + # generate report 6 + # ? top 10 ages: longest trips, shortest trips + # sort by trip duration, desc. + # take top 10, and final 10 + # get ages column + # merge into one df, sorted on trip duration, desc + # or or + # we can group by age, avg agg respectively on their trip durations, and take top + bottom 10 of ages when sorted desc on avg trip duration. + # + valid_sdf = sdf.filter(sdf.birthyear.isNotNull()) + ages_sdf = sdf.withColumn('age', F.floor(F.months_between(F.current_date() - F.col('birthyear')) / 12).cast(T.IntegerType())) + ages_sdf = ages_sdf.groupBy('age').agg(F.avg(ages_sdf.trip_duration).alias('age_avg_trip_duration')) -i can use pandas to read in csv's that are zipped. caveats: - "still there is one disclaimer: "If using ‘zip’ or ‘tar’, the ZIP file must contain only one data file to be read in." + top_ten_sdf = ages_sdf.orderBy(F.col('age_avg_trip_duration').desc()).limit(10) + bottom_ten_sdf = ages_sdf.orderBy(F.col('age_avg_trip_duration').asc()).limit(10) + top_bottom_ten_sdf = bottom_ten_sdf.union(top_ten_sdf) # output csv... - and there is also obvious trade-off: using pandas means no distribution, - no scalability and exposure to OOM errors - but maybe in your specific case it is acceptable." -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -six Qs and their report names: +def avg_trip_by_gender(sdf): + # generate report 5 + # do males or females take longer trips... 
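    # ------------------------------------------------------------------
    # Aside on top_ten_ages() above (an illustrative sketch, untested):
    # the filtered frame (valid_sdf) is built but the unfiltered one is
    # used, months_between() takes two date arguments rather than a
    # subtraction, and the duration column is `tripduration`. The core
    # grouping could read roughly:
    #
    #   ages_sdf = (valid_sdf
    #               .withColumn('age', F.floor(
    #                   F.months_between(F.current_date(), F.col('birthyear')) / 12
    #               ).cast(T.IntegerType()))
    #               .groupBy('age')
    #               .agg(F.avg('tripduration').alias('age_avg_trip_duration')))
    # ------------------------------------------------------------------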
+ # drop any NA gender valued rows + # group by male and agg avg their trip duration and alias new col + # group by female and agg avg their trip duration and alias new col + # rank the counts and output -REPORTS = { - 1: "daily_trip_dur.csv", - 2: "daily_num_trips.csv", - 3: "monthly_top_stations.csv", - 4: "two_week_top_three_stations.csv", - 5: "gender_trip_length.csv", - 6: "top_ten_ages_long_short_trips.csv" -} + valid_sdf = sdf.filter(sdf.gender.isNotNull()) + gender_avg_trip_sdf = valid_sdf.groupBy('gender').agg(F.avg(valid_sdf.trip_duration).alias('gender_avg_trip_duration')) # output csv... + + +def top_three_daily_stations(sdf): + # generate report 4 + # + # find final timestamp and backtrack two weeks. use methodology of above, but limit 3 instead and group by day. + sorted_sdf = sdf.sort(F.col('start_time').desc()) + sorted_sdf = sorted_sdf.withColumn('last_day', F.lit(sorted_sdf.first()['start_time'])) + last_2w_sdf = sorted_sdf.filter(F.datediff(sorted_sdf.last_day - sorted_sdf.start_time) < 15) + + last_2w_sdf = last_2w_sdf.withColumn('start_date', F.to_date(last_2w_sdf.start_time)) + last_2w_sdf = last_2w_sdf.groupBy('start_date', 'start_station_id').agg(F.count().alias('day_visit_count')) + window = Window.partitionBy('start_date').orderBy(F.col('day_visit_count').desc()) + + top_three_daily_station_sdf = last_2w_sdf.withColumn('rank', F.dense_rank().over(window)).filter(F.col('rank') < 4).drop('rank') # output csv + + # sdf = sdf.na.fill('0', subset=['tripduration']) # fill in empty durations + # sdf = sdf.na.drop('all', subset=['end_time']) # drop any missing timestamps; needed for aggregation + # + + +def top_station_monthly(sdf): + # generate report 3 + # make a month-year column to aggregate for popular station + # use windows to go with dense_rank() + month_station_sdf = sdf.withColumn('start_month_date', F.to_date(sdf.start_time, 'MM/yyyy')) + # popularStartStation_sdf = sdf.groupBy('start_month_date', 'start_station_name').agg(F.count().alias('station_count')).sort(F.col('station_count').desc()).limit(1).select('month_date', 'start_station_name') + station_count_sdf = month_station_sdf.groupBy('start_month_date', 'start_station_id').agg(F.count().alias('month_visit_count')) + window = Window.partitionBy('start_month_date').orderBy(F.col('month_visit_count').desc()) # could add additional arg describing to order by alphabet to have a definite first. + + top_station_monthly_sdf = station_count_sdf.withColumn('rank', F.dense_rank().over(window)).filter(F.col('rank') == 1).drop('rank') # output csv... + +def trip_count_per_day(sdf): + # generate report 2 + countTripsByDay_sdf = sdf.groupBy('end_date').withColumn('trip_count', F.count('trip_id')).select(F.col('end_date'), F.col('trip_count')) # output csv... + + +def avg_trip_per_day(sdf): + # generate report 1 + # what is the avg trip duration per day? + # find all null tripduration values, and attempt to calulcate. then join that df to complete df on trip_id + emptyTripDur_sdf = sdf.filter(sdf.tripduration.isNull()) + # cast as a `double` to increase precision past integer values. 
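    # ------------------------------------------------------------------
    # Aside on the station and count reports above (an illustrative
    # sketch, untested): F.count() needs a column (e.g. F.count(F.lit(1))),
    # a grouped frame exposes .agg() rather than .withColumn(), a month
    # bucket can come from date_format(), and datediff() takes two
    # arguments. For example:
    #
    #   trips_per_day = (sdf.withColumn('end_date', F.to_date('end_time'))
    #                       .groupBy('end_date')
    #                       .agg(F.count('trip_id').alias('trip_count')))
    #
    #   monthly_counts = (sdf.withColumn('start_month', F.date_format('start_time', 'yyyy-MM'))
    #                        .groupBy('start_month', 'start_station_id')
    #                        .agg(F.count(F.lit(1)).alias('month_visit_count')))
    #
    #   last_day = sdf.agg(F.max('start_time')).first()[0]
    #   last_2w = sdf.filter(F.datediff(F.lit(last_day), F.col('start_time')) < 14)
    # ------------------------------------------------------------------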
+ calculatedTripDur_sdf = emptyTripDur_sdf.withColumn('tripduration', + F.col('end_time').cast(T.DoubleType()) - F.col('start_time').cast(T.DoubleType())) + # join the dataframes on trip id + sdf = sdf.join(calculatedTripDur_sdf, on = (sdf.trip_id) == calculatedTripDur_sdf.trip_id, how = 'outer') + # make a date column + sdf = sdf.withColumn('end_date', F.col('end_time').cast(T.DateType())) + + avgTripDurationDay_sdf = (sdf.groupBy('end_date').agg(F.avg(sdf.tripduration).alias('trip_duration')).select(F.col('end_date'), F.col('trip_duration'))) # output csv... -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -from pyspark.sql.functions -> can import `col`, `expr` e.g. col("colName") or expr("colName") can also use a string literal i.e. ("colName") -common aggregations including avg, sum, max, and min from pyspark.sql.functions - -df_children_with_schema = spark.createDataFrame( - data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)], - schema = StructType([ - StructField('name', StringType(), True), - StructField('age', IntegerType(), True) - ]) -) -display(df_children_with_schema) - -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -chaining actions... -df_chained = ( - df_order.filter(col("o_orderstatus") == "F") - .groupBy(col("o_orderpriority")) - .agg(count(col("o_orderkey")).alias("n_orders")) - .sort(col("n_orders").desc()) -) - -display(df_chained) - -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -# Assign this variable your file path -file_path = "" - -(df_joined.write - .format("csv") - .mode("overwrite") - .write(file_path) -) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -# import org.apache.spark.sql.functions._ -# df.withColumn("modified",date_format(to_date(col("modified"), "MM/dd/yy"), "yyyy-MM-dd")) -# .withColumn("created",to_utc_timestamp(to_timestamp(col("created"), "MM/dd/yy HH:mm"), "UTC")) -# -''' + avgTripDurationDay_sdf.coalesce(1).write.mode('overwrite').option('header', 'true').csv('/reports/report_1') + + return def main(): spark = SparkSession.builder.appName("Exercise6").enableHiveSupport().getOrCreate() - # your code here + spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true') cwd = os.getcwd() @@ -92,26 +113,6 @@ def main(): sdf_0, sdf_1 = None, None - # headers: - # ride_id <-> trip_id, - # rideable_type, - # started_at <-> start_time, - # ended_at <-> end_time, - # start_station_name <-> from_station_name, - # start_station_id <-> from_station_id, - # end_station_name <-> to_station_name, - # end_station_id <-> to_station_id, - # start_lat, - # start_lng, - # end_lat, - # end_lng, - # member_casual, - # bikeid, - # tripduration, - # usertype, - # gender, - # birthyear - # 2020 file has no `tripduration` -> must calculate in seconds to match 2019 column. FILE_IMPORT_SCHEMA = T.StructType([ T.StructField('ride_id', T.StringType(), True), T.StructField('trip_id', T.StringType(), True), @@ -183,91 +184,21 @@ def rename_columns(df, renameMap): .transform(lambda sdf: rename_columns(sdf, RENAME_MAP)) ) - # join the df's' sdf = sdf_0.join(sdf_1, how='outer') - # generate report 1 - # what is the avg trip duration per day? - # find all null tripduration values, and attempt to calulcate. then join that df to complete df on trip_id - emptyTripDur_sdf = sdf.filter(sdf.tripduration.isNull()) - # cast as a `double` to increase precision past integer values. 
- calculatedTripDur_sdf = emptyTripDur_sdf.withColumn('tripduration', - F.col('end_time').cast(T.DoubleType()) - F.col('start_time').cast(T.DoubleType())) - # join the dataframes on trip id - sdf = sdf.join(calculatedTripDur_sdf, on = (sdf.trip_id) == calculatedTripDur_sdf.trip_id, how = 'outer') - # make a date column - sdf = sdf.withColumn('end_date', F.col('end_time').cast(T.DateType())) + avg_trip_per_day(sdf) - avgTripDurationDay_sdf = (sdf.groupBy('end_date').agg(F.avg(sdf.tripduration).alias('trip_duration')).select(F.col('end_date'), F.col('trip_duration'))) # output csv... + trip_count_per_day(sdf) - avgTripDurationDay_sdf.coalesce(1).write.mode('overwrite').option('header', 'true').csv('/reports/report_1') - return - ################################################################################################################### + top_station_monthly(sdf) - # generate report 2 - countTripsByDay_sdf = sdf.groupBy('end_date').withColumn('trip_count', F.count('trip_id')).select(F.col('end_date'), F.col('trip_count')) # output csv... - - ################################################################################################################### - - # generate report 3 - # make a month-year column to aggregate for popular station - # use windows to go with dense_rank() - month_station_sdf = sdf.withColumn('start_month_date', F.to_date(sdf.start_time, 'MM/yyyy')) - # popularStartStation_sdf = sdf.groupBy('start_month_date', 'start_station_name').agg(F.count().alias('station_count')).sort(F.col('station_count').desc()).limit(1).select('month_date', 'start_station_name') - station_count_sdf = month_station_sdf.groupBy('start_month_date', 'start_station_id').agg(F.count().alias('month_visit_count')) - window = Window.partitionBy('start_month_date').orderBy(F.col('month_visit_count').desc()) # could add additional arg describing to order by alphabet to have a definite first. - - top_station_monthly_sdf = station_count_sdf.withColumn('rank', F.dense_rank().over(window)).filter(F.col('rank') == 1).drop('rank') - - ################################################################################################################### + top_three_daily_stations(sdf) - # generate report 4 - # - # find final timestamp and backtrack two weeks. use methodology of above, but limit 3 instead and group by day. - sorted_sdf = sdf.sort(F.col('start_time').desc()) - sorted_sdf = sorted_sdf.withColumn('last_day', F.lit(sorted_sdf.first()['start_time'])) - last_2w_sdf = sorted_sdf.filter(F.datediff(sorted_sdf.last_day - sorted_sdf.start_time) < 15) + avg_trip_by_gender(sdf) - last_2w_sdf = last_2w_sdf.withColumn('start_date', F.to_date(last_2w_sdf.start_time)) - last_2w_sdf = last_2w_sdf.groupBy('start_date', 'start_station_id').agg(F.count().alias('day_visit_count')) - window = Window.partitionBy('start_date').orderBy(F.col('day_visit_count').desc()) - - top_three_daily_station_sdf = last_2w_sdf.withColumn('rank', F.dense_rank().over(window)).filter(F.col('rank') < 4).drop('rank') - - # sdf = sdf.na.fill('0', subset=['tripduration']) # fill in empty durations - # sdf = sdf.na.drop('all', subset=['end_time']) # drop any missing timestamps; needed for aggregation - # - ################################################################################################################### - - # generate report 5 - # do males or females take longer trips... 
- # drop any NA gender valued rows - # group by male and agg avg their trip duration and alias new col - # group by female and agg avg their trip duration and alias new col - # rank the counts and output - - valid_sdf = sdf.filter(sdf.gender.isNotNull()) - gender_avg_trip_sdf = valid_sdf.groupBy('gender').agg(F.avg(valid_sdf.trip_duration).alias('gender_avg_trip_duration')) - - - ################################################################################################################### - - # generate report 6 - # ? top 10 ages: longest trips, shortest trips - # sort by trip duration, desc. - # take top 10, and final 10 - # get ages column - # merge into one df, sorted on trip duration, desc - # or or - # we can group by age, avg agg respectively on their trip durations, and take top + bottom 10 of ages when sorted desc on avg trip duration. - # - valid_sdf = sdf.filter(sdf.birthyear.isNotNull()) - ages_sdf = sdf.withColumn('age', F.floor(F.months_between(F.current_date() - F.col('birthyear')) / 12).cast(T.IntegerType())) - ages_sdf = ages_sdf.groupBy('age').agg(F.avg(ages_sdf.trip_duration).alias('age_avg_trip_duration')) - - top_ten_sdf = ages_sdf.orderBy(F.col('age_avg_trip_duration').desc()).limit(10) - bottom_ten_sdf = ages_sdf.orderBy(F.col('age_avg_trip_duration').asc()).limit(10) - top_bottom_ten_sdf = bottom_ten_sdf.union(top_ten_sdf) + top_ten_ages(sdf) + + return if __name__ == "__main__":
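
Still open after this series (sketched here, not part of the patches above):
main() combines the two quarters with a key-less outer join, and
avg_trip_per_day() joins the frame back onto itself to fill the missing 2020
durations. Because both files are read with the same FILE_IMPORT_SCHEMA, the
quarters can simply be stacked and the durations filled in place, roughly:

    # in main(): stack the quarters instead of joining them
    sdf = sdf_0.unionByName(sdf_1)

    def avg_trip_per_day(sdf):
        # fill tripduration from the timestamps where it is missing;
        # unix_timestamp() yields seconds, which matches the 2019 column
        sdf = sdf.withColumn(
            'tripduration',
            F.coalesce(
                F.col('tripduration'),
                (F.unix_timestamp('end_time') - F.unix_timestamp('start_time'))
                .cast(T.FloatType())))
        sdf = sdf.withColumn('end_date', F.to_date('end_time'))
        return (sdf.groupBy('end_date')
                   .agg(F.avg('tripduration').alias('avg_trip_duration')))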