Skip to content

Commit

Permalink
CDK 0.15.0 and source-github 0.3.10 -- revert AvailabillityStrategy c…
Browse files Browse the repository at this point in the history
…hanges (#20523)

* Revert "source-github: move known error handling to GithubAvailabilityStrategy (#19978)"

This reverts commit f97db17.

* Revert "🐛 Python CDK: fix `StopIteration` error for `check_availability` (#20429)"

This reverts commit 4e9b014.

* Revert "CDK: `AbstractSource.read()` skips syncing stream if its unavailable (add `AvailabilityStrategy` concept) (#19977)"

This reverts commit 55a3288.

* Restore changelog entries

* bump CDK version

* Bump Github version

* Re-add removed dependencies

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
erohmensing and octavia-squidington-iii authored Dec 15, 2022
1 parent 9cc3005 commit 8bb4128
Show file tree
Hide file tree
Showing 26 changed files with 118 additions and 839 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.15.0
Reverts additions from versions 0.13.0 and 0.13.3.

## 0.14.0
Low-code: Add token_expiry_date_format to OAuth Authenticator. Resolve ref schema

Expand Down
6 changes: 1 addition & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ def read(
f"The requested stream {configured_stream.stream.name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
)
stream_is_available, error = stream_instance.check_availability(logger, self)
if not stream_is_available:
logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}")
continue
try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
yield from self._read_stream(
Expand Down Expand Up @@ -191,7 +187,7 @@ def _read_stream(
@staticmethod
def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool:
"""
Check if record count reached limit set by internal config.
Check if record count reached liimt set by internal config.
:param internal_config - internal CDK configuration separated from user defined config
:records_counter - number of records already red
:return True if limit reached, False otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.models.airbyte_protocol import SyncMode
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.utils.stream_helpers import StreamHelper
from dataclasses_jsonschema import JsonSchemaMixin


Expand All @@ -33,19 +33,29 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi
if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
for stream_name in self.stream_names:
if stream_name not in stream_name_to_stream.keys():
if stream_name in stream_name_to_stream.keys():
stream = stream_name_to_stream[stream_name]
try:
# Some streams need a stream slice to read records (eg if they have a SubstreamSlicer)
stream_slice = self._get_stream_slice(stream)
records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
next(records)
except Exception as error:
return False, f"Unable to connect to stream {stream_name} - {error}"
else:
raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}")

stream = stream_name_to_stream[stream_name]
try:
if stream.availability_strategy is not None:
stream_is_available, reason = stream.check_availability(logger, source)
if not stream_is_available:
return False, reason
else:
stream_helper = StreamHelper()
stream_helper.get_first_record(stream)
except Exception as error:
return False, f"Unable to connect to stream {stream_name} - {error}"

return True, None

def _get_stream_slice(self, stream):
# We wrap the return output of stream_slices() because some implementations return types that are iterable,
# but not iterators such as lists or tuples
slices = iter(
stream.stream_slices(
cursor_field=stream.cursor_field,
sync_mode=SyncMode.full_refresh,
)
)
try:
return next(slices)
except StopIteration:
return {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class DeclarativeSource(AbstractSource):
@property
@abstractmethod
def connection_checker(self) -> ConnectionChecker:
"""Returns the ConnectionChecker to use for the `check` operation"""
"""Returns the ConnectioChecker to use for the `check` operation"""

def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
Expand Down

This file was deleted.

29 changes: 1 addition & 28 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

import inspect
import logging
import typing
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
Expand All @@ -18,10 +17,6 @@
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from deprecated.classic import deprecated

if typing.TYPE_CHECKING:
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy

# A stream's read method can return one of the following types:
# Mapping[str, Any]: The content of an AirbyteRecordMessage
# AirbyteRecordMessage: An AirbyteRecordMessage
Expand Down Expand Up @@ -175,28 +170,6 @@ def source_defined_cursor(self) -> bool:
"""
return True

def check_availability(self, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]:
"""
Checks whether this stream is available.
:param logger: source logger
:param source: (optional) source
:return: A tuple of (boolean, str). If boolean is true, then this stream
is available, and no str is required. Otherwise, this stream is unavailable
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""
if self.availability_strategy:
return self.availability_strategy.check_availability(self, logger, source)
return True, None

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
"""
:return: The AvailabilityStrategy used to check whether this stream is available.
"""
return None

@property
@abstractmethod
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
Expand Down

This file was deleted.

6 changes: 0 additions & 6 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
import requests
import requests_cache
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import Stream, StreamData
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from requests.auth import AuthBase
from requests_cache.session import CachedSession

Expand Down Expand Up @@ -115,10 +113,6 @@ def retry_factor(self) -> float:
def authenticator(self) -> HttpAuthenticator:
return self._authenticator

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return HttpAvailabilityStrategy()

@abstractmethod
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Expand Down
47 changes: 0 additions & 47 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py

This file was deleted.

11 changes: 0 additions & 11 deletions airbyte-cdk/python/docs/concepts/http-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,3 @@ When we are dealing with streams that depend on the results of another stream, w
If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc..
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
be returned as a keyword argument.

## Stream Availability

The CDK defines an `AvailabilityStrategy` for a stream, which is used to perform the `check_availability` method. This method checks whether
the stream is available before performing `read_records`.

For HTTP streams, a default `HttpAvailabilityStrategy` is defined, which attempts to read the first record of the stream, and excepts
a dictionary of known error codes and associated reasons, `reasons_for_unavailable_status_codes`. By default, this list contains only
`requests.status_codes.FORBIDDEN` (403), with an associated error message that tells the user that they are likely missing permissions associated with that stream.

You can override these known errors to except more error codes and inform the user how to resolve errors.
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.14.0",
version="0.15.0",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Loading

0 comments on commit 8bb4128

Please sign in to comment.