Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add delegate to monitor training #847

Merged
merged 2 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class LightGBMClassifier(override val uid: String)
getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numWorkers, getObjective, modelStr,
getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage,
getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric,
getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames)
getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
}

def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMClassificationModel = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.ml.spark.lightgbm

import com.microsoft.ml.lightgbm.SWIGTYPE_p_void
import org.slf4j.Logger

/** Callback interface for monitoring LightGBM training from user code.
  *
  * An implementation is registered on the estimator via `setDelegate` and is
  * invoked on each Spark worker immediately before and after every boosting
  * iteration. Extends Serializable because instances are shipped inside the
  * training closure to the executors.
  */
abstract class LightGBMDelegate extends Serializable {

  /** Invoked on a worker just before one boosting iteration runs.
    *
    * @param partitionId id of the Spark partition (worker) firing the callback
    * @param curIters    index of the iteration about to run (starts at 0)
    * @param log         worker-side logger
    * @param trainParams training parameters in effect for this run
    * @param boosterPtr  native LightGBM booster handle, if one has been created
    * @param hasValid    whether a validation dataset is available
    */
  def beforeTrainIteration(partitionId: Int, curIters: Int, log: Logger, trainParams: TrainParams,
                           boosterPtr: Option[SWIGTYPE_p_void], hasValid: Boolean): Unit

  /** Invoked on a worker just after one boosting iteration completes.
    *
    * @param partitionId      id of the Spark partition (worker) firing the callback
    * @param curIters         index of the iteration that just ran (starts at 0)
    * @param log              worker-side logger
    * @param trainParams      training parameters in effect for this run
    * @param boosterPtr       native LightGBM booster handle, if one has been created
    * @param hasValid         whether a validation dataset is available
    * @param isFinished       true when training has stopped (converged, early
    *                         stopping triggered, or early termination reached)
    * @param trainEvalResults metric name -> score on the training data; None when
    *                         training metrics are disabled or training finished
    * @param validEvalResults metric name -> score on the validation data; None
    *                         when there is no validation data or training finished
    */
  def afterTrainIteration(partitionId: Int, curIters: Int, log: Logger, trainParams: TrainParams,
                          boosterPtr: Option[SWIGTYPE_p_void], hasValid: Boolean, isFinished: Boolean,
                          trainEvalResults: Option[Map[String, Double]],
                          validEvalResults: Option[Map[String, Double]]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,11 @@ trait LightGBMParams extends Wrappable with DefaultParamsWritable with HasWeight

def getMinDataInLeaf: Int = $(minDataInLeaf)
def setMinDataInLeaf(value: Int): this.type = set(minDataInLeaf, value)

var delegate: Option[LightGBMDelegate] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably figure out how to make this inherit from `Param` and expose it in the Python layer, though maybe not in this PR.

def getDelegate: Option[LightGBMDelegate] = delegate
/** Registers a delegate whose callbacks observe each training iteration.
  *
  * @param delegate the monitor implementation; passing null clears any
  *                 previously registered delegate (wrapped via Option, which
  *                 maps null to None)
  * @return this estimator, to allow chained configuration calls
  */
def setDelegate(delegate: LightGBMDelegate): this.type = {
  val wrapped = Option(delegate)
  this.delegate = wrapped
  this
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class LightGBMRanker(override val uid: String)
getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numWorkers, modelStr,
getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain,
getIsProvideTrainingMetric, getMetric, getEvalAt, getMinGainToSplit, getMaxDeltaStep,
getMaxBinByFeature, getMinDataInLeaf, getSlotNames)
getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
}

def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class LightGBMRegressor(override val uid: String)
getEarlyStoppingRound, getImprovementTolerance, getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf,
numWorkers, modelStr, getVerbosity, categoricalIndexes, getBoostFromAverage, getBoostingType, getLambdaL1,
getLambdaL2, getIsProvideTrainingMetric, getMetric, getMinGainToSplit, getMaxDeltaStep,
getMaxBinByFeature, getMinDataInLeaf, getSlotNames)
getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
}

