Skip to content

Commit 6d1c80b

Browse files
ciscozhouvernedeng
authored andcommitted
[INLONG-4443][Manager] Remove some deprecated classes (apache#4444)
1 parent b5cf7d5 commit 6d1c80b

File tree

11 files changed

+59
-228
lines changed

11 files changed

+59
-228
lines changed

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java

-50
This file was deleted.

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java

-47
This file was deleted.

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java

-28
This file was deleted.

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java

-46
This file was deleted.

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergPartition.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.inlong.manager.common.enums;
18+
package org.apache.inlong.manager.common.pojo.sink.iceberg;
1919

2020
import org.apache.inlong.manager.common.util.Preconditions;
2121

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergType.java inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergType.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.inlong.manager.common.enums;
18+
package org.apache.inlong.manager.common.pojo.sink.iceberg;
1919

2020
import lombok.Getter;
2121

@@ -40,7 +40,7 @@ public enum IcebergType {
4040
BINARY("binary");
4141

4242
@Getter
43-
private String type;
43+
private final String type;
4444

4545
IcebergType(String type) {
4646
this.type = type;

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.github.pagehelper.PageInfo;
2323
import lombok.extern.slf4j.Slf4j;
2424
import org.apache.inlong.manager.common.beans.ClusterBean;
25-
import org.apache.inlong.manager.common.enums.Constant;
2625
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
2726
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
2827
import org.apache.inlong.manager.common.enums.GlobalConstants;
@@ -73,6 +72,10 @@
7372
@Service
7473
public class ConsumptionServiceImpl implements ConsumptionService {
7574

75+
private static final String PREFIX_DLQ = "dlq"; // prefix of the Topic of the dead letter queue
76+
77+
private static final String PREFIX_RLQ = "rlq"; // prefix of the Topic of the retry letter queue
78+
7679
@Autowired
7780
private ClusterBean clusterBean;
7881
@Autowired
@@ -183,7 +186,7 @@ private void savePulsarInfo(ConsumptionMqExtBase mqExtBase, ConsumptionEntity en
183186
// when closing, delete the related configuration
184187
String groupId = entity.getInlongGroupId();
185188
if (dlqEnable) {
186-
String dlqTopic = Constant.PREFIX_DLQ + "_" + pulsarInfo.getDeadLetterTopic();
189+
String dlqTopic = PREFIX_DLQ + "_" + pulsarInfo.getDeadLetterTopic();
187190
Boolean exist = streamService.exist(groupId, dlqTopic);
188191
if (exist) {
189192
throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_DUPLICATED);
@@ -193,7 +196,7 @@ private void savePulsarInfo(ConsumptionMqExtBase mqExtBase, ConsumptionEntity en
193196
pulsarInfo.setDeadLetterTopic(null);
194197
}
195198
if (rlqEnable) {
196-
String rlqTopic = Constant.PREFIX_RLQ + "_" + pulsarInfo.getRetryLetterTopic();
199+
String rlqTopic = PREFIX_RLQ + "_" + pulsarInfo.getRetryLetterTopic();
197200
Boolean exist = streamService.exist(groupId, rlqTopic);
198201
if (exist) {
199202
throw new BusinessException(ErrorCodeEnum.PULSAR_RLQ_DUPLICATED);
@@ -267,7 +270,7 @@ public Boolean update(ConsumptionInfo info, String operator) {
267270
streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
268271
} else if (!Objects.equals(dlqNameNew, dlqNameOld)) {
269272
pulsarEntity.setIsDlq(1);
270-
String topic = Constant.PREFIX_DLQ + "_" + dlqNameNew;
273+
String topic = PREFIX_DLQ + "_" + dlqNameNew;
271274
topic = topic.toLowerCase(Locale.ROOT);
272275
pulsarEntity.setDeadLetterTopic(topic);
273276
streamService.insertDlqOrRlq(groupId, topic, operator);
@@ -281,7 +284,7 @@ public Boolean update(ConsumptionInfo info, String operator) {
281284
streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
282285
} else if (!Objects.equals(rlqNameNew, pulsarEntity.getRetryLetterTopic())) {
283286
pulsarEntity.setIsRlq(1);
284-
String topic = Constant.PREFIX_RLQ + "_" + rlqNameNew;
287+
String topic = PREFIX_RLQ + "_" + rlqNameNew;
285288
topic = topic.toLowerCase(Locale.ROOT);
286289
pulsarEntity.setRetryLetterTopic(topic);
287290
streamService.insertDlqOrRlq(groupId, topic, operator);

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java

+29-35
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
package org.apache.inlong.manager.service.core.impl;
1919

2020
import lombok.extern.slf4j.Slf4j;
21-
import org.apache.inlong.manager.common.enums.DBCollectorDetailTaskEntityStatus;
22-
import org.apache.inlong.manager.common.enums.DBCollectorTaskConstant;
23-
import org.apache.inlong.manager.common.enums.DBCollectorTaskReturnCode;
2421
import org.apache.inlong.manager.common.pojo.dbcollector.DBCollectorReportTaskRequest;
2522
import org.apache.inlong.manager.common.pojo.dbcollector.DBCollectorTaskInfo;
2623
import org.apache.inlong.manager.common.pojo.dbcollector.DBCollectorTaskRequest;
@@ -40,31 +37,34 @@
4037
public class DBCollectorTaskServiceImpl implements DBCollectorTaskService {
4138

4239
private static final Logger LOGGER = LoggerFactory.getLogger(DBCollectorTaskServiceImpl.class);
40+
private static final String INTERFACE_VERSION = "1.0";
41+
42+
private static final int INIT = 0;
43+
private static final int DISPATCHED = 1;
44+
private static final int DONE = 2;
45+
private static final int FAILED = 3;
46+
47+
private static final int RETURN_SUCC = 0;
48+
private static final int RETURN_EMPTY = 1;
49+
private static final int RETURN_INVALID_VERSION = 2;
50+
private static final int RETURN_INVALID_STATE = 3;
4351

4452
@Autowired
4553
private DBCollectorDetailTaskMapper detailTaskMapper;
4654

4755
@Override
4856
public DBCollectorTaskInfo getTask(DBCollectorTaskRequest req) {
4957
LOGGER.debug("db collector task request: {}", req);
50-
if (!DBCollectorTaskConstant.INTERFACE_VERSION.equals(req.getVersion())) {
51-
return DBCollectorTaskInfo.builder()
52-
.version(DBCollectorTaskConstant.INTERFACE_VERSION)
53-
.returnCode(DBCollectorTaskReturnCode.INVALID_VERSION.getCode()).build();
58+
if (!INTERFACE_VERSION.equals(req.getVersion())) {
59+
return DBCollectorTaskInfo.builder().version(INTERFACE_VERSION).returnCode(RETURN_INVALID_VERSION).build();
5460
}
55-
DBCollectorDetailTaskEntity entity = detailTaskMapper
56-
.selectOneByState(DBCollectorDetailTaskEntityStatus.INIT.getCode());
61+
DBCollectorDetailTaskEntity entity = detailTaskMapper.selectOneByState(INIT);
5762
if (entity == null) {
58-
return DBCollectorTaskInfo.builder()
59-
.version(DBCollectorTaskConstant.INTERFACE_VERSION)
60-
.returnCode(DBCollectorTaskReturnCode.EMPTY.getCode()).build();
63+
return DBCollectorTaskInfo.builder().version(INTERFACE_VERSION).returnCode(RETURN_EMPTY).build();
6164
}
62-
int ret = detailTaskMapper.changeState(entity.getId(), 0, DBCollectorDetailTaskEntityStatus.INIT.getCode(),
63-
DBCollectorDetailTaskEntityStatus.DISPATCHED.getCode());
65+
int ret = detailTaskMapper.changeState(entity.getId(), 0, INIT, DISPATCHED);
6466
if (ret == 0) {
65-
return DBCollectorTaskInfo.builder()
66-
.version(DBCollectorTaskConstant.INTERFACE_VERSION)
67-
.returnCode(DBCollectorTaskReturnCode.EMPTY.getCode()).build();
67+
return DBCollectorTaskInfo.builder().version(INTERFACE_VERSION).returnCode(RETURN_EMPTY).build();
6868
} else {
6969
DBCollectorTaskInfo.TaskInfo task = new DBCollectorTaskInfo.TaskInfo();
7070
task.setId(entity.getId());
@@ -82,34 +82,28 @@ public DBCollectorTaskInfo getTask(DBCollectorTaskRequest req) {
8282
task.setRetryTimes(entity.getRetryTimes());
8383
task.setInlongGroupId(entity.getGroupId());
8484
task.setInlongStreamId(entity.getStreamId());
85-
return DBCollectorTaskInfo.builder()
86-
.version(DBCollectorTaskConstant.INTERFACE_VERSION)
87-
.returnCode(DBCollectorTaskReturnCode.SUCC.getCode())
88-
.data(task).build();
85+
return DBCollectorTaskInfo.builder().version(INTERFACE_VERSION).returnCode(RETURN_SUCC).data(task).build();
8986
}
9087
}
9188

9289
@Override
9390
public Integer reportTask(DBCollectorReportTaskRequest req) {
94-
if (!Objects.equals(req.getVersion(), DBCollectorTaskConstant.INTERFACE_VERSION)) {
95-
return DBCollectorTaskReturnCode.INVALID_VERSION.getCode();
91+
if (!Objects.equals(req.getVersion(), INTERFACE_VERSION)) {
92+
return RETURN_INVALID_VERSION;
9693
}
97-
DBCollectorDetailTaskEntity entity = detailTaskMapper
98-
.selectByTaskId(req.getId());
94+
DBCollectorDetailTaskEntity entity = detailTaskMapper.selectByTaskId(req.getId());
9995
if (entity == null) {
100-
return DBCollectorTaskReturnCode.EMPTY.getCode();
96+
return RETURN_EMPTY;
10197
}
102-
if (req.getState() != DBCollectorDetailTaskEntityStatus.DISPATCHED.getCode()
103-
&& req.getState() != DBCollectorDetailTaskEntityStatus.DONE.getCode()
104-
&& req.getState() != DBCollectorDetailTaskEntityStatus.FAILED.getCode()) {
105-
return DBCollectorTaskReturnCode.INVALID_STATE.getCode();
98+
if (req.getState() != DISPATCHED
99+
&& req.getState() != DONE
100+
&& req.getState() != FAILED) {
101+
return RETURN_INVALID_STATE;
106102
}
107-
int ret = detailTaskMapper
108-
.changeState(entity.getId(), req.getOffset(), DBCollectorDetailTaskEntityStatus.DISPATCHED.getCode(),
109-
req.getState());
103+
int ret = detailTaskMapper.changeState(entity.getId(), req.getOffset(), DISPATCHED, req.getState());
110104
if (ret == 0) {
111-
return DBCollectorTaskReturnCode.EMPTY.getCode();
105+
return RETURN_EMPTY;
112106
}
113-
return DBCollectorTaskReturnCode.SUCC.getCode();
107+
return RETURN_SUCC;
114108
}
115109
}

0 commit comments

Comments
 (0)