Commit 11101fd (parent: 4659baa)

Remove unused thread pool in S3ShuffleReader and clean up unused imports.

Signed-off-by: Pascal Spörri <psp@zurich.ibm.com>

5 files changed: +3, -16 lines

src/main/scala/org/apache/spark/shuffle/S3ShuffleMapOutputWriter.scala (-1)

@@ -8,7 +8,6 @@ package org.apache.spark.shuffle
 import org.apache.hadoop.fs.FSDataOutputStream
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.SHUFFLE_FILE_BUFFER_SIZE
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage
 import org.apache.spark.shuffle.api.{ShuffleMapOutputWriter, ShufflePartitionWriter, WritableByteChannelWrapper}

src/main/scala/org/apache/spark/shuffle/S3SingleSpillShuffleMapOutputWriter.scala (+1, -1)

@@ -10,7 +10,7 @@ import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}
 import org.apache.spark.storage.ShuffleDataBlockId
 import org.apache.spark.util.Utils

-import java.io.{BufferedOutputStream, File, FileInputStream}
+import java.io.{File, FileInputStream}

 class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends SingleSpillShuffleMapOutputWriter {


src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala (-5)

@@ -6,17 +6,14 @@
 package org.apache.spark.shuffle.helper

 import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.impl.OpenFileParameters
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.config.{MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, SHUFFLE_FILE_BUFFER_SIZE}
 import org.apache.spark.internal.{Logging, config}
 import org.apache.spark.shuffle.ConcurrentObjectMap
 import org.apache.spark.storage._
 import org.apache.spark.{SparkConf, SparkEnv}

 import java.io.IOException
 import java.net.URI
-import java.util.concurrent.CompletableFuture
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}

@@ -32,8 +29,6 @@ class S3ShuffleDispatcher extends Logging {

   // Required
   val rootDir = conf.get("spark.shuffle.s3.rootDir", defaultValue = "sparkS3shuffle")
-  private val isCOS = rootDir.startsWith("cos://")
-  private val isS3A = rootDir.startsWith("s3a://")

   // Optional
   val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = 8 * 1024 * 1024)
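
For context, the two values kept in this hunk are ordinary Spark configuration entries read through SparkConf. A minimal sketch of how a job might set them (the bucket path is a made-up example, not part of this commit):

import org.apache.spark.SparkConf

// Hypothetical job setup; "s3a://example-bucket/shuffle" is an illustrative path.
val conf = new SparkConf()
  // Required: root directory for shuffle files (defaults to "sparkS3shuffle").
  .set("spark.shuffle.s3.rootDir", "s3a://example-bucket/shuffle")
  // Optional: I/O buffer size in bytes (defaults to 8 MiB).
  .set("spark.shuffle.s3.bufferSize", (8 * 1024 * 1024).toString)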

src/main/scala/org/apache/spark/storage/S3ShuffleBlockStream.scala (+1, -1)

@@ -9,7 +9,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.shuffle.helper.S3ShuffleDispatcher

-import java.io.{EOFException, IOException, InputStream}
+import java.io.{IOException, InputStream}

 /**
  * InputStream that reads data from a shuffleBlock, mapId and exposes an InputStream from startReduceId to endReduceId.
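
The doc comment above describes a ranged stream over one map task's shuffle output. As an illustration only, not the actual S3ShuffleBlockStream implementation, a stream bounded to a byte range resolved from a shuffle index might look like the following; all names here are hypothetical:

import java.io.{IOException, InputStream}

// Hypothetical sketch: expose only bytes [start, end) of an underlying stream,
// e.g. the byte range a shuffle index maps startReduceId..endReduceId to.
class RangedInputStream(underlying: InputStream, start: Long, end: Long) extends InputStream {
  private var pos: Long = 0L

  // Advance to the start offset before serving any reads.
  while (pos < start) {
    val skipped = underlying.skip(start - pos)
    if (skipped <= 0) throw new IOException(s"unable to skip to offset $start")
    pos += skipped
  }

  override def read(): Int = {
    if (pos >= end) -1 // range exhausted
    else {
      val b = underlying.read()
      if (b >= 0) pos += 1
      b
    }
  }

  override def close(): Unit = underlying.close()
}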

src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala (+1, -8)

@@ -28,12 +28,10 @@ import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReadMetricsReporter, ShuffleReader}
 import org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchBlockInfo
-import org.apache.spark.util.{CompletionIterator, ThreadUtils}
+import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter
 import org.apache.spark.{InterruptibleIterator, SparkConf, SparkEnv, TaskContext}

-import scala.concurrent.ExecutionContext
-
 /**
  * This class was adapted from Apache Spark: BlockStoreShuffleReader.
  */

@@ -176,8 +174,3 @@
     }
   }
 }
-
-object S3ShuffleReader {
-  private lazy val asyncThreadPool = ThreadUtils.newDaemonCachedThreadPool("s3-shuffle-reader-async-thread-pool", S3ShuffleDispatcher.get.prefetchThreadPoolSize)
-  private lazy implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
-}
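
The deleted companion object followed a common Spark idiom: a shared daemon thread pool wrapped in an implicit ExecutionContext so that Future blocks pick it up. Both vals were lazy and private, so once nothing in the reader referenced them the pool was never even instantiated; removing them is pure dead-code cleanup. For reference, a minimal sketch of the idiom (the object and pool names are illustrative, not from this repository):

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import org.apache.spark.util.ThreadUtils

// Illustrative only: the shared-async-pool idiom the removed object used.
object AsyncPoolSketch {
  private lazy val pool = ThreadUtils.newDaemonCachedThreadPool("example-async-pool", 16)
  private implicit lazy val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(pool)

  // Any Future created here runs on the daemon pool.
  def submit[T](work: => T): Future[T] = Future(work)
}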
