Skip to content

Commit 9a136a6

Browse files
luchunliangvernedeng
authored andcommitted
[INLONG-4777][SortStandalone] Change ack policy from checking message count to checking every message (apache#4778)
1 parent 395a4b9 commit 9a136a6

File tree

10 files changed

+217
-61
lines changed

10 files changed

+217
-61
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.sort.standalone.config.holder;
19+
20+
/**
21+
* AckPolicy
22+
*
23+
*/
24+
public enum AckPolicy {
25+
26+
COUNT(0), TOKEN(1);
27+
28+
private final int value;
29+
30+
/**
31+
* Constructor
32+
* @param value
33+
*/
34+
private AckPolicy(int value) {
35+
this.value = value;
36+
}
37+
38+
/**
39+
* getValue
40+
* @return int
41+
*/
42+
public int getValue() {
43+
return value;
44+
}
45+
46+
/**
47+
* getAckPolicy
48+
* @param value
49+
* @return AckPolicy
50+
*/
51+
public static AckPolicy getAckPolicy(int value) {
52+
switch (value) {
53+
case 0 :
54+
return COUNT;
55+
case 1 :
56+
return TOKEN;
57+
default :
58+
return COUNT;
59+
}
60+
}
61+
62+
/**
63+
* getAckPolicy
64+
* @param name
65+
* @return AckPolicy
66+
*/
67+
public static AckPolicy getAckPolicy(String name) {
68+
if (AckPolicy.COUNT.name().equalsIgnoreCase(name)) {
69+
return AckPolicy.COUNT;
70+
} else if (AckPolicy.TOKEN.name().equalsIgnoreCase(name)) {
71+
return AckPolicy.TOKEN;
72+
} else {
73+
return AckPolicy.COUNT;
74+
}
75+
}
76+
77+
}

inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package org.apache.inlong.sort.standalone.config.holder;
1919

20-
import java.util.Map;
21-
import java.util.concurrent.ConcurrentHashMap;
22-
2320
import org.apache.commons.lang3.ClassUtils;
2421
import org.apache.commons.lang3.math.NumberUtils;
2522
import org.apache.flume.Context;
@@ -28,6 +25,9 @@
2825
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
2926
import org.slf4j.Logger;
3027

28+
import java.util.Map;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
3131
/**
3232
*
3333
* CommonPropertiesHolder
@@ -38,11 +38,13 @@ public class CommonPropertiesHolder {
3838
public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
3939
public static final String KEY_COMMON_PROPERTIES = "common_properties_loader";
4040
public static final String KEY_CLUSTER_ID = "clusterId";
41+
public static final String KEY_SORT_SOURCE_ACKPOLICY = "sortSource.ackPolicy";
4142

4243
private static Map<String, String> props;
4344
private static Context context;
4445

4546
private static long auditFormatInterval = 60000L;
47+
private static AckPolicy ackPolicy;
4648

4749
/**
4850
* init
@@ -60,8 +62,11 @@ private static void init() {
6062
CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
6163
props.putAll(loader.load());
6264
LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
63-
auditFormatInterval = NumberUtils
65+
CommonPropertiesHolder.auditFormatInterval = NumberUtils
6466
.toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
67+
String strAckPolicy = CommonPropertiesHolder.getString(KEY_SORT_SOURCE_ACKPOLICY,
68+
AckPolicy.COUNT.name());
69+
CommonPropertiesHolder.ackPolicy = AckPolicy.getAckPolicy(strAckPolicy);
6570
}
6671
} catch (Throwable t) {
6772
LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -205,4 +210,12 @@ public static long getAuditFormatInterval() {
205210
return auditFormatInterval;
206211
}
207212

213+
/**
214+
* get ackPolicy
215+
* @return the ackPolicy
216+
*/
217+
public static AckPolicy getAckPolicy() {
218+
return ackPolicy;
219+
}
220+
208221
}

inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void put(Event event) throws ChannelException {
8181
ProfileEvent profile = (ProfileEvent) event;
8282
transaction.doPut(profile);
8383
} else {
84-
ProfileEvent profile = new ProfileEvent(event.getBody(), event.getHeaders(), null);
84+
ProfileEvent profile = new ProfileEvent(event.getHeaders(), event.getBody());
8585
transaction.doPut(profile);
8686
}
8787
}

inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java

+53-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919

2020
import org.apache.inlong.sdk.sort.api.SortClient;
2121
import org.apache.inlong.sdk.sort.entity.MessageRecord;
22+
import org.apache.inlong.sort.standalone.config.holder.AckPolicy;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

26+
import java.util.HashSet;
27+
import java.util.Set;
2528
import java.util.concurrent.atomic.AtomicInteger;
2629

2730
/**
@@ -35,24 +38,56 @@ public class CacheMessageRecord {
3538
private final String msgKey;
3639
private final String offset;
3740
private final AtomicInteger ackCount;
41+
private final AckPolicy ackPolicy;
42+
private Set<Integer> tokenSet;
3843

3944
/**
4045
* Constructor
4146
*
4247
* @param msgRecord
4348
* @param client
4449
*/
45-
public CacheMessageRecord(MessageRecord msgRecord, SortClient client) {
50+
public CacheMessageRecord(MessageRecord msgRecord, SortClient client, AckPolicy ackPolicy) {
4651
this.msgKey = msgRecord.getMsgKey();
4752
this.offset = msgRecord.getOffset();
4853
this.ackCount = new AtomicInteger(msgRecord.getMsgs().size());
4954
this.client = client;
55+
this.ackPolicy = ackPolicy;
56+
if (AckPolicy.TOKEN.equals(ackPolicy)) {
57+
this.tokenSet = new HashSet<>();
58+
for (int i = 0; i < msgRecord.getMsgs().size(); i++) {
59+
this.tokenSet.add(i);
60+
}
61+
}
62+
}
63+
64+
/**
65+
* getToken
66+
* @return
67+
*/
68+
public Integer getToken() {
69+
if (AckPolicy.TOKEN.equals(ackPolicy)) {
70+
return this.ackCount.decrementAndGet();
71+
}
72+
return 0;
5073
}
5174

5275
/**
5376
* ackMessage
77+
* @param ackToken ackToken
78+
*/
79+
public void ackMessage(int ackToken) {
80+
if (AckPolicy.TOKEN.equals(ackPolicy)) {
81+
this.ackMessageByToken(ackToken);
82+
return;
83+
}
84+
this.ackMessageByCount();
85+
}
86+
87+
/**
88+
* ackMessageByCount
5489
*/
55-
public void ackMessage() {
90+
private void ackMessageByCount() {
5691
int result = this.ackCount.decrementAndGet();
5792
if (result == 0 && client != null) {
5893
try {
@@ -62,4 +97,20 @@ public void ackMessage() {
6297
}
6398
}
6499
}
100+
101+
/**
102+
* ackMessageByToken
103+
* @param ackToken ackToken
104+
*/
105+
private void ackMessageByToken(int ackToken) {
106+
this.tokenSet.remove(ackToken);
107+
int result = this.tokenSet.size();
108+
if (result == 0 && client != null) {
109+
try {
110+
client.ack(msgKey, offset);
111+
} catch (Exception e) {
112+
LOG.error(e.getMessage(), e);
113+
}
114+
}
115+
}
65116
}

inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java

+37-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.commons.lang3.math.NumberUtils;
2121
import org.apache.flume.event.SimpleEvent;
22+
import org.apache.inlong.sdk.sort.entity.InLongMessage;
2223
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
2324
import org.apache.inlong.sort.standalone.utils.Constants;
2425

@@ -35,25 +36,45 @@ public class ProfileEvent extends SimpleEvent {
3536
private final String uid;
3637

3738
private final long rawLogTime;
39+
private final String sourceIp;
3840
private final long fetchTime;
39-
private final CacheMessageRecord cacheRecord;
41+
private CacheMessageRecord cacheRecord;
42+
private final int ackToken;
4043

4144
/**
4245
* Constructor
43-
*
44-
* @param body
4546
* @param headers
46-
* @param cacheRecord
47+
* @param body
4748
*/
48-
public ProfileEvent(byte[] body, Map<String, String> headers, CacheMessageRecord cacheRecord) {
49-
super.setBody(body);
49+
public ProfileEvent(Map<String, String> headers, byte[] body) {
5050
super.setHeaders(headers);
51-
this.cacheRecord = cacheRecord;
51+
super.setBody(body);
5252
this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
5353
this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
5454
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
5555
this.fetchTime = System.currentTimeMillis();
5656
this.rawLogTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), fetchTime);
57+
this.sourceIp = headers.get(Constants.HEADER_KEY_SOURCE_IP);
58+
this.ackToken = 0;
59+
}
60+
61+
/**
62+
* Constructor
63+
*
64+
* @param sdkMessage
65+
* @param cacheRecord
66+
*/
67+
public ProfileEvent(InLongMessage sdkMessage, CacheMessageRecord cacheRecord) {
68+
super.setHeaders(sdkMessage.getParams());
69+
super.setBody(sdkMessage.getBody());
70+
this.inlongGroupId = sdkMessage.getInlongGroupId();
71+
this.inlongStreamId = sdkMessage.getInlongStreamId();
72+
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
73+
this.rawLogTime = sdkMessage.getMsgTime();
74+
this.sourceIp = sdkMessage.getSourceIp();
75+
this.cacheRecord = cacheRecord;
76+
this.fetchTime = System.currentTimeMillis();
77+
this.ackToken = cacheRecord.getToken();
5778
}
5879

5980
/**
@@ -83,6 +104,14 @@ public long getRawLogTime() {
83104
return rawLogTime;
84105
}
85106

107+
/**
108+
* get sourceIp
109+
* @return the sourceIp
110+
*/
111+
public String getSourceIp() {
112+
return sourceIp;
113+
}
114+
86115
/**
87116
* get fetchTime
88117
*
@@ -115,7 +144,7 @@ public CacheMessageRecord getCacheRecord() {
115144
*/
116145
public void ack() {
117146
if (cacheRecord != null) {
118-
cacheRecord.ackMessage();
147+
cacheRecord.ackMessage(ackToken);
119148
}
120149
}
121150
}

inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java

+9-13
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.tencentcloudapi.cls.producer.util.NetworkUtils;
2626

2727
import org.apache.commons.lang3.ClassUtils;
28-
import org.apache.commons.lang3.math.NumberUtils;
2928
import org.apache.flume.Channel;
3029
import org.apache.flume.Context;
3130
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
@@ -268,25 +267,22 @@ private void removeExpireClient(String secretId) {
268267
public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) {
269268
Map<String, String> dimensions = this.getDimensions(currentRecord, bid);
270269
SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
271-
long count = 1;
272-
long size = currentRecord.getBody().length;
273270
if (result) {
274-
metricItem.sendSuccessCount.addAndGet(count);
275-
metricItem.sendSuccessSize.addAndGet(size);
271+
metricItem.sendSuccessCount.incrementAndGet();
272+
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
276273
AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord);
277274
if (sendTime > 0) {
278-
long currentTime = System.currentTimeMillis();
275+
final long currentTime = System.currentTimeMillis();
279276
long sinkDuration = currentTime - sendTime;
280-
long nodeDuration = currentTime
281-
- NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, currentRecord.getRawLogTime());
277+
long nodeDuration = currentTime - currentRecord.getFetchTime();
282278
long wholeDuration = currentTime - currentRecord.getRawLogTime();
283-
metricItem.sinkDuration.addAndGet(sinkDuration * count);
284-
metricItem.nodeDuration.addAndGet(nodeDuration * count);
285-
metricItem.wholeDuration.addAndGet(wholeDuration * count);
279+
metricItem.sinkDuration.addAndGet(sinkDuration);
280+
metricItem.nodeDuration.addAndGet(nodeDuration);
281+
metricItem.wholeDuration.addAndGet(wholeDuration);
286282
}
287283
} else {
288-
metricItem.sendFailCount.addAndGet(count);
289-
metricItem.sendFailSize.addAndGet(size);
284+
metricItem.sendFailCount.incrementAndGet();
285+
metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
290286
}
291287
}
292288

0 commit comments

Comments
 (0)