Skip to content

Commit 91a3880

Browse files
authored
10 base multi adaptor clean (#28)
* clean version of multi-adaptor class * ensure_list to general.py to prevent circular import
1 parent 614f9d1 commit 91a3880

10 files changed

+282
-27
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ fabric.properties
197197

198198
# Android studio 3.1+ serialized cache file
199199
.idea/caches/build_file_checksums.ser
200-
200+
.idea/
201201
### PyCharm Patch ###
202202
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
203203

@@ -467,3 +467,4 @@ $RECYCLE.BIN/
467467
*.lnk
468468

469469
# End of https://www.toptal.com/developers/gitignore/api/python,jupyternotebooks,vim,visualstudiocode,pycharm,emacs,linux,macos,windows
470+
test.ipynb

cads_adaptors/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
)
3131
from cads_adaptors.adaptors.legacy import LegacyCdsAdaptor
3232
from cads_adaptors.adaptors.mars import DirectMarsCdsAdaptor, MarsCdsAdaptor
33+
from cads_adaptors.adaptors.multi import MultiAdaptor
3334
from cads_adaptors.adaptors.url import UrlCdsAdaptor
3435

3536
from .tools.adaptor_tools import get_adaptor_class
@@ -46,4 +47,5 @@
4647
"LegacyCdsAdaptor",
4748
"MarsCdsAdaptor",
4849
"UrlCdsAdaptor",
50+
"MultiAdaptor",
4951
]

cads_adaptors/adaptors/cds.py

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class AbstractCdsAdaptor(AbstractAdaptor):
99

1010
def __init__(self, form: dict[str, Any], **config: Any):
1111
self.form = form
12+
self.collection_id = config.get("collection_id", "unknown-collection")
1213
self.constraints = config.pop("constraints", [])
1314
self.mapping = config.pop("mapping", {})
1415
self.licences: list[tuple[str, int]] = config.pop("licences", [])

cads_adaptors/adaptors/mars.py

+39-19
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,56 @@
55
from cads_adaptors.adaptors import Request, cds
66

77

8-
class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor):
9-
resources = {"MARS_CLIENT": 1}
8+
def execute_mars(request: Request, target="data.grib"):
    """Write a MARS ``retrieve`` request to the file ``r`` and run the MARS client.

    Parameters
    ----------
    request:
        Mapping of request keywords to a single value or a list/tuple of
        values. Multiple values are joined with ``/`` in the request file.
    target:
        File name written as ``target=`` in the request.

    Returns
    -------
    The ``target`` argument, unchanged.

    Raises
    ------
    subprocess.CalledProcessError
        If ``/usr/local/bin/mars`` exits with a non-zero status.
    """
    import subprocess

    # The request file is always called "r" in the current working directory.
    with open("r", "w") as request_file:
        request_file.write(f"retrieve, target={target}\n")
        for keyword, raw in request.items():
            values = raw if isinstance(raw, (list, tuple)) else [raw]
            joined = "/".join(map(str, values))
            request_file.write(f", {keyword}={joined}\n")

    # FIXME: set with the namespace and user_id
    namespace = "cads"
    user_id = 0
    env = {**os.environ, "MARS_USER": f"{namespace}-{user_id}"}

    subprocess.run(["/usr/local/bin/mars", "r"], check=True, env=env)

    return target
2827

29-
return open("data.grib") # type: ignore
28+
29+
class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor):
    """Adaptor that passes the request to MARS as-is, without applying a mapping."""

    # Resource requirement declared by this adaptor.
    resources = {"MARS_CLIENT": 1}

    def retrieve(self, request: Request) -> BinaryIO:
        """Run the MARS request and return the retrieved file.

        Returns
        -------
        The target file written by ``execute_mars``, opened for binary reading.
        """
        result = execute_mars(request)

        # Open in binary mode: the declared return type is BinaryIO and the
        # default text mode would try to decode the data file.
        return open(result, "rb")
3036

3137

