1- < << << << HEAD
21import asyncio
32from dataclasses import dataclass
43import hashlib
@@ -329,329 +328,4 @@ async def clearCache():
329328 break
330329
331330
332- == == == =
333- import asyncio
334- from dataclasses import dataclass
335- import hashlib
336- import hmac
337- import io
338- import os
339- from pathlib import Path
340- import time
341- import aiofiles
342- import aiohttp
343- from typing import Any
344- import socketio
345- import config
346- from timer import Timer # type: ignore
347- import pyzstd as zstd
348- from avro import schema , io as avro_io
349- import utils
350- import stats
351- import web
352-
353- PY_VERSION = "1.0.0"
354- VERSION = "1.9.7"
355- UA = f"openbmclapi-cluster/{ VERSION } Python/{ PY_VERSION } "
356- URL = 'https://openbmclapi.bangbang93.com/'
357- COUNTER = stats .Counters ()
358-
359- @dataclass
360- class BMCLAPIFile :
361- path : str
362- hash : str
363- size : int
364-
365- class TokenManager :
366- def __init__ (self ) -> None :
367- self .token = None
368- async def fetchToken (self ):
369- async with aiohttp .ClientSession (headers = {
370- "User-Agent" : UA
371- }, base_url = URL ) as session :
372- try :
373- async with session .get ("/openbmclapi-agent/challenge" , params = {"clusterId" : config .CLUSTER_ID }) as req :
374- req .raise_for_status ()
375- challenge : str = (await req .json ())['challenge' ]
376-
377- signature = hmac .new (config .CLUSTER_SECRET .encode ("utf-8" ), digestmod = hashlib .sha256 )
378- signature .update (challenge .encode ())
379- signature = signature .hexdigest ()
380-
381- data = {
382- "clusterId" : config .CLUSTER_ID ,
383- "challenge" : challenge ,
384- "signature" : signature
385- }
386-
387- async with session .post ("/openbmclapi-agent/token" , json = data ) as req :
388- req .raise_for_status ()
389- content : dict [str , Any ] = await req .json ()
390- self .token = content ['token' ]
391- Timer .delay (self .fetchToken , delay = float (content ['ttl' ]) / 1000.0 - 600 )
392-
393- except aiohttp .ClientError as e :
394- print (f"Error fetching token: { e } " )
395- async def getToken (self ) -> str :
396- if not self .token :
397- await self .fetchToken ()
398- return self .token or ''
399-
400- class Progress :
401- def __init__ (self , data , func ) -> None :
402- self .func = func
403- self .data = data
404- self .total = len (data )
405- self .cur = 0
406- self .cur_speed = 0
407- self .cur_time = time .time ()
408- def process (self ):
409- for data in self .data :
410- self .func (data )
411- self .cur += 1
412- self .cur_speed += 1
413- yield self .cur , self .total
414- if time .time () - self .cur_time >= 1 :
415- self .cur_speed = 0
416-
417- class FileStorage :
418- def __init__ (self , dir : Path ) -> None :
419- self .dir = dir
420- if self .dir .is_file ():
421- raise FileExistsError ("The path is file." )
422- self .dir .mkdir (exist_ok = True , parents = True )
423- self .files : asyncio .Queue [BMCLAPIFile ] = asyncio .Queue ()
424- self .download_bytes = utils .Progress (5 )
425- self .download_files = utils .Progress (5 )
426- self .sio = socketio .AsyncClient ()
427- self .keepalive = None
428- async def download (self , session : aiohttp .ClientSession ):
429- while not self .files .empty ():
430- file = await self .files .get ()
431- hash = utils .get_hash (file .hash )
432- size = 0
433- filepath = Path (str (self .dir ) + "/" + file .hash [:2 ] + "/" + file .hash )
434- try :
435- async with session .get (file .path ) as resp :
436- filepath .parent .mkdir (exist_ok = True , parents = True )
437- async with aiofiles .open (filepath , "wb" ) as w :
438- while data := await resp .content .read (config .IO_BUFFER ):
439- if not data :
440- break
441- byte = len (data )
442- size += byte
443- self .download_bytes .add (byte )
444- await w .write (data )
445- hash .update (data )
446- if file .hash != hash .hexdigest ():
447- filepath .unlink (True )
448- raise EOFError
449- self .download_files .add ()
450- except :
451- self .download_bytes .add (- size )
452- await self .files .put (file )
453- async def check_file (self ):
454- print ("Requesting files..." )
455- filelist = await self .get_file_list ()
456- filesize = sum ((file .size for file in filelist ))
457- total = len (filelist )
458- byte = 0
459- miss = []
460- for i , file in enumerate (filelist ):
461- filepath = Path (str (self .dir ) + f"/{ file .hash [:2 ]} /{ file .hash } " )
462- if not filepath .exists () or filepath .stat ().st_size != file .size :
463- miss .append (file )
464- await asyncio .sleep (0 )
465- b = utils .calc_more_bytes (byte , filesize )
466- byte += file .size
467- print (f"<<<flush>>>Check file { i } /{ total } ({ b [0 ]} /{ b [1 ]} ): { file .path } " )
468- if not miss :
469- print (f"<<<flush>>>Checked all files!" )
470- await self .start_service ()
471- return
472- filelist = miss
473- filesize = sum ((file .size for file in filelist ))
474- total = len (filelist )
475- print (f"<<<flush>>>Missing files: { total } ({ utils .calc_bytes (filesize )} )" )
476- for file in filelist :
477- await self .files .put (file )
478- self .download_bytes = utils .Progress (5 , filesize )
479- self .download_files = utils .Progress (5 )
480- timers = []
481- for _ in range (0 , config .MAX_DOWNLOAD , 32 ):
482- for __ in range (32 ):
483- timers .append (Timer .delay (self .download , args = (aiohttp .ClientSession (URL , headers = {
484- "User-Agent" : UA ,
485- "Authorization" : f"Bearer { await token .getToken ()} "
486- }), )))
487- while any ([not timer .called for timer in timers ]):
488- b = utils .calc_more_bytes (self .download_bytes .get_cur (), filesize )
489- bits = self .download_bytes .get_cur_speeds () or [0 ]
490- minbit = min (bits )
491- bit = utils .calc_more_bit (minbit , bits [- 1 ], max (bits ))
492- eta = self .download_bytes .get_eta ()
493- print (f"<<<flush>>>Downloading files... { self .download_files .get_cur ()} /{ total } { b [0 ]} /{ b [1 ]} , eta: { utils .format_time (eta if eta != - 1 else None )} , total: { utils .format_time (self .download_bytes .get_total ())} , Min: { bit [0 ]} , Cur: { bit [2 ]} , Max: { bit [1 ]} , Files: { self .download_files .get_cur_speed ()} /s" )
494- await asyncio .sleep (1 )
495- await self .start_service ()
496- async def start_service (self ):
497- tokens = await token .getToken ()
498- await self .sio .connect (URL ,
499- transports = ['websocket' ],
500- auth = {"token" : tokens },
501- ) # type: ignore
502- await self .enable ()
503- async def enable (self ):
504- if not self .sio .connected :
505- return
506- await self .emit ("enable" , {
507- "host" : config .PUBLICHOST ,
508- "port" : config .PUBLICPORT or config .PORT ,
509- "version" : VERSION ,
510- "byoc" : config .BYOC ,
511- "noFastEnable" : False
512- })
513- if not (Path (".ssl/cert.pem" ).exists () and Path (".ssl/key.pem" ).exists ()):
514- await self .emit ("request-cert" )
515- self .cur_counter = stats .Counters ()
516- print ("Connected Main Server." )
517- async def message (self , type , data ):
518- if type == "request-cert" :
519- cert = data [1 ]
520- print ("Requested cert!" )
521- cert_file = Path (".ssl/cert.pem" )
522- key_file = Path (".ssl/key.pem" )
523- for file in (cert_file , key_file ):
524- file .parent .mkdir (exist_ok = True , parents = True )
525- with open (cert_file , "w" ) as w :
526- w .write (cert ['cert' ])
527- with open (key_file , "w" ) as w :
528- w .write (cert ['key' ])
529- exit (0 )
530- elif type == "enable" :
531- if self .keepalive :
532- self .keepalive .block ()
533- self .keepalive = Timer .delay (self .keepaliveTimer , (), 5 )
534- if len (data ) == 2 and data [1 ] == True :
535- print ("Checked! Can service" )
536- return
537- print ("Error:" + data [0 ]['message' ])
538- Timer .delay (self .enable )
539- elif type == "keep-alive" :
540- COUNTER .hit -= self .cur_counter .hit
541- COUNTER .bytes -= self .cur_counter .bytes
542- self .keepalive = Timer .delay (self .keepaliveTimer , (), 5 )
543- async def keepaliveTimer (self ):
544- self .cur_counter .hit = COUNTER .hit
545- self .cur_counter .bytes = COUNTER .bytes
546- await self .emit ("keep-alive" , {
547- "time" : time .time (),
548- "hits" : self .cur_counter .hit ,
549- "bytes" : self .cur_counter .bytes
550- })
551- async def emit (self , channel , data = None ):
552- await self .sio .emit (channel , data , callback = lambda x : Timer .delay (self .message , (channel , x )))
553- async def get_file_list (self ):
554- async with aiohttp .ClientSession (headers = {
555- "User-Agent" : UA ,
556- "Authorization" : f"Bearer { await token .getToken ()} "
557- }, base_url = URL ) as session :
558- async with session .get ('/openbmclapi/files' , data = {
559- "responseType" : "buffer" ,
560- "cache" : ""
561- }) as req :
562- req .raise_for_status ()
563- print ("Requested files" )
564-
565- parser = avro_io .DatumReader (schema .parse (
566- '''
567- {
568- "type": "array",
569- "items": {
570- "type": "record",
571- "name": "FileList",
572- "fields": [
573- {"name": "path", "type": "string"},
574- {"name": "hash", "type": "string"},
575- {"name": "size", "type": "long"}
576- ]
577- }
578- }
579- ''' ))
580- decoder = avro_io .BinaryDecoder (io .BytesIO (zstd .decompress (await req .read ())))
581- return [BMCLAPIFile (** file ) for file in parser .read (decoder )]
582-
583- class FileCache :
584- def __init__ (self , file : Path ) -> None :
585- self .buf = io .BytesIO ()
586- self .size = 0
587- self .last_file = 0
588- self .last = 0
589- self .file = file
590- self .access = 0
591- async def __call__ (self ) -> io .BytesIO :
592- self .access = time .time ()
593- if self .last < time .time ():
594- stat = self .file .stat ()
595- if self .size == stat .st_size and self .last_file == stat .st_mtime :
596- self .last = time .time () + 600
597- return self .buf
598- self .buf .seek (0 , os .SEEK_SET )
599- async with aiofiles .open (self .file , "rb" ) as r :
600- while (data := await r .read (min (config .IO_BUFFER , stat .st_size - self .buf .tell ()))) and self .buf .tell () < stat .st_size :
601- self .buf .write (data )
602- self .last = time .time () + 600
603- self .size = stat .st_size
604- self .last_file = stat .st_mtime
605- self .buf .seek (0 , os .SEEK_SET )
606- return self .buf
607- cache : dict [str , FileCache ] = {}
608- token = TokenManager ()
609- storage : FileStorage = FileStorage (Path ("bmclapi" ))
610- async def init ():
611- global storage
612- Timer .delay (storage .check_file )
613- app = web .app
614- @app .get ("/measure/{size}" )
615- async def _ (request : web .Request , size : int , s : str , e : str ):
616- #if not config.SKIP_SIGN:
617- # check_sign(request.protocol + "://" + request.host + request.path, config.CLUSTER_SECRET, s, e)
618- async def iter (size ):
619- for _ in range (size ):
620- yield b'\x00 ' * 1024 * 1024
621- return web .Response (iter (size ))
622-
623- @app .get ("/download/{hash}" )
624- async def _ (request : web .Request , hash : str , s : str , e : str ):
625- #if not config.SKIP_SIGN:
626- # check_sign(request.protocol + "://" + request.host + request.path, config.CLUSTER_SECRET, s, e)
627- file = Path (str (storage .dir ) + "/" + hash [:2 ] + "/" + hash )
628- if not file .exists ():
629- return web .Response (status_code = 404 )
630- if hash not in cache :
631- cache [hash ] = FileCache (file )
632- data = await cache [hash ]()
633- COUNTER .bytes += len (data .getbuffer ())
634- COUNTER .hit += 1
635- return data .getbuffer ()
636-
637- async def clearCache ():
638- global cache
639- data = cache .copy ()
640- size = 0
641- for k , v in data .items ():
642- if v .access + 60 < time .time ():
643- cache .pop (k )
644- else :
645- size += v .size
646- if size > 1024 * 1024 * 512 :
647- data = cache .copy ()
648- for k , v in data .items ():
649- if size > 1024 * 1024 * 512 :
650- cache .pop (k )
651- size -= v .size
652- else :
653- break
654-
655-
656- > >> >> >> 1821e9 a699e53437109088d3d8cf4bb4a1bf9a50
657331Timer .repeat (clearCache , (), 5 , 10 )
0 commit comments