
Commit 3f59569

alanbchristie, kaliif, and Alan Christie
authored
Better MySQL/Tunnel control (1403/1407) (#570)
* stashing
* Target loader now accepts experiments marked as 'manual'
* Attempt to reduce pop-up "flicker" (1403) (#569)
* fix: Attempt to debug timeout errors
* fix: More logging on service_query
* fix: Various timeout adjustments
* fix: Removed exception during timeout
* fix: Explicit log on SSH connection error
* fix: Retry attempts for MySQL connections
* fix: Service timeout now 28 (was 17)
* fix: Add pymysql read and write timeouts
* fix: Quieter (expected) connection failure handling
* fix: TIMEOUT now DEGRADED
* fix: Fix while loop exit conditions
* fix: Better loop logic
* style: services logging reduced and back to debug
* fix: SSHTunnel logging now ERROR (was DEBUG)
* fix: Quieter security
* fix: More failures permitted (and debug tweaks)
* fix: Leaner logging
* fix: Leaner logging (only report when we're having trouble)
* fix: Better constant name
* fix: Reduced service logging
* docs: Doc tweak
* fix: Minor log tweak
* fix: Fixed duplicate log content

---------

Co-authored-by: Kalev Takkis <ktakkis@informaticsmatters.com>
Co-authored-by: Alan Christie <alan.christie@matildapeak.com>
1 parent ee5fd26 commit 3f59569

File tree

4 files changed: +125 −51 lines changed

api/remote_ispyb_connector.py

+62-13
@@ -11,9 +11,21 @@
     ISPyBNoResultException,
     ISPyBRetrieveFailed,
 )
+from pymysql.err import OperationalError
 
 logger: logging.Logger = logging.getLogger(__name__)
 
+# Timeout to allow the pymysql.connect() method to connect to the DB.
+# The default, if not specified, is 10 seconds.
+PYMYSQL_CONNECT_TIMEOUT_S = 3
+PYMYSQL_READ_TIMEOUT_S = 3
+PYMYSQL_WRITE_TIMEOUT_S = 10
+# MySQL DB connection attempts.
+# An attempt to cope with intermittent OperationalError exceptions
+# that are seen to occur at "busy times". See m2ms-1403.
+PYMYSQL_OE_RECONNECT_ATTEMPTS = 5
+PYMYSQL_EXCEPTION_RECONNECT_DELAY_S = 1
+
 
 class SSHConnector(Connector):
     def __init__(
@@ -53,6 +65,7 @@ def __init__(
                 'db_pass': pw,
                 'db_name': db,
             }
+            logger.debug("Creating remote connector: %s", creds)
             self.remote_connect(**creds)
             logger.debug(
                 "Started remote ssh_host=%s ssh_user=%s local_bind_port=%s",
@@ -61,6 +74,7 @@ def __init__(
                 self.server.local_bind_port,
             )
         else:
+            logger.debug("Creating connector")
             self.connect(
                 user=user,
                 pw=pw,
@@ -83,9 +97,9 @@ def remote_connect(
         db_pass,
         db_name,
    ):
-        sshtunnel.SSH_TIMEOUT = 10.0
-        sshtunnel.TUNNEL_TIMEOUT = 10.0
-        sshtunnel.DEFAULT_LOGLEVEL = logging.CRITICAL
+        sshtunnel.SSH_TIMEOUT = 5.0
+        sshtunnel.TUNNEL_TIMEOUT = 5.0
+        sshtunnel.DEFAULT_LOGLEVEL = logging.ERROR
         self.conn_inactivity = int(self.conn_inactivity)
 
         if ssh_pkey:
@@ -122,20 +136,55 @@ def remote_connect(
         self.server.start()
         logger.debug('Started SSH server')
 
-        logger.debug('Connecting to ISPyB (db_user=%s db_name=%s)...', db_user, db_name)
-        self.conn = pymysql.connect(
-            user=db_user,
-            password=db_pass,
-            host='127.0.0.1',
-            port=self.server.local_bind_port,
-            database=db_name,
-        )
+        # Try to connect to the database
+        # a number of times (because it is known to fail)
+        # before giving up...
+        connect_attempts = 0
+        self.conn = None
+        while self.conn is None and connect_attempts < PYMYSQL_OE_RECONNECT_ATTEMPTS:
+            try:
+                self.conn = pymysql.connect(
+                    user=db_user,
+                    password=db_pass,
+                    host='127.0.0.1',
+                    port=self.server.local_bind_port,
+                    database=db_name,
+                    connect_timeout=PYMYSQL_CONNECT_TIMEOUT_S,
+                    read_timeout=PYMYSQL_READ_TIMEOUT_S,
+                    write_timeout=PYMYSQL_WRITE_TIMEOUT_S,
+                )
+            except OperationalError as oe_e:
+                if connect_attempts == 0:
+                    # So we only log our connection attempts once
+                    # an error has occurred - to avoid flooding the log
+                    logger.info(
+                        'Connecting to MySQL database (db_user=%s db_name=%s)...',
+                        db_user,
+                        db_name,
+                    )
+                logger.warning('%s', repr(oe_e))
+                connect_attempts += 1
+                time.sleep(PYMYSQL_EXCEPTION_RECONNECT_DELAY_S)
+            except Exception as e:
+                if connect_attempts == 0:
+                    # So we only log our connection attempts once
+                    # an error has occurred - to avoid flooding the log
+                    logger.info(
+                        'Connecting to MySQL database (db_user=%s db_name=%s)...',
+                        db_user,
+                        db_name,
+                    )
+                logger.warning('Unexpected %s', repr(e))
+                connect_attempts += 1
+                time.sleep(PYMYSQL_EXCEPTION_RECONNECT_DELAY_S)
 
         if self.conn is not None:
-            logger.debug('Connected')
+            if connect_attempts > 0:
+                logger.info('Connected')
             self.conn.autocommit = True
         else:
-            logger.debug('Failed to connect')
+            if connect_attempts > 0:
+                logger.info('Failed to connect')
             self.server.stop()
             raise ISPyBConnectionException
         self.last_activity_ts = time.time()
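The core of this file's change is the bounded retry loop around pymysql.connect(). Below is a minimal standalone sketch of the same pattern, assuming only that pymysql is installed; connect_with_retry and its constants are illustrative names, not part of this repository.

import logging
import time

import pymysql
from pymysql.err import OperationalError

logger = logging.getLogger(__name__)

# Illustrative values, mirroring the commit's PYMYSQL_* constants.
CONNECT_TIMEOUT_S = 3
RECONNECT_ATTEMPTS = 5
RECONNECT_DELAY_S = 1


def connect_with_retry(host, port, user, password, database):
    """Connect to MySQL, retrying transient failures a few times."""
    conn = None
    attempts = 0
    while conn is None and attempts < RECONNECT_ATTEMPTS:
        try:
            conn = pymysql.connect(
                host=host,
                port=port,
                user=user,
                password=password,
                database=database,
                connect_timeout=CONNECT_TIMEOUT_S,
            )
        except OperationalError as exc:
            # Seen intermittently at "busy times"; wait briefly and retry.
            logger.warning('%s', repr(exc))
            attempts += 1
            time.sleep(RECONNECT_DELAY_S)
    if conn is None:
        raise ConnectionError(f'Gave up after {attempts} attempts')
    return conn

Capping the attempts keeps a genuinely dead server from blocking the caller for more than roughly attempts x (connect_timeout + delay) seconds, which is why the commit also shortens the SSH and tunnel timeouts.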

api/security.py

+12-4
@@ -72,19 +72,22 @@ def get_remote_conn(force_error_display=False) -> Optional[SSHConnector]:
     # If a host is not defined other properties are useless.
     if not credentials["host"]:
         if logging.DEBUG >= logger.level or force_error_display:
-            logger.info("No ISPyB host - cannot return a connector")
+            logger.debug("No ISPyB host - cannot return a connector")
         return None
 
     # Try to get an SSH connection (aware that it might fail)
+    logger.debug("Creating remote connector with credentials: %s", credentials)
     conn: Optional[SSHConnector] = None
     try:
         conn = SSHConnector(**credentials)
     except Exception:
-        # Log the exception if DEBUG level or lower/finer?
-        # The following will not log if the level is set to INFO for example.
         if logging.DEBUG >= logger.level or force_error_display:
             logger.info("credentials=%s", credentials)
-            logger.exception("Got the following exception creating SSHConnector...")
+            logger.exception("Got the following exception creating Connector...")
+    if conn:
+        logger.debug("Got remote connector")
+    else:
+        logger.debug("Failed to get a remote connector")
 
     return conn
 
@@ -106,6 +109,7 @@ def get_conn(force_error_display=False) -> Optional[Connector]:
         logger.info("No ISPyB host - cannot return a connector")
         return None
 
+    logger.info("Creating connector with credentials: %s", credentials)
     conn: Optional[Connector] = None
     try:
         conn = Connector(**credentials)
@@ -115,6 +119,10 @@ def get_conn(force_error_display=False) -> Optional[Connector]:
         if logging.DEBUG >= logger.level or force_error_display:
             logger.info("credentials=%s", credentials)
             logger.exception("Got the following exception creating Connector...")
+    if conn:
+        logger.debug("Got connector")
+    else:
+        logger.debug("Did not get a connector")
 
     return conn
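Both functions gate their diagnostics on the same guard, logging.DEBUG >= logger.level or force_error_display. A minimal sketch of that guard in isolation; log_if_debugging is a hypothetical helper, not from this codebase.

import logging

logger = logging.getLogger(__name__)


def log_if_debugging(message: str, force_error_display: bool = False) -> None:
    # Emit only when this logger is at DEBUG or finer (note that an
    # unconfigured logger has level 0, NOTSET, which also passes the
    # test), or when the caller explicitly forces the message out.
    if logging.DEBUG >= logger.level or force_error_display:
        logger.info(message)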

viewer/services.py

+13-7
@@ -15,14 +15,18 @@
 
 logger = logging.getLogger(__name__)
 
-DELAY = 10
+# Service query timeout
+SERVICE_QUERY_TIMEOUT_S = 28
 # Default timeout for any request calls
+# Used for keycloak atm.
 REQUEST_TIMEOUT_S = 5
 
 _NEO4J_LOCATION: str = settings.NEO4J_QUERY
 _NEO4J_AUTH: str = settings.NEO4J_AUTH
 
 
+# TIMEOUT is no longer used.
+# A service timeout is considered a service that is degraded
 class State(str, Enum):
     NOT_CONFIGURED = "NOT_CONFIGURED"
     DEGRADED = "DEGRADED"
@@ -122,7 +126,7 @@ async def wrapper_service_query(*args, **kwargs):
         # TimeoutError is not caught
         executor = futures.ThreadPoolExecutor()
         try:
-            async with asyncio.timeout(DELAY):
+            async with asyncio.timeout(SERVICE_QUERY_TIMEOUT_S):
                 future = loop.run_in_executor(
                     executor, functools.partial(func, *args, **kwargs)
                 )
@@ -134,18 +138,20 @@ async def wrapper_service_query(*args, **kwargs):
 
         except TimeoutError:
             # Timeout is an "expected" condition for a service that's expected
-            # to be running but may be degraded so we don't log it unless debugging.
-            logger.debug("Service query '%s' timed out", func.__name__)
-            state = State.TIMEOUT
+            # to be running but is taking too long to report its state
+            # and is also considered DEGRADED.
+            state = State.DEGRADED
         except Exception as exc:
             # unknown error with the query
             logger.exception(exc, exc_info=True)
             state = State.ERROR
 
-        # name and ID are 1nd and 0th params respectively.
-        # alternative solution for this would be to return just a
+        # ID and Name are the 1st and 2nd params respectively.
+        # Alternative solution for this would be to return just a
         # state and have the service_queries() map the results to the
         # correct values
+        if state not in [State.OK, State.NOT_CONFIGURED]:
+            logger.info('"%s" is %s', args[1], state.name)
         return {"id": args[0], "name": args[1], "state": state}
 
     return wrapper_service_query
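The wrapper's key trick is combining asyncio.timeout() with run_in_executor() so a blocking service probe cannot stall the event loop past the deadline. A self-contained sketch of that pattern, assuming Python 3.11+ for asyncio.timeout(); probe_with_timeout and blocking_probe are hypothetical stand-ins for the decorated service queries.

import asyncio
import time
from concurrent import futures

SERVICE_QUERY_TIMEOUT_S = 28  # mirrors the constant above


def blocking_probe() -> bool:
    # Stand-in for a real check, e.g. a database round-trip.
    time.sleep(1)
    return True


async def probe_with_timeout() -> str:
    loop = asyncio.get_running_loop()
    executor = futures.ThreadPoolExecutor()
    try:
        # asyncio.timeout() cancels the await, not the worker thread,
        # so a truly stuck probe still occupies its thread afterwards.
        async with asyncio.timeout(SERVICE_QUERY_TIMEOUT_S):
            ok = await loop.run_in_executor(executor, blocking_probe)
        return "OK" if ok else "DEGRADED"
    except TimeoutError:
        # As in the commit: a timeout is an expected, degraded state.
        return "DEGRADED"


print(asyncio.run(probe_with_timeout()))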

viewer/target_loader.py

+38-27
@@ -52,6 +52,10 @@
 # assemblies and xtalforms
 XTALFORMS_FILE = "assemblies.yaml"
 
+# holding horses for now
+# # assigned xtalforms, not all are referenced in meta_aligner
+# ASSIGNED_XTALFORMS_FILE = "assigned_xtalforms.yaml"
+
 # target name, nothing else
 CONFIG_FILE = "config*.yaml"
 
@@ -737,9 +741,6 @@ def process_experiment(
         logger.debug("incoming data: %s", item_data)
         experiment_name, data = item_data
 
-        if "aligned_files" not in data.keys():
-            return None
-
         extract = functools.partial(
             self._extract,
             data=data,
@@ -1480,7 +1481,7 @@ def process_bundle(self):
             self.report.log(logging.ERROR, msg)
             raise FileExistsError(msg)
 
-        if project_created and committer.pk == settings.ANONYMOUS_USER:
+        if project_created and self.project.title in settings.PUBLIC_TAS_LIST:  # type: ignore[attr-defined]
             assert self.project
             self.project.open_to_public = True
             self.project.save()
@@ -1694,35 +1695,45 @@ def process_bundle(self):
         # memo to self: there used to be some code to test the
         # position of the iterator in existing entries. This
         # was because it was assumed, that when adding v2
-        # uploads, it can bring a long new observations under
+        # uploads, it can bring along new observations under
         # existing experiment. Following discussions with
         # Conor, it seems that this will not be the case. But
         # should it agin be, this code was deleted on
         # 2024-03-04, if you need to check
 
         for so in so_group.filter(code__isnull=True):
-            code_prefix = experiment_objects[so.experiment.code].index_data[
-                "code_prefix"
-            ]
-            # iter_pos = next(suffix)
-            # code = f"{code_prefix}{so.experiment.code.split('-')[1]}{iter_pos}"
-            code = (
-                f"{code_prefix}{so.experiment.code.split('-')[1]}{next(suffix)}"
-            )
-
-            # test uniqueness for target
-            # TODO: this should ideally be solved by db engine, before
-            # rushing to write the trigger, have think about the
-            # loader concurrency situations
-            if SiteObservation.objects.filter(
-                experiment__experiment_upload__target=self.target,
-                code=code,
-            ).exists():
-                msg = (
-                    f"short code {code} already exists for this target; "
-                    + "specify a code_prefix to resolve this conflict"
-                )
-                self.report.log(logging.ERROR, msg)
+            if so.experiment.type == 1:
+                # manual. code is pdb code
+                code = f"{so.experiment.code}-{next(suffix)}"
+                # NB! at the time of writing this piece of
+                # code, I haven't seen an example of the data
+                # so I only have a very vague idea how this is
+                # going to work. The way I understand it now,
+                # they cannot belong to separate groups so
+                # there's no need for different iterators. But
+                # could be I need to split them up
+            else:
+                # model building. generate code
+                code_prefix = experiment_objects[so.experiment.code].index_data[
+                    "code_prefix"
+                ]
+                # iter_pos = next(suffix)
+                # code = f"{code_prefix}{so.experiment.code.split('-')[1]}{iter_pos}"
+                code = f"{code_prefix}{so.experiment.code.split('-')[1]}{next(suffix)}"
+
+                # test uniqueness for target
+                # TODO: this should ideally be solved by db engine, before
+                # rushing to write the trigger, have think about the
+                # loader concurrency situations
+                if SiteObservation.objects.filter(
+                    experiment__experiment_upload__target=self.target,
+                    code=code,
+                ).exists():
+                    msg = (
+                        f"short code {code} already exists for this target; "
+                        + "specify a code_prefix to resolve this conflict"
+                    )
+                    self.report.log(logging.ERROR, msg)
 
             so.code = code
             so.save()
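The loop now picks a short-code strategy by experiment type: type 1 ('manual') keeps the PDB-style experiment code, anything else generates prefix + experiment number + suffix. A rough sketch of just that branch, with hypothetical names and itertools.count standing in for the loader's suffix iterator.

import itertools


def short_code(experiment_type: int, experiment_code: str,
               code_prefix: str, suffix) -> str:
    if experiment_type == 1:
        # Manual upload: the experiment code is already a PDB code.
        return f"{experiment_code}-{next(suffix)}"
    # Model building: prefix + numeric part of the code + suffix.
    return f"{code_prefix}{experiment_code.split('-')[1]}{next(suffix)}"


suffix = itertools.count(1)
print(short_code(1, "8abc", "", suffix))          # -> 8abc-1
print(short_code(0, "Mpro-x0104", "v2", suffix))  # -> v2x01042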
