
Commit 34fc880

fix

1 parent 4dbb6fe · commit 34fc880

9 files changed: +41 -18 lines

airbyte_cdk/sources/declarative/declarative_component_schema.yaml  (+14 -2)

@@ -1779,6 +1779,8 @@ definitions:
           - stream_interval
           - stream_partition
           - stream_slice
+          - creation_response
+          - download_target
         examples:
           - "/products"
           - "/quotes/{{ stream_partition['id'] }}/quote_line_groups"
@@ -3223,7 +3225,7 @@ definitions:
         - polling_requester
         - download_requester
         - status_extractor
-        - urls_extractor
+        - download_target_extractor
       properties:
         type:
           type: string
@@ -3240,7 +3242,7 @@ definitions:
           anyOf:
             - "$ref": "#/definitions/CustomRecordExtractor"
             - "$ref": "#/definitions/DpathExtractor"
-        urls_extractor:
+        download_target_extractor:
           description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.
           anyOf:
             - "$ref": "#/definitions/CustomRecordExtractor"
@@ -3667,6 +3669,16 @@ interpolation:
         self: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=
         next: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2
         count: 82
+  - title: creation_response
+    description: The response received from the creation_requester in the AsyncRetriever component.
+    type: object
+    examples:
+      - id: "1234"
+  - title: download_target
+    description: The `URL` received from the polling_requester in the AsyncRetriever with jobStatus as `COMPLETED`.
+    type: string
+    examples:
+      - url: "https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2&filename=xxx_yyy_zzz.csv"
   - title: stream_interval
     description: The current stream interval being processed. The keys are defined by the incremental sync component. Default keys are `start_time` and `end_time`.
     type: object
airbyte_cdk/sources/declarative/models/declarative_component_schema.py  (+1 -1)

@@ -2263,7 +2263,7 @@ class AsyncRetriever(BaseModel):
     status_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
         ..., description="Responsible for fetching the actual status of the async job."
     )
-    urls_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
+    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
         ...,
         description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
     )

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py  (+3 -3)

@@ -2757,8 +2757,8 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
         status_extractor = self._create_component_from_model(
             model=model.status_extractor, decoder=decoder, config=config, name=name
         )
-        urls_extractor = self._create_component_from_model(
-            model=model.urls_extractor, decoder=decoder, config=config, name=name
+        download_target_extractor = self._create_component_from_model(
+            model=model.download_target_extractor, decoder=decoder, config=config, name=name
         )
         job_repository: AsyncJobRepository = AsyncHttpJobRepository(
             creation_requester=creation_requester,
@@ -2769,7 +2769,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
             delete_requester=delete_requester,
             status_extractor=status_extractor,
             status_mapping=self._create_async_job_status_mapping(model.status_mapping, config),
-            urls_extractor=urls_extractor,
+            download_target_extractor=download_target_extractor,
         )

         async_job_partition_router = AsyncJobPartitionRouter(

airbyte_cdk/sources/declarative/requesters/README.md  (+1 -1)

@@ -1,7 +1,7 @@
 # AsyncHttpJobRepository sequence diagram

 - Components marked as optional are not required and can be ignored.
-- if `url_requester` is not provided, `urls_extractor` will get urls from the `polling_job_response`
+- if `url_requester` is not provided, `download_target_extractor` will get urls from the `polling_job_response`
 - interpolation_context, e.g. `create_job_response` or `polling_job_response` can be obtained from stream_slice

 ```mermaid

airbyte_cdk/sources/declarative/requesters/http_job_repository.py  (+9 -4)

@@ -43,7 +43,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):
     delete_requester: Optional[Requester]
     status_extractor: DpathExtractor
     status_mapping: Mapping[str, AsyncJobStatus]
-    urls_extractor: DpathExtractor
+    download_target_extractor: DpathExtractor

     job_timeout: Optional[timedelta] = None
     record_extractor: RecordExtractor = field(
@@ -216,7 +216,10 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
             stream_slice = StreamSlice(
                 partition=job_slice.partition,
                 cursor_slice=job_slice.cursor_slice,
-                extra_fields={**job_slice.extra_fields, "url": url},
+                extra_fields={
+                    **job_slice.extra_fields,
+                    "download_target": url,
+                },
             )
             for message in self.download_retriever.read_records({}, stream_slice):
                 if isinstance(message, Record):
@@ -269,9 +272,11 @@ def _clean_up_job(self, job_id: str) -> None:
         del self._polling_job_response_by_id[job_id]

     def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
+        creation_response = self._create_job_response_by_id[job.api_job_id()].json()
         stream_slice = StreamSlice(
-            partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]},
+            partition={},
             cursor_slice={},
+            extra_fields={"creation_response": creation_response},
         )
         return stream_slice

@@ -292,4 +297,4 @@ def _get_download_url(self, job: AsyncJob) -> Iterable[str]:
                 failure_type=FailureType.system_error,
             )

-        yield from self.urls_extractor.extract_records(url_response)  # type: ignore # we expect urls_extractor to always return list of strings
+        yield from self.download_target_extractor.extract_records(url_response)  # type: ignore # we expect download_target_extractor to always return list of strings
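
One way to read this change: the repository no longer places the raw `requests.Response` in the slice partition; it parses it once and passes the dict via `extra_fields` as `creation_response`, and the download URL now travels under the key `download_target` instead of `url`. The stand-in below (a plain dataclass, not the CDK's `StreamSlice`; all values made up) just makes the resulting slice shapes concrete.

```python
# Stand-in for the CDK's StreamSlice, showing only the shape of the data; the field
# names mirror the diff, everything else is illustrative.
from dataclasses import dataclass, field
from typing import Any, Mapping


@dataclass(frozen=True)
class SliceSketch:
    partition: Mapping[str, Any]
    cursor_slice: Mapping[str, Any]
    extra_fields: Mapping[str, Any] = field(default_factory=dict)


# _get_create_job_stream_slice: the creation response is parsed to a dict up front and
# exposed as `creation_response` (previously the raw Response object sat in
# partition["create_job_response"] and templates had to call .json() themselves).
creation_slice = SliceSketch(
    partition={},
    cursor_slice={},
    extra_fields={"creation_response": {"id": "1234"}},
)

# fetch_records: the download URL is exposed as `download_target` (previously "url").
download_slice = SliceSketch(
    partition={},
    cursor_slice={},
    extra_fields={"download_target": "https://example.com/exports/1234.csv"},
)

print(creation_slice.extra_fields["creation_response"]["id"])  # 1234
print(download_slice.extra_fields["download_target"])
```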

airbyte_cdk/sources/declarative/requesters/http_requester.py  (+7 -1)

@@ -85,7 +85,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters

         if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
-            backoff_strategies = self.error_handler.backoff_strategies
+            backoff_strategies = self.error_handler.backoff_strategies  # type: ignore
         else:
             backoff_strategies = None

@@ -125,6 +125,12 @@ def get_path(
         kwargs = {
             "stream_slice": stream_slice,
             "next_page_token": next_page_token,
+            # update the interpolation context with extra fields, if passed.
+            **(
+                stream_slice.extra_fields
+                if stream_slice is not None and hasattr(stream_slice, "extra_fields")
+                else {}
+            ),
         }
         path = str(self._path.eval(self.config, **kwargs))
         return path.lstrip("/")
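
The `get_path` change is what makes the new names usable directly in a path template: any `extra_fields` on the slice are spread into the interpolation context alongside `stream_slice` and `next_page_token`. A minimal sketch of that merge, again with plain Jinja2 standing in for the CDK's `InterpolatedString.eval` and made-up templates and values:

```python
# Minimal sketch of the kwargs merge above; Jinja2 stands in for the CDK's
# interpolation engine, and the templates/values are illustrative.
from typing import Any, Mapping, Optional

from jinja2 import Template


def eval_path(
    path_template: str,
    stream_slice: Optional[Mapping[str, Any]] = None,
    extra_fields: Optional[Mapping[str, Any]] = None,
) -> str:
    kwargs: dict = {
        "stream_slice": stream_slice or {},
        "next_page_token": None,
        # mirror of the diff: spread extra fields into the context when present
        **(extra_fields or {}),
    }
    return Template(path_template).render(**kwargs).lstrip("/")


# Templates can now reference the extra fields as top-level names:
print(eval_path("/exports/{{ creation_response['id'] }}",
                extra_fields={"creation_response": {"id": "1234"}}))
# -> exports/1234
print(eval_path("{{ download_target }}",
                extra_fields={"download_target": "https://example.com/exports/1234.csv"}))
```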

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py  (+2 -2)

@@ -3607,7 +3607,7 @@ def test_create_async_retriever():
             "timeout": ["timeout"],
             "completed": ["ready"],
         },
-        "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
+        "download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
         "record_selector": {
             "type": "RecordSelector",
             "extractor": {"type": "DpathExtractor", "field_path": ["data"]},
@@ -3681,7 +3681,7 @@
     assert job_repository.abort_requester
     assert job_repository.delete_requester
     assert job_repository.status_extractor
-    assert job_repository.urls_extractor
+    assert job_repository.download_target_extractor

     selector = component.record_selector
     extractor = selector.extractor

unit_tests/sources/declarative/requesters/test_http_job_repository.py  (+3 -3)

@@ -69,7 +69,7 @@ def setUp(self) -> None:
         self._polling_job_requester = HttpRequester(
             name="stream <name>: polling",
             url_base=_URL_BASE,
-            path=_EXPORT_PATH + "/{{stream_slice['create_job_response'].json()['id']}}",
+            path=_EXPORT_PATH + "/{{creation_response['id']}}",
             error_handler=error_handler,
             http_method=HttpMethod.GET,
             config=_ANY_CONFIG,
@@ -84,7 +84,7 @@ def setUp(self) -> None:
             requester=HttpRequester(
                 name="stream <name>: fetch_result",
                 url_base="",
-                path="{{stream_slice.extra_fields['url']}}",
+                path="{{download_target}}",
                 error_handler=error_handler,
                 http_method=HttpMethod.GET,
                 config=_ANY_CONFIG,
@@ -143,7 +143,7 @@ def setUp(self) -> None:
                 "failure": AsyncJobStatus.FAILED,
                 "pending": AsyncJobStatus.RUNNING,
             },
-            urls_extractor=DpathExtractor(
+            download_target_extractor=DpathExtractor(
                 decoder=JsonDecoder(parameters={}),
                 field_path=["urls"],
                 config={},

unit_tests/sources/declarative/test_concurrent_declarative_source.py  (+1 -1)

@@ -299,7 +299,7 @@
         "timeout": ["timeout"],
         "completed": ["ready"],
     },
-    "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
+    "download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
     "record_selector": {
         "type": "RecordSelector",
         "extractor": {"type": "DpathExtractor", "field_path": []},
