From 781f202d52184bb9338a672485cb78387890955e Mon Sep 17 00:00:00 2001 From: Wojciech Maciejewski Date: Tue, 29 Nov 2022 12:40:51 +0100 Subject: [PATCH 1/2] DR2-112 | Secure app method --- trench/backends/application.py | 7 ++++++- trench/backends/aws.py | 6 ++++-- trench/backends/base.py | 2 +- trench/backends/basic_mail.py | 4 +++- trench/backends/sms_api.py | 4 +++- trench/backends/twilio.py | 4 +++- trench/backends/yubikey.py | 4 +++- trench/exceptions.py | 8 ++++++++ trench/models.py | 6 ++++++ trench/views/base.py | 16 ++++++++++++---- 10 files changed, 49 insertions(+), 12 deletions(-) diff --git a/trench/backends/application.py b/trench/backends/application.py index 879270a3..a1d7e00f 100644 --- a/trench/backends/application.py +++ b/trench/backends/application.py @@ -1,9 +1,12 @@ +from typing import Optional + from django.contrib.auth import get_user_model from django.contrib.auth.models import AbstractUser import logging from trench.backends.base import AbstractMessageDispatcher +from trench.exceptions import GetCodeFromApplicationException from trench.responses import ( DispatchResponse, FailedDispatchResponse, @@ -16,8 +19,10 @@ class ApplicationMessageDispatcher(AbstractMessageDispatcher): - def dispatch_message(self) -> DispatchResponse: + def dispatch_message(self, url_name: Optional[str] = None) -> DispatchResponse: try: + if url_name and url_name == "mfa-request-code": + raise GetCodeFromApplicationException() qr_link = self._create_qr_link(self._mfa_method.user) return SuccessfulDispatchResponse(details=qr_link) except Exception as cause: # pragma: nocover diff --git a/trench/backends/aws.py b/trench/backends/aws.py index 7d8cfaed..827733a8 100644 --- a/trench/backends/aws.py +++ b/trench/backends/aws.py @@ -1,8 +1,9 @@ +from typing import Optional + from django.utils.translation import gettext_lazy as _ import logging import boto3 -import botocore.exceptions from trench.backends.base import AbstractMessageDispatcher from trench.responses import ( @@ -13,11 +14,12 @@ from trench.settings import AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION from botocore.exceptions import ClientError, EndpointConnectionError + class AWSMessageDispatcher(AbstractMessageDispatcher): _SMS_BODY = _("Your verification code is: ") _SUCCESS_DETAILS = _("SMS message with MFA code has been sent.") - def dispatch_message(self) -> DispatchResponse: + def dispatch_message(self, url_name: Optional[str] = None) -> DispatchResponse: try: client = boto3.client( "sns", diff --git a/trench/backends/base.py b/trench/backends/base.py index c32fe1a7..65e902b5 100644 --- a/trench/backends/base.py +++ b/trench/backends/base.py @@ -60,7 +60,7 @@ def _get_innermost_object(obj: Model, dotted_path: Optional[str] = None) -> Mode return obj # pragma: no cover @abstractmethod - def dispatch_message(self) -> DispatchResponse: + def dispatch_message(self, url_name: Optional[str] = None) -> DispatchResponse: raise NotImplementedError # pragma: no cover def create_code(self) -> str: diff --git a/trench/backends/basic_mail.py b/trench/backends/basic_mail.py index 1f02e3eb..917df986 100644 --- a/trench/backends/basic_mail.py +++ b/trench/backends/basic_mail.py @@ -1,3 +1,5 @@ +from typing import Optional + from django.conf import settings from django.core.mail import send_mail from django.template.loader import get_template @@ -19,7 +21,7 @@ class SendMailMessageDispatcher(AbstractMessageDispatcher): _KEY_MESSAGE = "message" _SUCCESS_DETAILS = _("Email message with MFA code has been sent.") - def dispatch_message(self) -> DispatchResponse: + def dispatch_message(self, url_name: Optional[str] = None) -> DispatchResponse: context = {"code": self.create_code()} email_plain_template = self._config[EMAIL_PLAIN_TEMPLATE] email_html_template = self._config[EMAIL_HTML_TEMPLATE] diff --git a/trench/backends/sms_api.py b/trench/backends/sms_api.py index eba393f5..82e6377d 100644 --- a/trench/backends/sms_api.py +++ b/trench/backends/sms_api.py @@ -1,3 +1,5 @@ +from typing import Optional + from django.utils.translation import gettext_lazy as _ import logging @@ -17,7 +19,7 @@ class SMSAPIMessageDispatcher(AbstractMessageDispatcher): _SMS_BODY = _("Your verification code is: ") _SUCCESS_DETAILS = _("SMS message with MFA code has been sent.") - def dispatch_message(self) -> DispatchResponse: + def dispatch_message(self, url_name: Optional[str] = None) -> DispatchResponse: try: client = SmsApiPlClient(access_token=self._config.get(SMSAPI_ACCESS_TOKEN)) from_number = self._config.get(SMSAPI_FROM_NUMBER) diff --git a/trench/backends/twilio.py b/trench/backends/twilio.py index 897c4332..a1e87a5d 100644 --- a/trench/backends/twilio.py +++ b/trench/backends/twilio.py @@ -1,3 +1,5 @@ +from typing import Optional + from django.utils.translation import gettext_lazy as _ import logging @@ -17,7 +19,7 @@ class TwilioMessageDispatcher(AbstractMessageDispatcher): _SMS_BODY = _("Your verification code is: ") _SUCCESS_DETAILS = _("SMS message with MFA code has been sent.") - def dispatch_message(self) -> DispatchResponse: + def dispatch_message(self, url_name: Optional[str] = None) -> DispatchResponse: try: client = Client() client.messages.create( diff --git a/trench/backends/yubikey.py b/trench/backends/yubikey.py index f184d876..3601a52e 100644 --- a/trench/backends/yubikey.py +++ b/trench/backends/yubikey.py @@ -1,3 +1,5 @@ +from typing import Optional + from django.utils.translation import gettext_lazy as _ import logging @@ -11,7 +13,7 @@ class YubiKeyMessageDispatcher(AbstractMessageDispatcher): - def dispatch_message(self) -> DispatchResponse: + def dispatch_message(self, url_name: Optional[str] = None) -> DispatchResponse: return SuccessfulDispatchResponse(details=_("Generate code using YubiKey")) def confirm_activation(self, code: str) -> None: diff --git a/trench/exceptions.py b/trench/exceptions.py index 00fd9ef5..d224c73e 100644 --- a/trench/exceptions.py +++ b/trench/exceptions.py @@ -42,6 +42,14 @@ def __init__(self) -> None: super().__init__(detail=_("OTP code not provided."), code="otp_code_missing") +class GetCodeFromApplicationException(MFAValidationError): + def __init__(self) -> None: + super().__init__( + detail=_("Get code from OTP application."), + code="get_code_from_application" + ) + + class MFAMethodDoesNotExistError(MFAValidationError): def __init__(self) -> None: super().__init__( diff --git a/trench/models.py b/trench/models.py index 0e741a29..f9cda32a 100644 --- a/trench/models.py +++ b/trench/models.py @@ -26,6 +26,12 @@ def get_by_name(self, user_id: Any, name: str) -> "MFAMethod": except self.model.DoesNotExist: raise MFAMethodDoesNotExistError() + def get_active_by_name(self, user_id: Any, name: str) -> "MFAMethod": + try: + return self.get(user_id=user_id, name=name, is_active=True) + except self.model.DoesNotExist: + raise MFAMethodDoesNotExistError() + def get_primary_active(self, user_id: Any) -> "MFAMethod": try: return self.get(user_id=user_id, is_primary=True, is_active=True) diff --git a/trench/views/base.py b/trench/views/base.py index 53a4dae8..33e9d227 100644 --- a/trench/views/base.py +++ b/trench/views/base.py @@ -46,7 +46,6 @@ from trench.settings import SOURCE_FIELD, trench_settings from trench.utils import available_method_choices, get_mfa_model, user_token_generator - User: AbstractUser = get_user_model() @@ -70,6 +69,7 @@ def post(self, request: Request) -> Response: ) except MFAValidationError as cause: return ErrorResponse(error=cause) + try: mfa_model = get_mfa_model() mfa_method = mfa_model.objects.get_primary_active(user_id=user.id) @@ -110,7 +110,10 @@ def post(request: Request, method: str) -> Response: user = request.user try: if source_field is not None and not hasattr(user, source_field): - raise MFASourceFieldDoesNotExistError(source_field, user.__class__.__name__) + raise MFASourceFieldDoesNotExistError( + source_field, + user.__class__.__name__ + ) mfa = create_mfa_method_command( user_id=user.id, @@ -224,8 +227,13 @@ def post(request: Request) -> Response: method = mfa_model.objects.get_primary_active_name( user_id=request.user.id ) - mfa = mfa_model.objects.get_by_name(user_id=request.user.id, name=method) - return get_mfa_handler(mfa_method=mfa).dispatch_message() + mfa = mfa_model.objects.get_active_by_name( + user_id=request.user.id, + name=method + ) + return get_mfa_handler(mfa_method=mfa).dispatch_message( + url_name=request.resolver_match.url_name + ) except MFAValidationError as cause: return ErrorResponse(error=cause) From cb7cf90599425d4e0b1df00c5a3ef610f50d5335 Mon Sep 17 00:00:00 2001 From: Wojciech Maciejewski Date: Tue, 29 Nov 2022 13:02:51 +0100 Subject: [PATCH 2/2] add tests --- .../tests/test_second_step_authentication.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/testproject/tests/test_second_step_authentication.py b/testproject/tests/test_second_step_authentication.py index aecf3b3b..c2e71a41 100644 --- a/testproject/tests/test_second_step_authentication.py +++ b/testproject/tests/test_second_step_authentication.py @@ -8,6 +8,7 @@ HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, + HTTP_422_UNPROCESSABLE_ENTITY, ) from rest_framework.test import APIClient from time import sleep @@ -518,6 +519,22 @@ def test_request_code_for_not_inactive_mfa_method(active_user_with_email_otp): assert response.status_code == HTTP_400_BAD_REQUEST assert response.data.get("error") == "Requested MFA method does not exist." +@flaky +@pytest.mark.django_db +def test_request_code_for_application_mfa_method(active_user_with_application_otp): + client = TrenchAPIClient() + mfa_method = active_user_with_application_otp.mfa_methods.first() + client.authenticate_multi_factor( + mfa_method=mfa_method, user=active_user_with_application_otp + ) + response = client.post( + path="/auth/code/request/", + data={"method": "app"}, + format="json", + ) + assert response.status_code == HTTP_422_UNPROCESSABLE_ENTITY + assert response.data.get("details") == "Get code from OTP application." + @flaky @pytest.mark.django_db