Commit 067d693

PP-690: Push Databricks Detokenize UDF to Foundry

Adds the Detokenize UDF to Foundry (PP-690).

1 parent 89c13b2 commit 067d693

1 file changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
%sql
CREATE FUNCTION detokenize(token_count INTEGER, token_offset INTEGER, token_table STRING, token_columns ARRAY<STRING>)
RETURNS STRING
LANGUAGE PYTHON
AS $$

import numpy as np
import pandas as pd
import requests
import sys

from pyspark.sql.functions import col
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

from skyflow.service_account import generate_bearer_token

# Number of tokens sent to the Skyflow detokenize endpoint per request.
BATCH_SIZE = 25

# Placeholders: replace with your Skyflow account ID, vault detokenize URL,
# and the path to your service-account credentials file.
ACCOUNT_ID = "<ACCOUNT_ID>"
DETOKENIZE_URL = "<DETOKENIZE_URL>"
CREDS_FILE = "<CREDS_FILE>"
BEARER_TOKEN, _ = generate_bearer_token(CREDS_FILE)

def get_tokens(token_count, token_offset, token_table, token_columns):
    # SELECT the token columns FROM the token table LIMIT token_count OFFSET token_offset,
    # then flatten the rows into a list of token values.
    select_query_str = f"SELECT {','.join(token_columns)} FROM {token_table} LIMIT {token_count} OFFSET {token_offset}"
    rows = spark.sql(select_query_str).collect()
    return [value for row in rows for value in row]

def detokenize_tokens(names) -> str:
    # Detokenize the tokens in batches of BATCH_SIZE, one request per batch.
    batched_names = [names[i : i + BATCH_SIZE] for i in range(0, len(names), BATCH_SIZE)]
    output = []
    for cur_batch in batched_names:
        detokenize_params = [{"token": cur_name, "redaction": "REDACTED"} for cur_name in cur_batch]
        print(f"detokenize_params={detokenize_params}")
        payload = {"detokenizationParameters": detokenize_params}
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-SKYFLOW-ACCOUNT-ID': ACCOUNT_ID,
            'Authorization': f'Bearer {BEARER_TOKEN}'
        }
        try:
            resp = requests.post(DETOKENIZE_URL, json=payload, headers=headers)
        except Exception as e:
            raise e
        try:
            data = resp.json()
            for cur_record in data["records"]:
                output.append(cur_record["value"])
        except Exception as e:
            print(f"error parsing detokenize return {data}. Error = {e}")
            raise e
    return str(output)

return detokenize_tokens(get_tokens(token_count, token_offset, token_table, token_columns))
$$;
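
Once registered, the function can be invoked from SQL like any other UDF. A minimal usage sketch, assuming a hypothetical table customer_tokens with a tokenized column name_token (both names are illustrative and not part of this commit):

%sql
-- Detokenize the first 100 name_token values from the hypothetical customer_tokens table.
SELECT detokenize(100, 0, 'customer_tokens', array('name_token'));

The call selects token_count rows starting at token_offset from the named table and columns, batches them through the Skyflow detokenize endpoint, and returns the detokenized values as a single string.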
