Misterurias/Group-Programming-Data-Streams-Quest

Real-Time Air Quality Prediction MLOps Platform

A comprehensive end-to-end MLOps platform for real-time air quality monitoring and forecasting, featuring streaming data pipelines, machine learning models, and production-ready API serving.

Project Overview

This project implements a complete MLOps pipeline using the UCI Air Quality dataset, demonstrating proficiency in streaming architecture, exploratory data analysis, predictive modeling, and production deployment. The system processes environmental sensor data in real-time, conducts advanced pattern analysis, trains multiple ML models, and serves predictions through a REST API.

Key Features

  • Real-Time Streaming: Apache Kafka-based data pipeline with producer-consumer architecture
  • Advanced Analytics: Comprehensive exploratory data analysis with temporal pattern identification
  • Predictive Modeling: Linear Regression, SARIMA, and XGBoost models for 6-hour air quality forecasting
  • Interactive Dashboard: Streamlit-based real-time analytics and visualization interface
  • Production API: FastAPI-based model serving with MLflow integration
  • Model Registry: MLflow-based model versioning, tracking, and deployment
  • Automated Training: Environment-based configuration with automatic model registration

Air Quality Parameters Monitored

  • Carbon Monoxide (CO): Measured in mg/m³
  • Nitrogen Oxides (NOx): Measured in ppb
  • Nitrogen Dioxide (NO2): Measured in µg/m³
  • Benzene: Measured in µg/m³

Dashboard Overview

Project Structure

Group-Programming-Data-Streams-Quest/
├── streaming_infrastructure/            # Kafka setup and data pipeline
├── data_intelligence/                   # Analytics dashboard and EDA
├── predictive_analytics/                # ML models and forecasting
├── api/                                 # FastAPI production service
│   ├── main.py                         # FastAPI application
│   ├── routers/                        # API endpoints
│   ├── services/                       # Business logic
│   ├── models/                         # Pydantic models
│   └── config.py                       # Configuration
├── monitoring/                          # Evidently drift detection service
├── data/                               # Clean processed data files
├── documentation/                       # Phase-specific documentation
├── final_report/                        # Final comprehensive report
├── requirements.txt                     # Python dependencies
├── .env                                # Environment configuration
└── README.md                           # This file

Prerequisites

  • Python: Version 3.8 or newer
  • Apache Kafka: Version 3.0.0 or newer (for streaming)
  • System Requirements: Minimum 8GB RAM, 4 CPU cores, 10GB available storage
  • Dependencies: All included in requirements.txt

Environment Setup

Create and Activate Conda Environment

# 1. Create a new conda environment
conda create -n kafka-air-quality python=3.9

# 2. Activate the environment
conda activate kafka-air-quality

# 3. Install core dependencies
pip install -r requirements.txt

# 4. Verify installation
python -c "import kafka, pandas, numpy, sklearn, streamlit, plotly, statsmodels; print('All dependencies installed successfully!')"

🚀 Running Services Individually

This section provides comprehensive instructions for running each service independently, both with and without Docker.

Prerequisites

# 1. Create and activate conda environment
conda create -n kafka-air-quality python=3.9
conda activate kafka-air-quality

# 2. Install all dependencies
pip install -r requirements.txt

# 3. Create environment configuration
cat > .env << 'EOF'
MLFLOW_TRACKING_URI=http://127.0.0.1:5000
MLFLOW_EXPERIMENT_NAME=AirQuality-NOx-6h
REGISTER_BEST=1
MODEL_NAME=AirQualityNOx6h
DATA_PATH=./data/air_quality_clean.csv
HORIZON_H=6
TEST_SIZE=0.2
RANDOM_STATE=42
EOF
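
The training scripts need these values as typed settings rather than raw strings. A minimal sketch of one way to parse them with the standard library only; `parse_env`, `Settings`, and `load_settings` are hypothetical names for illustration, and the repository may well use a library such as python-dotenv instead:

```python
from dataclasses import dataclass


def parse_env(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


@dataclass
class Settings:  # hypothetical container, for illustration only
    tracking_uri: str
    experiment: str
    register_best: bool
    model_name: str
    data_path: str
    horizon_h: int
    test_size: float
    random_state: int


def load_settings(env: dict) -> Settings:
    """Convert raw strings to typed settings, using the README values as defaults."""
    return Settings(
        tracking_uri=env.get("MLFLOW_TRACKING_URI", "http://127.0.0.1:5000"),
        experiment=env.get("MLFLOW_EXPERIMENT_NAME", "AirQuality-NOx-6h"),
        register_best=env.get("REGISTER_BEST", "0") == "1",
        model_name=env.get("MODEL_NAME", "AirQualityNOx6h"),
        data_path=env.get("DATA_PATH", "./data/air_quality_clean.csv"),
        horizon_h=int(env.get("HORIZON_H", "6")),
        test_size=float(env.get("TEST_SIZE", "0.2")),
        random_state=int(env.get("RANDOM_STATE", "42")),
    )
```

Typing the values once at startup means a malformed entry such as `HORIZON_H=six` fails immediately instead of midway through training.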

Service 1: MLflow Tracking Server

Purpose: Centralized model registry and experiment tracking

Option A: Run Locally

# Navigate to predictive analytics directory
cd predictive_analytics

# Start MLflow server
mlflow server --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root file://$PWD/mlruns_artifacts \
  --host 127.0.0.1 --port 5000

# Access UI at: http://localhost:5000

Option B: Run with Docker

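# Start MLflow in Docker (requires an mlflow service in docker-compose.yml;
# the Compose stack described later in this README expects MLflow to run outside Docker)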
docker-compose up -d mlflow

# Access UI at: http://localhost:5000

Configuration:

  • Backend Store: SQLite database for metadata
  • Artifact Store: Local file system for model artifacts
  • Port: 5000 (configurable)
  • Host: 127.0.0.1 (local) or 0.0.0.0 (Docker)

Service 2: Model Training

Purpose: Train and register ML models for air quality prediction

Run Training Scripts

# Navigate to predictive analytics directory
cd predictive_analytics

# Train all models (recommended)
python train.py

# Or train individual models:
python training/train_linear_regression.py
python training/train_sarima.py
python training/train_xgboost.py

Output:

  • Trained models saved to predictive_analytics/models/
  • Training logs in predictive_analytics/logs/
  • MLflow experiments logged to tracking server
  • Best model auto-registered (if REGISTER_BEST=1)

Log Files:

  • predictive_analytics/logs/lr_training.log
  • predictive_analytics/logs/sarima_training.log
  • predictive_analytics/logs/xgb_training.log

Service 3: Kafka Producer

Purpose: Simulate real-time air quality data streaming

Prerequisites

# Install and start Kafka (macOS)
brew install kafka
brew services start kafka

# Or run Redpanda, a Kafka-compatible broker, with Docker
docker-compose up -d redpanda

Run Producer

# Navigate to streaming infrastructure directory
cd streaming_infrastructure

# Start producer
python producer.py

# Optional: Configure producer settings
python producer.py --speedup 10 --topic air_quality_raw

Configuration:

  • Topic: air_quality_raw (default)
  • Speedup: Configurable data generation rate
  • Data Source: UCI Air Quality dataset
  • Output: Kafka topic messages
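
The dataset is hourly, so replaying its 9,358 records in real time would take about 390 days; a speedup factor compresses the gaps between records. A rough sketch of the delay calculation, assuming `--speedup` divides the original gap between consecutive timestamps (that interpretation and the `replay_delay` helper are assumptions, not taken from producer.py):

```python
from datetime import datetime


def replay_delay(prev_ts: datetime, next_ts: datetime, speedup: float) -> float:
    """Seconds to wait before sending the next record.

    Assumes the speedup factor divides the original gap between records.
    """
    if speedup <= 0:
        raise ValueError("speedup must be positive")
    gap = (next_ts - prev_ts).total_seconds()
    return max(gap, 0.0) / speedup  # out-of-order timestamps are sent immediately


t0 = datetime(2004, 3, 10, 18, 0)
t1 = datetime(2004, 3, 10, 19, 0)
print(replay_delay(t0, t1, speedup=10))  # one-hour gap at 10x -> 360.0
```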

Service 4: Kafka Consumer

Purpose: Process streaming data and perform real-time analytics

Run Consumer

# Navigate to streaming infrastructure directory
cd streaming_infrastructure

# Start consumer
python consumer.py

# Optional: Configure consumer settings
python consumer.py --batch-size 100 --output-file processed_data.csv

Configuration:

  • Input Topic: air_quality_raw
  • Output: data/air_quality_clean.csv
  • Processing: Data validation, cleaning, and preprocessing
  • Batch Size: Configurable micro-batching
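
The cleaning and micro-batching steps can be sketched as below. The raw UCI file marks missing readings with -200; whether consumer.py treats them exactly this way is an assumption, and `clean_record` and `micro_batches` are hypothetical helper names:

```python
from itertools import islice
from typing import Iterable, Iterator, List, Optional

MISSING_SENTINEL = -200  # the UCI Air Quality dataset tags missing values with -200


def clean_value(value: float) -> Optional[float]:
    """Map the missing-value sentinel to None and pass everything else through."""
    return None if value == MISSING_SENTINEL else value


def clean_record(record: dict) -> dict:
    """Clean every numeric field of one message; other fields are kept as-is."""
    return {
        key: clean_value(val) if isinstance(val, (int, float)) else val
        for key, val in record.items()
    }


def micro_batches(records: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group a stream of records into lists of at most batch_size items."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Batching amortizes the cost of each CSV write; a larger `--batch-size` means fewer writes but a longer delay before new rows reach the dashboard.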

Service 5: FastAPI Production API

Purpose: Serve trained models via REST API for predictions

Run API Server

# Navigate to API directory
cd api

# Start FastAPI server
python main.py

# Or use uvicorn directly
uvicorn main:app --host 0.0.0.0 --port 8000 --reload

Endpoints:

  • Health Check: GET /api/v1/health
  • Model List: GET /api/v1/models
  • Predictions: POST /api/v1/predict
  • API Docs: GET /docs (Swagger UI)

Configuration:

  • Port: 8000 (default)
  • Host: 0.0.0.0 (all interfaces)
  • MLflow Integration: Automatic model loading from registry

Service 6: Streamlit Analytics Dashboard

Purpose: Interactive dashboard for data visualization and analysis

Run Dashboard

# Navigate to data intelligence directory
cd data_intelligence

# Start Streamlit dashboard
streamlit run streaming_dashboard.py

# Or with custom port
streamlit run streaming_dashboard.py --server.port 8501

Features:

  • Real-time Visualization: Live data plots and charts
  • Statistical Analysis: Comprehensive data summaries
  • Anomaly Detection: Outlier identification and analysis
  • Forecast Visualization: Model prediction displays
  • Interactive Controls: Filtering and range selection

Configuration:

  • Port: 8501 (default)
  • Data Source: data/air_quality_clean.csv
  • Model Integration: Automatic loading of trained models

Service 7: Data Intelligence Analytics

Purpose: Advanced analytics and pattern recognition

Run Analytics Engine

# Navigate to data intelligence directory
cd data_intelligence

# Run streaming analytics
python streaming_analytics.py

# Or run specific analytics modules
python -c "from streaming_analytics import analyze_patterns; analyze_patterns()"

Capabilities:

  • Temporal Analysis: Daily, weekly, seasonal patterns
  • Correlation Analysis: Cross-pollutant relationships
  • Statistical Tests: Significance testing and validation
  • Anomaly Detection: Z-score based outlier identification
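
For reference, z-score outlier detection flags readings that sit many standard deviations from the mean. A minimal sketch using only the standard library; the threshold of 3 is a common default and an assumption here, not a value taken from streaming_analytics.py:

```python
from statistics import mean, stdev
from typing import List, Sequence


def zscore_outliers(values: Sequence[float], threshold: float = 3.0) -> List[int]:
    """Return the indices whose absolute z-score exceeds the threshold."""
    if len(values) < 2:
        return []
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation
    if sigma == 0:
        return []  # constant series: nothing can be an outlier
    return [i for i, v in enumerate(values) if abs(v - mu) / sigma > threshold]


readings = [10.0] * 20 + [100.0]
print(zscore_outliers(readings))  # -> [20]
```

A single extreme value inflates both the mean and the standard deviation, so very short windows can hide outliers; the example uses 21 points for that reason.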

Service 8: Evidently Drift Detection

Purpose: Monitor data drift and model performance degradation

Run Drift Detection

# Navigate to monitoring directory
cd monitoring

# Generate drift report
python evidently_service.py --verbose

# Or use API endpoint
curl -X POST "http://localhost:8000/api/v1/monitoring/drift-report/generate"

View Drift Report

# Open in browser
open "http://localhost:8000/reports/drift"

# Or with custom window size
open "http://localhost:8000/reports/drift?window_days=3"

Features:

  • Statistical Drift Detection: Kolmogorov-Smirnov, Chi-square, Wasserstein distance
  • Interactive HTML Reports: Distribution comparisons and visualizations
  • Dynamic Time Windows: Compare last N days vs previous N days
  • Real-time Monitoring: On-demand report generation
  • API Integration: RESTful endpoints for automation
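
The "last N days vs previous N days" comparison can be sketched as follows. This is an illustration under stated assumptions: `split_windows` is a hypothetical helper, and the exact boundary handling in evidently_service.py may differ:

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Row = Tuple[datetime, float]


def split_windows(rows: List[Row], window_days: int) -> Tuple[List[Row], List[Row]]:
    """Split rows into (reference, current) windows of window_days each.

    current   = the last window_days ending at the newest timestamp
    reference = the window_days immediately before that
    """
    if not rows:
        return [], []
    newest = max(ts for ts, _ in rows)
    current_start = newest - timedelta(days=window_days)
    reference_start = current_start - timedelta(days=window_days)
    current = [r for r in rows if current_start < r[0] <= newest]
    reference = [r for r in rows if reference_start < r[0] <= current_start]
    return reference, current
```

With hourly data and `window_days=3`, each window holds 72 rows, and at least six days of data are needed to fill both.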

Drift Detection Methods:

  • Kolmogorov-Smirnov Test: Detects distribution shape changes
  • Chi-square Test: Identifies frequency shifts
  • Wasserstein Distance: Measures distribution magnitude changes
  • Jensen-Shannon Divergence: Overall distribution similarity
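
To make the first of these concrete, here is a small sketch of the two-sample Kolmogorov-Smirnov statistic, the largest gap between the empirical CDFs of the reference and current windows. Evidently computes this internally (together with a p-value); the code below is a plain-Python illustration of the statistic only, not Evidently's API:

```python
from bisect import bisect_right
from typing import Sequence


def ks_statistic(reference: Sequence[float], current: Sequence[float]) -> float:
    """Two-sample Kolmogorov-Smirnov statistic: max gap between empirical CDFs."""
    if not reference or not current:
        raise ValueError("both samples must be non-empty")
    ref = sorted(reference)
    cur = sorted(current)
    largest_gap = 0.0
    for x in ref + cur:  # the gap can only change at observed values
        cdf_ref = bisect_right(ref, x) / len(ref)
        cdf_cur = bisect_right(cur, x) / len(cur)
        largest_gap = max(largest_gap, abs(cdf_ref - cdf_cur))
    return largest_gap


print(ks_statistic([1, 2, 3, 4], [1, 2, 3, 4]))      # identical samples -> 0.0
print(ks_statistic([1, 2, 3, 4], [10, 11, 12, 13]))  # disjoint samples -> 1.0
```

Values near 0 mean the two windows look alike; values near 1 mean the distributions barely overlap.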

🔄 Complete Pipeline Orchestration

Option A: Manual Service Management

Run each service in separate terminals:

# Terminal 1: MLflow Server
cd predictive_analytics
mlflow server --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root file://$PWD/mlruns_artifacts \
  --host 127.0.0.1 --port 5000

# Terminal 2: Train Models
cd predictive_analytics
python train.py

# Terminal 3: Kafka Producer
cd streaming_infrastructure
python producer.py

# Terminal 4: Kafka Consumer
cd streaming_infrastructure
python consumer.py

# Terminal 5: FastAPI Server
cd api
python main.py

# Terminal 6: Streamlit Dashboard
cd data_intelligence
streamlit run streaming_dashboard.py

Option B: Docker Compose (Recommended)

# 1. Start MLflow server outside Docker first
cd predictive_analytics
mlflow server --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root file://$PWD/mlruns_artifacts \
  --host 127.0.0.1 --port 5000

# 2. In another terminal, start Docker services
docker-compose up -d

# View logs
docker-compose logs -f

# Stop all services
docker-compose down

Docker Compose Services

The docker-compose.yml orchestrates the following services:

Core Infrastructure:

  • Redpanda: Kafka-compatible streaming platform (port 9092)
  • Kafka Producer: Simulates real-time air quality data streaming
  • Kafka Consumer: Processes streaming data and creates clean CSV files

Application Services:

  • FastAPI: Production API server with MLflow integration (port 8000)
  • Streamlit: Interactive analytics dashboard (port 8501)

Service Dependencies:

  • Producer waits for Redpanda to be healthy
  • Consumer depends on Redpanda and creates clean data files
  • FastAPI waits for consumer to create clean data files
  • Streamlit waits for clean data files to be available
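
The "wait for clean data files" dependency amounts to polling until the file exists and has content. A minimal sketch of that idea; `wait_for_file` is a hypothetical helper, and the actual Compose health checks may be implemented as shell tests instead:

```python
import time
from pathlib import Path


def wait_for_file(path: str, timeout_s: float = 60.0, poll_s: float = 1.0) -> bool:
    """Return True once path exists and is non-empty, False if timeout_s elapses first."""
    deadline = time.monotonic() + timeout_s
    target = Path(path)
    while True:
        if target.is_file() and target.stat().st_size > 0:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
```

A service could call `wait_for_file("data/air_quality_clean.csv")` before loading data and exit with a clear error when it returns False.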

Volume Mounts:

  • ./data:/data - Shared data directory for clean CSV files
  • ./api:/app/api - FastAPI application code
  • ./monitoring:/app/monitoring - Evidently drift detection
  • ./predictive_analytics:/app/predictive_analytics - ML models and training
  • ./data_intelligence:/app/data_intelligence - Streamlit dashboard
  • ./streaming_infrastructure:/app/streaming_infrastructure - Kafka components

Health Checks:

  • All services include health checks to ensure proper startup order
  • FastAPI health check: curl -f http://localhost:8000/api/v1/health
  • Streamlit health check: curl -f http://localhost:8501/_stcore/health

Option C: Hybrid Approach

# Start MLflow server outside Docker
cd predictive_analytics
mlflow server --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root file://$PWD/mlruns_artifacts \
  --host 127.0.0.1 --port 5000

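# In a second terminal (the MLflow server keeps running in the first),
# start infrastructure services with Docker from the repository root
docker-compose up -d redpanda

# Run application services locally, each from the repository root.
# The subshells keep the working directory unchanged; long-running
# services (producer, consumer, API, dashboard) each need their own terminal.
(cd predictive_analytics && python train.py)
(cd streaming_infrastructure && python producer.py)
(cd streaming_infrastructure && python consumer.py)
(cd api && python main.py)
(cd data_intelligence && streamlit run streaming_dashboard.py)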

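🌐 Service Access Points

  • FastAPI: http://localhost:8000 (Swagger UI at /docs)
  • Streamlit dashboard: http://localhost:8501
  • MLflow UI: http://localhost:5000
  • Drift report: http://localhost:8000/reports/drift
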
🧪 Testing Individual Services

Test MLflow Server

curl http://localhost:5000/health

Test FastAPI

curl http://localhost:8000/api/v1/health

Test Model Predictions

curl -X POST "http://localhost:8000/api/v1/predict" \
  -H "Content-Type: application/json" \
  -d '{
    "current_data": {
      "datetime": "2025-10-07T12:00:00Z",
      "nox_gt": 200,
      "co_gt": 2.5,
      "no2_gt": 50
    },
    "lag_values": {
      "nox_lag1": 195,
      "nox_lag3": 190,
      "nox_lag6": 185,
      "nox_lag12": 180,
      "nox_lag18": 175,
      "nox_lag24": 170
    }
  }'
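
The same request can be built from Python with the standard library. This sketch mirrors the curl payload above; the response schema is not documented here, so the example only prints whatever JSON comes back. `build_predict_request` is a hypothetical helper name:

```python
import json
from urllib import request

API_URL = "http://localhost:8000/api/v1/predict"


def build_predict_request(current_data: dict, lag_values: dict,
                          url: str = API_URL) -> request.Request:
    """Build (but do not send) the POST request shown in the curl example."""
    body = json.dumps(
        {"current_data": current_data, "lag_values": lag_values}
    ).encode("utf-8")
    return request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_predict_request(
    current_data={"datetime": "2025-10-07T12:00:00Z", "nox_gt": 200,
                  "co_gt": 2.5, "no2_gt": 50},
    lag_values={"nox_lag1": 195, "nox_lag3": 190, "nox_lag6": 185,
                "nox_lag12": 180, "nox_lag18": 175, "nox_lag24": 170},
)

# With the API running, send the request and decode the JSON reply:
# with request.urlopen(req, timeout=10) as resp:
#     print(json.load(resp))
```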

Test Kafka Producer

# Check if producer is sending data
kafka-console-consumer --bootstrap-server localhost:9092 --topic air_quality_raw --from-beginning

Test Streamlit Dashboard

# Open browser to http://localhost:8501
# Verify all tabs load correctly

Test Drift Detection

# Generate drift report
curl -X POST "http://localhost:8000/api/v1/monitoring/drift-report/generate"

# View report in browser
open "http://localhost:8000/reports/drift"

# Check monitoring health
curl "http://localhost:8000/api/v1/monitoring/health"

📡 API Endpoints

Health & Status

  • GET /api/v1/health - Health check
  • GET /api/v1/liveness - Liveness probe

Model Management

  • GET /api/v1/models - List all available models
  • GET /api/v1/models/{model_name} - Get specific model details
  • GET /api/v1/models/{model_name}/predict - Predict with specific model

Predictions

  • POST /api/v1/predict - Make predictions (main endpoint)
  • POST /api/v1/predict/batch - Batch predictions

MLflow Integration

  • GET /api/v1/mlflow/experiments - Get MLflow experiment info
  • GET /api/v1/mlflow/model-metadata - Get MLflow model metadata

Monitoring & Drift Detection

  • GET /api/v1/monitoring/health - Monitoring service health check
  • POST /api/v1/monitoring/drift-report/generate - Generate drift report
  • GET /api/v1/monitoring/drift-report/info - Get report information
  • GET /api/v1/monitoring/data/info - Get dataset statistics

HTML Reports

  • GET /reports/drift - Interactive drift detection report (HTML)

API Documentation

  • GET /docs - Interactive Swagger UI (recommended for testing)
  • GET /redoc - Alternative API documentation

Quick Start Guide

Phase 1: Streaming Infrastructure Setup

Objective: Set up Apache Kafka streaming infrastructure for real-time data ingestion and processing.

Quick Start:

# 1. Install and start Kafka (KRaft mode)
brew install kafka
brew services start kafka

# 2. Navigate to Phase 1 directory
cd streaming_infrastructure

# 3. Install dependencies (requirements.txt is in the repository root)
pip install -r ../requirements.txt

# 4. Start the producer (generates streaming data)
python producer.py

# 5. In another terminal, start the consumer (processes data)
python consumer.py

What You'll See:

  • Real-time data streaming from UCI Air Quality dataset
  • Data preprocessing and quality validation
  • Clean data output to data/air_quality_clean.csv

Key Files:

  • producer.py: Kafka producer for data ingestion
  • consumer.py: Kafka consumer for data processing
  • data_preprocessing.py: Data quality management
  • config/kafka_config.py: Kafka configuration settings

Phase 2: Data Intelligence Dashboard

Objective: Build interactive analytics dashboard for exploratory data analysis and pattern identification.

Quick Start:

# 1. Navigate to Phase 2 directory
cd data_intelligence

# 2. Install dependencies
pip install streamlit plotly pandas numpy

# 3. Start the analytics dashboard
streamlit run streaming_dashboard.py

What You'll See:

  • Interactive dashboard with multiple tabs:
    • Time Series: Real-time pollutant concentration plots
    • Statistics: Comprehensive statistical summaries
    • Correlations: Cross-pollutant correlation analysis
    • Anomalies: Outlier detection and characterization
    • Forecasts: Real-time predictions (requires Phase 3 models)

Key Features:

  • Real-time data visualization
  • Temporal pattern analysis (daily, weekly, seasonal)
  • Statistical analysis and correlation matrices
  • Anomaly detection with z-score analysis
  • Interactive filtering and range selection

Phase 3: Predictive Analytics

Objective: Develop and deploy machine learning models for air quality forecasting.

Quick Start:

# 1. Navigate to Phase 3 directory
cd predictive_analytics

# 2. Install ML dependencies
pip install scikit-learn statsmodels joblib scipy xgboost mlflow

# 3. Train all models (unified approach)
python train.py

# 4. Or train models individually:
python training/train_linear_regression.py
python training/train_sarima.py
python training/train_xgboost.py

What You'll Get:

  • Linear Regression Model: 68% MAE improvement over baseline
  • SARIMA Model: 8.7% MAE improvement with uncertainty quantification
  • Model Artifacts: Saved models, scalers, and performance metrics
  • Statistical Validation: Paired t-tests confirming significance

Model Performance:

  • Linear Regression: MAE = 102 ppb, RMSE = 139 ppb, R² = 0.417
  • SARIMA: MAE = 126 ppb, RMSE = 158 ppb, R² = 0.238
  • Forecast Horizon: 6-hour ahead predictions
  • Integration: Automatic loading in Phase 2 dashboard
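
For readers unfamiliar with the metrics, the standard definitions are sketched below, along with the relative MAE improvement over a baseline. The sample numbers are invented for illustration and are not project results; treating "improvement" as relative MAE reduction is an assumption about how the percentages above were computed:

```python
from math import sqrt
from typing import Sequence


def mae(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Mean absolute error."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def rmse(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Root mean squared error."""
    return sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def r2(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    if ss_tot == 0:
        raise ValueError("R² is undefined for a constant target")
    return 1.0 - ss_res / ss_tot


def improvement_pct(baseline_mae: float, model_mae: float) -> float:
    """Relative MAE reduction versus a baseline, in percent."""
    return 100.0 * (baseline_mae - model_mae) / baseline_mae


# Invented values, for illustration only
actual = [100.0, 200.0, 300.0, 400.0]
predicted = [110.0, 190.0, 310.0, 390.0]
print(mae(actual, predicted), rmse(actual, predicted))  # -> 10.0 10.0
```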

Phase 4: Documentation and Reporting

Objective: Comprehensive documentation and final report synthesis.

Available Documentation:

  • Setup Guides: Detailed installation and troubleshooting instructions (documentation/)
  • Analysis Reports: Comprehensive data intelligence findings (documentation/)
  • Technical Documentation: Architecture decisions and implementation details (documentation/)
  • Final Report: Complete project synthesis and business insights (final_report/)

Final Report Location:

# The comprehensive final report is located at:
final_report/final_report.md

Key Findings

Environmental Patterns

  • Traffic Dominance: Vehicular emissions are primary pollution source
  • Seasonal Variation: 5.4x concentration difference between summer minimum and winter peak
  • Daily Patterns: Pronounced bimodal peaks during morning and evening rush hours
  • Weekend Effect: 30% reduction in nitrogen oxide concentrations on weekends

Model Performance

  • Linear Regression: Superior performance with comprehensive feature engineering
  • Statistical Significance: Both models significantly outperform naive baseline
  • Production Ready: Real-time inference with comprehensive logging and monitoring

Business Intelligence

  • Peak Risk Periods: Morning (8-9 AM) and evening (7-8 PM) rush hours
  • High-Risk Seasons: November-February require enhanced monitoring
  • Operational Thresholds: Z-score based alerting with pollutant-specific limits

Technical Architecture

Data Flow Diagram

System Architecture Overview

The platform implements a comprehensive MLOps pipeline with the following components:

Data Ingestion Layer

  • Kafka Producer: Simulates real-time sensor data with configurable speedup
  • Redpanda: Kafka-compatible streaming platform for high-throughput data ingestion
  • Data Source: UCI Air Quality dataset with 9,358 hourly measurements

Data Processing Layer

  • Kafka Consumer: Processes streaming data with micro-batching and quality validation
  • Data Validation: Real-time data quality checks and outlier detection
  • Data Storage: CSV-based persistence with append-only design (data/air_quality_clean.csv)
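
An append-only CSV sink can be sketched as below: the header is written once, and every later batch is appended. `append_rows` is a hypothetical helper, not the function used in consumer.py:

```python
import csv
from pathlib import Path
from typing import Iterable, Sequence


def append_rows(path: str, fieldnames: Sequence[str], rows: Iterable[dict]) -> int:
    """Append rows to a CSV file, writing the header only when the file is new or empty."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    needs_header = not target.exists() or target.stat().st_size == 0
    written = 0
    with target.open("a", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        for row in rows:
            writer.writerow(row)
            written += 1
    return written
```

Appending never rewrites existing rows, so readers such as the dashboard can re-read the file at any time and only ever see additional rows.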

Analytics & Intelligence Layer

  • Streamlit Dashboard: Interactive visualization and analysis interface
  • Real-Time Processing: Sliding window analytics and correlation analysis
  • Anomaly Detection: Z-score based outlier identification
  • Temporal Analysis: Daily, weekly, and seasonal pattern recognition

Machine Learning Layer

  • Model Training: Linear Regression, SARIMA, and XGBoost models
  • Feature Engineering: Lag features, rolling statistics, temporal encodings
  • Model Registry: MLflow-based model versioning and deployment
  • Production Integration: Automatic model loading and real-time inference
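
The feature engineering step can be illustrated with a short sketch. The lag set (1, 3, 6, 12, 18, 24 hours) matches the `nox_lag*` keys in the /predict payload and the 6-hour target matches the forecast horizon; the rolling window size, the sine/cosine hour encoding, and the `build_rows` helper are assumptions for illustration:

```python
from math import cos, pi, sin
from typing import Dict, List

LAGS = (1, 3, 6, 12, 18, 24)  # matches the nox_lag* keys in the /predict payload
HORIZON_H = 6                 # forecast horizon from the README


def build_rows(series: List[float], start_hour: int = 0,
               window: int = 3) -> List[Dict[str, float]]:
    """Turn an hourly NOx series into supervised-learning rows.

    Each row describes hour t (current value, lags, rolling mean, hour-of-day
    encoding) and carries the value at t + HORIZON_H as the target.
    """
    rows = []
    max_lag = max(LAGS)
    for t in range(max_lag, len(series) - HORIZON_H):
        hour = (start_hour + t) % 24
        row = {"nox_gt": series[t]}
        for lag in LAGS:
            row[f"nox_lag{lag}"] = series[t - lag]
        recent = series[t - window + 1 : t + 1]
        row[f"nox_roll_mean{window}"] = sum(recent) / window
        row["hour_sin"] = sin(2 * pi * hour / 24)  # cyclical encoding keeps 23h close to 0h
        row["hour_cos"] = cos(2 * pi * hour / 24)
        row["target"] = series[t + HORIZON_H]
        rows.append(row)
    return rows
```

The first 24 hours and the last 6 hours of a series produce no rows, because their lags or targets fall outside the data.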

API & Serving Layer

  • FastAPI Server: Production-ready REST API with comprehensive documentation
  • Model Serving: Real-time prediction endpoints with MLflow integration
  • Monitoring: Evidently-based drift detection and performance monitoring
  • Health Checks: Comprehensive service health monitoring

Monitoring & Observability

  • Drift Detection: Statistical drift detection using Kolmogorov-Smirnov, Chi-square, and Wasserstein distance
  • Performance Monitoring: Real-time model performance tracking
  • Data Quality: Continuous data validation and anomaly detection
  • Service Health: Docker Compose health checks and service dependencies

Docker Compose Architecture

The platform uses Docker Compose for containerized deployment with the following architecture:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Redpanda      │    │   Kafka         │    │   Kafka         │
│   (Kafka)       │◄───┤   Producer      │    │   Consumer      │
│   Port: 9092    │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                         │
                                                         ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   FastAPI       │    │   Streamlit     │    │   Clean Data    │
│   Port: 8000    │◄───┤   Port: 8501    │◄───┤   CSV Files     │
│                 │    │                 │    │   /data/        │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │
         ▼
┌─────────────────┐
│   MLflow        │
│   Port: 5000    │
│   (External)    │
└─────────────────┘

Data Flow Architecture

  1. Data Ingestion: UCI Air Quality dataset → Kafka Producer → Redpanda
  2. Data Processing: Redpanda → Kafka Consumer → Clean CSV files
  3. Analytics: Clean CSV → Streamlit Dashboard → Real-time visualizations
  4. Predictions: Clean CSV → ML Models → FastAPI → Predictions
  5. Monitoring: Clean CSV → Evidently → Drift Detection Reports

Service Dependencies

The Docker Compose setup ensures proper service startup order:

  1. Redpanda starts first (Kafka infrastructure)
  2. Kafka Producer waits for Redpanda to be healthy
  3. Kafka Consumer depends on Redpanda and creates clean data files
  4. FastAPI waits for consumer to create clean data files
  5. Streamlit waits for clean data files to be available

Volume Management

All services share the ./data:/data volume mount for:

  • Clean Data Files: air_quality_clean.csv, air_quality_excluded.csv
  • Real-time Updates: Consumer continuously updates clean data files
  • Service Access: All services can access the same data files

Health Monitoring

Each service includes comprehensive health checks:

  • Redpanda: Internal health check on port 8081
  • Kafka Producer: File existence check for data files
  • Kafka Consumer: File existence check for clean data files
  • FastAPI: HTTP health check on /api/v1/health
  • Streamlit: HTTP health check on /_stcore/health
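
Outside Docker, the same HTTP health checks can be scripted with a small retry loop. A sketch using only the standard library; `http_ok` and `wait_until_healthy` are hypothetical helpers, and the retry count and delay are arbitrary defaults:

```python
import time
from typing import Callable
from urllib import error, request


def http_ok(url: str, timeout_s: float = 2.0) -> bool:
    """True when the URL answers with a 2xx status, False on any connection or HTTP error."""
    try:
        with request.urlopen(url, timeout=timeout_s) as resp:
            return 200 <= resp.status < 300
    except (error.URLError, OSError):
        return False


def wait_until_healthy(check: Callable[[], bool], retries: int = 10,
                       delay_s: float = 3.0) -> bool:
    """Call check() up to `retries` times, sleeping delay_s between failed attempts."""
    for attempt in range(retries):
        if check():
            return True
        if attempt < retries - 1:
            time.sleep(delay_s)
    return False


# Example (requires the API to be running):
# wait_until_healthy(lambda: http_ok("http://localhost:8000/api/v1/health"))
```

Passing the check as a callable keeps the retry logic independent of HTTP, so the same loop works for the file-existence checks as well.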

Dependencies

Core Requirements

kafka-python>=2.0.0
pandas>=1.3.0
numpy>=1.21.0
scikit-learn>=1.0.0
streamlit>=1.0.0
plotly>=5.0.0
evidently>=0.4.14
fastapi>=0.104.1
uvicorn[standard]>=0.24.0

Optional Extensions

statsmodels>=0.13.0
matplotlib>=3.5.0
seaborn>=0.11.0
mlflow>=2.7.1

🔧 Troubleshooting

Common Issues

Kafka Connection Problems:

  • Ensure Kafka broker is running: brew services start kafka
  • Check Kafka is running on localhost:9092
  • Install missing dependencies: pip install six kafka-python==2.2.15

MLflow Issues:

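  • Database migration errors: Run mlflow db upgrade sqlite:///mlflow.db from predictive_analytics/, or delete predictive_analytics/mlflow.db and restart (deleting the file discards all tracked runs and registered models)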
  • Connection refused: Ensure MLflow server is running on port 5000
  • Missing metrics: Check if models are properly registered in Production stage

API Issues:

  • Empty performance metrics: Ensure MLflow server is running and models are registered
  • Model not found: Train models first with python train.py
  • Connection errors: Check MLflow server is accessible at http://127.0.0.1:5000

Training Issues:

  • Missing dependencies: pip install mlflow xgboost scikit-learn statsmodels
  • No data: Ensure data/AirQualityUCI.csv exists
  • Environment variables: Create .env file with MLflow configuration

Dashboard Issues:

  • Port conflicts: Ensure port 8501 is available
  • Missing data: Check data/air_quality_clean.csv exists
  • Dependencies: pip install streamlit plotly pandas numpy

Drift Detection Issues:

  • Evidently not installed: pip install evidently==0.4.14
  • Missing data: Ensure data/air_quality_clean.csv exists
  • Insufficient data: Need at least 24 hours of data for drift detection
  • Import errors: Check Python path and dependencies

Docker Compose Issues:

  • Services not starting: Check docker-compose logs -f for error details
  • Port conflicts: Ensure ports 8000, 8501, 9092 are available
  • Volume mount issues: Verify ./data directory exists and is writable
  • Health check failures: Services may take time to become healthy
  • MLflow connection: Ensure MLflow server is running outside Docker on port 5000
  • Build dependencies: gcc and python3-dev are installed automatically in containers
  • Plotly warnings: Deprecated width='stretch' warnings are resolved in latest version

Production Deployment

Docker Compose Production Setup

The platform is production-ready with Docker Compose orchestration:

# 1. Start MLflow server (required for model serving)
cd predictive_analytics
mlflow server --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root file://$PWD/mlruns_artifacts \
  --host 127.0.0.1 --port 5000

# 2. Start all services with Docker Compose
docker-compose up -d

# 3. Monitor service health
docker-compose ps
docker-compose logs -f

# 4. Access services
# - FastAPI: http://localhost:8000
# - Streamlit: http://localhost:8501
# - MLflow: http://localhost:5000
# - Drift Reports: http://localhost:8000/reports/drift

Service Monitoring

Health Checks:

# Check all service health
docker-compose ps

# Check specific service logs
docker-compose logs -f fastapi
docker-compose logs -f streamlit

# Test API health
curl http://localhost:8000/api/v1/health

# Test Streamlit health
curl http://localhost:8501/_stcore/health

Performance Monitoring:

  • Real-time Data: Consumer processes ~10-15 records every 5 seconds
  • Model Predictions: FastAPI serves predictions in <100ms
  • Dashboard Updates: Streamlit refreshes every 5 seconds
  • Drift Detection: On-demand reports generated in <30 seconds

Scaling Considerations

Horizontal Scaling:

  • Kafka Consumer: Can run multiple consumer instances for parallel processing
  • FastAPI: Can be scaled with multiple replicas behind a load balancer
  • Streamlit: Single instance recommended for dashboard consistency

Resource Requirements:

  • Minimum: 4 CPU cores, 8GB RAM, 10GB storage
  • Recommended: 8 CPU cores, 16GB RAM, 50GB storage
  • Production: 16 CPU cores, 32GB RAM, 100GB storage

Security Considerations

Network Security:

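  • Services are accessed through localhost by default, and no external network exposure is required
  • The uvicorn example binds FastAPI to 0.0.0.0 (all interfaces); pass --host 127.0.0.1 to restrict it to local connections
  • The MLflow server runs outside Docker, bound to 127.0.0.1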

Data Security:

  • Clean data files stored locally in ./data directory
  • No sensitive data transmitted over network
  • Model artifacts stored locally in MLflow

Contributing

This project was developed as part of the "Fundamentals of Operationalizing AI" course. The implementation demonstrates best practices in:

  • End-to-end MLOps pipelines: From data ingestion to model serving
  • Streaming data architecture: Real-time data processing with Kafka
  • Model lifecycle management: Training, versioning, and deployment with MLflow
  • Production-ready APIs: FastAPI with proper error handling and documentation
  • Real-time analytics: Interactive dashboards with Streamlit
  • Drift detection and monitoring: Evidently-based data drift detection with HTML reports
  • Environment management: Configuration-driven development with .env files
  • Comprehensive documentation: Complete setup and troubleshooting guides
  • Containerized deployment: Docker Compose orchestration with health checks
  • Service dependencies: Proper startup order and service coordination

License

This project is developed for educational purposes as part of an academic assignment.

Acknowledgments

This project was developed with assistance from Claude AI to ensure technical accuracy, comprehensive analysis, and professional presentation. The implementation reflects the integration of streaming technologies with predictive analytics for environmental applications.

MLflow Tracking (Phase 3)

  • Training scripts now log to MLflow under predictive_analytics/mlruns using experiment name AirQuality-NOx-6h.
  • To view runs locally:
cd predictive_analytics
mlflow ui --backend-store-uri file://$PWD/mlruns --host 127.0.0.1 --port 5000
  • Each run records:
    • Parameters: algorithm/config, feature count, horizon
    • Metrics: validation and test metrics (MAE, RMSE, R², sMAPE), significance tests
    • Artifacts: model binaries, features.json, metrics.json (under per-model subfolders)

Auto-register best model (optional)

  • Requires MLflow server with DB backend (registry not supported on file:// stores)
  • Enable auto-registration during training:
export MLFLOW_TRACKING_URI=http://127.0.0.1:5000
export REGISTER_BEST=1
cd predictive_analytics
python train.py
  • Behavior: selects the best run in experiment AirQuality-NOx-6h by lowest test_rmse, registers it as AirQualityNOx6h, and promotes to Production (archiving previous versions).
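
The selection rule itself is simple and can be sketched without MLflow. The code below shows only the "lowest test_rmse wins" logic over plain dictionaries; the run records and numbers are invented for illustration, and the real script would fetch runs from the tracking server and then register the winner:

```python
from typing import List, Optional


def select_best_run(runs: List[dict], metric: str = "test_rmse") -> Optional[dict]:
    """Return the run with the lowest value of `metric`, ignoring runs that lack it."""
    scored = [r for r in runs if metric in r.get("metrics", {})]
    if not scored:
        return None
    return min(scored, key=lambda r: r["metrics"][metric])


# Illustrative runs only; the numbers are invented, not project results.
runs = [
    {"run_id": "a1", "model": "linear_regression", "metrics": {"test_rmse": 3.0}},
    {"run_id": "b2", "model": "sarima", "metrics": {"test_rmse": 4.5}},
    {"run_id": "c3", "model": "xgboost", "metrics": {}},  # failed run, no metric logged
]
best = select_best_run(runs)
print(best["run_id"])  # -> a1
```

Skipping runs without the metric prevents a failed or partial run from being registered by accident.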

Recommended local setup with env vars

  1. Start MLflow server with registry (SQLite) in Phase 3 dir:
cd predictive_analytics
mlflow server --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root file://$PWD/mlruns_artifacts \
  --host 127.0.0.1 --port 5000
  2. Export env vars (in a second terminal, since the server keeps running in the first):
export MLFLOW_TRACKING_URI=http://127.0.0.1:5000
export REGISTER_BEST=1
export MODEL_NAME=AirQualityNOx6h
  3. Train and auto-register (same terminal as step 2, from the repository root):
cd predictive_analytics
python train.py
  4. View UI: http://127.0.0.1:5000

Using a .env file (optional)

Create predictive_analytics/.env:

MLFLOW_TRACKING_URI=http://127.0.0.1:5000
REGISTER_BEST=1
MODEL_NAME=AirQualityNOx6h

Load and run:

cd predictive_analytics
set -a; source .env; set +a
python train.py

Author: Santiago Bolaños Vega
Course: Fundamentals of Operationalizing AI
Date: September 23, 2025
