diff --git a/AUTHORS b/AUTHORS index 2d3f80527..2d8d5465b 100644 --- a/AUTHORS +++ b/AUTHORS @@ -36,6 +36,7 @@ Bas van Oostveen Brian Helba Carl Schwan Cihad GUNDOGDU +Cristian Prigoana Daniel Golding Daniel 'Vector' Kerr Darrel O'Pry @@ -43,6 +44,7 @@ Dave Burkholder David Fischer David Hill David Smith +David Uzumaki Dawid Wolski Diego Garcia Dominik George diff --git a/CHANGELOG.md b/CHANGELOG.md index f2d44ba4f..1821a8ae1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * #1506 Support for Wildcard Origin and Redirect URIs - Adds a new setting [ALLOW_URL_WILDCARDS](https://django-oauth-toolkit.readthedocs.io/en/latest/settings.html#allow-uri-wildcards). This feature is useful for working with CI service such as cloudflare, netlify, and vercel that offer branch deployments for development previews and user acceptance testing. * #1586 Turkish language support added +* #1539 Add device authorization grant support ### Changed The project is now hosted in the django-oauth organization. diff --git a/docs/_images/application-register-device-code.png b/docs/_images/application-register-device-code.png new file mode 100644 index 000000000..4eac6d262 Binary files /dev/null and b/docs/_images/application-register-device-code.png differ diff --git a/docs/_images/device-approve-deny.png b/docs/_images/device-approve-deny.png new file mode 100644 index 000000000..dcff24b25 Binary files /dev/null and b/docs/_images/device-approve-deny.png differ diff --git a/docs/_images/device-enter-code-displayed.png b/docs/_images/device-enter-code-displayed.png new file mode 100644 index 000000000..201137ce3 Binary files /dev/null and b/docs/_images/device-enter-code-displayed.png differ diff --git a/docs/tutorial/tutorial.rst b/docs/tutorial/tutorial.rst index 5a0662507..140313673 100644 --- a/docs/tutorial/tutorial.rst +++ b/docs/tutorial/tutorial.rst @@ -9,4 +9,4 @@ Tutorials tutorial_03 tutorial_04 tutorial_05 - + tutorial_06 diff --git a/docs/tutorial/tutorial_06.rst b/docs/tutorial/tutorial_06.rst new file mode 100644 index 000000000..386e4ef39 --- /dev/null +++ b/docs/tutorial/tutorial_06.rst @@ -0,0 +1,126 @@ +Part 6 - Device authorization grant flow +==================================================== + +Scenario +-------- +In :doc:`Part 1 ` you created your own :term:`Authorization Server` and it's running along just fine. +You have devices that your users have, and those users need to authenticate the device against your +:term:`Authorization Server` in order to make the required API calls. + +Device Authorization +-------------------- +The OAuth 2.0 device authorization grant is designed for Internet +connected devices that either lack a browser to perform a user-agent +based authorization or are input-constrained to the extent that +requiring the user to input text in order to authenticate during the +authorization flow is impractical. It enables OAuth clients on such +devices (like smart TVs, media consoles, digital picture frames, and +printers) to obtain user authorization to access protected resources +by using a user agent on a separate device. + +Point your browser to `http://127.0.0.1:8000/o/applications/register/` to create an application. + +Fill the form as shown in the screenshot below, and before saving, take note of the ``Client id``. +Make sure the client type is set to "Public." There are cases where a confidential client makes sense, +but generally, it is assumed the device is unable to safely store the client secret. + +.. image:: ../_images/application-register-device-code.png + :alt: Device Authorization application registration + +Ensure the setting ``OAUTH_DEVICE_VERIFICATION_URI`` is set to a URI you want to return in the +`verification_uri` key in the response. This is what the device will display to the user. + +1. Navigate to the tests/app/idp directory: + +.. code-block:: sh + + cd tests/app/idp + +then start the server + +.. code-block:: sh + + python manage.py runserver + +.. _RFC: https://www.rfc-editor.org/rfc/rfc8628 +.. _RFC section 3.5: https://datatracker.ietf.org/doc/html/rfc8628#section-3.5 + +2. To initiate device authorization, send this request (in the real world, the device +makes this request). In `RFC`_ Figure 1, this is step (A). + +.. code-block:: sh + + curl --location 'http://127.0.0.1:8000/o/device-authorization/' \ + --header 'Content-Type: application/x-www-form-urlencoded' \ + --data-urlencode 'client_id={your application client id}' + +The OAuth2 provider will return the following response. In `RFC`_ Figure 1, this is step (B). + +.. code-block:: json + + { + "verification_uri": "http://127.0.0.1:8000/o/device", + "expires_in": 1800, + "user_code": "A32RVADM", + "device_code": "G30j94v0kNfipD4KmGLTWeL4eZnKHm", + "interval": 5 + } + +In the real world, the device will somehow make the value of the `user_code` available to the user (either on-screen display, +or Bluetooth, NFC, etc.). In `RFC`_ Figure 1, this is step (C). + +3. Go to `http://127.0.0.1:8000/o/device` in your browser. + +.. image:: ../_images/device-enter-code-displayed.png + +Enter the code, and it will redirect you to the device-confirm endpoint. In `RFC`_ Figure 1, this is step (D). + +Device-confirm endpoint +----------------------- +4. Device polling occurs concurrently while the user approves or denies the request. + +.. image:: ../_images/device-approve-deny.png + +Device polling +-------------- +Send the following request (in the real world, the device makes this request). In `RFC`_ Figure 1, this is step (E). + +.. code-block:: sh + + curl --location 'http://localhost:8000/o/token/' \ + --header 'Content-Type: application/x-www-form-urlencoded' \ + --data-urlencode 'device_code={the device code from the device-authorization response}' \ + --data-urlencode 'client_id={your application client id}' \ + --data-urlencode 'grant_type=urn:ietf:params:oauth:grant-type:device_code' + +In `RFC`_ Figure 1, there are multiple options for step (F), as per `RFC section 3.5`_. Until the user enters the code +in the browser and approves, the response will be 400: + +.. code-block:: json + + {"error": "authorization_pending"} + +Or if the user has denied the device, the response is 400: + +.. code-block:: json + + {"error": "access_denied"} + +Or if the token has expired, the response is 400: + +.. code-block:: json + + {"error": "expired_token"} + + +However, after the user approves, the response will be 200: + +.. code-block:: json + + { + "access_token": "SkJMgyL432P04nHDPyB63DEAM0nVxk", + "expires_in": 36000, + "token_type": "Bearer", + "scope": "openid", + "refresh_token": "Go6VumurDfFAeCeKrpCKPDtElV77id" + } diff --git a/oauth2_provider/migrations/0013_alter_application_authorization_grant_type_device.py b/oauth2_provider/migrations/0013_alter_application_authorization_grant_type_device.py new file mode 100644 index 000000000..99769c398 --- /dev/null +++ b/oauth2_provider/migrations/0013_alter_application_authorization_grant_type_device.py @@ -0,0 +1,41 @@ +# Generated by Django 5.1.5 on 2025-01-24 14:00 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('oauth2_provider', '0012_add_token_checksum'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.AlterField( + model_name='application', + name='authorization_grant_type', + field=models.CharField(choices=[('authorization-code', 'Authorization code'), ('urn:ietf:params:oauth:grant-type:device_code', 'Device Code'), ('implicit', 'Implicit'), ('password', 'Resource owner password-based'), ('client-credentials', 'Client credentials'), ('openid-hybrid', 'OpenID connect hybrid')], max_length=44), + ), + migrations.CreateModel( + name='DeviceGrant', + fields=[ + ('id', models.BigAutoField(primary_key=True, serialize=False)), + ('device_code', models.CharField(max_length=100, unique=True)), + ('user_code', models.CharField(max_length=100)), + ('scope', models.CharField(max_length=64, null=True)), + ('interval', models.IntegerField(default=5)), + ('expires', models.DateTimeField()), + ('status', models.CharField(blank=True, choices=[('authorized', 'Authorized'), ('authorization-pending', 'Authorization pending'), ('expired', 'Expired'), ('denied', 'Denied')], default='authorization-pending', max_length=64)), + ('client_id', models.CharField(db_index=True, max_length=100)), + ('last_checked', models.DateTimeField(auto_now=True)), + ('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(app_label)s_%(class)s', to=settings.AUTH_USER_MODEL)), + ], + options={ + 'abstract': False, + 'swappable': 'OAUTH2_PROVIDER_DEVICE_GRANT_MODEL', + 'constraints': [models.UniqueConstraint(fields=('device_code',), name='oauth2_provider_devicegrant_unique_device_code')], + }, + ), + ] diff --git a/oauth2_provider/models.py b/oauth2_provider/models.py index a76db37c0..523ade289 100644 --- a/oauth2_provider/models.py +++ b/oauth2_provider/models.py @@ -3,7 +3,10 @@ import time import uuid from contextlib import suppress -from datetime import timedelta +from dataclasses import dataclass +from datetime import datetime, timedelta +from datetime import timezone as dt_timezone +from typing import Callable, Optional, Union from urllib.parse import parse_qsl, urlparse from django.apps import apps @@ -86,12 +89,14 @@ class AbstractApplication(models.Model): ) GRANT_AUTHORIZATION_CODE = "authorization-code" + GRANT_DEVICE_CODE = "urn:ietf:params:oauth:grant-type:device_code" GRANT_IMPLICIT = "implicit" GRANT_PASSWORD = "password" GRANT_CLIENT_CREDENTIALS = "client-credentials" GRANT_OPENID_HYBRID = "openid-hybrid" GRANT_TYPES = ( (GRANT_AUTHORIZATION_CODE, _("Authorization code")), + (GRANT_DEVICE_CODE, _("Device Code")), (GRANT_IMPLICIT, _("Implicit")), (GRANT_PASSWORD, _("Resource owner password-based")), (GRANT_CLIENT_CREDENTIALS, _("Client credentials")), @@ -127,7 +132,7 @@ class AbstractApplication(models.Model): default="", ) client_type = models.CharField(max_length=32, choices=CLIENT_TYPES) - authorization_grant_type = models.CharField(max_length=32, choices=GRANT_TYPES) + authorization_grant_type = models.CharField(max_length=44, choices=GRANT_TYPES) client_secret = ClientSecretField( max_length=255, blank=True, @@ -650,11 +655,109 @@ class Meta(AbstractIDToken.Meta): swappable = "OAUTH2_PROVIDER_ID_TOKEN_MODEL" +class AbstractDeviceGrant(models.Model): + class Meta: + abstract = True + constraints = [ + models.UniqueConstraint( + fields=["device_code"], + name="%(app_label)s_%(class)s_unique_device_code", + ), + ] + + AUTHORIZED = "authorized" + AUTHORIZATION_PENDING = "authorization-pending" + EXPIRED = "expired" + DENIED = "denied" + + DEVICE_FLOW_STATUS = ( + (AUTHORIZED, _("Authorized")), + (AUTHORIZATION_PENDING, _("Authorization pending")), + (EXPIRED, _("Expired")), + (DENIED, _("Denied")), + ) + + id = models.BigAutoField(primary_key=True) + user = models.ForeignKey( + settings.AUTH_USER_MODEL, + related_name="%(app_label)s_%(class)s", + null=True, + blank=True, + on_delete=models.CASCADE, + ) + device_code = models.CharField(max_length=100, unique=True) + user_code = models.CharField(max_length=100) + scope = models.CharField(max_length=64, null=True) + interval = models.IntegerField(default=5) + expires = models.DateTimeField() + status = models.CharField( + max_length=64, blank=True, choices=DEVICE_FLOW_STATUS, default=AUTHORIZATION_PENDING + ) + client_id = models.CharField(max_length=100, db_index=True) + last_checked = models.DateTimeField(auto_now=True) + + def is_expired(self): + """ + Check device flow session expiration and set the status to "expired" if current time + is past the "expires" deadline. + """ + if self.status == self.EXPIRED: + return True + + now = datetime.now(tz=dt_timezone.utc) + if now >= self.expires: + self.status = self.EXPIRED + self.save(update_fields=["status"]) + return True + + return False + + +class DeviceGrant(AbstractDeviceGrant): + class Meta(AbstractDeviceGrant.Meta): + swappable = "OAUTH2_PROVIDER_DEVICE_GRANT_MODEL" + + +@dataclass +class DeviceRequest: + # https://datatracker.ietf.org/doc/html/rfc8628#section-3.1 + # scope is optional + client_id: str + scope: Optional[str] = None + + +@dataclass +class DeviceCodeResponse: + verification_uri: str + expires_in: int + user_code: int + device_code: str + interval: int + verification_uri_complete: Optional[Union[str, Callable]] = None + + +def create_device_grant(device_request: DeviceRequest, device_response: DeviceCodeResponse) -> DeviceGrant: + now = datetime.now(tz=dt_timezone.utc) + + return DeviceGrant.objects.create( + client_id=device_request.client_id, + device_code=device_response.device_code, + user_code=device_response.user_code, + scope=device_request.scope, + expires=now + timedelta(seconds=device_response.expires_in), + ) + + def get_application_model(): """Return the Application model that is active in this project.""" return apps.get_model(oauth2_settings.APPLICATION_MODEL) +def get_device_grant_model(): + """Return the DeviceGrant model that is active in this project.""" + return apps.get_model(oauth2_settings.DEVICE_GRANT_MODEL) + + def get_grant_model(): """Return the Grant model that is active in this project.""" return apps.get_model(oauth2_settings.GRANT_MODEL) diff --git a/oauth2_provider/oauth2_backends.py b/oauth2_provider/oauth2_backends.py index 3ddb9c90b..accd9d3f8 100644 --- a/oauth2_provider/oauth2_backends.py +++ b/oauth2_provider/oauth2_backends.py @@ -1,6 +1,7 @@ import json from urllib.parse import urlparse, urlunparse +from django.http import HttpRequest from oauthlib import oauth2 from oauthlib.common import Request as OauthlibRequest from oauthlib.common import quote, urlencode, urlencoded @@ -75,6 +76,8 @@ def extract_headers(self, request): del headers["wsgi.errors"] if "HTTP_AUTHORIZATION" in headers: headers["Authorization"] = headers["HTTP_AUTHORIZATION"] + if "CONTENT_TYPE" in headers: + headers["Content-Type"] = headers["CONTENT_TYPE"] # Add Access-Control-Allow-Origin header to the token endpoint response for authentication code grant, # if the origin is allowed by RequestValidator.is_origin_allowed. # https://github.com/oauthlib/oauthlib/pull/791 @@ -148,6 +151,16 @@ def create_authorization_response(self, request, scopes, credentials, allow): except oauth2.OAuth2Error as error: raise OAuthToolkitError(error=error, redirect_uri=credentials["redirect_uri"]) + def create_device_authorization_response(self, request: HttpRequest): + uri, http_method, body, headers = self._extract_params(request) + try: + headers, body, status = self.server.create_device_authorization_response( + uri, http_method, body, headers + ) + return headers, body, status + except OAuth2Error as exc: + return exc.headers, exc.json, exc.status_code + def create_token_response(self, request): """ A wrapper method that calls create_token_response on `server_class` instance. diff --git a/oauth2_provider/oauth2_validators.py b/oauth2_provider/oauth2_validators.py index db459a446..ec974b0c6 100644 --- a/oauth2_provider/oauth2_validators.py +++ b/oauth2_provider/oauth2_validators.py @@ -52,10 +52,12 @@ "client_credentials": (AbstractApplication.GRANT_CLIENT_CREDENTIALS,), "refresh_token": ( AbstractApplication.GRANT_AUTHORIZATION_CODE, + AbstractApplication.GRANT_DEVICE_CODE, AbstractApplication.GRANT_PASSWORD, AbstractApplication.GRANT_CLIENT_CREDENTIALS, AbstractApplication.GRANT_OPENID_HYBRID, ), + "urn:ietf:params:oauth:grant-type:device_code": (AbstractApplication.GRANT_DEVICE_CODE,), } Application = get_application_model() @@ -166,6 +168,11 @@ def _authenticate_basic_auth(self, request): elif request.client.client_id != client_id: log.debug("Failed basic auth: wrong client id %s" % client_id) return False + elif ( + request.client.client_type == "public" + and request.grant_type == "urn:ietf:params:oauth:grant-type:device_code" + ): + return True elif not self._check_secret(client_secret, request.client.client_secret): log.debug("Failed basic auth: wrong client secret %s" % client_secret) return False @@ -191,6 +198,11 @@ def _authenticate_request_body(self, request): if self._load_application(client_id, request) is None: log.debug("Failed body auth: Application %s does not exists" % client_id) return False + elif ( + request.client.client_type == "public" + and request.grant_type == "urn:ietf:params:oauth:grant-type:device_code" + ): + return True elif not self._check_secret(client_secret, request.client.client_secret): log.debug("Failed body auth: wrong client secret %s" % client_secret) return False diff --git a/oauth2_provider/settings.py b/oauth2_provider/settings.py index 9771aa4e7..216f36ba8 100644 --- a/oauth2_provider/settings.py +++ b/oauth2_provider/settings.py @@ -24,10 +24,13 @@ from django.utils.module_loading import import_string from oauthlib.common import Request +from oauth2_provider.utils import set_oauthlib_user_to_device_request_user, user_code_generator + USER_SETTINGS = getattr(settings, "OAUTH2_PROVIDER", None) APPLICATION_MODEL = getattr(settings, "OAUTH2_PROVIDER_APPLICATION_MODEL", "oauth2_provider.Application") +DEVICE_GRANT_MODEL = getattr(settings, "OAUTH2_PROVIDER_DEVICE_GRANT_MODEL", "oauth2_provider.DeviceGrant") ACCESS_TOKEN_MODEL = getattr(settings, "OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL", "oauth2_provider.AccessToken") ID_TOKEN_MODEL = getattr(settings, "OAUTH2_PROVIDER_ID_TOKEN_MODEL", "oauth2_provider.IDToken") GRANT_MODEL = getattr(settings, "OAUTH2_PROVIDER_GRANT_MODEL", "oauth2_provider.Grant") @@ -39,6 +42,10 @@ "CLIENT_SECRET_GENERATOR_LENGTH": 128, "CLIENT_SECRET_HASHER": "default", "ACCESS_TOKEN_GENERATOR": None, + "OAUTH_DEVICE_VERIFICATION_URI": None, + "OAUTH_DEVICE_VERIFICATION_URI_COMPLETE": None, + "OAUTH_DEVICE_USER_CODE_GENERATOR": user_code_generator, + "OAUTH_PRE_TOKEN_VALIDATION": [set_oauthlib_user_to_device_request_user], "REFRESH_TOKEN_GENERATOR": None, "EXTRA_SERVER_KWARGS": {}, "OAUTH2_SERVER_CLASS": "oauthlib.oauth2.Server", @@ -61,6 +68,8 @@ "APPLICATION_MODEL": APPLICATION_MODEL, "ACCESS_TOKEN_MODEL": ACCESS_TOKEN_MODEL, "ID_TOKEN_MODEL": ID_TOKEN_MODEL, + "DEVICE_GRANT_MODEL": DEVICE_GRANT_MODEL, + "DEVICE_FLOW_INTERVAL": 5, "GRANT_MODEL": GRANT_MODEL, "REFRESH_TOKEN_MODEL": REFRESH_TOKEN_MODEL, "APPLICATION_ADMIN_CLASS": "oauth2_provider.admin.ApplicationAdmin", @@ -268,6 +277,11 @@ def server_kwargs(self): ("refresh_token_expires_in", "REFRESH_TOKEN_EXPIRE_SECONDS"), ("token_generator", "ACCESS_TOKEN_GENERATOR"), ("refresh_token_generator", "REFRESH_TOKEN_GENERATOR"), + ("verification_uri", "OAUTH_DEVICE_VERIFICATION_URI"), + ("verification_uri_complete", "OAUTH_DEVICE_VERIFICATION_URI_COMPLETE"), + ("interval", "DEVICE_FLOW_INTERVAL"), + ("user_code_generator", "OAUTH_DEVICE_USER_CODE_GENERATOR"), + ("pre_token", "OAUTH_PRE_TOKEN_VALIDATION"), ] } kwargs.update(self.EXTRA_SERVER_KWARGS) diff --git a/oauth2_provider/templates/oauth2_provider/device/accept_deny.html b/oauth2_provider/templates/oauth2_provider/device/accept_deny.html new file mode 100644 index 000000000..4fd31a6fb --- /dev/null +++ b/oauth2_provider/templates/oauth2_provider/device/accept_deny.html @@ -0,0 +1,16 @@ +{% extends "oauth2_provider/base.html" %} +{% block content %} + + + Accept or Deny + + +

