Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions app/celery/tasks/emails.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,46 @@
from uuid import UUID
from app.emails.exceptions.email_client_exception import EmailClientException
from typing import Final

from celery import Task

from app.common.exceptions import ExternalProviderException
from app.common.schemas.pagination_schema import ListFilter
from app.emails.services.emails_service import EmailService
from app.db.session import SessionLocal
from app.emails import Email, EmailService, get_client
from app.main import celery


from app.core.config import get_settings
from app.users.schemas.user_schema import UserInDB
from app.users.services.users_service import UsersService

settings = get_settings()

BACKOFF_EXPONENTIAL_GROWTH_BASE: Final[int] = 2


@celery.task
def send_reminder_email() -> None:
session = SessionLocal()
try:
service = EmailService()

with SessionLocal() as session:
users = UsersService(session).list(ListFilter(page=1, page_size=100))
for user in users.data:
EmailService().send_user_remind_email(
UserInDB.model_validate(user)
)
finally:
session.close()


@celery.task(
autoretry_for=(EmailClientException,),
retry_backoff=settings.SEND_WELCOME_EMAIL_RETRY_BACKOFF_VALUE,
max_retries=settings.SEND_WELCOME_EMAIL_MAX_RETRIES,
retry_jitter=False,
)
def send_welcome_email(user_id: UUID) -> None:
session = SessionLocal()
service.send_user_remind_email(UserInDB.model_validate(user))


@celery.task(bind=True)
def send_email(self: Task, serialized_email: dict) -> None:
email = Email(**serialized_email)
client = get_client()

try:
user = UsersService(session).get_by_id(user_id)
if user:
EmailService().send_new_user_email(UserInDB.model_validate(user))
finally:
session.close()
client.send_email(email)
except ExternalProviderException as exc:
if not email.context:
raise

multiplicator = BACKOFF_EXPONENTIAL_GROWTH_BASE**self.request.retries
countdown_in_seconds = email.context.backoff_in_seconds * multiplicator

raise self.retry(
exc=exc,
max_retries=email.context.max_retries,
countdown=countdown_in_seconds,
)
30 changes: 19 additions & 11 deletions app/common/clients/base_request_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ def _make_request(
params: dict | None = None,
auth: tuple[str, str] | None = None,
json: dict | None = None,
error_message: str | None = None,
) -> requests.Response | None:
url = f"{self.base_url}{endpoint}" if self.base_url else endpoint

