Distributed AI Trading Engine

A high-performance, distributed trading engine built with Python, TensorFlow, and Apache Kafka.
The system executes trades with sub-50ms latency, integrates adaptive risk models, and sustains millions of transactions daily with minimal downtime.
The project also demonstrates distributed systems design, machine learning integration, CI/CD, JWT authentication, and advanced monitoring.


Problem Statement

High-frequency trading (HFT) systems require ultra-low latency, scalability, and robust risk management to handle volatile markets and high transaction volumes.
Traditional systems often struggle with real-time data processing, adaptive decision-making, security, and monitoring — leading to missed opportunities or excessive risk exposure.


Solution

This project implements a Distributed AI Trading Engine that:

  • Executes trades with sub-50ms latency using optimized Python and TensorFlow LSTM models.
  • Integrates adaptive risk models that adjust dynamically to market conditions such as volatility and RSI (a sketch of such a check follows this list).
  • Leverages Apache Kafka for real-time market data streaming and fault-tolerant processing.
  • Scales horizontally with Docker and Kafka to handle millions of transactions daily.
  • Secures endpoints with JWT authentication.
  • Provides advanced monitoring with Prometheus and Grafana integration.
  • Ensures code quality with automated CI/CD pipelines and linting.
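
To make the adaptive risk idea concrete, below is a minimal sketch of a volatility- and RSI-based check in the spirit of the engine's evaluate_risk call. The class name, thresholds, and confidence scaling are illustrative assumptions, not the repository's actual RiskManager.

from typing import Dict

import numpy as np


class SimpleRiskCheck:
    """Illustrative risk filter; thresholds are hypothetical, not taken from the repo."""

    def __init__(self, max_volatility: float = 0.05, rsi_bounds: tuple = (20.0, 80.0)):
        self.max_volatility = max_volatility
        self.rsi_low, self.rsi_high = rsi_bounds

    def evaluate_risk(self, prediction: np.ndarray, market_data: Dict) -> bool:
        # Scale the volatility ceiling with model confidence: a more confident
        # signal is allowed to trade in slightly more volatile conditions.
        confidence = abs(float(prediction[0]) - 0.5) * 2.0
        if market_data["volatility"] > self.max_volatility * (1.0 + confidence):
            return False
        # Avoid trading into overbought/oversold extremes flagged by RSI.
        return self.rsi_low <= market_data["rsi"] <= self.rsi_high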

Tech Stack

Component              Technology
---------              ----------
Programming Language   Python
Machine Learning       TensorFlow (LSTM for time-series trade predictions)
Streaming Platform     Apache Kafka
API Framework          FastAPI with JWT authentication
Containerization       Docker, Docker Compose
Monitoring             Prometheus & Grafana
Testing                PyTest
CI/CD                  GitHub Actions
Linting                Flake8
Configuration          YAML

Architecture Decisions

  • Modular Design – Separates concerns (trade execution, risk management, data streaming, API) for maintainability.
  • LSTM Model – Uses time-series modeling for trade predictions (a generic model sketch follows this list).
  • Kafka for Streaming – Ensures fault-tolerant, high-throughput data processing.
  • JWT Authentication – Secures API endpoints for production use.
  • Prometheus & Grafana – Provides advanced metrics visualization.
  • CI/CD Pipeline – Automates testing, linting, and deployment for code quality.
  • Containerization – Docker ensures consistent environments and scalability.
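
For context, a buy/sell classifier of this kind can be built as a small Keras LSTM. The sketch below is a generic example only; the look-back window, feature count, and layer sizes are assumptions rather than the repository's trained model.

import tensorflow as tf

WINDOW = 30      # assumed look-back window (time steps)
N_FEATURES = 6   # assumed inputs, e.g. price, volume, volatility, sma, rsi, ...

model = tf.keras.Sequential([
    tf.keras.layers.LSTM(64, input_shape=(WINDOW, N_FEATURES)),
    tf.keras.layers.Dense(32, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),  # probability of a "buy" signal
])
model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])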

Key Feature: Low-Latency Trade Execution

Below is a code snippet from src/core/trade_engine.py, showing type hints, error handling, and Prometheus metrics.

def execute_trade(self, market_data: Dict) -> Optional[Dict]:
    """Execute a trade based on market data and ML predictions."""
    start_time = time.time()
    try:
        features = self.market_data.preprocess(market_data)
        prediction = self.model.predict(np.array([features]), verbose=0)[0]
        if not self.risk_manager.evaluate_risk(prediction, market_data):
            self.logger.warning("Trade rejected due to risk thresholds: %s", market_data)
            self.metrics.record_trade_rejection()
            return None
        trade = {
            "symbol": market_data["symbol"],
            "action": "buy" if prediction[0] > 0.5 else "sell",
            "quantity": self.risk_manager.calculate_position_size(market_data),
            "price": market_data["price"],
            "timestamp": time.time()
        }
        latency = (time.time() - start_time) * 1000
        self.metrics.record_trade_latency(latency)
        self.metrics.record_trade_execution()
        self.logger.info("Trade executed: %s, Latency: %.2fms", trade, latency)
        return trade
    except Exception as e:
        self.logger.error("Trade execution failed: %s", str(e))
        self.metrics.record_error()
        return None

Explanation:

  • Low Latency: Measures and logs execution time so the sub-50ms target can be verified.
  • Robustness: Comprehensive error handling and logging.
  • Modularity: Integrates with risk management and market data modules.
  • Metrics: Tracks performance with Prometheus for monitoring.
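
As an illustration of the metrics side, the helper below shows how the counters and latency histogram used in execute_trade could be exposed with the prometheus_client library. The metric names and the TradeMetrics class are assumptions; the repository's metrics module may be organized differently.

from prometheus_client import Counter, Histogram, start_http_server


class TradeMetrics:
    """Illustrative Prometheus wrapper; metric names are hypothetical."""

    def __init__(self, port: int = 8001):
        self.trade_latency = Histogram(
            "trade_execution_latency_ms",
            "Trade execution latency in milliseconds",
            buckets=(5, 10, 25, 50, 100, 250),
        )
        self.trades_executed = Counter("trades_executed_total", "Trades executed")
        self.trades_rejected = Counter("trades_rejected_total", "Trades rejected by risk checks")
        self.errors = Counter("trade_errors_total", "Trade execution errors")
        start_http_server(port)  # expose /metrics for Prometheus to scrape

    def record_trade_latency(self, latency_ms: float) -> None:
        self.trade_latency.observe(latency_ms)

    def record_trade_execution(self) -> None:
        self.trades_executed.inc()

    def record_trade_rejection(self) -> None:
        self.trades_rejected.inc()

    def record_error(self) -> None:
        self.errors.inc()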

Setup Instructions

1. Clone the Repository

git clone https://github.com/mosesachizz/distributed-ai-trading-engine.git
cd distributed-ai-trading-engine

2. Install Dependencies

pip install -r requirements.txt

3. Set Up Kafka, Prometheus, and Grafana

Start services with Docker Compose:

docker-compose -f docker/docker-compose.yml up -d

4. Train the ML Model

python scripts/train_model.py

5. Run the Market Data Simulator (optional)

python scripts/simulate_market.py
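
The simulator feeds synthetic ticks into the pipeline. A minimal sketch of such a producer with the kafka-python client is shown below; the market-data topic name and tick fields mirror the /trade payload but are assumptions, not necessarily what scripts/simulate_market.py does.

import json
import random
import time

from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    tick = {
        "symbol": "AAPL",
        "price": round(150.0 + random.gauss(0, 1), 2),
        "volume": float(random.randint(100, 5000)),
        "volatility": round(random.uniform(0.01, 0.05), 3),
        "sma": 148.0,
        "rsi": round(random.uniform(30.0, 70.0), 1),
    }
    producer.send("market-data", tick)
    time.sleep(0.01)  # roughly 100 ticks per second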

6. Run the Application

python src/main.py

7. Run Tests

pytest tests/
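
For reference, a unit test for the trade path could look like the sketch below. The import path and the mock wiring are hypothetical and may not match the repository's test layout.

from unittest.mock import MagicMock

import numpy as np

from src.core.trade_engine import TradeEngine  # module path assumed


def test_execute_trade_returns_buy_when_risk_passes():
    engine = TradeEngine.__new__(TradeEngine)  # bypass __init__; dependencies are mocked below
    engine.market_data = MagicMock(preprocess=MagicMock(return_value=np.zeros(6)))
    engine.model = MagicMock(predict=MagicMock(return_value=np.array([[0.9]])))
    engine.risk_manager = MagicMock(
        evaluate_risk=MagicMock(return_value=True),
        calculate_position_size=MagicMock(return_value=10),
    )
    engine.metrics = MagicMock()
    engine.logger = MagicMock()

    trade = engine.execute_trade({"symbol": "AAPL", "price": 150.0})

    assert trade is not None
    assert trade["action"] == "buy"
    assert trade["quantity"] == 10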

8. Access the API

Generate a JWT token (example using Python)

from src.utils.auth import create_token
print(create_token({"sub": "user1"}))
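
If you need an equivalent helper outside the repository, a minimal version with PyJWT might look like this; the secret, algorithm, and expiry below are assumptions and must match whatever src/utils/auth.py actually uses.

import datetime

import jwt  # PyJWT

SECRET_KEY = "change-me"  # assumed; the real key should come from configuration


def create_token(payload: dict, expires_minutes: int = 30) -> str:
    claims = dict(payload)
    claims["exp"] = datetime.datetime.utcnow() + datetime.timedelta(minutes=expires_minutes)
    return jwt.encode(claims, SECRET_KEY, algorithm="HS256")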

Endpoints

  • Trade execution: http://localhost:8000/trade
    (POST JSON: {"symbol": "AAPL", "price": 150.0, "volume": 1000.0, "volatility": 0.02, "sma": 148.0, "rsi": 50.0})
    Include header: Authorization: Bearer <token> (see the example request after this list)
  • Health check: http://localhost:8000/health
  • Prometheus metrics: http://localhost:9090
  • Grafana dashboard: http://localhost:3000 (default login: admin/admin)
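
A trade request can then be sent with the token, for example using the requests library (URL and payload taken from the list above):

import requests

token = "<token>"  # output of create_token above

response = requests.post(
    "http://localhost:8000/trade",
    json={"symbol": "AAPL", "price": 150.0, "volume": 1000.0,
          "volatility": 0.02, "sma": 148.0, "rsi": 50.0},
    headers={"Authorization": f"Bearer {token}"},
)
print(response.status_code, response.json())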

9. CI/CD Setup

The GitHub Actions workflow (.github/workflows/ci.yml) runs tests, linting, and Docker builds on push/pull requests.


License

This project is licensed under the MIT License — see the LICENSE file for details.
