diff --git a/cads_adaptors/__init__.py b/cads_adaptors/__init__.py
index f4239ab1..8ed07f52 100644
--- a/cads_adaptors/__init__.py
+++ b/cads_adaptors/__init__.py
@@ -24,6 +24,7 @@
 from cads_adaptors.adaptors import AbstractAdaptor, Context, DummyAdaptor
 from cads_adaptors.adaptors.cadsobs.adaptor import ObservationsAdaptor
+from cads_adaptors.adaptors.cams_solar_rad import CamsSolarRadiationTimeseriesAdaptor
 from cads_adaptors.adaptors.cds import AbstractCdsAdaptor, DummyCdsAdaptor
 from cads_adaptors.adaptors.insitu import (
     InsituDatabaseCdsAdaptor,
@@ -55,4 +56,5 @@
     "MultiMarsCdsAdaptor",
     "RoocsCdsAdaptor",
     "ObservationsAdaptor",
+    "CamsSolarRadiationTimeseriesAdaptor",
 ]
diff --git a/cads_adaptors/adaptors/cams_solar_rad/__init__.py b/cads_adaptors/adaptors/cams_solar_rad/__init__.py
new file mode 100644
index 00000000..aa67a924
--- /dev/null
+++ b/cads_adaptors/adaptors/cams_solar_rad/__init__.py
@@ -0,0 +1,102 @@
+from typing import BinaryIO
+
+from cads_adaptors.adaptors.cams_solar_rad.functions import (
+    BadRequest,
+    NoData,
+    determine_result_filename,
+    get_numeric_user_id,
+    solar_rad_retrieve,
+)
+from cads_adaptors.adaptors.cds import AbstractCdsAdaptor, Request
+from cads_adaptors.exceptions import InvalidRequest
+
+
+class CamsSolarRadiationTimeseriesAdaptor(AbstractCdsAdaptor):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Schema required to ensure the adaptor does not fail with an uncaught exception
+        self.schemas.append(
+            {
+                "_draft": "7",
+                "type": "object",  # Request should be a single dict
+                "required": [  # ... with at least these keys
+                    "sky_type",
+                    "location",
+                    "altitude",
+                    "date",
+                    "time_step",
+                    "time_reference",
+                    "format",
+                ],
+                "properties": {
+                    "sky_type": {"type": "string"},
+                    "location": {
+                        "type": "object",
+                        "properties": {
+                            "latitude": {
+                                "maximum": 90.0,
+                                "minimum": -90.0,
+                                "type": "number",
+                            },
+                            "longitude": {
+                                "maximum": 180.0,
+                                "minimum": -180.0,
+                                "type": "number",
+                            },
+                        },
+                    },
+                    "altitude": {"type": "string", "format": "numeric string"},
+                    "date": {"type": "string", "format": "date range"},
+                    "time_step": {"type": "string"},
+                    "time_reference": {"type": "string"},
+                    "format": {"type": "string"},
+                },
+                "_defaults": {"format": "csv"},
+            }
+        )
+
+    def retrieve(self, request: Request) -> BinaryIO:
+        self.context.debug(f"Request is {request!r}")
+
+        # Intersect constraints
+        if self.config.get("intersect_constraints", False):
+            requests = self.intersect_constraints(request)
+            if len(requests) != 1:
+                if len(requests) == 0:
+                    msg = "Error: no intersection with the constraints."
+                else:
+                    # TODO: confirm that this can never happen
+                    msg = "Error: unexpected intersection with more than 1 constraint."
+                self.context.add_user_visible_error(msg)
+                raise InvalidRequest(msg)
+            request_after_intersection = requests[0]
+        else:
+            request_after_intersection = request
+
+        # Apply mapping
+        self._pre_retrieve(
+            request_after_intersection, default_download_format="as_source"
+        )
+        mreq = self.mapped_request
+        self.context.debug(f"Mapped request is {mreq!r}")
+
+        numeric_user_id = get_numeric_user_id(self.config["user_uid"])
+        result_filename = determine_result_filename(
+            self.config, request_after_intersection
+        )
+
+        try:
+            solar_rad_retrieve(
+                mreq,
+                user_id=numeric_user_id,
+                outfile=result_filename,
+                logger=self.context,
+            )
+
+        except (BadRequest, NoData) as e:
+            msg = e.args[0]
+            self.context.add_user_visible_error(msg)
+            raise InvalidRequest(msg) from e
+
+        return open(result_filename, "rb")
diff --git a/cads_adaptors/adaptors/cams_solar_rad/functions.py b/cads_adaptors/adaptors/cams_solar_rad/functions.py
new file mode 100644
index 00000000..1f1e230a
--- /dev/null
+++ b/cads_adaptors/adaptors/cams_solar_rad/functions.py
@@ -0,0 +1,483 @@
+import hashlib
+import logging
+import re
+import time
+import traceback
+
+
+class BadRequest(Exception):
+    pass
+
+
+class NoData(Exception):
+    pass
+
+
+def solar_rad_retrieve(
+    request, outfile=None, user_id="0", ntries=10, logger=logging.getLogger(__name__)
+):
+    """Execute a CAMS solar radiation data retrieval."""
+    user_id = anonymised_user_id(user_id)
+    req = {"username": user_id}
+    req.update(request)
+
+    # Set expert_mode depending on format
+    req["expert_mode"] = "true" if req["format"] == "csv_expert" else "false"
+
+    # Set the MIME type from the format
+    if req["format"].startswith("csv"):
+        req["mimetype"] = "text/csv"
+    elif req["format"] == "netcdf":
+        req["mimetype"] = "application/x-netcdf"
+    else:
+        raise BadRequest(f'Unrecognised format: "{req["format"]}"')
+
+    # We could use the URL API or the WPS API. Only WPS has the option for
+    # NetCDF and it has better error handling.
+    # retrieve_by_url(req, outfile, logger)
+    retrieve_by_wps(req, outfile, ntries, logger)
+
+
+def determine_result_filename(config, request):
+    """Return the result filename implied by the request UID and output format."""
+    EXTENSIONS = {"csv": "csv", "csv_expert": "csv", "netcdf": "nc"}
+    request_uid = config.get("request_uid", "no-request-uid")
+    extension = EXTENSIONS.get(request["format"], "csv")
+    return f"result-{request_uid}.{extension}"
+
+
+def get_numeric_user_id(ads_user_id):
+    """Derive a numeric ID (at most six digits) from the hexadecimal ADS user UID."""
+    return str(int(ads_user_id.replace("-", ""), 16) % 10**6)
+
+
+def anonymised_user_id(ads_user_id):
+    # We have to pass a unique user ID to the provider but, for privacy reasons,
+    # we want it to be as anonymous as possible. Use a hash of the ADS user ID;
+    # as an integer it is already fairly anonymous, but hashing takes this
+    # further for safety. Note that we include an additional string in the hash
+    # because if it's known to be a hash of an integer then it's trivial to
+    # reverse.
+    user_id = hashlib.md5(
+        (f"__%SecretFromDataProvider%__{ads_user_id}").encode()
+    ).hexdigest()
+
+    # The data provider needs to be confident that this request actually is
+    # coming from the ADS though, so a valid user_id should not be easy to
+    # guess. To this end, we attach one of a number of fixed prefixes which are
+    # known to the provider.
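+    # (Worked example with a made-up ID: a numeric user ID of "123456" selects
+    # prefixes[order[123456 % 100]], and the value returned below is that
+    # ten-character prefix followed by the 32-character md5 hexdigest.)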
+    # These are 100 ten-character random strings generated with
+    # [''.join([random.choice(string.ascii_lowercase+string.digits)
+    #           for x in range(10)])
+    #  for y in range(100)]
+    prefixes = [
+        "p1przq6umd",
+        "u3b0kpo03n",
+        "p7040vspzp",
+        "li3p20bdeu",
+        "bzrd6fxi1k",
+        "0wi1278hmc",
+        "wy97an8lvb",
+        "5uc8v70tjd",
+        "z363vyfmsx",
+        "7fwqcqnbkj",
+        "y0vhbbuf45",
+        "vcjb3ywu4v",
+        "dfnel8yw9e",
+        "14jtrrluo9",
+        "z6ttwnmqup",
+        "2vwm55v58m",
+        "e991ro2y08",
+        "te18dbyva5",
+        "03r00ip9by",
+        "db3yyauvke",
+        "0jugwocwea",
+        "z9lqg6ht69",
+        "opfosf4e14",
+        "jrkm2lnww4",
+        "j1n0vu1eew",
+        "d87j1lu4kc",
+        "9m90b12ood",
+        "kdqm2yikbd",
+        "rowzooxxgs",
+        "e4pp6g7oef",
+        "43u1r9r09b",
+        "v277x86ddz",
+        "efc8hfpda1",
+        "2djtds47ss",
+        "5sioewwsia",
+        "kgmvklxmhf",
+        "kksqxkadvw",
+        "1vnyn2a8u0",
+        "8uz3qvs4rt",
+        "w51gmulgne",
+        "g3ry4uo2mv",
+        "15w92afblk",
+        "wsa6ewkfrq",
+        "c72ppq2oae",
+        "9f1v8xnqva",
+        "dnhqvtifoi",
+        "ufjq1lx8v6",
+        "c7v5jfre33",
+        "p1x0fbq5mg",
+        "adv5727kly",
+        "j7ite32koa",
+        "da5dpm9ugj",
+        "jzz9ziydir",
+        "6k8qrjxswv",
+        "zjqlv1q0x2",
+        "ip0ovw6baa",
+        "7qkcb4ten8",
+        "ga6ou1rna9",
+        "rn0tbw5ibw",
+        "yskwayh2a7",
+        "2f6dauhbh3",
+        "00oi3eszof",
+        "59airwqq2f",
+        "fvoqdb9aos",
+        "x3eqha4ak5",
+        "w7213ekoai",
+        "v6pgpppvns",
+        "iw03lggz5k",
+        "ajlhquzk1x",
+        "ez0fxx2nk4",
+        "1gtusg605e",
+        "5fhbxnzcs3",
+        "1n6b0jmife",
+        "yd3dfx81yt",
+        "pfwadqtfbx",
+        "wbpbfksq8m",
+        "0txq9kslkd",
+        "71o3dzo4vg",
+        "i40of4zgbb",
+        "ta7vdzcre3",
+        "t3e4had0k2",
+        "6vju23ec1n",
+        "ezar2s1xto",
+        "mleasglelq",
+        "xlsdqwzsaj",
+        "k4ax97a69w",
+        "tsff0rbjih",
+        "ukvi7df5p0",
+        "tpjb14yfch",
+        "jel8nmb9o5",
+        "4g00awsv54",
+        "a3tt2oexus",
+        "ci2s0raubc",
+        "nsl4ryf90p",
+        "7ouih3sl43",
+        "g5f1llhozy",
+        "fewisaav0z",
+        "hjvce61cs7",
+        "pxnl2by0qn",
+        "l5w89ffcty",
+    ]
+    # The prefixes have been given to the data provider in the order above.
+    # That means if we choose the prefix to use in too simple a way from the
+    # integer user ID then the data provider could make a guess at that
+    # integer, so use the prefixes in a random order. These are the integers
+    # 0 to 99 in a random order not known to the data provider, generated with
+    # sorted(range(0,100),key=lambda X: random.random())
+    order = [
+        74,
+        69,
+        26,
+        28,
+        71,
+        20,
+        50,
+        98,
+        10,
+        55,
+        41,
+        81,
+        94,
+        2,
+        85,
+        84,
+        22,
+        60,
+        93,
+        48,
+        27,
+        12,
+        7,
+        3,
+        6,
+        45,
+        56,
+        25,
+        21,
+        53,
+        14,
+        73,
+        19,
+        65,
+        18,
+        83,
+        15,
+        86,
+        36,
+        62,
+        58,
+        16,
+        9,
+        13,
+        96,
+        35,
+        0,
+        66,
+        44,
+        24,
+        1,
+        89,
+        46,
+        78,
+        49,
+        57,
+        39,
+        11,
+        54,
+        4,
+        82,
+        80,
+        29,
+        42,
+        8,
+        90,
+        43,
+        64,
+        61,
+        5,
+        40,
+        97,
+        70,
+        63,
+        30,
+        32,
+        88,
+        68,
+        33,
+        75,
+        37,
+        31,
+        47,
+        17,
+        51,
+        99,
+        59,
+        76,
+        95,
+        34,
+        91,
+        87,
+        52,
+        77,
+        67,
+        23,
+        79,
+        92,
+        38,
+        72,
+    ]
+    prefix = prefixes[order[(int(ads_user_id) % len(prefixes))]]
+
+    return prefix + user_id
+
+
+def retrieve_by_wps(req, outfile, ntries, logger):
+    """Execute a CAMS solar radiation data retrieval through the WPS API."""
+    import jinja2
+
+    # Construct the XML to pass
+    xml = jinja2.Template(template_xml()).render(req)
+    logger.debug("request=" + repr(req))
+    logger.debug("xml=" + xml)
+    xml = xml.replace("\n", "")
+
+    # Execute WPS requests in a retry loop, cycling through available
+    # servers. Currently the only supported server is the load-balancing
+    # server: api.soda-solardata.com.
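+    # (Round-robin illustration: with three servers configured, attempt 4
+    # would use servers[(4 - 1) % 3], i.e. servers[0] again.)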
+    servers = ["api.soda-solardata.com"]
+    # servers = ['api.soda-solardata.com', 'www.soda-is.com',
+    #            'pro.soda-is.com']
+    # servers = ['vhost5.soda-is.com']
+    attempt = 0
+    exc_txt = ""
+    while attempt < ntries:
+        attempt += 1
+        if attempt > 1:
+            logger.info(f"Attempt #{attempt}...")
+
+        # Cycle through available servers on each attempt
+        server = servers[(attempt - 1) % len(servers)]
+        url = f"https://{server}/service/wps"
+
+        try:
+            wps_execute(url, xml, outfile, logger)
+
+        except (BadRequest, NoData):
+            # Do not retry
+            raise
+
+        except Exception as ex:
+            exc_txt = ": " + repr(ex)
+            tbstr = "".join(traceback.format_tb(ex.__traceback__))
+            logger.error(
+                f"Execution attempt #{attempt} from {server} "
+                f"failed: {ex!r} \n" + " \n".join(tbstr.split("\n"))
+            )
+            # Only start sleeping when we've tried all servers
+            if attempt >= len(servers):
+                time.sleep(3)
+            logger.debug("Retrying...")
+
+        else:
+            break
+
+    else:
+        # The while-else branch only runs if every attempt failed without a break
+        logger.error("Request was " + repr(req))
+        logger.error("XML was " + xml)
+        raise Exception(f"Failed to retrieve data after {attempt} attempts" + exc_txt)
+    if attempt > 1:
+        logger.info(f"Succeeded after {attempt} attempts")
+
+
+def wps_execute(url, xml, outfile, logger):
+    from owslib.wps import WebProcessingService
+
+    # Execute WPS. This can throw an immediate exception if the service is
+    # down.
+    wps = WebProcessingService(url, skip_caps=True, timeout=3600)
+    execution = wps.execute(None, [], request=bytes(xml, encoding="utf-8"))
+
+    # Wait for completion
+    while not execution.isComplete():
+        execution.checkStatus(sleepSecs=1)
+        logger.debug("Execution status: %s" % execution.status)
+
+    # Save the output if succeeded
+    if execution.isSucceded():  # sic: owslib's spelling
+        if outfile is not None:
+            execution.getOutput(outfile)
+
+    else:
+        # Certain types of error are due to bad requests. Distinguish these
+        # from unrecognised system errors.
+        known_user_errors = {
+            NoData: [
+                "Error: incorrect dates",
+                "Error: no data available for the period",
+            ],
+            BadRequest: [
+                "outside of the satellite field of view",
+                "Maximum number of daily requests reached",
+                "Unknown string format",  # bad date string
+            ],
+        }
+        user_error = None
+        for error in execution.errors:
+            logger.error("WPS error: " + repr([error.code, error.locator, error.text]))
+
+            for extype, strings in known_user_errors.items():
+                for string in strings:
+                    if string.lower() in error.text.lower():
+                        user_error = (
+                            extype,
+                            re.sub(r"^Process error: *(.+)", r"\1", error.text),
+                        )
+
+        # If there was exactly one error and it is of a familiar type, raise
+        # the associated exception; otherwise raise a generic Exception.
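+        # (Example: a "Process error: Error: incorrect dates" message from the
+        # service matches the NoData list above and is re-raised as NoData with
+        # the "Process error:" prefix stripped.)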
+        if len(execution.errors) == 1 and user_error:
+            raise user_error[0](tidy_error(user_error[1]))
+        elif len(execution.errors) > 0:
+            raise Exception("\n".join([e.text for e in execution.errors]))
+        else:
+            logger.error("WPS failed but gave no errors?")
+            raise Exception("Unspecified WPS error")
+
+
+def tidy_error(text):
+    """Collapse a multi-line WPS error message into a single tidy line."""
+    lines = [line.strip() for line in text.split("\n")]
+    text = "; ".join([line for line in lines if line])
+    return re.sub(r"^ *Failed to execute WPS process \[\w+\]: *", "", text)
+
+
+def template_xml():
+    """Return a Jinja2 template XML string that can be used to obtain data via WPS."""
+    return """<wps:Execute service="WPS" version="1.0.0"
+        xmlns:wps="http://www.opengis.net/wps/1.0.0"
+        xmlns:ows="http://www.opengis.net/ows/1.1"
+        xmlns:xlink="http://www.w3.org/1999/xlink"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+      <ows:Identifier>{{ sky_type }}</ows:Identifier>
+      <wps:DataInputs>
+        <wps:Input>
+          <ows:Identifier>latitude</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{{ "{0:.5f}".format(location["latitude"]) }}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+        <wps:Input>
+          <ows:Identifier>longitude</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{{ "{0:.5f}".format(location["longitude"]) }}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+        <wps:Input>
+          <ows:Identifier>altitude</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{{ altitude }}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+        <wps:Input>
+          <ows:Identifier>date_begin</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{{ date[0:10] }}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+        <wps:Input>
+          <ows:Identifier>date_end</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{% if date|length > 10 %}{{ date[11:] }}{% else %}{{ date[0:10] }}{% endif %}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+        <wps:Input>
+          <ows:Identifier>time_ref</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{{ time_reference }}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+        <wps:Input>
+          <ows:Identifier>summarization</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{{ time_step }}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+        <wps:Input>
+          <ows:Identifier>verbose</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{{ expert_mode }}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+        <wps:Input>
+          <ows:Identifier>username</ows:Identifier>
+          <wps:Data>
+            <wps:LiteralData>{{ username }}</wps:LiteralData>
+          </wps:Data>
+        </wps:Input>
+      </wps:DataInputs>
+      <wps:ResponseForm>
+        <wps:ResponseDocument storeExecuteResponse="false">
+          <wps:Output mimeType="{{ mimetype }}" asReference="true">
+            <ows:Identifier>irradiation</ows:Identifier>
+          </wps:Output>
+        </wps:ResponseDocument>
+      </wps:ResponseForm>
+    </wps:Execute>"""
diff --git a/cads_adaptors/adaptors/cds.py b/cads_adaptors/adaptors/cds.py
index f20f0e16..3f69d2b9 100644
--- a/cads_adaptors/adaptors/cds.py
+++ b/cads_adaptors/adaptors/cds.py
@@ -1,5 +1,6 @@
 import os
 from copy import deepcopy
+from random import randint
 from typing import Any, Union
 
 from cads_adaptors import constraints, costing, mapping
@@ -128,6 +129,10 @@ def normalise_request(self, request: Request) -> Request:
             raise TypeError(
                 f"Normalised request is not a dictionary, instead it is of type {type(request)}"
             )
+        # Avoid the cache by adding a random key-value pair to the request (if cache avoidance is on)
+        if self.config.get("avoid_cache", False):
+            random_key = str(randint(0, 2**128))
+            request["_in_adaptor_no_cache"] = random_key
         return request
 
     def get_licences(self, request: Request) -> list[tuple[str, int]]:
diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py
index d15c5523..b9a2a84a 100644
--- a/cads_adaptors/adaptors/multi.py
+++ b/cads_adaptors/adaptors/multi.py
@@ -3,7 +3,7 @@
 from cads_adaptors import AbstractCdsAdaptor, mapping
 from cads_adaptors.adaptors import Request
-from cads_adaptors.exceptions import MultiAdaptorNoDataError
+from cads_adaptors.exceptions import InvalidRequest, MultiAdaptorNoDataError
 from cads_adaptors.tools.general import ensure_list
 
@@ -132,6 +132,29 @@ def convert_format(self, *args, **kwargs):
         return convert_format(*args, **kwargs)
 
+    def _pre_retrieve(self, request, default_download_format="zip"):
+        self.input_request = deepcopy(request)
+        self.receipt = request.pop("receipt", False)
+
+        # Intersect constraints
+        if self.config.get("intersect_constraints", False):
+            requests_after_intersection = self.intersect_constraints(request)
+            if len(requests_after_intersection) == 0:
+                msg = "Error: no intersection with the constraints."
+                self.context.add_user_visible_error(msg)
+                raise InvalidRequest(msg)
+        else:
+            requests_after_intersection = [request]
+
+        self.mapped_requests_pieces = []
+        for request_piece_after_intersection in requests_after_intersection:
+            self.mapped_requests_pieces.append(
+                mapping.apply_mapping(request_piece_after_intersection, self.mapping)
+            )
+
+        self.download_format = self.mapped_requests_pieces[0].pop(
+            "download_format", default_download_format
+        )
+
     def retrieve(self, request: Request):
         """For MultiMarsCdsAdaptor we just want to apply mapping from each adaptor."""
         import dask
@@ -153,24 +176,25 @@
         mapped_requests = []
         self.context.add_stdout(
-            f"MultiMarsCdsAdaptor, full_request: {self.mapped_request}"
+            f"MultiMarsCdsAdaptor, full_request: {self.mapped_requests_pieces}"
         )
+
         for adaptor_tag, adaptor_desc in self.config["adaptors"].items():
             this_adaptor = adaptor_tools.get_adaptor(adaptor_desc, self.form)
             this_values = adaptor_desc.get("values", {})
-
-            this_request = self.split_request(
-                self.mapped_request, this_values, **this_adaptor.config
-            )
-            self.context.add_stdout(
-                f"MultiMarsCdsAdaptor, {adaptor_tag}, this_request: {this_request}"
-            )
-
-            if len(this_request) > 0:
-                mapped_requests.append(
-                    mapping.apply_mapping(this_request, this_adaptor.mapping)
+            for mapped_request_piece in self.mapped_requests_pieces:
+                this_request = self.split_request(
+                    mapped_request_piece, this_values, **this_adaptor.config
                 )
+                self.context.add_stdout(
+                    f"MultiMarsCdsAdaptor, {adaptor_tag}, this_request: {this_request}"
+                )
+                if len(this_request) > 0:
+                    mapped_requests.append(
+                        mapping.apply_mapping(this_request, this_adaptor.mapping)
+                    )
+
         self.context.add_stdout(
             f"MultiMarsCdsAdaptor, mapped_requests: {mapped_requests}"
         )
diff --git a/cads_adaptors/constraints.py b/cads_adaptors/constraints.py
index 3be2551e..b8b5043b 100644
--- a/cads_adaptors/constraints.py
+++ b/cads_adaptors/constraints.py
@@ -516,6 +516,21 @@ def get_keys(constraints: list[dict[str, Any]]) -> set[str]:
     return keys
 
 
+def get_temporal_intersection(
+    selected_ranges: list[DateTimeRange], valid_ranges: list[DateTimeRange]
+) -> list[str]:
+    """Return the formatted intersections of the selected and valid date ranges."""
+    intersections = []
+    for selected_range in selected_ranges:
+        for valid_range in valid_ranges:
+            intersection = selected_range.intersection(valid_range)
+            if intersection.is_valid_timerange():
+                intersection.start_time_format = "%Y-%m-%d"
+                intersection.end_time_format = "%Y-%m-%d"
+                intersection.separator = "/"
+                intersections.append(str(intersection))
+    return intersections
+
+
 def temporal_intersection_between(
     selected: DateTimeRange, ranges: list[DateTimeRange]
 ) -> bool:
@@ -532,6 +547,7 @@ def gen_time_range_from_string(string: str) -> DateTimeRange:
     time_range = DateTimeRange(dates[0], dates[1])
     time_range.start_time_format = "%Y-%m-%d"
     time_range.end_time_format = "%Y-%m-%d"
+    time_range.separator = "/"
     if time_range.is_valid_timerange():
         return time_range
     else:
@@ -663,11 +679,27 @@
         for field in constraint:
             # Constrain the requested values for this field to the permitted
             # ones (by intersecting it with the constraint).
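+            # (Example: a selected range "2020-01-01/2020-03-01" intersected
+            # with a valid range "2020-02-01/2020-12-31" yields
+            # ["2020-02-01/2020-03-01"].)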
- constrained_field_value = [ - v - for v in ensure_sequence(output_request.get(field, [])) - if str(v) in constraint[field] - ] + if field != "date": + constrained_field_value = [ + v + for v in ensure_sequence(output_request.get(field, [])) + if str(v) in constraint[field] + ] + else: + selected_ranges_as_strings = ensure_sequence( + output_request.get(field, []) + ) + selected_ranges = [ + gen_time_range_from_string(selected_range) + for selected_range in selected_ranges_as_strings + ] + valid_ranges = [ + gen_time_range_from_string(valid_range) + for valid_range in constraint[field] + ] + constrained_field_value = get_temporal_intersection( + selected_ranges, valid_ranges + ) # If the intersection is empty, the request as a whole does not # meet this constraint and this output_request must be