diff --git a/.gitignore b/.gitignore index bff32633..20dd21ed 100644 --- a/.gitignore +++ b/.gitignore @@ -197,7 +197,7 @@ fabric.properties # Android studio 3.1+ serialized cache file .idea/caches/build_file_checksums.ser - +.idea/ ### PyCharm Patch ### # Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 @@ -467,3 +467,4 @@ $RECYCLE.BIN/ *.lnk # End of https://www.toptal.com/developers/gitignore/api/python,jupyternotebooks,vim,visualstudiocode,pycharm,emacs,linux,macos,windows +test.ipynb diff --git a/cads_adaptors/__init__.py b/cads_adaptors/__init__.py index d6fa96f6..57f7b4f9 100644 --- a/cads_adaptors/__init__.py +++ b/cads_adaptors/__init__.py @@ -30,6 +30,7 @@ ) from cads_adaptors.adaptors.legacy import LegacyCdsAdaptor from cads_adaptors.adaptors.mars import DirectMarsCdsAdaptor, MarsCdsAdaptor +from cads_adaptors.adaptors.multi import MultiAdaptor from cads_adaptors.adaptors.url import UrlCdsAdaptor from .tools.adaptor_tools import get_adaptor_class @@ -46,4 +47,5 @@ "LegacyCdsAdaptor", "MarsCdsAdaptor", "UrlCdsAdaptor", + "MultiAdaptor", ] diff --git a/cads_adaptors/adaptors/cds.py b/cads_adaptors/adaptors/cds.py index 04c6c3db..2aafcb38 100644 --- a/cads_adaptors/adaptors/cds.py +++ b/cads_adaptors/adaptors/cds.py @@ -9,6 +9,7 @@ class AbstractCdsAdaptor(AbstractAdaptor): def __init__(self, form: dict[str, Any], **config: Any): self.form = form + self.collection_id = config.get("collection_id", "unknown-collection") self.constraints = config.pop("constraints", []) self.mapping = config.pop("mapping", {}) self.licences: list[tuple[str, int]] = config.pop("licences", []) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 38c9f232..c7013730 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -5,36 +5,56 @@ from cads_adaptors.adaptors import Request, cds -class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): - resources = {"MARS_CLIENT": 1} +def execute_mars(request: Request, target="data.grib"): + import subprocess - def retrieve(self, request: Request) -> BinaryIO: - import subprocess + with open("r", "w") as fp: + print(f"retrieve, target={target}", file=fp) + for key, value in request.items(): + if not isinstance(value, (list, tuple)): + value = [value] + print(f", {key}={'/'.join(str(v) for v in value)}", file=fp) - with open("r", "w") as fp: - print("retrieve, target=data.grib", file=fp) - for key, value in request.items(): - if not isinstance(value, (list, tuple)): - value = [value] - print(f", {key}={'/'.join(str(v) for v in value)}", file=fp) + env = dict(**os.environ) + # FIXME: set with the namespace and user_id + namespace = "cads" + user_id = 0 + env["MARS_USER"] = f"{namespace}-{user_id}" - env = dict(**os.environ) - # FIXME: set with the namespace and user_id - namespace = "cads" - user_id = 0 - env["MARS_USER"] = f"{namespace}-{user_id}" + subprocess.run(["/usr/local/bin/mars", "r"], check=True, env=env) - subprocess.run(["/usr/local/bin/mars", "r"], check=True, env=env) + return target - return open("data.grib") # type: ignore + +class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): + resources = {"MARS_CLIENT": 1} + + def retrieve(self, request: Request) -> BinaryIO: + result = execute_mars(request) + + return open(result) # type: ignore class MarsCdsAdaptor(DirectMarsCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: + from cads_adaptors.tools import download_tools + + # Format of data files, grib or netcdf data_format = request.pop("format", "grib") + # Format of download archive, as_source, zip, tar, list etc. + download_format = request.pop("download_format", "as_source") + mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore - if data_format != "grib": + if data_format not in ["grib"]: # FIXME: reformat if needed pass - return super().retrieve(mapped_request) + + result = execute_mars(mapped_request) + + download_kwargs = { + "base_target": f"{self.collection_id}-{hash(tuple(request))}" + } + return download_tools.DOWNLOAD_FORMATS[download_format]( + [result], **download_kwargs + ) diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py new file mode 100644 index 00000000..5b23299c --- /dev/null +++ b/cads_adaptors/adaptors/multi.py @@ -0,0 +1,110 @@ +import typing as T + +import yaml + +from cads_adaptors import AbstractCdsAdaptor +from cads_adaptors.adaptors import Request +from cads_adaptors.tools.general import ensure_list +from cads_adaptors.tools.logger import logger + + +class MultiAdaptor(AbstractCdsAdaptor): + @staticmethod + def split_request( + full_request: Request, # User request + this_values: T.Dict[str, T.Any], # key: [values] for the adaptor component + **config: T.Any, + ) -> Request: + """ + Basic request splitter, splits based on whether the values are relevant to + the specific adaptor. + More complex constraints may need a more detailed splitter. + """ + this_request = {} + # loop over keys in this_values, i.e. the keys relevant to this_adaptor + for key in list(this_values): + # get request values for that key + req_vals = full_request.get(key, []) + # filter for values relevant to this_adaptor: + these_vals = [ + v for v in ensure_list(req_vals) if v in this_values.get(key, []) + ] + if len(these_vals) > 0: + # if values then add to request + this_request[key] = these_vals + elif key in config.get("required_keys", []): + # If a required key, then return an empty dictionary. + # optional keys must be set in the adaptor.json via gecko + return {} + + return this_request + + def retrieve(self, request: Request): + from cads_adaptors.tools import adaptor_tools, download_tools + + download_format = request.pop("download_format", "zip") + + these_requests = {} + exception_logs: T.Dict[str, str] = {} + logger.debug(f"MultiAdaptor, full_request: {request}") + for adaptor_tag, adaptor_desc in self.config["adaptors"].items(): + this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form) + this_values = adaptor_desc.get("values", {}) + + this_request = self.split_request(request, this_values, **self.config) + logger.debug(f"MultiAdaptor, {adaptor_tag}, this_request: {this_request}") + + # TODO: check this_request is valid for this_adaptor, or rely on try? + # i.e. split_request does NOT implement constraints. + if len(this_request) > 0: + this_request.setdefault("download_format", "list") + these_requests[this_adaptor] = this_request + + results = [] + for adaptor, req in these_requests.items(): + try: + this_result = adaptor.retrieve(req) + except Exception: + logger.debug(Exception) + else: + results += this_result + + # TODO: Add parallelistation via multiprocessing + # # Allow a maximum of 2 parallel processes + # import multiprocessing as mp + + # pool = mp.Pool(min(len(these_requests), 2)) + + # def apply_adaptor(args): + # try: + # result = args[0](args[1]) + # except Exception as err: + # # Catch any possible exception and store error message in case all adaptors fail + # logger.debug(f"Adaptor Error ({args}): {err}") + # result = [] + # return result + + # results = pool.map( + # apply_adaptor, + # ((adaptor, request) for adaptor, request in these_requests.items()), + # ) + + if len(results) == 0: + raise RuntimeError( + "MultiAdaptor returned no results, the error logs of the sub-adaptors is as follows:\n" + f"{yaml.safe_dump(exception_logs)}" + ) + + # return self.merge_results(results, prefix=self.collection_id) + # close files + [res.close() for res in results] + # get the paths + paths = [res.name for res in results] + + download_kwargs = dict( + base_target=f"{self.collection_id}-{hash(tuple(results))}" + ) + + return download_tools.DOWNLOAD_FORMATS[download_format]( + paths, **download_kwargs + ) diff --git a/cads_adaptors/adaptors/url.py b/cads_adaptors/adaptors/url.py index 4e718c17..b0c47c9a 100644 --- a/cads_adaptors/adaptors/url.py +++ b/cads_adaptors/adaptors/url.py @@ -6,20 +6,29 @@ class UrlCdsAdaptor(cds.AbstractCdsAdaptor): def retrieve(self, request: Request) -> BinaryIO: - from cads_adaptors.tools import url_tools + from cads_adaptors.tools import download_tools, url_tools - data_format = request.pop("format", "zip") + download_format = request.pop("format", "zip") # TODO: Remove legacy syntax + # CADS syntax over-rules legacy syntax + download_format = request.pop("download_format", download_format) - if data_format not in {"zip", "tgz"}: - raise ValueError(f"{data_format=} is not supported") + # Do not need to check twice + # if download_format not in {"zip", "tgz"}: + # raise ValueError(f"{download_format} is not supported") mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore + # Convert request to list of URLs requests_urls = url_tools.requests_to_urls( mapped_request, patterns=self.config["patterns"] ) - path = url_tools.download_from_urls( - [ru["url"] for ru in requests_urls], data_format=data_format + # try to download URLs + urls = [ru["url"] for ru in requests_urls] + paths = url_tools.try_download(urls) + + download_kwargs = {"base_target": f"{self.collection_id}-{hash(tuple(urls))}"} + + return download_tools.DOWNLOAD_FORMATS[download_format]( + paths, **download_kwargs ) - return open(path, "rb") diff --git a/cads_adaptors/tools/download_tools.py b/cads_adaptors/tools/download_tools.py new file mode 100644 index 00000000..74c8ef1c --- /dev/null +++ b/cads_adaptors/tools/download_tools.py @@ -0,0 +1,59 @@ +import os +from typing import BinaryIO, Callable, Dict, List + +from cads_adaptors.tools.general import ensure_list + + +# TODO use targzstream +def zip_paths(paths: List[str], base_target: str = "output-data", **kwargs) -> BinaryIO: + import zipfile + + target = f"{base_target}.zip" + with zipfile.ZipFile(target, mode="w") as archive: + for p in paths: + archive.write(p) + + for p in paths: + os.remove(p) + + return open(target, "rb") + + +# TODO zipstream for archive creation +def targz_paths( + paths: List[str], + base_target: str = "output-data", + **kwargs, +) -> BinaryIO: + import tarfile + + target = f"{base_target}.tar.gz" + with tarfile.open(target, "w:gz") as archive: + for p in paths: + archive.add(p) + + for p in paths: + os.remove(p) + + return open(target, "rb") + + +def list_paths( + paths: List[str], + **kwargs, +) -> List: + return [open(path, "rb") for path in ensure_list(paths)] + + +def as_source(paths: List[str], **kwargs) -> BinaryIO: + # Only return as_source if a single path, otherwise list MUST be requested + assert len(paths) == 1 + return open(paths[0], "rb") + + +DOWNLOAD_FORMATS: Dict[str, Callable] = { + "zip": zip_paths, + "tgz": targz_paths, + "list": list_paths, + "as_source": as_source, +} diff --git a/cads_adaptors/tools/general.py b/cads_adaptors/tools/general.py new file mode 100644 index 00000000..3f6f00fd --- /dev/null +++ b/cads_adaptors/tools/general.py @@ -0,0 +1,5 @@ +def ensure_list(input_item): + """Ensure that item is a list, generally for iterability.""" + if not isinstance(input_item, list): + return [input_item] + return input_item diff --git a/ci/environment-integration.yml b/ci/environment-integration.yml index 6a8511d3..f45563ff 100644 --- a/ci/environment-integration.yml +++ b/ci/environment-integration.yml @@ -7,3 +7,4 @@ dependencies: - pytest - pytest-cov # DO NOT EDIT ABOVE THIS LINE, ADD DEPENDENCIES BELOW +- pyyaml diff --git a/tests/test_20_adaptor_multi.py b/tests/test_20_adaptor_multi.py new file mode 100644 index 00000000..61650e57 --- /dev/null +++ b/tests/test_20_adaptor_multi.py @@ -0,0 +1,47 @@ +from cads_adaptors.adaptors import multi + +FORM = { + "level": ["500", "850"], + "time": ["12:00", "00:00"], + "param": ["Z", "T"], + "stat": ["mean", "max"], +} +REQUEST = FORM.copy() + +ADAPTOR_CONFIG = { + "entry_point": "MultiAdaptor", + "adaptors": { + "mean": { + "entry_point": "cads_adaptors:UrlCdsAdaptor", + "values": { + "level": ["500", "850"], + "time": ["12:00", "00:00"], + "param": ["Z", "T"], + "stat": ["mean"], + }, + }, + "max": { + "entry_point": "cads_adaptors:DummyAdaptor", + "values": { + "level": ["500", "850"], + "time": ["12:00", "00:00"], + "param": ["Z", "T"], + "stat": ["max"], + }, + }, + }, +} + + +def test_multi_adaptor_split(): + multi_adaptor = multi.MultiAdaptor(FORM, **ADAPTOR_CONFIG) + + split_mean = multi_adaptor.split_request( + REQUEST, multi_adaptor.config["adaptors"]["mean"]["values"] + ) + assert split_mean == ADAPTOR_CONFIG["adaptors"]["mean"]["values"] + + split_max = multi_adaptor.split_request( + REQUEST, multi_adaptor.config["adaptors"]["max"]["values"] + ) + assert split_max == ADAPTOR_CONFIG["adaptors"]["max"]["values"]