26
26
import org .apache .commons .lang3 .StringUtils ;
27
27
import org .apache .inlong .manager .common .enums .ErrorCodeEnum ;
28
28
import org .apache .inlong .manager .common .enums .GlobalConstants ;
29
+ import org .apache .inlong .manager .common .enums .GroupStatus ;
29
30
import org .apache .inlong .manager .common .enums .SinkStatus ;
30
31
import org .apache .inlong .manager .common .enums .SinkType ;
31
32
import org .apache .inlong .manager .common .exceptions .BusinessException ;
41
42
import org .apache .inlong .manager .dao .entity .StreamSinkEntity ;
42
43
import org .apache .inlong .manager .dao .mapper .StreamSinkEntityMapper ;
43
44
import org .apache .inlong .manager .dao .mapper .StreamSinkFieldEntityMapper ;
45
+ import org .apache .inlong .manager .service .core .operation .InlongStreamProcessOperation ;
44
46
import org .apache .inlong .manager .service .group .GroupCheckService ;
45
47
import org .slf4j .Logger ;
46
48
import org .slf4j .LoggerFactory ;
47
49
import org .springframework .beans .factory .annotation .Autowired ;
50
+ import org .springframework .beans .factory .config .AutowireCapableBeanFactory ;
48
51
import org .springframework .stereotype .Service ;
49
52
import org .springframework .transaction .annotation .Transactional ;
50
53
62
65
public class StreamSinkServiceImpl implements StreamSinkService {
63
66
64
67
private static final Logger LOGGER = LoggerFactory .getLogger (StreamSinkServiceImpl .class );
65
-
66
68
@ Autowired
67
69
private SinkOperationFactory operationFactory ;
68
70
@ Autowired
@@ -71,6 +73,9 @@ public class StreamSinkServiceImpl implements StreamSinkService {
71
73
private StreamSinkEntityMapper sinkMapper ;
72
74
@ Autowired
73
75
private StreamSinkFieldEntityMapper sinkFieldMapper ;
76
+ @ Autowired
77
+ private AutowireCapableBeanFactory autowireCapableBeanFactory ;
78
+ private InlongStreamProcessOperation streamProcessOperation ;
74
79
75
80
@ Override
76
81
@ Transactional (rollbackFor = Throwable .class )
@@ -192,7 +197,7 @@ public Boolean update(SinkRequest request, String operator) {
192
197
String streamId = request .getInlongStreamId ();
193
198
String sinkName = request .getSinkName ();
194
199
String sinkType = request .getSinkType ();
195
- InlongGroupEntity groupEntity = groupCheckService .checkGroupStatus (groupId , operator );
200
+ final InlongGroupEntity groupEntity = groupCheckService .checkGroupStatus (groupId , operator );
196
201
197
202
// Check whether the sink name exists with the same groupId and streamId
198
203
List <StreamSinkEntity > sinkList = sinkMapper .selectByRelatedId (groupId , streamId , sinkName );
@@ -214,9 +219,14 @@ public Boolean update(SinkRequest request, String operator) {
214
219
215
220
// The inlong group status is [Configuration successful], then asynchronously initiate
216
221
// the [Single inlong stream resource creation] workflow
217
- // if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
218
- // executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
219
- // }
222
+ if (GroupStatus .CONFIG_SUCCESSFUL .getCode ().equals (groupEntity .getStatus ())) {
223
+ // To work around the circular reference check we manually instantiate and wire
224
+ if (streamProcessOperation == null ) {
225
+ streamProcessOperation = new InlongStreamProcessOperation ();
226
+ autowireCapableBeanFactory .autowireBean (streamProcessOperation );
227
+ }
228
+ streamProcessOperation .startProcess (groupId , streamId , operator , true );
229
+ }
220
230
LOGGER .info ("success to update sink info: {}" , request );
221
231
return true ;
222
232
}
0 commit comments