Skip to content

Commit e5c1f3f

Browse files
authored
Merge pull request #4 from alibaba/master
merge from master
2 parents 134d2c9 + 1e5b8a2 commit e5c1f3f

File tree

7 files changed

+67
-32
lines changed

7 files changed

+67
-32
lines changed

driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

+16-14
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,17 @@
3030
*/
3131
public class MysqlConnector {
3232

33+
public static final int timeout = 5 * 1000; // 5s
3334
private static final Logger logger = LoggerFactory.getLogger(MysqlConnector.class);
3435
private InetSocketAddress address;
3536
private String username;
3637
private String password;
3738
private SslInfo sslInfo;
38-
3939
private String defaultSchema;
4040
private int soTimeout = 30 * 1000;
4141
private int connTimeout = 5 * 1000;
4242
private int receiveBufferSize = 16 * 1024;
4343
private int sendBufferSize = 16 * 1024;
44-
4544
private SocketChannel channel;
4645
private volatile boolean dumping = false;
4746
// mysql connectionId
@@ -50,8 +49,6 @@ public class MysqlConnector {
5049
// serverVersion
5150
private String serverVersion;
5251

53-
public static final int timeout = 5 * 1000; // 5s
54-
5552
public MysqlConnector(){
5653
}
5754

@@ -200,14 +197,15 @@ private void negotiate(SocketChannel channel) throws IOException {
200197
}
201198
HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
202199
handshakePacket.fromBytes(body);
200+
byte serverCharsetNumber = (handshakePacket.serverCharsetNumber != 0) ? handshakePacket.serverCharsetNumber : 33;
203201
SslMode sslMode = sslInfo != null ? sslInfo.getSslMode() : SslMode.DISABLED;
204202
if (sslMode != SslMode.DISABLED) {
205203
boolean serverSupportSsl = (handshakePacket.serverCapabilities & CLIENT_SSL) > 0;
206204
if (!serverSupportSsl) {
207205
throw new IOException("MySQL Server does not support SSL: " + address + " serverCapabilities: "
208206
+ handshakePacket.serverCapabilities);
209207
}
210-
byte[] sslPacket = new SslRequestCommandPacket(handshakePacket.serverCharsetNumber).toBytes();
208+
byte[] sslPacket = new SslRequestCommandPacket(serverCharsetNumber).toBytes();
211209
HeaderPacket sslHeader = new HeaderPacket();
212210
sslHeader.setPacketBodyLength(sslPacket.length);
213211
sslHeader.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
@@ -225,23 +223,27 @@ private void negotiate(SocketChannel channel) throws IOException {
225223
connectionId = handshakePacket.threadId; // 记录一下connection
226224
serverVersion = handshakePacket.serverVersion; // 记录serverVersion
227225
logger.info("handshake initialization packet received, prepare the client authentication packet to send");
228-
logger.info("auth plugin: {}", new String(handshakePacket.authPluginName));
226+
// 某些老协议的 server 默认不返回 auth plugin,需要使用默认的 mysql_native_password
227+
String authPluginName = (handshakePacket.authPluginName != null
228+
&& handshakePacket.authPluginName.length > 0) ? new String(
229+
handshakePacket.authPluginName) : "mysql_native_password";
230+
logger.info("auth plugin: {}", authPluginName);
229231
boolean isSha2Password = false;
230232
ClientAuthenticationPacket clientAuth;
231-
if ("caching_sha2_password".equals(new String(handshakePacket.authPluginName))) {
233+
if ("caching_sha2_password".equals(authPluginName)) {
232234
clientAuth = new ClientAuthenticationSHA2Packet();
233235
isSha2Password = true;
234236
} else {
235237
clientAuth = new ClientAuthenticationPacket();
236238
}
237-
clientAuth.setCharsetNumber(handshakePacket.serverCharsetNumber);
239+
clientAuth.setCharsetNumber(serverCharsetNumber);
238240

239241
clientAuth.setUsername(username);
240242
clientAuth.setPassword(password);
241243
clientAuth.setServerCapabilities(handshakePacket.serverCapabilities);
242244
clientAuth.setDatabaseName(defaultSchema);
243245
clientAuth.setScrumbleBuff(joinAndCreateScrumbleBuff(handshakePacket));
244-
clientAuth.setAuthPluginName(handshakePacket.authPluginName);
246+
clientAuth.setAuthPluginName(authPluginName.getBytes());
245247

246248
byte[] clientAuthPkgBody = clientAuth.toBytes();
247249
HeaderPacket h = new HeaderPacket();
@@ -284,7 +286,7 @@ private void negotiate(SocketChannel channel) throws IOException {
284286
encryptedPassword = getPassword().getBytes();
285287
header = authSwitchAfterAuth(encryptedPassword, header);
286288
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
287-
} else if ("mysql_native_password".equals(pluginName)) {
289+
} else if (pluginName == null || "mysql_native_password".equals(pluginName)) {
288290
try {
289291
encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
290292
} catch (NoSuchAlgorithmException e) {
@@ -481,10 +483,6 @@ public void setChannel(SocketChannel channel) {
481483
this.channel = channel;
482484
}
483485

484-
public void setPassword(String password) {
485-
this.password = password;
486-
}
487-
488486
public long getConnectionId() {
489487
return connectionId;
490488
}
@@ -513,6 +511,10 @@ public String getPassword() {
513511
return password;
514512
}
515513

514+
public void setPassword(String password) {
515+
this.password = password;
516+
}
517+
516518
public String getServerVersion() {
517519
return serverVersion;
518520
}

driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java

-10
Original file line numberDiff line numberDiff line change
@@ -167,16 +167,6 @@ public SocketAddress getLocalSocketAddress() {
167167
public void close() {
168168
Socket socket = this.socket;
169169
if (socket != null) {
170-
try {
171-
socket.shutdownInput();
172-
} catch (IOException e) {
173-
// Ignore, could not do anymore
174-
}
175-
try {
176-
socket.shutdownOutput();
177-
} catch (IOException e) {
178-
// Ignore, could not do anymore
179-
}
180170
try {
181171
socket.close();
182172
} catch (IOException e) {

driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private static SSLSocket createSslSocket(Socket socket, SslInfo sslInfo) throws
109109
}
110110
logger.info("SSL protocol: {}", StringUtils.join(protocolArr, ","));
111111
sslSocket.setEnabledProtocols(protocolArr);
112-
// sslSocket.setEnabledCipherSuites("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDH_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDH_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_DSS_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_DSS_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384,TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384,TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_DSS_WITH_AES_256_CBC_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA,TLS_ECDH_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA,TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA,TLS_ECDH_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_DSS_WITH_AES_128_CBC_SHA".split(","));
112+
sslSocket.setSoTimeout(BioSocketChannel.SO_TIMEOUT);
113113
sslSocket.startHandshake();
114114
logger.info("SSL socket handshake success.");
115115
return sslSocket;

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

+28-2
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,32 @@ private void loadVersionComment() {
603603
}
604604
}
605605

606+
/**
607+
* MySQL 8.4版本开始部分命令出现变化
608+
* https://dev.mysql.com/doc/relnotes/mysql/8.4/en/news-8-4-0.html#mysqld-8-4-0-deprecation-removal
609+
*
610+
* @param major
611+
* @param minor
612+
* @return
613+
*/
614+
public boolean atLeast(int major, int minor) {
615+
if (isMariaDB()) {
616+
return false;
617+
}
618+
String version = connector.getServerVersion();
619+
if (StringUtils.isNotEmpty(version)) {
620+
String[] parts = version.split("\\.");
621+
int majorVer = Integer.parseInt(parts[0]);
622+
int minorVer = Integer.parseInt(parts[1]);
623+
return (majorVer > major) || (majorVer == major && minorVer >= minor);
624+
}
625+
return false;
626+
}
627+
628+
public boolean atLeastMySQL84() {
629+
return atLeast(8, 4);
630+
}
631+
606632
private void accumulateReceivedBytes(long x) {
607633
if (receivedBinlogBytes != null) {
608634
receivedBinlogBytes.addAndGet(x);
@@ -627,7 +653,7 @@ public boolean isMixed() {
627653

628654
private String value;
629655

630-
private BinlogFormat(String value){
656+
private BinlogFormat(String value) {
631657
this.value = value;
632658
}
633659

@@ -667,7 +693,7 @@ public boolean isNoBlob() {
667693

668694
private String value;
669695

670-
private BinlogImage(String value){
696+
private BinlogImage(String value) {
671697
this.value = value;
672698
}
673699

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -668,13 +668,20 @@ private Long findServerId(MysqlConnection mysqlConnection) {
668668
* 查询当前的binlog位置
669669
*/
670670
private EntryPosition findEndPosition(MysqlConnection mysqlConnection) {
671+
String showSql = "show master status";
671672
try {
672-
String showSql = multiStreamEnable ? "show master status with " + destination : "show master status";
673+
if (mysqlConnection.atLeastMySQL84()) {
674+
// 8.4新语法
675+
showSql = "show binary log status";
676+
} else if (multiStreamEnable) {
677+
// 兼容polardb-x的多流binlog
678+
showSql = "show master status with " + destination;
679+
}
673680
ResultSetPacket packet = mysqlConnection.query(showSql);
674681
List<String> fields = packet.getFieldValues();
675682
if (CollectionUtils.isEmpty(fields)) {
676683
throw new CanalParseException(
677-
"command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
684+
"command : '" + showSql + "' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
678685
}
679686
EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
680687
if (isGTIDMode() && fields.size() > 4) {
@@ -690,7 +697,7 @@ private EntryPosition findEndPosition(MysqlConnection mysqlConnection) {
690697
}
691698
return endPosition;
692699
} catch (IOException e) {
693-
throw new CanalParseException("command : 'show master status' has an error!", e);
700+
throw new CanalParseException("command : '" + showSql + "' has an error!", e);
694701
}
695702
}
696703

@@ -721,7 +728,12 @@ private EntryPosition findStartPosition(MysqlConnection mysqlConnection) {
721728
@SuppressWarnings("unused")
722729
private SlaveEntryPosition findSlavePosition(MysqlConnection mysqlConnection) {
723730
try {
724-
ResultSetPacket packet = mysqlConnection.query("show slave status");
731+
String showSql = "show slave stauts";
732+
if (mysqlConnection.atLeastMySQL84()) {
733+
// 兼容mysql 8.4
734+
showSql = "show replica status";
735+
}
736+
ResultSetPacket packet = mysqlConnection.query(showSql);
725737
List<FieldPacket> names = packet.getFieldDescriptors();
726738
List<String> fields = packet.getFieldValues();
727739
if (CollectionUtils.isEmpty(fields)) {

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ private String makeRequestString(Map<String, String> param) throws Exception {
195195
private final HttpResponse executeHttpRequest(HttpGet getMethod, String host) throws Exception {
196196
SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, (TrustStrategy) (arg0, arg1) -> true).build();
197197
SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext,
198-
new String[] { "TLSv1" },
198+
new String[] { "TLSv1.2" },
199199
null,
200200
SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
201201
Registry registry = RegistryBuilder.create()

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

+5
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ private boolean dumpTableMeta(MysqlConnection connection, final CanalEventFilter
207207
}
208208

209209
for (String schema : schemas) {
210+
// 如果schema命中黑名单,直接跳过
211+
// 解决部分数据库可以看到database,但实际表级别无权限的情况
212+
if (blackFilter != null && blackFilter.filter(schema + ".%")) {
213+
continue;
214+
}
210215
// filter views
211216
packet = connection.query("show full tables from `" + schema + "` where Table_type = 'BASE TABLE'");
212217
columnSize = packet.getFieldDescriptors().size();

0 commit comments

Comments
 (0)