Skip to content

Commit

Permalink
arco adaptor updates added to hotfix
Browse files Browse the repository at this point in the history
  • Loading branch information
EddyCMWF committed Feb 19, 2025
1 parent 5377945 commit fcd566a
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 32 deletions.
88 changes: 65 additions & 23 deletions cads_adaptors/adaptors/arco.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import copy
import tempfile
from datetime import UTC, datetime, timedelta
from typing import Any

from dateutil.parser import parse as dtparse

from cads_adaptors.adaptors import Request, cds
from cads_adaptors.exceptions import ArcoDataLakeNoDataError, InvalidRequest
Expand Down Expand Up @@ -37,17 +41,38 @@ def _normalise_location(self, request: Request) -> None:
raise InvalidRequest(f"Invalid {location=}. {msg}")

def _normalise_date(self, request: Request) -> None:
    """Normalise the request's date selection to a two-element [start, end] list.

    Accepts, under the configured date key, either a single
    "yyyy-mm-dd/yyyy-mm-dd" range string, a single date, or a list of one or
    two dates. If an ``embargo`` mapping is configured (``timedelta`` kwargs,
    plus an optional ``error_time_format``), requests starting inside the
    embargo window raise, while ranges that merely end inside it are clipped
    to the latest available date with a user-visible warning.

    Raises:
        InvalidRequest: if the selection is not a single date range, or the
            whole requested range falls under the embargo.
    """
    date_key = self.config.get("date_key", "date")
    date = ensure_list(request.get(date_key))
    # A single entry may be a "start/end" range string; otherwise the list
    # itself supplies the range endpoints.
    date_range = sorted(str(date[0]).split("/") if len(date) == 1 else date)
    if len(date_range) == 1:
        date_range *= 2  # single date => degenerate range [date, date]
    if len(date_range) != 2:
        raise InvalidRequest(
            'Please specify a single date range using the format "yyyy-mm-dd/yyyy-mm-dd" or '
            '["yyyy-mm-dd", "yyyy-mm-dd"].'
        )

    # Embargo check
    if "embargo" in self.config and self.config["embargo"]:
        # Copy before popping: mutating self.config["embargo"] in place would
        # permanently drop "error_time_format" for every later request.
        embargo = dict(self.config["embargo"])
        embargo_error_time_format: str = embargo.pop(
            "error_time_format",
            "%Y-%m-%d",  # Default to daily embargo
        )
        # Remaining keys are timedelta kwargs (e.g. {"days": 2}).
        embargo_datetime = datetime.now(UTC) - timedelta(**embargo)
        if dtparse(date_range[0]).date() > embargo_datetime.date():
            # The whole requested range starts after the embargo cut-off.
            raise InvalidRequest(
                "You have requested data under embargo, the latest available data is: "
                f" {embargo_datetime.strftime(embargo_error_time_format)}"
            )
        if dtparse(date_range[1]).date() > embargo_datetime.date():
            # Only the tail of the range is embargoed: clip it and warn.
            date_range[1] = embargo_datetime.strftime(embargo_error_time_format)
            self.context.add_user_visible_error(
                "Part of the data you have requested is under embargo, "
                "your request has been modified to the latest available data: "
                f"{date_key}={date_range}"
            )

    request[date_key] = date_range

def _normalise_data_format(self, request: Request) -> None:
data_formats = ensure_list(request.get("data_format", DEFAULT_DATA_FORMAT))
Expand All @@ -64,6 +89,13 @@ def _normalise_data_format(self, request: Request) -> None:
f"Invalid {data_format=}. Available options: {available_options}"
)

def pre_mapping_modifications(self, request: Request) -> Request:
    """Apply adaptor-specific tweaks to the raw request before mapping.

    Delegates to the parent implementation, then pins the download format:
    ARCO results are returned as-is ("as_source") unless the request says
    otherwise.
    """
    request = super().pre_mapping_modifications(request)
    self.set_download_format(request.pop("download_format", "as_source"))
    return request

def normalise_request(self, request: Request) -> Request:
if self.normalised:
return request
Expand Down Expand Up @@ -103,30 +135,40 @@ def retrieve_list_of_results(self, request: Request) -> list[str]:
self.context.add_user_visible_error(f"Invalid variable: {exc}.")
raise

if date := request["date"]:
try:
ds = ds.sel(time=slice(*date[0].split("/")))
except TypeError:
self.context.add_user_visible_error(f"Invalid {date=}")
raise
if not ds.sizes["time"]:
msg = f"No data found for {date=}"
self.context.add_user_visible_error(msg)
raise ArcoDataLakeNoDataError(msg)
        # Normalised request is guaranteed to have a value for date_key, set to a list of two values
