Skip to content

Commit 257cc4d

Browse files
vernedengbruceneenhl
authored andcommitted
[INLONG-5095][SortSDK] Support consumes data stream from a specified time (apache#5357)
1 parent 40993d0 commit 257cc4d

12 files changed

+194
-1
lines changed

inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java

+3
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919

2020
import org.apache.inlong.sdk.sort.entity.InLongTopic;
2121
import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;
22+
import org.apache.inlong.sdk.sort.impl.interceptor.MsgTimeInterceptor;
2223

2324
public abstract class InLongTopicFetcher {
2425

2526
protected InLongTopic inLongTopic;
2627
protected ClientContext context;
2728
protected Deserializer deserializer;
29+
protected Interceptor interceptor;
2830
protected volatile Thread fetchThread;
2931
protected volatile boolean closed = false;
3032
protected volatile boolean isStopConsume = false;
@@ -36,6 +38,7 @@ public InLongTopicFetcher(InLongTopic inLongTopic, ClientContext context) {
3638
this.inLongTopic = inLongTopic;
3739
this.context = context;
3840
this.deserializer = new MessageDeserializer();
41+
this.interceptor = new MsgTimeInterceptor(inLongTopic);
3942
}
4043

4144
public abstract boolean init(Object client);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.inlong.sdk.sort.api;
20+
21+
import org.apache.inlong.sdk.sort.entity.InLongMessage;
22+
23+
import java.util.List;
24+
25+
/**
26+
* Interceptor interface.
27+
* Used to filter fetched messages according to the configured strategies.
28+
*
29+
*/
30+
public interface Interceptor {
31+
32+
/**
33+
* Intercept the fetched messages according to the configured strategies.
34+
*
35+
* @param messages messages to be intercepted.
36+
* @return the messages remaining after interception; the fetchers call {@code isEmpty()} on the result without a null check, so implementations should return an empty list rather than {@code null}.
37+
*/
38+
List<InLongMessage> intercept(List<InLongMessage> messages);
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.inlong.sdk.sort.impl.interceptor;
20+
21+
import org.apache.commons.collections.CollectionUtils;
22+
import org.apache.inlong.sdk.sort.api.Interceptor;
23+
import org.apache.inlong.sdk.sort.entity.InLongMessage;
24+
import org.apache.inlong.sdk.sort.entity.InLongTopic;
25+
import org.apache.inlong.sdk.sort.util.TimeUtil;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import java.util.stream.Collectors;
32+
33+
/**
34+
* The sdk interceptor that use to filter messages do not in the time interval.
35+
*/
36+
public class MsgTimeInterceptor implements Interceptor {
37+
private static final Logger logger = LoggerFactory.getLogger(MsgTimeInterceptor.class);
38+
private long startTime;
39+
private long stopTime;
40+
41+
public MsgTimeInterceptor(InLongTopic inLongTopic) {
42+
startTime = TimeUtil.parseStartTime(inLongTopic);
43+
stopTime = TimeUtil.parseStopTime(inLongTopic);
44+
logger.info("start to config MsgTimeInterceptor, start time is {}, stop time is {}", startTime, stopTime);
45+
}
46+
47+
@Override
48+
public List<InLongMessage> intercept(List<InLongMessage> messages) {
49+
if (CollectionUtils.isEmpty(messages)) {
50+
return new ArrayList<>(0);
51+
}
52+
return messages.stream()
53+
.filter(msg -> isValidMsgTime(msg.getMsgTime()))
54+
.collect(Collectors.toList());
55+
}
56+
57+
private boolean isValidMsgTime(long msgTime) {
58+
return msgTime >= startTime && msgTime <= stopTime;
59+
}
60+
61+
}

inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,11 @@ private void fetchFromKafka() throws Exception {
304304
String offsetKey = getOffset(msg.partition(), msg.offset());
305305
List<InLongMessage> inLongMessages = deserializer
306306
.deserialize(context, inLongTopic, getMsgHeaders(msg.headers()), msg.value());
307+
inLongMessages = interceptor.intercept(inLongMessages);
308+
if (inLongMessages.isEmpty()) {
309+
ack(offsetKey);
310+
continue;
311+
}
307312
msgs.add(new MessageRecord(inLongTopic.getTopicKey(),
308313
inLongMessages,
309314
offsetKey, System.currentTimeMillis()));

inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,11 @@ public void run() {
284284

285285
List<InLongMessage> inLongMessages = deserializer
286286
.deserialize(context, inLongTopic, msg.getProperties(), msg.getData());
287-
287+
inLongMessages = interceptor.intercept(inLongMessages);
288+
if (inLongMessages.isEmpty()) {
289+
ack(offsetKey);
290+
continue;
291+
}
288292
msgs.add(new MessageRecord(inLongTopic.getTopicKey(),
289293
inLongMessages,
290294
offsetKey, System.currentTimeMillis()));

inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java

+4
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ public void run() {
270270
List<InLongMessage> deserialize = deserializer
271271
.deserialize(context, inLongTopic, getAttributeMap(msg.getAttribute()),
272272
msg.getData());
273+
deserialize = interceptor.intercept(deserialize);
274+
if (deserialize.isEmpty()) {
275+
continue;
276+
}
273277
msgs.addAll(deserialize);
274278
context.getStatManager()
275279
.getStatistics(context.getConfig().getSortTaskId(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.inlong.sdk.sort.util;
20+
21+
import org.apache.inlong.sdk.sort.entity.InLongTopic;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.time.LocalDateTime;
26+
import java.time.ZoneId;
27+
import java.time.format.DateTimeFormatter;
28+
import java.util.Optional;
29+
30+
public class TimeUtil {
31+
private static final Logger logger = LoggerFactory.getLogger(TimeUtil.class);
32+
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
33+
private static final String KEY_SDK_START_TIME = "sortSdk.startTime";
34+
private static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime";
35+
private static final long DEFAULT_START_TIME = -1L;
36+
private static final long DEFAULT_STOP_TIME = Long.MAX_VALUE;
37+
38+
public static long parseStartTime(InLongTopic inLongTopic) {
39+
return Optional.ofNullable(inLongTopic.getProperties().get(KEY_SDK_START_TIME))
40+
.map(s -> {
41+
try {
42+
LocalDateTime time = LocalDateTime.parse(s.toString(), DATE_FORMAT);
43+
return LocalDateTime.from(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
44+
} catch (Throwable t) {
45+
logger.error("parse start time failed, plz check the format of start time : {}", s, t);
46+
}
47+
return DEFAULT_START_TIME;
48+
})
49+
.orElse(DEFAULT_START_TIME);
50+
}
51+
52+
public static long parseStopTime(InLongTopic inLongTopic) {
53+
return Optional.ofNullable(inLongTopic.getProperties().get(KEY_SDK_STOP_TIME))
54+
.map(s -> {
55+
try {
56+
LocalDateTime time = LocalDateTime.parse(s.toString(), DATE_FORMAT);
57+
return LocalDateTime.from(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
58+
} catch (Throwable t) {
59+
logger.error("parse start time failed, plz check the format of stop time : {}", s, t);
60+
}
61+
return DEFAULT_STOP_TIME;
62+
})
63+
.orElse(DEFAULT_STOP_TIME);
64+
}
65+
66+
}

inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImplTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.powermock.api.mockito.PowerMockito.when;
2121

2222
import java.util.Collection;
23+
import java.util.HashMap;
2324
import java.util.Set;
2425
import java.util.concurrent.ConcurrentHashMap;
2526
import org.apache.inlong.sdk.sort.api.ClientContext;
@@ -53,6 +54,7 @@ public class InLongTopicManagerImplTest {
5354
inLongTopic.setTopic("testTopic");
5455
inLongTopic.setPartitionId(0);
5556
inLongTopic.setTopicType("pulsar");
57+
inLongTopic.setProperties(new HashMap<>());
5658

5759
CacheZoneCluster cacheZoneCluster = new CacheZoneCluster("clusterId", "bootstraps", "token");
5860
inLongTopic.setInLongCluster(cacheZoneCluster);

inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ private void setUp() throws Exception {
6666
inLongTopic.setTopic("testTopic");
6767
CacheZoneCluster cacheZoneCluster = new CacheZoneCluster("clusterId", "bootstraps", "token");
6868
inLongTopic.setInLongCluster(cacheZoneCluster);
69+
inLongTopic.setProperties(new HashMap<>());
6970

7071
when(context.getConfig()).thenReturn(sortClientConfig);
7172
when(context.getStatManager()).thenReturn(statManager);

inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.powermock.core.classloader.annotations.PrepareForTest;
3737
import org.powermock.modules.junit4.PowerMockRunner;
3838

39+
import java.util.HashMap;
40+
3941
@PowerMockIgnore("javax.management.*")
4042
@RunWith(PowerMockRunner.class)
4143
@PrepareForTest({ClientContext.class})
@@ -57,6 +59,7 @@ public void setUp() throws Exception {
5759
inLongTopic.setTopic("testTopic");
5860
inLongTopic.setPartitionId(0);
5961
inLongTopic.setTopicType("pulsar");
62+
inLongTopic.setProperties(new HashMap<>());
6063

6164
CacheZoneCluster cacheZoneCluster = new CacheZoneCluster("clusterId", "bootstraps", "token");
6265
inLongTopic.setInLongCluster(cacheZoneCluster);

inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.powermock.api.mockito.PowerMockito.doNothing;
2525
import static org.powermock.api.mockito.PowerMockito.when;
2626

27+
import java.util.HashMap;
2728
import java.util.concurrent.ConcurrentHashMap;
2829
import org.apache.inlong.sdk.sort.api.ClientContext;
2930
import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
@@ -68,6 +69,7 @@ public void setUp() throws Exception {
6869
inLongTopic.setTopic("testTopic");
6970
inLongTopic.setPartitionId(0);
7071
inLongTopic.setTopicType("pulsar");
72+
inLongTopic.setProperties(new HashMap<>());
7173

7274
CacheZoneCluster cacheZoneCluster = new CacheZoneCluster("clusterId", "bootstraps", "token");
7375
inLongTopic.setInLongCluster(cacheZoneCluster);

inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.junit.Test;
3434
import org.powermock.api.mockito.PowerMockito;
3535

36+
import java.util.HashMap;
37+
3638
public class InLongTubeFetcherImplTest {
3739

3840
private ClientContext clientContext;
@@ -51,6 +53,7 @@ public void setUp() throws Exception {
5153
inLongTopic.setTopic("testTopic");
5254
inLongTopic.setPartitionId(0);
5355
inLongTopic.setTopicType("pulsar");
56+
inLongTopic.setProperties(new HashMap<>());
5457

5558
CacheZoneCluster cacheZoneCluster = new CacheZoneCluster("clusterId", "bootstraps", "token");
5659
inLongTopic.setInLongCluster(cacheZoneCluster);

0 commit comments

Comments
 (0)