-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathvolume_stats.py
More file actions
97 lines (84 loc) · 2.53 KB
/
volume_stats.py
File metadata and controls
97 lines (84 loc) · 2.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import time
import requests
from config import OI_BASE_URL as BASE
# =========================
# 🔗 URL mapping
# =========================
URLS = {
"OPEN_INTEREST": BASE + "/fapi/v1/openInterest?symbol={symbol}",
"FUNDING_RATE": BASE + "/fapi/v1/premiumIndex?symbol={symbol}",
"TICKER_24HR": BASE + "/fapi/v1/ticker/24hr?symbol={symbol}",
}
# =========================
# 🔐 Simple in-memory cache
# =========================
_cached = {
"oi": {},
"funding": {},
"24hr": {},
}
def _cache_get(group, key, ttl):
item = _cached[group].get(key)
if not item:
return None
if time.time() - item["ts"] > ttl:
return None
return item["value"]
def _cache_set(group, key, value):
_cached[group][key] = {
"value": value,
"ts": time.time()
}
# =========================
# 📌 API wrappers
# =========================
def get_open_interest(symbol):
cached = _cache_get("oi", symbol, ttl=60)
if cached is not None:
return cached
try:
r = requests.get(
URLS["OPEN_INTEREST"].format(symbol=symbol),
timeout=5
).json()
value = float(r.get("openInterest"))
except Exception:
value = None
_cache_set("oi", symbol, value)
return value
def get_funding_rate(symbol):
cached = _cache_get("funding", symbol, ttl=60)
if cached is not None:
return cached
try:
r = requests.get(
URLS["FUNDING_RATE"].format(symbol=symbol),
timeout=5
).json()
value = float(r.get("lastFundingRate"))
except Exception:
value = None
_cache_set("funding", symbol, value)
return value
def get_24hr_change(symbol):
cached = _cache_get("24hr", symbol, ttl=60)
if cached is not None:
return cached
try:
j = requests.get(
URLS["TICKER_24HR"].format(symbol=symbol),
timeout=5
).json()
result = {
"priceChange": float(j.get("priceChange", 0)),
"priceChangePercent": float(j.get("priceChangePercent", 0)),
"lastPrice": float(j.get("lastPrice", 0)),
"highPrice": float(j.get("highPrice", 0)),
"lowPrice": float(j.get("lowPrice", 0)),
"volume": float(j.get("volume", 0)),
"quoteVolume": float(j.get("quoteVolume", 0)),
}
except Exception:
result = None
_cache_set("24hr", symbol, result)
return result