Commit 206ecf8 1 parent 9a136a6 commit 206ecf8 Copy full SHA for 206ecf8
File tree 3 files changed +6
-10
lines changed
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink
3 files changed +6
-10
lines changed Original file line number Diff line number Diff line change @@ -259,14 +259,16 @@ private TubeClientConfig initTubeConfig(String masterUrl) throws Exception {
259
259
* @throws FlumeException if an RPC client connection could not be opened
260
260
*/
261
261
private void initCreateConnection () throws FlumeException {
262
- // synchronized (tubeSessionLock) {
262
+ // check the TubeMQ address
263
+ if (masterHostAndPortLists == null || masterHostAndPortLists .isEmpty ()) {
264
+ logger .warn ("Failed to get TubeMQ Cluster, make sure register TubeMQ to manager successfully." );
265
+ return ;
266
+ }
263
267
// if already connected, just skip
264
268
if (sessionFactories != null ) {
265
269
return ;
266
270
}
267
271
sessionFactories = new HashMap <>();
268
- Preconditions .checkState (masterHostAndPortLists != null && !masterHostAndPortLists .isEmpty (),
269
- "No tube service url specified" );
270
272
for (String masterUrl : masterHostAndPortLists ) {
271
273
createConnection (masterUrl );
272
274
}
Original file line number Diff line number Diff line change @@ -136,7 +136,7 @@ public void setConfigLogMetric(StreamConfigLogMetric streamConfigLogMetric) {
136
136
}
137
137
138
138
public void initCreateConnection (CreatePulsarClientCallBack callBack ) {
139
- if (pulsarUrl2token .isEmpty ()) {
139
+ if (pulsarUrl2token == null || pulsarUrl2token .isEmpty ()) {
140
140
logger .warn ("Failed to get Pulsar Cluster, make sure register pulsar to manager successfully." );
141
141
return ;
142
142
}
Original file line number Diff line number Diff line change 1007
1007
<artifactId >iceberg-flink-runtime-1.13</artifactId >
1008
1008
<version >${iceberg.flink.version} </version >
1009
1009
</dependency >
1010
- <dependency >
1011
- <groupId >org.apache.iceberg</groupId >
1012
- <artifactId >iceberg-flink-runtime-1.13</artifactId >
1013
- <version >${iceberg.flink.version} </version >
1014
- </dependency >
1015
-
1016
1010
<dependency >
1017
1011
<groupId >org.apache.flink</groupId >
1018
1012
<artifactId >flink-table-planner-blink_${flink.scala.binary.version}</artifactId >
You can’t perform that action at this time.
0 commit comments