Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9191] Provide the ability to replace the remoting layer implementation for Proxy and Broker #9192

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 116 additions & 86 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -94,4 +101,43 @@ public void run() {
TimeUnit.MILLISECONDS.sleep(headSlowTimeMills);
assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills);
}

@Test
public void testCustomRemotingServer() throws CloneNotSupportedException {
final RemotingServer mockRemotingServer = new NettyRemotingServer(nettyServerConfig);
final String mockRemotingServerName = "MOCK_REMOTING_SERVER";

BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, new NettyClientConfig(), messageStoreConfig);
brokerController.setRemotingServerByName(mockRemotingServerName, mockRemotingServer);
brokerController.initializeRemotingServer();

final RPCHook rpcHook = new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {

}

@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {

}
};
brokerController.registerServerRPCHook(rpcHook);

// setRequestPipelineTest
final RequestPipeline requestPipeline = (ctx, request) -> {

};
brokerController.setRequestPipeline(requestPipeline);

NettyRemotingAbstract tcpRemotingServer = (NettyRemotingAbstract) brokerController.getRemotingServer();
Assert.assertTrue(tcpRemotingServer.getRPCHook().contains(rpcHook));

NettyRemotingAbstract fastRemotingServer = (NettyRemotingAbstract) brokerController.getFastRemotingServer();
Assert.assertTrue(fastRemotingServer.getRPCHook().contains(rpcHook));