try:
response = requests.request(
method=method,
url=(
f"{self.base_url}{endpoint}" if self.base_url else endpoint
),
url=url,
headers=headers,
data=data,
files=files,
Expand All @@ -40,12 +41,19 @@ def _make_request(
response.raise_for_status()
return response

except requests.exceptions.RequestException as error:
logger.error(str(endpoint))
logger.error(str(error))
logger.error(str(data))
logger.error(str(params))
logger.error(
f"response: {error.response.text if error.response else None}"
)
except requests.exceptions.RequestException as exc:
message = error_message or self._get_error_message(exc, url)
logger.error(message)
return None

def _get_error_message(
self,
exc: requests.exceptions.RequestException,
url: str,
) -> str:
msg_data = {"url": url, "exception": exc}

if exc.response:
msg_data["response"] = exc.response.text

return f"{self.__class__}: request failed. Data: {msg_data}"
28 changes: 20 additions & 8 deletions app/core/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import logging
import secrets
from functools import lru_cache
from typing import Any, Final, List, Optional, Union

from pydantic import AnyHttpUrl, PostgresDsn, field_validator, model_validator
from typing import Any, Final, List, Literal, Optional, Union

from pydantic import (
AnyHttpUrl,
PostgresDsn,
NonNegativeInt,
EmailStr,
field_validator,
model_validator,
)
from pydantic_core.core_schema import ValidationInfo
from pydantic_settings import BaseSettings, SettingsConfigDict

Expand All @@ -16,7 +23,8 @@ class Settings(BaseSettings):
frozen=True,
)
# APP
RUN_ENV: str = "local"
RUN_ENV: Literal["local", "develop", "staging", "production"] = "local"
PROCESS_TYPE: Literal["api", "worker", "beat"]
API_V1_STR: str = "/api/v1"
SECRET_KEY: str = secrets.token_urlsafe(32)
SERVER_NAME: str
Expand Down Expand Up @@ -47,10 +55,14 @@ class Settings(BaseSettings):
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60

# Mail
SENDER_EMAIL: str = "test@test.com"
SEND_WELCOME_EMAIL_MAX_RETRIES: int = 5
SEND_WELCOME_EMAIL_RETRY_BACKOFF_VALUE: int = 5
# Email - general
SENDER_EMAIL: EmailStr = "test@test.com"
SEND_EMAIL_MAX_RETRIES: NonNegativeInt = 5
SEND_EMAIL_RETRY_BACKOFF_VALUE: NonNegativeInt = 5

# Email - specific
SEND_WELCOME_EMAIL_MAX_RETRIES: NonNegativeInt = 5
SEND_WELCOME_EMAIL_RETRY_BACKOFF_VALUE: NonNegativeInt = 5

# Mailpit
MAILPIT_URI: str | None = None
Expand Down
2 changes: 1 addition & 1 deletion app/emails/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

from ._global_state import set_client, get_client

from .clients import MailpitEmailClient, ExampleEmailClient
from .clients import MailpitEmailClient, ExampleEmailClient, CeleryTaskEmailClient
from .services.emails_service import EmailService
1 change: 1 addition & 0 deletions app/emails/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .mailpit_email_client import MailpitEmailClient
from .example_email_client import ExampleEmailClient
from .celery_task_email_client import CeleryTaskEmailClient
21 changes: 21 additions & 0 deletions app/emails/clients/celery_task_email_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from app.emails.clients.base import BaseEmailClient
from app.emails.schema.email import Email, EmailContext


class CeleryTaskEmailClient(BaseEmailClient):
def __init__(self) -> None:
super().__init__()
from app.celery.tasks.emails import send_email

self.task = send_email

def send_email(self, /, email: Email) -> None:
if not email.context:
email.context = EmailContext()

serialized_email = email.model_dump(
mode="json",
exclude_unset=True,
)

self.task.delay(serialized_email)
11 changes: 5 additions & 6 deletions app/emails/clients/mailpit_email_client/_mailpit_email_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ def __init__(
super().__init__()
self.base_url = mailpit_uri or settings.MAILPIT_URI

def send_email(
self,
/,
email: Email,
) -> None:
def send_email(self, /, email: Email) -> None:
schema = _MailpitEmailSchema.from_email(email)

response = self._make_request(
Expand All @@ -35,5 +31,8 @@ def send_email(
)

if not response:
message = "Email not sent, see logs for details."
if email.context and email.context.error_message:
message = email.context.error_message
else:
message = "Email not sent, see logs for details."
raise ExternalProviderException(message)
12 changes: 11 additions & 1 deletion app/emails/schema/email.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from pydantic import BaseModel, EmailStr
from pydantic import BaseModel, EmailStr, NonNegativeInt

from app.core.config import settings


class EmailContext(BaseModel):
max_retries: NonNegativeInt = settings.SEND_EMAIL_MAX_RETRIES
backoff_in_seconds: NonNegativeInt = (
settings.SEND_EMAIL_RETRY_BACKOFF_VALUE
)
error_message: str | None = None


class Email(BaseModel):
from_email: EmailStr = settings.SENDER_EMAIL
from_name: str = "FastApi"
Expand All @@ -18,3 +26,5 @@ class Email(BaseModel):
"MIME-Version": "1.0",
"Content-Type": "text/html",
}

context: EmailContext | None = None
13 changes: 10 additions & 3 deletions app/emails/services/emails_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from enum import Enum
from string import Template

from app.core.config import settings
from app.users.schemas.user_schema import UserInDB
from app.emails.clients.base import BaseEmailClient
from app.emails.schema.email import Email
from app.emails.schema.email import Email, EmailContext
from app.emails._global_state import get_client


Expand Down Expand Up @@ -44,7 +45,13 @@ def send_new_user_email(
"Welcome",
)

return self.email_client.send_email(email)
email.context = EmailContext(
max_retries=settings.SEND_WELCOME_EMAIL_MAX_RETRIES,
backoff_in_seconds=settings.SEND_WELCOME_EMAIL_RETRY_BACKOFF_VALUE,
error_message=f"Sending new user email to user {user.id} failed",
)

self.email_client.send_email(email)

def send_user_remind_email(
self,
Expand All @@ -56,4 +63,4 @@ def send_user_remind_email(
"Welcome",
)

return self.email_client.send_email(email)
self.email_client.send_email(email)
4 changes: 3 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@

from app import emails

if settings.RUN_ENV == "local":
if settings.PROCESS_TYPE == "api":
email_client = emails.CeleryTaskEmailClient()
elif settings.RUN_ENV == "local":
email_client = emails.MailpitEmailClient()
else:
email_client = emails.ExampleEmailClient()
Expand Down
7 changes: 3 additions & 4 deletions app/users/use_cases/create_user_use_case.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from fastapi import status
from fastapi.exceptions import HTTPException
from sqlalchemy.orm import Session
from fastapi import status

from app.auth.utils import security
from app.emails.services.emails_service import EmailService
from app.users.schemas.user_schema import (
CreateUserRequest,
UserCreate,
Expand All @@ -16,8 +17,6 @@ def __init__(self, session: Session):
self.session = session

def execute(self, create_user_request: CreateUserRequest) -> UserResponse:
from app.celery.tasks.emails import send_welcome_email

users_service = UsersService(self.session)
if users_service.get_by_email(create_user_request.email):
raise HTTPException(
Expand All @@ -34,7 +33,7 @@ def execute(self, create_user_request: CreateUserRequest) -> UserResponse:
)
)

send_welcome_email.delay(created_user.id) # type: ignore
EmailService().send_new_user_email(created_user)

return UserResponse(
id=created_user.id,
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ services:
env_file:
- .env
environment:
- PROCESS_TYPE=api
- PYTHONUNBUFFERED=1
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-test}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-test}
Expand All @@ -54,6 +55,7 @@ services:
env_file:
- .env
environment:
- PROCESS_TYPE=worker
- AWS_DEFAULT_REGION=us-east-1
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-test}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-test}
Expand All @@ -75,6 +77,7 @@ services:
env_file:
- .env
environment:
- PROCESS_TYPE=beat
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-test}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-test}
- AWS_ENDPOINT_URL=${AWS_ENDPOINT_URL:-http://localstack:4566}
Expand Down
17 changes: 16 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pytest-cov = "^5.0.0"
ruff = "^0.9.9"
types-pytz = "^2025.2.0.20250809"
types-requests = "^2.32.4.20250913"
celery-types = "^0.23.0"

[build-system]
requires = ["poetry-core"]
Expand Down