Spark Executor Spill Heuristic - (Depends on Custom SHS - Requires totalMemoryBytesSpilled metric) #310
if(evaluator.severity != Severity.NONE){
  resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. Kindly look into it.")
  if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) {
    resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.")
I wonder whether increasing memory could also be a recommendation. That should reduce the memory spill as well.
@@ -87,6 +87,7 @@ trait ExecutorSummary{
  def totalShuffleRead: Long
  def totalShuffleWrite: Long
  def maxMemory: Long
  def totalMemoryBytesSpilled: Long
Does the regular Spark history server REST API provide totalMemoryBytesSpilled, or are you using a custom Spark history server?
Using the custom Spark History Server
val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble
val severity: Severity = {
  if (fractionOfExecutorsHavingBytesSpilled != 0) {
    if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) {
Make the threshold configurable?
Done.
if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) {
  Severity.LOW
}
else if (fractionOfExecutorsHavingBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Make the threshold configurable?
Done
  Severity.MODERATE
}

else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Make the threshold configurable?
Done
else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
  Severity.SEVERE
}
else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) {
Make the threshold configurable?
Done
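For readers following the "make the threshold configurable" threads, here is a minimal sketch of one way the literals 0.2 and 0.05 could be lifted into parameters with defaults. It is not the code merged in this PR: the parameter keys, the Thresholds case class, and the helper names are all hypothetical, and only the first branch of the reviewed condition (the one mapped to Severity.LOW) is mirrored.

```scala
import scala.util.Try

object SpillThresholdSketch {
  // Hypothetical parameter names; the PR does not show the keys it actually uses.
  val SpillFractionKey = "spill_fraction_threshold"
  val SpillMemoryFactorKey = "spill_memory_factor"

  // Defaults mirror the literals 0.2 and 0.05 from the diff under review.
  final case class Thresholds(spillFraction: Double = 0.2, spillMemoryFactor: Double = 0.05)

  // Parse one optional Double parameter, falling back to the default
  // when the key is absent or the value is malformed.
  private def doubleParam(params: Map[String, String], key: String, default: Double): Double =
    params.get(key).flatMap(v => Try(v.trim.toDouble).toOption).getOrElse(default)

  def fromParams(params: Map[String, String]): Thresholds = {
    val defaults = Thresholds()
    Thresholds(
      spillFraction = doubleParam(params, SpillFractionKey, defaults.spillFraction),
      spillMemoryFactor = doubleParam(params, SpillMemoryFactorKey, defaults.spillMemoryFactor)
    )
  }

  // Mirrors only the first branch of the reviewed condition (Severity.LOW).
  // The `fraction > 0` guard also rejects NaN.
  def isLowSpill(fraction: Double, maxMemorySpilled: Long, ratioMemoryCores: Long, t: Thresholds): Boolean =
    fraction > 0 && fraction < t.spillFraction && maxMemorySpilled < t.spillMemoryFactor * ratioMemoryCores
}
```

Keeping the defaults in one place means a heuristic configured without these parameters behaves the same as the hard-coded version.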
204d5a7 to 681e577
)

if(evaluator.severity != Severity.NONE){
  resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. Kindly look into it.")
Please change "memory" to "execution memory". We can recommend increasing executor memory, if it isn't too high (perhaps <10G) already.
Done
object ExecutorStorageSpillHeuristic {
  val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
  val SPARK_EXECUTOR_CORES = "spark.executor.cores"
Let's use the number of executors, to get a sense of the amount of memory spilled per executor, rather than dividing by the number of cores per executor.
Done
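The per-executor view suggested above can be sketched roughly as follows. This is an illustration, not the merged change: ExecutorSpill is a hypothetical stand-in for ExecutorSummary that models only the totalMemoryBytesSpilled field, and SpillStats is an invented container. The empty-input guard is there because dividing 0.0 by 0.0 yields NaN, and NaN != 0 evaluates to true in Scala.

```scala
object SpillPerExecutorSketch {
  // Hypothetical stand-in for ExecutorSummary; only the spill field is modeled.
  final case class ExecutorSpill(id: String, totalMemoryBytesSpilled: Long)

  // Invented container for the three quantities the severity check compares.
  final case class SpillStats(fractionSpilling: Double, meanSpilled: Long, maxSpilled: Long)

  def stats(executors: Seq[ExecutorSpill]): SpillStats =
    if (executors.isEmpty) {
      // Avoid 0.0 / 0.0 = NaN when no executor summaries are available.
      SpillStats(0.0, 0L, 0L)
    } else {
      val spilled = executors.map(_.totalMemoryBytesSpilled)
      SpillStats(
        fractionSpilling = spilled.count(_ > 0).toDouble / executors.size,
        meanSpilled = spilled.sum / executors.size, // integer division, bytes per executor
        maxSpilled = spilled.max
      )
    }
}
```

Dividing by the number of executors gives bytes spilled per executor, which is the quantity the reviewer asks for in place of a per-core ratio.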
Remove these variables if not in use.
a769026 to 20bd753
if(evaluator.severity != Severity.NONE){
  resultDetails :+ new HeuristicResultDetails("Note", "Your execution memory is being spilled. Kindly look into it.")
  if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) {
The 4-core and 10GB limits should be made configurable in HeuristicConf.
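A rough sketch of how the core and memory limits might be parameterized, combined with the earlier suggestion to recommend more executor memory when it is not already high. Several things here are assumptions rather than facts from the PR: RecommendationConf and its field names are hypothetical, "10GB" is taken to mean 10 * 1024^3 bytes, and the wording of the "increase memory" message is invented for illustration. The sketch returns a new sequence because :+ on an immutable Seq produces a new collection rather than modifying the receiver.

```scala
object SpillRecommendationSketch {
  // Hypothetical configuration holder; defaults mirror the literals 4 and "10GB" in the diff.
  // Assumption: 10GB is interpreted as 10 * 1024^3 bytes.
  final case class RecommendationConf(
    coresThreshold: Int = 4,
    memoryThresholdBytes: Long = 10L * 1024 * 1024 * 1024
  )

  // Returns (name, message) pairs; empty when nothing is being spilled.
  def recommendations(
    spilling: Boolean,
    executorCores: Int,
    executorMemoryBytes: Long,
    conf: RecommendationConf = RecommendationConf()
  ): Seq[(String, String)] =
    if (!spilling) Seq.empty
    else {
      val note = Seq("Note" -> "Your execution memory is being spilled. Kindly look into it.")
      val advice =
        if (executorCores >= conf.coresThreshold && executorMemoryBytes >= conf.memoryThresholdBytes)
          Seq("Recommendation" -> "You can try decreasing the number of cores to reduce the number of concurrently running tasks.")
        else if (executorMemoryBytes < conf.memoryThresholdBytes)
          // Invented wording, based on the reviewers' suggestion to recommend more executor memory.
          Seq("Recommendation" -> "You can try increasing the executor memory.")
        else Seq.empty
      // Concatenate and return; the result must be used, since Seq is immutable.
      note ++ advice
    }
}
```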
object ExecutorStorageSpillHeuristic {
  val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
  val SPARK_EXECUTOR_CORES = "spark.executor.cores"
Remove these variables if not in use.
}

lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0)
lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0)
Remove if not required.
 * License for the specific language governing permissions and limitations under
 * the License.
 *@
<p>Spark performs best when data is kept in memory. Spilled execution memory is tracked by memoryBytesSpilled, which is available at the task level, and each stage will have this information for its tasks. memoryBytesSpilled is cumulative for a task -- it is incremented whenever more execution memory is spilled. If execution memory is being spilled, then the warnings are as follows:</p>
Update the help page to reflect that sparkExecutorCores is no longer used.
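The help text describes memoryBytesSpilled as a cumulative task-level counter. As an illustration of how such values could roll up into a per-executor total like totalMemoryBytesSpilled, here is a small sketch. How the custom Spark History Server actually computes that field is not shown in this PR, so the aggregation below is an assumption, and TaskSpill is a hypothetical type.

```scala
object TaskSpillAggregationSketch {
  // Hypothetical record: the final (cumulative) spill counter of one finished task.
  final case class TaskSpill(executorId: String, memoryBytesSpilled: Long)

  // Sum the per-task counters for each executor.
  // Assumption: each task contributes its final cumulative value exactly once.
  def totalSpilledPerExecutor(tasks: Seq[TaskSpill]): Map[String, Long] =
    tasks.groupBy(_.executorId).map { case (id, ts) =>
      id -> ts.map(_.memoryBytesSpilled).sum
    }
}
```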
…talMemoryBytesSpilled metric) (#310)
Checks whether memory is being spilled and gives recommendations accordingly.