Skip to content

Commit 9183744

Browse files
authored
[INLONG-4468][Manager] Use the user-configured URL instead of the DNS of Pulsar (#4479)
1 parent 262efd9 commit 9183744

File tree

5 files changed

+54
-82
lines changed

5 files changed

+54
-82
lines changed

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java

+21-24
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
8383
log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId);
8484
return ListenerResult.success();
8585
}
86-
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
87-
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
86+
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
87+
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
8888
String tenant = clusterBean.getDefaultTenant();
8989
String namespace = groupInfo.getMqResource();
9090

@@ -94,29 +94,26 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
9494
topicBean.setNamespace(namespace);
9595
String topic = streamEntity.getMqResource();
9696
topicBean.setTopicName(topic);
97-
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
98-
99-
// Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
100-
for (String cluster : pulsarClusters) {
101-
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
102-
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
103-
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
104-
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
105-
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
106-
107-
if (!exist) {
108-
String topicFull = tenant + "/" + namespace + "/" + topic;
109-
log.error("topic={} not exists in {}", topicFull, serviceUrl);
110-
throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl);
111-
}
112-
113-
// Consumer naming rules: sortAppName_topicName_consumer_group
114-
String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
115-
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
116-
117-
// Insert the consumption data into the consumption table
118-
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
97+
98+
// Create a subscription in the Pulsar cluster you need to ensure that the Topic exists
99+
try {
100+
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
101+
if (!exist) {
102+
String topicFull = tenant + "/" + namespace + "/" + topic;
103+
String serviceUrl = pulsarClusterInfo.getAdminUrl();
104+
log.error("topic={} not exists in {}", topicFull, serviceUrl);
105+
throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl);
119106
}
107+
108+
// Consumer naming rules: sortAppName_topicName_consumer_group
109+
String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
110+
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
111+
112+
// Insert the consumption data into the consumption table
113+
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
114+
} catch (Exception e) {
115+
log.error("create pulsar subscription error for groupId={}", groupId);
116+
throw new WorkflowListenerException("create pulsar subscription error: " + e.getMessage());
120117
}
121118
}
122119
} catch (Exception e) {

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,9 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
7676
throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
7777
}
7878
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
79-
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
80-
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
81-
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
82-
for (String cluster : pulsarClusters) {
83-
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
84-
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
85-
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
86-
this.createPulsarProcess(pulsarInfo, pulsarClusterInfo);
87-
}
79+
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
80+
try {
81+
this.createPulsarProcess(pulsarInfo, pulsarClusterInfo);
8882
} catch (Exception e) {
8983
log.error("create pulsar resource error for groupId={}", groupId, e);
9084
throw new WorkflowListenerException("create pulsar resource error for groupId=" + groupId);

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java

+19-21
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
7373
final String streamId = streamInfo.getInlongStreamId();
7474
final String namespace = groupInfo.getMqResource();
7575
final String topic = streamInfo.getMqResource();
76-
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
77-
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
76+
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
77+
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
7878
// Query data sink info based on groupId and streamId
7979
List<String> sinkTypeList = sinkService.getSinkTypeList(groupId, streamId);
8080
if (sinkTypeList == null || sinkTypeList.size() == 0) {
@@ -87,28 +87,26 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
8787
topicBean.setTenant(tenant);
8888
topicBean.setNamespace(namespace);
8989
topicBean.setTopicName(topic);
90-
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
9190

92-
// Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
93-
for (String cluster : pulsarClusters) {
94-
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
95-
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
96-
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
97-
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
98-
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
99-
if (!exist) {
100-
String fullTopic = tenant + "/" + namespace + "/" + topic;
101-
log.error("topic={} not exists in {}", fullTopic, pulsarAdmin.getServiceUrl());
102-
throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl);
103-
}
91+
// Create a subscription in the Pulsar cluster, you need to ensure that the Topic exists
92+
try {
93+
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
94+
if (!exist) {
95+
String fullTopic = tenant + "/" + namespace + "/" + topic;
96+
String serviceUrl = pulsarClusterInfo.getAdminUrl();
97+
log.error("topic={} not exists in {}", fullTopic, serviceUrl);
98+
throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl);
99+
}
104100

105-
// Consumer naming rules: sortAppName_topicName_consumer_group
106-
String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
107-
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
101+
// Consumer naming rules: sortAppName_topicName_consumer_group
102+
String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
103+
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
108104

109-
// Insert the consumption data into the consumption table
110-
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
111-
}
105+
// Insert the consumption data into the consumption table
106+
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
107+
} catch (Exception e) {
108+
log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
109+
throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage());
112110
}
113111
} catch (Exception e) {
114112
log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java

+4-12
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import org.springframework.beans.factory.annotation.Autowired;
3838
import org.springframework.stereotype.Component;
3939

40-
import java.util.List;
41-
4240
/**
4341
* Create task listener for Pulsar Topic
4442
*/
@@ -68,16 +66,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
6866
log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
6967

7068
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
71-
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
72-
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
73-
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
74-
for (String cluster : pulsarClusters) {
75-
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
76-
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
77-
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
78-
String pulsarTopic = streamInfo.getMqResource();
79-
this.createTopic(pulsarInfo, pulsarTopic, pulsarClusterInfo);
80-
}
69+
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
70+
try {
71+
String pulsarTopic = streamInfo.getMqResource();
72+
this.createTopic(pulsarInfo, pulsarTopic, pulsarClusterInfo);
8173
} catch (Exception e) {
8274
log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e);
8375
throw new WorkflowListenerException(

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java

+7-16
Original file line numberDiff line numberDiff line change
@@ -111,44 +111,35 @@ private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
111111
}
112112

113113
/**
114-
* Create Pulsar consumption information, including cross-regional cycle creation of consumption groups
114+
* Create Pulsar consumption information
115115
*/
116116
private void createPulsarTopicMessage(ConsumptionEntity entity) {
117117
String groupId = entity.getInlongGroupId();
118118
InlongGroupInfo groupInfo = groupService.get(groupId);
119119
Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
120120
String mqResource = groupInfo.getMqResource();
121121
Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);
122-
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(entity.getMqType());
123-
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
122+
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(entity.getMqType());
123+
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
124124
PulsarTopicBean topicMessage = new PulsarTopicBean();
125125
String tenant = clusterBean.getDefaultTenant();
126126
topicMessage.setTenant(tenant);
127127
topicMessage.setNamespace(mqResource);
128128

129-
// If cross-regional replication is started, each cluster needs to create consumer groups in cycles
130129
String consumerGroup = entity.getConsumerGroup();
131-
List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
132130
List<String> topics = Arrays.asList(entity.getTopic().split(","));
133-
this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics, globalCluster);
131+
this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics);
134132
} catch (Exception e) {
135133
log.error("create pulsar topic failed", e);
136134
throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
137135
+ e.getMessage());
138136
}
139137
}
140138

141-
private void createPulsarSubscription(PulsarAdmin globalPulsarAdmin, String subscription, PulsarTopicBean topicBean,
142-
List<String> clusters, List<String> topics, PulsarClusterInfo globalCluster) {
139+
private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
140+
List<String> topics) {
143141
try {
144-
for (String cluster : clusters) {
145-
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
146-
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
147-
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
148-
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
149-
pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
150-
}
151-
}
142+
pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
152143
} catch (Exception e) {
153144
log.error("create pulsar consumer group failed", e);
154145
throw new WorkflowListenerException("failed to create pulsar consumer group");

0 commit comments

Comments
 (0)