From 969633b780072b2e384e00abf9df39c4408b11de Mon Sep 17 00:00:00 2001 From: zhangliang Date: Tue, 17 Mar 2026 16:00:30 +0800 Subject: [PATCH 1/4] fix shuffle GC not cleanup checksum file --- .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index a46c23447f84f..1fead6ad277e4 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -662,7 +662,8 @@ private[spark] class IndexShuffleBlockResolver( override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = { Seq( ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), - ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) + ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID), + ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID) ) } From eee273d8d3ce17782acc8f98476e0c6751b7c33f Mon Sep 17 00:00:00 2001 From: zhangliang Date: Tue, 17 Mar 2026 19:37:45 +0800 Subject: [PATCH 2/4] add checksum in existing UT --- .../org/apache/spark/ExternalShuffleServiceSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index d86fbc850d1f4..2f785efeffca3 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server.TransportServer import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient} 
-import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} +import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.io.ChunkedByteBuffer @@ -270,6 +270,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name, ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name, + ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name ).map { blockId => new File(ExecutorDiskUtils.getFilePath(dirs, @@ -283,7 +285,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { promise.future } val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) - assert(filesToCheck.length == 4) + assert(filesToCheck.length == 6) assert(filesToCheck.forall(_.exists())) if (enabled) { From 600b2eb80faba975b42a87f25c0f7282a15fa749 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 19 Mar 2026 21:35:41 +0800 Subject: [PATCH 3/4] debug checksum file is not deleted --- .../shuffle/checksum/ShuffleChecksumHelper.java | 2 +- .../spark/shuffle/IndexShuffleBlockResolver.scala | 1 + .../apache/spark/ExternalShuffleServiceSuite.scala | 12 +++++++++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java index 2dbf38be954db..8f0bf466d87ab 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java +++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java @@ -81,7 +81,7 @@ public static Checksum getChecksumByAlgorithm(String algorithm) { public static String getChecksumFileName(String blockName, String algorithm) { // append the shuffle checksum algorithm as the file extension - return String.format("%s.%s", blockName, algorithm); + return String.format("%s", blockName); } private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 1fead6ad277e4..149b65f11afc8 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -200,6 +200,7 @@ private[spark] class IndexShuffleBlockResolver( if (checksumEnabled) { file = getChecksumFile(shuffleId, mapId, algorithm) + println("Deleting checksum file: " + file.getPath) if (file.exists() && !file.delete()) { logWarning(log"Error deleting checksum ${MDC(PATH, file.getPath())}") } diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 2f785efeffca3..3c66286c3c7ba 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -245,6 +245,10 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled) .set(config.EXECUTOR_REMOVE_DELAY.key, "0s") + .set(config.EXECUTOR_INSTANCES, 1) + .set(config.EXECUTOR_CORES, 1) +// .set("spark.executor.extraJavaOptions", +// 
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading) sc.env.blockManager.externalShuffleServiceEnabled should equal(true) sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) @@ -274,8 +278,11 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name ).map { blockId => - new File(ExecutorDiskUtils.getFilePath(dirs, + val f = new File(ExecutorDiskUtils.getFilePath(dirs, sc.env.blockManager.subDirsPerLocalDir, blockId)) + println(s"shuffle file: ${f.getAbsolutePath}, exists: ${f.exists()}, " + + s"file length: ${if (f.exists()) f.length() else "N/A"}") + f } } promise.success(files) @@ -285,6 +292,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { promise.future } val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) + println(s"files to check: ${filesToCheck.map(_.getAbsolutePath).mkString(", ")}") assert(filesToCheck.length == 6) assert(filesToCheck.forall(_.exists())) @@ -309,6 +317,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { sc.cleaner.foreach(_.doCleanupShuffle(0, true)) if (enabled) { + println(s"checking files after cleanup: ${filesToCheck.filter(_.exists()). 
+ map(f => s"${f.getAbsolutePath}: exists=${f.exists()}").mkString(", ")}") assert(filesToCheck.forall(!_.exists())) } else { assert(filesToCheck.forall(_.exists())) From 77bc87800f657aec24a163733414e07ac834d945 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 19 Mar 2026 21:42:50 +0800 Subject: [PATCH 4/4] remove ShuffleChecksumUtils.getChecksumFileName which appends algorithm after checksum block name, leading to inconsistent block file name, causing block clean failure --- .../shuffle/checksum/ShuffleChecksumHelper.java | 5 ----- .../spark/shuffle/IndexShuffleBlockResolver.scala | 4 +--- .../apache/spark/shuffle/ShuffleChecksumUtils.scala | 13 ------------- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 6 ++---- .../apache/spark/ExternalShuffleServiceSuite.scala | 12 +----------- .../sort/BypassMergeSortShuffleWriterSuite.scala | 4 +--- 6 files changed, 5 insertions(+), 39 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java index 8f0bf466d87ab..096994856ea72 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java @@ -79,11 +79,6 @@ public static Checksum getChecksumByAlgorithm(String algorithm) { return getChecksumsByAlgorithm(1, algorithm)[0]; } - public static String getChecksumFileName(String blockName, String algorithm) { - // append the shuffle checksum algorithm as the file extension - return String.format("%s", blockName); - } - private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException { try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) { in.skipNBytes(reduceId * 8L); diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 149b65f11afc8..293830596be18 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -36,7 +36,6 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.{ExecutorDiskUtils, MergedBlockMeta} -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID import org.apache.spark.storage._ @@ -200,7 +199,6 @@ private[spark] class IndexShuffleBlockResolver( if (checksumEnabled) { file = getChecksumFile(shuffleId, mapId, algorithm) - println("Deleting checksum file: " + file.getPath) if (file.exists() && !file.delete()) { logWarning(log"Error deleting checksum ${MDC(PATH, file.getPath())}") } @@ -607,7 +605,7 @@ private[spark] class IndexShuffleBlockResolver( algorithm: String, dirs: Option[Array[String]] = None): File = { val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID) - val fileName = ShuffleChecksumHelper.getChecksumFileName(blockId.name, algorithm) + val fileName = blockId.name dirs .map(d => new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, fileName))) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala index b2a18d7538796..75b0efcf5cdd6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala @@ -22,22 +22,9 @@ 
import java.util.zip.CheckedInputStream import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.network.util.LimitedInputStream -import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID -import org.apache.spark.storage.{BlockId, ShuffleChecksumBlockId, ShuffleDataBlockId} object ShuffleChecksumUtils { - /** - * Return checksumFile for shuffle data block ID. Otherwise, null. - */ - def getChecksumFileName(blockId: BlockId, algorithm: String): String = blockId match { - case ShuffleDataBlockId(shuffleId, mapId, _) => - ShuffleChecksumHelper.getChecksumFileName( - ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name, algorithm) - case _ => - null - } - /** * Ensure that the checksum values are consistent with index file and data file. */ diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 26f0a86354478..7bac6db566844 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -330,8 +330,7 @@ public void writeChecksumFileWithoutSpill() throws Exception { ShuffleChecksumBlockId checksumBlockId = new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID()); String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM()); - String checksumFileName = ShuffleChecksumHelper.getChecksumFileName( - checksumBlockId.name(), checksumAlgorithm); + String checksumFileName = checksumBlockId.name(); File checksumFile = new File(tempDir, checksumFileName); File dataFile = new File(tempDir, "data"); File indexFile = new File(tempDir, "index"); @@ -361,8 +360,7 @@ public void writeChecksumFileWithSpill() throws Exception { ShuffleChecksumBlockId checksumBlockId = new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID()); String 
checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM()); - String checksumFileName = ShuffleChecksumHelper.getChecksumFileName( - checksumBlockId.name(), checksumAlgorithm); + String checksumFileName = checksumBlockId.name(); File checksumFile = new File(tempDir, checksumFileName); File dataFile = new File(tempDir, "data"); File indexFile = new File(tempDir, "index"); diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 3c66286c3c7ba..2f785efeffca3 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -245,10 +245,6 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled) .set(config.EXECUTOR_REMOVE_DELAY.key, "0s") - .set(config.EXECUTOR_INSTANCES, 1) - .set(config.EXECUTOR_CORES, 1) -// .set("spark.executor.extraJavaOptions", -// "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading) sc.env.blockManager.externalShuffleServiceEnabled should equal(true) sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) @@ -278,11 +274,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name ).map { blockId => - val f = new File(ExecutorDiskUtils.getFilePath(dirs, + new File(ExecutorDiskUtils.getFilePath(dirs, sc.env.blockManager.subDirsPerLocalDir, blockId)) - println(s"shuffle file: ${f.getAbsolutePath}, exists: ${f.exists()}, " + - s"file length: ${if (f.exists()) f.length() else "N/A"}") - f } } promise.success(files) @@ -292,7 +285,6 @@ class 
ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { promise.future } val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) - println(s"files to check: ${filesToCheck.map(_.getAbsolutePath).mkString(", ")}") assert(filesToCheck.length == 6) assert(filesToCheck.forall(_.exists())) @@ -317,8 +309,6 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { sc.cleaner.foreach(_.doCleanupShuffle(0, true)) if (enabled) { - println(s"checking files after cleanup: ${filesToCheck.filter(_.exists()). - map(f => s"${f.getAbsolutePath}: exists=${f.exists()}").mkString(", ")}") assert(filesToCheck.forall(!_.exists())) } else { assert(filesToCheck.forall(_.exists())) diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index c9b951cf0369a..253129c3a87e3 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark._ import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.internal.config import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager} -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager} import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleChecksumTestHelper} import org.apache.spark.shuffle.api.ShuffleExecutorComponents @@ -269,8 +268,7 @@ class BypassMergeSortShuffleWriterSuite val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, 0) val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0) val checksumAlgorithm = conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM) - val checksumFileName = 
ShuffleChecksumHelper.getChecksumFileName( - checksumBlockId.name, checksumAlgorithm) + val checksumFileName = checksumBlockId.name val checksumFile = new File(tempDir, checksumFileName) val dataFile = new File(tempDir, dataBlockId.name) val indexFile = new File(tempDir, indexBlockId.name)