diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index d8f5aafa71002..b5d795d59ed99 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.15.0 +Reverts additions from versions 0.13.0 and 0.13.3. + ## 0.14.0 Low-code: Add token_expiry_date_format to OAuth Authenticator. Resolve ref schema diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 3f9efab8e48f2..d1ac63e76e823 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -106,10 +106,6 @@ def read( f"The requested stream {configured_stream.stream.name} was not found in the source." f" Available streams: {stream_instances.keys()}" ) - stream_is_available, error = stream_instance.check_availability(logger, self) - if not stream_is_available: - logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}") - continue try: timer.start_event(f"Syncing stream {configured_stream.stream.name}") yield from self._read_stream( @@ -191,7 +187,7 @@ def _read_stream( @staticmethod def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool: """ - Check if record count reached limit set by internal config. + Check if record count reached liimt set by internal config. :param internal_config - internal CDK configuration separated from user defined config :records_counter - number of records already red :return True if limit reached, False otherwise diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index 490cf104ec6d4..c982f354f3b40 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -6,9 +6,9 @@ from dataclasses import InitVar, dataclass from typing import Any, List, Mapping, Tuple +from airbyte_cdk.models.airbyte_protocol import SyncMode from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.source import Source -from airbyte_cdk.sources.utils.stream_helpers import StreamHelper from dataclasses_jsonschema import JsonSchemaMixin @@ -33,19 +33,29 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi if len(streams) == 0: return False, f"No streams to connect to from source {source}" for stream_name in self.stream_names: - if stream_name not in stream_name_to_stream.keys(): + if stream_name in stream_name_to_stream.keys(): + stream = stream_name_to_stream[stream_name] + try: + # Some streams need a stream slice to read records (eg if they have a SubstreamSlicer) + stream_slice = self._get_stream_slice(stream) + records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) + next(records) + except Exception as error: + return False, f"Unable to connect to stream {stream_name} - {error}" + else: raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}") - - stream = stream_name_to_stream[stream_name] - try: - if stream.availability_strategy is not None: - stream_is_available, reason = stream.check_availability(logger, source) - if not stream_is_available: - return False, reason - else: - stream_helper = StreamHelper() - stream_helper.get_first_record(stream) - except Exception as error: - return False, f"Unable to connect to stream {stream_name} - {error}" - return True, None + + def _get_stream_slice(self, stream): + # We wrap the return output of stream_slices() because some implementations return types that are iterable, + # but not iterators such as lists or tuples + slices = iter( + stream.stream_slices( + cursor_field=stream.cursor_field, + sync_mode=SyncMode.full_refresh, + ) + ) + try: + return next(slices) + except StopIteration: + return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py index beb3cfaa1d263..6e79356ee93b7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py @@ -17,7 +17,7 @@ class DeclarativeSource(AbstractSource): @property @abstractmethod def connection_checker(self) -> ConnectionChecker: - """Returns the ConnectionChecker to use for the `check` operation""" + """Returns the ConnectioChecker to use for the `check` operation""" def check_connection(self, logger, config) -> Tuple[bool, any]: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py deleted file mode 100644 index bb86a1c1de0d3..0000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py +++ /dev/null @@ -1,33 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import logging -import typing -from abc import ABC, abstractmethod -from typing import Optional, Tuple - -from airbyte_cdk.sources.streams import Stream - -if typing.TYPE_CHECKING: - from airbyte_cdk.sources import Source - - -class AvailabilityStrategy(ABC): - """ - Abstract base class for checking stream availability. - """ - - @abstractmethod - def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]: - """ - Checks stream availability. - - :param stream: stream - :param logger: source logger - :param source: (optional) source - :return: A tuple of (boolean, str). If boolean is true, then the stream - is available, and no str is required. Otherwise, the stream is unavailable - for some reason and the str should describe what went wrong and how to - resolve the unavailability, if possible. - """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 5ff57550e0032..d39c706eb9aaf 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -5,10 +5,9 @@ import inspect import logging -import typing from abc import ABC, abstractmethod from functools import lru_cache -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import airbyte_cdk.sources.utils.casing as casing from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode @@ -18,10 +17,6 @@ from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from deprecated.classic import deprecated -if typing.TYPE_CHECKING: - from airbyte_cdk.sources import Source - from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy - # A stream's read method can return one of the following types: # Mapping[str, Any]: The content of an AirbyteRecordMessage # AirbyteRecordMessage: An AirbyteRecordMessage @@ -175,28 +170,6 @@ def source_defined_cursor(self) -> bool: """ return True - def check_availability(self, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]: - """ - Checks whether this stream is available. - - :param logger: source logger - :param source: (optional) source - :return: A tuple of (boolean, str). If boolean is true, then this stream - is available, and no str is required. Otherwise, this stream is unavailable - for some reason and the str should describe what went wrong and how to - resolve the unavailability, if possible. - """ - if self.availability_strategy: - return self.availability_strategy.check_availability(self, logger, source) - return True, None - - @property - def availability_strategy(self) -> Optional["AvailabilityStrategy"]: - """ - :return: The AvailabilityStrategy used to check whether this stream is available. - """ - return None - @property @abstractmethod def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py deleted file mode 100644 index 180e8dd3ee95f..0000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py +++ /dev/null @@ -1,120 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import logging -import typing -from typing import Dict, Optional, Tuple - -import requests -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy -from airbyte_cdk.sources.utils.stream_helpers import StreamHelper -from requests import HTTPError - -if typing.TYPE_CHECKING: - from airbyte_cdk.sources import Source - - -class HttpAvailabilityStrategy(AvailabilityStrategy): - def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]: - """ - Check stream availability by attempting to read the first record of the - stream. - - :param stream: stream - :param logger: source logger - :param source: (optional) source - :return: A tuple of (boolean, str). If boolean is true, then the stream - is available, and no str is required. Otherwise, the stream is unavailable - for some reason and the str should describe what went wrong and how to - resolve the unavailability, if possible. - """ - try: - stream_helper = StreamHelper() - stream_helper.get_first_record(stream) - except HTTPError as error: - return self.handle_http_error(stream, logger, source, error) - return True, None - - def handle_http_error( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError - ) -> Tuple[bool, Optional[str]]: - """ - Override this method to define error handling for various `HTTPError`s - that are raised while attempting to check a stream's availability. - - Checks whether an error's status_code is in a list of unavailable_error_codes, - and gets the associated reason for that error. - - :param stream: stream - :param logger: source logger - :param source: optional (source) - :param error: HTTPError raised while checking stream's availability. - :return: A tuple of (boolean, str). If boolean is true, then the stream - is available, and no str is required. Otherwise, the stream is unavailable - for some reason and the str should describe what went wrong and how to - resolve the unavailability, if possible. - """ - try: - status_code = error.response.status_code - reason = self.reasons_for_unavailable_status_codes(stream, logger, source, error)[status_code] - response_error_message = stream.parse_response_error_message(error.response) - if response_error_message: - reason += response_error_message - return False, reason - except KeyError: - # If the HTTPError is not in the dictionary of errors we know how to handle, don't except it - raise error - - def reasons_for_unavailable_status_codes( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError - ) -> Dict[int, str]: - """ - Returns a dictionary of HTTP status codes that indicate stream - unavailability and reasons explaining why a given status code may - have occurred and how the user can resolve that error, if applicable. - - :param stream: stream - :param logger: source logger - :param source: optional (source) - :return: A dictionary of (status code, reason) where the 'reason' explains - why 'status code' may have occurred and how the user can resolve that - error, if applicable. - """ - forbidden_error_message = f"The endpoint to access stream '{stream.name}' returned 403: Forbidden. " - forbidden_error_message += "This is most likely due to insufficient permissions on the credentials in use. " - forbidden_error_message += self._visit_docs_message(logger, source) - - reasons_for_codes: Dict[int, str] = {requests.codes.FORBIDDEN: forbidden_error_message} - return reasons_for_codes - - @staticmethod - def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str: - """ - Creates a message indicicating where to look in the documentation for - more information on a given source by checking the spec of that source - (if provided) for a 'documentationUrl'. - - :param logger: source logger - :param source: optional (source) - :return: A message telling the user where to go to learn more about the source. - """ - if not source: - return "Please visit the connector's documentation to learn more. " - - try: - connector_spec = source.spec(logger) - docs_url = connector_spec.documentationUrl - if docs_url: - return f"Please visit {docs_url} to learn more. " - else: - return "Please visit the connector's documentation to learn more. " - - except FileNotFoundError: # If we are unit testing without implementing spec() method in source - if source: - docs_url = f"https://docs.airbyte.com/integrations/sources/{source.name}" - else: - docs_url = "https://docs.airbyte.com/integrations/sources/test" - - return f"Please visit {docs_url} to learn more." diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 35cef60807fe9..084eb052894ed 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -13,9 +13,7 @@ import requests import requests_cache from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.core import Stream, StreamData -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from requests.auth import AuthBase from requests_cache.session import CachedSession @@ -115,10 +113,6 @@ def retry_factor(self) -> float: def authenticator(self) -> HttpAuthenticator: return self._authenticator - @property - def availability_strategy(self) -> Optional[AvailabilityStrategy]: - return HttpAvailabilityStrategy() - @abstractmethod def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py deleted file mode 100644 index bd922f3d80aa5..0000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py +++ /dev/null @@ -1,47 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from typing import Any, Mapping, Optional - -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.core import StreamData - - -class StreamHelper: - def get_first_record(self, stream: Stream) -> StreamData: - """ - Gets the first record for a stream. - - :param stream: stream - :return: StreamData containing the first record in the stream - """ - try: - # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) - stream_slice = self.get_stream_slice(stream) - records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) - return next(records) - except StopIteration: - return {} - - @staticmethod - def get_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]: - """ - Gets the first stream_slice from a given stream's stream_slices. - - :param stream: stream - :return: First stream slice from 'stream_slices' generator - """ - # We wrap the return output of stream_slices() because some implementations return types that are iterable, - # but not iterators such as lists or tuples - slices = iter( - stream.stream_slices( - cursor_field=stream.cursor_field, - sync_mode=SyncMode.full_refresh, - ) - ) - try: - return next(slices) - except StopIteration: - return {} diff --git a/airbyte-cdk/python/docs/concepts/http-streams.md b/airbyte-cdk/python/docs/concepts/http-streams.md index 0be02ce35cdcc..5bedb787c747e 100644 --- a/airbyte-cdk/python/docs/concepts/http-streams.md +++ b/airbyte-cdk/python/docs/concepts/http-streams.md @@ -81,14 +81,3 @@ When we are dealing with streams that depend on the results of another stream, w If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc.. override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can be returned as a keyword argument. - -## Stream Availability - -The CDK defines an `AvailabilityStrategy` for a stream, which is used to perform the `check_availability` method. This method checks whether -the stream is available before performing `read_records`. - -For HTTP streams, a default `HttpAvailabilityStrategy` is defined, which attempts to read the first record of the stream, and excepts -a dictionary of known error codes and associated reasons, `reasons_for_unavailable_status_codes`. By default, this list contains only -`requests.status_codes.FORBIDDEN` (403), with an associated error message that tells the user that they are likely missing permissions associated with that stream. - -You can override these known errors to except more error codes and inform the user how to resolve errors. diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index b030f7b3654e8..76f15deede067 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.14.0", + version="0.15.0", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index c7b84dc24b797..10e43ea7c4d10 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -2,15 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -import logging -from typing import Any, Iterable, Mapping, Optional from unittest.mock import MagicMock import pytest -import requests from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream -from airbyte_cdk.sources.streams.http import HttpStream -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy logger = None config = dict() @@ -32,7 +27,6 @@ def test_check_stream_with_slices_as_list(test_name, record, streams_to_check, stream_slice, expectation, slices_as_list): stream = MagicMock() stream.name = "s1" - stream.availability_strategy = None if slices_as_list: stream.stream_slices.return_value = [stream_slice] else: @@ -55,56 +49,3 @@ def test_check_stream_with_slices_as_list(test_name, record, streams_to_check, s def mock_read_records(responses, default_response=None, **kwargs): return lambda stream_slice, sync_mode: responses[frozenset(stream_slice)] if frozenset(stream_slice) in responses else default_response - - -@pytest.mark.parametrize( - "test_name, response_code, available_expectation, expected_messages", - [ - ("test_stream_unavailable_unhandled_error", 404, False, ["Unable to connect to stream mock_http_stream", "404 Client Error"]), - ("test_stream_unavailable_handled_error", 403, False, [ - "The endpoint to access stream 'mock_http_stream' returned 403: Forbidden.", - "This is most likely due to insufficient permissions on the credentials in use.", - ]), - ("test_stream_available", 200, True, []), - ], -) -def test_check_http_stream_via_availability_strategy(mocker, test_name, response_code, available_expectation, expected_messages): - class MockHttpStream(HttpStream): - url_base = "https://test_base_url.com" - primary_key = "" - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.resp_counter = 1 - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - return None - - def path(self, **kwargs) -> str: - return "" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - stub_resp = {"data": self.resp_counter} - self.resp_counter += 1 - yield stub_resp - pass - - http_stream = MockHttpStream() - assert isinstance(http_stream, HttpStream) - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - - source = MagicMock() - source.streams.return_value = [http_stream] - - check_stream = CheckStream(stream_names=["mock_http_stream"], options={}) - - req = requests.Response() - req.status_code = response_code - mocker.patch.object(requests.Session, "send", return_value=req) - - logger = logging.getLogger(f"airbyte.{getattr(source, 'name', '')}") - stream_is_available, reason = check_stream.check_connection(source, logger, config) - - assert stream_is_available == available_expectation - for message in expected_messages: - assert message in reason diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py deleted file mode 100644 index 68c0b7e04dbf0..0000000000000 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py +++ /dev/null @@ -1,168 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import logging -from typing import Any, Iterable, List, Mapping, Optional, Tuple - -import pytest -import requests -from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy -from airbyte_cdk.sources.streams.http.http import HttpStream -from requests import HTTPError - -logger = logging.getLogger("airbyte") - - -class MockHttpStream(HttpStream): - url_base = "https://test_base_url.com" - primary_key = "" - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.resp_counter = 1 - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - return None - - def path(self, **kwargs) -> str: - return "" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - stub_resp = {"data": self.resp_counter} - self.resp_counter += 1 - yield stub_resp - pass - - def retry_factor(self) -> float: - return 0.01 - - -def test_default_http_availability_strategy(mocker): - http_stream = MockHttpStream() - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - - class MockResponse(requests.Response, mocker.MagicMock): - def __init__(self, *args, **kvargs): - mocker.MagicMock.__init__(self) - requests.Response.__init__(self, **kvargs) - self.json = mocker.MagicMock() - - response = MockResponse() - response.status_code = 403 - response.json.return_value = {"error": "Oh no!"} - mocker.patch.object(requests.Session, "send", return_value=response) - - stream_is_available, reason = http_stream.check_availability(logger) - assert not stream_is_available - - expected_messages = [ - "This is most likely due to insufficient permissions on the credentials in use.", - "Please visit the connector's documentation to learn more.", - "Oh no!", - ] - for message in expected_messages: - assert message in reason - - req = requests.Response() - req.status_code = 200 - mocker.patch.object(requests.Session, "send", return_value=req) - - stream_is_available, _ = http_stream.check_availability(logger) - assert stream_is_available - - -def test_http_availability_connector_specific_docs(mocker): - class MockSource(AbstractSource): - def __init__(self, streams: List[Stream] = None): - self._streams = streams - - def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: - return True, "" - - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - if not self._streams: - raise Exception("Stream is not set") - return self._streams - - http_stream = MockHttpStream() - source = MockSource(streams=[http_stream]) - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - - req = requests.Response() - req.status_code = 403 - mocker.patch.object(requests.Session, "send", return_value=req, json={"error": "Oh no!"}) - - stream_is_available, reason = http_stream.check_availability(logger, source) - assert not stream_is_available - - expected_messages = [ - f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.", - "This is most likely due to insufficient permissions on the credentials in use.", - f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more.", - # "Oh no!", - ] - for message in expected_messages: - assert message in reason - - -def test_http_availability_raises_unhandled_error(mocker): - http_stream = MockHttpStream() - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - - req = requests.Response() - req.status_code = 404 - mocker.patch.object(requests.Session, "send", return_value=req) - - with pytest.raises(HTTPError): - http_stream.check_availability(logger) - - -def test_send_handles_retries_when_checking_availability(mocker, caplog): - http_stream = MockHttpStream() - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - - req_1 = requests.Response() - req_1.status_code = 429 - req_2 = requests.Response() - req_2.status_code = 503 - req_3 = requests.Response() - req_3.status_code = 200 - mock_send = mocker.patch.object(requests.Session, "send", side_effect=[req_1, req_2, req_3]) - - with caplog.at_level(logging.INFO): - stream_is_available, _ = http_stream.check_availability(logger) - - assert stream_is_available - assert mock_send.call_count == 3 - for message in ["Caught retryable error", "Response Code: 429", "Response Code: 503"]: - assert message in caplog.text - - -def test_http_availability_strategy_on_empty_stream(mocker): - mocker.patch.multiple(HttpStream, __abstractmethods__=set()) - mocker.patch.multiple(Stream, __abstractmethods__=set()) - - class MockEmptyStream(mocker.MagicMock, HttpStream): - page_size = None - get_json_schema = mocker.MagicMock() - - def __init__(self, *args, **kvargs): - mocker.MagicMock.__init__(self) - self.read_records = mocker.MagicMock() - - empty_stream = MockEmptyStream() - assert isinstance(empty_stream, HttpStream) - - assert isinstance(empty_stream.availability_strategy, HttpAvailabilityStrategy) - - # Generator should have no values to generate - empty_stream.read_records.return_value = iter([]) - - logger = logging.getLogger("airbyte.test-source") - stream_is_available, _ = empty_stream.check_availability(logger) - - assert stream_is_available - assert empty_stream.read_records.called diff --git a/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py deleted file mode 100644 index 277924b011974..0000000000000 --- a/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py +++ /dev/null @@ -1,70 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import logging -from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union - -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources import Source -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy -from airbyte_cdk.sources.streams.core import StreamData - -logger = logging.getLogger("airbyte") - - -class MockStream(Stream): - def __init__(self, name: str) -> Stream: - self._name = name - - @property - def name(self) -> str: - return self._name - - @property - def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: - pass - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[StreamData]: - pass - - -def test_no_availability_strategy(): - stream_1 = MockStream("stream") - assert stream_1.availability_strategy is None - - stream_1_is_available, _ = stream_1.check_availability(logger) - assert stream_1_is_available - - -def test_availability_strategy(): - class MockAvailabilityStrategy(AvailabilityStrategy): - def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional[Source]) -> Tuple[bool, any]: - if stream.name == "available_stream": - return True, None - return False, f"Could not reach stream '{stream.name}'." - - class MockStreamWithAvailabilityStrategy(MockStream): - @property - def availability_strategy(self) -> Optional["AvailabilityStrategy"]: - return MockAvailabilityStrategy() - - stream_1 = MockStreamWithAvailabilityStrategy("available_stream") - stream_2 = MockStreamWithAvailabilityStrategy("unavailable_stream") - - for stream in [stream_1, stream_2]: - assert isinstance(stream.availability_strategy, MockAvailabilityStrategy) - - stream_1_is_available, _ = stream_1.check_availability(logger) - assert stream_1_is_available - - stream_2_is_available, reason = stream_2.check_availability(logger) - assert not stream_2_is_available - assert "Could not reach stream 'unavailable_stream'" in reason diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 64a546108db14..5b67d57444ebc 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -7,10 +7,10 @@ import tempfile from collections import defaultdict from contextlib import nullcontext as does_not_raise -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple +from typing import Any, List, Mapping, MutableMapping, Optional, Tuple +from unittest.mock import MagicMock import pytest -import requests from airbyte_cdk.models import ( AirbyteGlobalState, AirbyteStateBlob, @@ -24,7 +24,6 @@ ) from airbyte_cdk.sources import AbstractSource, Source from airbyte_cdk.sources.streams.core import Stream -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from airbyte_cdk.sources.streams.http.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from pydantic import ValidationError @@ -44,15 +43,10 @@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]): class MockAbstractSource(AbstractSource): - def __init__(self, streams: Optional[List[Stream]] = None): - self._streams = streams - def check_connection(self, *args, **kwargs) -> Tuple[bool, Optional[Any]]: return True, "" def streams(self, *args, **kwargs) -> List[Stream]: - if self._streams: - return self._streams return [] @@ -85,30 +79,26 @@ def abstract_source(mocker): mocker.patch.multiple(HttpStream, __abstractmethods__=set()) mocker.patch.multiple(Stream, __abstractmethods__=set()) - class MockHttpStream(mocker.MagicMock, HttpStream): + class MockHttpStream(MagicMock, HttpStream): url_base = "http://example.com" path = "/dummy/path" - get_json_schema = mocker.MagicMock() + get_json_schema = MagicMock() def supports_incremental(self): return True def __init__(self, *args, **kvargs): - mocker.MagicMock.__init__(self) + MagicMock.__init__(self) HttpStream.__init__(self, *args, kvargs) - self.read_records = mocker.MagicMock() - - @property - def availability_strategy(self): - return None + self.read_records = MagicMock() - class MockStream(mocker.MagicMock, Stream): + class MockStream(MagicMock, Stream): page_size = None - get_json_schema = mocker.MagicMock() + get_json_schema = MagicMock() - def __init__(self, **kwargs): - mocker.MagicMock.__init__(self) - self.read_records = mocker.MagicMock() + def __init__(self, *args, **kvargs): + MagicMock.__init__(self) + self.read_records = MagicMock() streams = [MockHttpStream(), MockStream()] @@ -395,8 +385,8 @@ def test_internal_config(abstract_source, catalog): assert not non_http_stream.page_size -def test_internal_config_limit(mocker, abstract_source, catalog): - logger_mock = mocker.MagicMock() +def test_internal_config_limit(abstract_source, catalog): + logger_mock = MagicMock() logger_mock.level = logging.DEBUG del catalog.streams[1] STREAM_LIMIT = 2 @@ -433,8 +423,8 @@ def test_internal_config_limit(mocker, abstract_source, catalog): SCHEMA = {"type": "object", "properties": {"value": {"type": "string"}}} -def test_source_config_no_transform(mocker, abstract_source, catalog): - logger_mock = mocker.MagicMock() +def test_source_config_no_transform(abstract_source, catalog): + logger_mock = MagicMock() logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams @@ -447,8 +437,8 @@ def test_source_config_no_transform(mocker, abstract_source, catalog): assert non_http_stream.get_json_schema.call_count == 5 -def test_source_config_transform(mocker, abstract_source, catalog): - logger_mock = mocker.MagicMock() +def test_source_config_transform(abstract_source, catalog): + logger_mock = MagicMock() logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams @@ -461,8 +451,8 @@ def test_source_config_transform(mocker, abstract_source, catalog): assert [r.record.data for r in records] == [{"value": "23"}] * 2 -def test_source_config_transform_and_no_transform(mocker, abstract_source, catalog): - logger_mock = mocker.MagicMock() +def test_source_config_transform_and_no_transform(abstract_source, catalog): + logger_mock = MagicMock() logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams @@ -472,116 +462,3 @@ def test_source_config_transform_and_no_transform(mocker, abstract_source, catal records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] assert len(records) == 2 assert [r.record.data for r in records] == [{"value": "23"}, {"value": 23}] - - -def test_read_default_http_availability_strategy_stream_available(catalog, mocker): - mocker.patch.multiple(HttpStream, __abstractmethods__=set()) - mocker.patch.multiple(Stream, __abstractmethods__=set()) - - class MockHttpStream(mocker.MagicMock, HttpStream): - url_base = "http://example.com" - path = "/dummy/path" - get_json_schema = mocker.MagicMock() - - def supports_incremental(self): - return True - - def __init__(self, *args, **kvargs): - mocker.MagicMock.__init__(self) - HttpStream.__init__(self, *args, kvargs) - self.read_records = mocker.MagicMock() - - class MockStream(mocker.MagicMock, Stream): - page_size = None - get_json_schema = mocker.MagicMock() - - def __init__(self, *args, **kvargs): - mocker.MagicMock.__init__(self) - self.read_records = mocker.MagicMock() - - streams = [MockHttpStream(), MockStream()] - http_stream, non_http_stream = streams - assert isinstance(http_stream, HttpStream) - assert not isinstance(non_http_stream, HttpStream) - - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - assert non_http_stream.availability_strategy is None - - # Add an extra record for the default HttpAvailabilityStrategy to pull from - # during the try: next(records) check, since we are mocking the return value - # and not re-creating the generator like we would during actual reading - http_stream.read_records.return_value = iter([{"value": "test"}] + [{}] * 3) - non_http_stream.read_records.return_value = iter([{}] * 3) - - source = MockAbstractSource(streams=streams) - logger = logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}") - records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] - # 3 for http stream and 3 for non http stream - assert len(records) == 3 + 3 - assert http_stream.read_records.called - assert non_http_stream.read_records.called - - -def test_read_default_http_availability_strategy_stream_unavailable(catalog, mocker, caplog): - mocker.patch.multiple(Stream, __abstractmethods__=set()) - - class MockHttpStream(HttpStream): - url_base = "https://test_base_url.com" - primary_key = "" - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.resp_counter = 1 - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - return None - - def path(self, **kwargs) -> str: - return "" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - stub_response = {"data": self.resp_counter} - self.resp_counter += 1 - yield stub_response - - class MockStream(mocker.MagicMock, Stream): - page_size = None - get_json_schema = mocker.MagicMock() - - def __init__(self, *args, **kvargs): - mocker.MagicMock.__init__(self) - self.read_records = mocker.MagicMock() - - streams = [MockHttpStream(), MockStream()] - http_stream, non_http_stream = streams - assert isinstance(http_stream, HttpStream) - assert not isinstance(non_http_stream, HttpStream) - - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - assert non_http_stream.availability_strategy is None - - # Don't set anything for read_records return value for HttpStream, since - # it should be skipped due to the stream being unavailable - non_http_stream.read_records.return_value = iter([{}] * 3) - - # Patch HTTP request to stream endpoint to make it unavailable - req = requests.Response() - req.status_code = 403 - mocker.patch.object(requests.Session, "send", return_value=req) - - source = MockAbstractSource(streams=streams) - logger = logging.getLogger("test_read_default_http_availability_strategy_stream_unavailable") - with caplog.at_level(logging.WARNING): - records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] - - # 0 for http stream and 3 for non http stream - assert len(records) == 0 + 3 - assert non_http_stream.read_records.called - expected_logs = [ - f"Skipped syncing stream '{http_stream.name}' because it was unavailable.", - f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.", - "This is most likely due to insufficient permissions on the credentials in use.", - f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more." - ] - for message in expected_logs: - assert message in caplog.text diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py index 38aa713a5e78f..2274ffb02ae50 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py @@ -4,7 +4,6 @@ import json -import logging import os import shutil import sys @@ -13,13 +12,14 @@ from pathlib import Path import jsonref +from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification, FailureType from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit from airbyte_cdk.utils.traced_exception import AirbyteTracedException from pytest import fixture from pytest import raises as pytest_raises -logger = logging.getLogger("airbyte") +logger = AirbyteLogger() MODULE = sys.modules[__name__] diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index ae18ba857e462..8a55c88b2c7b1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -554,7 +554,7 @@ - name: GitHub sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e dockerRepository: airbyte/source-github - dockerImageTag: 0.3.9 + dockerImageTag: 0.3.10 documentationUrl: https://docs.airbyte.com/integrations/sources/github icon: github.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e78f268da5ad5..6c9d1aafcb6b8 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4543,7 +4543,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-github:0.3.9" +- dockerImage: "airbyte/source-github:0.3.10" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/github" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-github/Dockerfile b/airbyte-integrations/connectors/source-github/Dockerfile index a76dcdc9be90b..782915f127fcc 100644 --- a/airbyte-integrations/connectors/source-github/Dockerfile +++ b/airbyte-integrations/connectors/source-github/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.3.9 +LABEL io.airbyte.version=0.3.10 LABEL io.airbyte.name=airbyte/source-github diff --git a/airbyte-integrations/connectors/source-github/setup.py b/airbyte-integrations/connectors/source-github/setup.py index f51b3d4f4894a..476271bbbf77d 100644 --- a/airbyte-integrations/connectors/source-github/setup.py +++ b/airbyte-integrations/connectors/source-github/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.13", "pendulum~=2.1.2", "sgqlc"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "pendulum~=2.1.2", "sgqlc"] TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "responses~=0.19.0"] diff --git a/airbyte-integrations/connectors/source-github/source_github/availability_strategies.py b/airbyte-integrations/connectors/source-github/source_github/availability_strategies.py deleted file mode 100644 index d6d959580e3e2..0000000000000 --- a/airbyte-integrations/connectors/source-github/source_github/availability_strategies.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import logging -from typing import Dict, Optional - -import requests -from airbyte_cdk.sources import Source -from airbyte_cdk.sources.streams.core import Stream -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy -from airbyte_cdk.sources.utils.stream_helpers import StreamHelper -from requests import HTTPError - - -class OrganizationBasedAvailabilityStrategy(HttpAvailabilityStrategy): - """ - Availability Strategy for organization-based streams. - """ - - def reasons_for_unavailable_status_codes( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError - ) -> Dict[int, str]: - stream_slice = StreamHelper().get_stream_slice(stream) - organization = stream_slice["organization"] - response_error_msg = str(error.response.json().get("message")) - - reasons_for_codes = { - requests.codes.NOT_FOUND: f"`{stream.__class__.__name__}` stream isn't available for organization `{organization}`.", - # When `403` for the stream, that has no access to the organization's teams, based on OAuth Apps Restrictions: - # https://docs.github.com/en/organizations/restricting-access-to-your-organizations-data/enabling-oauth-app-access-restrictions-for-your-organization - requests.codes.FORBIDDEN: f"`{stream.name}` stream isn't available for organization `{organization}`. Full error message: {response_error_msg}", - } - return reasons_for_codes - - -class RepositoryBasedAvailabilityStrategy(HttpAvailabilityStrategy): - """ - Availability Strategy for repository-based streams. - """ - - def reasons_for_unavailable_status_codes( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError - ) -> Dict[int, str]: - stream_slice = StreamHelper().get_stream_slice(stream) - repository = stream_slice["repository"] - error_msg = str(error.response.json().get("message")) - - reasons_for_codes = { - requests.codes.NOT_FOUND: f"`{stream.name}` stream isn't available for repository `{repository}`.", - requests.codes.FORBIDDEN: f"`{stream.name}` stream isn't available for repository `{repository}`. Full error message: {error_msg}", - requests.codes.CONFLICT: f"`{stream.name}` stream isn't available for repository `{repository}`, it seems like this repository is empty.", - } - return reasons_for_codes - - -class WorkflowRunsAvailabilityStrategy(RepositoryBasedAvailabilityStrategy): - """ - AvailabilityStrategy for the 'WorkflowRuns' stream. - """ - - def reasons_for_unavailable_status_codes( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError - ) -> Dict[int, str]: - stream_slice = StreamHelper().get_stream_slice(stream) - repository = stream_slice["repository"] - reasons_for_codes = super().reasons_for_unavailable_status_codes(stream, logger, source, error).copy() - server_error_msg = f"Syncing `{stream.name}` stream isn't available for repository `{repository}`." - reasons_for_codes[requests.codes.SERVER_ERROR] = server_error_msg - return reasons_for_codes - - -class ProjectsAvailabilityStrategy(RepositoryBasedAvailabilityStrategy): - """ - AvailabilityStrategy for the 'Projects' stream. - """ - - def reasons_for_unavailable_status_codes( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError - ) -> Dict[int, str]: - stream_slice = StreamHelper().get_stream_slice(stream) - repository = stream_slice["repository"] - reasons_for_codes = super().reasons_for_unavailable_status_codes(stream, logger, source, error).copy() - - # Some repos don't have projects enabled and we we get "410 Client Error: Gone for - # url: https://api.github.com/repos/xyz/projects?per_page=100" error. - gone_error_msg = f"`Projects` stream isn't available for repository `{repository}`." - reasons_for_codes[requests.codes.GONE] = gone_error_msg - - return reasons_for_codes diff --git a/airbyte-integrations/connectors/source-github/source_github/source.py b/airbyte-integrations/connectors/source-github/source_github/source.py index e47dd4b0e5139..db18092f14c7a 100644 --- a/airbyte-integrations/connectors/source-github/source_github/source.py +++ b/airbyte-integrations/connectors/source-github/source_github/source.py @@ -9,7 +9,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http.requests_native_auth.token import MultipleTokenAuthenticator +from airbyte_cdk.sources.streams.http.auth import MultipleTokenAuthenticator from .streams import ( Assignees, diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index da13ed8832c32..4ad809cf8f42f 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -10,16 +10,10 @@ import pendulum import requests from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException +from requests.exceptions import HTTPError -from .availability_strategies import ( - OrganizationBasedAvailabilityStrategy, - ProjectsAvailabilityStrategy, - RepositoryBasedAvailabilityStrategy, - WorkflowRunsAvailabilityStrategy, -) from .graphql import CursorStorage, QueryReactions, get_query_pull_requests, get_query_reviews from .utils import getter @@ -130,6 +124,60 @@ def get_error_display_message(self, exception: BaseException) -> Optional[str]: return f'Please try to decrease the "Page size for large streams" below {self.page_size}. The stream "{self.name}" is a large stream, such streams can fail with 502 for high "page_size" values.' return super().get_error_display_message(exception) + def read_records(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]: + # get out the stream_slice parts for later use. + organisation = stream_slice.get("organization", "") + repository = stream_slice.get("repository", "") + # Reading records while handling the errors + try: + yield from super().read_records(stream_slice=stream_slice, **kwargs) + except HTTPError as e: + # This whole try/except situation in `read_records()` isn't good but right now in `self._send_request()` + # function we have `response.raise_for_status()` so we don't have much choice on how to handle errors. + # Bocked on https://github.com/airbytehq/airbyte/issues/3514. + if e.response.status_code == requests.codes.NOT_FOUND: + # A lot of streams are not available for repositories owned by a user instead of an organization. + if isinstance(self, Organizations): + error_msg = ( + f"Syncing `{self.__class__.__name__}` stream isn't available for organization `{stream_slice['organization']}`." + ) + else: + error_msg = f"Syncing `{self.__class__.__name__}` stream isn't available for repository `{stream_slice['repository']}`." + elif e.response.status_code == requests.codes.FORBIDDEN: + error_msg = str(e.response.json().get("message")) + # When using the `check_connection` method, we should raise an error if we do not have access to the repository. + if isinstance(self, Repositories): + raise e + # When `403` for the stream, that has no access to the organization's teams, based on OAuth Apps Restrictions: + # https://docs.github.com/en/organizations/restricting-access-to-your-organizations-data/enabling-oauth-app-access-restrictions-for-your-organization + # For all `Organisation` based streams + elif isinstance(self, Organizations) or isinstance(self, Teams) or isinstance(self, Users): + error_msg = ( + f"Syncing `{self.name}` stream isn't available for organization `{organisation}`. Full error message: {error_msg}" + ) + # For all other `Repository` base streams + else: + error_msg = ( + f"Syncing `{self.name}` stream isn't available for repository `{repository}`. Full error message: {error_msg}" + ) + elif e.response.status_code == requests.codes.GONE and isinstance(self, Projects): + # Some repos don't have projects enabled and we we get "410 Client Error: Gone for + # url: https://api.github.com/repos/xyz/projects?per_page=100" error. + error_msg = f"Syncing `Projects` stream isn't available for repository `{stream_slice['repository']}`." + elif e.response.status_code == requests.codes.CONFLICT: + error_msg = ( + f"Syncing `{self.name}` stream isn't available for repository " + f"`{stream_slice['repository']}`, it seems like this repository is empty." + ) + elif e.response.status_code == requests.codes.SERVER_ERROR and isinstance(self, WorkflowRuns): + error_msg = f"Syncing `{self.name}` stream isn't available for repository `{stream_slice['repository']}`." + else: + # most probably here we're facing a 500 server error and a risk to get a non-json response, so lets output response.text + self.logger.error(f"Undefined error while reading records: {e.response.text}") + raise e + + self.logger.warn(error_msg) + def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: @@ -163,10 +211,6 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, record["repository"] = stream_slice["repository"] return record - @property - def availability_strategy(self) -> Optional[AvailabilityStrategy]: - return RepositoryBasedAvailabilityStrategy() - class SemiIncrementalMixin: """ @@ -340,10 +384,6 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, record["organization"] = stream_slice["organization"] return record - @property - def availability_strategy(self) -> Optional[AvailabilityStrategy]: - return OrganizationBasedAvailabilityStrategy() - class Repositories(SemiIncrementalMixin, Organizations): """ @@ -552,10 +592,6 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]: return {**base_headers, **headers} - @property - def availability_strategy(self) -> Optional[AvailabilityStrategy]: - return ProjectsAvailabilityStrategy() - class IssueEvents(SemiIncrementalMixin, GithubStream): """ @@ -1321,10 +1357,6 @@ def read_records( if created_at < break_point: break - @property - def availability_strategy(self) -> Optional[AvailabilityStrategy]: - return WorkflowRunsAvailabilityStrategy() - class WorkflowJobs(SemiIncrementalMixin, GithubStream): """ diff --git a/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py index 8d6b4caeace48..699ec27047d9d 100644 --- a/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py +++ b/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py @@ -3,7 +3,6 @@ # import json -import logging from http import HTTPStatus from pathlib import Path from unittest.mock import MagicMock, patch @@ -52,8 +51,6 @@ DEFAULT_BACKOFF_DELAYS = [5, 10, 20, 40, 80] -logger = logging.getLogger("source-github") - @responses.activate @patch("time.sleep") @@ -187,9 +184,7 @@ def test_stream_teams_404(): json={"message": "Not Found", "documentation_url": "https://docs.github.com/rest/reference/teams#list-teams"}, ) - stream_is_available, reason = stream.check_availability(logger) - assert not stream_is_available - assert "`Teams` stream isn't available for organization `org_name`." in reason + assert list(read_full_refresh(stream)) == [] assert len(responses.calls) == 1 assert responses.calls[0].request.url == "https://api.github.com/orgs/org_name/teams?per_page=100" @@ -242,9 +237,7 @@ def test_stream_repositories_404(): json={"message": "Not Found", "documentation_url": "https://docs.github.com/rest/reference/repos#list-organization-repositories"}, ) - stream_is_available, reason = stream.check_availability(logger) - assert not stream_is_available - assert "`Repositories` stream isn't available for organization `org_name`." in reason + assert list(read_full_refresh(stream)) == [] assert len(responses.calls) == 1 assert responses.calls[0].request.url == "https://api.github.com/orgs/org_name/repos?per_page=100&sort=updated&direction=desc" @@ -282,9 +275,7 @@ def test_stream_projects_disabled(): json={"message": "Projects are disabled for this repository", "documentation_url": "https://docs.github.com/v3/projects"}, ) - stream_is_available, reason = stream.check_availability(logger) - assert not stream_is_available - assert "`Projects` stream isn't available for repository `test_repo`." in reason + assert list(read_full_refresh(stream)) == [] assert len(responses.calls) == 1 assert responses.calls[0].request.url == "https://api.github.com/repos/test_repo/projects?per_page=100&state=all" diff --git a/airbyte-integrations/connectors/source-github/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-github/unit_tests/unit_test.py index ac6cfb0fb9999..c57abef1fa5c8 100644 --- a/airbyte-integrations/connectors/source-github/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-github/unit_tests/unit_test.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from airbyte_cdk.sources.streams.http.requests_native_auth.token import MultipleTokenAuthenticator +from airbyte_cdk.sources.streams.http.auth import MultipleTokenAuthenticator from source_github import SourceGithub diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md index 932f8b293ada5..017d83aaa192d 100644 --- a/docs/integrations/sources/github.md +++ b/docs/integrations/sources/github.md @@ -163,6 +163,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa | Version | Date | Pull Request | Subject | | :------ | :--------- | :---------------------------------------------------------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| 0.3.10 | 2022-12-15 | [20523](https://github.com/airbytehq/airbyte/pull/20523) | Revert changes from 0.3.9 | | 0.3.9 | 2022-12-14 | [19978](https://github.com/airbytehq/airbyte/pull/19978) | Update CDK dependency; move custom HTTPError handling into `AvailabilityStrategy` classes | | 0.3.8 | 2022-11-10 | [19299](https://github.com/airbytehq/airbyte/pull/19299) | Fix events and workflow_runs datetimes | | 0.3.7 | 2022-10-20 | [18213](https://github.com/airbytehq/airbyte/pull/18213) | Skip retry on HTTP 200 |