From c38074dba5fa63e28cf4b99257265f61beef7ee8 Mon Sep 17 00:00:00 2001 From: Adrien Banlin Date: Fri, 12 May 2023 18:24:29 +0200 Subject: [PATCH 1/3] ifupdown.utils: use metaclass for cmds definition --- ifupdown2/ifupdown/utils.py | 95 ++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 55 deletions(-) diff --git a/ifupdown2/ifupdown/utils.py b/ifupdown2/ifupdown/utils.py index 05c7e48e..09f43dc8 100644 --- a/ifupdown2/ifupdown/utils.py +++ b/ifupdown2/ifupdown/utils.py @@ -10,6 +10,7 @@ import os import re import shlex +from shutil import which import fcntl import signal import logging @@ -37,7 +38,45 @@ def signal_handler_f(ps, sig, frame): if sig == signal.SIGINT: raise KeyboardInterrupt -class utils(): + +def default_commands(): + """ + Set debian path as default path for all the commands. + If command not present in debian path, search for the + commands in the other system directories. + This search is carried out to handle different locations + on different distros. + If the command is not found in any of the system + directories, command execution will fail because we have + set default path same as debian path. + """ + logger = logging.getLogger('ifupdown') + default_cmds = """ + /sbin/bridge /bin/ip /sbin/brctl + /bin/pidof /usr/sbin/service /sbin/sysctl + /sbin/modprobe /usr/bin/pstree /bin/ss + /usr/sbin/vrrpd /usr/sbin/ifplugd /sbin/mstpctl + /sbin/ethtool /bin/systemctl /usr/bin/dpkg + """.split() + logger.info("utils init command paths") + for cmd in default_cmds: + name = os.path.basename(cmd) + path = which(name) + if path: + yield (name, path) + continue + logger.debug(f"warning: path {cmd} not found: {name} won't be usable") + yield (name, cmd) + + +class MetaUtils(type): + @classmethod + def __prepare__(cls, _name, _bases): + """ predefined class attributes """ + return {f'{prog}_cmd': path for prog, path in default_commands()} + + +class utils(metaclass=MetaUtils): logger = logging.getLogger('ifupdown') DEVNULL = open(os.devnull, 'w') vlan_aware_bridge_address_support = None @@ -82,60 +121,6 @@ class utils(): '0' : 'no' } - """ - Set debian path as default path for all the commands. - If command not present in debian path, search for the - commands in the other system directories. - This search is carried out to handle different locations - on different distros. - If the command is not found in any of the system - directories, command execution will fail because we have - set default path same as debian path. - """ - bridge_cmd = '/sbin/bridge' - ip_cmd = '/bin/ip' - brctl_cmd = '/sbin/brctl' - pidof_cmd = '/bin/pidof' - service_cmd = '/usr/sbin/service' - sysctl_cmd = '/sbin/sysctl' - modprobe_cmd = '/sbin/modprobe' - pstree_cmd = '/usr/bin/pstree' - ss_cmd = '/bin/ss' - vrrpd_cmd = '/usr/sbin/vrrpd' - ifplugd_cmd = '/usr/sbin/ifplugd' - mstpctl_cmd = '/sbin/mstpctl' - ethtool_cmd = '/sbin/ethtool' - systemctl_cmd = '/bin/systemctl' - dpkg_cmd = '/usr/bin/dpkg' - - logger.info("utils init command paths") - for cmd in ['bridge', - 'ip', - 'brctl', - 'pidof', - 'service', - 'sysctl', - 'modprobe', - 'pstree', - 'ss', - 'vrrpd', - 'ifplugd', - 'mstpctl', - 'ethtool', - 'systemctl', - 'dpkg' - ]: - if os.path.exists(vars()[cmd + '_cmd']): - continue - for path in ['/bin/', - '/sbin/', - '/usr/bin/', - '/usr/sbin/',]: - if os.path.exists(path + cmd): - vars()[cmd + '_cmd'] = path + cmd - else: - logger.debug('warning: path %s not found: %s won\'t be usable' % (path + cmd, cmd)) - mac_translate_tab = str.maketrans(":.-,", " ") @classmethod From a91597465fea893cb482d16aad0d920381737e1d Mon Sep 17 00:00:00 2001 From: Adrien Banlin Date: Fri, 12 May 2023 18:25:16 +0200 Subject: [PATCH 2/3] ifupdown.utils: forbid utils class instantiation --- ifupdown2/ifupdown/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ifupdown2/ifupdown/utils.py b/ifupdown2/ifupdown/utils.py index 09f43dc8..1e0c16c7 100644 --- a/ifupdown2/ifupdown/utils.py +++ b/ifupdown2/ifupdown/utils.py @@ -75,6 +75,10 @@ def __prepare__(cls, _name, _bases): """ predefined class attributes """ return {f'{prog}_cmd': path for prog, path in default_commands()} + def __call__(cls, *_a, **_k): + """ forbiden call """ + raise NotImplementedError("This class should not be instanciated") + class utils(metaclass=MetaUtils): logger = logging.getLogger('ifupdown') From 46a49d1858e2cf1897186d8c2d09e2f61bf16f77 Mon Sep 17 00:00:00 2001 From: Adrien Banlin Date: Fri, 12 May 2023 18:26:47 +0200 Subject: [PATCH 3/3] ifupdown.utils: set all classmethods by default --- ifupdown2/ifupdown/utils.py | 93 +++++++++++++------------------------ 1 file changed, 32 insertions(+), 61 deletions(-) diff --git a/ifupdown2/ifupdown/utils.py b/ifupdown2/ifupdown/utils.py index 1e0c16c7..bbed0d4e 100644 --- a/ifupdown2/ifupdown/utils.py +++ b/ifupdown2/ifupdown/utils.py @@ -79,6 +79,12 @@ def __call__(cls, *_a, **_k): """ forbiden call """ raise NotImplementedError("This class should not be instanciated") + def __new__(cls, name, bases, attrs): + """ finalize class definition (replacing methods to classmethods) """ + return type.__new__(cls, name, bases, { + k: classmethod(v) if callable(v) else v for k, v in attrs.items() + }) + class utils(metaclass=MetaUtils): logger = logging.getLogger('ifupdown') @@ -127,7 +133,6 @@ class utils(metaclass=MetaUtils): mac_translate_tab = str.maketrans(":.-,", " ") - @classmethod def mac_str_to_int(cls, hw_address): mac = 0 if hw_address: @@ -137,63 +142,53 @@ def mac_str_to_int(cls, hw_address): mac += int(i, 16) return mac - @staticmethod - def get_onff_from_onezero(value): - if value in utils._onoff_onezero: - return utils._onoff_onezero[value] + def get_onff_from_onezero(cls, value): + if value in cls._onoff_onezero: + return cls._onoff_onezero[value] return value - @staticmethod - def get_yesno_from_onezero(value): - if value in utils._yesno_onezero: - return utils._yesno_onezero[value] + def get_yesno_from_onezero(cls, value): + if value in cls._yesno_onezero: + return cls._yesno_onezero[value] return value - @staticmethod - def get_onoff_bool(value): - if value in utils._onoff_bool: - return utils._onoff_bool[value] + def get_onoff_bool(cls, value): + if value in cls._onoff_bool: + return cls._onoff_bool[value] return value - @staticmethod - def get_boolean_from_string(value, default=False): - return utils._string_values.get(value, default) + def get_boolean_from_string(cls, value, default=False): + return cls._string_values.get(value, default) - @staticmethod - def get_yesno_boolean(bool): - return utils._yesno_bool[bool] + def get_yesno_boolean(cls, bool): + return cls._yesno_bool[bool] - @staticmethod - def boolean_support_binary(value): - return utils._binary_bool[utils.get_boolean_from_string(value)] + def boolean_support_binary(cls, value): + return cls._binary_bool[cls.get_boolean_from_string(value)] - @staticmethod - def is_binary_bool(value): + def is_binary_bool(cls, value): return value == '0' or value == '1' - @staticmethod - def support_yesno_attrs(attrsdict, attrslist, ifaceobj=None): + def support_yesno_attrs(cls, attrsdict, attrslist, ifaceobj=None): if ifaceobj: for attr in attrslist: value = ifaceobj.get_attr_value_first(attr) - if value and not utils.is_binary_bool(value): + if value and not cls.is_binary_bool(value): if attr in attrsdict: - bool = utils.get_boolean_from_string(attrsdict[attr]) - attrsdict[attr] = utils.get_yesno_boolean(bool) + bool = cls.get_boolean_from_string(attrsdict[attr]) + attrsdict[attr] = cls.get_yesno_boolean(bool) else: for attr in attrslist: if attr in attrsdict: - attrsdict[attr] = utils.boolean_support_binary(attrsdict[attr]) + attrsdict[attr] = cls.boolean_support_binary(attrsdict[attr]) - @staticmethod - def get_int_from_boolean_and_string(value): + def get_int_from_boolean_and_string(cls, value): try: return int(value) except Exception: - return int(utils.get_boolean_from_string(value)) + return int(cls.get_boolean_from_string(value)) - @staticmethod - def strip_hwaddress(hwaddress): + def strip_hwaddress(cls, hwaddress): if hwaddress and hwaddress.startswith("ether"): hwaddress = hwaddress[5:].strip() return hwaddress.lower() if hwaddress else hwaddress @@ -201,7 +196,6 @@ def strip_hwaddress(hwaddress): # what we have in the cache (data retrieved via a netlink dump by # nlmanager). nlmanager return all macs in lower-case - @classmethod def importName(cls, modulename, name): """ Import a named object """ try: @@ -210,7 +204,6 @@ def importName(cls, modulename, name): return None return getattr(module, name) - @classmethod def lockFile(cls, lockfile): try: fp = os.open(lockfile, os.O_CREAT | os.O_TRUNC | os.O_WRONLY) @@ -220,7 +213,6 @@ def lockFile(cls, lockfile): return False return True - @classmethod def parse_iface_range(cls, name): # eg: swp1.[2-100] # return (prefix, range-start, range-end) @@ -243,7 +235,6 @@ def parse_iface_range(cls, name): int(range_groups[2], 10)) return None - @classmethod def expand_iface_range(cls, name): ifacenames = [] irange = cls.parse_iface_range(name) @@ -258,13 +249,11 @@ def expand_iface_range(cls, name): ifacenames.append('%s%d%s' %(irange[0], i, irange[3])) return ifacenames - @classmethod def is_ifname_range(cls, name): if '[' in name or ']' in name: return True return False - @classmethod def check_ifname_size_invalid(cls, name=''): """ IFNAMSIZ in include/linux/if.h is 16 so we check this """ IFNAMSIZ = 16 @@ -273,15 +262,12 @@ def check_ifname_size_invalid(cls, name=''): else: return False - @classmethod def enable_subprocess_signal_forwarding(cls, ps, sig): signal.signal(sig, partial(signal_handler_f, ps)) - @classmethod def disable_subprocess_signal_forwarding(cls, sig): signal.signal(sig, signal.SIG_DFL) - @classmethod def _log_command_exec(cls, cmd, stdin): dry_run = "DRY-RUN: " if ifupdownflags.flags.DRYRUN else "" if stdin: @@ -289,7 +275,6 @@ def _log_command_exec(cls, cmd, stdin): else: cls.logger.info('%sexecuting %s' % (dry_run, cmd)) - @classmethod def _format_error(cls, cmd, cmd_returncode, cmd_output, stdin): if type(cmd) is list: cmd = ' '.join(cmd) @@ -301,7 +286,6 @@ def _format_error(cls, cmd, cmd_returncode, cmd_output, stdin): else: return 'cmd \'%s\' failed: returned %d' % (cmd, cmd_returncode) - @classmethod def is_addr_ip_allowed_on(cls, ifaceobj, syntax_check=False): if cls.vlan_aware_bridge_address_support is None: cls.vlan_aware_bridge_address_support = utils.get_boolean_from_string( @@ -338,7 +322,6 @@ def is_addr_ip_allowed_on(cls, ifaceobj, syntax_check=False): return False return True - @classmethod def _execute_subprocess(cls, cmd, env=None, shell=False, @@ -366,14 +349,14 @@ def _execute_subprocess(cls, cmd, stdin=subprocess.PIPE if stdin else None, stdout=subprocess.PIPE if stdout else cls.DEVNULL, stderr=stderr) - utils.enable_subprocess_signal_forwarding(ch, signal.SIGINT) + cls.enable_subprocess_signal_forwarding(ch, signal.SIGINT) if stdout or stdin: cmd_output = ch.communicate(input=stdin.encode() if stdin else stdin)[0] cmd_returncode = ch.wait() except Exception as e: raise Exception('cmd \'%s\' failed (%s)' % (' '.join(cmd), str(e))) finally: - utils.disable_subprocess_signal_forwarding(signal.SIGINT) + cls.disable_subprocess_signal_forwarding(signal.SIGINT) cmd_output_string = cmd_output.decode() if cmd_output is not None else cmd_output @@ -384,7 +367,6 @@ def _execute_subprocess(cls, cmd, stdin)) return cmd_output_string - @classmethod def exec_user_command(cls, cmd, env=None, close_fds=False, stdout=True, stdin=None, stderr=subprocess.STDOUT): cls._log_command_exec(cmd, stdin) @@ -396,7 +378,6 @@ def exec_user_command(cls, cmd, env=None, close_fds=False, stdout=True, stdin=stdin, stderr=stderr) - @classmethod def exec_command(cls, cmd, env=None, close_fds=False, stdout=True, stdin=None, stderr=subprocess.STDOUT): cls._log_command_exec(cmd, stdin) @@ -407,7 +388,6 @@ def exec_command(cls, cmd, env=None, close_fds=False, stdout=True, stdin=stdin, stderr=stderr) - @classmethod def exec_commandl(cls, cmdl, env=None, close_fds=False, stdout=True, stdin=None, stderr=subprocess.STDOUT): cls._log_command_exec(' '.join(cmdl), stdin) @@ -418,13 +398,11 @@ def exec_commandl(cls, cmdl, env=None, close_fds=False, stdout=True, stdin=stdin, stderr=stderr) - @classmethod def ints_to_ranges(cls, ints): for a, b in itertools.groupby(enumerate(ints), lambda x_y: x_y[1] - x_y[0]): b = list(b) yield b[0][1], b[-1][1] - @classmethod def ranges_to_ints(cls, rangelist): """ returns expanded list of integers given set of string ranges example: ['1', '2-4', '6'] returns [1, 2, 3, 4, 6] @@ -444,23 +422,19 @@ def ranges_to_ints(cls, rangelist): pass return result - @classmethod def compress_into_ranges(cls, ids_ints): return ['%d' %start if start == end else '%d-%d' %(start, end) for start, end in cls.ints_to_ranges(ids_ints)] - @classmethod def compress_into_ip_ranges(cls, ip_list): return [ "%s" % IPv4Address(start) if start == end else "%s-%s" % (IPv4Address(start), IPv4Address(end)) for start, end in cls.ints_to_ranges(map(int, ip_list)) ] - @classmethod def diff_ids(cls, ids1_ints, ids2_ints): return set(ids2_ints).difference(ids1_ints), set(ids1_ints).difference(ids2_ints) - @classmethod def compare_ids(cls, ids1, ids2, pvid=None, expand_range=True): """ Returns true if the ids are same else return false """ @@ -478,7 +452,6 @@ def compare_ids(cls, ids1, ids2, pvid=None, expand_range=True): else: return True - @classmethod def get_vlan_vni_in_map_entry(cls, vlan_vni_map_entry): # a good example for map is bridge-vlan-vni-map attribute # format eg: = @@ -530,7 +503,6 @@ def get_vlan_vni_in_map_entry(cls, vlan_vni_map_entry): return return (vlan, vni) - @classmethod def get_vlan_vnis_in_map(cls, vlan_vni_map): # a good example for map is bridge-vlan-vni-map attribute # format eg: = @@ -550,7 +522,6 @@ def get_vlan_vnis_in_map(cls, vlan_vni_map): vnis.extend([vni]) return (vlans, vnis) - @classmethod def get_vni_mcastgrp_in_map(cls, vni_mcastgrp_map): vnid = {} for ventry in vni_mcastgrp_map.split():