Skip to content

Commit b3a4abb

Browse files
committed
[INLONG-4993][Manager] [Manager] Return details when querying a list of StreamSources
1 parent 2b30c6a commit b3a4abb

File tree

67 files changed

+786
-1193
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+786
-1193
lines changed

inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
2929
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
3030
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
31-
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
31+
import org.apache.inlong.manager.common.pojo.source.StreamSource;
3232
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
3333

3434
import java.io.IOException;
@@ -174,8 +174,8 @@ private static class DescribeSource extends AbstractCommandRunner {
174174
@Override
175175
void run() {
176176
try {
177-
List<SourceListResponse> sourceListResponses = managerClient.listSources(group, stream, type);
178-
sourceListResponses.forEach(PrintUtils::printJson);
177+
List<StreamSource> sources = managerClient.listSources(group, stream, type);
178+
sources.forEach(PrintUtils::printJson);
179179
} catch (Exception e) {
180180
System.out.println(e.getMessage());
181181
}

inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
3333
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
3434
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
35-
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
35+
import org.apache.inlong.manager.common.pojo.source.StreamSource;
3636
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
3737

3838
import java.io.IOException;
@@ -180,7 +180,7 @@ private static class ListSource extends AbstractCommandRunner {
180180
@Parameter(names = {"-g", "--group"}, required = true, description = "inlong group id")
181181
private String group;
182182

183-
@Parameter(names = {"-t", "--type"}, description = "sink type")
183+
@Parameter(names = {"-t", "--type"}, description = "source type")
184184
private String type;
185185

186186
ListSource(InnerInlongManagerClient managerClient) {
@@ -190,7 +190,7 @@ private static class ListSource extends AbstractCommandRunner {
190190
@Override
191191
void run() {
192192
try {
193-
List<SourceListResponse> sourceListResponses = managerClient.listSources(group, stream, type);
193+
List<StreamSource> sourceListResponses = managerClient.listSources(group, stream, type);
194194
PrintUtils.print(sourceListResponses, SourceInfo.class);
195195
} catch (Exception e) {
196196
System.out.println(e.getMessage());

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
3333
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
3434
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
35-
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
3635
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
3736
import org.apache.inlong.manager.common.pojo.source.StreamSource;
3837
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -202,12 +201,12 @@ private void initOrUpdateSource() {
202201
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
203202
final String groupId = streamInfo.getInlongGroupId();
204203
final String streamId = streamInfo.getInlongStreamId();
205-
List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
204+
List<StreamSource> streamSources = managerClient.listSources(groupId, streamId);
206205
List<String> updateSourceNames = Lists.newArrayList();
207-
if (CollectionUtils.isNotEmpty(sourceListResponses)) {
208-
for (SourceListResponse sourceListResponse : sourceListResponses) {
209-
final String sourceName = sourceListResponse.getSourceName();
210-
final int id = sourceListResponse.getId();
206+
if (CollectionUtils.isNotEmpty(streamSources)) {
207+
for (StreamSource source : streamSources) {
208+
final String sourceName = source.getSourceName();
209+
final int id = source.getId();
211210
if (sourceRequests.get(sourceName) == null) {
212211
boolean isDelete = managerClient.deleteSource(id);
213212
if (!isDelete) {
@@ -222,7 +221,7 @@ private void initOrUpdateSource() {
222221
updateState.getValue()));
223222
}
224223
updateSourceNames.add(sourceName);
225-
sourceRequest.setId(sourceListResponse.getId());
224+
sourceRequest.setId(source.getId());
226225
}
227226
}
228227
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
3535
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
3636
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
37-
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
37+
import org.apache.inlong.manager.common.pojo.source.StreamSource;
3838
import org.apache.inlong.manager.common.util.HttpUtils;
3939

4040
import java.util.List;
@@ -111,8 +111,8 @@ public Map<String, SimpleGroupStatus> listGroupStatus(List<String> groupIds) {
111111
groupListResponses.forEach(response -> {
112112
String groupId = response.getInlongGroupId();
113113
SimpleGroupStatus groupStatus = SimpleGroupStatus.parseStatusByCode(response.getStatus());
114-
List<SourceListResponse> sourceListResponses = response.getSourceResponses();
115-
groupStatus = recheckGroupStatus(groupStatus, sourceListResponses);
114+
List<StreamSource> sources = response.getStreamSources();
115+
groupStatus = recheckGroupStatus(groupStatus, sources);
116116
groupStatusMap.put(groupId, groupStatus);
117117
});
118118
}
@@ -129,10 +129,9 @@ public InlongGroup getGroup(String groupId) {
129129
return new InlongGroupImpl(groupInfo, this);
130130
}
131131

132-
private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus groupStatus,
133-
List<SourceListResponse> sourceListResponses) {
134-
Map<SimpleSourceStatus, List<SourceListResponse>> statusListMap = Maps.newHashMap();
135-
sourceListResponses.forEach(source -> {
132+
private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus groupStatus, List<StreamSource> sources) {
133+
Map<SimpleSourceStatus, List<StreamSource>> statusListMap = Maps.newHashMap();
134+
sources.forEach(source -> {
136135
SimpleSourceStatus status = SimpleSourceStatus.parseByStatus(source.getStatus());
137136
statusListMap.computeIfAbsent(status, k -> Lists.newArrayList()).add(source);
138137
});

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
3030
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
3131
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
32-
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
3332
import org.apache.inlong.manager.common.pojo.source.StreamSource;
3433
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
3534
import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -341,15 +340,15 @@ private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
341340
}
342341

343342
private void initOrUpdateSource(InlongStreamInfo streamInfo) {
344-
List<SourceListResponse> sourceListResponses = managerClient.listSources(inlongGroupId, inlongStreamId);
343+
List<StreamSource> streamSources = managerClient.listSources(inlongGroupId, inlongStreamId);
345344
List<String> updateSourceNames = Lists.newArrayList();
346-
for (SourceListResponse sourceListResponse : sourceListResponses) {
347-
final String sourceName = sourceListResponse.getSourceName();
348-
final int id = sourceListResponse.getId();
345+
for (StreamSource source : streamSources) {
346+
final String sourceName = source.getSourceName();
347+
final int id = source.getId();
349348
if (this.streamSources.get(sourceName) == null) {
350349
boolean isDelete = managerClient.deleteSource(id);
351350
if (!isDelete) {
352-
throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
351+
throw new RuntimeException(String.format("Delete source=%s failed", source));
353352
}
354353
} else {
355354
StreamSource streamSource = this.streamSources.get(sourceName);
@@ -364,7 +363,7 @@ private void initOrUpdateSource(InlongStreamInfo streamInfo) {
364363
updateSourceNames.add(sourceName);
365364
}
366365
}
367-
for (Map.Entry<String, StreamSource> sourceEntry : streamSources.entrySet()) {
366+
for (Map.Entry<String, StreamSource> sourceEntry : this.streamSources.entrySet()) {
368367
String sourceName = sourceEntry.getKey();
369368
if (updateSourceNames.contains(sourceName)) {
370369
continue;

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@
4848
import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
4949
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
5050
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
51-
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
5251
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
52+
import org.apache.inlong.manager.common.pojo.source.StreamSource;
5353
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
5454
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
5555
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
@@ -326,15 +326,15 @@ public Integer createSource(SourceRequest request) {
326326
/**
327327
* Get information of sources.
328328
*/
329-
public List<SourceListResponse> listSources(String groupId, String streamId) {
329+
public List<StreamSource> listSources(String groupId, String streamId) {
330330
return listSources(groupId, streamId, null);
331331
}
332332

333333
/**
334334
* List information of sources by the specified source type.
335335
*/
336-
public List<SourceListResponse> listSources(String groupId, String streamId, String sourceType) {
337-
Response<PageInfo<SourceListResponse>> response = executeHttpCall(
336+
public List<StreamSource> listSources(String groupId, String streamId, String sourceType) {
337+
Response<PageInfo<StreamSource>> response = executeHttpCall(
338338
streamSourceApi.listSources(groupId, streamId, sourceType));
339339
assertRespSuccess(response);
340340
return response.getData().getList();

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import com.github.pagehelper.PageInfo;
2121
import org.apache.inlong.manager.common.beans.Response;
22-
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
2322
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
23+
import org.apache.inlong.manager.common.pojo.source.StreamSource;
2424
import retrofit2.Call;
2525
import retrofit2.http.Body;
2626
import retrofit2.http.DELETE;
@@ -38,7 +38,7 @@ public interface StreamSourceApi {
3838
Call<Response<Boolean>> updateSource(@Body SourceRequest request);
3939

4040
@GET("source/list")
41-
Call<Response<PageInfo<SourceListResponse>>> listSources(@Query("inlongGroupId") String groupId,
41+
Call<Response<PageInfo<StreamSource>>> listSources(@Query("inlongGroupId") String groupId,
4242
@Query("inlongStreamId") String streamId, @Query("sourceType") String sourceType);
4343

4444
@DELETE("source/delete/{id}")

inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java

+19-25
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@
2929
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
3030
import org.apache.inlong.manager.common.beans.Response;
3131
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
32+
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterRequest;
3233
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
3334
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
3435
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
35-
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
3636
import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
3737
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
38+
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarRequest;
3839
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
3940
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
4041
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
@@ -48,16 +49,11 @@
4849
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
4950
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkListResponse;
5051
import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkListResponse;
51-
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
5252
import org.apache.inlong.manager.common.pojo.source.StreamSource;
5353
import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSource;
54-
import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
5554
import org.apache.inlong.manager.common.pojo.source.file.FileSource;
56-
import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
5755
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
58-
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
5956
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
60-
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSourceListResponse;
6157
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
6258
import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
6359
import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -154,9 +150,9 @@ void testListGroup4AutoPushSource() {
154150
.id(1)
155151
.inlongGroupId("1")
156152
.name("name")
157-
.sourceResponses(
153+
.streamSources(
158154
Lists.newArrayList(
159-
AutoPushSourceListResponse.builder()
155+
AutoPushSource.builder()
160156
.id(22)
161157
.inlongGroupId("1")
162158
.inlongStreamId("2")
@@ -186,9 +182,9 @@ void testListGroup4BinlogSource() {
186182
.id(1)
187183
.inlongGroupId("1")
188184
.name("name")
189-
.sourceResponses(
185+
.streamSources(
190186
Lists.newArrayList(
191-
MySQLBinlogSourceListResponse.builder()
187+
MySQLBinlogSource.builder()
192188
.id(22)
193189
.inlongGroupId("1")
194190
.inlongStreamId("2")
@@ -226,9 +222,9 @@ void testListGroup4FileSource() {
226222
.status(1)
227223
.createTime(new Date())
228224
.modifyTime(new Date())
229-
.sourceResponses(
225+
.streamSources(
230226
Lists.newArrayList(
231-
FileSourceListResponse.builder()
227+
FileSource.builder()
232228
.id(22)
233229
.inlongGroupId("1")
234230
.inlongStreamId("2")
@@ -259,9 +255,9 @@ void testListGroup4KafkaSource() {
259255
InlongGroupListResponse.builder()
260256
.id(1)
261257
.inlongGroupId("1")
262-
.sourceResponses(
258+
.streamSources(
263259
Lists.newArrayList(
264-
KafkaSourceListResponse.builder()
260+
KafkaSource.builder()
265261
.id(22)
266262
.inlongGroupId("1")
267263
.inlongStreamId("2")
@@ -295,16 +291,16 @@ void testListGroup4KafkaSource() {
295291

296292
@Test
297293
void testListGroup4AllSource() {
298-
ArrayList<SourceListResponse> sourceListResponses = Lists.newArrayList(
299-
AutoPushSourceListResponse.builder()
294+
ArrayList<StreamSource> streamSources = Lists.newArrayList(
295+
AutoPushSource.builder()
300296
.id(22)
301297
.inlongGroupId("1")
302298
.inlongStreamId("2")
303299
.sourceType("AUTO_PUSH")
304300
.version(1)
305301
.build(),
306302

307-
MySQLBinlogSourceListResponse.builder()
303+
MySQLBinlogSource.builder()
308304
.id(22)
309305
.inlongGroupId("1")
310306
.inlongStreamId("2")
@@ -317,7 +313,7 @@ void testListGroup4AllSource() {
317313
.tableWhiteList("")
318314
.build(),
319315

320-
FileSourceListResponse.builder()
316+
FileSource.builder()
321317
.id(22)
322318
.inlongGroupId("1")
323319
.inlongStreamId("2")
@@ -327,15 +323,14 @@ void testListGroup4AllSource() {
327323
.timeOffset("timeOffset")
328324
.build(),
329325

330-
KafkaSourceListResponse.builder()
326+
KafkaSource.builder()
331327
.id(22)
332328
.inlongGroupId("1")
333329
.inlongStreamId("2")
334330
.sourceType("KAFKA")
335331
.sourceName("source name")
336332
.serializationType("csv")
337333
.dataNodeName("dataNodeName")
338-
339334
.topic("topic")
340335
.groupId("111")
341336
.bootstrapServers("bootstrapServers")
@@ -349,7 +344,7 @@ void testListGroup4AllSource() {
349344
.inlongGroupId("1")
350345
.name("name")
351346
.inCharges("admin")
352-
.sourceResponses(sourceListResponses)
347+
.streamSources(streamSources)
353348
.build()
354349
);
355350

@@ -390,7 +385,7 @@ void testCreateGroup() {
390385
)
391386
);
392387

393-
String groupId = innerInlongManagerClient.createGroup(new InlongGroupRequest());
388+
String groupId = innerInlongManagerClient.createGroup(new InlongPulsarRequest());
394389
Assertions.assertEquals("1111", groupId);
395390
}
396391

@@ -403,7 +398,7 @@ void testUpdateGroup() {
403398
)
404399
);
405400

406-
Pair<String, String> updateGroup = innerInlongManagerClient.updateGroup(new InlongGroupRequest());
401+
Pair<String, String> updateGroup = innerInlongManagerClient.updateGroup(new InlongPulsarRequest());
407402
Assertions.assertEquals("1111", updateGroup.getKey());
408403
Assertions.assertTrue(StringUtils.isBlank(updateGroup.getValue()));
409404
}
@@ -676,9 +671,8 @@ void testSaveCluster() {
676671
)
677672
)
678673
);
679-
ClusterRequest request = new ClusterRequest();
674+
ClusterRequest request = new PulsarClusterRequest();
680675
request.setName("pulsar");
681-
request.setType("PULSAR");
682676
request.setClusterTags("test_cluster");
683677
Integer clusterIndex = innerInlongManagerClient.saveCluster(request);
684678
Assertions.assertEquals(1, (int) clusterIndex);

0 commit comments

Comments
 (0)