From 62c433eeb1f02cfd26e930b41d466180afc5952a Mon Sep 17 00:00:00 2001 From: augan-rymkhan <93112548+augan-rymkhan@users.noreply.github.com> Date: Tue, 18 Jan 2022 20:42:21 +0600 Subject: [PATCH] Source Salesforce: fix pagination in REST API streams (#9151) * fix next_page_token * fix BULK API * fix BUlk incremental stream * added unit test and comments * format code * bump version * updated spec and def yaml Co-authored-by: auganbay --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-salesforce/Dockerfile | 2 +- .../source_salesforce/streams.py | 83 ++++++++++++++++--- .../source-salesforce/unit_tests/unit_test.py | 42 ++++++++++ docs/integrations/sources/salesforce.md | 1 + 6 files changed, 116 insertions(+), 16 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 372fbe1101018..6ce13683cf803 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -613,7 +613,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.15 + dockerImageTag: 0.1.16 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 1a6fc9b152605..ecf1dee8fdd65 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6425,7 +6425,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.15" +- dockerImage: "airbyte/source-salesforce:0.1.16" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 6c59b3d8b1d38..812b159689030 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.15 +LABEL io.airbyte.version=0.1.16 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 4ea39cde10149..6e2867b27f1d4 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -44,13 +44,17 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: def url_base(self) -> str: return self.sf_api.instance_url - def path(self, **kwargs) -> str: + def path(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> str: + if next_page_token: + """ + If `next_page_token` is set, subsequent requests use `nextRecordsUrl`. + """ + return next_page_token return f"/services/data/{self.sf_api.version}/queryAll" def next_page_token(self, response: requests.Response) -> str: response_data = response.json() - if len(response_data["records"]) == self.page_size and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - return f"WHERE {self.primary_key} >= '{response_data['records'][-1][self.primary_key]}' " + return response_data.get("nextRecordsUrl") def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None @@ -58,6 +62,11 @@ def request_params( """ Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm """ + if next_page_token: + """ + If `next_page_token` is set, subsequent requests use `nextRecordsUrl`, and do not include any parameters. + """ + return {} selected_properties = self.get_json_schema().get("properties", {}) @@ -70,11 +79,9 @@ def request_params( } query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " - if next_page_token: - query += next_page_token if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}" + query += f"ORDER BY {self.primary_key} ASC" return {"q": query} @@ -259,6 +266,32 @@ def next_page_token(self, last_record: dict) -> str: if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' " + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + """ + Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm + """ + + selected_properties = self.get_json_schema().get("properties", {}) + + # Salesforce BULK API currently does not support loading fields with data type base64 and compound data + if self.sf_api.api_type == "BULK": + selected_properties = { + key: value + for key, value in selected_properties.items() + if value.get("format") != "base64" and "object" not in value["type"] + } + + query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " + if next_page_token: + query += next_page_token + + if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: + query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}" + + return {"q": query} + def read_records( self, sync_mode: SyncMode, @@ -305,14 +338,15 @@ def format_start_date(start_date: Optional[str]) -> Optional[str]: if start_date: return pendulum.parse(start_date).strftime("%Y-%m-%dT%H:%M:%SZ") - def next_page_token(self, response: requests.Response) -> str: - response_data = response.json() - if len(response_data["records"]) == self.page_size and self.name not in UNSUPPORTED_FILTERING_STREAMS: - return response_data["records"][-1][self.cursor_field] - def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: + if next_page_token: + """ + If `next_page_token` is set, subsequent requests use `nextRecordsUrl`, and do not include any parameters. + """ + return {} + selected_properties = self.get_json_schema().get("properties", {}) # Salesforce BULK API currently does not support loading fields with data type base64 and compound data @@ -324,13 +358,13 @@ def request_params( } stream_date = stream_state.get(self.cursor_field) - start_date = next_page_token or stream_date or self.start_date + start_date = stream_date or self.start_date query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " if start_date: query += f"WHERE {self.cursor_field} >= {start_date} " if self.name not in UNSUPPORTED_FILTERING_STREAMS: - query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.page_size}" + query += f"ORDER BY {self.cursor_field} ASC" return {"q": query} @property @@ -352,3 +386,26 @@ class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalSalesforc def next_page_token(self, last_record: dict) -> str: if self.name not in UNSUPPORTED_FILTERING_STREAMS: return last_record[self.cursor_field] + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + selected_properties = self.get_json_schema().get("properties", {}) + + # Salesforce BULK API currently does not support loading fields with data type base64 and compound data + if self.sf_api.api_type == "BULK": + selected_properties = { + key: value + for key, value in selected_properties.items() + if value.get("format") != "base64" and "object" not in value["type"] + } + + stream_date = stream_state.get(self.cursor_field) + start_date = next_page_token or stream_date or self.start_date + + query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " + if start_date: + query += f"WHERE {self.cursor_field} >= {start_date} " + if self.name not in UNSUPPORTED_FILTERING_STREAMS: + query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.page_size}" + return {"q": query} diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index 40f417ebf516b..41f98f12772a2 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -349,3 +349,45 @@ def test_discover_with_streams_criteria_param(streams_criteria, predicted_filter ) filtered_streams = sf_object.get_validated_streams(config=updated_config) assert sorted(filtered_streams) == sorted(predicted_filtered_streams) + + +def test_pagination_rest(stream_rest_config, stream_rest_api): + stream: SalesforceStream = _generate_stream("Account", stream_rest_config, stream_rest_api) + stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + next_page_url = "/services/data/v52.0/query/012345" + with requests_mock.Mocker() as m: + resp_1 = { + "done": False, + "totalSize": 4, + "nextRecordsUrl": next_page_url, + "records": [ + { + "ID": 1, + "LastModifiedDate": "2021-11-15", + }, + { + "ID": 2, + "LastModifiedDate": "2021-11-16", + }, + ], + } + resp_2 = { + "done": True, + "totalSize": 4, + "records": [ + { + "ID": 3, + "LastModifiedDate": "2021-11-17", + }, + { + "ID": 4, + "LastModifiedDate": "2021-11-18", + }, + ], + } + + m.register_uri("GET", stream.path(), json=resp_1) + m.register_uri("GET", next_page_url, json=resp_2) + + records = [record for record in stream.read_records(sync_mode=SyncMode.full_refresh)] + assert len(records) == 4 diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 26c44a8ea0f6d..278eca3b9b5ae 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -737,6 +737,7 @@ List of available streams: | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:--------------------------------------------------------------------------| +| 0.1.16 | 2022-01-18 | [9151](https://github.com/airbytehq/airbyte/pull/9151) | Fix pagination in REST API streams | | 0.1.15 | 2022-01-11 | [9409](https://github.com/airbytehq/airbyte/pull/9409) | Correcting the presence of an extra `else` handler in the error handling | | 0.1.14 | 2022-01-11 | [9386](https://github.com/airbytehq/airbyte/pull/9386) | Handling 400 error, while `sobject` doesn't support `query` or `queryAll` requests | | 0.1.13 | 2022-01-11 | [8797](https://github.com/airbytehq/airbyte/pull/8797) | Switched from authSpecification to advanced_auth in specefication |