From b30e78e58d20fb2bd0422a06cb00cd596560fc41 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Thu, 24 Jan 2019 19:02:24 +0800 Subject: [PATCH 01/11] start to use IdelStateHandler for netty4 client --- .../support/header/HeaderExchangeClient.java | 106 +++++++++----- .../remoting/transport/AbstractClient.java | 135 +----------------- .../transport/netty4/NettyClient.java | 7 + .../transport/netty4/NettyClientHandler.java | 23 ++- 4 files changed, 107 insertions(+), 164 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index 65862b6d6af..61eab274cd5 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -40,31 +40,21 @@ public class HeaderExchangeClient implements ExchangeClient { private final Client client; private final ExchangeChannel channel; - private int heartbeat; - private int idleTimeout; - - private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1, - TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL); + private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer( + new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL); private HeartbeatTimerTask heartBeatTimerTask; - private ReconnectTimerTask reconnectTimerTask; - public HeaderExchangeClient(Client client, boolean needHeartbeat) { + public HeaderExchangeClient(Client client, boolean startTimer) { Assert.notNull(client, "Client can't be null"); this.client = client; this.channel = new HeaderExchangeChannel(client); - String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); - - this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && - dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); - this.idleTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); - if (idleTimeout < heartbeat * 2) { - throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); - } - if (needHeartbeat) { - startIdleCheckTask(); + if (startTimer) { + URL url = client.getUrl(); + startReconnectTask(url); + startHeartBeatTask(url); } } @@ -178,25 +168,35 @@ public boolean hasAttribute(String key) { return channel.hasAttribute(key); } - private void startIdleCheckTask() { - AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); - - long heartbeatTick = calculateLeastDuration(heartbeat); - long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout); - HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); - ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout); - - this.heartBeatTimerTask = heartBeatTimerTask; - this.reconnectTimerTask = reconnectTimerTask; + private void startHeartBeatTask(URL url) { + String transporter = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.TRANSPORTER_KEY, "netty")); + if (!transporter.startsWith("netty")) { + AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); + int heartbeat = getHeartbeat(url); + long heartbeatTick = calculateLeastDuration(heartbeat); + this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); + IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); + } + } - // init task and start timer. - IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); - IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); + private void startReconnectTask(URL url) { + if (shouldReconnect(url)) { + AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); + int idleTimeout = getIdleTimeout(url); + long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout); + this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout); + IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); + } } private void doClose() { - heartBeatTimerTask.cancel(); - reconnectTimerTask.cancel(); + if (heartBeatTimerTask != null) { + heartBeatTimerTask.cancel(); + } + + if (reconnectTimerTask != null) { + reconnectTimerTask.cancel(); + } } /** @@ -210,6 +210,48 @@ private long calculateLeastDuration(int time) { } } + private int getHeartbeat(URL url) { + String dubbo = url.getParameter(Constants.DUBBO_VERSION_KEY); + return url.getParameter(Constants.HEARTBEAT_KEY, dubbo != null && + dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); + } + + private int getIdleTimeout(URL url) { + int heartBeat = getHeartbeat(url); + int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3); + if (idleTimeout < heartBeat * 2) { + throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); + } + return idleTimeout; + } + + private boolean shouldReconnect(URL url) { + boolean reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); + if (!reconnect) { + String param = url.getParameter(Constants.RECONNECT_KEY, "true"); + if ("false".equalsIgnoreCase(param)) { + reconnect = false; + } else if ("true".equalsIgnoreCase(param)) { + reconnect = true; + } else { + int value; + try { + value = Integer.parseInt(param); + } catch (Exception e) { + throw new IllegalArgumentException("reconnect param must be non-negative integer or false/true, " + + "input is:" + param); + } + + if (value < 0) { + throw new IllegalArgumentException("reconnect param must be non-negative integer or false/true, " + + "input is:" + param); + } + reconnect = true; + } + } + return reconnect; + } + @Override public String toString() { return "HeaderExchangeClient [channel=" + channel + "]"; diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index 7280b508f5e..4138398f235 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -24,9 +24,7 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.store.DataStore; import org.apache.dubbo.common.utils.ExecutorUtil; -import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.Client; @@ -35,12 +33,6 @@ import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -51,31 +43,14 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler"; private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); - private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger(); - private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true)); private final Lock connectLock = new ReentrantLock(); - private final boolean send_reconnect; - private final AtomicInteger reconnect_count = new AtomicInteger(0); - // Reconnection error log has been called before? - private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false); - // reconnect warning period. Reconnect warning interval (log warning after how many times) //for test - private final int reconnect_warning_period; - private final long shutdown_timeout; + private final boolean needReconnect; protected volatile ExecutorService executor; - private volatile ScheduledFuture reconnectExecutorFuture = null; - // the last successed connected time - private long lastConnectedTime = System.currentTimeMillis(); - public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); - send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); - - shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); - - // The default reconnection interval is 2s, 1800 means warning interval is 1 hour. - reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); + needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); try { doOpen(); @@ -118,105 +93,6 @@ protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handl return ChannelHandlers.wrap(handler, url); } - /** - * @param url - * @return 0-false - */ - private static int getReconnectParam(URL url) { - int reconnect; - String param = url.getParameter(Constants.RECONNECT_KEY); - if (StringUtils.isEmpty(param) || "true".equalsIgnoreCase(param)) { - reconnect = Constants.DEFAULT_RECONNECT_PERIOD; - } else if ("false".equalsIgnoreCase(param)) { - reconnect = 0; - } else { - try { - reconnect = Integer.parseInt(param); - } catch (Exception e) { - throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param); - } - if (reconnect < 0) { - throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param); - } - } - return reconnect; - } - - /** - * init reconnect thread - */ - private synchronized void initConnectStatusCheckCommand() { - //reconnect=false to close reconnect - int reconnect = getReconnectParam(getUrl()); - if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) { - Runnable connectStatusCheckCommand = new Runnable() { - @Override - public void run() { - try { - if (cancelFutureIfOffline()) return; - - if (!isConnected()) { - connect(); - } else { - lastConnectedTime = System.currentTimeMillis(); - } - } catch (Throwable t) { - String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl(); - // wait registry sync provider list - if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) { - if (!reconnect_error_log_flag.get()) { - reconnect_error_log_flag.set(true); - logger.error(errorMsg, t); - return; - } - } - if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) { - logger.warn(errorMsg, t); - } - } - } - - private boolean cancelFutureIfOffline() { - /** - * If the provider service is detected offline, - * the client should not attempt to connect again. - * - * issue: https://github.com/apache/incubator-dubbo/issues/3158 - */ - if(isClosed()) { - ScheduledFuture future = reconnectExecutorFuture; - if(future != null && !future.isCancelled()){ - /** - * Client has been destroyed and - * scheduled task should be cancelled. - */ - future.cancel(true); - } - return true; - } - return false; - } - }; - - reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS); - } - } - - private synchronized void destroyConnectStatusCheckCommand() { - try { - if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isDone()) { - reconnectExecutorFuture.cancel(true); - reconnectExecutorService.purge(); - } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } - } - - protected ExecutorService createExecutor() { - return Executors.newCachedThreadPool(new NamedThreadFactory(CLIENT_THREAD_POOL_NAME + CLIENT_THREAD_POOL_ID.incrementAndGet() + "-" + getUrl().getAddress(), true)); - } - public InetSocketAddress getConnectAddress() { return new InetSocketAddress(NetUtils.filterLocalHost(getUrl().getHost()), getUrl().getPort()); } @@ -286,7 +162,7 @@ public boolean hasAttribute(String key) { @Override public void send(Object message, boolean sent) throws RemotingException { - if (send_reconnect && !isConnected()) { + if (needReconnect && !isConnected()) { connect(); } Channel channel = getChannel(); @@ -303,7 +179,7 @@ protected void connect() throws RemotingException { if (isConnected()) { return; } - initConnectStatusCheckCommand(); + doConnect(); if (!isConnected()) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " @@ -316,8 +192,6 @@ protected void connect() throws RemotingException { + ", channel is " + this.getChannel()); } } - reconnect_count.set(0); - reconnect_error_log_flag.set(false); } catch (RemotingException e) { throw e; } catch (Throwable e) { @@ -332,7 +206,6 @@ protected void connect() throws RemotingException { public void disconnect() { connectLock.lock(); try { - destroyConnectStatusCheckCommand(); try { Channel channel = getChannel(); if (channel != null) { diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java index 4ea6fe12c52..625d6895c95 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java @@ -34,6 +34,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.TimeUnit; @@ -72,6 +73,11 @@ protected void doOpen() throws Throwable { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout()); } + String dubbo = getUrl().getParameter(Constants.DUBBO_VERSION_KEY); + int heartbeat = getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && + dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); + int heartbeatInterval = getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); + bootstrap.handler(new ChannelInitializer() { @Override @@ -80,6 +86,7 @@ protected void initChannel(Channel ch) throws Exception { ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) + .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, TimeUnit.MILLISECONDS)) .addLast("handler", nettyClientHandler); } }); diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java index 2d8981170f3..f70ed0f7e1b 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java @@ -16,15 +16,19 @@ */ package org.apache.dubbo.remoting.transport.netty4; +import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.Version; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.exchange.Request; import org.apache.dubbo.remoting.exchange.Response; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.timeout.IdleStateEvent; /** * NettyClientHandler @@ -101,6 +105,23 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + Request req = new Request(); + req.setVersion(Version.getProtocolVersion()); + req.setTwoWay(true); + req.setEvent(Request.HEARTBEAT_EVENT); + int timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); + ChannelFuture future = ctx.writeAndFlush(req); + if (future.awaitUninterruptibly(timeout)) { + // TODO: log + } + } else { + super.userEventTriggered(ctx, evt); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { @@ -111,4 +132,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } -} \ No newline at end of file +} From 49047b5cc5751f69c7a084565a5eaf8dc7d5d0a8 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Fri, 25 Jan 2019 11:09:01 +0800 Subject: [PATCH 02/11] start to use IdleStateHandler when netty4 is enabled --- .../apache/dubbo/common/utils/UrlUtils.java | 15 +++++ .../support/header/HeaderExchangeClient.java | 25 +++----- .../support/header/HeaderExchangeServer.java | 61 +++++++++---------- .../transport/netty4/NettyClient.java | 13 ++-- .../transport/netty4/NettyServer.java | 8 +++ .../transport/netty4/NettyServerHandler.java | 16 ++++- 6 files changed, 82 insertions(+), 56 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java index 5c157c5a03f..10e51975fe2 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java @@ -467,6 +467,21 @@ public static boolean isProvider(URL url) { PROVIDERS_CATEGORY.equals(url.getParameter(CATEGORY_KEY, PROVIDERS_CATEGORY)); } + public static int getHeartbeat(URL url) { + String dubbo = url.getParameter(Constants.DUBBO_VERSION_KEY); + return url.getParameter(Constants.HEARTBEAT_KEY, dubbo != null && + dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); + } + + public static int getIdleTimeout(URL url) { + int heartBeat = getHeartbeat(url); + int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3); + if (idleTimeout < heartBeat * 2) { + throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); + } + return idleTimeout; + } + /** * Check if the given value matches the given pattern. The pattern supports wildcard "*". * diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index 61eab274cd5..c1a763ac707 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -33,6 +33,11 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; +import static org.apache.dubbo.common.Constants.CLIENT_KEY; +import static org.apache.dubbo.common.Constants.TRANSPORTER_KEY; +import static org.apache.dubbo.common.utils.UrlUtils.getHeartbeat; +import static org.apache.dubbo.common.utils.UrlUtils.getIdleTimeout; + /** * DefaultMessageClient */ @@ -135,6 +140,7 @@ public void startClose() { @Override public void reset(URL url) { client.reset(url); + // FIXME, should cancel and restart timer tasks if parameters in the new URL are different? } @Override @@ -169,8 +175,7 @@ public boolean hasAttribute(String key) { } private void startHeartBeatTask(URL url) { - String transporter = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.TRANSPORTER_KEY, "netty")); - if (!transporter.startsWith("netty")) { + if (shouldHeartbeat(url)) { AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); int heartbeat = getHeartbeat(url); long heartbeatTick = calculateLeastDuration(heartbeat); @@ -210,19 +215,9 @@ private long calculateLeastDuration(int time) { } } - private int getHeartbeat(URL url) { - String dubbo = url.getParameter(Constants.DUBBO_VERSION_KEY); - return url.getParameter(Constants.HEARTBEAT_KEY, dubbo != null && - dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); - } - - private int getIdleTimeout(URL url) { - int heartBeat = getHeartbeat(url); - int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3); - if (idleTimeout < heartBeat * 2) { - throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); - } - return idleTimeout; + private boolean shouldHeartbeat(URL url) { + String transporter = url.getParameter(CLIENT_KEY, url.getParameter(TRANSPORTER_KEY, "netty")); + return !transporter.startsWith("netty"); } private boolean shouldReconnect(URL url) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java index 65b0836fcc8..95b653cfedd 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java @@ -25,6 +25,7 @@ import org.apache.dubbo.common.utils.Assert; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; @@ -40,6 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.unmodifiableCollection; +import static org.apache.dubbo.common.Constants.SERVER_KEY; +import static org.apache.dubbo.common.Constants.TRANSPORTER_KEY; /** * ExchangeServerImpl @@ -49,8 +52,6 @@ public class HeaderExchangeServer implements ExchangeServer { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final Server server; - private int heartbeat; - private int idleTimeout; private AtomicBoolean closed = new AtomicBoolean(false); private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1, @@ -61,13 +62,7 @@ public class HeaderExchangeServer implements ExchangeServer { public HeaderExchangeServer(Server server) { Assert.notNull(server, "server == null"); this.server = server; - this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); - this.idleTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); - if (idleTimeout < heartbeat * 2) { - throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); - } - - startIdleCheckTask(); + startIdleCheckTask(getUrl()); } public Server getServer() { @@ -154,7 +149,9 @@ private void doClose() { } private void cancelCloseTask() { - closeTimerTask.cancel(); + if (closeTimerTask != null) { + closeTimerTask.cancel(); + } } @Override @@ -210,21 +207,13 @@ public ChannelHandler getChannelHandler() { public void reset(URL url) { server.reset(url); try { - if (url.hasParameter(Constants.HEARTBEAT_KEY) - || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) { - int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat); - int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3); - if (t < h * 2) { - throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); - } - if (h != heartbeat || t != idleTimeout) { - heartbeat = h; - idleTimeout = t; - - // we need cancel the exist closeTimeout first. - cancelCloseTask(); - startIdleCheckTask(); - } + int currHeartbeat = UrlUtils.getHeartbeat(getUrl()); + int currIdleTimeout = UrlUtils.getIdleTimeout(getUrl()); + int heartbeat = UrlUtils.getHeartbeat(url); + int idleTimeout = UrlUtils.getIdleTimeout(url); + if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) { + cancelCloseTask(); + startIdleCheckTask(url); } } catch (Throwable t) { logger.error(t.getMessage(), t); @@ -266,15 +255,21 @@ private long calculateLeastDuration(int time) { } } - private void startIdleCheckTask() { - AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); - - long idleTimeoutTick = calculateLeastDuration(idleTimeout); - CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); - this.closeTimerTask = closeTimerTask; + private void startIdleCheckTask(URL url) { + if (shouldStartCloseTimer(url)) { + AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); + int idleTimeout = UrlUtils.getIdleTimeout(url); + long idleTimeoutTick = calculateLeastDuration(idleTimeout); + CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); + this.closeTimerTask = closeTimerTask; - // init task and start timer. - IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); + // init task and start timer. + IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); + } } + private boolean shouldStartCloseTimer(URL url) { + String transporter = url.getParameter(SERVER_KEY, url.getParameter(TRANSPORTER_KEY, "netty")); + return !transporter.startsWith("netty"); + } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java index 625d6895c95..21c00db6017 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java @@ -22,6 +22,7 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.transport.AbstractClient; @@ -39,6 +40,8 @@ import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + /** * NettyClient. */ @@ -73,20 +76,16 @@ protected void doOpen() throws Throwable { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout()); } - String dubbo = getUrl().getParameter(Constants.DUBBO_VERSION_KEY); - int heartbeat = getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && - dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); - int heartbeatInterval = getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); - bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { + int heartbeatInterval = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) - .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, TimeUnit.MILLISECONDS)) + .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) .addLast("handler", nettyClientHandler); } }); @@ -97,7 +96,7 @@ protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); ChannelFuture future = bootstrap.connect(getConnectAddress()); try { - boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); + boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.channel(); diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java index a8da94e05bd..b57f3e9ed36 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java @@ -22,6 +22,7 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.ExecutorUtil; import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; @@ -38,12 +39,16 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetSocketAddress; import java.util.Collection; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; /** * NettyServer @@ -84,10 +89,13 @@ protected void doOpen() throws Throwable { .childHandler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { + // FIXME: should we use getTimeout()? + int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) + .addLast("server-idle-handler", new IdleStateHandler(0, 0, heartbeatInterval, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java index 56d4ff69c99..889212a3493 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.timeout.IdleStateEvent; import java.net.InetSocketAddress; import java.util.Map; @@ -102,6 +103,19 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); + try { + if (evt instanceof IdleStateEvent) { + channel.close(); + } + super.userEventTriggered(ctx, evt); + } finally { + NettyChannel.removeChannelIfDisconnected(ctx.channel()); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { @@ -112,4 +126,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } -} \ No newline at end of file +} From 11f11584c2bf850ef5aa2a95749eb43e796cec37 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Fri, 25 Jan 2019 16:45:43 +0800 Subject: [PATCH 03/11] unit test fix --- .../apache/dubbo/common/utils/UrlUtils.java | 10 +++++++ .../support/header/HeaderExchangeClient.java | 2 +- .../support/header/HeaderExchangeServer.java | 2 +- .../support/header/ReconnectTimerTask.java | 11 +++++++- .../support/header/HeartbeatHandlerTest.java | 3 +++ .../transport/netty/ClientReconnectTest.java | 27 ------------------- 6 files changed, 25 insertions(+), 30 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java index 10e51975fe2..eeaad5cd73e 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java @@ -479,6 +479,16 @@ public static int getIdleTimeout(URL url) { if (idleTimeout < heartBeat * 2) { throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); } + if (idleTimeout == 0) { + String reconnect = url.getParameter(Constants.RECONNECT_KEY); + if (StringUtils.isNotEmpty(reconnect)) { + try { + idleTimeout = Integer.parseInt(reconnect); + } catch (NumberFormatException e) { + // ignore + } + } + } return idleTimeout; } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index c1a763ac707..1241a36419d 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -217,7 +217,7 @@ private long calculateLeastDuration(int time) { private boolean shouldHeartbeat(URL url) { String transporter = url.getParameter(CLIENT_KEY, url.getParameter(TRANSPORTER_KEY, "netty")); - return !transporter.startsWith("netty"); + return !transporter.equalsIgnoreCase("netty") && !transporter.equalsIgnoreCase("netty4"); } private boolean shouldReconnect(URL url) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java index 95b653cfedd..8df9c1363e0 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java @@ -270,6 +270,6 @@ private void startIdleCheckTask(URL url) { private boolean shouldStartCloseTimer(URL url) { String transporter = url.getParameter(SERVER_KEY, url.getParameter(TRANSPORTER_KEY, "netty")); - return !transporter.startsWith("netty"); + return !transporter.equalsIgnoreCase("netty") && !transporter.equalsIgnoreCase("netty4"); } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java index 3c0e9389db8..6b93ddc1a1a 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java @@ -41,8 +41,17 @@ protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long now = now(); + + // Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection + if (!channel.isConnected()) { + try { + logger.info("Initial connection to " + channel); + ((Client) channel).reconnect(); + } catch (Exception e) { + logger.error("Fail to connect to " + channel, e); + } // check pong at client - if (lastRead != null && now - lastRead > idleTimeout) { + } else if (lastRead != null && now - lastRead > idleTimeout) { logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + idleTimeout + "ms"); try { diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java index 13b1f99acad..52c3e9d03e7 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java @@ -53,6 +53,9 @@ public void after() throws Exception { server.close(); server = null; } + + // wait for timer to finish + Thread.sleep(2000); } @Test diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java index 16160b40ae7..7f89d0f32a4 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java @@ -18,7 +18,6 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.utils.DubboAppender; -import org.apache.dubbo.common.utils.LogUtil; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.Client; @@ -27,7 +26,6 @@ import org.apache.dubbo.remoting.exchange.Exchangers; import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; -import org.apache.log4j.Level; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,9 +34,6 @@ * Client reconnect test */ public class ClientReconnectTest { - public static void main(String[] args) { - System.out.println(3 % 1); - } @BeforeEach public void clear() { @@ -73,28 +68,6 @@ public void testReconnect() throws RemotingException, InterruptedException { } } - /** - * Reconnect log check, when the time is not enough for shutdown time, there is no error log, but there must be a warn log - */ - @Test - public void testReconnectWarnLog() throws RemotingException, InterruptedException { - int port = NetUtils.getAvailablePort(); - DubboAppender.doStart(); - String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?check=false&client=netty3&" - + Constants.RECONNECT_KEY + "=" + 1; //1ms reconnect, ensure that there is enough frequency to reconnect - try { - Exchangers.connect(url); - } catch (Exception e) { - - //do nothing - } - Thread.sleep(1500); - //Time is not long enough to produce a error log - Assertions.assertEquals(0, LogUtil.findMessage(Level.ERROR, "client reconnect to "), "no error message "); - //The first reconnection failed to have a warn log - Assertions.assertEquals(1, LogUtil.findMessage(Level.WARN, "client reconnect to "), "must have one warn message "); - DubboAppender.doStop(); - } public Client startClient(int port, int reconnectPeriod) throws RemotingException { final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?check=false&client=netty3&" + Constants.RECONNECT_KEY + "=" + reconnectPeriod; From a7b57890db14392105df74d3051bf0b1a1ed879e Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Fri, 25 Jan 2019 18:57:50 +0800 Subject: [PATCH 04/11] fix checkstyle --- .../org/apache/dubbo/remoting/transport/netty4/NettyClient.java | 2 -- .../org/apache/dubbo/remoting/transport/netty4/NettyServer.java | 1 - 2 files changed, 3 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java index 21c00db6017..e1ff0d5c051 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java @@ -38,8 +38,6 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.concurrent.TimeUnit; - import static java.util.concurrent.TimeUnit.MILLISECONDS; /** diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java index b57f3e9ed36..557fae08ccb 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java @@ -46,7 +46,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.Map; -import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.MILLISECONDS; From a23e64d9787213cbc1532bbda8a3763f22c1f6f8 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Mon, 28 Jan 2019 10:58:33 +0800 Subject: [PATCH 05/11] add logging message --- .../support/header/ReconnectTimerTask.java | 2 +- .../transport/netty4/NettyClientHandler.java | 29 +++++++++++-------- .../transport/netty4/NettyServerHandler.java | 4 +++ 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java index 6b93ddc1a1a..5fd8c90b956 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java @@ -52,7 +52,7 @@ protected void doTask(Channel channel) { } // check pong at client } else if (lastRead != null && now - lastRead > idleTimeout) { - logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: " + idleTimeout + "ms"); try { ((Client) channel).reconnect(); diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java index 62467087465..c7077086dbb 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java @@ -16,16 +16,16 @@ */ package org.apache.dubbo.remoting.transport.netty4; -import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.Version; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.exchange.Request; import org.apache.dubbo.remoting.exchange.Response; import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.timeout.IdleStateEvent; @@ -35,6 +35,7 @@ */ @io.netty.channel.ChannelHandler.Sharable public class NettyClientHandler extends ChannelDuplexHandler { + private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); private final URL url; @@ -113,16 +114,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - Request req = new Request(); - req.setVersion(Version.getProtocolVersion()); - req.setTwoWay(true); - req.setEvent(Request.HEARTBEAT_EVENT); - int timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); - ChannelFuture future = ctx.writeAndFlush(req); - if (future.awaitUninterruptibly(timeout)) { - // TODO: log + try { + NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); + if (logger.isDebugEnabled()) { + logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel); + } + Request req = new Request(); + req.setVersion(Version.getProtocolVersion()); + req.setTwoWay(true); + req.setEvent(Request.HEARTBEAT_EVENT); + channel.send(req); + } finally { + NettyChannel.removeChannelIfDisconnected(ctx.channel()); } - } else { + } else { super.userEventTriggered(ctx, evt); } } @@ -151,4 +156,4 @@ private static Response buildErrorResponse(Request request, Throwable t) { response.setErrorMessage(StringUtils.toString(t)); return response; } -} \ No newline at end of file +} diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java index 889212a3493..f98a8c7d821 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java @@ -17,6 +17,8 @@ package org.apache.dubbo.remoting.transport.netty4; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; @@ -35,6 +37,7 @@ */ @io.netty.channel.ChannelHandler.Sharable public class NettyServerHandler extends ChannelDuplexHandler { + private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private final Map channels = new ConcurrentHashMap(); // @@ -108,6 +111,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { if (evt instanceof IdleStateEvent) { + logger.info("IdleStateEvent triggered, close channel " + channel); channel.close(); } super.userEventTriggered(ctx, evt); From 0a9bce3004db5385228bc4561b1965ff73e0d127 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Wed, 30 Jan 2019 14:16:23 +0800 Subject: [PATCH 06/11] use idle timeout instead of heartbeat interval --- .../apache/dubbo/remoting/transport/netty4/NettyServer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java index 557fae08ccb..059e20d20ed 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java @@ -89,12 +89,12 @@ protected void doOpen() throws Throwable { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? - int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); + int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) - .addLast("server-idle-handler", new IdleStateHandler(0, 0, heartbeatInterval, MILLISECONDS)) + .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); From e71f82eb38564819b191b885214907d7684c03bf Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Wed, 30 Jan 2019 14:24:30 +0800 Subject: [PATCH 07/11] use try finally only when IdleStateEvent fires. --- .../transport/netty4/NettyServerHandler.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java index f98a8c7d821..ea5b3249839 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java @@ -108,16 +108,16 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); - try { - if (evt instanceof IdleStateEvent) { + if (evt instanceof IdleStateEvent) { + NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); + try { logger.info("IdleStateEvent triggered, close channel " + channel); channel.close(); + } finally { + NettyChannel.removeChannelIfDisconnected(ctx.channel()); } - super.userEventTriggered(ctx, evt); - } finally { - NettyChannel.removeChannelIfDisconnected(ctx.channel()); } + super.userEventTriggered(ctx, evt); } @Override From ed9920474cc739eee54aebed95b10a56c7582572 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Wed, 30 Jan 2019 14:33:08 +0800 Subject: [PATCH 08/11] no need to keep compat to 1.0.x --- .../src/main/java/org/apache/dubbo/common/utils/UrlUtils.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java index eeaad5cd73e..bae95be475f 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java @@ -468,9 +468,7 @@ public static boolean isProvider(URL url) { } public static int getHeartbeat(URL url) { - String dubbo = url.getParameter(Constants.DUBBO_VERSION_KEY); - return url.getParameter(Constants.HEARTBEAT_KEY, dubbo != null && - dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); + return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT); } public static int getIdleTimeout(URL url) { From 0f1c8c2c88adf7e4afca2a80a70a11900b4e2f6f Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Wed, 30 Jan 2019 15:52:29 +0800 Subject: [PATCH 09/11] introduce a new interface, IdleSensible --- .../org/apache/dubbo/remoting/Client.java | 4 +- .../apache/dubbo/remoting/IdleSensible.java | 37 +++++++++++++++++++ .../org/apache/dubbo/remoting/Server.java | 4 +- .../support/header/HeaderExchangeClient.java | 9 +---- .../support/header/HeaderExchangeServer.java | 9 +---- .../transport/netty4/NettyClient.java | 4 ++ .../transport/netty4/NettyServer.java | 5 +++ 7 files changed, 52 insertions(+), 20 deletions(-) create mode 100644 dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/IdleSensible.java diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java index b8ccad201c8..7f15535353c 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java @@ -25,7 +25,7 @@ * * @see org.apache.dubbo.remoting.Transporter#connect(org.apache.dubbo.common.URL, ChannelHandler) */ -public interface Client extends Endpoint, Channel, Resetable { +public interface Client extends Endpoint, Channel, Resetable, IdleSensible { /** * reconnect. @@ -35,4 +35,4 @@ public interface Client extends Endpoint, Channel, Resetable { @Deprecated void reset(org.apache.dubbo.common.Parameters parameters); -} \ No newline at end of file +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/IdleSensible.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/IdleSensible.java new file mode 100644 index 00000000000..14e371e60d6 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/IdleSensible.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dubbo.remoting; + +/** + * Indicate whether the implementation (for both server and client) has the ability to sense and handle idle connection. + * If the server has the ability to handle idle connection, it should close the connection when it happens, and if + * the client has the ability to handle idle connection, it should send the heartbeat to the server. + */ +public interface IdleSensible { + /** + * Whether the implementation can sense and handle the idle connection. By default it's false, the implementation + * relies on dedicated timer to take care of idle connection. + * + * @return whether has the ability to handle idle connection + */ + default boolean canHandleIdle() { + return false; + } +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java index f413cb0bb85..c6ecf16d44b 100755 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java @@ -28,7 +28,7 @@ * * @see org.apache.dubbo.remoting.Transporter#bind(org.apache.dubbo.common.URL, ChannelHandler) */ -public interface Server extends Endpoint, Resetable { +public interface Server extends Endpoint, Resetable, IdleSensible { /** * is bound. @@ -55,4 +55,4 @@ public interface Server extends Endpoint, Resetable { @Deprecated void reset(org.apache.dubbo.common.Parameters parameters); -} \ No newline at end of file +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index 1241a36419d..50349325cef 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -33,8 +33,6 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; -import static org.apache.dubbo.common.Constants.CLIENT_KEY; -import static org.apache.dubbo.common.Constants.TRANSPORTER_KEY; import static org.apache.dubbo.common.utils.UrlUtils.getHeartbeat; import static org.apache.dubbo.common.utils.UrlUtils.getIdleTimeout; @@ -175,7 +173,7 @@ public boolean hasAttribute(String key) { } private void startHeartBeatTask(URL url) { - if (shouldHeartbeat(url)) { + if (!client.canHandleIdle()) { AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); int heartbeat = getHeartbeat(url); long heartbeatTick = calculateLeastDuration(heartbeat); @@ -215,11 +213,6 @@ private long calculateLeastDuration(int time) { } } - private boolean shouldHeartbeat(URL url) { - String transporter = url.getParameter(CLIENT_KEY, url.getParameter(TRANSPORTER_KEY, "netty")); - return !transporter.equalsIgnoreCase("netty") && !transporter.equalsIgnoreCase("netty4"); - } - private boolean shouldReconnect(URL url) { boolean reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); if (!reconnect) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java index 8df9c1363e0..8a74e234134 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java @@ -41,8 +41,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.unmodifiableCollection; -import static org.apache.dubbo.common.Constants.SERVER_KEY; -import static org.apache.dubbo.common.Constants.TRANSPORTER_KEY; /** * ExchangeServerImpl @@ -256,7 +254,7 @@ private long calculateLeastDuration(int time) { } private void startIdleCheckTask(URL url) { - if (shouldStartCloseTimer(url)) { + if (!server.canHandleIdle()) { AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); int idleTimeout = UrlUtils.getIdleTimeout(url); long idleTimeoutTick = calculateLeastDuration(idleTimeout); @@ -267,9 +265,4 @@ private void startIdleCheckTask(URL url) { IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); } } - - private boolean shouldStartCloseTimer(URL url) { - String transporter = url.getParameter(SERVER_KEY, url.getParameter(TRANSPORTER_KEY, "netty")); - return !transporter.equalsIgnoreCase("netty") && !transporter.equalsIgnoreCase("netty4"); - } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java index e1ff0d5c051..464087e7c23 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java @@ -166,4 +166,8 @@ protected org.apache.dubbo.remoting.Channel getChannel() { return NettyChannel.getOrAddChannel(c, getUrl(), this); } + @Override + public boolean canHandleIdle() { + return true; + } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java index 059e20d20ed..62b6c55f93c 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java @@ -164,6 +164,11 @@ public Channel getChannel(InetSocketAddress remoteAddress) { return channels.get(NetUtils.toAddressString(remoteAddress)); } + @Override + public boolean canHandleIdle() { + return true; + } + @Override public boolean isBound() { return channel.isActive(); From e83885f249a2a45d7941f495efd5c552a6a4fcaf Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Wed, 30 Jan 2019 17:02:43 +0800 Subject: [PATCH 10/11] fix unit test --- .../src/main/java/org/apache/dubbo/common/utils/UrlUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java index bae95be475f..72d40d37a7d 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java @@ -468,7 +468,7 @@ public static boolean isProvider(URL url) { } public static int getHeartbeat(URL url) { - return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT); + return url.getParameter(Constants.HEARTBEAT_KEY, 0); } public static int getIdleTimeout(URL url) { From 12ddca938ba596ee228e03b0a32a615ce5556db8 Mon Sep 17 00:00:00 2001 From: "beiwei.ly" Date: Wed, 30 Jan 2019 17:32:56 +0800 Subject: [PATCH 11/11] don't rely on RECONNECT_KEY, and give HEARTBEAT a default value --- .../apache/dubbo/common/utils/UrlUtils.java | 12 +-------- .../support/header/HeaderExchangeClient.java | 25 +------------------ .../transport/netty/ClientReconnectTest.java | 4 +-- .../transport/netty4/ClientReconnectTest.java | 4 +-- 4 files changed, 6 insertions(+), 39 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java index 72d40d37a7d..23b50339f4c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java @@ -468,7 +468,7 @@ public static boolean isProvider(URL url) { } public static int getHeartbeat(URL url) { - return url.getParameter(Constants.HEARTBEAT_KEY, 0); + return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT); } public static int getIdleTimeout(URL url) { @@ -477,16 +477,6 @@ public static int getIdleTimeout(URL url) { if (idleTimeout < heartBeat * 2) { throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); } - if (idleTimeout == 0) { - String reconnect = url.getParameter(Constants.RECONNECT_KEY); - if (StringUtils.isNotEmpty(reconnect)) { - try { - idleTimeout = Integer.parseInt(reconnect); - } catch (NumberFormatException e) { - // ignore - } - } - } return idleTimeout; } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index 50349325cef..3e57fba90c4 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -214,30 +214,7 @@ private long calculateLeastDuration(int time) { } private boolean shouldReconnect(URL url) { - boolean reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); - if (!reconnect) { - String param = url.getParameter(Constants.RECONNECT_KEY, "true"); - if ("false".equalsIgnoreCase(param)) { - reconnect = false; - } else if ("true".equalsIgnoreCase(param)) { - reconnect = true; - } else { - int value; - try { - value = Integer.parseInt(param); - } catch (Exception e) { - throw new IllegalArgumentException("reconnect param must be non-negative integer or false/true, " + - "input is:" + param); - } - - if (value < 0) { - throw new IllegalArgumentException("reconnect param must be non-negative integer or false/true, " + - "input is:" + param); - } - reconnect = true; - } - } - return reconnect; + return url.getParameter(Constants.RECONNECT_KEY, true); } @Override diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java index 7f89d0f32a4..4cb7eb6784d 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java @@ -69,8 +69,8 @@ public void testReconnect() throws RemotingException, InterruptedException { } - public Client startClient(int port, int reconnectPeriod) throws RemotingException { - final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?check=false&client=netty3&" + Constants.RECONNECT_KEY + "=" + reconnectPeriod; + public Client startClient(int port, int heartbeat) throws RemotingException { + final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?check=false&client=netty3&" + Constants.HEARTBEAT_KEY + "=" + heartbeat; return Exchangers.connect(url); } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java index eae66742767..f21918d3d15 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java @@ -71,8 +71,8 @@ public void testReconnect() throws RemotingException, InterruptedException { } - public Client startClient(int port, int reconnectPeriod) throws RemotingException { - final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?client=netty4&check=false&" + Constants.RECONNECT_KEY + "=" + reconnectPeriod; + public Client startClient(int port, int heartbeat) throws RemotingException { + final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?client=netty4&check=false&" + Constants.HEARTBEAT_KEY + "=" + heartbeat; return Exchangers.connect(url); }