
[LightGBM] training issue - java.net.ConnectException: Connection refused #1248

Closed
SammyBaek opened this issue Nov 8, 2021 · 1 comment

SammyBaek commented Nov 8, 2021

Describe the bug
I've been trying to train a LightGBMClassifier, but training consistently crashes after many hours, typically hitting a java.net.ConnectException around ~10 hours in. I've tried several fixes suggested in other issue threads (e.g. useBarrierExecutionMode=True, useSingleDatasetMode=True, different parallelism settings) with no success so far. I'm not sure what the stack trace below actually points to; any guidance would be greatly appreciated.

To Reproduce
Training set: ~10,000,000 rows x ~100 features (unbalanced dataset)
Params:

    from synapse.ml.lightgbm import LightGBMClassifier
    params = {
        "objective": "binary",
        "featuresCol": "features",
        "labelCol": "ResultId",
        "isUnbalance": True,
        "validationIndicatorCol": "isValidation",
        "timeout": 10000,
        "useBarrierExecutionMode": True,
        "useSingleDatasetMode": True,
    }
    clf = LightGBMClassifier(**params)
    model = clf.fit(df_train)
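
For reference, a rough sketch of how a compatible `df_train` could be assembled (the actual feature pipeline isn't shown here; everything other than the `features`, `ResultId`, and `isValidation` columns is illustrative):

    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql import functions as F

    # Assemble the ~100 raw feature columns into the single vector column
    # expected by featuresCol="features".
    feature_cols = [c for c in raw_df.columns if c != "ResultId"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # Boolean indicator for validationIndicatorCol; a random 10% split is
    # used here purely for illustration.
    df_train = (
        assembler.transform(raw_df)
        .withColumn("isValidation", F.rand(seed=42) < 0.1)
        .select("features", "ResultId", "isValidation")
    )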

Expected behavior
Train model successfully.

Info (please complete the following information):

"conf": {
    "spark.jars.packages": "com.microsoft.azure:synapseml:0.9.1",
    "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
    "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12",
    "spark.yarn.user.classpath.first": "true"
}
  • SynapseML Version: [0.9.1]
  • Spark Version: [3.1]
  • Spark Platform: [Azure Synapse Analytics]
  • Isolated Compute: [Disabled]
  • Node size: [XXLarge (64 vCores / 432 GB)]
  • Autoscale: [Enabled]
  • Number of nodes: [3 to 10 nodes]
  • Dynamically Allocate Executors: [Disabled]
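
For reference, the same package settings can also be applied when building a session directly (this session-builder form is just an illustration; the job above used the pool-level "conf" block shown earlier):

    from pyspark.sql import SparkSession

    # Equivalent package/repository configuration applied at session build time.
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "com.microsoft.azure:synapseml:0.9.1")
        .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
        .getOrCreate()
    )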

Stacktrace

Py4JJavaError: An error occurred while calling o1221.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(99, 49) finished unsuccessfully.
java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at java.net.Socket.connect(Socket.java:556)
	at java.net.Socket.<init>(Socket.java:452)
	at java.net.Socket.<init>(Socket.java:229)
	at com.microsoft.azure.synapse.ml.lightgbm.TrainUtils$.getNetworkInitNodes(TrainUtils.scala:239)
	at com.microsoft.azure.synapse.ml.lightgbm.TrainUtils$.$anonfun$getNetworkInfo$2(TrainUtils.scala:340)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.lightgbm.TrainUtils$.$anonfun$getNetworkInfo$1(TrainUtils.scala:340)
	at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:29)
	at com.microsoft.azure.synapse.ml.lightgbm.TrainUtils$.getNetworkInfo(TrainUtils.scala:336)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainLightGBM(LightGBMBase.scala:353)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$innerTrain$4(LightGBMBase.scala:480)
	at org.apache.spark.rdd.RDDBarrier.$anonfun$mapPartitions$2(RDDBarrier.scala:51)
	at org.apache.spark.rdd.RDDBarrier.$anonfun$mapPartitions$2$adapted(RDDBarrier.scala:51)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2263)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2212)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2211)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2211)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2447)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2392)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2381)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:869)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1120)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1102)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.innerTrain(LightGBMBase.scala:483)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.innerTrain$(LightGBMBase.scala:442)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.innerTrain(LightGBMClassifier.scala:26)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$1(LightGBMBase.scala:63)
	at com.microsoft.azure.synapse.ml.logging.BasicLogging.logVerb(BasicLogging.scala:63)
	at com.microsoft.azure.synapse.ml.logging.BasicLogging.logVerb$(BasicLogging.scala:60)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:26)
	at com.microsoft.azure.synapse.ml.logging.BasicLogging.logTrain(BasicLogging.scala:49)
	at com.microsoft.azure.synapse.ml.logging.BasicLogging.logTrain$(BasicLogging.scala:48)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logTrain(LightGBMClassifier.scala:26)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:44)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:43)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:26)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:26)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Traceback (most recent call last):

  File "<stdin>", line 16, in train_lgbm

  File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 161, in fit
    return self._fit(dataset)

  File "/mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1635961048340_0001/container_1635961048340_0001_01_000001/com.microsoft.azure_synapseml-lightgbm-0.9.1.jar/synapse/ml/lightgbm/LightGBMClassifier.py", line 1447, in _fit
    java_model = self._fit_java(dataset)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 332, in _fit_java
    return self._java_obj.fit(dataset._jdf)

  File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(

  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)

  File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(

py4j.protocol.Py4JJavaError: An error occurred while calling o1221.fit.

If the bug pertains to a specific feature please tag the appropriate CODEOWNER for better visibility
@imatiach-msft


SammyBaek (Author) commented

Thank you @imatiach-msft for debugging with me! We were able to resolve the issue; here are some findings in case they're helpful to anyone who runs into something similar:

General advice:

  • The biggest fix for me was to offload the prepared DataFrame to Parquet and train on a fresh cluster (see the sketch after this list). Doing heavy data wrangling on the same set of nodes used up a lot of memory and cache, and Spark's lazy evaluation did not help the training time either.
  • Keep an eye on the memory/CPU usage graphs for your Spark workers. Even if the cluster as a whole is under capacity, a single worker can be overloaded by an unbalanced distribution of tasks.
  • Look at the individual worker logs. java.net.ConnectException is usually a red herring and can be caused by any number of deeper issues.
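
Roughly, the Parquet offload looks like this (the storage path is a placeholder, and `params` is the dictionary from the original report; the point is simply to materialize the prepared data once and then train from a clean session):

    # On the cluster that did the data wrangling: materialize the prepared
    # DataFrame so training does not re-trigger the whole lazy pipeline.
    df_train.write.mode("overwrite").parquet("abfss://<container>@<account>.dfs.core.windows.net/prepared/train")

    # On a fresh cluster: read the materialized data back and train from it.
    from synapse.ml.lightgbm import LightGBMClassifier

    df_train = spark.read.parquet("abfss://<container>@<account>.dfs.core.windows.net/prepared/train")
    clf = LightGBMClassifier(**params)
    model = clf.fit(df_train)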
