
Commit da0c7cc

cxzl25 authored and dongjoon-hyun committed
[SPARK-48037][CORE][3.4] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data
### What changes were proposed in this pull request?

This PR aims to fix the issue that SortShuffleWriter lacks shuffle write related metrics, which can result in inaccurate data.

### Why are the changes needed?

When the shuffle writer is SortShuffleWriter, it does not use SQLShuffleWriteMetricsReporter to update metrics, so the runtime statistics that AQE obtains report a rowCount of 0. Some optimization rules rely on rowCount statistics, such as `EliminateLimits`. Because rowCount is 0, that rule removes the limit operator, and the query returns results without the limit applied.

https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L168-L172

https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2067-L2070

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

Production environment verification.

**master metrics**
<img width="296" alt="image" src="https://github.com/apache/spark/assets/3898450/dc9b6e8a-93ec-4f59-a903-71aa5b11962c">

**PR metrics**
<img width="276" alt="image" src="https://github.com/apache/spark/assets/3898450/2d73b773-2dcc-4d23-81de-25dcadac86c1">

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46464 from cxzl25/SPARK-48037-3.4.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
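To make the failure mode concrete, here is a minimal, self-contained model of it in plain Scala. It is illustrative only and does not use Spark classes: `TaskLevelMetrics` and `SqlWrappingReporter` are stand-ins for the executor's task-level shuffle write metrics and the SQL layer's `SQLShuffleWriteMetricsReporter`. If the writer updates the task-level metrics object directly instead of the reporter its caller injected, the wrapping reporter never sees the records and the SQL-visible row count that AQE reads stays at 0:

```scala
// Illustrative model only (not Spark code): why bypassing the injected reporter
// leaves the SQL-side row count at zero.
trait WriteMetricsReporter {
  def incRecordsWritten(n: Long): Unit
}

// Stand-in for the executor's task-level shuffle write metrics.
class TaskLevelMetrics extends WriteMetricsReporter {
  var records = 0L
  override def incRecordsWritten(n: Long): Unit = records += n
}

// Stand-in for the SQL layer's wrapping reporter: forwards to the task-level
// metrics and also tracks a row count that AQE statistics would read.
class SqlWrappingReporter(delegate: WriteMetricsReporter) extends WriteMetricsReporter {
  var sqlRowCount = 0L
  override def incRecordsWritten(n: Long): Unit = {
    delegate.incRecordsWritten(n)
    sqlRowCount += n
  }
}

object MetricsBypassSketch extends App {
  val taskMetrics = new TaskLevelMetrics
  val injected = new SqlWrappingReporter(taskMetrics)

  // Pre-fix behaviour: the writer ignores the injected reporter and updates the
  // task-level metrics directly, so the SQL-visible count never moves.
  taskMetrics.incRecordsWritten(2)
  println(s"bypassing the reporter: sqlRowCount = ${injected.sqlRowCount}") // 0

  // Post-fix behaviour: writes go through the injected reporter, so both the
  // task-level counter and the SQL-visible count advance.
  injected.incRecordsWritten(2)
  println(s"using the reporter: sqlRowCount = ${injected.sqlRowCount}") // 2
}
```

This mirrors what the diff below does: `SortShuffleWriter` now receives the reporter as a constructor parameter and threads it through `ExternalSorter.writePartitionedMapOutput`, instead of reading `context.taskMetrics().shuffleWriteMetrics` itself.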
1 parent 2974e62 · commit da0c7cc

File tree

7 files changed: +44 -12 lines changed


.github/workflows/build_and_test.yml

+1
@@ -644,6 +644,7 @@ jobs:
     python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme 'sphinx-copybutton==0.5.2' nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' 'nest-asyncio==1.5.8' 'rpds-py==0.16.2' 'alabaster==0.7.13'
     python3.9 -m pip install ipython_genutils # See SPARK-38517
     python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas 'plotly>=4.8'
+    python3.9 -m pip install 'nbsphinx==0.9.3'
     python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
     apt-get update -y
     apt-get install -y ruby ruby-dev

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala

+1 -1

@@ -176,7 +176,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
         metrics,
         shuffleExecutorComponents)
     case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
-      new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
+      new SortShuffleWriter(other, mapId, context, metrics, shuffleExecutorComponents)
   }
 }

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

+3 -3

@@ -21,13 +21,15 @@ import org.apache.spark._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter}
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
 import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.util.collection.ExternalSorter

 private[spark] class SortShuffleWriter[K, V, C](
     handle: BaseShuffleHandle[K, V, C],
     mapId: Long,
     context: TaskContext,
+    writeMetrics: ShuffleWriteMetricsReporter,
     shuffleExecutorComponents: ShuffleExecutorComponents)
   extends ShuffleWriter[K, V] with Logging {

@@ -46,8 +48,6 @@ private[spark] class SortShuffleWriter[K, V, C](

   private var partitionLengths: Array[Long] = _

-  private val writeMetrics = context.taskMetrics().shuffleWriteMetrics
-
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[Product2[K, V]]): Unit = {
     sorter = if (dep.mapSideCombine) {
@@ -67,7 +67,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     // (see SPARK-3570).
     val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
       dep.shuffleId, mapId, dep.partitioner.numPartitions)
-    sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
+    sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter, writeMetrics)
     partitionLengths = mapOutputWriter.commitAllPartitions(sorter.getChecksums).getPartitionLengths
     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
   }

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

+5 -4

@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.serializer._
-import org.apache.spark.shuffle.ShufflePartitionPairsWriter
+import org.apache.spark.shuffle.{ShufflePartitionPairsWriter, ShuffleWriteMetricsReporter}
 import org.apache.spark.shuffle.api.{ShuffleMapOutputWriter, ShufflePartitionWriter}
 import org.apache.spark.shuffle.checksum.ShuffleChecksumSupport
 import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter, ShuffleBlockId}
@@ -696,7 +696,8 @@ private[spark] class ExternalSorter[K, V, C](
   def writePartitionedMapOutput(
       shuffleId: Int,
       mapId: Long,
-      mapOutputWriter: ShuffleMapOutputWriter): Unit = {
+      mapOutputWriter: ShuffleMapOutputWriter,
+      writeMetrics: ShuffleWriteMetricsReporter): Unit = {
     var nextPartitionId = 0
     if (spills.isEmpty) {
       // Case where we only have in-memory data
@@ -714,7 +715,7 @@ private[spark] class ExternalSorter[K, V, C](
           serializerManager,
           serInstance,
           blockId,
-          context.taskMetrics().shuffleWriteMetrics,
+          writeMetrics,
           if (partitionChecksums.nonEmpty) partitionChecksums(partitionId) else null)
         while (it.hasNext && it.nextPartition() == partitionId) {
           it.writeNext(partitionPairsWriter)
@@ -739,7 +740,7 @@ private[spark] class ExternalSorter[K, V, C](
           serializerManager,
           serInstance,
           blockId,
-          context.taskMetrics().shuffleWriteMetrics,
+          writeMetrics,
           if (partitionChecksums.nonEmpty) partitionChecksums(id) else null)
         if (elements.hasNext) {
           for (elem <- elements) {

core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala

+3
@@ -85,6 +85,7 @@ class SortShuffleWriterSuite
       shuffleHandle,
       mapId = 1,
       context,
+      context.taskMetrics().shuffleWriteMetrics,
       shuffleExecutorComponents)
     writer.write(Iterator.empty)
     writer.stop(success = true)
@@ -102,6 +103,7 @@ class SortShuffleWriterSuite
       shuffleHandle,
       mapId = 2,
       context,
+      context.taskMetrics().shuffleWriteMetrics,
       shuffleExecutorComponents)
     writer.write(records.iterator)
     writer.stop(success = true)
@@ -158,6 +160,7 @@ class SortShuffleWriterSuite
       shuffleHandle,
       mapId = 0,
       context,
+      context.taskMetrics().shuffleWriteMetrics,
       new LocalDiskShuffleExecutorComponents(
         conf, shuffleBlockResolver._blockManager, shuffleBlockResolver))
     writer.write(records.iterator)

sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala

+2 -1

@@ -130,7 +130,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
     assert(sorter.numSpills > 0)

     // Merging spilled files should not throw assertion error
-    sorter.writePartitionedMapOutput(0, 0, mapOutputWriter)
+    sorter.writePartitionedMapOutput(0, 0, mapOutputWriter,
+      taskContext.taskMetrics.shuffleWriteMetrics)
   }

   test("SPARK-10403: unsafe row serializer with SortShuffleManager") {

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

+29 -3

@@ -26,6 +26,7 @@ import org.scalatest.time.SpanSugar._

 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
@@ -59,7 +60,8 @@ class AdaptiveQueryExecSuite

   setupTestData()

-  private def runAdaptiveAndVerifyResult(query: String): (SparkPlan, SparkPlan) = {
+  private def runAdaptiveAndVerifyResult(query: String,
+      skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = {
     var finalPlanCnt = 0
     val listener = new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
@@ -80,8 +82,10 @@ class AdaptiveQueryExecSuite
     assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false"))
     val result = dfAdaptive.collect()
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-      val df = sql(query)
-      checkAnswer(df, result)
+      if (!skipCheckAnswer) {
+        val df = sql(query)
+        checkAnswer(df, result)
+      }
     }
     val planAfter = dfAdaptive.queryExecution.executedPlan
     assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
@@ -2390,6 +2394,28 @@ class AdaptiveQueryExecSuite
     }
   }

+  test("SPARK-48037: Fix SortShuffleWriter lacks shuffle write related metrics " +
+    "resulting in potentially inaccurate data") {
+    withTable("t3") {
+      withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+          SQLConf.SHUFFLE_PARTITIONS.key -> (SortShuffleManager
+            .MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE + 1).toString) {
+        sql("CREATE TABLE t3 USING PARQUET AS SELECT id FROM range(2)")
+        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+          """
+            |SELECT id, count(*)
+            |FROM t3
+            |GROUP BY id
+            |LIMIT 1
+            |""".stripMargin, skipCheckAnswer = true)
+        // The shuffle stage produces two rows and the limit operator should not been optimized out.
+        assert(findTopLevelLimit(plan).size == 1)
+        assert(findTopLevelLimit(adaptivePlan).size == 1)
+      }
+    }
+  }
+
   test("SPARK-37063: OptimizeSkewInRebalancePartitions support optimize non-root node") {
     withTempView("v") {
       withSQLConf(
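A note on the new test's configuration, based on my reading of the shuffle writer selection logic (treat the constant's value below as an assumption): setting `spark.sql.shuffle.partitions` just above `SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE` keeps the exchange off the serialized shuffle path, so it falls back to the plain `SortShuffleWriter` that this commit fixes.

```scala
// Sketch of the arithmetic behind the test's config. Assumption: the serialized
// shuffle packs the partition id into 24 bits, so
// MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE is 1 << 24; exceeding it makes
// SortShuffleManager fall back to SortShuffleWriter, the path exercised above.
object ShufflePartitionCeilingSketch extends App {
  val maxSerializedModePartitions = 1 << 24 // assumed value: 16777216
  val partitionsSetByTest = maxSerializedModePartitions + 1
  println(s"spark.sql.shuffle.partitions used by the test: $partitionsSetByTest")
}
```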
