From cae955b2f6b09da3d0bf59343c0d052ea2cd4aa2 Mon Sep 17 00:00:00 2001 From: jetzhang Date: Mon, 22 Sep 2025 12:34:09 +0800 Subject: [PATCH] Update index.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 优化代码 --- Python3.6-AutoRecovery/src/index.py | 383 +++++++++++++++++++--------- 1 file changed, 256 insertions(+), 127 deletions(-) diff --git a/Python3.6-AutoRecovery/src/index.py b/Python3.6-AutoRecovery/src/index.py index fe6a25993..ae6497b4c 100644 --- a/Python3.6-AutoRecovery/src/index.py +++ b/Python3.6-AutoRecovery/src/index.py @@ -1,165 +1,294 @@ # -*- coding: utf8 -*- -import hashlib, hmac, json, os, sys, time, base64 -from datetime import datetime, timedelta +import hashlib +import hmac +import json +import os +import sys +import time +import base64 +from datetime import datetime if sys.version_info[0] <= 2: from httplib import HTTPSConnection else: from http.client import HTTPSConnection +# Constants +ALGORITHM = "TC3-HMAC-SHA256" +CONTENT_TYPE = "application/json; charset=utf-8" +API_DOMAIN = ".tencentcloudapi.com" +SIGNED_HEADERS = "content-type;host;x-tc-action" +MAX_RETRY_ATTEMPTS = 6 +RETRY_DELAY = 10 +STABILIZATION_DELAY = 20 +CVM_API_VERSION = "2017-03-12" +TAT_API_VERSION = "2020-10-28" +RUNNING_STATE = "RUNNING" + -# 计算签名摘要函数 def sign(key, msg): + """计算签名摘要""" return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() -# 计算签名信息 -def authen_content(timestamp, date, secret_id, secret_key, payload, service, host, algorithm, action, params): - # ************* 步骤 1:拼接规范请求串 ************* - http_request_method = "POST" - canonical_uri = "/" - canonical_querystring = "" - ct = "application/json; charset=utf-8" - canonical_headers = "content-type:%s\nhost:%s\nx-tc-action:%s\n" % (ct, host, action.lower()) - signed_headers = "content-type;host;x-tc-action" - hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest() - canonical_request = (http_request_method + "\n" + - canonical_uri + "\n" + - canonical_querystring + "\n" + - canonical_headers + "\n" + - signed_headers + "\n" + - hashed_request_payload) - - # ************* 步骤 2:拼接待签名字符串 ************* - credential_scope = date + "/" + service + "/" + "tc3_request" +def get_authorization(timestamp, date, secret_id, secret_key, payload, service, action): + """生成腾讯云API认证信息""" + host = f"{service}{API_DOMAIN}" + + # 步骤1: 拼接规范请求串 + canonical_headers = f"content-type:{CONTENT_TYPE}\nhost:{host}\nx-tc-action:{action.lower()}\n" + hashed_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest() + + canonical_request = "\n".join([ + "POST", "/", "", canonical_headers, SIGNED_HEADERS, hashed_payload + ]) + + # 步骤2: 拼接待签名字符串 + credential_scope = f"{date}/{service}/tc3_request" hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest() - string_to_sign = (algorithm + "\n" + - str(timestamp) + "\n" + - credential_scope + "\n" + - hashed_canonical_request) - - # ************* 步骤 3:计算签名 ************* - secret_date = sign(("TC3" + secret_key).encode("utf-8"), date) + + string_to_sign = "\n".join([ + ALGORITHM, str(timestamp), credential_scope, hashed_canonical_request + ]) + + # 步骤3: 计算签名 + secret_date = sign(f"TC3{secret_key}".encode("utf-8"), date) secret_service = sign(secret_date, service) secret_signing = sign(secret_service, "tc3_request") signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() + + # 步骤4: 拼接Authorization + return f"{ALGORITHM} Credential={secret_id}/{credential_scope}, SignedHeaders={SIGNED_HEADERS}, Signature={signature}" - # ************* 步骤 4:拼接 Authorization ************* - authorization = (algorithm + " " + - "Credential=" + secret_id + "/" + credential_scope + ", " + - "SignedHeaders=" + signed_headers + ", " + - "Signature=" + signature) - return (authorization) - - -# 获取服务器状态 -def get_cvm_status(timestamp, date, secretid, secretkey, region, insid): - token = "" - service = "cvm" - host = "%s.tencentcloudapi.com" % service - region = "%s" % region - action = "DescribeInstances" - version = "2017-03-12" - payload = "{\"InstanceIds\":[\"%s\"]}" % insid - params = json.loads(payload) - endpoint = "https://cvm.tencentcloudapi.com" - algorithm = "TC3-HMAC-SHA256" - - authorization = authen_content(timestamp, date, secretid, secretkey, payload, service, host, algorithm, action, params) - +def call_api(service, action, version, payload, timestamp, date, secret_id, secret_key, region=None): + """通用腾讯云API调用函数""" + host = f"{service}{API_DOMAIN}" headers = { - "Authorization": authorization, - "Content-Type": "application/json; charset=utf-8", + "Authorization": get_authorization(timestamp, date, secret_id, secret_key, payload, service, action), + "Content-Type": CONTENT_TYPE, "Host": host, "X-TC-Action": action, - "X-TC-Timestamp": timestamp, + "X-TC-Timestamp": str(timestamp), "X-TC-Version": version } + if region: headers["X-TC-Region"] = region - if token: - headers["X-TC-Token"] = token - + + conn = None try: - req = HTTPSConnection(host) - req.request("POST", "/", headers=headers, body=payload.encode("utf-8")) - resp = req.getresponse() - return (resp.read()) + conn = HTTPSConnection(host) + conn.request("POST", "/", headers=headers, body=payload.encode("utf-8")) + response = conn.getresponse() + response_data = response.read() + + if isinstance(response_data, bytes): + response_data = response_data.decode('utf-8') + + return response_data except Exception as err: - print(err) + print(f"API调用失败: {err}") + raise + finally: + if conn: + conn.close() -# 执行服务器命令 -def run_tat_command(timestamp, date, secretid, secretkey, region, insid, work_directory, user, run_content): - token = "" - service = "tat" - host = "%s.tencentcloudapi.com" % service - endpoint = "https://" + host - region = "%s" % region - action = "RunCommand" - version = "2020-10-28" - payload = "{\"CommandName\":\"cmd-test\",\"CommandType\":\"SHELL\",\"InstanceIds\":[\"%s\"],\"WorkingDirectory\":\"%s\",\"Username\":\"%s\",\"Content\":\"%s\"}" % ( - insid, work_directory, user, run_content) - params = json.loads(payload) - algorithm = "TC3-HMAC-SHA256" +def get_cvm_status(timestamp, date, secret_id, secret_key, region, instance_id): + """获取CVM实例状态""" + payload = json.dumps({"InstanceIds": [instance_id]}) + return call_api("cvm", "DescribeInstances", CVM_API_VERSION, + payload, timestamp, date, secret_id, secret_key, region) - authorization = authen_content(timestamp, date, secretid, secretkey, payload, service, host, algorithm, action, params) +def run_tat_command(timestamp, date, secret_id, secret_key, region, instance_id, + work_directory, user, command_content): + """执行TAT命令""" + payload = json.dumps({ + "CommandName": "cmd-test", + "CommandType": "SHELL", + "InstanceIds": [instance_id], + "WorkingDirectory": work_directory, + "Username": user, + "Content": command_content + }) + return call_api("tat", "RunCommand", TAT_API_VERSION, + payload, timestamp, date, secret_id, secret_key, region) + +def query_tat_command(timestamp, date, secret_id, secret_key, region, invocation_id): + """查询TAT执行结果。使用Filters参数精确查询指定的调用ID""" + payload = json.dumps({ + "Filters": [ + { + "Name": "invocation-id", + "Values": [invocation_id] + } + ] + }) + return call_api("tat", "DescribeInvocationTasks", TAT_API_VERSION, + payload, timestamp, date, secret_id, secret_key, region) - headers = { - "Authorization": authorization, - "Content-Type": "application/json; charset=utf-8", - "Host": host, - "X-TC-Action": action, - "X-TC-Timestamp": timestamp, - "X-TC-Version": version - } - if region: - headers["X-TC-Region"] = region - if token: - headers["X-TC-Token"] = token +def validate_environment_variables(): + """验证必需的环境变量""" + required_vars = ['secretID', 'secretKey', 'work_Directory', 'work_User', 'run_Command'] + env_vars = {var: os.environ.get(var) for var in required_vars} + missing_vars = [var for var, value in env_vars.items() if not value] + + if missing_vars: + error_msg = f"缺少必需的环境变量: {', '.join(missing_vars)}" + print(error_msg) + return None, error_msg + + return env_vars, None + +def safe_json_loads(response_text): + """安全的JSON解析,包含错误检查""" try: - req = HTTPSConnection(host) - req.request("POST", "/", headers=headers, body=payload.encode("utf-8")) - resp = req.getresponse() - return (resp.read()) - except Exception as err: - print(err) + data = json.loads(response_text) + if "Error" in data.get("Response", {}): + error_info = data["Response"]["Error"] + raise Exception(f"API错误: {error_info.get('Code')} - {error_info.get('Message')}") + return data + except json.JSONDecodeError as e: + raise Exception(f"JSON解析失败: {e}") +def execute_recovery_command(timestamp, date, secret_id, secret_key, region, + instance_id, work_directory, user, encoded_command): + """执行故障恢复命令""" + time.sleep(STABILIZATION_DELAY) # 等待实例稳定 + + # 执行TAT命令 + command_response = run_tat_command( + timestamp, date, secret_id, secret_key, region, + instance_id, work_directory, user, encoded_command + ) + + # 解析命令响应 + cmd_data = safe_json_loads(command_response) + invocation_id = cmd_data["Response"]["InvocationId"] + tat_result_response = query_tat_command(timestamp, date, secret_id, secret_key, region, invocation_id) + + # 解析TAT查询结果并检查任务状态 + tat_result = safe_json_loads(tat_result_response) + invocation_task_set = tat_result.get("Response", {}).get("InvocationTaskSet", []) + + if not invocation_task_set: + print(f"未找到任务执行记录: {invocation_id}") + return f"{instance_id} 命令执行记录未找到" + + # 持续检查任务状态直到不是RUNNING + task_status = invocation_task_set[0].get("TaskStatus", "") + print(f"初始任务状态: {task_status}") + + # 如果任务状态是RUNNING,持续轮询直到完成 + polling_attempt = 1 + while task_status == "RUNNING": + print(f"第{polling_attempt}次轮询,任务仍在运行中,等待{RETRY_DELAY}秒后重新检查...") + time.sleep(RETRY_DELAY) + + # 重新查询任务状态 - 使用query_tat_command获取最新状态 + print(f"正在使用query_tat_command查询任务ID: {invocation_id} 的最新状态...") + + # 使用新的时间戳确保获取最新数据 + current_timestamp = int(time.time()) + current_date = datetime.utcfromtimestamp(current_timestamp).strftime("%Y-%m-%d") + + tat_result_response = query_tat_command(current_timestamp, current_date, secret_id, secret_key, region, invocation_id) + tat_result = safe_json_loads(tat_result_response) + invocation_task_set = tat_result.get("Response", {}).get("InvocationTaskSet", []) + + if not invocation_task_set: + print(f"轮询过程中任务记录丢失: {invocation_id}") + return f"{instance_id} 任务执行过程中记录丢失" + + # 更新任务状态 - 这是关键的更新步骤 + new_task_status = invocation_task_set[0].get("TaskStatus", "") + print(f"第{polling_attempt}次轮询,任务状态从 '{task_status}' 更新为 '{new_task_status}'") + task_status = new_task_status + polling_attempt += 1 + + # 防止无限轮询,设置最大轮询次数 + if polling_attempt > MAX_RETRY_ATTEMPTS: + print(f"任务轮询超时,已轮询{MAX_RETRY_ATTEMPTS}次,任务状态仍为: {task_status}") + return f"{instance_id} 任务执行超时,当前状态: {task_status}" + + # 任务已完成,获取最新的任务信息用于最终状态检查 + # 如果没有进入轮询循环,需要确保使用最新的invocation_task_set + if polling_attempt == 1: + # 初始状态就不是RUNNING,使用原始查询结果 + current_task_info = invocation_task_set[0] + else: + # 经过轮询,使用最新的任务信息 + current_task_info = invocation_task_set[0] if invocation_task_set else {} + + # 任务已完成,检查最终状态 + print(f"最终任务状态: {task_status}") + + # 只有TaskStatus为SUCCESS才返回成功,其他任何状态都返回失败 + if task_status == "SUCCESS": + print(f"实例 {instance_id} 故障自愈成功!") + return f"{instance_id} 故障自愈成功!" + else: + # 获取详细错误信息 + error_info = current_task_info.get("ErrorInfo", "未知错误") + exit_code = current_task_info.get("TaskResult", {}).get("ExitCode", "N/A") + + print(f"实例 {instance_id} 故障自愈失败!") + print(f"任务状态: {task_status}") + print(f"错误信息: {error_info}") + print(f"退出代码: {exit_code}") + + raise Exception(f"{instance_id} 故障自愈失败!") def main_handler(event, context): + """主处理函数 - CVM故障自愈""" timestamp = int(time.time()) date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d") - secretid = os.environ.get('secretID') - secretkey = os.environ.get('secretKey') - - # 实例信息获取 - insid = event['subject'].strip() - region = event['region'].strip() - - # 命令执行参数 - work_directory = os.environ.get('work_Directory') - user = os.environ.get('work_User') - run_command = os.environ.get('run_Command') - run_command_base = base64.b64encode(run_command.encode('utf-8')) - - # 判断服务器是否在 RUNNING 状态 - status_get = get_cvm_status(timestamp, date, secretid, secretkey, region, insid) - status = json.loads(status_get)["Response"]["InstanceSet"][0]["InstanceState"] - print(status) - - loop = 1 - while loop <= 6: - if status == 'RUNNING': - # 服务器命令执行 - time.sleep(10) - tat_command = run_tat_command(timestamp, date, secretid, secretkey, region, insid, work_directory, user, - run_command_base.decode('utf-8')) - print(insid, tat_command, "故障自愈成功!") - return ("%s 故障自愈成功!" % insid) + + # 验证环境变量 + env_vars, error_msg = validate_environment_variables() + if error_msg: + return error_msg + + assert env_vars is not None + secret_id, secret_key = env_vars['secretID'], env_vars['secretKey'] + work_directory, user = env_vars['work_Directory'], env_vars['work_User'] + run_command = env_vars['run_Command'] + + # 解析事件参数 + try: + instance_id, region = event['subject'].strip(), event['region'].strip() + except KeyError as e: + error_msg = f"事件参数缺失: {e}" + print(error_msg) + return error_msg + + # 编码命令 + assert run_command is not None + encoded_command = base64.b64encode(run_command.encode('utf-8')).decode('utf-8') + + # 检查实例状态 - RUNNING状态只检查一次,其他状态重试6次 + for attempt in range(1, MAX_RETRY_ATTEMPTS + 1): + status_data = safe_json_loads(get_cvm_status(timestamp, date, secret_id, secret_key, region, instance_id)) + + instance_set = status_data.get("Response", {}).get("InstanceSet", []) + if not instance_set: + raise Exception(f"实例 {instance_id} 不存在或无权限访问") + + instance_state = instance_set[0]["InstanceState"] + print(f"第{attempt}次检查,实例状态: {instance_state}") + + if instance_state == RUNNING_STATE: + # RUNNING状态直接执行,不需要重试 + return execute_recovery_command( + timestamp, date, secret_id, secret_key, region, + instance_id, work_directory, user, encoded_command + ) else: - time.sleep(10) - loop += 1 - else: - print("机器状态未就绪!") - return ("机器状态未就绪!") \ No newline at end of file + # 非RUNNING状态,需要重试检查 + if attempt < MAX_RETRY_ATTEMPTS: + print(f"实例状态为 {instance_state},等待{RETRY_DELAY}秒后重试...") + time.sleep(RETRY_DELAY) + else: + print(f"实例状态为 {instance_state},已达到最大重试次数") + raise Exception(f"{instance_id} 故障自愈失败!")