Skip to content

Commit ce3bf56

Browse files
baomingyuhealchow
authored andcommitted
[INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (apache#5214)
1 parent 961a433 commit ce3bf56

File tree

7 files changed

+80
-70
lines changed

7 files changed

+80
-70
lines changed

inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java

-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public final class RpcConstants {
4343
public static final String NETTY_WRITE_HIGH_MARK = "rpc.netty.write.highmark";
4444
public static final String NETTY_WRITE_LOW_MARK = "rpc.netty.write.lowmark";
4545
public static final String NETTY_TCP_SENDBUF = "rpc.netty.send.buffer";
46-
public static final String NETTY_TCP_MAX_MESSAGE_SIZE = "rpc.netty.max.message.size";
4746
public static final String NETTY_TCP_RECEIVEBUF = "rpc.netty.receive.buffer";
4847
public static final String NETTY_TCP_ENABLEBUSYWAIT = "rpc.netty.enable.busy.wait";
4948

@@ -133,6 +132,4 @@ public final class RpcConstants {
133132
public static final long CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS = 50000;
134133
public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_HIGH_MARK = 50 * 1024 * 1024;
135134
public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_LOW_MARK = 5 * 1024 * 1024;
136-
public static final int CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE = 5 * 1024 * 1024;
137-
138135
}

inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java

-2
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
285285
*/
286286
@Override
287287
public void channelRead(ChannelHandlerContext ctx, Object e) {
288-
logger.debug("client message receive!");
289288
if (e instanceof RpcDataPack) {
290-
logger.debug("RpcDataPack client message receive!");
291289
RpcDataPack dataPack = (RpcDataPack) e;
292290
Callback callback = requests.remove(dataPack.getSerialNo());
293291
if (callback != null) {

inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java

+4-24
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,12 @@
2525
import io.netty.channel.EventLoopGroup;
2626
import io.netty.channel.WriteBufferWaterMark;
2727
import io.netty.channel.socket.SocketChannel;
28-
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2928
import io.netty.handler.ssl.SslHandler;
3029
import io.netty.handler.timeout.ReadTimeoutHandler;
3130
import io.netty.util.concurrent.DefaultThreadFactory;
3231

3332
import java.net.InetSocketAddress;
3433
import java.util.concurrent.ConcurrentHashMap;
35-
import java.util.concurrent.ExecutorService;
36-
import java.util.concurrent.Executors;
3734
import java.util.concurrent.TimeUnit;
3835
import java.util.concurrent.atomic.AtomicBoolean;
3936
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,11 +58,8 @@ public class NettyClientFactory implements ClientFactory {
6158
new ConcurrentHashMap<>();
6259
protected AtomicBoolean shutdown = new AtomicBoolean(true);
6360
private EventLoopGroup eventLoopGroup;
64-
private ExecutorService bossExecutorService;
65-
private ExecutorService workerExecutorService;
6661
private AtomicInteger workerIdCounter = new AtomicInteger(0);
6762
// TSL encryption and need Two Way Authentic
68-
private int maxMessageSize;
6963
private boolean enableTLS = false;
7064
private boolean needTwoWayAuthentic = false;
7165
private String keyStorePath;
@@ -87,8 +81,6 @@ public void configure(final RpcConfig conf) throws IllegalArgumentException {
8781
if (this.shutdown.compareAndSet(true, false)) {
8882
enableTLS = conf.getBoolean(RpcConstants.TLS_OVER_TCP, false);
8983
needTwoWayAuthentic = conf.getBoolean(RpcConstants.TLS_TWO_WAY_AUTHENTIC, false);
90-
this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE,
91-
RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE);
9284
if (enableTLS) {
9385
trustStorePath = conf.getString(RpcConstants.TLS_TRUSTSTORE_PATH);
9486
trustStorePassword = conf.getString(RpcConstants.TLS_TRUSTSTORE_PASSWORD);
@@ -105,16 +97,9 @@ public void configure(final RpcConfig conf) throws IllegalArgumentException {
10597
trustStorePath = null;
10698
trustStorePassword = null;
10799
}
108-
final int bossCount =
109-
conf.getInt(RpcConstants.BOSS_COUNT,
110-
RpcConstants.CFG_DEFAULT_BOSS_COUNT);
111100
final int workerCount =
112101
conf.getInt(RpcConstants.WORKER_COUNT,
113102
RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT);
114-
final int callbackCount =
115-
conf.getInt(RpcConstants.CALLBACK_WORKER_COUNT, 3);
116-
bossExecutorService = Executors.newCachedThreadPool();
117-
workerExecutorService = Executors.newCachedThreadPool();
118103
String threadName = new StringBuilder(256)
119104
.append(conf.getString(RpcConstants.WORKER_THREAD_NAME,
120105
RpcConstants.CFG_DEFAULT_WORKER_THREAD_NAME))
@@ -202,11 +187,8 @@ public void shutdown() {
202187
}
203188
}
204189
}
205-
if (this.bossExecutorService != null) {
206-
this.bossExecutorService.shutdown();
207-
}
208-
if (this.workerExecutorService != null) {
209-
this.workerExecutorService.shutdown();
190+
if (this.eventLoopGroup != null && !eventLoopGroup.isShutdown()) {
191+
this.eventLoopGroup.shutdownGracefully();
210192
}
211193
} catch (Exception e) {
212194
logger.error("has exception ", e);
@@ -248,7 +230,8 @@ public void initChannel(SocketChannel socketChannel) throws Exception {
248230
try {
249231
SSLEngine sslEngine =
250232
TSSLEngineUtil.createSSLEngine(keyStorePath, trustStorePath,
251-
keyStorePassword, trustStorePassword, true, needTwoWayAuthentic);
233+
keyStorePassword, trustStorePassword, true,
234+
needTwoWayAuthentic);
252235
pipeline.addLast("ssl", new SslHandler(sslEngine));
253236
} catch (Throwable t) {
254237
logger.error(new StringBuilder(256)
@@ -257,9 +240,6 @@ public void initChannel(SocketChannel socketChannel) throws Exception {
257240
throw new Exception(t);
258241
}
259242
}
260-
socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxMessageSize,
261-
0, 4, 0, 4));
262-
263243
// Encode the data
264244
pipeline.addLast("protocolEncoder", new NettyProtocolEncoder());
265245
// Decode the bytes into a Rpc Data Pack

inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java

+67-28
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import static org.apache.inlong.tubemq.corebase.utils.AddressUtils.getRemoteAddressIP;
2121

2222
import io.netty.buffer.ByteBuf;
23+
import io.netty.buffer.Unpooled;
2324
import io.netty.channel.Channel;
2425
import io.netty.channel.ChannelHandlerContext;
2526
import io.netty.handler.codec.MessageToMessageDecoder;
27+
import io.netty.util.ReferenceCountUtil;
2628
import java.nio.ByteBuffer;
2729
import java.util.ArrayList;
2830
import java.util.List;
@@ -43,50 +45,83 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
4345
new ConcurrentHashMap<>();
4446
private static AtomicLong lastProtolTime = new AtomicLong(0);
4547
private static AtomicLong lastSizeTime = new AtomicLong(0);
48+
private boolean packHeaderRead = false;
49+
private int listSize;
50+
private List<RpcDataPack> rpcDataPackList = new ArrayList<>();
51+
private RpcDataPack dataPack;
52+
private ByteBuf lastByteBuf;
4653

4754
@Override
4855
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
49-
if (buffer.readableBytes() < 12) {
50-
logger.warn("Decode buffer.readableBytes() < 12 !");
51-
return;
52-
}
53-
int frameToken = buffer.readInt();
54-
filterIllegalPkgToken(frameToken,
55-
RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel());
56-
int serialNo = buffer.readInt();
57-
int tmpListSize = buffer.readInt();
58-
filterIllegalPackageSize(true, tmpListSize,
59-
RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel());
60-
RpcDataPack dataPack = new RpcDataPack(serialNo, new ArrayList<ByteBuffer>());
61-
// get PackBody
62-
int i = 0;
63-
while (i < tmpListSize) {
64-
i++;
56+
buffer = convertToNewBuf(buffer);
57+
while (buffer.readableBytes() > 0) {
58+
if (!packHeaderRead) {
59+
if (buffer.readableBytes() < 12) {
60+
saveRemainedByteBuf(buffer);
61+
break;
62+
}
63+
int frameToken = buffer.readInt();
64+
filterIllegalPkgToken(frameToken, RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel());
65+
int serialNo = buffer.readInt();
66+
int tmpListSize = buffer.readInt();
67+
filterIllegalPackageSize(true, tmpListSize,
68+
RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel());
69+
this.listSize = tmpListSize;
70+
this.dataPack = new RpcDataPack(serialNo, new ArrayList<>(this.listSize));
71+
this.packHeaderRead = true;
72+
}
73+
// get PackBody
6574
if (buffer.readableBytes() < 4) {
66-
logger.warn("Decode buffer.readableBytes() < 4 !");
75+
saveRemainedByteBuf(buffer);
6776
break;
6877
}
6978
buffer.markReaderIndex();
7079
int length = buffer.readInt();
71-
filterIllegalPackageSize(false, length,
72-
RpcConstants.RPC_MAX_BUFFER_SIZE, ctx.channel());
80+
if (buffer.readableBytes() < length) {
81+
buffer.resetReaderIndex();
82+
saveRemainedByteBuf(buffer);
83+
break;
84+
}
7385
ByteBuffer bb = ByteBuffer.allocate(length);
7486
buffer.readBytes(bb);
7587
bb.flip();
7688
dataPack.getDataLst().add(bb);
89+
if (dataPack.getDataLst().size() == listSize) {
90+
packHeaderRead = false;
91+
rpcDataPackList.add(dataPack);
92+
}
7793
}
94+
if (rpcDataPackList.size() > 0) {
95+
out.addAll(rpcDataPackList);
96+
rpcDataPackList.clear();
97+
}
98+
}
99+
100+
private void saveRemainedByteBuf(ByteBuf byteBuf) {
101+
if (byteBuf != null && byteBuf.readableBytes() > 0) {
102+
lastByteBuf = Unpooled.copiedBuffer(byteBuf);
103+
}
104+
}
78105

79-
if (dataPack.getDataLst().size() == tmpListSize) {
80-
out.add(dataPack);
81-
} else {
82-
logger.warn("Decode dataPack.getDataLst().size()[{}] != tmpListSize [{}] !",
83-
dataPack.getDataLst().size(), tmpListSize);
84-
return;
106+
private ByteBuf convertToNewBuf(ByteBuf byteBuf) {
107+
ByteBuf newByteBuf = byteBuf;
108+
int totalReadBytes = byteBuf.readableBytes();
109+
if (lastByteBuf != null) {
110+
try {
111+
totalReadBytes += lastByteBuf.readableBytes();
112+
newByteBuf = Unpooled.buffer(totalReadBytes);
113+
newByteBuf.writeBytes(lastByteBuf);
114+
newByteBuf.writeBytes(byteBuf);
115+
} finally {
116+
ReferenceCountUtil.release(lastByteBuf);
117+
}
118+
lastByteBuf = null;
85119
}
120+
return newByteBuf;
86121
}
87122

88-
private void filterIllegalPkgToken(int inParamValue,
89-
int allowTokenVal, Channel channel) throws UnknownProtocolException {
123+
private void filterIllegalPkgToken(int inParamValue, int allowTokenVal,
124+
Channel channel) throws UnknownProtocolException {
90125
if (inParamValue != allowTokenVal) {
91126
String rmtaddrIp = getRemoteAddressIP(channel);
92127
if (rmtaddrIp != null) {
@@ -103,7 +138,11 @@ private void filterIllegalPkgToken(int inParamValue,
103138
long curTime = System.currentTimeMillis();
104139
if (curTime - befTime > 180000) {
105140
if (lastProtolTime.compareAndSet(befTime, System.currentTimeMillis())) {
106-
logger.warn("[Abnormal Visit] OSS Tube visit list is :" + errProtolAddrMap.toString());
141+
logger.warn("[Abnormal Visit] OSS Tube [inParamValue = {} vs "
142+
+ "allowTokenVal = {}] visit "
143+
+ "list is : {}",
144+
inParamValue, allowTokenVal,
145+
errProtolAddrMap.toString());
107146
errProtolAddrMap.clear();
108147
}
109148
}

inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.ByteArrayOutputStream;
2525
import java.io.IOException;
2626
import java.nio.ByteBuffer;
27+
import java.util.Arrays;
2728
import java.util.Iterator;
2829
import java.util.List;
2930
import org.apache.inlong.tubemq.corerpc.RpcConstants;
@@ -38,19 +39,17 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> {
3839
@Override
3940
protected void encode(ChannelHandlerContext chx, RpcDataPack msg, List<Object> out) {
4041
RpcDataPack dataPack = msg;
41-
List<ByteBuffer> origs = dataPack.getDataLst();
42-
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
43-
try {
42+
try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream()) {
4443
byteOut.write(getPackHeader(dataPack).array());
44+
List<ByteBuffer> origs = dataPack.getDataLst();
4545
Iterator<ByteBuffer> iter = origs.iterator();
4646
while (iter.hasNext()) {
4747
ByteBuffer entry = iter.next();
4848
byteOut.write(getLengthHeader(entry).array());
49-
byteOut.write(entry.array());
49+
byteOut.write(getLengthBody(entry));
5050
}
5151
byte[] body = byteOut.toByteArray();
52-
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4 + body.length);
53-
buf.writeInt(body.length);
52+
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(body.length);
5453
buf.writeBytes(body);
5554
out.add(buf);
5655
} catch (IOException e) {
@@ -73,4 +72,8 @@ private ByteBuffer getLengthHeader(ByteBuffer buf) {
7372
header.flip();
7473
return header;
7574
}
75+
76+
private byte[] getLengthBody(ByteBuffer buf) {
77+
return Arrays.copyOf(buf.array(), buf.limit());
78+
}
7679
}

inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java

-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.netty.channel.EventLoopGroup;
2929
import io.netty.channel.WriteBufferWaterMark;
3030
import io.netty.channel.socket.SocketChannel;
31-
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
3231
import io.netty.handler.ssl.SslHandler;
3332
import io.netty.util.concurrent.DefaultThreadFactory;
3433
import java.io.DataOutputStream;
@@ -82,7 +81,6 @@ public class NettyRpcServer implements ServiceRpcServer {
8281
private boolean needTwoWayAuthentic = false;
8382
private String trustStorePath = "";
8483
private String trustStorePassword = "";
85-
private int maxMessageSize;
8684

8785
/**
8886
* create a server with rpc config info
@@ -115,8 +113,6 @@ public NettyRpcServer(RpcConfig conf) throws Exception {
115113
}
116114
}
117115
this.enableBusyWait = conf.getBoolean(RpcConstants.NETTY_TCP_ENABLEBUSYWAIT, false);
118-
this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE,
119-
RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE);
120116
int bossCount =
121117
conf.getInt(RpcConstants.BOSS_COUNT,
122118
RpcConstants.CFG_DEFAULT_BOSS_COUNT);
@@ -172,8 +168,6 @@ public void initChannel(SocketChannel socketChannel) {
172168
System.exit(1);
173169
}
174170
}
175-
socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
176-
maxMessageSize, 0, 4, 0, 4));
177171
// Encode the data handler
178172
socketChannel.pipeline().addLast("protocolEncoder", new NettyProtocolDecoder());
179173
// Decode the bytes into a Rpc Data Pack

inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public void encode() {
5151
// read data.
5252
int i = buf.readInt();
5353
i = buf.readInt();
54-
i = buf.readInt();
5554
Assert.assertEquals(123, i);
5655
} catch (Exception e) {
5756
e.printStackTrace();

0 commit comments

Comments
 (0)