3238
class MarsCdsAdaptor(DirectMarsCdsAdaptor):
    """MARS adaptor that applies ``self.mapping`` and packages the retrieved file."""

    def retrieve(self, request: Request) -> BinaryIO:
        """Map the request, run it through MARS and package the output.

        ``format`` and ``download_format`` are popped from ``request`` (the
        caller's mapping is modified) before the mapping is applied.

        Returns
        -------
        Whatever the selected ``download_tools.DOWNLOAD_FORMATS`` entry returns
        for the single retrieved file.

        Raises
        ------
        KeyError
            If ``download_format`` is not a key of ``DOWNLOAD_FORMATS``.
        """
        from cads_adaptors.tools import download_tools

        # Format of data files, grib or netcdf
        data_format = request.pop("format", "grib")

        # Format of download archive, as_source, zip, tar, list etc.
        download_format = request.pop("download_format", "as_source")

        mapped_request = mapping.apply_mapping(request, self.mapping)  # type: ignore
        if data_format not in ["grib"]:
            # FIXME: reformat if needed
            pass

        result = execute_mars(mapped_request)

        # Hash keys AND values: iterating the dict directly yields only its
        # keys, which gives every request with the same keywords the same
        # base_target. Values are stringified because lists are unhashable.
        request_key = tuple(sorted((str(k), str(v)) for k, v in request.items()))
        download_kwargs = {
            "base_target": f"{self.collection_id}-{hash(request_key)}"
        }
        return download_tools.DOWNLOAD_FORMATS[download_format](
            [result], **download_kwargs
        )

cads_adaptors/adaptors/multi.py

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import typing as T
2+
3+
import yaml
4+
5+
from cads_adaptors import AbstractCdsAdaptor
6+
from cads_adaptors.adaptors import Request
7+
from cads_adaptors.tools.general import ensure_list
8+
from cads_adaptors.tools.logger import logger
9+
10+
11+
class MultiAdaptor(AbstractCdsAdaptor):
12+
@staticmethod
13+
def split_request(
14+
full_request: Request, # User request
15+
this_values: T.Dict[str, T.Any], # key: [values] for the adaptor component
16+
**config: T.Any,
17+
) -> Request:
18+
"""
19+
Basic request splitter, splits based on whether the values are relevant to
20+
the specific adaptor.
21+
More complex constraints may need a more detailed splitter.
22+
"""
23+
this_request = {}
24+
# loop over keys in this_values, i.e. the keys relevant to this_adaptor
25+
for key in list(this_values):
26+
# get request values for that key
27+
req_vals = full_request.get(key, [])
28+
# filter for values relevant to this_adaptor:
29+
these_vals = [
30+
v for v in ensure_list(req_vals) if v in this_values.get(key, [])
31+
]
32+
if len(these_vals) > 0:
33+
# if values then add to request
34+
this_request[key] = these_vals
35+
elif key in config.get("required_keys", []):
36+
# If a required key, then return an empty dictionary.
37+
# optional keys must be set in the adaptor.json via gecko
38+
return {}
39+
40+
return this_request
41+
42+
def retrieve(self, request: Request):
43+
from cads_adaptors.tools import adaptor_tools, download_tools
44+
45+
download_format = request.pop("download_format", "zip")
46+
47+
these_requests = {}
48+
exception_logs: T.Dict[str, str] = {}
49+
logger.debug(f"MultiAdaptor, full_request: {request}")
50+
for adaptor_tag, adaptor_desc in self.config["adaptors"].items():
51+
this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form)
52+
this_values = adaptor_desc.get("values", {})
53+
54+
this_request = self.split_request(request, this_values, **self.config)
55+
logger.debug(f"MultiAdaptor, {adaptor_tag}, this_request: {this_request}")
56+
57+
# TODO: check this_request is valid for this_adaptor, or rely on try?
58+
# i.e. split_request does NOT implement constraints.
59+
if len(this_request) > 0:
60+
this_request.setdefault("download_format", "list")
61+
these_requests[this_adaptor] = this_request
62+
63+
results = []
64+
for adaptor, req in these_requests.items():
65+
try:
66+
this_result = adaptor.retrieve(req)
67+
except Exception:
68+
logger.debug(Exception)
69+
else:
70+
results += this_result
71+
72+
# TODO: Add parallelistation via multiprocessing
73+
# # Allow a maximum of 2 parallel processes
74+
# import multiprocessing as mp
75+
76+
# pool = mp.Pool(min(len(these_requests), 2))
77+
78+
# def apply_adaptor(args):
79+
# try:
80+
# result = args[0](args[1])
81+
# except Exception as err:
82+
# # Catch any possible exception and store error message in case all adaptors fail
83+
# logger.debug(f"Adaptor Error ({args}): {err}")
84+
# result = []
85+
# return result
86+
87+
# results = pool.map(
88+
# apply_adaptor,
89+
# ((adaptor, request) for adaptor, request in these_requests.items()),
90+
# )
91+
92+
if len(results) == 0:
93+
raise RuntimeError(
94+
"MultiAdaptor returned no results, the error logs of the sub-adaptors is as follows:\n"
95+
f"{yaml.safe_dump(exception_logs)}"
96+
)
97+
98+
# return self.merge_results(results, prefix=self.collection_id)
99+
# close files
100+
[res.close() for res in results]
101+
# get the paths
102+
paths = [res.name for res in results]
103+
104+
download_kwargs = dict(
105+
base_target=f"{self.collection_id}-{hash(tuple(results))}"
106+
)
107+
108+
return download_tools.DOWNLOAD_FORMATS[download_format](
109+
paths, **download_kwargs
110+
)

