Skip to content

Commit

Permalink
CDK: provide better user-friendly error messages (#12944)
Browse files Browse the repository at this point in the history
* initial implementation

* add parse_response_error_message tests

* move error handler to existing try/catch in AbstractSource

* formatting

* var rename

* use isinstance for type checking

* add docstrings

* add more abstract_source and httpstream tests

* fix wrong httperror usage

* more test cases

* bump version, update changelog
  • Loading branch information
pedroslopez authored May 19, 2022
1 parent a5a1ba7 commit 41dc82f
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 6 deletions.
4 changes: 4 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.1.59
- Add `Stream.get_error_display_message()` to retrieve user-friendly messages from exceptions encountered while reading streams.
- Add default error error message retrieval logic for `HTTPStream`s following common API patterns.

## 0.1.58
`TypeTransformer.default_convert` catch `TypeError`

Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.sources.utils.transform import TypeTransformer
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class AbstractSource(Source, ABC):
Expand Down Expand Up @@ -118,8 +119,13 @@ def read(
connector_state=connector_state,
internal_config=internal_config,
)
except AirbyteTracedException as e:
raise e
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
display_message = stream_instance.get_error_display_message(e)
if display_message:
raise AirbyteTracedException.from_exception(e, message=display_message) from e
raise e
finally:
timer.finish_event()
Expand Down
12 changes: 12 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ def name(self) -> str:
"""
return casing.camel_to_snake(self.__class__.__name__)

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
"""
Retrieves the user-friendly display message that corresponds to an exception.
This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
The default implementation of this method does not return user-friendly messages for any exception type, but it should be overriden as needed.
:param exception: The exception that was raised
:return: A user-friendly message that indicates the cause of the error
"""
return None

@abstractmethod
def read_records(
self,
Expand Down
51 changes: 49 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.error import HTTPError
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -297,7 +296,7 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
# Raise any HTTP exceptions that happened in case there were unexpected ones
try:
response.raise_for_status()
except HTTPError as exc:
except requests.HTTPError as exc:
self.logger.error(response.text)
raise exc
return response
Expand Down Expand Up @@ -336,6 +335,54 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
return backoff_handler(user_backoff_handler)(request, request_kwargs)

def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
"""
Parses the raw response object from a failed request into a user-friendly error message.
By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
:param response:
:return: A user-friendly message that indicates the cause of the error
"""

# default logic to grab error from common fields
def _try_get_error(value):
if isinstance(value, str):
return value
elif isinstance(value, list):
return ", ".join(_try_get_error(v) for v in value)
elif isinstance(value, dict):
new_value = (
value.get("message")
or value.get("messages")
or value.get("error")
or value.get("errors")
or value.get("failures")
or value.get("failure")
)
return _try_get_error(new_value)
return None

try:
body = response.json()
return _try_get_error(body)
except requests.exceptions.JSONDecodeError:
return None

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
"""
Retrieves the user-friendly display message that corresponds to an exception.
This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
The method should be overriden as needed to handle any additional exception types.
:param exception: The exception that was raised
:return: A user-friendly message that indicates the cause of the error
"""
if isinstance(exception, requests.HTTPError):
return self.parse_response_error_message(exception.response)
return None

def read_records(
self,
sync_mode: SyncMode,
Expand Down
8 changes: 5 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/utils/airbyte_secrets_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
#

import logging
from typing import Any, List, Mapping
from typing import TYPE_CHECKING, Any, List, Mapping

from airbyte_cdk.sources import Source
from airbyte_cdk.utils.mapping_utils import all_key_pairs_dot_notation, get_value_by_dot_notation

if TYPE_CHECKING:
from airbyte_cdk.sources import Source

def get_secrets(source: Source, config: Mapping[str, Any], logger: logging.Logger) -> List[Any]:

def get_secrets(source: "Source", config: Mapping[str, Any], logger: logging.Logger) -> List[Any]:
"""
Get a list of secrets from the source config based on the source specification
"""
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.58",
version="0.1.59",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
50 changes: 50 additions & 0 deletions airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,53 @@ def test_send_raise_on_http_errors_logs(mocker, status_code):
response = stream._send_request(req, {})
stream.logger.error.assert_called_with(response.text)
assert response.status_code == status_code


@pytest.mark.parametrize(
"api_response, expected_message",
[
({"error": "something broke"}, "something broke"),
({"error": {"message": "something broke"}}, "something broke"),
({"error": "err-001", "message": "something broke"}, "something broke"),
({"failure": {"message": "something broke"}}, "something broke"),
({"error": {"errors": [{"message": "one"}, {"message": "two"}, {"message": "three"}]}}, "one, two, three"),
({"errors": ["one", "two", "three"]}, "one, two, three"),
({"messages": ["one", "two", "three"]}, "one, two, three"),
({"errors": [{"message": "one"}, {"message": "two"}, {"message": "three"}]}, "one, two, three"),
({"error": [{"message": "one"}, {"message": "two"}, {"message": "three"}]}, "one, two, three"),
({"errors": [{"error": "one"}, {"error": "two"}, {"error": "three"}]}, "one, two, three"),
({"failures": [{"message": "one"}, {"message": "two"}, {"message": "three"}]}, "one, two, three"),
(["one", "two", "three"], "one, two, three"),
([{"error": "one"}, {"error": "two"}, {"error": "three"}], "one, two, three"),
({"error": True}, None),
({"something_else": "hi"}, None),
({}, None),
],
)
def test_default_parse_response_error_message(api_response: dict, expected_message: Optional[str]):
stream = StubBasicReadHttpStream()
response = MagicMock()
response.json.return_value = api_response

message = stream.parse_response_error_message(response)
assert message == expected_message


def test_default_parse_response_error_message_not_json():
stream = StubBasicReadHttpStream()
response = MagicMock()
response.json.side_effect = requests.exceptions.JSONDecodeError()

message = stream.parse_response_error_message(response)
assert message is None


def test_default_get_error_display_message_handles_http_error(mocker):
stream = StubBasicReadHttpStream()
mocker.patch.object(stream, "parse_response_error_message", return_value="my custom message")

non_http_err_msg = stream.get_error_display_message(RuntimeError("not me"))
assert non_http_err_msg is None

http_err_msg = stream.get_error_display_message(requests.HTTPError())
assert http_err_msg == "my custom message"
21 changes: 21 additions & 0 deletions airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -142,6 +143,26 @@ def test_read_nonexistent_stream_raises_exception(mocker):
list(src.read(logger, {}, catalog))


def test_read_stream_with_error_gets_display_message(mocker):
stream = MockStream(name="my_stream")

mocker.patch.object(MockStream, "get_json_schema", return_value={})
mocker.patch.object(MockStream, "read_records", side_effect=RuntimeError("oh no!"))

source = MockSource(streams=[stream])
catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(stream, SyncMode.full_refresh)])

# without get_error_display_message
with pytest.raises(RuntimeError, match="oh no!"):
list(source.read(logger, {}, catalog))

mocker.patch.object(MockStream, "get_error_display_message", return_value="my message")

with pytest.raises(AirbyteTracedException, match="oh no!") as exc:
list(source.read(logger, {}, catalog))
assert exc.value.message == "my message"


GLOBAL_EMITTED_AT = 1


Expand Down

0 comments on commit 41dc82f

Please sign in to comment.