diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 9a9dfb48e74b..5aabcf7a6c69 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.recon.heatmap.HeatMapServiceImpl;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager;
import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration;
import org.apache.hadoop.ozone.recon.persistence.JooqPersistenceModule;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
@@ -110,6 +111,7 @@ protected void configure() {
bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class);
bind(ContainerHealthSchemaManager.class).in(Singleton.class);
+ bind(QuasiClosedContainerSchemaManager.class).in(Singleton.class);
bind(ReconContainerMetadataManager.class)
.to(ReconContainerMetadataManagerImpl.class).in(Singleton.class);
bind(ReconFileMetadataManager.class)
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
index 4cf6ca85f6f7..02d57640a6b4 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
@@ -72,10 +72,14 @@
import org.apache.hadoop.ozone.recon.api.types.KeysResponse;
import org.apache.hadoop.ozone.recon.api.types.MissingContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.MissingContainersResponse;
+import org.apache.hadoop.ozone.recon.api.types.QuasiClosedContainerMetadata;
+import org.apache.hadoop.ozone.recon.api.types.QuasiClosedContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager;
+import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager.QuasiClosedContainerRecord;
import org.apache.hadoop.ozone.recon.persistence.ContainerHistory;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
@@ -102,6 +106,7 @@ public class ContainerEndpoint {
private final ReconContainerManager containerManager;
private final PipelineManager pipelineManager;
private final ContainerHealthSchemaManager containerHealthSchemaManager;
+ private final QuasiClosedContainerSchemaManager quasiClosedSchemaManager;
private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
private final OzoneStorageContainerManager reconSCM;
private static final Logger LOG =
@@ -143,6 +148,7 @@ public static DataFilter fromValue(String value) {
@Inject
public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
ContainerHealthSchemaManager containerHealthSchemaManager,
+ QuasiClosedContainerSchemaManager quasiClosedSchemaManager,
ReconNamespaceSummaryManager reconNamespaceSummaryManager,
ReconContainerMetadataManager reconContainerMetadataManager,
ReconOMMetadataManager omMetadataManager) {
@@ -150,6 +156,7 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
(ReconContainerManager) reconSCM.getContainerManager();
this.pipelineManager = reconSCM.getPipelineManager();
this.containerHealthSchemaManager = containerHealthSchemaManager;
+ this.quasiClosedSchemaManager = quasiClosedSchemaManager;
this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
this.reconSCM = reconSCM;
this.reconContainerMetadataManager = reconContainerMetadataManager;
@@ -812,4 +819,93 @@ public Response getOmContainersDeletedInSCM(
response.put("containerDiscrepancyInfo", containerDiscrepancyInfoList);
return Response.ok(response).build();
}
+
+ // ─────────────────────────────────────────────────────────────────────────
+ // QUASI-CLOSED CONTAINER ENDPOINTS
+ // ─────────────────────────────────────────────────────────────────────────
+
+ /**
+ * Returns all containers currently in QUASI_CLOSED lifecycle state with
+ * cursor-based forward pagination.
+ *
+ *
QUASI_CLOSED containers are those locally closed by a datanode
+ * (e.g. due to a pipeline failure) but not yet force-closed to CLOSED by SCM.
+ * Long-running QUASI_CLOSED containers may indicate stalled pipeline finalization.
+ *
+ * @param limit maximum containers to return (default: 1000)
+ * @param minContainerId cursor — return containers with ID > minContainerId
+ * @return paginated list of quasi-closed containers with replica history
+ */
+ @GET
+ @Path("/quasiClosed")
+ public Response getQuasiClosedContainers(
+ @DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT) int limit,
+ @DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
+ @QueryParam(RECON_QUERY_PREVKEY) long minContainerId) {
+ return buildQuasiClosedResponse(
+ quasiClosedSchemaManager.getQuasiClosedContainers(minContainerId, limit));
+ }
+
+ /**
+ * Returns QUASI_CLOSED containers belonging to a specific pipeline.
+ * Useful for diagnosing which pipelines have the most stuck containers.
+ *
+ * @param pipelineId pipeline UUID to filter by
+ * @param limit maximum containers to return
+ * @param minContainerId forward-pagination cursor
+ * @return paginated list of quasi-closed containers for the given pipeline
+ */
+ @GET
+ @Path("/quasiClosed/pipeline/{pipelineId}")
+ public Response getQuasiClosedContainersByPipeline(
+ @PathParam("pipelineId") String pipelineId,
+ @DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT) int limit,
+ @DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
+ @QueryParam(RECON_QUERY_PREVKEY) long minContainerId) {
+ if (pipelineId == null || pipelineId.isEmpty()) {
+ throw new WebApplicationException("pipelineId must not be empty",
+ Response.Status.BAD_REQUEST);
+ }
+ return buildQuasiClosedResponse(
+ quasiClosedSchemaManager.getByPipeline(pipelineId, minContainerId, limit));
+ }
+
+ /**
+ * Returns summary count of QUASI_CLOSED containers.
+ */
+ @GET
+ @Path("/quasiClosed/summary")
+ public Response getQuasiClosedSummary() {
+ long count = quasiClosedSchemaManager.getCount();
+ Map summary = new HashMap<>();
+ summary.put("quasiClosedCount", count);
+ return Response.ok(summary).build();
+ }
+
+ private Response buildQuasiClosedResponse(List records) {
+ List metaList = new ArrayList<>();
+ long lastKey = 0L;
+ for (QuasiClosedContainerRecord rec : records) {
+ long containerId = rec.getContainerId();
+ List replicas =
+ containerManager.getAllContainerHistory(containerId);
+ metaList.add(new QuasiClosedContainerMetadata(
+ containerId,
+ rec.getPipelineId(),
+ rec.getDatanodeCount(),
+ rec.getKeyCount(),
+ rec.getDataSize(),
+ rec.getReplicationType(),
+ rec.getReplicationFactor(),
+ rec.getStateEnterTime(),
+ rec.getFirstSeenTime(),
+ rec.getLastScanTime(),
+ replicas));
+ lastKey = Math.max(lastKey, containerId);
+ }
+ long totalCount = quasiClosedSchemaManager.getCount();
+ QuasiClosedContainersResponse response =
+ new QuasiClosedContainersResponse(totalCount, metaList, lastKey);
+ return Response.ok(response).build();
+ }
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainerMetadata.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainerMetadata.java
new file mode 100644
index 000000000000..8e0ba06ba7ae
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainerMetadata.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.List;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHistory;
+
+/**
+ * JSON response DTO for a single QUASI_CLOSED container.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QuasiClosedContainerMetadata {
+
+ private long containerID;
+ private String pipelineID;
+ private int datanodeCount;
+ private long keyCount;
+ private long dataSize;
+ private String replicationType;
+ private int replicationFactor;
+ /** When the container entered QUASI_CLOSED state according to SCM. */
+ private long stateEnterTime;
+ /** When Recon first observed this container as QUASI_CLOSED. */
+ private long firstSeenTime;
+ /** When Recon last confirmed this container is still QUASI_CLOSED. */
+ private long lastScanTime;
+ /** Current datanode replicas — reuses the existing ContainerHistory type. */
+ private List replicas;
+
+ public QuasiClosedContainerMetadata() {
+ }
+
+ public QuasiClosedContainerMetadata(
+ long containerID, String pipelineID, int datanodeCount,
+ long keyCount, long dataSize, String replicationType,
+ int replicationFactor, long stateEnterTime,
+ long firstSeenTime, long lastScanTime,
+ List replicas) {
+ this.containerID = containerID;
+ this.pipelineID = pipelineID;
+ this.datanodeCount = datanodeCount;
+ this.keyCount = keyCount;
+ this.dataSize = dataSize;
+ this.replicationType = replicationType;
+ this.replicationFactor = replicationFactor;
+ this.stateEnterTime = stateEnterTime;
+ this.firstSeenTime = firstSeenTime;
+ this.lastScanTime = lastScanTime;
+ this.replicas = replicas;
+ }
+
+ public long getContainerID() { return containerID; }
+ public void setContainerID(long containerID) { this.containerID = containerID; }
+
+ public String getPipelineID() { return pipelineID; }
+ public void setPipelineID(String pipelineID) { this.pipelineID = pipelineID; }
+
+ public int getDatanodeCount() { return datanodeCount; }
+ public void setDatanodeCount(int datanodeCount) { this.datanodeCount = datanodeCount; }
+
+ public long getKeyCount() { return keyCount; }
+ public void setKeyCount(long keyCount) { this.keyCount = keyCount; }
+
+ public long getDataSize() { return dataSize; }
+ public void setDataSize(long dataSize) { this.dataSize = dataSize; }
+
+ public String getReplicationType() { return replicationType; }
+ public void setReplicationType(String replicationType) { this.replicationType = replicationType; }
+
+ public int getReplicationFactor() { return replicationFactor; }
+ public void setReplicationFactor(int replicationFactor) { this.replicationFactor = replicationFactor; }
+
+ public long getStateEnterTime() { return stateEnterTime; }
+ public void setStateEnterTime(long stateEnterTime) { this.stateEnterTime = stateEnterTime; }
+
+ public long getFirstSeenTime() { return firstSeenTime; }
+ public void setFirstSeenTime(long firstSeenTime) { this.firstSeenTime = firstSeenTime; }
+
+ public long getLastScanTime() { return lastScanTime; }
+ public void setLastScanTime(long lastScanTime) { this.lastScanTime = lastScanTime; }
+
+ public List getReplicas() { return replicas; }
+ public void setReplicas(List replicas) { this.replicas = replicas; }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainersResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainersResponse.java
new file mode 100644
index 000000000000..79b65d417972
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/QuasiClosedContainersResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wrapper response for the quasi-closed containers API.
+ */
+public class QuasiClosedContainersResponse {
+
+ private long totalCount;
+ private List containers;
+ /** The highest container ID returned — for forward-pagination cursor. */
+ private long lastKey;
+
+ public QuasiClosedContainersResponse() {
+ this.containers = new ArrayList<>();
+ }
+
+ public QuasiClosedContainersResponse(
+ long totalCount, List containers, long lastKey) {
+ this.totalCount = totalCount;
+ this.containers = containers;
+ this.lastKey = lastKey;
+ }
+
+ public long getTotalCount() { return totalCount; }
+ public void setTotalCount(long totalCount) { this.totalCount = totalCount; }
+
+ public List getContainers() { return containers; }
+ public void setContainers(List containers) {
+ this.containers = containers;
+ }
+
+ public long getLastKey() { return lastKey; }
+ public void setLastKey(long lastKey) { this.lastKey = lastKey; }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/QuasiClosedContainerTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/QuasiClosedContainerTask.java
new file mode 100644
index 000000000000..f1326b127368
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/QuasiClosedContainerTask.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.fsck;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager;
+import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager.QuasiClosedContainerRecord;
+import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
+import org.apache.hadoop.ozone.recon.scm.ReconScmTask;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
+import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background task that periodically scans for QUASI_CLOSED containers and
+ * persists them to the {@code QUASI_CLOSED_CONTAINERS} table.
+ *
+ * Detection strategy: calls
+ * {@link ReconContainerManager#getContainers(LifeCycleState)} with
+ * {@code QUASI_CLOSED} directly — O(q) where q is the number of QUASI_CLOSED
+ * containers, not the entire cluster. This avoids running the full
+ * ReplicationManager handler chain just to extract lifecycle state.
+ *
+ *
first_seen_time preservation: the task reads the previous
+ * first_seen_time for containers that were already tracked, and keeps that
+ * value so operators can see how long a container has been stuck.
+ */
+public class QuasiClosedContainerTask extends ReconScmTask {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(QuasiClosedContainerTask.class);
+
+ /** Minimum gap between two consecutive task runs regardless of configured interval. */
+ private static final long MIN_NEXT_RUN_INTERVAL_MS = 60_000L;
+
+ private final ReconContainerManager containerManager;
+ private final QuasiClosedContainerSchemaManager schemaManager;
+ private final long intervalMs;
+
+ public QuasiClosedContainerTask(
+ ReconContainerManager containerManager,
+ QuasiClosedContainerSchemaManager schemaManager,
+ ReconTaskConfig reconTaskConfig,
+ ReconTaskStatusUpdaterManager taskStatusUpdaterManager) {
+ super(taskStatusUpdaterManager);
+ this.containerManager = containerManager;
+ this.schemaManager = schemaManager;
+ // Reuse the missingContainer interval as the default; operators can
+ // override via ozone.recon.task.missingcontainer.interval
+ this.intervalMs = reconTaskConfig.getMissingContainerTaskInterval().toMillis();
+ LOG.info("Initialized QuasiClosedContainerTask with interval={}ms", intervalMs);
+ }
+
+ @Override
+ protected void run() {
+ while (canRun()) {
+ long cycleStart = Time.monotonicNow();
+ try {
+ initializeAndRunTask();
+ long elapsed = Time.monotonicNow() - cycleStart;
+ long sleepMs = Math.max(MIN_NEXT_RUN_INTERVAL_MS, intervalMs - elapsed);
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info("QuasiClosedContainerTask interrupted");
+ break;
+ } catch (Exception e) {
+ LOG.error("Error in QuasiClosedContainerTask", e);
+ }
+ }
+ }
+
+ @Override
+ protected void runTask() throws Exception {
+ long start = Time.monotonicNow();
+ LOG.info("QuasiClosedContainerTask starting scan");
+
+ // 1. Fetch all QUASI_CLOSED containers from the ContainerManager.
+ // This is O(q) — only containers in QUASI_CLOSED state, not the whole cluster.
+ List qcContainers =
+ containerManager.getContainers(LifeCycleState.QUASI_CLOSED);
+
+ LOG.info("Found {} containers in QUASI_CLOSED state", qcContainers.size());
+
+ // 2. Collect container IDs for first_seen_time preservation
+ List containerIds = new ArrayList<>(qcContainers.size());
+ for (ContainerInfo ci : qcContainers) {
+ containerIds.add(ci.getContainerID());
+ }
+
+ // 3. Load existing first_seen_times from DB before replace
+ Map existingFirstSeen = schemaManager.getExistingFirstSeenTimes(containerIds);
+ long now = Time.now();
+
+ // 4. Build records
+ List records = new ArrayList<>(qcContainers.size());
+ for (ContainerInfo ci : qcContainers) {
+ long containerId = ci.getContainerID();
+
+ int datanodeCount = 0;
+ try {
+ Set replicas =
+ containerManager.getContainerReplicas(ContainerID.valueOf(containerId));
+ datanodeCount = replicas.size();
+ } catch (IOException e) {
+ LOG.warn("Failed to get replicas for container {}", containerId, e);
+ }
+
+ long stateEnterTime = ci.getStateEnterTime() != null
+ ? ci.getStateEnterTime().toEpochMilli() : now;
+
+ // Preserve original first_seen_time if we already knew about this container
+ long firstSeenTime = existingFirstSeen.getOrDefault(containerId, now);
+
+ String pipelineId = ci.getPipelineID() != null
+ ? ci.getPipelineID().getId().toString() : null;
+
+ int requiredNodes;
+ try {
+ requiredNodes = ci.getReplicationConfig().getRequiredNodes();
+ } catch (Exception e) {
+ requiredNodes = 0;
+ }
+
+ records.add(new QuasiClosedContainerRecord(
+ containerId,
+ pipelineId,
+ datanodeCount,
+ ci.getNumberOfKeys(),
+ ci.getUsedBytes(),
+ ci.getReplicationType().name(),
+ requiredNodes,
+ stateEnterTime,
+ firstSeenTime,
+ now // last_scan_time always updated
+ ));
+ }
+
+ // 5. Atomically replace all records in DB
+ schemaManager.replaceAll(records);
+
+ long elapsed = Time.monotonicNow() - start;
+ LOG.info("QuasiClosedContainerTask completed: {} containers in {} ms",
+ records.size(), elapsed);
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/QuasiClosedContainerSchemaManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/QuasiClosedContainerSchemaManager.java
new file mode 100644
index 000000000000..32be586f6058
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/QuasiClosedContainerSchemaManager.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.persistence;
+
+import static org.jooq.impl.DSL.field;
+import static org.jooq.impl.DSL.name;
+import static org.jooq.impl.DSL.table;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.jooq.Configuration;
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.SelectQuery;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages CRUD operations for the QUASI_CLOSED_CONTAINERS table.
+ *
+ * This manager tracks containers in the QUASI_CLOSED lifecycle state.
+ * Unlike the UNHEALTHY_CONTAINERS table which tracks replication health,
+ * this table tracks lifecycle state — containers that have been locally
+ * closed by a datanode but not yet force-closed to CLOSED by SCM.
+ */
+@Singleton
+public class QuasiClosedContainerSchemaManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(QuasiClosedContainerSchemaManager.class);
+
+ public static final String QUASI_CLOSED_CONTAINERS_TABLE_NAME =
+ "QUASI_CLOSED_CONTAINERS";
+
+ static final int BATCH_INSERT_CHUNK_SIZE = 1_000;
+
+ // Column definitions
+ private static final Field COL_CONTAINER_ID =
+ field(name("container_id"), SQLDataType.BIGINT);
+ private static final Field COL_PIPELINE_ID =
+ field(name("pipeline_id"), SQLDataType.VARCHAR(64));
+ private static final Field COL_DATANODE_COUNT =
+ field(name("datanode_count"), SQLDataType.INTEGER);
+ private static final Field COL_KEY_COUNT =
+ field(name("key_count"), SQLDataType.BIGINT);
+ private static final Field COL_DATA_SIZE =
+ field(name("data_size"), SQLDataType.BIGINT);
+ private static final Field COL_REPLICATION_TYPE =
+ field(name("replication_type"), SQLDataType.VARCHAR(16));
+ private static final Field COL_REPLICATION_FACTOR =
+ field(name("replication_factor"), SQLDataType.INTEGER);
+ private static final Field COL_STATE_ENTER_TIME =
+ field(name("state_enter_time"), SQLDataType.BIGINT);
+ private static final Field COL_FIRST_SEEN_TIME =
+ field(name("first_seen_time"), SQLDataType.BIGINT);
+ private static final Field COL_LAST_SCAN_TIME =
+ field(name("last_scan_time"), SQLDataType.BIGINT);
+
+ private final Configuration jooqConfiguration;
+
+ @Inject
+ public QuasiClosedContainerSchemaManager(Configuration jooqConfiguration) {
+ this.jooqConfiguration = jooqConfiguration;
+ }
+
+ private DSLContext dsl() {
+ return DSL.using(jooqConfiguration);
+ }
+
+ /**
+ * Atomically replaces all quasi-closed container records.
+ * Deletes all existing rows and inserts the new set in a single transaction.
+ *
+ * @param records new records to insert (may be empty to clear the table)
+ */
+ public void replaceAll(List records) {
+ dsl().transaction(configuration -> {
+ DSLContext txCtx = DSL.using(configuration);
+ // 1. Delete all existing records
+ txCtx.deleteFrom(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME)).execute();
+ // 2. Batch-insert new records
+ if (records != null && !records.isEmpty()) {
+ batchInsert(txCtx, records);
+ }
+ });
+ LOG.debug("Replaced {} quasi-closed container records",
+ records == null ? 0 : records.size());
+ }
+
+ private void batchInsert(DSLContext txCtx, List records) {
+ for (int from = 0; from < records.size(); from += BATCH_INSERT_CHUNK_SIZE) {
+ int to = Math.min(from + BATCH_INSERT_CHUNK_SIZE, records.size());
+ List chunk = records.subList(from, to);
+
+ // Build a multi-row INSERT using jOOQ's fluent API
+ org.jooq.InsertValuesStep10 insert =
+ txCtx.insertInto(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME),
+ COL_CONTAINER_ID, COL_PIPELINE_ID, COL_DATANODE_COUNT,
+ COL_KEY_COUNT, COL_DATA_SIZE, COL_REPLICATION_TYPE,
+ COL_REPLICATION_FACTOR, COL_STATE_ENTER_TIME,
+ COL_FIRST_SEEN_TIME, COL_LAST_SCAN_TIME);
+
+ for (QuasiClosedContainerRecord rec : chunk) {
+ insert = insert.values(
+ rec.getContainerId(),
+ rec.getPipelineId(),
+ rec.getDatanodeCount(),
+ rec.getKeyCount(),
+ rec.getDataSize(),
+ rec.getReplicationType(),
+ rec.getReplicationFactor(),
+ rec.getStateEnterTime(),
+ rec.getFirstSeenTime(),
+ rec.getLastScanTime());
+ }
+ insert.execute();
+ }
+ }
+
+ /**
+ * Returns first_seen_time values for containers already tracked.
+ * Used to preserve the timestamp across scan cycles.
+ *
+ * @param containerIds list of container IDs to look up
+ * @return map of containerId to first_seen_time
+ */
+ public Map getExistingFirstSeenTimes(List containerIds) {
+ Map result = new HashMap<>();
+ if (containerIds == null || containerIds.isEmpty()) {
+ return result;
+ }
+ try {
+ dsl().select(COL_CONTAINER_ID, COL_FIRST_SEEN_TIME)
+ .from(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME))
+ .where(COL_CONTAINER_ID.in(containerIds))
+ .forEach(r -> result.put(
+ r.get(COL_CONTAINER_ID),
+ r.get(COL_FIRST_SEEN_TIME)));
+ } catch (Exception e) {
+ LOG.warn("Failed to load existing first_seen_time values. "
+ + "Will use current time for all.", e);
+ }
+ return result;
+ }
+
+ /**
+ * Paginated forward read of all quasi-closed containers.
+ *
+ * @param minContainerId cursor — only containers with ID > minContainerId
+ * @param limit max rows to return
+ * @return list of records in ascending container_id order
+ */
+ public List getQuasiClosedContainers(
+ long minContainerId, int limit) {
+ SelectQuery query = dsl().selectQuery();
+ query.addFrom(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME));
+ query.addConditions(COL_CONTAINER_ID.greaterThan(minContainerId));
+ query.addOrderBy(COL_CONTAINER_ID.asc());
+ query.addLimit(limit);
+ query.fetchSize(limit);
+ return fetchRecords(query);
+ }
+
+ /**
+ * Paginated read of quasi-closed containers for a specific pipeline.
+ *
+ * @param pipelineId filter by pipeline
+ * @param minContainerId cursor
+ * @param limit max rows
+ * @return list of records
+ */
+ public List getByPipeline(
+ String pipelineId, long minContainerId, int limit) {
+ SelectQuery query = dsl().selectQuery();
+ query.addFrom(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME));
+ query.addConditions(
+ COL_PIPELINE_ID.eq(pipelineId)
+ .and(COL_CONTAINER_ID.greaterThan(minContainerId)));
+ query.addOrderBy(COL_CONTAINER_ID.asc());
+ query.addLimit(limit);
+ query.fetchSize(limit);
+ return fetchRecords(query);
+ }
+
+ /**
+ * Returns total count of quasi-closed containers currently tracked.
+ */
+ public long getCount() {
+ try {
+ return dsl().fetchCount(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME));
+ } catch (Exception e) {
+ LOG.error("Failed to count QUASI_CLOSED_CONTAINERS", e);
+ return 0L;
+ }
+ }
+
+ private List fetchRecords(SelectQuery query) {
+ List result = new ArrayList<>();
+ try {
+ query.fetch().forEach(r -> result.add(new QuasiClosedContainerRecord(
+ r.get(COL_CONTAINER_ID),
+ r.get(COL_PIPELINE_ID),
+ r.get(COL_DATANODE_COUNT),
+ r.get(COL_KEY_COUNT),
+ r.get(COL_DATA_SIZE),
+ r.get(COL_REPLICATION_TYPE),
+ r.get(COL_REPLICATION_FACTOR),
+ r.get(COL_STATE_ENTER_TIME),
+ r.get(COL_FIRST_SEEN_TIME),
+ r.get(COL_LAST_SCAN_TIME))));
+ } catch (Exception e) {
+ LOG.error("Failed to query QUASI_CLOSED_CONTAINERS", e);
+ }
+ return result;
+ }
+
+ @VisibleForTesting
+ public void clearAll() {
+ try {
+ dsl().deleteFrom(table(QUASI_CLOSED_CONTAINERS_TABLE_NAME)).execute();
+ } catch (Exception e) {
+ LOG.error("Failed to clear QUASI_CLOSED_CONTAINERS", e);
+ }
+ }
+
+ // ──────────────────────────────────────────────────────────────────────────
+ // POJO
+ // ──────────────────────────────────────────────────────────────────────────
+
+ /**
+ * Immutable record representing one row in QUASI_CLOSED_CONTAINERS.
+ */
+ public static final class QuasiClosedContainerRecord {
+ private final long containerId;
+ private final String pipelineId;
+ private final int datanodeCount;
+ private final long keyCount;
+ private final long dataSize;
+ private final String replicationType;
+ private final int replicationFactor;
+ private final long stateEnterTime;
+ private final long firstSeenTime;
+ private final long lastScanTime;
+
+ public QuasiClosedContainerRecord(
+ long containerId, String pipelineId, int datanodeCount,
+ long keyCount, long dataSize, String replicationType,
+ int replicationFactor, long stateEnterTime,
+ long firstSeenTime, long lastScanTime) {
+ this.containerId = containerId;
+ this.pipelineId = pipelineId;
+ this.datanodeCount = datanodeCount;
+ this.keyCount = keyCount;
+ this.dataSize = dataSize;
+ this.replicationType = replicationType;
+ this.replicationFactor = replicationFactor;
+ this.stateEnterTime = stateEnterTime;
+ this.firstSeenTime = firstSeenTime;
+ this.lastScanTime = lastScanTime;
+ }
+
+ public long getContainerId() { return containerId; }
+ public String getPipelineId() { return pipelineId; }
+ public int getDatanodeCount() { return datanodeCount; }
+ public long getKeyCount() { return keyCount; }
+ public long getDataSize() { return dataSize; }
+ public String getReplicationType() { return replicationType; }
+ public int getReplicationFactor() { return replicationFactor; }
+ public long getStateEnterTime() { return stateEnterTime; }
+ public long getFirstSeenTime() { return firstSeenTime; }
+ public long getLastScanTime() { return lastScanTime; }
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 278bac0011dc..a142bc51be8e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -112,9 +112,11 @@
import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask;
+import org.apache.hadoop.ozone.recon.fsck.QuasiClosedContainerTask;
import org.apache.hadoop.ozone.recon.fsck.ReconReplicationManager;
import org.apache.hadoop.ozone.recon.fsck.ReconSafeModeMgrTask;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.hadoop.ozone.recon.persistence.QuasiClosedContainerSchemaManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask;
@@ -164,6 +166,7 @@ public class ReconStorageContainerManagerFacade
private ContainerSizeCountTask containerSizeCountTask;
private ContainerCountBySizeDao containerCountBySizeDao;
private ReconReplicationManager reconReplicationManager;
+ private QuasiClosedContainerTask quasiClosedContainerTask;
private AtomicBoolean isSyncDataFromSCMRunning;
private final String threadNamePrefix;
@@ -182,7 +185,8 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
ReconContext reconContext,
DataSource dataSource,
ReconTaskStatusUpdaterManager taskStatusUpdaterManager,
- ContainerHealthSchemaManager containerHealthSchemaManager)
+ ContainerHealthSchemaManager containerHealthSchemaManager,
+ QuasiClosedContainerSchemaManager quasiClosedSchemaManager)
throws IOException {
reconNodeDetails = reconUtils.getReconNodeDetails(conf);
this.threadNamePrefix = reconNodeDetails.threadNamePrefix();
@@ -377,6 +381,15 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
reconScmTasks.add(pipelineSyncTask);
reconScmTasks.add(containerHealthTask);
reconScmTasks.add(containerSizeCountTask);
+
+ // Create QuasiClosedContainerTask
+ LOG.info("Creating QuasiClosedContainerTask");
+ this.quasiClosedContainerTask = new QuasiClosedContainerTask(
+ containerManager,
+ quasiClosedSchemaManager,
+ reconTaskConfig,
+ taskStatusUpdaterManager);
+ reconScmTasks.add(quasiClosedContainerTask);
reconSafeModeMgrTask = new ReconSafeModeMgrTask(
containerManager, nodeManager, safeModeManager,
reconTaskConfig, ozoneConfiguration);