Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 97 additions & 2 deletions app/celery/reporting_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from flask import current_app
from notifications_utils.statsd_decorators import statsd
from notifications_utils.timezones import convert_utc_to_local_timezone
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert

from app import annual_limit_client, notify_celery
from app import annual_limit_client, db, notify_celery
from app.config import QueueNames
from app.cronitor import cronitor
from app.dao.annual_limits_data_dao import (
Expand All @@ -21,7 +23,7 @@
update_fact_notification_status,
)
from app.dao.users_dao import get_services_for_all_users
from app.models import Service
from app.models import FactNotificationStatus, MonthlyNotificationStatsSummary, Service
from app.user.rest import send_annual_usage_data


Expand Down Expand Up @@ -136,6 +138,99 @@ def create_nightly_notification_status_for_day(process_day):
)


@notify_celery.task(name="create-monthly-notification-stats-summary")
@statsd(namespace="tasks")
def create_monthly_notification_status_summary():
"""
Refresh the monthly_notification_stats table for the current and previous month.
Uses PostgreSQL upsert (INSERT ... ON CONFLICT) for efficient updates.
Only processes last 2 months since historical data rarely changes.

***IMPORTANT***
This function assumes it is run after the nightly notification status.
IT DOESN'T HAVE ALL THE DATA FROM ft_notification_status
We do NOT store all notifications, only non-test key notifications and notifications
that have been delivered and sent.

function we are optimizing for:
def fetch_delivered_notification_stats_by_month(filter_heartbeats=None):
query = (
db.session.query(
func.date_trunc("month", FactNotificationStatus.bst_date).cast(db.Text).label("month"),
FactNotificationStatus.notification_type,
func.sum(FactNotificationStatus.notification_count).label("count"),
)
.filter(
FactNotificationStatus.key_type != KEY_TYPE_TEST,
FactNotificationStatus.notification_status.in_([NOTIFICATION_DELIVERED, NOTIFICATION_SENT]),
FactNotificationStatus.bst_date >= "2019-11-01", # GC Notify start date
)
.group_by(
func.date_trunc("month", FactNotificationStatus.bst_date),
FactNotificationStatus.notification_type,
)
.order_by(
func.date_trunc("month", FactNotificationStatus.bst_date).desc(),
FactNotificationStatus.notification_type,
)
)
"""
current_app.logger.info("create-monthly-notification-stats-summary STARTED")
start_time = datetime.now(timezone.utc)

# Calculate the first day of current month and previous month
today = convert_utc_to_local_timezone(datetime.utcnow()).date()
current_month_start = today.replace(day=1)
previous_month_start = (current_month_start - timedelta(days=1)).replace(day=1)

try:
# Single efficient query with upsert logic
# This aggregates data for last 2 months and upserts in one statement
table = MonthlyNotificationStatsSummary.__table__

stmt = insert(table).from_select(
["month", "service_id", "notification_type", "notification_count", "updated_at"],
db.session.query(
func.date_trunc("month", FactNotificationStatus.bst_date).cast(db.Text).label("month"),
FactNotificationStatus.service_id,
FactNotificationStatus.notification_type,
func.sum(FactNotificationStatus.notification_count).label("notification_count"),
func.now().label("updated_at"),
)
.filter(
FactNotificationStatus.key_type != "test",
FactNotificationStatus.notification_status.in_(["delivered", "sent"]),
FactNotificationStatus.bst_date >= previous_month_start,
)
.group_by(
func.date_trunc("month", FactNotificationStatus.bst_date),
FactNotificationStatus.service_id,
FactNotificationStatus.notification_type,
),
)

stmt = stmt.on_conflict_do_update(
index_elements=["month", "service_id", "notification_type"],
set_={"notification_count": stmt.excluded.notification_count, "updated_at": stmt.excluded.updated_at},
)

result = db.session.execute(stmt)
db.session.commit()

end_time = datetime.now(timezone.utc)
duration = (end_time - start_time).total_seconds()

current_app.logger.info(
"create-monthly-notification-stats-summary COMPLETED: {} rows affected in {:.2f} seconds for months: {}, {}".format(
result.rowcount, duration, previous_month_start.strftime("%Y-%m"), current_month_start.strftime("%Y-%m")
)
)

