|
8 | 8 | from typing import Any, List, Mapping, MutableMapping, Optional, Tuple
|
9 | 9 |
|
10 | 10 | import pendulum
|
11 |
| -import stripe |
12 | 11 | from airbyte_cdk.entrypoint import logger as entrypoint_logger
|
13 | 12 | from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType
|
14 | 13 | from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
|
@@ -107,14 +106,29 @@ def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> Mutable
|
107 | 106 | return config
|
108 | 107 |
|
109 | 108 | def check_connection(self, logger: logging.Logger, config: MutableMapping[str, Any]) -> Tuple[bool, Any]:
|
110 |
| - self.validate_and_fill_with_defaults(config) |
111 |
| - stripe.api_key = config["client_secret"] |
| 109 | + args = self._get_stream_base_args(config) |
| 110 | + account_stream = StripeStream(name="accounts", path="accounts", use_cache=USE_CACHE, **args) |
112 | 111 | try:
|
113 |
| - stripe.Account.retrieve(config["account_id"]) |
114 |
| - except (stripe.error.AuthenticationError, stripe.error.PermissionError) as e: |
115 |
| - return False, str(e) |
| 112 | + next(account_stream.read_records(sync_mode=SyncMode.full_refresh)) |
| 113 | + except AirbyteTracedException as error: |
| 114 | + if error.failure_type == FailureType.config_error: |
| 115 | + return False, error.message |
| 116 | + raise error |
116 | 117 | return True, None
|
117 | 118 |
|
| 119 | + def _get_stream_base_args(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]: |
| 120 | + config = self.validate_and_fill_with_defaults(config) |
| 121 | + authenticator = TokenAuthenticator(config["client_secret"]) |
| 122 | + start_timestamp = self._start_date_to_timestamp(config) |
| 123 | + args = { |
| 124 | + "authenticator": authenticator, |
| 125 | + "account_id": config["account_id"], |
| 126 | + "start_date": start_timestamp, |
| 127 | + "slice_range": config["slice_range"], |
| 128 | + "api_budget": self.get_api_call_budget(config), |
| 129 | + } |
| 130 | + return args |
| 131 | + |
118 | 132 | @staticmethod
|
119 | 133 | def customers(**args):
|
120 | 134 | # The Customers stream is instantiated in a dedicated method to allow parametrization and avoid duplicated code.
|
@@ -174,17 +188,7 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget:
|
174 | 188 | return HttpAPIBudget(policies=policies)
|
175 | 189 |
|
176 | 190 | def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
|
177 |
| - config = self.validate_and_fill_with_defaults(config) |
178 |
| - authenticator = TokenAuthenticator(config["client_secret"]) |
179 |
| - |
180 |
| - start_timestamp = self._start_date_to_timestamp(config) |
181 |
| - args = { |
182 |
| - "authenticator": authenticator, |
183 |
| - "account_id": config["account_id"], |
184 |
| - "start_date": start_timestamp, |
185 |
| - "slice_range": config["slice_range"], |
186 |
| - "api_budget": self.get_api_call_budget(config), |
187 |
| - } |
| 191 | + args = self._get_stream_base_args(config) |
188 | 192 | incremental_args = {**args, "lookback_window_days": config["lookback_window_days"]}
|
189 | 193 | subscriptions = IncrementalStripeStream(
|
190 | 194 | name="subscriptions",
|
@@ -532,7 +536,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
|
532 | 536 | ),
|
533 | 537 | ]
|
534 | 538 |
|
535 |
| - state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self._state) |
| 539 | + state_manager = ConnectorStateManager(state=self._state) |
536 | 540 | return [
|
537 | 541 | self._to_concurrent(
|
538 | 542 | stream,
|
|
0 commit comments