forked from PCampi/unimib-simpss
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlink_mqtt_kafka.py
67 lines (57 loc) · 2.23 KB
/
link_mqtt_kafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""File linking the MQTT broker to the Kafka cluster."""
import os
from typing import Dict
import pandas as pd
from simpss.producers import MqttKafkaProducer
import utils
def main():
    """Configure and start the MQTT -> Kafka producer.

    Settings are read from environment variables, each with a built-in
    default:

    * MQTT: ``MQTT_QOS``, ``MQTT_CLIENT_ID``, ``MQTT_ADDRESS``,
      ``MQTT_TOPIC``, ``MQTT_MAX_INFLIGHT``, ``MQTT_PAYLOAD_KEY``.
    * Kafka: ``KAFKA_BOOTSTRAP_SERVERS``, ``KAFKA_TIMEOUT_MS``,
      ``KAFKA_CLIENT_ID``, ``KAFKA_MAX_INFLIGHT``, ``KAFKA_LINGER_MS``,
      ``KAFKA_GROUP_ID``.

    The sensor/group mapping is loaded from ``sensor_group.csv`` in the
    current working directory, then a ``MqttKafkaProducer`` is built from
    the two configurations and its ``run()`` method is called.

    Raises:
        ValueError: if an integer-valued environment variable holds a
            value that ``int()`` cannot parse.
    """
    logger = utils.get_logger()

    logger.info("setting up MQTT")

    # MQTT config
    # os.environ.get already returns str (or the str default), so only the
    # numeric settings need an explicit conversion.
    qos = int(os.environ.get("MQTT_QOS", 2))
    client_id = os.environ.get("MQTT_CLIENT_ID", 'prod1')
    mqtt_address = os.environ.get("MQTT_ADDRESS", 'localhost')
    mqtt_topic = os.environ.get("MQTT_TOPIC", 'simpss')
    mqtt_max_inflight = int(os.environ.get("MQTT_MAX_INFLIGHT", 100))
    mqtt_payload_key = os.environ.get("MQTT_PAYLOAD_KEY", 'id')

    mqtt_config = {
        'client-id': client_id,
        'address': mqtt_address,
        'port': 1883,
        'transport': 'tcp',
        'topic': mqtt_topic,
        'qos': qos,
        'max-inflight': mqtt_max_inflight,
        'payload-key': mqtt_payload_key,
        'timeout': 1.0,  # optional, default is 1.0
    }

    # KAFKA config
    logger.info("setting up KAFKA")
    bootstrap_servers = os.environ.get(
        "KAFKA_BOOTSTRAP_SERVERS", 'localhost:9092')
    # The variable used to be read under the misspelled name
    # KAFKA_TIMEOUS_MS. Prefer the correct spelling, but keep honouring the
    # legacy name so existing deployments do not silently lose their setting.
    kafka_timeout_ms = int(
        os.environ.get(
            "KAFKA_TIMEOUT_MS",
            os.environ.get("KAFKA_TIMEOUS_MS", 6000),
        ))
    kafka_client_id = os.environ.get("KAFKA_CLIENT_ID", 'k-prod-1')
    kafka_max_inflight = int(os.environ.get("KAFKA_MAX_INFLIGHT", 100))
    kafka_linger_ms = int(os.environ.get("KAFKA_LINGER_MS", 1))
    kafka_group_id = os.environ.get("KAFKA_GROUP_ID", '1')

    kk_config = {
        'bootstrap.servers': bootstrap_servers,
        'session.timeout.ms': kafka_timeout_ms,
        'group.id': kafka_group_id,
        'client.id': kafka_client_id,
        'max.in.flight': kafka_max_inflight,
        'linger.ms': kafka_linger_ms,  # default 1 ms = 0.001 seconds
    }

    logger.info("reading sensor file")
    sensor_groups = utils.read_sensor_group_mapping(
        os.path.join(os.getcwd(), 'sensor_group.csv'))
    logger.info(f"configuration read: {sensor_groups!s}")

    bonzo = MqttKafkaProducer(mqtt_config,
                              kk_config,
                              sensor_groups,
                              mqtt_timeout=1.0,
                              kafka_timeout=0.3)
    bonzo.run()
# Start the bridge only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()