Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 349 additions & 0 deletions userbot/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
from userbot import bot
from telethon import events
from var import Var
from pathlib import Path
from userbot.uniborgConfig import Config
from userbot import LOAD_PLUG
from userbot import CMD_LIST
import re
import logging
import inspect

def command(**args):
stack = inspect.stack()
previous_stack_frame = stack[1]
file_test = Path(previous_stack_frame.filename)
file_test = file_test.stem.replace(".py", "")
if 1 == 0:
return print("stupidity at its best")
else:
pattern = args.get("pattern", None)
allow_sudo = args.get("allow_sudo", None)
allow_edited_updates = args.get('allow_edited_updates', False)
args["incoming"] = args.get("incoming", False)
args["outgoing"] = True
if bool(args["incoming"]):
args["outgoing"] = False

try:
if pattern is not None and not pattern.startswith('(?i)'):
args['pattern'] = '(?i)' + pattern
except:
pass

reg = re.compile('(.*)')
if not pattern == None:
try:
cmd = re.search(reg, pattern)
try:
cmd = cmd.group(1).replace("$", "").replace("\\", "").replace("^", "")
except:
pass

try:
CMD_LIST[file_test].append(cmd)
except:
CMD_LIST.update({file_test: [cmd]})
except:
pass

if allow_sudo:
args["from_users"] = list(Var.SUDO_USERS)
# Mutually exclusive with outgoing (can only set one of either).
args["incoming"] = True
del allow_sudo
try:
del args["allow_sudo"]
except:
pass

if "allow_edited_updates" in args:
del args['allow_edited_updates']

def decorator(func):
if allow_edited_updates:
bot.add_event_handler(func, events.MessageEdited(**args))
bot.add_event_handler(func, events.NewMessage(**args))
try:
LOAD_PLUG[file_test].append(func)
except:
LOAD_PLUG.update({file_test: [func]})
return func

return decorator


