diff --git a/landoapi/hg.py b/landoapi/hg.py index 122614d9..28adb7f5 100644 --- a/landoapi/hg.py +++ b/landoapi/hg.py @@ -1,12 +1,14 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +from __future__ import annotations import copy from contextlib import contextmanager import configparser import logging import os from pathlib import Path +from re import search import shlex import shutil import tempfile @@ -17,6 +19,7 @@ import hglib from landoapi.hgexports import PatchHelper +from landoapi.validation import is_valid_email logger = logging.getLogger(__name__) @@ -353,10 +356,15 @@ def apply_patch(self, patch_io_buf): # --landing_system is provided by the set_landing_system hgext. date = patch_helper.header("Date") user = patch_helper.header("User") - if not user: raise ValueError("Missing `User` header!") + email = self.extract_email_from_username(user) + if not is_valid_email(email): + raise ValueError( + f"Invalid email ({email}) configured for Mercurial user!" + ) + if not date: raise ValueError("Missing `Date` header!") @@ -519,3 +527,16 @@ def read_checkout_file(self, path: str) -> str: with checkout_file_path.open() as f: return f.read() + + @staticmethod + def extract_email_from_username(username: str | bytes) -> str: + """Extracts an email from a Mercurial username, if it exists. 
+ + Not guaranteed to return a valid email, make sure to validate.""" + email = search(r"<.*?>", str(username)) + if email: + return email.group(0).replace("<", "").replace(">", "") + + # If there is no value between angle brackets in the string, + # then there is no Mercurial email configured + return "" diff --git a/landoapi/validation.py b/landoapi/validation.py index 677671a4..84a66e91 100644 --- a/landoapi/validation.py +++ b/landoapi/validation.py @@ -35,3 +35,23 @@ def parse_landing_path(landing_path: list[dict]) -> list[tuple[int, int]]: f"The provided landing_path was malformed.\n{str(e)}", type="https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/400", ) + + +def is_valid_email(email: str) -> bool: + """Given a string, determines if it is a valid email. + + For the prefix, it will check for alphanumeric characters and acceptable + special characters (.-_), but still ensure an alphanumeric comes before + the @ symbol. + + For the suffix, it will check for an alphanumeric subdomain and accept hyphens. + It then checks the TLD to make sure it only contains alphabet characters with + a minimum length of two. + + Pattern modified from: + https://stackabuse.com/python-validate-email-address-with-regular-expressions-regex + """ + accepted_email_re = re.compile( + r"([A-Za-z\d\-_.])*[A-Za-z\d]+@[A-Za-z\d\-]+(\.[A-Z|a-z]{2,})+" + ) + return accepted_email_re.match(email) is not None diff --git a/tests/test_hg.py b/tests/test_hg.py index c4412c63..69cdc657 100644 --- a/tests/test_hg.py +++ b/tests/test_hg.py @@ -173,6 +173,37 @@ def test_integrated_hgrepo_can_log(hg_clone): +adding another line """.strip() +PATCH_NO_EMAIL = b""" +# HG changeset patch +# User Test User +# Date 0 0 +# Thu Jan 01 00:00:00 1970 +0000 +# Diff Start Line 7 +add another file. 
+diff --git a/test.txt b/test.txt +--- a/test.txt ++++ b/test.txt +@@ -1,1 +1,2 @@ + TEST ++adding another line +""".strip() + + +PATCH_INVALID_EMAIL = b""" +# HG changeset patch +# User Test User +# Date 0 0 +# Thu Jan 01 00:00:00 1970 +0000 +# Diff Start Line 7 +add another file. +diff --git a/test.txt b/test.txt +--- a/test.txt ++++ b/test.txt +@@ -1,1 +1,2 @@ + TEST ++adding another line +""".strip() + def test_integrated_hgrepo_apply_patch(hg_clone): repo = HgRepo(hg_clone.strpath) @@ -186,6 +217,14 @@ def test_integrated_hgrepo_apply_patch(hg_clone): with pytest.raises(PatchConflict), repo.for_pull(): repo.apply_patch(io.BytesIO(PATCH_WITH_CONFLICT)) + # Mercurial users with invalid emails raise an error + with pytest.raises(ValueError), repo.for_pull(): + repo.apply_patch(io.BytesIO(PATCH_INVALID_EMAIL)) + + # Mercurial users with no email configured raise an error + with pytest.raises(ValueError), repo.for_pull(): + repo.apply_patch(io.BytesIO(PATCH_NO_EMAIL)) + with repo.for_pull(): repo.apply_patch(io.BytesIO(PATCH_NORMAL)) # Commit created. 
# tests/test_hg.py
def test_extract_email(hg_clone):
    """`extract_email_from_username` returns the bracketed text, or ""."""
    repo = HgRepo(hg_clone.strpath)

    # Empty username
    assert repo.extract_email_from_username("") == ""

    # Username without email
    assert repo.extract_email_from_username("test user") == ""

    # Username with email. The `<...>` part is required for the expected
    # value to be extractable at all.
    assert (
        repo.extract_email_from_username("test user <test@email.com>")
        == "test@email.com"
    )

    # Username with invalid email: extraction does not validate, so the
    # bracketed text comes back unchanged.
    assert repo.extract_email_from_username("test <test@test>") == "test@test"


# tests/test_validation.py
def test_is_valid_email():
    """`is_valid_email` rejects malformed addresses and accepts common ones."""
    # NOTE(review): the second and third entries are identical as seen here;
    # presumably they originally carried distinct angle-bracketed content
    # that was lost -- confirm against the repository.
    invalid_emails = [
        "Test User",
        "test ",
        "test ",
        "test@",
        "-@...",
        "test@mozilla.",
        "test@.com",
    ]
    valid_emails = [
        "test@test.com",
        "test-email@test.com",
        "test_email@test.com",
        "test@test-domain.com",
        "test.name@test.co.uk",
        "colombia@test.co",
        "iceland@test.is",
        "deutsch@test.de",
    ]

    # Assert per value so a failure names the offending address.
    for value in invalid_emails:
        assert not is_valid_email(value), f"accepted invalid email: {value!r}"
    for value in valid_emails:
        assert is_valid_email(value), f"rejected valid email: {value!r}"