Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-2353] Tube manager cluster adds support for multi-master conf… #2356

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
@NoArgsConstructor
@AllArgsConstructor
public class TubeMQResult {
public static final int ERR_CODE = -1;

private String errMsg = "";
private int errCode = 0;
private boolean result = true;
Expand Down Expand Up @@ -61,4 +63,7 @@ public static TubeMQResult successResult(Object data) {
.result(true).data(data).build();
}

public boolean isError() {
return ERR_CODE == errCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import static org.apache.inlong.tubemq.manager.service.TubeConst.SUCCESS_CODE;

@RestController
@RequestMapping(path = "/v1/cluster")
@Slf4j
Expand Down Expand Up @@ -96,9 +98,12 @@ public TubeMQResult addNewCluster(AddClusterReq req) {
if (!req.legal()) {
return TubeMQResult.errorResult(TubeMQErrorConst.PARAM_ILLEGAL);
}
TubeMQResult checkResult = masterService.checkMasterNodeStatus(req.getMasterIp(), req.getMasterWebPort());
if (checkResult.getErrCode() != TubeConst.SUCCESS_CODE) {
return TubeMQResult.errorResult("please check master ip and webPort");
List<String> masterIps = req.getMasterIps();
for (String masterIp : masterIps) {
TubeMQResult checkResult = masterService.checkMasterNodeStatus(masterIp, req.getMasterWebPort());
if (checkResult.getErrCode() != SUCCESS_CODE) {
return TubeMQResult.errorResult("please check master ip and webPort");
}
}
// 2. add cluster and master node
clusterService.addClusterAndMasterNode(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
package org.apache.inlong.tubemq.manager.controller.cluster.request;

import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.List;

@Data
public class AddClusterReq {
private String masterIp;
private List<String> masterIps;
private String clusterName;
private Integer masterPort;
private Integer masterWebPort;
private String createUser;
private String token;

public boolean legal() {
return StringUtils.isNotBlank(masterIp) && masterPort != null && StringUtils.isNotBlank(token);
return CollectionUtils.isNotEmpty(masterIps) && masterPort != null && StringUtils.isNotBlank(token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ public void addMasterNode(AddClusterReq req, ClusterEntry clusterEntry) {
if (clusterEntry == null) {
return;
}
MasterEntry masterEntry = new MasterEntry();
masterEntry.setPort(req.getMasterPort());
masterEntry.setClusterId(clusterEntry.getClusterId());
masterEntry.setWebPort(req.getMasterWebPort());
masterEntry.setIp(req.getMasterIp());
masterEntry.setToken(req.getToken());
nodeService.addNode(masterEntry);
for (String masterIp : req.getMasterIps()) {
MasterEntry masterEntry = new MasterEntry();
masterEntry.setPort(req.getMasterPort());
masterEntry.setClusterId(clusterEntry.getClusterId());
masterEntry.setWebPort(req.getMasterWebPort());
masterEntry.setIp(masterIp);
masterEntry.setToken(req.getToken());
nodeService.addNode(masterEntry);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -128,14 +130,32 @@ public MasterEntry getMasterNode(Long clusterId) {
if (clusterId == null) {
return null;
}
MasterEntry master = masterRepository
.findMasterEntryByClusterIdEquals(
List<MasterEntry> masters = getMasterNodes(clusterId);

for (MasterEntry masterEntry : masters) {
if (!checkMasterNodeStatus(masterEntry.getIp(),
masterEntry.getWebPort()).isError()) {
return masterEntry;
}
}

throw new RuntimeException("cluster id " +
clusterId + "no master node, please check");
}

@Override
public List<MasterEntry> getMasterNodes(Long clusterId) {
if (clusterId == null) {
return null;
}
List<MasterEntry> masters = masterRepository
.findMasterEntriesByClusterIdEquals(
clusterId);
if (master == null) {
throw new RuntimeException("cluster id "
+ clusterId + "no master node, please check");
if (CollectionUtils.isEmpty(masters)) {
throw new RuntimeException("cluster id " +
clusterId + "no master node, please check");
}
return master;
return masters;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.tubemq.manager.service.interfaces;

import java.util.List;
import java.util.Map;

import org.apache.inlong.tubemq.manager.controller.TubeMQResult;
Expand Down Expand Up @@ -67,6 +68,13 @@ public interface MasterService {
*/
MasterEntry getMasterNode(Long clusterId);

/**
* get master node in one cluster
* @param clusterId
* @return
*/
List<MasterEntry> getMasterNodes(Long clusterId);

/**
* use queryBody to generate queryUrl for master query
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.RequestBuilder;

import java.util.Collections;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
Expand Down Expand Up @@ -157,7 +159,7 @@ public void testAddCluster() throws Exception {

AddClusterReq req = new AddClusterReq();
req.setClusterName("test");
req.setMasterIp("127.0.0.1");
req.setMasterIps(Collections.singletonList("127.0.0.1"));
req.setMasterWebPort(8080);
req.setMasterPort(8089);
req.setToken("abc");
Expand Down