diff --git a/README.md b/README.md index 283ac04..4ff28bc 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@

---- +______________________________________________________________________ ## 📖 Documentation @@ -52,7 +52,7 @@ This README provides a quick reference for LLMs and developers, but the full documentation contains detailed guides, examples, and best practices. ---- +______________________________________________________________________ ## 0. About @@ -78,6 +78,7 @@ This README provides a quick reference for LLMs and developers, but the full doc 💬 **[Join our Discord community](https://discord.com/invite/TEmPs22gqB)** - Connect with other developers using the FastAPI boilerplate! Our Discord server features: + - **🤝 Networking** - Connect with fellow developers and share experiences - **💡 Product Updates** - Stay updated with FastroAI and our other products - **📸 Showcase** - Share what you've built using our tools @@ -140,7 +141,7 @@ Whether you're just getting started or building production applications, our com 1. [Admin Panel](#513-admin-panel) 1. [Running](#514-running) 1. [Create Application](#515-create-application) - 2. [Opting Out of Services](#516-opting-out-of-services) + 1. [Opting Out of Services](#516-opting-out-of-services) 1. [Running in Production](#6-running-in-production) 1. [Uvicorn Workers with Gunicorn](#61-uvicorn-workers-with-gunicorn) 1. [Running With NGINX](#62-running-with-nginx) @@ -289,6 +290,7 @@ CRUD_ADMIN_REDIS_SSL=false # default=false, use SSL for Redis co ``` **Session Backend Options:** + - **Memory** (default): Development-friendly, sessions reset on restart - **Redis** (production): High performance, scalable, persistent sessions - **Database**: Audit-friendly with admin visibility @@ -600,7 +602,7 @@ And to apply the migration uv run alembic upgrade head ``` -> [!NOTE] +> \[!NOTE\] > If you do not have uv, you may run it without uv after running `pip install alembic` ## 5. 
Extending @@ -1057,11 +1059,7 @@ router = APIRouter(tags=["entities"]) @router.get("/entities/{id}", response_model=EntityRead) -async def read_entity( - request: Request, - id: int, - db: Annotated[AsyncSession, Depends(async_get_db)] -): +async def read_entity(request: Request, id: int, db: Annotated[AsyncSession, Depends(async_get_db)]): entity = await crud_entity.get(db=db, id=id) if entity is None: # Explicit None check @@ -1071,10 +1069,7 @@ async def read_entity( @router.get("/entities", response_model=List[EntityRead]) -async def read_entities( - request: Request, - db: Annotated[AsyncSession, Depends(async_get_db)] -): +async def read_entities(request: Request, db: Annotated[AsyncSession, Depends(async_get_db)]): entities = await crud_entity.get_multi(db=db, is_deleted=False) return entities ``` @@ -1150,10 +1145,7 @@ from app.schemas.entity import EntityRead @router.get("/entities", response_model=PaginatedListResponse[EntityRead]) async def read_entities( - request: Request, - db: Annotated[AsyncSession, Depends(async_get_db)], - page: int = 1, - items_per_page: int = 10 + request: Request, db: Annotated[AsyncSession, Depends(async_get_db)], page: int = 1, items_per_page: int = 10 ): entities_data = await crud_entity.get_multi( db=db, @@ -1173,18 +1165,15 @@ async def read_entities( To add exceptions, just import them from `app/core/exceptions/http_exceptions` and optionally add a detail: ```python -from app.core.exceptions.http_exceptions import ( - NotFoundException, - ForbiddenException, - DuplicateValueException -) +from app.core.exceptions.http_exceptions import NotFoundException, ForbiddenException, DuplicateValueException + @router.post("/entities", response_model=EntityRead, status_code=201) async def create_entity( request: Request, entity_data: EntityCreate, db: Annotated[AsyncSession, Depends(async_get_db)], - current_user: Annotated[UserRead, Depends(get_current_user)] + current_user: Annotated[UserRead, Depends(get_current_user)], ): # Check if entity already exists if await crud_entity.exists(db=db, name=entity_data.name) is True: @@ -1204,11 +1193,7 @@ async def create_entity( @router.get("/entities/{id}", response_model=EntityRead) -async def read_entity( - request: Request, - id: int, - db: Annotated[AsyncSession, Depends(async_get_db)] -): +async def read_entity(request: Request, id: int, db: Annotated[AsyncSession, Depends(async_get_db)]): entity = await crud_entity.get(db=db, id=id) if entity is None: # Explicit None check @@ -1399,7 +1384,7 @@ For `client-side caching`, all you have to do is let the `Settings` class define Depending on the problem your API is solving, you might want to implement a job queue. A job queue allows you to run tasks in the background, and is usually aimed at functions that require longer run times and don't directly impact user response in your frontend. As a rule of thumb, if a task takes more than 2 seconds to run, can be executed asynchronously, and its result is not needed for the next step of the user's interaction, then it is a good candidate for the job queue. -> [!TIP] +> \[!TIP\] > Very common candidates for background functions are calls to LLM endpoints (e.g. OpenAI or OpenRouter). This is because they span tens of seconds and often need to be further parsed and saved.
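To make that rule of thumb concrete before the boilerplate's own task setup below, here is a minimal, hedged sketch of such a candidate as an ARQ task. The function name `summarize_with_llm` and the 30-second sleep are illustrative stand-ins for an LLM call; they are not part of the boilerplate:

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def summarize_with_llm(ctx: dict, post_id: int) -> str:
    """Hypothetical slow task: stands in for an LLM request spanning tens of seconds."""
    await asyncio.sleep(30)  # placeholder for the external call plus parsing/saving
    return f"summary-for-post-{post_id}"


async def enqueue_summary(post_id: int) -> str:
    # RedisSettings() assumes a local Redis on the default port; the boilerplate
    # reads the real host/port from the REDIS_QUEUE_* variables in .env.
    pool = await create_pool(RedisSettings())
    job = await pool.enqueue_job("summarize_with_llm", post_id)
    # enqueue_job returns None if a job with the same id is already queued.
    return job.job_id if job is not None else ""
```

The endpoint that enqueues the job can respond in milliseconds and hand the client a job id to poll, which is exactly the pattern of the task endpoints shown next.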
#### Background task creation @@ -1418,6 +1403,7 @@ Then add the function to the `WorkerSettings` class `functions` variable in `app from .functions import sample_background_task from .your_module import sample_complex_background_task + class WorkerSettings: functions = [sample_background_task, sample_complex_background_task] ... @@ -1442,7 +1428,7 @@ async def get_task(task_id: str): And finally run the worker in parallel to your FastAPI application. -> [!IMPORTANT] +> \[!IMPORTANT\] > For any change to the `sample_background_task` to be reflected in the worker, you need to restart the worker (e.g. the Docker container). If you are using `docker compose`, the worker is already running. @@ -1462,6 +1448,7 @@ To do this, you can add the database session to the `ctx` object in the `startup from arq.worker import Worker from ...core.db.database import async_get_db + async def startup(ctx: Worker) -> None: ctx["db"] = await anext(async_get_db()) logging.info("Worker Started") @@ -1477,17 +1464,16 @@ This will allow you to have the async database session always available in any b ```python from arq.worker import Worker + async def your_background_function( ctx: Worker, post_id: int, - ... ) -> Any: db = ctx["db"] post = crud_posts.get(db=db, schema_to_select=PostRead, id=post_id) - ... ``` -> [!WARNING] +> \[!WARNING\] > When using database sessions, you will want to use Pydantic objects. However, these objects don't mingle well with the serialization required by ARQ tasks and will be retrieved as a dictionary. ### 5.11 Rate Limiting @@ -1661,6 +1647,7 @@ This authentication setup in the provides a robust, secure, and user-friendly wa The boilerplate includes a powerful web-based admin interface built with [CRUDAdmin](https://github.com/benavlabs/crudadmin) that provides a comprehensive database management system. > **About CRUDAdmin**: CRUDAdmin is a modern admin interface generator for FastAPI applications. Learn more at: +> > - **📚 Documentation**: [benavlabs.github.io/crudadmin](https://benavlabs.github.io/crudadmin/) > - **💻 GitHub**: [github.com/benavlabs/crudadmin](https://github.com/benavlabs/crudadmin) @@ -1685,6 +1672,7 @@ http://localhost:8000/admin ``` Use the admin credentials you defined in your `.env` file: + - Username: `ADMIN_USERNAME` - Password: `ADMIN_PASSWORD` @@ -1709,6 +1697,7 @@ To add new models to the admin panel, edit `src/app/admin/views.py`: from your_app.models import YourModel from your_app.schemas import YourCreateSchema, YourUpdateSchema + def register_admin_views(admin: CRUDAdmin) -> None: # ... existing models ...
@@ -1716,7 +1705,7 @@ def register_admin_views(admin: CRUDAdmin) -> None: model=YourModel, create_schema=YourCreateSchema, update_schema=YourUpdateSchema, - allowed_actions={"view", "create", "update", "delete"} + allowed_actions={"view", "create", "update", "delete"}, ) ``` @@ -1731,7 +1720,7 @@ admin.add_view( create_schema=ArticleCreate, update_schema=ArticleUpdate, select_schema=ArticleSelect, # Exclude problematic fields from read operations - allowed_actions={"view", "create", "update", "delete"} + allowed_actions={"view", "create", "update", "delete"}, ) # Password field handling @@ -1740,7 +1729,7 @@ admin.add_view( create_schema=UserCreateWithPassword, update_schema=UserUpdateWithPassword, password_transformer=password_transformer, # Handles password hashing - allowed_actions={"view", "create", "update"} + allowed_actions={"view", "create", "update"}, ) # Read-only models @@ -1748,7 +1737,7 @@ admin.add_view( model=AuditLog, create_schema=AuditLogSchema, update_schema=AuditLogSchema, - allowed_actions={"view"} # Only viewing allowed + allowed_actions={"view"}, # Only viewing allowed ) ``` @@ -1758,9 +1747,9 @@ For production environments, consider using Redis for better performance: -```python +```bash # Enable Redis sessions in your environment CRUD_ADMIN_REDIS_ENABLED=true CRUD_ADMIN_REDIS_HOST=localhost CRUD_ADMIN_REDIS_PORT=6379 ``` ### 5.14 Running @@ -1783,6 +1772,7 @@ And for the worker: ```sh uv run arq src.app.core.worker.settings.WorkerSettings ``` + ### 5.15 Create Application If you want to stop tables from being created every time you run the API, you should disable this here: @@ -1823,6 +1813,7 @@ env_path = os.path.join(current_file_dir, "..", "..", ".env") config = Config(env_path) ... + class Settings( AppSettings, PostgresSettings, @@ -1836,6 +1827,7 @@ class Settings( DefaultRateLimitSettings, CRUDAdminSettings, EnvironmentSettings, + CORSSettings, ): pass @@ -1855,6 +1847,7 @@ class Settings( ClientSideCacheSettings, DefaultRateLimitSettings, EnvironmentSettings, + CORSSettings, ): pass ``` @@ -2126,6 +2119,7 @@ import pytest from unittest.mock import AsyncMock, patch from src.app.api.v1.users import write_user + class TestWriteUser: @pytest.mark.asyncio async def test_create_user_success(self, mock_db, sample_user_data): diff --git a/docs/user-guide/authentication/jwt-tokens.md b/docs/user-guide/authentication/jwt-tokens.md index 1b4d30c..4e5490b 100644 --- a/docs/user-guide/authentication/jwt-tokens.md +++ b/docs/user-guide/authentication/jwt-tokens.md @@ -21,6 +21,7 @@ JWT tokens are self-contained, digitally signed packages of information that can The authentication system uses a **dual-token approach** for maximum security and user experience: ### Access Tokens + Access tokens are short-lived credentials that prove a user's identity for API requests. Think of them as temporary keys that grant access to protected resources. - **Purpose**: Authenticate API requests and authorize actions @@ -31,6 +32,7 @@ Access tokens are short-lived credentials that prove a user's identity for API r **Why Short-Lived?** If an access token is stolen (e.g., through XSS), the damage window is limited to 30 minutes before it expires naturally. ### Refresh Tokens + Refresh tokens are longer-lived credentials used solely to generate new access tokens. They provide a balance between security and user convenience.
- **Purpose**: Generate new access tokens without requiring re-login @@ -57,13 +59,11 @@ access_token = await create_access_token(data={"sub": username}) # Custom expiration for special cases (e.g., admin sessions) custom_expires = timedelta(minutes=60) -access_token = await create_access_token( - data={"sub": username}, - expires_delta=custom_expires -) +access_token = await create_access_token(data={"sub": username}, expires_delta=custom_expires) ``` **When to Customize Expiration:** + - **High-security environments**: Shorter expiration (15 minutes) - **Development/testing**: Longer expiration for convenience - **Admin operations**: Variable expiration based on sensitivity @@ -80,10 +80,7 @@ refresh_token = await create_refresh_token(data={"sub": username}) # Extended refresh token for "remember me" functionality extended_expires = timedelta(days=30) -refresh_token = await create_refresh_token( - data={"sub": username}, - expires_delta=extended_expires -) +refresh_token = await create_refresh_token(data={"sub": username}, expires_delta=extended_expires) ``` ### Token Structure @@ -93,22 +90,23 @@ JWT tokens consist of three parts separated by dots: `header.payload.signature`. ```python # Access token payload structure { - "sub": "username", # Subject (user identifier) - "exp": 1234567890, # Expiration timestamp (Unix) - "token_type": "access", # Distinguishes from refresh tokens - "iat": 1234567890 # Issued at (automatic) + "sub": "username", # Subject (user identifier) + "exp": 1234567890, # Expiration timestamp (Unix) + "token_type": "access", # Distinguishes from refresh tokens + "iat": 1234567890, # Issued at (automatic) } # Refresh token payload structure { - "sub": "username", # Same user identifier - "exp": 1234567890, # Longer expiration time - "token_type": "refresh", # Prevents confusion/misuse - "iat": 1234567890 # Issue timestamp + "sub": "username", # Same user identifier + "exp": 1234567890, # Longer expiration time + "token_type": "refresh", # Prevents confusion/misuse + "iat": 1234567890, # Issue timestamp } ``` **Key Fields Explained:** + - **`sub` (Subject)**: Identifies the user - can be username, email, or user ID - **`exp` (Expiration)**: Unix timestamp when token becomes invalid - **`token_type`**: Custom field preventing tokens from being used incorrectly @@ -144,9 +142,7 @@ Refresh token verification follows the same process but with different validatio token_data = await verify_token(token, TokenType.REFRESH, db) if token_data: # Generate new access token - new_access_token = await create_access_token( - data={"sub": token_data.username_or_email} - ) + new_access_token = await create_access_token(data={"sub": token_data.username_or_email}) return {"access_token": new_access_token, "token_type": "bearer"} else: # Refresh token invalid - user must log in again @@ -163,22 +159,22 @@ async def verify_token(token: str, expected_token_type: TokenType, db: AsyncSess is_blacklisted = await crud_token_blacklist.exists(db, token=token) if is_blacklisted: return None - + try: # 2. Verify signature and decode payload payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM]) - + # 3. Extract and validate claims username_or_email: str | None = payload.get("sub") token_type: str | None = payload.get("token_type") - + # 4. Ensure token type matches expectation if username_or_email is None or token_type != expected_token_type: return None - + # 5. 
Return validated data return TokenData(username_or_email=username_or_email) - + except JWTError: # Token is malformed, expired, or signature invalid return None @@ -187,10 +183,10 @@ async def verify_token(token: str, expected_token_type: TokenType, db: AsyncSess **Security Checks Explained:** 1. **Blacklist Check**: Prevents use of tokens from logged-out users -2. **Signature Verification**: Ensures token hasn't been tampered with -3. **Expiration Check**: Automatically handled by JWT library -4. **Type Validation**: Prevents refresh tokens from being used as access tokens -5. **Subject Validation**: Ensures token contains valid user identifier +1. **Signature Verification**: Ensures token hasn't been tampered with +1. **Expiration Check**: Automatically handled by JWT library +1. **Type Validation**: Prevents refresh tokens from being used as access tokens +1. **Subject Validation**: Ensures token contains valid user identifier ## Client-Side Authentication Flow @@ -199,6 +195,7 @@ Understanding the complete authentication flow helps frontend developers integra ### Recommended Client Flow **1. Login Process** + ```javascript // Send credentials to login endpoint const response = await fetch('/api/v1/login', { @@ -215,6 +212,7 @@ sessionStorage.setItem('access_token', access_token); ``` **2. Making Authenticated Requests** + ```javascript // Include access token in Authorization header const response = await fetch('/api/v1/protected-endpoint', { @@ -226,6 +224,7 @@ const response = await fetch('/api/v1/protected-endpoint', { ``` **3. Handling Token Expiration** + ```javascript // Automatic token refresh on 401 errors async function apiCall(url, options = {}) { @@ -237,18 +236,18 @@ async function apiCall(url, options = {}) { }, credentials: 'include' }); - + // If token expired, try to refresh if (response.status === 401) { const refreshResponse = await fetch('/api/v1/refresh', { method: 'POST', credentials: 'include' // Sends refresh token cookie }); - + if (refreshResponse.ok) { const { access_token } = await refreshResponse.json(); sessionStorage.setItem('access_token', access_token); - + // Retry original request response = await fetch(url, { ...options, @@ -263,12 +262,13 @@ async function apiCall(url, options = {}) { window.location.href = '/login'; } } - + return response; } ``` **4. 
Logout Process** + ```javascript // Clear tokens and call logout endpoint await fetch('/api/v1/logout', { @@ -288,10 +288,10 @@ The refresh token cookie is configured for maximum security: response.set_cookie( key="refresh_token", value=refresh_token, - httponly=True, # Prevents JavaScript access (XSS protection) - secure=True, # HTTPS only in production - samesite="Lax", # CSRF protection with good usability - max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60 + httponly=True, # Prevents JavaScript access (XSS protection) + secure=True, # HTTPS only in production + samesite="Lax", # CSRF protection with good usability + max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60, ) ``` @@ -317,14 +317,15 @@ The system uses a database table to track invalidated tokens: # models/token_blacklist.py class TokenBlacklist(Base): __tablename__ = "token_blacklist" - + id: Mapped[int] = mapped_column(primary_key=True) token: Mapped[str] = mapped_column(unique=True, index=True) # Full token string - expires_at: Mapped[datetime] = mapped_column() # When to clean up + expires_at: Mapped[datetime] = mapped_column() # When to clean up created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) ``` **Design Considerations:** + - **Unique constraint**: Prevents duplicate entries - **Index on token**: Fast lookup during verification - **Expires_at field**: Enables automatic cleanup of old entries @@ -352,16 +353,13 @@ async def blacklist_token(token: str, db: AsyncSession) -> None: # 1. Decode token to extract expiration (no verification needed) payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM]) exp_timestamp = payload.get("exp") - + if exp_timestamp is not None: # 2. Convert Unix timestamp to datetime expires_at = datetime.fromtimestamp(exp_timestamp) - + # 3. Store in blacklist with expiration - await crud_token_blacklist.create( - db, - object=TokenBlacklistCreate(token=token, expires_at=expires_at) - ) + await crud_token_blacklist.create(db, object=TokenBlacklistCreate(token=token, expires_at=expires_at)) ``` **Cleanup Strategy**: Blacklisted tokens can be automatically removed from the database after their natural expiration time, preventing unlimited database growth. @@ -378,24 +376,17 @@ async def login_for_access_token( db: Annotated[AsyncSession, Depends(async_get_db)], ) -> dict[str, str]: # 1. Authenticate user - user = await authenticate_user( - username_or_email=form_data.username, - password=form_data.password, - db=db - ) - + user = await authenticate_user(username_or_email=form_data.username, password=form_data.password, db=db) + if not user: - raise HTTPException( - status_code=401, - detail="Incorrect username or password" - ) - + raise HTTPException(status_code=401, detail="Incorrect username or password") + # 2. Create access token access_token = await create_access_token(data={"sub": user["username"]}) - + # 3. Create refresh token refresh_token = await create_refresh_token(data={"sub": user["username"]}) - + # 4. 
Set refresh token as HTTP-only cookie response.set_cookie( key="refresh_token", @@ -403,9 +394,9 @@ async def login_for_access_token( httponly=True, secure=True, samesite="strict", - max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60 + max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60, ) - + return {"access_token": access_token, "token_type": "bearer"} ``` @@ -414,31 +405,25 @@ async def login_for_access_token( ```python @router.post("/refresh", response_model=Token) async def refresh_access_token( - response: Response, - db: Annotated[AsyncSession, Depends(async_get_db)], - refresh_token: str = Cookie(None) + response: Response, db: Annotated[AsyncSession, Depends(async_get_db)], refresh_token: str = Cookie(None) ) -> dict[str, str]: if not refresh_token: raise HTTPException(status_code=401, detail="Refresh token missing") - + # 1. Verify refresh token token_data = await verify_token(refresh_token, TokenType.REFRESH, db) if not token_data: raise HTTPException(status_code=401, detail="Invalid refresh token") - + # 2. Create new access token - new_access_token = await create_access_token( - data={"sub": token_data.username_or_email} - ) - + new_access_token = await create_access_token(data={"sub": token_data.username_or_email}) + # 3. Optionally create new refresh token (token rotation) - new_refresh_token = await create_refresh_token( - data={"sub": token_data.username_or_email} - ) - + new_refresh_token = await create_refresh_token(data={"sub": token_data.username_or_email}) + # 4. Blacklist old refresh token await blacklist_token(refresh_token, db) - + # 5. Set new refresh token cookie response.set_cookie( key="refresh_token", @@ -446,9 +431,9 @@ async def refresh_access_token( httponly=True, secure=True, samesite="strict", - max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60 + max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60, ) - + return {"access_token": new_access_token, "token_type": "bearer"} ``` @@ -461,23 +446,18 @@ async def logout( db: Annotated[AsyncSession, Depends(async_get_db)], current_user: dict = Depends(get_current_user), token: str = Depends(oauth2_scheme), - refresh_token: str = Cookie(None) + refresh_token: str = Cookie(None), ) -> dict[str, str]: # 1. Blacklist access token await blacklist_token(token, db) - + # 2. Blacklist refresh token if present if refresh_token: await blacklist_token(refresh_token, db) - + # 3. Clear refresh token cookie - response.delete_cookie( - key="refresh_token", - httponly=True, - secure=True, - samesite="strict" - ) - + response.delete_cookie(key="refresh_token", httponly=True, secure=True, samesite="strict") + return {"message": "Successfully logged out"} ``` @@ -486,25 +466,18 @@ async def logout( ### get_current_user ```python -async def get_current_user( - db: AsyncSession = Depends(async_get_db), - token: str = Depends(oauth2_scheme) -) -> dict: +async def get_current_user(db: AsyncSession = Depends(async_get_db), token: str = Depends(oauth2_scheme)) -> dict: # 1. Verify token token_data = await verify_token(token, TokenType.ACCESS, db) if not token_data: raise HTTPException(status_code=401, detail="Invalid token") - + # 2. 
Get user from database - user = await crud_users.get( - db=db, - username=token_data.username_or_email, - schema_to_select=UserRead - ) - + user = await crud_users.get(db=db, username=token_data.username_or_email, schema_to_select=UserRead) + if user is None: raise HTTPException(status_code=401, detail="User not found") - + return user ``` @@ -512,12 +485,11 @@ async def get_current_user( ```python async def get_optional_user( - db: AsyncSession = Depends(async_get_db), - token: str = Depends(optional_oauth2_scheme) + db: AsyncSession = Depends(async_get_db), token: str = Depends(optional_oauth2_scheme) ) -> dict | None: if not token: return None - + try: return await get_current_user(db=db, token=token) except HTTPException: @@ -527,14 +499,9 @@ async def get_optional_user( ### get_current_superuser ```python -async def get_current_superuser( - current_user: dict = Depends(get_current_user) -) -> dict: +async def get_current_superuser(current_user: dict = Depends(get_current_user)) -> dict: if not current_user.get("is_superuser", False): - raise HTTPException( - status_code=403, - detail="Not enough permissions" - ) + raise HTTPException(status_code=403, detail="Not enough permissions") return current_user ``` @@ -551,7 +518,7 @@ REFRESH_TOKEN_EXPIRE_DAYS=7 # Security Headers SECURE_COOKIES=true -CORS_ORIGINS=["http://localhost:3000", "https://yourapp.com"] +CORS_ORIGINS="http://localhost:3000,https://yourapp.com" ``` ### Security Configuration @@ -563,7 +530,7 @@ class Settings(BaseSettings): ALGORITHM: str = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 REFRESH_TOKEN_EXPIRE_DAYS: int = 7 - + # Cookie settings SECURE_COOKIES: bool = True COOKIE_DOMAIN: str | None = None @@ -600,18 +567,15 @@ class Settings(BaseSettings): For service-to-service communication: ```python -async def get_api_key_user( - api_key: str = Header(None), - db: AsyncSession = Depends(async_get_db) -) -> dict: +async def get_api_key_user(api_key: str = Header(None), db: AsyncSession = Depends(async_get_db)) -> dict: if not api_key: raise HTTPException(status_code=401, detail="API key required") - + # Verify API key user = await crud_users.get(db=db, api_key=api_key) if not user: raise HTTPException(status_code=401, detail="Invalid API key") - + return user ``` @@ -619,9 +583,7 @@ async def get_api_key_user( ```python async def get_authenticated_user( - db: AsyncSession = Depends(async_get_db), - token: str = Depends(optional_oauth2_scheme), - api_key: str = Header(None) + db: AsyncSession = Depends(async_get_db), token: str = Depends(optional_oauth2_scheme), api_key: str = Header(None) ) -> dict: # Try JWT token first if token: @@ -629,11 +591,11 @@ async def get_authenticated_user( return await get_current_user(db=db, token=token) except HTTPException: pass - + # Fall back to API key if api_key: return await get_api_key_user(api_key=api_key, db=db) - + raise HTTPException(status_code=401, detail="Authentication required") ``` @@ -651,6 +613,7 @@ async def get_authenticated_user( ```python # Enable debug logging import logging + logging.getLogger("app.core.security").setLevel(logging.DEBUG) # Test token validation @@ -658,12 +621,12 @@ async def debug_token(token: str, db: AsyncSession): try: payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM]) print(f"Token payload: {payload}") - + is_blacklisted = await crud_token_blacklist.exists(db, token=token) print(f"Is blacklisted: {is_blacklisted}") - + except JWTError as e: print(f"JWT Error: {e}") ``` -This comprehensive JWT implementation 
provides secure, scalable authentication for your FastAPI application. \ No newline at end of file +This comprehensive JWT implementation provides secure, scalable authentication for your FastAPI application. diff --git a/docs/user-guide/configuration/environment-specific.md b/docs/user-guide/configuration/environment-specific.md index d544cbb..eba0bab 100644 --- a/docs/user-guide/configuration/environment-specific.md +++ b/docs/user-guide/configuration/environment-specific.md @@ -7,7 +7,7 @@ Learn how to configure your FastAPI application for different environments (deve The boilerplate supports three environment types: - **`local`** - Development environment with full debugging -- **`staging`** - Pre-production testing environment +- **`staging`** - Pre-production testing environment - **`production`** - Production environment with security hardening Set the environment type with: @@ -38,7 +38,7 @@ POSTGRES_SERVER="localhost" POSTGRES_PORT=5432 POSTGRES_DB="myapp_dev" -# ------------- crypt ------------- +# ------------- security ------------- SECRET_KEY="dev-secret-key-not-for-production-use" ALGORITHM="HS256" ACCESS_TOKEN_EXPIRE_MINUTES=60 # Longer for development @@ -77,15 +77,6 @@ DATABASE_ECHO=true # Log all SQL queries ```python # Development-specific features if settings.ENVIRONMENT == "local": - # Enable detailed error pages - app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # Allow all origins in development - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - # Enable API documentation app.openapi_url = "/openapi.json" app.docs_url = "/docs" @@ -152,11 +143,13 @@ POSTGRES_SERVER="staging-db.example.com" POSTGRES_PORT=5432 POSTGRES_DB="myapp_staging" -# ------------- crypt ------------- +# ------------- security ------------- SECRET_KEY="staging-secret-key-different-from-production" ALGORITHM="HS256" ACCESS_TOKEN_EXPIRE_MINUTES=30 REFRESH_TOKEN_EXPIRE_DAYS=7 +CORS_ORIGINS="https://staging.example.com" +CORS_METHODS="GET,POST,PUT,DELETE" # ------------- redis ------------- REDIS_CACHE_HOST="staging-redis.example.com" @@ -191,15 +184,6 @@ DATABASE_ECHO=false ```python # Staging-specific features if settings.ENVIRONMENT == "staging": - # Restricted CORS - app.add_middleware( - CORSMiddleware, - allow_origins=["https://staging.example.com"], - allow_credentials=True, - allow_methods=["GET", "POST", "PUT", "DELETE"], - allow_headers=["*"], - ) - # API docs available to superusers only @app.get("/docs", include_in_schema=False) async def custom_swagger_ui(current_user: User = Depends(get_current_superuser)): @@ -270,11 +254,14 @@ POSTGRES_SERVER="prod-db.example.com" POSTGRES_PORT=5433 # Custom port for security POSTGRES_DB="myapp_production" -# ------------- crypt ------------- +# ------------- security ------------- SECRET_KEY="ultra-secure-production-key-generated-with-openssl-rand-hex-32" ALGORITHM="HS256" ACCESS_TOKEN_EXPIRE_MINUTES=15 # Shorter for security REFRESH_TOKEN_EXPIRE_DAYS=3 # Shorter for security +CORS_ORIGINS="https://example.com,https://www.example.com" +CORS_METHODS="GET,POST,PUT,DELETE" +CORS_HEADERS="Authorization,Content-Type" # ------------- redis ------------- REDIS_CACHE_HOST="prod-redis.example.com" @@ -309,20 +296,11 @@ DATABASE_ECHO=false ```python # Production-specific features if settings.ENVIRONMENT == "production": - # Strict CORS - app.add_middleware( - CORSMiddleware, - allow_origins=["https://example.com", "https://www.example.com"], - allow_credentials=True, - allow_methods=["GET", "POST", "PUT", "DELETE"], - 
allow_headers=["Authorization", "Content-Type"], - ) - # Disable API documentation app.openapi_url = None app.docs_url = None app.redoc_url = None - + # Add security headers @app.middleware("http") async def add_security_headers(request: Request, call_next): @@ -423,17 +401,18 @@ class Settings(BaseSettings): @property def IS_DEVELOPMENT(self) -> bool: return self.ENVIRONMENT == "local" - + @computed_field @property def IS_PRODUCTION(self) -> bool: return self.ENVIRONMENT == "production" - + @computed_field @property def IS_STAGING(self) -> bool: return self.ENVIRONMENT == "staging" + # Use in application if settings.IS_DEVELOPMENT: # Development-only code @@ -457,12 +436,12 @@ def validate_environment_config(self) -> "Settings": raise ValueError("SECRET_KEY must be at least 32 characters in production") if "dev" in self.SECRET_KEY.lower(): raise ValueError("Production SECRET_KEY cannot contain 'dev'") - + if self.ENVIRONMENT == "local": # Development warnings if not self.DEBUG: logger.warning("DEBUG is False in development environment") - + return self ``` @@ -492,21 +471,22 @@ import asyncio from src.app.core.config import settings from src.app.core.db.database import async_get_db + async def validate_configuration(): """Validate configuration for current environment.""" print(f"Validating configuration for {settings.ENVIRONMENT} environment...") - + # Basic settings validation assert settings.APP_NAME, "APP_NAME is required" assert settings.SECRET_KEY, "SECRET_KEY is required" assert len(settings.SECRET_KEY) >= 32, "SECRET_KEY must be at least 32 characters" - + # Environment-specific validation if settings.ENVIRONMENT == "production": assert not settings.DEBUG, "DEBUG must be False in production" assert "dev" not in settings.SECRET_KEY.lower(), "Production SECRET_KEY invalid" assert settings.POSTGRES_PORT != 5432, "Use custom PostgreSQL port in production" - + # Test database connection try: db = await anext(async_get_db()) @@ -515,10 +495,11 @@ async def validate_configuration(): except Exception as e: print(f"✗ Database connection failed: {e}") return False - + print("✓ Configuration validation passed") return True + if __name__ == "__main__": asyncio.run(validate_configuration()) ``` @@ -585,7 +566,7 @@ SECURITY_CONFIGS = { "enable_cors_origins": ["https://example.com"], "enable_docs": False, "log_level": "WARNING", - } + }, } config = SECURITY_CONFIGS[settings.ENVIRONMENT] @@ -628,7 +609,7 @@ LOGGING_CONFIG = { "handlers": ["console"], }, "staging": { - "level": "INFO", + "level": "INFO", "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s", "handlers": ["console", "file"], }, @@ -636,7 +617,7 @@ LOGGING_CONFIG = { "level": "WARNING", "format": "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s", "handlers": ["file", "syslog"], - } + }, } ``` @@ -650,21 +631,24 @@ async def health_check(): "environment": settings.ENVIRONMENT, "version": settings.APP_VERSION, } - + # Add detailed info in non-production if not settings.IS_PRODUCTION: - health_info.update({ - "database": await check_database_health(), - "redis": await check_redis_health(), - "worker_queue": await check_worker_health(), - }) - + health_info.update( + { + "database": await check_database_health(), + "redis": await check_redis_health(), + "worker_queue": await check_worker_health(), + } + ) + return health_info ``` ## Best Practices ### Security + - Use different secret keys for each environment - Disable debug mode in staging and production - Use custom ports in production @@ 
-672,21 +656,24 @@ async def health_check(): - Remove API documentation in production ### Performance + - Configure appropriate resource limits per environment - Use caching in staging and production - Set shorter token expiration in production - Use connection pooling in production ### Configuration + - Keep environment files in version control (except production) - Use validation to prevent misconfiguration - Document all environment-specific settings - Test configuration changes in staging first ### Monitoring + - Use appropriate log levels per environment - Monitor different metrics in each environment - Set up alerts for production only - Use health checks for all environments -Environment-specific configuration ensures your application runs securely and efficiently in each deployment stage. Start with development settings and progressively harden for production! \ No newline at end of file +Environment-specific configuration ensures your application runs securely and efficiently in each deployment stage. Start with development settings and progressively harden for production! diff --git a/docs/user-guide/configuration/environment-variables.md b/docs/user-guide/configuration/environment-variables.md index 199a529..6da60d8 100644 --- a/docs/user-guide/configuration/environment-variables.md +++ b/docs/user-guide/configuration/environment-variables.md @@ -92,10 +92,8 @@ REFRESH_TOKEN_EXPIRE_DAYS=7 - `REFRESH_TOKEN_EXPIRE_DAYS`: How long refresh tokens remain valid !!! danger "Security Warning" Never use default values in production. Generate a strong secret key: ```bash openssl rand -hex 32 ``` ### Redis Configuration @@ -107,7 +105,7 @@ REDIS_CACHE_HOST="localhost" # Use "redis" for Docker Compose REDIS_CACHE_PORT=6379 # ------------- redis queue ------------- -REDIS_QUEUE_HOST="localhost" # Use "redis" for Docker Compose +REDIS_QUEUE_HOST="localhost" # Use "redis" for Docker Compose REDIS_QUEUE_PORT=6379 # ------------- redis rate limit ------------- @@ -256,7 +254,7 @@ The main `Settings` class inherits from multiple setting groups: ```python class Settings( AppSettings, - PostgresSettings, + PostgresSettings, CryptSettings, FirstUserSettings, RedisCacheSettings, @@ -265,6 +263,7 @@ class Settings( RedisRateLimiterSettings, DefaultRateLimitSettings, EnvironmentSettings, + CORSSettings, ): pass ``` @@ -279,6 +278,7 @@ class CustomSettings(BaseSettings): CUSTOM_TIMEOUT: int = 30 ENABLE_FEATURE_X: bool = False + # Add to main Settings class class Settings( AppSettings, @@ -300,7 +300,7 @@ class Settings( CryptSettings, FirstUserSettings, # Removed: RedisCacheSettings - # Removed: RedisQueueSettings + # Removed: RedisQueueSettings # Removed: RedisRateLimiterSettings EnvironmentSettings, ): @@ -326,21 +326,23 @@ SQLAlchemy connection pool settings in `src/app/core/db/database.py`: ```python engine = create_async_engine( DATABASE_URL, - pool_size=20, # Number of connections to maintain - max_overflow=30, # Additional connections allowed - pool_timeout=30, # Seconds to wait for connection - pool_recycle=1800, # Seconds before connection refresh + pool_size=20, # Number of connections to maintain + max_overflow=30, # Additional connections allowed + pool_timeout=30, # Seconds to wait for connection + pool_recycle=1800, # Seconds before connection refresh ) ``` ### Database Best Practices **Connection Pool Sizing:** + - Start with `pool_size=20`, `max_overflow=30` - Monitor connection
usage and adjust based on load - Use connection pooling monitoring tools **Migration Strategy:** + - Always back up the database before running migrations - Test migrations on staging environment first - Use `alembic revision --autogenerate` for model changes @@ -357,21 +359,19 @@ def create_access_token(data: dict, expires_delta: timedelta = None): if expires_delta: expire = datetime.utcnow() + expires_delta else: - expire = datetime.utcnow() + timedelta( - minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES - ) + expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) ``` ### CORS Configuration -Configure Cross-Origin Resource Sharing in `src/app/main.py`: +Customize Cross-Origin Resource Sharing in `src/app/core/setup.py`: ```python app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:3000"], # Specify allowed origins allow_credentials=True, - allow_methods=["GET", "POST"], # Specify allowed methods + allow_methods=["GET", "POST"], # Specify allowed methods allow_headers=["*"], ) ``` @@ -380,10 +380,7 @@ app.add_middleware( ```python # Never use wildcard (*) in production -allow_origins=[ - "https://yourapp.com", - "https://www.yourapp.com" -], +allow_origins = ["https://yourapp.com", "https://www.yourapp.com"] ``` ### Security Headers @@ -393,6 +390,7 @@ Add security headers middleware: ```python from starlette.middleware.base import BaseHTTPMiddleware + class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request, call_next): response = await call_next(request) @@ -416,11 +414,7 @@ from logging.handlers import RotatingFileHandler LOGGING_LEVEL = logging.INFO # Configure file rotation -file_handler = RotatingFileHandler( - 'logs/app.log', - maxBytes=10485760, # 10MB - backupCount=5 # Keep 5 backup files -) +file_handler = RotatingFileHandler("logs/app.log", maxBytes=10485760, backupCount=5)  # 10MB max size, keep 5 backups ``` ### Structured Logging @@ -435,7 +429,7 @@ structlog.configure( structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, - structlog.processors.JSONRenderer() + structlog.processors.JSONRenderer(), ], logger_factory=structlog.stdlib.LoggerFactory(), ) @@ -445,11 +439,7 @@ structlog.configure( ```python # Environment-specific log levels -LOG_LEVELS = { - "local": logging.DEBUG, - "staging": logging.INFO, - "production": logging.WARNING -} +LOG_LEVELS = {"local": logging.DEBUG, "staging": logging.INFO, "production": logging.WARNING} LOGGING_LEVEL = LOG_LEVELS.get(settings.ENVIRONMENT, logging.INFO) ``` @@ -500,12 +490,12 @@ Add custom middleware in `src/app/core/setup.py`: ```python def create_application(router, settings, **kwargs): app = FastAPI(...)
- + # Add custom middleware app.add_middleware(CustomMiddleware, setting=value) app.add_middleware(TimingMiddleware) app.add_middleware(RequestIDMiddleware) - + return app ``` @@ -516,10 +506,11 @@ Implement feature flags: ```python class FeatureSettings(BaseSettings): ENABLE_ADVANCED_CACHING: bool = False - ENABLE_ANALYTICS: bool = True + ENABLE_ANALYTICS: bool = True ENABLE_EXPERIMENTAL_FEATURES: bool = False ENABLE_API_VERSIONING: bool = True + # Use in endpoints if settings.ENABLE_ADVANCED_CACHING: # Advanced caching logic @@ -536,11 +527,11 @@ Add validation to prevent misconfiguration: def validate_settings(): if not settings.SECRET_KEY: raise ValueError("SECRET_KEY must be set") - + if settings.ENVIRONMENT == "production": if settings.SECRET_KEY == "dev-secret-key": raise ValueError("Production must use secure SECRET_KEY") - + if settings.DEBUG: raise ValueError("DEBUG must be False in production") ``` @@ -563,6 +554,7 @@ async def startup_event(): ### Common Issues **Environment Variables Not Loading:** + ```bash # Check file location and permissions ls -la src/.env @@ -575,6 +567,7 @@ python -c "from src.app.core.config import settings; print(settings.APP_NAME)" ``` **Database Connection Failed:** + ```bash # Test connection manually psql -h localhost -U postgres -d myapp @@ -586,13 +579,14 @@ brew services list | grep postgresql ``` **Redis Connection Failed:** + ```bash # Test Redis connection redis-cli -h localhost -p 6379 ping # Check Redis status systemctl status redis -# or on macOS +# or on macOS brew services list | grep redis ``` @@ -606,10 +600,11 @@ import asyncio from src.app.core.config import settings from src.app.core.db.database import async_get_db + async def test_config(): print(f"App: {settings.APP_NAME}") print(f"Environment: {settings.ENVIRONMENT}") - + # Test database try: db = await anext(async_get_db()) @@ -617,20 +612,23 @@ async def test_config(): await db.close() except Exception as e: print(f"✗ Database connection failed: {e}") - + # Test Redis (if enabled) try: from src.app.core.utils.cache import redis_client + await redis_client.ping() print("✓ Redis connection successful") except Exception as e: print(f"✗ Redis connection failed: {e}") + if __name__ == "__main__": asyncio.run(test_config()) ``` Run with: + ```bash uv run python test_config.py -``` \ No newline at end of file +``` diff --git a/docs/user-guide/configuration/settings-classes.md b/docs/user-guide/configuration/settings-classes.md index 2a9e932..277ef8a 100644 --- a/docs/user-guide/configuration/settings-classes.md +++ b/docs/user-guide/configuration/settings-classes.md @@ -10,7 +10,7 @@ The main `Settings` class inherits from multiple specialized setting groups: # src/app/core/config.py class Settings( AppSettings, - PostgresSettings, + PostgresSettings, CryptSettings, FirstUserSettings, RedisCacheSettings, @@ -19,9 +19,11 @@ class Settings( RedisRateLimiterSettings, DefaultRateLimitSettings, EnvironmentSettings, + CORSSettings, ): pass + # Single instance used throughout the app settings = Settings() ``` @@ -29,6 +31,7 @@ settings = Settings() ## Built-in Settings Groups ### Application Settings + Basic app metadata and configuration: ```python @@ -42,6 +45,7 @@ class AppSettings(BaseSettings): ``` ### Database Settings + PostgreSQL connection configuration: ```python @@ -63,6 +67,7 @@ class PostgresSettings(BaseSettings): ``` ### Security Settings + JWT and authentication configuration: ```python @@ -81,6 +86,7 @@ class CryptSettings(BaseSettings): ``` ### Redis Settings + Separate 
Redis instances for different services: ```python @@ -88,16 +94,19 @@ class RedisCacheSettings(BaseSettings): REDIS_CACHE_HOST: str = "localhost" REDIS_CACHE_PORT: int = 6379 + class RedisQueueSettings(BaseSettings): REDIS_QUEUE_HOST: str = "localhost" REDIS_QUEUE_PORT: int = 6379 + class RedisRateLimiterSettings(BaseSettings): REDIS_RATE_LIMIT_HOST: str = "localhost" REDIS_RATE_LIMIT_PORT: int = 6379 ``` ### Rate Limiting Settings + Default rate limiting configuration: ```python @@ -107,6 +116,7 @@ class DefaultRateLimitSettings(BaseSettings): ``` ### Admin User Settings + First superuser account creation: ```python @@ -146,6 +156,7 @@ class CustomSettings(BaseSettings): raise ValueError("MAX_UPLOAD_SIZE cannot exceed 100MB") return v + # Add to main Settings class class Settings( AppSettings, @@ -194,12 +205,12 @@ class FeatureSettings(BaseSettings): ENABLE_CACHING: bool = True ENABLE_RATE_LIMITING: bool = True ENABLE_BACKGROUND_JOBS: bool = True - + # Optional features ENABLE_ANALYTICS: bool = False ENABLE_EMAIL_NOTIFICATIONS: bool = False ENABLE_FILE_UPLOADS: bool = False - + # Experimental features ENABLE_EXPERIMENTAL_API: bool = False ENABLE_BETA_FEATURES: bool = False @@ -258,10 +269,10 @@ class SecuritySettings(BaseSettings): raise ValueError("SSL_CERT_PATH required when HTTPS enabled") if not self.SSL_KEY_PATH: raise ValueError("SSL_KEY_PATH required when HTTPS enabled") - + if self.FORCE_SSL and not self.ENABLE_HTTPS: raise ValueError("Cannot force SSL without enabling HTTPS") - + return self ``` @@ -279,10 +290,10 @@ class EnvironmentSettings(BaseSettings): if self.ENVIRONMENT == "production": if self.DEBUG: raise ValueError("DEBUG must be False in production") - + if self.ENVIRONMENT not in ["local", "staging", "production"]: raise ValueError("ENVIRONMENT must be local, staging, or production") - + return self ``` @@ -295,10 +306,10 @@ Create computed values from other settings: ```python class StorageSettings(BaseSettings): STORAGE_TYPE: str = "local" # local, s3, gcs - + # Local storage LOCAL_STORAGE_PATH: str = "./uploads" - + # S3 settings AWS_ACCESS_KEY_ID: str = "" AWS_SECRET_ACCESS_KEY: str = "" @@ -326,7 +337,7 @@ class StorageSettings(BaseSettings): "credentials": { "access_key": self.AWS_ACCESS_KEY_ID, "secret_key": self.AWS_SECRET_ACCESS_KEY, - } + }, } return {} ``` @@ -346,20 +357,22 @@ class AuthSettings(BaseSettings): REFRESH_TOKEN_EXPIRE: int = 7200 PASSWORD_MIN_LENGTH: int = 8 -# Notification service settings + +# Notification service settings class NotificationSettings(BaseSettings): EMAIL_ENABLED: bool = False SMS_ENABLED: bool = False PUSH_ENABLED: bool = False - + # Email settings SMTP_HOST: str = "" SMTP_PORT: int = 587 - + # SMS settings (example with Twilio) TWILIO_ACCOUNT_SID: str = "" TWILIO_AUTH_TOKEN: str = "" + # Main settings class Settings( AppSettings, @@ -379,24 +392,28 @@ class BaseAppSettings(BaseSettings): APP_NAME: str = "FastAPI App" DEBUG: bool = False + class DevelopmentSettings(BaseAppSettings): DEBUG: bool = True LOG_LEVEL: str = "DEBUG" DATABASE_ECHO: bool = True + class ProductionSettings(BaseAppSettings): DEBUG: bool = False LOG_LEVEL: str = "WARNING" DATABASE_ECHO: bool = False + def get_settings() -> BaseAppSettings: environment = os.getenv("ENVIRONMENT", "local") - + if environment == "production": return ProductionSettings() else: return DevelopmentSettings() + settings = get_settings() ``` @@ -414,7 +431,7 @@ class MinimalSettings( CryptSettings, FirstUserSettings, # Removed: RedisCacheSettings - # Removed: RedisQueueSettings + 
# Removed: RedisQueueSettings # Removed: RedisRateLimiterSettings EnvironmentSettings, ): @@ -431,6 +448,7 @@ class ServiceSettings(BaseSettings): ENABLE_CELERY: bool = True ENABLE_MONITORING: bool = False + class ConditionalSettings( AppSettings, PostgresSettings, @@ -440,14 +458,10 @@ class ConditionalSettings( # Add Redis settings only if enabled def __init__(self, **kwargs): super().__init__(**kwargs) - + if self.ENABLE_REDIS: # Dynamically add Redis settings - self.__class__ = type( - "ConditionalSettings", - (self.__class__, RedisCacheSettings), - {} - ) + self.__class__ = type("ConditionalSettings", (self.__class__, RedisCacheSettings), {}) ``` ## Testing Settings @@ -460,18 +474,19 @@ Create separate settings for testing: class TestSettings(BaseSettings): # Override database for testing POSTGRES_DB: str = "test_database" - + # Disable external services ENABLE_REDIS: bool = False ENABLE_EMAIL: bool = False - + # Speed up tests ACCESS_TOKEN_EXPIRE_MINUTES: int = 5 - + # Test-specific settings TEST_USER_EMAIL: str = "test@example.com" TEST_USER_PASSWORD: str = "testpassword123" + # Use in tests @pytest.fixture def test_settings(): @@ -485,25 +500,22 @@ Test your custom settings: ```python def test_custom_settings_validation(): # Test valid configuration - settings = CustomSettings( - CUSTOM_API_KEY="test-key", - CUSTOM_TIMEOUT=60, - MAX_UPLOAD_SIZE=5242880 # 5MB - ) + settings = CustomSettings(CUSTOM_API_KEY="test-key", CUSTOM_TIMEOUT=60, MAX_UPLOAD_SIZE=5242880) # 5MB assert settings.CUSTOM_TIMEOUT == 60 # Test validation error with pytest.raises(ValueError, match="MAX_UPLOAD_SIZE cannot exceed 100MB"): CustomSettings(MAX_UPLOAD_SIZE=209715200) # 200MB + def test_settings_computed_fields(): settings = StorageSettings( STORAGE_TYPE="s3", AWS_ACCESS_KEY_ID="test-key", AWS_SECRET_ACCESS_KEY="test-secret", - AWS_BUCKET_NAME="test-bucket" + AWS_BUCKET_NAME="test-bucket", ) - + assert settings.STORAGE_ENABLED is True assert settings.STORAGE_CONFIG["bucket"] == "test-bucket" ``` @@ -511,27 +523,31 @@ def test_settings_computed_fields(): ## Best Practices ### Organization + - Group related settings in dedicated classes - Use descriptive names for settings groups - Keep validation logic close to the settings - Document complex validation rules ### Security + - Validate sensitive settings like secret keys - Never set default values for secrets in production - Use computed fields to derive connection strings - Separate test and production configurations ### Performance + - Use `@computed_field` for expensive calculations - Cache settings instances appropriately - Avoid complex validation in hot paths - Use model validators for cross-field validation ### Testing + - Create separate test settings classes - Test all validation rules - Mock external service settings in tests - Use dependency injection for settings in tests -The settings system provides type safety, validation, and organization for your application configuration. Start with the built-in settings and extend them as your application grows! \ No newline at end of file +The settings system provides type safety, validation, and organization for your application configuration. Start with the built-in settings and extend them as your application grows! 
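The `config.py` diff below adds a `CORSSettings` group whose list values are parsed from comma-separated environment strings by a small `str_setting_to_list` helper. A quick sketch of how that parsing behaves — the helper body is copied from the diff; the example values are illustrative:

```python
def str_setting_to_list(setting: str) -> list[str]:
    # Same logic as the helper added in src/app/core/config.py below.
    if isinstance(setting, str):
        return [item.strip() for item in setting.split(",") if item.strip()]
    raise ValueError("Invalid string setting for list conversion.")


# CORS_ORIGINS="http://localhost:3000, https://example.com" in .env becomes:
assert str_setting_to_list("http://localhost:3000, https://example.com") == [
    "http://localhost:3000",
    "https://example.com",
]
# The default "*" survives as a single wildcard entry:
assert str_setting_to_list("*") == ["*"]
```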
diff --git a/src/app/core/config.py b/src/app/core/config.py index bf097ec..bce9b2b 100644 --- a/src/app/core/config.py +++ b/src/app/core/config.py @@ -10,6 +10,12 @@ config = Config(env_path) +def str_setting_to_list(setting: str) -> list[str]: + if isinstance(setting, str): + return [item.strip() for item in setting.split(",") if item.strip()] + raise ValueError("Invalid string setting for list conversion.") + + class AppSettings(BaseSettings): APP_NAME: str = config("APP_NAME", default="FastAPI app") APP_DESCRIPTION: str | None = config("APP_DESCRIPTION", default=None) @@ -67,7 +73,8 @@ class FirstUserSettings(BaseSettings): ADMIN_PASSWORD: str = config("ADMIN_PASSWORD", default="!Ch4ng3Th1sP4ssW0rd!") -class TestSettings(BaseSettings): ... +class TestSettings(BaseSettings): + ... class RedisCacheSettings(BaseSettings): @@ -127,6 +134,12 @@ class EnvironmentSettings(BaseSettings): ENVIRONMENT: EnvironmentOption = config("ENVIRONMENT", default=EnvironmentOption.LOCAL) +class CORSSettings(BaseSettings): + CORS_ORIGINS: list[str] = config("CORS_ORIGINS", cast=str_setting_to_list, default="*") + CORS_METHODS: list[str] = config("CORS_METHODS", cast=str_setting_to_list, default="*") + CORS_HEADERS: list[str] = config("CORS_HEADERS", cast=str_setting_to_list, default="*") + + class Settings( AppSettings, SQLiteSettings, @@ -141,6 +154,7 @@ class Settings( DefaultRateLimitSettings, CRUDAdminSettings, EnvironmentSettings, + CORSSettings, ): pass diff --git a/src/app/core/setup.py b/src/app/core/setup.py index 8e6bb81..b2cdcbf 100644 --- a/src/app/core/setup.py +++ b/src/app/core/setup.py @@ -8,6 +8,7 @@ from arq import create_pool from arq.connections import RedisSettings from fastapi import APIRouter, Depends, FastAPI +from fastapi.middleware.cors import CORSMiddleware from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html from fastapi.openapi.utils import get_openapi @@ -18,6 +19,7 @@ from .config import ( AppSettings, ClientSideCacheSettings, + CORSSettings, DatabaseSettings, EnvironmentOption, EnvironmentSettings, @@ -80,6 +82,7 @@ def lifespan_factory( | RedisCacheSettings | AppSettings | ClientSideCacheSettings + | CORSSettings | RedisQueueSettings | RedisRateLimiterSettings | EnvironmentSettings @@ -135,6 +138,7 @@ def create_application( | RedisCacheSettings | AppSettings | ClientSideCacheSettings + | CORSSettings | RedisQueueSettings | RedisRateLimiterSettings | EnvironmentSettings @@ -161,6 +165,7 @@ def create_application( - DatabaseSettings: Adds event handlers for initializing database tables during startup. - RedisCacheSettings: Sets up event handlers for creating and closing a Redis cache pool. - ClientSideCacheSettings: Integrates middleware for client-side caching. + - CORSSettings: Integrates CORS middleware with specified origins. - RedisQueueSettings: Sets up event handlers for creating and closing a Redis queue pool. - RedisRateLimiterSettings: Sets up event handlers for creating and closing a Redis rate limiter pool. 
- EnvironmentSettings: Conditionally sets documentation URLs and integrates custom routes for API documentation @@ -206,6 +211,15 @@ def create_application( if isinstance(settings, ClientSideCacheSettings): application.add_middleware(ClientCacheMiddleware, max_age=settings.CLIENT_CACHE_MAX_AGE) + if isinstance(settings, CORSSettings): + application.add_middleware( + CORSMiddleware, + allow_origins=settings.CORS_ORIGINS, + allow_credentials=True, + allow_methods=settings.CORS_METHODS, + allow_headers=settings.CORS_HEADERS, + ) + if isinstance(settings, EnvironmentSettings): if settings.ENVIRONMENT != EnvironmentOption.PRODUCTION: docs_router = APIRouter()
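With these changes, CORS is driven entirely by environment variables. A hedged example of what a production deployment might set, reusing the placeholder domains from the docs examples above:

```bash
# .env — illustrative values, not defaults
CORS_ORIGINS="https://example.com,https://www.example.com"
CORS_METHODS="GET,POST,PUT,DELETE"
CORS_HEADERS="Authorization,Content-Type"
```

Note that the middleware is added with `allow_credentials=True`, and the CORS specification disallows a literal wildcard `Access-Control-Allow-Origin` on credentialed responses, so the `"*"` default is only practical for credential-less local development; production deployments should list explicit origins, as the documentation already advises.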