Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/emoncms_http_send_names_compress…
Browse files Browse the repository at this point in the history
…ed_binary'
  • Loading branch information
Your Name committed Sep 16, 2022
2 parents b2e7ccc + 3f5af96 commit 1acbfab
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 8 deletions.
86 changes: 79 additions & 7 deletions src/interfacers/EmonHubEmoncmsHTTPInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import time
import json
import requests
import zlib
from binascii import hexlify
from emonhub_interfacer import EmonHubInterfacer

class EmonHubEmoncmsHTTPInterfacer(EmonHubInterfacer):
Expand All @@ -24,7 +26,9 @@ def __init__(self, name):
'apikey': "",
'url': "http://emoncms.org",
'senddata': 1,
'sendstatus': 0
'sendstatus': 0,
'sendnames': 0,
'compress': 0
}

# set an absolute upper limit for number of items to process per post
Expand All @@ -33,6 +37,40 @@ def __init__(self, name):
# maximum buffer size
self.buffer._maximumEntriesInBuffer = 100000

def add(self, cargo):
"""Append data to buffer.
"""

f = []
try:
f.append(int(cargo.timestamp))

if cargo.nodename and self._settings['sendnames']:
f.append(cargo.nodename)
else:
f.append(cargo.nodeid)

if len(cargo.names) == len(cargo.realdata) and self._settings['sendnames']:
keyvalues = {}
for name, value in zip(cargo.names, cargo.realdata):
keyvalues[name] = value
if cargo.rssi:
keyvalues['rssi'] = cargo.rssi
f.append(keyvalues)
else:
for i in cargo.realdata:
f.append(i)
if cargo.rssi:
f.append(cargo.rssi)
# Note if number of names and values do not match
if len(cargo.names) > 0 and self._settings['sendnames']:
self._log.warning("cargo.names and cargo.realdata have different lengths - " + str(len(cargo.names)) + " vs " + str(len(cargo.realdata)))
except:
self._log.warning("Failed to create emonCMS frame %s", f)

self.buffer.storeItem(f)

def _process_post(self, databuffer):
"""Send data to server."""

Expand All @@ -46,32 +84,58 @@ def _process_post(self, databuffer):
return True

if self._settings['senddata']:
number_of_frames = len(databuffer)
data_string = json.dumps(databuffer, separators=(',', ':'))

# Prepare URL string of the form
# http://domain.tld/emoncms/input/bulk.json?apikey=12345
# &data=[[0,10,82,23],[5,10,82,23],[10,10,82,23]]
# &sentat=15' (requires emoncms >= 8.0)

# time that the request was sent at
sentat = int(time.time())

# Construct post_url (without apikey)
post_url = self._settings['url'] + '/input/bulk.json?'
post_url = self._settings['url'] + '/input/bulk.json?sentat='+str(sentat)

self._log.info("sending: %s data=%s&sentat=%s&apikey=E-M-O-N-C-M-S-A-P-I-K-E-Y", post_url, data_string, sentat)
# If sendnames enabled then always compress:
if self._settings['sendnames']:
self._settings['compress'] = True

# Compress if enabled
if self._settings['compress']:
json_str_size = len(data_string)
# Compress data and encode as hex string.
compressed = zlib.compress(data_string.encode())
compression_ratio = len(compressed) / json_str_size
# Only use compression if it makes sense!
if compression_ratio<1.0:
post_body = compressed
# Set compression flag (cb = compression binary).
post_url = post_url + "&cb=1"
self._log.info("sending: %s (%d bytes of data, %d frames, compressed)", post_url, len(post_body),number_of_frames)
self._log.info("compression ratio: %d%%",compression_ratio*100)
else:
post_body = {'data': data_string}
self._log.info("sending: %s (%d bytes of data, %d frames, uncompressed)", post_url, len(data_string),number_of_frames)
self._log.info("compression ratio: %d%%, sent original",compression_ratio*100)
else:
post_body = {'data': data_string}
self._log.info("sending: %s (%d bytes of data, %d frames, uncompressed)", post_url, len(data_string),number_of_frames)

result = False
try:
reply = requests.post(post_url, {'apikey': self._settings['apikey'], 'data': data_string, 'sentat': str(sentat)}, timeout=60)
st = time.time()
reply = requests.post(post_url, post_body, timeout=60, headers={'Authorization': 'Bearer '+self._settings['apikey']})
dt = (time.time()-st)*1000
reply.raise_for_status() # Raise an exception if status code isn't 200
result = reply.text
except requests.exceptions.RequestException as ex:
self._log.warning("%s couldn't send to server: %s", self.name, ex)
return False

if result == 'ok':
self._log.debug("acknowledged receipt with '%s' from %s", result, self._settings['url'])
self._log.debug("acknowledged receipt with '%s' from %s (%d ms)", result, self._settings['url'], dt)
return True
else:
self._log.warning("send failure: wanted 'ok' but got '%s'", result)
Expand Down Expand Up @@ -134,5 +198,13 @@ def set(self, **kwargs):
self._log.info("Setting %s sendstatus: %s", self.name, setting)
self._settings[key] = int(setting)
continue
elif key == 'sendnames':
self._log.info("Setting " + self.name + " sendnames: " + str(setting))
self._settings[key] = bool(int(setting))
continue
elif key == 'compress':
self._log.info("Setting " + self.name + " compress: " + str(setting))
self._settings[key] = bool(int(setting))
continue
else:
self._log.warning("'%s' is not valid for %s: %s", setting, self.name, key)
2 changes: 1 addition & 1 deletion src/interfacers/EmonHubSocketInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, name, port_nb=50011):
self._settings.update(self._skt_settings)

# Open socket
self._socket = self._open_socket(port_nb)
self._socket = self._open_socket(int(port_nb))

# Initialize RX buffer for socket
self._sock_rx_buf = ''
Expand Down

0 comments on commit 1acbfab

Please sign in to comment.