A real-time data engineering pipeline that detects Bitcoin (BTC) price discrepancies between Coinbase and Binance using Apache Flink and Kafka. The system processes market data as it arrives, flags spreads that may represent arbitrage opportunities, and alerts users via Discord and a live Streamlit dashboard.
The pipeline consists of four main microservices orchestrated via Docker Compose:
graph LR
A[Coinbase/Binance API] -->|WebSocket| B(Producer)
B -->|JSON| C{Kafka: crypto-prices}
C -->|Stream| D[Flink Processor]
D -->|Tumbling Window| D
D -->|Arbitrage Event| E{Kafka: arbitrage-alerts}
E -->|Consumer| F[Streamlit Dashboard]
E -->|Consumer| G[Discord Alerter]
Consumer/Producer (src/producer.py):
- Connects to Coinbase and Binance WebSockets.
- Normalizes real-time trade data.
- Publishes to Kafka topic `crypto-prices`.
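The normalization step can be sketched as two small mapping functions. This is a minimal illustration, not the code in `src/producer.py`: the output schema (`exchange`, `symbol`, `price`, `ts_ms`) is an assumption, and the input field names (`price` and `time` for Coinbase, `p` and `T` for Binance) follow the exchanges' public trade messages as commonly documented, so they should be checked against the feeds actually subscribed to.

```python
from datetime import datetime
from typing import Any, Dict


def normalize_coinbase(msg: Dict[str, Any]) -> Dict[str, Any]:
    """Map a Coinbase-style trade message onto the shared schema (assumed)."""
    # Assumed input: "price" is a decimal string, "time" is ISO 8601 in UTC.
    ts = datetime.fromisoformat(msg["time"].replace("Z", "+00:00"))
    return {
        "exchange": "coinbase",
        "symbol": "BTC",  # hard-coded here; the pipeline only tracks BTC
        "price": float(msg["price"]),
        "ts_ms": int(ts.timestamp() * 1000),
    }


def normalize_binance(msg: Dict[str, Any]) -> Dict[str, Any]:
    """Map a Binance-style trade message onto the shared schema (assumed)."""
    # Assumed input: "p" is the price string, "T" is the trade time in ms.
    return {
        "exchange": "binance",
        "symbol": "BTC",
        "price": float(msg["p"]),
        "ts_ms": int(msg["T"]),
    }
```

Giving both exchanges the same field names and a millisecond timestamp means the downstream job can window and join the two streams without exchange-specific parsing.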
Stream Processor (src/processor.py):
- Apache Flink (PyFlink) job.
- Consumes from `crypto-prices`.
- Applies Tumbling Windows (10 seconds) to aggregate prices.
- Joins streams from both exchanges.
- Detects spreads > $50.
- Sinks alerts to Kafka topic `arbitrage-alerts`.
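The windowing and spread logic can be illustrated in plain Python. This is a sketch of the idea only, not the PyFlink job: it assumes the per-window aggregate is an average price (the README only says prices are aggregated) and reuses a hypothetical tick schema with `exchange`, `price`, and `ts_ms` fields.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List

WINDOW_MS = 10_000      # 10-second tumbling window
THRESHOLD_USD = 50.0    # alert when the spread exceeds this value


def window_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Return the start of the tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def detect_spreads(ticks: Iterable[Dict[str, Any]],
                   threshold: float = THRESHOLD_USD) -> List[Dict[str, Any]]:
    """Average prices per (window, exchange), join on window, flag large spreads."""
    sums = defaultdict(lambda: [0.0, 0])  # (window, exchange) -> [total, count]
    for tick in ticks:
        key = (window_start(tick["ts_ms"]), tick["exchange"])
        sums[key][0] += tick["price"]
        sums[key][1] += 1
    averages = {key: total / count for key, (total, count) in sums.items()}

    alerts = []
    for window in sorted({w for w, _ in averages}):
        coinbase = averages.get((window, "coinbase"))
        binance = averages.get((window, "binance"))
        if coinbase is None or binance is None:
            continue  # inner join: both exchanges must report in the window
        spread = abs(coinbase - binance)
        if spread > threshold:
            alerts.append({"window_start_ms": window, "coinbase": coinbase,
                           "binance": binance, "spread": spread})
    return alerts
```

Tumbling windows do not overlap, so each tick lands in exactly one window and a window with data from only one exchange produces no alert.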
Alerter Service (src/alerter.py):
- Consumes from `arbitrage-alerts`.
- Sends real-time notifications to a Discord Webhook.
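Posting to a Discord webhook amounts to sending a JSON body with a `content` field. The sketch below uses only the standard library and is not the code in `src/alerter.py`: the alert field names, the message wording, and the `User-Agent` value are assumptions made for illustration.

```python
import json
import urllib.request
from typing import Any, Dict

DISCORD_WEBHOOK = ""  # placeholder; paste a real webhook URL to enable sending


def build_payload(alert: Dict[str, Any]) -> Dict[str, str]:
    """Format an alert (hypothetical fields) as a Discord webhook message body."""
    content = (
        f"BTC arbitrage alert: spread ${alert['spread']:.2f} "
        f"(Coinbase ${alert['coinbase']:.2f}, Binance ${alert['binance']:.2f})"
    )
    return {"content": content}


def send_alert(alert: Dict[str, Any], webhook_url: str = DISCORD_WEBHOOK) -> bool:
    """POST the alert to Discord; return False without sending if no URL is set."""
    if not webhook_url:
        return False  # a blank webhook means notifications are simply skipped
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(build_payload(alert)).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "User-Agent": "arbitrage-alerter/0.1",  # hypothetical client name
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return 200 <= response.status < 300
```

Returning early on an empty URL mirrors the behavior described for a blank `DISCORD_WEBHOOK`: the rest of the pipeline keeps running, only the notification is dropped.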
Dashboard (src/dashboard.py):
- Streamlit application.
- Consumes `arbitrage-alerts` to visualize price spreads and alert history in real time.
Real-Time Alerts (screenshots): real-time visualization of price spreads on the dashboard, and instant notifications sent to Discord.

- Docker & Docker Compose installed.
- Git installed.
Clone the repository and ensure your workspace looks like this:
.
├── docker-compose.yml
├── Dockerfile
├── jars/
│   └── flink-sql-connector-kafka-3.0.1-1.18.jar  <-- See download link below
└── src/
    ├── producer.py
    ├── processor.py
    ├── alerter.py
    └── dashboard.py
You must download the Flink-Kafka connector and place it in the jars/ folder:
Download `flink-sql-connector-kafka-3.0.1-1.18.jar` (the artifact is published on Maven Central under the `org.apache.flink` group).
Before running, you must configure where the alerts are sent.
- Open `src/alerter.py`.
- Find the variable `DISCORD_WEBHOOK`.
- Replace the placeholder text with your actual Discord Webhook URL.
- If you don't have one, you can leave it blank, but you won't receive Discord notifications.
Build the custom images and start the services:
    docker-compose up --build -d

- Dashboard: Access the real-time graph at http://localhost:8501
- Logs: Watch the "Brain" (Flink) processing data:

      docker-compose logs -f taskmanager

Real arbitrage opportunities are rare. To force alerts for testing purposes, you can enable `DEMO_MODE`. This lowers the detection threshold to $0.10.
- Open `docker-compose.yml`.
- Find the `job-submitter` service.
- Add/Uncomment the environment variable:

      job-submitter:
        environment:
          - DEMO_MODE=TRUE

- Restart the services: `docker-compose up -d`
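One way a job could pick its threshold from `DEMO_MODE` is sketched below. This is an assumption about how the switch might be read, not the actual code in `src/processor.py`; the function name, constant names, and the case-insensitive comparison are all hypothetical.

```python
import os
from typing import Mapping

DEFAULT_THRESHOLD_USD = 50.0  # normal detection threshold
DEMO_THRESHOLD_USD = 0.1      # lowered threshold used for testing


def resolve_threshold(env: Mapping[str, str] = os.environ) -> float:
    """Return the spread threshold, lowered when DEMO_MODE is set to TRUE."""
    # Assumption: the comparison ignores case and surrounding whitespace.
    demo = env.get("DEMO_MODE", "").strip().upper() == "TRUE"
    return DEMO_THRESHOLD_USD if demo else DEFAULT_THRESHOLD_USD
```

Passing the environment as a parameter keeps the function easy to check in isolation, for example `resolve_threshold({"DEMO_MODE": "TRUE"})`.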
| File/Folder | Description |
|---|---|
| `src/producer.py` | Python script to fetch WS data and push to Kafka. |
| `src/processor.py` | PyFlink SQL job for windowing and arbitrage logic. |
| `src/alerter.py` | Service to push alerts to Discord. |
| `src/dashboard.py` | Streamlit web app for visualization. |
| `docker-compose.yml` | Orchestrates Zookeeper, Kafka, JobManager, TaskManager, and App services. |
| `Dockerfile` | Custom image definition for Python dependencies. |
- "Leader Not Available" in Kafka Logs: This is common during startup. The services are designed to retry and should recover automatically after 30-60 seconds.
- Dashboard shows "Waiting for Kafka": Ensure the Kafka container is healthy (`docker ps`).
- No Alerts: Real arbitrage is rare! Enable `DEMO_MODE` to verify the pipeline is working.
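The startup retry behavior mentioned in the first troubleshooting item can be approximated with exponential backoff. The helper below is a generic sketch, not the services' actual retry code; the function name, attempt count, and delays are assumptions, chosen so that the total wait (1 + 2 + 4 + 8 + 16 seconds) falls roughly in the 30-60 second recovery range.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(operation: Callable[[], T], attempts: int = 6,
          base_delay: float = 1.0, max_delay: float = 30.0,
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call operation until it succeeds, doubling the wait after each failure."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay = min(delay * 2, max_delay)
    raise RuntimeError("attempts must be at least 1")
```

A service would wrap its broker connection in a call such as `retry(connect_to_kafka)`, where `connect_to_kafka` is a hypothetical function that raises while the broker is still electing a leader.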
To turn this project from a detector into a profitable trading engine, the following upgrades are needed:
- Fee Reduction (Critical)
  - Problem: Retail trading fees (0.1% - 0.6%) often exceed the arbitrage spread (e.g., $20 spread vs $200 fees).
  - Solution: Upgrade to VIP/Institutional tiers or use exchanges with zero-fee promotions.
- Automated Execution (Bot)
  - Current: Manual execution via dashboard.
  - Future: Implement an `Executor` service that automatically places limit orders on both exchanges simultaneously using `ccxt`.
- Smart Routing & More Exchanges
  - Incorporate Kraken, Bybit, and OKX to increase the probability of finding larger spreads.
  - Implement Triangular Arbitrage (e.g., USD -> BTC -> ETH -> USD) to find inefficiencies within a single exchange.
- DeFi & Flash Loans
  - Move beyond Centralized Exchanges (CEX).
  - Use Flash Loans (Aave/Uniswap) to borrow capital instantly, execute the trade on DEXs, and repay the loan within the same transaction, removing capital requirements.
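The fee problem in the first roadmap item can be made concrete with a little arithmetic. The sketch below assumes a flat percentage fee charged on both the buy and the sell leg and ignores slippage, withdrawal costs, and transfer time; the function names and the example prices are hypothetical.

```python
def net_profit_usd(buy_price: float, sell_price: float,
                   quantity_btc: float, fee_rate: float) -> float:
    """Profit after paying fee_rate on both legs of the trade."""
    gross = (sell_price - buy_price) * quantity_btc
    fees = (buy_price + sell_price) * quantity_btc * fee_rate
    return gross - fees


def break_even_spread(buy_price: float, fee_rate: float) -> float:
    """Smallest spread (per BTC) at which net profit reaches zero."""
    # From (sell - buy) = fee_rate * (buy + sell), solved for sell - buy.
    return buy_price * 2 * fee_rate / (1 - fee_rate)


if __name__ == "__main__":
    # Hypothetical prices: a $20 spread on 1 BTC at a 0.1% fee per leg.
    print(net_profit_usd(100_000.0, 100_020.0, 1.0, 0.001))
    print(break_even_spread(100_000.0, 0.001))
```

With these illustrative numbers the two legs cost about $200 in fees against a $20 spread, a loss of about $180, and the spread would need to exceed roughly $200 per BTC before the trade breaks even.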