Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,25 @@ public async Task<HealthCheckResult> CheckHealthAsync(
{
if (!await PingAsync(cancellationToken))
return HealthCheckResult.Unhealthy("ClearML is unresponsive");
IReadOnlySet<string> queuesWithoutWorkers = await QueuesWithoutWorkers(cancellationToken);
if (queuesWithoutWorkers.Count > 0)
(QueueHealth queueHealth, ICollection<string> queuesWithoutWorkers) = await QueuesWithoutWorkers(
cancellationToken
);
switch (queueHealth)
{
return HealthCheckResult.Unhealthy(
$"No ClearML agents are available for configured queues: {string.Join(", ", queuesWithoutWorkers)}"
);
case QueueHealth.Degraded:
return HealthCheckResult.Degraded(
$"No ClearML agents are available for configured queues: {string.Join(", ", queuesWithoutWorkers)}"
);
case QueueHealth.Unhealthy:
return HealthCheckResult.Unhealthy(
$"Autoscaler is down and there are no ClearML agents are available for configured queues: {string.Join(", ", queuesWithoutWorkers)}"
);
case QueueHealth.Healthy:
default:
using (await _lock.LockAsync(cancellationToken))
cache.Set(FailureCountKey, 0);
return HealthCheckResult.Healthy("ClearML is available");
}

using (await _lock.LockAsync(cancellationToken))
cache.Set(FailureCountKey, 0);
return HealthCheckResult.Healthy("ClearML is available");
}
catch (Exception e)
{
Expand Down Expand Up @@ -69,33 +77,79 @@ public async Task<HealthCheckResult> CheckHealthAsync(
return (JsonObject?)JsonNode.Parse(result);
}

public async Task<bool> PingAsync(CancellationToken cancellationToken = default)
private async Task<bool> PingAsync(CancellationToken cancellationToken = default)
{
JsonObject? result = await CallAsync("debug", "ping", new JsonObject(), cancellationToken);
return result is not null;
}

public async Task<IReadOnlySet<string>> QueuesWithoutWorkers(CancellationToken cancellationToken = default)
private async Task<(QueueHealth, ICollection<string>)> QueuesWithoutWorkers(
CancellationToken cancellationToken = default
)
{
var queuesWithoutWorkers = _queuesMonitored.ToHashSet();
JsonObject? result = await CallAsync("workers", "get_all", new JsonObject(), cancellationToken);
JsonNode? workers_node = result?["data"]?["workers"];
if (workers_node is null)
JsonObject? queuesResult = await CallAsync("workers", "get_all", new JsonObject(), cancellationToken);
JsonNode? workersNode = queuesResult?["data"]?["workers"];
if (workersNode is null)
throw new InvalidOperationException("Malformed response from ClearML server.");
var workers = (JsonArray)workers_node;
foreach (var worker in workers)
var workers = (JsonArray)workersNode;
foreach (JsonNode? worker in workers)
{
JsonNode? queues_node = worker?["queues"];
if (queues_node is null)
JsonNode? queuesNode = worker?["queues"];
if (queuesNode is null)
continue;
var queues = (JsonArray)queues_node;
foreach (var currentQueue in queues)
var queues = (JsonArray)queuesNode;
foreach (JsonNode? currentQueue in queues)
{
string? currentQueueName = (string?)currentQueue?["name"];
if (currentQueueName is not null)
queuesWithoutWorkers.Remove(currentQueueName);
}
}
return queuesWithoutWorkers;

// Ensure the autoscaler queues are present, if there are queues without workers
if (queuesWithoutWorkers.Count > 0)
{
JsonObject? instancesResult = await CallAsync(
"apps",
"get_instances",
new JsonObject { ["app"] = "gcp-autoscaler", ["status"] = "running" },
cancellationToken
);

// Get the autoscaler queues
HashSet<string> autoscalerQueueNames = [];
JsonNode? instancesNode = instancesResult?["data"]?["instances"];
if (instancesNode is null)
throw new InvalidOperationException("Malformed response from ClearML server.");
var instances = (JsonArray)instancesNode;
foreach (JsonNode? instance in instances)
{
if (instance?["application"]?["configuration"]?["instance_queue_list"] is JsonArray queues)
{
foreach (JsonNode? queueNode in queues)
{
string? queueName = queueNode?["queue_name"]?.ToString();
if (!string.IsNullOrEmpty(queueName))
autoscalerQueueNames.Add(queueName);
}
}
}

// We do not check for workers, as autoscaler queues can have no workers but still be healthy
if (autoscalerQueueNames.Count == 0)
return (QueueHealth.Unhealthy, queuesWithoutWorkers);

return (QueueHealth.Degraded, queuesWithoutWorkers);
}

return (QueueHealth.Healthy, queuesWithoutWorkers);
}

private enum QueueHealth
{
Healthy,
Degraded,
Unhealthy,
}
}
Loading