Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge master #1

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package com.alibaba.otter.canal.client.adapter.es8x.support;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map;

import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand All @@ -36,7 +32,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.Arrays;
import java.util.Map;

/**
* ES 连接器, 只支持 Rest 方式
Expand All @@ -50,20 +59,59 @@ public class ESConnection {

private RestHighLevelClient restHighLevelClient;

public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException{
public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException {
String caPath = properties.get("security.ca.path");
if (StringUtils.isNotEmpty(caPath)) {
connectEsWithCa(hosts, properties, caPath);
} else {
connectEsWithoutCa(hosts, properties);
}
}
private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) {
Path caCertificatePath = Paths.get(caPath);
try (InputStream is = Files.newInputStream(caCertificatePath)) {
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa = factory.generateCertificate(is);
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslContextBuilder = SSLContexts.custom()
.loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslContextBuilder.build();

HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
String nameAndPwd = properties.get("security.auth");
if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
String[] nameAndPwdArr = nameAndPwd.split(":");
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder.setSSLContext(sslContext);
});
}
restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) {
HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
String nameAndPwd = properties.get("security.auth");
if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
String[] nameAndPwdArr = nameAndPwd.split(":");
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true)
.build();
.build();
}

public void close() {
Expand Down Expand Up @@ -99,9 +147,9 @@ public class ES8xIndexRequest implements ESBulkRequest.ESIndexRequest {

private IndexRequestBuilder indexRequestBuilder;

private IndexRequest indexRequest;
private IndexRequest indexRequest;

public ES8xIndexRequest(String index, String id){
public ES8xIndexRequest(String index, String id) {
indexRequest = new IndexRequest(index);
indexRequest.id(id);

Expand Down Expand Up @@ -142,9 +190,9 @@ public class ES8xUpdateRequest implements ESBulkRequest.ESUpdateRequest {

private UpdateRequestBuilder updateRequestBuilder;

private UpdateRequest updateRequest;
private UpdateRequest updateRequest;

public ES8xUpdateRequest(String index, String id){
public ES8xUpdateRequest(String index, String id) {

updateRequest = new UpdateRequest(index, id);
}
Expand Down Expand Up @@ -191,9 +239,9 @@ public class ES8xDeleteRequest implements ESBulkRequest.ESDeleteRequest {

private DeleteRequestBuilder deleteRequestBuilder;

private DeleteRequest deleteRequest;
private DeleteRequest deleteRequest;

public ES8xDeleteRequest(String index, String id){
public ES8xDeleteRequest(String index, String id) {

deleteRequest = new DeleteRequest(index, id);

Expand All @@ -220,11 +268,11 @@ public class ESSearchRequest {

private SearchRequestBuilder searchRequestBuilder;

private SearchRequest searchRequest;
private SearchRequest searchRequest;

private SearchSourceBuilder sourceBuilder;
private SearchSourceBuilder sourceBuilder;

public ESSearchRequest(String index){
public ESSearchRequest(String index) {

searchRequest = new SearchRequest(index);
sourceBuilder = new SearchSourceBuilder();
Expand Down Expand Up @@ -277,9 +325,9 @@ public class ES8xBulkRequest implements ESBulkRequest {

private BulkRequestBuilder bulkRequestBuilder;

private BulkRequest bulkRequest;
private BulkRequest bulkRequest;

public ES8xBulkRequest(){
public ES8xBulkRequest() {

bulkRequest = new BulkRequest();

Expand Down Expand Up @@ -350,7 +398,7 @@ public static class ES8xBulkResponse implements ESBulkRequest.ESBulkResponse {

private BulkResponse bulkResponse;

public ES8xBulkResponse(BulkResponse bulkResponse){
public ES8xBulkResponse(BulkResponse bulkResponse) {
this.bulkResponse = bulkResponse;
}

Expand Down Expand Up @@ -390,7 +438,7 @@ private HttpHost createHttpHost(String uriStr) {
}
try {
return HttpHost.create(new URI(uri
.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
.toString());
} catch (URISyntaxException ex) {
throw new IllegalStateException(ex);
Expand Down
3 changes: 2 additions & 1 deletion client-adapter/launcher/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ canal.conf:
# hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
# properties:
# mode: transport # or rest
# # security.ca.path: /etc/es8/ca.crt
# # security.auth: test:123456 # only used for rest mode
# cluster.name: elasticsearch
# - name: kudu
Expand All @@ -113,4 +114,4 @@ canal.conf:
# jdbc.password: 123456
# batchSize: 3000
# scheduleTime: 600 # second unit
# threads: 3 # parallel threads
# threads: 3 # parallel threads
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
* 角色认证token
*/
private String roleToken;

