Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: (CDK) (AsyncRetriever) - Improve UX on variable naming and interpolation #368

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1779,6 +1779,9 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- creation_response
- polling_response
- download_target
examples:
- "/products"
- "/quotes/{{ stream_partition['id'] }}/quote_line_groups"
Expand Down Expand Up @@ -3223,7 +3226,7 @@ definitions:
- polling_requester
- download_requester
- status_extractor
- urls_extractor
- download_target_extractor
properties:
type:
type: string
Expand All @@ -3240,7 +3243,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
urls_extractor:
download_target_extractor:
description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
Expand All @@ -3261,7 +3264,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRequester"
- "$ref": "#/definitions/HttpRequester"
url_requester:
download_target_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API in order to extract the download URL from the polling response of the completed async job.
anyOf:
- "$ref": "#/definitions/CustomRequester"
Expand Down Expand Up @@ -3667,6 +3670,21 @@ interpolation:
self: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=
next: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2
count: 82
- title: creation_response
description: The response received from the creation_requester in the AsyncRetriever component.
type: object
examples:
- id: "1234"
- title: polling_response
description: The response received from the polling_requester in the AsyncRetriever component.
type: object
examples:
- id: "1234"
- title: download_target
description: The `URL` received from the polling_requester in the AsyncRetriever once the job status is `COMPLETED`.
type: string
examples:
- "https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2&filename=xxx_yyy_zzz.csv"
- title: stream_interval
description: The current stream interval being processed. The keys are defined by the incremental sync component. Default keys are `start_time` and `end_time`.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2263,7 +2263,7 @@ class AsyncRetriever(BaseModel):
status_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
..., description="Responsible for fetching the actual status of the async job."
)
urls_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
...,
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
)
Expand All @@ -2278,7 +2278,7 @@ class AsyncRetriever(BaseModel):
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.",
)
url_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
None,
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2744,32 +2744,32 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
if model.delete_requester
else None
)
url_requester = (
download_target_requester = (
self._create_component_from_model(
model=model.url_requester,
model=model.download_target_requester,
decoder=decoder,
config=config,
name=f"job extract_url - {name}",
)
if model.url_requester
if model.download_target_requester
else None
)
status_extractor = self._create_component_from_model(
model=model.status_extractor, decoder=decoder, config=config, name=name
)
urls_extractor = self._create_component_from_model(
model=model.urls_extractor, decoder=decoder, config=config, name=name
download_target_extractor = self._create_component_from_model(
model=model.download_target_extractor, decoder=decoder, config=config, name=name
)
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
creation_requester=creation_requester,
polling_requester=polling_requester,
download_retriever=download_retriever,
url_requester=url_requester,
download_target_requester=download_target_requester,
abort_requester=abort_requester,
delete_requester=delete_requester,
status_extractor=status_extractor,
status_mapping=self._create_async_job_status_mapping(model.status_mapping, config),
urls_extractor=urls_extractor,
download_target_extractor=download_target_extractor,
)

async_job_partition_router = AsyncJobPartitionRouter(
Expand Down
10 changes: 5 additions & 5 deletions airbyte_cdk/sources/declarative/requesters/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# AsyncHttpJobRepository sequence diagram

- Components marked as optional are not required and can be ignored.
- if `url_requester` is not provided, `urls_extractor` will get urls from the `polling_job_response`
- interpolation_context, e.g. `create_job_response` or `polling_job_response` can be obtained from stream_slice
- if `download_target_requester` is not provided, `download_target_extractor` will get the download URLs directly from the `polling_response`
- interpolation context variables, e.g. `creation_response` or `polling_response`, can be obtained from the stream_slice

```mermaid
---
Expand All @@ -12,7 +12,7 @@ sequenceDiagram
participant AsyncHttpJobRepository as AsyncOrchestrator
participant CreationRequester as creation_requester
participant PollingRequester as polling_requester
participant UrlRequester as url_requester (Optional)
participant UrlRequester as download_target_requester (Optional)
participant DownloadRetriever as download_retriever
participant AbortRequester as abort_requester (Optional)
participant DeleteRequester as delete_requester (Optional)
Expand All @@ -25,14 +25,14 @@ sequenceDiagram
loop Poll for job status
AsyncHttpJobRepository ->> PollingRequester: Check job status
PollingRequester ->> Reporting Server: Status request (interpolation_context: `create_job_response`)
PollingRequester ->> Reporting Server: Status request (interpolation_context: `creation_response`)
Reporting Server -->> PollingRequester: Status response
PollingRequester -->> AsyncHttpJobRepository: Job status
end
alt Status: Ready
AsyncHttpJobRepository ->> UrlRequester: Request download URLs (if applicable)
UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_job_response`)
UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_response`)
Reporting Server -->> UrlRequester: Download URLs
UrlRequester -->> AsyncHttpJobRepository: Download URLs
Expand Down
31 changes: 18 additions & 13 deletions airbyte_cdk/sources/declarative/requesters/http_job_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ class AsyncHttpJobRepository(AsyncJobRepository):
delete_requester: Optional[Requester]
status_extractor: DpathExtractor
status_mapping: Mapping[str, AsyncJobStatus]
urls_extractor: DpathExtractor
download_target_extractor: DpathExtractor

job_timeout: Optional[timedelta] = None
record_extractor: RecordExtractor = field(
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
)
url_requester: Optional[Requester] = (
download_target_requester: Optional[Requester] = (
None # use it in case polling_requester provides some <id> and extra request is needed to obtain list of urls to download from
)

Expand Down Expand Up @@ -211,12 +211,15 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
"""

for url in self._get_download_url(job):
for target_url in self._get_download_targets(job):
job_slice = job.job_parameters()
stream_slice = StreamSlice(
partition=job_slice.partition,
cursor_slice=job_slice.cursor_slice,
extra_fields={**job_slice.extra_fields, "url": url},
extra_fields={
**job_slice.extra_fields,
"download_target": target_url,
},
)
for message in self.download_retriever.read_records({}, stream_slice):
if isinstance(message, Record):
Expand Down Expand Up @@ -269,27 +272,29 @@ def _clean_up_job(self, job_id: str) -> None:
del self._polling_job_response_by_id[job_id]

def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
creation_response = self._create_job_response_by_id[job.api_job_id()].json()
stream_slice = StreamSlice(
partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]},
partition={},
cursor_slice={},
extra_fields={"creation_response": creation_response},
)
return stream_slice

def _get_download_url(self, job: AsyncJob) -> Iterable[str]:
if not self.url_requester:
def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
if not self.download_target_requester:
url_response = self._polling_job_response_by_id[job.api_job_id()]
else:
polling_response = self._polling_job_response_by_id[job.api_job_id()].json()
stream_slice: StreamSlice = StreamSlice(
partition={
"polling_job_response": self._polling_job_response_by_id[job.api_job_id()]
},
partition={},
cursor_slice={},
extra_fields={"polling_response": polling_response},
)
url_response = self.url_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect url_requester to always be presented, otherwise raise an exception as we cannot proceed with the report
url_response = self.download_target_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect download_target_requester to always be presented, otherwise raise an exception as we cannot proceed with the report
if not url_response:
raise AirbyteTracedException(
internal_message="Always expect a response or an exception from url_requester",
internal_message="Always expect a response or an exception from download_target_requester",
failure_type=FailureType.system_error,
)

yield from self.urls_extractor.extract_records(url_response) # type: ignore # we expect urls_extractor to always return list of strings
yield from self.download_target_extractor.extract_records(url_response) # type: ignore # we expect download_target_extractor to always return list of strings
8 changes: 7 additions & 1 deletion airbyte_cdk/sources/declarative/requesters/http_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters

if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
backoff_strategies = self.error_handler.backoff_strategies
backoff_strategies = self.error_handler.backoff_strategies # type: ignore
else:
backoff_strategies = None

Expand Down Expand Up @@ -125,6 +125,12 @@ def get_path(
kwargs = {
"stream_slice": stream_slice,
"next_page_token": next_page_token,
# update the interpolation context with extra fields, if passed.
**(
stream_slice.extra_fields
if stream_slice is not None and hasattr(stream_slice, "extra_fields")
else {}
),
}
path = str(self._path.eval(self.config, **kwargs))
return path.lstrip("/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3607,15 +3607,15 @@ def test_create_async_retriever():
"timeout": ["timeout"],
"completed": ["ready"],
},
"urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
"download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": ["data"]},
},
"status_extractor": {"type": "DpathExtractor", "field_path": ["status"]},
"polling_requester": {
"type": "HttpRequester",
"path": "/v3/marketing/contacts/exports/{{stream_slice['create_job_response'].json()['id'] }}",
"path": "/v3/marketing/contacts/exports/{{creation_response['id'] }}",
"url_base": "https://api.sendgrid.com",
"http_method": "GET",
"authenticator": {
Expand All @@ -3635,19 +3635,19 @@ def test_create_async_retriever():
},
"download_requester": {
"type": "HttpRequester",
"path": "{{stream_slice['url']}}",
"path": "{{download_target}}",
"url_base": "",
"http_method": "GET",
},
"abort_requester": {
"type": "HttpRequester",
"path": "{{stream_slice['url']}}/abort",
"path": "{{download_target}}/abort",
"url_base": "",
"http_method": "POST",
},
"delete_requester": {
"type": "HttpRequester",
"path": "{{stream_slice['url']}}",
"path": "{{download_target}}",
"url_base": "",
"http_method": "POST",
},
Expand Down Expand Up @@ -3681,7 +3681,7 @@ def test_create_async_retriever():
assert job_repository.abort_requester
assert job_repository.delete_requester
assert job_repository.status_extractor
assert job_repository.urls_extractor
assert job_repository.download_target_extractor

selector = component.record_selector
extractor = selector.extractor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def setUp(self) -> None:
self._polling_job_requester = HttpRequester(
name="stream <name>: polling",
url_base=_URL_BASE,
path=_EXPORT_PATH + "/{{stream_slice['create_job_response'].json()['id']}}",
path=_EXPORT_PATH + "/{{creation_response['id']}}",
error_handler=error_handler,
http_method=HttpMethod.GET,
config=_ANY_CONFIG,
Expand All @@ -84,7 +84,7 @@ def setUp(self) -> None:
requester=HttpRequester(
name="stream <name>: fetch_result",
url_base="",
path="{{stream_slice.extra_fields['url']}}",
path="{{download_target}}",
error_handler=error_handler,
http_method=HttpMethod.GET,
config=_ANY_CONFIG,
Expand Down Expand Up @@ -143,7 +143,7 @@ def setUp(self) -> None:
"failure": AsyncJobStatus.FAILED,
"pending": AsyncJobStatus.RUNNING,
},
urls_extractor=DpathExtractor(
download_target_extractor=DpathExtractor(
decoder=JsonDecoder(parameters={}),
field_path=["urls"],
config={},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@
"timeout": ["timeout"],
"completed": ["ready"],
},
"urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
"download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"status_extractor": {"type": "DpathExtractor", "field_path": ["status"]},
"polling_requester": {
"type": "HttpRequester",
"path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}",
"path": "/async_job/{{creation_response['id'] }}",
"http_method": "GET",
"authenticator": {
"type": "BearerAuthenticator",
Expand Down
Loading