diff --git a/XTREME/utils.py b/XTREME/utils.py deleted file mode 100644 index 35412cf..0000000 --- a/XTREME/utils.py +++ /dev/null @@ -1,350 +0,0 @@ -from userbot import bot -from telethon import events -from var import Var -from pathlib import Path -from userbot.uniborgConfig import Config -from userbot import LOAD_PLUG -from userbot import CMD_LIST -import re -import logging -import inspect - -def command(**args): - stack = inspect.stack() - previous_stack_frame = stack[1] - file_test = Path(previous_stack_frame.filename) - file_test = file_test.stem.replace(".py", "") - if 1 == 0: - return print("stupidity at its best") - else: - pattern = args.get("pattern", None) - allow_sudo = args.get("allow_sudo", None) - allow_edited_updates = args.get('allow_edited_updates', False) - args["incoming"] = args.get("incoming", False) - args["outgoing"] = True - if bool(args["incoming"]): - args["outgoing"] = False - - try: - if pattern is not None and not pattern.startswith('(?i)'): - args['pattern'] = '(?i)' + pattern - except: - pass - - reg = re.compile('(.*)') - if not pattern == None: - try: - cmd = re.search(reg, pattern) - try: - cmd = cmd.group(1).replace("$", "").replace("\\", "").replace("^", "") - except: - pass - - try: - CMD_LIST[file_test].append(cmd) - except: - CMD_LIST.update({file_test: [cmd]}) - except: - pass - - if allow_sudo: - args["from_users"] = list(Var.SUDO_USERS) - # Mutually exclusive with outgoing (can only set one of either). - args["incoming"] = True - del allow_sudo - try: - del args["allow_sudo"] - except: - pass - - if "allow_edited_updates" in args: - del args['allow_edited_updates'] - - def decorator(func): - if allow_edited_updates: - bot.add_event_handler(func, events.MessageEdited(**args)) - bot.add_event_handler(func, events.NewMessage(**args)) - try: - LOAD_PLUG[file_test].append(func) - except: - LOAD_PLUG.update({file_test: [func]}) - return func - - return decorator - - -def load_module(shortname): - if shortname.startswith("__"): - pass - elif shortname.endswith("_"): - import userbot.utils - import sys - import importlib - from pathlib import Path - path = Path(f"userbot/plugins/{shortname}.py") - name = "userbot.plugins.{}".format(shortname) - spec = importlib.util.spec_from_file_location(name, path) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) - print("Successfully (re)imported "+shortname) - else: - import userbot.utils - import sys - import importlib - from pathlib import Path - path = Path(f"userbot/plugins/{shortname}.py") - name = "userbot.plugins.{}".format(shortname) - spec = importlib.util.spec_from_file_location(name, path) - mod = importlib.util.module_from_spec(spec) - mod.bot = bot - mod.tgbot = bot.tgbot - mod.Var = Var - mod.command = command - mod.logger = logging.getLogger(shortname) - # support for uniborg - sys.modules["uniborg.util"] = userbot.utils - mod.Config = Config - mod.borg = bot - # support for paperplaneextended - sys.modules["userbot.events"] = userbot.utils - spec.loader.exec_module(mod) - # for imports - sys.modules["userbot.plugins."+shortname] = mod - print("Successfully (re)imported "+shortname) - -def remove_plugin(shortname): - try: - try: - for i in LOAD_PLUG[shortname]: - bot.remove_event_handler(i) - del LOAD_PLUG[shortname] - - except: - name = f"userbot.plugins.{shortname}" - - for i in reversed(range(len(bot._event_builders))): - ev, cb = bot._event_builders[i] - if cb.__module__ == name: - del bot._event_builders[i] - except: - raise ValueError - -def admin_cmd(pattern=None, **args): - stack = inspect.stack() - previous_stack_frame = stack[1] - file_test = Path(previous_stack_frame.filename) - file_test = file_test.stem.replace(".py", "") - allow_sudo = args.get("allow_sudo", False) - - # get the pattern from the decorator - if pattern is not None: - if pattern.startswith("\#"): - # special fix for snip.py - args["pattern"] = re.compile(pattern) - else: - args["pattern"] = re.compile("\." + pattern) - cmd = "." + pattern - try: - CMD_LIST[file_test].append(cmd) - except: - CMD_LIST.update({file_test: [cmd]}) - - args["outgoing"] = True - # should this command be available for other users? - if allow_sudo: - args["from_users"] = list(Config.SUDO_USERS) - # Mutually exclusive with outgoing (can only set one of either). - args["incoming"] = True - del args["allow_sudo"] - - # error handling condition check - elif "incoming" in args and not args["incoming"]: - args["outgoing"] = True - - # add blacklist chats, UB should not respond in these chats - allow_edited_updates = False - if "allow_edited_updates" in args and args["allow_edited_updates"]: - allow_edited_updates = args["allow_edited_updates"] - del args["allow_edited_updates"] - - # check if the plugin should listen for outgoing 'messages' - is_message_enabled = True - - return events.NewMessage(**args) - -""" Userbot module for managing events. - One of the main components of the userbot. """ - -from telethon import events -import asyncio -from userbot import bot -from traceback import format_exc -from time import gmtime, strftime -import math -import subprocess -import sys -import traceback -import datetime - - -def register(**args): - """ Register a new event. """ - stack = inspect.stack() - previous_stack_frame = stack[1] - file_test = Path(previous_stack_frame.filename) - file_test = file_test.stem.replace(".py", "") - pattern = args.get('pattern', None) - disable_edited = args.get('disable_edited', False) - - if pattern is not None and not pattern.startswith('(?i)'): - args['pattern'] = '(?i)' + pattern - - if "disable_edited" in args: - del args['disable_edited'] - - reg = re.compile('(.*)') - if not pattern == None: - try: - cmd = re.search(reg, pattern) - try: - cmd = cmd.group(1).replace("$", "").replace("\\", "").replace("^", "") - except: - pass - - try: - CMD_LIST[file_test].append(cmd) - except: - CMD_LIST.update({file_test: [cmd]}) - except: - pass - - def decorator(func): - if not disable_edited: - bot.add_event_handler(func, events.MessageEdited(**args)) - bot.add_event_handler(func, events.NewMessage(**args)) - try: - LOAD_PLUG[file_test].append(func) - except Exception as e: - LOAD_PLUG.update({file_test: [func]}) - - return func - - return decorator - - -def errors_handler(func): - async def wrapper(errors): - try: - await func(errors) - except BaseException: - - date = strftime("%Y-%m-%d %H:%M:%S", gmtime()) - new = { - 'error': str(sys.exc_info()[1]), - 'date': datetime.datetime.now() - } - - text = "**USERBOT CRASH REPORT**\n\n" - - link = "[here](https://t.me/PaperplaneExtendedSupport)" - text += "If you wanna you can report it" - text += f"- just forward this message {link}.\n" - text += "Nothing is logged except the fact of error and date\n" - - ftext = "\nDisclaimer:\nThis file uploaded ONLY here," - ftext += "\nwe logged only fact of error and date," - ftext += "\nwe respect your privacy," - ftext += "\nyou may not report this error if you've" - ftext += "\nany confidential data here, no one will see your data\n\n" - - ftext += "--------BEGIN USERBOT TRACEBACK LOG--------" - ftext += "\nDate: " + date - ftext += "\nGroup ID: " + str(errors.chat_id) - ftext += "\nSender ID: " + str(errors.sender_id) - ftext += "\n\nEvent Trigger:\n" - ftext += str(errors.text) - ftext += "\n\nTraceback info:\n" - ftext += str(traceback.format_exc()) - ftext += "\n\nError text:\n" - ftext += str(sys.exc_info()[1]) - ftext += "\n\n--------END USERBOT TRACEBACK LOG--------" - - command = "git log --pretty=format:\"%an: %s\" -5" - - ftext += "\n\n\nLast 5 commits:\n" - - process = await asyncio.create_subprocess_shell( - command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) - stdout, stderr = await process.communicate() - result = str(stdout.decode().strip()) \ - + str(stderr.decode().strip()) - - ftext += result - - return wrapper - -async def progress(current, total, event, start, type_of_ps, file_name=None): - """Generic progress_callback for both - upload.py and download.py""" - now = time.time() - diff = now - start - if round(diff % 10.00) == 0 or current == total: - percentage = current * 100 / total - speed = current / diff - elapsed_time = round(diff) * 1000 - time_to_completion = round((total - current) / speed) * 1000 - estimated_total_time = elapsed_time + time_to_completion - progress_str = "[{0}{1}]\nProgress: {2}%\n".format( - ''.join(["█" for i in range(math.floor(percentage / 5))]), - ''.join(["░" for i in range(20 - math.floor(percentage / 5))]), - round(percentage, 2)) - tmp = progress_str + \ - "{0} of {1}\nETA: {2}".format( - humanbytes(current), - humanbytes(total), - time_formatter(estimated_total_time) - ) - if file_name: - await event.edit("{}\nFile Name: `{}`\n{}".format( - type_of_ps, file_name, tmp)) - else: - await event.edit("{}\n{}".format(type_of_ps, tmp)) - - -def humanbytes(size): - """Input size in bytes, - outputs in a human readable format""" - # https://stackoverflow.com/a/49361727/4723940 - if not size: - return "" - # 2 ** 10 = 1024 - power = 2**10 - raised_to_pow = 0 - dict_power_n = {0: "", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"} - while size > power: - size /= power - raised_to_pow += 1 - return str(round(size, 2)) + " " + dict_power_n[raised_to_pow] + "B" - - -def time_formatter(milliseconds: int) -> str: - """Inputs time in milliseconds, to get beautified time, - as string""" - seconds, milliseconds = divmod(int(milliseconds), 1000) - minutes, seconds = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - days, hours = divmod(hours, 24) - tmp = ((str(days) + " day(s), ") if days else "") + \ - ((str(hours) + " hour(s), ") if hours else "") + \ - ((str(minutes) + " minute(s), ") if minutes else "") + \ - ((str(seconds) + " second(s), ") if seconds else "") + \ - ((str(milliseconds) + " millisecond(s), ") if milliseconds else "") - return tmp[:-2] - -class Loader(): - def __init__(self, func=None, **args): - self.Var = Var - bot.add_event_handler(func, events.NewMessage(**args)) -