From 228051ecd5ca3edf0c68447114558a75674e662c Mon Sep 17 00:00:00 2001 From: nautilusshell Date: Tue, 17 Dec 2019 06:40:06 +0000 Subject: [PATCH 1/5] =?UTF-8?q?1.=20=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E3=80=82./logs/cmd/=E4=BC=9A=E4=BF=9D?= =?UTF-8?q?=E5=AD=98=E6=AF=8F=E4=B8=AA=E6=AD=A5=E9=AA=A4=E6=89=80=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E8=BF=87=E7=9A=84=E6=89=80=E6=9C=89=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=202.=20=E8=84=9A=E6=9C=AC=E5=BE=AE=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ClusterBootstrap/deploy.py | 232 +++++++++++++++++++++++------ src/ClusterBootstrap/utils.py | 83 +++++++++-- src/docker-images/WebUI/Dockerfile | 4 +- src/utils/DockerUtils.py | 45 +++++- src/utils/MyLogger.py | 84 +++++++++++ 5 files changed, 394 insertions(+), 54 deletions(-) mode change 100755 => 100644 src/ClusterBootstrap/deploy.py mode change 100755 => 100644 src/ClusterBootstrap/utils.py mode change 100755 => 100644 src/docker-images/WebUI/Dockerfile mode change 100755 => 100644 src/utils/DockerUtils.py mode change 100755 => 100644 src/utils/MyLogger.py diff --git a/src/ClusterBootstrap/deploy.py b/src/ClusterBootstrap/deploy.py old mode 100755 new mode 100644 index 89d9738c..aebb6002 --- a/src/ClusterBootstrap/deploy.py +++ b/src/ClusterBootstrap/deploy.py @@ -18,6 +18,8 @@ import copy import numbers import multiprocessing +import pdb +import logging from os.path import expanduser @@ -45,6 +47,8 @@ from params import default_config_parameters, scriptblocks from ConfigUtils import * +from MyLogger import get_deploy_logger as Logger + capacityMatch = re.compile("\d+\.?\d*\s*[K|M|G|T|P]B") digitsMatch = re.compile("\d+\.?\d*") @@ -59,8 +63,6 @@ nocache = False limitnodes = None - - # default search for all partitions of hdb, hdc, hdd, and sdb, sdc, sdd sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -149,6 +151,7 @@ def _check_config_items(cnfitem, cnf): def check_config(cnf): if not config["isacs"]: _check_config_items("discovery_url",cnf) + _check_config_items("kubernetes_master_node",cnf) _check_config_items("kubernetes_master_ssh_user",cnf) _check_config_items("api_servers",cnf) @@ -158,6 +161,7 @@ def check_config(cnf): _check_config_items("ssh_cert",cnf) _check_config_items("pod_ip_range",cnf) _check_config_items("service_cluster_ip_range",cnf) + if not os.path.isfile(config["ssh_cert"]): raise Exception("ERROR: we cannot find ssh key file at %s. \n please run 'python build-pxe-coreos.py docker_image_name' to generate ssh key file and pxe server image." % config["ssh_cert"]) @@ -268,6 +272,7 @@ def update_config(): def add_ssh_key(): keys = fetch_config(config, ["sshKeys"]) + if isinstance( keys, list ): if "sshkey" in config and "sshKeys" in config and not (config["sshkey"] in config["sshKeys"]): config["sshKeys"].append(config["sshkey"]) @@ -726,35 +731,65 @@ def gen_worker_certificates(): def gen_master_certificates(): GetCertificateProperty() + utils.render_template_directory("./template/ssl", "./deploy/ssl", config) - utils.render_template_directory("./template/ssl", "./deploy/ssl",config) - os.system("cd ./deploy/ssl && bash ./gencerts_master.sh") - os.system("cd ./deploy/ssl && bash ./gencerts_aggregator.sh") + cmd = "cd ./deploy/ssl && bash ./gencerts_master.sh" + os.system(cmd) + Logger().cmd(cmd) + + cmd = "cd ./deploy/ssl && bash ./gencerts_aggregator.sh" + os.system(cmd) + Logger().cmd(cmd) def gen_ETCD_certificates(): GetCertificateProperty() utils.render_template_directory("./template/ssl", "./deploy/ssl",config) - os.system("cd ./deploy/ssl && bash ./gencerts_etcd.sh") + cmd = "cd ./deploy/ssl && bash ./gencerts_etcd.sh" + os.system(cmd) + Logger().cmd(cmd) +def rm_deploy_configs(): + cmd = "mkdir -p ./deploy/etcd" + os.system(cmd) + Logger().cmd(cmd) + + cmd = "mkdir -p ./deploy/kube-addons" + os.system(cmd) + Logger().cmd(cmd) + + cmd = "mkdir -p ./deploy/master" + os.system(cmd) + Logger().cmd(cmd) + + cmd = "rm -r ./deploy/etcd" + os.system(cmd) + Logger().cmd(cmd) + + cmd = "rm -r ./deploy/kube-addons" + os.system(cmd) + Logger().cmd(cmd) + + cmd = "rm -r ./deploy/master" + os.system(cmd) + Logger().cmd(cmd) + return def gen_configs(): print "===============================================" print "generating configuration files..." + utils.clean_rendered_target_directory() - os.system("mkdir -p ./deploy/etcd") - os.system("mkdir -p ./deploy/kube-addons") - os.system("mkdir -p ./deploy/master") - os.system("rm -r ./deploy/etcd") - os.system("rm -r ./deploy/kube-addons") - os.system("rm -r ./deploy/master") + rm_deploy_configs() deployDirs = ["deploy/etcd","deploy/kubelet","deploy/master","deploy/web-docker/kubelet","deploy/kube-addons","deploy/bin"] for deployDir in deployDirs: if not os.path.exists(deployDir): - os.system("mkdir -p %s" % (deployDir)) + cmd = "mkdir -p %s" % (deployDir) + os.system(cmd) + Logger().cmd(cmd) if "etcd_node" in config: etcd_servers = config["etcd_node"] @@ -783,17 +818,13 @@ def gen_configs(): config["api_servers"] = "https://"+config["kubernetes_master_node"][0]+":"+str(config["k8sAPIport"]) config["etcd_endpoints"] = ",".join(["https://"+x+":"+config["etcd3port1"] for x in config["etcd_node"]]) - - - if os.path.isfile(config["ssh_cert"]+".pub"): f = open(config["ssh_cert"]+".pub") sshkey_public = f.read() f.close() - config["sshkey"] = sshkey_public - add_ssh_key() + add_ssh_key() check_config(config) utils.render_template_directory("./template/etcd", "./deploy/etcd",config) @@ -875,12 +906,17 @@ def deploy_master(kubernetes_master): utils.SSH_exec_script(config["ssh_cert"],kubernetes_master_user, kubernetes_master, "./deploy/master/" + config["postmasterdeploymentscript"]) def get_cni_binary(): - os.system("mkdir -p ./deploy/bin") + cmd = "mkdir -p ./deploy/bin" + os.system(cmd) + Logger().cmd(cmd) + # urllib.urlretrieve ("http://ccsdatarepo.westus.cloudapp.azure.com/data/containernetworking/cni-amd64-v0.5.2.tgz", "./deploy/bin/cni-amd64-v0.5.2.tgz") if verbose: print "Extracting CNI binaries" - os.system("tar -zxvf ./deploy/bin/cni-amd64-v0.5.2.tgz -C ./deploy/bin") - + + cmd = "tar -zxvf ./deploy/bin/cni-amd64-v0.5.2.tgz -C ./deploy/bin" + os.system(cmd) + Logger().cmd(cmd) def get_kubectl_binary(force = False): get_hyperkube_docker(force = force) @@ -891,17 +927,24 @@ def get_kubectl_binary(force = False): get_cni_binary() def get_hyperkube_docker(force = False) : - os.system("mkdir -p ./deploy/bin") + cmd = "mkdir -p ./deploy/bin" + os.system(cmd) + Logger().cmd(cmd) + print( "Use docker container %s" % config["dockers"]["container"]["hyperkube"]["fullname"]) if force or not os.path.exists("./deploy/bin/hyperkube"): copy_from_docker_image(config["dockers"]["container"]["hyperkube"]["fullname"], "/hyperkube", "./deploy/bin/hyperkube") + if force or not os.path.exists("./deploy/bin/kubelet"): copy_from_docker_image(config["dockers"]["container"]["hyperkube"]["fullname"], "/kubelet", "./deploy/bin/kubelet") + if force or not os.path.exists("./deploy/bin/kubectl"): copy_from_docker_image(config["dockers"]["container"]["hyperkube"]["fullname"], "/kubectl", "./deploy/bin/kubectl") + if config['kube_custom_cri']: if force or not os.path.exists("./deploy/bin/crishim"): copy_from_docker_image(config["dockers"]["container"]["hyperkube"]["fullname"], "/crishim", "./deploy/bin/crishim") + if force or not os.path.exists("./deploy/bin/nvidiagpuplugin.so"): copy_from_docker_image(config["dockers"]["container"]["hyperkube"]["fullname"], "/nvidiagpuplugin.so", "./deploy/bin/nvidiagpuplugin.so") @@ -916,13 +959,14 @@ def deploy_masters(force = False): kubernetes_masters = config["kubernetes_master_node"] kubernetes_master_user = config["kubernetes_master_ssh_user"] - utils.render_template_directory("./template/master", "./deploy/master",config) utils.render_template_directory("./template/kube-addons", "./deploy/kube-addons",config) #temporary hard-coding, will be fixed after refactoring of config/render logic config["restapi"] = "http://%s:%s" % (kubernetes_masters[0],config["restfulapiport"]) + if verbose: print( "Restapi information == %s " % config["restapi"]) + utils.render_template_directory("./template/WebUI", "./deploy/WebUI",config) utils.render_template_directory("./template/RestfulAPI", "./deploy/RestfulAPI",config) render_service_templates() @@ -931,6 +975,7 @@ def deploy_masters(force = False): for i,kubernetes_master in enumerate(kubernetes_masters): deploy_master(kubernetes_master) + deploycmd = """ until curl -q http://127.0.0.1:8080/version/ ; do sleep 5; @@ -983,10 +1028,17 @@ def check_etcd_service(): print "waiting for ETCD service is ready..." etcd_servers = config["etcd_node"] cmd = "curl --cacert %s --cert %s --key %s 'https://%s:%s/v2/keys'" % ("./deploy/ssl/etcd/ca.pem","./deploy/ssl/etcd/etcd.pem","./deploy/ssl/etcd/etcd-key.pem", etcd_servers[0], config["etcd3port1"]) + + ## modify from https to http + #cmd = "curl --cacert %s --cert %s --key %s 'http://%s:%s/v2/keys'" % ("./deploy/ssl/etcd/ca.pem","./deploy/ssl/etcd/etcd.pem","./deploy/ssl/etcd/etcd-key.pem", etcd_servers[0], config["etcd3port1"]) + Logger().cmd(cmd) + if verbose: print cmd + while os.system(cmd) != 0: time.sleep(5) + print "ETCD service is ready to use..." @@ -1022,7 +1074,6 @@ def deploy_ETCD_docker(): print "===============================================" print "init etcd service on %s ..." % etcd_servers[0] - check_etcd_service() utils.scp(config["ssh_cert"],"./deploy/etcd/init_network.sh","/home/%s/init_network.sh" % etcd_server_user, etcd_server_user, etcd_servers[0] ) @@ -1077,16 +1128,38 @@ def deploy_ETCD(): print "===============================================" print "init etcd service on %s ..." % etcd_servers[0] + #get_etcd_server_ip_cmd = ("getent hosts %s | awk '{ print $1 }'") % (etcd_servers[0]) + #ips = utils.exec_cmd_local(get_etcd_server_ip_cmd).splitlines() + #print ips + + + ## temporily modify from https to http + #cmd = "curl --cacert %s --cert %s --key %s 'http://%s:%s/v2/keys'" % ("./deploy/ssl/etcd/ca.pem","./deploy/ssl/etcd/etcd.pem","./deploy/ssl/etcd/etcd-key.pem", etcd_servers[0], config["etcd3port1"]) + + ''' + if len(ips) > 0: + ip = ips[0] + ip = "192.168.255.1" + cmd = "curl --cacert %s --cert %s --key %s 'https://%s:%s/v2/keys'" % ("./deploy/ssl/etcd/ca.pem","./deploy/ssl/etcd/etcd.pem","./deploy/ssl/etcd/etcd-key.pem", ip, config["etcd3port1"]) + else: + cmd = "curl --cacert %s --cert %s --key %s 'https://%s:%s/v2/keys'" % ("./deploy/ssl/etcd/ca.pem","./deploy/ssl/etcd/etcd.pem","./deploy/ssl/etcd/etcd-key.pem", etcd_servers[0], config["etcd3port1"]) + ''' print "waiting for ETCD service is ready..." - cmd = "curl --cacert %s --cert %s --key %s 'https://%s:%s/v2/keys'" % ("./deploy/ssl/etcd/ca.pem","./deploy/ssl/etcd/etcd.pem","./deploy/ssl/etcd/etcd-key.pem", etcd_servers[0], config["etcd3port1"]) + cmd = "curl --cacert %s --cert %s --key %s 'https://%s:%s/v2/keys'" % ("./deploy/ssl/etcd/ca.pem","./deploy/ssl/etcd/etcd.pem","./deploy/ssl/etcd/etcd-key.pem", etcd_servers[0], config["etcd3port1"]) + + ##cmd = "curl --cacert %s --cert %s --key %s 'https://%s:%s/v2/keys'" % ("./deploy/ssl/etcd/ca.pem","./deploy/ssl/etcd/etcd.pem","./deploy/ssl/etcd/etcd-key.pem", + ## etcd_servers[0], config["etcd3port1"]) + + if verbose: + print cmd + + Logger().cmd(cmd) while os.system(cmd) != 0: print "ETCD service is NOT ready, waiting for 5 seconds..." time.sleep(5) - print "ETCD service is ready to use..." - - + print "ETCD service is ready to use..." utils.render_template("./template/etcd/init_network.sh","./deploy/etcd/init_network.sh",config) utils.SSH_exec_script( config["ssh_cert"], etcd_server_user, etcd_servers[0], "./deploy/etcd/init_network.sh") @@ -1284,7 +1357,9 @@ def deploy_restful_API_on_node(ipAddress): config["nfs-server"] = "10.196.44.241:/mnt/data" if not os.path.exists("./deploy/RestfulAPI"): - os.system("mkdir -p ./deploy/RestfulAPI") + cmd = "mkdir -p ./deploy/RestfulAPI" + os.system(cmd) + Logger().cmd(cmd) utils.render_template("./template/RestfulAPI/config.yaml","./deploy/RestfulAPI/config.yaml",config) utils.render_template("./template/master/restapi-kubeconfig.yaml","./deploy/master/restapi-kubeconfig.yaml",config) @@ -1306,9 +1381,13 @@ def deploy_restful_API_on_node(ipAddress): print "===============================================" print "restful api is running at: http://%s:%s" % (masterIP,config["restfulapiport"]) config["restapi"] = "http://%s:%s" % (masterIP,config["restfulapiport"]) + if verbose: print("Restapi === %s" % config["restapi"]) + return + + def deploy_webUI_on_node(ipAddress): sshUser = config["admin_username"] @@ -1320,12 +1399,18 @@ def deploy_webUI_on_node(ipAddress): return if not os.path.exists("./deploy/WebUI"): - os.system("mkdir -p ./deploy/WebUI") + cmd = "mkdir -p ./deploy/WebUI" + os.system(cmd) + Logger().cmd(cmd) + if verbose: print("Configuration == %s" % config) utils.render_template_directory("./template/WebUI","./deploy/WebUI", config) - os.system("cp --verbose ./deploy/WebUI/*.json ../WebUI/dotnet/WebPortal/") # used for debugging, when deploy, it will be overwritten by mount from host, contains secret + + cmd = "cp --verbose ./deploy/WebUI/*.json ../WebUI/dotnet/WebPortal/" + os.system(cmd) # used for debugging, when deploy, it will be overwritten by mount from host, contains secret + Logger().cmd(cmd) # write into host, mounted into container utils.sudo_scp(config["ssh_cert"],"./deploy/WebUI/userconfig.json","/etc/WebUI/userconfig.json", sshUser, webUIIP ) @@ -1348,26 +1433,31 @@ def deploy_webUI_on_node(ipAddress): with open("./deploy/WebUI/dashboardConfig.json","w") as fp: json.dump(reportConfig, fp) - os.system("cp --verbose ./deploy/WebUI/dashboardConfig.json ../WebUI/dotnet/WebPortal/") + + cmd = "cp --verbose ./deploy/WebUI/dashboardConfig.json ../WebUI/dotnet/WebPortal/" + os.system(cmd) + Logger().cmd(cmd) + # write into host, mounted into container utils.sudo_scp(config["ssh_cert"],"./deploy/WebUI/dashboardConfig.json","/etc/WebUI/dashboardConfig.json", sshUser, webUIIP ) - utils.render_template("./template/WebUI/Master-Templates.json", "./deploy/WebUI/Master-Templates.json", config) #os.system("cp --verbose ./template/WebUI/Master-Templates.json ./deploy/WebUI/Master-Templates.json") - os.system("cp --verbose ./deploy/WebUI/Master-Templates.json ../WebUI/dotnet/WebPortal/Master-Templates.json") - utils.sudo_scp(config["ssh_cert"],"./deploy/WebUI/Master-Templates.json","/etc/WebUI/Master-Templates.json", sshUser, webUIIP ) - - + + cmd = "cp --verbose ./deploy/WebUI/Master-Templates.json ../WebUI/dotnet/WebPortal/Master-Templates.json" + os.system(cmd) + Logger().cmd(cmd) + utils.sudo_scp(config["ssh_cert"],"./deploy/WebUI/Master-Templates.json","/etc/WebUI/Master-Templates.json", sshUser, webUIIP ) utils.render_template_directory("./template/RestfulAPI", "./deploy/RestfulAPI",config) utils.sudo_scp(config["ssh_cert"],"./deploy/RestfulAPI/config.yaml","/etc/RestfulAPI/config.yaml", sshUser, webUIIP ) - # utils.SSH_exec_cmd(config["ssh_cert"], sshUser, webUIIP, "docker pull %s ; docker rm -f webui ; docker run -d -p %s:80 -v /etc/WebUI:/WebUI --restart always --name webui %s ;" % (dockername,str(config["webuiport"]),dockername)) - print "===============================================" print "Web UI is running at: http://%s:%s" % (webUIIP,str(config["webuiport"])) + return + + # Install ssh key remotely def install_ssh_key(key_files): all_nodes = get_nodes(config["clusterId"]) @@ -1791,16 +1881,22 @@ def config_fqdn(): utils.SSH_exec_cmd(config["ssh_cert"], config["admin_username"], node, remotecmd) def add_service_config(): + if os.path.exists("deploy/etc/nginx/"): - os.system("cp deploy/etc/nginx/* deploy/services/nginx/") + cmd = "cp deploy/etc/nginx/* deploy/services/nginx/" + os.system(cmd) + Logger().cmd(cmd) def config_nginx(): all_nodes = get_nodes(config["clusterId"]) + template_dir = "services/nginx/" target_dir = "deploy/services/nginx/" + utils.render_template_directory(template_dir, target_dir,config) add_service_config() + for node in all_nodes: utils.sudo_scp(config["ssh_cert"],"./deploy/services/nginx/","/etc/nginx/conf.other", config["admin_username"], node ) # See https://github.com/kubernetes/examples/blob/master/staging/https-nginx/README.md @@ -2610,22 +2706,31 @@ def deploy_ETCD_master(force = False): if "etcd_node" in config and len(config["etcd_node"]) >= int(config["etcd_node_num"]) and "kubernetes_master_node" in config and len(config["kubernetes_master_node"]) >= 1: print "Ready to deploy kubernetes master on %s, etcd cluster on %s. " % (",".join(config["kubernetes_master_node"]), ",".join(config["etcd_node"])) gen_configs() + response = raw_input_with_default("Clean Up master, and deploy ETCD Nodes (y/n)?") if first_char(response) == "y": clean_master() gen_ETCD_certificates() deploy_ETCD() + response = raw_input_with_default("Deploy Master Nodes (y/n)?") if first_char(response) == "y": gen_master_certificates() deploy_masters(force) response = raw_input_with_default("Allow Workers to register (y/n)?") + if first_char(response) == "y": + cmd = config["homeinserver"]+"/SetClusterInfo?clusterId=%s&key=etcd_endpoints&value=%s" % (config["clusterId"],config["etcd_endpoints"]) + urllib.urlretrieve (cmd) + write_log("curl " + cmd) + + cmd = config["homeinserver"]+"/SetClusterInfo?clusterId=%s&key=api_server&value=%s" % (config["clusterId"],config["api_servers"]) + urllib.urlretrieve (cmd) + write_log("curl " + cmd) - urllib.urlretrieve (config["homeinserver"]+"/SetClusterInfo?clusterId=%s&key=etcd_endpoints&value=%s" % (config["clusterId"],config["etcd_endpoints"])) - urllib.urlretrieve (config["homeinserver"]+"/SetClusterInfo?clusterId=%s&key=api_server&value=%s" % (config["clusterId"],config["api_servers"])) return True + return False # response = raw_input_with_default("Create ISO file for deployment (y/n)?") @@ -2652,15 +2757,20 @@ def update_config_nodes(): def run_kube( prog, commands ): one_command = " ".join(commands) kube_command = "" + if (config["isacs"]): kube_command = "%s --kubeconfig=./deploy/%s %s" % (prog, config["acskubeconfig"], one_command) else: nodes = get_ETCD_master_nodes(config["clusterId"]) master_node = random.choice(nodes) kube_command = ("%s --server=https://%s:%s --certificate-authority=%s --client-key=%s --client-certificate=%s %s" % (prog, master_node, config["k8sAPIport"], "./deploy/ssl/ca/ca.pem", "./deploy/ssl/kubelet/apiserver-key.pem", "./deploy/ssl/kubelet/apiserver.pem", one_command) ) + if verbose: print kube_command + os.system(kube_command) + write_log(kube_command) + return def run_kubectl( commands ): run_kube( "./deploy/bin/kubectl", commands) @@ -2744,15 +2854,19 @@ def get_service_name(service_config_file): return None def get_service_yaml( use_service ): + servicedic = get_all_services() #print servicedic newentries = {} + for service in servicedic: servicename = get_service_name(servicedic[service]) newentries[servicename] = servicedic[service] + servicedic.update(newentries) #print servicedic fname = servicedic[use_service] + return fname def kubernetes_label_node(cmdoptions, nodename, label): @@ -2891,8 +3005,11 @@ def start_kube_service( servicename ): fname = get_service_yaml( servicename ) # print "start service %s with %s" % (servicename, fname) dirname = os.path.dirname(fname) + if os.path.exists(os.path.join(dirname,"launch_order")) and "/" not in servicename: + with open(os.path.join(dirname,"launch_order"),'r') as f: + allservices = f.readlines() for filename in allservices: # If this line is a sleep tag (e.g. SLEEP 10), sleep for given seconds to wait for the previous service to start. @@ -2901,20 +3018,26 @@ def start_kube_service( servicename ): else: filename = filename.strip('\n') start_one_kube_service(os.path.join(dirname,filename)) + else: start_one_kube_service(fname) + return + def stop_kube_service( servicename ): fname = get_service_yaml( servicename ) dirname = os.path.dirname(fname) + if os.path.exists(os.path.join(dirname,"launch_order")) and "/" not in servicename: with open(os.path.join(dirname,"launch_order"),'r') as f: + allservices = f.readlines() for filename in reversed(allservices): # If this line is a sleep tag, skip this line. if not filename.startswith("SLEEP"): filename = filename.strip('\n') stop_one_kube_service(os.path.join(dirname,filename)) + else: stop_one_kube_service(fname) @@ -2957,6 +3080,7 @@ def push_docker_images(nargs): if verbose: print "Build & push docker images to docker register ..." print "Nocache: {0}".format(nocache) + push_dockers("./deploy/docker-images/", config["dockerprefix"], config["dockertag"], nargs, config, verbose, nocache = nocache ) def check_buildable_images(nargs): @@ -2985,6 +3109,7 @@ def run_docker_image( imagename, native = False, sudo = False ): else: run_docker( matches[0], prompt = imagename, dockerConfig = dockerConfig, sudo = sudo ) + def run_command( args, command, nargs, parser ): # If necessary, show parsed arguments. # print args @@ -3029,6 +3154,7 @@ def run_command( args, command, nargs, parser ): f = open(config_file) merge_config(config, yaml.load(f)) f.close() + # print config if os.path.exists("./deploy/clusterID.yml"): f = open("./deploy/clusterID.yml") @@ -3514,34 +3640,45 @@ def run_command( args, command, nargs, parser ): elif command == "kubernetes": configuration( config, verbose ) + #pdb.set_trace() + if len(nargs) >= 1: + if len(nargs)>=2: servicenames = nargs[1:] + else: allservices = get_all_services() servicenames = [] for service in allservices: servicenames.append(service) # print servicenames + generate_hdfs_containermounts() configuration( config, verbose ) + if nargs[0] == "start": if args.force and "hdfsformat" in servicenames: print ("This operation will WIPEOUT HDFS namenode, and erase all data on the HDFS cluster, " ) response = raw_input ("Please type (WIPEOUT) in ALL CAPITALS to confirm the operation ---> ") + if response == "WIPEOUT": config["hdfsconfig"]["formatoptions"] = "--force " + # Start a kubelet service. for servicename in servicenames: start_kube_service(servicename) + elif nargs[0] == "stop": # stop a kubelet service. for servicename in servicenames: stop_kube_service(servicename) + elif nargs[0] == "restart": # restart a kubelet service. for servicename in servicenames: replace_kube_service(servicename) + elif nargs[0] == "labels": if len(nargs)>=2 and ( nargs[1] == "active" or nargs[1] == "inactive" or nargs[1] == "remove" ): kubernetes_label_nodes(nargs[1], nargs[2:], args.yes) @@ -3550,6 +3687,7 @@ def run_command( args, command, nargs, parser ): else: parser.print_help() print "Error: kubernetes labels expect a verb which is either active, inactive or remove, but get: %s" % nargs[1] + elif nargs[0] == "patchprovider": # TODO(harry): read a tag to decide which tools we are using, so we don't need nargs[1] if len(nargs)>=2 and ( nargs[1] == "aztools" or nargs[1] == "gstools" or nargs[1] == "awstools" ): @@ -3559,12 +3697,16 @@ def run_command( args, command, nargs, parser ): kubernetes_patch_nodes_provider(nargs[1], False) else: print "Error: kubernetes patchprovider expect a verb which is either aztools, gstools or awstools." + elif nargs[0] == "mark": kubernetes_mark_nodes( nargs[1:], True) + elif nargs[0] == "unmark": kubernetes_mark_nodes( nargs[1:], False) + elif nargs[0] == "cordon" or nargs[0] == "uncordon": run_kube_command_on_nodes(nargs) + else: parser.print_help() print "Error: Unknown kubernetes subcommand " + nargs[0] @@ -3611,14 +3753,18 @@ def run_command( args, command, nargs, parser ): config_fqdn() elif command == "docker": + if len(nargs)>=1: configuration( config, verbose ) + if nargs[0] == "build": check_buildable_images(nargs[1:]) build_docker_images(nargs[1:]) + elif nargs[0] == "push": check_buildable_images(nargs[1:]) push_docker_images(nargs[1:]) + elif nargs[0] == "run": if len(nargs)>=2: run_docker_image( nargs[1], args.native, sudo = args.sudo ) @@ -3827,7 +3973,9 @@ def run_script_blocks( verbose, script_collection ): if not os.path.exists("./deploy"): os.system("mkdir -p ./deploy") + config = init_config(default_config_parameters) + Logger().init(command) if command == "scriptblocks": if nargs[0] in scriptblocks: diff --git a/src/ClusterBootstrap/utils.py b/src/ClusterBootstrap/utils.py old mode 100755 new mode 100644 index c773b373..049a64c9 --- a/src/ClusterBootstrap/utils.py +++ b/src/ClusterBootstrap/utils.py @@ -1,4 +1,6 @@ #!/usr/bin/python +# -*- coding: UTF-8 -*- + import json import os import time @@ -23,6 +25,10 @@ from shutil import copyfile, copytree import urllib import socket,struct +import logging + +sys.path.append("../utils") +from MyLogger import get_deploy_logger as Logger verbose = False @@ -42,20 +48,33 @@ def clean_rendered_target_directory(): def render_template(template_file, target_file, config, verbose=False): filename, file_extension = os.path.splitext(template_file) basename = os.path.basename(template_file) + if ("render-exclude" in config and basename in config["render-exclude"] ): # Don't render/copy the file. return + if ("render-by-copy-ext" in config and file_extension in config["render-by-copy-ext"]) or ("render-by-copy" in config and basename in config["render-by-copy"]): + copyfile(template_file, target_file) + Logger().cmd("copy file %s %s" % (template_file, target_file)) + if verbose: print "Copy tempalte " + template_file + " --> " + target_file + + elif "render-by-copy-full" in config and template_file in config["render-by-copy-full"]: + copyfile(template_file, target_file) + Logger().cmd("copy file %s %s" % (template_file, target_file)) + if verbose: print "Copy tempalte " + template_file + " --> " + target_file + elif ("render-by-line-ext" in config and file_extension in config["render-by-line-ext"]) or ("render-by-line" in config and basename in config["render-by-line"]): if verbose: print "Render template " + template_file + " --> " + target_file + " Line by Line .... " + + ## 逐行拷贝 ENV_local = Environment(loader=FileSystemLoader("/")) with open(target_file, 'w') as f: with open(template_file, 'r') as fr: @@ -71,17 +90,26 @@ def render_template(template_file, target_file, config, verbose=False): fr.close() f.close() + Logger().cmd("copy file %s %s" % (template_file, target_file)) + else: if verbose: print "Render template " + template_file + " --> " + target_file try: ENV_local = Environment(loader=FileSystemLoader("/")) template = ENV_local.get_template(os.path.abspath(template_file)) + content = template.render(cnf=config) target_dir = os.path.dirname(target_file) - os.system("mkdir -p {0}".format(target_dir)) + + cmd = "mkdir -p {0}".format(target_dir) + os.system(cmd) + Logger().cmd(cmd) + Logger().cmd("copy file %s %s" % (template_file, target_file)) + with open(target_file, 'w') as f: f.write(content) + f.close() except Exception as e: print "!!! Failure !!! in render template " + template_file @@ -93,18 +121,25 @@ def render_template_directory(template_dir, target_dir,config, verbose=False, ex return else: StaticVariable.rendered_target_directory[target_dir]=template_dir - os.system("mkdir -p "+target_dir) + + cmd = "mkdir -p "+target_dir + os.system(cmd) + Logger().cmd(cmd) + markfile = os.path.join( target_dir, "DO_NOT_WRITE" ) # print "Evaluate %s" % markfile if not os.path.exists( markfile ): # print "Write DO_NOT_WRITE" open( markfile, 'w').close() + filenames = os.listdir(template_dir) for filename in filenames: if filename == "copy_dir": fullname = os.path.join(template_dir, filename) + with open( fullname ) as f: content = f.readlines() + content = [x.strip() for x in content] for copy_dir in content: fullname_copy_dir = os.path.join(template_dir, copy_dir) @@ -112,18 +147,27 @@ def render_template_directory(template_dir, target_dir,config, verbose=False, ex # Allow target directory to be re-rendered StaticVariable.rendered_target_directory.pop(target_dir, None) render_template_directory(fullname_copy_dir, target_dir,config, verbose, exclude_dir=template_dir) + elif os.path.isfile(os.path.join(template_dir, filename)): if exclude_dir is not None: check_file = os.path.join(exclude_dir, filename) if os.path.exists(check_file): continue + render_template(os.path.join(template_dir, filename), os.path.join(target_dir, filename),config, verbose) else: srcdir = os.path.join(template_dir, filename) dstdir = os.path.join(target_dir, filename) + if ("render-by-copy" in config and filename in config["render-by-copy"]): - os.system( "rm -rf %s" % dstdir ) - os.system( "cp -r %s %s" %(srcdir, dstdir)) + cmd = "rm -rf %s" % dstdir + os.system(cmd) + Logger().cmd(cmd) + + cmd = "cp -r %s %s" %(srcdir, dstdir) + os.system(cmd) + Logger().cmd(cmd) + else: if exclude_dir is None: render_template_directory(srcdir, dstdir,config, verbose) @@ -131,41 +175,55 @@ def render_template_directory(template_dir, target_dir,config, verbose=False, ex exdir = os.path.join(exclude_dir, filename) render_template_directory(srcdir, dstdir,config, verbose, exclude_dir=exdir) + # Execute a remote SSH cmd with identity file (private SSH key), user, host def SSH_exec_cmd(identity_file, user,host,cmd,showCmd=True): if len(cmd)==0: return; + if showCmd or verbose: print ("""ssh -q -o "StrictHostKeyChecking no" -o "UserKnownHostsFile=/dev/null" -i %s "%s@%s" "%s" """ % (identity_file, user, host, cmd) ) - os.system("""ssh -q -o "StrictHostKeyChecking no" -o "UserKnownHostsFile=/dev/null" -i %s "%s@%s" "%s" """ % (identity_file, user, host, cmd) ) + + cmd = """ssh -q -o "StrictHostKeyChecking no" -o "UserKnownHostsFile=/dev/null" -i %s "%s@%s" "%s" """ % (identity_file, user, host, cmd) + os.system(cmd) + Logger().cmd(cmd) # SSH Connect to a remote host with identity file (private SSH key), user, host # Program usually exit here. def SSH_connect(identity_file, user,host): if verbose: print ("""ssh -q -o "StrictHostKeyChecking no" -o "UserKnownHostsFile=/dev/null" -i %s "%s@%s" """ % (identity_file, user, host) ) - os.system("""ssh -q -o "StrictHostKeyChecking no" -o "UserKnownHostsFile=/dev/null" -i %s "%s@%s" """ % (identity_file, user, host) ) + + cmd = """ssh -q -o "StrictHostKeyChecking no" -o "UserKnownHostsFile=/dev/null" -i %s "%s@%s" """ % (identity_file, user, host) + os.system(cmd) + Logger().cmd(cmd) # Copy a local file or directory (source) to remote (target) with identity file (private SSH key), user, host def scp (identity_file, source, target, user, host, verbose = False): cmd = 'scp -q -o "StrictHostKeyChecking no" -o "UserKnownHostsFile=/dev/null" -i %s -r "%s" "%s@%s:%s"' % (identity_file, source, user, host, target) if verbose: print cmd + os.system(cmd) + Logger().cmd(cmd) # Copy a local file (source) or directory to remote (target) with identity file (private SSH key), user, host, and def sudo_scp (identity_file, source, target, user, host,changePermission=False, verbose = False ): tmp = str(uuid.uuid4()) scp(identity_file, source,"~/%s" % tmp, user, host, verbose ) targetPath = os.path.dirname(target) - if ( os.path.isfile(source)): + + if (os.path.isfile(source)): cmd = "sudo mkdir -p %s ; sudo mv ~/%s %s" % (targetPath, tmp, target) else: - cmd = "sudo mkdir -p %s ; sudo rm -r %s/*; sudo mv ~/%s/* %s; sudo rm -rf ~/%s" % (target, target, tmp, target, tmp) + cmd = "sudo mkdir -p %s ; sudo rm -rf %s/*; sudo mv ~/%s/* %s; sudo rm -rf ~/%s" % (target, target, tmp, target, tmp) + if changePermission: cmd += " ; sudo chmod +x %s" % target + if verbose: print cmd + SSH_exec_cmd(identity_file, user, host, cmd, verbose) # Execute a remote SSH cmd with identity file (private SSH key), user, host @@ -267,14 +325,19 @@ def _byteify(data, ignore_dicts = False): return data def exec_cmd_local(execmd, supressWarning = False): + if supressWarning: cmd += " 2>/dev/null" + if verbose: print execmd + try: output = subprocess.check_output( execmd, shell=True ) except subprocess.CalledProcessError as e: output = "Return code: " + str(e.returncode) + ", output: " + e.output.strip() + + Logger().cmd(execmd) # print output return output @@ -333,14 +396,17 @@ def SSH_exec_cmd_with_directory( identity_file, user, host, srcdir, cmd, supress def SSH_exec_script( identity_file, user, host, script, supressWarning = False, removeAfterExecution = True): tmpfile = os.path.join("/tmp", str(uuid.uuid4())+".sh") scp( identity_file, script, tmpfile, user, host) + cmd = "bash --verbose "+tmpfile dstcmd = "" if supressWarning: dstcmd += cmd + " 2>/dev/null; " else: dstcmd += cmd + "; " + if removeAfterExecution: dstcmd += "rm -r " + tmpfile + "; " + SSH_exec_cmd( identity_file, user, host, dstcmd,False ) @@ -551,4 +617,3 @@ def mergeDict(configDst, configSrc, bOverwrite): configDst[entry] = configSrc[entry] elif isinstance(configSrc[entry], dict) and isinstance(configDst[entry], dict): mergeDict(configDst[entry], configSrc[entry], bOverwrite) - diff --git a/src/docker-images/WebUI/Dockerfile b/src/docker-images/WebUI/Dockerfile old mode 100755 new mode 100644 index 4c105652..335afcf4 --- a/src/docker-images/WebUI/Dockerfile +++ b/src/docker-images/WebUI/Dockerfile @@ -12,8 +12,8 @@ RUN sh -c 'echo "deb [arch=amd64] https://apt-mo.trafficmanager.net/repos/dotnet RUN apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 417A0893 RUN apt-get install -y apt-transport-https RUN apt-get update -RUN apt-get install -y dotnet-dev-1.0.0-preview2.1-003177 -RUN apt-get install -y dotnet-dev-1.0.0-preview2-003156 +RUN apt-get install -y --allow-unauthenticated dotnet-dev-1.0.0-preview2.1-003177 +RUN apt-get install -y --allow-unauthenticated dotnet-dev-1.0.0-preview2-003156 RUN apt-get update && apt-get install -y --no-install-recommends \ diff --git a/src/utils/DockerUtils.py b/src/utils/DockerUtils.py old mode 100755 new mode 100644 index f1db2523..a08f42b8 --- a/src/utils/DockerUtils.py +++ b/src/utils/DockerUtils.py @@ -9,32 +9,50 @@ import getpass import pwd import grp +import pdb + from os.path import expanduser from DirectoryUtils import cd +from MyLogger import get_deploy_logger as Logger + def build_docker( dockername, dirname, verbose=False, nocache=False ): + # docker name is designed to use lower case. dockername = dockername.lower() if verbose: print "Building docker ... " + dockername + " .. @" + dirname + with cd(dirname): + Logger().cmd("cd " + dirname) + # print "Test if prebuid.sh exists" if os.path.exists("prebuild.sh"): print "Execute prebuild.sh for docker %s" % dockername - os.system("bash prebuild.sh") + + cmd = "bash prebuild.sh" + os.system(cmd) + Logger().cmd(cmd) + if nocache: cmd = "docker build --no-cache -t "+ dockername + " ." else: cmd = "docker build -t "+ dockername + " ." + if verbose: print cmd + os.system(cmd) + Logger().cmd(cmd) + return dockername + def build_docker_with_config( dockername, config, verbose=False, nocache=False ): usedockername = dockername.lower() build_docker( config["dockers"]["container"][dockername]["name"], config["dockers"]["container"][dockername]["dirname"], verbose, nocache ) + def push_docker( dockername, docker_register, verbose=False): # docker name is designed to use lower case. dockername = dockername.lower() @@ -45,15 +63,24 @@ def push_docker( dockername, docker_register, verbose=False): os.system(cmd) return dockername + def push_docker_with_config( dockername, config, verbose=False, nocache=False ): + usedockername = dockername.lower() + # build_docker( config["dockers"]["container"][dockername]["name"], config["dockers"]["container"][dockername]["dirname"], verbose, nocache ) if verbose: print "Pushing docker ... " + config["dockers"]["container"][dockername]["name"] + " to " + config["dockers"]["container"][dockername]["fullname"] + cmd = "docker tag "+ config["dockers"]["container"][dockername]["name"] + " " + config["dockers"]["container"][dockername]["fullname"] cmd += "; docker push " + config["dockers"]["container"][dockername]["fullname"] + os.system(cmd) + Logger().cmd(cmd) + return config["dockers"]["container"][dockername]["name"] + + def run_docker(dockername, prompt="", dockerConfig = None, sudo = False, options = "" ): if not (dockerConfig is None): @@ -167,6 +194,7 @@ def get_docker_list(rootdir, dockerprefix, dockertag, nargs, verbose = False ): docker_list = {} if not (nargs is None) and len(nargs)>0: nargs = map(lambda x:x.lower(), nargs ) + fnames = os.listdir(rootdir) for fname in fnames: if nargs is None or len(nargs)==0 or fname.lower() in nargs: @@ -175,6 +203,7 @@ def get_docker_list(rootdir, dockerprefix, dockertag, nargs, verbose = False ): basename = os.path.basename(entry) dockername = dockerprefix + os.path.basename(entry)+":"+dockertag docker_list[dockername] = ( basename, entry ) + return docker_list system_docker_registry = None @@ -210,15 +239,20 @@ def config_dockers(rootdir, dockerprefix, dockertag, verbose, config): global system_docker_dic global infra_docker_registry global worker_docker_registry + if system_docker_registry is None: + infra_dockers = config["infrastructure-dockers"] if "infrastructure-dockers" in config else {} infra_docker_registry = config["infrastructure-dockerregistry"] if "infrastructure-dockerregistry" in config else config["dockerregistry"] worker_docker_registry = config["worker-dockerregistry"] if "worker-dockerregistry" in config else config["dockerregistry"] + system_docker_registry = config["dockers"]["hub"] system_docker_tag = config["dockers"]["tag"] system_docker_dic = config["dockers"]["system"] customize_docker_dic = config["dockers"]["customize"] + docker_list = get_docker_list(rootdir, dockerprefix, dockertag, None, verbose ) + # print("Customized_dic: %s" % customize_docker_dic) # Populate system dockers for assemblename, tuple in docker_list.iteritems(): @@ -241,14 +275,17 @@ def config_dockers(rootdir, dockerprefix, dockertag, verbose, config): dockerregistry = infra_docker_registry else: dockerregistry = worker_docker_registry + usedockername = dockername.lower() if "container" not in config["dockers"]: config["dockers"]["container"] = {} + config["dockers"]["container"][dockername] = { "dirname": os.path.join("./deploy/docker-images", dockername ), "fullname": dockerregistry + prefix + usedockername + ":" + tag, "name": prefix + usedockername + ":" + tag, } + # pxe-ubuntu and pxe-coreos is in template for dockername in config["dockers"]["infrastructure"]: config["dockers"]["container"][dockername] = { @@ -256,6 +293,7 @@ def config_dockers(rootdir, dockerprefix, dockertag, verbose, config): "fullname": infra_docker_registry + dockerprefix + dockername + ":" + dockertag, "name": dockerprefix + dockername + ":" + dockertag, } + # pxe-ubuntu and pxe-coreos is in template for dockername in config["dockers"]["external"]: usedockername = dockername.lower() @@ -289,9 +327,12 @@ def push_one_docker(dirname, dockerprefix, tag, basename, config, verbose = Fals build_docker_with_config( basename, config, verbose, nocache = nocache ) push_docker_with_config( basename, config, verbose, nocache = nocache ) + def push_dockers(rootdir, dockerprefix, dockertag, nargs, config, verbose = False, nocache = False ): + configuration(config, verbose) docker_list = get_docker_list(rootdir, dockerprefix, dockertag, nargs, verbose ); + for _, tuple in docker_list.iteritems(): dockername, _ = tuple build_docker_with_config( dockername, config, verbose, nocache = nocache ) @@ -299,8 +340,10 @@ def push_dockers(rootdir, dockerprefix, dockertag, nargs, config, verbose = Fals def copy_from_docker_image(image, srcFile, dstFile): + id = subprocess.check_output(['docker', 'create', image]) id = id.strip() + copyCmd = "docker cp --follow-link=true " + id + ":" + srcFile + " " + dstFile #print copyCmd os.system(copyCmd) diff --git a/src/utils/MyLogger.py b/src/utils/MyLogger.py old mode 100755 new mode 100644 index 2281584f..006a3c31 --- a/src/utils/MyLogger.py +++ b/src/utils/MyLogger.py @@ -1,7 +1,13 @@ +#!/usr/bin/python +# -*- coding: UTF-8 -*- + from config import global_vars import logging from logging.config import dictConfig +import os +from os import path + class MyLogger: def init(self): @@ -38,3 +44,81 @@ def debug(self,msg): if self.logger is not None: self.logger.debug(msg) + + +## 部署日志 +class DeployLogger: + + def __init__(self): + self.app_logger = None + self.cmd_logger = None + self.formatter = None + + return + + def init(self, cmd=""): + self.formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s', "%Y-%m-%d %H:%M:%S") + + ## 在当前目录下创建日志目录 + log_dir = "./logs/" + if path.exists(log_dir) is False: + os.mkdir(log_dir) + + cmd_dir = "./logs/cmd/" + if path.exists(cmd_dir) is False and not cmd: + os.mkdir(cmd_dir) + + self.app_logger = self.setup_logger("app", log_dir + "app.log") + self.cmd_logger = self.setup_logger("cmd", cmd_dir + cmd + ".log") + return + + def info(self,msg): + + if self.app_logger is not None: + self.app_logger.info(msg) + + return + + def error(self,msg): + + if self.app_logger is not None: + self.app_logger.error(msg) + + return + + def warn(self,msg): + + if self.app_logger is not None: + self.app_logger.warn(msg) + + return + + def debug(self,msg): + + if self.app_logger is not None: + self.app_logger.debug(msg) + + return + + def cmd(self, msg): + + if self.cmd_logger is not None: + self.cmd_logger.info(msg) + + return + + def setup_logger(self, name, log_file, level=logging.INFO): + handler = logging.FileHandler(log_file) + handler.setFormatter(self.formatter) + + logger = logging.getLogger(name) + logger.setLevel(level) + logger.addHandler(handler) + + return logger + + +deploy_logger = DeployLogger() +def get_deploy_logger(): + return deploy_logger + From 3fb3547ad3c8ee252336942c4a76779d2d74540b Mon Sep 17 00:00:00 2001 From: nautilusshell Date: Thu, 19 Dec 2019 01:40:24 +0000 Subject: [PATCH 2/5] =?UTF-8?q?1.=20=E8=B0=83=E6=95=B4=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=202.=20=E4=BF=AE=E6=94=B9=E6=97=A5=E5=BF=97=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ClusterBootstrap/deploy.py | 3 +++ src/utils/MyLogger.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) mode change 100644 => 100755 src/ClusterBootstrap/deploy.py diff --git a/src/ClusterBootstrap/deploy.py b/src/ClusterBootstrap/deploy.py old mode 100644 new mode 100755 index aebb6002..a06c9762 --- a/src/ClusterBootstrap/deploy.py +++ b/src/ClusterBootstrap/deploy.py @@ -3962,12 +3962,15 @@ def run_script_blocks( verbose, script_collection ): parser.add_argument('nargs', nargs=argparse.REMAINDER, help="Additional command argument", ) + args = parser.parse_args() command = args.command nargs = args.nargs + if args.verbose: verbose = True utils.verbose = True + if args.nodes is not None: limitnodes = args.nodes diff --git a/src/utils/MyLogger.py b/src/utils/MyLogger.py index 006a3c31..8517fb1d 100644 --- a/src/utils/MyLogger.py +++ b/src/utils/MyLogger.py @@ -65,7 +65,7 @@ def init(self, cmd=""): os.mkdir(log_dir) cmd_dir = "./logs/cmd/" - if path.exists(cmd_dir) is False and not cmd: + if path.exists(cmd_dir) is False and cmd: os.mkdir(cmd_dir) self.app_logger = self.setup_logger("app", log_dir + "app.log") From 25666121db37f3d489e40cb9ace0dd3e23c98cc1 Mon Sep 17 00:00:00 2001 From: nautilusshell Date: Thu, 19 Dec 2019 01:47:19 +0000 Subject: [PATCH 3/5] =?UTF-8?q?=E8=AE=A9git=E5=BF=BD=E7=95=A5=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E6=96=87=E4=BB=B6=E5=A4=B9logs/*?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 3f26a53f..d34bed69 100755 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ src/ClusterBootstrap/.nfs* src/ClusterBootstrap/iso-creator/mkimg.sh src/ClusterBootstrap/deploy_backup/* +src/ClusterBootstrap/logs/* From e3316b259836fd308f0710cb087b1df5d7bd32f2 Mon Sep 17 00:00:00 2001 From: nautilusshell Date: Mon, 23 Dec 2019 11:52:52 +0000 Subject: [PATCH 4/5] =?UTF-8?q?1.=20=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ClusterBootstrap/deploy.py | 95 +++++++++++++++++++++++++++------- src/ClusterBootstrap/utils.py | 65 ++++++++++++++++++----- src/utils/MyLogger.py | 4 +- 3 files changed, 130 insertions(+), 34 deletions(-) diff --git a/src/ClusterBootstrap/deploy.py b/src/ClusterBootstrap/deploy.py index a06c9762..a5923c50 100755 --- a/src/ClusterBootstrap/deploy.py +++ b/src/ClusterBootstrap/deploy.py @@ -421,10 +421,13 @@ def add_additional_cloud_config(): # additional startup script to be added to report.sh translate_config_entry( config, ["coreos", "startupScripts"], "startupscripts", basestring ) + def init_deployment(): gen_new_key = True regenerate_key = False + if (os.path.exists("./deploy/clusterID.yml")): + clusterID = utils.get_cluster_ID_from_file() response = raw_input_with_default("There is a cluster (ID:%s) deployment in './deploy', do you want to keep the existing ssh key and CA certificates (y/n)?" % clusterID) if first_char(response) == "n": @@ -435,6 +438,7 @@ def init_deployment(): gen_new_key = False else: create_cluster_id() + if gen_new_key: utils.gen_SSH_key(regenerate_key) gen_CA_certificates() @@ -459,8 +463,13 @@ def init_deployment(): add_additional_cloud_config() add_kubelet_config() - os.system( "mkdir -p ./deploy/cloud-config/") - os.system( "mkdir -p ./deploy/iso-creator/") + cmd = "mkdir -p ./deploy/cloud-config/" + os.system(cmd) + Logger().cmd(cmd) + + cmd = "mkdir -p ./deploy/iso-creator/" + os.system(cmd) + Logger().cmd(cmd) template_file = "./template/cloud-config/cloud-config-master.yml" target_file = "./deploy/cloud-config/cloud-config-master.yml" @@ -489,20 +498,26 @@ def init_deployment(): with open("./deploy/ssl/kubelet/apiserver.pem", 'r') as f: content = f.read() + config["apiserver.pem"] = base64.b64encode(content) config["worker.pem"] = base64.b64encode(content) with open("./deploy/ssl/kubelet/apiserver-key.pem", 'r') as f: content = f.read() + config["apiserver-key.pem"] = base64.b64encode(content) config["worker-key.pem"] = base64.b64encode(content) add_additional_cloud_config() add_kubelet_config() + template_file = "./template/cloud-config/cloud-config-worker.yml" target_file = "./deploy/cloud-config/cloud-config-worker.yml" utils.render_template( template_file, target_file ,config) + return + + def check_node_availability(ipAddress): # print "Check node availability on: " + str(ipAddress) status = os.system('ssh -o "StrictHostKeyChecking no" -o "UserKnownHostsFile=/dev/null" -i %s -oBatchMode=yes %s@%s hostname > /dev/null' % (config["admin_username"], config["ssh_cert"], ipAddress)) @@ -685,7 +700,12 @@ def clean_deployment(): def gen_CA_certificates(): utils.render_template_directory("./template/ssl", "./deploy/ssl",config) - os.system("cd ./deploy/ssl && bash ./gencerts_ca.sh") + + cmd = "cd ./deploy/ssl && bash ./gencerts_ca.sh" + os.system(cmd) + Logger().cmd(cmd) + return + def GetCertificateProperty(): masterips = [] @@ -722,11 +742,19 @@ def GetCertificateProperty(): config["etcd_ssl_dns"] = "\n".join(["DNS."+str(i+5)+" = "+dns for i,dns in enumerate(etcddns)]) config["etcd_ssl_ip"] = "IP.1 = 127.0.0.1\n" + "\n".join(["IP."+str(i+2)+" = "+ip for i,ip in enumerate(etcdips)]) + return + def gen_worker_certificates(): utils.render_template_directory("./template/ssl", "./deploy/ssl",config) - os.system("cd ./deploy/ssl && bash ./gencerts_kubelet.sh") + + cmd = "cd ./deploy/ssl && bash ./gencerts_kubelet.sh" + os.system(cmd) + Logger().cmd(cmd) + + return + def gen_master_certificates(): @@ -1465,7 +1493,6 @@ def install_ssh_key(key_files): rootpasswdfile = "./deploy/sshkey/rootpasswd" rootuserfile = "./deploy/sshkey/rootuser" - with open(rootpasswdfile, "r") as f: rootpasswd = f.read().strip() f.close() @@ -1476,23 +1503,36 @@ def install_ssh_key(key_files): rootuser = f.read().strip() f.close() - for node in all_nodes: if len(key_files)>0: for key_file in key_files: print "Install key %s on %s" % (key_file, node) - os.system("""sshpass -f %s ssh-copy-id -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" -i %s %s@%s""" %(rootpasswdfile, key_file, rootuser, node)) + + cmd = """sshpass -f %s ssh-copy-id -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" -i %s %s@%s""" %(rootpasswdfile, key_file, rootuser, node) + os.system(cmd) + Logger().cmd(cmd) + else: print "Install key %s on %s" % ("./deploy/sshkey/id_rsa.pub", node) - os.system("""sshpass -f %s ssh-copy-id -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" -i ./deploy/sshkey/id_rsa.pub %s@%s""" %(rootpasswdfile, rootuser, node)) + cmd = """sshpass -f %s ssh-copy-id -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" -i ./deploy/sshkey/id_rsa.pub %s@%s""" %(rootpasswdfile, rootuser, node) + os.system(cmd) + Logger().cmd(cmd) if rootuser != config["admin_username"]: - for node in all_nodes: - os.system('sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo useradd -p %s -d /home/%s -m -s /bin/bash %s"' % (rootpasswdfile,rootuser, node, rootpasswd,config["admin_username"],config["admin_username"])) - os.system('sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo usermod -aG sudo %s"' % (rootpasswdfile,rootuser, node,config["admin_username"])) - os.system('sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo mkdir -p /home/%s/.ssh"' % (rootpasswdfile,rootuser, node, config["admin_username"])) + for node in all_nodes: + cmd = 'sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo useradd -p %s -d /home/%s -m -s /bin/bash %s"' % (rootpasswdfile,rootuser, node, rootpasswd,config["admin_username"],config["admin_username"]) + os.system(cmd) + Logger().cmd(cmd) + + cmd = 'sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo usermod -aG sudo %s"' % (rootpasswdfile,rootuser, node,config["admin_username"]) + os.system(cmd) + Logger().cmd(cmd) + + cmd = 'sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo mkdir -p /home/%s/.ssh"' % (rootpasswdfile,rootuser, node, config["admin_username"]) + os.system(cmd) + Logger().cmd(cmd) if len(key_files)>0: for key_file in key_files: @@ -1500,20 +1540,34 @@ def install_ssh_key(key_files): with open(key_file, "r") as f: publicKey = f.read().strip() f.close() - os.system('sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "echo %s | sudo tee /home/%s/.ssh/authorized_keys"' % (rootpasswdfile,rootuser, node,publicKey,config["admin_username"])) + + cmd = 'sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "echo %s | sudo tee /home/%s/.ssh/authorized_keys"' % (rootpasswdfile,rootuser, node,publicKey,config["admin_username"]) + os.system(cmd) + Logger().cmd(cmd) else: print "Install key %s on %s" % ("./deploy/sshkey/id_rsa.pub", node) with open("./deploy/sshkey/id_rsa.pub", "r") as f: publicKey = f.read().strip() f.close() - os.system('sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "echo %s | sudo tee /home/%s/.ssh/authorized_keys"' % (rootpasswdfile,rootuser, node,publicKey,config["admin_username"])) - os.system('sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo chown %s:%s -R /home/%s"' % (rootpasswdfile,rootuser, node,config["admin_username"],config["admin_username"],config["admin_username"])) - os.system('sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo chmod 400 /home/%s/.ssh/authorized_keys"' % (rootpasswdfile,rootuser, node,config["admin_username"])) - os.system("""sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "echo '%s ALL=(ALL) NOPASSWD: ALL' | sudo tee -a /etc/sudoers.d/%s " """ % (rootpasswdfile,rootuser, node,config["admin_username"],config["admin_username"])) + cmd = 'sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "echo %s | sudo tee /home/%s/.ssh/authorized_keys"' % (rootpasswdfile,rootuser, node,publicKey,config["admin_username"]) + os.system(cmd) + Logger().cmd(cmd) + cmd = 'sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo chown %s:%s -R /home/%s"' % (rootpasswdfile,rootuser, node,config["admin_username"],config["admin_username"],config["admin_username"]) + os.system(cmd) + Logger().cmd(cmd) + cmd = 'sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "sudo chmod 400 /home/%s/.ssh/authorized_keys"' % (rootpasswdfile,rootuser, node,config["admin_username"]) + os.system(cmd) + Logger().cmd(cmd) + + cmd = """sshpass -f %s ssh -o "StrictHostKeyChecking=no" -o "UserKnownHostsFile=/dev/null" %s@%s "echo '%s ALL=(ALL) NOPASSWD: ALL' | sudo tee -a /etc/sudoers.d/%s " """ % (rootpasswdfile,rootuser, node,config["admin_username"],config["admin_username"]) + os.system(cmd) + Logger().cmd(cmd) + + return def pick_server( nodelists, curNode ): if curNode is None or not (curNode in nodelists): @@ -2723,11 +2777,11 @@ def deploy_ETCD_master(force = False): if first_char(response) == "y": cmd = config["homeinserver"]+"/SetClusterInfo?clusterId=%s&key=etcd_endpoints&value=%s" % (config["clusterId"],config["etcd_endpoints"]) urllib.urlretrieve (cmd) - write_log("curl " + cmd) + Logger().cmd("curl " + cmd) cmd = config["homeinserver"]+"/SetClusterInfo?clusterId=%s&key=api_server&value=%s" % (config["clusterId"],config["api_servers"]) urllib.urlretrieve (cmd) - write_log("curl " + cmd) + Logger().cmd("curl " + cmd) return True @@ -2769,7 +2823,7 @@ def run_kube( prog, commands ): print kube_command os.system(kube_command) - write_log(kube_command) + Logger().cmd(kube_command) return def run_kubectl( commands ): @@ -3272,6 +3326,7 @@ def run_command( args, command, nargs, parser ): elif command == "build": configuration( config, verbose ) + if len(nargs) <=0: init_deployment() # response = raw_input_with_default("Create ISO file for deployment (y/n)?") diff --git a/src/ClusterBootstrap/utils.py b/src/ClusterBootstrap/utils.py index 049a64c9..ca2c6c5f 100644 --- a/src/ClusterBootstrap/utils.py +++ b/src/ClusterBootstrap/utils.py @@ -457,23 +457,64 @@ def gen_SSH_key(regenerate_key): f.write("clusterId : %s" % clusterID) f.close() + + def execute_backup_and_encrypt(clusterName, fname, key): clusterID = get_cluster_ID_from_file() + backupdir = "./deploy_backup/backup" - os.system("mkdir -p %s/clusterID" % backupdir) - os.system("cp -r ./*.yaml %s" % backupdir) - os.system("cp -r ./deploy/sshkey %s/sshkey" % backupdir) - os.system("cp -r ./deploy/etc %s/etc" % backupdir) - os.system("cp -r ./deploy/ssl %s/ssl" % backupdir) - os.system("cp -r ./deploy/clusterID.yml %s/clusterID/" % backupdir) + + cmd = "mkdir -p %s/clusterID" % backupdir + os.system(cmd) + Logger().cmd(cmd) + + cmd = "cp -r ./*.yaml %s" % backupdir + os.system(cmd) + Logger().cmd(cmd) + + cmd = "cp -r ./deploy/sshkey %s/sshkey" % backupdir + os.system(cmd) + Logger().cmd(cmd) + + cmd = "cp -r ./deploy/etc %s/etc" % backupdir + os.system(cmd) + Logger().cmd(cmd) + + cmd = "cp -r ./deploy/ssl %s/ssl" % backupdir + os.system(cmd) + Logger().cmd(cmd) + + cmd = "cp -r ./deploy/clusterID.yml %s/clusterID/" % backupdir + os.system(cmd) + Logger().cmd(cmd) + + if os.path.exists("./deploy/acs_kubeclusterconfig"): - os.system("cp -r ./deploy/acs_kubeclusterconfig %s/" % backupdir) - os.system("tar -czvf %s.tar.gz %s" % (fname, backupdir)) + cmd = "cp -r ./deploy/acs_kubeclusterconfig %s/" % backupdir + os.system(cmd) + Logger().cmd(cmd) + + cmd = "tar -czvf %s.tar.gz %s" % (fname, backupdir) + os.system(cmd) + Logger().cmd(cmd) + if not key is None: - os.system("openssl enc -aes-256-cbc -k %s -in %s.tar.gz -out %s.tar.gz.enc" % (key, fname, fname) ) - os.system("rm %s.tar.gz" % fname ) - os.system("rm -rf ./deploy_backup/backup") - + cmd = "openssl enc -aes-256-cbc -k %s -in %s.tar.gz -out %s.tar.gz.enc" % (key, fname, fname) + os.system(cmd) + Logger().cmd(cmd) + + cmd = "rm %s.tar.gz" % fname + os.system(cmd) + Logger().cmd(cmd) + + + cmd = "rm -rf ./deploy_backup/backup" + os.system(cmd) + Logger().cmd(cmd) + + return + + def execute_restore_and_decrypt(fname, key): clusterID = get_cluster_ID_from_file() backupdir = "./deploy_backup/backup" diff --git a/src/utils/MyLogger.py b/src/utils/MyLogger.py index 8517fb1d..f25ae4ee 100644 --- a/src/utils/MyLogger.py +++ b/src/utils/MyLogger.py @@ -57,7 +57,7 @@ def __init__(self): return def init(self, cmd=""): - self.formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s', "%Y-%m-%d %H:%M:%S") + self.formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s', "%Y-%m-%d %H:%M:%S") ## 在当前目录下创建日志目录 log_dir = "./logs/" @@ -103,7 +103,7 @@ def debug(self,msg): def cmd(self, msg): if self.cmd_logger is not None: - self.cmd_logger.info(msg) + self.cmd_logger.info(msg + "\n") return From 262a88ca559f62d6e8633b1596481515d32f7907 Mon Sep 17 00:00:00 2001 From: nautilusshell Date: Wed, 25 Dec 2019 09:53:34 +0000 Subject: [PATCH 5/5] =?UTF-8?q?1.=20=E5=A2=9E=E5=8A=A0=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E6=97=A5=E5=BF=97=EF=BC=8C=E6=96=B9=E4=BE=BF=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E5=AE=9A=E4=BD=8D=202.=20=E4=BF=AE=E6=94=B9Pod=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E9=80=BB=E8=BE=91=EF=BC=8C=E5=90=AF=E5=8A=A8Pod?= =?UTF-8?q?=E5=90=8E=E5=8F=AA=E6=89=A7=E8=A1=8C/bin/sh=20sleep=E6=93=8D?= =?UTF-8?q?=E4=BD=9C=EF=BC=8C=E7=84=B6=E5=90=8E=E5=86=8D=E6=8A=8A=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E9=85=8D=E7=BD=AE=E7=9A=84command=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E6=8B=B7=E8=B4=9D=E8=BF=9BPOD=E5=BD=93?= =?UTF-8?q?=E4=B8=AD=E5=B9=B6=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ClusterManager/cluster_manager.py | 3 + src/ClusterManager/job_manager.py | 248 ++++++++++++++++++++------ src/utils/k8sUtils.py | 20 ++- 3 files changed, 211 insertions(+), 60 deletions(-) mode change 100755 => 100644 src/ClusterManager/cluster_manager.py mode change 100755 => 100644 src/ClusterManager/job_manager.py mode change 100755 => 100644 src/utils/k8sUtils.py diff --git a/src/ClusterManager/cluster_manager.py b/src/ClusterManager/cluster_manager.py old mode 100755 new mode 100644 index 80a54056..cb037863 --- a/src/ClusterManager/cluster_manager.py +++ b/src/ClusterManager/cluster_manager.py @@ -1,3 +1,6 @@ +#!/usr/bin/python +# -*- coding: UTF-8 -*- + import json import os import time diff --git a/src/ClusterManager/job_manager.py b/src/ClusterManager/job_manager.py old mode 100755 new mode 100644 index 2d27fa53..cac8c16b --- a/src/ClusterManager/job_manager.py +++ b/src/ClusterManager/job_manager.py @@ -1,3 +1,6 @@ +#!/usr/bin/python +# -*- coding: UTF-8 -*- + import json import os import time @@ -38,9 +41,10 @@ nvidiaDriverPath = config["nvidiaDriverPath"] if "nvidiaDriverPath" in config else "/usr/local/cuda/lib64" - def printlog(msg): - print "%s - %s" % (datetime.datetime.utcnow().strftime("%x %X"),msg) + log_data = "%s - %s" % (datetime.datetime.utcnow().strftime("%x %X"), msg) + logging.info(log_data) + return def LoadJobParams(jobParamsJsonStr): return json.loads(jobParamsJsonStr) @@ -48,33 +52,40 @@ def LoadJobParams(jobParamsJsonStr): def cmd_exec(cmdStr): try: output = subprocess.check_output(["bash","-c", cmdStr]) + logging.info("bash -c " + cmdStr) + logging.info("output: " + output) + except Exception as e: print e output = "" - return output - - - + logging.info(str(e)) + return output def SubmitJob(job): jobParams = json.loads(base64.b64decode(job["jobParams"])) + if jobParams["jobtrainingtype"] == "RegularJob": SubmitRegularJob(job) + elif jobParams["jobtrainingtype"] == "PSDistJob": SubmitPSDistJob(job) + return + def CheckMountPoints(mplist, mp): ret = True for item in mplist: if item["name"] == mp["name"] or item["containerPath"] == mp["containerPath"] or item["hostPath"] == mp["hostPath"]: ret = False + return ret def SubmitRegularJob(job): ret = {} dataHandler = DataHandler() + logging.info("start to submit regular job...") try: jobParams = json.loads(base64.b64decode(job["jobParams"])) @@ -86,20 +97,21 @@ def SubmitRegularJob(job): if "jobPath" not in jobParams or len(jobParams["jobPath"].strip()) == 0: dataHandler.SetJobError(jobParams["jobId"],"ERROR: job-path does not exist") + msg = "ERROR: job-path does not exist. jobid: %s" % (jobParams["jobId"]) + logging.error(msg) return False if "workPath" not in jobParams or len(jobParams["workPath"].strip()) == 0: dataHandler.SetJobError(jobParams["jobId"],"ERROR: work-path does not exist") + + msg = "ERROR: work-path does not exist. jobid: %s" % (jobParams["jobId"]) + logging.error(msg) return False #if "dataPath" not in jobParams or len(jobParams["dataPath"].strip()) == 0: # dataHandler.SetJobError(jobParams["jobId"],"ERROR: data-path does not exist") # return False - - - jobPath,workPath,dataPath = GetStoragePath(jobParams["jobPath"],jobParams["workPath"],jobParams["dataPath"]) - - + jobPath, workPath, dataPath = GetStoragePath(jobParams["jobPath"],jobParams["workPath"],jobParams["dataPath"]) localJobPath = os.path.join(config["storage-mount-path"],jobPath) if not os.path.exists(localJobPath): @@ -116,19 +128,28 @@ def SubmitRegularJob(job): if isinstance(jobParams["cmd"], basestring) and not jobParams["cmd"] == "": launchScriptPath = os.path.join(localJobPath,"launch-%s.sh" % jobParams["jobId"]) + with open(launchScriptPath, 'w') as f: f.write("#!/bin/bash -x\n") f.write(jobParams["cmd"] + "\n") + + msg = "write cmd(%s) to file: %s" % (jobParams["cmd"], launchScriptPath) + logging.info(msg) + f.close() if "userId" in jobParams: - os.system("chown -R %s %s" % (jobParams["userId"], launchScriptPath)) - jobParams["LaunchCMD"] = "[\"bash\", \"/job/launch-%s.sh\"]" % jobParams["jobId"] + cmd = "chown -R %s %s" % (jobParams["userId"], launchScriptPath) + os.system(cmd) + logging.info(cmd) + # todo: Pod启动后会执行shell脚本,需预先将shell脚本拷贝到Pod所在的节点机器的目录: + # 譬如:/dlwsdata/work/user-nanme/jobs/191225/6f81459e-42ea-447e-9380-f545da2517e9/ + # Pod启动后,会将此目录挂载至/job/ + # jobParams["LaunchCMD"] = "[\"bash\", \"/job/launch-%s.sh\"]" % jobParams["jobId"] + jobParams["LaunchCMD"] = "[\"/bin/sh\", \"-ec\", \"sleep 6000315360000\"]" jobParams["jobDescriptionPath"] = "jobfiles/" + time.strftime("%y%m%d") + "/" + jobParams["jobId"] + "/" + jobParams["jobId"] + ".yaml" - jobParams["jobNameLabel"] = ''.join(e for e in jobParams["jobName"] if e.isalnum()) - ENV = Environment(loader=FileSystemLoader("/")) jobTempDir = os.path.join(config["root-path"],"Jobs_Templete") @@ -139,12 +160,12 @@ def SubmitRegularJob(job): jobParams["hostdataPath"] = os.path.join(config["storage-mount-path"], dataPath) jobParams["nvidiaDriverPath"] = nvidiaDriverPath - jobParams["userNameLabel"] = getAlias(jobParams["userName"]) jobParams["rest-api"] = config["rest-api"] if "mountpoints" not in jobParams: jobParams["mountpoints"] = [] + for onemount in jobParams["mountpoints"]: onemount["name"] = onemount["containerPath"].replace("/","").replace(".","").replace("_","-") @@ -170,22 +191,25 @@ def SubmitRegularJob(job): if CheckMountPoints(jobParams["mountpoints"],mp): jobParams["mountpoints"].append(mp) - jobParams["pod_ip_range"] = config["pod_ip_range"] if "usefreeflow" in config: jobParams["usefreeflow"] = config["usefreeflow"] else: jobParams["usefreeflow"] = False - print ("Render Job: %s" % jobParams) - jobDescriptionList = [] + msg = ("Render Job: %s" % jobParams) + print (msg) + logging.info(msg) + jobDescriptionList = [] pods = [] + if "hyperparametername" in jobParams and "hyperparameterstartvalue" in jobParams and "hyperparameterendvalue" in jobParams and "hyperparameterstep" in jobParams: i = int(jobParams["hyperparameterstartvalue"]) end = int(jobParams["hyperparameterendvalue"]) step = int(jobParams["hyperparameterstep"]) c = 0 + while (i <= end): pod = {} pod["podName"] = jobParams["jobId"]+"-pod-"+str(c) @@ -201,9 +225,8 @@ def SubmitRegularJob(job): if "env" not in jobParams: jobParams["env"] = [] - jobParams["commonenv"] = copy.copy(jobParams["env"]) - + jobParams["commonenv"] = copy.copy(jobParams["env"]) for pod in pods: jobParams["podName"] = pod["podName"] jobParams["env"] = jobParams["commonenv"] + pod["envs"] @@ -255,31 +278,62 @@ def SubmitRegularJob(job): jobParams["port-type"] = "TCP" serviceTemplate = ENV.get_template(os.path.join(jobTempDir,"KubeSvc.yaml.template")) - stemplate = ENV.get_template(serviceTemplate) interactiveMeta = stemplate.render(svc=jobParams) jobDescriptionList.append(interactiveMeta) - jobDescription = "\n---\n".join(jobDescriptionList) - jobDescriptionPath = os.path.join(config["storage-mount-path"], jobParams["jobDescriptionPath"]) + if not os.path.exists(os.path.dirname(os.path.realpath(jobDescriptionPath))): os.makedirs(os.path.dirname(os.path.realpath(jobDescriptionPath))) + if os.path.isfile(jobDescriptionPath): output = k8sUtils.kubectl_delete(jobDescriptionPath) + logging.info("kubectl delete " + jobDescriptionPath + " output: " + str(output)) with open(jobDescriptionPath, 'w') as f: f.write(jobDescription) output = k8sUtils.kubectl_create(jobDescriptionPath) - logging.info("Submitted job %s to k8s, returned with status %s" %(job["jobId"], output)) + logging.info("kubectl create " + jobDescriptionPath + " output: " + str(output)) + + msg = "Submitted job %s to k8s, returned with status %s" %(jobParams["jobId"], output) + logging.info(msg) + + msg = "JobParams: \n" + json.dumps(jobParams) + logging.info(msg) + + + ## 启动命令非空 + if isinstance(jobParams["cmd"], basestring) and not jobParams["cmd"] == "": + ## 等待docker启动完毕,再执行文件拷贝指令 + time.sleep(15) + launch_file_name = "launch-%s.sh" % jobParams["jobId"] + + # 将文件拷贝进podName:/tmp/ + # /job/目录需要root权限才能操作,因此此处无法直接拷贝进/job/ + remotecmd = "cp %s %s:%s" % (launchScriptPath, jobParams["podName"], "/tmp/") + output = k8sUtils.kubectl_exec(remotecmd) + logging.info("remotecmd[" + remotecmd + "]" + " output[" + str(output) + "]") + + # 添加执行权限:/tmp/lunach_jobid.sh + remotecmd = "exec %s -- bash -c \"chmod 777 /tmp/%s\"" % (jobParams["jobId"], launch_file_name) + output = k8sUtils.kubectl_exec(remotecmd) + logging.info("remotecmd[" + remotecmd + "]" + " output[" + str(output) + "]") + + # 执行/tmp/lunach_jobid.sh + remotecmd = "exec %s -- bash -c \"/tmp/%s\"" % (jobParams["jobId"], launch_file_name) + output = k8sUtils.kubectl_exec(remotecmd) + logging.info("remotecmd[" + remotecmd + "]" + " output[" + str(output) + "]") + + else: + pass + ret["output"] = output - ret["jobId"] = jobParams["jobId"] - if "userName" not in jobParams: jobParams["userName"] = "" @@ -287,7 +341,6 @@ def SubmitRegularJob(job): dataHandler.UpdateJobTextField(jobParams["jobId"],"jobDescriptionPath",jobParams["jobDescriptionPath"]) dataHandler.UpdateJobTextField(jobParams["jobId"],"jobDescription",base64.b64encode(jobDescription)) - jobMeta = {} jobMeta["jobDescriptionPath"] = jobParams["jobDescriptionPath"] jobMeta["jobPath"] = jobParams["jobPath"] @@ -296,11 +349,16 @@ def SubmitRegularJob(job): jobMeta["LaunchCMD"] = jobParams["LaunchCMD"] jobMetaStr = base64.b64encode(json.dumps(jobMeta)) - dataHandler.UpdateJobTextField(jobParams["jobId"],"jobMeta",jobMetaStr) + dataHandler.UpdateJobTextField(jobParams["jobId"],"jobMeta", jobMetaStr) + + msg = "update job text field %s, returned with status" % (jobParams["jobId"]) + logging.info(msg) + except Exception as e: print e ret["error"] = str(e) retries = dataHandler.AddandGetJobRetries(jobParams["jobId"]) + if retries >= 5: dataHandler.UpdateJobTextField(jobParams["jobId"],"jobStatus","error") dataHandler.UpdateJobTextField(jobParams["jobId"],"errorMsg","Cannot submit job!" + str(e)) @@ -312,6 +370,7 @@ def SubmitRegularJob(job): def SubmitPSDistJob(job): ret = {} dataHandler = DataHandler() + logging.info("start to submit regular job...") try: jobParams = json.loads(base64.b64decode(job["jobParams"])) @@ -320,11 +379,14 @@ def SubmitPSDistJob(job): distJobParams["ps"] = [] distJobParams["worker"] = [] assignedRack = None + if len(config["racks"]) > 0: assignedRack = random.choice(config["racks"]) + if jobParams["jobtrainingtype"] == "PSDistJob": jobDescriptionList = [] nums = {"ps":int(jobParams["numps"]),"worker":int(jobParams["numpsworker"])} + for role in ["ps","worker"]: for i in range(nums[role]): distJobParam=copy.deepcopy(jobParams) @@ -398,6 +460,7 @@ def SubmitPSDistJob(job): launchScriptPath = os.path.join(localJobPath,"launch-%s.sh" % distJobParam["jobId"]) with open(launchScriptPath, 'w') as f: f.write(launchCMD) + f.close() distJobParam["LaunchCMD"] = "[\"bash\", \"/job/launch-%s.sh\"]" % distJobParam["jobId"] @@ -451,16 +514,18 @@ def SubmitPSDistJob(job): jobDescriptionPath = os.path.join(config["storage-mount-path"], jobParams["jobDescriptionPath"]) if not os.path.exists(os.path.dirname(os.path.realpath(jobDescriptionPath))): os.makedirs(os.path.dirname(os.path.realpath(jobDescriptionPath))) + if os.path.isfile(jobDescriptionPath): output = k8sUtils.kubectl_delete(jobDescriptionPath) + logging.info("kubectl delete " + jobDescriptionPath + " output: " + str(output)) with open(jobDescriptionPath, 'w') as f: f.write(jobDescription) output = k8sUtils.kubectl_create(jobDescriptionPath) + logging.info("kubectl create " + jobDescriptionPath + " output: " + str(output)) ret["output"] = output - ret["jobId"] = jobParams["jobId"] @@ -482,6 +547,7 @@ def SubmitPSDistJob(job): jobMetaStr = base64.b64encode(json.dumps(jobMeta)) dataHandler.UpdateJobTextField(jobParams["jobId"],"jobMeta",jobMetaStr) + except Exception as e: print e ret["error"] = str(e) @@ -496,15 +562,25 @@ def KillJob(job): dataHandler = DataHandler() result, detail = k8sUtils.GetJobStatus(job["jobId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatusDetail",base64.b64encode(json.dumps(detail))) - logging.info("Killing job %s, with status %s, %s" %(job["jobId"], result,detail)) + + msg = "Killing job %s, with status %s, %s" %(job["jobId"], result,detail) + logging.info(msg) + if "jobDescriptionPath" in job and job["jobDescriptionPath"] is not None: jobDescriptionPath = os.path.join(config["storage-mount-path"], job["jobDescriptionPath"]) + if os.path.isfile(jobDescriptionPath): - if k8sUtils.kubectl_delete(jobDescriptionPath) == 0: + + code = k8sUtils.kubectl_delete(jobDescriptionPath) + if code == 0: + logging.info("kubectl delete " + jobDescriptionPath + " succ. output: 0") dataHandler.UpdateJobTextField(job["jobId"],"jobStatus","killed") return True + else: dataHandler.UpdateJobTextField(job["jobId"],"errorMsg","Cannot delete job from Kubernetes Cluster!") + logging.info("kubectl delete " + jobDescriptionPath + " failed. output: " + str(code)) + else: dataHandler.UpdateJobTextField(job["jobId"],"errorMsg","Cannot find job description file!") @@ -523,20 +599,24 @@ def getAlias(username): def ApproveJob(job): + logging.info("start to Approve job...") + dataHandler = DataHandler() dataHandler.ApproveJob(job["jobId"]) dataHandler.Close() return True - - def AutoApproveJob(job): + cluster_status = get_cluster_status() jobUser = getAlias(job["userName"]) jobParams = json.loads(base64.b64decode(job["jobParams"])) jobGPU = int(jobParams["resourcegpu"]) + logging.info("start to autoApprove job...") currentGPU = 0 + logging.info("currentGPU: " + str(currentGPU) + " jobGPU: " + str(jobGPU)) + for user in cluster_status["user_status"]: if user["userName"] == jobUser: currentGPU = int(user["userGPU"]) @@ -550,32 +630,36 @@ def AutoApproveJob(job): UnusualJobs = {} def UpdateJobStatus(job): + dataHandler = DataHandler() jobParams = json.loads(base64.b64decode(job["jobParams"])) - + logging.info("start to update job status...") if job["jobStatus"] == "scheduling" and jobParams["jobtrainingtype"] == "PSDistJob": launch_ps_dist_job(jobParams) - jobPath,workPath,dataPath = GetStoragePath(jobParams["jobPath"],jobParams["workPath"],jobParams["dataPath"]) localJobPath = os.path.join(config["storage-mount-path"],jobPath) - logPath = os.path.join(localJobPath,"logs/joblog.txt") - + logPath = os.path.join(localJobPath,"logs/joblog.txt") result, detail = k8sUtils.GetJobStatus(job["jobId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatusDetail",base64.b64encode(json.dumps(detail))) - logging.info("job %s status: %s,%s" % (job["jobId"], result, json.dumps(detail))) + msg = "job %s status, result: %s, detail: %s" % (job["jobId"], result, json.dumps(detail)) + logging.info(msg) jobDescriptionPath = os.path.join(config["storage-mount-path"], job["jobDescriptionPath"]) if "jobDescriptionPath" in job else None + if "userId" not in jobParams: jobParams["userId"] = "0" + if result.strip() == "Succeeded": joblog_manager.extract_job_log(job["jobId"],logPath,jobParams["userId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatus","finished") + if jobDescriptionPath is not None and os.path.isfile(jobDescriptionPath): - k8sUtils.kubectl_delete(jobDescriptionPath) + output = k8sUtils.kubectl_delete(jobDescriptionPath) + logging.info("kubectl delete " + jobDescriptionPath + " output: " + str(output)) elif result.strip() == "Running": if job["jobStatus"] != "running": @@ -591,24 +675,32 @@ def UpdateJobStatus(job): joblog_manager.extract_job_log(job["jobId"],logPath,jobParams["userId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatus","failed") dataHandler.UpdateJobTextField(job["jobId"],"errorMsg",detail) + if jobDescriptionPath is not None and os.path.isfile(jobDescriptionPath): - k8sUtils.kubectl_delete(jobDescriptionPath) + output = k8sUtils.kubectl_delete(jobDescriptionPath) + logging.info("kubectl delete " + jobDescriptionPath + " output: " + str(output)) elif result.strip() == "Unknown": if job["jobId"] not in UnusualJobs: UnusualJobs[job["jobId"]] = datetime.datetime.now() + elif (datetime.datetime.now() - UnusualJobs[job["jobId"]]).seconds > 300: del UnusualJobs[job["jobId"]] + retries = dataHandler.AddandGetJobRetries(job["jobId"]) if retries >= 5: printlog("Job %s fails for more than 5 times, abort" % job["jobId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatus","error") dataHandler.UpdateJobTextField(job["jobId"],"errorMsg","cannot launch the job.") + if jobDescriptionPath is not None and os.path.isfile(jobDescriptionPath): - k8sUtils.kubectl_delete(jobDescriptionPath) + output = k8sUtils.kubectl_delete(jobDescriptionPath) + logging.info("kubectl delete " + jobDescriptionPath + " output: " + str(output)) + else: printlog("Job %s fails in Kubernetes, delete and re-submit the job. Retries %d" % (job["jobId"] , retries)) SubmitJob(job) + elif result.strip() == "PendingHostPort": printlog("Cannot find host ports for job :%s, re-launch the job with different host ports " % (job["jobId"])) @@ -632,14 +724,15 @@ def UpdateDistJobStatus(job): result, detail = k8sUtils.GetJobStatus(job["jobId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatusDetail",base64.b64encode(detail)) - logging.info("job %s status: %s,%s" % (job["jobId"], result, json.dumps(detail))) - + msg = "job %s status. result: %s, detail: %s" % (job["jobId"], result, json.dumps(detail)) + logging.info(msg) jobDescriptionPath = os.path.join(config["storage-mount-path"], job["jobDescriptionPath"]) if "jobDescriptionPath" in job else None jobId = jobParams["jobId"] workerPodInfo = k8sUtils.GetPod("distRole=worker,run=" + jobId) psPodInfo = k8sUtils.GetPod("distRole=ps,run=" + jobId) + if "items" in workerPodInfo and len(workerPodInfo["items"]) == int(jobParams["numpsworker"]) and "items" in psPodInfo and len(psPodInfo["items"]) == int(jobParams["numps"]): if job["jobStatus"] == "scheduling" : launch_ps_dist_job(jobParams) @@ -654,13 +747,17 @@ def UpdateDistJobStatus(job): if result.strip() == "Succeeded": joblog_manager.extract_job_log(job["jobId"],logPath,jobParams["userId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatus","finished") + if jobDescriptionPath is not None and os.path.isfile(jobDescriptionPath): - k8sUtils.kubectl_delete(jobDescriptionPath) + output = k8sUtils.kubectl_delete(jobDescriptionPath) + logging.info("kubectl delete " + jobDescriptionPath + " output: " + str(output)) elif result.strip() == "Running": joblog_manager.extract_job_log(job["jobId"],logPath,jobParams["userId"]) + if job["jobStatus"] != "running": dataHandler.UpdateJobTextField(job["jobId"],"jobStatus","running") + if "interactivePort" in jobParams: serviceAddress = k8sUtils.GetServiceAddress(job["jobId"]) serviceAddress = base64.b64encode(json.dumps(serviceAddress)) @@ -671,8 +768,10 @@ def UpdateDistJobStatus(job): joblog_manager.extract_job_log(job["jobId"],logPath,jobParams["userId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatus","failed") dataHandler.UpdateJobTextField(job["jobId"],"errorMsg",detail) + if jobDescriptionPath is not None and os.path.isfile(jobDescriptionPath): - k8sUtils.kubectl_delete(jobDescriptionPath) + output = k8sUtils.kubectl_delete(jobDescriptionPath) + logging.info("kubectl delete " + jobDescriptionPath + " output: " + str(output)) elif result.strip() == "Unknown": if job["jobId"] not in UnusualJobs: @@ -684,8 +783,11 @@ def UpdateDistJobStatus(job): printlog("Job %s fails for more than 5 times, abort" % job["jobId"]) dataHandler.UpdateJobTextField(job["jobId"],"jobStatus","error") dataHandler.UpdateJobTextField(job["jobId"],"errorMsg","cannot launch the job.") + if jobDescriptionPath is not None and os.path.isfile(jobDescriptionPath): - k8sUtils.kubectl_delete(jobDescriptionPath) + output = k8sUtils.kubectl_delete(jobDescriptionPath) + logging.info("kubectl delete " + jobDescriptionPath + " output: " + str(output)) + else: printlog("Job %s fails in Kubernetes, delete and re-submit the job. Retries %d" % (job["jobId"] , retries)) SubmitJob(job) @@ -696,13 +798,13 @@ def UpdateDistJobStatus(job): pass - - def run_dist_cmd_on_pod(podId, cmd, outputfile): remotecmd = "exec %s -- %s" % (podId,cmd) print remotecmd - k8sUtils.kubectl_exec_output_to_file(remotecmd,outputfile) + k8sUtils.kubectl_exec_output_to_file(remotecmd,outputfile) + logging.info("kubectl exec " + remotecmd) + return class Kube_RemoteCMD_Thread(threading.Thread): @@ -720,8 +822,10 @@ def launch_ps_dist_job(jobParams): jobId = jobParams["jobId"] workerPodInfo = k8sUtils.GetPod("distRole=worker,run=" + jobId) psPodInfo = k8sUtils.GetPod("distRole=ps,run=" + jobId) + if "items" in workerPodInfo and len(workerPodInfo["items"]) == int(jobParams["numpsworker"]) and "items" in psPodInfo and len(psPodInfo["items"]) == int(jobParams["numps"]): podStatus = [k8sUtils.check_pod_status(pod) for pod in workerPodInfo["items"] + psPodInfo["items"] ] + if all([status == "Running" for status in podStatus]): ps_pod_names = [pod["metadata"]["name"] for pod in psPodInfo["items"]] worker_pod_names = [pod["metadata"]["name"] for pod in workerPodInfo["items"]] @@ -754,27 +858,43 @@ def launch_ps_dist_job(jobParams): for i in range(ps_num): os.system("mkdir -p %s" % ps_files[i]) ps_files[i] = os.path.join(ps_files[i],"run_dist_job.sh") + with open(ps_files[i], 'w') as f: f.write(ps_cmd[i] + "\n") - f.close() + + f.close() + if "userId" in jobParams: os.system("chown -R %s %s" % (jobParams["userId"], ps_files[i])) + remotecmd = "cp %s %s:/opt/run_dist_job.sh" % (ps_files[i],ps_pod_names[i]) - k8sUtils.kubectl_exec(remotecmd) - k8sUtils.kubectl_exec("exec %s touch /opt/run_dist_job" % ps_pod_names[i]) + output = k8sUtils.kubectl_exec(remotecmd) + logging.info("kubectl exec: " + remotecmd + " output: " + str(output)) + + remotecmd = "exec %s touch /opt/run_dist_job" % ps_pod_names[i] + output = k8sUtils.kubectl_exec(remotecmd) + logging.info("kubectl exec: " + remotecmd + " output: " + str(output)) for i in range(worker_num): os.system("mkdir -p %s" % worker_files[i]) worker_files[i] = os.path.join(worker_files[i],"run_dist_job.sh") + with open(worker_files[i], 'w') as f: f.write(worker_cmd[i] + "\n") + f.close() if "userId" in jobParams: os.system("chown -R %s %s" % (jobParams["userId"], worker_files[i])) + remotecmd = "cp %s %s:/opt/run_dist_job.sh" % (worker_files[i],worker_pod_names[i]) - k8sUtils.kubectl_exec(remotecmd) - k8sUtils.kubectl_exec("exec %s touch /opt/run_dist_job" % worker_pod_names[i]) + output = k8sUtils.kubectl_exec(remotecmd) + logging.info("kubectl exec: " + remotecmd + " output: " + str(output)) + + remotecmd = "exec %s touch /opt/run_dist_job" % worker_pod_names[i] + output = k8sUtils.kubectl_exec(remotecmd) + logging.info("kubectl exec: " + remotecmd + " output: " + str(output)) + dataHandler = DataHandler() dataHandler.UpdateJobTextField(jobParams["jobId"],"jobStatus","running") @@ -802,8 +922,10 @@ def launch_ps_dist_job(jobParams): def create_log( logdir = '/var/log/dlworkspace' ): + if not os.path.exists( logdir ): os.system("mkdir -p " + logdir ) + with open('logging.yaml') as f: logging_config = yaml.load(f) f.close() @@ -812,32 +934,44 @@ def create_log( logdir = '/var/log/dlworkspace' ): def Run(): + create_log() + logging.info("start to process jobs ...") while True: try: config["racks"] = k8sUtils.get_node_labels("rack") config["skus"] = k8sUtils.get_node_labels("sku") + except Exception as e: print e try: dataHandler = DataHandler() pendingJobs = dataHandler.GetPendingJobs() - printlog("updating status for %d jobs" % len(pendingJobs)) + #printlog("updating status for %d jobs" % len(pendingJobs)) + for job in pendingJobs: try: - print "Processing job: %s, status: %s" % (job["jobId"], job["jobStatus"]) + logging.info("to process one pendingJob.") + msg = "Processing job: %s, status: %s" % (str(job["jobId"]), str(job["jobStatus"])) + logging.info(msg) + if job["jobStatus"] == "queued": SubmitJob(job) + elif job["jobStatus"] == "killing": KillJob(job) + elif job["jobStatus"] == "scheduling" or job["jobStatus"] == "running" : UpdateJobStatus(job) + elif job["jobStatus"] == "unapproved" : AutoApproveJob(job) + except Exception as e: print e + except Exception as e: print e diff --git a/src/utils/k8sUtils.py b/src/utils/k8sUtils.py old mode 100755 new mode 100644 index eb996729..4f38e164 --- a/src/utils/k8sUtils.py +++ b/src/utils/k8sUtils.py @@ -27,6 +27,7 @@ import random import pycurl from StringIO import StringIO +import logging def curl_get(url): @@ -47,44 +48,56 @@ def curl_get(url): curl.close() return responseStr + def kubectl_create(jobfile,EXEC=True): if EXEC: try: output = subprocess.check_output(["bash","-c", config["kubelet-path"] + " create -f " + jobfile]) + except Exception as e: print e output = "" + logging.error(str(e)) + else: output = "Job " + jobfile + " is not submitted to kubernetes cluster" + return output + def kubectl_delete(jobfile,EXEC=True): if EXEC: try: cmd = "bash -c '" + config["kubelet-path"] + " delete -f " + jobfile + "'" - print cmd output = os.system(cmd) + except Exception as e: print e output = -1 + logging.error(str(e)) else: output = -1 + return output + def kubectl_exec(params): try: #print ("bash -c %s %s" % (config["kubelet-path"], params)) output = subprocess.check_output(["bash","-c", config["kubelet-path"] + " " + params]) + except Exception as e: print "EXCEPTION: " + str(e) output = "" + logging.error(str(e)) + return output def kubectl_exec_output_to_file(params,file): os.system("%s %s 2>&1 | tee %s" % (config["kubelet-path"], params, file)) - - + cmd = "%s %s 2>&1 | tee %s" % (config["kubelet-path"], params, file) + logging.info(cmd) def Split(text,spliter): @@ -365,6 +378,7 @@ def get_node_labels(key): if not v in ret: ret.append(v) return ret + if __name__ == '__main__': #Run()