-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathhttp_requester.py
428 lines (383 loc) · 16.8 KB
/
http_requester.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import os
from dataclasses import InitVar, dataclass, field
from typing import Any, Callable, Mapping, MutableMapping, Optional, Union
from urllib.parse import urljoin
import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import (
DeclarativeAuthenticator,
NoAuth,
)
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
InterpolatedString,
)
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.sources.streams.call_rate import APIBudget
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import (
combine_mappings,
get_interpolation_context,
)
@dataclass
class HttpRequester(Requester):
    """
    Default implementation of a Requester.

    Attributes:
        name (str): Name of the stream. Used to name the HTTP client and the logger, and for request/response caching.
        url_base (Union[InterpolatedString, str]): Base url to send requests to.
        config (Config): The user-provided configuration as specified by the source's spec.
        parameters (Mapping[str, Any]): Init-only parameters used to build the interpolated url_base/path and the
            default request options provider and authenticator.
        path (Optional[Union[InterpolatedString, str]]): Path to send requests to, joined onto url_base.
        authenticator (Optional[DeclarativeAuthenticator]): Authenticator defining how to authenticate to the source.
            Falls back to NoAuth when not set.
        http_method (Union[str, HttpMethod]): HTTP method to use when sending requests. A string is looked up by
            HttpMethod member name (e.g. "GET").
        request_options_provider (Optional[InterpolatedRequestOptionsProvider]): Provider defining the options to set
            on outgoing requests. A dict is expanded into an InterpolatedRequestOptionsProvider.
        error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors. Its
            `backoff_strategies` attribute, when present, is handed to the HTTP client.
        api_budget (Optional[APIBudget]): Request budget handed to the HTTP client.
        disable_retries (bool): Handed to the HTTP client to turn retries off.
        message_repository (MessageRepository): Message repository handed to the HTTP client.
        use_cache (bool): Indicates that data should be cached for this stream.
        stream_response (bool): Passed to the HTTP client as the `stream` request kwarg.
        decoder (Decoder): Response decoder; defaults to a JsonDecoder. Not read by this class's own methods.
    """

    name: str
    url_base: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    path: Optional[Union[InterpolatedString, str]] = None
    authenticator: Optional[DeclarativeAuthenticator] = None
    http_method: Union[str, HttpMethod] = HttpMethod.GET
    request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None
    error_handler: Optional[ErrorHandler] = None
    api_budget: Optional[APIBudget] = None
    disable_retries: bool = False
    message_repository: MessageRepository = NoopMessageRepository()
    use_cache: bool = False
    # Backing field for the `exit_on_rate_limit` property; forwarded on every send_request call.
    _exit_on_rate_limit: bool = False
    stream_response: bool = False
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Build the interpolated url/path, resolve defaults and create the underlying HttpClient."""
        self._url_base = InterpolatedString.create(self.url_base, parameters=parameters)
        self._path = InterpolatedString.create(
            self.path if self.path else EmptyString, parameters=parameters
        )

        if self.request_options_provider is None:
            self._request_options_provider = InterpolatedRequestOptionsProvider(
                config=self.config, parameters=parameters
            )
        elif isinstance(self.request_options_provider, dict):
            # A plain mapping is expanded into the provider's constructor kwargs.
            self._request_options_provider = InterpolatedRequestOptionsProvider(
                config=self.config, **self.request_options_provider
            )
        else:
            self._request_options_provider = self.request_options_provider

        self._authenticator = self.authenticator or NoAuth(parameters=parameters)
        self._http_method = (
            HttpMethod[self.http_method] if isinstance(self.http_method, str) else self.http_method
        )
        self._parameters = parameters

        # Only some error handlers expose backoff strategies; forward them when present.
        if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
            backoff_strategies = self.error_handler.backoff_strategies  # type: ignore
        else:
            backoff_strategies = None

        self._http_client = HttpClient(
            name=self.name,
            logger=self.logger,
            error_handler=self.error_handler,
            api_budget=self.api_budget,
            authenticator=self._authenticator,
            use_cache=self.use_cache,
            backoff_strategy=backoff_strategies,
            disable_retries=self.disable_retries,
            message_repository=self.message_repository,
        )

    @property
    def exit_on_rate_limit(self) -> bool:
        """Whether the HTTP client should stop instead of waiting when rate limited."""
        return self._exit_on_rate_limit

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        self._exit_on_rate_limit = value

    def get_authenticator(self) -> DeclarativeAuthenticator:
        """Return the configured authenticator (NoAuth when none was given)."""
        return self._authenticator

    def get_url_base(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Evaluate the interpolated url_base and return it with a trailing slash.

        The trailing slash makes `urljoin` (used by `_join_url`) append a relative path to the
        base instead of replacing the base's last path segment. An empty base is returned as-is.
        """
        interpolation_context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        url_base = str(self._url_base.eval(self.config, **interpolation_context))
        # Append "/" explicitly: os.path.join would insert "\\" on Windows and break the URL.
        if url_base and not url_base.endswith("/"):
            url_base += "/"
        return url_base

    def get_path(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """Evaluate the interpolated path and strip leading slashes so it stays relative to url_base."""
        interpolation_context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        path = str(self._path.eval(self.config, **interpolation_context))
        return path.lstrip("/")

    def get_method(self) -> HttpMethod:
        """Return the resolved HTTP method."""
        return self._http_method

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return the query parameters defined by the request options provider."""
        return self._request_options_provider.get_request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the headers defined by the request options provider."""
        return self._request_options_provider.get_request_headers(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    # fixing request options provider types has a lot of dependencies
    def get_request_body_data(  # type: ignore
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return the non-JSON body defined by the request options provider, or {} when it is falsy."""
        return (
            self._request_options_provider.get_request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            or {}
        )

    # fixing request options provider types has a lot of dependencies
    def get_request_body_json(  # type: ignore
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """Return the JSON body defined by the request options provider."""
        return self._request_options_provider.get_request_body_json(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    @property
    def logger(self) -> logging.Logger:
        """Logger named after this requester's stream."""
        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")

    def _get_request_options(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        requester_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        auth_options_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        extra_options: Optional[Union[Mapping[str, Any], str]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Merge the options from the requester, the authenticator and `extra_options`.

        Raises a ValueError if there is a key collision; returns the merged mapping otherwise.
        For JSON bodies only, `combine_mappings` is called with `allow_same_value_merge=True`.
        """
        # The method name is the only signal distinguishing the JSON-body call from the others.
        is_body_json = requester_method.__name__ == "get_request_body_json"
        return combine_mappings(
            [
                requester_method(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                ),
                auth_options_method(),
                extra_options,
            ],
            allow_same_value_merge=is_body_json,
        )

    def _request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        extra_headers: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Build the outgoing request headers.

        Requester headers, authenticator headers and `extra_headers` are merged via
        `_get_request_options`, which raises a ValueError on conflicting keys rather than
        letting one source overwrite another. Keys and values are converted to str.

        Raises:
            ValueError: if the merged headers are a string, or on a key collision.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_headers,
            self.get_authenticator().get_auth_header,
            extra_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_params: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.

        Raises:
            ValueError: if the merged params are a string, or any value is a dict.
        """
        options = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_params,
            self.get_authenticator().get_request_params,
            extra_params,
        )
        if isinstance(options, str):
            raise ValueError("Request params cannot be a string")

        for k, v in options.items():
            if isinstance(v, dict):
                raise ValueError(
                    f"Invalid value for `{k}` parameter. The values of request params cannot be an object."
                )

        return options

    def _request_body_data(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_body_data: Optional[Union[Mapping[str, Any], str]] = None,
    ) -> Optional[Union[Mapping[str, Any], str]]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If the result is a string, it is sent as is.
        If the result is a dict, it is converted to a urlencoded form,
        e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

        Only one of 'request_body_data' and 'request_body_json' should be overridden at a time.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_body_data,
            self.get_authenticator().get_request_body_data,
            extra_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_body_json: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        Only one of 'request_body_data' and 'request_body_json' should be overridden at a time.

        Raises:
            ValueError: if the merged JSON body is a string.
        """
        options = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_body_json,
            self.get_authenticator().get_request_body_json,
            extra_body_json,
        )
        if isinstance(options, str):
            raise ValueError("Request body json cannot be a string")
        return options

    @classmethod
    def _join_url(cls, url_base: str, path: Optional[str]) -> str:
        """
        Join a base URL with a path using `urllib.parse.urljoin`.

        Args:
            url_base (str): The base URL. `get_url_base` guarantees a trailing slash, which makes
                urljoin append a relative path instead of replacing the base's last segment.
            path (Optional[str]): The path to join with the base URL.

        Returns:
            str: The resulting joined URL.

        Note:
            Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
            - If the path is an empty string or None, the base URL is returned with any trailing slash removed.
            - A path starting with "/" replaces the base URL's path (standard urljoin semantics);
              `get_path` strips leading slashes, so this only affects explicitly passed paths.
            - A full URL as path (e.g. provided from an interpolation context) is returned as is.

        Example:
            1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
            2) _join_url("https://example.com/api/", "/endpoint") >> 'https://example.com/endpoint'
            3) _join_url("https://example.com/api/", "") >> 'https://example.com/api'
            4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
            5) _join_url("https://example.com/api/", "https://other.com/x") >> 'https://other.com/x'
        """
        if path == EmptyString or path is None:
            return url_base.rstrip("/")

        # urljoin also returns `path` unchanged when it is already a full URL.
        return urljoin(url_base, path)

    def send_request(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        path: Optional[str] = None,
        request_headers: Optional[Mapping[str, Any]] = None,
        request_params: Optional[Mapping[str, Any]] = None,
        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
        request_body_json: Optional[Mapping[str, Any]] = None,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    ) -> Optional[requests.Response]:
        """
        Build the request from the interpolated url/path and merged options, and send it through the HTTP client.

        A truthy `path` overrides `get_path()`. The `request_*` arguments are merged with the
        requester's and authenticator's options (key collisions raise ValueError). Query params are
        deduplicated by the HTTP client. Returns the response from the HTTP client.
        """
        _, response = self._http_client.send_request(
            http_method=self.get_method().value,
            url=self._join_url(
                self.get_url_base(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                ),
                path
                or self.get_path(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                ),
            ),
            request_kwargs={"stream": self.stream_response},
            headers=self._request_headers(
                stream_state, stream_slice, next_page_token, request_headers
            ),
            params=self._request_params(
                stream_state, stream_slice, next_page_token, request_params
            ),
            json=self._request_body_json(
                stream_state, stream_slice, next_page_token, request_body_json
            ),
            data=self._request_body_data(
                stream_state, stream_slice, next_page_token, request_body_data
            ),
            dedupe_query_params=True,
            log_formatter=log_formatter,
            exit_on_rate_limit=self._exit_on_rate_limit,
        )
        return response