An enterprise-grade, high-availability RESTful API service built on a Flask microservices architecture, featuring advanced load balancing, JWT authentication with the NGINX auth_request module, asynchronous task processing with Celery, PostgreSQL with connection pooling, Redis clustering, and a comprehensive real-time monitoring dashboard.
This system implements a production-ready microservices architecture with horizontal scalability, fault tolerance, and zero-downtime deployment capabilities.
┌───────────────────────────────────────────────────────────────────┐
│                          External Traffic                         │
└─────────────────────────────────┬─────────────────────────────────┘
                                  │
                       ┌──────────▼──────────┐
                       │ NGINX (Port 80)     │ ◄── Load Balancer + Auth Gateway
                       │ - Load Balancing    │
                       │ - JWT Auth Proxy    │
                       │ - Rate Limiting     │
                       │ - SSL Termination   │
                       └──────────┬──────────┘
                                  │
                 ┌────────────────┼────────────────┐
                 │                │                │
            ┌────▼────┐      ┌────▼────┐      ┌────▼────┐
            │  API-1  │      │  API-2  │      │  API-3  │ ◄── Microservice Instances
            │  :8000  │      │  :8001  │      │  :8002  │
            └────┬────┘      └────┬────┘      └────┬────┘
                 │                │                │
                 └────────────────┼────────────────┘
                                  │
             ┌────────────────────┼────────────────────┐
             │                    │                    │
       ┌─────▼─────┐     ┌────────▼────────┐     ┌─────▼──────┐
       │   Redis   │     │   PostgreSQL    │     │   Celery   │
       │   Cache/  │     │   Primary DB    │     │   Workers  │
       │   Broker  │     │   Connection    │     │   + Beat   │
       │   :6379   │     │   Pool :5432    │     │  Scheduler │
       └───────────┘     └────────┬────────┘     └────────────┘
                                  │
                        ┌─────────▼─────────┐
                        │     Streamlit     │ ◄── Monitoring Dashboard
                        │     Dashboard     │
                        │       :8501       │
                        └───────────────────┘
- Smart Load Balancing: Least-connection algorithm with health checking
- JWT Authentication Proxy: Validates tokens before forwarding requests
- Rate Limiting: Protects against DDoS and abuse
- SSL/TLS Termination: Handles encryption/decryption
- Request Routing: Path-based routing with pattern matching
- Health Monitoring: Automatic upstream health detection
- 3 Identical API Instances: Horizontal scaling with state isolation
- Gunicorn WSGI Server: Production-grade Python web server (see the launch sketch after this feature list)
- Flask-JWT-Extended: Advanced JWT with refresh tokens and blacklisting
- Request Middleware: Comprehensive logging and monitoring
- Database ORM: SQLAlchemy with connection pooling
- Error Handling: Graceful degradation and circuit breaker patterns
- PostgreSQL: ACID-compliant primary database with optimized queries
- Redis: Multi-purpose (caching, session storage, message broker)
- Connection Pooling: Efficient database connection management
- Data Persistence: Automated backups and recovery mechanisms
- Celery Workers: Distributed task execution with worker pools
- Celery Beat: Cron-like periodic task scheduling
- Task Queue Management: Priority-based task distribution
- Result Backend: Redis-based result storage and retrieval
- Streamlit Dashboard: Real-time system monitoring
- Health Check System: Multi-layer health verification
- Node Heartbeat: Service discovery and health tracking
- Comprehensive Logging: Structured logging with correlation IDs
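For orientation, the Gunicorn item above might translate into a container entrypoint roughly like the following. This is only a sketch: the `app:create_app('production')` factory path and the flag defaults are assumptions, not values taken from the repository.

# Hypothetical entrypoint for one API instance; the variables mirror the
# API_WORKERS / GUNICORN_* settings used elsewhere in this document.
gunicorn "app:create_app('production')" \
  --bind 0.0.0.0:8000 \
  --workers "${API_WORKERS:-4}" \
  --worker-connections "${API_WORKER_CONNECTIONS:-1000}" \
  --timeout "${GUNICORN_TIMEOUT:-120}" \
  --keep-alive "${GUNICORN_KEEPALIVE:-5}" \
  --access-logfile - --error-logfile -

Each of API-1/2/3 would run the same command with a different bind port (8000, 8001, 8002), matching the upstream definition in the NGINX configuration later in this document.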
1. Client Request → NGINX (Port 80)
2. NGINX → Rate Limiting Check
3. NGINX → JWT Token Validation (auth_request)
4. NGINX → Load Balancing (least_conn)
5. NGINX → Forward to API Instance
6. API → Database/Redis Operations
7. API → Response Generation
8. NGINX → Response to Client
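The request flow above maps almost one-to-one onto a handful of NGINX directives. A minimal sketch is shown below; the directives (`limit_req`, `auth_request`, `least_conn`) are standard NGINX, but the zone name, header handling, and the `/api/` location are illustrative assumptions rather than the project's shipped nginx.conf.

# Sketch of the gateway wiring for steps 2-5 (illustrative, not the shipped config)
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

upstream api_backend {
    least_conn;                                    # step 4: least-connection balancing
    server api1:8000 max_fails=3 fail_timeout=30s;
    server api2:8001 max_fails=3 fail_timeout=30s;
    server api3:8002 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;

    location = /auth/validate {                    # step 3: internal JWT check
        internal;
        proxy_pass http://api_backend/auth/validate;
        proxy_set_header Authorization $http_authorization;
        proxy_pass_request_body off;
        proxy_set_header Content-Length "";
    }

    location /api/ {
        limit_req zone=api_limit burst=20 nodelay; # step 2: rate limiting
        auth_request /auth/validate;               # step 3: token validation
        proxy_pass http://api_backend;             # step 5: forward to an instance
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}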
1. Login Request → NGINX → API Instance
2. API → Validate Credentials (PostgreSQL)
3. API → Generate JWT Tokens (Access + Refresh)
4. API → Store Session (Redis)
5. Subsequent Requests → NGINX auth_request validation
6. NGINX → Internal /auth/validate endpoint
7. API → JWT Verification & User Validation
8. Success → Forward to protected endpoint
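On the Flask side, the internal endpoint consumed by auth_request in steps 6-7 only needs to return a status code (2xx allows the request through, 401/403 rejects it). A minimal sketch, assuming a blueprint named `auth_bp` and Flask-JWT-Extended 4.x:

# Hypothetical /auth/validate handler used by the NGINX auth_request subrequest
from flask import Blueprint, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity

auth_bp = Blueprint('auth', __name__, url_prefix='/auth')

@auth_bp.route('/validate', methods=['GET'])
@jwt_required()  # missing, expired, or revoked tokens are answered with 401
def validate_token():
    # The identity can be re-checked against the users table here so that
    # disabled accounts are refused immediately, as described in step 7.
    return jsonify({'status': 'valid', 'user': get_jwt_identity()}), 200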
- CPU: 4 cores (8 recommended for production)
- RAM: 8GB (16GB recommended)
- Storage: 50GB SSD (with automatic scaling)
- Network: 1Gbps connection
- Docker: v20.10+ with Docker Compose v2.0+
- Git: Latest version
- Operating System: Linux (Ubuntu 20.04+ recommended), macOS, Windows with WSL2
# Clone with full git history for versioning
git clone https://github.com/your-repo/ha-api-celery.git
cd ha-api-celery
# Create production environment file
cp .env.example .env.production
# Configure production secrets (CRITICAL FOR SECURITY)
nano .env.production

Production Environment Configuration:
# === SECURITY CONFIGURATION ===
SECRET_KEY=your-super-secure-secret-key-minimum-32-chars
JWT_SECRET_KEY=your-jwt-secret-key-different-from-secret-key
JWT_ACCESS_TOKEN_EXPIRES=3600 # 1 hour
JWT_REFRESH_TOKEN_EXPIRES=2592000 # 30 days
# === DATABASE CONFIGURATION ===
DATABASE_URL=postgresql://prod_user:secure_password@postgres:5432/production_db
POSTGRES_DB=production_db
POSTGRES_USER=prod_user
POSTGRES_PASSWORD=your-secure-db-password
# === REDIS CONFIGURATION ===
REDIS_URL=redis://redis:6379/0
REDIS_PASSWORD=your-redis-password
# === API CONFIGURATION ===
FLASK_ENV=production
API_HOST=0.0.0.0
API_WORKERS=4
API_WORKER_CONNECTIONS=1000
# === NGINX CONFIGURATION ===
NGINX_WORKER_PROCESSES=auto
NGINX_WORKER_CONNECTIONS=1024
# === MONITORING ===
LOG_LEVEL=INFO
SENTRY_DSN=your-sentry-dsn-for-error-tracking

# Generate SSL certificates (Let's Encrypt recommended)
sudo apt install certbot
sudo certbot certonly --standalone -d yourdomain.com
# Copy certificates to nginx directory
mkdir -p nginx/ssl
sudo cp /etc/letsencrypt/live/yourdomain.com/fullchain.pem nginx/ssl/
sudo cp /etc/letsencrypt/live/yourdomain.com/privkey.pem nginx/ssl/

# Create external Docker network for advanced networking
docker network create ha_api_production_network
# Create named volumes for persistent data
docker volume create postgres_production_data
docker volume create redis_production_data
docker volume create nginx_production_logs

# Generate secure database initialization script
cat > init-scripts/01-create-production-user.sql << EOF
-- Create production database and user
CREATE DATABASE production_db;
CREATE USER prod_user WITH ENCRYPTED PASSWORD 'your-secure-db-password';
GRANT ALL PRIVILEGES ON DATABASE production_db TO prod_user;
-- Security hardening
ALTER USER prod_user SET default_transaction_isolation TO 'read committed';
ALTER USER prod_user SET timezone TO 'UTC';
EOF

# Make deployment script executable
chmod +x deploy.sh docker-deploy.sh
# Run production deployment with health checks
./docker-deploy.sh production
# Alternative: Manual deployment with monitoring
docker-compose -f docker-compose.yml --env-file .env.production up -d
# Verify all services are healthy (wait for initialization)
./deploy.sh health-check
# Run database migrations and create initial data
docker-compose exec api1 python init_db.py --environment=production

#!/bin/bash
# health-check.sh - Comprehensive service verification
echo "Running comprehensive health checks..."
# 1. Container health verification
echo "Checking container health..."
docker-compose ps
# 2. Database connectivity test
echo "Testing database connectivity..."
docker-compose exec postgres pg_isready -U prod_user -d production_db
# 3. Redis connectivity test
echo "Testing Redis connectivity..."
docker-compose exec redis redis-cli ping
# 4. API endpoint verification
echo "Testing API endpoints..."
curl -f http://localhost/health/ping || echo "Health ping failed"
curl -f http://localhost/api/status || echo "API status failed"
# 5. Load balancer verification
echo "Testing load balancer distribution..."
for i in {1..10}; do
curl -s http://localhost/health/ping | grep -o '"node_id":"[^"]*"'
done
# 6. Authentication system verification
echo "Testing authentication system..."
TOKEN=$(curl -s -X POST http://localhost/auth/login \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"admin123"}' | \
jq -r '.access_token')
if [[ "$TOKEN" != "null" && "$TOKEN" != "" ]]; then
echo "Authentication system operational"
# Test protected endpoint
curl -f -H "Authorization: Bearer $TOKEN" http://localhost/api/protected || echo "Protected endpoint failed"
else
echo "Authentication system failed"
fi
# 7. Task processing verification
echo "Testing task processing system..."
TASK_RESPONSE=$(curl -s -X POST http://localhost/tasks/submit \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"task_type":"sample","data":{"test":"health_check"}}')
TASK_ID=$(echo $TASK_RESPONSE | jq -r '.task_id')
if [[ "$TASK_ID" != "null" && "$TASK_ID" != "" ]]; then
echo "Task processing system operational"
else
echo "Task processing system failed"
fi
# 8. Monitoring dashboard verification
echo "Testing monitoring dashboard..."
curl -f http://localhost:8501 || echo "Dashboard not accessible"
echo "Health check completed"

After successful deployment, the system provides multiple access points:
| Service | URL | Purpose | Authentication |
|---|---|---|---|
| Load Balanced API | `http://localhost` | Main API gateway | JWT required for protected endpoints |
| Monitoring Dashboard | `http://localhost:8501` | Real-time system monitoring | Admin login required |
| Database Direct | `localhost:5678` | Direct PostgreSQL access | Database credentials |
| API Instance 1 | Internal only | Direct API access (debugging) | Not exposed externally |
| API Instance 2 | Internal only | Direct API access (debugging) | Not exposed externally |
| API Instance 3 | Internal only | Direct API access (debugging) | Not exposed externally |
| Redis Cache | Internal only | Cache and message broker | Internal network only |
The system implements an enterprise-grade PostgreSQL setup with advanced features for reliability, performance, and scalability.
-- Database: production_db
-- Connection Pool: 20 connections per API instance
-- Total Pool Size: 60 connections (3 instances × 20)
-- Backup Strategy: Automated daily backups with point-in-time recovery
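A quick capacity check, assuming the SQLAlchemy settings shown later in this section: with `pool_size=20` and `max_overflow=10` per instance, the three API instances can open at most 3 × (20 + 10) = 90 connections, which stays under the PostgreSQL `max_connections=100` ceiling and leaves headroom for administrative and monitoring sessions.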
┌───────────────────────────────────────────────────────────┐
│                     Application Layer                     │
├───────────────────────────────────────────────────────────┤
│  API Instance 1   │  API Instance 2   │  API Instance 3   │
│    (20 conns)     │    (20 conns)     │    (20 conns)     │
├───────────────────────────────────────────────────────────┤
│                  Connection Pool Manager                  │
├───────────────────────────────────────────────────────────┤
│                    PostgreSQL Primary                     │
│                   (Max 100 connections)                   │
└───────────────────────────────────────────────────────────┘

Primary Database Container:
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: production_db
POSTGRES_USER: prod_user
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_INITDB_ARGS: --encoding=UTF-8 --locale=en_US.UTF-8
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init-scripts:/docker-entrypoint-initdb.d
command: |
postgres
-c max_connections=100
-c shared_buffers=256MB
-c effective_cache_size=1GB
-c work_mem=4MB
-c maintenance_work_mem=64MB
-c checkpoint_completion_target=0.9
-c wal_buffers=16MB
-c default_statistics_target=100

# config.py - SQLAlchemy Configuration
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 20,
'pool_timeout': 30,
'pool_recycle': 3600,
'pool_pre_ping': True,
'max_overflow': 10,
'echo': False # Set to True for query debugging
}

Automated Migration Process:
#!/usr/bin/env python3
# init_db.py - Advanced database initialization
import sys
import logging
from flask_migrate import init, migrate, upgrade
from app import create_app, db
from app.models import User, NodeHeartbeat, TaskHistory
def initialize_database():
"""
Comprehensive database initialization with error handling
and rollback mechanisms.
"""
app = create_app('production')
with app.app_context():
try:
# Create all tables
db.create_all()
# Run Alembic migrations
upgrade()
# Create initial data
create_admin_user()
create_system_configurations()
print("Database initialization completed successfully")
except Exception as e:
db.session.rollback()
print(f"Database initialization failed: {e}")
sys.exit(1)
def create_admin_user():
"""Create default admin user if not exists."""
admin = User.query.filter_by(username='admin').first()
if not admin:
admin = User(
username='admin',
email='admin@company.com',
is_active=True
)
admin.set_password('admin123') # Change in production
db.session.add(admin)
db.session.commit()

# Connect to database container for maintenance
docker-compose exec postgres psql -U prod_user -d production_db
# Database backup (production)
docker-compose exec postgres pg_dump -U prod_user production_db > backup_$(date +%Y%m%d).sql
# Database restore (production)
docker-compose exec -T postgres psql -U prod_user production_db < backup_file.sql
# Check database size and statistics
docker-compose exec postgres psql -U prod_user -d production_db -c "
SELECT
schemaname,
tablename,
attname,
n_distinct,
correlation
FROM pg_stats
WHERE schemaname = 'public';"
# Monitor active connections
docker-compose exec postgres psql -U prod_user -d production_db -c "
SELECT
pid,
usename,
application_name,
client_addr,
state,
query_start,
query
FROM pg_stat_activity
WHERE state = 'active';"

User Management Model:
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(128), nullable=False)
is_active = db.Column(db.Boolean, default=True, nullable=False)
# Audit fields
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
updated_at = db.Column(db.DateTime, default=datetime.utcnow,
onupdate=datetime.utcnow, nullable=False)
last_login = db.Column(db.DateTime, nullable=True)
login_count = db.Column(db.Integer, default=0)
# Relationships
task_submissions = db.relationship('TaskHistory', backref='submitted_by', lazy=True)

Node Monitoring Model:
class NodeHeartbeat(db.Model):
__tablename__ = 'node_heartbeats'
id = db.Column(db.Integer, primary_key=True)
node_id = db.Column(db.String(100), nullable=False, index=True)
hostname = db.Column(db.String(255), nullable=False)
ip_address = db.Column(db.String(45), nullable=True)
port = db.Column(db.Integer, nullable=True)
# Health metrics
status = db.Column(db.String(20), default='active', nullable=False)
cpu_usage = db.Column(db.Float, nullable=True)
memory_usage = db.Column(db.Float, nullable=True)
disk_usage = db.Column(db.Float, nullable=True)
load_average = db.Column(db.Float, nullable=True)
# Timing
last_heartbeat = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
response_time = db.Column(db.Float, nullable=True)  # in milliseconds

Task Management Model:
class TaskHistory(db.Model):
__tablename__ = 'task_history'
id = db.Column(db.Integer, primary_key=True)
task_id = db.Column(db.String(255), nullable=False, index=True, unique=True)
task_name = db.Column(db.String(255), nullable=False)
task_type = db.Column(db.String(100), nullable=False, index=True)
# Execution details
status = db.Column(db.String(20), nullable=False, index=True)
priority = db.Column(db.Integer, default=5, nullable=False)
retry_count = db.Column(db.Integer, default=0, nullable=False)
max_retries = db.Column(db.Integer, default=3, nullable=False)
# Data and results
input_data = db.Column(db.JSON, nullable=True)
result = db.Column(db.JSON, nullable=True)
error_message = db.Column(db.Text, nullable=True)
# Timing
queued_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
started_at = db.Column(db.DateTime, nullable=True)
completed_at = db.Column(db.DateTime, nullable=True)
execution_time = db.Column(db.Float, nullable=True) # in seconds
# Worker information
worker_id = db.Column(db.String(255), nullable=True)
worker_hostname = db.Column(db.String(255), nullable=True)

The system implements defense-in-depth security with multi-layer authentication, authorization, and comprehensive hardening measures.
┌────────────────────────────────────────────────────┐
│                  Security Layers                   │
├────────────────────────────────────────────────────┤
│ 1. NGINX Rate Limiting & DDoS Protection           │
│ 2. SSL/TLS Encryption (Production)                 │
│ 3. JWT Token Validation with NGINX auth_request    │
│ 4. Flask-JWT-Extended with Advanced Features       │
│ 5. Database-level User Validation                  │
│ 6. Session Management with Redis                   │
│ 7. CORS & Security Headers                         │
└────────────────────────────────────────────────────┘
# Advanced JWT configuration in config.py
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY')
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
JWT_BLACKLIST_ENABLED = True
JWT_BLACKLIST_TOKEN_CHECKS = ['access', 'refresh']
JWT_ALGORITHM = 'HS256'
# Additional security features
JWT_ERROR_MESSAGE_KEY = 'error'
JWT_JSON_KEY = 'access_token'
JWT_REFRESH_JSON_KEY = 'refresh_token'
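With blacklisting enabled above, Flask-JWT-Extended needs a callback that answers whether a token has been revoked. A minimal sketch using the stack's existing Redis instance follows; the Redis database number, key prefix, and extension object name are assumptions, and on Flask-JWT-Extended 3.x the hook is named `token_in_blacklist_loader` instead.

# Hypothetical token-revocation hook: /auth/logout would write the token's jti
# into Redis with a TTL, and every protected request is checked against it here.
import redis
from flask_jwt_extended import JWTManager

jwt = JWTManager()
jwt_blocklist = redis.from_url("redis://redis:6379/1", decode_responses=True)

@jwt.token_in_blocklist_loader
def token_revoked(jwt_header, jwt_payload):
    jti = jwt_payload["jti"]
    return jwt_blocklist.get(f"revoked:{jti}") is not None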
Authentication Endpoints:

| Endpoint | Method | Purpose | Rate Limit | Authentication |
|---|---|---|---|---|
| `/auth/register` | POST | User registration | 5 req/min | None |
| `/auth/login` | POST | User authentication | 5 req/min | Credentials |
| `/auth/refresh` | POST | Token refresh | 10 req/min | Refresh token |
| `/auth/validate` | GET | Token validation (NGINX) | 100 req/min | Access token |
| `/auth/profile` | GET | User profile | 50 req/min | Access token |
| `/auth/logout` | POST | User logout | 20 req/min | Access token |

| Endpoint | Method | Purpose | Permission | Rate Limit |
|---|---|---|---|---|
| `/api/users` | GET | List all users (admin) | `users:read` | 50 req/min |
| `/api/users/<id>` | GET | Get specific user details | `users:read` | 100 req/min |
Advanced User Operations:
# Get current user profile with detailed information
curl -X GET http://localhost/auth/profile \
-H "Authorization: Bearer $TOKEN" | jq
# Response with comprehensive user data
{
"status": "success",
"data": {
"user": {
"id": 1,
"username": "admin",
"email": "admin@company.com",
"full_name": "System Administrator",
"role": "admin",
"department": "IT Operations",
"permissions": ["*"],
"account_status": "active",
"email_verified": true,
"two_factor_enabled": false,
"created_at": "2024-01-01T00:00:00Z",
"last_login": "2025-01-01T11:00:00Z",
"login_count": 156,
"password_changed_at": "2024-12-01T10:30:00Z"
},
"preferences": {
"timezone": "UTC",
"date_format": "YYYY-MM-DD",
"language": "en",
"notifications": {
"email": true,
"push": false,
"task_completion": true
}
},
"statistics": {
"tasks_submitted": 89,
"tasks_completed": 82,
"tasks_failed": 7,
"total_api_calls": 1247,
"last_activity": "2025-01-01T11:55:00Z"
}
}
}
# Update user profile with validation
curl -X PUT http://localhost/auth/profile \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"full_name": "John Smith Administrator",
"email": "john.admin@company.com",
"department": "DevOps Engineering",
"preferences": {
"timezone": "America/New_York",
"notifications": {
"email": true,
"task_completion": true
}
}
}'
# Logout from all devices/sessions
curl -X POST http://localhost/auth/logout-all \
-H "Authorization: Bearer $TOKEN"

# Get comprehensive API metrics (admin only)
curl -X GET http://localhost/api/metrics \
-H "Authorization: Bearer $ADMIN_TOKEN" | jq
# Response with detailed analytics
{
"status": "success",
"data": {
"api_metrics": {
"total_requests": 15478,
"requests_per_minute": 125,
"avg_response_time": 45.7,
"error_rate": 0.03,
"active_users": 8,
"concurrent_connections": 23
},
"endpoint_statistics": [
{
"endpoint": "/api/protected",
"total_calls": 3456,
"avg_response_time": 23.4,
"success_rate": 0.99
},
{
"endpoint": "/tasks/submit",
"total_calls": 892,
"avg_response_time": 156.7,
"success_rate": 0.95
}
],
"user_activity": {
"active_sessions": 8,
"unique_users_today": 12,
"peak_concurrent_users": 15,
"avg_session_duration": 1847
},
"system_health": {
"cpu_usage": 15.2,
"memory_usage": 45.7,
"disk_usage": 23.1,
"database_connections": 15,
"redis_memory": "45.2MB"
}
},
"metadata": {
"collection_time": "2025-01-01T12:00:00Z",
"next_update": "2025-01-01T12:05:00Z"
}
}
# Data processing endpoint with file upload support
curl -X POST http://localhost/api/data-processing \
-H "Authorization: Bearer $TOKEN" \
-F "file=@large_dataset.csv" \
-F "processing_type=machine_learning" \
-F "parameters={\"algorithm\":\"random_forest\",\"max_depth\":10}"
# Bulk data import with validation
curl -X POST http://localhost/api/data/bulk-import \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"source": "external_api",
"data_type": "user_events",
"batch_size": 1000,
"validation_rules": ["email_format", "required_fields"],
"error_handling": "skip_invalid",
"callback_url": "https://myapp.com/webhook/import_complete"
}'

The system implements a sophisticated task processing architecture with Celery, supporting complex workflows, task chains, and real-time monitoring.
┌────────────────────────────────────────────────────┐
│                Task Processing Flow                │
├────────────────────────────────────────────────────┤
│ 1. Client → API Endpoint (Task Submission)         │
│ 2. API → Input Validation & Authorization          │
│ 3. API → Task Creation & Queue Assignment          │
│ 4. Redis → Task Broker & Message Queue             │
│ 5. Celery Worker → Task Execution & Processing     │
│ 6. PostgreSQL → Result Storage & History           │
│ 7. Redis → Real-time Status Updates                │
│ 8. Webhook/Callback → Completion Notification      │
└────────────────────────────────────────────────────┘
| Endpoint | Method | Description | Priority Support | Async Processing |
|---|---|---|---|---|
| `/tasks/submit` | POST | Submit single task | Yes (1-10) | Yes |
| `/tasks/batch` | POST | Submit multiple tasks | Yes | Yes |
| `/tasks/chain` | POST | Submit task chain/workflow | Yes | Yes |
| `/tasks/schedule` | POST | Schedule future task | Yes | Yes |
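On the worker side, a handler for the `sample` task type used throughout these examples might be registered roughly as follows. This is a sketch: the module layout, task name, and broker options are assumptions, not the project's actual Celery setup.

# Hypothetical Celery task mirroring the submission options above: priorities
# honoured on the Redis broker, bounded automatic retries with backoff.
from celery import Celery

celery = Celery('ha_api', broker='redis://redis:6379/0', backend='redis://redis:6379/0')
celery.conf.broker_transport_options = {'queue_order_strategy': 'priority'}

@celery.task(
    bind=True,
    name='tasks.sample',
    autoretry_for=(Exception,),
    retry_backoff=True,                  # exponential delay between attempts
    retry_kwargs={'max_retries': 3},
)
def sample_task(self, data):
    # Real work would go here: validate the payload, process it, and persist
    # the outcome to the TaskHistory table described earlier.
    return {'processed': True, 'input': data}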
Advanced Task Submission Examples:
# Submit high-priority machine learning task
curl -X POST http://localhost/tasks/submit \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"task_type": "machine_learning_training",
"priority": 1,
"data": {
"model_type": "neural_network",
"dataset": "/data/training_set_v2.csv",
"hyperparameters": {
"learning_rate": 0.001,
"batch_size": 64,
"epochs": 100,
"dropout_rate": 0.2
},
"validation_split": 0.2,
"early_stopping": true
},
"resources": {
"cpu_cores": 4,
"memory_gb": 8,
"gpu_required": true
},
"callbacks": {
"progress_url": "https://myapp.com/webhook/training_progress",
"completion_url": "https://myapp.com/webhook/training_complete",
"error_url": "https://myapp.com/webhook/training_error"
},
"retry_config": {
"max_retries": 3,
"retry_delay": 300,
"exponential_backoff": true
}
}'
# Response with comprehensive task information
{
"status": "success",
"data": {
"task_id": "ml_train_abc123def456",
"task_type": "machine_learning_training",
"status": "queued",
"priority": 1,
"estimated_duration": 3600,
"queue_position": 2,
"worker_assignment": "pending",
"created_at": "2025-01-01T12:00:00Z",
"scheduled_for": "2025-01-01T12:00:30Z"
},
"links": {
"status": "/tasks/status/ml_train_abc123def456",
"cancel": "/tasks/cancel/ml_train_abc123def456",
"logs": "/tasks/logs/ml_train_abc123def456"
}
}
# Submit batch processing tasks
curl -X POST http://localhost/tasks/batch \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"batch_name": "daily_report_generation",
"tasks": [
{
"task_type": "data_extraction",
"data": {"source": "database", "query": "SELECT * FROM sales WHERE date >= ?", "params": ["2025-01-01"]}
},
{
"task_type": "data_transformation",
"data": {"input_format": "csv", "output_format": "json", "transformations": ["aggregate", "normalize"]}
},
{
"task_type": "report_generation",
"data": {"template": "daily_sales", "format": "pdf", "recipients": ["manager@company.com"]}
}
],
"execution_mode": "sequential",
"failure_strategy": "stop_on_error"
}'
# Schedule recurring task with cron expression
curl -X POST http://localhost/tasks/schedule \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"task_type": "database_cleanup",
"schedule": {
"type": "cron",
"expression": "0 2 * * *",
"timezone": "UTC"
},
"data": {
"cleanup_older_than_days": 30,
"tables": ["task_history", "user_sessions", "audit_logs"],
"backup_before_cleanup": true
},
"active": true,
"max_iterations": null
}'

# Get detailed task status with progress information
curl -X GET http://localhost/tasks/status/ml_train_abc123def456 \
-H "Authorization: Bearer $TOKEN" | jq
# Comprehensive task status response
{
"status": "success",
"data": {
"task_id": "ml_train_abc123def456",
"task_type": "machine_learning_training",
"status": "running",
"progress": {
"percentage": 45.7,
"current_epoch": 46,
"total_epochs": 100,
"current_loss": 0.234,
"validation_accuracy": 0.89,
"estimated_remaining": 1980
},
"execution": {
"worker_id": "celery_worker_001",
"worker_hostname": "worker-node-1",
"started_at": "2025-01-01T12:01:15Z",
"running_time": 2745,
"memory_usage": "2.4GB",
"cpu_usage": 85.2
},
"history": [
{
"timestamp": "2025-01-01T12:00:00Z",
"status": "queued",
"message": "Task submitted to queue"
},
{
"timestamp": "2025-01-01T12:01:15Z",
"status": "running",
"message": "Task started by worker celery_worker_001"
},
{
"timestamp": "2025-01-01T12:15:30Z",
"status": "running",
"message": "Training progress: Epoch 25/100, Loss: 0.456"
}
],
"resources": {
"allocated_cpu": 4,
"allocated_memory": "8GB",
"gpu_allocated": true,
"gpu_utilization": 92.1
}
}
}
# Get task execution logs in real-time
curl -X GET http://localhost/tasks/logs/ml_train_abc123def456 \
-H "Authorization: Bearer $TOKEN" \
-H "Accept: text/plain"
# Cancel running task
curl -X POST http://localhost/tasks/cancel/ml_train_abc123def456 \
-H "Authorization: Bearer $TOKEN" \
-d '{"reason": "User requested cancellation", "force": false}'
# Get comprehensive task statistics
curl -X GET http://localhost/tasks/stats \
-H "Authorization: Bearer $TOKEN" | jq
{
"status": "success",
"data": {
"overview": {
"total_tasks": 15678,
"completed_tasks": 14892,
"failed_tasks": 123,
"running_tasks": 8,
"queued_tasks": 655,
"success_rate": 0.949
},
"performance": {
"avg_execution_time": 245.7,
"median_execution_time": 89.2,
"tasks_per_hour": 156,
"peak_concurrent_tasks": 25
},
"by_type": [
{
"task_type": "data_processing",
"count": 5678,
"avg_duration": 123.4,
"success_rate": 0.95
},
{
"task_type": "machine_learning_training",
"count": 89,
"avg_duration": 3456.7,
"success_rate": 0.92
}
],
"workers": {
"active_workers": 4,
"total_capacity": 16,
"current_load": 0.62,
"worker_health": [
{
"worker_id": "celery_worker_001",
"status": "active",
"current_tasks": 2,
"total_processed": 1247
}
]
}
}
}

The system provides enterprise-grade monitoring with comprehensive metrics, alerting, and observability features.
┌──────────────────────────────────────────────────────────────┐
│                       Monitoring Stack                       │
├──────────────────────────────────────────────────────────────┤
│ Application Metrics → Prometheus        → Grafana            │
│ System Metrics      → Node Exporter     → AlertManager       │
│ Log Aggregation     → ELK Stack         → Kibana             │
│ Custom Dashboards   → Streamlit         → Real-time Updates  │
│ Health Checks       → Service Discovery → Auto-scaling       │
└──────────────────────────────────────────────────────────────┘
Streamlit Dashboard Capabilities:
# Access monitoring dashboard
open http://localhost:8501
# Dashboard login (same credentials as API)
Username: admin
Password: admin123

Dashboard Sections:
- System Overview
  - Real-time CPU, Memory, Disk usage
  - Network I/O and bandwidth utilization
  - Database connection pool status
  - Redis memory usage and hit rates
- API Performance Metrics
  - Request rate and response times
  - Error rates by endpoint
  - Geographic request distribution
  - Load balancer performance
- Task Processing Analytics
  - Task queue depths and processing rates
  - Worker pool utilization
  - Task success/failure rates
  - Resource usage by task type
- Security Monitoring
  - Authentication success/failure rates
  - Rate limiting activations
  - Suspicious activity detection
  - Token usage patterns
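As a rough idea of how the System Overview panel could be rendered, the sketch below reads the `system_metrics` snapshot that the monitoring service shown later in this document stores in Redis; the key name matches that code, while the layout and labels are illustrative.

# Hypothetical Streamlit page for the "System Overview" section
import json
import redis
import streamlit as st

r = redis.from_url("redis://redis:6379/0", decode_responses=True)

st.title("HA API - System Overview")
raw = r.get("system_metrics")            # written by AdvancedMonitoringService
if raw is None:
    st.warning("No metrics snapshot available yet.")
else:
    system = json.loads(raw)["system"]
    col1, col2, col3 = st.columns(3)
    col1.metric("CPU usage", f"{system['cpu']['usage_percent']:.1f} %")
    col2.metric("Memory usage", f"{system['memory']['percent']:.1f} %")
    col3.metric("Disk usage", f"{system['disk']['percent']:.1f} %")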
Advanced Health Check System:
# Comprehensive health endpoint
curl -X GET http://localhost/health/status | jq
{
"status": "healthy",
"timestamp": "2025-01-01T12:00:00Z",
"checks": {
"database": {
"status": "up",
"response_time": 2.3,
"connection_pool": {
"active": 15,
"idle": 5,
"total": 20
},
"slow_queries": 0
},
"redis": {
"status": "up",
"response_time": 0.8,
"memory_usage": "45.2MB",
"connected_clients": 8,
"hit_rate": 0.94
},
"celery": {
"status": "up",
"active_workers": 4,
"pending_tasks": 12,
"failed_tasks_last_hour": 1,
"average_task_time": 67.3
},
"external_dependencies": {
"email_service": {"status": "up", "response_time": 156.7},
"notification_service": {"status": "up", "response_time": 89.2}
}
},
"system": {
"uptime": 86400,
"cpu_usage": 15.2,
"memory_usage": 45.7,
"disk_usage": 23.1,
"load_average": [0.85, 0.92, 1.15],
"open_file_descriptors": 156
},
"alerts": []
}
# Node heartbeat system
curl -X GET http://localhost/health/nodes | jq
{
"status": "success",
"data": {
"total_nodes": 3,
"active_nodes": 3,
"inactive_nodes": 0,
"nodes": [
{
"node_id": "api-node-1",
"hostname": "ha_api_service_1",
"ip_address": "172.20.0.5",
"port": 8000,
"status": "active",
"last_heartbeat": "2025-01-01T11:59:45Z",
"response_time": 23.4,
"health_score": 98.5,
"metrics": {
"cpu_usage": 14.2,
"memory_usage": 42.1,
"requests_per_minute": 45,
"error_rate": 0.01
}
}
]
}
}

# Optimized database queries with indexing
class User(db.Model):
__tablename__ = 'users'
# Optimized indexes for common queries
__table_args__ = (
db.Index('idx_username_active', 'username', 'is_active'),
db.Index('idx_email_domain', 'email'),
db.Index('idx_created_at', 'created_at'),
db.Index('idx_last_login', 'last_login'),
)
# Database connection pool optimization
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 20, # Base connection pool size
'pool_timeout': 30, # Timeout for getting connection
'pool_recycle': 3600, # Recycle connections every hour
'pool_pre_ping': True, # Validate connections before use
'max_overflow': 10, # Additional connections beyond pool_size
'pool_reset_on_return': 'commit' # Reset connection state
}

# Advanced Redis caching implementation
class CacheManager:
def __init__(self, redis_client):
self.redis = redis_client
self.default_ttl = 3600 # 1 hour
def cache_with_tags(self, key, value, ttl=None, tags=None):
"""Cache with tag-based invalidation"""
ttl = ttl or self.default_ttl
# Store main cache entry
self.redis.setex(key, ttl, json.dumps(value))
# Store tags for batch invalidation
if tags:
for tag in tags:
self.redis.sadd(f"tag:{tag}", key)
self.redis.expire(f"tag:{tag}", ttl)
def invalidate_by_tag(self, tag):
"""Invalidate all cache entries with specific tag"""
keys = self.redis.smembers(f"tag:{tag}")
if keys:
self.redis.delete(*keys)
self.redis.delete(f"tag:{tag}")
# Usage example
cache_manager = CacheManager(redis_client)
# Cache user data with tags
cache_manager.cache_with_tags(
f"user:{user_id}",
user_data,
ttl=3600,
tags=['users', f'user_{user_id}', 'profiles']
)

# Response compression and optimization
from flask import request, jsonify, make_response
import gzip
@app.after_request
def compress_response(response):
"""Compress API responses for better performance"""
accept_encoding = request.headers.get('Accept-Encoding', '')
if ('gzip' in accept_encoding.lower() and
response.status_code < 300 and
len(response.data) > 1000):
gzipped_data = gzip.compress(response.data)
response.data = gzipped_data
response.headers['Content-Encoding'] = 'gzip'
response.headers['Content-Length'] = len(gzipped_data)
return response
# Database query optimization with eager loading
def get_user_with_tasks(user_id):
"""Optimized query with eager loading"""
return User.query.options(
db.joinedload(User.task_submissions)
).filter_by(id=user_id).first()

# High-performance NGINX configuration
worker_processes auto;
worker_cpu_affinity auto;
worker_rlimit_nofile 65535;
events {
worker_connections 4096;
use epoll;
multi_accept on;
}
http {
# Performance optimizations
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 30;
keepalive_requests 1000;
# Buffer optimizations
client_body_buffer_size 128k;
client_max_body_size 50m;
client_header_buffer_size 1k;
large_client_header_buffers 4 4k;
output_buffers 1 32k;
postpone_output 1460;
# Compression
gzip on;
gzip_vary on;
gzip_min_length 1000;
gzip_comp_level 6;
gzip_types
text/plain
text/css
application/json
application/javascript
text/xml
application/xml
application/xml+rss
text/javascript;
# Connection pooling to upstream
upstream api_backend {
least_conn;
server api1:8000 weight=1 max_fails=3 fail_timeout=30s;
server api2:8001 weight=1 max_fails=3 fail_timeout=30s;
server api3:8002 weight=1 max_fails=3 fail_timeout=30s;
keepalive 32;
}
}

-- PostgreSQL optimization settings
-- postgresql.conf optimizations
# Memory settings
shared_buffers = 256MB # 25% of system RAM
effective_cache_size = 1GB # 75% of system RAM
work_mem = 4MB # Per-operation memory
maintenance_work_mem = 64MB # Maintenance operations
# Checkpoint settings
checkpoint_completion_target = 0.9
wal_buffers = 16MB
wal_compression = on
# Query planner
default_statistics_target = 100
random_page_cost = 1.1 # For SSD storage
# Connection settings
max_connections = 100
shared_preload_libraries = 'pg_stat_statements'
# Monitoring queries
SELECT
query,
calls,
total_time,
mean_time,
rows
FROM pg_stat_statements
ORDER BY total_time DESC
LIMIT 10;

# docker-compose.production.yml
version: "3.8"
services:
# API services with resource limits
api1: &api_service
build: .
deploy:
resources:
limits:
cpus: "1.0"
memory: 1G
reservations:
cpus: "0.5"
memory: 512M
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
environment:
- FLASK_ENV=production
- API_WORKERS=4
- GUNICORN_TIMEOUT=120
- GUNICORN_KEEPALIVE=5
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/ping"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
logging:
driver: "json-file"
options:
max-size: "100m"
max-file: "3"
security_opt:
- no-new-privileges:true
api2:
<<: *api_service
environment:
- FLASK_ENV=production
- API_PORT=8001
- API_WORKERS=4
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8001/health/ping"]
api3:
<<: *api_service
environment:
- FLASK_ENV=production
- API_PORT=8002
- API_WORKERS=4
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8002/health/ping"]
# Production PostgreSQL with backup
postgres:
image: postgres:15-alpine
deploy:
resources:
limits:
cpus: "2.0"
memory: 2G
environment:
- POSTGRES_DB=${POSTGRES_DB}
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
- ./backups:/backups
- ./init-scripts:/docker-entrypoint-initdb.d
command: |
postgres
-c max_connections=100
-c shared_buffers=256MB
-c effective_cache_size=1GB
-c work_mem=4MB
-c maintenance_work_mem=64MB
-c checkpoint_completion_target=0.9
-c wal_buffers=16MB
-c default_statistics_target=100
-c log_statement=all
-c log_duration=on
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
interval: 10s
timeout: 5s
retries: 10
start_period: 30s

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ha-api-deployment
labels:
app: ha-api
spec:
replicas: 3
selector:
matchLabels:
app: ha-api
template:
metadata:
labels:
app: ha-api
spec:
containers:
- name: ha-api
image: ha-api:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: ha-api-secrets
key: database-url
- name: REDIS_URL
valueFrom:
configMapKeyRef:
name: ha-api-config
key: redis-url
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health/ping
port: 8000
initialDelaySeconds: 30
periodSeconds: 30
readinessProbe:
httpGet:
path: /health/status
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: ha-api-service
spec:
selector:
app: ha-api
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer

# .github/workflows/deploy.yml
name: Deploy HA API
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis:
image: redis:7
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run tests
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
REDIS_URL: redis://localhost:6379/0
run: |
python -m pytest tests/ -v --cov=app --cov-report=xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run security scan
uses: pypa/gh-action-pip-audit@v1.0.8
with:
inputs: requirements.txt
build-and-deploy:
needs: [test, security-scan]
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Build Docker image
run: |
docker build -t ha-api:${{ github.sha }} .
docker tag ha-api:${{ github.sha }} ha-api:latest
- name: Deploy to staging
if: github.event_name == 'pull_request'
run: |
echo "Deploying to staging environment"
# Add staging deployment commands
- name: Deploy to production
if: github.ref == 'refs/heads/main'
run: |
echo "Deploying to production environment"
# Add production deployment commands

The system implements multi-layer testing with unit tests, integration tests, load testing, and security testing.
┌──────────────────────────────────────────────────────────────────────┐
│                           Testing Pyramid                            │
├──────────────────────────────────────────────────────────────────────┤
│ E2E Tests         → Selenium/Playwright  → User Workflows            │
│ Integration Tests → API Testing          → Service Interactions      │
│ Unit Tests        → pytest               → Individual Components     │
│ Load Tests        → Locust               → Performance & Scalability │
│ Security Tests    → OWASP ZAP            → Vulnerability Assessment  │
└──────────────────────────────────────────────────────────────────────┘
# Run comprehensive test suite
python -m pytest tests/ -v --cov=app --cov-report=html --cov-report=term
# Run specific test categories
python -m pytest tests/unit/ -v --tb=short
python -m pytest tests/integration/ -v --tb=short
python -m pytest tests/api/ -v --tb=short
# Run tests with different environments
pytest tests/ --env=staging --tb=short
pytest tests/ --env=production --tb=short --no-cov
# Load testing with Locust
locust -f tests/load/locustfile.py --host=http://localhost

# tests/api/test_advanced_api.py
import pytest
import requests
import json
from concurrent.futures import ThreadPoolExecutor
import time
@pytest.fixture
def api_client():
"""Advanced API client with authentication"""
return APITestClient(base_url="http://localhost")
class APITestClient:
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
self.access_token = None
def authenticate(self, username="admin", password="admin123"):
"""Authenticate and store token"""
response = self.session.post(f"{self.base_url}/auth/login", json={
"username": username,
"password": password
})
assert response.status_code == 200
self.access_token = response.json()["data"]["access_token"]
self.session.headers.update({
"Authorization": f"Bearer {self.access_token}"
})
return self.access_token
def test_load_balancer_distribution(api_client):
"""Test load balancer distributes requests evenly"""
api_client.authenticate()
node_counts = {}
for _ in range(100):
response = api_client.session.get(f"{api_client.base_url}/health/ping")
assert response.status_code == 200
node_id = response.json()["node_id"]
node_counts[node_id] = node_counts.get(node_id, 0) + 1
# Verify distribution is reasonably even
assert len(node_counts) >= 2 # At least 2 nodes receiving requests
max_requests = max(node_counts.values())
min_requests = min(node_counts.values())
assert (max_requests - min_requests) / max_requests < 0.3 # Within 30%
def test_concurrent_task_submission(api_client):
"""Test system handles concurrent task submissions"""
api_client.authenticate()
def submit_task(task_id):
response = api_client.session.post(f"{api_client.base_url}/tasks/submit", json={
"task_type": "sample",
"data": {"test_id": task_id}
})
return response.status_code, response.json()
# Submit 50 tasks concurrently
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(submit_task, i) for i in range(50)]
results = [future.result() for future in futures]
# Verify all tasks were accepted
success_count = sum(1 for status, _ in results if status == 200)
assert success_count >= 45 # Allow some failures due to rate limiting
def test_database_transaction_integrity(api_client):
"""Test database transaction integrity under load"""
api_client.authenticate()
# Create test user
user_data = {
"username": f"test_user_{int(time.time())}",
"email": f"test_{int(time.time())}@example.com",
"password": "testpass123"
}
response = api_client.session.post(f"{api_client.base_url}/auth/register", json=user_data)
assert response.status_code == 200
user_id = response.json()["data"]["user"]["id"]
# Submit multiple tasks for the same user
def submit_user_task(task_num):
return api_client.session.post(f"{api_client.base_url}/tasks/submit", json={
"task_type": "user_specific",
"data": {"user_id": user_id, "task_num": task_num}
})
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(submit_user_task, i) for i in range(10)]
responses = [future.result() for future in futures]
# Verify all tasks were properly associated with user
success_responses = [r for r in responses if r.status_code == 200]
assert len(success_responses) == 10
def test_auth_token_security(api_client):
"""Test JWT token security features"""
# Test token expiration
token = api_client.authenticate()
# Test malformed token
api_client.session.headers.update({"Authorization": "Bearer invalid_token"})
response = api_client.session.get(f"{api_client.base_url}/api/protected")
assert response.status_code == 401
# Test missing token
del api_client.session.headers["Authorization"]
response = api_client.session.get(f"{api_client.base_url}/api/protected")
assert response.status_code == 401
# Test valid token
api_client.session.headers.update({"Authorization": f"Bearer {token}"})
response = api_client.session.get(f"{api_client.base_url}/api/protected")
assert response.status_code == 200
@pytest.mark.performance
def test_response_time_sla(api_client):
"""Test API response times meet SLA requirements"""
api_client.authenticate()
endpoints = [
"/health/ping",
"/health/status",
"/api/protected",
"/tasks/stats"
]
for endpoint in endpoints:
times = []
for _ in range(10):
start_time = time.time()
response = api_client.session.get(f"{api_client.base_url}{endpoint}")
end_time = time.time()
assert response.status_code == 200
times.append(end_time - start_time)
avg_time = sum(times) / len(times)
p95_time = sorted(times)[int(0.95 * len(times))]
# SLA requirements
if endpoint == "/health/ping":
assert avg_time < 0.1 # 100ms average
assert p95_time < 0.2 # 200ms P95
else:
assert avg_time < 0.5 # 500ms average
assert p95_time < 1.0 # 1s P95

# tests/load/locustfile.py
from locust import HttpUser, task, between
import json
import random
class APIUser(HttpUser):
wait_time = between(1, 3)
def on_start(self):
"""Authenticate user on start"""
self.login()
def login(self):
"""Login and store token"""
response = self.client.post("/auth/login", json={
"username": "admin",
"password": "admin123"
})
if response.status_code == 200:
token = response.json()["data"]["access_token"]
self.client.headers.update({
"Authorization": f"Bearer {token}"
})
@task(3)
def health_check(self):
"""Frequent health checks"""
self.client.get("/health/ping")
@task(2)
def get_status(self):
"""Get API status"""
self.client.get("/api/status")
@task(2)
def protected_endpoint(self):
"""Access protected endpoint"""
self.client.get("/api/protected")
@task(1)
def submit_task(self):
"""Submit background task"""
self.client.post("/tasks/submit", json={
"task_type": "sample",
"data": {"load_test": True, "user_id": random.randint(1, 1000)}
})
@task(1)
def get_task_stats(self):
"""Get task statistics"""
self.client.get("/tasks/stats")
class AdminUser(HttpUser):
wait_time = between(2, 5)
weight = 1 # Fewer admin users
def on_start(self):
response = self.client.post("/auth/login", json={
"username": "admin",
"password": "admin123"
})
if response.status_code == 200:
token = response.json()["data"]["access_token"]
self.client.headers.update({
"Authorization": f"Bearer {token}"
})
@task
def get_metrics(self):
"""Admin metrics endpoint"""
self.client.get("/api/metrics")
@task
def get_users(self):
"""List users"""
self.client.get("/api/users")

# Security testing with OWASP ZAP
docker run -t owasp/zap2docker-stable zap-baseline.py \
-t http://localhost \
-r security_report.html
# SQL injection testing
sqlmap -u "http://localhost/api/users?id=1" \
--cookie="session=your_session_token" \
--dbs --batch
# SSL/TLS testing
testssl.sh --full https://yourdomain.com
# Dependency vulnerability scanning
safety check -r requirements.txt
bandit -r app/ -f json -o security_report.json

#!/bin/bash
# comprehensive-diagnostics.sh
echo "Starting comprehensive system diagnostics..."
echo "================================================"
# Check Docker containers
echo "Docker Container Status:"
docker-compose ps
echo ""
# Check resource usage
echo "System Resource Usage:"
echo "CPU Load (1 min): $(top -bn1 | grep load | awk '{printf "%.2f\n", $(NF-2)}')"
echo "Memory Usage: $(free | grep Mem | awk '{printf "%.2f%%\n", ($3/$2) * 100.0}')"
echo "Disk Usage: $(df -h / | awk 'NR==2{printf "%s\n", $5}')"
echo ""
# Database diagnostics
echo "Database Diagnostics:"
docker-compose exec postgres psql -U postgres -d postgres -c "
SELECT
'Active Connections' as metric,
count(*) as value
FROM pg_stat_activity
WHERE state = 'active'
UNION ALL
SELECT
'Database Size' as metric,
pg_size_pretty(pg_database_size('postgres')) as value
UNION ALL
SELECT
'Longest Running Query' as metric,
COALESCE(max(EXTRACT(EPOCH FROM (now() - query_start)))::text || ' seconds', 'None') as value
FROM pg_stat_activity
WHERE state = 'active' AND query != '<IDLE>';
"
# Redis diagnostics
echo "Redis Diagnostics:"
docker-compose exec redis redis-cli info | grep -E "(used_memory_human|connected_clients|total_commands_processed|keyspace_hits|keyspace_misses)"
echo ""
# API endpoint health
echo "API Endpoint Health:"
endpoints=("/health/ping" "/health/status" "/api/status")
for endpoint in "${endpoints[@]}"; do
response_time=$(curl -o /dev/null -s -w "%{time_total}" http://localhost$endpoint)
status_code=$(curl -o /dev/null -s -w "%{http_code}" http://localhost$endpoint)
echo "$endpoint: $status_code (${response_time}s)"
done
echo ""
# Celery diagnostics
echo "Celery Worker Status:"
docker-compose exec celery_worker celery -A celery_worker.celery inspect active
docker-compose exec celery_worker celery -A celery_worker.celery inspect stats
echo ""
# Log analysis
echo "Recent Error Analysis:"
echo "API Errors (last 100 lines):"
docker-compose logs api1 api2 api3 --tail=100 | grep -i error | tail -5
echo ""
echo "Database Errors (last 100 lines):"
docker-compose logs postgres --tail=100 | grep -i error | tail -5
echo ""
# Network diagnostics
echo "Network Connectivity:"
echo "Internal DNS resolution:"
docker-compose exec api1 nslookup postgres
docker-compose exec api1 nslookup redis
echo ""
# Performance metrics
echo "Performance Metrics:"
echo "Load Average: $(uptime | awk -F'load average:' '{print $2}')"
echo "Active Processes: $(ps aux | wc -l)"
echo "Open Files: $(lsof | wc -l)"
echo ""
echo "Diagnostics completed"

-- database-performance-analysis.sql
-- Long running queries
SELECT
pid,
now() - pg_stat_activity.query_start AS duration,
query,
state,
usename,
application_name
FROM pg_stat_activity
WHERE (now() - pg_stat_activity.query_start) > interval '5 minutes'
AND state = 'active'
ORDER BY duration DESC;
-- Database locks
SELECT
t.schemaname,
t.tablename,
l.locktype,
l.mode,
l.granted,
a.usename,
a.query,
a.query_start,
age(now(), a.query_start) AS "age"
FROM pg_stat_activity a
JOIN pg_locks l ON l.pid = a.pid
JOIN pg_stat_user_tables t ON l.relation = t.relid
WHERE a.state = 'active'
ORDER BY a.query_start;
-- Index usage analysis
SELECT
schemaname,
tablename,
attname,
n_distinct,
correlation,
most_common_vals
FROM pg_stats
WHERE schemaname = 'public'
AND n_distinct > 100
ORDER BY n_distinct DESC;
-- Table size analysis
SELECT
schemaname as table_schema,
tablename as table_name,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;

# monitoring/advanced_monitoring.py
import psutil
import redis
import psycopg2
from datetime import datetime, timedelta
import json
class AdvancedMonitoringService:
def __init__(self):
self.redis_client = redis.from_url("redis://localhost:6379/0")
self.db_conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5678/postgres")
def collect_comprehensive_metrics(self):
"""Collect all system metrics"""
metrics = {
"timestamp": datetime.utcnow().isoformat(),
"system": self.get_system_metrics(),
"database": self.get_database_metrics(),
"redis": self.get_redis_metrics(),
"application": self.get_application_metrics()
}
# Store metrics in Redis for dashboard
self.redis_client.setex(
"system_metrics",
300, # 5 minutes TTL
json.dumps(metrics)
)
return metrics
def get_system_metrics(self):
"""Get system-level metrics"""
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
network = psutil.net_io_counters()
return {
"cpu": {
"usage_percent": cpu_percent,
"load_average": psutil.getloadavg(),
"cpu_count": psutil.cpu_count()
},
"memory": {
"total": memory.total,
"available": memory.available,
"percent": memory.percent,
"used": memory.used
},
"disk": {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": disk.percent
},
"network": {
"bytes_sent": network.bytes_sent,
"bytes_recv": network.bytes_recv,
"packets_sent": network.packets_sent,
"packets_recv": network.packets_recv
}
}
def get_database_metrics(self):
"""Get PostgreSQL metrics"""
with self.db_conn.cursor() as cursor:
# Connection statistics
cursor.execute("""
SELECT count(*) as active_connections
FROM pg_stat_activity
WHERE state = 'active'
""")
active_connections = cursor.fetchone()[0]
# Database size
cursor.execute("""
SELECT pg_database_size(current_database()) as db_size
""")
db_size = cursor.fetchone()[0]
# Query statistics
cursor.execute("""
SELECT
sum(n_tup_ins) as inserts,
sum(n_tup_upd) as updates,
sum(n_tup_del) as deletes
FROM pg_stat_user_tables
""")
query_stats = cursor.fetchone()
return {
"active_connections": active_connections,
"database_size": db_size,
"operations": {
"inserts": query_stats[0] or 0,
"updates": query_stats[1] or 0,
"deletes": query_stats[2] or 0
}
}
def get_redis_metrics(self):
"""Get Redis metrics"""
info = self.redis_client.info()
return {
"memory_usage": info.get('used_memory'),
"connected_clients": info.get('connected_clients'),
"total_commands": info.get('total_commands_processed'),
"keyspace_hits": info.get('keyspace_hits'),
"keyspace_misses": info.get('keyspace_misses'),
"hit_rate": info.get('keyspace_hits', 0) / max(1, info.get('keyspace_hits', 0) + info.get('keyspace_misses', 0))
}
def get_application_metrics(self):
"""Get application-specific metrics"""
# API request metrics from Redis
total_requests = self.redis_client.get('total_requests') or 0
# Task statistics
task_stats = {
"pending_tasks": self.redis_client.llen('celery'),
"completed_tasks": self.redis_client.get('completed_tasks') or 0,
"failed_tasks": self.redis_client.get('failed_tasks') or 0
}
return {
"api_requests": int(total_requests),
"task_statistics": task_stats
}
def detect_anomalies(self, metrics):
"""Detect system anomalies"""
anomalies = []
# CPU usage anomaly
if metrics["system"]["cpu"]["usage_percent"] > 90:
anomalies.append({
"type": "high_cpu_usage",
"severity": "critical",
"value": metrics["system"]["cpu"]["usage_percent"],
"threshold": 90
})
# Memory usage anomaly
if metrics["system"]["memory"]["percent"] > 85:
anomalies.append({
"type": "high_memory_usage",
"severity": "warning",
"value": metrics["system"]["memory"]["percent"],
"threshold": 85
})
# Database connection anomaly
if metrics["database"]["active_connections"] > 50:
anomalies.append({
"type": "high_db_connections",
"severity": "warning",
"value": metrics["database"]["active_connections"],
"threshold": 50
})
return anomalies
# Usage
if __name__ == "__main__":
monitor = AdvancedMonitoringService()
metrics = monitor.collect_comprehensive_metrics()
anomalies = monitor.detect_anomalies(metrics)
if anomalies:
print("Anomalies detected:")
for anomaly in anomalies:
print(f" - {anomaly['type']}: {anomaly['value']} (threshold: {anomaly['threshold']})")
else:
print("All systems operating normally")

Issue: High Database Connection Usage
# Diagnosis
docker-compose exec postgres psql -U postgres -c "
SELECT count(*), state
FROM pg_stat_activity
GROUP BY state;"
# Solution: Optimize connection pool
# In config.py, adjust:
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 10, # Reduce from 20
'pool_timeout': 60, # Increase timeout
'pool_recycle': 1800, # Recycle connections more frequently
'pool_pre_ping': True
}

Issue: High Memory Usage
# Diagnosis
docker stats --no-stream
# Solution: Implement memory limits
# In docker-compose.yml:
services:
api1:
mem_limit: 1g
memswap_limit: 1g

Issue: Slow API Response Times
# Diagnosis
curl -w "Total time: %{time_total}s\n" -o /dev/null -s http://localhost/api/protected
# Solutions:
# 1. Enable Redis caching
# 2. Optimize database queries
# 3. Add database indexes
# 4. Implement response caching

Issue: JWT Token Validation Failures
# Diagnosis
docker-compose logs nginx | grep "auth_request"
# Check JWT secret consistency
docker-compose exec api1 printenv | grep JWT_SECRET_KEY
docker-compose exec api2 printenv | grep JWT_SECRET_KEY
# Solution: Ensure all services use same JWT secret
# Verify in docker-compose.yml that all API services have:
environment:
- JWT_SECRET_KEY=your-consistent-jwt-secret-key

Issue: NGINX Authentication Loops
# Diagnosis
docker-compose logs nginx | grep "auth_request"
# Solution: Check NGINX configuration
# Ensure /auth/validate doesn't have auth_request directive
location = /auth/validate {
internal;
proxy_pass http://api_backend/auth/validate;
# No auth_request here!
}

Issue: Celery Workers Not Processing Tasks
# Diagnosis
docker-compose exec celery_worker celery -A celery_worker.celery inspect active
docker-compose exec celery_worker celery -A celery_worker.celery inspect stats
# Check Redis connection
docker-compose exec celery_worker python -c "
import redis
r = redis.from_url('redis://redis:6379/0')
print(r.ping())
"
# Solution: Restart workers and check configuration
docker-compose restart celery_worker celery_beat

Issue: High Task Failure Rate
# Diagnosis
curl -H "Authorization: Bearer $TOKEN" http://localhost/tasks/stats | jq '.data.overview'
# Check worker logs
docker-compose logs celery_worker | grep ERROR
# Solutions:
# 1. Increase worker timeout
# 2. Add retry logic
# 3. Implement task validation
# 4. Monitor resource usage
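For points 1 and 2 above, the relevant Celery knobs could be applied per task as sketched here; the exception types, limits, and task body are illustrative assumptions.

# Hypothetical hardening for a flaky task: bounded automatic retries with
# exponential backoff plus soft/hard time limits so hung tasks are reaped.
from celery import Celery

celery = Celery('ha_api', broker='redis://redis:6379/0')

@celery.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,               # cap the backoff at 10 minutes
    retry_kwargs={'max_retries': 5},
    soft_time_limit=300,                 # raises SoftTimeLimitExceeded inside the task
    time_limit=360,                      # worker force-kills the task after 6 minutes
)
def flaky_external_call(self, payload):
    ...                                  # the unreliable operation goes here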
Security Configuration:

- Changed all default passwords and secrets
- Enabled SSL/TLS with valid certificates
- Configured firewall rules and security groups
- Implemented proper logging and monitoring
- Set up intrusion detection system
- Configured automated security updates
- Implemented backup encryption
- Set up access control and user management
Performance Configuration:
- Optimized database connection pools
- Configured Redis memory limits and eviction policies
- Set up NGINX caching and compression
- Implemented API rate limiting
- Configured Celery worker pools
- Set up monitoring and alerting
- Optimized Docker resource limits
- Implemented health checks for all services
Reliability Configuration:
- Set up automated backups
- Configured disaster recovery procedures
- Implemented health checks and monitoring
- Set up log aggregation and analysis
- Configured alerting for critical issues
- Implemented graceful shutdown procedures
- Set up service dependencies and startup order
- Configured resource limits and quotas
Target Performance Metrics:
- API Response Time: < 200ms (95th percentile)
- Database Query Time: < 50ms (average)
- Task Processing Time: < 30s (simple tasks)
- System Uptime: > 99.9%
- Error Rate: < 0.1%
- Memory Usage: < 80% of available
- CPU Usage: < 70% average
- Disk Usage: < 80% of available
Load Testing Results:
# Expected performance under load
# 1000 concurrent users
# 10,000 requests per minute
# Average response time: 150ms
# 99.9% uptime maintained

The HA API with Celery system is now fully documented with enterprise-grade architecture, comprehensive deployment guides, advanced monitoring, security best practices, and production-ready configurations. This documentation provides everything needed for development, testing, deployment, and maintenance of a high-availability API system.