Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Salesforce: Deprecate API Type parameter #9302

Merged
merged 16 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ tests:
spec:
- spec_path: "source_salesforce/spec.json"
connection:
- config_path: "secrets/config_bulk.json"
status: "succeed"
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
Expand All @@ -15,18 +13,11 @@ tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
- config_path: "secrets/config_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
future_state_path: "integration_tests/future_state.json"
- config_path: "secrets/config_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
- config_path: "secrets/config_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

@pytest.fixture(name="input_config")
def parse_input_config():
with open(HERE.parent / "secrets/config_bulk.json", "r") as file:
with open(HERE.parent / "secrets/config.json", "r") as file:
return json.loads(file.read())


Expand All @@ -28,7 +28,7 @@ def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream:


def get_any_real_stream(input_config: Mapping[str, Any]) -> Stream:
return get_stream(input_config, "Account")
return get_stream(input_config, "ActiveFeatureLicenseMetric")


def test_not_queryable_stream(caplog, input_config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "Asset",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "FormulaFunctionAllowedType",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@
"client_secret": "fake-client-secret",
"refresh_token": "fake-refresh-token",
"start_date": "2020-10-02T00:00:00Z",
"is_sandbox": false,
"api_type": "REST"
"is_sandbox": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,8 @@ def __init__(
client_secret: str = None,
is_sandbox: bool = None,
start_date: str = None,
api_type: str = None,
**kwargs,
):
self.api_type = api_type.upper() if api_type else None
self.refresh_token = refresh_token
self.token = token
self.client_id = client_id
Expand All @@ -200,11 +198,7 @@ def _get_standard_headers(self):
return {"Authorization": "Bearer {}".format(self.access_token)}

def get_streams_black_list(self) -> List[str]:
black_list = QUERY_RESTRICTED_SALESFORCE_OBJECTS + QUERY_INCOMPATIBLE_SALESFORCE_OBJECTS
if self.api_type == "REST":
return black_list
else:
return black_list + UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
return QUERY_RESTRICTED_SALESFORCE_OBJECTS + QUERY_INCOMPATIBLE_SALESFORCE_OBJECTS

def filter_streams(self, stream_name: str) -> bool:
# REST and BULK API do not support all entities that end with `ChangeEvent`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.utils.schema_helpers import split_config

from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream


Expand All @@ -28,18 +28,30 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
return True, None

@classmethod
def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce) -> List[Stream]:
def generate_streams(
cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce, state: Mapping[str, Any] = None
) -> List[Stream]:
""" "Generates a list of stream by their names. It can be used for different tests too"""
authenticator = TokenAuthenticator(sf_object.access_token)
streams_kwargs = {}
if config["api_type"] == "REST":
full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
else:
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
streams_kwargs["wait_timeout"] = config.get("wait_timeout")

streams = []
for stream_name in stream_names:
streams_kwargs = {}
stream_state = state.get(stream_name, {}) if state else {}

selected_properties = sf_object.generate_schema(stream_name).get("properties", {})
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
properties_not_supported_by_bulk = {
key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"]
}

if stream_state or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk:
# Use REST API
full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
else:
# Use BULK API
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
streams_kwargs["wait_timeout"] = config.get("wait_timeout")

json_schema = sf_object.generate_schema(stream_name)
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator))
Expand All @@ -50,10 +62,10 @@ def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf

return streams

def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]:
def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]:
sf = self._get_sf_object(config)
stream_names = sf.get_validated_streams(config=config, catalog=catalog)
return self.generate_streams(config, stream_names, sf)
return self.generate_streams(config, stream_names, sf, state=state)

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
Expand All @@ -66,7 +78,7 @@ def read(
config, internal_config = split_config(config)
# get the streams once in case the connector needs to make any queries to generate them
logger.info("Starting generating streams")
stream_instances = {s.name: s for s in self.streams(config, catalog=catalog)}
stream_instances = {s.name: s for s in self.streams(config, catalog=catalog, state=state)}
logger.info(f"Starting syncing {self.name}")
self._stream_to_instance_map = stream_instances
for configured_stream in catalog.streams:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Salesforce Source Spec",
"type": "object",
"required": ["client_id", "client_secret", "refresh_token", "api_type"],
"additionalProperties": false,
"required": ["client_id", "client_secret", "refresh_token"],
"additionalProperties": true,
Copy link
Contributor Author

@augan-rymkhan augan-rymkhan Jan 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set it to true because the api_type parameter has been removed from the spec but is still provided in the config files. This is a temporary solution to keep the checks from failing.

"properties": {
"auth_type": {
"type": "string",
Expand Down Expand Up @@ -41,13 +41,6 @@
"type": "boolean",
"default": false
},
"api_type": {
"title": "API Type",
"description": "Unless you know that you are transferring a very small amount of data, prefer using the BULK API. This will help avoid using up all of your API call quota with Salesforce. Valid values are BULK or REST.",
"type": "string",
"enum": ["BULK", "REST"],
"default": "BULK"
},
"streams_criteria": {
"type": "array",
"items": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

class SalesforceStream(HttpStream, ABC):
page_size = 2000

transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs):
Expand Down Expand Up @@ -60,15 +59,6 @@ def request_params(
"""

selected_properties = self.get_json_schema().get("properties", {})

# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
if self.sf_api.api_type == "BULK":
selected_properties = {
key: value
for key, value in selected_properties.items()
if value.get("format") != "base64" and "object" not in value["type"]
}

query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
if next_page_token:
query += next_page_token
Expand Down Expand Up @@ -315,14 +305,6 @@ def request_params(
) -> MutableMapping[str, Any]:
selected_properties = self.get_json_schema().get("properties", {})

# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
if self.sf_api.api_type == "BULK":
selected_properties = {
key: value
for key, value in selected_properties.items()
if value.get("format") != "base64" and "object" not in value["type"]
}

stream_date = stream_state.get(self.cursor_field)
start_date = next_page_token or stream_date or self.start_date

Expand Down
Loading