diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index a46c23447f84f..1fead6ad277e4 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -662,7 +662,8 @@ private[spark] class IndexShuffleBlockResolver( override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = { Seq( ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), - ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) + ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID), + ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID) ) } diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index d86fbc850d1f4..2f785efeffca3 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server.TransportServer import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient} -import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} +import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.io.ChunkedByteBuffer @@ -270,6 +270,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name, ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name, + ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name ).map { blockId => new File(ExecutorDiskUtils.getFilePath(dirs, @@ -283,7 +285,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { promise.future } val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) - assert(filesToCheck.length == 4) + assert(filesToCheck.length == 6) assert(filesToCheck.forall(_.exists())) if (enabled) {