Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle errors in obs adaptor #181

Merged
merged 7 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions cads_adaptors/adaptors/cadsobs/adaptor.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import logging
import tempfile
from pathlib import Path

from cads_adaptors.adaptors.cadsobs.api_client import CadsobsApiClient
from cads_adaptors.adaptors.cds import AbstractCdsAdaptor
from cads_adaptors.exceptions import CadsObsRuntimeError

logger = logging.getLogger(__name__)


class ObservationsAdaptor(AbstractCdsAdaptor):
def retrieve(self, request):
    """Run the retrieval, surfacing any failure to the user before re-raising.

    Delegates the actual work to ``_retrieve``; on any exception the error
    is first reported through the context (so the end user sees it) and
    then propagated unchanged to the caller.
    """
    try:
        return self._retrieve(request)
    except Exception as exc:
        self.context.add_user_visible_error(repr(exc))
        # Bare raise preserves the original traceback (unlike `raise exc`).
        raise

def _retrieve(self, request):
# TODO: retrieve_data imports various optional dependencies at top level
from cads_adaptors.adaptors.cadsobs.retrieve import retrieve_data

Expand Down Expand Up @@ -55,7 +60,9 @@ def retrieve(self, request):
# Get the service definition file
service_definition = cadsobs_client.get_service_definition(dataset_name)
global_attributes = service_definition["global_attributes"]
logger.debug(f"The following objects are going to be filtered: {object_urls}")
self.context.debug(
f"The following objects are going to be filtered: {object_urls}"
)
output_dir = Path(tempfile.mkdtemp())
output_path = retrieve_data(
dataset_name,
Expand All @@ -82,7 +89,7 @@ def handle_auxiliary_variables(
auxvar = auxvar_dict["auxvar"]
if auxvar in auxiliary_variables:
metadata_field = auxvar_dict["metadata_name"]
logger.warning(
self.context.warning(
f"{auxvar} is an auxiliary variable, it will be included"
f"as an extra {metadata_field} column in the output file, not as a "
f"regular variable."
Expand Down Expand Up @@ -116,14 +123,11 @@ def handle_sources_list(self, dataset_source: list | str) -> str:
"""Raise error if many, extract if list."""
if isinstance(dataset_source, list):
if len(dataset_source) > 1:
self.context.add_user_visible_error(
"Asking for more than one observation_types in the same"
"request is currently unsupported."
)
raise CadsObsRuntimeError(
error_message = (
"Asking for more than one observation_types in the same"
"request is currently unsupported."
)
raise CadsObsRuntimeError(error_message)
else:
# Get the string if there is only one item in the list.
dataset_source_str = dataset_source[0]
Expand Down
29 changes: 25 additions & 4 deletions cads_adaptors/adaptors/cadsobs/api_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Literal

from cads_adaptors.exceptions import CadsObsConnectionError


class CadsobsApiClient:
"""API Client for the observations repository HTTP API."""
Expand All @@ -12,13 +14,32 @@ def _send_request(
):
import requests

with requests.Session() as session:
response = session.request(
method=method, url=f"{self.baseurl}/{endpoint}", json=payload
try:
with requests.Session() as session:
response = session.request(
method=method, url=f"{self.baseurl}/{endpoint}", json=payload
)
response.raise_for_status()
except requests.ConnectionError:
raise CadsObsConnectionError("Can't connect to the observations API.")
except requests.HTTPError:
message = self._get_error_message(response)
raise CadsObsConnectionError(
f"Request to observations API failed: {message}"
)
response.raise_for_status()
return response.json()

def _get_error_message(self, response) -> str:
    """Extract a human-readable error message from a failed HTTP response.

    Prefers the ``detail`` field of a JSON error payload; falls back to the
    raw response body (e.g. a server traceback) when the body is not JSON
    or the payload lacks a ``detail`` field.
    """
    try:
        return response.json()["detail"]
    except (ValueError, KeyError):
        # requests' JSONDecodeError subclasses ValueError, so this covers
        # non-JSON bodies produced when the API server does not handle the
        # exception well. KeyError covers JSON payloads without "detail",
        # which the original code let escape.
        return response.content.decode("UTF-8")

def get_service_definition(self, dataset: str) -> dict:
    """Fetch the service definition document for *dataset* from the API."""
    endpoint = f"{dataset}/service_definition"
    return self._send_request("GET", endpoint)

Expand Down
4 changes: 4 additions & 0 deletions cads_adaptors/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class CadsObsRuntimeError(RuntimeError):
"""Raised when a CADS-observation repository request fails."""


class CadsObsConnectionError(RuntimeError):
    """Raised when a request to the CADS-observation repository API cannot
    connect or the HTTP request fails."""


# NOTE(review): ROOCS appears to be an external service this project talks
# to; expand this docstring once its exact meaning is confirmed.
class RoocsRuntimeError(RuntimeError):
    """Raised when a ROOCS request fails."""

Expand Down
8 changes: 3 additions & 5 deletions cads_adaptors/tools/post_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ def monthly_reduce(
f"earthkit.transforms.aggregate.temporal.monthly_reduce({in_tag}, how={how}, **{kwargs})",
)


return out_xarray_dict


def update_history(dataset: Dataset, update_text: str, context: Context = Context()) -> Dataset:

def update_history(
dataset: Dataset, update_text: str, context: Context = Context()
) -> Dataset:
history = dataset.attrs.get("history", None)
if history is None:
history = update_text
Expand All @@ -120,5 +120,3 @@ def update_history(dataset: Dataset, update_text: str, context: Context = Contex
)
return dataset
return dataset.assign_attrs({"history": history})


4 changes: 2 additions & 2 deletions tests/test_40_post_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_monthly_reduce():


def test_update_history():
in_xarray = xr.Dataset(
in_xarray = xr.Dataset(
{
"temperature": xr.DataArray([1, 2, 3]),
}
Expand All @@ -85,4 +85,4 @@ def test_update_history():
out_xarray.attrs["history"] = 1
out_xarray = post_processors.update_history(out_xarray, "Test history update")
assert isinstance(out_xarray, xr.Dataset)
assert out_xarray.attrs["history"] == 1
assert out_xarray.attrs["history"] == 1
149 changes: 90 additions & 59 deletions tests/test_cadsobs_adaptor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import time
from pathlib import Path
from unittest.mock import Mock

import h5netcdf
import pytest

from cads_adaptors import ObservationsAdaptor

Expand Down Expand Up @@ -139,76 +141,105 @@ def get_aux_var_mapping(
}


class ErrorMockerCadsobsApiClient(MockerCadsobsApiClient):
    """Variant of the mocked API client whose object lookup always fails."""

    def get_objects_to_retrieve(
        self, dataset_name: str, mapped_request: dict, size_limit: int
    ):
        # Deliberately blow up so the adaptor's error propagation can be
        # asserted in the error-path test.
        raise RuntimeError("This is a test error")


# Shared request fixture used by the success and error adaptor tests.
TEST_REQUEST = {
    "time_aggregation": "daily",
    "format": "netCDF",
    "variable": [
        "maximum_air_temperature",
        "maximum_air_temperature_negative_total_uncertainty",
        "maximum_air_temperature_positive_total_uncertainty",
    ],
    "year": ["2007"],
    "month": ["11"],
    "day": [
        "01",
        "02",
        "03",
    ],
    # NOTE(review): presumably a cache-buster so repeated runs are not served
    # a previously cached result — confirm against the adaptor's caching.
    "_timestamp": str(time.time()),
}

# Shared adaptor configuration fixture used by the adaptor tests below.
TEST_ADAPTOR_CONFIG = {
    "entry_point": "cads_adaptors:ObservationsAdaptor",
    "collection_id": "insitu-observations-near-surface-temperature-us-climate-reference-network",
    "obs_api_url": "http://localhost:8000",
    "mapping": {
        "remap": {
            # User-facing aggregation names -> internal dataset source names.
            "time_aggregation": {
                "daily": "USCRN_DAILY",
                "hourly": "USCRN_HOURLY",
                "monthly": "USCRN_MONTHLY",
                "sub_hourly": "USCRN_SUBHOURLY",
            },
            # User-facing variable names -> internal variable names.
            "variable": {
                "maximum_air_temperature": "daily_maximum_air_temperature",
                "maximum_air_temperature_negative_total_uncertainty": "air_temperature_max_negative_total_uncertainty",  # noqa E501
                "maximum_air_temperature_positive_total_uncertainty": "air_temperature_max_positive_total_uncertainty",  # noqa E501
                "maximum_relative_humidity": "daily_maximum_relative_humidity",
                "maximum_soil_temperature": "hourly_maximum_soil_temperature",
                "maximum_soil_temperature_flag": "hourly_maximum_soil_temperature_flag",  # noqa E501
                "maximum_solar_irradiance": "hourly_maximum_downward_shortwave_irradiance_at_earth_surface",  # noqa E501
                "maximum_solar_irradiance_quality_flag": "hourly_maximum_downward_shortwave_irradiance_at_earth_surface_quality_flag",  # noqa E501
                "mean_air_temperature_negative_total_uncertainty": "air_temperature_mean_negative_total_uncertainty",  # noqa E501
                "mean_air_temperature_positive_total_uncertainty": "air_temperature_mean_positive_total_uncertainty",  # noqa E501
                "minimum_air_temperature": "daily_minimum_air_temperature",
                "minimum_air_temperature_negative_total_uncertainty": "air_temperature_min_negative_total_uncertainty",  # noqa E501
                "minimum_air_temperature_positive_total_uncertainty": "air_temperature_min_positive_total_uncertainty",  # noqa E501
                "minimum_relative_humidity": "daily_minimum_relative_humidity",
                "minimum_soil_temperature": "hourly_minimum_soil_temperature",
                "minimum_soil_temperature_quality_flag": "hourly_minimum_soil_temperature_quality_flag",  # noqa E501
                "minimum_solar_irradiance": "hourly_minimum_downward_shortwave_irradiance_at_earth_surface",  # noqa E501
                "minimum_solar_irradiance_quality_flag": "hourly_minimum_downward_shortwave_irradiance_at_earth_surface_quality_flag",  # noqa E501
                "solar_irradiance": "downward_shortwave_irradiance_at_earth_surface",
                "solar_irradiance_quality_flag": "downward_shortwave_irradiance_at_earth_surface_quality_flag",  # noqa E501
            },
        },
        "format": {"netcdf": "netCDF"},
        "rename": {"time_aggregation": "dataset_source", "variable": "variables"},
        "force": {},
    },
}


def test_adaptor(tmp_path, monkeypatch):
    """Happy path: the adaptor retrieves data and writes a non-empty NetCDF file.

    This span of the paste interleaved the pre-merge inline request/config
    dicts with the merged version; this is the reconstructed post-merge body
    using the module-level TEST_REQUEST / TEST_ADAPTOR_CONFIG fixtures.
    """
    monkeypatch.setattr(
        "cads_adaptors.adaptors.cadsobs.adaptor.CadsobsApiClient",
        MockerCadsobsApiClient,
    )
    test_form = {}
    # + "/v1/AUTH_{public_user}" will be needed to work with S3 ceph public urls, but it
    # is not needed for this test
    adaptor = ObservationsAdaptor(test_form, **TEST_ADAPTOR_CONFIG)
    result = adaptor.retrieve(TEST_REQUEST)
    tempfile = Path(tmp_path, "test_adaptor.nc")
    with tempfile.open("wb") as tmpf:
        tmpf.write(result.read())
    assert tempfile.stat().st_size > 0
    # Context manager closes the HDF5 handle (the original leaked it).
    with h5netcdf.File(tempfile) as actual:
        assert actual.dimensions["index"].size > 0


def test_adaptor_error(monkeypatch):
    """Error path: a backend failure is shown to the user and re-raised.

    Uses ErrorMockerCadsobsApiClient, whose lookup always raises, and checks
    the same repr is both raised and pushed through add_user_visible_error.
    (The unused ``tmp_path`` fixture was dropped; pytest injects by name.)
    """
    monkeypatch.setattr(
        "cads_adaptors.adaptors.cadsobs.adaptor.CadsobsApiClient",
        ErrorMockerCadsobsApiClient,
    )
    test_form = {}
    adaptor = ObservationsAdaptor(test_form, **TEST_ADAPTOR_CONFIG)
    adaptor.context.add_user_visible_error = Mock()
    with pytest.raises(RuntimeError) as excinfo:
        adaptor.retrieve(TEST_REQUEST)
    expected_error = "RuntimeError('This is a test error')"
    assert repr(excinfo.value) == expected_error
    # The identical message must have reached the user-visible channel.
    adaptor.context.add_user_visible_error.assert_called_with(expected_error)