Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
swasti committed Dec 18, 2017
1 parent 681e577 commit a769026
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur
import ExecutorStorageSpillHeuristic._
import JavaConverters._

val spillFractionOfExecutorsThreshold: Double =
if(heuristicConfigurationData.getParamMap.get(SPILL_FRACTION_OF_EXECUTORS_THRESHOLD_KEY) == null) DEFAULT_SPILL_FRACTION_OF_EXECUTORS_THRESHOLD
else heuristicConfigurationData.getParamMap.get(SPILL_FRACTION_OF_EXECUTORS_THRESHOLD_KEY).toDouble

val spillMaxMemoryThreshold: Double =
if(heuristicConfigurationData.getParamMap.get(SPILL_MAX_MEMORY_THRESHOLD_KEY) == null) DEFAULT_SPILL_MAX_MEMORY_THRESHOLD
else heuristicConfigurationData.getParamMap.get(SPILL_MAX_MEMORY_THRESHOLD_KEY).toDouble


override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

override def apply(data: SparkApplicationData): HeuristicResult = {
Expand All @@ -48,9 +57,11 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur
)

if(evaluator.severity != Severity.NONE){
resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. Kindly look into it.")
resultDetails :+ new HeuristicResultDetails("Note", "Your execution memory is being spilled. Kindly look into it.")
if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) {
resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.")
} else if (evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) {
resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try increasing the executor memory to reduce spill.")
}
}

Expand All @@ -68,29 +79,38 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur
object ExecutorStorageSpillHeuristic {
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_EXECUTOR_CORES = "spark.executor.cores"
val SPARK_EXECUTOR_INSTANCES = "spark.executor.instances"
val SPILL_FRACTION_OF_EXECUTORS_THRESHOLD_KEY = "spill_fraction_of_executors_threshold"
val SPILL_MAX_MEMORY_THRESHOLD_KEY = "spill_max_memory_threshold"
val DEFAULT_SPILL_FRACTION_OF_EXECUTORS_THRESHOLD : Double = 0.2
val DEFAULT_SPILL_MAX_MEMORY_THRESHOLD : Double = 0.05

class Evaluator(memoryFractionHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) {
class Evaluator(executorStorageSpillHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) {
lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
val ratioMemoryCores: Long = (sparkExecutorMemory / sparkExecutorCores)
val memoryPerExecutor: Long = (sparkExecutorMemory / sparkExecutorInstances)
val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max
val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size
val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum
val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble
val severity: Severity = {
if (fractionOfExecutorsHavingBytesSpilled != 0) {
if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) {
if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& maxMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * memoryPerExecutor) {
Severity.LOW
}
else if (fractionOfExecutorsHavingBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
else if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * memoryPerExecutor) {
Severity.MODERATE
}

else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * memoryPerExecutor) {
Severity.SEVERE
}
else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) {
else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& meanMemorySpilled >= executorStorageSpillHeuristic.spillMaxMemoryThreshold * memoryPerExecutor) {
Severity.CRITICAL
} else Severity.NONE
}
Expand All @@ -99,6 +119,7 @@ object ExecutorStorageSpillHeuristic {

lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0)
lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0)
lazy val sparkExecutorInstances: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_INSTANCES).map(_.toInt)).getOrElse(0)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,11 @@ class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers {

describe("ExecutorStorageSpillHeuristic") {
val heuristicConfigurationData = newFakeHeuristicConfigurationData(
Map(
"max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16",
"ignore_max_bytes_less_than_threshold" -> "4000000",
"ignore_max_millis_less_than_threshold" -> "4000001"
)
Map.empty
)
val executorStorageSpillHeuristic = new ExecutorStorageSpillHeuristic(heuristicConfigurationData)

val appConfigurationProperties = Map("spark.executor.memory" -> "4g", "spark.executor.cores"->"4")
val appConfigurationProperties = Map("spark.executor.memory" -> "4g", "spark.executor.cores"->"4", "spark.executor.instances"->"4")

val executorSummaries = Seq(
newFakeExecutorSummary(
Expand Down

0 comments on commit a769026

Please sign in to comment.