forked from PCampi/unimib-simpss
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstress_cassandra.py
133 lines (108 loc) · 3.77 KB
/
stress_cassandra.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import datetime
import json
import os
import time
import mocks
from tqdm import tqdm
import cassandra
import simpss_persistence
# Module-wide logger (named 'main') obtained from the project's
# custom_logging helper.
LOGGER = simpss_persistence.custom_logging.get_logger('main')
class MockPublisher(simpss_persistence.pub_sub.Publisher):
    """In-process stand-in for a message publisher.

    Subscribers are registered under unique names. Every published message
    is forwarded to each registered subscriber through its ``receive``
    method, after ``publish`` has stamped the message (in place) with a
    reception time and the fixed sensor group ``'g1'``.
    """

    def __init__(self):
        # Registered subscribers, keyed by their unique name.
        self.subscribers = dict()

    def add_subscriber(self, sub_obj, sub_name):
        """Register *sub_obj* under *sub_name*.

        Raises:
            ValueError: If a subscriber with the same name already exists.
        """
        if sub_name in self.subscribers:
            raise ValueError(
                "Subscriber with name {} already exists".format(sub_name))
        self.subscribers[sub_name] = sub_obj

    def remove_subscriber(self, name):
        """Unregister the subscriber called *name*.

        Removing an unknown name is not an error; it only emits a warning
        through the module logger.
        """
        subscriber = self.subscribers.pop(name, None)
        if subscriber is None:
            LOGGER.warning(
                "Trying to remove subscriber {} which does not exist!".format(
                    name))

    def publish(self, message):
        """Stamp *message* in place and deliver it to every subscriber."""
        # NOTE(review): datetime.now() is naive local time; if this value is
        # written to a Cassandra `timestamp` column the driver interprets
        # naive datetimes as UTC -- confirm which is intended.
        message['time_received'] = datetime.datetime.now()
        message['sensor_group'] = 'g1'
        # Iterate over a snapshot: a subscriber that (un)registers from
        # inside receive() would otherwise mutate the dict mid-iteration
        # and raise RuntimeError.
        for subscriber in list(self.subscribers.values()):
            subscriber.receive(message)
# The session annotation is a string so that defining this function does not
# require the `cassandra.cluster` submodule to be loaded already
# (`import cassandra` alone does not import it).
def create_database(db_name, replication_factor,
                    session: "cassandra.cluster.Session"):
    """Create keyspace *db_name* with SimpleStrategy if it does not exist.

    Args:
        db_name: Keyspace name. It is interpolated verbatim into the CQL
            text (CQL bind markers cover values, not identifiers), so it
            must come from a trusted source such as configuration.
        replication_factor: Replication factor, given as an ``int`` or as
            a string holding an integer literal (e.g. ``"3"``).
        session: Connected Cassandra session that executes the statement.

    Raises:
        ValueError: If *replication_factor* is not an integer literal.
    """
    # Parse the textual form -- exactly what used to be spliced into the
    # query -- so a configuration typo fails fast with a clear Python error
    # instead of a server-side CQL syntax error.
    try:
        rf = int(str(replication_factor))
    except ValueError as err:
        raise ValueError(
            "replication_factor must be an integer, got {!r}".format(
                replication_factor)) from err

    query = (
        "\n"
        "CREATE KEYSPACE IF NOT EXISTS %s\n"
        "WITH REPLICATION = {\n"
        "'class': 'SimpleStrategy',\n"
        "'replication_factor': %d\n"
        "}\n"
    ) % (db_name, rf)
    LOGGER.debug("Executing query {}".format(query))
    session.execute(query)
    LOGGER.debug("query executed")
def create_table(keyspace, name, session):
    """Create table ``<keyspace>.<name>`` for sensor readings if missing.

    The primary key is ``(sensor_group, sensor_id, time_received)``. The
    CREATE TABLE IF NOT EXISTS statement is executed on *session*, with a
    debug log message before and after execution.
    """
    column_defs = (
        "time_received timestamp",
        "sensor_group text",
        "sensor_id int",
        "uptime int",
        "temperature int",
        "pressure int",
        "humidity int",
        "ix int",
        "iy int",
        "iz int",
        "mask int",
    )
    # Leading "" and trailing "" reproduce the surrounding newlines of the
    # statement text.
    statement_lines = [
        "",
        "CREATE TABLE IF NOT EXISTS %s.%s (" % (keyspace, name),
    ]
    statement_lines += [definition + "," for definition in column_defs]
    statement_lines += [
        "PRIMARY KEY (sensor_group, sensor_id, time_received)",
        ")",
        "",
    ]
    cql = "\n".join(statement_lines)

    LOGGER.debug("Create table: executing query {}".format(cql))
    session.execute(cql)
    LOGGER.debug("query executed")
if __name__ == "__main__":
    # `import cassandra` does not load the `cassandra.cluster` submodule;
    # import it explicitly instead of relying on another module having done so.
    import cassandra.cluster

    # Cassandra settings; contact points are separated by ';'.
    addresses = os.getenv('CASSANDRA_CLUSTER_ADDRESSES',
                          'localhost').split(';')
    keyspace = os.getenv('CASSANDRA_KEYSPACE', 'simpss')
    replication_factor = os.getenv('CASSANDRA_REPLICATION', '3')

    # Bootstrap the schema on a short-lived connection. Shutting down the
    # Cluster (not only the Session) also releases its control connection
    # and worker threads; the finally clause does so even if setup fails.
    cluster = cassandra.cluster.Cluster(addresses)
    try:
        session = cluster.connect()
        create_database(keyspace, replication_factor, session)
        create_table(keyspace, 'sensor_data', session)
    finally:
        cluster.shutdown()

    cc_cluster = cassandra.cluster.Cluster(addresses)
    cc = simpss_persistence.storage.CassandraStorage(cc_cluster)

    # KAFKA
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS',
                                       'localhost:9092')
    consumer_group_id = os.environ.get('KAFKA_CONSUMER_GROUP_ID', 'cg1')

    exit_code = 0
    try:
        # setup Cassandra
        cc.connect()
        cc.set_keyspace_table(keyspace, 'sensor_data')
        # Values are the sensor_data column names; keys are presumably the
        # field names of incoming messages -- confirm against the consumer.
        mapping = {
            'sensor_group': 'sensor_group',
            'id': 'sensor_id',
            'time_received': 'time_received',
            'uptime': 'uptime',
            'T': 'temperature',
            'P': 'pressure',
            'H': 'humidity',
            'Ix': 'ix',
            'Iy': 'iy',
            'Iz': 'iz',
            'M': 'mask',
        }
        cc.set_name_mapping(mapping)
        # setup kafka consumer and subscribe to topics 'g1' and 'g2'
        kafka_consumer = simpss_persistence.kafka_consumer.KafkaConsumer(
            bootstrap_servers, consumer_group_id)
        kafka_consumer.kafka_subscribe(['g1', 'g2'])
        # add Cassandra storage as a subscriber to the consumer and run it
        cc.set_subscriber_name('sub-1')
        cc.subscribe(kafka_consumer)
        # start
        kafka_consumer.start_consuming()
    except Exception:
        # Keep the traceback (print(e) dropped it) and report the failure
        # through the process exit status instead of exiting with 0.
        LOGGER.exception("Stress run aborted by an unexpected error")
        exit_code = 1
    finally:
        cc.disconnect()

    if exit_code:
        raise SystemExit(exit_code)