Skip to content

Commit 4d3b5ff

Browse files
authored
Merge pull request #21 from TTB-Network/dev/statistics
Dev/statistics
2 parents b7733a0 + e527f51 commit 4d3b5ff

File tree

13 files changed

+178
-74
lines changed

13 files changed

+178
-74
lines changed

bmclapi_dashboard/static/js/index.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bmclapi_dashboard/static/js/ttb.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/__init__.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ def disconnect(self, client: ProxyClient):
7272
if client not in self._tables:
7373
return
7474
self._tables.remove(client)
75+
def get_origin_from_ip(self, ip: tuple[str, int]):
76+
# ip is connected client
77+
for target in self._tables:
78+
if target.target.get_sock_address() == ip:
79+
return target.origin.get_address()
80+
return None
7581

7682
ssl_server: Optional[asyncio.Server] = None
7783
server: Optional[asyncio.Server] = None
@@ -85,7 +91,7 @@ def disconnect(self, client: ProxyClient):
8591
IO_BUFFER: int = Config.get("advanced.io_buffer")
8692

8793
async def _handle_ssl(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
88-
return await _handle_process(Client(reader, writer), True)
94+
return await _handle_process(Client(reader, writer, peername = proxy.get_origin_from_ip(writer.get_extra_info("peername"))), True)
8995

9096
async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
9197
return await _handle_process(Client(reader, writer))
@@ -101,7 +107,7 @@ async def _handle_process(client: Client, ssl: bool = False):
101107
await client.writer.drain()
102108
break
103109
if protocol == Protocol.Unknown and not ssl and ssl_server:
104-
target = Client(*(await asyncio.open_connection("127.0.0.1", ssl_server.sockets[0].getsockname()[1])))
110+
target = Client(*(await asyncio.open_connection("127.0.0.1", ssl_server.sockets[0].getsockname()[1])), peername=client.get_address())
105111
proxying = True
106112
await proxy.connect(client, target, header)
107113
break
@@ -121,7 +127,7 @@ async def check_ports():
121127
global ssl_server, server, client_side_ssl, restart, check_port_key
122128
while 1:
123129
ports: list[tuple[asyncio.Server, ssl.SSLContext | None]] = []
124-
for service in ((server, None), (ssl_server, client_side_ssl if get_loads() != 0 else None)):
130+
for service in ((server, None), (ssl_server, client_side_ssl if get_loaded() else None)):
125131
if not service[0]:
126132
continue
127133
ports.append((service[0], service[1]))
@@ -144,13 +150,14 @@ async def check_ports():
144150
async def main():
145151
global ssl_server, server, server_side_ssl, restart
146152
await web.init()
153+
certificate.load_cert(Path(".ssl/cert"), Path(".ssl/key"))
147154
Timer.delay(check_ports, (), 5)
148155
while 1:
149156
try:
150157
server = await asyncio.start_server(_handle, port=PORT)
151-
ssl_server = await asyncio.start_server(_handle_ssl, port=0 if SSL_PORT == PORT else SSL_PORT, ssl=server_side_ssl if get_loads() != 0 else None)
158+
ssl_server = await asyncio.start_server(_handle_ssl, port=0 if SSL_PORT == PORT else SSL_PORT, ssl=server_side_ssl if get_loaded() else None)
152159
logger.info(f"Listening server on {PORT}")
153-
logger.info(f"Listening server on {ssl_server.sockets[0].getsockname()[1]} Loaded certificates: {get_loads()}")
160+
logger.info(f"Listening server on {ssl_server.sockets[0].getsockname()[1]}")
154161
async with server, ssl_server:
155162
await asyncio.gather(server.serve_forever(), ssl_server.serve_forever())
156163
except asyncio.CancelledError:

core/api.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class File:
3434
last_hit: float = 0
3535
last_access: float = 0
3636
data: Optional[io.BytesIO] = None
37+
cache: bool = False
3738
def is_url(self):
3839
if not isinstance(self.path, str):
3940
return False
@@ -49,6 +50,11 @@ def set_data(self, data: io.BytesIO | memoryview | bytes):
4950
data = io.BytesIO(data)
5051
self.data = io.BytesIO(zlib.compress(data.getbuffer()))
5152

53+
@dataclass
54+
class StatsCache:
55+
total: int = 0
56+
bytes: int = 0
57+
5258
class Storage(metaclass=abc.ABCMeta):
5359
@abc.abstractmethod
5460
async def get(self, file: str) -> File:
@@ -66,6 +72,9 @@ async def exists(self, hash: str) -> bool:
6672
async def get_size(self, hash: str) -> int:
6773
raise NotImplementedError
6874
@abc.abstractmethod
75+
async def copy(self, origin: Path, hash: str) -> int:
76+
raise NotImplementedError
77+
@abc.abstractmethod
6978
async def write(self, hash: str, io: io.BytesIO) -> int:
7079
raise NotImplementedError
7180
@abc.abstractmethod
@@ -80,7 +89,9 @@ async def get_files_size(self, dir: str) -> int:
8089
@abc.abstractmethod
8190
async def removes(self, hashs: list[str]) -> int:
8291
raise NotImplementedError
83-
92+
@abc.abstractmethod
93+
async def get_cache_stats(self) -> StatsCache:
94+
raise NotImplementedError
8495

8596
def get_hash(org):
8697
if len(org) == 32:
@@ -92,7 +103,7 @@ def get_hash(org):
92103
async def get_file_hash(org: str, path: Path):
93104
hash = get_hash(org)
94105
async with aiofiles.open(path, "rb") as r:
95-
while data := await r.read(Config.get("io_buffer")):
106+
while data := await r.read(Config.get("advanced.io_buffer")):
96107
if not data:
97108
break
98109
hash.update(data)

core/certificate.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from pathlib import Path
23
import ssl
34
import time
@@ -13,22 +14,23 @@
1314
client_side_ssl = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
1415
client_side_ssl.check_hostname = False
1516

16-
_loads: int = 0
17+
_loaded: bool = False
1718

1819
def load_cert(cert, key):
19-
global server_side_ssl, client_side_ssl, _loads
20+
global server_side_ssl, client_side_ssl, _loaded
21+
if not os.path.exists(cert) or not os.path.exists(key):
22+
return False
2023
try:
2124
server_side_ssl.load_cert_chain(cert, key)
2225
client_side_ssl.load_verify_locations(cert)
23-
_loads += 1
26+
_loaded = True
2427
return True
2528
except:
2629
logger.error("Failed to load certificate: ", traceback.format_exc())
2730
return False
2831

29-
def get_loads() -> int:
30-
global _loads
31-
return _loads
32+
def get_loaded() -> bool:
33+
return _loaded
3234

3335
def load_text(cert: str, key: str):
3436
t = time.time()
@@ -38,7 +40,7 @@ def load_text(cert: str, key: str):
3840
c.write(cert)
3941
k.write(key)
4042
if load_cert(cert_file, key_file):
41-
logger.info("Loaded certificate from text! Current: ", get_loads())
43+
logger.info("Loaded certificate from text!")
4244
core.restart = True
4345
if core.server:
4446
core.server.close()

core/cluster.py

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import dataclasses
23
import hashlib
34
import hmac
45
import io
@@ -13,7 +14,7 @@
1314
import socketio
1415
from tqdm import tqdm
1516
from core.config import Config
16-
from core import certificate, unit
17+
from core import certificate, system, unit
1718
from core.timer import Task, Timer
1819
import pyzstd as zstd
1920
import core.utils as utils
@@ -25,6 +26,7 @@
2526
from core.api import (
2627
File,
2728
BMCLAPIFile,
29+
StatsCache,
2830
Storage,
2931
get_hash,
3032
)
@@ -54,6 +56,7 @@ async def fetchToken(self):
5456
async with aiohttp.ClientSession(
5557
headers={"User-Agent": USER_AGENT}, base_url=BASE_URL
5658
) as session:
59+
logger.info("Fetching token")
5760
try:
5861
async with session.get(
5962
"/openbmclapi-agent/challenge", params={"clusterId": CLUSTER_ID}
@@ -80,15 +83,14 @@ async def fetchToken(self):
8083
Timer.delay(
8184
self.fetchToken, delay=float(content["ttl"]) / 1000.0 - 600
8285
)
86+
logger.info("Fetched token")
8387

8488
except aiohttp.ClientError as e:
8589
logger.error(f"Error fetching token: {e}.")
8690

8791
async def getToken(self) -> str:
8892
if not self.token:
89-
logger.info("Fetching token")
9093
await self.fetchToken()
91-
logger.info("Fetched token")
9294
return self.token or ""
9395

9496
class ParseFileList:
@@ -152,6 +154,8 @@ async def _download(self, pbar: tqdm, session: aiohttp.ClientSession):
152154
except:
153155
pbar.update(-size)
154156
await self.queues.put(file)
157+
if cluster:
158+
await cluster._check_files_sync_status("下载文件中", pbar, unit.format_more_bytes)
155159
await session.close()
156160

157161
async def _mount_file(self, file: BMCLAPIFile):
@@ -166,9 +170,12 @@ async def _mount_file(self, file: BMCLAPIFile):
166170
logger.error(traceback.format_exc())
167171
if result != file.size:
168172
logger.error(f"Download file error: File {file.hash}({unit.format_bytes(file.size)}) copy to target file error: {file.hash}({unit.format_bytes(result)})")
169-
os.remove(f"./cache/download/{file.hash[:2]}/{file.hash}")
173+
try:
174+
os.remove(f"./cache/download/{file.hash[:2]}/{file.hash}")
175+
except:
176+
...
170177
async def download(self, storages: list['Storage'], miss: list[BMCLAPIFile]):
171-
with tqdm(desc="Downloading files", unit="bytes", unit_divisor=1024, total=sum((file.size for file in miss)), unit_scale=True) as pbar:
178+
with tqdm(desc="Downloading files", unit="b", unit_divisor=1024, total=sum((file.size for file in miss)), unit_scale=True) as pbar:
172179
self.storages = storages
173180
for file in miss:
174181
await self.queues.put(file)
@@ -197,13 +204,12 @@ def __init__(self, dir: Path) -> None:
197204
raise FileExistsError("The path is file.")
198205
self.dir.mkdir(exist_ok=True, parents=True)
199206
self.cache: dict[str, File] = {}
200-
self.stats: stats.StorageStats = stats.get_storage(f"File_{self.dir}")
201207
self.timer = Timer.repeat(self.clear_cache, (), CHECK_CACHE, CHECK_CACHE)
202208
async def get(self, hash: str) -> File:
203209
if hash in self.cache:
204210
file = self.cache[hash]
205211
file.last_access = time.time()
206-
self.stats.hit(file, cache = True)
212+
file.cache = True
207213
return file
208214
path = Path(str(self.dir) + f"/{hash[:2]}/{hash}")
209215
buf = io.BytesIO()
@@ -213,12 +219,17 @@ async def get(self, hash: str) -> File:
213219
file = File(path, hash, buf.tell(), time.time(), time.time())
214220
file.set_data(buf.getbuffer())
215221
self.cache[hash] = file
216-
self.stats.hit(file)
222+
file.cache = False
217223
return file
218224
async def exists(self, hash: str) -> bool:
219225
return os.path.exists(str(self.dir) + f"/{hash[:2]}/{hash}")
220226
async def get_size(self, hash: str) -> int:
221227
return os.path.getsize(str(self.dir) + f"/{hash[:2]}/{hash}")
228+
async def copy(self, origin: Path, hash: str):
229+
Path(str(self.dir) + f"/{hash[:2]}/{hash}").parent.mkdir(exist_ok=True, parents=True)
230+
async with aiofiles.open(str(self.dir) + f"/{hash[:2]}/{hash}", "wb") as w, aiofiles.open(origin, "rb") as r:
231+
await w.write(await r.read())
232+
return origin.stat().st_size
222233
async def write(self, hash: str, io: io.BytesIO) -> int:
223234
Path(str(self.dir) + f"/{hash[:2]}/{hash}").parent.mkdir(exist_ok=True, parents=True)
224235
async with aiofiles.open(str(self.dir) + f"/{hash[:2]}/{hash}", "wb") as w:
@@ -258,9 +269,10 @@ async def clear_cache(self):
258269
logger.info(f"Outdate caches: {unit.format_number(len(old_keys))}({unit.format_bytes(old_size)})")
259270
async def get_files(self, dir: str) -> list[str]:
260271
files = []
261-
with os.scandir(str(self.dir) + f"/{dir}") as session:
262-
for file in session:
263-
files.append(file.name)
272+
if os.path.exists(str(self.dir) + f"/{dir}"):
273+
with os.scandir(str(self.dir) + f"/{dir}") as session:
274+
for file in session:
275+
files.append(file.name)
264276
return files
265277
async def removes(self, hashs: list[str]) -> int:
266278
success = 0
@@ -272,14 +284,25 @@ async def removes(self, hashs: list[str]) -> int:
272284
return success
273285
async def get_files_size(self, dir: str) -> int:
274286
size = 0
275-
with os.scandir(str(self.dir) + f"/{dir}") as session:
276-
for file in session:
277-
size += file.stat().st_size
287+
if os.path.exists(str(self.dir) + f"/{dir}"):
288+
with os.scandir(str(self.dir) + f"/{dir}") as session:
289+
for file in session:
290+
size += file.stat().st_size
278291
return size
292+
async def get_cache_stats(self) -> StatsCache:
293+
stat = StatsCache()
294+
for file in self.cache.values():
295+
stat.total += 1
296+
stat.bytes += file.size
297+
return stat
298+
class WebDav(Storage):
299+
def __init__(self) -> None:
300+
super().__init__()
279301
class Cluster:
280302
def __init__(self) -> None:
281303
self.sio = socketio.AsyncClient()
282304
self.storages: list[Storage] = []
305+
self.storage_stats: dict[Storage, stats.StorageStats] = {}
283306
self.started = False
284307
self.sio.on("message", self._message)
285308
self.cur_storage: Optional[stats.SyncStorage] = None
@@ -293,13 +316,21 @@ def _message(self, message):
293316
logger.info(f"[Remote] {message}")
294317
if "信任度过低" in message:
295318
self.trusted = False
319+
def get_storages(self):
320+
return self.storages.copy()
296321
def add_storage(self, storage):
297322
self.storages.append(storage)
323+
type = "Unknown"
324+
key = time.time()
325+
if isinstance(storage, FileStorage):
326+
type = "File"
327+
key = storage.dir
328+
self.storage_stats[storage] = stats.get_storage(f"{type}_{key}")
298329

299-
async def _check_files_sync_status(self, text: str, pbar: tqdm):
330+
async def _check_files_sync_status(self, text: str, pbar: tqdm, format = unit.format_numbers):
300331
if self.check_files_timer:
301332
return
302-
n, total = unit.format_numbers(pbar.n, pbar.total)
333+
n, total = format(pbar.n, pbar.total)
303334
await set_status(f"{text} ({n}/{total})")
304335

305336
async def check_files(self):
@@ -377,7 +408,6 @@ async def check_files(self):
377408
if paths:
378409
for path in paths:
379410
os.remove(path)
380-
pbar.disable()
381411
pbar.update(1)
382412
if dir:
383413
for d in dir:
@@ -402,8 +432,19 @@ async def start(self, ):
402432
await self.check_files()
403433
await set_status("启动服务")
404434
await self.enable()
405-
async def get(self, hash):
406-
return await self.storages[0].get(hash)
435+
async def get(self, hash) -> File:
436+
storage = self.storages[0]
437+
stat = self.storage_stats[storage]
438+
file = await storage.get(hash)
439+
stat.hit(file)
440+
return file
441+
async def get_cache_stats(self) -> StatsCache:
442+
stat = StatsCache()
443+
for storage in self.storages:
444+
t = await storage.get_cache_stats()
445+
stat.total += t.total
446+
stat.bytes += t.bytes
447+
return stat
407448
async def exists(self, hash):
408449
return await self.storages[0].exists(hash)
409450
async def enable(self) -> None:
@@ -486,14 +527,16 @@ async def _keepalive(self):
486527
"bytes": storage.get_total_bytes() - storage.get_last_bytes(),
487528
})
488529
await self.start_keepalive(300)
489-
async def _keepalive_timeout(self):
490-
logger.warn("Failed to keepalive? Reconnect the main")
530+
async def reconnect(self):
491531
try:
492532
await self.disable()
493533
except:
494534
...
495535
await self.cert()
496536
await self.enable()
537+
async def _keepalive_timeout(self):
538+
logger.warn("Failed to keepalive? Reconnect the main")
539+
await self.reconnect()
497540
async def cert(self):
498541
if Path("./.ssl/cert").exists() == Path("./.ssl/key").exists() == True:
499542
return
@@ -574,6 +617,13 @@ async def process(type: str, data: Any):
574617
async with aiohttp.ClientSession(BASE_URL) as session:
575618
async with session.get(data) as resp:
576619
return resp.json()
620+
if type == "system":
621+
return {
622+
"memory": system.get_used_memory(),
623+
"connections": system.get_connections(),
624+
"cpu": system.get_cpus(),
625+
"cache": dataclasses.asdict(await cluster.get_cache_stats()) if cluster else StatsCache()
626+
}
577627

578628
token = TokenManager()
579629
cluster: Optional[Cluster] = None
@@ -592,6 +642,7 @@ async def set_status(text: str):
592642
async def init():
593643
global cluster
594644
cluster = Cluster()
645+
system.init()
595646
plugins.load_plugins()
596647
for plugin in plugins.get_plugins():
597648
await plugin.init()

0 commit comments

Comments
 (0)