except Exception as e:
db.session.rollback()
current_app.logger.error("create-monthly-notification-stats-summary FAILED: {}".format(e))


@notify_celery.task(name="insert-quarter-data-for-annual-limits")
@statsd(namespace="tasks")
def insert_quarter_data_for_annual_limits(process_day=None):
Expand Down
6 changes: 6 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ class Config(object):
"schedule": crontab(hour=9, minute=0), # 4:00 EST in UTC
"options": {"queue": QueueNames.PERIODIC},
},
# Make the monthly-notification-stats-status table everyday
"create-monthly-notification-status-summary": {
"task": "create-monthly-notification-status-summary",
"schedule": crontab(hour=6, minute=30), # 01:30 EST in UTC, after 'create-nightly-notification-status'
"options": {"queue": QueueNames.REPORTING},
},
# quarterly queue
"insert-quarter-data-for-annual-limits-q1": {
"task": "insert-quarter-data-for-annual-limits",
Expand Down
175 changes: 175 additions & 0 deletions tests/app/celery/test_reporting_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from app import annual_limit_client
from app.celery.reporting_tasks import (
create_monthly_notification_status_summary,
create_nightly_billing,
create_nightly_billing_for_day,
create_nightly_notification_status,
Expand All @@ -35,6 +36,7 @@
AnnualLimitsData,
FactBilling,
FactNotificationStatus,
MonthlyNotificationStatsSummary,
Notification,
)

Expand Down Expand Up @@ -665,3 +667,176 @@ def test_send_quarter_email(self, sample_user, mocker, notify_db_session):
markdown_list_en,
markdown_list_fr,
)


class TestCreateMonthlyNotificationStatsSummary:
@freeze_time("2019-03-15T12:00:00")
def test_create_monthly_notification_stats_summary_creates_summary_data(self, notify_db_session):
"""Test that the task creates summary data for current and previous month"""
service_1 = create_service(service_name="service_1")
service_2 = create_service(service_name="service_2")

# Current month (March 2019)
create_ft_notification_status(date(2019, 3, 1), "sms", service_1, notification_status="delivered", count=5)
create_ft_notification_status(date(2019, 3, 10), "email", service_1, notification_status="sent", count=3)
create_ft_notification_status(date(2019, 3, 5), "sms", service_2, notification_status="delivered", count=10)
create_ft_notification_status(date(2019, 3, 5), "email", service_2, notification_status="delivered", count=10)
create_ft_notification_status(date(2019, 3, 6), "email", service_2, notification_status="delivered", count=20)

# Previous month (February 2019)
create_ft_notification_status(date(2019, 2, 15), "sms", service_1, notification_status="delivered", count=20)
create_ft_notification_status(date(2019, 2, 20), "email", service_2, notification_status="sent", count=15)

# Older data (should not be included)
create_ft_notification_status(date(2019, 1, 10), "sms", service_1, notification_status="delivered", count=100)

assert MonthlyNotificationStatsSummary.query.count() == 0

create_monthly_notification_status_summary()

results = MonthlyNotificationStatsSummary.query.order_by(
MonthlyNotificationStatsSummary.month,
MonthlyNotificationStatsSummary.service_id,
MonthlyNotificationStatsSummary.notification_type,
).all()

assert len(results) == 6

# Check March 2019 data
march_service1_sms = [
r
for r in results
if r.month.startswith("2019-03-01") and r.service_id == service_1.id and r.notification_type == "sms"
][0]
assert march_service1_sms.notification_count == 5

march_service1_email = [
r
for r in results
if r.month.startswith("2019-03-01") and r.service_id == service_1.id and r.notification_type == "email"
][0]
assert march_service1_email.notification_count == 3

march_service2_sms = [
r
for r in results
if r.month.startswith("2019-03-01") and r.service_id == service_2.id and r.notification_type == "sms"
][0]
assert march_service2_sms.notification_count == 10

march_service2_email = [
r
for r in results
if r.month.startswith("2019-03-01") and r.service_id == service_2.id and r.notification_type == "email"
][0]
assert march_service2_email.notification_count == 30

