From c93a10175accbf30b4dac77cfa9ed5a833604d23 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Mon, 12 Jan 2026 15:50:23 -0500
Subject: [PATCH 1/5] Move more Google-related code to a common API

---
 common/google_api.py      | 130 ++++++++++++++++++++++++++++++
 common/google_metadata.py |  34 --------
 dags/google_api_helper.py | 162 ++++++++++----------------------------
 3 files changed, 170 insertions(+), 156 deletions(-)
 create mode 100644 common/google_api.py
 delete mode 100644 common/google_metadata.py

diff --git a/common/google_api.py b/common/google_api.py
new file mode 100644
index 00000000..785163ed
--- /dev/null
+++ b/common/google_api.py
@@ -0,0 +1,130 @@
+from googleapiclient import discovery
+import requests
+import json
+
+def get_project_data(key):
+    apiurl = f"http://metadata/computeMetadata/v1/project/{key}"
+    response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
+    response.raise_for_status()
+    return response.text
+
+def get_instance_data(key):
+    apiurl = f"http://metadata/computeMetadata/v1/instance/{key}"
+    response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
+    response.raise_for_status()
+    return response.text
+
+def get_instance_metadata(project, zone, instance):
+    service = discovery.build('compute', 'v1')
+    request = service.instances().get(project=project, zone=zone, instance=instance)
+    info = request.execute()
+    return info['metadata']
+
+def set_instance_metadata(project, zone, instance, data):
+    service = discovery.build('compute', 'v1')
+    request = service.instances().setMetadata(body=data, project=project, zone=zone, instance=instance)
+    return request.execute()
+
+def gce_external_ip():
+    return get_instance_data("network-interfaces/0/access-configs/0/external-ip")
+
+def gce_internal_ip():
+    return get_instance_data("network-interfaces/0/ip")
+
+def gce_hostname():
+    return get_instance_data("hostname")
+
+def get_project_id():
+    apiurl = "http://metadata.google.internal/computeMetadata/v1/project/project-id"
+    response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
+    response.raise_for_status()
+    return response.text
+
+def get_zone():
+    apiurl = "http://metadata.google.internal/computeMetadata/v1/instance/zone"
+    response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
+    response.raise_for_status()
+    return response.text.split('/')[-1]
+
+def delete_instances(ig, instances):
+    project_id = get_project_id()
+    request_body = {
+        "instances": instances,
+        "skipInstancesOnValidationError": True,
+    }
+    service = discovery.build('compute', 'v1')
+    request = service.instanceGroupManagers().deleteInstances(project=project_id, zone=ig["zone"], instanceGroupManager=ig["name"], body=request_body)
+    return request.execute()
+
+def get_created_by():
+    apiurl = "http://metadata.google.internal/computeMetadata/v1/instance/attributes/created-by"
+    response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
+    response.raise_for_status()
+    return response.text
+
+def instance_group_manager_info(project_id, instance_group):
+    service = discovery.build('compute', 'v1')
+    request = service.instanceGroupManagers().get(project=project_id, zone=instance_group['zone'], instanceGroupManager=instance_group['name'])
+    return request.execute()
+
+def instance_group_manager_error(project_id, instance_group):
+    service = discovery.build('compute', 'v1')
+    request = service.instanceGroupManagers().listErrors(project=project_id, zone=instance_group['zone'], instanceGroupManager=instance_group['name'], orderBy="creationTimestamp desc")
+    return request.execute()
+
+def instance_group_info(project_id, instance_group):
+    service = discovery.build('compute', 'v1')
+    request = service.instanceGroups().get(project=project_id, zone=instance_group['zone'], instanceGroup=instance_group['name'])
+    return request.execute()
+
+def list_managed_instances(instance_group):
+    project_id = get_project_id()
+    service = discovery.build("compute", "v1")
+    page_token = None
+    instances = []
+    while True:
+        request = service.instanceGroupManagers().listManagedInstances(project=project_id, zone=instance_group["zone"], instanceGroupManager=instance_group["name"], pageToken=page_token, maxResults=20)
+        ret = request.execute()
+        if not ret:
+            return instances
+        instances += [r["instance"] for r in ret['managedInstances']]
+        page_token = ret.get("nextPageToken", None)
+        if not page_token:
+            break
+
+    return instances
+
+def get_instance_property(instance_zone, instance, key):
+    project_id = get_project_id()
+    service = discovery.build("compute", "v1")
+    request = service.instances().get(project=project_id, zone=instance_zone, instance=instance)
+    ret = request.execute()
+    return ret[key]
+
+def resize_instance_group(ig, size):
+    project_id = get_project_id()
+    service = discovery.build('compute', 'v1')
+    request = service.instanceGroupManagers().resize(project=project_id, zone=ig['zone'], instanceGroupManager=ig['name'], size=size)
+    return request.execute()
+
+def start_instance(instance_name, zone):
+    service = discovery.build('compute', 'v1')
+    request = service.instances().start(
+        project=get_project_id(),
+        zone=zone,
+        instance=instance_name
+    )
+    response = request.execute()
+    return response
+
+
+def stop_instance(instance_name, zone):
+    service = discovery.build('compute', 'v1')
+    request = service.instances().stop(
+        discardLocalSsd=True,
+        project=get_project_id(),
+        zone=zone,
+        instance=instance_name
+    )
+    response = request.execute()
+    return response
\ No newline at end of file
diff --git a/common/google_metadata.py b/common/google_metadata.py
deleted file mode 100644
index 62eb63fd..00000000
--- a/common/google_metadata.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from googleapiclient import discovery
-import requests
-
-def get_project_data(key):
-    apiurl = f"http://metadata/computeMetadata/v1/project/{key}"
-    response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
-    response.raise_for_status()
-    return response.text
-
-def get_instance_data(key):
-    apiurl = f"http://metadata/computeMetadata/v1/instance/{key}"
-    response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
-    response.raise_for_status()
-    return response.text
-
-def get_instance_metadata(project, zone, instance):
-    service = discovery.build('compute', 'v1')
-    request = service.instances().get(project=project, zone=zone, instance=instance)
-    info = request.execute()
-    return info['metadata']
-
-def set_instance_metadata(project, zone, instance, data):
-    service = discovery.build('compute', 'v1')
-    request = service.instances().setMetadata(body=data, project=project, zone=zone, instance=instance)
-    return request.execute()
-
-def gce_external_ip():
-    return get_instance_data("network-interfaces/0/access-configs/0/external-ip")
-
-def gce_internal_ip():
-    return get_instance_data("network-interfaces/0/ip")
-
-def gce_hostname():
-    return get_instance_data("hostname")
diff --git a/dags/google_api_helper.py b/dags/google_api_helper.py
index 07007698..554a183c 100644
--- a/dags/google_api_helper.py
+++ b/dags/google_api_helper.py
@@ -1,81 +1,37 @@
 from time import sleep
 from datetime import datetime, timedelta, timezone
+
+from requests import Response
 from airflow.models import Variable
 from airflow.hooks.base_hook import BaseHook
-from googleapiclient import discovery
 from slack_message import slack_message
-import requests
 import json
+from common import google_api
 
-def get_project_id():
-    apiurl = "http://metadata.google.internal/computeMetadata/v1/project/project-id"
-    response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
-    response.raise_for_status()
-    return response.text
-
-
-def instance_group_manager_info(project_id, instance_group):
-    service = discovery.build('compute', 'v1')
-    request = service.instanceGroupManagers().get(project=project_id, zone=instance_group['zone'], instanceGroupManager=instance_group['name'])
-    return request.execute()
-
-def instance_group_manager_error(project_id, instance_group):
-    service = discovery.build('compute', 'v1')
-    request = service.instanceGroupManagers().listErrors(project=project_id, zone=instance_group['zone'], instanceGroupManager=instance_group['name'], orderBy="creationTimestamp desc")
-    return request.execute()
-
-def instance_group_info(project_id, instance_group):
-    service = discovery.build('compute', 'v1')
-    request = service.instanceGroups().get(project=project_id, zone=instance_group['zone'], instanceGroup=instance_group['name'])
-    return request.execute()
-
 def list_managed_instances(instance_group):
-    project_id = get_project_id()
-    service = discovery.build("compute", "v1")
-    page_token = None
-    instances = []
-    while True:
-        request = service.instanceGroupManagers().listManagedInstances(project=project_id, zone=instance_group["zone"], instanceGroupManager=instance_group["name"], pageToken=page_token, maxResults=20)
-        ret = request.execute()
-        if not ret:
-            return instances
-        instances += [r["instance"] for r in ret['managedInstances']]
-        page_token = ret.get("nextPageToken", None)
-        if not page_token:
-            break
+    return google_api.list_managed_instances(instance_group)
 
-    return instances
 
 def get_instance_property(instance_zone, instance, key):
-    project_id = get_project_id()
-    service = discovery.build("compute", "v1")
-    request = service.instances().get(project=project_id, zone=instance_zone, instance=instance)
-    ret = request.execute()
-    return ret[key]
+    return google_api.get_instance_property(instance_zone, instance, key)
+
 
 def delete_instances(ig, instances):
-    project_id = get_project_id()
-    request_body = {
-        "instances": instances,
-        "skipInstancesOnValidationError": True,
-    }
-    service = discovery.build('compute', 'v1')
-    request = service.instanceGroupManagers().deleteInstances(project=project_id, zone=ig["zone"], instanceGroupManager=ig["name"], body=request_body)
-    ret = request.execute()
-    print(ret)
+    return google_api.delete_instances(ig, instances)
+
 
 def get_cluster_target_size(project_id, instance_groups):
     total_size = 0
     for ig in instance_groups:
-        info = instance_group_manager_info(project_id, ig)
+        info = google_api.instance_group_manager_info(project_id, ig)
         total_size += info['targetSize']
 
     return total_size
 
 def get_cluster_errors(project_id, instance_groups):
     msg = []
     for ig in instance_groups:
-        items = instance_group_manager_error(project_id, ig)
+        items = google_api.instance_group_manager_error(project_id, ig)
 
         if len(items.get("items", [])) == 0:
             continue
@@ -94,7 +50,7 @@ def get_cluster_errors(project_id, instance_groups):
 def get_cluster_size(project_id, instance_groups):
     total_size = 0
     for ig in instance_groups:
-        info = instance_group_info(project_id, ig)
+        info = google_api.instance_group_info(project_id, ig)
         total_size += info['size']
 
     return total_size
@@ -103,7 +59,7 @@ def reset_cluster(key, initial_size):
     if not run_metadata.get("manage_clusters", True):
         return
     try:
-        project_id = get_project_id()
+        project_id = google_api.get_project_id()
         cluster_info = json.loads(BaseHook.get_connection("InstanceGroups").extra)
     except:
         slack_message(":exclamation:Failed to load the cluster information from connection {}".format("InstanceGroups"))
@@ -133,16 +89,12 @@ def reset_cluster(key, initial_size):
 
 
 def resize_instance_group(ig, size):
-    project_id = get_project_id()
-    service = discovery.build('compute', 'v1')
-    request = service.instanceGroupManagers().resize(project=project_id, zone=ig['zone'], instanceGroupManager=ig['name'], size=size)
-    response = request.execute()
-    print(json.dumps(response, indent=2))
     slack_message(f":information_source: resize instance group {ig['name']} to {size} instances", notification=True)
+    return google_api.resize_instance_group(ig, size)
 
 
 def resize_cluster(instance_groups, size):
-    project_id = get_project_id()
+    project_id = google_api.get_project_id()
 
     total_size = get_cluster_target_size(project_id, instance_groups)
 
@@ -159,16 +111,14 @@ def resize_cluster(instance_groups, size):
 
     target_size = size
     for ig in instance_groups:
-        info_group_manager = instance_group_manager_info(project_id, ig)
-        info_group = instance_group_info(project_id, ig)
+        info_group_manager = google_api.instance_group_manager_info(project_id, ig)
+        info_group = google_api.instance_group_info(project_id, ig)
         ig_size = min(target_size, ig['max_size'])
         if ig_size < info_group["size"] and not downsize:
            continue
        if info_group_manager["targetSize"] > info_group["size"]:
            ig_size = min(ig_size, info_group["size"]+1)
-        service = discovery.build('compute', 'v1')
-        request = service.instanceGroupManagers().resize(project=project_id, zone=ig['zone'], instanceGroupManager=ig['name'], size=ig_size)
-        response = request.execute()
+        response = google_api.resize_instance_group(ig, ig_size)
         print(json.dumps(response, indent=2))
         slack_message(":information_source: resize instance group {} to {} instances".format(ig['name'], ig_size), notification=True)
         target_size -= ig_size
@@ -180,9 +130,8 @@ def resize_cluster(instance_groups, size):
 
     return min(size, max_size)
 
-
 def redistribute_instances(key, instance_groups, target_size, move_instances=False):
-    project_id = get_project_id()
+    project_id = google_api.get_project_id()
     slack_message(f":recycle: Redistribute instances in cluster {key} with target size {target_size}")
 
     ig_statuses = []
@@ -190,8 +139,8 @@ def redistribute_instances(key, instance_groups, target_size, move_instances=Fal
 
     # First pass: identify unstable IGs
     for i, ig in enumerate(instance_groups):
-        manager_info = instance_group_manager_info(project_id, ig)
-        ig_info = instance_group_info(project_id, ig)
+        manager_info = google_api.instance_group_manager_info(project_id, ig)
+        ig_info = google_api.instance_group_info(project_id, ig)
         current_size = ig_info.get('size', 0)
         ig_target_size = manager_info.get('targetSize', 0)
         is_unstable = ig_target_size > current_size
@@ -211,7 +160,6 @@ def redistribute_instances(key, instance_groups, target_size, move_instances=Fal
 
     # Shrink unstable IGs and calculate deficit
     total_deficit = 0
-    service = discovery.build('compute', 'v1')
    for i in unstable_indices:
        status = ig_statuses[i]
        ig = status['ig']
@@ -227,8 +175,7 @@ def redistribute_instances(key, instance_groups, target_size, move_instances=Fal
         if deficit > 0:
             total_deficit += deficit
             slack_message(f"Shrinking unstable instance group {ig['name']} from {status['target']} to {new_ig_size}")
-            request = service.instanceGroupManagers().resize(project=project_id, zone=ig['zone'], instanceGroupManager=ig['name'], size=new_ig_size)
-            request.execute()
+            google_api.resize_instance_group(ig, new_ig_size)
             ig_statuses[i]['target'] = new_ig_size
 
     if total_deficit == 0:
@@ -264,8 +211,7 @@ def redistribute_instances(key, instance_groups, target_size, move_instances=Fal
         to_add = min(total_deficit, available_capacity)
         new_size = current_target + to_add
         slack_message(f"Scaling up {ig['name']} from {current_target} to {new_size} to compensate for deficit.")
-        request = service.instanceGroupManagers().resize(project=project_id, zone=ig['zone'], instanceGroupManager=ig['name'], size=new_size)
-        request.execute()
+        google_api.resize_instance_group(ig, new_size)
         total_deficit -= to_add
 
         ig_statuses[current_ig_index]['target'] = new_size
@@ -288,7 +234,6 @@ def ramp_up_cluster(key, initial_size, total_size):
         sleep(60)
     Variable.set("cluster_target_size", target_sizes, serialize_json=True)
 
-
 def ramp_down_cluster(key, total_size):
     run_metadata = Variable.get("run_metadata", deserialize_json=True, default_var={})
     if not run_metadata.get("manage_clusters", True):
@@ -306,7 +251,7 @@ def ramp_down_cluster(key, total_size):
 
 def increase_instance_group_size(key, size):
     try:
-        project_id = get_project_id()
+        project_id = google_api.get_project_id()
         cluster_info = json.loads(BaseHook.get_connection("InstanceGroups").extra)
     except:
         slack_message(":exclamation:Failed to load the cluster information from connection {}".format("InstanceGroups"))
@@ -326,10 +271,9 @@ def increase_instance_group_size(key, size):
     real_size = resize_cluster(cluster_info[key], size)
     slack_message(":arrow_up: Scale up cluster {} to {} instances".format(key, real_size))
 
-
 def reduce_instance_group_size(key, size):
     try:
-        project_id = get_project_id()
+        project_id = google_api.get_project_id()
         cluster_info = json.loads(BaseHook.get_connection("InstanceGroups").extra)
     except:
         slack_message(":exclamation:Failed to load the cluster information from connection {}".format("InstanceGroups"))
@@ -350,9 +294,8 @@ def reduce_instance_group_size(key, size):
     slack_message(":arrow_down: Scale down cluster {} to {} instances, sleep for one minute to let it stabilize".format(key, real_size))
     sleep(60)
 
-
 def cluster_status(name, cluster):
-    project_id = get_project_id()
+    project_id = google_api.get_project_id()
     current_size = get_cluster_size(project_id, cluster)
     requested_size = get_cluster_target_size(project_id, cluster)
     errors = get_cluster_errors(project_id, cluster)
@@ -370,12 +313,11 @@ def cluster_status(name, cluster):
 
     return stable, requested_size
 
-
 def collect_resource_metrics(start_time, end_time):
     import pendulum
     from google.cloud import monitoring_v3
 
-    project_id = get_project_id()
+    project_id = google_api.get_project_id()
     cluster_info = json.loads(BaseHook.get_connection("InstanceGroups").extra)
 
     resources = {}
@@ -429,7 +371,8 @@ def query_metric(metric, aggregation):
         return client.list_time_series(
             request={
                 "name": project_name,
-                "filter": f'metric.type = "{metric}"',
+                "filter": f'metric.type = "{metric}"'
+,
                 "interval": interval,
                 "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
                 "aggregation": aggregation,
@@ -480,29 +423,15 @@ def query_metric(metric, aggregation):
 
     return resources
 
+def wait_for_instance(instance, zone, target, retries=5):
+    for _ in range(retries):
+        status = get_instance_property(zone, instance, "status")
+        if status == target:
+            return
+        sleep(60)
 
-def start_instance(instance_name, zone):
-    service = discovery.build('compute', 'v1')
-    request = service.instances().start(
-        project=get_project_id(),
-        zone=zone,
-        instance=instance_name
-    )
-    response = request.execute()
-    return response
-
-
-def stop_instance(instance_name, zone):
-    service = discovery.build('compute', 'v1')
-    request = service.instances().stop(
-        discardLocalSsd=True,
-        project=get_project_id(),
-        zone=zone,
-        instance=instance_name
-    )
-    response = request.execute()
-    return response
-
+    slack_message(f'*Timeout while waiting {instance}*')
+    raise RuntimeError(f"Timeout waiting {instance}")
 
 def toggle_easyseg_worker(on=False):
     from dag_utils import get_connection
@@ -513,10 +442,9 @@ def toggle_easyseg_worker(on=False):
     zone = ig_conn.login
     easyseg_worker = f"{deployment}-easyseg-worker"
     if on:
-        start_instance(easyseg_worker, zone)
+        google_api.start_instance(easyseg_worker, zone)
     else:
-        stop_instance(easyseg_worker, zone)
-
+        google_api.stop_instance(easyseg_worker, zone)
 
 def toggle_nfs_server(on=False):
     from dag_utils import get_connection
@@ -527,21 +455,11 @@ def toggle_nfs_server(on=False):
 
     zone = nfs_conn.login
     nfs_server = f"{deployment}-nfs-server"
     if on:
-        start_instance(nfs_server, zone)
+        google_api.start_instance(nfs_server, zone)
         wait_for_instance(nfs_server, zone, "RUNNING")
     else:
-        stop_instance(nfs_server, zone)
+        google_api.stop_instance(nfs_server, zone)
         wait_for_instance(nfs_server, zone, "TERMINATED")
 
     slack_message(f'*Turning {"on" if on else "off"} the nfs server*')
-
-def wait_for_instance(instance, zone, target, retries=5):
-    for _ in range(retries):
-        status = get_instance_property(zone, instance, "status")
-        if status == target:
-            return
-        sleep(60)
-
-    slack_message(f'*Timeout while waiting {instance}*')
-    raise RuntimeError(f"Timeout waiting {instance}")
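An aside on how the relocated helpers compose after this patch: toggle_nfs_server() now starts an instance through common.google_api and polls its status with get_instance_property(). A stand-alone sketch of that start-and-wait pattern, with a hypothetical instance name and zone:

    from time import sleep
    from common import google_api

    def start_and_wait(instance, zone, retries=5):
        # Start a VM, then poll until GCE reports it RUNNING.
        google_api.start_instance(instance, zone)
        for _ in range(retries):
            if google_api.get_instance_property(zone, instance, "status") == "RUNNING":
                return True
            sleep(60)
        return False

    start_and_wait("example-nfs-server", "us-east1-b")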
From f51bf91a3863c0c4d77cbef2f33acb52797ab6a8 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Mon, 12 Jan 2026 15:51:10 -0500
Subject: [PATCH 2/5] Use the new API

---
 cloud/google/common.py        | 2 +-
 pipeline/init_pipeline.py     | 2 +-
 slackbot/redeploy_commands.py | 2 +-
 slackbot/slack_bot.py         | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/cloud/google/common.py b/cloud/google/common.py
index 108b6cb8..ff107c3a 100644
--- a/cloud/google/common.py
+++ b/cloud/google/common.py
@@ -79,7 +79,7 @@ def GenerateAirflowVar(context, hostname_manager):
     sqlalchemy_conn = f'''postgresql+psycopg2://{postgres_user}:{postgres_password}@{hostname_manager}/{postgres_db}'''
 
     airflow_variable = {
-        'AIRFLOW__CORE__HOSTNAME_CALLABLE': 'google_metadata.gce_hostname',
+        'AIRFLOW__CORE__HOSTNAME_CALLABLE': 'common.google_api.gce_hostname',
         'AIRFLOW__DATABASE__SQL_ALCHEMY_CONN': sqlalchemy_conn,
         'AIRFLOW__CORE__FERNET_KEY': context.properties['airflow'].get('fernetKey', fernet_key),
         'AIRFLOW__CELERY__BROKER_URL': f'amqp://{hostname_manager}',
diff --git a/pipeline/init_pipeline.py b/pipeline/init_pipeline.py
index bb444fee..811e2db3 100644
--- a/pipeline/init_pipeline.py
+++ b/pipeline/init_pipeline.py
@@ -1,7 +1,7 @@
 from airflow.utils import db as db_utils
 from airflow.models import Variable
 from airflow import models
-from google_metadata import get_project_data, get_instance_data, get_instance_metadata, set_instance_metadata
+from common.google_api import get_project_data, get_instance_data, get_instance_metadata, set_instance_metadata
 from param_default import param_default, inference_param_default, synaptor_param_default
 import os
 import json
diff --git a/slackbot/redeploy_commands.py b/slackbot/redeploy_commands.py
index 37628aed..ecc1c374 100644
--- a/slackbot/redeploy_commands.py
+++ b/slackbot/redeploy_commands.py
@@ -1,7 +1,7 @@
 import time
 from seuronbot import SeuronBot
 from bot_utils import replyto, send_message
-from google_metadata import get_project_data, get_instance_data, get_instance_metadata, set_instance_metadata
+from common.google_api import get_project_data, get_instance_data, get_instance_metadata, set_instance_metadata
 
 import tenacity
 
diff --git a/slackbot/slack_bot.py b/slackbot/slack_bot.py
index 67687814..e811511e 100644
--- a/slackbot/slack_bot.py
+++ b/slackbot/slack_bot.py
@@ -9,7 +9,7 @@
 import subprocess
 
 if os.environ.get("VENDOR", None) == "Google":
-    from google_metadata import gce_external_ip
+    from common.google_api import gce_external_ip
 else:
     import socket
 

From 46e9f67e487cd7a90852013b1d9c3bf23b5eab94 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Thu, 8 Jan 2026 12:24:03 -0500
Subject: [PATCH 3/5] Update timestamp of latest heartbeat_dag run

---
 dags/heartbeat_dag.py | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/dags/heartbeat_dag.py b/dags/heartbeat_dag.py
index b5d82044..a0f4f385 100644
--- a/dags/heartbeat_dag.py
+++ b/dags/heartbeat_dag.py
@@ -233,6 +233,14 @@ def shutdown_easyseg_worker():
     except Exception:
         pass
 
+def update_last_success_timestamp():
+    import os
+    import redis
+    from datetime import datetime
+    redis_host = os.environ['REDIS_SERVER']
+    r = redis.Redis(redis_host, decode_responses=True)
+    r.set('heartbeat_dag_last_success_timestamp', datetime.now().timestamp())
+
 
 latest = LatestOnlyOperator(
     task_id='latest_only',
@@ -261,4 +269,12 @@ def shutdown_easyseg_worker():
     queue="cluster",
     dag=dag)
 
-latest >> queue_sizes_task >> remove_failed_instances_task >> shutdown_easyseg_worker_task
+update_timestamp_task = PythonOperator(
+    task_id="update_last_success_timestamp",
+    python_callable=update_last_success_timestamp,
+    priority_weight=1000,
+    queue="cluster",
+    dag=dag
+)
+
+latest >> queue_sizes_task >> remove_failed_instances_task >> shutdown_easyseg_worker_task >> update_timestamp_task
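The new heartbeat task writes a plain Unix timestamp to Redis, which is what the next patch keys its self-deletion logic on. A sketch of how a consumer can check that key for staleness, assuming the same REDIS_SERVER environment variable the DAG uses:

    import os
    from datetime import datetime, timedelta

    import redis

    def heartbeat_is_stale(max_age=timedelta(hours=1)):
        r = redis.Redis(os.environ["REDIS_SERVER"], decode_responses=True)
        value = r.get("heartbeat_dag_last_success_timestamp")
        if value is None:
            return True  # the DAG has never reported a successful run
        age = datetime.now() - datetime.fromtimestamp(float(value))
        return age > max_age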
From 459bce2c622d4db51478272468e4026d065261b7 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Thu, 8 Jan 2026 12:24:51 -0500
Subject: [PATCH 4/5] Delete instances that miss the heartbeat check

---
 utils/memory_monitor.py | 100 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 93 insertions(+), 7 deletions(-)

diff --git a/utils/memory_monitor.py b/utils/memory_monitor.py
index aa177590..6e87780a 100644
--- a/utils/memory_monitor.py
+++ b/utils/memory_monitor.py
@@ -5,11 +5,24 @@
 from enum import Enum
 import os
 import redis
-from datetime import datetime
+from datetime import datetime, timedelta
 from time import sleep
 from kombu_helper import put_message
-from google_metadata import gce_hostname
+from common.google_api import gce_hostname, get_project_id, get_zone, delete_instances, get_created_by
+class RedisHealthMonitor:
+    def __init__(self, timeout_seconds=1800):
+        self.last_success_time = datetime.now()
+        self.timeout = timedelta(seconds=timeout_seconds)
+
+    def record_success(self):
+        self.last_success_time = datetime.now()
+
+    def record_failure(self):
+        if datetime.now() - self.last_success_time > self.timeout:
+            logging.error("Redis has been unavailable for 30 minutes. Deleting instance from group.")
+            delete_self_from_instance_group()
+            sys.exit(1)
 
 
 class InstanceError(Enum):
     OOM = 1
@@ -19,6 +32,44 @@ class InstanceError(Enum):
 
 DISKFULL_ALERT_PERCENT_THRESHOLD = 90
 
 
+def delete_self_from_instance_group():
+    try:
+        hostname = gce_hostname().split('.')[0]
+        zone = get_zone()
+        project_id = get_project_id()
+
+        created_by = get_created_by()
+        igm_name = created_by.split('/')[-1]
+
+        ig = {'zone': zone, 'name': igm_name}
+        instance_url = f"https://www.googleapis.com/compute/v1/projects/{project_id}/zones/{zone}/instances/{hostname}"
+
+        logging.info(f"Deleting instance {hostname} from instance group {igm_name}")
+        delete_instances(ig, [instance_url])
+        put_message(sys.argv[1], sys.argv[2], {
+            "text": f"{hostname} failed to connect to the manager for 60 minutes. Deleting instance from group."
+            }
+        )
+    except Exception as e:
+        logging.error(f"Failed to delete instance from instance group: {e}")
+
+
+def check_heartbeat(redis_conn, redis_monitor):
+    try:
+        last_heartbeat = redis_conn.get('heartbeat_dag_last_success_timestamp')
+        if redis_monitor:
+            redis_monitor.record_success()
+        if last_heartbeat:
+            last_heartbeat_time = datetime.fromtimestamp(float(last_heartbeat))
+            if datetime.now() - last_heartbeat_time > timedelta(hours=1):
+                logging.warning("Heartbeat is older than 1 hour. Deleting instance from group.")
+                delete_self_from_instance_group()
+    except Exception as e:
+        logging.warning(f"Failed to read from Redis: {e}")
+        if redis_monitor:
+            redis_monitor.record_failure()
+
+
 def check_filesystems_full():
     partitions = psutil.disk_partitions()
@@ -44,7 +95,7 @@ def sleep_time(mem_avail):
 
     return max(min_sleep, sleep_time)
 
-def run_oom_canary():
+def run_oom_canary(redis_conn, hostname, redis_monitor):
     loop_counter = 0
     while True:
         loop_counter += 1
@@ -63,12 +114,20 @@ def run_oom_canary():
         t = sleep_time(mem.available)
         if t > 1:
             if loop_counter % 60 == 0:
+                check_heartbeat(redis_conn, redis_monitor)
                 if check_filesystems_full():
                     return InstanceError.DISKFULL
             cpu_usage = sum(psutil.cpu_percent(interval=1, percpu=True))
             if cpu_usage > 20:
                 logging.info(f"{cpu_usage}% cpu used, heartbeat")
-                redis_conn.set(hostname, datetime.now().timestamp())
+                try:
+                    redis_conn.set(hostname, datetime.now().timestamp())
+                    if redis_monitor:
+                        redis_monitor.record_success()
+                except Exception as e:
+                    logging.warning(f"Failed to write to Redis: {e}")
+                    if redis_monitor:
+                        redis_monitor.record_failure()
                 continue
             counters_start = psutil.net_io_counters()
             sleep(1)
@@ -77,7 +136,14 @@ def run_oom_canary():
             upload_speed = counters_end.bytes_sent - counters_start.bytes_sent
             if download_speed > 1e6 or upload_speed > 1e6:
                 logging.info(f"Significant network IO: {download_speed/1e6}MB/s, {upload_speed/1e6}MB/s, heartbeat")
-                redis_conn.set(hostname, datetime.now().timestamp())
+                try:
+                    redis_conn.set(hostname, datetime.now().timestamp())
+                    if redis_monitor:
+                        redis_monitor.record_success()
+                except Exception as e:
+                    logging.warning(f"Failed to write to Redis: {e}")
+                    if redis_monitor:
+                        redis_monitor.record_failure()
                 continue
         else:
             sleep(1)
@@ -88,12 +154,32 @@ def run_oom_canary():
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO)
-    redis_conn = redis.Redis(os.environ["REDIS_SERVER"])
+
+    redis_conn = None
+    start_time = datetime.now()
+
     try:
         hostname = gce_hostname().split(".")[0]
     except:
         hostname = socket.gethostname()
 
+    while (datetime.now() - start_time).total_seconds() < 1800:  # 30 minutes
+        try:
+            redis_conn = redis.Redis(os.environ["REDIS_SERVER"], socket_connect_timeout=5, decode_responses=True)
+            redis_conn.ping()  # Check if connection is alive
+            logging.info("Successfully connected to Redis.")
+            break
+        except Exception as e:
+            logging.error(f"An unexpected error occurred while connecting to Redis: {e}")
+            sleep(10)
+
+    if redis_conn is None:
+        logging.error("Failed to connect to Redis for 30 minutes. Deleting instance from group.")
+        delete_self_from_instance_group()
+        sys.exit(1)
+
+    redis_monitor = RedisHealthMonitor()
+
     error_message = {
         InstanceError.OOM: {
             'text': f":u6e80: *OOM detected from instance* `{hostname}`!"
@@ -103,6 +189,6 @@ def run_oom_canary():
         },
     }
 
-    exit_reason = run_oom_canary()
+    exit_reason = run_oom_canary(redis_conn, hostname, redis_monitor)
     logging.warning("canary died")
     put_message(sys.argv[1], sys.argv[2], error_message[exit_reason])
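A note on how delete_self_from_instance_group() builds its arguments: the created-by metadata entry names the instance group manager, and the instance URL is assembled from project, zone, and short hostname. A stand-alone walk-through with hypothetical values (on a real VM they come from the metadata server via common.google_api):

    created_by = "projects/123456789/zones/us-east1-b/instanceGroupManagers/example-worker-group"
    igm_name = created_by.split("/")[-1]  # -> "example-worker-group"

    project_id = "example-project"
    zone = "us-east1-b"
    hostname = "example-worker-abcd.c.example-project.internal".split(".")[0]
    instance_url = (
        f"https://www.googleapis.com/compute/v1/projects/{project_id}"
        f"/zones/{zone}/instances/{hostname}"
    )

    ig = {"zone": zone, "name": igm_name}  # what delete_instances(ig, [instance_url]) expects
    print(igm_name, instance_url)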
From 065d7b67ddffc9dd6e0c4199f639ec4c94e4409d Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Wed, 21 Jan 2026 12:55:02 -0500
Subject: [PATCH 5/5] Use timedelta consistently

---
 utils/memory_monitor.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/utils/memory_monitor.py b/utils/memory_monitor.py
index 6e87780a..48871c50 100644
--- a/utils/memory_monitor.py
+++ b/utils/memory_monitor.py
@@ -11,9 +11,9 @@
 from common.google_api import gce_hostname, get_project_id, get_zone, delete_instances, get_created_by
 class RedisHealthMonitor:
-    def __init__(self, timeout_seconds=1800):
+    def __init__(self, timeout=timedelta(seconds=1800)):
         self.last_success_time = datetime.now()
-        self.timeout = timedelta(seconds=timeout_seconds)
+        self.timeout = timeout
 
     def record_success(self):
         self.last_success_time = datetime.now()
@@ -163,7 +163,7 @@ def run_oom_canary(redis_conn, hostname, redis_monitor):
     except:
         hostname = socket.gethostname()
 
-    while (datetime.now() - start_time).total_seconds() < 1800:  # 30 minutes
+    while datetime.now() - start_time < timedelta(seconds=1800):  # 30 minutes
         try:
             redis_conn = redis.Redis(os.environ["REDIS_SERVER"], socket_connect_timeout=5, decode_responses=True)
             redis_conn.ping()  # Check if connection is alive
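The point of this last patch is small but worth stating: datetime differences are already timedelta objects, so they can be compared against a timedelta directly instead of being converted to seconds first. A self-contained illustration of the before/after comparison:

    from datetime import datetime, timedelta

    start_time = datetime.now()
    deadline = timedelta(seconds=1800)

    # before: (datetime.now() - start_time).total_seconds() < 1800
    # after:
    still_waiting = datetime.now() - start_time < deadline
    print(still_waiting)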