40 changes: 40 additions & 0 deletions detokenize/README.md
@@ -0,0 +1,40 @@
# Detokenize UDFs

This directory contains sample UDFs that take in a token or list of tokens, call the Skyflow vault's detokenize endpoint, and return the data in redacted, masked, or plain-text form.

## Before you start

Before you start, you need the following:

* A Skyflow vault that is populated with data. See [Tokenize from CSV](https://github.com/SkyflowFoundry/databricks_udfs/tree/main/tokenize_from_csv).
* A Skyflow service account's *credentials.json* file. The service account needs detokenization permissions on the vault. See [Data governance overview](https://docs.skyflow.com/data-governance-overview/).

Additionally, gather the following information:

* Skyflow information:
    * Your Skyflow account ID, vault ID, and vault URL. You can find these in Skyflow Studio.
    * The name of the table in your Skyflow vault that holds the tokenized data. This example uses the table name `persons`.

## Set up the UDF

1. Upload the *credentials.json* file to a Databricks volume. This example assumes the path is `/Volumes/main/default/test_volume/credentials.json`.
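
   One way to get the file into the volume, assuming *credentials.json* is already on the driver's local disk (uploading through the Catalog Explorer UI works just as well; the source path below is a placeholder), is `dbutils.fs.cp` from a notebook cell:

   ```python
   # Copy a local credentials.json into the Unity Catalog volume used by the UDFs.
   # The source path is an assumption; point it at wherever your file actually lives.
   dbutils.fs.cp(
       "file:/tmp/credentials.json",
       "/Volumes/main/default/test_volume/credentials.json",
   )
   ```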

## Use the UDF

There are two versions of the detokenize UDF: registered and non-registered.

### Registered UDF

Once registered, the UDF can be used with a single call to the `detokenize` function. The following steps outline the process.

1. Edit *detokenize_registered_function.sql* to include the applicable values for your Skyflow vault.
1. Copy the SQL into a Databricks notebook and run it to register the UDF.
1. Call the registered function with the desired parameters, as in the example below.
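
   For example, assuming the tokens are in the `name` column of the `main.default.customers` table used elsewhere in this README (swap in your own table, columns, count, and offset), the registered function can be called from a SQL cell:

   ```sql
   -- Detokenize the first 3 tokens (offset 0) from the name column of main.default.customers.
   SELECT detokenize(3, 0, 'main.default.customers', array('name'));
   ```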

### Non-registered UDF

To use the non-registered UDF, create a notebook that contains a SQL cell to retrieve the rows you want to detokenize, followed by the detokenize Python script. Databricks passes the result of the SQL query to the Python cell through the implicit `_sqldf` DataFrame. The following steps outline the process.

1. In a new Databricks notebook, create a SQL cell that queries all of the tokens you want to detokenize. For example: ``SELECT monotonically_increasing_id() seqno, name FROM `main`.`default`.`customers` LIMIT 3;``
1. In the same notebook, create a Python cell and add the non-registered detokenize code (*detokenize_unregistered.py*) to it.
1. Run the notebook. The cells run consecutively, and the result of the SQL cell is passed to the detokenize cell via `_sqldf`, as in the sketch below.
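
The sketch below, assuming the SQL cell returned a `name` column of tokens, shows how `_sqldf` feeds the pandas UDF; the `detokenize` body here is only a pass-through placeholder, and the full request logic lives in *detokenize_unregistered.py*:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def detokenize(tokens: pd.Series) -> pd.Series:
    # Placeholder: the full version posts these tokens to the Skyflow
    # detokenize endpoint in batches and returns the detokenized values;
    # here the tokens are returned unchanged.
    return tokens

detokenize_udf = pandas_udf(detokenize, returnType=StringType())

# _sqldf is the DataFrame Databricks creates from the preceding SQL cell.
display(_sqldf.select(detokenize_udf("name")))
```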
59 changes: 59 additions & 0 deletions detokenize/detokenize_registered_function.sql
@@ -0,0 +1,59 @@
%sql
CREATE FUNCTION detokenize(token_count INTEGER, token_offset INTEGER, token_table STRING, token_columns ARRAY<STRING>)
RETURNS STRING
LANGUAGE PYTHON
AS $$

import requests

from skyflow.service_account import generate_bearer_token

# Replace the placeholders below with the values for your Skyflow vault.
BATCH_SIZE = 25
ACCOUNT_ID = '<ACCOUNT_ID>'
DETOKENIZE_URL = '<DETOKENIZE_URL>'
CREDS_FILE = '<CREDS_FILE>'
BEARER_TOKEN, _ = generate_bearer_token(CREDS_FILE)

def get_tokens(token_count, token_offset, token_table, token_columns):
    # SELECT the token columns FROM the token table, LIMIT token_count OFFSET token_offset,
    # and return the token values as a flat list of strings.
    select_query_str = f"SELECT {','.join(token_columns)} FROM {token_table} LIMIT {token_count} OFFSET {token_offset}"
    rows = spark.sql(select_query_str).collect()
    return [value for row in rows for value in row]

def detokenize_tokens(names) -> str:
    # Call the Skyflow detokenize endpoint in batches of BATCH_SIZE tokens and
    # collect the redacted values.
    batched_names = [names[i : i + BATCH_SIZE] for i in range(0, len(names), BATCH_SIZE)]
    output = []
    for cur_batch in batched_names:
        detokenize_params = [{"token": cur_name, "redaction": "REDACTED"} for cur_name in cur_batch]
        print(f"detokenize_params={detokenize_params}")
        payload = {"detokenizationParameters": detokenize_params}
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-SKYFLOW-ACCOUNT-ID': ACCOUNT_ID,
            'Authorization': f'Bearer {BEARER_TOKEN}'
        }
        resp = requests.post(DETOKENIZE_URL, json=payload, headers=headers)
        try:
            data = resp.json()
            for cur_record in data["records"]:
                output.append(cur_record["value"])
        except Exception as e:
            print(f"error parsing detokenize response {resp.text}. Error = {e}")
            raise
    return str(output)

return detokenize_tokens(get_tokens(token_count, token_offset, token_table, token_columns))
$$;
50 changes: 50 additions & 0 deletions detokenize/detokenize_unregistered.py
@@ -0,0 +1,50 @@
import pandas as pd
import requests

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

from skyflow.service_account import generate_bearer_token

# _sqldf is the DataFrame that Databricks creates from the preceding SQL cell.
raw_data_df = _sqldf
BATCH_SIZE = 25
ACCOUNT_ID = '<INSERT_ACCOUNT_ID_HERE>'
DETOKENIZE_URL = '<INSERT_DETOKENIZE_URL_HERE>'

# For production, load the credentials from a secrets manager (for example, Databricks secrets) rather than a file path.
CREDS_FILE = '<INSERT_CREDS_FILE_PATH_HERE>'
BEARER_TOKEN, _ = generate_bearer_token(CREDS_FILE)

def detokenize(names: pd.Series) -> pd.Series:
    # Call the Skyflow detokenize endpoint in batches of BATCH_SIZE tokens and
    # return the detokenized values in the same order.
    batched_names = [names[i : i + BATCH_SIZE] for i in range(0, len(names), BATCH_SIZE)]
    output = []
    for cur_batch in batched_names:
        detokenize_params = [{"token": cur_name} for cur_name in cur_batch]
        print(f"detokenize_params={detokenize_params}")
        payload = {"detokenizationParameters": detokenize_params}
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-SKYFLOW-ACCOUNT-ID': ACCOUNT_ID,
            'Authorization': f'Bearer {BEARER_TOKEN}'
        }
        resp = requests.post(DETOKENIZE_URL, json=payload, headers=headers)
        try:
            data = resp.json()
            for cur_record in data["records"]:
                output.append(cur_record["value"])
        except Exception as e:
            print(f"error parsing detokenize response {resp.text}. Error = {e}")
            raise
    return pd.Series(output)

# Wrap detokenize() as a pandas UDF so Spark applies it to whole columns of tokens.
call_udf = pandas_udf(detokenize, returnType=StringType())

# Rebuild the token columns from the SQL result and repartition so the UDF
# runs on smaller batches in parallel.
data_rows = raw_data_df.select("name", "ssn", "email_address").collect()
custom_df = spark.createDataFrame(data_rows, ["name", "ssn", "email_address"])
custom_df = custom_df.repartition(3)

# Detokenize each column and show the plain-text results.
display(custom_df.select(call_udf("name"), call_udf("email_address"), call_udf("ssn")))