🐛 Source salesforce: failed async bulk jobs #8009

Merged: 13 commits, Nov 17, 2021
@@ -2,7 +2,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
"dockerImageTag": "0.1.4",
"dockerImageTag": "0.1.5",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
"icon": "salesforce.svg"
}
@@ -486,7 +486,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
@@ -5028,7 +5028,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:0.1.4"
- dockerImage: "airbyte/source-salesforce:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
connectionSpecification:
27 changes: 20 additions & 7 deletions airbyte-integrations/connectors/source-salesforce/Dockerfile
@@ -1,16 +1,29 @@
FROM python:3.7-slim
FROM python:3.7.11-alpine3.14 as base
FROM base as builder

# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

RUN apk --no-cache upgrade \
&& pip install --upgrade pip \
&& apk --no-cache add tzdata build-base

WORKDIR /airbyte/integration_code
COPY source_salesforce ./source_salesforce
COPY main.py ./
COPY setup.py ./
RUN pip install .
RUN pip install --prefix=/install .


FROM base
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone

WORKDIR /airbyte/integration_code
COPY main.py ./
COPY source_salesforce ./source_salesforce


ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -5,6 +5,8 @@ tests:
spec:
- spec_path: "source_salesforce/spec.json"
connection:
- config_path: "secrets/config_bulk.json"
status: "succeed"
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)
Contributor:

should we also do this in templates? (can be a separate PR)

Contributor Author (@antixar, Nov 18, 2021):

I'm planning to aggregate more improvements into a separate PR, for example I want to fix an issue with running the custom acceptance tests locally. These tests can currently be executed only by GitHub CI, and developers can't run them with the acceptance-test-docker.sh script.


# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
@@ -0,0 +1,16 @@
{
"streams": [
{
"stream": {
"name": "Account",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
@@ -7,10 +7,7 @@

MAIN_REQUIREMENTS = ["airbyte-cdk", "vcrpy==4.1.1"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
]
TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests_mock", "pytest-timeout"]

setup(
name="source_salesforce",
@@ -179,6 +179,7 @@ def __init__(
is_sandbox: bool = None,
start_date: str = None,
api_type: str = None,
**kwargs,
):
self.api_type = api_type.upper() if api_type else None
self.refresh_token = refresh_token
@@ -252,16 +253,16 @@ def login(self):
def describe(self, sobject: str = None) -> Mapping[str, Any]:
"""Describes all objects or a specific object"""
headers = self._get_standard_headers()

endpoint = "sobjects" if not sobject else f"sobjects/{sobject}/describe"

url = f"{self.instance_url}/services/data/{self.version}/{endpoint}"
resp = self._make_request("GET", url, headers=headers)

return resp.json()

def generate_schema(self, stream_name: str) -> Mapping[str, Any]:
schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}}
def generate_schema(self, stream_name: str = None) -> Mapping[str, Any]:
Contributor:

why did the default value change? is it meaningful to call this without a stream name supplied?

response = self.describe(stream_name)
schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}}
for field in response["fields"]:
schema["properties"][field["name"]] = self.field_to_property_schema(field)
return schema
@@ -37,7 +37,7 @@ def should_give_up(exc):
give_up = False

if give_up:
logger.info(f"Giving up for returned HTTP status: {exc.response.status_code}")
logger.info(f"Giving up for returned HTTP status: {exc.response.status_code}, body: {exc.response.json()}")
return give_up

return backoff.on_exception(
@@ -27,28 +27,34 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
_ = self._get_sf_object(config)
return True, None

def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]:
sf = self._get_sf_object(config)
authenticator = TokenAuthenticator(sf.access_token)
stream_names = sf.get_validated_streams(catalog=catalog)

@classmethod
def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce) -> List[Stream]:
""" "Generates a list of stream by their names. It can be used for different tests too"""
authenticator = TokenAuthenticator(sf_object.access_token)
streams_kwargs = {}
if config["api_type"] == "REST":
full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
else:
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
streams_kwargs["wait_timeout"] = config.get("wait_timeout")

streams = []
for stream_name in stream_names:
json_schema = sf.generate_schema(stream_name)
pk, replication_key = sf.get_pk_and_replication_key(json_schema)
streams_kwargs = dict(sf_api=sf, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator)
json_schema = sf_object.generate_schema(stream_name)
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator))
if replication_key and stream_name not in UNSUPPORTED_FILTERING_STREAMS:
streams.append(incremental(**streams_kwargs, replication_key=replication_key, start_date=config["start_date"]))
else:
streams.append(full_refresh(**streams_kwargs))

return streams

def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]:
sf = self._get_sf_object(config)
stream_names = sf.get_validated_streams(catalog=catalog)
return self.generate_streams(config, stream_names, sf)
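
The new classmethod lets tests build streams without going through streams(). A rough usage sketch, not code from this PR: it assumes the source class is named SourceSalesforce, assumes the module paths shown in the imports, and uses placeholder credential values for the OAuth fields defined in spec.json.

from source_salesforce.api import Salesforce          # assumed import path
from source_salesforce.source import SourceSalesforce  # assumed import path

config = {
    "client_id": "...",          # placeholder credentials
    "client_secret": "...",
    "refresh_token": "...",
    "api_type": "BULK",
    "start_date": "2021-01-01T00:00:00Z",
    "wait_timeout": 15,
}
sf_object = Salesforce(**config)   # __init__ accepts **kwargs per the api.py change above
sf_object.login()
streams = SourceSalesforce.generate_streams(config, ["Account"], sf_object)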

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterator[AirbyteMessage]:
@@ -43,6 +43,14 @@
"type": "string",
"enum": ["BULK", "REST"],
"default": "BULK"
},
"wait_timeout": {
Contributor:

IMO this option should not be exposed - it is an implementation detail. See the UX handbook for more context on why it's not a good idea to expose it. Maybe instead we can use a dynamically increasing wait time for each job.

Contributor Author:

That's why I forwarded this PR to the Airbyte team. Your idea of auto-scaling the wait timeout is nice, but we can reproduce this case with long responses locally. For more detail, I added more informative log messages to help troubleshoot similar cases.

"title": "Response Waiting Time",
"description": "Maximum wait time of Safesforce responses in minutes. This option is used for the BULK mode only",
"type": "integer",
"minimum": 5,
"maximum": 60,
"default": 10
}
}
},
@@ -4,6 +4,7 @@

import csv
import json
import math
import time
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
@@ -12,6 +13,7 @@
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from pendulum import DateTime
from requests import codes, exceptions

from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce
@@ -80,7 +82,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp

def get_json_schema(self) -> Mapping[str, Any]:
if not self.schema:
self.schema = self.sf_api.generate_schema(self.name)
self.schema = self.sf_api.generate_schema([self.name])
return self.schema

def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
@@ -97,8 +99,13 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
class BulkSalesforceStream(SalesforceStream):

page_size = 30000
JOB_WAIT_TIMEOUT_MINS = 10
CHECK_INTERVAL_SECONDS = 2
DEFAULT_WAIT_TIMEOUT_MINS = 10
MAX_CHECK_INTERVAL_SECONDS = 2.0
MAX_RETRY_NUMBER = 3

def __init__(self, wait_timeout: Optional[int], **kwargs):
Contributor:

every time we pass a time duration we should either use a data type which encodes the unit of time or embed the unit of time in the variable name, e.g. wait_timeout_in_minutes

super().__init__(**kwargs)
self._wait_timeout = wait_timeout or self.DEFAULT_WAIT_TIMEOUT_MINS
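
On the unit-of-time point above: one way to make the unit explicit is to carry it in the type rather than the name. A minimal sketch, independent of the connector code; the class and attribute names here are illustrative only.

from datetime import timedelta
from typing import Optional

class JobWaitSettings:
    """Illustrative holder for the bulk-job wait timeout."""

    DEFAULT_WAIT_TIMEOUT = timedelta(minutes=10)

    def __init__(self, wait_timeout: Optional[timedelta] = None):
        # A timedelta cannot be misread as seconds vs. minutes by callers.
        self.wait_timeout = wait_timeout or self.DEFAULT_WAIT_TIMEOUT

    @property
    def wait_timeout_in_minutes(self) -> float:
        return self.wait_timeout.total_seconds() / 60.0

Equivalently, keeping an int but renaming the parameter to wait_timeout_in_minutes would satisfy the reviewer's second option.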

def path(self, **kwargs) -> str:
return f"/services/data/{self.sf_api.version}/jobs/query"
@@ -111,9 +118,12 @@ def _send_http_request(self, method: str, url: str, json: dict = None):
return response

def create_stream_job(self, query: str, url: str) -> Optional[str]:
"""
docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.htm
"""
json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"}
try:
response = self._send_http_request("POST", f"{self.url_base}/{url}", json=json)
response = self._send_http_request("POST", url, json=json)
job_id = response.json()["id"]
self.logger.info(f"Created Job: {job_id} to sync {self.name}")
return job_id
@@ -132,20 +142,52 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
raise error

def wait_for_job(self, url: str) -> str:
start_time = pendulum.now()
while True:
job_info = self._send_http_request("GET", url=url)
job_id = job_info.json()["id"]
job_status = job_info.json()["state"]

if job_info.json()["state"] in ["JobComplete", "Aborted", "Failed"]:
# using "seconds" argument because self._wait_timeout can be changed by tests
expiration_time: DateTime = pendulum.now().add(seconds=int(self._wait_timeout * 60.0))
job_status = "InProgress"
delay_timeout = 0
delay_cnt = 0
Contributor:

can you use a more expressive variable name?

job_info = None
# minimal starting delay is 0.5 seconds.
# this value was determined empirically
time.sleep(0.5)
while pendulum.now() < expiration_time:
job_info = self._send_http_request("GET", url=url).json()
job_status = job_info["state"]
if job_status in ["JobComplete", "Aborted", "Failed"]:
return job_status

if pendulum.now() > start_time.add(minutes=self.JOB_WAIT_TIMEOUT_MINS):
return job_status
if delay_timeout < self.MAX_CHECK_INTERVAL_SECONDS:
delay_timeout = 0.5 + math.exp(delay_cnt) / 1000.0
delay_cnt += 1

time.sleep(delay_timeout)
job_id = job_info["id"]
self.logger.info(
f"Sleeping {delay_timeout} seconds while waiting for Job: {self.name}/{job_id}" f" to complete. Current state: {job_status}"
)

self.logger.warning(f"Not wait the {self.name} data for {self._wait_timeout} minutes, data: {job_info}!!")
return job_status

self.logger.info(f"Sleeping {self.CHECK_INTERVAL_SECONDS} seconds while waiting for Job: {job_id} to complete")
time.sleep(self.CHECK_INTERVAL_SECONDS)
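
For a sense of how the new in-loop delay grows before hitting MAX_CHECK_INTERVAL_SECONDS, here is a small standalone reproduction of that schedule. It is not connector code and ignores the extra 0.5-second sleep taken once before the loop.

import math

MAX_CHECK_INTERVAL_SECONDS = 2.0

def polling_delays(steps: int):
    """Yield the successive sleep durations used by the wait_for_job loop above."""
    delay_timeout, delay_cnt = 0.0, 0
    for _ in range(steps):
        if delay_timeout < MAX_CHECK_INTERVAL_SECONDS:
            delay_timeout = 0.5 + math.exp(delay_cnt) / 1000.0
            delay_cnt += 1
        yield round(delay_timeout, 3)

print(list(polling_delays(10)))
# [0.501, 0.503, 0.507, 0.52, 0.555, 0.648, 0.903, 1.597, 3.481, 3.481]

The delay stays near half a second for the first few checks and exceeds the 2-second cap only once (at roughly 3.5 seconds), after which it stops growing.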
def execute_job(self, query: Mapping[str, Any], url: str) -> str:
Contributor:

signature should be -> Optional[str]

job_status = "Failed"
Contributor:

it's probably better to encode these job statuses using an enum

Contributor Author:

Yes, this was bug-fixing only, with minimal refactoring. Sure, we can do that during the next review.
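
A possible shape for such an enum, offered as a follow-up sketch rather than code from this PR; the values mirror the Bulk API states already used above.

from enum import Enum

class JobStatus(str, Enum):
    UPLOAD_COMPLETE = "UploadComplete"
    IN_PROGRESS = "InProgress"
    JOB_COMPLETE = "JobComplete"
    ABORTED = "Aborted"
    FAILED = "Failed"

    @property
    def is_terminal(self) -> bool:
        return self in (JobStatus.JOB_COMPLETE, JobStatus.ABORTED, JobStatus.FAILED)

Because the enum subclasses str, comparisons such as JobStatus.JOB_COMPLETE == "JobComplete" still hold, so the existing string checks could be migrated gradually.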

for i in range(0, self.MAX_RETRY_NUMBER):
job_id = self.create_stream_job(query=query, url=url)
if not job_id:
return None
job_full_url = f"{url}/{job_id}"
job_status = self.wait_for_job(url=job_full_url)
if job_status not in ["UploadComplete", "InProgress"]:
break
self.logger.error(f"Waiting error. Try to run this job again {i+1}/{self.MAX_RETRY_NUMBER}...")
Contributor:

Suggested change:
self.logger.error(f"Waiting error. Try to run this job again {i+1}/{self.MAX_RETRY_NUMBER}...")
self.logger.error(f"Waiting error. Running retry {i+1} out of {self.MAX_RETRY_NUMBER}...")

self.abort_job(url=job_full_url)
job_status = "Aborted"

if job_status in ["Aborted", "Failed"]:
self.delete_job(url=job_full_url)
raise Exception(f"Job for {self.name} stream using BULK API was failed.")
Contributor:

shouldn't we be retrying this job instead of failing?

Contributor Author:

I wasn't sure about the possible reasons for failures. Unfortunately, I didn't manage to catch any real failed or aborted job. Maybe there are some internal job failures (timeouts or resource issues).

return job_full_url

def download_data(self, url: str) -> Tuple[int, dict]:
job_data = self._send_http_request("GET", f"{url}/results")
@@ -160,6 +202,7 @@ def download_data(self, url: str) -> Tuple[int, dict]:
def abort_job(self, url: str):
data = {"state": "Aborted"}
self._send_http_request("PATCH", url=url, json=data)
self.logger.warning("Broken job was aborted")

def delete_job(self, url: str):
self._send_http_request("DELETE", url=url)
@@ -216,38 +259,27 @@ def read_records(
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
pagination_complete = False
next_page_token = None

while not pagination_complete:
while True:
params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
path = self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
full_url = f"{self.url_base}/{path}"
job_id = self.create_stream_job(query=params["q"], url=path)
if not job_id:
job_full_url = self.execute_job(query=params["q"], url=f"{self.url_base}{path}")
if not job_full_url:
return
job_full_url = f"{full_url}/{job_id}"
job_status = self.wait_for_job(url=job_full_url)
if job_status == "JobComplete":
count = 0
for count, record in self.download_data(url=job_full_url):
yield self.transform(record)

if count == self.page_size:
next_page_token = self.next_page_token(record)
if not next_page_token:
pagination_complete = True
else:
pagination_complete = True

if job_status in ["UploadComplete", "InProgress"]:
self.abort_job(url=job_full_url)
job_status = "Aborted"

if job_status in ["JobComplete", "Aborted", "Failed"]:
self.delete_job(url=job_full_url)
if job_status in ["Aborted", "Failed"]:
raise Exception(f"Job for {self.name} stream using BULK API was failed")
count = 0
for count, record in self.download_data(url=job_full_url):
yield self.transform(record)
self.delete_job(url=job_full_url)

if count < self.page_size:
# this is the last page
break

next_page_token = self.next_page_token(record)
if not next_page_token:
# no next page of data was found
break
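
The termination logic of this rewritten loop can be summarized in a standalone sketch; fetch_page and the cursor field are placeholders, not the connector's actual API.

PAGE_SIZE = 30000

def paginate(fetch_page):
    """Yield records page by page; stop on a short page or a missing cursor."""
    next_page_token = None
    while True:
        records = fetch_page(next_page_token)  # returns at most PAGE_SIZE records
        yield from records
        if len(records) < PAGE_SIZE:
            break  # a short page means this was the last one
        next_page_token = records[-1].get("SystemModstamp")  # hypothetical cursor field
        if not next_page_token:
            break  # no way to build the next query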


class IncrementalSalesforceStream(SalesforceStream, ABC):