Skip to content

Commit a735368

Browse files
authored
[INLONG-4928][Manager] Modify inlong stream API in the Manager client (#5309)
1 parent 9750004 commit a735368

File tree

3 files changed

+121
-2
lines changed

3 files changed

+121
-2
lines changed

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public interface InlongStream {
6262
StreamSink getSinkInfoByName(String sinkName);
6363

6464
/**
65-
* Return data transform node defined in stream(split,string replace etc)
65+
* Return data transform node defined in stream(split, string replace etc.)
6666
* key is transform name which must be unique within one stream scope.
6767
*/
6868
Map<String, StreamTransform> getTransforms();

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java

+101-1
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
import org.apache.inlong.manager.client.api.ClientConfiguration;
2323
import org.apache.inlong.manager.client.api.service.InlongStreamApi;
2424
import org.apache.inlong.manager.client.api.util.ClientUtils;
25+
import org.apache.inlong.manager.common.util.Preconditions;
2526
import org.apache.inlong.manager.pojo.common.Response;
27+
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
2628
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
2729
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
28-
import org.apache.inlong.manager.common.util.Preconditions;
2930

3031
import java.util.List;
3132

@@ -49,6 +50,12 @@ public Integer createStreamInfo(InlongStreamInfo streamInfo) {
4950
return response.getData();
5051
}
5152

53+
/**
54+
* Query whether the inlong stream ID exists
55+
*
56+
* @param streamInfo inlong stream info
57+
* @return true: exists, false: does not exist
58+
*/
5259
public Boolean isStreamExists(InlongStreamInfo streamInfo) {
5360
final String groupId = streamInfo.getInlongGroupId();
5461
final String streamId = streamInfo.getInlongStreamId();
@@ -60,6 +67,12 @@ public Boolean isStreamExists(InlongStreamInfo streamInfo) {
6067
return response.getData();
6168
}
6269

70+
/**
71+
* InlongStream info that needs to be modified
72+
*
73+
* @param streamInfo inlong stream info that needs to be modified
74+
* @return whether succeed
75+
*/
6376
public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo) {
6477
Response<Boolean> resp = ClientUtils.executeHttpCall(inlongStreamApi.updateStream(streamInfo));
6578

@@ -86,6 +99,19 @@ public InlongStreamInfo getStreamInfo(String groupId, String streamId) {
8699
}
87100
}
88101

102+
/**
103+
* Paging query inlong stream brief info list
104+
*
105+
* @param request query request
106+
* @return inlong stream brief list
107+
*/
108+
public PageInfo<InlongStreamBriefInfo> listByCondition(InlongStreamPageRequest request) {
109+
Response<PageInfo<InlongStreamBriefInfo>> response = ClientUtils.executeHttpCall(
110+
inlongStreamApi.listByCondition(request));
111+
ClientUtils.assertRespSuccess(response);
112+
return response.getData();
113+
}
114+
89115
/**
90116
* Get inlong stream info.
91117
*/
@@ -99,4 +125,78 @@ public List<InlongStreamInfo> listStreamInfo(String inlongGroupId) {
99125
return response.getData().getList();
100126
}
101127

128+
/**
129+
* Create stream in synchronous/asynchronous way.
130+
*
131+
* @param groupId inlong group id
132+
* @param streamId inlong stream id
133+
* @return whether succeed
134+
*/
135+
public boolean startProcess(String groupId, String streamId) {
136+
Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
137+
Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
138+
Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.startProcess(groupId, streamId));
139+
ClientUtils.assertRespSuccess(response);
140+
return response.getData();
141+
}
142+
143+
/**
144+
* Suspend stream in synchronous/asynchronous way.
145+
*
146+
* @param groupId inlong group id
147+
* @param streamId inlong stream id
148+
* @return whether succeed
149+
*/
150+
public boolean suspendProcess(String groupId, String streamId) {
151+
Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
152+
Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
153+
Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.suspendProcess(groupId, streamId));
154+
ClientUtils.assertRespSuccess(response);
155+
return response.getData();
156+
}
157+
158+
/**
159+
* Restart stream in synchronous/asynchronous way.
160+
*
161+
* @param groupId inlong group id
162+
* @param streamId inlong stream id
163+
* @return whether succeed
164+
*/
165+
public boolean restartProcess(String groupId, String streamId) {
166+
Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
167+
Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
168+
Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.restartProcess(groupId, streamId));
169+
ClientUtils.assertRespSuccess(response);
170+
return response.getData();
171+
}
172+
173+
/**
174+
* Delete stream in synchronous/asynchronous way.
175+
*
176+
* @param groupId inlong group id
177+
* @param streamId inlong stream id
178+
* @return whether succeed
179+
*/
180+
public boolean deleteProcess(String groupId, String streamId) {
181+
Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
182+
Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
183+
Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.deleteProcess(groupId, streamId));
184+
ClientUtils.assertRespSuccess(response);
185+
return response.getData();
186+
}
187+
188+
/**
189+
* Delete the specified inlong stream
190+
*
191+
* @param groupId inlong group id
192+
* @param streamId inlong stream id
193+
* @return whether succeed
194+
*/
195+
public boolean delete(String groupId, String streamId) {
196+
Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be empty");
197+
Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be empty");
198+
Response<Boolean> response = ClientUtils.executeHttpCall(inlongStreamApi.delete(groupId, streamId));
199+
ClientUtils.assertRespSuccess(response);
200+
return response.getData();
201+
}
102202
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java

+19
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
import com.github.pagehelper.PageInfo;
2121
import org.apache.inlong.manager.pojo.common.Response;
22+
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
2223
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
2324
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
2425
import retrofit2.Call;
2526
import retrofit2.http.Body;
27+
import retrofit2.http.DELETE;
2628
import retrofit2.http.GET;
2729
import retrofit2.http.POST;
2830
import retrofit2.http.Path;
@@ -43,7 +45,24 @@ public interface InlongStreamApi {
4345
Call<Response<InlongStreamInfo>> getStream(@Query("groupId") String groupId,
4446
@Query("streamId") String streamId);
4547

48+
@POST("/stream/list")
49+
Call<Response<PageInfo<InlongStreamBriefInfo>>> listByCondition(@Body InlongStreamPageRequest request);
50+
4651
@POST("stream/listAll")
4752
Call<Response<PageInfo<InlongStreamInfo>>> listStream(@Body InlongStreamPageRequest request);
4853

54+
@POST("/stream/startProcess/{groupId}/{streamId}")
55+
Call<Response<Boolean>> startProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
56+
57+
@POST("/stream/suspendProcess/{groupId}/{streamId}")
58+
Call<Response<Boolean>> suspendProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
59+
60+
@POST("/stream/restartProcess/{groupId}/{streamId}")
61+
Call<Response<Boolean>> restartProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
62+
63+
@POST("/stream/deleteProcess/{groupId}/{streamId}")
64+
Call<Response<Boolean>> deleteProcess(@Path("groupId") String groupId, @Path("streamId") String streamId);
65+
66+
@DELETE("/stream/delete")
67+
Call<Response<Boolean>> delete(@Path("groupId") String groupId, @Path("streamId") String streamId);
4968
}

0 commit comments

Comments
 (0)