diff --git a/matrix/__init__.py b/matrix/__init__.py index f40e3fe..a9885fb 100644 --- a/matrix/__init__.py +++ b/matrix/__init__.py @@ -1,4 +1,5 @@ from .bot import Bot +from .group import Group from .config import Config from .context import Context from .command import Command @@ -8,6 +9,7 @@ __all__ = [ "Bot", + "Group", "Config", "Command", "Context", diff --git a/matrix/bot.py b/matrix/bot.py index f69ea6d..85e7e01 100644 --- a/matrix/bot.py +++ b/matrix/bot.py @@ -25,15 +25,23 @@ ) from .room import Room +from .group import Group from .config import Config from .context import Context from .command import Command from .help import HelpCommand -from .errors import AlreadyRegisteredError, CommandNotFoundError, CheckError from .scheduler import Scheduler +from .errors import ( + AlreadyRegisteredError, + CommandNotFoundError, + CheckError, + GroupAlreadyRegisteredError +) + Callback = Callable[..., Coroutine[Any, Any, Any]] +GroupCallable = Callable[[Callable[..., Coroutine[Any, Any, Any]]], Group] ErrorCallback = Callable[[Exception], Coroutine] @@ -86,7 +94,7 @@ def __init__( self.commands: Dict[str, Command] = {} self.checks: List[Callback] = [] self.scheduler = Scheduler() - + self._handlers: Dict[Type[Event], List[Callback]] = defaultdict(list) self._on_error: Optional[ErrorCallback] = None @@ -186,7 +194,7 @@ def wrapper(f: Callback) -> Callback: def command( self, name: Optional[str] = None, - cooldown: Optional[tuple[int, float]] = None + **kwargs ) -> Callable[[Callback], Command]: """ Decorator to register a coroutine function as a command handler. @@ -194,16 +202,13 @@ def command( The command name defaults to the function name unless explicitly provided. - :param name: The name of the command. If omitted, the function - name is used. - :type name: str, optional :raises TypeError: If the decorated function is not a coroutine. :raises ValueError: If a command with the same name is registered. :return: Decorator that registers the command handler. :rtype: Callback """ def wrapper(func: Callback) -> Command: - cmd = Command(func, name=name, cooldown=cooldown, prefix=self.prefix) + cmd = Command(func, name=name, prefix=self.prefix, **kwargs) return self.register_command(cmd) return wrapper @@ -233,15 +238,32 @@ def wrapper(f: Callback) -> Callback: return wrapper - def register_command(self, cmd: Command): + def register_command(self, cmd: Command) -> Command: if cmd in self.commands: raise AlreadyRegisteredError(cmd) self.commands[cmd.name] = cmd - self.log.debug("command %s registered", cmd) + self.log.debug("command '%s' registered", cmd) return cmd + def group(self, **kwargs) -> GroupCallable: + """Decorator to register a custom error handler for the command.""" + + def wrapper(func: Callback) -> Group: + group = Group(func, prefix=self.prefix, **kwargs) + return self.register_group(group) + return wrapper + + def register_group(self, group: Group) -> Group: + if group in self.commands: + raise GroupAlreadyRegisteredError(group) + + self.commands[group.name] = group + self.log.debug("group '%s' registered", group) + + return group + def error(self): """Decorator to register a custom error handler for the command.""" diff --git a/matrix/command.py b/matrix/command.py index e118ffa..a211851 100644 --- a/matrix/command.py +++ b/matrix/command.py @@ -52,11 +52,12 @@ def __init__(self, func: Callback, **kwargs: Any): self.description: str = kwargs.get("description", "") self.prefix: str = kwargs.get("prefix", "") + self.parent: str = kwargs.get("parent", "") self.usage: str = kwargs.get("usage", self._build_usage()) self.help: str = self._build_help() - self._before_invoke: Optional[Callback] = None - self._after_invoke: Optional[Callback] = None + self._before_invoke_callback: Optional[Callback] = None + self._after_invoke_callback: Optional[Callback] = None self._on_error: Optional[ErrorCallback] = None self.cooldown_rate: Optional[int] = None @@ -114,7 +115,12 @@ def _build_usage(self) -> str: :rtype: str """ params = " ".join(f"[{p.name}]" for p in self.params) - return f"{self.prefix}{self.name} {params}" + command_name = self.name + + if self.parent: + command_name = f"{self.parent} {self.name}" + + return f"{self.prefix}{command_name} {params}" def _parse_arguments(self, ctx: "Context") -> list[Any]: parsed_args = [] @@ -185,7 +191,7 @@ def before_invoke(self, func: Callback) -> None: if not asyncio.iscoroutinefunction(func): raise TypeError('The hook must be a coroutine.') - self._before_invoke = func + self._before_invoke_callback = func def after_invoke(self, func: Callback) -> None: """ @@ -200,7 +206,7 @@ def after_invoke(self, func: Callback) -> None: if not asyncio.iscoroutinefunction(func): raise TypeError('The hook must be a coroutine.') - self._after_invoke = func + self._after_invoke_callback = func def error(self, func: ErrorCallback) -> None: """ @@ -234,17 +240,25 @@ async def on_error(self, ctx: "Context", error: Exception) -> None: ctx.logger.exception("error while executing command '%s'", self) raise error - async def __before_invoke(self, ctx: "Context") -> None: - for check in self.checks: - if not await check(ctx): - raise CheckError(self, check) + async def invoke(self, ctx): + parsed_args = self._parse_arguments(ctx) + await self.callback(ctx, *parsed_args) + + async def _invoke(self, ctx: "Context"): + try: + for check in self.checks: + if not await check(ctx): + raise CheckError(self, check) + + if self._before_invoke_callback: + await self._before_invoke_callback(ctx) - if self._before_invoke: - await self._before_invoke(ctx) + await self.invoke(ctx) - async def __after_invoke(self, ctx: "Context") -> None: - if self._after_invoke: - await self._after_invoke(ctx) + if self._after_invoke_callback: + await self._after_invoke_callback(ctx) + except Exception as error: + await self.on_error(ctx, error) async def __call__(self, ctx: "Context") -> None: """ @@ -253,15 +267,7 @@ async def __call__(self, ctx: "Context") -> None: :param ctx: The command execution context. :type ctx: Context """ - try: - await self.__before_invoke(ctx) - - parsed_args = self._parse_arguments(ctx) - await self.callback(ctx, *parsed_args) - - await self.__after_invoke(ctx) - except Exception as error: - await self.on_error(ctx, error) + await self._invoke(ctx) def __eq__(self, other) -> bool: return self.name == other diff --git a/matrix/context.py b/matrix/context.py index a61aaf7..8ae9e7e 100644 --- a/matrix/context.py +++ b/matrix/context.py @@ -9,6 +9,7 @@ if TYPE_CHECKING: from .bot import Bot # pragma: no cover from .command import Command # pragma: no cover + from .group import Group class Context: @@ -42,6 +43,7 @@ def __init__(self, bot: "Bot", room: MatrixRoom, event: Event): # Command metdata self.prefix: str = bot.prefix self.command: Optional[Command] = None + self.subcommand: Optional[Command] = None self._args: List[str] = shlex.split(self.body) @property @@ -54,8 +56,12 @@ def args(self) -> List[str]: :return: The list of arguments. :rtype: List[str] """ + if self.subcommand: + return self._args[2:] + if self.command: return self._args[1:] + return self._args @property @@ -80,6 +86,12 @@ async def reply(self, message: str) -> None: raise MatrixError(f"Failed to send message: {e}") async def send_help(self) -> None: - if not self.command: - return await self.bot.help.execute(self) - await self.reply(self.command.help) + if self.subcommand: + await self.reply(self.subcommand.help) + return + + if self.command: + await self.reply(self.command.help) + return + + await self.bot.help.execute(self) diff --git a/matrix/errors.py b/matrix/errors.py index bebd5fb..3be504e 100644 --- a/matrix/errors.py +++ b/matrix/errors.py @@ -23,7 +23,16 @@ def __init__(self, param): class CheckError(CommandError): def __init__(self, cmd, check): - super().__init__(f"'{check.__name__}' has failed for '{cmd.name}'!") + super().__init__(f"'{check.__name__}' has failed for '{cmd.name}'") + + +class GroupError(CommandError): + pass + + +class GroupAlreadyRegisteredError(GroupError): + def __init__(self, group): + super().__init__(f"Group '{group}' is already registered") class ConfigError(MatrixError): diff --git a/matrix/group.py b/matrix/group.py new file mode 100644 index 0000000..fcf4522 --- /dev/null +++ b/matrix/group.py @@ -0,0 +1,72 @@ +import logging +from typing import TYPE_CHECKING, Optional, Dict, Any, Callable, Coroutine + +from .command import Command +from .errors import AlreadyRegisteredError, CommandNotFoundError + +if TYPE_CHECKING: + from .context import Context # pragma: no cover + +logger = logging.getLogger(__name__) + +Callback = Callable[..., Coroutine[Any, Any, Any]] +ErrorCallback = Callable[["Context", Exception], Coroutine[Any, Any, Any]] + + +class Group(Command): + def __init__(self, callback: Callback, **kwargs: Any): + self.commands: Dict[str, Command] = {} + + super().__init__(callback, **kwargs) + + def _build_usage(self): + return f"{self.prefix}{self.name} [subcommand]" + + def get_command(self, cmd_name: str): + if cmd := self.commands.get(cmd_name): + return cmd + raise CommandNotFoundError(cmd_name) + + def command( + self, + name: Optional[str] = None + ) -> Callable[[Callback], Command]: + """ + Decorator to register a coroutine function as a command handler. + + The command name defaults to the function name unless + explicitly provided. + + :param name: The name of the command. If omitted, the function + name is used. + :type name: str, optional + :raises TypeError: If the decorated function is not a coroutine. + :raises ValueError: If a command with the same name is registered. + :return: Decorator that registers the command handler. + :rtype: Callback + """ + def wrapper(func: Callback) -> Command: + cmd = Command( + func, + name=name, + prefix=self.prefix, + parent=self.name + ) + return self.register_command(cmd) + return wrapper + + def register_command(self, cmd: Command): + if cmd in self.commands: + raise AlreadyRegisteredError(cmd) + + self.commands[cmd.name] = cmd + logger.debug("command '%s' registered for group '%s'", cmd, self) + + return cmd + + async def invoke(self, ctx: "Context"): + if subcommand := ctx.args.pop(0): + ctx.subcommand = self.get_command(subcommand) + await ctx.subcommand(ctx) + else: + await self.callback(ctx) \ No newline at end of file diff --git a/matrix/help.py b/matrix/help.py index 2cc8d07..2d7e30e 100644 --- a/matrix/help.py +++ b/matrix/help.py @@ -1,7 +1,8 @@ -from typing import Optional, List, TypeVar, Generic +from typing import Optional, List, TypeVar, Generic, Union from .context import Context from .command import Command +from .group import Group T = TypeVar('T') @@ -98,8 +99,11 @@ def next_page(self) -> Optional[int]: class HelpCommand(Command): """A reusable help command with built-in pagination support. - To customize formatting, override the format_command() method. - To customize pagination display, override the format_page_info() method. + Supports both regular commands and group with proper formatting. + + - To customize formatting, override the format_command() or format_group() + methods. + - To customize pagination display, override the format_page_info() method. """ DEFAULT_PER_PAGE = 5 @@ -136,6 +140,40 @@ def format_command(self, cmd: Command) -> str: f"Description: {cmd.description or 'None'}" ) + def format_group(self, group: Group) -> str: + """Format a group command for display. + + Override this method to customize group formatting. + + :param group: The group to format + :return: Formatted string representation of the group + """ + subcommands_text = "" + subcommand_count = len(getattr(group, 'commands', {})) + + if subcommand_count > 0: + subcommands_text = f" ({subcommand_count} subcommands)" + + return ( + f"**{group.name}** [GROUP]{subcommands_text}\n" + f"Usage: `{group.usage}`\n" + f"Description: {group.description or 'None'}" + ) + + def format_subcommand(self, subcommand: Command) -> str: + """Format a subcommand for display. + + Override this method to customize subcommand formatting. + + :param subcommand: The subcommand to format + :return: Formatted string representation of the subcommand + """ + return ( + f"**{subcommand.name}**\n" + f"Usage: `{subcommand.usage}`\n" + f"Description: {subcommand.description or 'None'}" + ) + def format_page_info(self, page: Page[Command]) -> str: """Format the page information display. @@ -146,21 +184,56 @@ def format_page_info(self, page: Page[Command]) -> str: """ return f"**Page {page.page_number}/{page.total_pages}**" - def format_help_page(self, page: Page[Command]) -> str: + def format_help_page( + self, + page: Page[Command], + title: str = "Commands" + ) -> str: """Format a complete help page. :param page: Page object containing commands and pagination info + :param title: Title for the help page :return: Complete formatted help page """ + help_entries = [] + if not page.items: - return "No commands available." + return f"No {title.lower()} available." - help_entries = "\n\n".join( - self.format_command(cmd) for cmd in page.items - ) + for cmd in page.items: + if isinstance(cmd, Group): + help_entries.append(self.format_group(cmd)) + else: + help_entries.append(self.format_command(cmd)) + + help_text = "\n\n".join(help_entries) + page_info = self.format_page_info(page) + + return f"**{title}**\n\n{help_text}\n\n{page_info}" + + def format_subcommand_page( + self, + page: Page[Command], + group_name: str + ) -> str: + """Format a complete subcommand help page. + + :param page: Page object containing subcommands and pagination info + :param group_name: Name of the parent group + :return: Complete formatted subcommand help page + """ + help_entries = [] + + if not page.items: + return f"No subcommands available for group `{group_name}`." + + for subcmd in page.items: + help_entries.append(self.format_subcommand(subcmd)) + + help_text = "\n\n".join(help_entries) page_info = self.format_page_info(page) - return f"{help_entries}\n\n{page_info}" + return f"**{group_name} Subcommands**\n\n{help_text}\n\n{page_info}" def get_commands_paginator(self, ctx: Context) -> Paginator[Command]: """Get a paginator for all commands. @@ -173,7 +246,22 @@ def get_commands_paginator(self, ctx: Context) -> Paginator[Command]: return Paginator(sorted_commands, self.per_page) - def find_command(self, ctx, command_name: str) -> Optional[Command]: + def get_subcommands_paginator(self, group: Group) -> Paginator[Command]: + """Get a paginator for all subcommands in a group. + + :param group: The group to get subcommands from + :return: Paginator configured with all subcommands + """ + subcommands = list(getattr(group, 'commands', {}).values()) + sorted_subcommands = sorted(subcommands, key=lambda c: c.name.lower()) + + return Paginator(sorted_subcommands, self.per_page) + + def find_command( + self, + ctx: Context, + command_name: str + ) -> Optional[Command]: """Find a command by name. :param ctx: Command context @@ -182,21 +270,68 @@ def find_command(self, ctx, command_name: str) -> Optional[Command]: """ return ctx.bot.commands.get(command_name) - async def show_command_help(self, ctx, command_name: str) -> None: - """Show help for a specific command. + def find_subcommand( + self, + group: Group, + subcommand_name: str + ) -> Optional[Command]: + """Find a subcommand within a group. + + :param group: The group to search in + :param subcommand_name: Name of the subcommand to find + :return: Subcommand if found, None otherwise + """ + group_commands = getattr(group, 'commands', {}) + return group_commands.get(subcommand_name) + + async def show_command_help( + self, + ctx: Context, + command_name: str, + subcommand_name: Optional[str] = None + ) -> None: + """Show help for a specific command or subcommand. :param ctx: Command context :param command_name: Name of the command to show help for + :param subcommand_name: Name of the subcommand """ cmd = self.find_command(ctx, command_name) - if cmd: - await ctx.reply(self.format_command(cmd)) - else: + if not cmd: await ctx.reply(f"Command `{command_name}` not found.") + return + + if not subcommand_name: + if isinstance(cmd, Group): + group_help = self.format_group(cmd) + subcommands = getattr(cmd, 'commands', {}) + + if subcommands: + paginator = self.get_subcommands_paginator(cmd) + first_page = paginator.get_page(1) + subcommand_list = self.format_subcommand_page( + first_page, + cmd.name + ) + help_message = f"{group_help}\n\n{subcommand_list}" + else: + help_message = f"{group_help}\n\nNo subcommands available." + + await ctx.reply(help_message) + else: + await ctx.reply(self.format_command(cmd)) + return - async def show_help_page(self, ctx, page_number: int = 1) -> None: - """Show a paginated help page. + if not isinstance(cmd, Group): + await ctx.reply(f"Command `{command_name}` is not a group.") + return + + if subcommand := self.find_subcommand(cmd, subcommand_name): + await ctx.reply(self.format_subcommand(subcommand)) + + async def show_help_page(self, ctx: Context, page_number: int = 1) -> None: + """Show a paginated help page for all commands. :param ctx: Command context :param page_number: Page number to display @@ -207,35 +342,110 @@ async def show_help_page(self, ctx, page_number: int = 1) -> None: await ctx.reply(help_message) + async def show_subcommand_page( + self, + ctx: Context, + group_name: str, + page_number: int = 1 + ) -> None: + """Show a paginated help page for group subcommands. + + :param ctx: Command context + :param group_name: Name of the group + :param page_number: Page number to display + """ + group = self.find_command(ctx, group_name) + + if not group: + await ctx.reply(f"Group `{group_name}` not found.") + return + + if not isinstance(group, Group): + await ctx.reply(f"Command `{group_name}` is not a group.") + return + + paginator = self.get_subcommands_paginator(group) + page = paginator.get_page(page_number) + help_message = self.format_subcommand_page(page, group_name) + + await ctx.reply(help_message) + + def parse_help_arguments( + self, + args: List[str] + ) -> tuple[Optional[str], Optional[str], int]: + """Parse help command arguments to determine what to show. + + :param args: List of arguments passed to help command + :return: Tuple of (command_name, subcommand_name, page_number) + """ + command_name = None + subcommand_name = None + page_number = 1 + + if not args: + return command_name, subcommand_name, page_number + + # Check if first argument is a page number + if len(args) == 1 and args[0].isdigit(): + page_number = int(args[0]) + return command_name, subcommand_name, page_number + + command_name = args[0] + + if len(args) >= 2: + if args[1].isdigit(): + page_number = int(args[1]) + else: + subcommand_name = args[1] + + if len(args) >= 3 and args[2].isdigit(): + page_number = int(args[2]) + + return command_name, subcommand_name, page_number + async def execute( self, ctx: Context, - arg=None # type: ignore + cmd_or_page=None, + subcommand=None ) -> None: """Execute the help command. - Note: For now we ignore the arg type since it's the only way - to do different actions depending on the type of the function - without causing errors. - Usage patterns: - `help` - Show first page of all commands - `help 2` - Show page 2 of all commands - `help ping` - Show help for specific command + - `help group` - Show help for group with first page of subcommands + - `help group subcmd` - Show help for specific subcommand + - `help group 2` - Show page 2 of group's subcommands :param ctx: Command context - :param arg: Command name or Page number + :param cmd_or_page: Command name or page number + :param subcommand: Subcommand name or page number for groups """ - page = 1 - command_name = None + # Convert arguments to list for parsing + args = [] + if cmd_or_page is not None: + args.append(cmd_or_page) + if subcommand is not None: + args.append(subcommand) - if isinstance(arg, str): - if arg.isdigit(): - page = int(arg) - else: - command_name = arg.strip() + command_name, subcommand_name, page = self.parse_help_arguments(args) if command_name: - await self.show_command_help(ctx, arg) + if subcommand_name and not subcommand_name.isdigit(): + await self.show_command_help( + ctx, + command_name, + subcommand_name + ) + else: + cmd = self.find_command(ctx, command_name) + + if cmd and isinstance(cmd, Group) and subcommand_name is None: + await self.show_subcommand_page(ctx, command_name, page) + else: + await self.show_command_help(ctx, command_name) else: await self.show_help_page(ctx, page)