Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

10 base multi adaptor clean #28

Merged
merged 15 commits into from
Aug 15, 2023
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ fabric.properties

# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser

.idea/
### PyCharm Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721

Expand Down Expand Up @@ -467,3 +467,4 @@ $RECYCLE.BIN/
*.lnk

# End of https://www.toptal.com/developers/gitignore/api/python,jupyternotebooks,vim,visualstudiocode,pycharm,emacs,linux,macos,windows
test.ipynb
2 changes: 2 additions & 0 deletions cads_adaptors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from cads_adaptors.adaptors.legacy import LegacyCdsAdaptor
from cads_adaptors.adaptors.mars import DirectMarsCdsAdaptor, MarsCdsAdaptor
from cads_adaptors.adaptors.multi import MultiAdaptor
from cads_adaptors.adaptors.url import UrlCdsAdaptor

from .tools.adaptor_tools import get_adaptor_class
Expand All @@ -46,4 +47,5 @@
"LegacyCdsAdaptor",
"MarsCdsAdaptor",
"UrlCdsAdaptor",
"MultiAdaptor",
]
1 change: 1 addition & 0 deletions cads_adaptors/adaptors/cds.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class AbstractCdsAdaptor(AbstractAdaptor):

def __init__(self, form: dict[str, Any], **config: Any):
self.form = form
self.collection_id = config.get("collection_id", "unknown-collection")
self.constraints = config.pop("constraints", [])
self.mapping = config.pop("mapping", {})
self.licences: list[tuple[str, int]] = config.pop("licences", [])
Expand Down
58 changes: 39 additions & 19 deletions cads_adaptors/adaptors/mars.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,56 @@
from cads_adaptors.adaptors import Request, cds


class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor):
resources = {"MARS_CLIENT": 1}
def execute_mars(request: Request, target="data.grib"):
import subprocess

def retrieve(self, request: Request) -> BinaryIO:
import subprocess
with open("r", "w") as fp:
print(f"retrieve, target={target}", file=fp)
for key, value in request.items():
if not isinstance(value, (list, tuple)):
value = [value]
print(f", {key}={'/'.join(str(v) for v in value)}", file=fp)

with open("r", "w") as fp:
print("retrieve, target=data.grib", file=fp)
for key, value in request.items():
if not isinstance(value, (list, tuple)):
value = [value]
print(f", {key}={'/'.join(str(v) for v in value)}", file=fp)
env = dict(**os.environ)
# FIXME: set with the namespace and user_id
namespace = "cads"
user_id = 0
env["MARS_USER"] = f"{namespace}-{user_id}"

env = dict(**os.environ)
# FIXME: set with the namespace and user_id
namespace = "cads"
user_id = 0
env["MARS_USER"] = f"{namespace}-{user_id}"
subprocess.run(["/usr/local/bin/mars", "r"], check=True, env=env)

subprocess.run(["/usr/local/bin/mars", "r"], check=True, env=env)
return target

return open("data.grib") # type: ignore

class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor):
    """Adaptor that passes the request to ``execute_mars`` without any mapping."""

    # NOTE(review): presumably the resources a scheduler must reserve for this
    # adaptor -- confirm against the code that consumes ``resources``.
    resources = {"MARS_CLIENT": 1}

    def retrieve(self, request: Request) -> BinaryIO:
        """Run the MARS retrieval for ``request`` and return the output file.

        The file is opened in binary mode so the returned handle matches the
        declared ``BinaryIO`` return type and the handles produced by the
        other download helpers.
        """
        result = execute_mars(request)

        return open(result, "rb")


class MarsCdsAdaptor(DirectMarsCdsAdaptor):
def retrieve(self, request: Request) -> BinaryIO:
from cads_adaptors.tools import download_tools

# Format of data files, grib or netcdf
data_format = request.pop("format", "grib")

# Format of download archive, as_source, zip, tar, list etc.
download_format = request.pop("download_format", "as_source")

mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore
if data_format != "grib":
if data_format not in ["grib"]:
# FIXME: reformat if needed
pass
return super().retrieve(mapped_request)

result = execute_mars(mapped_request)

download_kwargs = {
"base_target": f"{self.collection_id}-{hash(tuple(request))}"
}
return download_tools.DOWNLOAD_FORMATS[download_format](
[result], **download_kwargs
)
110 changes: 110 additions & 0 deletions cads_adaptors/adaptors/multi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import typing as T

import yaml

from cads_adaptors import AbstractCdsAdaptor
from cads_adaptors.adaptors import Request
from cads_adaptors.tools.general import ensure_list
from cads_adaptors.tools.logger import logger


class MultiAdaptor(AbstractCdsAdaptor):
    """Adaptor that splits one request across the sub-adaptors in its config.

    The sub-adaptors are read from ``self.config["adaptors"]``; each entry may
    carry a ``values`` mapping listing the request values that sub-adaptor is
    responsible for.
    """

    @staticmethod
    def split_request(
        full_request: Request,  # User request
        this_values: T.Dict[str, T.Any],  # key: [values] for the adaptor component
        **config: T.Any,
    ) -> Request:
        """
        Basic request splitter, splits based on whether the values are relevant to
        the specific adaptor.
        More complex constraints may need a more detailed splitter.

        Only keys present in ``this_values`` are considered. For each of them
        the requested values are filtered down to those listed in
        ``this_values``. If a key named in ``config["required_keys"]`` ends up
        with no values, an empty request is returned.
        """
        required_keys = config.get("required_keys", [])
        this_request = {}
        # loop over the keys relevant to this adaptor and their accepted values
        for key, accepted_vals in this_values.items():
            # get request values for that key
            req_vals = full_request.get(key, [])
            # filter for values relevant to this adaptor
            these_vals = [v for v in ensure_list(req_vals) if v in accepted_vals]
            if these_vals:
                this_request[key] = these_vals
            elif key in required_keys:
                # If a required key, then return an empty dictionary.
                # optional keys must be set in the adaptor.json via gecko
                return {}

        return this_request

    def retrieve(self, request: Request):
        """Retrieve from every relevant sub-adaptor and bundle the results.

        ``download_format`` is popped from ``request`` (default ``"zip"``) and
        selects the entry of ``download_tools.DOWNLOAD_FORMATS`` used to
        package the files. Sub-adaptors are asked for the ``"list"`` format
        unless their split request already names one.

        A failing sub-adaptor does not abort the retrieval: its error is
        recorded and the remaining sub-adaptors still run.

        Raises:
            RuntimeError: if no sub-adaptor produced a result; the message
                includes the errors collected from the sub-adaptors.
        """
        from cads_adaptors.tools import adaptor_tools, download_tools

        download_format = request.pop("download_format", "zip")

        # adaptor tag -> (adaptor instance, request for that adaptor)
        these_requests: T.Dict[str, T.Tuple[T.Any, Request]] = {}
        exception_logs: T.Dict[str, str] = {}
        logger.debug(f"MultiAdaptor, full_request: {request}")
        for adaptor_tag, adaptor_desc in self.config["adaptors"].items():
            this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form)
            this_values = adaptor_desc.get("values", {})

            this_request = self.split_request(request, this_values, **self.config)
            logger.debug(f"MultiAdaptor, {adaptor_tag}, this_request: {this_request}")

            # TODO: check this_request is valid for this_adaptor, or rely on try?
            # i.e. split_request does NOT implement constraints.
            if this_request:
                this_request.setdefault("download_format", "list")
                these_requests[adaptor_tag] = (this_adaptor, this_request)

        # TODO: Add parallelisation via multiprocessing
        results = []
        for adaptor_tag, (adaptor, req) in these_requests.items():
            try:
                this_result = adaptor.retrieve(req)
            except Exception as err:
                # Best effort: keep the message so it can be reported if every
                # sub-adaptor fails, then carry on with the next one.
                exception_logs[adaptor_tag] = f"{type(err).__name__}: {err}"
                logger.debug(f"MultiAdaptor, {adaptor_tag}, failed: {err!r}")
            else:
                # A sub-adaptor may hand back a single file object rather than
                # a list; wrap it so the file is not iterated line by line.
                results += ensure_list(this_result)

        if not results:
            raise RuntimeError(
                "MultiAdaptor returned no results, the error logs of the sub-adaptors is as follows:\n"
                f"{yaml.safe_dump(exception_logs)}"
            )

        # close the files, the download tools work from paths
        for res in results:
            res.close()
        paths = [res.name for res in results]

        download_kwargs = dict(
            base_target=f"{self.collection_id}-{hash(tuple(results))}"
        )

        return download_tools.DOWNLOAD_FORMATS[download_format](
            paths, **download_kwargs
        )
