Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -680,13 +680,28 @@ class RocksDBFileManager(
// Resolve RocksDB files for all the versions and find the max version each file is used
// fileToMaxUsedVersion maps each DFS SST/log file name to the highest snapshot version
// that references it; files only referenced by versions below the retention cutoff
// become candidates for deletion further down in this method.
val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
sortedSnapshotVersionsAndUniqueIds.foreach { case (version, uniqueId) =>
// Tracks whether this version's file list came from the in-memory cache (true)
// or was freshly resolved from the version zip in DFS (false). Only a cached
// entry can be stale relative to the zip actually persisted in DFS.
var readFromCache = true
val cachedFiles = Option(versionToRocksDBFiles.get((version, uniqueId))).getOrElse {
readFromCache = false
// Cache miss: read the authoritative file list from the version's zip in DFS
// and memoize it for subsequent maintenance runs.
val newResolvedFiles = getImmutableFilesFromVersionZip(version, uniqueId)
versionToRocksDBFiles.put((version, uniqueId), newResolvedFiles)
newResolvedFiles
}
// Record this version as a (potentially higher) user of each referenced file.
cachedFiles.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, version)))

// For the minimum retained version, fetch metadata from the cloud zip file
// to protect files that may differ from the in-memory cache. This handles the case
// where a no-overwrite FS prevented a snapshot zip from being overwritten: the
// in-memory cache may reference a newer set of SST files (from a retry), while the
// cloud zip still references the original SST files.
// We do this for the min retained version, to make sure the files from the original
// upload for this version are not seen as orphan files.
if (version == minVersionToRetain && readFromCache) {
// Deliberately NOT cached into versionToRocksDBFiles: this is a one-off
// protective read; the cloud files are only added to the max-used-version map.
val cloudFiles = getImmutableFilesFromVersionZip(version, uniqueId)
cloudFiles.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, version)))
}
}

// Best effort attempt to delete SST files that were last used in to-be-deleted versions

// Best effort attempt to delete SST files that were last used in to-be-deleted versions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4109,6 +4109,158 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}