/**
* listener name
*/
private String listenerName;

/**
* 订阅客户端名称
*/
Expand Down Expand Up @@ -129,6 +135,11 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
*/
public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
, String subscriptName) {
this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName,null);
}

public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
, String subscriptName, String listenerName) {
this.isFlatMessage = isFlatMessage;
this.serviceUrl = serviceUrl;
this.roleToken = roleToken;
Expand All @@ -137,6 +148,7 @@ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String r
if (StringUtils.isEmpty(this.subscriptName)) {
throw new RuntimeException("Pulsar Consumer subscriptName required");
}
this.listenerName = listenerName;
}

/**
Expand All @@ -150,6 +162,15 @@ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String r
, String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds
, int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase
, int maxRedeliveryCount) {
this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName, batchSize, getBatchTimeoutSeconds
, batchProcessTimeoutSeconds, redeliveryDelaySeconds, ackTimeoutSeconds, isRetry, isRetryDLQUpperCase
, maxRedeliveryCount, null);
}

public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
, String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds
, int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase
, int maxRedeliveryCount, String listenerName) {
this.isFlatMessage = isFlatMessage;
this.serviceUrl = serviceUrl;
this.roleToken = roleToken;
Expand All @@ -166,15 +187,20 @@ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String r
this.isRetry = isRetry;
this.isRetryDLQUpperCase = isRetryDLQUpperCase;
this.maxRedeliveryCount = maxRedeliveryCount;
this.listenerName = listenerName;
}

@Override
public void connect() throws CanalClientException {
// 连接创建客户端
try {
pulsarClient = PulsarClient.builder()
ClientBuilder builder = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(roleToken))
.authentication(AuthenticationFactory.token(roleToken));
if (StringUtils.isNotEmpty(listenerName)) {
builder.listenerName(listenerName);
}
pulsarClient = builder
.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ public class PulsarMQConstants {
*/
public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl";

/**
* Pulsar admin服务器地址
*/
public static final String PULSARMQ_LISTENER_NAME = ROOT + "." + "listenerName";

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public class PulsarMQProducerConfig extends MQProperties {
*/
private String adminServerUrl;

/**
* listener name
*/
private String listenerName;

public String getServerUrl() {
return serverUrl;
}
Expand Down Expand Up @@ -68,4 +73,12 @@ public String getAdminServerUrl() {
public void setAdminServerUrl(String adminServerUrl) {
this.adminServerUrl = adminServerUrl;
}

public String getListenerName() {
return listenerName;
}

public void setListenerName(String listenerName) {
this.listenerName = listenerName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
* 角色认证token
*/
private String roleToken;

/**
* listener name
*/
private String listenerName;

/**
* 订阅客户端名称
*/
Expand Down Expand Up @@ -110,6 +116,7 @@ public void init(Properties properties, String topic, String groupId) {
}
this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
this.listenerName = properties.getProperty(PulsarMQConstants.PULSARMQ_LISTENER_NAME);
this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
// 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称
if (StringUtils.isEmpty(this.subscriptName)) {
Expand Down Expand Up @@ -165,6 +172,9 @@ public void connect() {
if (StringUtils.isNotEmpty(roleToken)) {
builder.authentication(AuthenticationFactory.token(roleToken));
}
if (StringUtils.isNotEmpty(listenerName)) {
builder.authentication(AuthenticationFactory.token(listenerName));
}
pulsarClient = builder.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public void init(Properties properties) {
// 角色权限认证的token
builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
}
if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getListenerName())) {
//listener name
builder.listenerName(pulsarMQProducerConfig.getListenerName());
}

client = builder.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -127,6 +132,11 @@ private void loadPulsarMQProperties(Properties properties) {
if (!StringUtils.isEmpty(adminServerUrl)) {
tmpProperties.setAdminServerUrl(adminServerUrl);
}
String listenerName = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_LISTENER_NAME);
if (!StringUtils.isEmpty(listenerName)) {
tmpProperties.setListenerName(listenerName);
}

if (logger.isDebugEnabled()) {
logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
}
Expand Down