diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java index 3cfa0eec56a..3b66026c116 100644 --- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java +++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java @@ -29,6 +29,8 @@ @NoArgsConstructor @AllArgsConstructor public class TubeMQResult { + public static final int ERR_CODE = -1; + private String errMsg = ""; private int errCode = 0; private boolean result = true; @@ -61,4 +63,7 @@ public static TubeMQResult successResult(Object data) { .result(true).data(data).build(); } + public boolean isError() { + return ERR_CODE == errCode; + } } diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java index 2c2b755002f..80f192fc3d2 100644 --- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java +++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java @@ -47,6 +47,8 @@ import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; +import static org.apache.inlong.tubemq.manager.service.TubeConst.SUCCESS_CODE; + @RestController @RequestMapping(path = "/v1/cluster") @Slf4j @@ -96,9 +98,12 @@ public TubeMQResult addNewCluster(AddClusterReq req) { if (!req.legal()) { return TubeMQResult.errorResult(TubeMQErrorConst.PARAM_ILLEGAL); } - TubeMQResult checkResult = masterService.checkMasterNodeStatus(req.getMasterIp(), req.getMasterWebPort()); - if (checkResult.getErrCode() != TubeConst.SUCCESS_CODE) { - return TubeMQResult.errorResult("please check master ip and webPort"); + List masterIps = req.getMasterIps(); + for (String masterIp : masterIps) { + TubeMQResult checkResult = masterService.checkMasterNodeStatus(masterIp, req.getMasterWebPort()); + if (checkResult.getErrCode() != SUCCESS_CODE) { + return TubeMQResult.errorResult("please check master ip and webPort"); + } } // 2. add cluster and master node clusterService.addClusterAndMasterNode(req); diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java index bef03dc81d7..2eafbac0c4f 100644 --- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java +++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java @@ -18,11 +18,14 @@ package org.apache.inlong.tubemq.manager.controller.cluster.request; import lombok.Data; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.util.List; + @Data public class AddClusterReq { - private String masterIp; + private List masterIps; private String clusterName; private Integer masterPort; private Integer masterWebPort; @@ -30,6 +33,6 @@ public class AddClusterReq { private String token; public boolean legal() { - return StringUtils.isNotBlank(masterIp) && masterPort != null && StringUtils.isNotBlank(token); + return CollectionUtils.isNotEmpty(masterIps) && masterPort != null && StringUtils.isNotBlank(token); } } diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java index 41b701679cf..83f22098ae2 100644 --- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java +++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java @@ -95,13 +95,15 @@ public void addMasterNode(AddClusterReq req, ClusterEntry clusterEntry) { if (clusterEntry == null) { return; } - MasterEntry masterEntry = new MasterEntry(); - masterEntry.setPort(req.getMasterPort()); - masterEntry.setClusterId(clusterEntry.getClusterId()); - masterEntry.setWebPort(req.getMasterWebPort()); - masterEntry.setIp(req.getMasterIp()); - masterEntry.setToken(req.getToken()); - nodeService.addNode(masterEntry); + for (String masterIp : req.getMasterIps()) { + MasterEntry masterEntry = new MasterEntry(); + masterEntry.setPort(req.getMasterPort()); + masterEntry.setClusterId(clusterEntry.getClusterId()); + masterEntry.setWebPort(req.getMasterWebPort()); + masterEntry.setIp(masterIp); + masterEntry.setToken(req.getToken()); + nodeService.addNode(masterEntry); + } } } diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java index ba3635cb5e0..4a316bbc38c 100644 --- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java +++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java @@ -23,9 +23,11 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -128,14 +130,32 @@ public MasterEntry getMasterNode(Long clusterId) { if (clusterId == null) { return null; } - MasterEntry master = masterRepository - .findMasterEntryByClusterIdEquals( + List masters = getMasterNodes(clusterId); + + for (MasterEntry masterEntry : masters) { + if (!checkMasterNodeStatus(masterEntry.getIp(), + masterEntry.getWebPort()).isError()) { + return masterEntry; + } + } + + throw new RuntimeException("cluster id " + + clusterId + "no master node, please check"); + } + + @Override + public List getMasterNodes(Long clusterId) { + if (clusterId == null) { + return null; + } + List masters = masterRepository + .findMasterEntriesByClusterIdEquals( clusterId); - if (master == null) { - throw new RuntimeException("cluster id " - + clusterId + "no master node, please check"); + if (CollectionUtils.isEmpty(masters)) { + throw new RuntimeException("cluster id " + + clusterId + "no master node, please check"); } - return master; + return masters; } @Override diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java index 33cbf22f2dd..a7e57e500d1 100644 --- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java +++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java @@ -17,6 +17,7 @@ package org.apache.inlong.tubemq.manager.service.interfaces; +import java.util.List; import java.util.Map; import org.apache.inlong.tubemq.manager.controller.TubeMQResult; @@ -67,6 +68,13 @@ public interface MasterService { */ MasterEntry getMasterNode(Long clusterId); + /** + * get master node in one cluster + * @param clusterId + * @return + */ + List getMasterNodes(Long clusterId); + /** * use queryBody to generate queryUrl for master query * diff --git a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java index 9ac758379e3..608dd56cb72 100644 --- a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java +++ b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java @@ -46,6 +46,8 @@ import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.RequestBuilder; +import java.util.Collections; + @Slf4j @RunWith(SpringRunner.class) @SpringBootTest @@ -157,7 +159,7 @@ public void testAddCluster() throws Exception { AddClusterReq req = new AddClusterReq(); req.setClusterName("test"); - req.setMasterIp("127.0.0.1"); + req.setMasterIps(Collections.singletonList("127.0.0.1")); req.setMasterWebPort(8080); req.setMasterPort(8089); req.setToken("abc");