diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 5bd110c4e..c664e237a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1779,6 +1779,9 @@ definitions: - stream_interval - stream_partition - stream_slice + - creation_response + - polling_response + - download_target examples: - "/products" - "/quotes/{{ stream_partition['id'] }}/quote_line_groups" @@ -3223,7 +3226,7 @@ definitions: - polling_requester - download_requester - status_extractor - - urls_extractor + - download_target_extractor properties: type: type: string @@ -3240,7 +3243,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" - urls_extractor: + download_target_extractor: description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job. anyOf: - "$ref": "#/definitions/CustomRecordExtractor" @@ -3261,7 +3264,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" - url_requester: + download_target_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job. anyOf: - "$ref": "#/definitions/CustomRequester" @@ -3667,6 +3670,21 @@ interpolation: self: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token= next: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2 count: 82 + - title: creation_response + description: The response received from the creation_requester in the AsyncRetriever component. + type: object + examples: + - id: "1234" + - title: polling_response + description: The response received from the polling_requester in the AsyncRetriever component. + type: object + examples: + - id: "1234" + - title: download_target + description: The `URL` received from the polling_requester in the AsyncRetriever with jobStatus as `COMPLETED`. + type: string + examples: + - "https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2&filename=xxx_yyy_zzz.csv" - title: stream_interval description: The current stream interval being processed. The keys are defined by the incremental sync component. Default keys are `start_time` and `end_time`. type: object diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 7ff18fa1d..abe4d89cf 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2263,7 +2263,7 @@ class AsyncRetriever(BaseModel): status_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - urls_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( + download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( ..., description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) @@ -2278,7 +2278,7 @@ class AsyncRetriever(BaseModel): ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.", ) - url_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( + download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 452c4e84a..f6e1cc0d6 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2744,32 +2744,32 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie if model.delete_requester else None ) - url_requester = ( + download_target_requester = ( self._create_component_from_model( - model=model.url_requester, + model=model.download_target_requester, decoder=decoder, config=config, name=f"job extract_url - {name}", ) - if model.url_requester + if model.download_target_requester else None ) status_extractor = self._create_component_from_model( model=model.status_extractor, decoder=decoder, config=config, name=name ) - urls_extractor = self._create_component_from_model( - model=model.urls_extractor, decoder=decoder, config=config, name=name + download_target_extractor = self._create_component_from_model( + model=model.download_target_extractor, decoder=decoder, config=config, name=name ) job_repository: AsyncJobRepository = AsyncHttpJobRepository( creation_requester=creation_requester, polling_requester=polling_requester, download_retriever=download_retriever, - url_requester=url_requester, + download_target_requester=download_target_requester, abort_requester=abort_requester, delete_requester=delete_requester, status_extractor=status_extractor, status_mapping=self._create_async_job_status_mapping(model.status_mapping, config), - urls_extractor=urls_extractor, + download_target_extractor=download_target_extractor, ) async_job_partition_router = AsyncJobPartitionRouter( diff --git a/airbyte_cdk/sources/declarative/requesters/README.md b/airbyte_cdk/sources/declarative/requesters/README.md index de8b3380c..cfeaf7e76 100644 --- a/airbyte_cdk/sources/declarative/requesters/README.md +++ b/airbyte_cdk/sources/declarative/requesters/README.md @@ -1,8 +1,8 @@ # AsyncHttpJobRepository sequence diagram - Components marked as optional are not required and can be ignored. -- if `url_requester` is not provided, `urls_extractor` will get urls from the `polling_job_response` -- interpolation_context, e.g. `create_job_response` or `polling_job_response` can be obtained from stream_slice +- if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response` +- interpolation_context, e.g. `creation_response` or `polling_response` can be obtained from stream_slice ```mermaid --- @@ -12,7 +12,7 @@ sequenceDiagram participant AsyncHttpJobRepository as AsyncOrchestrator participant CreationRequester as creation_requester participant PollingRequester as polling_requester - participant UrlRequester as url_requester (Optional) + participant UrlRequester as download_target_requester (Optional) participant DownloadRetriever as download_retriever participant AbortRequester as abort_requester (Optional) participant DeleteRequester as delete_requester (Optional) @@ -25,14 +25,14 @@ sequenceDiagram loop Poll for job status AsyncHttpJobRepository ->> PollingRequester: Check job status - PollingRequester ->> Reporting Server: Status request (interpolation_context: `create_job_response`) + PollingRequester ->> Reporting Server: Status request (interpolation_context: `creation_response`) Reporting Server -->> PollingRequester: Status response PollingRequester -->> AsyncHttpJobRepository: Job status end alt Status: Ready AsyncHttpJobRepository ->> UrlRequester: Request download URLs (if applicable) - UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_job_response`) + UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_response`) Reporting Server -->> UrlRequester: Download URLs UrlRequester -->> AsyncHttpJobRepository: Download URLs diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index da335b2b7..28e9528ea 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -43,13 +43,13 @@ class AsyncHttpJobRepository(AsyncJobRepository): delete_requester: Optional[Requester] status_extractor: DpathExtractor status_mapping: Mapping[str, AsyncJobStatus] - urls_extractor: DpathExtractor + download_target_extractor: DpathExtractor job_timeout: Optional[timedelta] = None record_extractor: RecordExtractor = field( init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({}) ) - url_requester: Optional[Requester] = ( + download_target_requester: Optional[Requester] = ( None # use it in case polling_requester provides some and extra request is needed to obtain list of urls to download from ) @@ -211,12 +211,15 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: """ - for url in self._get_download_url(job): + for target_url in self._get_download_targets(job): job_slice = job.job_parameters() stream_slice = StreamSlice( partition=job_slice.partition, cursor_slice=job_slice.cursor_slice, - extra_fields={**job_slice.extra_fields, "url": url}, + extra_fields={ + **job_slice.extra_fields, + "download_target": target_url, + }, ) for message in self.download_retriever.read_records({}, stream_slice): if isinstance(message, Record): @@ -269,27 +272,29 @@ def _clean_up_job(self, job_id: str) -> None: del self._polling_job_response_by_id[job_id] def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice: + creation_response = self._create_job_response_by_id[job.api_job_id()].json() stream_slice = StreamSlice( - partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]}, + partition={}, cursor_slice={}, + extra_fields={"creation_response": creation_response}, ) return stream_slice - def _get_download_url(self, job: AsyncJob) -> Iterable[str]: - if not self.url_requester: + def _get_download_targets(self, job: AsyncJob) -> Iterable[str]: + if not self.download_target_requester: url_response = self._polling_job_response_by_id[job.api_job_id()] else: + polling_response = self._polling_job_response_by_id[job.api_job_id()].json() stream_slice: StreamSlice = StreamSlice( - partition={ - "polling_job_response": self._polling_job_response_by_id[job.api_job_id()] - }, + partition={}, cursor_slice={}, + extra_fields={"polling_response": polling_response}, ) - url_response = self.url_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect url_requester to always be presented, otherwise raise an exception as we cannot proceed with the report + url_response = self.download_target_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect download_target_requester to always be presented, otherwise raise an exception as we cannot proceed with the report if not url_response: raise AirbyteTracedException( - internal_message="Always expect a response or an exception from url_requester", + internal_message="Always expect a response or an exception from download_target_requester", failure_type=FailureType.system_error, ) - yield from self.urls_extractor.extract_records(url_response) # type: ignore # we expect urls_extractor to always return list of strings + yield from self.download_target_extractor.extract_records(url_response) # type: ignore # we expect download_target_extractor to always return list of strings diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index e8c446503..8a7b6aba0 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -85,7 +85,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"): - backoff_strategies = self.error_handler.backoff_strategies + backoff_strategies = self.error_handler.backoff_strategies # type: ignore else: backoff_strategies = None @@ -125,6 +125,12 @@ def get_path( kwargs = { "stream_slice": stream_slice, "next_page_token": next_page_token, + # update the interpolation context with extra fields, if passed. + **( + stream_slice.extra_fields + if stream_slice is not None and hasattr(stream_slice, "extra_fields") + else {} + ), } path = str(self._path.eval(self.config, **kwargs)) return path.lstrip("/") diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index faab999cb..be9177638 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3607,7 +3607,7 @@ def test_create_async_retriever(): "timeout": ["timeout"], "completed": ["ready"], }, - "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, + "download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, "record_selector": { "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["data"]}, @@ -3615,7 +3615,7 @@ def test_create_async_retriever(): "status_extractor": {"type": "DpathExtractor", "field_path": ["status"]}, "polling_requester": { "type": "HttpRequester", - "path": "/v3/marketing/contacts/exports/{{stream_slice['create_job_response'].json()['id'] }}", + "path": "/v3/marketing/contacts/exports/{{creation_response['id'] }}", "url_base": "https://api.sendgrid.com", "http_method": "GET", "authenticator": { @@ -3635,19 +3635,19 @@ def test_create_async_retriever(): }, "download_requester": { "type": "HttpRequester", - "path": "{{stream_slice['url']}}", + "path": "{{download_target}}", "url_base": "", "http_method": "GET", }, "abort_requester": { "type": "HttpRequester", - "path": "{{stream_slice['url']}}/abort", + "path": "{{download_target}}/abort", "url_base": "", "http_method": "POST", }, "delete_requester": { "type": "HttpRequester", - "path": "{{stream_slice['url']}}", + "path": "{{download_target}}", "url_base": "", "http_method": "POST", }, @@ -3681,7 +3681,7 @@ def test_create_async_retriever(): assert job_repository.abort_requester assert job_repository.delete_requester assert job_repository.status_extractor - assert job_repository.urls_extractor + assert job_repository.download_target_extractor selector = component.record_selector extractor = selector.extractor diff --git a/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/unit_tests/sources/declarative/requesters/test_http_job_repository.py index cdc14e600..4be3ecb11 100644 --- a/unit_tests/sources/declarative/requesters/test_http_job_repository.py +++ b/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -69,7 +69,7 @@ def setUp(self) -> None: self._polling_job_requester = HttpRequester( name="stream : polling", url_base=_URL_BASE, - path=_EXPORT_PATH + "/{{stream_slice['create_job_response'].json()['id']}}", + path=_EXPORT_PATH + "/{{creation_response['id']}}", error_handler=error_handler, http_method=HttpMethod.GET, config=_ANY_CONFIG, @@ -84,7 +84,7 @@ def setUp(self) -> None: requester=HttpRequester( name="stream : fetch_result", url_base="", - path="{{stream_slice.extra_fields['url']}}", + path="{{download_target}}", error_handler=error_handler, http_method=HttpMethod.GET, config=_ANY_CONFIG, @@ -143,7 +143,7 @@ def setUp(self) -> None: "failure": AsyncJobStatus.FAILED, "pending": AsyncJobStatus.RUNNING, }, - urls_extractor=DpathExtractor( + download_target_extractor=DpathExtractor( decoder=JsonDecoder(parameters={}), field_path=["urls"], config={}, diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 188256b10..8d9e6c675 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -299,7 +299,7 @@ "timeout": ["timeout"], "completed": ["ready"], }, - "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, + "download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, "record_selector": { "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": []}, @@ -307,7 +307,7 @@ "status_extractor": {"type": "DpathExtractor", "field_path": ["status"]}, "polling_requester": { "type": "HttpRequester", - "path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}", + "path": "/async_job/{{creation_response['id'] }}", "http_method": "GET", "authenticator": { "type": "BearerAuthenticator",