-
Notifications
You must be signed in to change notification settings - Fork 856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark Executor Spill Heuristic - (Depends on Custom SHS - Requires totalMemoryBytesSpilled metric) #310
Merged
Merged
Spark Executor Spill Heuristic - (Depends on Custom SHS - Requires totalMemoryBytesSpilled metric) #310
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
dabc925
adding spill heuristic
919fd62
changed logic to fetch spilled bytes information from executors
20bd753
review comments
512719f
deleted unwanted files
58586e7
fixed the error divide by 0
b69d0a0
acknowledging review comments
91eb40d
changing instances to cores
4f3d280
acknowledging review comments and some other required changes
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
133 changes: 133 additions & 0 deletions
133
app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
/* | ||
* Copyright 2016 LinkedIn Corp. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.linkedin.drelephant.spark.heuristics | ||
|
||
import com.linkedin.drelephant.analysis.Severity | ||
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, ExecutorSummary, StageData} | ||
import com.linkedin.drelephant.analysis._ | ||
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData | ||
import com.linkedin.drelephant.spark.data.SparkApplicationData | ||
import com.linkedin.drelephant.util.MemoryFormatUtils | ||
|
||
import scala.collection.JavaConverters | ||
|
||
|
||
/**
 * A heuristic based on execution memory spilled by executors.
 *
 * Reads per-executor `totalMemoryBytesSpilled` (requires a custom SHS exposing
 * that metric) and reports severity plus tuning recommendations.
 */
class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
  extends Heuristic[SparkApplicationData] {

  import ExecutorStorageSpillHeuristic._
  import JavaConverters._

  // Each threshold falls back to its default when not present in the heuristic
  // configuration. Option(...) maps a null lookup to None (the param map is a
  // java.util.Map, so a missing key yields null).
  val spillFractionOfExecutorsThreshold: Double =
    Option(heuristicConfigurationData.getParamMap.get(SPILL_FRACTION_OF_EXECUTORS_THRESHOLD_KEY))
      .map(_.toDouble)
      .getOrElse(DEFAULT_SPILL_FRACTION_OF_EXECUTORS_THRESHOLD)

  val spillMaxMemoryThreshold: Double =
    Option(heuristicConfigurationData.getParamMap.get(SPILL_MAX_MEMORY_THRESHOLD_KEY))
      .map(_.toDouble)
      .getOrElse(DEFAULT_SPILL_MAX_MEMORY_THRESHOLD)

  val sparkExecutorCoresThreshold: Int =
    Option(heuristicConfigurationData.getParamMap.get(SPARK_EXECUTOR_CORES_THRESHOLD_KEY))
      .map(_.toInt)
      .getOrElse(DEFAULT_SPARK_EXECUTOR_CORES_THRESHOLD)

  val sparkExecutorMemoryThreshold: String =
    Option(heuristicConfigurationData.getParamMap.get(SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY))
      .getOrElse(DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD)

  override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

  /**
   * Evaluates the application's spill metrics and builds the heuristic result.
   *
   * @param data REST/log-derived Spark application data
   * @return heuristic result with spill totals and, when spilling occurs, advice
   */
  override def apply(data: SparkApplicationData): HeuristicResult = {
    val evaluator = new Evaluator(this, data)
    var resultDetails = Seq(
      new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)),
      new HeuristicResultDetails("Max memory spilled", MemoryFormatUtils.bytesToString(evaluator.maxMemorySpilled)),
      new HeuristicResultDetails("Mean memory spilled", MemoryFormatUtils.bytesToString(evaluator.meanMemorySpilled)),
      new HeuristicResultDetails("Fraction of executors having non zero bytes spilled", evaluator.fractionOfExecutorsHavingBytesSpilled.toString)
    )

    if (evaluator.severity != Severity.NONE) {
      // BUG FIX: the original computed `resultDetails :+ ...` and discarded the
      // result (Seq is immutable, `:+` returns a new Seq), so the Note and
      // Recommendation details never reached the report. Reassign instead.
      resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "Your execution memory is being spilled. Kindly look into it.")
      if (evaluator.sparkExecutorCores >= sparkExecutorCoresThreshold && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes(sparkExecutorMemoryThreshold)) {
        resultDetails = resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.")
      } else if (evaluator.sparkExecutorMemory <= MemoryFormatUtils.stringToBytes(sparkExecutorMemoryThreshold)) {
        resultDetails = resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try increasing the executor memory to reduce spill.")
      }
    }

    val result = new HeuristicResult(
      heuristicConfigurationData.getClassName,
      heuristicConfigurationData.getHeuristicName,
      evaluator.severity,
      0,
      resultDetails.asJava
    )
    result
  }
}
|
||
object ExecutorStorageSpillHeuristic {
  // Spark configuration properties read from the application's environment.
  val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
  val SPARK_EXECUTOR_CORES = "spark.executor.cores"
  // Heuristic configuration keys and their defaults.
  val SPILL_FRACTION_OF_EXECUTORS_THRESHOLD_KEY = "spill_fraction_of_executors_threshold"
  val SPILL_MAX_MEMORY_THRESHOLD_KEY = "spill_max_memory_threshold"
  val SPARK_EXECUTOR_CORES_THRESHOLD_KEY = "spark_executor_cores_threshold"
  val SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY = "spark_executor_memory_threshold"
  val DEFAULT_SPILL_FRACTION_OF_EXECUTORS_THRESHOLD: Double = 0.2
  val DEFAULT_SPILL_MAX_MEMORY_THRESHOLD: Double = 0.05
  val DEFAULT_SPARK_EXECUTOR_CORES_THRESHOLD: Int = 4
  val DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD: String = "10GB"

