Source Salesforce: Deprecate API Type parameter (#9302)
* use BULK for the first sync, REST for incremental sync

* if a stream contains compound data and/or base64 fields, always use REST

* fix getting stream state from connector state

* fix integration test

* refactor catalog name

* format code

* refactor unit tests

* refactor unit tests 2

* format code 2

* set additionalProperties to true temporarily so the tests don't break

* fix unit test and remove unnecessary filtering fields

* bump version

* update spec and definition YAML

Co-authored-by: auganbay <auganenu@gmail.com>
augan-rymkhan and Augan93 authored Jan 19, 2022
1 parent f3798ed commit 0a3713a
Showing 14 changed files with 135 additions and 308 deletions.
@@ -620,7 +620,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.16
dockerImageTag: 0.1.17
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
15 changes: 2 additions & 13 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -6452,7 +6452,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:0.1.16"
- dockerImage: "airbyte/source-salesforce:0.1.17"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
connectionSpecification:
@@ -6463,8 +6463,7 @@
- "client_id"
- "client_secret"
- "refresh_token"
- "api_type"
additionalProperties: false
additionalProperties: true
properties:
auth_type:
type: "string"
@@ -6506,16 +6505,6 @@
>docs</a>."
type: "boolean"
default: false
api_type:
title: "API Type"
description: "Unless you know that you are transferring a very small amount\
\ of data, prefer using the BULK API. This will help avoid using up all\
\ of your API call quota with Salesforce. Valid values are BULK or REST."
type: "string"
enum:
- "BULK"
- "REST"
default: "BULK"
streams_criteria:
type: "array"
items:
@@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.16
LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -5,8 +5,6 @@ tests:
spec:
- spec_path: "source_salesforce/spec.json"
connection:
- config_path: "secrets/config_bulk.json"
status: "succeed"
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
@@ -15,18 +13,11 @@ tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
- config_path: "secrets/config_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
future_state_path: "integration_tests/future_state.json"
- config_path: "secrets/config_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
- config_path: "secrets/config_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
@@ -16,7 +16,7 @@

@pytest.fixture(name="input_config")
def parse_input_config():
with open(HERE.parent / "secrets/config_bulk.json", "r") as file:
with open(HERE.parent / "secrets/config.json", "r") as file:
return json.loads(file.read())


@@ -28,7 +28,7 @@ def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream:


def get_any_real_stream(input_config: Mapping[str, Any]) -> Stream:
return get_stream(input_config, "Account")
return get_stream(input_config, "ActiveFeatureLicenseMetric")


def test_not_queryable_stream(caplog, input_config):
@@ -58,6 +58,18 @@
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "Asset",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "FormulaFunctionAllowedType",

This file was deleted.

@@ -3,6 +3,5 @@
"client_secret": "fake-client-secret",
"refresh_token": "fake-refresh-token",
"start_date": "2020-10-02T00:00:00Z",
"is_sandbox": false,
"api_type": "REST"
"is_sandbox": false
}
@@ -182,10 +182,8 @@ def __init__(
client_secret: str = None,
is_sandbox: bool = None,
start_date: str = None,
api_type: str = None,
**kwargs,
):
self.api_type = api_type.upper() if api_type else None
self.refresh_token = refresh_token
self.token = token
self.client_id = client_id
@@ -200,11 +198,7 @@ def _get_standard_headers(self):
return {"Authorization": "Bearer {}".format(self.access_token)}

def get_streams_black_list(self) -> List[str]:
black_list = QUERY_RESTRICTED_SALESFORCE_OBJECTS + QUERY_INCOMPATIBLE_SALESFORCE_OBJECTS
if self.api_type == "REST":
return black_list
else:
return black_list + UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
return QUERY_RESTRICTED_SALESFORCE_OBJECTS + QUERY_INCOMPATIBLE_SALESFORCE_OBJECTS

def filter_streams(self, stream_name: str) -> bool:
# REST and BULK API do not support all entities that end with `ChangeEvent`.
@@ -12,7 +12,7 @@
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.utils.schema_helpers import split_config

from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream


@@ -28,18 +28,30 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
return True, None

@classmethod
def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce) -> List[Stream]:
def generate_streams(
cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce, state: Mapping[str, Any] = None
) -> List[Stream]:
""" "Generates a list of stream by their names. It can be used for different tests too"""
authenticator = TokenAuthenticator(sf_object.access_token)
streams_kwargs = {}
if config["api_type"] == "REST":
full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
else:
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
streams_kwargs["wait_timeout"] = config.get("wait_timeout")

streams = []
for stream_name in stream_names:
streams_kwargs = {}
stream_state = state.get(stream_name, {}) if state else {}

selected_properties = sf_object.generate_schema(stream_name).get("properties", {})
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
properties_not_supported_by_bulk = {
key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"]
}

if stream_state or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk:
# Use REST API
full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
else:
# Use BULK API
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
streams_kwargs["wait_timeout"] = config.get("wait_timeout")

json_schema = sf_object.generate_schema(stream_name)
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator))
@@ -50,10 +62,10 @@ def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf

return streams

def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]:
def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]:
sf = self._get_sf_object(config)
stream_names = sf.get_validated_streams(config=config, catalog=catalog)
return self.generate_streams(config, stream_names, sf)
return self.generate_streams(config, stream_names, sf, state=state)

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
@@ -66,7 +78,7 @@ def read(
config, internal_config = split_config(config)
# get the streams once in case the connector needs to make any queries to generate them
logger.info("Starting generating streams")
stream_instances = {s.name: s for s in self.streams(config, catalog=catalog)}
stream_instances = {s.name: s for s in self.streams(config, catalog=catalog, state=state)}
logger.info(f"Starting syncing {self.name}")
self._stream_to_instance_map = stream_instances
for configured_stream in catalog.streams:
@@ -4,8 +4,8 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Salesforce Source Spec",
"type": "object",
"required": ["client_id", "client_secret", "refresh_token", "api_type"],
"additionalProperties": false,
"required": ["client_id", "client_secret", "refresh_token"],
"additionalProperties": true,
"properties": {
"auth_type": {
"type": "string",
Expand Down Expand Up @@ -41,13 +41,6 @@
"type": "boolean",
"default": false
},
"api_type": {
"title": "API Type",
"description": "Unless you know that you are transferring a very small amount of data, prefer using the BULK API. This will help avoid using up all of your API call quota with Salesforce. Valid values are BULK or REST.",
"type": "string",
"enum": ["BULK", "REST"],
"default": "BULK"
},
"streams_criteria": {
"type": "array",
"items": {

3 comments on commit 0a3713a

@jkaelin commented on 0a3713a Feb 3, 2022


@augan-rymkhan

As I was testing a pull request I was submitting, I ran across some unexpected behavior related to this commit.

This condition seems to evaluate to TRUE for stream_state in any incremental stream (other than the very first run).
As a result, BULK-API-supported streams will switch to using the REST (non-BULK) API after the first run.

source.generate_streams

            if stream_state or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk:
                # Use REST API
                full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream

Is this behavior intentional?

By replacing this line with the following, the BulkIncremental stream is leveraged as expected for subsequent runs:

            if stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk:

@augan-rymkhan (Contributor, Author)


@jkaelin Yes, that was intentional behaviour. For the initial sync (without state), we always use the BULK API where possible (some streams don't support the BULK API; for those we always use REST). For subsequent incremental syncs (when state is provided), we use the REST API.
Did you face any issue related to this logic?
If so, feel free to create an issue.
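
To make the rule described above concrete, here is a condensed sketch of the per-stream selection logic this commit adds in SourceSalesforce.generate_streams. The helper name, the illustrative object list, and the simplified inputs are not taken from the connector; they only restate the condition shown in the diff.

    from typing import Any, Mapping

    # Illustrative stand-in for UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, which lives in api.py.
    UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS = {"AcceptedEventRelation"}

    def uses_rest_api(stream_name: str, stream_state: Mapping[str, Any], properties: Mapping[str, Any]) -> bool:
        """Return True if the stream would be read via REST, False if via BULK.

        Mirrors the decision in generate_streams:
          * saved state (i.e. any sync after the first) forces REST,
          * objects the BULK API cannot query force REST,
          * base64 or compound (object-typed) fields force REST.
        """
        properties_not_supported_by_bulk = {
            key: value
            for key, value in properties.items()
            if value.get("format") == "base64" or "object" in value["type"]
        }
        return bool(
            stream_state
            or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
            or properties_not_supported_by_bulk
        )

    # First sync of a plain stream (no state, no base64/compound fields) -> BULK
    print(uses_rest_api("Account", {}, {"Id": {"type": ["string", "null"]}}))                      # False
    # Any later incremental sync (state present) -> REST
    print(uses_rest_api("Account", {"SystemModstamp": "2021-01-01T00:00:00Z"},
                        {"Id": {"type": ["string", "null"]}}))                                     # True
    # A stream with a compound (object-typed) field -> REST even on the first sync
    print(uses_rest_api("Contact", {}, {"MailingAddress": {"type": ["object", "null"]}}))          # True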

@jkaelin commented on 0a3713a Feb 4, 2022


@augan-rymkhan Thanks.
I guess I'm asking: is there any particular reason you wouldn't want to use the BULK API for subsequent syncs on streams that support it?

I can see a few issues with switching to REST, but I don't understand whether there's a compelling reason to do so.
Happy to create an issue, but I wanted to better understand the rationale in case I was missing something.

General issues

  • API usage limits: there are different quotas for the REST and BULK APIs, and the BULK API is generally recommended for large data syncs
    • Although incremental syncs should be smaller than the initial sync, they can be quite large for objects with rapidly changing data
  • On previous versions of this connector, a user could choose to use the BULK API and this setting would govern both Full & Incremental syncs
    • This version no longer allows BULK API usage in the incremental workflow after the initial sync, even when supported

Specific Issue

  • The BULK API uses the POST HTTP method while the REST API uses the GET HTTP method
    • These methods have different limits on their payload size
    • When querying a large object on the REST API, Salesforce returns 414 URI Too Long, which is unhandled in the code

The resolution for my specific issue would depend on whether or not there was a compelling reason to force non-BULK API usage in the first place.

  • If BULK API usage is allowed, dropping the stream_state condition would suffice.
  • If incremental syncs must use the REST API, then certain objects will always fail with the 414 error. In that case, the user should be notified when configuring the connection that incremental syncs are not allowed for such streams.
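
To illustrate the payload-size concern: on the REST path the SOQL query, including the full field list, travels in the GET request URL to Salesforce's /services/data/vXX.X/query endpoint, so objects with very many fields can exceed URI limits. The sketch below is a rough, hedged illustration; the API version, helper names, and the 16,000-character ceiling are assumptions, not values taken from the connector or from Salesforce documentation.

    from typing import Iterable
    from urllib.parse import quote

    # Assumed conservative ceiling for the request URI; the real threshold depends on
    # Salesforce and any proxies in between, so treat this constant as a placeholder.
    MAX_URI_LENGTH = 16_000

    def estimated_query_url_length(instance_url: str, object_name: str, fields: Iterable[str]) -> int:
        """Estimate the length of a GET /query request that selects every field."""
        soql = f"SELECT {','.join(fields)} FROM {object_name}"
        return len(f"{instance_url}/services/data/v52.0/query?q={quote(soql)}")

    def risks_uri_too_long(instance_url: str, object_name: str, fields: Iterable[str]) -> bool:
        return estimated_query_url_length(instance_url, object_name, fields) > MAX_URI_LENGTH

    # A hypothetical object with ~800 fields of ~20 characters each comfortably exceeds
    # a 16,000-character URI budget, matching the 414 behaviour described above.
    wide_fields = [f"Custom_Field_{i:03d}__c" for i in range(800)]
    print(estimated_query_url_length("https://example.my.salesforce.com", "Custom_Object__c", wide_fields))
    print(risks_uri_too_long("https://example.my.salesforce.com", "Custom_Object__c", wide_fields))  # True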
