From e502fbc88e05609011717670a419c9a533d084a0 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 15:11:33 +0800 Subject: [PATCH 01/17] Fix e2e tests --- .../feathr/_preprocessing_pyudf_manager.py | 28 ++++++++++---- feathr_project/test/test_fixture.py | 7 +--- .../test/test_pyduf_preprocessing_e2e.py | 2 + feathr_project/test/udf.py | 37 +++++++++++++++++++ 4 files changed, 62 insertions(+), 12 deletions(-) create mode 100644 feathr_project/test/udf.py diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 75cc487d1..3451de9da 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -1,6 +1,7 @@ import inspect import os from pathlib import Path +import sys from typing import List, Optional, Union import pickle from jinja2 import Template @@ -37,15 +38,23 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): feature_names_to_func_mapping = {} # features that have preprocessing defined. This is used to figure out if we need to kick off Pyspark # preprocessing for requested features. - features_with_preprocessing = [] + features_with_preprocessing = {} + dep_modules = set() for anchor in anchor_list: # only support batch source preprocessing for now. if not isinstance(anchor.source, HdfsSource): continue preprocessing_func = anchor.source.preprocessing if preprocessing_func: + # Record module needed by all preprocessing_func + for feature in anchor.features: + # Record module file name in metadata + features_with_preprocessing[feature.name] = sys.modules[preprocessing_func.__module__].__file__ + # Record module name except __main__ + if preprocessing_func.__module__ != "__main__": + dep_modules.add(preprocessing_func.__module__) feature_names = [feature.name for feature in anchor.features] - features_with_preprocessing = features_with_preprocessing + feature_names + # features_with_preprocessing = features_with_preprocessing + feature_names feature_names.sort() string_feature_list = ','.join(feature_names) feature_names_to_func_mapping[string_feature_list] = "cloudpickle.loads(%s)" % cloudpickle.dumps(preprocessing_func) @@ -53,7 +62,7 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): if not features_with_preprocessing: return - _PreprocessingPyudfManager.write_feature_names_to_udf_name_file(feature_names_to_func_mapping, local_workspace_dir) + _PreprocessingPyudfManager.write_feature_names_to_udf_name_file(feature_names_to_func_mapping, dep_modules, local_workspace_dir) # Save necessary preprocessing-related metadata locally in your workspace # Typically it's used as a metadata for join/gen job to figure out if there is preprocessing UDF @@ -63,20 +72,23 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): pickle.dump(features_with_preprocessing, file) @staticmethod - def write_feature_names_to_udf_name_file(feature_names_to_func_mapping, local_workspace_dir): + def write_feature_names_to_udf_name_file(feature_names_to_func_mapping, dep_modules, local_workspace_dir): """Persist feature names(sorted) of an anchor to the corresponding preprocessing function name to source path under the local_workspace_dir. 
""" # indent in since python needs correct indentation # Don't change the indentation tm = Template(""" +{% for module in dep_modules %} +import {{module}} +{% endfor %} feature_names_funcs = { {% for key, value in func_maps.items() %} "{{key}}" : {{value}}, {% endfor %} } """) - new_file = tm.render(func_maps=feature_names_to_func_mapping) + new_file = tm.render(dep_modules=dep_modules, func_maps=feature_names_to_func_mapping) full_file_name = os.path.join(local_workspace_dir, FEATHR_CLIENT_UDF_FILE_NAME) with open(full_file_name, "w") as text_file: @@ -108,10 +120,12 @@ def prepare_pyspark_udf_files(feature_names: List[str], local_workspace_dir): # Only if the requested features contain preprocessing logic, we will load Pyspark. Otherwise just use Scala # spark. has_py_udf_preprocessing = False + modules = set() for feature_name in feature_names: if feature_name in features_with_preprocessing: has_py_udf_preprocessing = True - break + if features_with_preprocessing[feature_name] != "__main__": + modules.add(features_with_preprocessing[feature_name]) if has_py_udf_preprocessing: pyspark_driver_path = os.path.join(local_workspace_dir, FEATHR_PYSPARK_DRIVER_FILE_NAME) @@ -134,5 +148,5 @@ def prepare_pyspark_udf_files(feature_names: List[str], local_workspace_dir): with open(pyspark_driver_path, "a") as handle: print("".join(lines), file=handle) - py_udf_files = [pyspark_driver_path] + py_udf_files = [pyspark_driver_path] + list(modules) return py_udf_files diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index a6605ca9a..959c53a4e 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -12,6 +12,8 @@ from feathr.client import FeathrClient from pyspark.sql import DataFrame +from udf import * + def basic_test_setup(config_path: str): @@ -166,11 +168,6 @@ def registry_test_setup(config_path: str): client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) - def add_new_dropoff_and_fare_amount_column(df: DataFrame): - df = df.withColumn("new_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) - df = df.withColumn("new_fare_amount", col("fare_amount") + 1000000) - return df - batch_source = HdfsSource(name="nycTaxiBatchSource", path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", event_timestamp_column="lpep_dropoff_datetime", diff --git a/feathr_project/test/test_pyduf_preprocessing_e2e.py b/feathr_project/test/test_pyduf_preprocessing_e2e.py index f9e7cf602..1d4a9babb 100644 --- a/feathr_project/test/test_pyduf_preprocessing_e2e.py +++ b/feathr_project/test/test_pyduf_preprocessing_e2e.py @@ -17,6 +17,8 @@ from feathr import RedisSink from test_fixture import snowflake_test_setup +from udf import * + def trip_distance_preprocessing(df: DataFrame): df = df.withColumn("trip_distance", df.trip_distance.cast('double') - 90000) df = df.withColumn("fare_amount", df.fare_amount.cast('double') - 90000) diff --git a/feathr_project/test/udf.py b/feathr_project/test/udf.py new file mode 100644 index 000000000..6acb1b1b5 --- /dev/null +++ b/feathr_project/test/udf.py @@ -0,0 +1,37 @@ +from pyspark.sql import SparkSession, DataFrame +from pyspark.sql.functions import col,sum,avg,max,dayofweek,dayofyear + +def trip_distance_preprocessing(df: DataFrame): + df = df.withColumn("trip_distance", df.trip_distance.cast('double') - 90000) + df = df.withColumn("fare_amount", df.fare_amount.cast('double') - 90000) + + return df + +def 
add_new_dropoff_and_fare_amount_column(df: DataFrame): + df = df.withColumn("new_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) + df = df.withColumn("new_fare_amount", col("fare_amount") + 1000000) + return df + +def add_new_fare_amount(df: DataFrame) -> DataFrame: + df = df.withColumn("fare_amount_new", col("fare_amount") + 8000000) + + return df + +def add_new_surcharge_amount_and_pickup_column(df: DataFrame) -> DataFrame: + df = df.withColumn("new_improvement_surcharge", col("improvement_surcharge") + 1000000) + df = df.withColumn("new_tip_amount", col("tip_amount") + 1000000) + df = df.withColumn("new_lpep_pickup_datetime", col("lpep_pickup_datetime")) + + return df + +def add_old_lpep_dropoff_datetime(df: DataFrame) -> DataFrame: + df = df.withColumn("old_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) + + return df + +def feathr_udf_day_calc(df: DataFrame) -> DataFrame: + df = df.withColumn("f_day_of_week", dayofweek("lpep_dropoff_datetime")) + df = df.withColumn("f_day_of_year", dayofyear("lpep_dropoff_datetime")) + return df + + From 4c53e69d36b0d6b8d3ad50fea82f87ed36362787 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 15:38:34 +0800 Subject: [PATCH 02/17] WIP --- .../feathr/_preprocessing_pyudf_manager.py | 12 ++++--- .../test/test_pyduf_preprocessing_e2e.py | 33 ------------------- feathr_project/test/udf.py | 6 +++- 3 files changed, 13 insertions(+), 38 deletions(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 3451de9da..7a9d23634 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -49,10 +49,14 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): # Record module needed by all preprocessing_func for feature in anchor.features: # Record module file name in metadata - features_with_preprocessing[feature.name] = sys.modules[preprocessing_func.__module__].__file__ - # Record module name except __main__ - if preprocessing_func.__module__ != "__main__": - dep_modules.add(preprocessing_func.__module__) + try: + # Ingore non-exist modules and modules without corresponding file + # In case the module name cannot be retrieved + features_with_preprocessing[feature.name] = sys.modules[preprocessing_func.__module__].__file__ + finally: + # Record module name except __main__ + if preprocessing_func.__module__ != "__main__": + dep_modules.add(preprocessing_func.__module__) feature_names = [feature.name for feature in anchor.features] # features_with_preprocessing = features_with_preprocessing + feature_names feature_names.sort() diff --git a/feathr_project/test/test_pyduf_preprocessing_e2e.py b/feathr_project/test/test_pyduf_preprocessing_e2e.py index 1d4a9babb..9931c3768 100644 --- a/feathr_project/test/test_pyduf_preprocessing_e2e.py +++ b/feathr_project/test/test_pyduf_preprocessing_e2e.py @@ -19,39 +19,6 @@ from udf import * -def trip_distance_preprocessing(df: DataFrame): - df = df.withColumn("trip_distance", df.trip_distance.cast('double') - 90000) - df = df.withColumn("fare_amount", df.fare_amount.cast('double') - 90000) - - return df - -def add_new_dropoff_and_fare_amount_column(df: DataFrame): - df = df.withColumn("new_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) - df = df.withColumn("new_fare_amount", col("fare_amount") + 1000000) - return df - -def add_new_fare_amount(df: DataFrame) -> DataFrame: - df = df.withColumn("fare_amount_new", col("fare_amount") + 
8000000) - - return df - -def add_new_surcharge_amount_and_pickup_column(df: DataFrame) -> DataFrame: - df = df.withColumn("new_improvement_surcharge", col("improvement_surcharge") + 1000000) - df = df.withColumn("new_tip_amount", col("tip_amount") + 1000000) - df = df.withColumn("new_lpep_pickup_datetime", col("lpep_pickup_datetime")) - - return df - -def add_old_lpep_dropoff_datetime(df: DataFrame) -> DataFrame: - df = df.withColumn("old_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) - - return df - -def feathr_udf_day_calc(df: DataFrame) -> DataFrame: - df = df.withColumn("f_day_of_week", dayofweek("lpep_dropoff_datetime")) - df = df.withColumn("f_day_of_year", dayofyear("lpep_dropoff_datetime")) - return df - def test_non_swa_feature_gen_with_offline_preprocessing(): """ Test non-SWA feature gen with preprocessing diff --git a/feathr_project/test/udf.py b/feathr_project/test/udf.py index 6acb1b1b5..e9e039590 100644 --- a/feathr_project/test/udf.py +++ b/feathr_project/test/udf.py @@ -1,5 +1,5 @@ from pyspark.sql import SparkSession, DataFrame -from pyspark.sql.functions import col,sum,avg,max,dayofweek,dayofyear +from pyspark.sql.functions import col,sum,avg,max,dayofweek,dayofyear,concat,lit def trip_distance_preprocessing(df: DataFrame): df = df.withColumn("trip_distance", df.trip_distance.cast('double') - 90000) @@ -34,4 +34,8 @@ def feathr_udf_day_calc(df: DataFrame) -> DataFrame: df = df.withColumn("f_day_of_year", dayofyear("lpep_dropoff_datetime")) return df +def snowflake_preprocessing(df: DataFrame) -> DataFrame: + df = df.withColumn("NEW_CC_DIVISION_NAME", concat(col("CC_DIVISION_NAME"), lit("0000"), col("CC_DIVISION_NAME"))) + df = df.withColumn("NEW_CC_ZIP", concat(col("CC_ZIP"), lit("____"), col("CC_ZIP"))) + return df From a07052ce0af735a8dac1adc8ef92eb5c8a026919 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 16:42:18 +0800 Subject: [PATCH 03/17] WIP --- feathr_project/test/test_pyduf_preprocessing_e2e.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/feathr_project/test/test_pyduf_preprocessing_e2e.py b/feathr_project/test/test_pyduf_preprocessing_e2e.py index 9931c3768..063858883 100644 --- a/feathr_project/test/test_pyduf_preprocessing_e2e.py +++ b/feathr_project/test/test_pyduf_preprocessing_e2e.py @@ -357,12 +357,6 @@ def test_get_offline_feature_two_swa_with_diff_preprocessing(): assert res_df.shape[0] > 0 -def snowflake_preprocessing(df: DataFrame) -> DataFrame: - df = df.withColumn("NEW_CC_DIVISION_NAME", concat(col("CC_DIVISION_NAME"), lit("0000"), col("CC_DIVISION_NAME"))) - df = df.withColumn("NEW_CC_ZIP", concat(col("CC_ZIP"), lit("____"), col("CC_ZIP"))) - return df - - def test_feathr_get_offline_features_from_snowflake(): """ Test get_offline_features() can get feature data from Snowflake source correctly. 
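
For reference, patches 01-03 replace the earlier approach of copying UDF source text into the generated file with a cloudpickle round-trip: each anchor's `preprocessing` callable is serialized with `cloudpickle.dumps` at build time and re-created on the Spark driver by a `cloudpickle.loads(b'...')` expression rendered into the generated module. The minimal sketch below shows that round-trip in isolation; the function name `my_udf`, the `'f1,f2'` key, and the `generated_src` string are illustrative only, and it assumes the unpickling process can resolve the function (the module-not-importable case is exactly what the later base64 bundling commits in this series address).

    from pyspark import cloudpickle

    def my_udf(df):
        # Stand-in for an anchor source's `preprocessing` callable.
        return df

    # Build time: serialize the callable and embed it in generated Python source,
    # mirroring the `"cloudpickle.loads(%s)" % cloudpickle.dumps(preprocessing_func)` pattern above.
    payload = cloudpickle.dumps(my_udf)
    generated_src = (
        "from pyspark import cloudpickle\n"
        "feature_names_funcs = {\n"
        "    'f1,f2': cloudpickle.loads(%s),\n"
        "}\n"
    ) % payload

    # Driver time: executing the generated module rebuilds the callable from the bytes literal.
    namespace = {}
    exec(generated_src, namespace)
    restored_udf = namespace["feature_names_funcs"]["f1,f2"]
    assert callable(restored_udf)

The bytes repr produced by `%s` is a valid Python literal, which is why the generated `feature_names_funcs` map can be written straight into a `.py` file and executed on the cluster.
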
From b7ddf01fb5ab95660410ae3a55f7c74721ffb8d1 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 17:21:00 +0800 Subject: [PATCH 04/17] WIP --- feathr_project/feathr/_preprocessing_pyudf_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 7a9d23634..01f193a24 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -94,8 +94,9 @@ def write_feature_names_to_udf_name_file(feature_names_to_func_mapping, dep_modu """) new_file = tm.render(dep_modules=dep_modules, func_maps=feature_names_to_func_mapping) + os.makedirs(local_workspace_dir) full_file_name = os.path.join(local_workspace_dir, FEATHR_CLIENT_UDF_FILE_NAME) - with open(full_file_name, "w") as text_file: + with open(full_file_name, "w+") as text_file: print("".join(PROVIDED_IMPORTS), file=text_file) print(new_file, file=text_file) From dbadc95f3c5978fd5fcd241567c8a7e4dba4e766 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 18:00:28 +0800 Subject: [PATCH 05/17] WIP --- feathr_project/feathr/_preprocessing_pyudf_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 01f193a24..2bb9a8f89 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -94,7 +94,7 @@ def write_feature_names_to_udf_name_file(feature_names_to_func_mapping, dep_modu """) new_file = tm.render(dep_modules=dep_modules, func_maps=feature_names_to_func_mapping) - os.makedirs(local_workspace_dir) + os.makedirs(local_workspace_dir, mode=0o777, exist_ok=True) full_file_name = os.path.join(local_workspace_dir, FEATHR_CLIENT_UDF_FILE_NAME) with open(full_file_name, "w+") as text_file: print("".join(PROVIDED_IMPORTS), file=text_file) From 766970df4c49cc6ac91ed39dbbeb43ab430f7716 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 18:35:10 +0800 Subject: [PATCH 06/17] WIP --- feathr_project/test/test_pyduf_preprocessing_e2e.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/feathr_project/test/test_pyduf_preprocessing_e2e.py b/feathr_project/test/test_pyduf_preprocessing_e2e.py index 063858883..06e2e9c5f 100644 --- a/feathr_project/test/test_pyduf_preprocessing_e2e.py +++ b/feathr_project/test/test_pyduf_preprocessing_e2e.py @@ -72,7 +72,7 @@ def test_non_swa_feature_gen_with_offline_preprocessing(): client.materialize_features(settings) # just assume the job is successful without validating the actual result in Redis. Might need to consolidate # this part with the test_feathr_online_store test case - client.wait_job_to_finish(timeout_sec=900) + client.wait_job_to_finish(timeout_sec=3600) res = client.get_online_features(online_test_table, '2020-04-01 07:21:51', [ 'f_is_long_trip_distance', 'f_day_of_week']) @@ -134,7 +134,7 @@ def test_feature_swa_feature_gen_with_preprocessing(): client.materialize_features(settings) # just assume the job is successful without validating the actual result in Redis. 
Might need to consolidate # this part with the test_feathr_online_store test case - client.wait_job_to_finish(timeout_sec=900) + client.wait_job_to_finish(timeout_sec=3600) res = client.get_online_features(online_test_table, '265', ['f_location_avg_fare', 'f_location_max_fare']) assert res == [1000041.625, 1000100.0] @@ -234,7 +234,7 @@ def test_feathr_get_offline_features_hdfs_source(): output_path=output_path) # assuming the job can successfully run; otherwise it will throw exception - client.wait_job_to_finish(timeout_sec=900) + client.wait_job_to_finish(timeout_sec=3600) # download result and just assert the returned result is not empty res_df = get_result_df(client) @@ -350,7 +350,7 @@ def test_get_offline_feature_two_swa_with_diff_preprocessing(): output_path=output_path) # assuming the job can successfully run; otherwise it will throw exception - client.wait_job_to_finish(timeout_sec=900) + client.wait_job_to_finish(timeout_sec=3600) res_df = get_result_df(client) # download result and just assert the returned result is not empty @@ -412,7 +412,7 @@ def test_feathr_get_offline_features_from_snowflake(): output_path=output_path) # assuming the job can successfully run; otherwise it will throw exception - client.wait_job_to_finish(timeout_sec=900) + client.wait_job_to_finish(timeout_sec=3600) res = get_result_df(client) # just assume there are results. From a4615b3e9d02052b399477c953d5ad2d424fb0d1 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 21:01:34 +0800 Subject: [PATCH 07/17] WIP --- feathr_project/test/test_pyduf_preprocessing_e2e.py | 1 - 1 file changed, 1 deletion(-) diff --git a/feathr_project/test/test_pyduf_preprocessing_e2e.py b/feathr_project/test/test_pyduf_preprocessing_e2e.py index 06e2e9c5f..b23adff52 100644 --- a/feathr_project/test/test_pyduf_preprocessing_e2e.py +++ b/feathr_project/test/test_pyduf_preprocessing_e2e.py @@ -78,7 +78,6 @@ def test_non_swa_feature_gen_with_offline_preprocessing(): 'f_is_long_trip_distance', 'f_day_of_week']) assert res == [8000006.0, 4] - def test_feature_swa_feature_gen_with_preprocessing(): """ Test SWA feature gen with preprocessing. From c976c34b7e97bc49a5dd0efeca5b301fc2d8a401 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 22:29:55 +0800 Subject: [PATCH 08/17] WIP --- .../feathr/_preprocessing_pyudf_manager.py | 34 +++++++++++++++---- .../feathr/feathr_pyspark_driver_template.py | 20 +++++++++++ 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 2bb9a8f89..ac97cd762 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -5,6 +5,7 @@ from typing import List, Optional, Union import pickle from jinja2 import Template +from numpy import append from feathr.source import HdfsSource from pyspark import cloudpickle @@ -80,9 +81,22 @@ def write_feature_names_to_udf_name_file(feature_names_to_func_mapping, dep_modu """Persist feature names(sorted) of an anchor to the corresponding preprocessing function name to source path under the local_workspace_dir. 
""" + file_contents = [] + for m in dep_modules: + if m != "__main__": + try: + filename = sys.modules[m].__file__ + content = encode_file(filename) + file_contents.append( (os.path.basename(filename), content) ) + except: + pass + # indent in since python needs correct indentation # Don't change the indentation - tm = Template(""" + tm = Template(r""" +{% for f in file_contents %} +decode_file('{{f[0]}}', r'''{{f[1]}}''') +{% endfor %} {% for module in dep_modules %} import {{module}} {% endfor %} @@ -92,7 +106,7 @@ def write_feature_names_to_udf_name_file(feature_names_to_func_mapping, dep_modu {% endfor %} } """) - new_file = tm.render(dep_modules=dep_modules, func_maps=feature_names_to_func_mapping) + new_file = tm.render(file_contents=file_contents, dep_modules=dep_modules, func_maps=feature_names_to_func_mapping) os.makedirs(local_workspace_dir, mode=0o777, exist_ok=True) full_file_name = os.path.join(local_workspace_dir, FEATHR_CLIENT_UDF_FILE_NAME) @@ -125,12 +139,10 @@ def prepare_pyspark_udf_files(feature_names: List[str], local_workspace_dir): # Only if the requested features contain preprocessing logic, we will load Pyspark. Otherwise just use Scala # spark. has_py_udf_preprocessing = False - modules = set() for feature_name in feature_names: if feature_name in features_with_preprocessing: has_py_udf_preprocessing = True - if features_with_preprocessing[feature_name] != "__main__": - modules.add(features_with_preprocessing[feature_name]) + break if has_py_udf_preprocessing: pyspark_driver_path = os.path.join(local_workspace_dir, FEATHR_PYSPARK_DRIVER_FILE_NAME) @@ -153,5 +165,15 @@ def prepare_pyspark_udf_files(feature_names: List[str], local_workspace_dir): with open(pyspark_driver_path, "a") as handle: print("".join(lines), file=handle) - py_udf_files = [pyspark_driver_path] + list(modules) + py_udf_files = [pyspark_driver_path] return py_udf_files + +import base64 +def encode_file(filename: str) -> str: + f = open(filename, "rb") + content = f.read() + encoded = base64.b64encode(content).decode('ascii') + n = 80 + chunks = [encoded[i:i+n] for i in range(0, len(encoded), n)] + return '\n'.join(chunks) + \ No newline at end of file diff --git a/feathr_project/feathr/feathr_pyspark_driver_template.py b/feathr_project/feathr/feathr_pyspark_driver_template.py index f9a9c4193..a3974ba47 100644 --- a/feathr_project/feathr/feathr_pyspark_driver_template.py +++ b/feathr_project/feathr/feathr_pyspark_driver_template.py @@ -1,7 +1,10 @@ +from turtle import st from pyspark.sql import SparkSession, DataFrame, SQLContext import sys +import os.path from pyspark.sql.functions import * +import base64 # This is executed in Spark driver # The logger doesn't work in Pyspark so we just use print @@ -82,3 +85,20 @@ def submit_spark_job(feature_names_funcs): py4j_feature_job.mainWithPreprocessedDataFrame(job_param_java_array, new_preprocessed_df_map) return None +def decode_file(filename, encoded): + # Decode encoded content into filename under the same directory of this file + content = base64.b64decode(encoded) + try: + start_dir = "" + try: + this_filename = sys.modules[to_java_string_array.__module__].__file__ + start_dir = os.path.dirname(this_filename) + except: + start_dir = os.path.curdir + output_name = os.path.join(start_dir, filename) + print("Write file %s ..." % output_name) + with open(output_name, "w+b") as f: + f.write(content) + print("File %s written." 
% output_name) + finally: + pass From aa5f9538081a8ae49a0fa01839fa301a545eac34 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 22:46:14 +0800 Subject: [PATCH 09/17] WIP --- feathr_project/feathr/feathr_pyspark_driver_template.py | 1 - 1 file changed, 1 deletion(-) diff --git a/feathr_project/feathr/feathr_pyspark_driver_template.py b/feathr_project/feathr/feathr_pyspark_driver_template.py index a3974ba47..c701dbd78 100644 --- a/feathr_project/feathr/feathr_pyspark_driver_template.py +++ b/feathr_project/feathr/feathr_pyspark_driver_template.py @@ -1,5 +1,4 @@ -from turtle import st from pyspark.sql import SparkSession, DataFrame, SQLContext import sys import os.path From f21676fc2483e9fb1d4bc6af1a2490615d6fa670 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 23:16:57 +0800 Subject: [PATCH 10/17] WIP --- feathr_project/feathr/feathr_pyspark_driver_template.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/feathr_project/feathr/feathr_pyspark_driver_template.py b/feathr_project/feathr/feathr_pyspark_driver_template.py index c701dbd78..2326b5817 100644 --- a/feathr_project/feathr/feathr_pyspark_driver_template.py +++ b/feathr_project/feathr/feathr_pyspark_driver_template.py @@ -85,6 +85,8 @@ def submit_spark_job(feature_names_funcs): return None def decode_file(filename, encoded): + import base64 + import os.path # Decode encoded content into filename under the same directory of this file content = base64.b64decode(encoded) try: From 6aaa445d2ca5b22c2b946a18b4372dcacd37505a Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Wed, 27 Apr 2022 23:31:19 +0800 Subject: [PATCH 11/17] WIP --- feathr_project/feathr/feathr_pyspark_driver_template.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/feathr_project/feathr/feathr_pyspark_driver_template.py b/feathr_project/feathr/feathr_pyspark_driver_template.py index 2326b5817..06e6304dc 100644 --- a/feathr_project/feathr/feathr_pyspark_driver_template.py +++ b/feathr_project/feathr/feathr_pyspark_driver_template.py @@ -95,7 +95,8 @@ def decode_file(filename, encoded): this_filename = sys.modules[to_java_string_array.__module__].__file__ start_dir = os.path.dirname(this_filename) except: - start_dir = os.path.curdir + start_dir = os.path.curdir + sys.path.append(start_dir) output_name = os.path.join(start_dir, filename) print("Write file %s ..." 
% output_name) with open(output_name, "w+b") as f: From 3fd1c0263f98c414c23abf2d1477d92a6af9bcdb Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Thu, 28 Apr 2022 00:52:04 +0800 Subject: [PATCH 12/17] Add doc --- .../feathr/_preprocessing_pyudf_manager.py | 19 +++++++++++++++++++ .../feathr/feathr_pyspark_driver_template.py | 3 ++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index ac97cd762..703e1eaa7 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -83,6 +83,22 @@ def write_feature_names_to_udf_name_file(feature_names_to_func_mapping, dep_modu """ file_contents = [] for m in dep_modules: + """ + The `feature_names_funcs` map contents a `cloudpickle.loads` statement that deserialize the + UDF function, due to the implementation limitation, cloudpickle shipped with PySpark cannot + actually serialize the function body if the function is in a module other than `__main__`, + which causes PyTest failed to run the UDF E2E test as it starts the test code in a local module + instead of `__main__`. + The solution is to bundle addition module with the main file. + First we encode module file in base64 and paste it in following format + ``` + decode_file("module_name.py", r'''BASE64_ENCODED_CONTENT''') + ``` + Then we can use `import module_name` to import this module, then `cloudpickle.loads` can work. + When the PySpark program is running, it first uses `decode_file` function in the template + to extract module file and drop it into the same directory as the main code, then we can run + `import UDF_module` and uses functions inside. + """ if m != "__main__": try: filename = sys.modules[m].__file__ @@ -170,6 +186,9 @@ def prepare_pyspark_udf_files(feature_names: List[str], local_workspace_dir): import base64 def encode_file(filename: str) -> str: + """ + Encode file content into a multiline base64 encoded string + """ f = open(filename, "rb") content = f.read() encoded = base64.b64encode(content).decode('ascii') diff --git a/feathr_project/feathr/feathr_pyspark_driver_template.py b/feathr_project/feathr/feathr_pyspark_driver_template.py index 06e6304dc..717a562c8 100644 --- a/feathr_project/feathr/feathr_pyspark_driver_template.py +++ b/feathr_project/feathr/feathr_pyspark_driver_template.py @@ -96,7 +96,8 @@ def decode_file(filename, encoded): start_dir = os.path.dirname(this_filename) except: start_dir = os.path.curdir - sys.path.append(start_dir) + if start_dir not in sys.path: + sys.path.append(start_dir) output_name = os.path.join(start_dir, filename) print("Write file %s ..." 
% output_name) with open(output_name, "w+b") as f: From 2397802167b2bd43ed5d6ed71d1bd5ebab6b5b59 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Thu, 28 Apr 2022 02:05:15 +0800 Subject: [PATCH 13/17] Ignore non-file modules --- feathr_project/feathr/_preprocessing_pyudf_manager.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 703e1eaa7..6a8c036c9 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -54,10 +54,11 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): # Ingore non-exist modules and modules without corresponding file # In case the module name cannot be retrieved features_with_preprocessing[feature.name] = sys.modules[preprocessing_func.__module__].__file__ - finally: - # Record module name except __main__ - if preprocessing_func.__module__ != "__main__": - dep_modules.add(preprocessing_func.__module__) + except: + pass + # Record module name except __main__ + if preprocessing_func.__module__ != "__main__": + dep_modules.add(preprocessing_func.__module__) feature_names = [feature.name for feature in anchor.features] # features_with_preprocessing = features_with_preprocessing + feature_names feature_names.sort() From 0c70814ef70cb0dc7923447d0b3ab9bd7207feb4 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Thu, 28 Apr 2022 15:55:53 +0800 Subject: [PATCH 14/17] Merge with main overwrites some changes --- .../feathr/_preprocessing_pyudf_manager.py | 25 +++++++++++-------- .../feathr/feathr_pyspark_driver_template.py | 2 +- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 5326d1fbc..703e1eaa7 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -7,6 +7,7 @@ from jinja2 import Template from numpy import append from feathr.source import HdfsSource +from pyspark import cloudpickle # Some metadata that are only needed by Feathr FEATHR_PYSPARK_METADATA = 'generated_feathr_pyspark_metadata' @@ -18,7 +19,8 @@ FEATHR_PYSPARK_DRIVER_TEMPLATE_FILE_NAME = 'feathr_pyspark_driver_template.py' # Feathr provided imports for pyspark UDFs all go here PROVIDED_IMPORTS = ['\nfrom pyspark.sql import SparkSession, DataFrame\n'] + \ - ['from pyspark.sql.functions import *\n'] + ['from pyspark.sql.functions import *\n'] + \ + ['from pyspark import cloudpickle\n'] class _PreprocessingPyudfManager(object): @@ -29,8 +31,11 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): """When the client build features, UDFs and features that need preprocessing will be stored as metadata. Those metadata will later be used when uploading the Pyspark jobs. """ - # feature names concatenated to UDF map - # for example, {'f1,f2,f3': my_udf1, 'f4,f5':my_udf2} + # feature names concatenated to UDF callable object map, it is like: + # { + # 'f1,f2': cloudpickle.loads('...pickledcode...'), + # 'f3': cloudpickle.loads('...pickledcode...'), + # } feature_names_to_func_mapping = {} # features that have preprocessing defined. This is used to figure out if we need to kick off Pyspark # preprocessing for requested features. 
@@ -49,16 +54,15 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): # Ingore non-exist modules and modules without corresponding file # In case the module name cannot be retrieved features_with_preprocessing[feature.name] = sys.modules[preprocessing_func.__module__].__file__ - except: - pass - # Record module name except __main__ - if preprocessing_func.__module__ != "__main__": - dep_modules.add(preprocessing_func.__module__) + finally: + # Record module name except __main__ + if preprocessing_func.__module__ != "__main__": + dep_modules.add(preprocessing_func.__module__) feature_names = [feature.name for feature in anchor.features] # features_with_preprocessing = features_with_preprocessing + feature_names feature_names.sort() string_feature_list = ','.join(feature_names) - feature_names_to_func_mapping[string_feature_list] = anchor.source.preprocessing + feature_names_to_func_mapping[string_feature_list] = "cloudpickle.loads(%s)" % cloudpickle.dumps(preprocessing_func) if not features_with_preprocessing: return @@ -114,7 +118,7 @@ def write_feature_names_to_udf_name_file(feature_names_to_func_mapping, dep_modu {% endfor %} feature_names_funcs = { {% for key, value in func_maps.items() %} - "{{key}}" : {{value.__name__}}, + "{{key}}" : {{value}}, {% endfor %} } """) @@ -154,6 +158,7 @@ def prepare_pyspark_udf_files(feature_names: List[str], local_workspace_dir): for feature_name in feature_names: if feature_name in features_with_preprocessing: has_py_udf_preprocessing = True + break if has_py_udf_preprocessing: pyspark_driver_path = os.path.join(local_workspace_dir, FEATHR_PYSPARK_DRIVER_FILE_NAME) diff --git a/feathr_project/feathr/feathr_pyspark_driver_template.py b/feathr_project/feathr/feathr_pyspark_driver_template.py index 717a562c8..9749d7e9f 100644 --- a/feathr_project/feathr/feathr_pyspark_driver_template.py +++ b/feathr_project/feathr/feathr_pyspark_driver_template.py @@ -92,7 +92,7 @@ def decode_file(filename, encoded): try: start_dir = "" try: - this_filename = sys.modules[to_java_string_array.__module__].__file__ + this_filename = sys.modules[__name__].__file__ start_dir = os.path.dirname(this_filename) except: start_dir = os.path.curdir From 29bbd0b7754a777ae22c4b5c38df6a0683e14755 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Fri, 29 Apr 2022 04:06:13 +0800 Subject: [PATCH 15/17] Missing conflict --- feathr_project/feathr/_preprocessing_pyudf_manager.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 703e1eaa7..6a8c036c9 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -54,10 +54,11 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): # Ingore non-exist modules and modules without corresponding file # In case the module name cannot be retrieved features_with_preprocessing[feature.name] = sys.modules[preprocessing_func.__module__].__file__ - finally: - # Record module name except __main__ - if preprocessing_func.__module__ != "__main__": - dep_modules.add(preprocessing_func.__module__) + except: + pass + # Record module name except __main__ + if preprocessing_func.__module__ != "__main__": + dep_modules.add(preprocessing_func.__module__) feature_names = [feature.name for feature in anchor.features] # features_with_preprocessing = features_with_preprocessing + feature_names feature_names.sort() From 
6d01fdc04de38aaa4dc0b8fc147ce8124bce03e4 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Fri, 29 Apr 2022 14:52:00 +0800 Subject: [PATCH 16/17] Wrong condition --- feathr_project/feathr/_preprocessing_pyudf_manager.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 6a8c036c9..6a2b4a9e3 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -39,7 +39,7 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): feature_names_to_func_mapping = {} # features that have preprocessing defined. This is used to figure out if we need to kick off Pyspark # preprocessing for requested features. - features_with_preprocessing = {} + features_with_preprocessing = [] dep_modules = set() for anchor in anchor_list: # only support batch source preprocessing for now. @@ -49,18 +49,11 @@ def build_anchor_preprocessing_metadata(anchor_list, local_workspace_dir): if preprocessing_func: # Record module needed by all preprocessing_func for feature in anchor.features: - # Record module file name in metadata - try: - # Ingore non-exist modules and modules without corresponding file - # In case the module name cannot be retrieved - features_with_preprocessing[feature.name] = sys.modules[preprocessing_func.__module__].__file__ - except: - pass # Record module name except __main__ if preprocessing_func.__module__ != "__main__": dep_modules.add(preprocessing_func.__module__) feature_names = [feature.name for feature in anchor.features] - # features_with_preprocessing = features_with_preprocessing + feature_names + features_with_preprocessing = features_with_preprocessing + feature_names feature_names.sort() string_feature_list = ','.join(feature_names) feature_names_to_func_mapping[string_feature_list] = "cloudpickle.loads(%s)" % cloudpickle.dumps(preprocessing_func) From 34f624677dfd9dbb0fa15f074d4f75345263d2db Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Fri, 29 Apr 2022 16:25:31 +0800 Subject: [PATCH 17/17] Add doc --- feathr_project/feathr/_preprocessing_pyudf_manager.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/feathr_project/feathr/_preprocessing_pyudf_manager.py b/feathr_project/feathr/_preprocessing_pyudf_manager.py index 6a2b4a9e3..c7207fbd6 100644 --- a/feathr_project/feathr/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/_preprocessing_pyudf_manager.py @@ -181,7 +181,14 @@ def prepare_pyspark_udf_files(feature_names: List[str], local_workspace_dir): import base64 def encode_file(filename: str) -> str: """ - Encode file content into a multiline base64 encoded string + Encode file content into a multiline base64 encoded string. + This function is a replacement of the previous `inspect.getsource()`, + it bundles the whole module file instead of a single function to resolve + the importing issue. + Using base64 instead of the original source code to archive + the source file can avoid complicated escaping process, we can + directly quote base64 string in the source code by surrounding + it with `r'''/'''`. """ f = open(filename, "rb") content = f.read()
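
Patches 08-17 converge on the module-bundling scheme sketched below: `encode_file` turns a UDF's defining module into an 80-column base64 string that can sit inside an `r'''...'''` literal in the generated driver code, and `decode_file` (in the driver template) writes it back out next to the running script and puts that directory on `sys.path`, so a plain `import` — and therefore `cloudpickle.loads` — can resolve the function. This is a self-contained approximation, not the patch's exact code: the explicit `target_dir` parameter, the `demo_udf.py` module, and the temporary directories are illustrative, whereas the template's `decode_file(filename, encoded)` derives the directory from the running file.

    import base64
    import importlib
    import os
    import sys
    import tempfile

    def encode_file(filename: str) -> str:
        # Build-time helper (as in PATCH 08/17): base64-encode the module bytes and wrap at
        # 80 characters so the text can be quoted inside r'''...''' in the generated driver.
        with open(filename, "rb") as f:
            encoded = base64.b64encode(f.read()).decode("ascii")
        return "\n".join(encoded[i:i + 80] for i in range(0, len(encoded), 80))

    def decode_file(target_dir: str, filename: str, encoded: str) -> str:
        # Driver-side counterpart: rebuild the module file and make its directory importable.
        # b64decode discards the wrapping newlines by default, so the multiline string is fine.
        path = os.path.join(target_dir, filename)
        with open(path, "w+b") as f:
            f.write(base64.b64decode(encoded))
        if target_dir not in sys.path:
            sys.path.append(target_dir)
        return path

    # Round-trip check with a throwaway module.
    src_dir = tempfile.mkdtemp()
    driver_dir = tempfile.mkdtemp()
    module_path = os.path.join(src_dir, "demo_udf.py")
    with open(module_path, "w") as f:
        f.write("def add_one(x):\n    return x + 1\n")

    decode_file(driver_dir, "demo_udf.py", encode_file(module_path))
    demo_udf = importlib.import_module("demo_udf")  # importable now, so pickled references resolve
    assert demo_udf.add_one(41) == 42

Base64 sidesteps the quoting and escaping problems of embedding raw source text — the point made in PATCH 17/17's docstring — at the cost of the bundled sections of the generated file not being human-readable.
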