date_range = request[self.config.get("date_key", "date")]
source_date_key = self.config.get("source_date_key", "time")
selection: dict[str, Any] = {source_date_key: slice(*date_range)}
try:
ds = ds.sel(**selection)
except TypeError:
self.context.add_user_visible_error(f"Invalid {date_range=}")
raise
if not ds.sizes[source_date_key]:
msg = f"No data found for {date_range=}"
self.context.add_user_visible_error(msg)
raise ArcoDataLakeNoDataError(msg)

ds = ds.sel(request["location"], method="nearest")
ds = ds.rename(NAME_DICT)

with dask.config.set(scheduler="threads"):
with dask.config.set(scheduler="single-threaded"):
match request["data_format"]:
case "netcdf":
_, path = tempfile.mkstemp(suffix=".nc", dir=self.cache_tmp_path)
_, path = tempfile.mkstemp(
prefix=self.config.get("collection-id", "arco-data"),
suffix=".nc",
dir=self.cache_tmp_path,
)
ds.to_netcdf(path)
case "csv":
_, path = tempfile.mkstemp(suffix=".csv", dir=self.cache_tmp_path)
_, path = tempfile.mkstemp(
prefix=self.config.get("collection-id", "arco-data"),
suffix=".csv",
dir=self.cache_tmp_path,
)
ds.to_pandas().to_csv(path)
case data_format:
raise NotImplementedError(f"Invalid {data_format=}.")

self.download_format = "as_source" # Prevent from writing a zip file
return [str(path)]
174 changes: 165 additions & 9 deletions tests/test_50_arco_adaptor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
import pathlib
from datetime import datetime, timedelta
from typing import Any, Type

