diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 2d86e3f7b999a..118ba80d73046 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -620,7 +620,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.16 + dockerImageTag: 0.1.17 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 005c5af601381..e4d51db2ea2f3 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6452,7 +6452,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.16" +- dockerImage: "airbyte/source-salesforce:0.1.17" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce" connectionSpecification: @@ -6463,8 +6463,7 @@ - "client_id" - "client_secret" - "refresh_token" - - "api_type" - additionalProperties: false + additionalProperties: true properties: auth_type: type: "string" @@ -6506,16 +6505,6 @@ >docs." type: "boolean" default: false - api_type: - title: "API Type" - description: "Unless you know that you are transferring a very small amount\ - \ of data, prefer using the BULK API. This will help avoid using up all\ - \ of your API call quota with Salesforce. Valid values are BULK or REST." - type: "string" - enum: - - "BULK" - - "REST" - default: "BULK" streams_criteria: type: "array" items: diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 812b159689030..37713bed463b8 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.16 +LABEL io.airbyte.version=0.1.17 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml index ec590cfd42275..6d379470c03ec 100644 --- a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml @@ -5,8 +5,6 @@ tests: spec: - spec_path: "source_salesforce/spec.json" connection: - - config_path: "secrets/config_bulk.json" - status: "succeed" - config_path: "secrets/config.json" status: "succeed" - config_path: "integration_tests/invalid_config.json" @@ -15,18 +13,11 @@ tests: - config_path: "secrets/config.json" basic_read: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_rest.json" - - config_path: "secrets/config_bulk.json" - configured_catalog_path: "integration_tests/configured_catalog_bulk.json" + configured_catalog_path: "integration_tests/configured_catalog.json" incremental: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_rest.json" - future_state_path: "integration_tests/future_state.json" - - config_path: "secrets/config_bulk.json" - configured_catalog_path: "integration_tests/configured_catalog_bulk.json" + configured_catalog_path: "integration_tests/configured_catalog.json" future_state_path: "integration_tests/future_state.json" full_refresh: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_rest.json" - - config_path: "secrets/config_bulk.json" - configured_catalog_path: "integration_tests/configured_catalog_bulk.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py b/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py index 8525384bb697e..fb6f63f2ec277 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py @@ -16,7 +16,7 @@ @pytest.fixture(name="input_config") def parse_input_config(): - with open(HERE.parent / "secrets/config_bulk.json", "r") as file: + with open(HERE.parent / "secrets/config.json", "r") as file: return json.loads(file.read()) @@ -28,7 +28,7 @@ def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream: def get_any_real_stream(input_config: Mapping[str, Any]) -> Stream: - return get_stream(input_config, "Account") + return get_stream(input_config, "ActiveFeatureLicenseMetric") def test_not_queryable_stream(caplog, input_config): diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json similarity index 89% rename from airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json rename to airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json index cfac02393b5f3..c5f317729aa59 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json @@ -58,6 +58,18 @@ "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" }, + { + "stream": { + "name": "Asset", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["SystemModstamp"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, { "stream": { "name": "FormulaFunctionAllowedType", diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json deleted file mode 100644 index d81043ee5e6e7..0000000000000 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json +++ /dev/null @@ -1,98 +0,0 @@ -{ - "streams": [ - { - "stream": { - "name": "Account", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActiveFeatureLicenseMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActivePermSetLicenseMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActiveProfileMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "AppDefinition", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "Asset", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "PermissionSetTabSetting", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "LeadHistory", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["CreatedDate"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - } - ] -} diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/invalid_config.json index 9b8f07e9c38fa..7bf437ac9fe3d 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/invalid_config.json @@ -3,6 +3,5 @@ "client_secret": "fake-client-secret", "refresh_token": "fake-refresh-token", "start_date": "2020-10-02T00:00:00Z", - "is_sandbox": false, - "api_type": "REST" + "is_sandbox": false } diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py index 51f9cf9fc27ed..6487b4dcf507e 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py @@ -182,10 +182,8 @@ def __init__( client_secret: str = None, is_sandbox: bool = None, start_date: str = None, - api_type: str = None, **kwargs, ): - self.api_type = api_type.upper() if api_type else None self.refresh_token = refresh_token self.token = token self.client_id = client_id @@ -200,11 +198,7 @@ def _get_standard_headers(self): return {"Authorization": "Bearer {}".format(self.access_token)} def get_streams_black_list(self) -> List[str]: - black_list = QUERY_RESTRICTED_SALESFORCE_OBJECTS + QUERY_INCOMPATIBLE_SALESFORCE_OBJECTS - if self.api_type == "REST": - return black_list - else: - return black_list + UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS + return QUERY_RESTRICTED_SALESFORCE_OBJECTS + QUERY_INCOMPATIBLE_SALESFORCE_OBJECTS def filter_streams(self, stream_name: str) -> bool: # REST and BULK API do not support all entities that end with `ChangeEvent`. diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index d1cf51cd2f351..c28e74045f811 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -12,7 +12,7 @@ from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator from airbyte_cdk.sources.utils.schema_helpers import split_config -from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce +from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream @@ -28,18 +28,30 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> return True, None @classmethod - def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce) -> List[Stream]: + def generate_streams( + cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce, state: Mapping[str, Any] = None + ) -> List[Stream]: """ "Generates a list of stream by their names. It can be used for different tests too""" authenticator = TokenAuthenticator(sf_object.access_token) - streams_kwargs = {} - if config["api_type"] == "REST": - full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream - else: - full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream - streams_kwargs["wait_timeout"] = config.get("wait_timeout") - streams = [] for stream_name in stream_names: + streams_kwargs = {} + stream_state = state.get(stream_name, {}) if state else {} + + selected_properties = sf_object.generate_schema(stream_name).get("properties", {}) + # Salesforce BULK API currently does not support loading fields with data type base64 and compound data + properties_not_supported_by_bulk = { + key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"] + } + + if stream_state or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk: + # Use REST API + full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream + else: + # Use BULK API + full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream + streams_kwargs["wait_timeout"] = config.get("wait_timeout") + json_schema = sf_object.generate_schema(stream_name) pk, replication_key = sf_object.get_pk_and_replication_key(json_schema) streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator)) @@ -50,10 +62,10 @@ def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf return streams - def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]: + def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]: sf = self._get_sf_object(config) stream_names = sf.get_validated_streams(config=config, catalog=catalog) - return self.generate_streams(config, stream_names, sf) + return self.generate_streams(config, stream_names, sf, state=state) def read( self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None @@ -66,7 +78,7 @@ def read( config, internal_config = split_config(config) # get the streams once in case the connector needs to make any queries to generate them logger.info("Starting generating streams") - stream_instances = {s.name: s for s in self.streams(config, catalog=catalog)} + stream_instances = {s.name: s for s in self.streams(config, catalog=catalog, state=state)} logger.info(f"Starting syncing {self.name}") self._stream_to_instance_map = stream_instances for configured_stream in catalog.streams: diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json index 1435f17cf04e9..1c72c854ecd22 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json @@ -4,8 +4,8 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Salesforce Source Spec", "type": "object", - "required": ["client_id", "client_secret", "refresh_token", "api_type"], - "additionalProperties": false, + "required": ["client_id", "client_secret", "refresh_token"], + "additionalProperties": true, "properties": { "auth_type": { "type": "string", @@ -41,13 +41,6 @@ "type": "boolean", "default": false }, - "api_type": { - "title": "API Type", - "description": "Unless you know that you are transferring a very small amount of data, prefer using the BULK API. This will help avoid using up all of your API call quota with Salesforce. Valid values are BULK or REST.", - "type": "string", - "enum": ["BULK", "REST"], - "default": "BULK" - }, "streams_criteria": { "type": "array", "items": { diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 6e2867b27f1d4..aa6d3ca9fd6d1 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -22,7 +22,6 @@ class SalesforceStream(HttpStream, ABC): page_size = 2000 - transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs): @@ -69,15 +68,6 @@ def request_params( return {} selected_properties = self.get_json_schema().get("properties", {}) - - # Salesforce BULK API currently does not support loading fields with data type base64 and compound data - if self.sf_api.api_type == "BULK": - selected_properties = { - key: value - for key, value in selected_properties.items() - if value.get("format") != "base64" and "object" not in value["type"] - } - query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: @@ -274,15 +264,6 @@ def request_params( """ selected_properties = self.get_json_schema().get("properties", {}) - - # Salesforce BULK API currently does not support loading fields with data type base64 and compound data - if self.sf_api.api_type == "BULK": - selected_properties = { - key: value - for key, value in selected_properties.items() - if value.get("format") != "base64" and "object" not in value["type"] - } - query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " if next_page_token: query += next_page_token @@ -349,14 +330,6 @@ def request_params( selected_properties = self.get_json_schema().get("properties", {}) - # Salesforce BULK API currently does not support loading fields with data type base64 and compound data - if self.sf_api.api_type == "BULK": - selected_properties = { - key: value - for key, value in selected_properties.items() - if value.get("format") != "base64" and "object" not in value["type"] - } - stream_date = stream_state.get(self.cursor_field) start_date = stream_date or self.start_date @@ -392,14 +365,6 @@ def request_params( ) -> MutableMapping[str, Any]: selected_properties = self.get_json_schema().get("properties", {}) - # Salesforce BULK API currently does not support loading fields with data type base64 and compound data - if self.sf_api.api_type == "BULK": - selected_properties = { - key: value - for key, value in selected_properties.items() - if value.get("format") != "base64" and "object" not in value["type"] - } - stream_date = stream_state.get(self.cursor_field) start_date = next_page_token or stream_date or self.start_date diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index 41f98f12772a2..19db4c2ddcb2e 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -14,7 +14,7 @@ @pytest.fixture(scope="module") -def stream_bulk_config(): +def stream_config(): """Generates streams settings for BULK logic""" return { "client_id": "fake_client_id", @@ -23,39 +23,11 @@ def stream_bulk_config(): "start_date": "2010-01-18T21:18:20Z", "is_sandbox": False, "wait_timeout": 15, - "api_type": "BULK", } @pytest.fixture(scope="module") -def stream_bulk_config_without_start_date(): - """Generates streams settings for BULK logic without start_date""" - return { - "client_id": "fake_client_id", - "client_secret": "fake_client_secret", - "refresh_token": "fake_refresh_token", - "is_sandbox": False, - "wait_timeout": 15, - "api_type": "BULK", - } - - -@pytest.fixture(scope="module") -def stream_rest_config(): - """Generates streams settings for BULK logic""" - return { - "client_id": "fake_client_id", - "client_secret": "fake_client_secret", - "refresh_token": "fake_refresh_token", - "start_date": "2010-01-18T21:18:20Z", - "is_sandbox": False, - "wait_timeout": 15, - "api_type": "REST", - } - - -@pytest.fixture(scope="module") -def stream_rest_config_date_format(): +def stream_config_date_format(): """Generates streams settings with `start_date` in format YYYY-MM-DD""" return { "client_id": "fake_client_id", @@ -64,12 +36,11 @@ def stream_rest_config_date_format(): "start_date": "2010-01-18", "is_sandbox": False, "wait_timeout": 15, - "api_type": "REST", } @pytest.fixture(scope="module") -def stream_rest_config_without_start_date(): +def stream_config_without_start_date(): """Generates streams settings for REST logic without start_date""" return { "client_id": "fake_client_id", @@ -77,51 +48,39 @@ def stream_rest_config_without_start_date(): "refresh_token": "fake_refresh_token", "is_sandbox": False, "wait_timeout": 15, - "api_type": "REST", } -def _stream_api(stream_config): +def _stream_api(stream_config, describe_response_data=None): sf_object = Salesforce(**stream_config) sf_object.login = Mock() sf_object.access_token = Mock() sf_object.instance_url = "https://fase-account.salesforce.com" - sf_object.describe = Mock(return_value={"fields": [{"name": "LastModifiedDate", "type": "string"}]}) + + response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}]} + if describe_response_data: + response_data = describe_response_data + sf_object.describe = Mock(return_value=response_data) return sf_object @pytest.fixture(scope="module") -def stream_rest_api(stream_rest_config): - return _stream_api(stream_rest_config) +def stream_api(stream_config): + return _stream_api(stream_config) @pytest.fixture(scope="module") -def stream_bulk_api(stream_bulk_config): - return _stream_api(stream_bulk_config) +def stream_api_v2(stream_config): + describe_response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}, {"name": "BillingAddress", "type": "address"}]} + return _stream_api(stream_config, describe_response_data=describe_response_data) -def _generate_stream(stream_name, stream_config, stream_api): - return SourceSalesforce.generate_streams(stream_config, [stream_name], stream_api)[0] +def _generate_stream(stream_name, stream_config, stream_api, state=None): + return SourceSalesforce.generate_streams(stream_config, [stream_name], stream_api, state=state)[0] -@pytest.mark.parametrize( - "api_type,stream_name,expected_cls", - [ - ("BULK", "Account", BulkIncrementalSalesforceStream), - ("BULK", "FormulaFunctionAllowedType", BulkSalesforceStream), - ("REST", "ActiveFeatureLicenseMetric", IncrementalSalesforceStream), - ("REST", "AppDefinition", SalesforceStream), - ], -) -def test_stream_generator(api_type, stream_name, expected_cls, stream_bulk_config, stream_bulk_api, stream_rest_config, stream_rest_api): - stream_config, stream_api = (stream_rest_config, stream_rest_api) if api_type == "REST" else (stream_bulk_config, stream_bulk_api) - stream = _generate_stream(stream_name, stream_config, stream_api) - assert stream.name == stream_name - assert isinstance(stream, expected_cls) - - -def test_bulk_sync_creation_failed(stream_bulk_config, stream_bulk_api): - stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) +def test_bulk_sync_creation_failed(stream_config, stream_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: m.register_uri("POST", stream.path(), status_code=400, json=[{"message": "test_error"}]) with pytest.raises(HTTPError) as err: @@ -129,31 +88,48 @@ def test_bulk_sync_creation_failed(stream_bulk_config, stream_bulk_api): assert err.value.response.json()[0]["message"] == "test_error" -def test_bulk_sync_unsupported_stream(stream_bulk_config, stream_bulk_api, caplog): +def test_stream_unsupported_by_bulk(stream_config, stream_api, caplog): + """ + Stream `AcceptedEventRelation` is not supported by BULK API, so that REST API stream will be used for it. + """ stream_name = "AcceptedEventRelation" - stream: BulkIncrementalSalesforceStream = _generate_stream(stream_name, stream_bulk_config, stream_bulk_api) - with requests_mock.Mocker() as m: - m.register_uri( - "POST", - stream.path(), - status_code=400, - json=[{"errorCode": "INVALIDENTITY", "message": f"Entity '{stream_name}' is not supported by the Bulk API."}], - ) - list(stream.read_records(sync_mode=SyncMode.full_refresh)) + stream = _generate_stream(stream_name, stream_config, stream_api) + assert not isinstance(stream, BulkSalesforceStream) - logs = caplog.records - assert logs - assert logs[1].levelname == "ERROR" - assert ( - logs[1].msg - == f"Cannot receive data for stream '{stream_name}' using BULK API, error message: 'Entity '{stream_name}' is not supported by the Bulk API.'" - ) +def test_stream_contains_unsupported_properties_by_bulk(stream_config, stream_api_v2): + """ + Stream `Account` contains compound field such as BillingAddress, which is not supported by BULK API (csv), + in that case REST API stream will be used for it. + """ + stream_name = "Account" + stream = _generate_stream(stream_name, stream_config, stream_api_v2) + assert not isinstance(stream, BulkSalesforceStream) + + +def test_stream_has_state_rest_api_should_be_used(stream_config, stream_api): + """ + Stream `ActiveFeatureLicenseMetric` has state, in that case REST API stream will be used for it. + """ + stream_name = "ActiveFeatureLicenseMetric" + state = {stream_name: {"SystemModstamp": "2122-08-22T05:08:29.000Z"}} + stream = _generate_stream(stream_name, stream_config, stream_api, state=state) + assert not isinstance(stream, BulkSalesforceStream) + + +def test_stream_has_no_state_bulk_api_should_be_used(stream_config, stream_api): + """ + Stream `ActiveFeatureLicenseMetric` has no state, in that case BULK API stream will be used for it. + """ + stream_name = "ActiveFeatureLicenseMetric" + state = {"other_stream": {"SystemModstamp": "2122-08-22T05:08:29.000Z"}} + stream = _generate_stream(stream_name, stream_config, stream_api, state=state) + assert isinstance(stream, BulkSalesforceStream) @pytest.mark.parametrize("item_number", [0, 15, 2000, 2324, 193434]) -def test_bulk_sync_pagination(item_number, stream_bulk_config, stream_bulk_api): - stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) +def test_bulk_sync_pagination(item_number, stream_config, stream_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) test_ids = [i for i in range(1, item_number)] pages = [test_ids[i : i + stream.page_size] for i in range(0, len(test_ids), stream.page_size)] if not pages: @@ -189,16 +165,16 @@ def _get_result_id(stream): return int(list(stream.read_records(sync_mode=SyncMode.full_refresh))[0]["ID"]) -def test_bulk_sync_successful(stream_bulk_config, stream_bulk_api): - stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) +def test_bulk_sync_successful(stream_config, stream_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: job_id = _prepare_mock(m, stream) m.register_uri("GET", stream.path() + f"/{job_id}", [{"json": {"state": "JobComplete"}}]) assert _get_result_id(stream) == 1 -def test_bulk_sync_successful_long_response(stream_bulk_config, stream_bulk_api): - stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) +def test_bulk_sync_successful_long_response(stream_config, stream_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: job_id = _prepare_mock(m, stream) m.register_uri( @@ -216,8 +192,8 @@ def test_bulk_sync_successful_long_response(stream_bulk_config, stream_bulk_api) # maximum timeout is wait_timeout * max_retry_attempt # this test tries to check a job state 17 times with +-1second for very one @pytest.mark.timeout(17) -def test_bulk_sync_successful_retry(stream_bulk_config, stream_bulk_api): - stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) +def test_bulk_sync_successful_retry(stream_config, stream_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds with requests_mock.Mocker() as m: job_id = _prepare_mock(m, stream) @@ -230,8 +206,8 @@ def test_bulk_sync_successful_retry(stream_bulk_config, stream_bulk_api): @pytest.mark.timeout(30) -def test_bulk_sync_failed_retry(stream_bulk_config, stream_bulk_api): - stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) +def test_bulk_sync_failed_retry(stream_config, stream_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds with requests_mock.Mocker() as m: job_id = _prepare_mock(m, stream) @@ -242,53 +218,43 @@ def test_bulk_sync_failed_retry(stream_bulk_config, stream_bulk_api): @pytest.mark.parametrize( - "api_type,start_date_provided,stream_name,expected_start_date", + "start_date_provided,stream_name,expected_start_date", [ - ("BULK", True, "Account", "2010-01-18T21:18:20Z"), - ("BULK", False, "Account", None), - ("REST", True, "ActiveFeatureLicenseMetric", "2010-01-18T21:18:20Z"), - ("REST", False, "ActiveFeatureLicenseMetric", None), + (True, "Account", "2010-01-18T21:18:20Z"), + (False, "Account", None), + (True, "ActiveFeatureLicenseMetric", "2010-01-18T21:18:20Z"), + (False, "ActiveFeatureLicenseMetric", None), ], ) def test_stream_start_date( - api_type, start_date_provided, stream_name, expected_start_date, - stream_bulk_config, - stream_bulk_api, - stream_rest_config, - stream_rest_api, - stream_rest_config_without_start_date, - stream_bulk_config_without_start_date, + stream_config, + stream_api, + stream_config_without_start_date, ): if start_date_provided: - stream_config, stream_api = (stream_rest_config, stream_rest_api) if api_type == "REST" else (stream_bulk_config, stream_bulk_api) stream = _generate_stream(stream_name, stream_config, stream_api) else: - stream_config, stream_api = ( - (stream_rest_config_without_start_date, stream_rest_api) - if api_type == "REST" - else (stream_bulk_config_without_start_date, stream_bulk_api) - ) - stream = _generate_stream(stream_name, stream_config, stream_api) + stream = _generate_stream(stream_name, stream_config_without_start_date, stream_api) assert stream.start_date == expected_start_date -def test_stream_start_date_should_be_converted_to_datetime_format(stream_rest_config_date_format, stream_rest_api): - stream: IncrementalSalesforceStream = _generate_stream("ActiveFeatureLicenseMetric", stream_rest_config_date_format, stream_rest_api) +def test_stream_start_date_should_be_converted_to_datetime_format(stream_config_date_format, stream_api): + stream: IncrementalSalesforceStream = _generate_stream("ActiveFeatureLicenseMetric", stream_config_date_format, stream_api) assert stream.start_date == "2010-01-18T00:00:00Z" -def test_stream_start_datetime_format_should_not_changed(stream_rest_config, stream_rest_api): - stream: IncrementalSalesforceStream = _generate_stream("ActiveFeatureLicenseMetric", stream_rest_config, stream_rest_api) +def test_stream_start_datetime_format_should_not_changed(stream_config, stream_api): + stream: IncrementalSalesforceStream = _generate_stream("ActiveFeatureLicenseMetric", stream_config, stream_api) assert stream.start_date == "2010-01-18T21:18:20Z" -def test_download_data_filter_null_bytes(stream_bulk_config, stream_bulk_api): +def test_download_data_filter_null_bytes(stream_config, stream_api): job_full_url: str = "https://fase-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA" - stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: m.register_uri("GET", f"{job_full_url}/results", content=b"\x00") @@ -327,9 +293,9 @@ def test_download_data_filter_null_bytes(stream_bulk_config, stream_bulk_api): ), ], ) -def test_discover_with_streams_criteria_param(streams_criteria, predicted_filtered_streams, stream_rest_config): - updated_config = {**stream_rest_config, **{"streams_criteria": streams_criteria}} - sf_object = Salesforce(**stream_rest_config) +def test_discover_with_streams_criteria_param(streams_criteria, predicted_filtered_streams, stream_config): + updated_config = {**stream_config, **{"streams_criteria": streams_criteria}} + sf_object = Salesforce(**stream_config) sf_object.login = Mock() sf_object.access_token = Mock() sf_object.instance_url = "https://fase-account.salesforce.com" @@ -351,8 +317,11 @@ def test_discover_with_streams_criteria_param(streams_criteria, predicted_filter assert sorted(filtered_streams) == sorted(predicted_filtered_streams) -def test_pagination_rest(stream_rest_config, stream_rest_api): - stream: SalesforceStream = _generate_stream("Account", stream_rest_config, stream_rest_api) +def test_pagination_rest(stream_config, stream_api): + stream_name = "ActiveFeatureLicenseMetric" + state = {stream_name: {"SystemModstamp": "2122-08-22T05:08:29.000Z"}} + + stream: SalesforceStream = _generate_stream(stream_name, stream_config, stream_api, state=state) stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds next_page_url = "/services/data/v52.0/query/012345" with requests_mock.Mocker() as m: diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 278eca3b9b5ae..f52bed6b32de2 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -737,6 +737,7 @@ List of available streams: | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:--------------------------------------------------------------------------| +| 0.1.17 | 2022-01-19 | [9302](https://github.com/airbytehq/airbyte/pull/9302) | Deprecate API Type parameter | | 0.1.16 | 2022-01-18 | [9151](https://github.com/airbytehq/airbyte/pull/9151) | Fix pagination in REST API streams | | 0.1.15 | 2022-01-11 | [9409](https://github.com/airbytehq/airbyte/pull/9409) | Correcting the presence of an extra `else` handler in the error handling | | 0.1.14 | 2022-01-11 | [9386](https://github.com/airbytehq/airbyte/pull/9386) | Handling 400 error, while `sobject` doesn't support `query` or `queryAll` requests |