# Check February 2019 data
feb_service1_sms = [
r
for r in results
if r.month.startswith("2019-02-01") and r.service_id == service_1.id and r.notification_type == "sms"
][0]
assert feb_service1_sms.notification_count == 20

feb_service2_email = [
r
for r in results
if r.month.startswith("2019-02-01") and r.service_id == service_2.id and r.notification_type == "email"
][0]
assert feb_service2_email.notification_count == 15

# Verify January data was not included
jan_data = [r for r in results if r.month.startswith("2019-01-01")]
assert len(jan_data) == 0

@freeze_time("2019-03-15T12:00:00")
def test_create_monthly_notification_stats_summary_excludes_test_keys(self, notify_db_session):
"""Test that test key notifications are excluded from summary"""
service = create_service(service_name="test_service")

# Real notifications
create_ft_notification_status(
date(2019, 3, 1), "sms", service, key_type="normal", notification_status="delivered", count=10
)

# Test key notifications (should be excluded)
create_ft_notification_status(
date(2019, 3, 2), "sms", service, key_type="test", notification_status="delivered", count=100
)

create_monthly_notification_status_summary()

results = MonthlyNotificationStatsSummary.query.all()
assert len(results) == 1
assert results[0].notification_count == 10

@freeze_time("2019-03-15T12:00:00")
def test_create_monthly_notification_stats_summary_only_includes_delivered_and_sent(self, notify_db_session):
"""Test that only delivered and sent notifications are included"""
service = create_service(service_name="test_service")

# Delivered notifications
create_ft_notification_status(date(2019, 3, 1), "sms", service, notification_status="delivered", count=5)

# Sent notifications
create_ft_notification_status(date(2019, 3, 2), "email", service, notification_status="sent", count=3)

# Other statuses (should be excluded)
create_ft_notification_status(date(2019, 3, 3), "sms", service, notification_status="failed", count=10)
create_ft_notification_status(date(2019, 3, 4), "sms", service, notification_status="created", count=20)
create_ft_notification_status(date(2019, 3, 5), "sms", service, notification_status="temporary-failure", count=15)

create_monthly_notification_status_summary()

results = MonthlyNotificationStatsSummary.query.all()
assert len(results) == 2

sms_result = [r for r in results if r.notification_type == "sms"][0]
assert sms_result.notification_count == 5

email_result = [r for r in results if r.notification_type == "email"][0]
assert email_result.notification_count == 3

@freeze_time("2019-03-15T12:00:00")
def test_create_monthly_notification_stats_summary_aggregates_multiple_days(self, notify_db_session):
"""Test that multiple days in the same month are aggregated correctly"""
service = create_service(service_name="test_service")

# Multiple days in March
create_ft_notification_status(date(2019, 3, 1), "sms", service, notification_status="delivered", count=5)
create_ft_notification_status(date(2019, 3, 10), "sms", service, notification_status="delivered", count=10)
create_ft_notification_status(date(2019, 3, 15), "sms", service, notification_status="sent", count=8)
create_ft_notification_status(date(2019, 3, 20), "sms", service, notification_status="delivered", count=12)

create_monthly_notification_status_summary()

results = MonthlyNotificationStatsSummary.query.all()
assert len(results) == 1
assert results[0].notification_count == 35 # 5 + 10 + 8 + 12

@freeze_time("2019-03-15T12:00:00")
def test_create_monthly_notification_stats_summary_updates_existing_data(self, notify_db_session):
"""Test that the task updates existing summary data (upsert behavior)"""
service = create_service(service_name="test_service")

# Initial data
create_ft_notification_status(date(2019, 3, 1), "sms", service, notification_status="delivered", count=10)

create_monthly_notification_status_summary()

results = MonthlyNotificationStatsSummary.query.all()
assert len(results) == 1
assert results[0].notification_count == 10
initial_updated_at = results[0].updated_at

# Add more data for the same month
create_ft_notification_status(date(2019, 3, 10), "sms", service, notification_status="delivered", count=5)

# Run the task again
create_monthly_notification_status_summary()

results = MonthlyNotificationStatsSummary.query.all()
assert len(results) == 1
assert results[0].notification_count == 15 # Updated from 10 to 15
assert results[0].updated_at > initial_updated_at # Timestamp should be updated
Loading