Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark Executor Spill Heuristic - (Depends on Custom SHS - Requires totalMemoryBytesSpilled metric) #310

Merged
merged 8 commits into from
Jan 10, 2018

Conversation

skakker
Copy link
Contributor

@skakker skakker commented Dec 5, 2017

checks whether memory is getting spilled and recommendations are given accordingly.

if(evaluator.severity != Severity.NONE){
resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. Kindly look into it.")
if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) {
resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if increasing memory could be a recommendation. That should also reduce the memory spill?

@@ -87,6 +87,7 @@ trait ExecutorSummary{
def totalShuffleRead: Long
def totalShuffleWrite: Long
def maxMemory: Long
def totalMemoryBytesSpilled: Long
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the regular Spark history server REST API provide totalMemoryBytesSpilled? Or you are using a custom Spark history server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the custom Spark History Server

val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble
val severity: Severity = {
if (fractionOfExecutorsHavingBytesSpilled != 0) {
if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the threshold configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.LOW
}
else if (fractionOfExecutorsHavingBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the threshold configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Severity.MODERATE
}

else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the threshold configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.SEVERE
}
else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the threshold configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

)

if(evaluator.severity != Severity.NONE){
resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. Kindly look into it.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change "memory" to "execution memory". We can recommend increasing executor memory, if it isn't too high (perhaps <10G) already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


object ExecutorStorageSpillHeuristic {
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_EXECUTOR_CORES = "spark.executor.cores"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the number of executors, to get a sense of the amount of memory spilled per executor, rather than dividing by the number of cores per executor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove these variables if not in use.


if(evaluator.severity != Severity.NONE){
resultDetails :+ new HeuristicResultDetails("Note", "Your execution memory is being spilled. Kindly look into it.")
if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 and 10GB should be made configurable in HeuristicConf


object ExecutorStorageSpillHeuristic {
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_EXECUTOR_CORES = "spark.executor.cores"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove these variables if not in use.

}

lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0)
lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove if not required.

* License for the specific language governing permissions and limitations under
* the License.
*@
<p>Spark performs best when data is kept in memory. Spilled execution memory is tracked by memoryBytesSpilled, which is available at the task level, and each stage will have this information for its tasks. memoryBytesSpilled is cumulative for a task -- it is incremented whenever more execution memory is spilled. If execution memory is being spilled, then the warnings are as follows:</p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the help page reflecting the non usage of sparkExecutorCores

@akshayrai akshayrai changed the title adding spill heuristic Executor Spill Heuristic Jan 10, 2018
@akshayrai akshayrai changed the title Executor Spill Heuristic Spark Executor Spill Heuristic - (Depends on Custom SHS - Requires totalMemoryBytesSpilled) Jan 10, 2018
@akshayrai akshayrai changed the title Spark Executor Spill Heuristic - (Depends on Custom SHS - Requires totalMemoryBytesSpilled) Spark Executor Spill Heuristic - (Depends on Custom SHS - Requires totalMemoryBytesSpilled metric) Jan 10, 2018
@skakker skakker changed the base branch from master to customSHSWork January 10, 2018 05:59
@akshayrai akshayrai merged commit 14ec8f2 into linkedin:customSHSWork Jan 10, 2018
akshayrai pushed a commit that referenced this pull request Feb 21, 2018
akshayrai pushed a commit that referenced this pull request Feb 27, 2018
akshayrai pushed a commit that referenced this pull request Mar 6, 2018
arpang pushed a commit to arpang/dr-elephant that referenced this pull request Mar 14, 2018
akshayrai pushed a commit that referenced this pull request Mar 19, 2018
akshayrai pushed a commit that referenced this pull request Mar 19, 2018
akshayrai pushed a commit that referenced this pull request Mar 30, 2018
akshayrai pushed a commit that referenced this pull request Apr 6, 2018
akshayrai pushed a commit that referenced this pull request May 21, 2018
pralabhkumar pushed a commit to pralabhkumar/dr-elephant that referenced this pull request Aug 31, 2018
varunsaxena pushed a commit that referenced this pull request Oct 16, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants