utils.py
import fnmatch
import itertools
import json
import logging
import os
import re
import shutil
import string
import tempfile
import uuid
from pathlib import Path
from typing import Dict, Generator, Optional
from urllib.parse import urlparse

import pandas as pd
from django.conf import settings
from django.contrib.auth.models import User
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.core.mail import send_mail
from django.db import IntegrityError, transaction
from django.db.models import F
from django.http import JsonResponse
from rdkit import Chem

from scoring.models import SiteObservationGroup, SiteObvsSiteObservationGroup

from .models import (
    SiteObservation,
    SiteObservationTag,
    SiteObvsSiteObservationTag,
    Target,
)

logger = logging.getLogger(__name__)

# Set .sdf file format version
# Used at the start of every SDF file.
SDF_VERSION = 'ver_1.2'
SDF_RECORD_SEPARATOR = '$$$$\n'

# The root of all files constructed by 'dicttocsv'.
# The directory must not contain anything but dicttocsv-generated files.
# It certainly must not be the root of the media directory or any other directory in it.
# Introduced during the 1247 security review.
CSV_TO_DICT_DOWNLOAD_ROOT = os.path.join(settings.MEDIA_ROOT, 'downloads', 'dicttocsv')


def is_url(url: Optional[str]) -> bool:
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except (ValueError, AttributeError):
        return False
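
# Examples (illustrative): is_url('https://example.com/path') is True;
# is_url('example.com') is False (no scheme); is_url(None) is False.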


def word_count(text: Optional[str]) -> int:
    """Returns an 'approximate' word count."""
    return len(text.split()) if text else 0


def create_squonk_job_request_url(instance_id):
    """Creates the Squonk Instance API url from an instance ID (UUID)."""
    return settings.SQUONK2_INSTANCE_API + str(instance_id)


def create_media_sub_directory(sub_directory):
    """Creates a directory (or directories) in the MEDIA directory,
    returning the full path.
    """
    directory = os.path.join(settings.MEDIA_ROOT, sub_directory)
    os.makedirs(directory, exist_ok=True)
    return directory


def delete_media_sub_directory(sub_directory):
    """Removes a media sub-directory."""
    assert sub_directory
    assert len(sub_directory)

    directory = os.path.normpath(os.path.join(settings.MEDIA_ROOT, sub_directory))
    if not os.path.isdir(directory):
        # No such directory!
        return
    if not directory.startswith(settings.MEDIA_ROOT) or os.path.samefile(
        directory, settings.MEDIA_ROOT
    ):
        # Danger!
        return

    shutil.rmtree(directory)


def add_prop_to_sdf(sdf_file_in, sdf_file_out, properties):
    """Returns an SDF file with the requested property
    (a dictionary of keys and values) added.

    Note that this assumes the file starts with a blank molecule.

    SDF parameters are of the format:

        > <TransFSScore> (1)
        0.115601

    In this case we'd provide the following properties dictionary...

        {"TransFSScore": "0.115601"}
    """
    found_separator = False
    with open(sdf_file_out, 'a', encoding='utf-8') as sdf_out:
        with open(sdf_file_in, 'r', encoding='utf-8') as sdf_in:
            while True:
                if line := sdf_in.readline():
                    if not found_separator and line == SDF_RECORD_SEPARATOR:
                        # Found first separator
                        # dump the parameters now
                        found_separator = True
                        for name, value in properties.items():
                            sdf_out.write(f'> <{name}> (1)\n')
                            sdf_out.write(f'{value}\n')
                            sdf_out.write('\n')
                    sdf_out.write(line)
                else:
                    break
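
# Illustrative usage (hypothetical file names): append a copy of the input SDF to the
# output file, writing the property block into the first (blank) molecule record:
#
#   add_prop_to_sdf('scored.sdf', 'scored_plus_version.sdf', {"TransFSScore": "0.115601"})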


def add_props_to_sdf_molecule(
    *, sdf_file: str, properties: Dict[str, str], molecule: str
):
    """Given an input SDF, a dictionary of string properties and a molecule
    this function inserts the properties at the end of the molecule's record,
    just before the record separator. A temporary file is used that then replaces the
    input file.
    """
    # Strategy...
    # Search the file for the Molecule.
    # Then move to the end of record, and insert the properties.
    found_molecule: bool = False
    written_properties: bool = False
    with tempfile.NamedTemporaryFile(mode='a', encoding='utf-8') as temp:
        with open(sdf_file, 'r', encoding='utf-8') as sdf_in:
            while True:
                if line := sdf_in.readline():
                    if not found_molecule:
                        if line.strip() == molecule:
                            found_molecule = True
                    elif line == SDF_RECORD_SEPARATOR and not written_properties:
                        # Found end of molecule
                        # dump the parameters now
                        for name, value in properties.items():
                            temp.write(f'> <{name}>\n')
                            temp.write(f'{value}\n')
                            temp.write('\n')
                        written_properties = True
                    # Write the original line
                    temp.write(line)
                else:
                    break

        # Flush the temporary file and replace the original file
        temp.flush()
        shutil.copy(temp.name, sdf_file)
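
# Illustrative usage (hypothetical values): add properties to the record whose title
# line matches the given molecule name, rewriting the file in place:
#
#   add_props_to_sdf_molecule(
#       sdf_file='poses.sdf',
#       properties={'Score': '0.42'},
#       molecule='Mpro-x1234_0A',
#   )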


def add_prop_to_mol(mol_field, mol_file_out, value):
    """Returns a mol_file with the requested property added.

    Note that "_Name" appears to be the only property that survives
    when a .mol file is written.
    """
    rd_mol = Chem.MolFromMolBlock(mol_field)
    # Set the new property on the RDKit mol object
    rd_mol.SetProp("_Name", value)
    Chem.MolToMolFile(rd_mol, mol_file_out)
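
# Illustrative usage (hypothetical values): stamp a title onto a mol block and write it out.
#
#   add_prop_to_mol(mol_block_text, 'ligand.mol', 'Mpro-x1234_0A')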


# TODO: this method may be deprecated; it is not an issue with new uploads
def clean_filename(filepath):
    """Return the "clean" version of a Django filename, i.e. without the
    '_abcdefg' suffix that Django adds when a file is overwritten.
    """
    file_split = os.path.splitext(os.path.basename(filepath))
    if fnmatch.fnmatch(
        file_split[0],
        '*_[a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9]'
        + '[a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9]',
    ):
        cleaned_filename = file_split[0][:-8] + file_split[1]
    else:
        cleaned_filename = os.path.basename(filepath)
    return cleaned_filename
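
# Examples (illustrative): the 7-character overwrite suffix is stripped when present.
#
#   clean_filename('/media/targets/Mpro_a1b2c3d.sdf')  ->  'Mpro.sdf'
#   clean_filename('/media/targets/Mpro.sdf')          ->  'Mpro.sdf'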


def get_https_host(request):
    """Common enabler code for returning https urls.

    This maps links to HTTPS to avoid Mixed Content warnings from Chrome browsers.
    SECURE_PROXY_SSL_HEADER is referenced because it is used in redirecting URLs - if
    it is changed it may affect this code.

    Using relative links would probably also work, but this workaround allows both the
    'download structures' button and the DRF API call to work.

    Note that the links produced here will not work on a local (non-HTTPS) deployment.
    """
    return settings.SECURE_PROXY_SSL_HEADER[1] + '://' + request.get_host()
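
# Illustrative result (hypothetical host): with the conventional
# SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https'), a request to
# fragalysis.example.com yields 'https://fragalysis.example.com'.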


def handle_uploaded_file(path: Path, f):
    with open(path, "wb+") as destination:
        for chunk in f.chunks(4096):
            destination.write(chunk)


def dump_curated_tags(filename: str) -> None:
    # fmt: off
    curated_tags = SiteObservationTag.objects.filter(
        user__isnull=False,
    ).annotate(
        ann_target_name=F('target__title'),
    )
    users = User.objects.filter(
        pk__in=curated_tags.values('user'),
    )
    siteobs_tag_group = SiteObvsSiteObservationTag.objects.filter(
        site_obvs_tag__in=curated_tags.values('pk'),
    ).annotate(
        ann_site_obvs_longcode=F('site_observation__longcode')
    )
    site_obvs_group = SiteObservationGroup.objects.filter(
        pk__in=curated_tags.values('mol_group'),
    ).annotate(
        ann_target_name=F('target__title'),
    )
    site_obvs_obvs_group = SiteObvsSiteObservationGroup.objects.filter(
        site_obvs_group__in=site_obvs_group.values('pk'),
    ).annotate(
        ann_site_obvs_longcode=F('site_observation__longcode')
    )
    # fmt: on

    result = {}
    for qs in (
        users,
        curated_tags,
        siteobs_tag_group,
        site_obvs_group,
        site_obvs_obvs_group,
    ):
        if qs.exists():
            jq = JsonResponse(list(qs.values()), safe=False)
            # have to pass through JsonResponse because that knows how
            # to parse django db field types
            data = json.loads(jq.content)
            name = qs[0]._meta.label  # pylint: disable=protected-access
            result[name] = data

    with open(filename, 'w', encoding='utf-8') as writer:
        writer.write(json.dumps(result, indent=4))
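
# The dump written above is a JSON object keyed by model label, each value a list of
# that model's field dicts, e.g. (labels and field values illustrative only):
#
#   {
#       "auth.User": [{"id": 3, "username": "jane", ...}],
#       "viewer.SiteObservationTag": [{"id": 17, "tag": "Curated site", "user_id": 3, ...}],
#       ...
#   }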


def restore_curated_tags(filename: str) -> None:
    with open(filename, 'r', encoding='utf-8') as reader:
        content = json.loads(reader.read())

    # models have to be saved in this order:
    # 1) User
    # 1) SiteObservationGroup <- target
    # 2) SiteObservationTag <- target, user
    # 3) SiteObvsSiteObservationGroup <- siteobvs
    # 3) SiteObvsSiteObservationTag <- siteobvs

    # Targets and users are handled differently: if a user is missing it is
    # restored and the tags continue to be processed; if a target is missing
    # the tag is skipped. This seems logical (at least at the time of
    # writing): if the target hasn't been loaded the tags clearly aren't
    # wanted yet, but a user may be legitimately missing (they haven't
    # logged in yet and somebody else is uploading the data).
    targets = Target.objects.all()
    site_observations = SiteObservation.objects.all()

    try:
        with transaction.atomic():
            new_mol_groups_by_old_pk = {}
            new_tags_by_old_pk = {}
            new_users_by_old_pk = {}

            user_data = content.get(
                User._meta.label,  # pylint: disable=protected-access
                [],
            )
            for data in user_data:
                pk = data.pop('id')
                try:
                    user = User.objects.get(username=data['username'])
                except User.DoesNotExist:
                    user = User(**data)
                    user.save()
                new_users_by_old_pk[pk] = user

            so_group_data = content.get(
                SiteObservationGroup._meta.label,  # pylint: disable=protected-access
                [],
            )
            for data in so_group_data:
                try:
                    target = targets.get(title=data['ann_target_name'])
                except Target.DoesNotExist:
                    logger.warning(
                        'Tried to restore SiteObservationGroup for target that does not exist: %s',
                        data['ann_target_name'],
                    )
                    continue

                data['target'] = target
                pk = data.pop('id')
                del data['ann_target_name']
                del data['target_id']
                sog = SiteObservationGroup(**data)
                sog.save()
                new_mol_groups_by_old_pk[pk] = sog

            so_tag_data = content.get(
                SiteObservationTag._meta.label,  # pylint: disable=protected-access
                [],
            )
            for data in so_tag_data:
                try:
                    target = targets.get(title=data['ann_target_name'])
                except Target.DoesNotExist:
                    logger.warning(
                        'Tried to restore SiteObservationTag for target that does not exist: %s',
                        data['ann_target_name'],
                    )
                    continue

                data['target'] = target
                pk = data.pop('id')
                del data['ann_target_name']
                del data['target_id']
                if data['mol_group_id']:
                    data['mol_group_id'] = new_mol_groups_by_old_pk[
                        data['mol_group_id']
                    ].pk
                data['user'] = new_users_by_old_pk[data['user_id']]
                del data['user_id']
                tag = SiteObservationTag(**data)
                try:
                    with transaction.atomic():
                        tag.save()
                except IntegrityError:
                    # This is an incredibly unlikely scenario where the tag
                    # already exists - the user must have slightly edited an
                    # auto-generated tag before restoring the tags. The
                    # curated fields could be updated here, but given both
                    # are curated at this point, do nothing and skip the tag
                    logger.error(
                        'Curated tag %s already exists, skipping restore', data['tag']
                    )
                    continue

                new_tags_by_old_pk[pk] = tag

            so_so_group_data = content.get(
                SiteObvsSiteObservationGroup._meta.label,  # pylint: disable=protected-access
                [],
            )
            for data in so_so_group_data:
                try:
                    site_obvs = site_observations.get(
                        longcode=data['ann_site_obvs_longcode']
                    )
                except SiteObservation.DoesNotExist:
                    logger.warning(
                        'Tried to restore SiteObvsSiteObservationGroup for site_observation that does not exist: %s',
                        data['ann_site_obvs_longcode'],
                    )
                    continue

                data['site_observation'] = site_obvs
                del data['id']
                del data['ann_site_obvs_longcode']
                del data['site_observation_id']
                data['site_obvs_group'] = new_mol_groups_by_old_pk[
                    data['site_obvs_group_id']
                ]
                del data['site_obvs_group_id']
                SiteObvsSiteObservationGroup(**data).save()

            so_so_tag_data = content.get(
                SiteObvsSiteObservationTag._meta.label,  # pylint: disable=protected-access
                [],
            )
            for data in so_so_tag_data:
                try:
                    site_obvs = site_observations.get(
                        longcode=data['ann_site_obvs_longcode']
                    )
                except SiteObservation.DoesNotExist:
                    logger.warning(
                        'Tried to restore SiteObvsSiteObservationTag for site_observation that does not exist: %s',
                        data['ann_site_obvs_longcode'],
                    )
                    continue

                data['site_observation'] = site_obvs
                del data['id']
                del data['ann_site_obvs_longcode']
                del data['site_observation_id']
                data['site_obvs_tag'] = new_tags_by_old_pk.get(
                    data['site_obvs_tag_id'], None
                )
                del data['site_obvs_tag_id']
                if data['site_obvs_tag']:
                    # tag may be missing if not restored
                    SiteObvsSiteObservationTag(**data).save()

    except IntegrityError as exc:
        logger.error(exc)


def alphanumerator(
    start_from: str = "", drop_first: bool = True
) -> Generator[str, None, None]:
    """Return an alphabetic generator (a, b, ... z, aa, ab, ...) starting from a specified point.

    drop_first - as per the workflow, the caller usually passes the last letter of the
    previous sequence, so the next value produced should be start_from + 1.
    drop_first=False indicates this is not necessary and start_from will be the first
    value the iterator produces.
    """
    # Since itertools.product requires a finite maximum, the returned string length is
    # capped at 10 characters. That should be enough for fragalysis (and to cause
    # database issues).
    generator = (
        "".join(word)
        for word in itertools.chain.from_iterable(
            itertools.product(string.ascii_lowercase, repeat=i) for i in range(1, 11)
        )
    )

    # Drop values until the starting point is reached
    if start_from is not None and start_from != '':
        start_from = start_from.lower()
        generator = itertools.dropwhile(lambda x: x != start_from, generator)  # type: ignore[assignment]
        if drop_first:
            # drop one more so iteration starts just after start_from
            _ = next(generator)

    return generator
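
# Illustrative usage:
#
#   >>> gen = alphanumerator(start_from='z')
#   >>> next(gen)
#   'aa'
#   >>> gen = alphanumerator()
#   >>> next(gen), next(gen)
#   ('a', 'b')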


def save_tmp_file(myfile):
    """Save file in temporary location for validation/upload processing"""
    name = myfile.name
    path = default_storage.save('tmp/' + name, ContentFile(myfile.read()))
    return str(os.path.join(settings.MEDIA_ROOT, path))


def create_csv_from_dict(input_dict, title=None, filename=None):
    """Write a CSV file containing data from an input dictionary and return a full
    path to the file (in the media directory).
    """
    if not filename:
        filename = 'download'

    unique_dir = str(uuid.uuid4())
    download_path = os.path.join(CSV_TO_DICT_DOWNLOAD_ROOT, unique_dir)
    os.makedirs(download_path, exist_ok=True)
    download_file = os.path.join(download_path, filename)

    # Remove file if it already exists
    if os.path.isfile(download_file):
        os.remove(download_file)

    with open(download_file, "w", newline='', encoding='utf-8') as csvfile:
        if title:
            csvfile.write(title)
            csvfile.write("\n")

    df = pd.DataFrame.from_dict(input_dict)
    df.to_csv(download_file, mode='a', header=True, index=False)

    return download_file
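
# Illustrative usage (hypothetical data): writes the dictionary as CSV columns under a
# unique sub-directory of CSV_TO_DICT_DOWNLOAD_ROOT and returns the file's full path.
#
#   path = create_csv_from_dict(
#       {'code': ['x0001', 'x0002'], 'score': [0.12, 0.34]},
#       title='My download',
#       filename='scores.csv',
#   )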


def email_task_completion(
    contact_email, message_type, target_name, target_path=None, task_id=None
):
    """Notify user of upload completion"""
    logger.info('+ email_notify_task_completion: ' + message_type + ' ' + target_name)

    email_from = settings.EMAIL_HOST_USER
    if contact_email == '' or not email_from:
        # Only send email if configured.
        return

    if message_type == 'upload-success':
        subject = 'Fragalysis: Target: ' + target_name + ' Uploaded'
        message = (
            'The upload of your target data is complete. Your target is available at: '
            + str(target_path)
        )
    elif message_type == 'validate-success':
        subject = 'Fragalysis: Target: ' + target_name + ' Validation'
        message = (
            'Your data was validated. It can now be uploaded using the upload option.'
        )
    else:
        # Validation failure
        subject = 'Fragalysis: Target: ' + target_name + ' Validation/Upload Failed'
        message = (
            'The validation/upload of your target data did not complete successfully. '
            'Please navigate to the following link to check the errors: validate_task/'
            + str(task_id)
        )

    recipient_list = [
        contact_email,
    ]
    logger.info('+ email_notify_task_completion email_from: %s', email_from)
    logger.info('+ email_notify_task_completion subject: %s', subject)
    logger.info('+ email_notify_task_completion message: %s', message)
    logger.info('+ email_notify_task_completion contact_email: %s', contact_email)

    # Send email - this should not prevent returning to the screen in the case of error.
    send_mail(subject, message, email_from, recipient_list, fail_silently=True)
    logger.info('- email_notify_task_completion')
    return


def sanitize_directory_name(name: str, path: Path | None = None) -> str:
    """
    Sanitize a string to ensure it only contains characters allowed in UNIX directory names.

    Parameters:
    name: The input string to sanitize.
    path (optional): the parent directory where the directory would reside, to check if unique

    Returns:
    str: A sanitized string with only allowed characters.
    """
    # Define allowed characters regex
    allowed_chars = re.compile(r'[^a-zA-Z0-9._-]')

    # Replace disallowed characters with an underscore
    sanitized_name = allowed_chars.sub('_', name.strip())

    # Replace multiple underscores with a single underscore
    sanitized_name = re.sub(r'__+', '_', sanitized_name)
    logger.debug('sanitized name: %s', sanitized_name)

    if path:
        target_dirs = [d.name for d in list(path.glob("*")) if d.is_dir()]
        logger.debug('target dirs: %s', target_dirs)
        new_name = sanitized_name
        suf = 1
        while new_name in target_dirs:
            suf = suf + 1
            new_name = f'{sanitized_name}_{suf}'
            logger.debug('looping suffix: %s', new_name)
        sanitized_name = new_name

    return sanitized_name
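
# Examples (illustrative, hypothetical paths): disallowed characters become underscores,
# runs of underscores collapse to one, and a numeric suffix is appended when the name
# already exists under 'path'.
#
#   sanitize_directory_name('My target (v2)')                    ->  'My_target_v2_'
#   sanitize_directory_name('Mpro', path=Path('/data/targets'))  ->  'Mpro_2'  (if 'Mpro' already exists there)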