Skip to content

Commit

Permalink
Merge pull request #33 from anfeng/master
Browse files Browse the repository at this point in the history
(Issue #32) configuration setting added to wait for YARN report on app master
  • Loading branch information
Derek Dagit committed Aug 2, 2013
2 parents bfafb19 + 183559d commit 510d67c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class Config {
final public static String MASTER_SIZE_MB = "master.container.size-mb";
final public static String MASTER_NUM_SUPERVISORS = "master.initial-num-supervisors";
final public static String MASTER_CONTAINER_PRIORITY = "master.container.priority";
// Maximum number of milliseconds to wait for the YARN application report to include the Storm Master host/port
final public static String YARN_REPORT_WAIT_MILLIS = "yarn.report.wait.millis";
final public static String MASTER_HEARTBEAT_INTERVAL_MILLIS = "master.heartbeat.interval.millis";

@SuppressWarnings("rawtypes")
Expand Down
38 changes: 27 additions & 11 deletions src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.utils.Utils;

import com.yahoo.storm.yarn.Config;
import com.yahoo.storm.yarn.generated.StormMaster;

Expand Down Expand Up @@ -93,23 +95,37 @@ public ApplicationId getAppId() {
@SuppressWarnings("unchecked")
public synchronized StormMaster.Client getClient() throws YarnRemoteException {
if (_client == null) {
//TODO need a way to force this to reconnect in case of an error
ApplicationReport report = _yarn.getApplicationReport(_appId);
LOG.info("application report for "+_appId+" :"+report.getHost()+":"+report.getRpcPort());
String host = report.getHost();
if (host == null) {
throw new RuntimeException(
"No host returned for Application Master " + _appId);
String host = null;
int port = 0;
//wait for application to be ready
int max_wait_for_report = Utils.getInt(_stormConf.get(Config.YARN_REPORT_WAIT_MILLIS));
int waited=0;
while (waited<max_wait_for_report) {
ApplicationReport report = _yarn.getApplicationReport(_appId);
host = report.getHost();
port = report.getRpcPort();
if (host == null || port==0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
waited += 1000;
} else {
break;
}
}
if (host == null || port==0) {
LOG.info("No host/port returned for Application Master " + _appId);
return null;
}

LOG.info("application report for "+_appId+" :"+host+":"+port);
if (_stormConf == null ) {
_stormConf = new HashMap<Object,Object>();
_stormConf = new HashMap<Object,Object>();
}
_stormConf.put(Config.MASTER_HOST, host);
int port = report.getRpcPort();
_stormConf.put(Config.MASTER_THRIFT_PORT, port);
LOG.info("Attaching to "+host+":"+port+" to talk to app master "+_appId);
//TODO need a better workaround for the config not being set.
_stormConf.put(Config.MASTER_TIMEOUT_SECS, 10);
_client = MasterClient.getConfiguredClient(_stormConf);
}
return _client.getClient();
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/master_defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ master.thrift.port: 9000
master.initial-num-supervisors: 1
master.container.priority: 0
master.heartbeat.interval.millis: 1000
master.timeout.secs: 1000
yarn.report.wait.millis: 10000
nimbusui.startup.ms: 10000

ui.port: 7070
Expand Down

0 comments on commit 510d67c

Please sign in to comment.