23 changes: 16 additions & 7 deletions cads_adaptors/adaptors/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,29 @@

class UrlCdsAdaptor(cds.AbstractCdsAdaptor):
def retrieve(self, request: Request) -> BinaryIO:
from cads_adaptors.tools import url_tools
from cads_adaptors.tools import download_tools, url_tools

data_format = request.pop("format", "zip")
download_format = request.pop("format", "zip") # TODO: Remove legacy syntax
# CADS syntax over-rules legacy syntax
download_format = request.pop("download_format", download_format)

if data_format not in {"zip", "tgz"}:
raise ValueError(f"{data_format=} is not supported")
# Do not need to check twice
# if download_format not in {"zip", "tgz"}:
# raise ValueError(f"{download_format} is not supported")

mapped_request = mapping.apply_mapping(request, self.mapping) # type: ignore

# Convert request to list of URLs
requests_urls = url_tools.requests_to_urls(
mapped_request, patterns=self.config["patterns"]
)

path = url_tools.download_from_urls(
[ru["url"] for ru in requests_urls], data_format=data_format
# try to download URLs
urls = [ru["url"] for ru in requests_urls]
paths = url_tools.try_download(urls)

download_kwargs = {"base_target": f"{self.collection_id}-{hash(tuple(urls))}"}

return download_tools.DOWNLOAD_FORMATS[download_format](
paths, **download_kwargs
)
return open(path, "rb")
59 changes: 59 additions & 0 deletions cads_adaptors/tools/download_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import os
from typing import BinaryIO, Callable, Dict, List

from cads_adaptors.tools.general import ensure_list


# TODO use zipstream for archive creation
def zip_paths(paths: List[str], base_target: str = "output-data", **kwargs) -> BinaryIO:
    """Write ``paths`` into ``<base_target>.zip`` and return it opened for reading.

    Every path is added with ``ZipFile.write`` using its default archive name.
    Once the archive has been closed the source files are removed from disk.
    Additional keyword arguments are accepted and ignored.
    """
    from zipfile import ZipFile

    archive_name = f"{base_target}.zip"
    with ZipFile(archive_name, mode="w") as zipped:
        for source in paths:
            zipped.write(source)

    # Only delete the inputs after the archive has been written completely.
    for source in paths:
        os.remove(source)

    return open(archive_name, "rb")


# TODO use targzstream for archive creation
def targz_paths(
    paths: List[str],
    base_target: str = "output-data",
    **kwargs,
) -> BinaryIO:
    """Write ``paths`` into ``<base_target>.tar.gz`` and return it opened for reading.

    Every path is added with ``TarFile.add`` using its default archive name.
    Once the archive has been closed the source files are removed from disk.
    Additional keyword arguments are accepted and ignored.
    """
    from tarfile import open as open_tar

    archive_name = f"{base_target}.tar.gz"
    with open_tar(archive_name, "w:gz") as tarball:
        for source in paths:
            tarball.add(source)

    # Only delete the inputs after the archive has been written completely.
    for source in paths:
        os.remove(source)

    return open(archive_name, "rb")


def list_paths(
    paths: List[str],
    **kwargs,
) -> List:
    """Open each path for binary reading and return the file objects as a list.

    ``paths`` is passed through ``ensure_list`` first, so a single non-list
    value is treated as one path. Additional keyword arguments are ignored.
    """
    handles = []
    for path in ensure_list(paths):
        handles.append(open(path, "rb"))
    return handles


def as_source(paths: List[str], **kwargs) -> BinaryIO:
    """Return the single file in ``paths`` opened for binary reading.

    The source file can only be returned as-is when there is exactly one of
    it; for several files the ``list`` format must be requested instead.
    Additional keyword arguments are accepted and ignored.

    Raises:
        ValueError: if ``paths`` does not contain exactly one entry.
    """
    # Explicit check rather than ``assert``: assertions are stripped when
    # Python runs with -O, which would let a multi-file result through.
    if len(paths) != 1:
        raise ValueError(
            f"as_source requires exactly one path, got {len(paths)}; "
            "request the 'list' download format for multiple files"
        )
    return open(paths[0], "rb")


# Dispatch table: download format name -> function that packages a list of
# paths. Each function is called as ``func(paths, **download_kwargs)``.
DOWNLOAD_FORMATS: Dict[str, Callable] = dict(
    zip=zip_paths,
    tgz=targz_paths,
    list=list_paths,
    as_source=as_source,
)
5 changes: 5 additions & 0 deletions cads_adaptors/tools/general.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
def ensure_list(input_item):
    """Return ``input_item`` itself if it is a list, otherwise wrap it in one.

    Only ``list`` instances pass through untouched; any other object
    (tuples and strings included) becomes the sole element of a new list.
    """
    return input_item if isinstance(input_item, list) else [input_item]
1 change: 1 addition & 0 deletions ci/environment-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ dependencies:
- pytest
- pytest-cov
# DO NOT EDIT ABOVE THIS LINE, ADD DEPENDENCIES BELOW
- pyyaml
47 changes: 47 additions & 0 deletions tests/test_20_adaptor_multi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from cads_adaptors.adaptors import multi

# Form passed to the adaptor under test.
FORM = dict(
    level=["500", "850"],
    time=["12:00", "00:00"],
    param=["Z", "T"],
    stat=["mean", "max"],
)
# The request asks for everything in the form (shallow copy of FORM).
REQUEST = dict(FORM)

# MultiAdaptor configuration with two sub-adaptors that differ only in their
# entry point and in the single "stat" value each one accepts.
ADAPTOR_CONFIG = {
    "entry_point": "MultiAdaptor",
    "adaptors": {
        tag: {
            "entry_point": entry_point,
            "values": {
                "level": ["500", "850"],
                "time": ["12:00", "00:00"],
                "param": ["Z", "T"],
                "stat": [tag],
            },
        }
        for tag, entry_point in (
            ("mean", "cads_adaptors:UrlCdsAdaptor"),
            ("max", "cads_adaptors:DummyAdaptor"),
        )
    },
}


def test_multi_adaptor_split():
    """Splitting the full request yields exactly each sub-adaptor's values."""
    adaptor = multi.MultiAdaptor(FORM, **ADAPTOR_CONFIG)

    mean_values = adaptor.config["adaptors"]["mean"]["values"]
    mean_request = adaptor.split_request(REQUEST, mean_values)
    assert mean_request == ADAPTOR_CONFIG["adaptors"]["mean"]["values"]

    max_values = adaptor.config["adaptors"]["max"]["values"]
    max_request = adaptor.split_request(REQUEST, max_values)
    assert max_request == ADAPTOR_CONFIG["adaptors"]["max"]["values"]