def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRegressionModel = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ abstract class TrainParams extends Serializable {
def maxBinByFeature: Array[Int]
def minDataInLeaf: Int
def featureNames: Array[String]
def delegate: Option[LightGBMDelegate]

override def toString: String = {
// Since passing `isProvideTrainingMetric` to LightGBM as a config parameter won't work,
Expand Down Expand Up @@ -70,7 +71,7 @@ case class ClassifierTrainParams(parallelism: String, topK: Int, numIterations:
boostingType: String, lambdaL1: Double, lambdaL2: Double,
isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
featureNames: Array[String])
featureNames: Array[String], delegate: Option[LightGBMDelegate])
extends TrainParams {
override def toString(): String = {
val extraStr =
Expand All @@ -94,7 +95,7 @@ case class RegressorTrainParams(parallelism: String, topK: Int, numIterations: I
boostingType: String, lambdaL1: Double, lambdaL2: Double,
isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
featureNames: Array[String])
featureNames: Array[String], delegate: Option[LightGBMDelegate])
extends TrainParams {
override def toString(): String = {
s"alpha=$alpha tweedie_variance_power=$tweedieVariancePower boost_from_average=${boostFromAverage.toString} " +
Expand All @@ -115,7 +116,7 @@ case class RankerTrainParams(parallelism: String, topK: Int, numIterations: Int,
labelGain: Array[Double], isProvideTrainingMetric: Boolean,
metric: String, evalAt: Array[Int], minGainToSplit: Double,
maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
featureNames: Array[String])
featureNames: Array[String], delegate: Option[LightGBMDelegate])
extends TrainParams {
override def toString(): String = {
val labelGainStr =
Expand Down
38 changes: 33 additions & 5 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.net._

import com.microsoft.ml.lightgbm._
import com.microsoft.ml.spark.core.env.StreamUtilities._
import org.apache.spark.BarrierTaskContext
import org.apache.spark.{BarrierTaskContext, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
Expand Down Expand Up @@ -199,7 +199,14 @@ private object TrainUtils extends Serializable {
val bestScore = new Array[Double](evalCounts)
val bestScores = new Array[Array[Double]](evalCounts)
val bestIter = new Array[Int](evalCounts)
val delegate = trainParams.delegate
val partitionId = TaskContext.getPartitionId
while (!isFinished && iters < trainParams.numIterations) {

if (delegate.isDefined) {
delegate.get.beforeTrainIteration(partitionId, iters, log, trainParams, boosterPtr, hasValid)
}

try {
log.info("LightGBM worker calling LGBM_BoosterUpdateOneIter")
val result = lightgbmlib.LGBM_BoosterUpdateOneIter(boosterPtr.get, isFinishedPtr)
Expand All @@ -213,24 +220,32 @@ private object TrainUtils extends Serializable {
log.warn("LightGBM reached early termination on one worker," +
" stopping training on worker. This message should rarely occur")
}
if (trainParams.isProvideTrainingMetric && !isFinished) {

val trainEvalResults: Option[Map[String, Double]] = if (trainParams.isProvideTrainingMetric && !isFinished) {
val trainResults = lightgbmlib.new_doubleArray(evalNames.length)
val dummyEvalCountsPtr = lightgbmlib.new_intp()
val resultEval = lightgbmlib.LGBM_BoosterGetEval(boosterPtr.get, 0, dummyEvalCountsPtr, trainResults)
lightgbmlib.delete_intp(dummyEvalCountsPtr)
LightGBMUtils.validate(resultEval, "Booster Get Train Eval")
evalNames.zipWithIndex.foreach { case (evalName, index) =>

val results: Array[(String, Double)] = evalNames.zipWithIndex.map { case (evalName, index) =>
val score = lightgbmlib.doubleArray_getitem(trainResults, index)
log.info(s"Train $evalName=$score")
(evalName, score)
}

Option(Map(results:_*))
} else {
None
}
if (hasValid && !isFinished) {

val validEvalResults: Option[Map[String, Double]] = if (hasValid && !isFinished) {
val evalResults = lightgbmlib.new_doubleArray(evalNames.length)
val dummyEvalCountsPtr = lightgbmlib.new_intp()
val resultEval = lightgbmlib.LGBM_BoosterGetEval(boosterPtr.get, 1, dummyEvalCountsPtr, evalResults)
lightgbmlib.delete_intp(dummyEvalCountsPtr)
LightGBMUtils.validate(resultEval, "Booster Get Valid Eval")
evalNames.zipWithIndex.foreach { case (evalName, index) =>
val results: Array[(String, Double)] = evalNames.zipWithIndex.map { case (evalName, index) =>
val score = lightgbmlib.doubleArray_getitem(evalResults, index)
log.info(s"Valid $evalName=$score")
val cmp =
Expand All @@ -247,9 +262,22 @@ private object TrainUtils extends Serializable {
isFinished = true
log.info("Early stopping, best iteration is " + bestIter(index))
}

(evalName, score)
}

lightgbmlib.delete_doubleArray(evalResults)

Option(Map(results:_*))
} else {
None
}

if (delegate.isDefined) {
delegate.get.afterTrainIteration(partitionId, iters, log, trainParams, boosterPtr, hasValid, isFinished,
trainEvalResults, validEvalResults)
}

iters = iters + 1
}
}
Expand Down