Skip to content

Commit 0ed20bc

Browse files
lazebnyi and antixar authored
🐛 Source Salesforce: Fix error during generating schema (#9478)
* Add filtering by queryable flag for available streams
* Updated PR number and version
* Add logs for failed bulk jobs
* Added logs of the collected stream objects when the response code is 404
* Fixed unit tests
* Fixed integration tests
* Bumped docker version
* Update version in seed
* Update streams.py
* Added logger to print non-queryable streams

Co-authored-by: antixar <antixar@gmail.com>
1 parent b269b9f commit 0ed20bc

File tree

9 files changed: +59 additions, -24 deletions (lines changed)

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
33
"name": "Salesforce",
44
"dockerRepository": "airbyte/source-salesforce",
5-
"dockerImageTag": "0.1.13",
5+
"dockerImageTag": "0.1.18",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
77
"icon": "salesforce.svg"
88
}

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,7 @@
634634
- name: Salesforce
635635
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
636636
dockerRepository: airbyte/source-salesforce
637-
dockerImageTag: 0.1.17
637+
dockerImageTag: 0.1.18
638638
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
639639
icon: salesforce.svg
640640
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6700,7 +6700,7 @@
67006700
supportsNormalization: false
67016701
supportsDBT: false
67026702
supported_destination_sync_modes: []
6703-
- dockerImage: "airbyte/source-salesforce:0.1.17"
6703+
- dockerImage: "airbyte/source-salesforce:0.1.18"
67046704
spec:
67056705
documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
67066706
connectionSpecification:

airbyte-integrations/connectors/source-salesforce/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
2525
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
2626
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
2727

28-
LABEL io.airbyte.version=0.1.17
28+
LABEL io.airbyte.version=0.1.18
2929
LABEL io.airbyte.name=airbyte/source-salesforce

airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py

+16-6
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,16 @@ def filter_streams(self, stream_name: str) -> bool:
208208

209209
def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None):
210210
salesforce_objects = self.describe()["sobjects"]
211-
stream_names = [stream_object["name"] for stream_object in salesforce_objects]
211+
stream_objects = []
212+
for stream_object in salesforce_objects:
213+
if stream_object["queryable"]:
214+
stream_objects.append(stream_object)
215+
else:
216+
self.logger.warn(f"Stream {stream_object['name']} is not queryable and will be ignored.")
217+
218+
stream_names = [stream_object["name"] for stream_object in stream_objects]
212219
if catalog:
213-
return [configured_stream.stream.name for configured_stream in catalog.streams]
220+
return [configured_stream.stream.name for configured_stream in catalog.streams], stream_objects
214221

215222
if config.get("streams_criteria"):
216223
filtered_stream_list = []
@@ -221,7 +228,8 @@ def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAi
221228
stream_names = list(set(filtered_stream_list))
222229

223230
validated_streams = [stream_name for stream_name in stream_names if self.filter_streams(stream_name)]
224-
return validated_streams
231+
validated_stream_objects = [stream_object for stream_object in stream_objects if stream_object["name"] in validated_streams]
232+
return validated_streams, validated_stream_objects
225233

226234
@default_backoff_handler(max_tries=5, factor=15)
227235
def _make_request(
@@ -253,18 +261,20 @@ def login(self):
253261
self.access_token = auth["access_token"]
254262
self.instance_url = auth["instance_url"]
255263

256-
def describe(self, sobject: str = None) -> Mapping[str, Any]:
264+
def describe(self, sobject: str = None, stream_objects: List = None) -> Mapping[str, Any]:
257265
"""Describes all objects or a specific object"""
258266
headers = self._get_standard_headers()
259267

260268
endpoint = "sobjects" if not sobject else f"sobjects/{sobject}/describe"
261269

262270
url = f"{self.instance_url}/services/data/{self.version}/{endpoint}"
263271
resp = self._make_request("GET", url, headers=headers)
272+
if resp.status_code == 404:
273+
self.logger.error(f"Filtered stream objects: {stream_objects}")
264274
return resp.json()
265275

266-
def generate_schema(self, stream_name: str = None) -> Mapping[str, Any]:
267-
response = self.describe(stream_name)
276+
def generate_schema(self, stream_name: str = None, stream_objects: List = None) -> Mapping[str, Any]:
277+
response = self.describe(stream_name, stream_objects)
268278
schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}}
269279
for field in response["fields"]:
270280
schema["properties"][field["name"]] = self.field_to_property_schema(field)

airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
2929

