diff --git a/escalated/api_middleware.py b/escalated/api_middleware.py new file mode 100644 index 0000000..6802c1e --- /dev/null +++ b/escalated/api_middleware.py @@ -0,0 +1,146 @@ +import hashlib +import json +import time + +from django.core.cache import cache +from django.http import JsonResponse +from django.utils import timezone + +from escalated.conf import get_setting +from escalated.models import ApiToken + + +class AuthenticateApiToken: + """ + Middleware that authenticates API requests using Bearer tokens. + + Extracts the token from the Authorization header, hashes it with SHA-256, + looks up the token in the database, checks expiration, resolves the user, + and updates last_used_at / last_used_ip. + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + return self.get_response(request) + + def process_view(self, request, view_func, view_args, view_kwargs): + # Extract Bearer token from Authorization header + auth_header = request.META.get("HTTP_AUTHORIZATION", "") + if not auth_header.startswith("Bearer "): + return JsonResponse({"message": "Unauthenticated."}, status=401) + + plain_text = auth_header[7:] + if not plain_text: + return JsonResponse({"message": "Unauthenticated."}, status=401) + + # Look up token + api_token = ApiToken.find_by_plain_text(plain_text) + if api_token is None: + return JsonResponse({"message": "Invalid token."}, status=401) + + # Check expiration + if api_token.is_expired: + return JsonResponse({"message": "Token has expired."}, status=401) + + # Resolve the token's owner + user = api_token.tokenable + if user is None: + return JsonResponse({"message": "Token owner not found."}, status=401) + + # Update last used info + api_token.last_used_at = timezone.now() + api_token.last_used_ip = _get_client_ip(request) + api_token.save(update_fields=["last_used_at", "last_used_ip", "updated_at"]) + + # Attach user and token to request + request.user = user + request.api_token = api_token + + return None + + +class ApiRateLimit: + """ + Per-token rate limiting middleware for API requests. + + Uses Django's cache framework with a sliding window per minute. + Adds X-RateLimit-Limit and X-RateLimit-Remaining headers to responses. + Returns 429 with Retry-After header when limit is exceeded. + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + return self.get_response(request) + + def process_view(self, request, view_func, view_args, view_kwargs): + max_attempts = get_setting("API_RATE_LIMIT") + + # Determine rate-limit key (by token ID or IP) + api_token = getattr(request, "api_token", None) + if api_token: + key = f"escalated_api:{api_token.pk}" + else: + key = f"escalated_api:{_get_client_ip(request)}" + + # Get current hit count + current = cache.get(key) + + if current is not None and current >= max_attempts: + # Calculate retry_after: TTL remaining on the cache key + retry_after = cache.ttl(key) if hasattr(cache, "ttl") else 60 + + response = JsonResponse( + {"message": "Too many requests.", "retry_after": retry_after}, + status=429, + ) + response["Retry-After"] = str(retry_after) + response["X-RateLimit-Limit"] = str(max_attempts) + response["X-RateLimit-Remaining"] = "0" + return response + + return None + + def process_response(self, request, response): + # Only add rate-limit headers to API responses + if not hasattr(request, "api_token") and not request.path.startswith( + "/" + get_setting("API_PREFIX") + ): + return response + + max_attempts = get_setting("API_RATE_LIMIT") + + api_token = getattr(request, "api_token", None) + if api_token: + key = f"escalated_api:{api_token.pk}" + else: + key = f"escalated_api:{_get_client_ip(request)}" + + # Increment the counter (60-second window) + current = cache.get(key) + if current is None: + cache.set(key, 1, 60) + current = 1 + else: + try: + current = cache.incr(key) + except ValueError: + cache.set(key, 1, 60) + current = 1 + + remaining = max(0, max_attempts - current) + response["X-RateLimit-Limit"] = str(max_attempts) + response["X-RateLimit-Remaining"] = str(remaining) + + return response + + +def _get_client_ip(request): + """Extract the client IP from the request.""" + x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") + if x_forwarded_for: + return x_forwarded_for.split(",")[0].strip() + return request.META.get("REMOTE_ADDR", "") diff --git a/escalated/api_serializers.py b/escalated/api_serializers.py new file mode 100644 index 0000000..52b5334 --- /dev/null +++ b/escalated/api_serializers.py @@ -0,0 +1,298 @@ +""" +Dict-based serializers for the REST API, producing JSON-compatible output that +matches the Laravel API response format. +""" + +from escalated.serializers import _format_dt, _user_dict + + +class ApiTicketCollectionSerializer: + """Slim ticket serializer for list endpoints.""" + + @staticmethod + def serialize(ticket): + assignee = ticket.assigned_to + department = ticket.department + + return { + "id": ticket.pk, + "reference": ticket.reference, + "subject": ticket.subject, + "status": ticket.status, + "status_label": ticket.get_status_display(), + "priority": ticket.priority, + "priority_label": ticket.get_priority_display(), + "requester": { + "name": ticket.requester_name, + "email": ticket.requester_email, + }, + "assignee": ( + { + "id": assignee.pk, + "name": getattr(assignee, "get_full_name", lambda: str(assignee))(), + } + if assignee + else None + ), + "department": ( + {"id": department.pk, "name": department.name} + if department + else None + ), + "sla_breached": ( + ticket.sla_first_response_breached + or ticket.sla_resolution_breached + ), + "created_at": _format_dt(ticket.created_at), + "updated_at": _format_dt(ticket.updated_at), + } + + @staticmethod + def serialize_list(tickets): + return [ApiTicketCollectionSerializer.serialize(t) for t in tickets] + + +class ApiTicketDetailSerializer: + """Full ticket serializer for detail endpoints.""" + + @staticmethod + def serialize(ticket, include_replies=True, include_activities=True): + assignee = ticket.assigned_to + department = ticket.department + + data = { + "id": ticket.pk, + "reference": ticket.reference, + "subject": ticket.subject, + "description": ticket.description, + "status": ticket.status, + "status_label": ticket.get_status_display(), + "priority": ticket.priority, + "priority_label": ticket.get_priority_display(), + "channel": ticket.channel, + "metadata": ticket.metadata, + "requester": { + "name": ticket.requester_name, + "email": ticket.requester_email, + }, + "assignee": ( + { + "id": assignee.pk, + "name": getattr(assignee, "get_full_name", lambda: str(assignee))(), + "email": getattr(assignee, "email", ""), + } + if assignee + else None + ), + "department": ( + {"id": department.pk, "name": department.name} + if department + else None + ), + "tags": [ + {"id": tag.pk, "name": tag.name, "color": tag.color} + for tag in ticket.tags.all() + ], + "sla": { + "first_response_due_at": _format_dt(ticket.first_response_due_at), + "first_response_at": _format_dt(ticket.first_response_at), + "first_response_breached": ticket.sla_first_response_breached, + "resolution_due_at": _format_dt(ticket.resolution_due_at), + "resolution_breached": ticket.sla_resolution_breached, + }, + "resolved_at": _format_dt(ticket.resolved_at), + "closed_at": _format_dt(ticket.closed_at), + "created_at": _format_dt(ticket.created_at), + "updated_at": _format_dt(ticket.updated_at), + } + + if include_replies: + data["replies"] = [ + ApiReplySerializer.serialize(reply) + for reply in ticket.replies.filter(is_deleted=False) + ] + + if include_activities: + data["activities"] = [ + ApiActivitySerializer.serialize(activity) + for activity in ticket.activities.all()[:20] + ] + + return data + + +class ApiReplySerializer: + """Reply serializer for API output.""" + + @staticmethod + def serialize(reply): + author = reply.author + data = { + "id": reply.pk, + "body": reply.body, + "is_internal_note": reply.is_internal_note, + "is_pinned": reply.is_pinned, + "author": ( + { + "id": author.pk, + "name": getattr(author, "get_full_name", lambda: str(author))(), + "email": getattr(author, "email", None), + } + if author + else None + ), + "attachments": [ + ApiAttachmentSerializer.serialize(a) + for a in reply.attachments.all() + ], + "created_at": _format_dt(reply.created_at), + } + return data + + +class ApiActivitySerializer: + """Activity serializer for API output.""" + + @staticmethod + def serialize(activity): + data = { + "id": activity.pk, + "type": activity.type, + "description": activity.get_type_display(), + "properties": activity.properties, + "created_at": _format_dt(activity.created_at), + } + try: + causer = activity.causer + data["causer"] = ( + {"id": causer.pk, "name": getattr(causer, "get_full_name", lambda: str(causer))()} + if causer + else None + ) + except Exception: + data["causer"] = None + return data + + +class ApiAttachmentSerializer: + """Attachment serializer for API output.""" + + @staticmethod + def serialize(attachment): + return { + "id": attachment.pk, + "filename": attachment.original_filename, + "mime_type": attachment.mime_type, + "size": attachment.size, + "url": attachment.file.url if attachment.file else None, + } + + +class ApiAgentSerializer: + """Agent (user) serializer for API output.""" + + @staticmethod + def serialize(user): + return { + "id": user.pk, + "name": getattr(user, "get_full_name", lambda: str(user))(), + "email": getattr(user, "email", ""), + } + + @staticmethod + def serialize_list(users): + return [ApiAgentSerializer.serialize(u) for u in users] + + +class ApiDepartmentSerializer: + """Department serializer for API output.""" + + @staticmethod + def serialize(department): + return { + "id": department.pk, + "name": department.name, + "description": department.description, + "is_active": department.is_active, + } + + @staticmethod + def serialize_list(departments): + return [ApiDepartmentSerializer.serialize(d) for d in departments] + + +class ApiTagSerializer: + """Tag serializer for API output.""" + + @staticmethod + def serialize(tag): + return { + "id": tag.pk, + "name": tag.name, + "color": tag.color, + } + + @staticmethod + def serialize_list(tags): + return [ApiTagSerializer.serialize(t) for t in tags] + + +class ApiCannedResponseSerializer: + """Canned response serializer for API output.""" + + @staticmethod + def serialize(response): + return { + "id": response.pk, + "title": response.title, + "body": response.body, + } + + @staticmethod + def serialize_list(responses): + return [ApiCannedResponseSerializer.serialize(r) for r in responses] + + +class ApiMacroSerializer: + """Macro serializer for API output.""" + + @staticmethod + def serialize(macro): + return { + "id": macro.pk, + "name": macro.name, + "actions": macro.actions, + "order": macro.order, + } + + @staticmethod + def serialize_list(macros): + return [ApiMacroSerializer.serialize(m) for m in macros] + + +class ApiTokenSerializer: + """API token serializer for admin views.""" + + @staticmethod + def serialize(token): + tokenable = token.tokenable + return { + "id": token.pk, + "name": token.name, + "user_name": ( + getattr(tokenable, "get_full_name", lambda: str(tokenable))() + if tokenable + else None + ), + "user_email": getattr(tokenable, "email", None) if tokenable else None, + "abilities": token.abilities, + "last_used_at": _format_dt(token.last_used_at), + "last_used_ip": token.last_used_ip, + "expires_at": _format_dt(token.expires_at), + "is_expired": token.is_expired, + "created_at": _format_dt(token.created_at), + } + + @staticmethod + def serialize_list(tokens): + return [ApiTokenSerializer.serialize(t) for t in tokens] diff --git a/escalated/api_urls.py b/escalated/api_urls.py new file mode 100644 index 0000000..9bb829c --- /dev/null +++ b/escalated/api_urls.py @@ -0,0 +1,70 @@ +""" +URL configuration for the Escalated REST API. + +All routes are mounted under the configured API_PREFIX (default: support/api/v1). +Authentication and rate limiting are handled by middleware. +""" + +from django.urls import path + +from escalated.views import api, admin_api_tokens + +app_name = "escalated_api" + +# API v1 routes (mounted under API_PREFIX by the consumer project) +api_patterns = [ + # Auth + path("auth/validate/", api.auth_validate, name="auth_validate"), + + # Dashboard + path("dashboard/", api.dashboard, name="dashboard"), + + # Tickets - list & create + path("tickets/", api.ticket_list, name="ticket_list"), + path("tickets/create/", api.ticket_create, name="ticket_create"), + + # Tickets - detail & actions (by reference or ID) + path("tickets//", api.ticket_show, name="ticket_show"), + path("tickets//reply/", api.ticket_reply, name="ticket_reply"), + path("tickets//status/", api.ticket_status, name="ticket_status"), + path("tickets//priority/", api.ticket_priority, name="ticket_priority"), + path("tickets//assign/", api.ticket_assign, name="ticket_assign"), + path("tickets//follow/", api.ticket_follow, name="ticket_follow"), + path("tickets//macro/", api.ticket_apply_macro, name="ticket_macro"), + path("tickets//tags/", api.ticket_tags, name="ticket_tags"), + path("tickets//delete/", api.ticket_destroy, name="ticket_destroy"), + + # Resources + path("agents/", api.resource_agents, name="agents"), + path("departments/", api.resource_departments, name="departments"), + path("tags/", api.resource_tags, name="tags"), + path("canned-responses/", api.resource_canned_responses, name="canned_responses"), + path("macros/", api.resource_macros, name="macros"), + path("realtime/config/", api.resource_realtime_config, name="realtime_config"), +] + +# Admin token management routes +admin_api_token_patterns = [ + path( + "admin/api-tokens/", + admin_api_tokens.api_tokens_index, + name="admin_api_tokens_index", + ), + path( + "admin/api-tokens/create/", + admin_api_tokens.api_tokens_create, + name="admin_api_tokens_create", + ), + path( + "admin/api-tokens//update/", + admin_api_tokens.api_tokens_update, + name="admin_api_tokens_update", + ), + path( + "admin/api-tokens//delete/", + admin_api_tokens.api_tokens_destroy, + name="admin_api_tokens_destroy", + ), +] + +urlpatterns = api_patterns + admin_api_token_patterns diff --git a/escalated/conf.py b/escalated/conf.py index 8dd9f13..84af532 100644 --- a/escalated/conf.py +++ b/escalated/conf.py @@ -45,6 +45,11 @@ "IMAP_USERNAME": None, "IMAP_PASSWORD": None, "IMAP_MAILBOX": "INBOX", + # REST API settings + "API_ENABLED": False, + "API_RATE_LIMIT": 60, + "API_TOKEN_EXPIRY_DAYS": None, + "API_PREFIX": "support/api/v1", # Plugin system settings "PLUGINS_ENABLED": True, "PLUGINS_PATH": None, # Defaults to /plugins/escalated at runtime diff --git a/escalated/migrations/0005_api_tokens.py b/escalated/migrations/0005_api_tokens.py new file mode 100644 index 0000000..7c6b471 --- /dev/null +++ b/escalated/migrations/0005_api_tokens.py @@ -0,0 +1,72 @@ +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + +from escalated.conf import get_table_name + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ("contenttypes", "0002_remove_content_type_name"), + ("escalated", "0004_v040_advanced_features"), + ] + + operations = [ + migrations.CreateModel( + name="ApiToken", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "tokenable_content_type", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + null=True, + blank=True, + related_name="escalated_api_tokens", + to="contenttypes.contenttype", + ), + ), + ( + "tokenable_object_id", + models.PositiveIntegerField(null=True, blank=True), + ), + ("name", models.CharField(max_length=255)), + ( + "token", + models.CharField(max_length=64, unique=True, db_index=True), + ), + ( + "abilities", + models.JSONField(default=list), + ), + ( + "last_used_at", + models.DateTimeField(null=True, blank=True), + ), + ( + "last_used_ip", + models.CharField(max_length=45, null=True, blank=True), + ), + ( + "expires_at", + models.DateTimeField(null=True, blank=True), + ), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ], + options={ + "db_table": get_table_name("api_tokens"), + "ordering": ["-created_at"], + }, + ), + ] diff --git a/escalated/migrations/0006_merge_api_tokens_and_plugins.py b/escalated/migrations/0006_merge_api_tokens_and_plugins.py new file mode 100644 index 0000000..d6d66ae --- /dev/null +++ b/escalated/migrations/0006_merge_api_tokens_and_plugins.py @@ -0,0 +1,11 @@ +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("escalated", "0005_api_tokens"), + ("escalated", "0005_escalatedplugin"), + ] + + operations = [] diff --git a/escalated/models.py b/escalated/models.py index b627b84..86eb176 100644 --- a/escalated/models.py +++ b/escalated/models.py @@ -1,3 +1,5 @@ +import hashlib +import secrets import uuid from datetime import timedelta @@ -770,3 +772,114 @@ def mark_spam(self): """Mark this inbound email as spam.""" self.status = self.Status.SPAM self.save(update_fields=["status", "updated_at"]) + + +# --------------------------------------------------------------------------- +# API Token +# --------------------------------------------------------------------------- + + +class ApiTokenQuerySet(models.QuerySet): + def active(self): + """Return tokens that are not expired.""" + return self.filter( + Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now()) + ) + + def expired(self): + """Return tokens that have expired.""" + return self.filter( + expires_at__isnull=False, + expires_at__lte=timezone.now(), + ) + + +class ApiTokenManager(models.Manager): + def get_queryset(self): + return ApiTokenQuerySet(self.model, using=self._db) + + def active(self): + return self.get_queryset().active() + + def expired(self): + return self.get_queryset().expired() + + +class ApiToken(models.Model): + """API token for authenticating REST API requests.""" + + # Tokenable via GenericForeignKey (like Laravel morphTo) + tokenable_content_type = models.ForeignKey( + ContentType, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="escalated_api_tokens", + ) + tokenable_object_id = models.PositiveIntegerField(null=True, blank=True) + tokenable = GenericForeignKey("tokenable_content_type", "tokenable_object_id") + + name = models.CharField(max_length=255) + token = models.CharField(max_length=64, unique=True, db_index=True) + abilities = models.JSONField(default=list) + last_used_at = models.DateTimeField(null=True, blank=True) + last_used_ip = models.CharField(max_length=45, null=True, blank=True) + expires_at = models.DateTimeField(null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + objects = ApiTokenManager() + + class Meta: + db_table = get_table_name("api_tokens") + ordering = ["-created_at"] + + def __str__(self): + return f"ApiToken({self.name})" + + def has_ability(self, ability): + """Check if this token has the given ability.""" + abilities = self.abilities or [] + return "*" in abilities or ability in abilities + + @property + def is_expired(self): + """Check if this token has expired.""" + if self.expires_at is None: + return False + return self.expires_at <= timezone.now() + + @classmethod + def create_token(cls, user, name, abilities=None, expires_at=None): + """ + Create a new API token for a user. + + Returns a dict with 'token' (the model instance) and + 'plain_text_token' (the unhashed token string to give to the user). + """ + if abilities is None: + abilities = ["*"] + + plain_text = secrets.token_hex(32) + hashed = hashlib.sha256(plain_text.encode()).hexdigest() + + ct = ContentType.objects.get_for_model(user) + token = cls.objects.create( + tokenable_content_type=ct, + tokenable_object_id=user.pk, + name=name, + token=hashed, + abilities=abilities, + expires_at=expires_at, + ) + + return {"token": token, "plain_text_token": plain_text} + + @classmethod + def find_by_plain_text(cls, plain_text): + """Look up a token by its plain-text value (hashes it first).""" + hashed = hashlib.sha256(plain_text.encode()).hexdigest() + try: + return cls.objects.get(token=hashed) + except cls.DoesNotExist: + return None diff --git a/escalated/urls.py b/escalated/urls.py index 48f2425..4ee0bd0 100644 --- a/escalated/urls.py +++ b/escalated/urls.py @@ -1,5 +1,6 @@ -from django.urls import path +from django.urls import path, include +from escalated.conf import get_setting from escalated.views import customer, agent, admin, guest, inbound, admin_plugins app_name = "escalated" @@ -116,3 +117,16 @@ + guest_patterns + inbound_patterns ) + +# Conditionally include API URLs when API is enabled +if get_setting("API_ENABLED"): + from escalated.api_urls import api_patterns, admin_api_token_patterns + + api_prefix = get_setting("API_PREFIX").strip("/") + + urlpatterns += [ + path(f"{api_prefix}/", include((api_patterns, "escalated_api"))), + ] + + # Admin API token management (under the main admin prefix) + urlpatterns += admin_api_token_patterns diff --git a/escalated/views/admin_api_tokens.py b/escalated/views/admin_api_tokens.py new file mode 100644 index 0000000..19650ec --- /dev/null +++ b/escalated/views/admin_api_tokens.py @@ -0,0 +1,221 @@ +""" +Admin views for managing API tokens. + +Provides a full CRUD interface for API tokens using Inertia.js rendering, +following the same pattern as other admin views in the package. +""" + +import json + +from django.contrib.auth import get_user_model +from django.contrib.auth.decorators import login_required +from django.http import HttpResponseForbidden, HttpResponseNotFound, JsonResponse +from django.shortcuts import redirect +from django.utils import timezone + +from escalated.api_serializers import ApiTokenSerializer +from escalated.conf import get_setting +from escalated.models import ApiToken +from escalated.permissions import is_admin, is_agent + +try: + from inertia import render +except ImportError: + # Fallback for environments without inertia-django + render = None + +User = get_user_model() + + +def _require_admin(request): + """Return an error response if user is not admin, else None.""" + if not request.user.is_authenticated: + return redirect("login") + if not is_admin(request.user): + return HttpResponseForbidden("Admin access required.") + return None + + +def _get_agent_users(): + """Return list of users who are agents or admins.""" + users = User.objects.filter(is_active=True) + return [ + {"id": u.pk, "name": u.get_full_name() or u.username, "email": u.email} + for u in users + if is_agent(u) or is_admin(u) + ] + + +@login_required +def api_tokens_index(request): + """ + List all API tokens with their associated users. + """ + check = _require_admin(request) + if check: + return check + + tokens = ApiToken.objects.order_by("-created_at") + token_data = ApiTokenSerializer.serialize_list(tokens) + + if render: + return render(request, "Escalated/Admin/ApiTokens/Index", props={ + "tokens": token_data, + "users": _get_agent_users(), + "api_enabled": get_setting("API_ENABLED"), + }) + + # JSON fallback for non-Inertia setups + return JsonResponse({ + "tokens": token_data, + "users": _get_agent_users(), + "api_enabled": get_setting("API_ENABLED"), + }) + + +@login_required +def api_tokens_create(request): + """ + Create a new API token. + + Accepts POST with: + name (required), user_id (required), abilities (list), expires_in_days (optional int) + """ + check = _require_admin(request) + if check: + return check + + if request.method != "POST": + return HttpResponseForbidden("Method not allowed") + + # Parse body — support both form-encoded and JSON + if request.content_type and "json" in request.content_type: + try: + data = json.loads(request.body) + except (json.JSONDecodeError, ValueError): + data = {} + else: + data = request.POST + + name = (data.get("name") or "").strip() if isinstance(data, dict) else (data.get("name", "")).strip() + user_id = data.get("user_id") + + if not name: + return JsonResponse( + {"message": "Validation failed.", "errors": {"name": "Name is required."}}, + status=422, + ) + if not user_id: + return JsonResponse( + {"message": "Validation failed.", "errors": {"user_id": "User ID is required."}}, + status=422, + ) + + try: + user = User.objects.get(pk=int(user_id)) + except (User.DoesNotExist, ValueError, TypeError): + return JsonResponse({"message": "User not found."}, status=404) + + # Parse abilities + abilities = data.get("abilities", ["*"]) + if isinstance(abilities, str): + try: + abilities = json.loads(abilities) + except (json.JSONDecodeError, ValueError): + abilities = ["*"] + + # Parse expiry + expires_in_days = data.get("expires_in_days") + expires_at = None + if expires_in_days: + try: + days = int(expires_in_days) + if 1 <= days <= 365: + expires_at = timezone.now() + timezone.timedelta(days=days) + except (ValueError, TypeError): + pass + + result = ApiToken.create_token( + user=user, + name=name, + abilities=abilities, + expires_at=expires_at, + ) + + return JsonResponse( + { + "message": "API token created.", + "plain_text_token": result["plain_text_token"], + "token": ApiTokenSerializer.serialize(result["token"]), + }, + status=201, + ) + + +@login_required +def api_tokens_update(request, token_id): + """ + Update an existing API token (name and/or abilities). + + Accepts POST/PUT/PATCH with: + name (optional), abilities (optional list) + """ + check = _require_admin(request) + if check: + return check + + if request.method not in ("POST", "PUT", "PATCH"): + return HttpResponseForbidden("Method not allowed") + + try: + token = ApiToken.objects.get(pk=token_id) + except ApiToken.DoesNotExist: + return HttpResponseNotFound("Token not found") + + # Parse body + if request.content_type and "json" in request.content_type: + try: + data = json.loads(request.body) + except (json.JSONDecodeError, ValueError): + data = {} + else: + data = request.POST + + name = data.get("name") + if name is not None: + token.name = name.strip() if isinstance(name, str) else name + + abilities = data.get("abilities") + if abilities is not None: + if isinstance(abilities, str): + try: + abilities = json.loads(abilities) + except (json.JSONDecodeError, ValueError): + abilities = None + if abilities is not None: + token.abilities = abilities + + token.save() + + return JsonResponse({"message": "Token updated.", "token": ApiTokenSerializer.serialize(token)}) + + +@login_required +def api_tokens_destroy(request, token_id): + """ + Revoke (delete) an API token. + """ + check = _require_admin(request) + if check: + return check + + if request.method not in ("POST", "DELETE"): + return HttpResponseForbidden("Method not allowed") + + try: + token = ApiToken.objects.get(pk=token_id) + token.delete() + except ApiToken.DoesNotExist: + pass + + return JsonResponse({"message": "Token revoked."}) diff --git a/escalated/views/api.py b/escalated/views/api.py new file mode 100644 index 0000000..9bccdbd --- /dev/null +++ b/escalated/views/api.py @@ -0,0 +1,735 @@ +""" +REST API views for Escalated. + +All views return JSON responses and require Bearer token authentication +(handled by AuthenticateApiToken middleware). Response format matches +the Laravel implementation for cross-framework compatibility. +""" + +import json + +from django.contrib.auth import get_user_model +from django.core.paginator import Paginator +from django.db.models import Q +from django.http import JsonResponse +from django.utils import timezone +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_GET, require_POST, require_http_methods + +from escalated.api_serializers import ( + ApiAgentSerializer, + ApiCannedResponseSerializer, + ApiDepartmentSerializer, + ApiMacroSerializer, + ApiReplySerializer, + ApiTagSerializer, + ApiTicketCollectionSerializer, + ApiTicketDetailSerializer, +) +from escalated.models import ( + CannedResponse, + Department, + Macro, + Tag, + Ticket, +) +from escalated.permissions import is_agent, is_admin +from escalated.services.ticket_service import TicketService + +User = get_user_model() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _require_ability(request, ability): + """ + Check that the request's API token has the given ability. + Returns a 403 JsonResponse on failure, or None on success. + """ + api_token = getattr(request, "api_token", None) + if api_token and not api_token.has_ability(ability): + return JsonResponse({"message": "Insufficient permissions."}, status=403) + return None + + +def _json_body(request): + """Parse JSON request body, returning an empty dict on failure.""" + try: + return json.loads(request.body) + except (json.JSONDecodeError, ValueError, TypeError): + return {} + + +def _resolve_ticket(reference): + """ + Resolve a ticket by reference string or numeric ID. + Returns (ticket, None) on success, or (None, JsonResponse) on failure. + """ + try: + ticket = Ticket.objects.select_related( + "assigned_to", "department", "sla_policy" + ).prefetch_related( + "tags", + "replies__author", + "replies__attachments", + "activities", + ).get(reference=reference) + return ticket, None + except Ticket.DoesNotExist: + pass + + # Fall back to lookup by numeric ID + try: + ticket_id = int(reference) + ticket = Ticket.objects.select_related( + "assigned_to", "department", "sla_policy" + ).prefetch_related( + "tags", + "replies__author", + "replies__attachments", + "activities", + ).get(pk=ticket_id) + return ticket, None + except (ValueError, Ticket.DoesNotExist): + return None, JsonResponse({"message": "Ticket not found."}, status=404) + + +# --------------------------------------------------------------------------- +# Auth +# --------------------------------------------------------------------------- + + +@csrf_exempt +@require_POST +def auth_validate(request): + """ + POST /auth/validate + + Validate the current API token and return user info + abilities. + """ + user = request.user + api_token = getattr(request, "api_token", None) + + return JsonResponse({ + "user": { + "id": user.pk, + "name": getattr(user, "get_full_name", lambda: str(user))(), + "email": getattr(user, "email", ""), + }, + "abilities": api_token.abilities if api_token else [], + "is_agent": is_agent(user), + "is_admin": is_admin(user), + "token_name": api_token.name if api_token else None, + "expires_at": ( + api_token.expires_at.isoformat() if api_token and api_token.expires_at else None + ), + }) + + +# --------------------------------------------------------------------------- +# Dashboard +# --------------------------------------------------------------------------- + + +@require_GET +def dashboard(request): + """ + GET /dashboard + + Return agent dashboard statistics. + """ + user_id = request.user.pk + today_start = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0) + week_start = today_start - timezone.timedelta(days=today_start.weekday()) + + stats = { + "open": Ticket.objects.open().count(), + "my_assigned": Ticket.objects.assigned_to(user_id).open().count(), + "unassigned": Ticket.objects.unassigned().open().count(), + "sla_breached": Ticket.objects.open().breached_sla().count(), + "resolved_today": Ticket.objects.filter( + resolved_at__gte=today_start + ).count(), + } + + recent_tickets = ( + Ticket.objects.select_related("assigned_to", "department") + .order_by("-created_at")[:10] + ) + + recent_tickets_data = [ + { + "id": t.pk, + "reference": t.reference, + "subject": t.subject, + "status": t.status, + "priority": t.priority, + "requester_name": t.requester_name, + "assignee_name": ( + getattr(t.assigned_to, "get_full_name", lambda: str(t.assigned_to))() + if t.assigned_to + else None + ), + "created_at": t.created_at.isoformat(), + } + for t in recent_tickets + ] + + # Needs attention: SLA breaching tickets + sla_breaching = Ticket.objects.open().breached_sla().select_related( + "assigned_to" + )[:5] + sla_breaching_data = [ + { + "reference": t.reference, + "subject": t.subject, + "priority": t.priority, + "requester_name": t.requester_name, + } + for t in sla_breaching + ] + + # Unassigned urgent tickets + unassigned_urgent = ( + Ticket.objects.unassigned() + .open() + .filter(priority__in=["urgent", "critical"])[:5] + ) + unassigned_urgent_data = [ + { + "reference": t.reference, + "subject": t.subject, + "priority": t.priority, + "requester_name": t.requester_name, + } + for t in unassigned_urgent + ] + + # My performance + my_resolved_this_week = Ticket.objects.assigned_to(user_id).filter( + resolved_at__gte=week_start + ).count() + + return JsonResponse({ + "stats": stats, + "recent_tickets": recent_tickets_data, + "needs_attention": { + "sla_breaching": sla_breaching_data, + "unassigned_urgent": unassigned_urgent_data, + }, + "my_performance": { + "resolved_this_week": my_resolved_this_week, + }, + }) + + +# --------------------------------------------------------------------------- +# Tickets +# --------------------------------------------------------------------------- + + +@require_GET +def ticket_list(request): + """ + GET /tickets + + List tickets with filtering, sorting, and pagination. + Query params: status, priority, department_id, assigned_to, unassigned, + search, sla_breached, following, sort_by, sort_dir, per_page, page + """ + tickets = Ticket.objects.select_related( + "assigned_to", "department" + ).prefetch_related("tags") + + # Filters + status = request.GET.get("status") + if status: + tickets = tickets.filter(status=status) + + priority = request.GET.get("priority") + if priority: + tickets = tickets.filter(priority=priority) + + department_id = request.GET.get("department_id") + if department_id: + tickets = tickets.filter(department_id=department_id) + + assigned_to = request.GET.get("assigned_to") + if assigned_to: + tickets = tickets.filter(assigned_to_id=assigned_to) + + unassigned = request.GET.get("unassigned") + if unassigned and unassigned.lower() in ("1", "true", "yes"): + tickets = tickets.filter(assigned_to__isnull=True) + + search = request.GET.get("search") + if search: + tickets = tickets.search(search) + + sla_breached = request.GET.get("sla_breached") + if sla_breached and sla_breached.lower() in ("1", "true", "yes"): + tickets = tickets.breached_sla() + + following = request.GET.get("following") + if following and following.lower() in ("1", "true", "yes"): + tickets = tickets.followed_by(request.user.pk) + + # Sorting + sort_by = request.GET.get("sort_by", "created_at") + sort_dir = request.GET.get("sort_dir", "desc") + allowed_sort_fields = [ + "created_at", "updated_at", "priority", "status", "subject", + ] + if sort_by in allowed_sort_fields: + order = f"-{sort_by}" if sort_dir == "desc" else sort_by + tickets = tickets.order_by(order) + + # Pagination + try: + per_page = min(int(request.GET.get("per_page", 25)), 100) + except (ValueError, TypeError): + per_page = 25 + + paginator = Paginator(tickets, per_page) + try: + page_num = int(request.GET.get("page", 1)) + except (ValueError, TypeError): + page_num = 1 + page = paginator.get_page(page_num) + + return JsonResponse({ + "data": ApiTicketCollectionSerializer.serialize_list(page.object_list), + "meta": { + "current_page": page.number, + "last_page": paginator.num_pages, + "per_page": per_page, + "total": paginator.count, + }, + }) + + +@require_GET +def ticket_show(request, reference): + """ + GET /tickets/ + + Return full ticket details with replies and activities. + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + return JsonResponse({ + "data": ApiTicketDetailSerializer.serialize(ticket), + }) + + +@csrf_exempt +@require_POST +def ticket_create(request): + """ + POST /tickets + + Create a new ticket. + + JSON body: + subject (required), description (required), priority, department_id, tags (array of IDs) + """ + data = _json_body(request) + + # Validation + subject = (data.get("subject") or "").strip() + description = (data.get("description") or "").strip() + + if not subject: + return JsonResponse( + {"message": "Validation failed.", "errors": {"subject": "Subject is required."}}, + status=422, + ) + if not description: + return JsonResponse( + {"message": "Validation failed.", "errors": {"description": "Description is required."}}, + status=422, + ) + + priority = data.get("priority", "medium") + valid_priorities = [p.value for p in Ticket.Priority] + if priority not in valid_priorities: + return JsonResponse( + {"message": "Validation failed.", "errors": {"priority": "Invalid priority."}}, + status=422, + ) + + service = TicketService() + ticket_data = { + "subject": subject, + "description": description, + "priority": priority, + "channel": "api", + } + + department_id = data.get("department_id") + if department_id: + ticket_data["department_id"] = department_id + + tag_ids = data.get("tags", []) + if tag_ids: + ticket_data["tag_ids"] = tag_ids + + ticket = service.create(request.user, ticket_data) + + # Reload with relations + ticket = Ticket.objects.select_related( + "assigned_to", "department" + ).prefetch_related("tags").get(pk=ticket.pk) + + return JsonResponse( + { + "data": ApiTicketDetailSerializer.serialize( + ticket, include_replies=False, include_activities=False + ), + "message": "Ticket created.", + }, + status=201, + ) + + +@csrf_exempt +@require_POST +def ticket_reply(request, reference): + """ + POST /tickets//reply + + Add a reply or internal note to a ticket. + + JSON body: + body (required), is_internal_note (optional bool) + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + data = _json_body(request) + body = (data.get("body") or "").strip() + if not body: + return JsonResponse( + {"message": "Validation failed.", "errors": {"body": "Body is required."}}, + status=422, + ) + + is_note = data.get("is_internal_note", False) + service = TicketService() + + if is_note: + reply = service.add_note(ticket, request.user, body) + else: + reply = service.reply(ticket, request.user, {"body": body}) + + user = request.user + return JsonResponse( + { + "data": { + "id": reply.pk, + "body": reply.body, + "is_internal_note": reply.is_internal_note, + "author": { + "id": user.pk, + "name": getattr(user, "get_full_name", lambda: str(user))(), + }, + "created_at": reply.created_at.isoformat(), + }, + "message": "Note added." if is_note else "Reply sent.", + }, + status=201, + ) + + +@csrf_exempt +@require_http_methods(["PATCH"]) +def ticket_status(request, reference): + """ + PATCH /tickets//status + + Update ticket status. + + JSON body: + status (required) + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + data = _json_body(request) + new_status = data.get("status") + valid_statuses = [s.value for s in Ticket.Status] + if new_status not in valid_statuses: + return JsonResponse( + {"message": "Validation failed.", "errors": {"status": "Invalid status."}}, + status=422, + ) + + service = TicketService() + service.change_status(ticket, request.user, new_status) + + return JsonResponse({"message": "Status updated.", "status": new_status}) + + +@csrf_exempt +@require_http_methods(["PATCH"]) +def ticket_priority(request, reference): + """ + PATCH /tickets//priority + + Update ticket priority. + + JSON body: + priority (required) + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + data = _json_body(request) + new_priority = data.get("priority") + valid_priorities = [p.value for p in Ticket.Priority] + if new_priority not in valid_priorities: + return JsonResponse( + {"message": "Validation failed.", "errors": {"priority": "Invalid priority."}}, + status=422, + ) + + service = TicketService() + service.change_priority(ticket, request.user, new_priority) + + return JsonResponse({"message": "Priority updated.", "priority": new_priority}) + + +@csrf_exempt +@require_POST +def ticket_assign(request, reference): + """ + POST /tickets//assign + + Assign ticket to an agent. + + JSON body: + agent_id (required, integer) + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + data = _json_body(request) + agent_id = data.get("agent_id") + if not agent_id: + return JsonResponse( + {"message": "Validation failed.", "errors": {"agent_id": "Agent ID is required."}}, + status=422, + ) + + try: + agent = User.objects.get(pk=int(agent_id)) + except (User.DoesNotExist, ValueError, TypeError): + return JsonResponse({"message": "Agent not found."}, status=404) + + service = TicketService() + service.assign(ticket, request.user, agent) + + return JsonResponse({"message": "Ticket assigned."}) + + +@csrf_exempt +@require_POST +def ticket_follow(request, reference): + """ + POST /tickets//follow + + Toggle follow/unfollow on a ticket. + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + user_id = request.user.pk + + if ticket.is_followed_by(user_id): + ticket.unfollow(user_id) + return JsonResponse({"message": "Unfollowed ticket.", "following": False}) + + ticket.follow(user_id) + return JsonResponse({"message": "Following ticket.", "following": True}) + + +@csrf_exempt +@require_POST +def ticket_apply_macro(request, reference): + """ + POST /tickets//macro + + Apply a macro to a ticket. + + JSON body: + macro_id (required, integer) + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + data = _json_body(request) + macro_id = data.get("macro_id") + if not macro_id: + return JsonResponse( + {"message": "Validation failed.", "errors": {"macro_id": "Macro ID is required."}}, + status=422, + ) + + try: + macro = Macro.objects.filter( + Q(is_shared=True) | Q(created_by=request.user) + ).get(pk=int(macro_id)) + except (Macro.DoesNotExist, ValueError, TypeError): + return JsonResponse({"message": "Macro not found."}, status=404) + + from escalated.services.macro_service import MacroService + + macro_service = MacroService() + macro_service.apply(macro, ticket, request.user) + + return JsonResponse({"message": f'Macro "{macro.name}" applied.'}) + + +@csrf_exempt +@require_POST +def ticket_tags(request, reference): + """ + POST /tickets//tags + + Sync tags on a ticket. Sends the desired list of tag IDs; + tags not in the list are removed, tags in the list are added. + + JSON body: + tag_ids (required, array of integers) + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + data = _json_body(request) + tag_ids = data.get("tag_ids") + if tag_ids is None or not isinstance(tag_ids, list): + return JsonResponse( + {"message": "Validation failed.", "errors": {"tag_ids": "tag_ids array is required."}}, + status=422, + ) + + new_tag_ids = set(int(t) for t in tag_ids) + current_tag_ids = set(ticket.tags.values_list("pk", flat=True)) + + to_add = list(new_tag_ids - current_tag_ids) + to_remove = list(current_tag_ids - new_tag_ids) + + service = TicketService() + if to_add: + service.add_tags(ticket, request.user, to_add) + if to_remove: + service.remove_tags(ticket, request.user, to_remove) + + return JsonResponse({"message": "Tags updated."}) + + +@csrf_exempt +@require_http_methods(["DELETE"]) +def ticket_destroy(request, reference): + """ + DELETE /tickets/ + + Delete (soft-delete) a ticket. + """ + ticket, error = _resolve_ticket(reference) + if error: + return error + + ticket.delete() + + return JsonResponse({"message": "Ticket deleted."}) + + +# --------------------------------------------------------------------------- +# Resources +# --------------------------------------------------------------------------- + + +@require_GET +def resource_agents(request): + """ + GET /agents + + List all agents (users who belong to active departments or are staff). + """ + users = User.objects.filter(is_active=True) + agents = [u for u in users if is_agent(u) or is_admin(u)] + + return JsonResponse({"data": ApiAgentSerializer.serialize_list(agents)}) + + +@require_GET +def resource_departments(request): + """ + GET /departments + + List all active departments. + """ + departments = Department.objects.filter(is_active=True) + return JsonResponse({"data": ApiDepartmentSerializer.serialize_list(departments)}) + + +@require_GET +def resource_tags(request): + """ + GET /tags + + List all tags. + """ + tags = Tag.objects.all() + return JsonResponse({"data": ApiTagSerializer.serialize_list(tags)}) + + +@require_GET +def resource_canned_responses(request): + """ + GET /canned-responses + + List canned responses available to the authenticated user. + """ + responses = CannedResponse.objects.filter( + Q(is_shared=True) | Q(created_by=request.user) + ) + return JsonResponse({"data": ApiCannedResponseSerializer.serialize_list(responses)}) + + +@require_GET +def resource_macros(request): + """ + GET /macros + + List macros available to the authenticated user. + """ + macros = Macro.objects.filter( + Q(is_shared=True) | Q(created_by=request.user) + ).order_by("order") + return JsonResponse({"data": ApiMacroSerializer.serialize_list(macros)}) + + +@require_GET +def resource_realtime_config(request): + """ + GET /realtime/config + + Return WebSocket/realtime configuration (if any). + """ + # Django doesn't have a standard broadcasting config, so return null. + # Consumers can override this in their project. + return JsonResponse(None, safe=False) diff --git a/tests/conftest.py b/tests/conftest.py index b33c195..cfb6308 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,4 @@ import pytest - from tests.factories import ( UserFactory, TicketFactory, @@ -9,9 +8,10 @@ SlaPolicyFactory, EscalationRuleFactory, CannedResponseFactory, + MacroFactory, + ApiTokenFactory, ) - @pytest.fixture def user(db): return UserFactory() @@ -63,3 +63,20 @@ def canned_response(db, agent_user): @pytest.fixture def escalation_rule(db): return EscalationRuleFactory() + + +@pytest.fixture +def macro(db, agent_user): + return MacroFactory(created_by=agent_user) + + +@pytest.fixture +def api_token(db, agent_user): + """Create an API token for an agent. The `plain_text` attr holds the raw string.""" + return ApiTokenFactory(user=agent_user) + + +@pytest.fixture +def admin_api_token(db, admin_user): + """Create an API token for an admin. The `plain_text` attr holds the raw string.""" + return ApiTokenFactory(user=admin_user) diff --git a/tests/factories.py b/tests/factories.py index c83d4eb..da9bdab 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -1,8 +1,12 @@ +import hashlib +import secrets + import factory from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType from escalated.models import ( + ApiToken, Ticket, Reply, Tag, @@ -10,6 +14,7 @@ SlaPolicy, EscalationRule, CannedResponse, + Macro, ) @@ -138,3 +143,56 @@ class Meta: category = "general" created_by = factory.SubFactory(UserFactory) is_shared = True + + +class MacroFactory(factory.django.DjangoModelFactory): + class Meta: + model = Macro + + name = factory.Sequence(lambda n: f"Macro {n}") + description = factory.Faker("sentence") + actions = factory.LazyFunction(lambda: [{"type": "set_status", "value": "open"}]) + is_shared = True + order = factory.Sequence(lambda n: n) + created_by = factory.SubFactory(UserFactory) + + +class ApiTokenFactory(factory.django.DjangoModelFactory): + """ + Factory that creates an ApiToken with a known plain-text value. + + Usage: + token = ApiTokenFactory(user=some_user) + # token.plain_text is the raw token string + # token is the ApiToken model instance + """ + + class Meta: + model = ApiToken + exclude = ["user", "plain_text"] + + user = factory.SubFactory(UserFactory) + plain_text = factory.LazyFunction(lambda: secrets.token_hex(32)) + name = factory.Sequence(lambda n: f"Test Token {n}") + token = factory.LazyAttribute( + lambda o: hashlib.sha256(o.plain_text.encode()).hexdigest() + ) + abilities = factory.LazyFunction(lambda: ["*"]) + expires_at = None + + @classmethod + def _create(cls, model_class, *args, **kwargs): + user = kwargs.pop("user", None) + plain_text = kwargs.pop("plain_text", secrets.token_hex(32)) + + if user: + ct = ContentType.objects.get_for_model(user) + kwargs["tokenable_content_type"] = ct + kwargs["tokenable_object_id"] = user.pk + + kwargs["token"] = hashlib.sha256(plain_text.encode()).hexdigest() + + instance = super()._create(model_class, *args, **kwargs) + # Attach the plain_text for test usage + instance.plain_text = plain_text + return instance diff --git a/tests/integration/test_admin_api_tokens.py b/tests/integration/test_admin_api_tokens.py new file mode 100644 index 0000000..ca494ad --- /dev/null +++ b/tests/integration/test_admin_api_tokens.py @@ -0,0 +1,243 @@ +""" +Integration tests for the admin API token management views. +""" + +import json +from unittest.mock import patch, MagicMock + +import pytest +from django.test import RequestFactory + +from escalated.models import ApiToken +from escalated.views import admin_api_tokens +from tests.factories import ( + ApiTokenFactory, + DepartmentFactory, + UserFactory, +) + + +@pytest.fixture +def rf(): + return RequestFactory() + + +def _attach_session(request): + """Attach a mock session to the request.""" + from django.contrib.sessions.backends.db import SessionStore + request.session = SessionStore() + + +# --------------------------------------------------------------------------- +# Index +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestAdminApiTokensIndex: + @patch("escalated.views.admin_api_tokens.render") + def test_index_returns_tokens_for_admin(self, mock_render, rf): + admin = UserFactory(username="admin_idx", is_staff=True, is_superuser=True) + user = UserFactory(username="token_owner") + department = DepartmentFactory() + department.agents.add(user) + ApiTokenFactory(user=user) + ApiTokenFactory(user=user) + + mock_render.return_value = MagicMock(status_code=200) + + request = rf.get("/admin/api-tokens/") + request.user = admin + _attach_session(request) + + admin_api_tokens.api_tokens_index(request) + + mock_render.assert_called_once() + call_args = mock_render.call_args + props = call_args[1]["props"] if "props" in call_args[1] else call_args[0][2] + assert "tokens" in props + assert len(props["tokens"]) == 2 + + def test_index_forbidden_for_non_admin(self, rf): + user = UserFactory(username="nonadmin_idx") + + request = rf.get("/admin/api-tokens/") + request.user = user + _attach_session(request) + + response = admin_api_tokens.api_tokens_index(request) + assert response.status_code == 403 + + +# --------------------------------------------------------------------------- +# Create +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestAdminApiTokensCreate: + def test_create_returns_plain_text_token(self, rf): + admin = UserFactory(username="admin_create", is_staff=True, is_superuser=True) + user = UserFactory(username="create_owner") + + request = rf.post( + "/admin/api-tokens/create/", + data=json.dumps({ + "name": "Test Token", + "user_id": user.pk, + "abilities": ["agent"], + "expires_in_days": 30, + }), + content_type="application/json", + ) + request.user = admin + _attach_session(request) + + response = admin_api_tokens.api_tokens_create(request) + + assert response.status_code == 201 + data = json.loads(response.content) + assert "plain_text_token" in data + assert len(data["plain_text_token"]) == 64 + assert data["token"]["name"] == "Test Token" + + # Verify token exists in DB + assert ApiToken.objects.filter(name="Test Token").exists() + + def test_create_missing_name_returns_422(self, rf): + admin = UserFactory(username="admin_no_name", is_staff=True, is_superuser=True) + user = UserFactory(username="owner_no_name") + + request = rf.post( + "/admin/api-tokens/create/", + data=json.dumps({"user_id": user.pk}), + content_type="application/json", + ) + request.user = admin + _attach_session(request) + + response = admin_api_tokens.api_tokens_create(request) + assert response.status_code == 422 + + def test_create_missing_user_returns_422(self, rf): + admin = UserFactory(username="admin_no_user", is_staff=True, is_superuser=True) + + request = rf.post( + "/admin/api-tokens/create/", + data=json.dumps({"name": "Token"}), + content_type="application/json", + ) + request.user = admin + _attach_session(request) + + response = admin_api_tokens.api_tokens_create(request) + assert response.status_code == 422 + + def test_create_nonexistent_user_returns_404(self, rf): + admin = UserFactory(username="admin_bad_user", is_staff=True, is_superuser=True) + + request = rf.post( + "/admin/api-tokens/create/", + data=json.dumps({"name": "Token", "user_id": 99999}), + content_type="application/json", + ) + request.user = admin + _attach_session(request) + + response = admin_api_tokens.api_tokens_create(request) + assert response.status_code == 404 + + def test_create_forbidden_for_non_admin(self, rf): + user = UserFactory(username="nonadmin_create") + + request = rf.post( + "/admin/api-tokens/create/", + data=json.dumps({"name": "Token", "user_id": user.pk}), + content_type="application/json", + ) + request.user = user + _attach_session(request) + + response = admin_api_tokens.api_tokens_create(request) + assert response.status_code == 403 + + +# --------------------------------------------------------------------------- +# Update +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestAdminApiTokensUpdate: + def test_update_name_and_abilities(self, rf): + admin = UserFactory(username="admin_update", is_staff=True, is_superuser=True) + user = UserFactory(username="update_owner") + token = ApiTokenFactory(user=user, abilities=["agent"]) + + request = rf.post( + f"/admin/api-tokens/{token.pk}/update/", + data=json.dumps({ + "name": "Updated Name", + "abilities": ["agent", "admin"], + }), + content_type="application/json", + ) + request.user = admin + _attach_session(request) + + response = admin_api_tokens.api_tokens_update(request, token.pk) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["token"]["name"] == "Updated Name" + + token.refresh_from_db() + assert token.name == "Updated Name" + assert token.abilities == ["agent", "admin"] + + def test_update_not_found_returns_404(self, rf): + admin = UserFactory(username="admin_update_404", is_staff=True, is_superuser=True) + + request = rf.post( + "/admin/api-tokens/99999/update/", + data=json.dumps({"name": "X"}), + content_type="application/json", + ) + request.user = admin + _attach_session(request) + + response = admin_api_tokens.api_tokens_update(request, 99999) + assert response.status_code == 404 + + +# --------------------------------------------------------------------------- +# Destroy +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestAdminApiTokensDestroy: + def test_destroy_removes_token(self, rf): + admin = UserFactory(username="admin_destroy", is_staff=True, is_superuser=True) + user = UserFactory(username="destroy_owner") + token = ApiTokenFactory(user=user) + token_pk = token.pk + + request = rf.post(f"/admin/api-tokens/{token.pk}/delete/") + request.user = admin + _attach_session(request) + + response = admin_api_tokens.api_tokens_destroy(request, token_pk) + + assert response.status_code == 200 + assert not ApiToken.objects.filter(pk=token_pk).exists() + + def test_destroy_nonexistent_token_is_idempotent(self, rf): + admin = UserFactory(username="admin_destroy_ok", is_staff=True, is_superuser=True) + + request = rf.post("/admin/api-tokens/99999/delete/") + request.user = admin + _attach_session(request) + + response = admin_api_tokens.api_tokens_destroy(request, 99999) + assert response.status_code == 200 diff --git a/tests/integration/test_api_views.py b/tests/integration/test_api_views.py new file mode 100644 index 0000000..8d182a4 --- /dev/null +++ b/tests/integration/test_api_views.py @@ -0,0 +1,859 @@ +""" +Integration tests for the Escalated REST API views. + +These tests exercise the views directly using RequestFactory, +simulating authenticated API requests by pre-setting request.user +and request.api_token (as the middleware would do). +""" + +import json +from unittest.mock import patch, MagicMock + +import pytest +from django.test import RequestFactory +from django.utils import timezone + +from escalated.models import ( + ApiToken, + CannedResponse, + Department, + Macro, + Tag, + Ticket, +) +from escalated.views import api +from tests.factories import ( + ApiTokenFactory, + CannedResponseFactory, + DepartmentFactory, + MacroFactory, + ReplyFactory, + TagFactory, + TicketFactory, + UserFactory, +) + + +@pytest.fixture +def rf(): + return RequestFactory() + + +def _api_get(rf, path, user, api_token, query_params=None): + """Create a GET request simulating API authentication.""" + request = rf.get(path, data=query_params or {}) + request.user = user + request.api_token = api_token + return request + + +def _api_post(rf, path, user, api_token, data=None): + """Create a POST request with JSON body simulating API authentication.""" + body = json.dumps(data or {}) + request = rf.post( + path, + data=body, + content_type="application/json", + ) + request.user = user + request.api_token = api_token + return request + + +def _api_patch(rf, path, user, api_token, data=None): + """Create a PATCH request with JSON body simulating API authentication.""" + body = json.dumps(data or {}) + request = rf.patch( + path, + data=body, + content_type="application/json", + ) + request.user = user + request.api_token = api_token + return request + + +def _api_delete(rf, path, user, api_token): + """Create a DELETE request simulating API authentication.""" + request = rf.delete(path) + request.user = user + request.api_token = api_token + return request + + +# --------------------------------------------------------------------------- +# Auth +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiAuthValidate: + def test_validate_returns_user_info(self, rf): + user = UserFactory(username="auth_validate_user", is_staff=True) + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user, abilities=["agent", "admin"]) + + request = _api_post(rf, "/api/auth/validate/", user, token) + response = api.auth_validate(request) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["user"]["id"] == user.pk + assert data["user"]["email"] == user.email + assert data["abilities"] == ["agent", "admin"] + assert data["token_name"] == token.name + assert data["is_agent"] is True + assert data["is_admin"] is True + + +# --------------------------------------------------------------------------- +# Dashboard +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiDashboard: + def test_dashboard_returns_stats(self, rf): + user = UserFactory(username="dash_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + # Create some tickets + TicketFactory(status=Ticket.Status.OPEN) + TicketFactory(status=Ticket.Status.OPEN, assigned_to=user) + TicketFactory(status=Ticket.Status.OPEN, assigned_to=None) + + request = _api_get(rf, "/api/dashboard/", user, token) + response = api.dashboard(request) + + assert response.status_code == 200 + data = json.loads(response.content) + assert "stats" in data + assert "open" in data["stats"] + assert "my_assigned" in data["stats"] + assert "unassigned" in data["stats"] + assert "sla_breached" in data["stats"] + assert "resolved_today" in data["stats"] + assert "recent_tickets" in data + assert "needs_attention" in data + assert "my_performance" in data + + +# --------------------------------------------------------------------------- +# Tickets - List +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketList: + def test_ticket_list_returns_paginated_data(self, rf): + user = UserFactory(username="list_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + for _ in range(3): + TicketFactory() + + request = _api_get(rf, "/api/tickets/", user, token) + response = api.ticket_list(request) + + assert response.status_code == 200 + data = json.loads(response.content) + assert "data" in data + assert "meta" in data + assert data["meta"]["total"] == 3 + assert len(data["data"]) == 3 + + def test_ticket_list_filter_by_status(self, rf): + user = UserFactory(username="list_status") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + TicketFactory(status=Ticket.Status.OPEN) + TicketFactory(status=Ticket.Status.CLOSED) + + request = _api_get( + rf, "/api/tickets/", user, token, {"status": "open"} + ) + response = api.ticket_list(request) + + data = json.loads(response.content) + assert data["meta"]["total"] == 1 + assert data["data"][0]["status"] == "open" + + def test_ticket_list_filter_by_priority(self, rf): + user = UserFactory(username="list_priority") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + TicketFactory(priority=Ticket.Priority.HIGH) + TicketFactory(priority=Ticket.Priority.LOW) + + request = _api_get( + rf, "/api/tickets/", user, token, {"priority": "high"} + ) + response = api.ticket_list(request) + + data = json.loads(response.content) + assert data["meta"]["total"] == 1 + assert data["data"][0]["priority"] == "high" + + def test_ticket_list_filter_by_search(self, rf): + user = UserFactory(username="list_search") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + TicketFactory(subject="Payment refund issue") + TicketFactory(subject="General question") + + request = _api_get( + rf, "/api/tickets/", user, token, {"search": "Payment"} + ) + response = api.ticket_list(request) + + data = json.loads(response.content) + assert data["meta"]["total"] == 1 + + def test_ticket_list_pagination(self, rf): + user = UserFactory(username="list_page") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + for _ in range(30): + TicketFactory() + + request = _api_get( + rf, "/api/tickets/", user, token, {"per_page": "10", "page": "2"} + ) + response = api.ticket_list(request) + + data = json.loads(response.content) + assert data["meta"]["per_page"] == 10 + assert data["meta"]["current_page"] == 2 + assert len(data["data"]) == 10 + + +# --------------------------------------------------------------------------- +# Tickets - Show +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketShow: + def test_ticket_show_by_reference(self, rf): + user = UserFactory(username="show_ref") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory(requester=user) + + request = _api_get(rf, f"/api/tickets/{ticket.reference}/", user, token) + response = api.ticket_show(request, ticket.reference) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["data"]["reference"] == ticket.reference + assert data["data"]["subject"] == ticket.subject + + def test_ticket_show_by_id(self, rf): + user = UserFactory(username="show_id") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_get(rf, f"/api/tickets/{ticket.pk}/", user, token) + response = api.ticket_show(request, str(ticket.pk)) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["data"]["id"] == ticket.pk + + def test_ticket_show_not_found(self, rf): + user = UserFactory(username="show_404") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + request = _api_get(rf, "/api/tickets/NONEXIST/", user, token) + response = api.ticket_show(request, "NONEXIST") + + assert response.status_code == 404 + + def test_ticket_show_includes_replies(self, rf): + user = UserFactory(username="show_replies") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + ReplyFactory(ticket=ticket, author=user, body="Test reply") + + request = _api_get(rf, f"/api/tickets/{ticket.reference}/", user, token) + response = api.ticket_show(request, ticket.reference) + + data = json.loads(response.content) + assert "replies" in data["data"] + assert len(data["data"]["replies"]) == 1 + assert data["data"]["replies"][0]["body"] == "Test reply" + + +# --------------------------------------------------------------------------- +# Tickets - Create +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketCreate: + @patch("escalated.views.api.TicketService") + def test_ticket_create_success(self, MockService, rf): + user = UserFactory(username="create_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + # Create real ticket to return + ticket = TicketFactory( + requester=user, subject="API Test", description="API Description" + ) + + mock_svc = MagicMock() + mock_svc.create.return_value = ticket + MockService.return_value = mock_svc + + request = _api_post(rf, "/api/tickets/create/", user, token, { + "subject": "API Test", + "description": "API Description", + "priority": "high", + }) + response = api.ticket_create(request) + + assert response.status_code == 201 + data = json.loads(response.content) + assert data["message"] == "Ticket created." + assert "data" in data + + def test_ticket_create_missing_subject_returns_422(self, rf): + user = UserFactory(username="create_no_subject") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + request = _api_post(rf, "/api/tickets/create/", user, token, { + "description": "Some desc", + }) + response = api.ticket_create(request) + + assert response.status_code == 422 + data = json.loads(response.content) + assert "subject" in data["errors"] + + def test_ticket_create_missing_description_returns_422(self, rf): + user = UserFactory(username="create_no_desc") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + request = _api_post(rf, "/api/tickets/create/", user, token, { + "subject": "Subject only", + }) + response = api.ticket_create(request) + + assert response.status_code == 422 + data = json.loads(response.content) + assert "description" in data["errors"] + + def test_ticket_create_invalid_priority_returns_422(self, rf): + user = UserFactory(username="create_bad_prio") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + request = _api_post(rf, "/api/tickets/create/", user, token, { + "subject": "Test", + "description": "Test", + "priority": "invalid_priority", + }) + response = api.ticket_create(request) + + assert response.status_code == 422 + data = json.loads(response.content) + assert "priority" in data["errors"] + + +# --------------------------------------------------------------------------- +# Tickets - Reply +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketReply: + @patch("escalated.views.api.TicketService") + def test_ticket_reply_success(self, MockService, rf): + user = UserFactory(username="reply_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + reply = ReplyFactory(ticket=ticket, author=user, body="Reply body") + + mock_svc = MagicMock() + mock_svc.reply.return_value = reply + MockService.return_value = mock_svc + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/reply/", user, token, + {"body": "Reply body"}, + ) + response = api.ticket_reply(request, ticket.reference) + + assert response.status_code == 201 + data = json.loads(response.content) + assert data["message"] == "Reply sent." + assert data["data"]["body"] == "Reply body" + + def test_ticket_reply_missing_body_returns_422(self, rf): + user = UserFactory(username="reply_no_body") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/reply/", user, token, {} + ) + response = api.ticket_reply(request, ticket.reference) + + assert response.status_code == 422 + + def test_ticket_reply_not_found(self, rf): + user = UserFactory(username="reply_404") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + request = _api_post( + rf, "/api/tickets/NONEXIST/reply/", user, token, + {"body": "Reply"}, + ) + response = api.ticket_reply(request, "NONEXIST") + + assert response.status_code == 404 + + +# --------------------------------------------------------------------------- +# Tickets - Status +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketStatus: + @patch("escalated.views.api.TicketService") + def test_ticket_status_update(self, MockService, rf): + user = UserFactory(username="status_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory(status=Ticket.Status.OPEN) + + mock_svc = MagicMock() + MockService.return_value = mock_svc + + request = _api_patch( + rf, f"/api/tickets/{ticket.reference}/status/", user, token, + {"status": "in_progress"}, + ) + response = api.ticket_status(request, ticket.reference) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["message"] == "Status updated." + assert data["status"] == "in_progress" + + def test_ticket_status_invalid_returns_422(self, rf): + user = UserFactory(username="status_invalid") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_patch( + rf, f"/api/tickets/{ticket.reference}/status/", user, token, + {"status": "nonexistent"}, + ) + response = api.ticket_status(request, ticket.reference) + + assert response.status_code == 422 + + +# --------------------------------------------------------------------------- +# Tickets - Priority +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketPriority: + @patch("escalated.views.api.TicketService") + def test_ticket_priority_update(self, MockService, rf): + user = UserFactory(username="priority_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory(priority=Ticket.Priority.MEDIUM) + + mock_svc = MagicMock() + MockService.return_value = mock_svc + + request = _api_patch( + rf, f"/api/tickets/{ticket.reference}/priority/", user, token, + {"priority": "urgent"}, + ) + response = api.ticket_priority(request, ticket.reference) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["priority"] == "urgent" + + def test_ticket_priority_invalid_returns_422(self, rf): + user = UserFactory(username="prio_invalid") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_patch( + rf, f"/api/tickets/{ticket.reference}/priority/", user, token, + {"priority": "super_duper"}, + ) + response = api.ticket_priority(request, ticket.reference) + + assert response.status_code == 422 + + +# --------------------------------------------------------------------------- +# Tickets - Assign +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketAssign: + @patch("escalated.views.api.TicketService") + def test_ticket_assign_success(self, MockService, rf): + user = UserFactory(username="assign_user") + agent = UserFactory(username="assign_agent") + department = DepartmentFactory() + department.agents.add(user, agent) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + mock_svc = MagicMock() + MockService.return_value = mock_svc + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/assign/", user, token, + {"agent_id": agent.pk}, + ) + response = api.ticket_assign(request, ticket.reference) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["message"] == "Ticket assigned." + + def test_ticket_assign_missing_agent_id_returns_422(self, rf): + user = UserFactory(username="assign_missing") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/assign/", user, token, {} + ) + response = api.ticket_assign(request, ticket.reference) + + assert response.status_code == 422 + + def test_ticket_assign_agent_not_found_returns_404(self, rf): + user = UserFactory(username="assign_404") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/assign/", user, token, + {"agent_id": 99999}, + ) + response = api.ticket_assign(request, ticket.reference) + + assert response.status_code == 404 + + +# --------------------------------------------------------------------------- +# Tickets - Follow +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketFollow: + def test_ticket_follow_toggles(self, rf): + user = UserFactory(username="follow_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + # Follow + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/follow/", user, token + ) + response = api.ticket_follow(request, ticket.reference) + + data = json.loads(response.content) + assert data["following"] is True + + # Unfollow + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/follow/", user, token + ) + response = api.ticket_follow(request, ticket.reference) + + data = json.loads(response.content) + assert data["following"] is False + + +# --------------------------------------------------------------------------- +# Tickets - Tags +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketTags: + @patch("escalated.views.api.TicketService") + def test_ticket_tags_sync(self, MockService, rf): + user = UserFactory(username="tags_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + tag1 = TagFactory(name="Bug", slug="bug") + tag2 = TagFactory(name="Feature", slug="feature") + + mock_svc = MagicMock() + MockService.return_value = mock_svc + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/tags/", user, token, + {"tag_ids": [tag1.pk, tag2.pk]}, + ) + response = api.ticket_tags(request, ticket.reference) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["message"] == "Tags updated." + + def test_ticket_tags_missing_tag_ids_returns_422(self, rf): + user = UserFactory(username="tags_missing") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/tags/", user, token, {} + ) + response = api.ticket_tags(request, ticket.reference) + + assert response.status_code == 422 + + +# --------------------------------------------------------------------------- +# Tickets - Macro +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketMacro: + def test_ticket_macro_not_found_returns_404(self, rf): + user = UserFactory(username="macro_404") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/macro/", user, token, + {"macro_id": 99999}, + ) + response = api.ticket_apply_macro(request, ticket.reference) + + assert response.status_code == 404 + + def test_ticket_macro_missing_id_returns_422(self, rf): + user = UserFactory(username="macro_missing") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + + request = _api_post( + rf, f"/api/tickets/{ticket.reference}/macro/", user, token, {} + ) + response = api.ticket_apply_macro(request, ticket.reference) + + assert response.status_code == 422 + + +# --------------------------------------------------------------------------- +# Tickets - Delete +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiTicketDelete: + def test_ticket_delete_success(self, rf): + user = UserFactory(username="delete_user") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + ticket = TicketFactory() + ticket_pk = ticket.pk + + request = _api_delete( + rf, f"/api/tickets/{ticket.reference}/delete/", user, token + ) + response = api.ticket_destroy(request, ticket.reference) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data["message"] == "Ticket deleted." + assert not Ticket.objects.filter(pk=ticket_pk).exists() + + def test_ticket_delete_not_found(self, rf): + user = UserFactory(username="delete_404") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + request = _api_delete( + rf, "/api/tickets/NONEXIST/delete/", user, token + ) + response = api.ticket_destroy(request, "NONEXIST") + + assert response.status_code == 404 + + +# --------------------------------------------------------------------------- +# Resources +# --------------------------------------------------------------------------- + + +@pytest.mark.django_db +class TestApiResources: + def test_resource_agents(self, rf): + user = UserFactory(username="res_agents", is_staff=True) + token = ApiTokenFactory(user=user) + + department = DepartmentFactory() + agent1 = UserFactory(username="agent_res1") + department.agents.add(agent1) + + request = _api_get(rf, "/api/agents/", user, token) + response = api.resource_agents(request) + + assert response.status_code == 200 + data = json.loads(response.content) + assert "data" in data + # Should include at least the agent and the staff user + assert len(data["data"]) >= 1 + + def test_resource_departments(self, rf): + user = UserFactory(username="res_depts") + token = ApiTokenFactory(user=user) + + DepartmentFactory(name="Support", slug="support") + DepartmentFactory(name="Sales", slug="sales") + + request = _api_get(rf, "/api/departments/", user, token) + response = api.resource_departments(request) + + assert response.status_code == 200 + data = json.loads(response.content) + assert len(data["data"]) >= 2 + + def test_resource_tags(self, rf): + user = UserFactory(username="res_tags") + token = ApiTokenFactory(user=user) + + TagFactory(name="Bug", slug="bug-res") + TagFactory(name="Feature", slug="feature-res") + + request = _api_get(rf, "/api/tags/", user, token) + response = api.resource_tags(request) + + assert response.status_code == 200 + data = json.loads(response.content) + assert len(data["data"]) >= 2 + + def test_resource_canned_responses(self, rf): + user = UserFactory(username="res_canned") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + CannedResponseFactory(created_by=user, title="Hello") + + request = _api_get(rf, "/api/canned-responses/", user, token) + response = api.resource_canned_responses(request) + + assert response.status_code == 200 + data = json.loads(response.content) + assert len(data["data"]) >= 1 + + def test_resource_macros(self, rf): + user = UserFactory(username="res_macros") + department = DepartmentFactory() + department.agents.add(user) + token = ApiTokenFactory(user=user) + + MacroFactory(created_by=user, name="Close and tag") + + request = _api_get(rf, "/api/macros/", user, token) + response = api.resource_macros(request) + + assert response.status_code == 200 + data = json.loads(response.content) + assert len(data["data"]) >= 1 + + def test_resource_realtime_config(self, rf): + user = UserFactory(username="res_realtime") + token = ApiTokenFactory(user=user) + + request = _api_get(rf, "/api/realtime/config/", user, token) + response = api.resource_realtime_config(request) + + assert response.status_code == 200 diff --git a/tests/settings.py b/tests/settings.py index f914aa8..030c12a 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -17,6 +17,12 @@ AUTH_USER_MODEL = "auth.User" ROOT_URLCONF = "tests.urls" +CACHES = { + "default": { + "BACKEND": "django.core.cache.backends.locmem.LocMemCache", + } +} + ESCALATED = { "MODE": "self_hosted", "TABLE_PREFIX": "escalated_", @@ -31,6 +37,10 @@ }, "NOTIFICATION_CHANNELS": [], "WEBHOOK_URL": None, + "API_ENABLED": True, + "API_RATE_LIMIT": 60, + "API_TOKEN_EXPIRY_DAYS": None, + "API_PREFIX": "support/api/v1", } TEMPLATES = [ diff --git a/tests/unit/test_api_middleware.py b/tests/unit/test_api_middleware.py new file mode 100644 index 0000000..6eefcef --- /dev/null +++ b/tests/unit/test_api_middleware.py @@ -0,0 +1,207 @@ +""" +Unit tests for the API authentication and rate limit middleware. +""" + +import json +from datetime import timedelta +from unittest.mock import MagicMock, patch + +import pytest +from django.test import RequestFactory +from django.utils import timezone + +from escalated.api_middleware import AuthenticateApiToken, ApiRateLimit +from escalated.models import ApiToken +from tests.factories import UserFactory, DepartmentFactory, ApiTokenFactory + + +@pytest.fixture +def rf(): + return RequestFactory() + + +def _make_middleware_pair(): + """Create instances of both middleware.""" + auth = AuthenticateApiToken(lambda r: MagicMock(status_code=200)) + rate = ApiRateLimit(lambda r: MagicMock(status_code=200)) + return auth, rate + + +@pytest.mark.django_db +class TestAuthenticateApiToken: + def test_missing_authorization_header_returns_401(self, rf): + auth, _ = _make_middleware_pair() + request = rf.get("/api/test/") + + response = auth.process_view(request, None, [], {}) + assert response is not None + assert response.status_code == 401 + data = json.loads(response.content) + assert data["message"] == "Unauthenticated." + + def test_non_bearer_authorization_returns_401(self, rf): + auth, _ = _make_middleware_pair() + request = rf.get("/api/test/", HTTP_AUTHORIZATION="Basic abc123") + + response = auth.process_view(request, None, [], {}) + assert response is not None + assert response.status_code == 401 + + def test_empty_bearer_token_returns_401(self, rf): + auth, _ = _make_middleware_pair() + request = rf.get("/api/test/", HTTP_AUTHORIZATION="Bearer ") + + response = auth.process_view(request, None, [], {}) + assert response is not None + assert response.status_code == 401 + + def test_invalid_token_returns_401(self, rf): + auth, _ = _make_middleware_pair() + request = rf.get( + "/api/test/", + HTTP_AUTHORIZATION="Bearer invalid_token_value", + ) + + response = auth.process_view(request, None, [], {}) + assert response is not None + assert response.status_code == 401 + data = json.loads(response.content) + assert data["message"] == "Invalid token." + + def test_expired_token_returns_401(self, rf): + user = UserFactory(username="expired_mw") + department = DepartmentFactory() + department.agents.add(user) + + result = ApiToken.create_token( + user, "Expired", + expires_at=timezone.now() - timedelta(days=1), + ) + + auth, _ = _make_middleware_pair() + request = rf.get( + "/api/test/", + HTTP_AUTHORIZATION=f"Bearer {result['plain_text_token']}", + ) + + response = auth.process_view(request, None, [], {}) + assert response is not None + assert response.status_code == 401 + data = json.loads(response.content) + assert data["message"] == "Token has expired." + + def test_valid_token_sets_user_and_returns_none(self, rf): + user = UserFactory(username="valid_mw") + department = DepartmentFactory() + department.agents.add(user) + + result = ApiToken.create_token(user, "Valid") + + auth, _ = _make_middleware_pair() + request = rf.get( + "/api/test/", + HTTP_AUTHORIZATION=f"Bearer {result['plain_text_token']}", + ) + + response = auth.process_view(request, None, [], {}) + assert response is None # Middleware passes through + assert request.user == user + assert request.api_token.pk == result["token"].pk + + def test_valid_token_updates_last_used(self, rf): + user = UserFactory(username="used_mw") + department = DepartmentFactory() + department.agents.add(user) + + result = ApiToken.create_token(user, "Used") + assert result["token"].last_used_at is None + + auth, _ = _make_middleware_pair() + request = rf.get( + "/api/test/", + HTTP_AUTHORIZATION=f"Bearer {result['plain_text_token']}", + ) + + auth.process_view(request, None, [], {}) + + result["token"].refresh_from_db() + assert result["token"].last_used_at is not None + + def test_token_owner_deleted_returns_401(self, rf): + user = UserFactory(username="deleted_owner") + result = ApiToken.create_token(user, "Orphan") + + # Delete the user — the token remains in DB + user.delete() + + auth, _ = _make_middleware_pair() + request = rf.get( + "/api/test/", + HTTP_AUTHORIZATION=f"Bearer {result['plain_text_token']}", + ) + + response = auth.process_view(request, None, [], {}) + assert response is not None + assert response.status_code == 401 + data = json.loads(response.content) + assert data["message"] == "Token owner not found." + + +@pytest.mark.django_db +class TestApiRateLimit: + def test_rate_limit_allows_requests_under_limit(self, rf): + _, rate = _make_middleware_pair() + user = UserFactory(username="rate_ok") + token = ApiTokenFactory(user=user) + + request = rf.get("/api/test/") + request.api_token = token + + # Should pass through (return None) + response = rate.process_view(request, None, [], {}) + assert response is None + + @patch("escalated.api_middleware.get_setting") + def test_rate_limit_blocks_when_exceeded(self, mock_setting, rf): + mock_setting.return_value = 2 # Only 2 requests/min + + _, rate = _make_middleware_pair() + user = UserFactory(username="rate_block") + token = ApiTokenFactory(user=user) + + from django.core.cache import cache + cache.clear() + + # Set cache to already have 2 hits + cache.set(f"escalated_api:{token.pk}", 2, 60) + + request = rf.get("/api/test/") + request.api_token = token + + response = rate.process_view(request, None, [], {}) + assert response is not None + assert response.status_code == 429 + data = json.loads(response.content) + assert data["message"] == "Too many requests." + assert "Retry-After" in response + + def test_rate_limit_adds_headers_to_response(self, rf): + _, rate = _make_middleware_pair() + user = UserFactory(username="rate_headers") + token = ApiTokenFactory(user=user) + + from django.core.cache import cache + cache.clear() + + request = rf.get("/api/test/") + request.api_token = token + + mock_response = MagicMock() + mock_response.__setitem__ = MagicMock() + mock_response.__getitem__ = MagicMock() + + result = rate.process_response(request, mock_response) + # Should have set X-RateLimit-Limit and X-RateLimit-Remaining + calls = {c[0][0] for c in mock_response.__setitem__.call_args_list} + assert "X-RateLimit-Limit" in calls + assert "X-RateLimit-Remaining" in calls diff --git a/tests/unit/test_api_token_model.py b/tests/unit/test_api_token_model.py new file mode 100644 index 0000000..9589f46 --- /dev/null +++ b/tests/unit/test_api_token_model.py @@ -0,0 +1,144 @@ +""" +Unit tests for the ApiToken model. +""" + +import hashlib +import secrets +from datetime import timedelta + +import pytest +from django.utils import timezone + +from escalated.models import ApiToken +from tests.factories import UserFactory, ApiTokenFactory + + +@pytest.mark.django_db +class TestApiTokenModel: + def test_create_token_returns_model_and_plain_text(self): + user = UserFactory(username="token_user") + result = ApiToken.create_token(user, "My Token") + + assert "token" in result + assert "plain_text_token" in result + assert isinstance(result["token"], ApiToken) + assert len(result["plain_text_token"]) == 64 # hex(32) = 64 chars + + def test_create_token_hashes_with_sha256(self): + user = UserFactory(username="hash_user") + result = ApiToken.create_token(user, "Hash Test") + + plain = result["plain_text_token"] + expected_hash = hashlib.sha256(plain.encode()).hexdigest() + assert result["token"].token == expected_hash + + def test_create_token_stores_abilities(self): + user = UserFactory(username="abilities_user") + result = ApiToken.create_token(user, "Abilities Test", abilities=["agent", "admin"]) + + assert result["token"].abilities == ["agent", "admin"] + + def test_create_token_default_abilities_is_wildcard(self): + user = UserFactory(username="default_abilities") + result = ApiToken.create_token(user, "Default") + + assert result["token"].abilities == ["*"] + + def test_create_token_with_expiry(self): + user = UserFactory(username="expiry_user") + expires_at = timezone.now() + timedelta(days=30) + result = ApiToken.create_token(user, "Expiry Test", expires_at=expires_at) + + assert result["token"].expires_at is not None + assert result["token"].is_expired is False + + def test_find_by_plain_text_success(self): + user = UserFactory(username="find_user") + result = ApiToken.create_token(user, "Find Test") + + found = ApiToken.find_by_plain_text(result["plain_text_token"]) + assert found is not None + assert found.pk == result["token"].pk + + def test_find_by_plain_text_returns_none_for_invalid(self): + found = ApiToken.find_by_plain_text("nonexistent_token_value") + assert found is None + + def test_has_ability_wildcard(self): + user = UserFactory(username="wildcard_user") + result = ApiToken.create_token(user, "Wildcard", abilities=["*"]) + + assert result["token"].has_ability("agent") is True + assert result["token"].has_ability("admin") is True + assert result["token"].has_ability("anything") is True + + def test_has_ability_specific(self): + user = UserFactory(username="specific_user") + result = ApiToken.create_token(user, "Specific", abilities=["agent"]) + + assert result["token"].has_ability("agent") is True + assert result["token"].has_ability("admin") is False + + def test_has_ability_empty(self): + user = UserFactory(username="empty_abilities") + result = ApiToken.create_token(user, "Empty", abilities=[]) + + assert result["token"].has_ability("agent") is False + + def test_is_expired_false_when_no_expiry(self): + user = UserFactory(username="no_expiry") + result = ApiToken.create_token(user, "No Expiry") + + assert result["token"].is_expired is False + + def test_is_expired_false_when_future_expiry(self): + user = UserFactory(username="future_expiry") + expires_at = timezone.now() + timedelta(days=30) + result = ApiToken.create_token(user, "Future", expires_at=expires_at) + + assert result["token"].is_expired is False + + def test_is_expired_true_when_past_expiry(self): + user = UserFactory(username="past_expiry") + expires_at = timezone.now() - timedelta(days=1) + result = ApiToken.create_token(user, "Past", expires_at=expires_at) + + assert result["token"].is_expired is True + + def test_active_queryset(self): + user = UserFactory(username="qs_active") + result_active = ApiToken.create_token(user, "Active") + result_expired = ApiToken.create_token( + user, "Expired", + expires_at=timezone.now() - timedelta(days=1), + ) + + active = ApiToken.objects.active() + assert result_active["token"] in active + assert result_expired["token"] not in active + + def test_expired_queryset(self): + user = UserFactory(username="qs_expired") + result_active = ApiToken.create_token(user, "Active") + result_expired = ApiToken.create_token( + user, "Expired", + expires_at=timezone.now() - timedelta(days=1), + ) + + expired = ApiToken.objects.expired() + assert result_expired["token"] in expired + assert result_active["token"] not in expired + + def test_tokenable_resolves_to_user(self): + user = UserFactory(username="tokenable_user") + result = ApiToken.create_token(user, "Tokenable Test") + + token = result["token"] + assert token.tokenable == user + assert token.tokenable.pk == user.pk + + def test_str_representation(self): + user = UserFactory(username="str_user") + result = ApiToken.create_token(user, "My Token Name") + + assert "My Token Name" in str(result["token"])