diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java index 62ad33d0f98..96763a4adde 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java @@ -83,8 +83,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId); return ListenerResult.success(); } - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType()); - try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { + PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType()); + try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) { String tenant = clusterBean.getDefaultTenant(); String namespace = groupInfo.getMqResource(); @@ -94,29 +94,26 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc topicBean.setNamespace(namespace); String topic = streamEntity.getMqResource(); topicBean.setTopicName(topic); - List pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin); - - // Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists - for (String cluster : pulsarClusters) { - String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster); - PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder() - .token(globalCluster.getToken()).adminUrl(serviceUrl).build(); - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) { - boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic); - - if (!exist) { - String topicFull = tenant + "/" + namespace + "/" + topic; - log.error("topic={} not exists in {}", topicFull, serviceUrl); - throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl); - } - - // Consumer naming rules: sortAppName_topicName_consumer_group - String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group"; - pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription); - - // Insert the consumption data into the consumption table - consumptionService.saveSortConsumption(groupInfo, topic, subscription); + + // Create a subscription in the Pulsar cluster you need to ensure that the Topic exists + try { + boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic); + if (!exist) { + String topicFull = tenant + "/" + namespace + "/" + topic; + String serviceUrl = pulsarClusterInfo.getAdminUrl(); + log.error("topic={} not exists in {}", topicFull, serviceUrl); + throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl); } + + // Consumer naming rules: sortAppName_topicName_consumer_group + String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group"; + pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription); + + // Insert the consumption data into the consumption table + consumptionService.saveSortConsumption(groupInfo, topic, subscription); + } catch (Exception e) { + log.error("create pulsar subscription error for groupId={}", groupId); + throw new WorkflowListenerException("create pulsar subscription error: " + e.getMessage()); } } } catch (Exception e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java index 3cf4ee74d95..168ea87e922 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java @@ -76,15 +76,9 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId); } InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo; - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType()); - try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { - List pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin); - for (String cluster : pulsarClusters) { - String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster); - PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder() - .token(globalCluster.getToken()).adminUrl(serviceUrl).build(); - this.createPulsarProcess(pulsarInfo, pulsarClusterInfo); - } + PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType()); + try { + this.createPulsarProcess(pulsarInfo, pulsarClusterInfo); } catch (Exception e) { log.error("create pulsar resource error for groupId={}", groupId, e); throw new WorkflowListenerException("create pulsar resource error for groupId=" + groupId); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java index 4bea66e867d..f5c9c9dc7ea 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java @@ -73,8 +73,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc final String streamId = streamInfo.getInlongStreamId(); final String namespace = groupInfo.getMqResource(); final String topic = streamInfo.getMqResource(); - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType()); - try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { + PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType()); + try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) { // Query data sink info based on groupId and streamId List sinkTypeList = sinkService.getSinkTypeList(groupId, streamId); if (sinkTypeList == null || sinkTypeList.size() == 0) { @@ -87,28 +87,26 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc topicBean.setTenant(tenant); topicBean.setNamespace(namespace); topicBean.setTopicName(topic); - List pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin); - // Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists - for (String cluster : pulsarClusters) { - String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster); - PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder() - .token(globalCluster.getToken()).adminUrl(serviceUrl).build(); - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) { - boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic); - if (!exist) { - String fullTopic = tenant + "/" + namespace + "/" + topic; - log.error("topic={} not exists in {}", fullTopic, pulsarAdmin.getServiceUrl()); - throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl); - } + // Create a subscription in the Pulsar cluster, you need to ensure that the Topic exists + try { + boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic); + if (!exist) { + String fullTopic = tenant + "/" + namespace + "/" + topic; + String serviceUrl = pulsarClusterInfo.getAdminUrl(); + log.error("topic={} not exists in {}", fullTopic, serviceUrl); + throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl); + } - // Consumer naming rules: sortAppName_topicName_consumer_group - String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group"; - pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription); + // Consumer naming rules: sortAppName_topicName_consumer_group + String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group"; + pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription); - // Insert the consumption data into the consumption table - consumptionService.saveSortConsumption(groupInfo, topic, subscription); - } + // Insert the consumption data into the consumption table + consumptionService.saveSortConsumption(groupInfo, topic, subscription); + } catch (Exception e) { + log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e); + throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage()); } } catch (Exception e) { log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java index ffb859c0fe6..213127961bd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java @@ -37,8 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.List; - /** * Create task listener for Pulsar Topic */ @@ -68,16 +66,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId); InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo; - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType()); - try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { - List pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin); - for (String cluster : pulsarClusters) { - String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster); - PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder() - .token(globalCluster.getToken()).adminUrl(serviceUrl).build(); - String pulsarTopic = streamInfo.getMqResource(); - this.createTopic(pulsarInfo, pulsarTopic, pulsarClusterInfo); - } + PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType()); + try { + String pulsarTopic = streamInfo.getMqResource(); + this.createTopic(pulsarInfo, pulsarTopic, pulsarClusterInfo); } catch (Exception e) { log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e); throw new WorkflowListenerException( diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java index 0df4b236103..3ae77c41e73 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java @@ -111,7 +111,7 @@ private void updateConsumerInfo(Integer consumptionId, String consumerGroup) { } /** - * Create Pulsar consumption information, including cross-regional cycle creation of consumption groups + * Create Pulsar consumption information */ private void createPulsarTopicMessage(ConsumptionEntity entity) { String groupId = entity.getInlongGroupId(); @@ -119,18 +119,16 @@ private void createPulsarTopicMessage(ConsumptionEntity entity) { Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId); String mqResource = groupInfo.getMqResource(); Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId); - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(entity.getMqType()); - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { + PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(entity.getMqType()); + try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) { PulsarTopicBean topicMessage = new PulsarTopicBean(); String tenant = clusterBean.getDefaultTenant(); topicMessage.setTenant(tenant); topicMessage.setNamespace(mqResource); - // If cross-regional replication is started, each cluster needs to create consumer groups in cycles String consumerGroup = entity.getConsumerGroup(); - List clusters = PulsarUtils.getPulsarClusters(pulsarAdmin); List topics = Arrays.asList(entity.getTopic().split(",")); - this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics, globalCluster); + this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics); } catch (Exception e) { log.error("create pulsar topic failed", e); throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: " @@ -138,17 +136,10 @@ private void createPulsarTopicMessage(ConsumptionEntity entity) { } } - private void createPulsarSubscription(PulsarAdmin globalPulsarAdmin, String subscription, PulsarTopicBean topicBean, - List clusters, List topics, PulsarClusterInfo globalCluster) { + private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean, + List topics) { try { - for (String cluster : clusters) { - String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster); - PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder() - .token(globalCluster.getToken()).adminUrl(serviceUrl).build(); - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) { - pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics); - } - } + pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics); } catch (Exception e) { log.error("create pulsar consumer group failed", e); throw new WorkflowListenerException("failed to create pulsar consumer group");