diff --git a/cads_adaptors/adaptors/arco.py b/cads_adaptors/adaptors/arco.py
index 3e2e0f9f..823c7ab0 100644
--- a/cads_adaptors/adaptors/arco.py
+++ b/cads_adaptors/adaptors/arco.py
@@ -1,5 +1,9 @@
 import copy
 import tempfile
+from datetime import UTC, datetime, timedelta
+from typing import Any
+
+from dateutil.parser import parse as dtparse
 
 from cads_adaptors.adaptors import Request, cds
 from cads_adaptors.exceptions import ArcoDataLakeNoDataError, InvalidRequest
@@ -37,17 +41,38 @@ def _normalise_location(self, request: Request) -> None:
             raise InvalidRequest(f"Invalid {location=}. {msg}")
 
     def _normalise_date(self, request: Request) -> None:
-        date = ensure_list(request.get("date"))
-        if not date:
-            request["date"] = date
-            return
-
-        if len(date) != 1:
+        date_key = self.config.get("date_key", "date")
+        date = ensure_list(request.get(date_key))
+        date_range = sorted(str(date[0]).split("/") if len(date) == 1 else date)
+        if len(date_range) == 1:
+            date_range *= 2
+        if len(date_range) != 2:
             raise InvalidRequest(
-                "Please specify a single date range using the format yyyy-mm-dd/yyyy-mm-dd."
+                'Please specify a single date range using the format "yyyy-mm-dd/yyyy-mm-dd" or '
+                '["yyyy-mm-dd", "yyyy-mm-dd"].'
+            )
+
+        # Embargo check
+        if "embargo" in self.config and self.config["embargo"]:
+            embargo = self.config["embargo"]
+            embargo_error_time_format: str = embargo.pop(
+                "error_time_format",
+                "%Y-%m-%d",  # Default to daily embargo
             )
-        split = sorted(str(date[0]).split("/"))
-        request["date"] = ["/".join([split[0], split[-1]])]
+            embargo_datetime = datetime.now(UTC) - timedelta(**embargo)
+            if dtparse(date_range[0]).date() > embargo_datetime.date():
+                raise InvalidRequest(
+                    "You have requested data under embargo, the latest available data is: "
+                    f"{embargo_datetime.strftime(embargo_error_time_format)}"
+                )
+            if dtparse(date_range[1]).date() > embargo_datetime.date():
+                date_range[1] = embargo_datetime.strftime(embargo_error_time_format)
+                self.context.add_user_visible_error(
+                    "Part of the data you have requested is under embargo, "
+                    "your request has been modified to the latest available data: "
+                    f"{date_key}={date_range}"
+                )
+        request[date_key] = date_range
 
     def _normalise_data_format(self, request: Request) -> None:
         data_formats = ensure_list(request.get("data_format", DEFAULT_DATA_FORMAT))
@@ -64,6 +89,13 @@ def _normalise_data_format(self, request: Request) -> None:
                 f"Invalid {data_format=}. Available options: {available_options}"
             )
 
+    def pre_mapping_modifications(self, request: Request) -> Request:
+        request = super().pre_mapping_modifications(request)
+
+        download_format = request.pop("download_format", "as_source")
+        self.set_download_format(download_format)
+        return request
+
     def normalise_request(self, request: Request) -> Request:
         if self.normalised:
             return request
@@ -103,30 +135,40 @@ def retrieve_list_of_results(self, request: Request) -> list[str]:
             self.context.add_user_visible_error(f"Invalid variable: {exc}.")
             raise
 
-        if date := request["date"]:
-            try:
-                ds = ds.sel(time=slice(*date[0].split("/")))
-            except TypeError:
-                self.context.add_user_visible_error(f"Invalid {date=}")
-                raise
-            if not ds.sizes["time"]:
-                msg = f"No data found for {date=}"
-                self.context.add_user_visible_error(msg)
-                raise ArcoDataLakeNoDataError(msg)
+        # Normalised request is guaranteed to have a value for date_key, set to a list of two values
+        date_range = request[self.config.get("date_key", "date")]
+        source_date_key = self.config.get("source_date_key", "time")
+        selection: dict[str, Any] = {source_date_key: slice(*date_range)}
+        try:
+            ds = ds.sel(**selection)
+        except TypeError:
+            self.context.add_user_visible_error(f"Invalid {date_range=}")
+            raise
+        if not ds.sizes[source_date_key]:
+            msg = f"No data found for {date_range=}"
+            self.context.add_user_visible_error(msg)
+            raise ArcoDataLakeNoDataError(msg)
 
         ds = ds.sel(request["location"], method="nearest")
         ds = ds.rename(NAME_DICT)
 
-        with dask.config.set(scheduler="threads"):
+        with dask.config.set(scheduler="single-threaded"):
             match request["data_format"]:
                 case "netcdf":
-                    _, path = tempfile.mkstemp(suffix=".nc", dir=self.cache_tmp_path)
+                    _, path = tempfile.mkstemp(
+                        prefix=self.config.get("collection-id", "arco-data"),
+                        suffix=".nc",
+                        dir=self.cache_tmp_path,
+                    )
                     ds.to_netcdf(path)
                 case "csv":
-                    _, path = tempfile.mkstemp(suffix=".csv", dir=self.cache_tmp_path)
+                    _, path = tempfile.mkstemp(
+                        prefix=self.config.get("collection-id", "arco-data"),
+                        suffix=".csv",
+                        dir=self.cache_tmp_path,
+                    )
                     ds.to_pandas().to_csv(path)
                 case data_format:
                     raise NotImplementedError(f"Invalid {data_format=}.")
 
-        self.download_format = "as_source"  # Prevent from writing a zip file
         return [str(path)]
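Review note: the new embargo handling in `_normalise_date` is driven entirely by config, where `embargo` holds `datetime.timedelta` keyword arguments. Below is a minimal, self-contained sketch of the arithmetic with illustrative values; plain string comparison stands in for the `dateutil` parsing the adaptor uses, and the sketch copies the config dict first, since the diff pops `error_time_format` from the live dict (so the custom format is only seen on the first call).

```python
from datetime import UTC, datetime, timedelta

# Illustrative config: "embargo" holds datetime.timedelta keyword arguments,
# so {"days": 2} keeps the two most recent days out of reach.
config = {"embargo": {"days": 2, "error_time_format": "%Y-%m-%d"}}

embargo = dict(config["embargo"])  # copy; the diff pops from the live dict
error_time_format = embargo.pop("error_time_format", "%Y-%m-%d")
embargo_datetime = datetime.now(UTC) - timedelta(**embargo)
latest = embargo_datetime.strftime(error_time_format)

date_range = ["2020-01-01", "2999-12-31"]  # already normalised [start, end]
if date_range[0] > latest:
    # Whole request under embargo: the adaptor raises InvalidRequest here.
    raise ValueError("You have requested data under embargo")
if date_range[1] > latest:
    # Only the tail is embargoed: truncate and warn the user.
    date_range[1] = latest
print(date_range)  # ['2020-01-01', '<today minus 2 days>']
```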
diff --git a/tests/test_50_arco_adaptor.py b/tests/test_50_arco_adaptor.py
index 56a9de28..457f7fef 100644
--- a/tests/test_50_arco_adaptor.py
+++ b/tests/test_50_arco_adaptor.py
@@ -1,4 +1,6 @@
+import logging
 import pathlib
+from datetime import datetime, timedelta
 from typing import Any, Type
 
 import numpy as np
@@ -48,12 +50,23 @@ def mock_add_user_visible_error(
     "original,expected",
     [
         (
-            {"variable": "foo", "location": {"latitude": 0, "longitude": 0}},
+            {
+                "data_format": ["nc"],
+                "location": {
+                    "longitude": 1,
+                    "latitude": "2",
+                },
+                "date": 1990,
+                "variable": ("foo", "bar"),
+            },
             {
                 "data_format": "netcdf",
-                "location": {"latitude": 0.0, "longitude": 0.0},
-                "date": [],
-                "variable": ["foo"],
+                "location": {
+                    "latitude": 2.0,
+                    "longitude": 1.0,
+                },
+                "date": ["1990", "1990"],
+                "variable": ["bar", "foo"],
             },
         ),
         (
@@ -63,7 +76,7 @@ def mock_add_user_visible_error(
                 "longitude": 1,
                 "latitude": "2",
             },
-            "date": 1990,
+            "date": "1990/1991",
             "variable": ("foo", "bar"),
         },
         {
@@ -72,7 +85,27 @@ def mock_add_user_visible_error(
                 "latitude": 2.0,
                 "longitude": 1.0,
             },
-            "date": ["1990/1990"],
+            "date": ["1990", "1991"],
+            "variable": ["bar", "foo"],
+        },
+    ),
+    (
+        {
+            "data_format": ["nc"],
+            "location": {
+                "longitude": 1,
+                "latitude": "2",
+            },
+            "date": ["1990", "1991"],
+            "variable": ("foo", "bar"),
+        },
+        {
+            "data_format": "netcdf",
+            "location": {
+                "latitude": 2.0,
+                "longitude": 1.0,
+            },
+            "date": ["1990", "1991"],
             "variable": ["bar", "foo"],
         },
     ),
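Review note: the parametrized cases above pin down the reworked date normalisation. As a sketch, these are the three request shapes `_normalise_date` now accepts and the two-element range each produces (values illustrative; the duplicate-then-sort order differs from the diff but yields the same result):

```python
# Each accepted "date" shape and the normalised [start, end] pair it yields.
examples = {
    "1990": ["1990", "1990"],            # bare value: duplicated into a range
    "1990/1991": ["1990", "1991"],       # slash syntax: split at "/"
    ("1991", "1990"): ["1990", "1991"],  # list/tuple: sorted into order
}
for raw, expected in examples.items():
    values = str(raw).split("/") if isinstance(raw, str) else list(raw)
    normalised = sorted(values * 2 if len(values) == 1 else values)
    assert normalised == expected, (raw, normalised)
```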
"date": ["1990", "1991"], + "variable": ("foo", "bar"), + }, + { + "data_format": "netcdf", + "location": { + "latitude": 2.0, + "longitude": 1.0, + }, + "date": ["1990", "1991"], "variable": ["bar", "foo"], }, ), @@ -87,6 +120,119 @@ def test_arco_normalise_request( assert request == expected +@pytest.mark.parametrize( + "in_date,out_date", + ( + ( + (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"), + [(datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d")] * 2, + ), + ( + [ + (datetime.now() - timedelta(days=4)).strftime("%Y-%m-%d"), + (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"), + ], + [ + (datetime.now() - timedelta(days=4)).strftime("%Y-%m-%d"), + (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"), + ], + ), + ), +) +def test_arco_normalise_request_embargo_pass( + in_date: str | int | list[str | int], + out_date: list[str], + arco_adaptor: ArcoDataLakeCdsAdaptor, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setitem(arco_adaptor.config, "embargo", {"days": 2}) + request = { + "data_format": "netcdf", + "location": { + "latitude": 2.0, + "longitude": 1.0, + }, + "date": in_date, + "variable": ["bar", "foo"], + } + request = arco_adaptor.normalise_request(request) + assert request["date"] == out_date + + +@pytest.mark.parametrize( + "in_date", + ( + datetime.now().strftime("%Y-%m-%d"), + [ + (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"), + datetime.now().strftime("%Y-%m-%d"), + ], + [ + datetime.now().strftime("%Y-%m-%d"), + (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"), + ], + ), +) +def test_arco_normalise_request_embargo_raise( + in_date: str | int | list[str | int], + arco_adaptor: ArcoDataLakeCdsAdaptor, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setitem(arco_adaptor.config, "embargo", {"days": 2}) + request = { + "data_format": "netcdf", + "location": { + "latitude": 2.0, + "longitude": 1.0, + }, + "date": in_date, + "variable": ["bar", "foo"], + } + with pytest.raises(InvalidRequest, match="You have requested data under embargo"): + arco_adaptor.normalise_request(request) + + +@pytest.mark.parametrize( + "in_date,out_date", + ( + ( + [ + (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"), + datetime.now().strftime("%Y-%m-%d"), + ], + [ + (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"), + (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d"), + ], + ), + ), +) +def test_arco_normalise_request_embargo_warn( + in_date: str | int | list[str | int], + out_date: list[str], + arco_adaptor: ArcoDataLakeCdsAdaptor, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + monkeypatch.setitem(arco_adaptor.config, "embargo", {"days": 2}) + request = { + "data_format": "netcdf", + "location": { + "latitude": 2.0, + "longitude": 1.0, + }, + "date": in_date, + "variable": ["bar", "foo"], + } + with caplog.at_level(logging.ERROR): + request = arco_adaptor.normalise_request(request) + assert any( + "Part of the data you have requested is under embargo" in message + for message in arco_adaptor.context.user_visible_errors # type: ignore[attr-defined] + ) + assert request["date"] == out_date + + @pytest.mark.parametrize( "invalid_request, match", [ @@ -123,6 +269,7 @@ def test_arco_normalise_request( { "variable": "FOO", "location": {"latitude": 0, "longitude": 0}, + "date": [1, 2], "data_format": ["foo", "bar"], }, "specify a single data_format", @@ -131,6 +278,7 @@ def test_arco_normalise_request( { "variable": "FOO", "location": {"latitude": 0, "longitude": 
@@ -163,6 +311,7 @@ def test_arco_select_variable(
         {
             "variable": variable,
             "location": {"latitude": 0, "longitude": 0},
+            "date": "2000",
         }
     )
     ds = xr.open_dataset(fp.name)
@@ -172,7 +321,11 @@
 
 
 def test_arco_select_location(arco_adaptor: ArcoDataLakeCdsAdaptor):
-    request = {"variable": "FOO", "location": {"latitude": 31, "longitude": "41"}}
+    request = {
+        "variable": "FOO",
+        "location": {"latitude": 31, "longitude": "41"},
+        "date": "2000",
+    }
     fp = arco_adaptor.retrieve(request)
     ds = xr.open_dataset(fp.name)
     assert ds["latitude"].item() == 30
@@ -220,6 +373,7 @@ def test_arco_data_format(
     request = {
         "variable": "FOO",
         "location": {"latitude": 0, "longitude": 0},
+        "date": "2000",
         "data_format": data_format,
     }
     fp = arco_adaptor.retrieve(request)
@@ -249,6 +403,7 @@ def test_arco_data_format(
         {
             "variable": "wrong",
             "location": {"latitude": 0, "longitude": 0},
+            "date": "2000",
         },
         KeyError,
         "Invalid variable: 'wrong'.",
@@ -260,7 +415,7 @@ def test_arco_data_format(
             "date": "foo",
         },
         TypeError,
-        "Invalid date=['foo/foo']",
+        "Invalid date_range=['foo', 'foo']",
     ),
     (
         {
@@ -269,7 +424,7 @@ def test_arco_data_format(
             "date": 1990,
         },
         ArcoDataLakeNoDataError,
-        "No data found for date=['1990/1990']",
+        "No data found for date_range=['1990', '1990']",
     ),
 ],
 )
@@ -293,6 +448,7 @@ def test_connection_problems(
         {
             "variable": "FOO",
             "location": {"latitude": 0, "longitude": 0},
+            "date": "2000",
         }
     )
     assert (
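Review note: taken together, a dataset deployment could exercise every new config hook in this PR with something like the sketch below; the key names come from the diff, while the values are placeholders.

```python
# Hypothetical adaptor configuration. "embargo" and the two key names feed the
# new normalisation and selection paths; "collection-id" becomes the prefix of
# the temporary result file.
config = {
    "collection-id": "example-arco-dataset",
    "date_key": "date",              # request key holding the date range
    "source_date_key": "time",       # name of the date coordinate in the store
    "embargo": {"days": 2},          # timedelta kwargs; omit to disable the check
}
```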