# processFiles.py (forked from edgargaticaCU/biorxiv-aws)
"""Download bioRxiv .meca archives from S3, extract their XML, and upload the XML to GCS."""
import json
import os
import shutil
import time
from zipfile import ZipFile, BadZipFile

import boto3
from botocore.exceptions import ClientError

# Lambda's hard timeout is 15 minutes; stop processing after 14m 45s so there
# is time left to return a response.
MAX_TIME = 14.75 * 60


def clear_and_build_directories():
    """Reset the scratch directories used for downloads and extracted XML."""
    for directory in ('/tmp/meca', '/tmp/xml'):
        if os.path.isdir(directory):
            shutil.rmtree(directory)
        os.mkdir(directory)


def download_archive(client, bucket, remote_filepath):
    """Download a .meca archive from the requester-pays S3 bucket; return its local path or None."""
    local_filepath = os.path.join('/tmp/meca', remote_filepath.split('/')[-1])
    try:
        # Reuse a previously downloaded copy if one is already present.
        if not os.path.isfile(local_filepath):
            with open(local_filepath, 'wb') as dest:
                client.download_fileobj(bucket, remote_filepath, dest,
                                        ExtraArgs={'RequestPayer': 'requester'})
        return local_filepath
    except (ClientError, OSError) as err:
        # Drop any partial download so a retry is not skipped by the check above.
        if os.path.isfile(local_filepath):
            os.remove(local_filepath)
        print(f"Could not download {remote_filepath}: {err}")
        return None


def extract_xml_file(archive_filename, output_directory, prefix='content/'):
    """Extract the first XML file under `prefix` from a .meca (zip) archive.

    Returns the extracted file's path, or None if the archive is corrupt or
    contains no matching XML file.
    """
    try:
        with ZipFile(archive_filename) as archive_file:
            for name in archive_file.namelist():
                if name.startswith(prefix) and name.endswith('.xml'):
                    return archive_file.extract(name, output_directory)
    except BadZipFile as bzp:
        print(f"Could not extract from {archive_filename}")
        print(bzp)
    return None


def lambda_handler(event, context):
    start = time.time()
    # Accept either a direct invocation payload or an API Gateway-style event.
    if 'body' in event:
        body = json.loads(event['body'])
    else:
        body = event
    client = boto3.client('s3')
    clear_and_build_directories()
    source_bucket = body['source-bucket']
    destination_bucket = body['destination']
    destination_prefix = body['directory']
    gcp_key_id = body['key_id']
    gcp_secret = body['secret']
    # Google Cloud Storage speaks the S3 XML API through its interoperability
    # endpoint, so a boto3 client with GCS HMAC credentials can upload to it.
    gcp_client = boto3.client(
        's3',
        region_name='auto',
        endpoint_url='https://storage.googleapis.com',
        aws_access_key_id=gcp_key_id,
        aws_secret_access_key=gcp_secret
    )
    success_dict = {}
    error_list = []
    for filepath in body['paths']:
        if not filepath.endswith('.meca'):
            error_list.append(filepath)
            continue
        local_filepath = download_archive(client, source_bucket, filepath)
        if not local_filepath:
            error_list.append(filepath)
            continue
        xml_filename = extract_xml_file(local_filepath, '/tmp/xml/')
        if not xml_filename:
            # Corrupt archive or no XML inside; record the failure and move on.
            error_list.append(filepath)
            os.remove(local_filepath)
            continue
        file_part = xml_filename.split('/')[-1]
        gcp_client.upload_file(xml_filename, destination_bucket, destination_prefix + file_part)
        # Clean up /tmp as we go; Lambda's scratch space is limited.
        os.remove(local_filepath)
        os.remove(xml_filename)
        success_dict[filepath] = file_part
        # Stop before the Lambda timeout; unprocessed paths are simply
        # omitted from the response, so the caller can resubmit them.
        if time.time() - start >= MAX_TIME:
            break
    return {
        'statusCode': 200,
        'body': {
            'downloaded_files': success_dict,
            'error_files': error_list,
            'runtime': time.time() - start
        }
    }
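

# A minimal local smoke test, shown as a sketch: every value below is a
# hypothetical placeholder (the bucket names, HMAC credentials, and archive
# key are not real). Note the bioRxiv source bucket is requester-pays, so a
# real run incurs S3 transfer charges against the caller's AWS account.
if __name__ == '__main__':
    sample_event = {
        'source-bucket': 'example-biorxiv-bucket',    # hypothetical S3 bucket
        'destination': 'example-gcs-bucket',          # hypothetical GCS bucket
        'directory': 'xml/',                          # destination key prefix
        'key_id': 'HMAC_KEY_ID_PLACEHOLDER',          # GCS HMAC key ID (placeholder)
        'secret': 'HMAC_SECRET_PLACEHOLDER',          # GCS HMAC secret (placeholder)
        'paths': ['Current_Content/example.meca'],    # hypothetical archive key
    }
    print(json.dumps(lambda_handler(sample_event, None), indent=2))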