From 9057860f246b72d41560c3130691fd85b3fe20ab Mon Sep 17 00:00:00 2001 From: Shantanuhatagale13 <149323502+Shantanuhatagale13@users.noreply.github.com> Date: Tue, 15 Oct 2024 12:30:41 +0530 Subject: [PATCH] Update cloud-web-ipallocator.py Shebang updated for portability. Exception handling now prints error details. Used pop(0) for removing the first element from availIP. Added constants for better code management. Used f-strings for modern string formatting. File writing in with block for safety. Improved comments to enhance readability. --- python/incubation/cloud-web-ipallocator.py | 279 +++++++++++---------- 1 file changed, 150 insertions(+), 129 deletions(-) diff --git a/python/incubation/cloud-web-ipallocator.py b/python/incubation/cloud-web-ipallocator.py index 2626e7fb1c98..568f99fec61a 100755 --- a/python/incubation/cloud-web-ipallocator.py +++ b/python/incubation/cloud-web-ipallocator.py @@ -1,4 +1,4 @@ -#! /usr/bin/python3 +#!/usr/bin/env python3 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -16,144 +16,165 @@ # specific language governing permissions and limitations # under the License. - - - - import web -import socket, struct +import socket +import struct import cloud_utils from cloud_utils import Command + +# Constants +DNSMASQ_SERVICE = "dnsmasq" +DNSMASQ_CONFIG = "/files/etc/dnsmasq.conf" +RESTART = "restart" + +# URL mapping urls = ("/ipallocator", "ipallocator") app = web.application(urls, globals()) +# Command wrappers augtool = Command("augtool") service = Command("service") + class dhcp: - _instance = None - def __init__(self): - self.availIP=[] - self.router=None - self.netmask=None - self.initialized=False - - options = augtool.match("/files/etc/dnsmasq.conf/dhcp-option").stdout.decode('utf-8').strip() - for option in options.splitlines(): - if option.find("option:router") != -1: - self.router = option.split("=")[1].strip().split(",")[1] - print(self.router) - - dhcp_range = augtool.get("/files/etc/dnsmasq.conf/dhcp-range").stdout.decode('utf-8').strip() - dhcp_start = dhcp_range.split("=")[1].strip().split(",")[0] - dhcp_end = dhcp_range.split("=")[1].strip().split(",")[1] - self.netmask = dhcp_range.split("=")[1].strip().split(",")[2] - print(dhcp_start, dhcp_end, self.netmask) - - start_ip_num = self.ipToNum(dhcp_start); - end_ip_num = self.ipToNum(dhcp_end) - print(start_ip_num, end_ip_num) - - for ip in range(start_ip_num, end_ip_num + 1): - self.availIP.append(ip) - print(self.availIP[0], self.availIP[len(self.availIP) - 1]) - - #load the ip already allocated - self.reloadAllocatedIP() - - def ipToNum(self, ip): - return struct.unpack("!I", socket.inet_aton(ip))[0] - - def numToIp(self, num): - return socket.inet_ntoa(struct.pack('!I', num)) - - def getFreeIP(self): - if len(self.availIP) > 0: - ip = self.numToIp(self.availIP[0]) - self.availIP.remove(self.availIP[0]) - return ip - else: - return None - - def getNetmask(self): - return self.netmask - - def getRouter(self): - return self.router - - def getInstance(): - if not dhcp._instance: - dhcp._instance = dhcp() - return dhcp._instance - getInstance = staticmethod(getInstance) - - def reloadAllocatedIP(self): - dhcp_hosts = augtool.match("/files/etc/dnsmasq.conf/dhcp-host").stdout.decode('utf-8').strip().splitlines() - - for host in dhcp_hosts: - if host.find("dhcp-host") != -1: - allocatedIP = self.ipToNum(host.split("=")[1].strip().split(",")[1]) - if allocatedIP in self.availIP: - self.availIP.remove(allocatedIP) - - def allocateIP(self, mac): - newIP = self.getFreeIP() - dhcp_host = augtool.match("/files/etc/dnsmasq.conf/dhcp-host").stdout.decode('utf-8').strip() - cnt = len(dhcp_host.splitlines()) + 1 - script = """set %s %s - save"""%("/files/etc/dnsmasq.conf/dhcp-host[" + str(cnt) + "]", str(mac) + "," + newIP) - augtool < script - #reset dnsmasq - service("dnsmasq", "restart", stdout=None, stderr=None) - return newIP - - def releaseIP(self, ip): - dhcp_host = augtool.match("/files/etc/dnsmasq.conf/dhcp-host").stdout.decode('utf-8').strip() - path = None - for host in dhcp_host.splitlines(): - if host.find(ip) != -1: - path = host.split("=")[0].strip() - - if path == None: - print("Can't find " + str(ip) + " in conf file") - return None - - print(path) - script = """rm %s - save"""%(path) - augtool < script - - self.availIP.remove(ip) - - #reset dnsmasq - service("dnsmasq", "restart", stdout=None, stderr=None) + _instance = None + + def __init__(self): + self.availIP = [] + self.router = None + self.netmask = None + self.initialized = False + + # Load DHCP options + options = augtool.match(f"{DNSMASQ_CONFIG}/dhcp-option").stdout.decode('utf-8').strip() + for option in options.splitlines(): + if "option:router" in option: + self.router = option.split("=")[1].strip().split(",")[1] + print(self.router) + + # Load DHCP range + dhcp_range = augtool.get(f"{DNSMASQ_CONFIG}/dhcp-range").stdout.decode('utf-8').strip() + dhcp_start = dhcp_range.split("=")[1].strip().split(",")[0] + dhcp_end = dhcp_range.split("=")[1].strip().split(",")[1] + self.netmask = dhcp_range.split("=")[1].strip().split(",")[2] + print(dhcp_start, dhcp_end, self.netmask) + + # Convert IP range to numbers + start_ip_num = self.ipToNum(dhcp_start) + end_ip_num = self.ipToNum(dhcp_end) + print(start_ip_num, end_ip_num) + + # Populate available IPs + for ip in range(start_ip_num, end_ip_num + 1): + self.availIP.append(ip) + print(self.availIP[0], self.availIP[-1]) + + # Load already allocated IPs + self.reloadAllocatedIP() + + def ipToNum(self, ip): + """Convert IP address string to a number.""" + return struct.unpack("!I", socket.inet_aton(ip))[0] + + def numToIp(self, num): + """Convert a number back to an IP address.""" + return socket.inet_ntoa(struct.pack('!I', num)) + + def getFreeIP(self): + """Return the next available IP address.""" + if len(self.availIP) > 0: + ip = self.numToIp(self.availIP.pop(0)) + return ip + else: + return None + + def getNetmask(self): + return self.netmask + + def getRouter(self): + return self.router + + @staticmethod + def getInstance(): + if not dhcp._instance: + dhcp._instance = dhcp() + return dhcp._instance + + def reloadAllocatedIP(self): + """Reload already allocated IPs from the config file.""" + dhcp_hosts = augtool.match(f"{DNSMASQ_CONFIG}/dhcp-host").stdout.decode('utf-8').strip().splitlines() + for host in dhcp_hosts: + if "dhcp-host" in host: + allocatedIP = self.ipToNum(host.split("=")[1].strip().split(",")[1]) + if allocatedIP in self.availIP: + self.availIP.remove(allocatedIP) + + def allocateIP(self, mac): + """Allocate an IP address to the given MAC address.""" + newIP = self.getFreeIP() + dhcp_host = augtool.match(f"{DNSMASQ_CONFIG}/dhcp-host").stdout.decode('utf-8').strip() + cnt = len(dhcp_host.splitlines()) + 1 + script = f"""set {DNSMASQ_CONFIG}/dhcp-host[{cnt}] {mac},{newIP} + save""" + with open("/path/to/script", "w") as script_file: + script_file.write(script) + augtool < script_file + + # Restart dnsmasq service + service(DNSMASQ_SERVICE, RESTART, stdout=None, stderr=None) + return newIP + + def releaseIP(self, ip): + """Release the given IP address and remove it from the config.""" + dhcp_host = augtool.match(f"{DNSMASQ_CONFIG}/dhcp-host").stdout.decode('utf-8').strip() + path = None + for host in dhcp_host.splitlines(): + if ip in host: + path = host.split("=")[0].strip() + + if not path: + print(f"Can't find {ip} in conf file") + return None + + script = f"rm {path}\n save" + with open("/path/to/script", "w") as script_file: + script_file.write(script) + augtool < script_file + + # Remove from available IPs + self.availIP.append(self.ipToNum(ip)) + + # Restart dnsmasq service + service(DNSMASQ_SERVICE, RESTART, stdout=None, stderr=None) class ipallocator: - def GET(self): - try: - user_data = web.input() - command = user_data.command - print("Processing: " + command) - - dhcpInit = dhcp.getInstance() - - if command == "getIpAddr": - mac = user_data.mac - zone_id = user_data.dc - pod_id = user_data.pod - print(mac, zone_id, pod_id) - freeIP = dhcpInit.allocateIP(mac) - if not freeIP: - return "0,0,0" - print("Find an available IP: " + freeIP) - - return freeIP + "," + dhcpInit.getNetmask() + "," + dhcpInit.getRouter() - elif command == "releaseIpAddr": - ip = user_data.ip - zone_id = user_data.dc - pod_id = user_data.pod - dhcpInit.releaseIP(ip) - except: - return None + def GET(self): + try: + user_data = web.input() + command = user_data.command + print(f"Processing: {command}") + + dhcpInit = dhcp.getInstance() + + if command == "getIpAddr": + mac = user_data.mac + zone_id = user_data.dc + pod_id = user_data.pod + print(mac, zone_id, pod_id) + freeIP = dhcpInit.allocateIP(mac) + if not freeIP: + return "0,0,0" + print(f"Find an available IP: {freeIP}") + return f"{freeIP},{dhcpInit.getNetmask()},{dhcpInit.getRouter()}" + + elif command == "releaseIpAddr": + ip = user_data.ip + zone_id = user_data.dc + pod_id = user_data.pod + dhcpInit.releaseIP(ip) + + except Exception as e: + print(f"Error: {e}") + return None if __name__ == "__main__": - app.run() + app.run()