From 1b3fef829e54eebb227df104c252f5cd49fd9df7 Mon Sep 17 00:00:00 2001 From: Livia Zhu Date: Tue, 3 Mar 2026 15:16:18 -0500 Subject: [PATCH] Fix race condition in no-overwrite FS when RocksDB version files cache is out of sync --- .../streaming/state/RocksDBFileManager.scala | 19 ++- .../streaming/state/RocksDBSuite.scala | 152 ++++++++++++++++++ 2 files changed, 169 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 7135421f4866c..e9ef36dd1b48d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -680,13 +680,28 @@ class RocksDBFileManager( // Resolve RocksDB files for all the versions and find the max version each file is used val fileToMaxUsedVersion = new mutable.HashMap[String, Long] sortedSnapshotVersionsAndUniqueIds.foreach { case (version, uniqueId) => - val files = Option(versionToRocksDBFiles.get((version, uniqueId))).getOrElse { + var readFromCache = true + val cachedFiles = Option(versionToRocksDBFiles.get((version, uniqueId))).getOrElse { + readFromCache = false val newResolvedFiles = getImmutableFilesFromVersionZip(version, uniqueId) versionToRocksDBFiles.put((version, uniqueId), newResolvedFiles) newResolvedFiles } - files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) = + cachedFiles.foreach(f => fileToMaxUsedVersion(f.dfsFileName) = math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, version))) + + // For the minimum retained version, fetch metadata from the cloud zip file + // to protect files that may differ from the in-memory cache.
This handles the case + // where a no-overwrite FS prevented a snapshot zip from being overwritten: the + // in-memory cache may reference a newer set of SST files (from a retry), while the + // cloud zip still references the original SST files. + // We do this for the min retained version, to make sure the files from the original + // upload for this version are not seen as orphan files. + if (version == minVersionToRetain && readFromCache) { + val cloudFiles = getImmutableFilesFromVersionZip(version, uniqueId) + cloudFiles.foreach(f => fileToMaxUsedVersion(f.dfsFileName) = + math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, version))) + } } // Best effort attempt to delete SST files that were last used in to-be-deleted versions diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index bb02dad76ac69..8be2b610d0995 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -4109,6 +4109,158 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + // Test ensuring a race condition on no-overwrite filesystems (e.g., ABFS) + // does not occur: + // 1. Query run 1 uploads snapshot X.zip pointing to SST file Y.SST + // 2. Query run 1 is cancelled before the commit log is written + // 3. Query run 2 retries the batch, uploads Z.SST, tries to re-upload X.zip + // pointing to Z.SST. The zip overwrite silently fails on no-overwrite FS, + // but versionToRocksDBFiles maps version X -> Z.SST (stale) + // 4. Maintenance/cleanup uses the stale in-memory mapping, sees Y.SST as + // untracked, and deletes it + // 5.
A subsequent query run tries to load X.zip from cloud, which still + // references Y.SST -> FileNotFoundException + testWithChangelogCheckpointingEnabled("no-overwrite FS maintenance " + + "does not delete SST files still referenced by zip in DFS") { + withTempDir { dir => + val remoteDir = dir.getCanonicalPath + val fmClass = "org.apache.spark.sql.execution.streaming.state." + + "NoOverwriteFileSystemBasedCheckpointFileManager" + val noOverwriteConf = new Configuration() + noOverwriteConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass) + noOverwriteConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString) + + // Snapshots at versions 10, 20, 30. With minVersionsToRetain = 2, + // minVersionToRetain = 20, so version 10 is deleted and cleanup runs. + // The fix fetches cloud metadata for the min retained version (20), + // protecting run 1's SSTs referenced by the on-disk 20.zip. + val conf = dbConf.copy( + compactOnCommit = false, + minVersionsToRetain = 2, + minVersionsToDelete = 0, + minDeltasForSnapshot = 10) + + // Phase 1: versions 1-19 (snapshot at 10, rest changelog-only). + val localDir0 = Utils.createTempDir() + val db0 = new RocksDB( + remoteDir, conf = conf, localRootDir = localDir0, + hadoopConf = noOverwriteConf, + loggingId = s"[Thread-${Thread.currentThread.getId}]") + try { + db0.load(0) + db0.put("setup_key", "setup_value") + db0.commit() // version 1 + db0.doMaintenance() + for (v <- 2 to 19) { + db0.load(v - 1) + db0.put(s"setup_key_v$v", s"setup_value_v$v") + db0.commit() // snapshot at v=10, changelog-only otherwise + db0.doMaintenance() + } + } finally { + db0.close() + } + + val sstDir = new File(remoteDir, "SSTs") + val setupSstFiles = if (sstDir.exists()) { + sstDir.listFiles().filter(_.getName.endsWith(".sst")).map(_.getName).toSet + } else { + Set.empty[String] + } + + // Phase 2: Run 1 commits version 20, creating 20.zip with run 1's SSTs.
+    val localDir1 = Utils.createTempDir() + val db1 = new RocksDB( + remoteDir, conf = conf, localRootDir = localDir1, + hadoopConf = noOverwriteConf, + loggingId = s"[Thread-${Thread.currentThread.getId}]") + try { + db1.load(19) + db1.put("key", "value_from_run1") + db1.commit() // version 20 -- snapshot queued + db1.doMaintenance() // uploads 20.zip + run 1's SST files + } finally { + db1.close() + } + + // Verify 20.zip was created + val zipFilesAfterRun1 = new File(remoteDir).listFiles() + .filter(_.getName.endsWith(".zip")) + assert(zipFilesAfterRun1.exists(_.getName.startsWith("20")), + s"Expected 20.zip after query run 1, found: " + + s"${zipFilesAfterRun1.map(_.getName).mkString(", ")}") + + // Identify query run 1's SST files + val sstFilesAfterRun1 = sstDir.listFiles() + .filter(_.getName.endsWith(".sst")).map(_.getName).toSet + val run1SstFiles = sstFilesAfterRun1 -- setupSstFiles + assert(run1SstFiles.nonEmpty, + "Expected new SST files from query run 1") + + // Phase 3: Run 2 (retry) commits version 20 again. 20.zip is not overwritten, + // but the in-memory cache now maps version 20 to run 2's SSTs. + val localDir2 = Utils.createTempDir() + val db2 = new RocksDB( + remoteDir, + conf = conf, + localRootDir = localDir2, + hadoopConf = noOverwriteConf, + loggingId = s"[Thread-${Thread.currentThread.getId}]") + try { + db2.load(19) + db2.put("key", "value_from_run2") + db2.commit() // version 20 -- run 2's SSTs created, snapshot queued + db2.doMaintenance() // run 2's SSTs uploaded, 20.zip silently not overwritten + + val sstFilesAfterRun2 = sstDir.listFiles() + .filter(_.getName.endsWith(".sst")).map(_.getName).toSet + val run2SstFiles = sstFilesAfterRun2 -- sstFilesAfterRun1 + assert(run2SstFiles.nonEmpty, + "Expected new SST files from query run 2") + + // Phase 4: versions 21-30. Snapshot at 30 triggers cleanup + // (minVersionToRetain = 20, deletes 10.zip).
+ for (v <- 21 to 30) { + db2.load(v - 1) + db2.put(s"key_v$v", s"value_v$v") + db2.commit() + db2.doMaintenance() + } + + // Run 1's SSTs survive cleanup: the fix fetches cloud metadata for + // version 20, protecting files referenced by the on-disk 20.zip. + val sstFilesAfterCleanup = sstDir.listFiles() + .filter(_.getName.endsWith(".sst")).map(_.getName).toSet + run1SstFiles.foreach { name => + assert(sstFilesAfterCleanup.contains(name), + s"Expected run 1 SST file $name to still exist (protected by cloud " + + s"metadata fetch), but it was deleted.") + } + + // Phase 5: Fresh instance loads version 29 from 20.zip + changelogs. + // Succeeds because run 1's SSTs (referenced by 20.zip) were preserved. + val localDir3 = Utils.createTempDir() + val db3 = new RocksDB( + remoteDir, + conf = conf, + localRootDir = localDir3, + hadoopConf = noOverwriteConf, + loggingId = s"[Thread-${Thread.currentThread.getId}]") + try { + db3.load(29) + val value = new String(db3.get("key"), "UTF-8") + assert(value == "value_from_run1", + s"Expected stale value 'value_from_run1' from 20.zip (run 1's SSTs), " + + s"but got '$value'") + } finally { + db3.close() + } + } finally { + db2.close() + } + } + } + testWithChangelogCheckpointingEnabled("SPARK-52553 - v1 changelog with invalid version number" + " does not cause NumberFormatException") { withTempDir { dir =>