3030
@classmethod
3131
def generate_streams(
32-
cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce, state: Mapping[str, Any] = None
32+
cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce, state: Mapping[str, Any] = None, stream_objects: List = None
3333
) -> List[Stream]:
3434
""" "Generates a list of stream by their names. It can be used for different tests too"""
3535
authenticator = TokenAuthenticator(sf_object.access_token)
@@ -38,7 +38,7 @@ def generate_streams(
3838
streams_kwargs = {}
3939
stream_state = state.get(stream_name, {}) if state else {}
4040

41-
selected_properties = sf_object.generate_schema(stream_name).get("properties", {})
41+
selected_properties = sf_object.generate_schema(stream_name, stream_objects).get("properties", {})
4242
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
4343
properties_not_supported_by_bulk = {
4444
key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"]
@@ -52,7 +52,7 @@ def generate_streams(
5252
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
5353
streams_kwargs["wait_timeout"] = config.get("wait_timeout")
5454

55-
json_schema = sf_object.generate_schema(stream_name)
55+
json_schema = sf_object.generate_schema(stream_name, stream_objects)
5656
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
5757
streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator))
5858
if replication_key and stream_name not in UNSUPPORTED_FILTERING_STREAMS:
@@ -64,8 +64,8 @@ def generate_streams(
6464

6565
def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]:
6666
sf = self._get_sf_object(config)
67-
stream_names = sf.get_validated_streams(config=config, catalog=catalog)
68-
return self.generate_streams(config, stream_names, sf, state=state)
67+
stream_names, stream_objects = sf.get_validated_streams(config=config, catalog=catalog)
68+
return self.generate_streams(config, stream_names, sf, state=state, stream_objects=stream_objects)
6969

7070
def read(
7171
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

+7
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ def transform_empty_string_to_none(instance, schema):
133133
def _send_http_request(self, method: str, url: str, json: dict = None):
134134
headers = self.authenticator.get_auth_header()
135135
response = self._session.request(method, url=url, headers=headers, json=json)
136+
if response.status_code not in [200, 204]:
137+
self.logger.error(f"error body: {response.text}")
136138
response.raise_for_status()
137139
return response
138140

@@ -141,6 +143,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
141143
docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.htm
142144
"""
143145
json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"}
146+
144147
try:
145148
response = self._send_http_request("POST", url, json=json)
146149
job_id = response.json()["id"]
@@ -191,6 +194,10 @@ def wait_for_job(self, url: str) -> str:
191194
job_info = self._send_http_request("GET", url=url).json()
192195
job_status = job_info["state"]
193196
if job_status in ["JobComplete", "Aborted", "Failed"]:
197+
if job_status != "JobComplete":
198+
# this is only job metadata without payload
199+
self.logger.error(f"JobStatus: {job_status}, full job response: {job_info}")
200+
194201
return job_status
195202

196203
if delay_timeout < self.MAX_CHECK_INTERVAL_SECONDS:

airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py

+26-9
Original file line numberDiff line numberDiff line change
@@ -302,21 +302,38 @@ def test_discover_with_streams_criteria_param(streams_criteria, predicted_filter
302302
sf_object.describe = Mock(
303303
return_value={
304304
"sobjects": [
305-
{"name": "Account"},
306-
{"name": "AIApplications"},
307-
{"name": "Leads"},
308-
{"name": "LeadHistory"},
309-
{"name": "Orders"},
310-
{"name": "OrderHistory"},
311-
{"name": "CustomStream"},
312-
{"name": "CustomStreamHistory"},
305+
{"name": "Account", "queryable": True},
306+
{"name": "AIApplications", "queryable": True},
307+
{"name": "Leads", "queryable": True},
308+
{"name": "LeadHistory", "queryable": True},
309+
{"name": "Orders", "queryable": True},
310+
{"name": "OrderHistory", "queryable": True},
311+
{"name": "CustomStream", "queryable": True},
312+
{"name": "CustomStreamHistory", "queryable": True},
313313
]
314314
}
315315
)
316-
filtered_streams = sf_object.get_validated_streams(config=updated_config)
316+
filtered_streams, _ = sf_object.get_validated_streams(config=updated_config)
317317
assert sorted(filtered_streams) == sorted(predicted_filtered_streams)
318318

319319

320+
def test_discover_only_queryable(stream_config):
321+
sf_object = Salesforce(**stream_config)
322+
sf_object.login = Mock()
323+
sf_object.access_token = Mock()
324+
sf_object.instance_url = "https://fase-account.salesforce.com"
325+
sf_object.describe = Mock(
326+
return_value={
327+
"sobjects": [
328+
{"name": "Account", "queryable": True},
329+
{"name": "Leads", "queryable": False},
330+
]
331+
}
332+
)
333+
filtered_streams, _ = sf_object.get_validated_streams(config=stream_config)
334+
assert filtered_streams == ["Account"]
335+
336+
320337
def test_pagination_rest(stream_config, stream_api):
321338
stream_name = "ActiveFeatureLicenseMetric"
322339
state = {stream_name: {"SystemModstamp": "2122-08-22T05:08:29.000Z"}}

docs/integrations/sources/salesforce.md

+1
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ List of available streams:
737737

738738
| Version | Date | Pull Request | Subject |
739739
|:--------|:-----------| :--- |:--------------------------------------------------------------------------|
740+
| 0.1.18 | 2022-01-20 | [9478](https://github.com/airbytehq/airbyte/pull/9478) | Add available stream filtering by `queryable` flag |
740741
| 0.1.17 | 2022-01-19 | [9302](https://github.com/airbytehq/airbyte/pull/9302) | Deprecate API Type parameter |
741742
| 0.1.16 | 2022-01-18 | [9151](https://github.com/airbytehq/airbyte/pull/9151) | Fix pagination in REST API streams |
742743
| 0.1.15 | 2022-01-11 | [9409](https://github.com/airbytehq/airbyte/pull/9409) | Correcting the presence of an extra `else` handler in the error handling |

0 commit comments

Comments
 (0)