Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-4468][Manager] Use the user-configured URL instead of the DNS of Pulsar #4479

Merged
merged 5 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -83,8 +84,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId);
return ListenerResult.success();
}
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
String tenant = clusterBean.getDefaultTenant();
String namespace = groupInfo.getMqResource();

Expand All @@ -94,29 +95,26 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
topicBean.setNamespace(namespace);
String topic = streamEntity.getMqResource();
topicBean.setTopicName(topic);
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);

// Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
for (String cluster : pulsarClusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);

if (!exist) {
String topicFull = tenant + "/" + namespace + "/" + topic;
log.error("topic={} not exists in {}", topicFull, serviceUrl);
throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl);
}

// Consumer naming rules: sortAppName_topicName_consumer_group
String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);

// Insert the consumption data into the consumption table
consumptionService.saveSortConsumption(groupInfo, topic, subscription);

// Create a subscription in the Pulsar cluster you need to ensure that the Topic exists
try {
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
if (!exist) {
String topicFull = tenant + "/" + namespace + "/" + topic;
String serviceUrl = pulsarClusterInfo.getAdminUrl();
log.error("topic={} not exists in {}", topicFull, serviceUrl);
throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl);
}

// Consumer naming rules: sortAppName_topicName_consumer_group
String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);

// Insert the consumption data into the consumption table
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
} catch (PulsarAdminException |WorkflowListenerException e) {
log.error("create pulsar subscription error for groupId={}", groupId);
throw new WorkflowListenerException("create pulsar subscription error: " + e.getMessage());
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,9 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
}
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
for (String cluster : pulsarClusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
this.createPulsarProcess(pulsarInfo, pulsarClusterInfo);
}
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
try {
this.createPulsarProcess(pulsarInfo, pulsarClusterInfo);
} catch (Exception e) {
log.error("create pulsar resource error for groupId={}", groupId, e);
throw new WorkflowListenerException("create pulsar resource error for groupId=" + groupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -73,8 +74,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
final String streamId = streamInfo.getInlongStreamId();
final String namespace = groupInfo.getMqResource();
final String topic = streamInfo.getMqResource();
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
// Query data sink info based on groupId and streamId
List<String> sinkTypeList = sinkService.getSinkTypeList(groupId, streamId);
if (sinkTypeList == null || sinkTypeList.size() == 0) {
Expand All @@ -87,28 +88,26 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
topicBean.setTenant(tenant);
topicBean.setNamespace(namespace);
topicBean.setTopicName(topic);
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);

// Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
for (String cluster : pulsarClusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
if (!exist) {
String fullTopic = tenant + "/" + namespace + "/" + topic;
log.error("topic={} not exists in {}", fullTopic, pulsarAdmin.getServiceUrl());
throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl);
}
// Create a subscription in the Pulsar cluster, you need to ensure that the Topic exists
try {
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
if (!exist) {
String fullTopic = tenant + "/" + namespace + "/" + topic;
String serviceUrl = pulsarClusterInfo.getAdminUrl();
log.error("topic={} not exists in {}", fullTopic, serviceUrl);
throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl);
}

// Consumer naming rules: sortAppName_topicName_consumer_group
String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
// Consumer naming rules: sortAppName_topicName_consumer_group
String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);

// Insert the consumption data into the consumption table
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
}
// Insert the consumption data into the consumption table
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
} catch (PulsarAdminException | BusinessException e) {
log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage());
}
} catch (Exception e) {
log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
* Create task listener for Pulsar Topic
*/
Expand Down Expand Up @@ -68,16 +66,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);

InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
for (String cluster : pulsarClusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
String pulsarTopic = streamInfo.getMqResource();
this.createTopic(pulsarInfo, pulsarTopic, pulsarClusterInfo);
}
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
try {
String pulsarTopic = streamInfo.getMqResource();
this.createTopic(pulsarInfo, pulsarTopic, pulsarClusterInfo);
} catch (Exception e) {
log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e);
throw new WorkflowListenerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,44 +111,35 @@ private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
}

/**
* Create Pulsar consumption information, including cross-regional cycle creation of consumption groups
* Create Pulsar consumption information
*/
private void createPulsarTopicMessage(ConsumptionEntity entity) {
String groupId = entity.getInlongGroupId();
InlongGroupInfo groupInfo = groupService.get(groupId);
Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
String mqResource = groupInfo.getMqResource();
Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(entity.getMqType());
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(entity.getMqType());
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
PulsarTopicBean topicMessage = new PulsarTopicBean();
String tenant = clusterBean.getDefaultTenant();
topicMessage.setTenant(tenant);
topicMessage.setNamespace(mqResource);

// If cross-regional replication is started, each cluster needs to create consumer groups in cycles
String consumerGroup = entity.getConsumerGroup();
List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
List<String> topics = Arrays.asList(entity.getTopic().split(","));
this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics, globalCluster);
this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics);
} catch (Exception e) {
log.error("create pulsar topic failed", e);
throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
+ e.getMessage());
}
}

private void createPulsarSubscription(PulsarAdmin globalPulsarAdmin, String subscription, PulsarTopicBean topicBean,
List<String> clusters, List<String> topics, PulsarClusterInfo globalCluster) {
private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
List<String> topics) {
try {
for (String cluster : clusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
}
}
pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
} catch (Exception e) {
log.error("create pulsar consumer group failed", e);
throw new WorkflowListenerException("failed to create pulsar consumer group");
Expand Down