Skip to content

Commit 9a16332

Browse files
committed
[INLONG-1858][Manager] Fix hive add column
1 parent 95f9dbc commit 9a16332

File tree

4 files changed

+17
-9
lines changed

4 files changed

+17
-9
lines changed

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public enum FieldType {
4242
public static FieldType forName(String name) {
4343
Preconditions.checkNotNull(name, "FieldType should not be null");
4444
for (FieldType value : values()) {
45-
if (value.toString().equals(name)) {
45+
if (value.toString().equalsIgnoreCase(name)) {
4646
return value;
4747
}
4848
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,12 @@ public boolean startProcess(String groupId, String streamId, String operator, bo
8686
throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
8787
}
8888
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
89-
if (status == StreamStatus.CONFIG_ING || status == StreamStatus.CONFIG_SUCCESSFUL) {
89+
if (status == StreamStatus.CONFIG_ING) {
9090
log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
9191
return true;
9292
}
93-
if (status != StreamStatus.NEW || status != StreamStatus.CONFIG_FAILED) {
93+
if (status != StreamStatus.NEW && status != StreamStatus.CONFIG_FAILED
94+
&& status != StreamStatus.CONFIG_SUCCESSFUL) {
9495
throw new BusinessException(
9596
String.format("GroupId=%s, StreamId=%s, status=%s not correct for stream start", groupId, streamId,
9697
status));

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.commons.lang3.StringUtils;
2727
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
2828
import org.apache.inlong.manager.common.enums.GlobalConstants;
29+
import org.apache.inlong.manager.common.enums.GroupStatus;
2930
import org.apache.inlong.manager.common.enums.SinkStatus;
3031
import org.apache.inlong.manager.common.enums.SinkType;
3132
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -41,10 +42,12 @@
4142
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
4243
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
4344
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
45+
import org.apache.inlong.manager.service.core.operation.InlongStreamProcessOperation;
4446
import org.apache.inlong.manager.service.group.GroupCheckService;
4547
import org.slf4j.Logger;
4648
import org.slf4j.LoggerFactory;
4749
import org.springframework.beans.factory.annotation.Autowired;
50+
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
4851
import org.springframework.stereotype.Service;
4952
import org.springframework.transaction.annotation.Transactional;
5053

@@ -62,7 +65,6 @@
6265
public class StreamSinkServiceImpl implements StreamSinkService {
6366

6467
private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
65-
6668
@Autowired
6769
private SinkOperationFactory operationFactory;
6870
@Autowired
@@ -71,6 +73,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
7173
private StreamSinkEntityMapper sinkMapper;
7274
@Autowired
7375
private StreamSinkFieldEntityMapper sinkFieldMapper;
76+
@Autowired
77+
private AutowireCapableBeanFactory autowireCapableBeanFactory;
7478

7579
@Override
7680
@Transactional(rollbackFor = Throwable.class)
@@ -192,7 +196,6 @@ public Boolean update(SinkRequest request, String operator) {
192196
String streamId = request.getInlongStreamId();
193197
String sinkName = request.getSinkName();
194198
String sinkType = request.getSinkType();
195-
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
196199

197200
// Check whether the sink name exists with the same groupId and streamId
198201
List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
@@ -214,9 +217,13 @@ public Boolean update(SinkRequest request, String operator) {
214217

215218
// The inlong group status is [Configuration successful], then asynchronously initiate
216219
// the [Single inlong stream resource creation] workflow
217-
// if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
218-
// executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
219-
// }
220+
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
221+
if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
222+
// To work around the circular reference check we manually instantiate and wire
223+
InlongStreamProcessOperation streamProcessOperation = new InlongStreamProcessOperation();
224+
autowireCapableBeanFactory.autowireBean(streamProcessOperation);
225+
streamProcessOperation.startProcess(groupId, streamId, operator, true);
226+
}
220227
LOGGER.info("success to update sink info: {}", request);
221228
return true;
222229
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ public static ElasticsearchLoadNode createLoadNode(ElasticsearchSink elasticsear
347347
final String host = elasticsearchSink.getHost();
348348
final String documentType = elasticsearchSink.getDocumentType();
349349
final String promaryKey = elasticsearchSink.getPrimaryKey();
350-
final int version = elasticsearchSink.getVersion().intValue();
350+
final int version = elasticsearchSink.getVersion().intValue();
351351

352352
final List<SinkField> fieldList = elasticsearchSink.getFieldList();
353353
List<FieldInfo> fields = fieldList.stream()

0 commit comments

Comments
 (0)