def load_module(shortname):
if shortname.startswith("__"):
pass
elif shortname.endswith("_"):
import userbot.utils
import sys
import importlib
from pathlib import Path
path = Path(f"userbot/plugins/{shortname}.py")
name = "userbot.plugins.{}".format(shortname)
spec = importlib.util.spec_from_file_location(name, path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
print("Successfully (re)imported "+shortname)
else:
import userbot.utils
import sys
import importlib
from pathlib import Path
path = Path(f"userbot/plugins/{shortname}.py")
name = "userbot.plugins.{}".format(shortname)
spec = importlib.util.spec_from_file_location(name, path)
mod = importlib.util.module_from_spec(spec)
mod.bot = bot
mod.tgbot = bot.tgbot
mod.Var = Var
mod.command = command
mod.logger = logging.getLogger(shortname)
# support for uniborg
sys.modules["uniborg.util"] = userbot.utils
mod.Config = Config
mod.borg = bot
# support for paperplaneextended
sys.modules["userbot.events"] = userbot.utils
spec.loader.exec_module(mod)
# for imports
sys.modules["userbot.plugins."+shortname] = mod
print("Successfully (re)imported "+shortname)

def remove_plugin(shortname):
try:
try:
for i in LOAD_PLUG[shortname]:
bot.remove_event_handler(i)
del LOAD_PLUG[shortname]

except:
name = f"userbot.plugins.{shortname}"

for i in reversed(range(len(bot._event_builders))):
ev, cb = bot._event_builders[i]
if cb.__module__ == name:
del bot._event_builders[i]
except:
raise ValueError

def admin_cmd(pattern=None, **args):
stack = inspect.stack()
previous_stack_frame = stack[1]
file_test = Path(previous_stack_frame.filename)
file_test = file_test.stem.replace(".py", "")
allow_sudo = args.get("allow_sudo", False)

# get the pattern from the decorator
if pattern is not None:
if pattern.startswith("\#"):
# special fix for snip.py
args["pattern"] = re.compile(pattern)
else:
args["pattern"] = re.compile("\." + pattern)
cmd = "." + pattern
try:
CMD_LIST[file_test].append(cmd)
except:
CMD_LIST.update({file_test: [cmd]})

args["outgoing"] = True
# should this command be available for other users?
if allow_sudo:
args["from_users"] = list(Config.SUDO_USERS)
# Mutually exclusive with outgoing (can only set one of either).
args["incoming"] = True
del args["allow_sudo"]

# error handling condition check
elif "incoming" in args and not args["incoming"]:
args["outgoing"] = True

# add blacklist chats, UB should not respond in these chats
allow_edited_updates = False
if "allow_edited_updates" in args and args["allow_edited_updates"]:
allow_edited_updates = args["allow_edited_updates"]
del args["allow_edited_updates"]

# check if the plugin should listen for outgoing 'messages'
is_message_enabled = True

return events.NewMessage(**args)

""" Userbot module for managing events.
One of the main components of the userbot. """

from telethon import events
import asyncio
from userbot import bot
from traceback import format_exc
from time import gmtime, strftime
import math
import subprocess
import sys
import traceback
import datetime


def register(**args):
""" Register a new event. """
stack = inspect.stack()
previous_stack_frame = stack[1]
file_test = Path(previous_stack_frame.filename)
file_test = file_test.stem.replace(".py", "")
pattern = args.get('pattern', None)
disable_edited = args.get('disable_edited', False)

if pattern is not None and not pattern.startswith('(?i)'):
args['pattern'] = '(?i)' + pattern

if "disable_edited" in args:
del args['disable_edited']

reg = re.compile('(.*)')
if not pattern == None:
try:
cmd = re.search(reg, pattern)
try:
cmd = cmd.group(1).replace("$", "").replace("\\", "").replace("^", "")
except:
pass

try:
CMD_LIST[file_test].append(cmd)
except:
CMD_LIST.update({file_test: [cmd]})
except:
pass

def decorator(func):
if not disable_edited:
bot.add_event_handler(func, events.MessageEdited(**args))
bot.add_event_handler(func, events.NewMessage(**args))
try:
LOAD_PLUG[file_test].append(func)
except Exception as e:
LOAD_PLUG.update({file_test: [func]})

return func

return decorator


def errors_handler(func):
async def wrapper(errors):
try:
await func(errors)
except BaseException:

date = strftime("%Y-%m-%d %H:%M:%S", gmtime())
new = {
'error': str(sys.exc_info()[1]),
'date': datetime.datetime.now()
}

text = "**USERBOT CRASH REPORT**\n\n"

link = "[here](https://t.me/PaperplaneExtendedSupport)"
text += "If you wanna you can report it"
text += f"- just forward this message {link}.\n"
text += "Nothing is logged except the fact of error and date\n"

ftext = "\nDisclaimer:\nThis file uploaded ONLY here,"
ftext += "\nwe logged only fact of error and date,"
ftext += "\nwe respect your privacy,"
ftext += "\nyou may not report this error if you've"
ftext += "\nany confidential data here, no one will see your data\n\n"

ftext += "--------BEGIN USERBOT TRACEBACK LOG--------"
ftext += "\nDate: " + date
ftext += "\nGroup ID: " + str(errors.chat_id)
ftext += "\nSender ID: " + str(errors.sender_id)
ftext += "\n\nEvent Trigger:\n"
ftext += str(errors.text)
ftext += "\n\nTraceback info:\n"
ftext += str(traceback.format_exc())
ftext += "\n\nError text:\n"
ftext += str(sys.exc_info()[1])
ftext += "\n\n--------END USERBOT TRACEBACK LOG--------"

command = "git log --pretty=format:\"%an: %s\" -5"

ftext += "\n\n\nLast 5 commits:\n"

process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
result = str(stdout.decode().strip()) \
+ str(stderr.decode().strip())

ftext += result

return wrapper

async def progress(current, total, event, start, type_of_ps, file_name=None):
"""Generic progress_callback for both
upload.py and download.py"""
now = time.time()
diff = now - start
if round(diff % 10.00) == 0 or current == total:
percentage = current * 100 / total
speed = current / diff
elapsed_time = round(diff) * 1000
time_to_completion = round((total - current) / speed) * 1000
estimated_total_time = elapsed_time + time_to_completion
progress_str = "[{0}{1}]\nProgress: {2}%\n".format(
''.join(["█" for i in range(math.floor(percentage / 5))]),
''.join(["░" for i in range(20 - math.floor(percentage / 5))]),
round(percentage, 2))
tmp = progress_str + \
"{0} of {1}\nETA: {2}".format(
humanbytes(current),
humanbytes(total),
time_formatter(estimated_total_time)
)
if file_name:
await event.edit("{}\nFile Name: `{}`\n{}".format(
type_of_ps, file_name, tmp))
else:
await event.edit("{}\n{}".format(type_of_ps, tmp))


def humanbytes(size):
"""Input size in bytes,
outputs in a human readable format"""
# https://stackoverflow.com/a/49361727/4723940
if not size:
return ""
# 2 ** 10 = 1024
power = 2**10
raised_to_pow = 0
dict_power_n = {0: "", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"}
while size > power:
size /= power
raised_to_pow += 1
return str(round(size, 2)) + " " + dict_power_n[raised_to_pow] + "B"


def time_formatter(milliseconds: int) -> str:
"""Inputs time in milliseconds, to get beautified time,
as string"""
seconds, milliseconds = divmod(int(milliseconds), 1000)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
tmp = ((str(days) + " day(s), ") if days else "") + \
((str(hours) + " hour(s), ") if hours else "") + \
((str(minutes) + " minute(s), ") if minutes else "") + \
((str(seconds) + " second(s), ") if seconds else "") + \
((str(milliseconds) + " millisecond(s), ") if milliseconds else "")
return tmp[:-2]

class Loader():
def __init__(self, func=None, **args):
self.Var = Var
bot.add_event_handler(func, events.NewMessage(**args))