diff --git a/userbot/utils.py b/userbot/utils.py new file mode 100644 index 0000000..51a6e72 --- /dev/null +++ b/userbot/utils.py @@ -0,0 +1,349 @@ +from userbot import bot +from telethon import events +from var import Var +from pathlib import Path +from userbot.uniborgConfig import Config +from userbot import LOAD_PLUG +from userbot import CMD_LIST +import re +import logging +import inspect + +def command(**args): + stack = inspect.stack() + previous_stack_frame = stack[1] + file_test = Path(previous_stack_frame.filename) + file_test = file_test.stem.replace(".py", "") + if 1 == 0: + return print("stupidity at its best") + else: + pattern = args.get("pattern", None) + allow_sudo = args.get("allow_sudo", None) + allow_edited_updates = args.get('allow_edited_updates', False) + args["incoming"] = args.get("incoming", False) + args["outgoing"] = True + if bool(args["incoming"]): + args["outgoing"] = False + + try: + if pattern is not None and not pattern.startswith('(?i)'): + args['pattern'] = '(?i)' + pattern + except: + pass + + reg = re.compile('(.*)') + if not pattern == None: + try: + cmd = re.search(reg, pattern) + try: + cmd = cmd.group(1).replace("$", "").replace("\\", "").replace("^", "") + except: + pass + + try: + CMD_LIST[file_test].append(cmd) + except: + CMD_LIST.update({file_test: [cmd]}) + except: + pass + + if allow_sudo: + args["from_users"] = list(Var.SUDO_USERS) + # Mutually exclusive with outgoing (can only set one of either). + args["incoming"] = True + del allow_sudo + try: + del args["allow_sudo"] + except: + pass + + if "allow_edited_updates" in args: + del args['allow_edited_updates'] + + def decorator(func): + if allow_edited_updates: + bot.add_event_handler(func, events.MessageEdited(**args)) + bot.add_event_handler(func, events.NewMessage(**args)) + try: + LOAD_PLUG[file_test].append(func) + except: + LOAD_PLUG.update({file_test: [func]}) + return func + + return decorator + + +def load_module(shortname): + if shortname.startswith("__"): + pass + elif shortname.endswith("_"): + import userbot.utils + import sys + import importlib + from pathlib import Path + path = Path(f"userbot/plugins/{shortname}.py") + name = "userbot.plugins.{}".format(shortname) + spec = importlib.util.spec_from_file_location(name, path) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + print("Successfully (re)imported "+shortname) + else: + import userbot.utils + import sys + import importlib + from pathlib import Path + path = Path(f"userbot/plugins/{shortname}.py") + name = "userbot.plugins.{}".format(shortname) + spec = importlib.util.spec_from_file_location(name, path) + mod = importlib.util.module_from_spec(spec) + mod.bot = bot + mod.tgbot = bot.tgbot + mod.Var = Var + mod.command = command + mod.logger = logging.getLogger(shortname) + # support for uniborg + sys.modules["uniborg.util"] = userbot.utils + mod.Config = Config + mod.borg = bot + # support for paperplaneextended + sys.modules["userbot.events"] = userbot.utils + spec.loader.exec_module(mod) + # for imports + sys.modules["userbot.plugins."+shortname] = mod + print("Successfully (re)imported "+shortname) + +def remove_plugin(shortname): + try: + try: + for i in LOAD_PLUG[shortname]: + bot.remove_event_handler(i) + del LOAD_PLUG[shortname] + + except: + name = f"userbot.plugins.{shortname}" + + for i in reversed(range(len(bot._event_builders))): + ev, cb = bot._event_builders[i] + if cb.__module__ == name: + del bot._event_builders[i] + except: + raise ValueError + +def admin_cmd(pattern=None, **args): + stack = inspect.stack() + previous_stack_frame = stack[1] + file_test = Path(previous_stack_frame.filename) + file_test = file_test.stem.replace(".py", "") + allow_sudo = args.get("allow_sudo", False) + + # get the pattern from the decorator + if pattern is not None: + if pattern.startswith("\#"): + # special fix for snip.py + args["pattern"] = re.compile(pattern) + else: + args["pattern"] = re.compile("\." + pattern) + cmd = "." + pattern + try: + CMD_LIST[file_test].append(cmd) + except: + CMD_LIST.update({file_test: [cmd]}) + + args["outgoing"] = True + # should this command be available for other users? + if allow_sudo: + args["from_users"] = list(Config.SUDO_USERS) + # Mutually exclusive with outgoing (can only set one of either). + args["incoming"] = True + del args["allow_sudo"] + + # error handling condition check + elif "incoming" in args and not args["incoming"]: + args["outgoing"] = True + + # add blacklist chats, UB should not respond in these chats + allow_edited_updates = False + if "allow_edited_updates" in args and args["allow_edited_updates"]: + allow_edited_updates = args["allow_edited_updates"] + del args["allow_edited_updates"] + + # check if the plugin should listen for outgoing 'messages' + is_message_enabled = True + + return events.NewMessage(**args) + +""" Userbot module for managing events. + One of the main components of the userbot. """ + +from telethon import events +import asyncio +from userbot import bot +from traceback import format_exc +from time import gmtime, strftime +import math +import subprocess +import sys +import traceback +import datetime + + +def register(**args): + """ Register a new event. """ + stack = inspect.stack() + previous_stack_frame = stack[1] + file_test = Path(previous_stack_frame.filename) + file_test = file_test.stem.replace(".py", "") + pattern = args.get('pattern', None) + disable_edited = args.get('disable_edited', False) + + if pattern is not None and not pattern.startswith('(?i)'): + args['pattern'] = '(?i)' + pattern + + if "disable_edited" in args: + del args['disable_edited'] + + reg = re.compile('(.*)') + if not pattern == None: + try: + cmd = re.search(reg, pattern) + try: + cmd = cmd.group(1).replace("$", "").replace("\\", "").replace("^", "") + except: + pass + + try: + CMD_LIST[file_test].append(cmd) + except: + CMD_LIST.update({file_test: [cmd]}) + except: + pass + + def decorator(func): + if not disable_edited: + bot.add_event_handler(func, events.MessageEdited(**args)) + bot.add_event_handler(func, events.NewMessage(**args)) + try: + LOAD_PLUG[file_test].append(func) + except Exception as e: + LOAD_PLUG.update({file_test: [func]}) + + return func + + return decorator + + +def errors_handler(func): + async def wrapper(errors): + try: + await func(errors) + except BaseException: + + date = strftime("%Y-%m-%d %H:%M:%S", gmtime()) + new = { + 'error': str(sys.exc_info()[1]), + 'date': datetime.datetime.now() + } + + text = "**USERBOT CRASH REPORT**\n\n" + + link = "[here](https://t.me/PaperplaneExtendedSupport)" + text += "If you wanna you can report it" + text += f"- just forward this message {link}.\n" + text += "Nothing is logged except the fact of error and date\n" + + ftext = "\nDisclaimer:\nThis file uploaded ONLY here," + ftext += "\nwe logged only fact of error and date," + ftext += "\nwe respect your privacy," + ftext += "\nyou may not report this error if you've" + ftext += "\nany confidential data here, no one will see your data\n\n" + + ftext += "--------BEGIN USERBOT TRACEBACK LOG--------" + ftext += "\nDate: " + date + ftext += "\nGroup ID: " + str(errors.chat_id) + ftext += "\nSender ID: " + str(errors.sender_id) + ftext += "\n\nEvent Trigger:\n" + ftext += str(errors.text) + ftext += "\n\nTraceback info:\n" + ftext += str(traceback.format_exc()) + ftext += "\n\nError text:\n" + ftext += str(sys.exc_info()[1]) + ftext += "\n\n--------END USERBOT TRACEBACK LOG--------" + + command = "git log --pretty=format:\"%an: %s\" -5" + + ftext += "\n\n\nLast 5 commits:\n" + + process = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + stdout, stderr = await process.communicate() + result = str(stdout.decode().strip()) \ + + str(stderr.decode().strip()) + + ftext += result + + return wrapper + +async def progress(current, total, event, start, type_of_ps, file_name=None): + """Generic progress_callback for both + upload.py and download.py""" + now = time.time() + diff = now - start + if round(diff % 10.00) == 0 or current == total: + percentage = current * 100 / total + speed = current / diff + elapsed_time = round(diff) * 1000 + time_to_completion = round((total - current) / speed) * 1000 + estimated_total_time = elapsed_time + time_to_completion + progress_str = "[{0}{1}]\nProgress: {2}%\n".format( + ''.join(["█" for i in range(math.floor(percentage / 5))]), + ''.join(["░" for i in range(20 - math.floor(percentage / 5))]), + round(percentage, 2)) + tmp = progress_str + \ + "{0} of {1}\nETA: {2}".format( + humanbytes(current), + humanbytes(total), + time_formatter(estimated_total_time) + ) + if file_name: + await event.edit("{}\nFile Name: `{}`\n{}".format( + type_of_ps, file_name, tmp)) + else: + await event.edit("{}\n{}".format(type_of_ps, tmp)) + + +def humanbytes(size): + """Input size in bytes, + outputs in a human readable format""" + # https://stackoverflow.com/a/49361727/4723940 + if not size: + return "" + # 2 ** 10 = 1024 + power = 2**10 + raised_to_pow = 0 + dict_power_n = {0: "", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"} + while size > power: + size /= power + raised_to_pow += 1 + return str(round(size, 2)) + " " + dict_power_n[raised_to_pow] + "B" + + +def time_formatter(milliseconds: int) -> str: + """Inputs time in milliseconds, to get beautified time, + as string""" + seconds, milliseconds = divmod(int(milliseconds), 1000) + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + tmp = ((str(days) + " day(s), ") if days else "") + \ + ((str(hours) + " hour(s), ") if hours else "") + \ + ((str(minutes) + " minute(s), ") if minutes else "") + \ + ((str(seconds) + " second(s), ") if seconds else "") + \ + ((str(milliseconds) + " millisecond(s), ") if milliseconds else "") + return tmp[:-2] + +class Loader(): + def __init__(self, func=None, **args): + self.Var = Var + bot.add_event_handler(func, events.NewMessage(**args))