Skip to content

Commit 0b476c0

Browse files
committed
Log Stage and Task ID to understand I/O bottlenecks.
Signed-off-by: Pascal Spörri <psp@zurich.ibm.com>
1 parent 11df887 commit 0b476c0

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

src/main/scala/org/apache/spark/shuffle/S3MeasureOutputStream.scala

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.apache.spark.shuffle
22

3+
import org.apache.spark.TaskContext
34
import org.apache.spark.internal.Logging
45

56
import java.io.{IOException, OutputStream}
@@ -52,8 +53,12 @@ class S3MeasureOutputStream(var out: OutputStream, label: String = "") extends O
5253
isOpen = false
5354
super.close()
5455

56+
val tc = TaskContext.get()
57+
val sId = tc.stageId()
58+
val sAt = tc.stageAttemptNumber()
5559
val t = timings / 1000000
5660
val bw = bytes.toDouble / (t.toDouble / 1000) / (1024 * 1024)
57-
logInfo(s"Statistics: Writing ${label} ${bytes} took ${t} ms (${bw} MiB/s)")
61+
logInfo(s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
62+
s"Writing ${label} ${bytes} took ${t} ms (${bw} MiB/s)")
5863
}
5964
}

src/main/scala/org/apache/spark/storage/S3BufferedPrefetchIterator.scala

+6-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.apache.spark.storage
77

8+
import org.apache.spark.TaskContext
89
import org.apache.spark.internal.Logging
910
import org.apache.spark.shuffle.helper.S3ShuffleDispatcher
1011

@@ -152,6 +153,9 @@ class S3BufferedPrefetchIterator(iter: Iterator[(BlockId, S3ShuffleBlockStream)]
152153

153154
private def printStatistics(): Unit = synchronized {
154155
val totalRuntime = System.nanoTime() - startTime
156+
val tc = TaskContext.get()
157+
val sId = tc.stageId()
158+
val sAt = tc.stageAttemptNumber()
155159
try {
156160
val tR = totalRuntime / 1000000
157161
val wPer = 100 * timeWaiting / totalRuntime
@@ -169,7 +173,8 @@ class S3BufferedPrefetchIterator(iter: Iterator[(BlockId, S3ShuffleBlockStream)]
169173
val bs = bR / r
170174
// Threads
171175
val ta = desiredActiveThreads.get()
172-
logInfo(s"Statistics: ${bR} bytes, ${tW} ms waiting (${atW} avg), " +
176+
logInfo(s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
177+
s"${bR} bytes, ${tW} ms waiting (${atW} avg), " +
173178
s"${tP} ms prefetching (avg: ${atP} ms - ${bs} block size - ${bW} MiB/s). " +
174179
s"Total: ${tR} ms - ${wPer}% waiting. ${ta} active threads.")
175180
} catch {

0 commit comments

Comments
 (0)