From 6e27f744ce97797a7819c83ddc19af584f8def22 Mon Sep 17 00:00:00 2001 From: arafat Date: Mon, 6 Apr 2026 12:01:36 +0530 Subject: [PATCH] HDDS-14927. Add Quasi-Closed Container Tracking in Recon. --- .../ozone/recon/ReconControllerModule.java | 2 + .../ozone/recon/api/ContainerEndpoint.java | 96 ++++++ .../types/QuasiClosedContainerMetadata.java | 100 ++++++ .../types/QuasiClosedContainersResponse.java | 54 ++++ .../recon/fsck/QuasiClosedContainerTask.java | 171 ++++++++++ .../QuasiClosedContainerSchemaManager.java | 297 ++++++++++++++++++ .../ReconStorageContainerManagerFacade.java | 15 +- 7 files changed, 734 insertions(+), 1 deletion(-) create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainerMetadata.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainersResponse.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/QuasiClosedContainerTask.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/QuasiClosedContainerSchemaManager.java diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index 9a9dfb48e74b..5aabcf7a6c69 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.recon.heatmap.HeatMapServiceImpl; import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; +import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager; import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration; import org.apache.hadoop.ozone.recon.persistence.JooqPersistenceModule; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; @@ -110,6 +111,7 @@ protected void configure() { bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class); bind(ContainerHealthSchemaManager.class).in(Singleton.class); + bind(QuasiClosedContainerSchemaManager.class).in(Singleton.class); bind(ReconContainerMetadataManager.class) .to(ReconContainerMetadataManagerImpl.class).in(Singleton.class); bind(ReconFileMetadataManager.class) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java index 4cf6ca85f6f7..02d57640a6b4 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java @@ -72,10 +72,14 @@ import org.apache.hadoop.ozone.recon.api.types.KeysResponse; import org.apache.hadoop.ozone.recon.api.types.MissingContainerMetadata; import org.apache.hadoop.ozone.recon.api.types.MissingContainersResponse; +import org.apache.hadoop.ozone.recon.api.types.QuasiClosedContainerMetadata; +import org.apache.hadoop.ozone.recon.api.types.QuasiClosedContainersResponse; import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainerMetadata; import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersResponse; import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary; import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; +import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager; +import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager.QuasiClosedContainerRecord; import org.apache.hadoop.ozone.recon.persistence.ContainerHistory; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.scm.ReconContainerManager; @@ -102,6 +106,7 @@ public class ContainerEndpoint { private final ReconContainerManager containerManager; private final PipelineManager pipelineManager; private final ContainerHealthSchemaManager containerHealthSchemaManager; + private final QuasiClosedContainerSchemaManager quasiClosedSchemaManager; private final ReconNamespaceSummaryManager reconNamespaceSummaryManager; private final OzoneStorageContainerManager reconSCM; private static final Logger LOG = @@ -143,6 +148,7 @@ public static DataFilter fromValue(String value) { @Inject public ContainerEndpoint(OzoneStorageContainerManager reconSCM, ContainerHealthSchemaManager containerHealthSchemaManager, + QuasiClosedContainerSchemaManager quasiClosedSchemaManager, ReconNamespaceSummaryManager reconNamespaceSummaryManager, ReconContainerMetadataManager reconContainerMetadataManager, ReconOMMetadataManager omMetadataManager) { @@ -150,6 +156,7 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM, (ReconContainerManager) reconSCM.getContainerManager(); this.pipelineManager = reconSCM.getPipelineManager(); this.containerHealthSchemaManager = containerHealthSchemaManager; + this.quasiClosedSchemaManager = quasiClosedSchemaManager; this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; this.reconSCM = reconSCM; this.reconContainerMetadataManager = reconContainerMetadataManager; @@ -812,4 +819,93 @@ public Response getOmContainersDeletedInSCM( response.put("containerDiscrepancyInfo", containerDiscrepancyInfoList); return Response.ok(response).build(); } + + // ───────────────────────────────────────────────────────────────────────── + // QUASI-CLOSED CONTAINER ENDPOINTS + // ───────────────────────────────────────────────────────────────────────── + + /** + * Returns all containers currently in QUASI_CLOSED lifecycle state with + * cursor-based forward pagination. + * + *

QUASI_CLOSED containers are those locally closed by a datanode + * (e.g. due to a pipeline failure) but not yet force-closed to CLOSED by SCM. + * Long-running QUASI_CLOSED containers may indicate stalled pipeline finalization. + * + * @param limit maximum containers to return (default: 1000) + * @param minContainerId cursor — return containers with ID > minContainerId + * @return paginated list of quasi-closed containers with replica history + */ + @GET + @Path("/quasiClosed") + public Response getQuasiClosedContainers( + @DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT) int limit, + @DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE) + @QueryParam(RECON_QUERY_PREVKEY) long minContainerId) { + return buildQuasiClosedResponse( + quasiClosedSchemaManager.getQuasiClosedContainers(minContainerId, limit)); + } + + /** + * Returns QUASI_CLOSED containers belonging to a specific pipeline. + * Useful for diagnosing which pipelines have the most stuck containers. + * + * @param pipelineId pipeline UUID to filter by + * @param limit maximum containers to return + * @param minContainerId forward-pagination cursor + * @return paginated list of quasi-closed containers for the given pipeline + */ + @GET + @Path("/quasiClosed/pipeline/{pipelineId}") + public Response getQuasiClosedContainersByPipeline( + @PathParam("pipelineId") String pipelineId, + @DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT) int limit, + @DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE) + @QueryParam(RECON_QUERY_PREVKEY) long minContainerId) { + if (pipelineId == null || pipelineId.isEmpty()) { + throw new WebApplicationException("pipelineId must not be empty", + Response.Status.BAD_REQUEST); + } + return buildQuasiClosedResponse( + quasiClosedSchemaManager.getByPipeline(pipelineId, minContainerId, limit)); + } + + /** + * Returns summary count of QUASI_CLOSED containers. + */ + @GET + @Path("/quasiClosed/summary") + public Response getQuasiClosedSummary() { + long count = quasiClosedSchemaManager.getCount(); + Map summary = new HashMap<>(); + summary.put("quasiClosedCount", count); + return Response.ok(summary).build(); + } + + private Response buildQuasiClosedResponse(List records) { + List metaList = new ArrayList<>(); + long lastKey = 0L; + for (QuasiClosedContainerRecord rec : records) { + long containerId = rec.getContainerId(); + List replicas = + containerManager.getAllContainerHistory(containerId); + metaList.add(new QuasiClosedContainerMetadata( + containerId, + rec.getPipelineId(), + rec.getDatanodeCount(), + rec.getKeyCount(), + rec.getDataSize(), + rec.getReplicationType(), + rec.getReplicationFactor(), + rec.getStateEnterTime(), + rec.getFirstSeenTime(), + rec.getLastScanTime(), + replicas)); + lastKey = Math.max(lastKey, containerId); + } + long totalCount = quasiClosedSchemaManager.getCount(); + QuasiClosedContainersResponse response = + new QuasiClosedContainersResponse(totalCount, metaList, lastKey); + return Response.ok(response).build(); + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainerMetadata.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainerMetadata.java new file mode 100644 index 000000000000..8e0ba06ba7ae --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainerMetadata.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.List; +import org.apache.hadoop.ozone.recon.persistence.ContainerHistory; + +/** + * JSON response DTO for a single QUASI_CLOSED container. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class QuasiClosedContainerMetadata { + + private long containerID; + private String pipelineID; + private int datanodeCount; + private long keyCount; + private long dataSize; + private String replicationType; + private int replicationFactor; + /** When the container entered QUASI_CLOSED state according to SCM. */ + private long stateEnterTime; + /** When Recon first observed this container as QUASI_CLOSED. */ + private long firstSeenTime; + /** When Recon last confirmed this container is still QUASI_CLOSED. */ + private long lastScanTime; + /** Current datanode replicas — reuses the existing ContainerHistory type. */ + private List replicas; + + public QuasiClosedContainerMetadata() { + } + + public QuasiClosedContainerMetadata( + long containerID, String pipelineID, int datanodeCount, + long keyCount, long dataSize, String replicationType, + int replicationFactor, long stateEnterTime, + long firstSeenTime, long lastScanTime, + List replicas) { + this.containerID = containerID; + this.pipelineID = pipelineID; + this.datanodeCount = datanodeCount; + this.keyCount = keyCount; + this.dataSize = dataSize; + this.replicationType = replicationType; + this.replicationFactor = replicationFactor; + this.stateEnterTime = stateEnterTime; + this.firstSeenTime = firstSeenTime; + this.lastScanTime = lastScanTime; + this.replicas = replicas; + } + + public long getContainerID() { return containerID; } + public void setContainerID(long containerID) { this.containerID = containerID; } + + public String getPipelineID() { return pipelineID; } + public void setPipelineID(String pipelineID) { this.pipelineID = pipelineID; } + + public int getDatanodeCount() { return datanodeCount; } + public void setDatanodeCount(int datanodeCount) { this.datanodeCount = datanodeCount; } + + public long getKeyCount() { return keyCount; } + public void setKeyCount(long keyCount) { this.keyCount = keyCount; } + + public long getDataSize() { return dataSize; } + public void setDataSize(long dataSize) { this.dataSize = dataSize; } + + public String getReplicationType() { return replicationType; } + public void setReplicationType(String replicationType) { this.replicationType = replicationType; } + + public int getReplicationFactor() { return replicationFactor; } + public void setReplicationFactor(int replicationFactor) { this.replicationFactor = replicationFactor; } + + public long getStateEnterTime() { return stateEnterTime; } + public void setStateEnterTime(long stateEnterTime) { this.stateEnterTime = stateEnterTime; } + + public long getFirstSeenTime() { return firstSeenTime; } + public void setFirstSeenTime(long firstSeenTime) { this.firstSeenTime = firstSeenTime; } + + public long getLastScanTime() { return lastScanTime; } + public void setLastScanTime(long lastScanTime) { this.lastScanTime = lastScanTime; } + + public List getReplicas() { return replicas; } + public void setReplicas(List replicas) { this.replicas = replicas; } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainersResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainersResponse.java new file mode 100644 index 000000000000..79b65d417972 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainersResponse.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import java.util.ArrayList; +import java.util.List; + +/** + * Wrapper response for the quasi-closed containers API. + */ +public class QuasiClosedContainersResponse { + + private long totalCount; + private List containers; + /** The highest container ID returned — for forward-pagination cursor. */ + private long lastKey; + + public QuasiClosedContainersResponse() { + this.containers = new ArrayList<>(); + } + + public QuasiClosedContainersResponse( + long totalCount, List containers, long lastKey) { + this.totalCount = totalCount; + this.containers = containers; + this.lastKey = lastKey; + } + + public long getTotalCount() { return totalCount; } + public void setTotalCount(long totalCount) { this.totalCount = totalCount; } + + public List getContainers() { return containers; } + public void setContainers(List containers) { + this.containers = containers; + } + + public long getLastKey() { return lastKey; } + public void setLastKey(long lastKey) { this.lastKey = lastKey; } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/QuasiClosedContainerTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/QuasiClosedContainerTask.java new file mode 100644 index 000000000000..f1326b127368 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/QuasiClosedContainerTask.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager; +import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager.QuasiClosedContainerRecord; +import org.apache.hadoop.ozone.recon.scm.ReconContainerManager; +import org.apache.hadoop.ozone.recon.scm.ReconScmTask; +import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig; +import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Background task that periodically scans for QUASI_CLOSED containers and + * persists them to the {@code QUASI_CLOSED_CONTAINERS} table. + * + *

Detection strategy: calls + * {@link ReconContainerManager#getContainers(LifeCycleState)} with + * {@code QUASI_CLOSED} directly — O(q) where q is the number of QUASI_CLOSED + * containers, not the entire cluster. This avoids running the full + * ReplicationManager handler chain just to extract lifecycle state. + * + *

first_seen_time preservation: the task reads the previous + * first_seen_time for containers that were already tracked, and keeps that + * value so operators can see how long a container has been stuck. + */ +public class QuasiClosedContainerTask extends ReconScmTask { + + private static final Logger LOG = + LoggerFactory.getLogger(QuasiClosedContainerTask.class); + + /** Minimum gap between two consecutive task runs regardless of configured interval. */ + private static final long MIN_NEXT_RUN_INTERVAL_MS = 60_000L; + + private final ReconContainerManager containerManager; + private final QuasiClosedContainerSchemaManager schemaManager; + private final long intervalMs; + + public QuasiClosedContainerTask( + ReconContainerManager containerManager, + QuasiClosedContainerSchemaManager schemaManager, + ReconTaskConfig reconTaskConfig, + ReconTaskStatusUpdaterManager taskStatusUpdaterManager) { + super(taskStatusUpdaterManager); + this.containerManager = containerManager; + this.schemaManager = schemaManager; + // Reuse the missingContainer interval as the default; operators can + // override via ozone.recon.task.missingcontainer.interval + this.intervalMs = reconTaskConfig.getMissingContainerTaskInterval().toMillis(); + LOG.info("Initialized QuasiClosedContainerTask with interval={}ms", intervalMs); + } + + @Override + protected void run() { + while (canRun()) { + long cycleStart = Time.monotonicNow(); + try { + initializeAndRunTask(); + long elapsed = Time.monotonicNow() - cycleStart; + long sleepMs = Math.max(MIN_NEXT_RUN_INTERVAL_MS, intervalMs - elapsed); + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("QuasiClosedContainerTask interrupted"); + break; + } catch (Exception e) { + LOG.error("Error in QuasiClosedContainerTask", e); + } + } + } + + @Override + protected void runTask() throws Exception { + long start = Time.monotonicNow(); + LOG.info("QuasiClosedContainerTask starting scan"); + + // 1. Fetch all QUASI_CLOSED containers from the ContainerManager. + // This is O(q) — only containers in QUASI_CLOSED state, not the whole cluster. + List qcContainers = + containerManager.getContainers(LifeCycleState.QUASI_CLOSED); + + LOG.info("Found {} containers in QUASI_CLOSED state", qcContainers.size()); + + // 2. Collect container IDs for first_seen_time preservation + List containerIds = new ArrayList<>(qcContainers.size()); + for (ContainerInfo ci : qcContainers) { + containerIds.add(ci.getContainerID()); + } + + // 3. Load existing first_seen_times from DB before replace + Map existingFirstSeen = schemaManager.getExistingFirstSeenTimes(containerIds); + long now = Time.now(); + + // 4. Build records + List records = new ArrayList<>(qcContainers.size()); + for (ContainerInfo ci : qcContainers) { + long containerId = ci.getContainerID(); + + int datanodeCount = 0; + try { + Set replicas = + containerManager.getContainerReplicas(ContainerID.valueOf(containerId)); + datanodeCount = replicas.size(); + } catch (IOException e) { + LOG.warn("Failed to get replicas for container {}", containerId, e); + } + + long stateEnterTime = ci.getStateEnterTime() != null + ? ci.getStateEnterTime().toEpochMilli() : now; + + // Preserve original first_seen_time if we already knew about this container + long firstSeenTime = existingFirstSeen.getOrDefault(containerId, now); + + String pipelineId = ci.getPipelineID() != null + ? ci.getPipelineID().getId().toString() : null; + + int requiredNodes; + try { + requiredNodes = ci.getReplicationConfig().getRequiredNodes(); + } catch (Exception e) { + requiredNodes = 0; + } + + records.add(new QuasiClosedContainerRecord( + containerId, + pipelineId, + datanodeCount, + ci.getNumberOfKeys(), + ci.getUsedBytes(), + ci.getReplicationType().name(), + requiredNodes, + stateEnterTime, + firstSeenTime, + now // last_scan_time always updated + )); + } + + // 5. Atomically replace all records in DB + schemaManager.replaceAll(records); + + long elapsed = Time.monotonicNow() - start; + LOG.info("QuasiClosedContainerTask completed: {} containers in {} ms", + records.size(), elapsed); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/QuasiClosedContainerSchemaManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/QuasiClosedContainerSchemaManager.java new file mode 100644 index 000000000000..32be586f6058 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/QuasiClosedContainerSchemaManager.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.persistence; + +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.table; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.jooq.Configuration; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Record; +import org.jooq.SelectQuery; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages CRUD operations for the QUASI_CLOSED_CONTAINERS table. + * + *

This manager tracks containers in the QUASI_CLOSED lifecycle state. + * Unlike the UNHEALTHY_CONTAINERS table which tracks replication health, + * this table tracks lifecycle state — containers that have been locally + * closed by a datanode but not yet force-closed to CLOSED by SCM. + */ +@Singleton +public class QuasiClosedContainerSchemaManager { + + private static final Logger LOG = + LoggerFactory.getLogger(QuasiClosedContainerSchemaManager.class); + + public static final String QUASI_CLOSED_CONTAINERS_TABLE_NAME = + "QUASI_CLOSED_CONTAINERS"; + + static final int BATCH_INSERT_CHUNK_SIZE = 1_000; + + // Column definitions + private static final Field COL_CONTAINER_ID = + field(name("container_id"), SQLDataType.BIGINT); + private static final Field COL_PIPELINE_ID = + field(name("pipeline_id"), SQLDataType.VARCHAR(64)); + private static final Field COL_DATANODE_COUNT = + field(name("datanode_count"), SQLDataType.INTEGER); + private static final Field COL_KEY_COUNT = + field(name("key_count"), SQLDataType.BIGINT); + private static final Field COL_DATA_SIZE = + field(name("data_size"), SQLDataType.BIGINT); + private static final Field COL_REPLICATION_TYPE = + field(name("replication_type"), SQLDataType.VARCHAR(16)); + private static final Field COL_REPLICATION_FACTOR = + field(name("replication_factor"), SQLDataType.INTEGER); + private static final Field COL_STATE_ENTER_TIME = + field(name("state_enter_time"), SQLDataType.BIGINT); + private static final Field COL_FIRST_SEEN_TIME = + field(name("first_seen_time"), SQLDataType.BIGINT); + private static final Field COL_LAST_SCAN_TIME = + field(name("last_scan_time"), SQLDataType.BIGINT); + + private final Configuration jooqConfiguration; + + @Inject + public QuasiClosedContainerSchemaManager(Configuration jooqConfiguration) { + this.jooqConfiguration = jooqConfiguration; + } + + private DSLContext dsl() { + return DSL.using(jooqConfiguration); + } + + /** + * Atomically replaces all quasi-closed container records. + * Deletes all existing rows and inserts the new set in a single transaction. + * + * @param records new records to insert (may be empty to clear the table) + */ + public void replaceAll(List records) { + dsl().transaction(configuration -> { + DSLContext txCtx = DSL.using(configuration); + // 1. Delete all existing records + txCtx.deleteFrom(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME)).execute(); + // 2. Batch-insert new records + if (records != null && !records.isEmpty()) { + batchInsert(txCtx, records); + } + }); + LOG.debug("Replaced {} quasi-closed container records", + records == null ? 0 : records.size()); + } + + private void batchInsert(DSLContext txCtx, List records) { + for (int from = 0; from < records.size(); from += BATCH_INSERT_CHUNK_SIZE) { + int to = Math.min(from + BATCH_INSERT_CHUNK_SIZE, records.size()); + List chunk = records.subList(from, to); + + // Build a multi-row INSERT using jOOQ's fluent API + org.jooq.InsertValuesStep10 insert = + txCtx.insertInto(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME), + COL_CONTAINER_ID, COL_PIPELINE_ID, COL_DATANODE_COUNT, + COL_KEY_COUNT, COL_DATA_SIZE, COL_REPLICATION_TYPE, + COL_REPLICATION_FACTOR, COL_STATE_ENTER_TIME, + COL_FIRST_SEEN_TIME, COL_LAST_SCAN_TIME); + + for (QuasiClosedContainerRecord rec : chunk) { + insert = insert.values( + rec.getContainerId(), + rec.getPipelineId(), + rec.getDatanodeCount(), + rec.getKeyCount(), + rec.getDataSize(), + rec.getReplicationType(), + rec.getReplicationFactor(), + rec.getStateEnterTime(), + rec.getFirstSeenTime(), + rec.getLastScanTime()); + } + insert.execute(); + } + } + + /** + * Returns first_seen_time values for containers already tracked. + * Used to preserve the timestamp across scan cycles. + * + * @param containerIds list of container IDs to look up + * @return map of containerId to first_seen_time + */ + public Map getExistingFirstSeenTimes(List containerIds) { + Map result = new HashMap<>(); + if (containerIds == null || containerIds.isEmpty()) { + return result; + } + try { + dsl().select(COL_CONTAINER_ID, COL_FIRST_SEEN_TIME) + .from(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME)) + .where(COL_CONTAINER_ID.in(containerIds)) + .forEach(r -> result.put( + r.get(COL_CONTAINER_ID), + r.get(COL_FIRST_SEEN_TIME))); + } catch (Exception e) { + LOG.warn("Failed to load existing first_seen_time values. " + + "Will use current time for all.", e); + } + return result; + } + + /** + * Paginated forward read of all quasi-closed containers. + * + * @param minContainerId cursor — only containers with ID > minContainerId + * @param limit max rows to return + * @return list of records in ascending container_id order + */ + public List getQuasiClosedContainers( + long minContainerId, int limit) { + SelectQuery query = dsl().selectQuery(); + query.addFrom(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME)); + query.addConditions(COL_CONTAINER_ID.greaterThan(minContainerId)); + query.addOrderBy(COL_CONTAINER_ID.asc()); + query.addLimit(limit); + query.fetchSize(limit); + return fetchRecords(query); + } + + /** + * Paginated read of quasi-closed containers for a specific pipeline. + * + * @param pipelineId filter by pipeline + * @param minContainerId cursor + * @param limit max rows + * @return list of records + */ + public List getByPipeline( + String pipelineId, long minContainerId, int limit) { + SelectQuery query = dsl().selectQuery(); + query.addFrom(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME)); + query.addConditions( + COL_PIPELINE_ID.eq(pipelineId) + .and(COL_CONTAINER_ID.greaterThan(minContainerId))); + query.addOrderBy(COL_CONTAINER_ID.asc()); + query.addLimit(limit); + query.fetchSize(limit); + return fetchRecords(query); + } + + /** + * Returns total count of quasi-closed containers currently tracked. + */ + public long getCount() { + try { + return dsl().fetchCount(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME)); + } catch (Exception e) { + LOG.error("Failed to count QUASI_CLOSED_CONTAINERS", e); + return 0L; + } + } + + private List fetchRecords(SelectQuery query) { + List result = new ArrayList<>(); + try { + query.fetch().forEach(r -> result.add(new QuasiClosedContainerRecord( + r.get(COL_CONTAINER_ID), + r.get(COL_PIPELINE_ID), + r.get(COL_DATANODE_COUNT), + r.get(COL_KEY_COUNT), + r.get(COL_DATA_SIZE), + r.get(COL_REPLICATION_TYPE), + r.get(COL_REPLICATION_FACTOR), + r.get(COL_STATE_ENTER_TIME), + r.get(COL_FIRST_SEEN_TIME), + r.get(COL_LAST_SCAN_TIME)))); + } catch (Exception e) { + LOG.error("Failed to query QUASI_CLOSED_CONTAINERS", e); + } + return result; + } + + @VisibleForTesting + public void clearAll() { + try { + dsl().deleteFrom(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME)).execute(); + } catch (Exception e) { + LOG.error("Failed to clear QUASI_CLOSED_CONTAINERS", e); + } + } + + // ────────────────────────────────────────────────────────────────────────── + // POJO + // ────────────────────────────────────────────────────────────────────────── + + /** + * Immutable record representing one row in QUASI_CLOSED_CONTAINERS. + */ + public static final class QuasiClosedContainerRecord { + private final long containerId; + private final String pipelineId; + private final int datanodeCount; + private final long keyCount; + private final long dataSize; + private final String replicationType; + private final int replicationFactor; + private final long stateEnterTime; + private final long firstSeenTime; + private final long lastScanTime; + + public QuasiClosedContainerRecord( + long containerId, String pipelineId, int datanodeCount, + long keyCount, long dataSize, String replicationType, + int replicationFactor, long stateEnterTime, + long firstSeenTime, long lastScanTime) { + this.containerId = containerId; + this.pipelineId = pipelineId; + this.datanodeCount = datanodeCount; + this.keyCount = keyCount; + this.dataSize = dataSize; + this.replicationType = replicationType; + this.replicationFactor = replicationFactor; + this.stateEnterTime = stateEnterTime; + this.firstSeenTime = firstSeenTime; + this.lastScanTime = lastScanTime; + } + + public long getContainerId() { return containerId; } + public String getPipelineId() { return pipelineId; } + public int getDatanodeCount() { return datanodeCount; } + public long getKeyCount() { return keyCount; } + public long getDataSize() { return dataSize; } + public String getReplicationType() { return replicationType; } + public int getReplicationFactor() { return replicationFactor; } + public long getStateEnterTime() { return stateEnterTime; } + public long getFirstSeenTime() { return firstSeenTime; } + public long getLastScanTime() { return lastScanTime; } + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index 278bac0011dc..a142bc51be8e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -112,9 +112,11 @@ import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask; +import org.apache.hadoop.ozone.recon.fsck.QuasiClosedContainerTask; import org.apache.hadoop.ozone.recon.fsck.ReconReplicationManager; import org.apache.hadoop.ozone.recon.fsck.ReconSafeModeMgrTask; import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; +import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager; import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask; @@ -164,6 +166,7 @@ public class ReconStorageContainerManagerFacade private ContainerSizeCountTask containerSizeCountTask; private ContainerCountBySizeDao containerCountBySizeDao; private ReconReplicationManager reconReplicationManager; + private QuasiClosedContainerTask quasiClosedContainerTask; private AtomicBoolean isSyncDataFromSCMRunning; private final String threadNamePrefix; @@ -182,7 +185,8 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, ReconContext reconContext, DataSource dataSource, ReconTaskStatusUpdaterManager taskStatusUpdaterManager, - ContainerHealthSchemaManager containerHealthSchemaManager) + ContainerHealthSchemaManager containerHealthSchemaManager, + QuasiClosedContainerSchemaManager quasiClosedSchemaManager) throws IOException { reconNodeDetails = reconUtils.getReconNodeDetails(conf); this.threadNamePrefix = reconNodeDetails.threadNamePrefix(); @@ -377,6 +381,15 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, reconScmTasks.add(pipelineSyncTask); reconScmTasks.add(containerHealthTask); reconScmTasks.add(containerSizeCountTask); + + // Create QuasiClosedContainerTask + LOG.info("Creating QuasiClosedContainerTask"); + this.quasiClosedContainerTask = new QuasiClosedContainerTask( + containerManager, + quasiClosedSchemaManager, + reconTaskConfig, + taskStatusUpdaterManager); + reconScmTasks.add(quasiClosedContainerTask); reconSafeModeMgrTask = new ReconSafeModeMgrTask( containerManager, nodeManager, safeModeManager, reconTaskConfig, ozoneConfiguration);