diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json index 3e3561699a5f0..ebd1a31d5f93f 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962", "name": "Salesforce", "dockerRepository": "airbyte/source-salesforce", - "dockerImageTag": "0.1.4", + "dockerImageTag": "0.1.6", "documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce", "icon": "salesforce.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 53254af425e9f..1f06117bb4f11 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -492,7 +492,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 073dff61ccf5e..31450e5838ed5 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5064,7 +5064,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.5" +- dockerImage: "airbyte/source-salesforce:0.1.6" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce" connectionSpecification: @@ -5119,6 +5119,14 @@ - "BULK" - "REST" default: "BULK" + wait_timeout: + title: "Response Waiting Time" + description: "Maximum wait time of Salesforce responses in minutes. This\ \ option is used for the BULK mode only" + type: "integer" + minimum: 5 + maximum: 60 + default: 10 supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index f4d0c7c3c9d83..213098b0c05f8 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -1,16 +1,29 @@ -FROM python:3.7-slim +FROM python:3.7.11-alpine3.14 as base +FROM base as builder -# Bash is installed for more convenient debugging. -RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* + +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base WORKDIR /airbyte/integration_code -COPY source_salesforce ./source_salesforce -COPY main.py ./ COPY setup.py ./ -RUN pip install . +RUN pip install --prefix=/install . 
+ + +FROM base +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +WORKDIR /airbyte/integration_code +COPY main.py ./ +COPY source_salesforce ./source_salesforce + ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml index 6338a7e1bd4f5..9bd68f9775c9f 100644 --- a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml @@ -5,6 +5,8 @@ tests: spec: - spec_path: "source_salesforce/spec.json" connection: + - config_path: "secrets/config_bulk.json" + status: "succeed" - config_path: "secrets/config.json" status: "succeed" - config_path: "integration_tests/invalid_config.json" diff --git a/airbyte-integrations/connectors/source-salesforce/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-salesforce/acceptance-test-docker.sh index e4d8b1cef8961..c51577d10690c 100644 --- a/airbyte-integrations/connectors/source-salesforce/acceptance-test-docker.sh +++ b/airbyte-integrations/connectors/source-salesforce/acceptance-test-docker.sh @@ -1,7 +1,7 @@ #!/usr/bin/env sh # Build latest connector image -docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) # Pull latest acctest image docker pull airbyte/source-acceptance-test:latest diff --git a/airbyte-integrations/connectors/source-salesforce/setup.py b/airbyte-integrations/connectors/source-salesforce/setup.py index a33b797aa0d5c..bcaefe352d5aa 100644 --- a/airbyte-integrations/connectors/source-salesforce/setup.py +++ b/airbyte-integrations/connectors/source-salesforce/setup.py @@ -7,10 +7,7 @@ MAIN_REQUIREMENTS = ["airbyte-cdk", "vcrpy==4.1.1"] -TEST_REQUIREMENTS = [ - "pytest~=6.1", - "source-acceptance-test", -] +TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests_mock", "pytest-timeout"] setup( name="source_salesforce", diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py index 81293589be4e3..fa1b743b0fd51 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py @@ -179,6 +179,7 @@ def __init__( is_sandbox: bool = None, start_date: str = None, api_type: str = None, + **kwargs, ): self.api_type = api_type.upper() if api_type else None self.refresh_token = refresh_token @@ -252,16 +253,16 @@ def login(self): def describe(self, sobject: str = None) -> Mapping[str, Any]: """Describes all objects or a specific object""" headers = self._get_standard_headers() + endpoint = "sobjects" if not sobject else f"sobjects/{sobject}/describe" url = f"{self.instance_url}/services/data/{self.version}/{endpoint}" resp = self._make_request("GET", url, headers=headers) - return resp.json() - def generate_schema(self, stream_name: str) -> Mapping[str, Any]: - schema = {"$schema": 
"http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}} + def generate_schema(self, stream_name: str = None) -> Mapping[str, Any]: response = self.describe(stream_name) + schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}} for field in response["fields"]: schema["properties"][field["name"]] = self.field_to_property_schema(field) return schema diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py index c09327cb875e9..4c58e795444e9 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py @@ -37,7 +37,7 @@ def should_give_up(exc): give_up = False if give_up: - logger.info(f"Giving up for returned HTTP status: {exc.response.status_code}") + logger.info(f"Giving up for returned HTTP status: {exc.response.status_code}, body: {exc.response.json()}") return give_up return backoff.on_exception( diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 0e5977605bfc5..73d86f9d18cf0 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -27,21 +27,22 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> _ = self._get_sf_object(config) return True, None - def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]: - sf = self._get_sf_object(config) - authenticator = TokenAuthenticator(sf.access_token) - stream_names = sf.get_validated_streams(catalog=catalog) - + @classmethod + def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce) -> List[Stream]: + """ "Generates a list of stream by their names. 
It can be used for different tests too""" + authenticator = TokenAuthenticator(sf_object.access_token) + streams_kwargs = {} if config["api_type"] == "REST": full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream else: full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream + streams_kwargs["wait_timeout"] = config.get("wait_timeout") streams = [] for stream_name in stream_names: - json_schema = sf.generate_schema(stream_name) - pk, replication_key = sf.get_pk_and_replication_key(json_schema) - streams_kwargs = dict(sf_api=sf, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator) + json_schema = sf_object.generate_schema(stream_name) + pk, replication_key = sf_object.get_pk_and_replication_key(json_schema) + streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator)) if replication_key and stream_name not in UNSUPPORTED_FILTERING_STREAMS: streams.append(incremental(**streams_kwargs, replication_key=replication_key, start_date=config["start_date"])) else: @@ -49,6 +50,11 @@ def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = return streams + def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]: + sf = self._get_sf_object(config) + stream_names = sf.get_validated_streams(catalog=catalog) + return self.generate_streams(config, stream_names, sf) + def read( self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None ) -> Iterator[AirbyteMessage]: diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json index a167a53fc2c4d..46c6f1844317b 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json @@ -43,6 +43,14 @@ "type": "string", "enum": ["BULK", "REST"], "default": "BULK" + }, + "wait_timeout": { + "title": "Response Waiting Time", + "description": "Maximum wait time of Salesforce responses in minutes. 
This option is used for the BULK mode only", + "type": "integer", + "minimum": 5, + "maximum": 60, + "default": 10 } } }, diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 0dbc677909c67..26a12c5a051d2 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -3,6 +3,7 @@ # import csv +import math import time from abc import ABC from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -12,6 +13,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from pendulum import DateTime from requests import codes, exceptions from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce @@ -82,7 +84,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp def get_json_schema(self) -> Mapping[str, Any]: if not self.schema: - self.schema = self.sf_api.generate_schema(self.name) + self.schema = self.sf_api.generate_schema([self.name]) return self.schema def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: @@ -99,8 +101,13 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: class BulkSalesforceStream(SalesforceStream): page_size = 30000 - JOB_WAIT_TIMEOUT_MINS = 10 - CHECK_INTERVAL_SECONDS = 2 + DEFAULT_WAIT_TIMEOUT_MINS = 10 + MAX_CHECK_INTERVAL_SECONDS = 2.0 + MAX_RETRY_NUMBER = 3 + + def __init__(self, wait_timeout: Optional[int], **kwargs): + super().__init__(**kwargs) + self._wait_timeout = wait_timeout or self.DEFAULT_WAIT_TIMEOUT_MINS def path(self, **kwargs) -> str: return f"/services/data/{self.sf_api.version}/jobs/query" @@ -125,9 +132,12 @@ def _send_http_request(self, method: str, url: str, json: dict = None): return response def create_stream_job(self, query: str, url: str) -> Optional[str]: + """ + docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.htm + """ json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"} try: - response = self._send_http_request("POST", f"{self.url_base}/{url}", json=json) + response = self._send_http_request("POST", url, json=json) job_id = response.json()["id"] self.logger.info(f"Created Job: {job_id} to sync {self.name}") return job_id @@ -146,20 +156,52 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: raise error def wait_for_job(self, url: str) -> str: - start_time = pendulum.now() - while True: - job_info = self._send_http_request("GET", url=url) - job_id = job_info.json()["id"] - job_status = job_info.json()["state"] - - if job_info.json()["state"] in ["JobComplete", "Aborted", "Failed"]: + # using "seconds" argument because self._wait_timeout can be changed by tests + expiration_time: DateTime = pendulum.now().add(seconds=int(self._wait_timeout * 60.0)) + job_status = "InProgress" + delay_timeout = 0 + delay_cnt = 0 + job_info = None + # minimal starting delay is 0.5 seconds. 
+ # this value was determined empirically + time.sleep(0.5) + while pendulum.now() < expiration_time: + job_info = self._send_http_request("GET", url=url).json() + job_status = job_info["state"] + if job_status in ["JobComplete", "Aborted", "Failed"]: return job_status - if pendulum.now() > start_time.add(minutes=self.JOB_WAIT_TIMEOUT_MINS): - return job_status + if delay_timeout < self.MAX_CHECK_INTERVAL_SECONDS: + delay_timeout = 0.5 + math.exp(delay_cnt) / 1000.0 + delay_cnt += 1 + + time.sleep(delay_timeout) + job_id = job_info["id"] + self.logger.info( + f"Sleeping {delay_timeout} seconds while waiting for Job: {self.name}/{job_id}" f" to complete. Current state: {job_status}" + ) + + self.logger.warning(f"Timed out waiting for the {self.name} job after {self._wait_timeout} minutes, data: {job_info}") + return job_status - self.logger.info(f"Sleeping {self.CHECK_INTERVAL_SECONDS} seconds while waiting for Job: {job_id} to complete") - time.sleep(self.CHECK_INTERVAL_SECONDS) + def execute_job(self, query: Mapping[str, Any], url: str) -> str: + job_status = "Failed" + for i in range(0, self.MAX_RETRY_NUMBER): + job_id = self.create_stream_job(query=query, url=url) + if not job_id: + return None + job_full_url = f"{url}/{job_id}" + job_status = self.wait_for_job(url=job_full_url) + if job_status not in ["UploadComplete", "InProgress"]: + break + self.logger.error(f"Waiting error. Retrying the job, attempt {i+1}/{self.MAX_RETRY_NUMBER}...") + self.abort_job(url=job_full_url) + job_status = "Aborted" + + if job_status in ["Aborted", "Failed"]: + self.delete_job(url=job_full_url) + raise Exception(f"Job for {self.name} stream using BULK API failed.") + return job_full_url def download_data(self, url: str) -> Tuple[int, dict]: job_data = self._send_http_request("GET", f"{url}/results") @@ -174,6 +216,7 @@ def download_data(self, url: str) -> Tuple[int, dict]: def abort_job(self, url: str): data = {"state": "Aborted"} self._send_http_request("PATCH", url=url, json=data) + self.logger.warning("The stuck job was aborted") def delete_job(self, url: str): self._send_http_request("DELETE", url=url) @@ -190,38 +233,29 @@ def read_records( stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: stream_state = stream_state or {} - pagination_complete = False next_page_token = None - while not pagination_complete: + while True: params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) path = self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - full_url = f"{self.url_base}/{path}" - job_id = self.create_stream_job(query=params["q"], url=path) - if not job_id: + job_full_url = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") + if not job_full_url: return - job_full_url = f"{full_url}/{job_id}" - job_status = self.wait_for_job(url=job_full_url) - if job_status == "JobComplete": - count = 0 - for count, record in self.download_data(url=job_full_url): - yield record - - if count == self.page_size: - next_page_token = self.next_page_token(record) - if not next_page_token: - pagination_complete = True - else: - pagination_complete = True - if job_status in ["UploadComplete", "InProgress"]: - self.abort_job(url=job_full_url) - job_status = "Aborted" + count = 0 + for count, record in self.download_data(url=job_full_url): + yield record + self.delete_job(url=job_full_url) + + if count < self.page_size: + # this is the last page + break + + next_page_token = self.next_page_token(record) + 
if not next_page_token: + # no next page of data was found. + break - if job_status in ["JobComplete", "Aborted", "Failed"]: - self.delete_job(url=job_full_url) - if job_status in ["Aborted", "Failed"]: - raise Exception(f"Job for {self.name} stream using BULK API was failed") class IncrementalSalesforceStream(SalesforceStream, ABC): diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index e1814314fc3b0..d9b68d7c35963 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -2,6 +2,178 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # +from unittest.mock import Mock -def test_example_method(): - assert True +import pytest +import requests_mock +from airbyte_cdk.models import SyncMode +from requests.exceptions import HTTPError +from source_salesforce.api import Salesforce +from source_salesforce.source import SourceSalesforce +from source_salesforce.streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream + + +@pytest.fixture(scope="module") +def stream_bulk_config(): + """Generates streams settings for BULK logic""" + return { + "client_id": "fake_client_id", + "client_secret": "fake_client_secret", + "refresh_token": "fake_refresh_token", + "start_date": "2010-01-18T21:18:20Z", + "is_sandbox": False, + "wait_timeout": 15, + "api_type": "BULK", + } + + +@pytest.fixture(scope="module") +def stream_rest_config(): + """Generates streams settings for REST logic""" + return { + "client_id": "fake_client_id", + "client_secret": "fake_client_secret", + "refresh_token": "fake_refresh_token", + "start_date": "2010-01-18T21:18:20Z", + "is_sandbox": False, + "wait_timeout": 15, + "api_type": "REST", + } + + +def _stream_api(stream_config): + sf_object = Salesforce(**stream_config) + sf_object.login = Mock() + sf_object.access_token = Mock() + sf_object.instance_url = "https://fake-account.salesforce.com" + sf_object.describe = Mock(return_value={"fields": [{"name": "LastModifiedDate", "type": "string"}]}) + return sf_object + + +@pytest.fixture(scope="module") +def stream_rest_api(stream_rest_config): + return _stream_api(stream_rest_config) + + +@pytest.fixture(scope="module") +def stream_bulk_api(stream_bulk_config): + return _stream_api(stream_bulk_config) + + +def _generate_stream(stream_name, stream_config, stream_api): + return SourceSalesforce.generate_streams(stream_config, [stream_name], stream_api)[0] + + +@pytest.mark.parametrize( + "api_type,stream_name,expected_cls", + [ + ("BULK", "Account", BulkIncrementalSalesforceStream), + ("BULK", "FormulaFunctionAllowedType", BulkSalesforceStream), + ("REST", "ActiveFeatureLicenseMetric", IncrementalSalesforceStream), + ("REST", "AppDefinition", SalesforceStream), + ], +) +def test_stream_generator(api_type, stream_name, expected_cls, stream_bulk_config, stream_bulk_api, stream_rest_config, stream_rest_api): + stream_config, stream_api = (stream_rest_config, stream_rest_api) if api_type == "REST" else (stream_bulk_config, stream_bulk_api) + stream = _generate_stream(stream_name, stream_config, stream_api) + assert stream.name == stream_name + assert isinstance(stream, expected_cls) + + +def test_bulk_sync_creation_failed(stream_bulk_config, stream_bulk_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, 
stream_bulk_api) + with requests_mock.Mocker() as m: + m.register_uri("POST", stream.path(), status_code=400, json=[{"message": "test_error"}]) + with pytest.raises(HTTPError) as err: + next(stream.read_records(sync_mode=SyncMode.full_refresh)) + assert err.value.response.json()[0]["message"] == "test_error" + + +@pytest.mark.parametrize("item_number", [0, 15, 2000, 2324, 193434]) +def test_bulk_sync_pagination(item_number, stream_bulk_config, stream_bulk_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) + test_ids = [i for i in range(1, item_number)] + pages = [test_ids[i : i + stream.page_size] for i in range(0, len(test_ids), stream.page_size)] + if not pages: + pages = [[]] + with requests_mock.Mocker() as m: + creation_responses = [] + + for page in range(len(pages)): + job_id = f"fake_job_{page}" + creation_responses.append({"json": {"id": job_id}}) + m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) + resp = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in pages[page]] + m.register_uri("GET", stream.path() + f"/{job_id}/results", text="\n".join(resp)) + m.register_uri("DELETE", stream.path() + f"/{job_id}") + m.register_uri("POST", stream.path(), creation_responses) + + loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh)] + assert not set(test_ids).symmetric_difference(set(loaded_ids)) + post_request_count = len([r for r in m.request_history if r.method == "POST"]) + assert post_request_count == len(pages) + + +def _prepare_mock(m, stream): + job_id = "fake_job_1" + m.register_uri("POST", stream.path(), json={"id": job_id}) + m.register_uri("DELETE", stream.path() + f"/{job_id}") + m.register_uri("GET", stream.path() + f"/{job_id}/results", text="Field1,LastModifiedDate,ID\ntest,2021-11-16,1") + m.register_uri("PATCH", stream.path() + f"/{job_id}", text="") + return job_id + + +def _get_result_id(stream): + return int(list(stream.read_records(sync_mode=SyncMode.full_refresh))[0]["ID"]) + + +def test_bulk_sync_successful(stream_bulk_config, stream_bulk_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) + with requests_mock.Mocker() as m: + job_id = _prepare_mock(m, stream) + m.register_uri("GET", stream.path() + f"/{job_id}", [{"json": {"state": "JobComplete"}}]) + assert _get_result_id(stream) == 1 + + +def test_bulk_sync_successful_long_response(stream_bulk_config, stream_bulk_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) + with requests_mock.Mocker() as m: + job_id = _prepare_mock(m, stream) + m.register_uri( + "GET", + stream.path() + f"/{job_id}", + [ + {"json": {"state": "UploadComplete", "id": job_id}}, + {"json": {"state": "InProgress", "id": job_id}}, + {"json": {"state": "JobComplete", "id": job_id}}, + ], + ) + assert _get_result_id(stream) == 1 + + +# maximum timeout is wait_timeout * max_retry_attempt +# this test tries to check a job state 17 times with +-1 second for every one +@pytest.mark.timeout(17) +def test_bulk_sync_successful_retry(stream_bulk_config, stream_bulk_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) + stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + with requests_mock.Mocker() as m: + job_id = _prepare_mock(m, stream) + # 2 failed attempts, 3rd one should be successful + states 
= [{"json": {"state": "InProgress", "id": job_id}}] * 17 + states.append({"json": {"state": "JobComplete", "id": job_id}}) + m.register_uri("GET", stream.path() + f"/{job_id}", states) + assert _get_result_id(stream) == 1 + + +@pytest.mark.timeout(30) +def test_bulk_sync_failed_retry(stream_bulk_config, stream_bulk_api): + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) + stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds + with requests_mock.Mocker() as m: + job_id = _prepare_mock(m, stream) + m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "InProgress", "id": job_id}) + with pytest.raises(Exception) as err: + next(stream.read_records(sync_mode=SyncMode.full_refresh)) + assert "stream using BULK API failed" in str(err.value) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 9a9a86711ad16..ea0b908a1bbbf 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -734,6 +734,8 @@ List of available streams: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.6 | 2021-11-16 | [8009](https://github.com/airbytehq/airbyte/pull/8009) | Fix retrying of BULK jobs | | 0.1.5 | 2021-11-15 | [7885](https://github.com/airbytehq/airbyte/pull/7885) | Add `Transform` for output records | | 0.1.4 | 2021-11-09 | [7778](https://github.com/airbytehq/airbyte/pull/7778) | Fix types for `anyType` fields | | 0.1.3 | 2021-11-06 | [7592](https://github.com/airbytehq/airbyte/pull/7592) | Fix getting `anyType` fields using BULK API | diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index f3592e9770857..cd586e0070fe5 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -95,7 +95,7 @@ function export_gsm_secrets(){ # docs: https://cloud.google.com/secret-manager/docs/filtering#api local filter="name:SECRET_" - [[ ${CONNECTOR_NAME} != "all" ]] && filter="${filter} AND labels.connector:${CONNECTOR_NAME}" + [[ ${CONNECTOR_NAME} != "all" ]] && filter="${filter} AND labels.connector=${CONNECTOR_NAME}" local uri="https://secretmanager.googleapis.com/v1/projects/${project_id}/secrets" local next_token='' while true; do @@ -264,8 +264,6 @@ read_secrets source-redshift "$AWS_REDSHIFT_INTEGRATION_TEST_CREDS" read_secrets source-retently "$SOURCE_RETENTLY_TEST_CREDS" read_secrets source-s3 "$SOURCE_S3_TEST_CREDS" read_secrets source-s3 "$SOURCE_S3_PARQUET_CREDS" "parquet_config.json" -read_secrets source-salesforce "$SALESFORCE_BULK_INTEGRATION_TESTS_CREDS" "config_bulk.json" -read_secrets source-salesforce "$SALESFORCE_INTEGRATION_TESTS_CREDS" read_secrets source-salesloft "$SOURCE_SALESLOFT_TEST_CREDS" read_secrets source-sendgrid "$SENDGRID_INTEGRATION_TEST_CREDS" read_secrets source-shopify "$SHOPIFY_INTEGRATION_TEST_CREDS"
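Reviewer note: for anyone who wants to see the polling cadence that the new wait_for_job loop produces without running a sync, here is a minimal standalone sketch that mirrors its delay schedule (a 0.5 s floor plus an exponentially growing term that stops increasing once it passes MAX_CHECK_INTERVAL_SECONDS). The helper name poll_delays is illustrative only and is not part of the connector.

import math

MAX_CHECK_INTERVAL_SECONDS = 2.0  # same constant as BulkSalesforceStream


def poll_delays(polls: int):
    """Illustrative sketch of the wait_for_job delay schedule (not connector code)."""
    delay_timeout = 0.0
    delay_cnt = 0
    for _ in range(polls):
        # the delay only grows while it is still below MAX_CHECK_INTERVAL_SECONDS
        if delay_timeout < MAX_CHECK_INTERVAL_SECONDS:
            delay_timeout = 0.5 + math.exp(delay_cnt) / 1000.0
            delay_cnt += 1
        yield delay_timeout


# Delays grow from roughly 0.5 s to about 3.5 s and then stay flat, so a slow job
# is polled frequently at first and less aggressively as the wait_timeout budget runs down.
print([round(d, 3) for d in poll_delays(10)])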