diff --git a/detokenize/README.md b/detokenize/README.md
new file mode 100644
index 0000000..1b63ed1
--- /dev/null
+++ b/detokenize/README.md
@@ -0,0 +1,40 @@
+# Detokenize UDFs
+
+This directory contains sample UDFs that take a token or a list of tokens, call the Skyflow vault's detokenize endpoint, and return data in redacted, masked, or plain-text form.
+
+## Before you start
+
+Before you start, you need the following:
+
+* A Skyflow vault that is populated with tokenized data. See [Tokenize from CSV](https://github.com/SkyflowFoundry/databricks_udfs/tree/main/tokenize_from_csv).
+* A Skyflow service account's *credentials.json* file. The service account needs detokenization permissions on the vault. See [Data governance overview](https://docs.skyflow.com/data-governance-overview/).
+
+Additionally, gather the following information:
+
+* Skyflow information:
+    * Your Skyflow account ID, vault ID, and vault URL. You can find these in Skyflow Studio.
+    * The name of the Skyflow vault table that holds the tokenized records. This example uses the table name `persons`.
+
+## Set up the UDF
+
+1. Upload the *credentials.json* file to a Databricks volume. This example assumes the path is `/Volumes/main/default/test_volume/credentials.json`.
+
+## Use the UDF
+
+There are two versions of the detokenize UDF: registered and non-registered.
+
+### Registered UDF
+
+You use the registered UDF by making a single call to the registered `detokenize` function. The following steps outline the process; an example call is shown at the end of this README.
+
+1. Edit *detokenize_registered_function.sql* to include the applicable values for your Skyflow vault.
+1. Copy the SQL into a Databricks notebook. Run the notebook to register the UDF.
+1. Call the registered function with the desired parameters.
+
+### Non-registered UDF
+
+You use the non-registered UDF by creating a notebook that contains a SQL cell to retrieve the rows of data you want to detokenize, followed by the detokenize Python script. Databricks passes the results of the SQL query to the detokenize flow through the shared variable `_sqldf`. The following steps outline the process; an example notebook layout is shown at the end of this README.
+
+1. In a new Databricks notebook, create a SQL cell. This cell should contain a query for all of the tokens you want to detokenize. For example: ``SELECT monotonically_increasing_id() seqno, name FROM `main`.`default`.`customers` LIMIT 3;``
+1. In the same notebook, create a new Python cell. Add your non-registered detokenize code (*detokenize_unregistered.py*) to this cell.
+1. Run the notebook. The cells run consecutively, and the results of the SQL cell are passed to the detokenize cell.
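+
+### Example: call the registered UDF
+
+The following is a minimal sketch of a call to the registered function, assuming your tokens live in a Databricks table named `main.default.customers` with a tokenized `name` column. Swap in your own table, columns, token count, and offset.
+
+```sql
+-- Detokenize the first 3 tokens (token_count = 3, token_offset = 0) from the name column.
+SELECT detokenize(3, 0, 'main.default.customers', array('name'));
+```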
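+
+### Example: non-registered notebook cells
+
+The following is a sketch of the notebook layout, assuming a tokenized Databricks table `main.default.customers` with `name`, `email_address`, and `ssn` columns (the columns that *detokenize_unregistered.py* selects). The first cell is SQL; its result becomes `_sqldf` for the Python cell that follows.
+
+```sql
+-- Cell 1: select the tokens to detokenize.
+SELECT monotonically_increasing_id() seqno, name, email_address, ssn
+FROM `main`.`default`.`customers`
+LIMIT 3;
+```
+
+Cell 2 contains the contents of *detokenize_unregistered.py*.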
diff --git a/detokenize/detokenize_registered_function.sql b/detokenize/detokenize_registered_function.sql
new file mode 100644
index 0000000..e838681
--- /dev/null
+++ b/detokenize/detokenize_registered_function.sql
@@ -0,0 +1,59 @@
+%sql
+CREATE FUNCTION detokenize(token_count INTEGER, token_offset INTEGER, token_table STRING, token_columns ARRAY<STRING>)
+RETURNS STRING
+LANGUAGE PYTHON
+AS $$
+  import requests
+
+  from skyflow.service_account import generate_bearer_token
+
+  BATCH_SIZE = 25
+
+  # Fill in the values for your Skyflow account and vault.
+  ACCOUNT_ID = ''      # Skyflow account ID
+  DETOKENIZE_URL = ''  # Your vault's detokenize endpoint URL
+  CREDS_FILE = ''      # Path to credentials.json, e.g. /Volumes/main/default/test_volume/credentials.json
+  BEARER_TOKEN, _ = generate_bearer_token(CREDS_FILE)
+
+  def get_tokens(token_count, token_offset, token_table, token_columns):
+    # SELECT the token columns FROM the token table with LIMIT token_count OFFSET token_offset,
+    # then flatten the result into a plain list of token strings.
+    select_query_str = f"SELECT {','.join(token_columns)} FROM {token_table} LIMIT {token_count} OFFSET {token_offset}"
+    rows = spark.sql(select_query_str).collect()
+    return [row[column] for row in rows for column in token_columns]
+
+  def detokenize_tokens(tokens) -> str:
+    # Call the Skyflow detokenize endpoint in batches of BATCH_SIZE tokens.
+    batched_tokens = [tokens[i : i + BATCH_SIZE] for i in range(0, len(tokens), BATCH_SIZE)]
+    output = []
+    for cur_batch in batched_tokens:
+      detokenize_params = [{"token": cur_token, "redaction": "REDACTED"} for cur_token in cur_batch]
+      print(f"detokenize_params={detokenize_params}")
+      payload = {"detokenizationParameters": detokenize_params}
+      headers = {
+        'Content-Type': 'application/json',
+        'Accept': 'application/json',
+        'X-SKYFLOW-ACCOUNT-ID': ACCOUNT_ID,
+        'Authorization': f'Bearer {BEARER_TOKEN}'
+      }
+      resp = requests.post(DETOKENIZE_URL, json=payload, headers=headers)
+      try:
+        data = resp.json()
+        for cur_record in data["records"]:
+          output.append(cur_record["value"])
+      except Exception as e:
+        print(f"error parsing detokenize response {resp.text}. Error = {e}")
+        raise
+    return str(output)
+
+  return detokenize_tokens(get_tokens(token_count, token_offset, token_table, token_columns))
+$$;
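+
+-- Example invocation (hypothetical values): detokenize the first 3 tokens from the
+-- name column of the Databricks table main.default.customers. Adjust the table and
+-- columns for your own data.
+-- SELECT detokenize(3, 0, 'main.default.customers', array('name'));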
Error = {e}") + raise e + return str(output) + +return detokenize_tokens(get_tokens(token_count, token_offset, token_table, token_columns)) +$$; \ No newline at end of file diff --git a/detokenize/detokenize_unregistered.py b/detokenize/detokenize_unregistered.py new file mode 100644 index 0000000..e5f4bfb --- /dev/null +++ b/detokenize/detokenize_unregistered.py @@ -0,0 +1,50 @@ +import pandas as pd +import requests + +from pyspark.sql.functions import pandas_udf +from pyspark.sql.types import StringType + +from skyflow.service_account import generate_bearer_token + +raw_data_df = _sqldf +BATCH_SIZE = 25 +ACCOUNT_ID = '' +DETOKENIZE_URL = '' + +#Should move to Kubernetes secrets for production env +CREDS_FILE='' +BEARER_TOKEN, _ = generate_bearer_token(CREDS_FILE) + +def detokenize(names: pd.Series) -> pd.Series: + batched_names = [names[i : i + BATCH_SIZE] for i in range(0, len(names), BATCH_SIZE)] + output = [] + for cur_batch in batched_names: + detokenize_params = [{"token":cur_name,} for cur_name in cur_batch] + print(f"detokenize_params={detokenize_params}") + payload = {"detokenizationParameters":detokenize_params} + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'X-SKYFLOW-ACCOUNT-ID': ACCOUNT_ID, + 'Authorization': f'Bearer {BEARER_TOKEN}' + } + try: + resp = requests.post(DETOKENIZE_URL, json=payload, headers=headers) + print(resp.json()) + except Exception as e: + raise e + try: + data = resp.json() + for cur_record in data["records"]: + output.append(cur_record["value"]) + except Exception as e: + print(f"error parsing detokenize return {data}. Error = {e}") + raise e + return pd.Series(output) + +df = raw_data_df +call_udf = pandas_udf(detokenize, returnType=StringType()) +data_series = df.select("name","ssn", "email_address").rdd.flatMap(lambda x: x).collect() +custom_df = spark.createDataFrame([(data_series[i][0], data_series[i][1], data_series[i][2],) for i in range(len(data_series))], ["name", "email_address","ssn"]) +custom_df = custom_df.repartition(3) +display(df.select(call_udf("name"), call_udf("email_address"), call_udf("ssn"))) \ No newline at end of file