Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-1858][Manager] Fix hive add column bug #4554

Merged
merged 3 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public enum FieldType {
public static FieldType forName(String name) {
Preconditions.checkNotNull(name, "FieldType should not be null");
for (FieldType value : values()) {
if (value.toString().equals(name)) {
if (value.toString().equalsIgnoreCase(name)) {
return value;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ public boolean startProcess(String groupId, String streamId, String operator, bo
throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
}
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
if (status == StreamStatus.CONFIG_ING || status == StreamStatus.CONFIG_SUCCESSFUL) {
if (status == StreamStatus.CONFIG_ING) {
log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
return true;
}
if (status != StreamStatus.NEW || status != StreamStatus.CONFIG_FAILED) {
if (status != StreamStatus.NEW && status != StreamStatus.CONFIG_FAILED
&& status != StreamStatus.CONFIG_SUCCESSFUL) {
throw new BusinessException(
String.format("GroupId=%s, StreamId=%s, status=%s not correct for stream start", groupId, streamId,
status));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
Expand All @@ -41,10 +42,12 @@
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.core.operation.InlongStreamProcessOperation;
import org.apache.inlong.manager.service.group.GroupCheckService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -62,7 +65,6 @@
public class StreamSinkServiceImpl implements StreamSinkService {

private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);

@Autowired
private SinkOperationFactory operationFactory;
@Autowired
Expand All @@ -71,6 +73,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
private StreamSinkEntityMapper sinkMapper;
@Autowired
private StreamSinkFieldEntityMapper sinkFieldMapper;
@Autowired
private AutowireCapableBeanFactory autowireCapableBeanFactory;

@Override
@Transactional(rollbackFor = Throwable.class)
Expand Down Expand Up @@ -192,7 +196,7 @@ public Boolean update(SinkRequest request, String operator) {
String streamId = request.getInlongStreamId();
String sinkName = request.getSinkName();
String sinkType = request.getSinkType();
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
final InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);

// Check whether the sink name exists with the same groupId and streamId
List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
Expand All @@ -214,9 +218,12 @@ public Boolean update(SinkRequest request, String operator) {

// The inlong group status is [Configuration successful], then asynchronously initiate
// the [Single inlong stream resource creation] workflow
// if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
// executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
// }
if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
// To work around the circular reference check we manually instantiate and wire
InlongStreamProcessOperation streamProcessOperation = new InlongStreamProcessOperation();
autowireCapableBeanFactory.autowireBean(streamProcessOperation);
streamProcessOperation.startProcess(groupId, streamId, operator, true);
}
LOGGER.info("success to update sink info: {}", request);
return true;
}
Expand Down