-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprocess_events.py
122 lines (89 loc) · 3.56 KB
/
process_events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import sys
import json
import os
import http.server
import threading
import time
import yaml
import queue
from globalnoc_alertmon_agent import AlertMonAgent, Alert
script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
config_file = script_dir+"/conf/config.yaml"
# read in config file
with open(config_file, "r") as stream:
config = yaml.safe_load(stream)
# Define the HTTP server settings
host = '127.0.0.1'
port = 8080
alert_queue = queue.Queue()
# Create a simple HTTP request handler
class MyHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
def do_POST(self):
# Get the content length from the headers
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
try:
json_data = json.loads(post_data)
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(f'Successfully received and parsed alert\n'.encode('utf-8'))
alert_queue.put_nowait(json_data)
except json.JSONDecodeError as e:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(f'Error parsing alert: {e}\n'.encode('utf-8'))
# Function to start the HTTP server
def start_http_server():
with http.server.HTTPServer((host, port), MyHTTPRequestHandler) as httpd:
print(f"HTTP server is running at http://{host}:{port}")
httpd.serve_forever()
# Function to execute a task every five minutes
def send_alerts():
while True:
time.sleep(1 * 30)
print("Sending Alerts...")
# initialize the alertmon agent
agent = AlertMonAgent(
username = config.get('username'),
password = config.get('password'),
server = config.get('server'),
realm = config.get('realm')
)
# start with the alerts currently added in our system
agent.add_current_alerts()
# add alerts that we've queued up until there are no more remaining or we've reached 250
alerts_processed = 0
while alert_queue.qsize() > 0 and alerts_processed <= 250:
alert_msg = alert_queue.get()
alert_queue.task_done()
alert = Alert(
node_name = alert_msg.get('node_name'),
service_name = alert_msg.get('service_name'),
description = alert_msg.get('description'),
severity = alert_msg.get('severity')
)
# if this alert is OK ( i.e. it has cleared ) delete any active alert that might exist
if( alert.get('severity') == 'OK'):
agent.delete_alert(alert)
# otherwise add the alert to our agent's list of alerts to send
else:
agent.add_alert(alert)
alerts_processed += 1
# expire event based alerts based on time
for alert in agent.get_alerts():
if( alert.is_older_than(seconds=250) ):
agent.delete_alert( alert )
agent.send_alerts()
def main():
# Create and start the HTTP server thread
http_server_thread = threading.Thread(target=start_http_server)
http_server_thread.start()
# Create and start the task execution thread
send_alerts_thread = threading.Thread(target=send_alerts)
send_alerts_thread.start()
# Wait for the threads to finish
http_server_thread.join()
send_alerts_thread.join()
main()