  /**
   * Computes spill statistics over the application's executor summaries and
   * derives a severity from the fraction of spilling executors and the
   * spilled volume relative to `spark.executor.memory`.
   */
  class Evaluator(executorStorageSpillHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) {
    lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
    lazy val appConfigurationProperties: Map[String, String] =
      data.appConfigurationProperties

    // BUG FIX: the original called `.max` (throws on an empty Seq) and divided
    // `sum` by `size` (ArithmeticException when size == 0). An application with
    // no executor summaries now safely reports zero spill.
    val maxMemorySpilled: Long =
      executorSummaries.map(_.totalMemoryBytesSpilled).reduceOption(_ max _).getOrElse(0L)
    val totalMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).sum
    val meanMemorySpilled: Long =
      if (executorSummaries.isEmpty) 0L else totalMemorySpilled / executorSummaries.size
    val fractionOfExecutorsHavingBytesSpilled: Double =
      if (executorSummaries.isEmpty) 0.0
      else executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble

    // Severity ladder (unchanged semantics):
    //   LOW      — few executors spill and the max spill is small
    //   MODERATE — few executors spill but the mean spill is small
    //   SEVERE   — many executors spill, mean spill still small
    //   CRITICAL — many executors spill and the mean spill is large
    val severity: Severity = {
      if (fractionOfExecutorsHavingBytesSpilled != 0) {
        if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
          && maxMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
          Severity.LOW
        }
        else if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
          && meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
          Severity.MODERATE
        }
        else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
          && meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
          Severity.SEVERE
        }
        else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
          && meanMemorySpilled >= executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
          Severity.CRITICAL
        } else Severity.NONE
      }
      else Severity.NONE
    }

    // Lazily resolved Spark settings; 0 when absent from the app configuration.
    lazy val sparkExecutorMemory: Long =
      appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes).getOrElse(0L)
    lazy val sparkExecutorCores: Int =
      appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt).getOrElse(0)
  }
}
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
23 changes: 23 additions & 0 deletions
23
app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
@* | ||
* Copyright 2016 LinkedIn Corp. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*@ | ||
<p>Spark performs best when data is kept in memory. Spilled execution memory is tracked by memoryBytesSpilled, which is available at the executor level. If execution memory is being spilled, then the warnings are as follows:</p> | ||
<p>Low: memoryBytesSpilled is non-zero for 1 or more executors, greater than zero for < 20% of executors, and max size is < .05 * <i>spark.executor.memory</i>.</p> | ||
<p>Moderate: memoryBytesSpilled is non-zero for 1 or more executors, greater than zero for < 20% of executors, and avg size is < .05 * <i>spark.executor.memory</i>.</p> | ||
<p>Severe: memoryBytesSpilled is greater than zero for > 20% of executors and avg size is < .05 * <i>spark.executor.memory</i>.</p> | ||
<p>Critical: memoryBytesSpilled is greater than zero for > 20% of executors and/or avg size is >= .05 * <i>spark.executor.memory</i>.</p> | ||
<h3>Suggestions</h3> | ||
<p>If the number of cores (spark.executor.cores) is more than 4 and executor memory is > 10GB : Try decreasing the number of cores, which would decrease the number of tasks running in parallel, hence decreasing the number of bytes spilled.</p> | ||
<p>You can also try increasing the <i>spark.executor.memory</i> which will reduce memory spilled.</p> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
136 changes: 136 additions & 0 deletions
136
test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* Copyright 2016 LinkedIn Corp. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.linkedin.drelephant.spark.heuristics | ||
|
||
import scala.collection.JavaConverters | ||
import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds} | ||
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData | ||
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} | ||
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl} | ||
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate | ||
import org.scalatest.{FunSpec, Matchers} | ||
|
||
|
||
/** Unit tests for [[ExecutorStorageSpillHeuristic]] using fabricated executor data. */
class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers {
  import ExecutorStorageSpillHeuristicTest._

