From 0c79afb343f1e67020ab9c72126cadcad8a819d3 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Sat, 7 Mar 2026 01:47:11 +0900
Subject: [PATCH] Add more tests

---
 .../history/FsHistoryProviderSuite.scala      | 257 ++++++++++++++++++
 1 file changed, 257 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index a1d0b7dc4c054..7c76b50b07acb 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -2148,6 +2148,263 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     }
   }
 
+  // Deleting one of two configured log directories after the first scan must not break the
+  // next scan: the app from the remaining directory (testDir) is still listed.
+  test("SPARK-55864: directory removed while SHS is running") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+      try {
+        val log1 = newLogFile("app1", None, inProgress = false)
+        writeFile(log1, None,
+          SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+          SparkListenerApplicationEnd(5L))
+        val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+        val log2 = new File(new Path(logUri2).toUri.getPath)
+        writeFile(log2, None,
+          SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+          SparkListenerApplicationEnd(6L))
+
+        updateAndCheck(provider) { list =>
+          list.size should be(2)
+        }
+
+        // Remove dir2 while SHS is running
+        Utils.deleteRecursively(dir2)
+
+        // Next scan should not throw and should still list app1 from testDir
+        updateAndCheck(provider) { list =>
+          list.size should be(1)
+          list.head.id should be("app1-id")
+        }
+      } finally {
+        // Stop the provider even if an assertion above fails
+        provider.stop()
+      }
+    } finally {
+      if (dir2.exists()) {
+        Utils.deleteRecursively(dir2)
+      }
+    }
+  }
+
+  // A configured log directory that does not exist yet must not stop the first scan from
+  // listing testDir's app; once the directory is created, a later scan picks up its app.
+  test("SPARK-55864: directory does not exist at startup but created later") {
+    // Create and immediately delete the directory to obtain a unique path that does not exist.
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    Utils.deleteRecursively(dir2)
+
+    try {
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+      try {
+        val log1 = newLogFile("app1", None, inProgress = false)
+        writeFile(log1, None,
+          SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+          SparkListenerApplicationEnd(5L))
+
+        // First scan: dir2 does not exist, but app1 from testDir should be listed
+        updateAndCheck(provider) { list =>
+          list.size should be(1)
+          list.head.id should be("app1-id")
+        }
+
+        // Create dir2 and add a log file
+        assert(dir2.mkdirs())
+        val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+        val log2 = new File(new Path(logUri2).toUri.getPath)
+        writeFile(log2, None,
+          SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+          SparkListenerApplicationEnd(6L))
+
+        // Next scan should pick up app2
+        updateAndCheck(provider) { list =>
+          list.size should be(2)
+          list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+        }
+      } finally {
+        // Stop the provider even if an assertion above fails
+        provider.stop()
+      }
+    } finally {
+      if (dir2.exists()) {
+        Utils.deleteRecursively(dir2)
+      }
+    }
+  }
+
+  // A log directory that is renamed away and later restored: the scan in between lists only
+  // testDir's app, and the scan after the restore lists both apps again.
+  test("SPARK-55864: directory temporarily inaccessible then recovers") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    // Reserve a unique path that does not exist yet; dir2 is moved there and back.
+    val dir2Backup = Utils.createTempDir(namePrefix = "logDir2Backup")
+    Utils.deleteRecursively(dir2Backup)
+    try {
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+      try {
+        val log1 = newLogFile("app1", None, inProgress = false)
+        writeFile(log1, None,
+          SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+          SparkListenerApplicationEnd(5L))
+        val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+        val log2 = new File(new Path(logUri2).toUri.getPath)
+        writeFile(log2, None,
+          SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+          SparkListenerApplicationEnd(6L))
+
+        updateAndCheck(provider) { list =>
+          list.size should be(2)
+        }
+
+        // Make dir2 inaccessible by renaming it away
+        assert(dir2.renameTo(dir2Backup))
+
+        // Scan should still work for testDir
+        updateAndCheck(provider) { list =>
+          list.size should be(1)
+          list.head.id should be("app1-id")
+        }
+
+        // Restore dir2
+        assert(dir2Backup.renameTo(dir2))
+
+        // Next scan should recover app2
+        updateAndCheck(provider) { list =>
+          list.size should be(2)
+          list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+        }
+      } finally {
+        // Stop the provider even if an assertion above fails
+        provider.stop()
+      }
+    } finally {
+      // dir2Backup still exists if the test failed before dir2 was restored
+      Seq(dir2, dir2Backup).filter(_.exists()).foreach(d => Utils.deleteRecursively(d))
+    }
+  }
+
+  // When every configured log directory is gone, checkForLogs() must not throw and the
+  // listing becomes empty.
+  test("SPARK-55864: all directories inaccessible does not crash") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+      try {
+        val log1 = newLogFile("app1", None, inProgress = false)
+        writeFile(log1, None,
+          SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+          SparkListenerApplicationEnd(5L))
+
+        updateAndCheck(provider) { list =>
+          list.size should be(1)
+        }
+
+        // Remove both directories. testDir is renamed last, right before the try block, so
+        // that nothing can fail between moving it away and the finally that restores it.
+        val testDirBackup = Utils.createTempDir(namePrefix = "testDirBackup")
+        Utils.deleteRecursively(testDirBackup)
+        Utils.deleteRecursively(dir2)
+        assert(testDir.renameTo(testDirBackup))
+        try {
+          // Should not throw
+          provider.checkForLogs()
+          // After all dirs gone, listing should return no apps
+          provider.getListing().toSeq.size should be(0)
+        } finally {
+          // Always restore testDir so afterEach / subsequent tests are not affected
+          assert(testDirBackup.renameTo(testDir))
+        }
+      } finally {
+        // Stop the provider even if an assertion above fails
+        provider.stop()
+      }
+    } finally {
+      if (dir2.exists()) {
+        Utils.deleteRecursively(dir2)
+      }
+    }
+  }
+
+  // An empty entry between commas in the log directory list must not prevent the apps of both
+  // real directories from being listed.
+  test("SPARK-55864: config with empty entries between commas") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      // "dir1,,dir2" - empty entry between commas
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},,${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+      try {
+        val log1 = newLogFile("app1", None, inProgress = false)
+        writeFile(log1, None,
+          SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+          SparkListenerApplicationEnd(5L))
+        val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+        val log2 = new File(new Path(logUri2).toUri.getPath)
+        writeFile(log2, None,
+          SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+          SparkListenerApplicationEnd(6L))
+
+        updateAndCheck(provider) { list =>
+          list.size should be(2)
+          list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+        }
+      } finally {
+        // Stop the provider even if an assertion above fails
+        provider.stop()
+      }
+    } finally {
+      Utils.deleteRecursively(dir2)
+    }
+  }
+
+  // With two log directories but a single configured name, each attempt's logSourceName is
+  // expected to be the absolute path of its log directory.
+  test("SPARK-55864: logDirectory.names count mismatch falls back to full paths") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      val conf = createTestConf()
+        .set(HISTORY_LOG_DIR,
+          s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+        .set(HISTORY_LOG_DIR_NAMES, "OnlyOneName")
+      val provider = new FsHistoryProvider(conf)
+      try {
+        val log1 = newLogFile("app1", None, inProgress = false)
+        writeFile(log1, None,
+          SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
+          SparkListenerApplicationEnd(5L))
+        val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
+        val log2 = new File(new Path(logUri2).toUri.getPath)
+        writeFile(log2, None,
+          SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
+          SparkListenerApplicationEnd(6L))
+
+        updateAndCheck(provider) { list =>
+          list.size should be(2)
+          // Names mismatch: should fall back to full paths
+          val app1 = list.find(_.id == "app1-id").get
+          val app2 = list.find(_.id == "app2-id").get
+          app1.attempts.head.logSourceName should be(Some(testDir.getAbsolutePath))
+          app2.attempts.head.logSourceName should be(Some(dir2.getAbsolutePath))
+        }
+      } finally {
+        // Stop the provider even if an assertion above fails
+        provider.stop()
+      }
+    } finally {
+      Utils.deleteRecursively(dir2)
+    }
+  }
+
   private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
     extends FsHistoryProvider(conf, clock) {