Please choose an action:

+
+ {% csrf_token %} + + +
+ + +{% endblock content %} diff --git a/oauth2_provider/templates/oauth2_provider/device/device_grant_status.html b/oauth2_provider/templates/oauth2_provider/device/device_grant_status.html new file mode 100644 index 000000000..f2f0a6292 --- /dev/null +++ b/oauth2_provider/templates/oauth2_provider/device/device_grant_status.html @@ -0,0 +1,11 @@ +{% extends "oauth2_provider/base.html" %} +{% block content %} + + + Device + + +

Device {{ object.get_status_display }}

+ + +{% endblock content %} diff --git a/oauth2_provider/templates/oauth2_provider/device/user_code.html b/oauth2_provider/templates/oauth2_provider/device/user_code.html new file mode 100644 index 000000000..774b95897 --- /dev/null +++ b/oauth2_provider/templates/oauth2_provider/device/user_code.html @@ -0,0 +1,16 @@ +{% extends "oauth2_provider/base.html" %} +{% block content %} + + + Device code + + +

Enter code displayed on device

+
+ {% csrf_token %} + {{ form.as_p }} + +
+ + +{% endblock content %} diff --git a/oauth2_provider/urls.py b/oauth2_provider/urls.py index 155822f45..ea974e045 100644 --- a/oauth2_provider/urls.py +++ b/oauth2_provider/urls.py @@ -11,6 +11,18 @@ path("token/", views.TokenView.as_view(), name="token"), path("revoke_token/", views.RevokeTokenView.as_view(), name="revoke-token"), path("introspect/", views.IntrospectTokenView.as_view(), name="introspect"), + path("device-authorization/", views.DeviceAuthorizationView.as_view(), name="device-authorization"), + path("device/", views.DeviceUserCodeView.as_view(), name="device"), + path( + "device-confirm//", + views.DeviceConfirmView.as_view(), + name="device-confirm", + ), + path( + "device-grant-status//", + views.DeviceGrantStatusView.as_view(), + name="device-grant-status", + ), ] diff --git a/oauth2_provider/utils.py b/oauth2_provider/utils.py index 3f48723c5..a009d8a0e 100644 --- a/oauth2_provider/utils.py +++ b/oauth2_provider/utils.py @@ -1,7 +1,9 @@ import functools +import random from django.conf import settings from jwcrypto import jwk +from oauthlib.common import Request @functools.lru_cache() @@ -32,3 +34,69 @@ def get_timezone(time_zone): return pytz.timezone(time_zone) return zoneinfo.ZoneInfo(time_zone) + + +def user_code_generator(user_code_length: int = 8) -> str: + """ + Recommended user code that retains enough entropy but doesn't + ruin the user experience of typing the code in. + + the below is based off: + https://datatracker.ietf.org/doc/html/rfc8628#section-5.1 + but with added explanation as to where 34.5 bits of entropy is coming from + + entropy (in bits) = length of user code * log2(length of set of chars) + e = 8 * log2(20) + e = 34.5 + + log2(20) is used here to say "you can make 20 yes/no decisions per user code single input character". + + _ _ _ _ - _ _ _ _ = 20^8 ~= 2^35.5 + * + + * you have 20 choices of chars to choose from (20 yes no decisions) + and so on for the other 7 spaces + + in english this means an attacker would need to try + 2^34.5 unique combinations to exhaust all possibilities. + however with a user code only being valid for 30 seconds + and rate limiting, a brute force attack is extremely unlikely + to work + + for our function we'll be using a base 32 character set + """ + if user_code_length < 1: + raise ValueError("user_code_length needs to be greater than 0") + + # base32 character space + character_space = "0123456789ABCDEFGHIJKLMNOPQRSTUV" + + # being explicit with length + user_code = [""] * user_code_length + + for i in range(user_code_length): + user_code[i] = random.choice(character_space) + + return "".join(user_code) + + +def set_oauthlib_user_to_device_request_user(request: Request) -> None: + """ + The user isn't known when the device flow is initiated by a device. + All we know is the client_id. + + However, when the user logins in order to submit the user code + from the device we now know which user is trying to authenticate + their device. We update the device user field at this point + and save it in the db. + + This function is added to the pre_token stage during the device code grant's + create_token_response where we have the oauthlib Request object which is what's used + to populate the user field in the device model + """ + # Since this function is used in the settings module, it will lead to circular imports + # since django isn't fully initialised yet when settings run + from oauth2_provider.models import DeviceGrant, get_device_grant_model + + device: DeviceGrant = get_device_grant_model().objects.get(device_code=request._params["device_code"]) + request.user = device.user diff --git a/oauth2_provider/views/__init__.py b/oauth2_provider/views/__init__.py index 9e32e17d8..24022f55e 100644 --- a/oauth2_provider/views/__init__.py +++ b/oauth2_provider/views/__init__.py @@ -17,3 +17,4 @@ from .introspect import IntrospectTokenView from .oidc import ConnectDiscoveryInfoView, JwksInfoView, RPInitiatedLogoutView, UserInfoView from .token import AuthorizedTokenDeleteView, AuthorizedTokensListView +from .device import DeviceAuthorizationView, DeviceUserCodeView, DeviceConfirmView, DeviceGrantStatusView diff --git a/oauth2_provider/views/base.py b/oauth2_provider/views/base.py index c5c904b14..43c8e3213 100644 --- a/oauth2_provider/views/base.py +++ b/oauth2_provider/views/base.py @@ -3,6 +3,7 @@ import logging from urllib.parse import parse_qsl, urlencode, urlparse +from django import http from django.contrib.auth.mixins import LoginRequiredMixin from django.contrib.auth.views import redirect_to_login from django.http import HttpResponse @@ -12,6 +13,9 @@ from django.views.decorators.csrf import csrf_exempt from django.views.decorators.debug import sensitive_post_parameters from django.views.generic import FormView, View +from oauthlib.oauth2.rfc8628 import errors as rfc8628_errors + +from oauth2_provider.models import DeviceGrant from ..compat import login_not_required from ..exceptions import OAuthToolkitError @@ -290,10 +294,13 @@ class TokenView(OAuthLibMixin, View): * Authorization code * Password * Client credentials + * Device code flow (specifically for the device polling stage) """ @method_decorator(sensitive_post_parameters("password", "client_secret")) - def post(self, request, *args, **kwargs): + def authorization_flow_token_response( + self, request: http.HttpRequest, *args, **kwargs + ) -> http.HttpResponse: url, headers, body, status = self.create_token_response(request) if status == 200: access_token = json.loads(body).get("access_token") @@ -307,6 +314,68 @@ def post(self, request, *args, **kwargs): response[k] = v return response + def device_flow_token_response( + self, request: http.HttpRequest, device_code: str, *args, **kwargs + ) -> http.HttpResponse: + try: + device = DeviceGrant.objects.get(device_code=device_code) + except DeviceGrant.DoesNotExist: + # The RFC does not mention what to return when the device is not found, + # but to keep it consistent with the other errors, we return the error + # in json format with an "error" key and the value formatted in the same + # way. + return http.HttpResponseNotFound( + content='{"error": "device_not_found"}', + content_type="application/json", + ) + + # Here we are returning the errors according to + # https://datatracker.ietf.org/doc/html/rfc8628#section-3.5 + # TODO: "slow_down" error (essentially rate-limiting). + if device.status == device.AUTHORIZATION_PENDING: + error = rfc8628_errors.AuthorizationPendingError() + elif device.status == device.DENIED: + error = rfc8628_errors.AccessDenied() + elif device.status == device.EXPIRED: + error = rfc8628_errors.ExpiredTokenError() + elif device.status != device.AUTHORIZED: + # It's technically impossible to get here because we've exhausted + # all the possible values for status. However, it does act as a + # reminder for developers when they add, in the future, new values + # (such as slow_down) that they must handle here. + return http.HttpResponseServerError( + content='{"error": "internal_error"}', + content_type="application/json", + ) + else: + # AUTHORIZED is the only accepted state, anything else is + # rejected. + error = None + + if error: + return http.HttpResponse( + content=error.json, + status=error.status_code, + content_type="application/json", + ) + + url, headers, body, status = self.create_token_response(request) + response = http.JsonResponse(data=json.loads(body), status=status) + + if status != 200: + return response + + for k, v in headers.items(): + response[k] = v + + return response + + def post(self, request: http.HttpRequest, *args, **kwargs) -> http.HttpResponse: + params = request.POST + if params.get("grant_type") == "urn:ietf:params:oauth:grant-type:device_code": + return self.device_flow_token_response(request, params["device_code"]) + return self.authorization_flow_token_response(request) + @method_decorator(csrf_exempt, name="dispatch") @method_decorator(login_not_required, name="dispatch") diff --git a/oauth2_provider/views/device.py b/oauth2_provider/views/device.py new file mode 100644 index 000000000..f3dccf2ba --- /dev/null +++ b/oauth2_provider/views/device.py @@ -0,0 +1,196 @@ +import json + +from django import forms, http +from django.contrib.auth.mixins import LoginRequiredMixin +from django.core.exceptions import ValidationError +from django.shortcuts import get_object_or_404 +from django.urls import reverse +from django.utils.decorators import method_decorator +from django.views.decorators.csrf import csrf_exempt +from django.views.generic import DetailView, FormView, View +from oauthlib.oauth2 import DeviceApplicationServer + +from oauth2_provider.compat import login_not_required +from oauth2_provider.models import ( + DeviceCodeResponse, + DeviceGrant, + DeviceRequest, + create_device_grant, + get_device_grant_model, +) +from oauth2_provider.views.mixins import OAuthLibMixin + + +@method_decorator(csrf_exempt, name="dispatch") +@method_decorator(login_not_required, name="dispatch") +class DeviceAuthorizationView(OAuthLibMixin, View): + server_class = DeviceApplicationServer + + def post(self, request, *args, **kwargs): + headers, response, status = self.create_device_authorization_response(request) + + if status != 200: + return http.JsonResponse(data=json.loads(response), status=status, headers=headers) + + device_request = DeviceRequest(client_id=request.POST["client_id"], scope=request.POST.get("scope")) + device_response = DeviceCodeResponse(**response) + create_device_grant(device_request, device_response) + + return http.JsonResponse(data=response, status=status, headers=headers) + + +class DeviceGrantForm(forms.Form): + user_code = forms.CharField(required=True) + + def clean_user_code(self): + """ + Performs validation on the user_code provided by the user and adds to the cleaned_data dict + the "device_grant" object associated with the user_code, which is useful to process the + response in the DeviceUserCodeView. + + It can raise one of the following ValidationErrors, with the associated codes: + + * incorrect_user_code: if a device grant associated with the user_code does not exist + * expired_user_code: if the device grant associated with the user_code has expired + * user_code_already_used: if the device grant associated with the user_code has been already + approved or denied. The only accepted state of the device grant is AUTHORIZATION_PENDING. + """ + cleaned_data = super().clean() + user_code: str = cleaned_data["user_code"] + try: + device_grant: DeviceGrant = get_device_grant_model().objects.get(user_code=user_code) + except DeviceGrant.DoesNotExist: + raise ValidationError("Incorrect user code", code="incorrect_user_code") + + if device_grant.is_expired(): + raise ValidationError("Expired user code", code="expired_user_code") + + # User of device has already made their decision for this device. + if device_grant.status != device_grant.AUTHORIZATION_PENDING: + raise ValidationError("User code has already been used", code="user_code_already_used") + + # Make the device_grant available to the View, saving one additional db call. + cleaned_data["device_grant"] = device_grant + + return user_code + + +class DeviceUserCodeView(LoginRequiredMixin, FormView): + """ + The view where the user is instructed (by the device) to come to in order to + enter the user code. More details in this section of the RFC: + https://datatracker.ietf.org/doc/html/rfc8628#section-3.3 + + Note: it's common to see in other implementations of this RFC that only ask the + user to sign in after they input the user code but since the user has to be signed + in regardless, to approve the device login we're making the decision here, for + simplicity, to require being logged in up front. + """ + + template_name = "oauth2_provider/device/user_code.html" + form_class = DeviceGrantForm + + def get_success_url(self): + return reverse( + "oauth2_provider:device-confirm", + kwargs={ + "client_id": self.device_grant.client_id, + "user_code": self.device_grant.user_code, + }, + ) + + def form_valid(self, form): + """ + Sets the device_grant on the instance so that it can be accessed + in get_success_url. It comes in handy when users want to overwrite + get_success_url, redirecting to the URL with the URL params pointing + to the current device. + """ + device_grant: DeviceGrant = form.cleaned_data["device_grant"] + + device_grant.user = self.request.user + device_grant.save(update_fields=["user"]) + + self.device_grant = device_grant + + return super().form_valid(form) + + +class DeviceConfirmForm(forms.Form): + """ + Simple form for the user to approve or deny the device. + """ + + action = forms.CharField(required=True) + + +class DeviceConfirmView(LoginRequiredMixin, FormView): + """ + The view where the user approves or denies a device. + """ + + template_name = "oauth2_provider/device/accept_deny.html" + form_class = DeviceConfirmForm + + def get_object(self): + """ + Returns the DeviceGrant object in the AUTHORIZATION_PENDING state identified + by the slugs client_id and user_code. Raises Http404 if not found. + """ + client_id, user_code = self.kwargs.get("client_id"), self.kwargs.get("user_code") + return get_object_or_404( + DeviceGrant, + client_id=client_id, + user_code=user_code, + status=DeviceGrant.AUTHORIZATION_PENDING, + ) + + def get_success_url(self): + return reverse( + "oauth2_provider:device-grant-status", + kwargs={ + "client_id": self.kwargs["client_id"], + "user_code": self.kwargs["user_code"], + }, + ) + + def get(self, request, *args, **kwargs): + """ + Enable GET requests for improved user experience. But validate that the URL params + are correct (i.e. there exists a device grant in the db that corresponds to the URL + params) by calling .get_object() + """ + _ = self.get_object() # raises 404 if URL parameters are incorrect + return super().get(request, args, kwargs) + + def form_valid(self, form): + """ + Uses get_object() to retrieves the DeviceGrant object and updates its state + to authorized or denied, based on the user input. + """ + device = self.get_object() + action = form.cleaned_data["action"] + + if action == "accept": + device.status = device.AUTHORIZED + device.save(update_fields=["status"]) + return super().form_valid(form) + elif action == "deny": + device.status = device.DENIED + device.save(update_fields=["status"]) + return super().form_valid(form) + else: + return http.HttpResponseBadRequest() + + +class DeviceGrantStatusView(LoginRequiredMixin, DetailView): + """ + The view to display the status of a DeviceGrant. + """ + + model = DeviceGrant + template_name = "oauth2_provider/device/device_grant_status.html" + + def get_object(self): + client_id, user_code = self.kwargs.get("client_id"), self.kwargs.get("user_code") + return get_object_or_404(DeviceGrant, client_id=client_id, user_code=user_code) diff --git a/oauth2_provider/views/mixins.py b/oauth2_provider/views/mixins.py index 203d0103b..be2a77e8d 100644 --- a/oauth2_provider/views/mixins.py +++ b/oauth2_provider/views/mixins.py @@ -2,7 +2,7 @@ from django.conf import settings from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation -from django.http import HttpResponseForbidden, HttpResponseNotFound +from django.http import HttpRequest, HttpResponseForbidden, HttpResponseNotFound from ..exceptions import FatalClientError from ..scopes import get_scopes_backend @@ -114,6 +114,15 @@ def create_authorization_response(self, request, scopes, credentials, allow): core = self.get_oauthlib_core() return core.create_authorization_response(request, scopes, credentials, allow) + def create_device_authorization_response(self, request: HttpRequest): + """ + A wrapper method that calls create_device_authorization_response on `server_class` + instance. + :param request: The current django.http.HttpRequest object + """ + core = self.get_oauthlib_core() + return core.create_device_authorization_response(request) + def create_token_response(self, request): """ A wrapper method that calls create_token_response on `server_class` instance. diff --git a/pyproject.toml b/pyproject.toml index 27bdfb585..a9ade3b7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ classifiers = [ dependencies = [ "django >= 4.2", "requests >= 2.13.0", - "oauthlib >= 3.2.2", + "oauthlib >= 3.3.0", "jwcrypto >= 1.5.0", ] diff --git a/tests/app/README.md b/tests/app/README.md index a2632b262..a0e279122 100644 --- a/tests/app/README.md +++ b/tests/app/README.md @@ -29,9 +29,9 @@ password: password You can update data in the IDP and then dump the data to a new seed file as follows. - ``` +``` python -Xutf8 ./manage.py dumpdata -e sessions -e admin.logentry -e auth.permission -e contenttypes.contenttype -e oauth2_provider.accesstoken -e oauth2_provider.refreshtoken -e oauth2_provider.idtoken --natural-foreign --natural-primary --indent 2 > fixtures/seed.json - ``` +``` ## /test/app/rp diff --git a/tests/app/idp/idp/settings.py b/tests/app/idp/idp/settings.py index eee20982e..679407604 100644 --- a/tests/app/idp/idp/settings.py +++ b/tests/app/idp/idp/settings.py @@ -15,6 +15,8 @@ import environ +from oauth2_provider.utils import set_oauthlib_user_to_device_request_user, user_code_generator + # Build paths inside the project like this: BASE_DIR / 'subdir'. BASE_DIR = Path(__file__).resolve().parent.parent @@ -199,6 +201,10 @@ OAUTH2_PROVIDER = { "OAUTH2_VALIDATOR_CLASS": "idp.oauth.CustomOAuth2Validator", + "OAUTH_DEVICE_VERIFICATION_URI": "http://127.0.0.1:8000/o/device", + "OAUTH_PRE_TOKEN_VALIDATION": [set_oauthlib_user_to_device_request_user], + "OAUTH_DEVICE_USER_CODE_GENERATOR": user_code_generator, + "OAUTH_DEVICE_VERIFICATION_URI_COMPLETE": lambda x: f"http://127.0.0.1:8000/o/device?user_code={x}", "OIDC_ENABLED": env("OAUTH2_PROVIDER_OIDC_ENABLED"), "OIDC_RP_INITIATED_LOGOUT_ENABLED": env("OAUTH2_PROVIDER_OIDC_RP_INITIATED_LOGOUT_ENABLED"), # this key is just for out test app, you should never store a key like this in a production environment. diff --git a/tests/app/idp/requirements.txt b/tests/app/idp/requirements.txt index f607463d7..f8d653aba 100644 --- a/tests/app/idp/requirements.txt +++ b/tests/app/idp/requirements.txt @@ -1,5 +1,5 @@ Django>=4.2,<=5.2 -django-cors-headers==3.14.0 -django-environ==0.11.2 +django-cors-headers==4.6.0 +django-environ==0.12.0 -e ../../../ diff --git a/tests/app/idp/templates/device/accept_deny.html b/tests/app/idp/templates/device/accept_deny.html new file mode 100644 index 000000000..4fd31a6fb --- /dev/null +++ b/tests/app/idp/templates/device/accept_deny.html @@ -0,0 +1,16 @@ +{% extends "oauth2_provider/base.html" %} +{% block content %} + + + Accept or Deny + + +

