Skip to content

Commit f340ce5

Browse files
authored
[INLONG-1931][Feature][sort-sdk] sort-sdk support consume tube events from cachezone (#2186)
1 parent 566f6e3 commit f340ce5

File tree

9 files changed

+566
-63
lines changed

9 files changed

+566
-63
lines changed

inlong-sdk/sort-sdk/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@
118118
<version>${project.version}</version>
119119
</dependency>
120120

121+
<dependency>
122+
<groupId>org.apache.inlong</groupId>
123+
<artifactId>tubemq-client</artifactId>
124+
<version>${project.version}</version>
125+
</dependency>
126+
121127
</dependencies>
122128

123129
</project>

inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ public abstract class InLongTopicFetcher {
2323

2424
protected InLongTopic inLongTopic;
2525
protected ClientContext context;
26+
protected volatile boolean closed = false;
27+
protected volatile boolean isStopConsume = false;
28+
// use for empty topic to sleep
29+
protected long sleepTime = 0L;
30+
protected int emptyFetchTimes = 0;
2631

2732
public InLongTopicFetcher(InLongTopic inLongTopic, ClientContext context) {
2833
this.inLongTopic = inLongTopic;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.inlong.sdk.sort.api;
21+
22+
public class SysConstants {
23+
24+
public static final String TUBE_TOPIC_FILTER_KEY = "tube_topic_filter_key";
25+
26+
}

inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java

+5
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,9 @@ public boolean equals(Object o) {
7171
public int hashCode() {
7272
return Objects.hash(clusterId);
7373
}
74+
75+
@Override
76+
public String toString() {
77+
return "CacheZoneCluster>>>" + clusterId + "|" + bootstraps + "|" + token;
78+
}
7479
}

inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.inlong.sdk.sort.entity;
1919

20+
import java.util.Map;
2021
import java.util.Objects;
2122

2223
public class InLongTopic {
@@ -26,6 +27,7 @@ public class InLongTopic {
2627
private int partitionId;
2728
//pulsar,kafka,tube
2829
private String topicType;
30+
private Map<String, Object> properties;
2931

3032
public String getTopic() {
3133
return topic;
@@ -59,6 +61,14 @@ public void setTopicType(String topicType) {
5961
this.topicType = topicType;
6062
}
6163

64+
public Map<String, Object> getProperties() {
65+
return properties;
66+
}
67+
68+
public void setProperties(Map<String, Object> properties) {
69+
this.properties = properties;
70+
}
71+
6272
@Override
6373
public boolean equals(Object o) {
6474
if (this == o) {
@@ -84,11 +94,6 @@ public String getTopicKey() {
8494

8595
@Override
8696
public String toString() {
87-
return "InLongTopic{"
88-
+ "topic='" + topic
89-
+ ", inLongCluster=" + cacheZoneCluster
90-
+ ", partitionId=" + partitionId
91-
+ ", topicType='" + topicType
92-
+ '}';
97+
return "InLongTopic>>>" + topic + "|" + "|" + partitionId + "|" + topicType + "|" + cacheZoneCluster;
9398
}
9499
}

0 commit comments

Comments
 (0)