  describe("ExecutorStorageSpillHeuristic") {
    val configData = newFakeHeuristicConfigurationData(Map.empty)
    val heuristic = new ExecutorStorageSpillHeuristic(configData)

    val sparkProperties =
      Map("spark.executor.memory" -> "4g", "spark.executor.cores" -> "4", "spark.executor.instances" -> "4")

    // Four executors, all spilling; total 800000 bytes, max 300000, mean 200000.
    val spillPerExecutor = Seq("1" -> 200000L, "2" -> 100000L, "3" -> 300000L, "4" -> 200000L)
    val executorSummaries = spillPerExecutor.map { case (execId, spilled) =>
      newFakeExecutorSummary(id = execId, totalMemoryBytesSpilled = spilled)
    }

    describe(".apply") {
      val appData = newFakeSparkApplicationData(executorSummaries, sparkProperties)
      val result = heuristic.apply(appData)
      val details = result.getHeuristicResultDetails

      it("returns the severity") {
        result.getSeverity should be(Severity.SEVERE)
      }

      it("returns the total memory spilled") {
        details.get(0).getName should include("Total memory spilled")
        details.get(0).getValue should be("781.25 KB")
      }

      it("returns the max memory spilled") {
        details.get(1).getName should include("Max memory spilled")
        details.get(1).getValue should be("292.97 KB")
      }

      it("returns the mean memory spilled") {
        details.get(2).getName should include("Mean memory spilled")
        details.get(2).getValue should be("195.31 KB")
      }
    }
  }
}
|
||
/** Test fixtures shared by [[ExecutorStorageSpillHeuristicTest]]. */
object ExecutorStorageSpillHeuristicTest {
  import JavaConverters._

  /** Builds a minimal heuristic configuration carrying only the given params. */
  def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
    new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

  /** Builds an executor summary where every metric except the spill bytes is zeroed. */
  def newFakeExecutorSummary(
    id: String,
    totalMemoryBytesSpilled: Long
  ): ExecutorSummaryImpl = new ExecutorSummaryImpl(
    id,
    hostPort = "",
    rddBlocks = 0,
    memoryUsed = 0,
    diskUsed = 0,
    activeTasks = 0,
    failedTasks = 0,
    completedTasks = 0,
    totalTasks = 0,
    totalDuration = 0,
    totalInputBytes = 0,
    totalShuffleRead = 0,
    totalShuffleWrite = 0,
    maxMemory = 0,
    totalGCTime = 0,
    totalMemoryBytesSpilled,
    executorLogs = Map.empty
  )

  /** Assembles SparkApplicationData from executor summaries and app properties. */
  def newFakeSparkApplicationData(
    executorSummaries: Seq[ExecutorSummaryImpl],
    appConfigurationProperties: Map[String, String]
  ): SparkApplicationData = {
    val appId = "application_1"

    val restDerivedData = SparkRestDerivedData(
      new ApplicationInfoImpl(appId, name = "app", Seq.empty),
      jobDatas = Seq.empty,
      stageDatas = Seq.empty,
      executorSummaries = executorSummaries
    )

    val logDerivedData = SparkLogDerivedData(
      SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq))
    )

    SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
  }
}
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use the number of executors, to get a sense of the amount of memory spilled per executor, rather than dividing by the number of cores per executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove these variables if not in use.