Skip to content

Commit 2fd1ee7

Browse files
committed
add quartz alarm consumer and cluster
1 parent c32495a commit 2fd1ee7

File tree

17 files changed

+334
-737
lines changed

17 files changed

+334
-737
lines changed

kafka-eagle-api/src/main/java/org/smartloli/kafka/eagle/api/im/IMService.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@ public interface IMService {
3333
public void sendGetMsgByDingDing(String data, String url);
3434

3535
/** Send alert message by wechat. */
36-
public void sendJsonMsgByWeChat(String data);
36+
public void sendJsonMsgByWeChat(String data,String url);
3737

3838
/** Send alert message by webhook. */
39-
public void sendJsonMsgByWebhook(String data, String maillist);
39+
public void sendPostMsgByWebhook(String data, String url);
40+
41+
public void sendGetMsgByWebhook(String data, String url);
4042

4143
}

kafka-eagle-api/src/main/java/org/smartloli/kafka/eagle/api/im/IMServiceImpl.java

+35-16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import java.util.concurrent.Callable;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
2225

2326
import org.slf4j.Logger;
2427
import org.slf4j.LoggerFactory;
@@ -27,7 +30,6 @@
2730
import org.smartloli.kafka.eagle.common.util.KConstants.WeChat;
2831
import org.smartloli.kafka.eagle.common.util.SystemConfigUtils;
2932

30-
import com.alibaba.fastjson.JSON;
3133
import com.alibaba.fastjson.JSONObject;
3234

