Skip to content

Commit 57cf8a7

Browse files
authored
[INLONG-2255] DataProxy support audit sdk. (#2260)
1 parent cb2229c commit 57cf8a7

File tree

9 files changed

+284
-82
lines changed

9 files changed

+284
-82
lines changed

inlong-dataproxy/dataproxy-source/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,10 @@
4343
<version>${project.version}</version>
4444
<scope>compile</scope>
4545
</dependency>
46+
<dependency>
47+
<groupId>org.apache.inlong</groupId>
48+
<artifactId>audit-sdk</artifactId>
49+
<version>${project.version}</version>
50+
</dependency>
4651
</dependencies>
4752
</project>

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java

+46-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import org.apache.inlong.commons.config.metrics.Dimension;
2626
import org.apache.inlong.commons.config.metrics.MetricDomain;
2727
import org.apache.inlong.commons.config.metrics.MetricItem;
28+
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
29+
import org.apache.inlong.dataproxy.consts.AttributeConstants;
30+
import org.apache.inlong.dataproxy.consts.ConfigConstants;
31+
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
2832
import org.apache.inlong.dataproxy.utils.Constants;
2933

3034
/**
@@ -112,12 +116,52 @@ public class DataProxyMetricItem extends MetricItem {
112116
*/
113117
public static void fillInlongId(Event event, Map<String, String> dimensions) {
114118
Map<String, String> headers = event.getHeaders();
115-
String inlongGroupId = headers.getOrDefault(Constants.INLONG_GROUP_ID, "");
116-
String inlongStreamId = headers.getOrDefault(Constants.INLONG_STREAM_ID, "");
119+
String inlongGroupId = getInlongGroupId(headers);
120+
String inlongStreamId = getInlongStreamId(headers);
117121
dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
118122
dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
119123
}
120124

125+
/**
126+
* fillAuditFormatTime
127+
*
128+
* @param event
129+
* @param dimensions
130+
*/
131+
public static void fillAuditFormatTime(Event event, Map<String, String> dimensions) {
132+
long msgTime = AuditUtils.getLogTime(event);
133+
long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
134+
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
135+
}
136+
137+
/**
138+
* getInlongGroupId
139+
*
140+
* @param headers
141+
* @return
142+
*/
143+
public static String getInlongGroupId(Map<String, String> headers) {
144+
String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
145+
if (inlongGroupId == null) {
146+
inlongGroupId = headers.getOrDefault(ConfigConstants.TOPIC_KEY, "");
147+
}
148+
return inlongGroupId;
149+
}
150+
151+
/**
152+
* getInlongStreamId
153+
*
154+
* @param headers
155+
* @return
156+
*/
157+
public static String getInlongStreamId(Map<String, String> headers) {
158+
String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
159+
if (inlongStreamId == null) {
160+
inlongStreamId = headers.getOrDefault(AttributeConstants.INTERFACE_ID, "");
161+
}
162+
return inlongStreamId;
163+
}
164+
121165
/**
122166
* get clusterId
123167
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.dataproxy.metrics.audit;
19+
20+
import java.util.HashSet;
21+
import java.util.Map;
22+
23+
import org.apache.commons.lang.BooleanUtils;
24+
import org.apache.commons.lang.math.NumberUtils;
25+
import org.apache.commons.lang3.StringUtils;
26+
import org.apache.flume.Event;
27+
import org.apache.inlong.audit.AuditImp;
28+
import org.apache.inlong.audit.util.AuditConfig;
29+
import org.apache.inlong.dataproxy.config.ConfigManager;
30+
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
31+
import org.apache.inlong.dataproxy.consts.AttributeConstants;
32+
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
33+
import org.apache.inlong.dataproxy.utils.Constants;
34+
35+
/**
36+
*
37+
* AuditUtils
38+
*/
39+
public class AuditUtils {
40+
41+
public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
42+
public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
43+
public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
44+
public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
45+
public static final String AUDIT_KEY_PROXYS = "audit.proxys";
46+
public static final String AUDIT_KEY_IS_AUDIT = "audit.isAudit";
47+
48+
public static final int AUDIT_ID_DATAPROXY_READ_SUCCESS = 5;
49+
public static final int AUDIT_ID_DATAPROXY_SEND_SUCCESS = 6;
50+
51+
private static boolean IS_AUDIT = true;
52+
53+
/**
54+
* initAudit
55+
*/
56+
public static void initAudit() {
57+
// IS_AUDIT
58+
IS_AUDIT = BooleanUtils.toBoolean(ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_IS_AUDIT));
59+
if (IS_AUDIT) {
60+
// AuditProxy
61+
String strIpPorts = ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_PROXYS);
62+
HashSet<String> proxys = new HashSet<>();
63+
if (!StringUtils.isBlank(strIpPorts)) {
64+
String[] ipPorts = strIpPorts.split("\\s+");
65+
for (String ipPort : ipPorts) {
66+
proxys.add(ipPort);
67+
}
68+
}
69+
AuditImp.getInstance().setAuditProxy(proxys);
70+
// AuditConfig
71+
String filePath = ConfigManager.getInstance().getCommonProperties().getOrDefault(AUDIT_KEY_FILE_PATH,
72+
AUDIT_DEFAULT_FILE_PATH);
73+
int maxCacheRow = NumberUtils.toInt(
74+
ConfigManager.getInstance().getCommonProperties().get(AUDIT_KEY_MAX_CACHE_ROWS),
75+
AUDIT_DEFAULT_MAX_CACHE_ROWS);
76+
AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
77+
AuditImp.getInstance().setAuditConfig(auditConfig);
78+
}
79+
}
80+
81+
/**
82+
* add
83+
*
84+
* @param auditID
85+
* @param event
86+
*/
87+
public static void add(int auditID, Event event) {
88+
if (IS_AUDIT && event != null) {
89+
Map<String, String> headers = event.getHeaders();
90+
String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers);
91+
String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers);
92+
long logTime = getLogTime(headers);
93+
AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
94+
}
95+
}
96+
97+
/**
98+
* getLogTime
99+
*
100+
* @param headers
101+
* @return
102+
*/
103+
public static long getLogTime(Map<String, String> headers) {
104+
String strLogTime = headers.get(Constants.HEADER_KEY_MSG_TIME);
105+
if (strLogTime == null) {
106+
strLogTime = headers.get(AttributeConstants.DATA_TIME);
107+
}
108+
if (strLogTime == null) {
109+
return System.currentTimeMillis();
110+
}
111+
long logTime = NumberUtils.toLong(strLogTime, 0);
112+
if (logTime == 0) {
113+
logTime = System.currentTimeMillis();
114+
}
115+
return logTime;
116+
}
117+
118+
/**
119+
* getLogTime
120+
*
121+
* @param event
122+
* @return
123+
*/
124+
public static long getLogTime(Event event) {
125+
if (event != null) {
126+
Map<String, String> headers = event.getHeaders();
127+
return getLogTime(headers);
128+
}
129+
return System.currentTimeMillis();
130+
}
131+
132+
/**
133+
* getAuditFormatTime
134+
*
135+
* @param msgTime
136+
* @return
137+
*/
138+
public static long getAuditFormatTime(long msgTime) {
139+
long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
140+
return auditFormatTime;
141+
}
142+
143+
/**
144+
* sendReport
145+
*/
146+
public static void sendReport() {
147+
AuditImp.getInstance().sendReport();
148+
}
149+
}

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.inlong.dataproxy.config.ConfigManager;
5656
import org.apache.inlong.dataproxy.config.RemoteConfigManager;
5757
import org.apache.inlong.dataproxy.metrics.MetricObserver;
58+
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
5859
import org.slf4j.Logger;
5960
import org.slf4j.LoggerFactory;
6061

