Skip to content

Commit 4e7f94a

Browse files
lazebnyioctavia-squidington-iiiaaronsteers
authored
feat(low-code): add use check availability flag to dynamic check (#293)
Co-authored-by: octavia-squidington-iii <contact@airbyte.com> Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
1 parent 3af96dc commit 4e7f94a

File tree

7 files changed

+71
-32
lines changed

7 files changed

+71
-32
lines changed

airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py

+19-9
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ class CheckDynamicStream(ConnectionChecker):
2121
stream_count (int): numbers of streams to check
2222
"""
2323

24+
# TODO: Add field stream_names to check_connection for static streams
25+
# https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483
26+
2427
stream_count: int
2528
parameters: InitVar[Mapping[str, Any]]
29+
use_check_availability: bool = True
2630

2731
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2832
self._parameters = parameters
@@ -31,21 +35,27 @@ def check_connection(
3135
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
3236
) -> Tuple[bool, Any]:
3337
streams = source.streams(config=config)
38+
3439
if len(streams) == 0:
3540
return False, f"No streams to connect to from source {source}"
41+
if not self.use_check_availability:
42+
return True, None
43+
44+
availability_strategy = HttpAvailabilityStrategy()
3645

37-
for stream_index in range(min(self.stream_count, len(streams))):
38-
stream = streams[stream_index]
39-
availability_strategy = HttpAvailabilityStrategy()
40-
try:
46+
try:
47+
for stream in streams[: min(self.stream_count, len(streams))]:
4148
stream_is_available, reason = availability_strategy.check_availability(
4249
stream, logger
4350
)
4451
if not stream_is_available:
52+
logger.warning(f"Stream {stream.name} is not available: {reason}")
4553
return False, reason
46-
except Exception as error:
47-
logger.error(
48-
f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}"
49-
)
50-
return False, f"Unable to connect to stream {stream.name} - {error}"
54+
except Exception as error:
55+
error_message = (
56+
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
57+
)
58+
logger.error(error_message, exc_info=True)
59+
return False, error_message
60+
5161
return True, None

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,11 @@ definitions:
320320
title: Stream Count
321321
description: Numbers of the streams to try reading from when running a check operation.
322322
type: integer
323+
use_check_availability:
324+
title: Use Check Availability
325+
description: Enables stream check availability. This field is automatically set by the CDK.
326+
type: boolean
327+
default: true
323328
CompositeErrorHandler:
324329
title: Composite Error Handler
325330
description: Error handler that sequentially iterates over a list of error handlers.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+5
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ class CheckDynamicStream(BaseModel):
5959
description="Numbers of the streams to try reading from when running a check operation.",
6060
title="Stream Count",
6161
)
62+
use_check_availability: Optional[bool] = Field(
63+
True,
64+
description="Enables stream check availability. This field is automatically set by the CDK.",
65+
title="Use Check Availability",
66+
)
6267

6368

6469
class ConcurrencyLevel(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -903,7 +903,15 @@ def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any)
903903
def create_check_dynamic_stream(
904904
model: CheckDynamicStreamModel, config: Config, **kwargs: Any
905905
) -> CheckDynamicStream:
906-
return CheckDynamicStream(stream_count=model.stream_count, parameters={})
906+
assert model.use_check_availability is not None # for mypy
907+
908+
use_check_availability = model.use_check_availability
909+
910+
return CheckDynamicStream(
911+
stream_count=model.stream_count,
912+
use_check_availability=use_check_availability,
913+
parameters={},
914+
)
907915

908916
def create_composite_error_handler(
909917
self, model: CompositeErrorHandlerModel, config: Config, **kwargs: Any

airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,16 @@ def _create_error_message(self, response: requests.Response) -> Optional[str]:
151151
:param response: The HTTP response which can be used during interpolation
152152
:return: The evaluated error message string to be emitted
153153
"""
154-
return self.error_message.eval( # type: ignore [no-any-return, union-attr]
154+
return self.error_message.eval( # type: ignore[no-any-return, union-attr]
155155
self.config, response=self._safe_response_json(response), headers=response.headers
156156
)
157157

158158
def _response_matches_predicate(self, response: requests.Response) -> bool:
159159
return (
160160
bool(
161-
self.predicate.condition # type: ignore [union-attr]
162-
and self.predicate.eval( # type: ignore [union-attr]
163-
None, # type: ignore [arg-type]
161+
self.predicate.condition # type:ignore[union-attr]
162+
and self.predicate.eval( # type:ignore[union-attr]
163+
None, # type: ignore[arg-type]
164164
response=self._safe_response_json(response),
165165
headers=response.headers,
166166
)

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

+1
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str,
225225
return self._get_airbyte_type(complex_type.field_type)
226226

227227
field_type = self._get_airbyte_type(complex_type.field_type)
228+
228229
field_type["items"] = (
229230
self._get_airbyte_type(complex_type.items)
230231
if isinstance(complex_type.items, str)

unit_tests/sources/declarative/checks/test_check_dynamic_stream.py

+28-18
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import json
66
import logging
7+
from copy import deepcopy
78

89
import pytest
910

@@ -104,56 +105,65 @@
104105

105106

106107
@pytest.mark.parametrize(
107-
"response_code, available_expectation, expected_messages",
108+
"response_code, available_expectation, use_check_availability, expected_messages",
108109
[
109110
pytest.param(
110111
404,
111112
False,
113+
True,
112114
["Not found. The requested resource was not found on the server."],
113115
id="test_stream_unavailable_unhandled_error",
114116
),
115117
pytest.param(
116118
403,
117119
False,
120+
True,
118121
["Forbidden. You don't have permission to access this resource."],
119122
id="test_stream_unavailable_handled_error",
120123
),
121-
pytest.param(200, True, [], id="test_stream_available"),
124+
pytest.param(200, True, True, [], id="test_stream_available"),
125+
pytest.param(200, True, False, [], id="test_stream_available"),
122126
pytest.param(
123127
401,
124128
False,
129+
True,
125130
["Unauthorized. Please ensure you are authenticated correctly."],
126131
id="test_stream_unauthorized_error",
127132
),
128133
],
129134
)
130-
def test_check_dynamic_stream(response_code, available_expectation, expected_messages):
135+
def test_check_dynamic_stream(
136+
response_code, available_expectation, use_check_availability, expected_messages
137+
):
138+
manifest = deepcopy(_MANIFEST)
139+
131140
with HttpMocker() as http_mocker:
132-
http_mocker.get(
133-
HttpRequest(url="https://api.test.com/items"),
134-
HttpResponse(
135-
body=json.dumps(
136-
[
137-
{"id": 1, "name": "item_1"},
138-
{"id": 2, "name": "item_2"},
139-
]
140-
)
141-
),
142-
)
143-
http_mocker.get(
144-
HttpRequest(url="https://api.test.com/items/1"),
145-
HttpResponse(body=json.dumps(expected_messages), status_code=response_code),
141+
items_request = HttpRequest(url="https://api.test.com/items")
142+
items_response = HttpResponse(
143+
body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}])
146144
)
145+
http_mocker.get(items_request, items_response)
146+
147+
item_request = HttpRequest(url="https://api.test.com/items/1")
148+
item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code)
149+
item_request_count = 1
150+
http_mocker.get(item_request, item_response)
151+
152+
if not use_check_availability:
153+
manifest["check"]["use_check_availability"] = False
154+
item_request_count = 0 # stream only created and data request not called
147155

148156
source = ConcurrentDeclarativeSource(
149-
source_config=_MANIFEST,
157+
source_config=manifest,
150158
config=_CONFIG,
151159
catalog=None,
152160
state=None,
153161
)
154162

155163
stream_is_available, reason = source.check_connection(logger, _CONFIG)
156164

165+
http_mocker.assert_number_of_calls(item_request, item_request_count)
166+
157167
assert stream_is_available == available_expectation
158168
for message in expected_messages:
159169
assert message in reason

0 commit comments

Comments
 (0)