cads_adaptors/adaptors/url.py

+16-7
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,29 @@
66

77
class UrlCdsAdaptor(cds.AbstractCdsAdaptor):
    """Adaptor that turns a request into URLs, downloads them and packages the files."""

    def retrieve(self, request: Request) -> BinaryIO:
        """Download the URLs matching the request and package the result.

        ``format`` (legacy) and ``download_format`` are popped from
        ``request``; when both are given, ``download_format`` wins.
        """
        from cads_adaptors.tools import download_tools, url_tools

        # TODO: Remove legacy syntax
        legacy_format = request.pop("format", "zip")
        # CADS syntax over-rules legacy syntax
        download_format = request.pop("download_format", legacy_format)

        # The format is deliberately not validated here ("do not need to check
        # twice"); an unknown value surfaces as a KeyError at the lookup below.

        mapped_request = mapping.apply_mapping(request, self.mapping)  # type: ignore

        # Convert request to list of URLs
        requests_urls = url_tools.requests_to_urls(
            mapped_request, patterns=self.config["patterns"]
        )

        # try to download URLs
        urls = [entry["url"] for entry in requests_urls]
        paths = url_tools.try_download(urls)

        base_target = f"{self.collection_id}-{hash(tuple(urls))}"
        packager = download_tools.DOWNLOAD_FORMATS[download_format]
        return packager(paths, base_target=base_target)

cads_adaptors/tools/download_tools.py

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import os
2+
from typing import BinaryIO, Callable, Dict, List
3+
4+
from cads_adaptors.tools.general import ensure_list
5+
6+
7+
# TODO use targzstream
8+
def zip_paths(paths: List[str], base_target: str = "output-data", **kwargs) -> BinaryIO:
    """Bundle ``paths`` into ``<base_target>.zip`` and delete the source files.

    Extra keyword arguments are accepted and ignored.

    Returns
    -------
    The zip archive, opened for binary reading.
    """
    import zipfile

    archive_path = f"{base_target}.zip"
    with zipfile.ZipFile(archive_path, "w") as bundle:
        for member in paths:
            bundle.write(member)

    # The sources are removed only after the archive has been closed.
    for member in paths:
        os.remove(member)

    return open(archive_path, "rb")
20+
21+
22+
# TODO zipstream for archive creation
23+
def targz_paths(
    paths: List[str],
    base_target: str = "output-data",
    **kwargs,
) -> BinaryIO:
    """Bundle ``paths`` into ``<base_target>.tar.gz`` and delete the source files.

    Extra keyword arguments are accepted and ignored.

    Returns
    -------
    The gzip-compressed tar archive, opened for binary reading.
    """
    import tarfile

    archive_path = f"{base_target}.tar.gz"
    with tarfile.open(archive_path, "w:gz") as bundle:
        for member in paths:
            bundle.add(member)

    # The sources are removed only after the archive has been closed.
    for member in paths:
        os.remove(member)

    return open(archive_path, "rb")