@@ -269,6 +270,7 @@ private void loadMonitoring() {
269270

270271
/**
271272
* main
273+
*
272274
* @param args
273275
*/
274276
public static void main(String[] args) {
@@ -390,17 +392,20 @@ public static void main(String[] args) {
390392
application.handleConfigurationEvent(configurationProvider.getConfiguration());
391393
}
392394
}
393-
//metrics
395+
// metrics
394396
MetricObserver.init(ConfigManager.getInstance().getCommonProperties());
395-
396-
//start application
397+
// audit
398+
AuditUtils.initAudit();
399+
400+
// start application
397401
application.start();
398402

399403
final Application appReference = application;
400404
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
401405

402406
@Override
403407
public void run() {
408+
AuditUtils.sendReport();
404409
appReference.stop();
405410
}
406411
});

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.inlong.dataproxy.consts.ConfigConstants;
4747
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
4848
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
49+
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
4950
import org.apache.inlong.dataproxy.utils.Constants;
5051
import org.apache.inlong.dataproxy.utils.NetworkUtils;
5152
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
@@ -571,27 +572,25 @@ public void onMessageSent(final MessageSentResult result) {
571572
/**
572573
* addMetric
573574
*
574-
* @param currentRecord
575-
* @param topic
575+
* @param event
576576
* @param result
577-
* @param size
577+
* @param sendTime
578578
*/
579-
private void addMetric(Event currentRecord, boolean result, long sendTime) {
579+
private void addMetric(Event event, boolean result, long sendTime) {
580580
Map<String, String> dimensions = new HashMap<>();
581581
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, MetaSink.this.getName());
582582
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, MetaSink.this.getName());
583-
if (currentRecord.getHeaders().containsKey(TOPIC)) {
584-
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, currentRecord.getHeaders().get(TOPIC));
585-
} else {
586-
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
587-
}
583+
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
584+
DataProxyMetricItem.fillInlongId(event, dimensions);
585+
DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
588586
DataProxyMetricItem metricItem = MetaSink.this.metricItemSet.findMetricItem(dimensions);
589587
if (result) {
590588
metricItem.sendSuccessCount.incrementAndGet();
591-
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
589+
metricItem.sendSuccessSize.addAndGet(event.getBody().length);
590+
AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
592591
if (sendTime > 0) {
593592
long currentTime = System.currentTimeMillis();
594-
long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
593+
long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
595594
sendTime);
596595
long sinkDuration = currentTime - sendTime;
597596
long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
@@ -602,7 +601,7 @@ private void addMetric(Event currentRecord, boolean result, long sendTime) {
602601
}
603602
} else {
604603
metricItem.sendFailCount.incrementAndGet();
605-
metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
604+
metricItem.sendFailSize.addAndGet(event.getBody().length);
606605
}
607606
}
608607

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.inlong.dataproxy.consts.ConfigConstants;
4747
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
4848
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
49+
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
4950
import org.apache.inlong.dataproxy.utils.Constants;
5051
import org.apache.inlong.dataproxy.utils.NetworkUtils;
5152
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
@@ -579,27 +580,26 @@ public void onMessageSent(final MessageSentResult result) {
579580
/**
580581
* addMetric
581582
*
582-
* @param currentRecord
583-
* @param topic
583+
* @param event
584584
* @param result
585-
* @param size
585+
* @param sendTime
586586
*/
587-
private void addMetric(Event currentRecord, boolean result, long sendTime) {
587+
private void addMetric(Event event, boolean result, long sendTime) {
588588
Map<String, String> dimensions = new HashMap<>();
589589
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, SimpleMessageTubeSink.this.getName());
590590
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, SimpleMessageTubeSink.this.getName());
591-
if (currentRecord.getHeaders().containsKey(TOPIC)) {
592-
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, currentRecord.getHeaders().get(TOPIC));
593-
} else {
594-
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
595-
}
591+
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, ""));
592+
DataProxyMetricItem.fillInlongId(event, dimensions);
593+
DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
594+
596595
DataProxyMetricItem metricItem = SimpleMessageTubeSink.this.metricItemSet.findMetricItem(dimensions);
597596
if (result) {
598597
metricItem.sendSuccessCount.incrementAndGet();
599-
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
598+
metricItem.sendSuccessSize.addAndGet(event.getBody().length);
599+
AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
600600
if (sendTime > 0) {
601601
long currentTime = System.currentTimeMillis();
602-
long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
602+
long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
603603
sendTime);
604604
long sinkDuration = currentTime - sendTime;
605605
long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
@@ -610,7 +610,7 @@ private void addMetric(Event currentRecord, boolean result, long sendTime) {
610610
}
611611
} else {
612612
metricItem.sendFailCount.incrementAndGet();
613-
metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
613+
metricItem.sendFailSize.addAndGet(event.getBody().length);
614614
}
615615
}
616616

0 commit comments

Comments
 (0)