-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmulti.py
110 lines (91 loc) · 4.06 KB
/
multi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import typing as T
import yaml
from cads_adaptors import AbstractCdsAdaptor
from cads_adaptors.adaptors import Request
from cads_adaptors.tools.general import ensure_list
from cads_adaptors.tools.logger import logger
class MultiAdaptor(AbstractCdsAdaptor):
    """Adaptor that dispatches one request to several sub-adaptors.

    The sub-adaptors are described in ``self.config["adaptors"]``. Each one
    receives only the part of the user request that matches its configured
    ``values``; the files returned by all sub-adaptors are then bundled into
    a single download.
    """

    @staticmethod
    def split_request(
        full_request: Request,  # User request
        this_values: T.Dict[str, T.Any],  # key: [values] for the adaptor component
        **config: T.Any,
    ) -> Request:
        """Return the part of ``full_request`` relevant to one sub-adaptor.

        Basic request splitter: for every key in ``this_values`` the requested
        values are filtered down to those listed for the sub-adaptor.
        More complex constraints may need a more detailed splitter.

        Args:
            full_request: The complete user request.
            this_values: Mapping of key -> accepted values for the
                sub-adaptor.
            **config: Adaptor configuration; only ``required_keys`` is read
                here.

        Returns:
            A dict of key -> list of matching values. Keys without any
            matching value are omitted. An empty dict is returned as soon as
            a key listed in ``required_keys`` has no matching value.
        """
        # Loop-invariant, so look it up once.
        required_keys = config.get("required_keys", [])

        this_request: T.Dict[str, T.Any] = {}
        # Loop over the keys relevant to this sub-adaptor.
        for key, accepted_vals in this_values.items():
            # Requested values for that key (scalar values are wrapped).
            req_vals = ensure_list(full_request.get(key, []))
            # Keep only the values this sub-adaptor can serve.
            these_vals = [v for v in req_vals if v in accepted_vals]
            if these_vals:
                this_request[key] = these_vals
            elif key in required_keys:
                # A required key has no usable value: signal "nothing to do"
                # with an empty request. Optional keys must be set in the
                # adaptor.json via gecko.
                return {}

        return this_request

    def retrieve(self, request: Request):
        """Run every relevant sub-adaptor and bundle their results.

        Args:
            request: The user request. ``download_format`` is popped from it
                (default ``"zip"``), so the passed mapping is modified.

        Returns:
            Whatever the handler selected from
            ``download_tools.DOWNLOAD_FORMATS`` returns for the collected
            file paths.

        Raises:
            RuntimeError: If no sub-adaptor produced any result. The message
                includes the error recorded for each failed sub-adaptor.
        """
        from cads_adaptors.tools import adaptor_tools, download_tools

        download_format = request.pop("download_format", "zip")

        logger.debug(f"MultiAdaptor, full_request: {request}")

        # (tag, adaptor, request) for every sub-adaptor with something to do.
        # A list is used instead of a dict keyed by adaptor so that adaptors
        # need not be hashable and the tag stays available for error reports.
        sub_requests: T.List[T.Tuple[T.Any, T.Any, T.Dict[str, T.Any]]] = []
        for adaptor_tag, adaptor_desc in self.config["adaptors"].items():
            this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form)
            this_values = adaptor_desc.get("values", {})

            this_request = self.split_request(request, this_values, **self.config)
            logger.debug(f"MultiAdaptor, {adaptor_tag}, this_request: {this_request}")

            # TODO: check this_request is valid for this_adaptor, or rely on try?
            # i.e. split_request does NOT implement constraints.
            if this_request:
                this_request.setdefault("download_format", "list")
                sub_requests.append((adaptor_tag, this_adaptor, this_request))

        # TODO: Add parallelisation via multiprocessing
        # (allow a maximum of 2 parallel processes).
        results: T.List[T.Any] = []
        exception_logs: T.Dict[str, str] = {}
        for adaptor_tag, adaptor, req in sub_requests:
            try:
                this_result = adaptor.retrieve(req)
            except Exception as err:
                # Best effort: a failing sub-adaptor must not abort the
                # others. Record the actual error so it can be reported if
                # every sub-adaptor fails.
                exception_logs[str(adaptor_tag)] = f"{type(err).__name__}: {err}"
                logger.debug(f"MultiAdaptor, {adaptor_tag}, retrieve failed: {err!r}")
            else:
                results += this_result

        if not results:
            raise RuntimeError(
                "MultiAdaptor returned no results, the error logs of the sub-adaptors is as follows:\n"
                f"{yaml.safe_dump(exception_logs)}"
            )

        # Close the result objects before handing their paths on.
        # NOTE(review): results are presumably open file objects (they are
        # closed and their ``.name`` is read) - confirm against sub-adaptors.
        for res in results:
            res.close()
        paths = [res.name for res in results]

        download_kwargs = dict(
            base_target=f"{self.collection_id}-{hash(tuple(results))}"
        )
        return download_tools.DOWNLOAD_FORMATS[download_format](
            paths, **download_kwargs
        )