import numpy as np
Expand Down Expand Up @@ -48,12 +50,23 @@ def mock_add_user_visible_error(
"original,expected",
[
(
{"variable": "foo", "location": {"latitude": 0, "longitude": 0}},
{
"data_format": ["nc"],
"location": {
"longitude": 1,
"latitude": "2",
},
"date": 1990,
"variable": ("foo", "bar"),
},
{
"data_format": "netcdf",
"location": {"latitude": 0.0, "longitude": 0.0},
"date": [],
"variable": ["foo"],
"location": {
"latitude": 2.0,
"longitude": 1.0,
},
"date": ["1990", "1990"],
"variable": ["bar", "foo"],
},
),
(
Expand All @@ -63,7 +76,7 @@ def mock_add_user_visible_error(
"longitude": 1,
"latitude": "2",
},
"date": 1990,
"date": "1990/1991",
"variable": ("foo", "bar"),
},
{
Expand All @@ -72,7 +85,27 @@ def mock_add_user_visible_error(
"latitude": 2.0,
"longitude": 1.0,
},
"date": ["1990/1990"],
"date": ["1990", "1991"],
"variable": ["bar", "foo"],
},
),
(
{
"data_format": ["nc"],
"location": {
"longitude": 1,
"latitude": "2",
},
"date": ["1990", "1991"],
"variable": ("foo", "bar"),
},
{
"data_format": "netcdf",
"location": {
"latitude": 2.0,
"longitude": 1.0,
},
"date": ["1990", "1991"],
"variable": ["bar", "foo"],
},
),
Expand All @@ -87,6 +120,119 @@ def test_arco_normalise_request(
assert request == expected


@pytest.mark.parametrize(
    "in_date,out_date",
    (
        # A single date older than the embargo expands to a [date, date] range.
        (
            (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"),
            [(datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d")] * 2,
        ),
        # A two-element range entirely outside the embargo passes unchanged.
        (
            [
                (datetime.now() - timedelta(days=4)).strftime("%Y-%m-%d"),
                (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"),
            ],
            [
                (datetime.now() - timedelta(days=4)).strftime("%Y-%m-%d"),
                (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"),
            ],
        ),
    ),
)
def test_arco_normalise_request_embargo_pass(
    in_date: str | int | list[str | int],
    out_date: list[str],
    arco_adaptor: ArcoDataLakeCdsAdaptor,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Dates fully outside a 2-day embargo normalise without error or clipping."""
    # Embargo window: anything newer than (now - 2 days) is restricted.
    monkeypatch.setitem(arco_adaptor.config, "embargo", {"days": 2})
    request = {
        "data_format": "netcdf",
        "location": {
            "latitude": 2.0,
            "longitude": 1.0,
        },
        "date": in_date,
        "variable": ["bar", "foo"],
    }
    request = arco_adaptor.normalise_request(request)
    # Normalisation always yields a two-element [start, end] list.
    assert request["date"] == out_date


@pytest.mark.parametrize(
    "in_date",
    (
        # A single date inside the embargo window.
        datetime.now().strftime("%Y-%m-%d"),
        # A range whose earliest date is still inside the embargo window;
        # order of endpoints should not matter (range is sorted first).
        [
            (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"),
            datetime.now().strftime("%Y-%m-%d"),
        ],
        [
            datetime.now().strftime("%Y-%m-%d"),
            (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"),
        ],
    ),
)
def test_arco_normalise_request_embargo_raise(
    in_date: str | int | list[str | int],
    arco_adaptor: ArcoDataLakeCdsAdaptor,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Requests starting entirely inside the embargo window raise InvalidRequest."""
    # Embargo window: anything newer than (now - 2 days) is restricted.
    monkeypatch.setitem(arco_adaptor.config, "embargo", {"days": 2})
    request = {
        "data_format": "netcdf",
        "location": {
            "latitude": 2.0,
            "longitude": 1.0,
        },
        "date": in_date,
        "variable": ["bar", "foo"],
    }
    with pytest.raises(InvalidRequest, match="You have requested data under embargo"):
        arco_adaptor.normalise_request(request)


@pytest.mark.parametrize(
    "in_date,out_date",
    (
        # Range starts before the embargo but ends inside it: the end date is
        # expected to be clipped back to the embargo cut-off (now - 2 days).
        (
            [
                (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"),
                datetime.now().strftime("%Y-%m-%d"),
            ],
            [
                (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d"),
                (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d"),
            ],
        ),
    ),
)
def test_arco_normalise_request_embargo_warn(
    in_date: str | int | list[str | int],
    out_date: list[str],
    arco_adaptor: ArcoDataLakeCdsAdaptor,
    monkeypatch: pytest.MonkeyPatch,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """A range ending inside the embargo is clipped and a user-visible warning is emitted."""
    # Embargo window: anything newer than (now - 2 days) is restricted.
    monkeypatch.setitem(arco_adaptor.config, "embargo", {"days": 2})
    request = {
        "data_format": "netcdf",
        "location": {
            "latitude": 2.0,
            "longitude": 1.0,
        },
        "date": in_date,
        "variable": ["bar", "foo"],
    }
    with caplog.at_level(logging.ERROR):
        request = arco_adaptor.normalise_request(request)
    # The clipping must be surfaced to the user as a visible error message.
    assert any(
        "Part of the data you have requested is under embargo" in message
        for message in arco_adaptor.context.user_visible_errors  # type: ignore[attr-defined]
    )
    assert request["date"] == out_date


@pytest.mark.parametrize(
"invalid_request, match",
[
Expand Down Expand Up @@ -123,6 +269,7 @@ def test_arco_normalise_request(
{
"variable": "FOO",
"location": {"latitude": 0, "longitude": 0},
"date": [1, 2],
"data_format": ["foo", "bar"],
},
"specify a single data_format",
Expand All @@ -131,6 +278,7 @@ def test_arco_normalise_request(
{
"variable": "FOO",
"location": {"latitude": 0, "longitude": 0},
"date": [1, 2],
"data_format": "foo",
},
"Invalid data_format",
Expand Down Expand Up @@ -163,6 +311,7 @@ def test_arco_select_variable(
{
"variable": variable,
"location": {"latitude": 0, "longitude": 0},
"date": "2000",
}
)
ds = xr.open_dataset(fp.name)
Expand All @@ -172,7 +321,11 @@ def test_arco_select_variable(


def test_arco_select_location(arco_adaptor: ArcoDataLakeCdsAdaptor):
request = {"variable": "FOO", "location": {"latitude": 31, "longitude": "41"}}
request = {
"variable": "FOO",
"location": {"latitude": 31, "longitude": "41"},
"date": "2000",
}
fp = arco_adaptor.retrieve(request)
ds = xr.open_dataset(fp.name)
assert ds["latitude"].item() == 30
Expand Down Expand Up @@ -220,6 +373,7 @@ def test_arco_data_format(
request = {
"variable": "FOO",
"location": {"latitude": 0, "longitude": 0},
"date": "2000",
"data_format": data_format,
}
fp = arco_adaptor.retrieve(request)
Expand Down Expand Up @@ -249,6 +403,7 @@ def test_arco_data_format(
{
"variable": "wrong",
"location": {"latitude": 0, "longitude": 0},
"date": "2000",
},
KeyError,
"Invalid variable: 'wrong'.",
Expand All @@ -260,7 +415,7 @@ def test_arco_data_format(
"date": "foo",
},
TypeError,
"Invalid date=['foo/foo']",
"Invalid date_range=['foo', 'foo']",
),
(
{
Expand All @@ -269,7 +424,7 @@ def test_arco_data_format(
"date": 1990,
},
ArcoDataLakeNoDataError,
"No data found for date=['1990/1990']",
"No data found for date_range=['1990', '1990']",
),
],
)
Expand All @@ -293,6 +448,7 @@ def test_connection_problems(
{
"variable": "FOO",
"location": {"latitude": 0, "longitude": 0},
"date": "2000",
}
)
assert (
Expand Down

0 comments on commit fcd566a

Please sign in to comment.