-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathkline_fetcher.py
More file actions
56 lines (45 loc) · 1.89 KB
/
kline_fetcher.py
File metadata and controls
56 lines (45 loc) · 1.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import time
import json
import logging
import requests
from concurrent.futures import ThreadPoolExecutor
from config import monitor_symbols, timeframes, KLINE_LIMITS
from database import redis_client
def fetch_historical(symbol, interval, limit):
url = f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}"
rkey = f"historical_data:{symbol}:{interval}"
try:
data = requests.get(url, timeout=5).json()
now = int(time.time() * 1000)
with redis_client.pipeline() as pipe:
for k in data:
ts, close_ts = k[0], k[6]
if close_ts > now:
continue
entry = json.dumps({
"Open": float(k[1]),
"High": float(k[2]),
"Low": float(k[3]),
"Close": float(k[4]),
"Volume": float(k[5]),
"TakerBuyVolume": float(k[9]),
"TakerSellVolume": float(k[5]) - float(k[9])
})
pipe.hset(rkey, ts, entry)
pipe.execute()
except Exception as e:
logging.warning(f"{symbol} {interval} 历史获取失败: {e}")
def fetch_all():
total_requests = len(monitor_symbols) * len(timeframes)
print(f"⏳ 初始化下载中... 预计请求数: {total_requests}")
start_time = time.time()
time.sleep(2)
with ThreadPoolExecutor(max_workers=8) as exe:
for s in monitor_symbols:
for tf in timeframes:
limit = KLINE_LIMITS.get(tf, 301) # 兜底默认
exe.submit(fetch_historical, s, tf, limit)
elapsed = time.time() - start_time
avg = elapsed / total_requests
print(f"📌 历史数据初始化完成 ✓")
print(f"⏱ 总耗时: {elapsed:.2f} 秒 (平均单请求: {avg:.3f} 秒)")