diff --git a/.gitignore b/.gitignore index c0782126f..5da21cb23 100644 --- a/.gitignore +++ b/.gitignore @@ -129,7 +129,6 @@ celerybeat.pid # Environments .env* !.env.sample -.venv .blog_env/ env/ @@ -173,3 +172,12 @@ cython_debug/ #.idea/ alembic/versions + +# Ignore environment and virtual environment files +.env +.env.* +.ven +.venv +env +alembic/env.py + diff --git a/README.md b/README.md index 65fc8b168..438708894 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ GRANT ALL PRIVILEGES ON DATABASE hng_fast_api TO user; **Starting the database** after cloning the database, dont run -`alembic revision --autogenerate -m 'initial migration'` +`alembic revision --autogenerate -m 'initial migration'` but run `alembic upgrade head` diff --git a/api/v1/routes/activity_logs.py b/api/v1/routes/activity_logs.py index 51017efa5..71b9efa73 100644 --- a/api/v1/routes/activity_logs.py +++ b/api/v1/routes/activity_logs.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends, status, HTTPException +from fastapi import APIRouter, Depends, status, HTTPException, Query from fastapi.encoders import jsonable_encoder from sqlalchemy.orm import Session from api.v1.models.user import User @@ -30,11 +30,17 @@ async def create_activity_log( ) + @activity_logs.get("", response_model=list[ActivityLogResponse]) -async def get_all_activity_logs(current_user: User = Depends(user_service.get_current_super_admin), db: Session = Depends(get_db)): -    '''Get all activity logs''' +async def get_all_activity_logs( +    page: int = Query(1, ge=1), +    limit: int = Query(10, le=100), +    current_user: User = Depends(user_service.get_current_super_admin), +    db: Session = Depends(get_db) +    ): -    activity_logs = activity_log_service.fetch_all(db=db) +    """Get paginated activity logs""" +    activity_logs = activity_log_service.fetch_all(db=db, page=page, limit=limit) return success_response( status_code=200, @@ -45,6 +51,8 @@ async def get_all_activity_logs(current_user: User = Depends(user_service.get_cu 
@activity_logs.get("/{user_id}", status_code=status.HTTP_200_OK) async def fetch_all_users_activity_log( user_id: str, + page: int = Query(1, ge=1), + limit: int = Query(10, le=100), db: Session = Depends(get_db), current_user: User = Depends(user_service.get_current_super_admin) ): diff --git a/api/v1/services/activity_logs.py b/api/v1/services/activity_logs.py index bca5dca6d..b8b44c392 100644 --- a/api/v1/services/activity_logs.py +++ b/api/v1/services/activity_logs.py @@ -1,39 +1,44 @@ from sqlalchemy.orm import Session from fastapi import HTTPException, status from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy import desc from api.v1.models.activity_logs import ActivityLog from typing import Optional, Any - - class ActivityLogService: """Activity Log service""" def create_activity_log(self, db: Session, user_id: str, action: str): """Creates a new activity log""" - activity_log = ActivityLog(user_id=user_id, action=action) db.add(activity_log) db.commit() db.refresh(activity_log) return activity_log - def fetch_all(self, db: Session, **query_params: Optional[Any]): - """Fetch all products with option tto search using query parameters""" - + def fetch_all(self, db: Session, page: int, limit: int, user_id: Optional[str] = None, **query_params: Optional[Any]): + """Fetch all activity logs with option to search using query parameters""" + offset = (page - 1) * limit query = db.query(ActivityLog) - # Enable filter by query parameter + if user_id: # Filter by user id + query = query.filter(ActivityLog.user_id == user_id) + + # Enable filter by arbitrary query parameters if query_params: for column, value in query_params.items(): if hasattr(ActivityLog, column) and value: query = query.filter( getattr(ActivityLog, column).ilike(f"%{value}%") ) - - return query.all() - - def delete_activity_log_by_id(self, db: Session, log_id: str): + + + query = query.order_by(desc(ActivityLog.timestamp)) + paginated_logs = query.offset(offset).limit(limit).all() + return 
paginated_logs + + def delete_activity_log_by_id(self, db: Session, log_id: str) -> dict: + """Delete an activity log by ID with error handling""" log = db.query(ActivityLog).filter(ActivityLog.id == log_id).first() if not log: @@ -47,5 +52,4 @@ def delete_activity_log_by_id(self, db: Session, log_id: str): return {"status": "success", "detail": f"Activity log with ID {log_id} deleted successfully"} - activity_log_service = ActivityLogService() diff --git a/tests/v1/activity_logs/test_get_all_logs.py b/tests/v1/activity_logs/test_get_all_logs.py index 48593f50c..a17a5ec4f 100644 --- a/tests/v1/activity_logs/test_get_all_logs.py +++ b/tests/v1/activity_logs/test_get_all_logs.py @@ -104,3 +104,17 @@ def test_get_all_activity_logs_non_super_admin(mock_user_service, mock_db_sessio 'Authorization': f'Bearer {access_token}'}) assert response.status_code == status.HTTP_403_FORBIDDEN + + +@pytest.mark.usefixtures("mock_db_session", "mock_user_service") +def test_fetch_all_pagination(test_client): + response = test_client.get("/activity-logs?page=1&limit=5") + assert response.status_code == 200 + data = response.json() + + assert len(data["data"]) == 5 # Ensure it returns exactly 5 logs + + # Verify sorting by created_at in descending order + timestamps = [log["created_at"] for log in data["data"]] + assert timestamps == sorted(timestamps, reverse=True) + \ No newline at end of file diff --git a/tests/v1/activity_logs/test_pagination_activity_log.py b/tests/v1/activity_logs/test_pagination_activity_log.py new file mode 100644 index 000000000..5a41f847b --- /dev/null +++ b/tests/v1/activity_logs/test_pagination_activity_log.py @@ -0,0 +1,126 @@ +import pytest +from fastapi.testclient import TestClient +from unittest.mock import patch, MagicMock +from main import app # Assuming your main app file is main.py +from api.v1.models.user import User +from api.v1.services.user import user_service +from api.v1.models.activity_logs import ActivityLog # Ensure you import your 
ActivityLog MODEL, not schema +from uuid_extensions import uuid7 +from api.db.database import get_db +from fastapi import status +from datetime import datetime, timezone, timedelta +from typing import List + + +@pytest.mark.usefixtures("mock_db_session", "mock_user_service") +def test_get_all_activity_logs_default_pagination(mock_user_service, mock_db_session): + """Test default pagination (no page/limit params).""" + mock_user = create_mock_user(mock_user_service, mock_db_session) + access_token = user_service.create_access_token(user_id=str(uuid7())) + + # Mock 12 activity logs to test default limit of 10 + mock_logs = [ + create_mock_activity_log(str(uuid7()), str(uuid7()), f"action_{i}", datetime.now(timezone.utc)) + for i in range(12) + ] + mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = mock_logs[:10] # Mock first 10 for page 1 + + response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}) + + assert response.status_code == status.HTTP_200_OK + response_data = response.json() + assert isinstance(response_data, list) + assert len(response_data) == 10 # Verify default limit is 10 + + +@pytest.mark.usefixtures("mock_db_session", "mock_user_service") +def test_get_all_activity_logs_limit_5(mock_user_service, mock_db_session): + """Test setting limit to 5.""" + mock_user = create_mock_user(mock_user_service, mock_db_session) + access_token = user_service.create_access_token(user_id=str(uuid7())) + + mock_logs = [ + create_mock_activity_log(str(uuid7()), str(uuid7()), f"action_{i}", datetime.now(timezone.utc)) + for i in range(7) # Mock 7 logs, limit should return max 5 + ] + mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = mock_logs[:5] # Mock first 5 for limit 5 + + + response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}, params={'limit': 5}) + + 
assert response.status_code == status.HTTP_200_OK + response_data = response.json() + assert isinstance(response_data, list) + assert len(response_data) == 5 # Verify limit is respected + + +@pytest.mark.usefixtures("mock_db_session", "mock_user_service") +def test_get_all_activity_logs_page_2_limit_5(mock_user_service, mock_db_session): + """Test getting page 2 with limit 5.""" + mock_user = create_mock_user(mock_user_service, mock_db_session) + access_token = user_service.create_access_token(user_id=str(uuid7())) + + mock_logs = [ + create_mock_activity_log(str(uuid7()), str(uuid7()), f"action_{i}", datetime.now(timezone.utc)) + for i in range(12) # Mock 12 logs + ] + mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.side_effect = [ # Use side_effect to return different data for each page request + mock_logs[:5], # Page 1 (items 0-4) + mock_logs[5:10], # Page 2 (items 5-9) + [] # Page 3 and beyond (empty list) + ] + + response_page1 = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}, params={'page': 1, 'limit': 5}) + response_page2 = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}, params={'page': 2, 'limit': 5}) + response_page3 = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}, params={'page': 3, 'limit': 5}) + + + assert response_page1.status_code == status.HTTP_200_OK + assert len(response_page1.json()) == 5 + assert response_page2.status_code == status.HTTP_200_OK + assert len(response_page2.json()) == 5 + assert response_page3.status_code == status.HTTP_200_OK + assert len(response_page3.json()) == 0 # Page 3 should be empty + + +# Add more pagination test cases here, like testing page beyond last page, etc. 
+ + +@pytest.mark.usefixtures("mock_db_session", "mock_user_service") +def test_get_all_activity_logs_empty(mock_user_service, mock_db_session): + """Test for fetching all activity logs with no data.""" + mock_user = create_mock_user(mock_user_service, mock_db_session) + access_token = user_service.create_access_token(user_id=str(uuid7())) + mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = [] # Mock empty list for no data + response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}) + + assert response.status_code == status.HTTP_200_OK + assert response.json() == [] # Verify empty list when no data + + +@pytest.mark.usefixtures("mock_db_session", "mock_user_service") +def test_get_all_activity_logs_with_data(mock_user_service, mock_db_session): + """Test for fetching all activity logs with data.""" + mock_user = create_mock_user(mock_user_service, mock_db_session) + access_token = user_service.create_access_token(user_id=str(uuid7())) + + mock_logs = [create_mock_activity_log(str(uuid7()), str(uuid7()), "profile Update", datetime.now(timezone.utc))] + mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = mock_logs # Mock a single log + + response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}) + + assert response.status_code == status.HTTP_200_OK + response_data = response.json() + assert isinstance(response_data, list) + assert len(response_data) == 1 # Verify one log returned + + +@pytest.mark.usefixtures("mock_db_session", "mock_user_service") +def test_get_all_activity_logs_non_super_admin(mock_user_service, mock_db_session): + """Test for fetching all activity logs as a non-super admin user.""" + mock_user = create_mock_user( + mock_user_service, mock_db_session, is_superadmin=False) + access_token = user_service.create_access_token(user_id=str(uuid7())) + 
response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}) + + assert response.status_code