// Test ensuring a race condition on no-overwrite filesystems (e.g., ABFS)
// does not occur:
// 1. Query run 1 uploads snapshot X.zip pointing to SST file Y.SST
// 2. Query run 1 is cancelled before the commit log is written
// 3. Query run 2 retries the batch, uploads Z.SST, tries to re-upload X.zip
// pointing to Z.SST. The zip overwrite silently fails on no-overwrite FS,
// but versionToRocksDBFiles maps version X -> Z.SST (stale)
// 4. Maintenance/cleanup uses the stale in-memory mapping, sees Y.SST as
// untracked, and deletes it
// 5. A subsequent query run tries to load X.zip from cloud, which still
// references Y.SST -> FileNotFoundException
testWithChangelogCheckpointingEnabled("no-overwrite FS maintenance " +
"does not delete SST files still referenced by zip in DFS") {
withTempDir { dir =>
val remoteDir = dir.getCanonicalPath
// Use the no-overwrite checkpoint file manager so a second upload of the same
// snapshot zip (e.g. 20.zip) is silently dropped instead of replacing the original,
// reproducing ABFS-like semantics.
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
"NoOverwriteFileSystemBasedCheckpointFileManager"
val noOverwriteConf = new Configuration()
noOverwriteConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
noOverwriteConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)

// Snapshots at versions 10, 20, 30. With minVersionsToRetain = 2,
// minVersionToRetain = 20, so version 10 is deleted and cleanup runs.
// The fix fetches cloud metadata for the min retained version (20),
// protecting run 1's SSTs referenced by the on-disk 20.zip.
val conf = dbConf.copy(
compactOnCommit = false,
minVersionsToRetain = 2,
minVersionsToDelete = 0,
minDeltasForSnapshot = 10)

// Phase 1: versions 1-19 (snapshot at 10, rest changelog-only).
// This establishes a baseline so later phases can distinguish run 1's and
// run 2's SST files from the setup files.
val localDir0 = Utils.createTempDir()
val db0 = new RocksDB(
remoteDir, conf = conf, localRootDir = localDir0,
hadoopConf = noOverwriteConf,
loggingId = s"[Thread-${Thread.currentThread.getId}]")
try {
db0.load(0)
db0.put("setup_key", "setup_value")
db0.commit() // version 1
db0.doMaintenance()
for (v <- 2 to 19) {
db0.load(v - 1)
db0.put(s"setup_key_v$v", s"setup_value_v$v")
db0.commit() // snapshot at v=10, changelog-only otherwise
db0.doMaintenance()
}
} finally {
db0.close()
}

// Record the baseline SST file names so run 1's newly-uploaded files can be
// identified by set difference below.
val sstDir = new File(remoteDir, "SSTs")
val setupSstFiles = if (sstDir.exists()) {
sstDir.listFiles().filter(_.getName.endsWith(".sst")).map(_.getName).toSet
} else {
Set.empty[String]
}

// Phase 2: Run 1 commits version 20, creating 20.zip with run 1's SSTs.
val localDir1 = Utils.createTempDir()
val db1 = new RocksDB(
remoteDir, conf = conf, localRootDir = localDir1,
hadoopConf = noOverwriteConf,
loggingId = s"[Thread-${Thread.currentThread.getId}]")
try {
db1.load(19)
db1.put("key", "value_from_run1")
db1.commit() // version 20 -- snapshot queued
db1.doMaintenance() // uploads 20.zip + run 1's SST files
} finally {
db1.close()
}

// Verify 20.zip was created
val zipFilesAfterRun1 = new File(remoteDir).listFiles()
.filter(_.getName.endsWith(".zip"))
assert(zipFilesAfterRun1.exists(_.getName.startsWith("20")),
s"Expected 20.zip after query run 1, found: " +
s"${zipFilesAfterRun1.map(_.getName).mkString(", ")}")

// Identify query run 1's SST files (those not present after Phase 1).
val sstFilesAfterRun1 = sstDir.listFiles()
.filter(_.getName.endsWith(".sst")).map(_.getName).toSet
val run1SstFiles = sstFilesAfterRun1 -- setupSstFiles
assert(run1SstFiles.nonEmpty,
"Expected new SST files from query run 1")

// Phase 3: Run 2 (retry) commits version 20 again. 20.zip is not overwritten,
// but the in-memory cache now maps version 20 to run 2's SSTs.
// This creates the stale-cache condition the fix must handle: DFS's 20.zip still
// references run 1's SSTs, while db2's versionToRocksDBFiles points to run 2's.
val localDir2 = Utils.createTempDir()
val db2 = new RocksDB(
remoteDir,
conf = conf,
localRootDir = localDir2,
hadoopConf = noOverwriteConf,
loggingId = s"[Thread-${Thread.currentThread.getId}]")
try {
db2.load(19)
db2.put("key", "value_from_run2")
db2.commit() // version 20 -- run 2's SSTs created, snapshot queued
db2.doMaintenance() // run 2's SSTs uploaded, 20.zip silently not overwritten

val sstFilesAfterRun2 = sstDir.listFiles()
.filter(_.getName.endsWith(".sst")).map(_.getName).toSet
val run2SstFiles = sstFilesAfterRun2 -- sstFilesAfterRun1
assert(run2SstFiles.nonEmpty,
"Expected new SST files from query run 2")

// Phase 4: versions 21-30. Snapshot at 30 triggers cleanup
// (minVersionToRetain = 20, deletes 10.zip).
for (v <- 21 to 30) {
db2.load(v - 1)
db2.put(s"key_v$v", s"value_v$v")
db2.commit()
db2.doMaintenance()
}

// Run 1's SSTs survive cleanup: the fix fetches cloud metadata for
// version 20, protecting files referenced by the on-disk 20.zip.
// Without the fix, cleanup would trust the stale in-memory mapping and
// delete run 1's SSTs as orphans.
val sstFilesAfterCleanup = sstDir.listFiles()
.filter(_.getName.endsWith(".sst")).map(_.getName).toSet
run1SstFiles.foreach { name =>
assert(sstFilesAfterCleanup.contains(name),
s"Expected run 1 SST file $name to still exist (protected by cloud " +
s"metadata fetch), but it was deleted.")
}

// Phase 5: Fresh instance loads version 29 from 20.zip + changelogs.
// Succeeds because run 1's SSTs (referenced by 20.zip) were preserved.
val localDir3 = Utils.createTempDir()
val db3 = new RocksDB(
remoteDir,
conf = conf,
localRootDir = localDir3,
hadoopConf = noOverwriteConf,
loggingId = s"[Thread-${Thread.currentThread.getId}]")
try {
db3.load(29)
// The stale value is expected: DFS's 20.zip was never overwritten, so the
// reconstructed state reflects run 1's commit, not run 2's retry.
val value = new String(db3.get("key"), "UTF-8")
assert(value == "value_from_run1",
s"Expected stale value 'value_from_run1' from 20.zip (run 1's SSTs), " +
s"but got '$value'")
} finally {
db3.close()
}
} finally {
db2.close()
}
}
}

testWithChangelogCheckpointingEnabled("SPARK-52553 - v1 changelog with invalid version number" +
" does not cause NumberFormatException") {
withTempDir { dir =>
Expand Down