3335
/**
@@ -40,17 +42,36 @@
4042
public class IMServiceImpl implements IMService {
4143

4244
private final Logger LOG = LoggerFactory.getLogger(IMServiceImpl.class);
45+
private static ExecutorService service = Executors.newFixedThreadPool(10);
4346

4447
/** Send Json msg by dingding. */
4548
@Override
46-
public void sendPostMsgByDingDing(String data, String url) {
47-
Map<String, Object> dingDingMarkdownMessage = getDingDingMarkdownMessage(IM.TITLE, data, true);
48-
LOG.info("IM[DingDing] response: " + HttpClientUtils.doPostJson(url, JSONObject.toJSONString(dingDingMarkdownMessage)));
49+
public void sendPostMsgByDingDing(final String data, final String url) {
50+
service.submit(new Callable<Integer>() {
51+
@Override
52+
public Integer call() throws Exception {
53+
Thread.sleep(200);
54+
return sendMsg(data, url);
55+
}
56+
});
57+
// release
58+
service.shutdown();
59+
}
60+
61+
private int sendMsg(String data, String url) {
62+
try {
63+
Map<String, Object> dingDingMarkdownMessage = getDingDingMarkdownMessage(IM.TITLE, data, true);
64+
LOG.info("IM[DingDing] response: " + HttpClientUtils.doPostJson(url, JSONObject.toJSONString(dingDingMarkdownMessage)));
65+
} catch (Exception e) {
66+
e.printStackTrace();
67+
return 0;
68+
}
69+
return 1;
4970
}
5071

5172
@Override
5273
public void sendGetMsgByDingDing(String data, String url) {
53-
74+
5475
}
5576

5677
/**
@@ -78,15 +99,9 @@ private static Map<String, Object> getDingDingMarkdownMessage(String title, Stri
7899

79100
/** Send Json msg by wechat. */
80101
@Override
81-
public void sendJsonMsgByWeChat(String data) {
82-
if (SystemConfigUtils.getBooleanProperty("kafka.eagle.im.wechat.enable")) {
83-
String token = SystemConfigUtils.getProperty("kafka.eagle.im.wechat.token");
84-
String uri = SystemConfigUtils.getProperty("kafka.eagle.im.wechat.url");
85-
String getToken = HttpClientUtils.doGet(token);
86-
String accessToken = JSON.parseObject(getToken).getString("access_token");
87-
Map<String, Object> wechatMarkdownMessage = getWeChatMarkdownMessage(data);
88-
LOG.info("IM[WeChat] response: " + HttpClientUtils.doPostJson(uri + accessToken, JSONObject.toJSONString(wechatMarkdownMessage)));
89-
}
102+
public void sendJsonMsgByWeChat(String data, String url) {
103+
Map<String, Object> wechatMarkdownMessage = getWeChatMarkdownMessage(data);
104+
LOG.info("IM[WeChat] response: " + HttpClientUtils.doPostJson(url, JSONObject.toJSONString(wechatMarkdownMessage)));
90105
}
91106

92107
private static Map<String, Object> getWeChatMarkdownMessage(String text) {
@@ -106,8 +121,12 @@ private static Map<String, Object> getWeChatMarkdownMessage(String text) {
106121
}
107122

108123
@Override
109-
public void sendJsonMsgByWebhook(String data, String maillist) {
110-
// TODO Auto-generated method stub
124+
public void sendPostMsgByWebhook(String data, String url) {
125+
126+
}
127+
128+
@Override
129+
public void sendGetMsgByWebhook(String data, String url) {
111130

112131
}
113132

kafka-eagle-api/src/test/java/org/smartloli/kafka/eagle/api/sms/TestIM.java

+32-6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.smartloli.kafka.eagle.api.email.module.ClusterContentModule;
2121
import org.smartloli.kafka.eagle.api.email.module.LagContentModule;
22+
import org.smartloli.kafka.eagle.api.im.IMFactory;
23+
import org.smartloli.kafka.eagle.api.im.IMService;
2224
import org.smartloli.kafka.eagle.api.im.IMServiceImpl;
2325
import org.smartloli.kafka.eagle.common.protocol.alarm.AlarmMessageInfo;
2426
import org.smartloli.kafka.eagle.common.util.CalendarUtils;
@@ -32,14 +34,36 @@
3234
*/
3335
public class TestIM {
3436
public static void main(String[] args) {
37+
testAlarmClusterByDingDingMarkDown();
38+
//testAlarmClusterByWeChatMarkDown();
39+
//testConsumerHeathyByWeChat();
40+
41+
}
42+
43+
/** New alarm im api. */
44+
private static void testAlarmClusterByWeChatMarkDown() {
45+
AlarmMessageInfo alarmMsg = new AlarmMessageInfo();
46+
// FF0000 (red), 008000(green), FFA500(yellow)
47+
alarmMsg.setTitle("`Kafka Eagle Alarm Notice`\n");
48+
alarmMsg.setAlarmContent("<font color=\"warning\">node.shutdown [ localhost:9092 ]</font>");
49+
// alarmMsg.setAlarmContent("<font color=\"#008000\">node.alive [
50+
// localhost:9092 ]</font>");
51+
alarmMsg.setAlarmDate("2019-10-07 21:43:22");
52+
alarmMsg.setAlarmLevel("P0");
53+
alarmMsg.setAlarmProject("Kafka");
54+
alarmMsg.setAlarmStatus("<font color=\"warning\">PROBLEM</font>");
55+
// alarmMsg.setAlarmStatus("<font color=\"#008000\">NORMAL</font>");
56+
alarmMsg.setAlarmTimes("current(1), max(7)");
3557

36-
testAlarmClusterByDingDing();
58+
IMServiceImpl im = new IMServiceImpl();
59+
im.sendJsonMsgByWeChat(alarmMsg.toWeChatMarkDown(),"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=maMN2krp0GwiyxoA6JCULLk8oLHwfnjnojeYGma_5KG5J_JHqTledeY6AHWE2rwLTF6I5yqu5LJUmUpSn7feXauFySZtnOGlAvRACz33V2UegR596xuyOT4fZIfNzB1cqJi3A-Eahbw7UVG2a8AaHvN0ZrSRPkQiqWola5p71FfCpnuDEAw63THmURdfMIcF3QB5KFzl-qHblqXfQLtpeA");
3760
}
3861

3962
/** New alarm im api. */
40-
private static void testAlarmClusterByDingDing() {
63+
private static void testAlarmClusterByDingDingMarkDown() {
4164
AlarmMessageInfo alarmMsg = new AlarmMessageInfo();
4265
// FF0000 (red), 008000(green), FFA500(yellow)
66+
alarmMsg.setTitle("**<font color=\"#FF0000\">Kafka Eagle Alarm Notice</font>** \n\n");
4367
alarmMsg.setAlarmContent("<font color=\"#FF0000\">node.shutdown [ localhost:9092 ]</font>");
4468
// alarmMsg.setAlarmContent("<font color=\"#008000\">node.alive [
4569
// localhost:9092 ]</font>");
@@ -50,8 +74,10 @@ private static void testAlarmClusterByDingDing() {
5074
// alarmMsg.setAlarmStatus("<font color=\"#008000\">NORMAL</font>");
5175
alarmMsg.setAlarmTimes("current(1), max(7)");
5276

53-
IMServiceImpl im = new IMServiceImpl();
54-
im.sendPostMsgByDingDing(alarmMsg.toDingDingMarkDown(), "");
77+
IMService im = new IMFactory().create();
78+
im.sendPostMsgByDingDing(alarmMsg.toDingDingMarkDown(),"https://oapi.dingtalk.com/robot/send?access_token=3b7b59d17db0145549b1f65f62921b44bacd1701e635e797da45318a94339060");
79+
//IMServiceImpl im = new IMServiceImpl();
80+
//im.sendPostMsgByDingDing(alarmMsg.toDingDingMarkDown(),"https://oapi.dingtalk.com/robot/send?access_token=3b7b59d17db0145549b1f65f62921b44bacd1701e635e797da45318a94339060");
5581
}
5682

5783
private static void testConsumerHeathyByWeChat() {
@@ -73,8 +99,8 @@ private static void testConsumerHeathyByWeChat() {
7399
lcm.setUser("smartloli.org@gmail.com");
74100

75101
IMServiceImpl im = new IMServiceImpl();
76-
im.sendJsonMsgByWeChat(ccm.toWeChatMarkDown());
77-
im.sendJsonMsgByWeChat(lcm.toWeChatMarkDown());
102+
im.sendJsonMsgByWeChat(ccm.toWeChatMarkDown(),"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=maMN2krp0GwiyxoA6JCULLk8oLHwfnjnojeYGma_5KG5J_JHqTledeY6AHWE2rwLTF6I5yqu5LJUmUpSn7feXauFySZtnOGlAvRACz33V2UegR596xuyOT4fZIfNzB1cqJi3A-Eahbw7UVG2a8AaHvN0ZrSRPkQiqWola5p71FfCpnuDEAw63THmURdfMIcF3QB5KFzl-qHblqXfQLtpeA");
103+
im.sendJsonMsgByWeChat(lcm.toWeChatMarkDown(),"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=maMN2krp0GwiyxoA6JCULLk8oLHwfnjnojeYGma_5KG5J_JHqTledeY6AHWE2rwLTF6I5yqu5LJUmUpSn7feXauFySZtnOGlAvRACz33V2UegR596xuyOT4fZIfNzB1cqJi3A-Eahbw7UVG2a8AaHvN0ZrSRPkQiqWola5p71FfCpnuDEAw63THmURdfMIcF3QB5KFzl-qHblqXfQLtpeA");
78104
}
79105

80106
private static void testClusterHeathyByDingDing() {

kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/protocol/alarm/AlarmConsumerInfo.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ public class AlarmConsumerInfo extends BaseProtocol {
3030

3131
private int id;
3232
private String cluster = "";
33+
/** Consumer group name. */
3334
private String group = "";
3435
private String topic = "";
3536
private long lag = 0L;
3637
private String alarmGroup = "";
3738
private int alarmTimes;
38-
private int alarmMaxTimes ;
39+
private int alarmMaxTimes;
3940
private String alarmLevel = "";
4041
private String isNormal = "";
4142
private String isEnable = "";
@@ -50,6 +51,7 @@ public void setId(int id) {
5051
this.id = id;
5152
}
5253

54+
/** Consumer group name. */
5355
public String getGroup() {
5456
return group;
5557
}
@@ -82,8 +84,6 @@ public void setLag(long lag) {
8284
this.lag = lag;
8385
}
8486

85-
86-
8787
public String getAlarmGroup() {
8888
return alarmGroup;
8989
}

kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/protocol/alarm/AlarmMessageInfo.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class AlarmMessageInfo extends BaseProtocol {
4343
*
4444
*/
4545

46+
private String title;
4647
private String alarmStatus;
4748
private String alarmProject;
4849
private String alarmLevel;
@@ -51,8 +52,21 @@ public class AlarmMessageInfo extends BaseProtocol {
5152
private String alarmDate;
5253

5354
public String toDingDingMarkDown() {
54-
return ">#### AlarmStatus : [ **" + alarmStatus + "** ]\n" + "> #### AlarmLevel : [ " + alarmLevel + " ]\n" + "> #### AlarmProject : [ " + alarmProject + " ]\n" + "> #### AlarmTimes : [ " + alarmTimes + " ]\n"
55-
+ "> #### AlarmDate : [ " + alarmDate + " ]\n" + "> #### AlarmContent : [ " + alarmContent + " ]";
55+
return title + " \n\n>#### AlarmStatus : [ **" + alarmStatus + "** ]\n" + "> #### AlarmLevel : [ " + alarmLevel + " ]\n" + "> #### AlarmProject : [ " + alarmProject + " ]\n" + "> #### AlarmTimes : [ " + alarmTimes + " ]\n"
56+
+ "> #### AlarmDate : [ " + alarmDate + " ]\n" + "> #### AlarmContent : [ " + alarmContent + " ]";
57+
}
58+
59+
public String toWeChatMarkDown() {
60+
return title + " \n\n>AlarmStatus : [ **" + alarmStatus + "** ]\n" + "> AlarmLevel : [ " + alarmLevel + " ]\n" + "> AlarmProject : [ " + alarmProject + " ]\n" + "> AlarmTimes : [ " + alarmTimes + " ]\n"
61+
+ "> AlarmDate : [ " + alarmDate + " ]\n" + "> AlarmContent : [ " + alarmContent + " ]";
62+
}
63+
64+
public String getTitle() {
65+
return title;
66+
}
67+
68+
public void setTitle(String title) {
69+
this.title = title;
5670
}
5771

5872
public String getAlarmStatus() {

kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/EncryptUtils.java

-102
This file was deleted.

0 commit comments

Comments
 (0)