Workaround to allow multithreading in 'api/data-stream'
Performance profiling showed that the Django models used in the view endpoint for 'api/data-stream' were a bottleneck and did not allow for asynchronous support. This commit is a patch that replaces those models with direct SQL, which should be more performant and also supports multithreading.
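For context, the patch moves from per-row ORM calls to a shared SQLAlchemy engine with parameterized SQL. A minimal sketch of that pattern (the connection string and column list are illustrative, not the exact code in this commit):

    import sqlalchemy
    from sqlalchemy.sql import text

    # One shared engine; its pool hands each worker thread its own connection.
    engine = sqlalchemy.create_engine("postgresql://user:password@host:5432/odm2", pool_size=5)

    def insert_value(result_id, data_value, value_datetime):
        with engine.connect() as connection:
            connection.execute(
                text("INSERT INTO odm2.timeseriesresultvalues (resultid, datavalue, valuedatetime) "
                     "VALUES (:result_id, :data_value, :value_datetime)"),
                result_id=result_id, data_value=data_value, value_datetime=value_datetime)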
ptomasula committed Dec 29, 2021
1 parent f2c6763 commit 7612930
Showing 1 changed file with 128 additions and 70 deletions.
198 changes: 128 additions & 70 deletions src/dataloaderservices/views.py
@@ -2,7 +2,7 @@
import os
from collections import OrderedDict
from datetime import time, timedelta, datetime
from typing import Union
from typing import Union, Dict, Any, final

from io import StringIO
from django.utils import encoding
@@ -44,10 +44,16 @@
from sqlalchemy.sql import text
import psycopg2
from django.conf import settings

_dbsettings = settings.DATABASES['odm2']
_connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}"
_db_engine = sqlalchemy.create_engine(_connection_str, pool_size=5)

_dbsettings_loader = settings.DATABASES['default']
_connection_str_loader = f"postgresql://{_dbsettings_loader['USER']}:{_dbsettings_loader['PASSWORD']}@{_dbsettings_loader['HOST']}:{_dbsettings_loader['PORT']}/{_dbsettings_loader['NAME']}"
_db_engine_loader = sqlalchemy.create_engine(_connection_str_loader, pool_size=5)
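# Both engines are created once at import time; an Engine and its connection
# pool are thread-safe, so each `with engine.connect() as connection:` block in
# the helpers below checks a pooled DBAPI connection out for the calling worker
# thread and returns it on exit. pool_size=5 caps the idle pooled connections
# (SQLAlchemy opens temporary overflow connections beyond that, up to its
# default max_overflow of 10).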


# TODO: Check user permissions to edit, add, or remove stuff with a permissions class.
# TODO: Use generic api views for create, edit, delete, and list.

@@ -293,6 +299,7 @@ def post(self, request, *args, **kwargs):
data_value = row[results_mapping['results'][uuid]['index']]
result_value = TimeseriesResultValueTechDebt(
result_id=sensor.result_id,
result_uuid=uuid,
data_value=data_value,
utc_offset=results_mapping['utc_offset'],
value_datetime=measurement_datetime,
@@ -302,7 +309,7 @@
time_aggregation_interval_unit=data_value_units.unit_id,
)
try:
result = insert_timeseries_result_values(result_value)
except Exception as e:
warnings.append(f"Error inserting value '{data_value}' " \
f"at datetime '{measurement_datetime}' for result uuid '{uuid}'")
@@ -594,81 +601,64 @@ def post(self, request, format=None):
sampling_feature = SamplingFeature.objects.filter(sampling_feature_uuid__exact=request.data['sampling_feature']).first()
if not sampling_feature:
raise exceptions.ParseError('Sampling Feature code does not match any existing site.')
feature_actions = sampling_feature.feature_actions.prefetch_related('results__variable', 'action').all()

result_uuids = get_result_UUIDs(sampling_feature.sampling_feature_id)
if not result_uuids:
raise exceptions.ParseError(f"No results_uuids matched to sampling_feature '{request.data['sampling_feature']}'")

futures = {}
unit_id = Unit.objects.get(unit_name='hour minute').unit_id
with ThreadPoolExecutor(max_workers=8) as executor:
for feature_action in feature_actions:
result = feature_action.results.all().first()
if str(result.result_uuid) not in request.data:
continue
else:
result_value = TimeseriesResultValueTechDebt(
result_id=result.result_id,
data_value=request.data[str(result.result_uuid)],
value_datetime=measurement_datetime,
utc_offset=utc_offset,
censor_code='Not censored',
quality_code='None',
time_aggregation_interval=1,
time_aggregation_interval_unit=unit_id)
futures[executor.submit(ProcessResultValue, result_value, result)] = None

# PRT - long term we would like to remove the dataloader database, but for now
# this block of code keeps the dataloaderinterface_sensormeasurement table in sync
result.value_count = F('value_count') + 1
result.result_datetime = measurement_datetime
result.result_datetime_utc_offset = utc_offset
site_sensor = SiteSensor.objects.filter(result_id=result.result_id).first()
last_measurement = SensorMeasurement.objects.filter(sensor=site_sensor).first()
if not last_measurement:
SensorMeasurement.objects.create(
sensor=site_sensor,
value_datetime=measurement_datetime,
value_datetime_utc_offset=timedelta(hours=utc_offset),
data_value=result_value.data_value
)
elif measurement_datetime > last_measurement.value_datetime:
last_measurement.delete()
SensorMeasurement.objects.create(
sensor=site_sensor,
value_datetime=measurement_datetime,
value_datetime_utc_offset=timedelta(hours=utc_offset),
data_value=result_value.data_value
)

if result.value_count == 0:
result.valid_datetime = measurement_datetime
result.valid_datetime_utc_offset = utc_offset

if not site_sensor.registration.deployment_date:
site_sensor.registration.deployment_date = measurement_datetime
#site_sensor.registration.deployment_date_utc_offset = utc_offset
site_sensor.registration.save(update_fields=['deployment_date'])


try:
result.save(update_fields=[
'result_datetime', 'value_count', 'result_datetime_utc_offset',
'valid_datetime', 'valid_datetime_utc_offset'
])
except Exception as e:
# PRT - An exception here means the dataloaderinterface data tables will not be in sync
# for this sensor, but that is better than a fail state where data is lost, so pass on the
# exception for now. The long-term plan is to remove this whole block of code.
pass

with ThreadPoolExecutor() as executor:
for key in request.data:
try:
result_id = result_uuids[key]
except KeyError:
continue

result_value = TimeseriesResultValueTechDebt(
result_id=result_id,
result_uuid=key,
data_value=request.data[str(key)],
value_datetime=measurement_datetime,
utc_offset=utc_offset,
censor_code='Not censored',
quality_code='None',
time_aggregation_interval=1,
time_aggregation_interval_unit=unit_id)
futures[executor.submit(process_result_value, result_value)] = None

errors = []
for future in as_completed(futures):
error = future.result()
if error is not None: errors.append(error)

if errors: return Response(errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response({}, status.HTTP_201_CREATED)
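# The view above fans each posted value out to a worker pool and then joins on
# the futures, collecting per-value error strings. A minimal standalone sketch
# of that submit/as_completed pattern (names illustrative, not part of this
# commit):
#
#     futures = {}
#     with ThreadPoolExecutor() as executor:
#         for item in items:
#             futures[executor.submit(process, item)] = None
#     errors = [f.result() for f in as_completed(futures) if f.result() is not None]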

#######################################################
### Temporary HOT fix to address model performance
#######################################################
# PRT - the code in this block is meant as a hot fix to address poor model performance;
# the long-term goal is to refactor the application models to make them more performant.

def get_result_UUIDs(sampling_feature_id:str) -> Union[Dict[str, str],None]:
try:
with _db_engine.connect() as connection:
query = text("SELECT r.resultid, r.resultuuid FROM odm2.results AS r " \
"JOIN odm2.featureactions AS fa ON r.featureactionid = fa.featureactionid "\
"WHERE fa.samplingfeatureid = ':sampling_feature_id';")
df = pd.read_sql(query, connection, params={'sampling_feature_id': sampling_feature_id})
df['resultuuid'] = df['resultuuid'].astype(str)
df = df.set_index('resultuuid')
results = df['resultid'].to_dict()
return results
except Exception:
return None
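# The same uuid -> resultid mapping could be built without pandas (equivalent
# sketch under the same query, not part of this commit):
#
#     with _db_engine.connect() as connection:
#         rows = connection.execute(query, sampling_feature_id=sampling_feature_id)
#         return {str(result_uuid): result_id for result_id, result_uuid in rows}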

class TimeseriesResultValueTechDebt():
def __init__(self,
result_id:str,
result_uuid:str,
data_value:float,
value_datetime:datetime,
utc_offset:int,
@@ -677,6 +667,7 @@ def __init__(self,
time_aggregation_interval:int,
time_aggregation_interval_unit:int) -> None:
self.result_id = result_id
self.result_uuid = result_uuid
self.data_value = data_value
self.utc_offset = utc_offset
self.value_datetime = value_datetime
@@ -685,23 +676,88 @@ def __init__(self,
self.time_aggregation_interval = time_aggregation_interval
self.time_aggregation_interval_unit = time_aggregation_interval_unit
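
# TimeseriesResultValueTechDebt is a plain value object; the same shape could be
# written as a dataclass (equivalent sketch, not part of this commit):
#
#     from dataclasses import dataclass
#
#     @dataclass
#     class TimeseriesResultValueTechDebt:
#         result_id: str
#         result_uuid: str
#         data_value: float
#         value_datetime: datetime
#         utc_offset: int
#         censor_code: str
#         quality_code: str
#         time_aggregation_interval: int
#         time_aggregation_interval_unit: int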

def process_result_value(result_value:TimeseriesResultValueTechDebt) -> Union[str,None]:
try:
query_result = insert_timeseries_result_values(result_value)
sync_dataloader_tables(result_value)
sync_result_table(result_value)
except sqlalchemy.exc.IntegrityError as e:
if hasattr(e, 'orig'):
if isinstance(e.orig, psycopg2.errors.UniqueViolation):
pass #data is already in database
else:
return (f"Failed to INSERT data for uuid('{result.result_uuid}')")
return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')")
else:
return (f"Failed to INSERT data for uuid('{result.result_uuid}')")
return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')")
except Exception as e:
return (f"Failed to INSERT data for uuid('{result.result_uuid}')")
return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')")


# PRT - long term we would like to remove the dataloader database, but for now
# this block of code keeps the dataloaderinterface_sensormeasurement table in sync

# if not site_sensor.registration.deployment_date:
#     site_sensor.registration.deployment_date = measurement_datetime
#     #site_sensor.registration.deployment_date_utc_offset = utc_offset
#     site_sensor.registration.save(update_fields=['deployment_date'])

return None

#dataloader utility function
def get_site_sensor(resultid:str) -> Union[Dict[str, Any],None]:
with _db_engine_loader.connect() as connection:
query = text('SELECT * FROM dataloaderinterface_sitesensor ' \
'WHERE "ResultID"=:resultid;'
)
df = pd.read_sql(query, connection, params={'resultid':resultid})
records = df.to_dict(orient='records')
return records[0] if records else None

#dataloader utility function
def update_sensormeasurement(sensor_id:str, result_value:TimeseriesResultValueTechDebt) -> None:
with _db_engine_loader.connect() as connection:
query = text("DO $condition_insert$ BEGIN " \
'IF (SELECT COUNT(sensor_id > 0) FROM ' \
' dataloaderinterface_sensormeasurement WHERE ' \
' sensor_id = :sensor_id) THEN ' \
' UPDATE dataloaderinterface_sensormeasurement ' \
" SET value_datetime=:datetime, " \
" value_datetime_utc_offset = ':utc_offset', " \
' data_value = data_value ' \
' WHERE sensor_id=:sensor_id; ' \
'ELSE ' \
' INSERT INTO dataloaderinterface_sensormeasurement ' \
" VALUES (:sensor_id,:datetime,':utc_offset',:data_value); " \
'END IF;' \
'END $condition_insert$')
connection.execute(query,
sensor_id=sensor_id,
datetime=result_value.value_datetime,
utc_offset=result_value.utc_offset,
data_value=result_value.data_value
)
return None
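# On PostgreSQL 9.5+ the conditional DO block above could be collapsed into a
# single upsert, assuming dataloaderinterface_sensormeasurement carries a unique
# constraint on sensor_id (sketch, not part of this commit):
#
#     INSERT INTO dataloaderinterface_sensormeasurement
#         (sensor_id, value_datetime, value_datetime_utc_offset, data_value)
#     VALUES (:sensor_id, :datetime, make_interval(hours => :utc_offset), :data_value)
#     ON CONFLICT (sensor_id) DO UPDATE
#         SET value_datetime = EXCLUDED.value_datetime,
#             value_datetime_utc_offset = EXCLUDED.value_datetime_utc_offset,
#             data_value = EXCLUDED.data_value;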

#dataloader utility function
def sync_dataloader_tables(result_value: TimeseriesResultValueTechDebt) -> None:
site_sensor = get_site_sensor(result_value.result_id)
if not site_sensor: return None
update_sensormeasurement(site_sensor['id'], result_value)
return None

def sync_result_table(result_value: TimeseriesResultValueTechDebt) -> None:
with _db_engine.connect() as connection:
query = text("DO $sync_function$ BEGIN "\
"UPDATE odm2.results SET valuecount = valuecount + 1 WHERE resultid=:result_id; "\
"IF (SELECT (resultdatetime < :result_datetime) FROM odm2.results WHERE resultid=:result_id) THEN" \
" UPDATE odm2.results SET resultdatetime = :result_datetime WHERE resultid=:result_id; "\
"END IF; END $sync_function$ ")
return connection.execute(query,
result_id=result_value.result_id,
result_datetime=result_value.value_datetime,
)
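# Bind parameters work inside the DO body because psycopg2 interpolates them
# client-side before the statement reaches the server. The block always bumps
# valuecount, and advances resultdatetime only when the incoming timestamp is
# newer than the stored one (or the column is still NULL).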

def insert_timeseries_result_values(result_value : TimeseriesResultValueTechDebt) -> None:
with _db_engine.connect() as connection:
query = text("INSERT INTO odm2.timeseriesresultvalues " \
"(valueid, resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " \
@@ -726,4 +782,6 @@ def InsertTimeseriesResultValues(result_value : TimeseriesResultValueTechDebt) -
time_aggregation_interval=result_value.time_aggregation_interval,
time_aggregation_interval_unit=result_value.time_aggregation_interval_unit,
)
return result
if result:
return sync_result_table(result_value)
return None
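# Illustrative end-to-end call for the helpers in this block (values are
# placeholders; unit_id would come from the 'hour minute' Unit row as in the
# view above):
#
#     value = TimeseriesResultValueTechDebt(
#         result_id=42, result_uuid='a1b2c3d4-...', data_value=1.5,
#         value_datetime=datetime.utcnow(), utc_offset=-7,
#         censor_code='Not censored', quality_code='None',
#         time_aggregation_interval=1, time_aggregation_interval_unit=unit_id)
#     error = process_result_value(value)   # None on success, error string on failure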
