Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion basket/news/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ def recover_user(request, body: RecoverUserSchema):

tasks.send_recovery_message.delay(
body.email,
user_data["token"],
user_data.get("lang", "en") or "en",
user_data.get("email_id"),
)
Expand Down
58 changes: 19 additions & 39 deletions basket/news/backends/braze.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,8 @@
BRAZE_OPTIMAL_DELAY = timedelta(minutes=5)


# These tasks cannot be placed in basket/news/tasks.py because it would
# This task cannot be placed in basket/news/tasks.py because it would
# create a circular dependency.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to leave this comment here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes because there is still one task (the alias one)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's worth removing the empty space between that comment and that task then? So it's clear that's what it's about

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yes, indeed

@rq_task
def migrate_external_id_task(current_external_id, new_external_id):
braze.interface.migrate_external_id(
[
{
"current_external_id": current_external_id,
"new_external_id": new_external_id,
}
]
)


@rq_task
def add_fxa_id_alias_task(external_id, fxa_id):
braze.interface.add_fxa_id_alias(external_id, fxa_id)
Expand Down Expand Up @@ -366,7 +354,6 @@ def __init__(self, interface):

def get(
self,
email_id=None,
token=None,
email=None,
fxa_id=None,
Expand All @@ -381,18 +368,6 @@ def get(
@return: dict, or None if not found
Copy link

@clara-campos clara-campos Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're not using the email_id param in braze.get, we should probably get rid of it to avoid even more confusion

"""

# If we only have a token or fxa_id and the Braze migrations for them haven't been
# completed we won't be able to look up the user. We add a temporary shim here which
# will fetch the email from CTMS. This shim can be disabled/removed after the migrations
# are complete.
if not email and settings.BRAZE_CTMS_SHIM_ENABLE:
try:
ctms_response = ctms.get(token=token, fxa_id=fxa_id)
if ctms_response:
email = ctms_response.get("email")
except Exception:
log.warn("Unable to fetch email from CTMS in braze.get shim")

user_response = self.interface.export_users(
email,
[
Expand Down Expand Up @@ -424,6 +399,20 @@ def get(

return self.from_vendor(user_data, subscriptions)

# If we only have an outdated token or the Braze fxa_id migrations haven't been
# completed we won't be able to look up the user. We add a temporary shim here which
# will fetch the email from CTMS. This shim can be disabled/removed after the migration
# is complete.
elif not email and (fxa_id or token) and settings.BRAZE_CTMS_SHIM_ENABLE:
try:
ctms_response = ctms.get(token=token, fxa_id=fxa_id)
if ctms_response:
ctms_email = ctms_response.get("email")
if ctms_email:
return self.get(email=ctms_email)
except Exception:
log.warn("Unable to fetch email from CTMS in braze.get shim")

def add(self, data):
"""
Create a user record.
Expand All @@ -441,14 +430,6 @@ def add(self, data):
enqueue_in=BRAZE_OPTIMAL_DELAY,
)

token = data.get("token")
if token and settings.BRAZE_PARALLEL_WRITE_ENABLE:
migrate_external_id_task.delay(
external_id,
token,
enqueue_in=BRAZE_OPTIMAL_DELAY,
)

return {"email": {"email_id": external_id}}

def update(self, existing_data, update_data):
Expand Down Expand Up @@ -539,7 +520,8 @@ def from_vendor(self, braze_user_data, subscription_groups):
"last_modified_date": user_attributes.get("updated_at"),
"optin": braze_user_data.get("email_subscribe") == "opted_in",
"optout": braze_user_data.get("email_subscribe") == "unsubscribed",
"token": user_attributes.get("basket_token"),
"token": braze_user_data["external_id"],
"ctms_legacy_token": user_attributes.get("basket_token"),
"fxa_service": user_attributes.get("fxa_first_service"),
"fxa_lang": user_attributes.get("fxa_lang"),
"fxa_primary_email": user_attributes.get("fxa_primary_email"),
Expand All @@ -563,9 +545,7 @@ def to_vendor(self, basket_user_data=None, update_data=None, events=None):
country = process_country(updated_user_data.get("country") or None)
language = process_lang(updated_user_data.get("lang") or None)

external_id = (
updated_user_data.get("token") if not existing_user_data and settings.BRAZE_ONLY_WRITE_ENABLE else updated_user_data.get("email_id")
)
external_id = updated_user_data.get("email_id")

if not external_id:
raise ValueError("Missing Braze external_id")
Expand Down Expand Up @@ -593,7 +573,7 @@ def to_vendor(self, basket_user_data=None, update_data=None, events=None):
"subscription_groups": subscription_groups,
"user_attributes_v1": [
{
"basket_token": updated_user_data.get("token"),
"basket_token": updated_user_data.get("ctms_legacy_token") or updated_user_data.get("token"),
"created_at": {"$time": updated_user_data.get("created_date", now)},
"email_lang": language,
"mailing_country": country,
Expand Down
4 changes: 4 additions & 0 deletions basket/news/backends/ctms.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,10 @@ def get(
alt_ids = []
if token:
alt_ids.append({"basket_token": token})

# While we transition from `basket_token` to `external_id` / `email_id` the
# token passed in can either be a `basket_token` or an `external_id` / `email_id`.
alt_ids.append({"email_id": token})
if email:
alt_ids.append({"primary_email": email})
if fxa_id:
Expand Down
20 changes: 17 additions & 3 deletions basket/news/management/commands/process_fxa_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sentry_sdk

from basket import metrics
from basket.news.backends.braze import BRAZE_OPTIMAL_DELAY
from basket.news.tasks import (
fxa_delete,
fxa_email_changed,
Expand Down Expand Up @@ -101,28 +102,28 @@ def handle(self, *args, **options):
msg.delete()
continue

enqueue_in = BRAZE_OPTIMAL_DELAY if should_delay_execution(event_type, event) else None
try:
if settings.BRAZE_PARALLEL_WRITE_ENABLE:
pre_generated_token = generate_token()
pre_generated_email_id = generate_token()
FXA_EVENT_TYPES[event_type].delay(
event,
use_braze_backend=True,
should_send_tx_messages=False,
pre_generated_token=pre_generated_token,
pre_generated_email_id=pre_generated_email_id,
enqueue_in=enqueue_in,
)
FXA_EVENT_TYPES[event_type].delay(
event,
use_braze_backend=False,
should_send_tx_messages=True,
pre_generated_token=pre_generated_token,
pre_generated_email_id=pre_generated_email_id,
)
elif settings.BRAZE_ONLY_WRITE_ENABLE:
FXA_EVENT_TYPES[event_type].delay(
event,
use_braze_backend=True,
enqueue_in=enqueue_in,
)
else:
FXA_EVENT_TYPES[event_type].delay(
Expand All @@ -142,3 +143,16 @@ def handle(self, *args, **options):
msg.delete()
except KeyboardInterrupt:
sys.exit("\nBuh bye")


def should_delay_execution(event_type, event):
"""
Braze can take up to 5 minutes to add an fxa_id alias. We use this function
to determine if an event relies on the fxa_id alias and should be delayed.
"""
if event_type == "primaryEmailChanged":
return True
elif event.get("uid") and not event.get("email"):
return True
else:
return False
42 changes: 19 additions & 23 deletions basket/news/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def fxa_email_changed(
data,
use_braze_backend=False,
pre_generated_token=None,
pre_generated_email_id=None,
**kwargs,
):
ts = data["ts"]
Expand Down Expand Up @@ -83,8 +82,8 @@ def fxa_email_changed(
"fxa_id": fxa_id,
"fxa_primary_email": email,
}
if pre_generated_email_id:
data["email_id"] = pre_generated_email_id
if pre_generated_token:
data["email_id"] = pre_generated_token

backend_data = data.copy()
contact = None
Expand Down Expand Up @@ -125,7 +124,6 @@ def fxa_verified(
use_braze_backend=False,
should_send_tx_messages=True,
pre_generated_token=None,
pre_generated_email_id=None,
):
"""Add new FxA users"""
# if we're not using the sandbox ignore testing domains
Expand Down Expand Up @@ -169,7 +167,6 @@ def fxa_verified(
use_braze_backend=use_braze_backend,
should_send_tx_messages=should_send_tx_messages,
pre_generated_token=pre_generated_token,
pre_generated_email_id=pre_generated_email_id,
)


Expand All @@ -179,7 +176,6 @@ def fxa_newsletters_update(
use_braze_backend=False,
should_send_tx_messages=True,
pre_generated_token=None,
pre_generated_email_id=None,
):
email = data["email"]
fxa_id = data["uid"]
Expand All @@ -199,7 +195,6 @@ def fxa_newsletters_update(
use_braze_backend=use_braze_backend,
should_send_tx_messages=should_send_tx_messages,
pre_generated_token=pre_generated_token,
pre_generated_email_id=pre_generated_email_id,
)


Expand All @@ -209,7 +204,6 @@ def fxa_login(
use_braze_backend=False,
should_send_tx_messages=True,
pre_generated_token=None,
pre_generated_email_id=None,
):
email = data["email"]
# if we're not using the sandbox ignore testing domains
Expand All @@ -230,7 +224,6 @@ def fxa_login(
use_braze_backend=use_braze_backend,
should_send_tx_messages=should_send_tx_messages,
pre_generated_token=pre_generated_token,
pre_generated_email_id=pre_generated_email_id,
)


Expand All @@ -253,7 +246,6 @@ def upsert_user(
use_braze_backend=False,
should_send_tx_messages=True,
pre_generated_token=None,
pre_generated_email_id=None,
):
"""
Update or insert (upsert) a contact record
Expand All @@ -275,7 +267,6 @@ def upsert_user(
use_braze_backend=use_braze_backend,
should_send_tx_messages=should_send_tx_messages,
pre_generated_token=pre_generated_token,
pre_generated_email_id=pre_generated_email_id,
)


Expand All @@ -286,7 +277,6 @@ def upsert_contact(
use_braze_backend=False,
should_send_tx_messages=True,
pre_generated_token=None,
pre_generated_email_id=None,
):
"""
Update or insert (upsert) a contact record
Expand All @@ -304,7 +294,15 @@ def upsert_contact(
newsletters = parse_newsletters_csv(data.get("newsletters"))
cur_newsletters = user_data and user_data.get("newsletters")

if user_data and data.get("token") and user_data.get("token") != data["token"]:
if (
user_data
and data.get("token")
and data["token"]
not in [
user_data.get("token"),
user_data.get("ctms_legacy_token"),
]
):
# We were passed a token but it doesn't match the user.
return None, None

Expand Down Expand Up @@ -374,8 +372,8 @@ def upsert_contact(
# no user found. create new one.
token = update_data["token"] = pre_generated_token or generate_token()

if pre_generated_email_id:
update_data["email_id"] = pre_generated_email_id
if pre_generated_token:
update_data["email_id"] = pre_generated_token

if settings.MAINTENANCE_MODE:
if use_braze_backend:
Expand All @@ -384,17 +382,16 @@ def upsert_contact(
ctms_add_or_update.delay(update_data)
else:
if use_braze_backend:
new_user = braze.add(update_data)
braze.add(update_data)
else:
new_user = ctms.add(update_data)
ctms.add(update_data)

if send_confirm and settings.SEND_CONFIRM_MESSAGES and should_send_tx_messages:
send_confirm_message.delay(
data["email"],
token,
data.get("lang", "en-US"),
send_confirm,
new_user and new_user.get("email", {}).get("email_id") or None,
)

return token, True
Expand Down Expand Up @@ -440,7 +437,6 @@ def upsert_contact(
token,
update_data.get("lang", user_data.get("lang", "en-US")),
send_confirm,
user_data.get("email_id"),
)

return token, False
Expand Down Expand Up @@ -495,13 +491,13 @@ def send_tx_messages(email, lang, message_ids):


@rq_task
def send_confirm_message(email, token, lang, message_type, email_id):
def send_confirm_message(email, token, lang, message_type):
lang = lang.strip()
lang = lang or "en-US"
message_id = f"newsletter-confirm-{message_type}"
txm = BrazeTxEmailMessage.objects.get_message(message_id, lang)
if txm:
send_tx_message(email, txm.message_id, txm.language, user_data={"basket_token": token, "email_id": email_id})
send_tx_message(email, txm.message_id, txm.language, user_data={"basket_token": token, "email_id": token})


@rq_task
Expand Down Expand Up @@ -559,11 +555,11 @@ def update_custom_unsub(token, reason, use_braze_backend=False):


@rq_task
def send_recovery_message(email, token, lang, email_id):
def send_recovery_message(email, lang, token):
message_id = "account-recovery"
txm = BrazeTxEmailMessage.objects.get_message(message_id, lang)
if txm:
user_data = {"basket_token": token, "email_id": email_id}
user_data = {"basket_token": token, "email_id": token}
send_tx_message(email, txm.message_id, txm.language, user_data=user_data)


Expand Down
2 changes: 1 addition & 1 deletion basket/news/tests/api/test_users_recover.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_good_email(self):
assert resp.status_code == 200, resp.content
data = resp.json()
self.validate_schema(data, OkSchema)
mock_send.assert_called_with(self.email, self.token, "en", self.email_id)
mock_send.assert_called_with(self.email, "en", self.email_id)

# 4xx errors

Expand Down
Loading