NettyRemotingAbstract mockRemotingServer1 = (NettyRemotingAbstract) brokerController.getRemotingServerByName(mockRemotingServerName);
Assert.assertTrue(mockRemotingServer1.getRPCHook().contains(rpcHook));
Assert.assertSame(mockRemotingServer, mockRemotingServer1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
Expand Down Expand Up @@ -268,19 +269,43 @@ public class MQClientAPIImpl implements NameServerUpdateCallback, StartAndShutdo
private String nameSrvAddr = null;
private ClientConfig clientConfig;

public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
public MQClientAPIImpl(
final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
final RPCHook rpcHook,
final ClientConfig clientConfig
) {
this(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null);
}

public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
public MQClientAPIImpl(
final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig, final ChannelEventListener channelEventListener) {
final RPCHook rpcHook,
final ClientConfig clientConfig,
final ChannelEventListener channelEventListener
) {
this(
nettyClientConfig,
clientRemotingProcessor,
rpcHook,
clientConfig,
channelEventListener,
null
);
}

public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig,
final ChannelEventListener channelEventListener,
final ObjectCreator<RemotingClient> remotingClientCreator) {
this.clientConfig = clientConfig;
topAddressing = new DefaultTopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
topAddressing.registerChangeCallBack(this);
this.remotingClient = new NettyRemotingClient(nettyClientConfig, channelEventListener);
this.remotingClient = remotingClientCreator != null
? remotingClientCreator.create(nettyClientConfig, channelEventListener)
: new NettyRemotingClient(nettyClientConfig, channelEventListener);
this.clientRemotingProcessor = clientRemotingProcessor;

this.remotingClient.registerRPCHook(new NamespaceRpcHook(clientConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.rocketmq.client.impl.admin.MqClientAdminImpl;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
Expand All @@ -48,6 +49,7 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
Expand Down Expand Up @@ -97,7 +99,17 @@ public MQClientAPIExt(
ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook
) {
super(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig);
this(clientConfig, nettyClientConfig, clientRemotingProcessor, rpcHook, null);
}

public MQClientAPIExt(
ClientConfig clientConfig,
NettyClientConfig nettyClientConfig,
ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook,
ObjectCreator<RemotingClient> remotingClientCreator
) {
super(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null, remotingClientCreator);
this.clientConfig = clientConfig;
this.mqClientAdmin = new MqClientAdminImpl(getRemotingClient());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
package org.apache.rocketmq.client.impl.mqclient;

import com.google.common.base.Strings;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.common.NameserverAccessConfig;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;

public class MQClientAPIFactory implements StartAndShutdown {
Expand All @@ -40,16 +44,35 @@ public class MQClientAPIFactory implements StartAndShutdown {
private final RPCHook rpcHook;
private final ScheduledExecutorService scheduledExecutorService;
private final NameserverAccessConfig nameserverAccessConfig;
private final ObjectCreator<RemotingClient> remotingClientCreator;

public MQClientAPIFactory(
NameserverAccessConfig nameserverAccessConfig,
String namePrefix,
int clientNum,
ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook,
ScheduledExecutorService scheduledExecutorService
) {
this(nameserverAccessConfig, namePrefix, clientNum, clientRemotingProcessor, rpcHook, scheduledExecutorService, null);
}

public MQClientAPIFactory(NameserverAccessConfig nameserverAccessConfig, String namePrefix, int clientNum,
public MQClientAPIFactory(
NameserverAccessConfig nameserverAccessConfig,
String namePrefix,
int clientNum,
ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, ScheduledExecutorService scheduledExecutorService) {
RPCHook rpcHook,
ScheduledExecutorService scheduledExecutorService,
ObjectCreator<RemotingClient> remotingClientCreator
) {
this.nameserverAccessConfig = nameserverAccessConfig;
this.namePrefix = namePrefix;
this.clientNum = clientNum;
this.clientRemotingProcessor = clientRemotingProcessor;
this.rpcHook = rpcHook;
this.scheduledExecutorService = scheduledExecutorService;
this.remotingClientCreator = remotingClientCreator;

this.init();
}
Expand Down Expand Up @@ -102,9 +125,13 @@ protected MQClientAPIExt createAndStart(String instanceName) {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setDisableCallbackExecutor(true);

MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(clientConfig, nettyClientConfig,
MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(
clientConfig,
nettyClientConfig,
clientRemotingProcessor,
rpcHook);
rpcHook,
remotingClientCreator
);

if (!mqClientAPIExt.updateNameServerAddressList()) {
mqClientAPIExt.fetchNameServerAddr();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
Expand All @@ -59,6 +60,7 @@
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
Expand Down Expand Up @@ -2104,6 +2106,48 @@ public void operationFail(Throwable throwable) {
done.await();
}

@Test
public void testMQClientAPIImplWithoutObjectCreator() {
MQClientAPIImpl clientAPI = new MQClientAPIImpl(
new NettyClientConfig(),
null,
null,
new ClientConfig(),
null,
null
);
RemotingClient remotingClient1 = clientAPI.getRemotingClient();
Assert.assertTrue(remotingClient1 instanceof NettyRemotingClient);
}

@Test
public void testMQClientAPIImplWithObjectCreator() {
ObjectCreator<RemotingClient> clientObjectCreator = args -> new MockRemotingClientTest((NettyClientConfig) args[0]);
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
MQClientAPIImpl clientAPI = new MQClientAPIImpl(
nettyClientConfig,
null,
null,
new ClientConfig(),
null,
clientObjectCreator
);
RemotingClient remotingClient1 = clientAPI.getRemotingClient();
Assert.assertTrue(remotingClient1 instanceof MockRemotingClientTest);
MockRemotingClientTest remotingClientTest = (MockRemotingClientTest) remotingClient1;
Assert.assertSame(remotingClientTest.getNettyClientConfig(), nettyClientConfig);
}

private static class MockRemotingClientTest extends NettyRemotingClient {
public MockRemotingClientTest(NettyClientConfig nettyClientConfig) {
super(nettyClientConfig);
}

public NettyClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
}

private Properties createProperties() {
Properties result = new Properties();
result.put("key", "value");
Expand Down
21 changes: 21 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;

public interface ObjectCreator<T> {
T create(Object... args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.MessageStore;
Expand All @@ -43,8 +44,11 @@ public InnerBrokerController(

@Override
protected void initializeRemotingServer() {
this.remotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
this.fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2);
RemotingServer remotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
RemotingServer fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2);

setRemotingServer(remotingServer);
setFastRemotingServer(fastRemotingServer);
}

@Override
Expand Down Expand Up @@ -119,11 +123,11 @@ public void shutdown() {
scheduledFuture.cancel(true);
}

if (this.remotingServer != null) {
if (getRemotingServer() != null) {
this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
}

if (this.fastRemotingServer != null) {
if (getFastRemotingServer() != null) {
this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2);
}
}
Expand Down
Loading
Loading