Please choose an action:

+
+ {% csrf_token %} + + +
+ + +{% endblock content %} diff --git a/tests/app/idp/templates/device/user_code.html b/tests/app/idp/templates/device/user_code.html new file mode 100644 index 000000000..774b95897 --- /dev/null +++ b/tests/app/idp/templates/device/user_code.html @@ -0,0 +1,16 @@ +{% extends "oauth2_provider/base.html" %} +{% block content %} + + + Device code + + +

Enter code displayed on device

+
+ {% csrf_token %} + {{ form.as_p }} + +
+ + +{% endblock content %} diff --git a/tests/test_device.py b/tests/test_device.py new file mode 100644 index 000000000..8c2d75977 --- /dev/null +++ b/tests/test_device.py @@ -0,0 +1,696 @@ +from datetime import datetime, timedelta, timezone +from unittest import mock +from urllib.parse import urlencode + +import django.http.response +import pytest +from django import http +from django.conf import settings +from django.contrib.auth import get_user_model +from django.test import RequestFactory +from django.urls import reverse + +import oauth2_provider.models +from oauth2_provider.models import ( + get_access_token_model, + get_application_model, + get_device_grant_model, + get_refresh_token_model, +) +from oauth2_provider.utils import set_oauthlib_user_to_device_request_user + +from . import presets +from .common_testing import OAuth2ProviderTestCase as TestCase + + +Application = get_application_model() +AccessToken = get_access_token_model() +RefreshToken = get_refresh_token_model() +UserModel = get_user_model() +DeviceModel: oauth2_provider.models.DeviceGrant = get_device_grant_model() + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings(presets.DEFAULT_SCOPES_RW) +class DeviceFlowBaseTestCase(TestCase): + factory = RequestFactory() + + @classmethod + def setUpTestData(cls): + cls.test_user = UserModel.objects.create_user("test_user", "test@example.com", "123456") + cls.dev_user = UserModel.objects.create_user("dev_user", "dev@example.com", "123456") + + cls.application = Application.objects.create( + name="test_client_credentials_app", + user=cls.dev_user, + client_type=Application.CLIENT_PUBLIC, + authorization_grant_type=Application.GRANT_DEVICE_CODE, + client_secret="abcdefghijklmnopqrstuvwxyz1234567890", + ) + + def tearDown(self): + DeviceModel.objects.all().delete() + return super().tearDown() + + +class TestDeviceFlow(DeviceFlowBaseTestCase): + """ + The first 2 tests test the device flow in order + how the device flow works + """ + + @mock.patch( + "oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token", + lambda: "abc", + ) + def test_device_flow_authorization_initiation(self): + """ + Tests the initial stage of the flow when the device sends its device authorization + request to the authorization server. + + Device Authorization Request(https://datatracker.ietf.org/doc/html/rfc8628#section-3.1) + + This request shape: + POST /device_authorization HTTP/1.1 + Host: server.example.com + Content-Type: application/x-www-form-urlencoded + + client_id=1406020730&scope=example_scope + + Should respond with this response shape: + Device Authorization Response (https://datatracker.ietf.org/doc/html/rfc8628#section-3.2) + { + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5 + } + """ + + self.oauth2_settings.OAUTH_DEVICE_VERIFICATION_URI = "example.com/device" + self.oauth2_settings.OAUTH_DEVICE_USER_CODE_GENERATOR = lambda: "xyz" + + request_data: dict[str, str] = { + "client_id": self.application.client_id, + } + request_as_x_www_form_urlencoded: str = urlencode(request_data) + + response: django.http.response.JsonResponse = self.client.post( + reverse("oauth2_provider:device-authorization"), + data=request_as_x_www_form_urlencoded, + content_type="application/x-www-form-urlencoded", + ) + + assert response.status_code == 200 + + # let's make sure the device was created in the db + assert DeviceModel.objects.get(device_code="abc").status == DeviceModel.AUTHORIZATION_PENDING + + assert response.json() == { + "verification_uri": "example.com/device", + "expires_in": 1800, + "user_code": "xyz", + "device_code": "abc", + "interval": 5, + } + + @mock.patch( + "oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token", + lambda: "abc", + ) + def test_device_flow_authorization_user_code_confirm_and_access_token(self): + """ + This is a full user journey test. + + The device initiates the flow by calling the /device-authorization endpoint and starts + polling the /authorize endpoint getting back error until the user approves in the + browser. + + In the meantime, the user visits the /device endpoint in their browsers to submit the + user code and approve, after which the /authorize returns the tokens to the device. + """ + + # ----------------------- + # 0: Setup device flow, where the device sends an authorization request and + # starts polling. The polling will fail because the user has not approved yet + # ----------------------- + self.oauth2_settings.OAUTH_DEVICE_VERIFICATION_URI = "example.com/device" + self.oauth2_settings.OAUTH_DEVICE_USER_CODE_GENERATOR = lambda: "xyz" + self.oauth2_settings.OAUTH_PRE_TOKEN_VALIDATION = [set_oauthlib_user_to_device_request_user] + + request_data: dict[str, str] = { + "client_id": self.application.client_id, + } + request_as_x_www_form_urlencoded: str = urlencode(request_data) + + device_authorization_response: http.response.JsonResponse = self.client.post( + reverse("oauth2_provider:device-authorization"), + data=request_as_x_www_form_urlencoded, + content_type="application/x-www-form-urlencoded", + ) + + assert device_authorization_response.__getitem__("content-type") == "application/json" + device = DeviceModel.objects.get(device_code="abc") + self.assertJSONEqual( + raw=device_authorization_response.content, + expected_data={ + "verification_uri": "example.com/device", + "expires_in": 1800, + "user_code": device.user_code, + "device_code": device.device_code, + "interval": 5, + }, + ) + + # Device polls /token and gets back error because the user hasn't approved yet + token_payload = { + "device_code": device.device_code, + "client_id": self.application.client_id, + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + } + + token_response: http.response.JsonResponse = self.client.post( + "/o/token/", + data=urlencode(token_payload), + content_type="application/x-www-form-urlencoded", + ) + # TokenView should always respond with application/json as it's meant to be + # consumed by devices. + assert token_response.__getitem__("content-type") == "application/json" + assert token_response.status_code == 400 + self.assertJSONEqual(raw=token_response.content, expected_data={"error": "authorization_pending"}) + + # /device and /device_confirm require a user to be logged in + # to access it + UserModel.objects.create_user( + username="test_user_device_flow", + email="test_device@example.com", + password="password123", + ) + self.client.login(username="test_user_device_flow", password="password123") + + # -------------------------------------------------------------------------------- + # 1. User visits the /device endpoint in their browsers and submits the user code + # submits wrong code then right code + # -------------------------------------------------------------------------------- + + # 1. User visits the /device endpoint in their browsers and submits the user code + # (GET Request to load it) + get_response = self.client.get(reverse("oauth2_provider:device")) + assert get_response.status_code == 200 + assert "form" in get_response.context # Ensure the form is rendered in the context + + # 1.1.0 User visits the /device endpoint in their browsers and submits wrong user code + self.assertContains( + self.client.post(reverse("oauth2_provider:device"), data={"user_code": "invalid_code"}), + status_code=200, + text="Incorrect user code", + count=1, + ) + + # Note: the device not being in the expected test covered in the other tests + + # 1.1.1: user submits valid user code + device_confirm_url = reverse( + "oauth2_provider:device-confirm", + kwargs={"user_code": "xyz", "client_id": self.application.client_id}, + ) + + self.assertRedirects( + response=self.client.post( + reverse("oauth2_provider:device"), + data={"user_code": "xyz"}, + ), + expected_url=device_confirm_url, + ) + + # -------------------------------------------------------------------------------- + # 2: We redirect to the accept/deny form (the user is still in their browser) + # and approves + # -------------------------------------------------------------------------------- + device_grant_status_url = reverse( + "oauth2_provider:device-grant-status", + kwargs={"user_code": "xyz", "client_id": self.application.client_id}, + ) + + self.assertRedirects( + response=self.client.post(device_confirm_url, data={"action": "accept"}), + expected_url=device_grant_status_url, + ) + + # -------------------------------------------------------------------------------- + # 3: We redirect to the device grant status page (the user is still in their browser) + # -------------------------------------------------------------------------------- + self.assertContains( + response=self.client.get(device_grant_status_url), + text="Device Authorized", + count=1, + ) + + device = DeviceModel.objects.get(device_code="abc") + assert device.status == device.AUTHORIZED + + # ------------------------- + # 4: Device polls /token successfully + # ------------------------- + token_payload = { + "device_code": device.device_code, + "client_id": self.application.client_id, + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + } + + token_response = self.client.post( + "/o/token/", + data=urlencode(token_payload), + content_type="application/x-www-form-urlencoded", + ) + # TokenView should always respond with application/json as it's meant to be + # consumed by devices. + assert token_response.__getitem__("content-type") == "application/json" + assert token_response.status_code == 200 + + token_data = token_response.json() + assert token_data == { + "access_token": mock.ANY, + "expires_in": 36000, + "token_type": "Bearer", + "scope": "read write", + "refresh_token": mock.ANY, + } + + # ensure the access token and refresh token have the same user as the device that just authenticated + access_token: oauth2_provider.models.AccessToken = AccessToken.objects.get( + token=token_data["access_token"] + ) + assert access_token.user == device.user + + refresh_token: oauth2_provider.models.RefreshToken = RefreshToken.objects.get( + token=token_data["refresh_token"] + ) + assert refresh_token.user == device.user + + def test_device_flow_authorization_device_invalid_state_returns_form_error(self): + """ + This test asserts that only devices in the expected state (authorization-pending) + can be approved/denied by the user. + """ + + UserModel.objects.create_user( + username="test_user_device_flow", + email="test_device@example.com", + password="password123", + ) + self.client.login(username="test_user_device_flow", password="password123") + + device = DeviceModel( + client_id="client_id", + device_code="device_code", + user_code="user_code", + scope="scope", + expires=datetime.now() + timedelta(days=1), + ) + device.save() + + # This simulates pytest.mark.parameterize, which unfortunately does not work with unittest + # and consequently with Django TestCase. + for invalid_state in ["authorized", "denied", "LOL_status"]: + # Set the device into an incorrect state. + device.status = invalid_state + device.save(update_fields=["status"]) + + self.assertContains( + response=self.client.post( + reverse("oauth2_provider:device"), + data={"user_code": "user_code"}, + ), + status_code=200, + text="User code has already been used", + count=1, + ) + + def test_device_flow_authorization_device_expired_returns_form_error(self): + """ + This test asserts that only devices in the expected state (authorization-pending) + can be approved/denied by the user. + """ + + UserModel.objects.create_user( + username="test_user_device_flow", + email="test_device@example.com", + password="password123", + ) + self.client.login(username="test_user_device_flow", password="password123") + + device = DeviceModel( + client_id="client_id", + device_code="device_code", + user_code="user_code", + scope="scope", + expires=datetime.now() + timedelta(seconds=-1), # <- essentially expired + ) + device.save() + + self.assertContains( + response=self.client.post( + reverse("oauth2_provider:device"), + data={"user_code": "user_code"}, + ), + status_code=200, + text="Expired user code", + count=1, + ) + + def test_token_view_returns_error_if_device_in_invalid_state(self): + """ + This test asserts that the token view returns the appropriate errors as specified + in https://datatracker.ietf.org/doc/html/rfc8628#section-3.5, in case the device + has not yet been approved by the user. + """ + + device = DeviceModel( + client_id="client_id", + device_code="device_code", + user_code="user_code", + scope="scope", + expires=datetime.now() + timedelta(seconds=60), + ) + device.save() + + token_payload = { + "device_code": "device_code", + "client_id": "client_id", + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + } + + testcases = [ + ("authorization-pending", '{"error": "authorization_pending"}', 400), + ("expired", '{"error": "expired_token"}', 400), + ("denied", '{"error": "access_denied"}', 400), + ("LOL_status", '{"error": "internal_error"}', 500), + ] + for invalid_state, expected_error_message, expected_error_code in testcases: + device.status = invalid_state + device.save(update_fields=["status"]) + + response = self.client.post( + "/o/token/", + data=urlencode(token_payload), + content_type="application/x-www-form-urlencoded", + ) + self.assertContains( + response=response, + status_code=expected_error_code, + text=expected_error_message, + count=1, + ) + # TokenView should always respond with application/json as it's meant to be + # consumed by devices. + self.assertEqual(response.__getitem__("content-type"), "application/json") + + def test_token_view_returns_404_error_if_device_not_found(self): + device = DeviceModel( + client_id="client_id", + device_code="device_code", + user_code="user_code", + scope="scope", + expires=datetime.now() + timedelta(seconds=60), + ) + device.save() + + token_payload = { + "device_code": "another_device_code", + "client_id": "client_id", + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + } + + response = self.client.post( + "/o/token/", + data=urlencode(token_payload), + content_type="application/x-www-form-urlencoded", + ) + self.assertContains( + response=response, + status_code=404, + text="device_not_found", + count=1, + ) + # TokenView should always respond with application/json as it's meant to be + # consumed by devices. + self.assertEqual(response.__getitem__("content-type"), "application/json") + + def test_token_view_status_equals_what_oauthlib_token_response_method_returns(self): + """ + Tests the use case where oauthlib create_token_response returns a status different + than 200. + """ + + class MockOauthlibCoreClass: + def create_token_response(self, _): + return "url", {"headers_are_ignored": True}, '{"Key": "Value"}', 299 + + device = DeviceModel( + client_id="client_id", + device_code="device_code", + user_code="user_code", + scope="scope", + expires=datetime.now() + timedelta(seconds=60), + status="authorized", + ) + device.save() + + token_payload = { + "device_code": "device_code", + "client_id": "client_id", + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + } + + with mock.patch( + "oauth2_provider.views.mixins.OAuthLibMixin.get_oauthlib_core", MockOauthlibCoreClass + ): + response = self.client.post( + "/o/token/", + data=urlencode(token_payload), + content_type="application/x-www-form-urlencoded", + ) + + self.assertEqual(response["content-type"], "application/json") + self.assertContains( + response=response, + status_code=299, + text='{"Key": "Value"}', + count=1, + ) + assert not response.has_header("headers_are_ignored") + + @mock.patch( + "oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token", + lambda: "abc", + ) + def test_device_polling_interval_can_be_changed(self): + """ + Tests the device polling rate(interval) can be changed to something other than the default + of 5 seconds. + """ + + self.oauth2_settings.OAUTH_DEVICE_VERIFICATION_URI = "example.com/device" + self.oauth2_settings.OAUTH_DEVICE_USER_CODE_GENERATOR = lambda: "xyz" + + self.oauth2_settings.DEVICE_FLOW_INTERVAL = 10 + + request_data: dict[str, str] = { + "client_id": self.application.client_id, + } + request_as_x_www_form_urlencoded: str = urlencode(request_data) + + response: django.http.response.JsonResponse = self.client.post( + reverse("oauth2_provider:device-authorization"), + data=request_as_x_www_form_urlencoded, + content_type="application/x-www-form-urlencoded", + ) + + assert response.status_code == 200 + + assert response.json() == { + "verification_uri": "example.com/device", + "expires_in": 1800, + "user_code": "xyz", + "device_code": "abc", + "interval": 10, + } + + def test_incorrect_client_id_sent(self): + """ + Ensure the correct error is returned when an invalid client is sent + """ + request_data: dict[str, str] = { + "client_id": "client_id_that_does_not_exist", + } + request_as_x_www_form_urlencoded: str = urlencode(request_data) + + response: django.http.response.JsonResponse = self.client.post( + reverse("oauth2_provider:device-authorization"), + data=request_as_x_www_form_urlencoded, + content_type="application/x-www-form-urlencoded", + ) + + assert response.status_code == 400 + + assert response.json() == { + "error": "invalid_request", + "error_description": "Invalid client_id parameter value.", + } + + def test_missing_client_id(self): + """ + Ensure the correct error is returned when the client id is missing. + """ + request_data: dict[str, str] = { + "not_client_id": "client_id_that_does_not_exist", + } + request_as_x_www_form_urlencoded: str = urlencode(request_data) + + response: django.http.response.JsonResponse = self.client.post( + reverse("oauth2_provider:device-authorization"), + data=request_as_x_www_form_urlencoded, + content_type="application/x-www-form-urlencoded", + ) + + assert response.status_code == 400 + + assert response.json() == { + "error": "invalid_request", + "error_description": "Missing client_id parameter.", + } + + def test_device_confirm_and_user_code_views_require_login(self): + URLs = [ + reverse("oauth2_provider:device-confirm", kwargs={"user_code": None, "client_id": "abc"}), + reverse("oauth2_provider:device-confirm", kwargs={"user_code": "abc", "client_id": "abc"}), + reverse("oauth2_provider:device"), + ] + + for url in URLs: + r = self.client.get(url) + assert r.status_code == 302 + assert r["Location"] == f"{settings.LOGIN_URL}?next={url}" + + r = self.client.post(url) + assert r.status_code == 302 + assert r["Location"] == f"{settings.LOGIN_URL}?next={url}" + + def test_device_confirm_view_GET_returns_404_when_device_does_not_exist(self): + UserModel.objects.create_user( + username="test_user_device_flow", + email="test_device@example.com", + password="password123", + ) + self.client.login(username="test_user_device_flow", password="password123") + + device = DeviceModel( + client_id="client_id", + device_code="device_code", + user_code="user_code", + scope="scope", + expires=datetime.now(), + ) + device.save() + + self.assertContains( + response=self.client.get( + reverse( + "oauth2_provider:device-confirm", + kwargs={"user_code": "not_user_code", "client_id": "not_client_id"}, + ) + ), + status_code=404, + text="The requested resource was not found on this server.", + ) + + # Asserts for valid user_code and client_id but invalid states + for invalid_state in ["authorized", "denied", "expired"]: + device.status = invalid_state + device.save(update_fields=["status"]) + + self.assertContains( + response=self.client.get( + reverse( + "oauth2_provider:device-confirm", + kwargs={"user_code": "not_user_code", "client_id": "client_id"}, + ) + ), + status_code=404, + text="The requested resource was not found on this server.", + ) + + def test_device_confirm_view_POST_returns_404_when_device_does_not_exist(self): + UserModel.objects.create_user( + username="test_user_device_flow", + email="test_device@example.com", + password="password123", + ) + self.client.login(username="test_user_device_flow", password="password123") + + device = DeviceModel( + client_id="client_id", + device_code="device_code", + user_code="user_code", + scope="scope", + expires=datetime.now(), + ) + device.save() + + self.assertContains( + response=self.client.post( + reverse( + "oauth2_provider:device-confirm", + kwargs={"user_code": "not_user_code", "client_id": "client_id"}, + ), + data={"action": "accept"}, + ), + status_code=404, + text="The requested resource was not found on this server.", + count=1, + ) + + # Asserts for valid user_code and client_id but invalid states + for invalid_state in ["authorized", "denied", "expired"]: + device.status = invalid_state + device.save(update_fields=["status"]) + + self.assertContains( + response=self.client.post( + reverse( + "oauth2_provider:device-confirm", + kwargs={"user_code": "user_code", "client_id": "client_id"}, + ), + data={"action": "accept"}, + ), + status_code=404, + text="The requested resource was not found on this server.", + count=1, + ) + + def test_device_is_expired_method_sets_status_to_expired_if_deadline_passed(self): + device = DeviceModel( + client_id="client_id", + device_code="device_code", + user_code="user_code", + scope="scope", + expires=datetime.now(tz=timezone.utc) + timedelta(seconds=-1), # <- essentially expired + ) + device.save() + + assert device.status == device.AUTHORIZATION_PENDING # default value + + # call is_expired() which should update the state + is_expired = device.is_expired() + + assert is_expired + assert device.status == device.EXPIRED + + # calling again is_expired() should return true and not change the state + is_expired = device.is_expired() + + assert is_expired + assert device.status == device.EXPIRED diff --git a/tests/test_oauth2_validators.py b/tests/test_oauth2_validators.py index 14c74506e..7e7e46de7 100644 --- a/tests/test_oauth2_validators.py +++ b/tests/test_oauth2_validators.py @@ -180,6 +180,12 @@ def test_authenticate_basic_auth_not_utf8(self): self.request.headers = {"HTTP_AUTHORIZATION": "Basic test"} self.assertFalse(self.validator._authenticate_basic_auth(self.request)) + def test_authenticate_basic_auth_public_app_with_device_code(self): + self.request.grant_type = "urn:ietf:params:oauth:grant-type:device_code" + self.request.headers = get_basic_auth_header("client_id", CLEARTEXT_SECRET) + self.application.client_type = Application.CLIENT_PUBLIC + self.assertTrue(self.validator._authenticate_basic_auth(self.request)) + def test_authenticate_check_secret(self): hashed = make_password(CLEARTEXT_SECRET) self.assertTrue(self.validator._check_secret(CLEARTEXT_SECRET, CLEARTEXT_SECRET)) diff --git a/tests/test_utils.py b/tests/test_utils.py index 2c319b6ea..eef4b985c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,3 +1,5 @@ +import pytest + from oauth2_provider import utils @@ -25,3 +27,24 @@ def test_jwk_from_pem_caches_jwk(): jwk3 = utils.jwk_from_pem(a_different_tiny_rsa_key) assert jwk3 is not jwk1 + + +def test_user_code_generator(): + # Default argument, 8 characters + user_code = utils.user_code_generator() + assert isinstance(user_code, str) + assert len(user_code) == 8 + + for character in user_code: + assert character >= "0" + assert character <= "V" + + another_user_code = utils.user_code_generator() + assert another_user_code != user_code + + shorter_user_code = utils.user_code_generator(user_code_length=1) + assert len(shorter_user_code) == 1 + + with pytest.raises(ValueError): + utils.user_code_generator(user_code_length=0) + utils.user_code_generator(user_code_length=-1) diff --git a/tox.ini b/tox.ini index 0a85f5fb8..29e93a2ae 100644 --- a/tox.ini +++ b/tox.ini @@ -46,7 +46,7 @@ deps = dj52: Django>=5.2,<6.0 djmain: https://github.com/django/django/archive/main.tar.gz djangorestframework - oauthlib>=3.2.2 + oauthlib>=3.3.0 jwcrypto coverage pytest @@ -79,7 +79,7 @@ commands = deps = Jinja2<3.1 sphinx<3 - oauthlib>=3.2.2 + oauthlib>=3.3.0 m2r>=0.2.1 mistune<2 sphinx-rtd-theme