From 7062d0e4c4d44bb12d185c43cbb814146d008e3b Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 10 Aug 2023 14:36:18 +0100 Subject: [PATCH 01/14] clean version of multi-adaptor class --- cads_adaptors/__init__.py | 2 + cads_adaptors/adaptors/cds.py | 1 + cads_adaptors/adaptors/multi.py | 106 ++++++++++++++++++++++++++ cads_adaptors/adaptors/url.py | 23 ++++-- cads_adaptors/tools/__init__.py | 5 ++ cads_adaptors/tools/download_tools.py | 59 ++++++++++++++ tests/test_20_adaptor_multi.py | 47 ++++++++++++ 7 files changed, 236 insertions(+), 7 deletions(-) create mode 100644 cads_adaptors/adaptors/multi.py create mode 100644 cads_adaptors/tools/download_tools.py create mode 100644 tests/test_20_adaptor_multi.py diff --git a/cads_adaptors/__init__.py b/cads_adaptors/__init__.py index 6198f896..88fb4a26 100644 --- a/cads_adaptors/__init__.py +++ b/cads_adaptors/__init__.py @@ -27,6 +27,7 @@ from cads_adaptors.adaptors.legacy import LegacyCdsAdaptor from cads_adaptors.adaptors.mars import DirectMarsCdsAdaptor, MarsCdsAdaptor from cads_adaptors.adaptors.url import UrlCdsAdaptor +from cads_adaptors.adaptors.multi import MultiAdaptor from .tools.adaptor_tools import get_adaptor_class @@ -40,4 +41,5 @@ "LegacyCdsAdaptor", "MarsCdsAdaptor", "UrlCdsAdaptor", + "MultiAdaptor", ] diff --git a/cads_adaptors/adaptors/cds.py b/cads_adaptors/adaptors/cds.py index 04c6c3db..2aafcb38 100644 --- a/cads_adaptors/adaptors/cds.py +++ b/cads_adaptors/adaptors/cds.py @@ -9,6 +9,7 @@ class AbstractCdsAdaptor(AbstractAdaptor): def __init__(self, form: dict[str, Any], **config: Any): self.form = form + self.collection_id = config.get("collection_id", "unknown-collection") self.constraints = config.pop("constraints", []) self.mapping = config.pop("mapping", {}) self.licences: list[tuple[str, int]] = config.pop("licences", []) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py new file mode 100644 index 00000000..cb91264d --- /dev/null +++ b/cads_adaptors/adaptors/multi.py @@ -0,0 +1,106 @@ +import logging +import typing as T + +import yaml + +from cads_adaptors import AbstractCdsAdaptor +from cads_adaptors.adaptors import Request + +logger = logging.Logger(__name__) + + +def ensure_list(input_item): + if not isinstance(input_item, list): + return [input_item] + return input_item + + +class MultiAdaptor(AbstractCdsAdaptor): + @staticmethod + def split_request( + full_request: Request, # User request + this_values: T.Dict[str, T.Any], # key: [values] for the adaptor component + **config: T.Any, + ) -> Request: + """ + Basic request splitter, splits based on whether the values are relevant to + the specific adaptor. + More complex constraints may need a more detailed splitter. + """ + this_request = {} + # loop over keys in this_values, i.e. the keys relevant to this_adaptor + for key in list(this_values): + # get request values for that key + req_vals = full_request.get(key, []) + # filter for values relevant to this_adaptor: + these_vals = [ + v for v in ensure_list(req_vals) if v in this_values.get(key, []) + ] + if len(these_vals) > 0: + # if values then add to request + this_request[key] = these_vals + elif key not in config.get("optional_keys", []): + # If not an optional key, then return an empty dictionary. + # optional keys must be set in the adaptor.json via gecko + return {} + + return this_request + + def retrieve(self, request: Request): + import multiprocessing as mp + + from cads_adaptors.tools import adaptor_tools, download_tools + + download_format = request.pop("download_format", "zip") + + these_requests = {} + exception_logs: T.Dict[str, str] = {} + for adaptor_tag, adaptor_desc in self.config["adaptors"].items(): + this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form) + this_values = adaptor_desc.get("values", {}) + + this_request = self.split_request(request, this_values, **self.config) + logger.debug(f"{adaptor_tag}, request: {this_request}") + + # TODO: check this_request is valid for this_adaptor, or rely on try? + # i.e. split_request does NOT implement constraints. + if len(this_request) > 0: + this_request.setdefault("download_format", "list") + these_requests[this_adaptor] = this_request + + # Allow a maximum of 2 parallel processes + pool = mp.Pool(min(len(these_requests), 2)) + + def apply_adaptor(args): + try: + result = args[0](args[1]) + except Exception as err: + # Catch any possible exception and store error message in case all adaptors fail + logger.debug(f"Adaptor Error ({args}): {err}") + result = [] + return result + + results = pool.map( + apply_adaptor, + ((adaptor, request) for adaptor, request in these_requests.items()), + ) + + if len(results) == 0: + raise RuntimeError( + "MultiAdaptor returned no results, the error logs of the sub-adaptors is as follows:\n" + f"{yaml.safe_dump(exception_logs)}" + ) + + # return self.merge_results(results, prefix=self.collection_id) + # close files + [res.close() for res in results] + # get the paths + paths = [res.name for res in results] + + download_kwargs = dict( + base_target=f"{self.collection_id}-{hash(tuple(results))}" + ) + + return download_tools.DOWNLOAD_FORMATS[download_format]( + paths, **download_kwargs + ) diff --git a/cads_adaptors/adaptors/url.py b/cads_adaptors/adaptors/url.py index 4e718c17..b0c47c9a 100644 --- a/cads_adaptors/adaptors/url.py +++ b/cads_adaptors/adaptors/url.py @@ -6,20 +6,29 @@ class UrlCdsAdaptor(cds.AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: - from cads_adaptors.tools import url_tools + from cads_adaptors.tools import download_tools, url_tools - data_format = request.pop("format", "zip") + download_format = request.pop("format", "zip") # TODO: Remove legacy syntax + # CADS syntax over-rules legacy syntax + download_format = request.pop("download_format", download_format) - if data_format not in {"zip", "tgz"}: - raise ValueError(f"{data_format=} is not supported") + # Do not need to check twice + # if download_format not in {"zip", "tgz"}: + # raise ValueError(f"{download_format} is not supported") mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore + # Convert request to list of URLs requests_urls = url_tools.requests_to_urls( mapped_request, patterns=self.config["patterns"] ) - path = url_tools.download_from_urls( - [ru["url"] for ru in requests_urls], data_format=data_format + # try to download URLs + urls = [ru["url"] for ru in requests_urls] + paths = url_tools.try_download(urls) + + download_kwargs = {"base_target": f"{self.collection_id}-{hash(tuple(urls))}"} + + return download_tools.DOWNLOAD_FORMATS[download_format]( + paths, **download_kwargs ) - return open(path, "rb") diff --git a/cads_adaptors/tools/__init__.py b/cads_adaptors/tools/__init__.py index e69de29b..3f6f00fd 100644 --- a/cads_adaptors/tools/__init__.py +++ b/cads_adaptors/tools/__init__.py @@ -0,0 +1,5 @@ +def ensure_list(input_item): + """Ensure that item is a list, generally for iterability.""" + if not isinstance(input_item, list): + return [input_item] + return input_item diff --git a/cads_adaptors/tools/download_tools.py b/cads_adaptors/tools/download_tools.py new file mode 100644 index 00000000..5a4329d8 --- /dev/null +++ b/cads_adaptors/tools/download_tools.py @@ -0,0 +1,59 @@ +import os +from typing import BinaryIO, Callable, Dict, List + +from cads_adaptors.tools import ensure_list + + +# TODO use targzstream +def zip_paths(paths: List[str], base_target: str = "output-data", **kwargs) -> BinaryIO: + import zipfile + + target = f"{base_target}.zip" + with zipfile.ZipFile(target, mode="w") as archive: + for p in paths: + archive.write(p) + + for p in paths: + os.remove(p) + + return open(target, "rb") + + +# TODO zipstream for archive creation +def targz_paths( + paths: List[str], + base_target: str = "output-data", + **kwargs, +) -> BinaryIO: + import tarfile + + target = f"{base_target}.tar.gz" + with tarfile.open(target, "w:gz") as archive: + for p in paths: + archive.add(p) + + for p in paths: + os.remove(p) + + return open(target, "rb") + + +def list_paths( + paths: List[str], + **kwargs, +) -> List: + return [open(path, "rb") for path in ensure_list(paths)] + + +def as_source(paths: List[str], **kwargs) -> BinaryIO: + # Only return as_source if a single path, otherwise list MUST be requested + assert len(paths) == 1 + return open(paths[0], "rb") + + +DOWNLOAD_FORMATS: Dict[str, Callable] = { + "zip": zip_paths, + "tgz": targz_paths, + "list": list_paths, + "as_source": as_source, +} diff --git a/tests/test_20_adaptor_multi.py b/tests/test_20_adaptor_multi.py new file mode 100644 index 00000000..61650e57 --- /dev/null +++ b/tests/test_20_adaptor_multi.py @@ -0,0 +1,47 @@ +from cads_adaptors.adaptors import multi + +FORM = { + "level": ["500", "850"], + "time": ["12:00", "00:00"], + "param": ["Z", "T"], + "stat": ["mean", "max"], +} +REQUEST = FORM.copy() + +ADAPTOR_CONFIG = { + "entry_point": "MultiAdaptor", + "adaptors": { + "mean": { + "entry_point": "cads_adaptors:UrlCdsAdaptor", + "values": { + "level": ["500", "850"], + "time": ["12:00", "00:00"], + "param": ["Z", "T"], + "stat": ["mean"], + }, + }, + "max": { + "entry_point": "cads_adaptors:DummyAdaptor", + "values": { + "level": ["500", "850"], + "time": ["12:00", "00:00"], + "param": ["Z", "T"], + "stat": ["max"], + }, + }, + }, +} + + +def test_multi_adaptor_split(): + multi_adaptor = multi.MultiAdaptor(FORM, **ADAPTOR_CONFIG) + + split_mean = multi_adaptor.split_request( + REQUEST, multi_adaptor.config["adaptors"]["mean"]["values"] + ) + assert split_mean == ADAPTOR_CONFIG["adaptors"]["mean"]["values"] + + split_max = multi_adaptor.split_request( + REQUEST, multi_adaptor.config["adaptors"]["max"]["values"] + ) + assert split_max == ADAPTOR_CONFIG["adaptors"]["max"]["values"] From 6ea0a4b4f833261ee4c55a7654213f03ed39f96c Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 10 Aug 2023 14:37:32 +0100 Subject: [PATCH 02/14] clean version of multi-adaptor class --- cads_adaptors/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_adaptors/__init__.py b/cads_adaptors/__init__.py index 88fb4a26..14d86264 100644 --- a/cads_adaptors/__init__.py +++ b/cads_adaptors/__init__.py @@ -26,8 +26,8 @@ from cads_adaptors.adaptors.cds import AbstractCdsAdaptor from cads_adaptors.adaptors.legacy import LegacyCdsAdaptor from cads_adaptors.adaptors.mars import DirectMarsCdsAdaptor, MarsCdsAdaptor -from cads_adaptors.adaptors.url import UrlCdsAdaptor from cads_adaptors.adaptors.multi import MultiAdaptor +from cads_adaptors.adaptors.url import UrlCdsAdaptor from .tools.adaptor_tools import get_adaptor_class From d473aa9f5f4d91da748deafcec83bd338f0ac60e Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 10 Aug 2023 15:35:13 +0100 Subject: [PATCH 03/14] mars adaptor to handle lists of requests --- cads_adaptors/adaptors/mars.py | 16 ++++++---- cads_adaptors/adaptors/multi.py | 53 ++++++++++++++++----------------- cads_adaptors/tools/__init__.py | 5 ---- 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 38c9f232..03d46085 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -3,6 +3,7 @@ from cads_adaptors import mapping from cads_adaptors.adaptors import Request, cds +from cads_adaptors.tools import ensure_list class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): @@ -10,13 +11,18 @@ class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: import subprocess + + request = ensure_list(request) with open("r", "w") as fp: - print("retrieve, target=data.grib", file=fp) - for key, value in request.items(): - if not isinstance(value, (list, tuple)): - value = [value] - print(f", {key}={'/'.join(str(v) for v in value)}", file=fp) + for i, req in enumerate(request): + print("retrieve,", file=fp) + if i==0: + print("target=data.grib", file=fp) + for key, value in req.items(): + if not isinstance(value, (list, tuple)): + value = [value] + print(f", {key}={'/'.join(str(v) for v in value)}", file=fp) env = dict(**os.environ) # FIXME: set with the namespace and user_id diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index cb91264d..d0e91c7a 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -5,16 +5,10 @@ from cads_adaptors import AbstractCdsAdaptor from cads_adaptors.adaptors import Request +from cads_adaptors.tools import ensure_list logger = logging.Logger(__name__) - -def ensure_list(input_item): - if not isinstance(input_item, list): - return [input_item] - return input_item - - class MultiAdaptor(AbstractCdsAdaptor): @staticmethod def split_request( @@ -47,8 +41,6 @@ def split_request( return this_request def retrieve(self, request: Request): - import multiprocessing as mp - from cads_adaptors.tools import adaptor_tools, download_tools download_format = request.pop("download_format", "zip") @@ -66,24 +58,31 @@ def retrieve(self, request: Request): # i.e. split_request does NOT implement constraints. if len(this_request) > 0: this_request.setdefault("download_format", "list") - these_requests[this_adaptor] = this_request - - # Allow a maximum of 2 parallel processes - pool = mp.Pool(min(len(these_requests), 2)) - - def apply_adaptor(args): - try: - result = args[0](args[1]) - except Exception as err: - # Catch any possible exception and store error message in case all adaptors fail - logger.debug(f"Adaptor Error ({args}): {err}") - result = [] - return result - - results = pool.map( - apply_adaptor, - ((adaptor, request) for adaptor, request in these_requests.items()), - ) + if this_adaptor not in these_requests: + these_requests[this_adaptor] = [this_request] + else: + these_requests[this_adaptor].append(this_request) + + + # TODO: Add multiprocessing + # # Allow a maximum of 2 parallel processes + # import multiprocessing as mp + + # pool = mp.Pool(min(len(these_requests), 2)) + + # def apply_adaptor(args): + # try: + # result = args[0](args[1]) + # except Exception as err: + # # Catch any possible exception and store error message in case all adaptors fail + # logger.debug(f"Adaptor Error ({args}): {err}") + # result = [] + # return result + + # results = pool.map( + # apply_adaptor, + # ((adaptor, request) for adaptor, request in these_requests.items()), + # ) if len(results) == 0: raise RuntimeError( diff --git a/cads_adaptors/tools/__init__.py b/cads_adaptors/tools/__init__.py index 3f6f00fd..e69de29b 100644 --- a/cads_adaptors/tools/__init__.py +++ b/cads_adaptors/tools/__init__.py @@ -1,5 +0,0 @@ -def ensure_list(input_item): - """Ensure that item is a list, generally for iterability.""" - if not isinstance(input_item, list): - return [input_item] - return input_item From f8a198d81af227c7c9e02b69d9f6b142d1406d33 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 10 Aug 2023 15:42:09 +0100 Subject: [PATCH 04/14] mars adaptor back to main --- cads_adaptors/adaptors/mars.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 03d46085..38c9f232 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -3,7 +3,6 @@ from cads_adaptors import mapping from cads_adaptors.adaptors import Request, cds -from cads_adaptors.tools import ensure_list class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): @@ -11,18 +10,13 @@ class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: import subprocess - - request = ensure_list(request) with open("r", "w") as fp: - for i, req in enumerate(request): - print("retrieve,", file=fp) - if i==0: - print("target=data.grib", file=fp) - for key, value in req.items(): - if not isinstance(value, (list, tuple)): - value = [value] - print(f", {key}={'/'.join(str(v) for v in value)}", file=fp) + print("retrieve, target=data.grib", file=fp) + for key, value in request.items(): + if not isinstance(value, (list, tuple)): + value = [value] + print(f", {key}={'/'.join(str(v) for v in value)}", file=fp) env = dict(**os.environ) # FIXME: set with the namespace and user_id From 50abb7246c0355e655d4bd9cf3ef9da27f94afdc Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 10 Aug 2023 15:44:50 +0100 Subject: [PATCH 05/14] multi adaptor without multi-processing --- cads_adaptors/adaptors/multi.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index d0e91c7a..99765b78 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -58,13 +58,13 @@ def retrieve(self, request: Request): # i.e. split_request does NOT implement constraints. if len(this_request) > 0: this_request.setdefault("download_format", "list") - if this_adaptor not in these_requests: - these_requests[this_adaptor] = [this_request] - else: - these_requests[this_adaptor].append(this_request) + these_requests[this_adaptor] = [this_request] + results = [] + for adaptor, req in these_requests.items(): + results.append(adaptor(req)) - # TODO: Add multiprocessing + # TODO: Add parallelistation via multiprocessing # # Allow a maximum of 2 parallel processes # import multiprocessing as mp From 4b6dafbb3442b808c35b3645de045918c9b9b3c9 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 10 Aug 2023 17:23:01 +0100 Subject: [PATCH 06/14] ensure_list to general.py to prevent circular import --- cads_adaptors/adaptors/multi.py | 3 ++- cads_adaptors/tools/download_tools.py | 2 +- cads_adaptors/tools/general.py | 5 +++++ 3 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 cads_adaptors/tools/general.py diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index 99765b78..bd3366a2 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -5,10 +5,11 @@ from cads_adaptors import AbstractCdsAdaptor from cads_adaptors.adaptors import Request -from cads_adaptors.tools import ensure_list +from cads_adaptors.tools.general import ensure_list logger = logging.Logger(__name__) + class MultiAdaptor(AbstractCdsAdaptor): @staticmethod def split_request( diff --git a/cads_adaptors/tools/download_tools.py b/cads_adaptors/tools/download_tools.py index 5a4329d8..74c8ef1c 100644 --- a/cads_adaptors/tools/download_tools.py +++ b/cads_adaptors/tools/download_tools.py @@ -1,7 +1,7 @@ import os from typing import BinaryIO, Callable, Dict, List -from cads_adaptors.tools import ensure_list +from cads_adaptors.tools.general import ensure_list # TODO use targzstream diff --git a/cads_adaptors/tools/general.py b/cads_adaptors/tools/general.py new file mode 100644 index 00000000..3f6f00fd --- /dev/null +++ b/cads_adaptors/tools/general.py @@ -0,0 +1,5 @@ +def ensure_list(input_item): + """Ensure that item is a list, generally for iterability.""" + if not isinstance(input_item, list): + return [input_item] + return input_item From 13712b3bc6ff26034927c830e48a0a06f2510cfb Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 11 Aug 2023 08:02:49 +0100 Subject: [PATCH 07/14] pyyaml types --- ci/environment-ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/ci/environment-ci.yml b/ci/environment-ci.yml index 215c6247..1d075cfc 100644 --- a/ci/environment-ci.yml +++ b/ci/environment-ci.yml @@ -13,4 +13,5 @@ dependencies: - sphinx - sphinx-autoapi # DO NOT EDIT ABOVE THIS LINE, ADD DEPENDENCIES BELOW +- types-PyYAML - types-requests From 9eae633ea3a692265d094e6df4d9e598221f26df Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 11 Aug 2023 08:07:16 +0100 Subject: [PATCH 08/14] pyyaml to dependancies --- environment.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/environment.yml b/environment.yml index 251d0706..6cf19453 100644 --- a/environment.yml +++ b/environment.yml @@ -10,6 +10,7 @@ channels: dependencies: - pip - requests +- pyyaml - pip: - cacholote - multiurl From b7c4ce1d7302c31bda95cd2655d59149594a317e Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 11 Aug 2023 08:16:28 +0100 Subject: [PATCH 09/14] yaml only imported where needed for integration --- cads_adaptors/adaptors/multi.py | 4 ++-- ci/environment-integration.yml | 1 + environment.yml | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index bd3366a2..ba4f5913 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -1,8 +1,6 @@ import logging import typing as T -import yaml - from cads_adaptors import AbstractCdsAdaptor from cads_adaptors.adaptors import Request from cads_adaptors.tools.general import ensure_list @@ -86,6 +84,8 @@ def retrieve(self, request: Request): # ) if len(results) == 0: + import yaml + raise RuntimeError( "MultiAdaptor returned no results, the error logs of the sub-adaptors is as follows:\n" f"{yaml.safe_dump(exception_logs)}" diff --git a/ci/environment-integration.yml b/ci/environment-integration.yml index 6a8511d3..f45563ff 100644 --- a/ci/environment-integration.yml +++ b/ci/environment-integration.yml @@ -7,3 +7,4 @@ dependencies: - pytest - pytest-cov # DO NOT EDIT ABOVE THIS LINE, ADD DEPENDENCIES BELOW +- pyyaml diff --git a/environment.yml b/environment.yml index 6cf19453..251d0706 100644 --- a/environment.yml +++ b/environment.yml @@ -10,7 +10,6 @@ channels: dependencies: - pip - requests -- pyyaml - pip: - cacholote - multiurl From 9dc0a54addc2e78363c6071e0ea042728e158c0e Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 11 Aug 2023 16:36:03 +0100 Subject: [PATCH 10/14] multi adaptor dbug --- .gitignore | 5 +-- cads_adaptors/adaptors/mars.py | 56 ++++++++++++++++++++++----------- cads_adaptors/adaptors/multi.py | 28 +++++++++++------ 3 files changed, 60 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index c7d9595b..2ab9b76f 100644 --- a/.gitignore +++ b/.gitignore @@ -128,7 +128,7 @@ Temporary Items .idea/**/usage.statistics.xml .idea/**/dictionaries .idea/**/shelf - +.idea/ # AWS User-specific .idea/**/aws.xml @@ -196,7 +196,7 @@ fabric.properties # Android studio 3.1+ serialized cache file .idea/caches/build_file_checksums.ser - +.idea/ ### PyCharm Patch ### # Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 @@ -466,3 +466,4 @@ $RECYCLE.BIN/ *.lnk # End of https://www.toptal.com/developers/gitignore/api/python,jupyternotebooks,vim,visualstudiocode,pycharm,emacs,linux,macos,windows +test.ipynb diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 38c9f232..895de959 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -5,36 +5,56 @@ from cads_adaptors.adaptors import Request, cds +def execute_mars(request: Request, target="data.grib"): + import subprocess + + with open("r", "w") as fp: + print(f"retrieve, target={target}", file=fp) + for key, value in request.items(): + if not isinstance(value, (list, tuple)): + value = [value] + print(f", {key}={'/'.join(str(v) for v in value)}", file=fp) + + env = dict(**os.environ) + # FIXME: set with the namespace and user_id + namespace = "cads" + user_id = 0 + env["MARS_USER"] = f"{namespace}-{user_id}" + + subprocess.run(["/usr/local/bin/mars", "r"], check=True, env=env) + + return target + class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): resources = {"MARS_CLIENT": 1} def retrieve(self, request: Request) -> BinaryIO: - import subprocess + result = execute_mars(request) - with open("r", "w") as fp: - print("retrieve, target=data.grib", file=fp) - for key, value in request.items(): - if not isinstance(value, (list, tuple)): - value = [value] - print(f", {key}={'/'.join(str(v) for v in value)}", file=fp) + return open(result) # type: ignore - env = dict(**os.environ) - # FIXME: set with the namespace and user_id - namespace = "cads" - user_id = 0 - env["MARS_USER"] = f"{namespace}-{user_id}" - - subprocess.run(["/usr/local/bin/mars", "r"], check=True, env=env) - - return open("data.grib") # type: ignore class MarsCdsAdaptor(DirectMarsCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: + from cads_adaptors.tools import download_tools + + # Format of data files, grib or netcdf data_format = request.pop("format", "grib") + # Format of download archive, as_source, zip, tar, list etc. + download_format = request.pop("download_format", "as_source") + mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore - if data_format != "grib": + if data_format not in ["grib"]: # FIXME: reformat if needed pass - return super().retrieve(mapped_request) + + result = execute_mars(mapped_request) + + download_kwargs = { + "base_target": f"{self.collection_id}-{hash(tuple(request))}" + } + return download_tools.DOWNLOAD_FORMATS[download_format]( + [result], **download_kwargs + ) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index ba4f5913..47d86ed8 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -1,11 +1,13 @@ import logging import typing as T +import yaml + from cads_adaptors import AbstractCdsAdaptor from cads_adaptors.adaptors import Request -from cads_adaptors.tools.general import ensure_list +from cads_adaptors.tools import ensure_list -logger = logging.Logger(__name__) +from cads_adaptors.tools.logger import logger class MultiAdaptor(AbstractCdsAdaptor): @@ -32,8 +34,8 @@ def split_request( if len(these_vals) > 0: # if values then add to request this_request[key] = these_vals - elif key not in config.get("optional_keys", []): - # If not an optional key, then return an empty dictionary. + elif key in config.get("required_keys", []): + # If a required key, then return an empty dictionary. # optional keys must be set in the adaptor.json via gecko return {} @@ -51,17 +53,27 @@ def retrieve(self, request: Request): this_values = adaptor_desc.get("values", {}) this_request = self.split_request(request, this_values, **self.config) - logger.debug(f"{adaptor_tag}, request: {this_request}") + print(f"{adaptor_tag}, request: {request}") + print(f"{adaptor_tag}, this_values: {this_values}") + print(f"{adaptor_tag}, optional_keys: {self.config.get('optional_keys', [])}") + print(f"{adaptor_tag}, this_request: {this_request}") # TODO: check this_request is valid for this_adaptor, or rely on try? # i.e. split_request does NOT implement constraints. if len(this_request) > 0: this_request.setdefault("download_format", "list") - these_requests[this_adaptor] = [this_request] + these_requests[this_adaptor] = this_request results = [] for adaptor, req in these_requests.items(): - results.append(adaptor(req)) + try: + this_result = adaptor.retrieve(req) + except Exception: + logger.debug(Exception) + else: + print(adaptor, req, this_result) + results+=this_result + print(results) # TODO: Add parallelistation via multiprocessing # # Allow a maximum of 2 parallel processes @@ -84,8 +96,6 @@ def retrieve(self, request: Request): # ) if len(results) == 0: - import yaml - raise RuntimeError( "MultiAdaptor returned no results, the error logs of the sub-adaptors is as follows:\n" f"{yaml.safe_dump(exception_logs)}" From 44c72ca8b8e05e7dbbed5f6cf41604674e5618b7 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Tue, 15 Aug 2023 11:03:01 +0100 Subject: [PATCH 11/14] qa --- cads_adaptors/adaptors/mars.py | 4 ++-- cads_adaptors/adaptors/multi.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 895de959..c7013730 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -25,6 +25,7 @@ def execute_mars(request: Request, target="data.grib"): return target + class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): resources = {"MARS_CLIENT": 1} @@ -34,7 +35,6 @@ def retrieve(self, request: Request) -> BinaryIO: return open(result) # type: ignore - class MarsCdsAdaptor(DirectMarsCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: from cads_adaptors.tools import download_tools @@ -49,7 +49,7 @@ def retrieve(self, request: Request) -> BinaryIO: if data_format not in ["grib"]: # FIXME: reformat if needed pass - + result = execute_mars(mapped_request) download_kwargs = { diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index 47d86ed8..6e460f7d 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -1,4 +1,3 @@ -import logging import typing as T import yaml @@ -6,7 +5,6 @@ from cads_adaptors import AbstractCdsAdaptor from cads_adaptors.adaptors import Request from cads_adaptors.tools import ensure_list - from cads_adaptors.tools.logger import logger @@ -55,7 +53,9 @@ def retrieve(self, request: Request): this_request = self.split_request(request, this_values, **self.config) print(f"{adaptor_tag}, request: {request}") print(f"{adaptor_tag}, this_values: {this_values}") - print(f"{adaptor_tag}, optional_keys: {self.config.get('optional_keys', [])}") + print( + f"{adaptor_tag}, optional_keys: {self.config.get('optional_keys', [])}" + ) print(f"{adaptor_tag}, this_request: {this_request}") # TODO: check this_request is valid for this_adaptor, or rely on try? @@ -72,7 +72,7 @@ def retrieve(self, request: Request): logger.debug(Exception) else: print(adaptor, req, this_result) - results+=this_result + results += this_result print(results) # TODO: Add parallelistation via multiprocessing From f873ad5c23093dcbf416dded523def532271845b Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Tue, 15 Aug 2023 11:05:27 +0100 Subject: [PATCH 12/14] ensure_list in general tools --- cads_adaptors/adaptors/multi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index 6e460f7d..7662594b 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -4,7 +4,7 @@ from cads_adaptors import AbstractCdsAdaptor from cads_adaptors.adaptors import Request -from cads_adaptors.tools import ensure_list +from cads_adaptors.tools.general import ensure_list from cads_adaptors.tools.logger import logger From 20d6459c6dfa51ab93adaedd328357de80f26440 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Tue, 15 Aug 2023 11:09:57 +0100 Subject: [PATCH 13/14] remove print statements --- cads_adaptors/adaptors/multi.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index 7662594b..76ba4c15 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -51,12 +51,7 @@ def retrieve(self, request: Request): this_values = adaptor_desc.get("values", {}) this_request = self.split_request(request, this_values, **self.config) - print(f"{adaptor_tag}, request: {request}") - print(f"{adaptor_tag}, this_values: {this_values}") - print( - f"{adaptor_tag}, optional_keys: {self.config.get('optional_keys', [])}" - ) - print(f"{adaptor_tag}, this_request: {this_request}") + logger.debug(f"{adaptor_tag}, this_request: {this_request}") # TODO: check this_request is valid for this_adaptor, or rely on try? # i.e. split_request does NOT implement constraints. @@ -71,9 +66,7 @@ def retrieve(self, request: Request): except Exception: logger.debug(Exception) else: - print(adaptor, req, this_result) results += this_result - print(results) # TODO: Add parallelistation via multiprocessing # # Allow a maximum of 2 parallel processes From c8c53004646795a526e530a3f8ff3cdd87810abf Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Tue, 15 Aug 2023 11:11:09 +0100 Subject: [PATCH 14/14] improved logging --- cads_adaptors/adaptors/multi.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py index 76ba4c15..5b23299c 100644 --- a/cads_adaptors/adaptors/multi.py +++ b/cads_adaptors/adaptors/multi.py @@ -46,12 +46,13 @@ def retrieve(self, request: Request): these_requests = {} exception_logs: T.Dict[str, str] = {} + logger.debug(f"MultiAdaptor, full_request: {request}") for adaptor_tag, adaptor_desc in self.config["adaptors"].items(): this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form) this_values = adaptor_desc.get("values", {}) this_request = self.split_request(request, this_values, **self.config) - logger.debug(f"{adaptor_tag}, this_request: {this_request}") + logger.debug(f"MultiAdaptor, {adaptor_tag}, this_request: {this_request}") # TODO: check this_request is valid for this_adaptor, or rely on try? # i.e. split_request does NOT implement constraints.