Skip to content

Commit ee546a1

Browse files
luchunliangbruceneenhl
authored and committed Aug 12, 2022
[INLONG-4741][Audit] AuditStore support ClickHouse sink (apache#4747)
1 parent 68f0367 commit ee546a1

File tree

18 files changed

+625
-67
lines changed

18 files changed

+625
-67
lines changed
 

‎inlong-audit/audit-store/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@
156156
<groupId>org.elasticsearch</groupId>
157157
<artifactId>elasticsearch</artifactId>
158158
</dependency>
159+
<dependency>
160+
<groupId>ru.yandex.clickhouse</groupId>
161+
<artifactId>clickhouse-jdbc</artifactId>
162+
</dependency>
159163
<dependency>
160164
<groupId>com.google.code.gson</groupId>
161165
<artifactId>gson</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.config;
19+
20+
import org.springframework.beans.factory.annotation.Value;
21+
import org.springframework.context.annotation.Configuration;
22+
23+
import lombok.Getter;
24+
import lombok.Setter;
25+
26+
@Configuration
27+
@Getter
28+
@Setter
29+
public class ClickHouseConfig {
30+
31+
@Value("${clickhouse.driver}")
32+
private String driver;
33+
34+
@Value("${clickhouse.url}")
35+
private String url;
36+
37+
@Value("${clickhouse.username}")
38+
private String username;
39+
40+
@Value("${clickhouse.password}")
41+
private String password;
42+
43+
@Value("${clickhouse.batchIntervalMs:1000}")
44+
private int batchIntervalMs;
45+
46+
@Value("${clickhouse.batchThreshold:500}")
47+
private int batchThreshold;
48+
49+
@Value("${clickhouse.processIntervalMs:100}")
50+
private int processIntervalMs;
51+
}

‎inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java

+4
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,8 @@ public boolean isElasticsearchStore() {
3838
return store.contains("elasticsearch");
3939
}
4040

41+
public boolean isClickHouseStore() {
42+
return store.contains("clickhouse");
43+
}
44+
4145
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.db.entities;
19+
20+
import java.sql.Timestamp;
21+
22+
import lombok.Getter;
23+
import lombok.Setter;
24+
25+
@Getter
26+
@Setter
27+
public class ClickHouseDataPo {
28+
29+
private String ip;
30+
private String dockerId;
31+
private String threadId;
32+
private Timestamp sdkTs;
33+
private long packetId;
34+
private Timestamp logTs;
35+
private String inlongGroupId;
36+
private String inlongStreamId;
37+
private String auditId;
38+
private long count;
39+
private long size;
40+
private long delay;
41+
private Timestamp updateTime;
42+
}

‎inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java

+27-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.inlong.audit.service;
1919

20+
import org.apache.inlong.audit.config.ClickHouseConfig;
2021
import org.apache.inlong.audit.config.MessageQueueConfig;
2122
import org.apache.inlong.audit.config.StoreConfig;
2223
import org.apache.inlong.audit.db.dao.AuditDataDao;
@@ -29,6 +30,9 @@
2930
import org.springframework.beans.factory.annotation.Autowired;
3031
import org.springframework.stereotype.Service;
3132

33+
import java.util.ArrayList;
34+
import java.util.List;
35+
3236
@Service
3337
public class AuditMsgConsumerServer implements InitializingBean {
3438

@@ -41,16 +45,19 @@ public class AuditMsgConsumerServer implements InitializingBean {
4145
private ElasticsearchService esService;
4246
@Autowired
4347
private StoreConfig storeConfig;
48+
@Autowired
49+
private ClickHouseConfig chConfig;
4450

4551
/**
4652
* Initializing bean
4753
*/
4854
public void afterPropertiesSet() {
49-
BaseConsume mqConsume;
55+
BaseConsume mqConsume = null;
56+
List<InsertData> insertServiceList = this.getInsertServiceList();
5057
if (mqConfig.isPulsar()) {
51-
mqConsume = new PulsarConsume(auditDataDao, esService, storeConfig, mqConfig);
58+
mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
5259
} else if (mqConfig.isTube()) {
53-
mqConsume = new TubeConsume(auditDataDao, esService, storeConfig, mqConfig);
60+
mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
5461
} else {
5562
LOG.error("unkown MessageQueue {}", mqConfig.getMqType());
5663
return;
@@ -59,12 +66,24 @@ public void afterPropertiesSet() {
5966
if (storeConfig.isElasticsearchStore()) {
6067
esService.startTimerRoutine();
6168
}
69+
mqConsume.start();
70+
}
6271

63-
if (mqConsume != null) {
64-
mqConsume.start();
65-
} else {
66-
LOG.error("fail to auditMsgConsumerServer");
72+
/**
73+
* getInsertServiceList
74+
* @return
75+
*/
76+
private List<InsertData> getInsertServiceList() {
77+
List<InsertData> insertServiceList = new ArrayList<>();
78+
if (storeConfig.isMysqlStore()) {
79+
insertServiceList.add(new MySqlService(auditDataDao));
80+
}
81+
if (storeConfig.isElasticsearchStore()) {
82+
insertServiceList.add(esService);
6783
}
84+
if (storeConfig.isClickHouseStore()) {
85+
insertServiceList.add(new ClickHouseService(chConfig));
86+
}
87+
return insertServiceList;
6888
}
69-
7089
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.service;
19+
20+
import org.apache.inlong.audit.config.ClickHouseConfig;
21+
import org.apache.inlong.audit.db.entities.ClickHouseDataPo;
22+
import org.apache.inlong.audit.protocol.AuditData;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.sql.Connection;
27+
import java.sql.DriverManager;
28+
import java.sql.PreparedStatement;
29+
import java.sql.SQLException;
30+
import java.sql.Timestamp;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.LinkedBlockingQueue;
33+
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicInteger;
37+
38+
/**
39+
* ClickHouseService
40+
*/
41+
public class ClickHouseService implements InsertData, AutoCloseable {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseService.class);
44+
public static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id,\r\n"
45+
+ " sdk_ts, packet_id, log_ts,\r\n"
46+
+ " inlong_group_id, inlong_stream_id, audit_id,\r\n"
47+
+ " count, size, delay, \r\n"
48+
+ " update_time)\r\n"
49+
+ " values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
50+
51+
private ClickHouseConfig chConfig;
52+
53+
private ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
54+
private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
55+
private AtomicBoolean needBatchOutput = new AtomicBoolean(false);
56+
private AtomicInteger batchCounter = new AtomicInteger(0);
57+
58+
private Connection conn;
59+
60+
/**
61+
* Constructor
62+
* @param chConfig ClickHouse service config, such as jdbc url, jdbc username, jdbc password.
63+
*/
64+
public ClickHouseService(ClickHouseConfig chConfig) {
65+
this.chConfig = chConfig;
66+
}
67+
68+
/**
69+
* start
70+
*/
71+
public void start() {
72+
// queue
73+
this.batchQueue = new LinkedBlockingQueue<>(
74+
chConfig.getBatchThreshold() * chConfig.getBatchIntervalMs() / chConfig.getProcessIntervalMs());
75+
// connection
76+
try {
77+
Class.forName(chConfig.getDriver());
78+
this.reconnect();
79+
} catch (Exception e) {
80+
LOG.error(e.getMessage(), e);
81+
}
82+
// timer
83+
long currentTime = System.currentTimeMillis();
84+
// batch output interval
85+
timerService.scheduleWithFixedDelay(() -> needBatchOutput.compareAndSet(false, true),
86+
currentTime + chConfig.getBatchIntervalMs(),
87+
chConfig.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
88+
// batch output process
89+
timerService.scheduleWithFixedDelay(() -> processOutput(),
90+
currentTime + chConfig.getProcessIntervalMs(),
91+
chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
92+
}
93+
94+
/**
95+
* processOutput
96+
*/
97+
private void processOutput() {
98+
if (!this.needBatchOutput.get()) {
99+
return;
100+
}
101+
// output
102+
try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL)) {
103+
// insert data
104+
ClickHouseDataPo data = this.batchQueue.poll();
105+
int counter = 0;
106+
while (data != null) {
107+
pstat.setString(1, data.getIp());
108+
pstat.setString(2, data.getDockerId());
109+
pstat.setString(3, data.getThreadId());
110+
pstat.setTimestamp(4, data.getSdkTs());
111+
pstat.setLong(5, data.getPacketId());
112+
pstat.setTimestamp(6, data.getLogTs());
113+
pstat.setString(7, data.getInlongGroupId());
114+
pstat.setString(8, data.getInlongStreamId());
115+
pstat.setString(9, data.getAuditId());
116+
pstat.setLong(10, data.getCount());
117+
pstat.setLong(11, data.getSize());
118+
pstat.setLong(12, data.getDelay());
119+
pstat.setTimestamp(13, data.getUpdateTime());
120+
pstat.addBatch();
121+
this.batchCounter.decrementAndGet();
122+
if (++counter >= chConfig.getBatchThreshold()) {
123+
pstat.executeBatch();
124+
this.conn.commit();
125+
counter = 0;
126+
}
127+
}
128+
this.batchCounter.set(0);
129+
pstat.executeBatch();
130+
this.conn.commit();
131+
} catch (Exception e) {
132+
LOG.error(e.getMessage(), e);
133+
try {
134+
this.reconnect();
135+
} catch (SQLException e1) {
136+
LOG.error(e1.getMessage(), e1);
137+
}
138+
}
139+
140+
// recover
141+
this.needBatchOutput.compareAndSet(true, false);
142+
}
143+
144+
/**
145+
* reconnect
146+
* @throws SQLException Exception when creating connection.
147+
*/
148+
private void reconnect() throws SQLException {
149+
if (this.conn != null) {
150+
try {
151+
this.conn.close();
152+
} catch (Exception e) {
153+
LOG.error(e.getMessage(), e);
154+
}
155+
this.conn = null;
156+
}
157+
this.conn = DriverManager.getConnection(chConfig.getUrl(), chConfig.getUsername(),
158+
chConfig.getPassword());
159+
this.conn.setAutoCommit(false);
160+
}
161+
162+
/**
163+
* insert
164+
* @param msgBody audit data reading from Pulsar or other MessageQueue.
165+
*/
166+
@Override
167+
public void insert(AuditData msgBody) {
168+
ClickHouseDataPo data = new ClickHouseDataPo();
169+
data.setIp(msgBody.getIp());
170+
data.setThreadId(msgBody.getThreadId());
171+
data.setDockerId(msgBody.getDockerId());
172+
data.setPacketId(msgBody.getPacketId());
173+
data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
174+
data.setLogTs(new Timestamp(msgBody.getLogTs()));
175+
data.setAuditId(msgBody.getAuditId());
176+
data.setCount(msgBody.getCount());
177+
data.setDelay(msgBody.getDelay());
178+
data.setInlongGroupId(msgBody.getInlongGroupId());
179+
data.setInlongStreamId(msgBody.getInlongStreamId());
180+
data.setSize(msgBody.getSize());
181+
data.setUpdateTime(new Timestamp(System.currentTimeMillis()));
182+
try {
183+
this.batchQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
184+
if (this.batchCounter.incrementAndGet() >= chConfig.getBatchThreshold()) {
185+
this.needBatchOutput.compareAndSet(false, true);
186+
}
187+
} catch (InterruptedException e) {
188+
LOG.error(e.getMessage(), e);
189+
throw new RuntimeException(e.getMessage(), e);
190+
}
191+
}
192+
193+
/**
194+
* close
195+
* @throws Exception Exception when closing ClickHouse connection.
196+
*/
197+
@Override
198+
public void close() throws Exception {
199+
this.conn.close();
200+
this.timerService.shutdown();
201+
}
202+
}

0 commit comments

Comments
 (0)