APP-3283: Implemented datafeed id reuse #79

Merged: 4 commits, Nov 20, 2020
Changes from all commits
11 changes: 9 additions & 2 deletions README.md
@@ -109,9 +109,16 @@ An example of json has been provided below. (The "botPrivateKeyPath" ends with
"keyManagerProxyUsername": "proxy-username",
"keyManagerProxyPassword": "proxy-password",


// Required: If a truststore is required to access on-prem components, provide a path to the Python truststore. It must be a .pem file; instructions for converting a JKS truststore to a Python .pem truststore are below. If no truststore is needed, set the value to an empty string ("").
"truststorePath": "/path/to/truststore.pem"
"truststorePath": "/path/to/truststore.pem",

// Optional: if set to true, the datafeed id will be stored on the filesystem and subsequently reused.
// Applies to DFv1 only. Default value is true.
"reuseDatafeedID": true,

// Optional: path to the folder where the datafeed id is stored. Applies to DFv1 only, and only if reuseDatafeedID is set to true.
// Default value is os.getcwd().
"datafeedIdFilePath": "/some/folder/"
}


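For context, a minimal sketch of how the new options are read at runtime, using SymConfig from this repo (the config path below is hypothetical):

from sym_api_client_python.configure.configure import SymConfig

config = SymConfig('./resources/config.json')  # hypothetical path
config.load_config()

# DFv1 with reuseDatafeedID unset or true: the id is persisted and reused.
print(config.should_store_datafeed_id())
# An empty or missing datafeedIdFilePath falls back to os.getcwd().
print(config.get_datafeed_id_folder_path())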
8 changes: 4 additions & 4 deletions sym_api_client_python/clients/datafeed_client.py
@@ -1,20 +1,20 @@
import logging

from .api_client import APIClient
from .constants.DatafeedVersion import DatafeedVersion
from .datafeed_client_v1 import DataFeedClientV1
from .datafeed_client_v2 import DataFeedClientV2


# child class of APIClient --> Extends error handling functionality
class DataFeedClient(APIClient):

def __init__(self, bot_client):
self.config = bot_client.get_sym_config()

if DatafeedVersion.version_of(self.config.data.get("datafeedVersion")) == DatafeedVersion.V2:
self.datafeed_client = DataFeedClientV2(bot_client)
else:
if self.config.is_datafeed_v1():
self.datafeed_client = DataFeedClientV1(bot_client)
else:
self.datafeed_client = DataFeedClientV2(bot_client)

# raw api call to create_datafeed --> returns datafeed_id
def create_datafeed(self):
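In effect the wrapper now keys off the same config helper as the rest of the PR. A hedged sketch of obtaining and using the version-appropriate client, assuming bot_client is an authenticated SymBotClient:

# Delegates to DataFeedClientV2 when "datafeedVersion" is "v2";
# any other value, or a missing key, selects DataFeedClientV1.
datafeed_client = bot_client.get_datafeed_client()
datafeed_id = datafeed_client.create_datafeed()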
21 changes: 21 additions & 0 deletions sym_api_client_python/configure/configure.py
@@ -2,6 +2,8 @@
import logging
import os

from sym_api_client_python.clients.constants.DatafeedVersion import DatafeedVersion


class SymConfig:
# initialize object by passing in path to config file
@@ -178,3 +180,22 @@ def _build_url(self, host, port, contextPath):
contextPath = contextPath[:-1]

return 'https://' + host + ':' + str(port) + contextPath

def get_agent_url(self):
return self.data.get('agentUrl')

def should_store_datafeed_id(self):
return self.is_datafeed_v1() and self.is_datafeed_id_reused()

def is_datafeed_v1(self):
return DatafeedVersion.version_of(self.data.get("datafeedVersion")) != DatafeedVersion.V2

def is_datafeed_id_reused(self):
reuse_datafeed_id = self.data.get("reuseDatafeedID")
return reuse_datafeed_id is None or reuse_datafeed_id

def get_datafeed_id_folder_path(self):
datafeed_id_file_path = self.data.get("datafeedIdFilePath")
if datafeed_id_file_path:
return datafeed_id_file_path
return os.getcwd()
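A small sketch of the defaults these helpers encode, assigning data directly instead of loading a file (hypothetical, for illustration only):

import os

from sym_api_client_python.configure.configure import SymConfig

config = SymConfig('./resources/config.json')  # hypothetical path; file not read here
config.data = {"datafeedVersion": "v1"}  # the new reuse keys intentionally absent

assert config.is_datafeed_v1()
assert config.is_datafeed_id_reused()     # missing reuseDatafeedID defaults to reuse
assert config.should_store_datafeed_id()  # v1 + reuse -> persist the id
assert config.get_datafeed_id_folder_path() == os.getcwd()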
15 changes: 6 additions & 9 deletions sym_api_client_python/datafeed_event_service.py
@@ -1,7 +1,5 @@
import asyncio
from concurrent.futures import CancelledError
from functools import partial
import json
import logging
import datetime
from collections import namedtuple
@@ -13,7 +11,6 @@
from .exceptions.ServerErrorException import ServerErrorException
from .exceptions.MaxRetryException import MaxRetryException

from .auth.auth_endpoint_constants import auth_endpoint_constants
from .services.abstract_datafeed_event_service import AbstractDatafeedEventService
from .clients.constants.DatafeedVersion import DatafeedVersion
from .services.datafeed_event_service_v1 import DataFeedEventServiceV1
@@ -152,6 +149,7 @@ def get_and_increase_timeout(self, previous_exc=None):
def decrease_timeout(self):
self.datafeed_event_service.decrease_timeout()


class AsyncDataFeedEventService(AbstractDatafeedEventService):
"""Non-blocking datafeed event service.

@@ -181,6 +179,7 @@ class AsyncDataFeedEventService(AbstractDatafeedEventService):
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue = asyncio.Queue()
self.exception_queue = asyncio.Queue()
self.exception_handler = kwargs.pop('exception_handler', None)
@@ -189,12 +188,11 @@ def __init__(self, *args, **kwargs):
self.trace_dict = {}
self.handle_events_task = None
self.tasks = []

super().__init__(*args, **kwargs)
self.datafeed_id = None

async def start_datafeed(self):
log.debug('AsyncDataFeedEventService/start_datafeed()')
self.datafeed_id = self.datafeed_client.create_datafeed()
self.datafeed_id = self._get_from_file_or_create_datafeed_id()
await asyncio.gather(self.read_datafeed(), self.handle_events(), self.handle_exceptions())

async def deactivate_datafeed(self, wait_for_handler_completions=True):
@@ -277,7 +275,7 @@ async def handle_datafeed_errors(self, thrown_exception):
await asyncio.sleep(sleep_for)
try:
log.debug('AsyncDataFeedEventService/handle_event() --> Restarting Datafeed')
self.datafeed_id = self.datafeed_client.create_datafeed()
self.datafeed_id = self._create_datafeed_and_persist()
except Exception as exc:
await self.handle_datafeed_errors(exc)

@@ -313,8 +311,7 @@ def _process_full_trace(self, id):
log.debug("Responded to message in: {:.4g}s. Including {:.4g}s inside the bot"
.format(total_time.total_seconds(), time_in_bot.total_seconds()))
if self.trace_recorder is not None:
# TODO: Verify that we should have self.trace_recorder - it should as we want to keep trace so we should use the instance variable
trace_recorder.append(trace)
self.trace_recorder.append(trace)
del self.trace_dict[id]
except Exception as exc:
log.error("Error while computing trace results for id: " + id)
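A hedged sketch of driving the async service with the new reuse behavior; bot_client is assumed to be a fully authenticated SymBotClient:

import asyncio

from sym_api_client_python.datafeed_event_service import AsyncDataFeedEventService

service = AsyncDataFeedEventService(bot_client)
# The first start creates a datafeed and persists its id; later starts reread datafeed.id.
asyncio.run(service.start_datafeed())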
28 changes: 21 additions & 7 deletions sym_api_client_python/services/abstract_datafeed_event_service.py
@@ -1,8 +1,7 @@
from abc import ABC, abstractmethod
import logging
import json
import time

from .datafeed_id_repository import OnDiskDatafeedIdRepository
from ..listeners.elements_listener import ElementsActionListener
from ..listeners.connection_listener import ConnectionListener
from ..listeners.im_listener import IMListener
@@ -12,6 +11,7 @@

log = logging.getLogger(__name__)


class AbstractDatafeedEventService(ABC):

def __init__(self, sym_bot_client, error_timeout_sec=None, maximum_timeout_sec=None):
@@ -25,10 +25,11 @@ def __init__(self, sym_bot_client, error_timeout_sec=None, maximum_timeout_sec=N
self.registered_triggers = []

self.bot_client = sym_bot_client
self.config = sym_bot_client.get_sym_config()
self.datafeed_client = self.bot_client.get_datafeed_client()
self.datafeed_id_repository = OnDiskDatafeedIdRepository(self.config.get_datafeed_id_folder_path())
Contributor:

As it is specific to v1 only, is this logic we could push down to the subclass?

Contributor (Author):

Indeed, but that would require duplicating this logic in AsyncDataFeedEventService and in DataFeedEventServiceV1.

self.stop = False

config = sym_bot_client.get_sym_config()
# TODO: This should not be handled in the datafeed event service like this; why put a timeout in the config if we can use a parameter with a default value (config related)?
# Timeout will start at and eventually reset to this
_config_key = 'datafeedEventsErrorTimeout'
@@ -52,9 +53,9 @@ def __init__(self, sym_bot_client, error_timeout_sec=None, maximum_timeout_sec=N
}

if error_timeout_sec is None:
self.baseline_timeout_sec = config.data.get(_config_key, auth_endpoint_constants["TIMEOUT"])
self.baseline_timeout_sec = self.config.data.get(_config_key, auth_endpoint_constants["TIMEOUT"])
else:
if _config_key in config.data:
if _config_key in self.config.data:
log.debug('{} listed in config, but overridden to {}s by parameter'
.format(_config_key, error_timeout_sec))
self.baseline_timeout_sec = error_timeout_sec
@@ -64,7 +65,7 @@ def __init__(self, sym_bot_client, error_timeout_sec=None, maximum_timeout_sec=N
self.lower_threshold = self.baseline_timeout_sec
# Raise a RuntimeError once this upper threshold exceeded
if maximum_timeout_sec is None:
self.upper_threshold = config.data.get('datafeedEventsErrorMaxTimeout', 60)
self.upper_threshold = self.config.data.get('datafeedEventsErrorMaxTimeout', 60)
else:
self.upper_threshold = maximum_timeout_sec
# After every failure multiply the timeout by a factor
@@ -310,4 +311,17 @@ def decrease_timeout(self):
log.debug('DataFeedEventService/decrease_timeout() --> '
'Decreasing timeout from {:.4g}s to {:.4g}s'.format(original, new_timeout))
self.current_timeout_sec = new_timeout
return self.current_timeout_sec
return self.current_timeout_sec

def _get_from_file_or_create_datafeed_id(self):
if self.config.should_store_datafeed_id():
datafeed_id = self.datafeed_id_repository.read_datafeed_id_from_file()
if datafeed_id:
return datafeed_id
return self._create_datafeed_and_persist()

def _create_datafeed_and_persist(self):
datafeed_id = self.datafeed_client.create_datafeed()
if self.config.should_store_datafeed_id():
self.datafeed_id_repository.store_datafeed_id_to_file(datafeed_id, self.config.get_agent_url())
return datafeed_id
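Read together, the two helpers implement a read-through cache over the datafeed id. A standalone, hedged restatement (the parameters stand in for the collaborators above):

def get_or_create_datafeed_id(config, repository, client):
    # Mirrors _get_from_file_or_create_datafeed_id above.
    if config.should_store_datafeed_id():
        datafeed_id = repository.read_datafeed_id_from_file()
        if datafeed_id:
            return datafeed_id
    # Mirrors _create_datafeed_and_persist: a fresh id always overwrites the file,
    # which is also how a stale persisted id gets replaced after a datafeed error.
    datafeed_id = client.create_datafeed()
    if config.should_store_datafeed_id():
        repository.store_datafeed_id_to_file(datafeed_id, config.get_agent_url())
    return datafeed_id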
10 changes: 5 additions & 5 deletions sym_api_client_python/services/datafeed_event_service_v1.py
@@ -11,16 +11,16 @@

log = logging.getLogger(__name__)


class DataFeedEventServiceV1(AbstractDatafeedEventService):

def __init__(self, *args, **kwargs):
self.datafeed_id = None
super().__init__(*args, **kwargs)

self.datafeed_id = None

def start_datafeed(self):
log.debug('DataFeedEventService/startDataFeed()')
self.datafeed_id = self.datafeed_client.create_datafeed()
self.datafeed_id = self._get_from_file_or_create_datafeed_id()
self.read_datafeed()

def activate_datafeed(self):
@@ -78,7 +78,7 @@ def handle_datafeed_errors(self, thrown_exception):

try:
log.debug('DataFeedEventService/handle_event() --> Restarting Datafeed')
self.datafeed_id = self.datafeed_client.create_datafeed()
self.datafeed_id = self._create_datafeed_and_persist()

except Exception as exc:
self.handle_datafeed_errors(exc)
self.handle_datafeed_errors(exc)
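A hedged sketch of the blocking v1 service with reuse enabled; bot_client is assumed to be an authenticated SymBotClient with a DFv1 config:

from sym_api_client_python.services.datafeed_event_service_v1 import DataFeedEventServiceV1

service = DataFeedEventServiceV1(bot_client)
# Reuses a persisted datafeed.id when present; otherwise creates one and persists it.
service.start_datafeed()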
37 changes: 37 additions & 0 deletions sym_api_client_python/services/datafeed_id_repository.py
@@ -0,0 +1,37 @@
import logging
import os

DATAFEED_ID_FILE = 'datafeed.id'

log = logging.getLogger(__name__)


class OnDiskDatafeedIdRepository:
def __init__(self, datafeed_id_folder):
self.datafeed_id_file_path = self._get_datafeed_id_file_path(datafeed_id_folder)

def read_datafeed_id_from_file(self):
log.debug(f'Retrieving datafeed id from file {self.datafeed_id_file_path}')
if os.path.exists(self.datafeed_id_file_path):
with open(self.datafeed_id_file_path, 'r') as datafeed_id_file:
line = datafeed_id_file.readline()
if line:
index = line.find("@")
if index != -1:
datafeed_id = line[0:index]
log.debug(f'Retrieved datafeed id: {datafeed_id}')
return datafeed_id
log.debug(f'Could not retrieve datafeed id from file {self.datafeed_id_file_path}')
return None

def store_datafeed_id_to_file(self, datafeed_id, agent_url):
with open(self.datafeed_id_file_path, 'w') as datafeed_id_file:
line = f'{datafeed_id}@{agent_url}'
log.debug(f'Writing {line} to {self.datafeed_id_file_path}')
datafeed_id_file.write(line)

def _get_datafeed_id_file_path(self, datafeed_id_folder):
datafeed_id_file_path = os.path.join(datafeed_id_folder, DATAFEED_ID_FILE)
if os.path.exists(datafeed_id_file_path) and os.path.isdir(datafeed_id_file_path):
datafeed_id_file_path = os.path.join(datafeed_id_file_path, DATAFEED_ID_FILE)
return os.path.abspath(datafeed_id_file_path)
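A round-trip sketch of the repository in isolation; the id and agent URL are made-up values:

import tempfile

from sym_api_client_python.services.datafeed_id_repository import OnDiskDatafeedIdRepository

with tempfile.TemporaryDirectory() as folder:
    repo = OnDiskDatafeedIdRepository(folder)
    repo.store_datafeed_id_to_file('abc123', 'https://agent.example.com:8443')
    # The file holds 'abc123@https://agent.example.com:8443'; the reader returns the part before '@'.
    assert repo.read_datafeed_id_from_file() == 'abc123'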