An AWS Lambda-based service that processes cryptocurrency risk data and detects anomalies using quantile-based statistical analysis.
This serverless application consumes messages from an SQS queue containing cryptocurrency risk assessment requests and performs anomaly detection on various risk metrics including price, market cap, liquidity, volatility, and volume data. The service uses pandas for data analysis and MongoDB for data storage and retrieval.
- AWS Lambda Function: Processes SQS messages containing risk data requests
- SQS Queue: Message queue for processing requests with dead letter queue for failed messages
- MongoDB: Data source for cryptocurrency risk scores and storage for anomaly flags
- AWS X-Ray: Distributed tracing for monitoring and debugging
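As an illustration of how the Lambda function might consume SQS messages, here is a minimal sketch. The `Records` list and the string `body` field are part of the standard SQS event that Lambda receives; the `parse_sqs_records` helper, the shape of the message payload, and the `coin_id` key used in the usage note are assumptions, not the project's actual code.

```python
import json
import logging

logger = logging.getLogger(__name__)


def parse_sqs_records(event):
    """Return the decoded JSON body of every SQS record in a Lambda event."""
    requests = []
    for record in event.get("Records", []):
        # SQS delivers the message payload as a string in the "body" field.
        requests.append(json.loads(record["body"]))
    return requests


def handler(event, context):
    """Hypothetical entry point: decode each request, then hand it to the model."""
    requests = parse_sqs_records(event)
    logger.info("Received %d risk data request(s)", len(requests))
    # An unhandled exception here fails the invocation. Once a message has
    # been received more times than the queue's maxReceiveCount allows,
    # SQS moves it to the dead letter queue.
    return {"processed": len(requests)}
```

Calling `handler({"Records": [{"body": '{"coin_id": "bitcoin"}'}]}, None)` returns `{"processed": 1}`.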
- Multi-timeframe Analysis: Detects anomalies across 3-day, 7-day, and 30-day periods
- Statistical Anomaly Detection: Uses interquartile range (IQR) and configurable deviation limits
- Comprehensive Risk Metrics: Analyzes multiple risk indicators:
  - Latest price anomalies
  - Market cap score anomalies
  - Liquidity risk score anomalies
  - Volatility risk score anomalies
  - Volume anomalies
  - Market cap anomalies
  - Volatility index anomalies
- Bulk Database Operations: Efficiently updates anomaly flags using MongoDB bulk writes
- VPC Integration: Deployed within VPC for secure database access
- Error Handling: Comprehensive logging and dead letter queue for failed processing
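The multi-timeframe analysis can be pictured as running the same detection over three overlapping look-back windows. The sketch below only shows the windowing step; the `timestamp` field name and the `split_by_timeframe` helper are hypothetical, and the service itself may select its windows differently (for example, in the MongoDB query).

```python
from datetime import datetime, timedelta, timezone

# Look-back windows named in the feature list, in days.
TIMEFRAMES_IN_DAYS = (3, 7, 30)


def split_by_timeframe(records, now):
    """Group records into one list per look-back window.

    Each record is a dict with a timezone-aware "timestamp" (hypothetical
    field name). A record can appear in several windows, because the
    30-day window contains the 7-day and 3-day windows.
    """
    windows = {}
    for days in TIMEFRAMES_IN_DAYS:
        cutoff = now - timedelta(days=days)
        windows[days] = [r for r in records if cutoff <= r["timestamp"] <= now]
    return windows


now = datetime(2023, 1, 31, tzinfo=timezone.utc)
records = [{"timestamp": now - timedelta(days=d)} for d in (1, 5, 20, 40)]
windows = split_by_timeframe(records, now)
print({days: len(items) for days, items in windows.items()})
# {3: 1, 7: 2, 30: 3}
```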
| Variable | Description |
|---|---|
| MONGODB_RSKYCLUSTER_REGION | MongoDB Atlas region |
| MONGODB_RSKYCLUSTER_DATA_API_APP_ID | MongoDB Data API application ID |
| MONGODB_RSKYCLUSTER_DATA_API_KEY | MongoDB Data API key |
| MONGODB_RSKYCLUSTER_DATA_SOURCE | MongoDB data source name |
| MONGODB_RSKYCLUSTER_DATABASE | Target database name |
| MONGODB_RSKYCLUSTER_COINGECKO_ONCHAIN_SCORES_COLLECTION | Collection for onchain scores |
| MONGODB_RSKYCLUSTER_DATABASE_USER_NAME | Database username |
| MONGODB_RSKYCLUSTER_DATABASE_USER_PASSWORD | Database password |
| QUANTILE_DEVIATION_LIMIT | Multiplier for IQR-based anomaly threshold |
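A minimal sketch of how these variables might be read at startup, for example in `constants_helper.py`. The `load_settings` function and the keys of the returned dictionary are assumptions made for illustration; only the environment variable names come from the table above.

```python
import os


def load_settings(env=os.environ):
    """Read a few of the settings the service needs from the environment.

    A missing variable raises KeyError, so a misconfigured deployment
    fails at startup rather than partway through processing.
    """
    return {
        "database": env["MONGODB_RSKYCLUSTER_DATABASE"],
        "collection": env["MONGODB_RSKYCLUSTER_COINGECKO_ONCHAIN_SCORES_COLLECTION"],
        # Environment variables are always strings, so the multiplier
        # has to be converted before it is used in arithmetic.
        "quantile_deviation_limit": float(env["QUANTILE_DEVIATION_LIMIT"]),
    }
```

Passing a plain dictionary instead of `os.environ` makes the function easy to exercise in tests without touching the real environment.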
This project uses the Serverless Framework for deployment:
# Install dependencies
pip install -r requirements.txt
# Deploy to AWS
serverless deploy --param="AWS_REGION=us-east-1" --param="QUANTILE_DEVIATION_LIMIT=1.5" # ... other params

The service implements a quantile-based anomaly detection algorithm:
- Data Preparation: Retrieves historical risk score data for specified time periods
- Quantile Calculation: Computes 25th (Q1) and 75th (Q3) percentiles for each risk metric
- Threshold Determination: Calculates anomaly boundaries using the IQR method, where IQR = Q3 - Q1:
  - Upper bound: Q3 + (deviation_limit × IQR)
  - Lower bound: Q1 - (deviation_limit × IQR)
- Anomaly Detection: Flags data points falling outside these boundaries
- Database Update: Updates MongoDB documents with anomaly flags for detected outliers
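The quantile and threshold steps above can be sketched as follows. This is not the service's code: the service uses pandas, while this sketch uses the standard library's `statistics.quantiles` so that it runs without dependencies. The function names `iqr_bounds` and `flag_anomalies` and the sample values are made up for illustration.

```python
from statistics import quantiles


def iqr_bounds(values, deviation_limit):
    """Return the (lower, upper) anomaly bounds for a list of numbers."""
    # method="inclusive" interpolates linearly between data points, which
    # is the same rule as the default linear interpolation in pandas.
    q1, _median, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - deviation_limit * iqr, q3 + deviation_limit * iqr


def flag_anomalies(values, deviation_limit=1.5):
    """Return one boolean per value: True if it falls outside the bounds."""
    lower, upper = iqr_bounds(values, deviation_limit)
    return [v < lower or v > upper for v in values]


scores = [10, 11, 12, 13, 14, 15, 100]
print(iqr_bounds(scores, 1.5))   # (7.0, 19.0)
print(flag_anomalies(scores))    # only the last value, 100, is flagged
```

Here Q1 = 11.5 and Q3 = 14.5, so IQR = 3 and the bounds are 11.5 - 4.5 = 7.0 and 14.5 + 4.5 = 19.0. A larger `QUANTILE_DEVIATION_LIMIT` widens the bounds and flags fewer points.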
Run the test suite:
# Run all tests
pytest
# Run specific test file
pytest tests/test_model.py

├── handler.py                     # Lambda entry point
├── libs/
│   ├── helpers/
│   │   ├── constants_helper.py    # Environment variables and constants
│   │   └── mongodb_helper.py      # MongoDB operations
│   └── models/
│       └── model.py               # Main anomaly detection logic
├── tests/                         # Test files
├── requirements.txt               # Python dependencies
├── serverless.yml                 # Serverless configuration
└── README.md                      # This file
- aws-xray-sdk: AWS X-Ray tracing
- pandas: Data analysis and manipulation
- pymongo: MongoDB driver
- requests: HTTP requests (via Lambda layer)
The application includes comprehensive logging and AWS X-Ray tracing for monitoring:
- Lambda execution metrics
- Processing time per signal type
- Number of anomalies detected
- Database operation performance
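One way to capture per-signal processing time is a small timing context manager that writes to the standard logger. This is a hedged sketch using only the standard library; the `timed` helper, the logger name, and the `stats` dictionary are hypothetical, and the project may record these figures through X-Ray subsegments instead.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("anomaly_detection")


@contextmanager
def timed(signal_type, stats):
    """Store the duration of the wrapped block under stats[signal_type]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed runs are timed too.
        stats[signal_type] = time.perf_counter() - start
        logger.info("Processed %s in %.3fs", signal_type, stats[signal_type])


stats = {}
with timed("volume", stats):
    anomalies = [v for v in (1, 2, 300) if v > 100]  # stand-in for real work
logger.info("Detected %d anomalies for %s", len(anomalies), "volume")
```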
Nzwisisa Chidembo
Created: December 26, 2022