Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion validator/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,83 @@ async def remove_disconnected_nodes(self):
)
keys_to_delete.append(hotkey)

logger.info(f"Deleteing keys from connected nodes: {keys_to_delete}")
logger.info(f"Deleting keys from connected nodes: {keys_to_delete}")
for hotkey in keys_to_delete:
# Get TEE addresses before clearing so we can remove from masa-tee-api
tee_addresses = self.validator.routing_table.get_miner_addresses(hotkey)

# Remove each TEE address from the masa-tee-api
for address, _ in tee_addresses:
await self._remove_tee_worker_from_api(address, hotkey)

del self.connected_nodes[hotkey]
self.validator.routing_table.clear_miner(hotkey)

async def _remove_tee_worker_from_api(self, address: str, hotkey: str) -> bool:
"""
Remove a TEE worker from the MASA TEE API when a miner deregisters.

Args:
address: The TEE worker address to remove
hotkey: The hotkey of the deregistered miner (for logging)

Returns:
True if removal was successful, False otherwise
"""
masa_tee_api = os.getenv("MASA_TEE_API", "")
masa_tee_api_key = os.getenv("MASA_TEE_API_KEY", "")

if not masa_tee_api:
logger.debug(
f"MASA_TEE_API not configured, skipping TEE worker removal for {address}"
)
return False

if not masa_tee_api_key:
logger.debug(
f"MASA_TEE_API_KEY not configured, skipping TEE worker removal for {address}"
)
return False

try:
base_url = masa_tee_api.rstrip("/")
api_endpoint = f"{base_url}/remove-tee-worker"
payload = {"address": address}
headers = {"X-API-Key": masa_tee_api_key}

logger.info(
f"Removing TEE worker from MASA API: {address} (hotkey: {hotkey})"
)

response = await self.validator.http_client_manager.client.post(
api_endpoint, json=payload, headers=headers, timeout=10.0
)

if response.status_code == 200:
logger.info(
f"Successfully removed TEE worker from MASA API: {address}"
)
return True
elif response.status_code == 404:
logger.debug(
f"TEE worker not found in MASA API (already removed?): {address}"
)
return True
else:
logger.warning(
f"Failed to remove TEE worker from MASA API: "
f"{response.status_code} - {response.text}"
)
return False

except asyncio.CancelledError:
raise
except Exception as e:
logger.error(
f"Error removing TEE worker from MASA API: {address} - {str(e)}"
)
return False

async def send_custom_message(self, node_hotkey: str, message: str) -> None:
"""
Send a custom message to a specific miner.
Expand Down
4 changes: 2 additions & 2 deletions validator/weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def _get_delta_node_data(self, telemetry_data: List[NodeData]) -> List[NodeData]

# Extract platform metrics from delta stats
delta_platform_metrics = (
platform_manager.extract_platform_metrics_from_stats(
self.platform_manager.extract_platform_metrics_from_stats(
delta_stats_json
)
)
Expand Down Expand Up @@ -381,7 +381,7 @@ def _get_delta_node_data(self, telemetry_data: List[NodeData]) -> List[NodeData]
for (
platform_name,
platform_config,
) in platform_manager.get_all_platforms().items():
) in self.platform_manager.get_all_platforms().items():
for error_metric in platform_config.error_metrics:
raw_field = platform_config.get_raw_field_name(error_metric)
total_errors += delta_stats_json.get(raw_field, 0)
Expand Down