39+
40+
41+
def list_paths(
    paths: List[str],
    **kwargs,
) -> List:
    """Open every path for binary reading and return the open files as a list.

    ``paths`` is passed through ``ensure_list`` first. Extra keyword
    arguments are accepted and ignored.
    """
    handles = []
    for path in ensure_list(paths):
        handles.append(open(path, "rb"))
    return handles
46+
47+
48+
def as_source(paths: List[str], **kwargs) -> BinaryIO:
    """Return the single file in ``paths`` opened for binary reading.

    Only one path can be returned as-is; for several files the ``list``
    format (or an archive format) must be requested instead. A bare string
    is treated as one path. Extra keyword arguments are accepted and ignored.

    Raises
    ------
    ValueError
        If ``paths`` does not contain exactly one path.
    """
    if isinstance(paths, str):
        paths = [paths]
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if len(paths) != 1:
        raise ValueError(
            f"as_source requires exactly one path, got {len(paths)}; "
            "request the 'list' download format for multiple files"
        )
    return open(paths[0], "rb")
52+
53+
54+
# Registry of packaging functions, keyed by the name of the download format.
# Every entry is called as ``func(paths, **download_kwargs)``.
DOWNLOAD_FORMATS: Dict[str, Callable] = {
    # archive formats: bundle all paths into one file
    "zip": zip_paths,
    "tgz": targz_paths,
    # one open file per path
    "list": list_paths,
    # the single file itself
    "as_source": as_source,
}

cads_adaptors/tools/general.py

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
def ensure_list(input_item):
    """Ensure that item is a list, generally for iterability.

    - A list is returned unchanged (the same object).
    - A tuple is converted to a list of its elements.
    - ``None`` becomes an empty list.
    - Anything else, including a string, is wrapped in a one-element list.
    """
    if isinstance(input_item, list):
        return input_item
    if isinstance(input_item, tuple):
        return list(input_item)
    if input_item is None:
        return []
    return [input_item]

ci/environment-integration.yml

+1
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ dependencies:
77
- pytest
88
- pytest-cov
99
# DO NOT EDIT ABOVE THIS LINE, ADD DEPENDENCIES BELOW
10+
- pyyaml

tests/test_20_adaptor_multi.py

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from cads_adaptors.adaptors import multi
2+
3+
def _component_values(stat):
    """Return the values served by the sub-adaptor for one ``stat``."""
    return {
        "level": ["500", "850"],
        "time": ["12:00", "00:00"],
        "param": ["Z", "T"],
        "stat": [stat],
    }


# Full form: every value that can be requested.
FORM = {
    "level": ["500", "850"],
    "time": ["12:00", "00:00"],
    "param": ["Z", "T"],
    "stat": ["mean", "max"],
}
# The test request asks for everything in the form (shallow copy).
REQUEST = dict(FORM)

# Two sub-adaptors that differ only in the "stat" value they serve.
ADAPTOR_CONFIG = {
    "entry_point": "MultiAdaptor",
    "adaptors": {
        "mean": {
            "entry_point": "cads_adaptors:UrlCdsAdaptor",
            "values": _component_values("mean"),
        },
        "max": {
            "entry_point": "cads_adaptors:DummyAdaptor",
            "values": _component_values("max"),
        },
    },
}
34+
35+
36+
def test_multi_adaptor_split():
    """Splitting the full request yields exactly each sub-adaptor's values."""
    multi_adaptor = multi.MultiAdaptor(FORM, **ADAPTOR_CONFIG)

    # "mean" sub-adaptor
    mean_values = multi_adaptor.config["adaptors"]["mean"]["values"]
    split_mean = multi_adaptor.split_request(REQUEST, mean_values)
    assert split_mean == ADAPTOR_CONFIG["adaptors"]["mean"]["values"]

    # "max" sub-adaptor
    max_values = multi_adaptor.config["adaptors"]["max"]["values"]
    split_max = multi_adaptor.split_request(REQUEST, max_values)
    assert split_max == ADAPTOR_CONFIG["adaptors"]["max"]["values"]

0 commit comments

Comments
 (0)