diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md
index 04de70d965d7c..41ce86ac6e194 100644
--- a/airbyte-cdk/python/CHANGELOG.md
+++ b/airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog

+## 0.1.53
+Remove a false positive error logging during the send process.
+
 ## 0.1.52
 Fix BaseBackoffException constructor

diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
index 2b6ca06b3a11d..fc6a1569e55bb 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
@@ -7,6 +7,7 @@
 import os
 from abc import ABC, abstractmethod
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
+from urllib.error import HTTPError
 from urllib.parse import urljoin

 import requests
@@ -294,9 +295,11 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
                 raise DefaultBackoffException(request=request, response=response)
         elif self.raise_on_http_errors:
             # Raise any HTTP exceptions that happened in case there were unexpected ones
-            self.logger.error(f"Request raised an error with response: {response.text}")
-            response.raise_for_status()
-
+            try:
+                response.raise_for_status()
+            except HTTPError as exc:
+                self.logger.error(response.text)
+                raise exc
         return response

     def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py
index 2d15cea75c1bb..72fb15e40705c 100644
--- a/airbyte-cdk/python/setup.py
+++ b/airbyte-cdk/python/setup.py
@@ -15,7 +15,7 @@

 setup(
     name="airbyte-cdk",
-    version="0.1.52",
+    version="0.1.53",
     description="A framework for writing Airbyte Connectors.",
     long_description=README,
     long_description_content_type="text/markdown",
diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
index 7e15096126e7f..1bb558e574728 100644
--- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
+++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
@@ -424,3 +424,21 @@ def test_using_cache(mocker):
         pass

     assert parent_stream.cassete.play_count != 0
+
+
+class AutoFailTrueHttpStream(StubBasicReadHttpStream):
+    raise_on_http_errors = True
+
+
+@pytest.mark.parametrize("status_code", range(400, 600))
+def test_send_raise_on_http_errors_logs(mocker, status_code):
+    mocker.patch.object(AutoFailTrueHttpStream, "logger")
+    mocker.patch.object(AutoFailTrueHttpStream, "should_retry", mocker.Mock(return_value=False))
+    stream = AutoFailTrueHttpStream()
+    req = requests.Response()
+    req.status_code = status_code
+    mocker.patch.object(requests.Session, "send", return_value=req)
+    with pytest.raises(requests.exceptions.HTTPError):
+        response = stream._send_request(req, {})
+    stream.logger.error.assert_called_with(response.text)
+    assert response.status_code == status_code
diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index 47adfae6a3e94..899f81358839e 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -19,7 +19,7 @@
 - name: BigQuery
   destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
   dockerRepository: 
airbyte/destination-bigquery - dockerImageTag: 1.0.1 + dockerImageTag: 1.0.2 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: @@ -31,7 +31,7 @@ - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.2.11 + dockerImageTag: 0.2.14 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: @@ -203,7 +203,7 @@ - name: S3 destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 dockerRepository: airbyte/destination-s3 - dockerImageTag: 0.2.13 + dockerImageTag: 0.3.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/s3 icon: s3.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index e5074a39e6351..1304dbb58f8df 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -188,7 +188,7 @@ supportsDBT: false supported_destination_sync_modes: - "append" -- dockerImage: "airbyte/destination-bigquery:1.0.1" +- dockerImage: "airbyte/destination-bigquery:1.0.2" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: @@ -398,7 +398,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.11" +- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.14" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: @@ -3469,7 +3469,7 @@ supported_destination_sync_modes: - "append" - "overwrite" -- dockerImage: "airbyte/destination-s3:0.2.13" +- dockerImage: "airbyte/destination-s3:0.3.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3" connectionSpecification: @@ -3510,7 +3510,7 @@ \ bucket directory" type: "string" examples: - - "${NAMESPACE}/${STREAM_NAME}/" + - "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_" order: 3 s3_bucket_region: title: "S3 Bucket Region" diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9126da7ccb282..7e704567cedd1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -273,7 +273,7 @@ - name: Google Analytics sourceDefinitionId: eff3616a-f9c3-11eb-9a03-0242ac130003 dockerRepository: airbyte/source-google-analytics-v4 - dockerImageTag: 0.1.16 + dockerImageTag: 0.1.17 documentationUrl: https://docs.airbyte.io/integrations/sources/google-analytics-v4 icon: google-analytics.svg sourceType: api @@ -411,7 +411,7 @@ - name: LinkedIn Ads sourceDefinitionId: 137ece28-5434-455c-8f34-69dc3782f451 dockerRepository: airbyte/source-linkedin-ads - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://docs.airbyte.io/integrations/sources/linkedin-ads icon: linkedin.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 1f2c38fc05d65..287a82cd0295e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ 
b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2754,7 +2754,7 @@ oauthFlowOutputParameters: - - "access_token" - - "refresh_token" -- dockerImage: "airbyte/source-google-analytics-v4:0.1.16" +- dockerImage: "airbyte/source-google-analytics-v4:0.1.17" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/google-analytics-v4" connectionSpecification: @@ -4289,7 +4289,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-linkedin-ads:0.1.5" +- dockerImage: "airbyte/source-linkedin-ads:0.1.6" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/linkedin-ads" connectionSpecification: @@ -4309,16 +4309,17 @@ examples: - "2021-05-17" account_ids: - title: "Account IDs" + title: "Account IDs (Optional)" type: "array" description: "Specify the Account IDs separated by space, to pull the data\ \ from. Leave empty, if you want to pull the data from all associated\ - \ accounts." + \ accounts. See the official LinkedIn Ads docs for more info." items: type: "integer" default: [] credentials: - title: "Authorization Method" + title: "Authentication *" type: "object" oneOf: - type: "object" @@ -4357,7 +4358,9 @@ access_token: type: "string" title: "Access Token" - description: "The token value generated using Authentication Code." + description: "The token value generated using the authentication code.\ + \ See the docs to obtain yours." airbyte_secret: true supportsNormalization: false supportsDBT: false diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 57afd61f02f90..e38c692bb017b 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.11 +LABEL io.airbyte.version=0.2.14 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json index ee93cfcf42907..8a2d0e4b9652a 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/resources/spec.json @@ -12,8 +12,8 @@ "additionalProperties": true, "properties": { "big_query_client_buffer_size_mb": { - "title": "Google BigQuery client chunk size", - "description": "Google BigQuery client's chunk (buffer) size (MIN = 1, MAX = 15) for each table. The size that will be written by a single RPC. Written data will be buffered and only flushed upon reaching this size or closing the channel. It defaults to 15MiB. Smaller chunk size means less memory consumption, and is recommended for big data sets. For more details refer to the documentation here", + "title": "Google BigQuery Client Chunk Size (Optional)", + "description": "Google BigQuery client's chunk (buffer) size (MIN=1, MAX = 15) for each table. The size that will be written by a single RPC. Written data will be buffered and only flushed upon reaching this size or closing the channel. The default 15MB value is used if not set explicitly. 
Read more here.", "type": "integer", "minimum": 1, "maximum": 15, @@ -22,18 +22,18 @@ }, "project_id": { "type": "string", - "description": "The GCP project ID for the project containing the target BigQuery dataset.", + "description": "The GCP project ID for the project containing the target BigQuery dataset. Read more here.", "title": "Project ID" }, "dataset_id": { "type": "string", - "description": "Default BigQuery Dataset ID tables are replicated to if the source does not specify a namespace.", + "description": "The default BigQuery Dataset ID that tables are replicated to if the source does not specify a namespace. Read more here.", "title": "Default Dataset ID" }, "dataset_location": { "type": "string", - "description": "The location of the dataset. Warning: Changes made after creation will not be applied.", - "title": "Dataset Location", + "description": "The location of the dataset. Warning: Changes made after creation will not be applied. The default \"US\" value is used if not set explicitly. Read more here.", + "title": "Dataset Location (Optional)", "default": "US", "enum": [ "US", @@ -71,19 +71,18 @@ }, "credentials_json": { "type": "string", - "description": "The contents of the JSON service account key. Check out the docs if you need help generating this key. Default credentials will be used if this field is left empty.", - "title": "Credentials JSON", + "description": "The contents of the JSON service account key. Check out the docs if you need help generating this key. Default credentials will be used if this field is left empty.", + "title": "Service Account Key JSON (Optional)", "airbyte_secret": true }, "loading_method": { "type": "object", - "title": "Loading Method", - "description": "Select the way that data will be uploaded to BigQuery.", + "title": "Loading Method *", + "description": "Loading method used to send select the way data will be uploaded to BigQuery.
Standard Inserts - Direct uploading using SQL INSERT statements. This method is extremely inefficient and provided only for quick testing. In almost all cases, you should use staging.
GCS Staging - Writes large batches of records to a file, uploads the file to GCS, then uses COPY INTO table to upload the file. Recommended for most workloads for better speed and scalability. Read more about GCS Staging here.", "oneOf": [ { "title": "Standard Inserts", "additionalProperties": false, - "description": "Direct uploading using streams.", "required": ["method"], "properties": { "method": { @@ -95,7 +94,6 @@ { "title": "GCS Staging", "additionalProperties": false, - "description": "Writes large batches of records to a file, uploads the file to GCS, then uses
COPY INTO table
to upload the file. Recommended for large production workloads for better speed and scalability.", "required": [ "method", "gcs_bucket_name", @@ -110,16 +108,17 @@ "gcs_bucket_name": { "title": "GCS Bucket Name", "type": "string", - "description": "The name of the GCS bucket.", + "description": "The name of the GCS bucket. Read more here.", "examples": ["airbyte_sync"] }, "gcs_bucket_path": { + "title": "GCS Bucket Path", "description": "Directory under the GCS bucket where data will be written.", "type": "string", "examples": ["data_sync/test"] }, "part_size_mb": { - "title": "Block Size (MB) for GCS multipart upload", + "title": "Block Size (MB) for GCS Multipart Upload (Optional)", "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes more memory. Allowed values: min=5MB, max=525MB Default: 5MB.", "type": "integer", "default": 5, @@ -129,8 +128,8 @@ }, "keep_files_in_gcs-bucket": { "type": "string", - "description": "This upload method is supposed to temporary store records in GCS bucket. What do you want to do with data in GCS bucket when migration has finished?", - "title": "GCS tmp files afterward processing", + "description": "This upload method is supposed to temporary store records in GCS bucket. What do you want to do with data in GCS bucket when migration has finished? The default \"Delete all tmp files from GCS\" value is used if not set explicitly.", + "title": "GCS Tmp Files Afterward Processing (Optional)", "default": "Delete all tmp files from GCS", "enum": [ "Delete all tmp files from GCS", @@ -139,6 +138,7 @@ }, "credential": { "title": "Credential", + "description": "An HMAC key is a type of credential and can be associated with a service account or a user account in Cloud Storage. Read more here.", "type": "object", "oneOf": [ { diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index ce6786accddb6..20434f895ccaf 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.1 +LABEL io.airbyte.version=1.0.2 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json index 440ceb16aafbd..83d3ff31e6b71 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json @@ -22,7 +22,7 @@ }, "project_id": { "type": "string", - "description": "The GCP project ID for the project containing the target BigQuery dataset. Read more here.", + "description": "The GCP project ID for the project containing the target BigQuery dataset. Read more here.", "title": "Project ID" }, "dataset_id": { @@ -72,7 +72,7 @@ "credentials_json": { "type": "string", "description": "The contents of the JSON service account key. Check out the docs if you need help generating this key. 
Default credentials will be used if this field is left empty.", - "title": "Credentials JSON (Optional)", + "title": "Service Account Key JSON (Optional)", "airbyte_secret": true }, "transformation_priority": { diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile index 852934d1f0680..560538d143e5e 100644 --- a/airbyte-integrations/connectors/destination-s3/Dockerfile +++ b/airbyte-integrations/connectors/destination-s3/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-s3 COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.13 +LABEL io.airbyte.version=0.3.0 LABEL io.airbyte.name=airbyte/destination-s3 diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java index c709e2ed547c2..4cc0f693866e5 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java @@ -80,14 +80,14 @@ private static Function toWriteConfig( final AirbyteStream abStream = stream.getStream(); final String namespace = abStream.getNamespace(); final String streamName = abStream.getName(); - final String bucketPath = config.get(BUCKET_PATH_FIELD).asText(); + final String outputBucketPath = config.get(BUCKET_PATH_FIELD).asText(); final String customOutputFormat = String.join("/", - bucketPath, + outputBucketPath, config.has(PATH_FORMAT_FIELD) && !config.get(PATH_FORMAT_FIELD).asText().isBlank() ? 
config.get(PATH_FORMAT_FIELD).asText() : S3DestinationConstants.DEFAULT_PATH_FORMAT); - final String outputBucketPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat); + final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat); final DestinationSyncMode syncMode = stream.getDestinationSyncMode(); - final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, syncMode); + final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, fullOutputPath, syncMode); LOGGER.info("Write config: {}", writeConfig); return writeConfig; }; @@ -139,7 +139,7 @@ private CheckedBiConsumer storedFiles; public WriteConfig(final String namespace, final String streamName, final String outputBucketPath, + final String fullOutputPath, final DestinationSyncMode syncMode) { this.namespace = namespace; this.streamName = streamName; this.outputBucketPath = outputBucketPath; + this.fullOutputPath = fullOutputPath; this.syncMode = syncMode; this.storedFiles = new ArrayList<>(); } @@ -42,6 +45,10 @@ public String getOutputBucketPath() { return outputBucketPath; } + public String getFullOutputPath() { + return fullOutputPath; + } + public DestinationSyncMode getSyncMode() { return syncMode; } @@ -64,6 +71,7 @@ public String toString() { "streamName=" + streamName + ", namespace=" + namespace + ", outputBucketPath=" + outputBucketPath + + ", fullOutputPath=" + fullOutputPath + ", syncMode=" + syncMode + '}'; } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json index e1bd828c2ab5e..5ad2611b98fb1 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json @@ -40,7 +40,9 @@ "s3_path_format": { "description": "Format string on how data will be organized inside the S3 bucket directory", "type": "string", - "examples": ["${NAMESPACE}/${STREAM_NAME}/"], + "examples": [ + "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_" + ], "order": 3 }, "s3_bucket_region": { diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java index 200f1ed9dd39d..e0f52d48934a5 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java @@ -100,8 +100,9 @@ protected List getAllSyncedObjects(final String streamName, fin streamNameStr, DateTime.now(DateTimeZone.UTC), S3DestinationConstants.DEFAULT_PATH_FORMAT); + final String parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1); final List objectSummaries = s3Client - .listObjects(config.getBucketName(), outputPrefix) + .listObjects(config.getBucketName(), parentFolder) .getObjectSummaries() .stream() .filter(o -> o.getKey().contains(streamNameStr + "/")) diff --git 
a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java index 745aecd94d3d3..16694a4110ee8 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java @@ -55,7 +55,7 @@ public class AvroSerializedBufferTest { public void testSnappyAvroWriter() throws Exception { final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of( "codec", "snappy")))); - runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 970L, 980L, config, getExpectedString()); + runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 965L, 980L, config, getExpectedString()); } @Test diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/Dockerfile b/airbyte-integrations/connectors/source-google-analytics-v4/Dockerfile index fb470d3c967e7..609fb81096acd 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/Dockerfile +++ b/airbyte-integrations/connectors/source-google-analytics-v4/Dockerfile @@ -4,13 +4,13 @@ FROM python:3.7-slim RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* WORKDIR /airbyte/integration_code -COPY source_google_analytics_v4 ./source_google_analytics_v4 -COPY main.py ./ COPY setup.py ./ RUN pip install . +COPY source_google_analytics_v4 ./source_google_analytics_v4 +COPY main.py ./ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.16 +LABEL io.airbyte.version=0.1.17 LABEL io.airbyte.name=airbyte/source-google-analytics-v4 diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/acceptance-test-config.yml b/airbyte-integrations/connectors/source-google-analytics-v4/acceptance-test-config.yml index a496053327274..a697c0fc0955f 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-google-analytics-v4/acceptance-test-config.yml @@ -1,10 +1,6 @@ # See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) # for more information about how to configure these tests -# The 'future_state_path' field is commented out to skip the test `test_state_with_abnormally_large_values` -# as a temporary solution not to block publishing of new versions. 
The reason is -# When specifying future date in the state the current implementation of the connector produces records for [current_date, current_date] slice, -# and it makes SAT fail, because it should produce no records with the state with abnormally large values connector_image: airbyte/source-google-analytics-v4:dev tests: spec: @@ -27,7 +23,7 @@ tests: incremental: - config_path: "secrets/service_config.json" configured_catalog_path: "integration_tests/configured_catalog.json" - # future_state_path: "integration_tests/abnormal_state.json" + future_state_path: "integration_tests/abnormal_state.json" full_refresh: - config_path: "secrets/service_config.json" configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-google-analytics-v4/integration_tests/abnormal_state.json index 70565e04f926e..8bbad318c4939 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-google-analytics-v4/integration_tests/abnormal_state.json @@ -10,5 +10,5 @@ "daily_active_users": { "ga_date": "2050-05-01" }, "devices": { "ga_date": "2050-05-01" }, "users_per_day": { "ga_date": "2050-05-01" }, - "sessions_per_country_day": { "ga_date": "2050-05-01" } + "new_users_per_day": {"ga_date": "2050-05-01"} } diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-google-analytics-v4/integration_tests/sample_state.json index 87fd48bf19ddf..d636881bc2a22 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/integration_tests/sample_state.json +++ b/airbyte-integrations/connectors/source-google-analytics-v4/integration_tests/sample_state.json @@ -10,5 +10,5 @@ "daily_active_users": { "ga_date": "2021-02-11" }, "devices": { "ga_date": "2021-02-11" }, "users_per_day": { "ga_date": "2021-02-11" }, - "sessions_per_country_day": { "ga_date": "2021-02-11" } + "new_users_per_day": { "ga_date": "2021-02-11" } } diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/source_google_analytics_v4/source.py b/airbyte-integrations/connectors/source-google-analytics-v4/source_google_analytics_v4/source.py index 15ada1ca0c518..46764df750599 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/source_google_analytics_v4/source.py +++ b/airbyte-integrations/connectors/source-google-analytics-v4/source_google_analytics_v4/source.py @@ -14,6 +14,7 @@ import jwt import pendulum import requests +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream @@ -92,7 +93,6 @@ class GoogleAnalyticsV4Stream(HttpStream, ABC): url_base = "https://analyticsreporting.googleapis.com/v4/" report_field = "reports" - data_fields = ["data", "rows"] map_type = dict(INTEGER="integer", FLOAT="number", PERCENT="number", TIME="number") @@ -226,33 +226,29 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs: Any) - ...] 
""" + today = pendulum.now().date() start_date = pendulum.parse(self.start_date).date() - end_date = pendulum.now().date() - - # Determine stream_state, if no stream_state we use start_date if stream_state: - start_date = pendulum.parse(stream_state.get(self.cursor_field)).date() + prev_end_date = pendulum.parse(stream_state.get(self.cursor_field)).date() + start_date = prev_end_date.add(days=1) + end_date = today + if start_date > end_date: + return [None] - # use the lowest date between start_date and self.end_date, otherwise API fails if start_date is in future - start_date = min(start_date, end_date) date_slices = [] - while start_date <= end_date: - end_date_slice = start_date.add(days=self.window_in_days) + slice_start_date = start_date + while slice_start_date <= end_date: + slice_end_date = slice_start_date.add(days=self.window_in_days) # limit the slice range with end_date - end_date_slice = min(end_date_slice, end_date) - date_slices.append({"startDate": self.to_datetime_str(start_date), "endDate": self.to_datetime_str(end_date_slice)}) - # add 1 day for start next slice from next day and not duplicate data from previous slice end date. - start_date = end_date_slice.add(days=1) - return date_slices - - # TODO: the method has to be updated for more logical and obvious - def get_data(self, data): # type: ignore[no-untyped-def] - for data_field in self.data_fields: - if data and isinstance(data, dict): - data = data.get(data_field, []) - else: - return [] - return data + slice_end_date = min(slice_end_date, end_date) + date_slices.append({"startDate": self.to_datetime_str(slice_start_date), "endDate": self.to_datetime_str(slice_end_date)}) + # start next slice 1 day after previous slice ended to prevent duplicate reads + slice_start_date = slice_end_date.add(days=1) + return date_slices or [None] + + @staticmethod + def report_rows(report_body: MutableMapping[Any, Any]) -> List[MutableMapping[Any, Any]]: + return report_body.get("data", {}).get("rows", []) def lookup_data_type(self, field_type: str, attribute: str) -> str: """ @@ -268,7 +264,7 @@ def lookup_data_type(self, field_type: str, attribute: str) -> str: attr_type = self.dimensions_ref[attribute] elif field_type == "metric": # Custom Google Analytics Metrics {ga:goalXXStarts, ga:metricXX, ... 
} - # We always treat them as as strings as we can not be sure of their data type + # We always treat them as strings as we can not be sure of their data type if attribute.startswith("ga:goal") and attribute.endswith( ("Starts", "Completions", "Value", "ConversionRate", "Abandons", "AbandonRate") ): @@ -282,10 +278,10 @@ def lookup_data_type(self, field_type: str, attribute: str) -> str: attr_type = self.metrics_ref[attribute] else: attr_type = None - self.logger.error(f"Unsuported GA type: {field_type}") + self.logger.error(f"Unsupported GA type: {field_type}") except KeyError: attr_type = None - self.logger.error(f"Unsuported GA {field_type}: {attribute}") + self.logger.error(f"Unsupported GA {field_type}: {attribute}") return self.map_type.get(attr_type, "string") @@ -387,7 +383,7 @@ def parse_response(self, response: requests.Response, **kwargs: Any) -> Iterable self.check_for_sampled_result(report.get("data", {})) - for row in self.get_data(report): + for row in self.report_rows(report): record = {} dimensions = row.get("dimensions", []) metrics = row.get("metrics", []) @@ -421,11 +417,19 @@ class GoogleAnalyticsV4IncrementalObjectsBase(GoogleAnalyticsV4Stream): cursor_field = "ga_date" def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Update the state value, default CDK method. - """ return {self.cursor_field: max(latest_record.get(self.cursor_field, ""), current_stream_state.get(self.cursor_field, ""))} + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + if not stream_slice: + return [] + return super().read_records(sync_mode, cursor_field, stream_slice, stream_state) + class GoogleAnalyticsServiceOauth2Authenticator(Oauth2Authenticator): """Request example for API token extraction: diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/empty_response.json b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/empty_response.json index 0d8067faf9bc7..36f7e59004028 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/empty_response.json +++ b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/empty_response.json @@ -2,11 +2,11 @@ "reports": [ { "columnHeader": { - "dimensions": ["ga: date"], + "dimensions": ["ga:date"], "metricHeader": { "metricHeaderEntries": [ { - "name": "ga: 14dayUsers", + "name": "ga:14dayUsers", "type": "INTEGER" } ] diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_is_data_golden_false.json b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_is_data_golden_false.json index 4e2e641ac3f28..ff7e3d23ad238 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_is_data_golden_false.json +++ b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_is_data_golden_false.json @@ -2,7 +2,7 @@ "reports": [ { "columnHeader": { - "dimensions": ["ga: date"], + "dimensions": ["ga:date"], "metricHeader": { "metricHeaderEntries": [ { diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_with_records.json b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_with_records.json index 80a46d877be80..be89bd5858762 100644 --- 
a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_with_records.json +++ b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_with_records.json @@ -2,11 +2,11 @@ "reports": [ { "columnHeader": { - "dimensions": ["ga: date"], + "dimensions": ["ga:date"], "metricHeader": { "metricHeaderEntries": [ { - "name": "ga: 14dayUsers", + "name": "ga:14dayUsers", "type": "INTEGER" } ] diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_with_sampling.json b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_with_sampling.json index b116b5f012621..0c6a54151a31e 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_with_sampling.json +++ b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/response_with_sampling.json @@ -2,11 +2,11 @@ "reports": [ { "columnHeader": { - "dimensions": ["ga: date"], + "dimensions": ["ga:date"], "metricHeader": { "metricHeaderEntries": [ { - "name": "ga: 14dayUsers", + "name": "ga:14dayUsers", "type": "INTEGER" } ] diff --git a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/unit_test.py index 6af81b43f58a6..cc92af277a14b 100644 --- a/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-google-analytics-v4/unit_tests/unit_test.py @@ -2,6 +2,7 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # +import copy import json import logging from pathlib import Path @@ -10,7 +11,7 @@ import pendulum import pytest -from airbyte_cdk.models import ConfiguredAirbyteCatalog +from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode from airbyte_cdk.sources.streams.http.auth import NoAuth from freezegun import freeze_time from source_google_analytics_v4.source import ( @@ -104,16 +105,45 @@ def mock_api_returns_is_data_golden_false(requests_mock): ) +@pytest.fixture +def configured_catalog(): + return ConfiguredAirbyteCatalog.parse_obj(json.loads(read_file("./configured_catalog.json"))) + + @pytest.fixture() def test_config(): - test_config = json.loads(read_file("../integration_tests/sample_config.json")) - test_config["authenticator"] = NoAuth() - test_config["metrics"] = [] - test_config["dimensions"] = [] + test_conf = { + "view_id": "1234567", + "window_in_days": 1, + "authenticator": NoAuth(), + "metrics": [], + "start_date": pendulum.now().subtract(days=2).date().strftime("%Y-%m-%d"), + "dimensions": [], + "credentials": { + "type": "Service", + }, + } + return copy.deepcopy(test_conf) + + +@pytest.fixture() +def test_config_auth_service(test_config): + test_config["credentials"] = { + "auth_type": "Service", + "credentials_json": '{"client_email": "", "private_key": "", "private_key_id": ""}', + } + return copy.deepcopy(test_config) + + +@pytest.fixture() +def test_config_auth_client(test_config): test_config["credentials"] = { - "type": "Service", + "auth_type": "Client", + "client_id": "client_id_val", + "client_secret": "client_secret_val", + "refresh_token": "refresh_token_val", } - return test_config + return copy.deepcopy(test_config) def test_metrics_dimensions_type_list(mock_metrics_dimensions_type_list_link): @@ -144,42 +174,39 @@ def test_lookup_metrics_dimensions_data_type(test_config, metrics_dimensions_map def test_data_is_not_golden_is_logged_as_warning( mock_api_returns_is_data_golden_false, 
test_config, + configured_catalog, mock_metrics_dimensions_type_list_link, mock_auth_call, caplog, ): source = SourceGoogleAnalyticsV4() - del test_config["custom_reports"] - catalog = ConfiguredAirbyteCatalog.parse_obj(json.loads(read_file("./configured_catalog.json"))) - list(source.read(logging.getLogger(), test_config, catalog)) + list(source.read(logging.getLogger(), test_config, configured_catalog)) assert DATA_IS_NOT_GOLDEN_MSG in caplog.text def test_sampled_result_is_logged_as_warning( mock_api_returns_sampled_results, test_config, + configured_catalog, mock_metrics_dimensions_type_list_link, mock_auth_call, caplog, ): source = SourceGoogleAnalyticsV4() - del test_config["custom_reports"] - catalog = ConfiguredAirbyteCatalog.parse_obj(json.loads(read_file("./configured_catalog.json"))) - list(source.read(logging.getLogger(), test_config, catalog)) + list(source.read(logging.getLogger(), test_config, configured_catalog)) assert RESULT_IS_SAMPLED_MSG in caplog.text def test_no_regressions_for_result_is_sampled_and_data_is_golden_warnings( mock_api_returns_valid_records, test_config, + configured_catalog, mock_metrics_dimensions_type_list_link, mock_auth_call, caplog, ): source = SourceGoogleAnalyticsV4() - del test_config["custom_reports"] - catalog = ConfiguredAirbyteCatalog.parse_obj(json.loads(read_file("./configured_catalog.json"))) - list(source.read(logging.getLogger(), test_config, catalog)) + list(source.read(logging.getLogger(), test_config, configured_catalog)) assert RESULT_IS_SAMPLED_MSG not in caplog.text assert DATA_IS_NOT_GOLDEN_MSG not in caplog.text @@ -187,6 +214,7 @@ def test_no_regressions_for_result_is_sampled_and_data_is_golden_warnings( @patch("source_google_analytics_v4.source.jwt") def test_check_connection_fails_jwt( jwt_encode_mock, + test_config_auth_service, mocker, mock_metrics_dimensions_type_list_link, mock_auth_call, @@ -196,17 +224,12 @@ def test_check_connection_fails_jwt( check_connection fails because of the API returns no records, then we assume than user doesn't have permission to read requested `view` """ - test_config = json.loads(read_file("../integration_tests/sample_config.json")) - del test_config["custom_reports"] - test_config["credentials"] = { - "auth_type": "Service", - "credentials_json": '{"client_email": "", "private_key": "", "private_key_id": ""}', - } source = SourceGoogleAnalyticsV4() - is_success, msg = source.check_connection(MagicMock(), test_config) + is_success, msg = source.check_connection(MagicMock(), test_config_auth_service) assert is_success is False assert ( - msg == f"Please check the permissions for the requested view_id: {test_config['view_id']}. Cannot retrieve data from that view ID." + msg + == f"Please check the permissions for the requested view_id: {test_config_auth_service['view_id']}. Cannot retrieve data from that view ID." 
) jwt_encode_mock.encode.assert_called() assert mock_auth_call.called @@ -216,6 +239,7 @@ def test_check_connection_fails_jwt( @patch("source_google_analytics_v4.source.jwt") def test_check_connection_success_jwt( jwt_encode_mock, + test_config_auth_service, mocker, mock_metrics_dimensions_type_list_link, mock_auth_call, @@ -225,14 +249,8 @@ def test_check_connection_success_jwt( check_connection succeeds because of the API returns valid records for the latest date based slice, then we assume than user has permission to read requested `view` """ - test_config = json.loads(read_file("../integration_tests/sample_config.json")) - del test_config["custom_reports"] - test_config["credentials"] = { - "auth_type": "Service", - "credentials_json": '{"client_email": "", "private_key": "", "private_key_id": ""}', - } source = SourceGoogleAnalyticsV4() - is_success, msg = source.check_connection(MagicMock(), test_config) + is_success, msg = source.check_connection(MagicMock(), test_config_auth_service) assert is_success is True assert msg is None jwt_encode_mock.encode.assert_called() @@ -243,6 +261,7 @@ def test_check_connection_success_jwt( @patch("source_google_analytics_v4.source.jwt") def test_check_connection_fails_oauth( jwt_encode_mock, + test_config_auth_client, mocker, mock_metrics_dimensions_type_list_link, mock_auth_call, @@ -252,19 +271,12 @@ def test_check_connection_fails_oauth( check_connection fails because of the API returns no records, then we assume than user doesn't have permission to read requested `view` """ - test_config = json.loads(read_file("../integration_tests/sample_config.json")) - del test_config["custom_reports"] - test_config["credentials"] = { - "auth_type": "Client", - "client_id": "client_id_val", - "client_secret": "client_secret_val", - "refresh_token": "refresh_token_val", - } source = SourceGoogleAnalyticsV4() - is_success, msg = source.check_connection(MagicMock(), test_config) + is_success, msg = source.check_connection(MagicMock(), test_config_auth_client) assert is_success is False assert ( - msg == f"Please check the permissions for the requested view_id: {test_config['view_id']}. Cannot retrieve data from that view ID." + msg + == f"Please check the permissions for the requested view_id: {test_config_auth_client['view_id']}. Cannot retrieve data from that view ID." 
) jwt_encode_mock.encode.assert_not_called() assert "https://www.googleapis.com/auth/analytics.readonly" in unquote(mock_auth_call.last_request.body) @@ -278,6 +290,7 @@ def test_check_connection_fails_oauth( @patch("source_google_analytics_v4.source.jwt") def test_check_connection_success_oauth( jwt_encode_mock, + test_config_auth_client, mocker, mock_metrics_dimensions_type_list_link, mock_auth_call, @@ -287,16 +300,8 @@ def test_check_connection_success_oauth( check_connection succeeds because of the API returns valid records for the latest date based slice, then we assume than user has permission to read requested `view` """ - test_config = json.loads(read_file("../integration_tests/sample_config.json")) - del test_config["custom_reports"] - test_config["credentials"] = { - "auth_type": "Client", - "client_id": "client_id_val", - "client_secret": "client_secret_val", - "refresh_token": "refresh_token_val", - } source = SourceGoogleAnalyticsV4() - is_success, msg = source.check_connection(MagicMock(), test_config) + is_success, msg = source.check_connection(MagicMock(), test_config_auth_client) assert is_success is True assert msg is None jwt_encode_mock.encode.assert_not_called() @@ -314,24 +319,63 @@ def test_unknown_metrics_or_dimensions_error_validation(mock_metrics_dimensions_ @freeze_time("2021-11-30") -def test_stream_slices_limited_by_current_date(test_config): +def test_stream_slices_limited_by_current_date(test_config, mock_metrics_dimensions_type_list_link): + test_config["window_in_days"] = 14 g = GoogleAnalyticsV4IncrementalObjectsBase(config=test_config) - stream_state = {"ga_date": "2050-05-01"} + stream_state = {"ga_date": "2021-11-25"} slices = g.stream_slices(stream_state=stream_state) current_date = pendulum.now().date().strftime("%Y-%m-%d") - - assert len(slices) == 1 - assert slices[0]["startDate"] == slices[0]["endDate"] - assert slices[0]["endDate"] == current_date + assert slices == [{"startDate": "2021-11-26", "endDate": current_date}] @freeze_time("2021-11-30") -def test_stream_slices_start_from_current_date_if_abnornal_state_is_passed(test_config): +def test_empty_stream_slice_if_abnormal_state_is_passed(test_config, mock_metrics_dimensions_type_list_link): g = GoogleAnalyticsV4IncrementalObjectsBase(config=test_config) stream_state = {"ga_date": "2050-05-01"} slices = g.stream_slices(stream_state=stream_state) - current_date = pendulum.now().date().strftime("%Y-%m-%d") + assert slices == [None] - assert len(slices) == 1 - assert slices[0]["startDate"] == slices[0]["endDate"] - assert slices[0]["startDate"] == current_date + +def test_empty_slice_produces_no_records(test_config, mock_metrics_dimensions_type_list_link): + g = GoogleAnalyticsV4IncrementalObjectsBase(config=test_config) + records = g.read_records(sync_mode=SyncMode.incremental, stream_slice=None, stream_state={g.cursor_field: g.start_date}) + assert next(iter(records), None) is None + + +def test_state_saved_after_each_record(test_config, mock_metrics_dimensions_type_list_link): + today_dt = pendulum.now().date() + before_yesterday = today_dt.subtract(days=2).strftime("%Y-%m-%d") + today = today_dt.strftime("%Y-%m-%d") + record = {"ga_date": today} + g = GoogleAnalyticsV4IncrementalObjectsBase(config=test_config) + state = {g.cursor_field: before_yesterday} + assert g.get_updated_state(state, record) == {g.cursor_field: today} + + +def test_connection_fail_invalid_reports_json(test_config): + source = SourceGoogleAnalyticsV4() + test_config["custom_reports"] = "[{'data': {'ga:foo': 'ga:bar'}}]" + 
ok, error = source.check_connection(logging.getLogger(), test_config) + assert not ok + assert "Invalid custom reports json structure." in error + + +@pytest.mark.parametrize( + ("status", "json_resp"), + ( + (403, {"error": "Your role is not not granted the permission for accessing this resource"}), + (500, {"error": "Internal server error, please contact support"}), + ), +) +def test_connection_fail_due_to_http_status( + mocker, test_config, requests_mock, mock_auth_call, mock_metrics_dimensions_type_list_link, status, json_resp +): + mocker.patch("time.sleep") + requests_mock.post("https://analyticsreporting.googleapis.com/v4/reports:batchGet", status_code=status, json=json_resp) + source = SourceGoogleAnalyticsV4() + ok, error = source.check_connection(logging.getLogger(), test_config) + assert not ok + if status == 403: + assert "Please check the permissions for the requested view_id" in error + assert test_config["view_id"] in error + assert json_resp["error"] in error diff --git a/airbyte-integrations/connectors/source-linkedin-ads/Dockerfile b/airbyte-integrations/connectors/source-linkedin-ads/Dockerfile index 7b18dc7322bfc..71e2e090c3f61 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/Dockerfile +++ b/airbyte-integrations/connectors/source-linkedin-ads/Dockerfile @@ -33,5 +33,5 @@ COPY source_linkedin_ads ./source_linkedin_ads ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/source-linkedin-ads diff --git a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/spec.json b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/spec.json index 1e55729964821..5ee0df5f8f930 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/spec.json +++ b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/spec.json @@ -15,16 +15,16 @@ "examples": ["2021-05-17"] }, "account_ids": { - "title": "Account IDs", + "title": "Account IDs (Optional)", "type": "array", - "description": "Specify the Account IDs separated by space, to pull the data from. Leave empty, if you want to pull the data from all associated accounts.", + "description": "Specify the Account IDs separated by space, to pull the data from. Leave empty, if you want to pull the data from all associated accounts. See the official LinkedIn Ads docs for more info.", "items": { "type": "integer" }, "default": [] }, "credentials": { - "title": "Authorization Method", + "title": "Authentication *", "type": "object", "oneOf": [ { @@ -68,7 +68,7 @@ "access_token": { "type": "string", "title": "Access Token", - "description": "The token value generated using Authentication Code.", + "description": "The token value generated using the authentication code. 
See the docs to obtain yours.", "airbyte_secret": true } } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 4abf9471c1ea3..1d52311e0797a 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -80,8 +80,12 @@ import io.airbyte.db.Databases; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; import io.airbyte.test.utils.PostgreSQLContainerHelper; +import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.File; import java.io.IOException; import java.net.Inet4Address; @@ -783,7 +787,7 @@ public void testCheckpointing() throws Exception { // now cancel it so that we freeze state! try { apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead1.getJob().getId())); - } catch (Exception e) {} + } catch (final Exception e) {} final ConnectionState connectionState = waitForConnectionState(apiClient, connectionId); @@ -1146,6 +1150,78 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus()); } + @Test + @Order(22) + public void testDeleteConnection() throws Exception { + final String connectionName = "test-connection"; + final UUID sourceId = createPostgresSource().getSourceId(); + final UUID destinationId = createDestination().getDestinationId(); + final UUID operationId = createOperation().getOperationId(); + final AirbyteCatalog catalog = discoverSourceSchema(sourceId); + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode) + .primaryKey(List.of(List.of(COLUMN_NAME)))); + + UUID connectionId = + createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); + + // test normal deletion of connection + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + // remove connection to avoid exception during tear down + connectionIds.remove(connectionId); + + LOGGER.info("Waiting for connection to be deleted..."); + Thread.sleep(500); + + ConnectionStatus connectionStatus = + apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); + assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + + // test deletion of connection when temporal workflow is in a bad state, only when using new + // scheduler + final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + if (featureFlags.usesNewScheduler()) { + LOGGER.info("Testing connection deletion when temporal is in a 
terminal state"); + connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + terminateTemporalWorkflow(connectionId); + + // we should still be able to delete the connection when the temporal workflow is in this state + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + LOGGER.info("Waiting for connection to be deleted..."); + Thread.sleep(500); + + connectionStatus = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); + assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + } + } + + private void terminateTemporalWorkflow(final UUID connectionId) { + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); + final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); + + // check if temporal workflow is reachable + final ConnectionManagerWorkflow connectionManagerWorkflow = + workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); + connectionManagerWorkflow.getState(); + + // Terminate workflow + LOGGER.info("Terminating temporal workflow..."); + workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate(""); + + // remove connection to avoid exception during tear down + connectionIds.remove(connectionId); + } + private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException { return apiClient.getSourceApi().discoverSchemaForSource(new SourceDiscoverSchemaRequestBody().sourceId(sourceId)).getCatalog(); } diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 421879010fedd..7828f188e4dc1 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -207,6 +207,7 @@ This uploads data directly from your source to BigQuery. While this is faster to | Version | Date | Pull Request | Subject | |:--------| :--- | :--- | :--- | +| 1.0.2 | 2022-03-30 | [11620](https://github.com/airbytehq/airbyte/pull/11620) | Updated spec | | 1.0.1 | 2022-03-24 | [11350](https://github.com/airbytehq/airbyte/pull/11350) | Improve check performance | | 1.0.0 | 2022-03-18 | [11238](https://github.com/airbytehq/airbyte/pull/11238) | Updated spec and documentation | | 0.6.12 | 2022-03-18 | [10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters | @@ -235,6 +236,7 @@ This uploads data directly from your source to BigQuery. 
While this is faster to | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------| :--- | +| 0.2.14 | 2022-04-02 | [11620](https://github.com/airbytehq/airbyte/pull/11620) | Updated spec | | 0.2.13 | 2022-04-01 | [11636](https://github.com/airbytehq/airbyte/pull/11636) | Added new unit tests | | 0.2.12 | 2022-03-28 | [11454](https://github.com/airbytehq/airbyte/pull/11454) | Integration test enhancement for picking test-data and schemas | | 0.2.11 | 2022-03-18 | [10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters | diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index b96b2ab208cf6..8c4dc9f46c499 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -22,7 +22,7 @@ Check out common troubleshooting issues for the S3 destination connector on our | S3 Endpoint | string | URL to S3, If using AWS S3 just leave blank. | | S3 Bucket Name | string | Name of the bucket to sync data into. | | S3 Bucket Path | string | Subdirectory under the above bucket to sync the data into. | -| S3 Bucket Format | string | Additional subdirectories format under S3 Bucket Path. Default value is `${NAMESPACE}/${STREAM_NAME}/` and this can be further customized with variables such as `${YEAR}, ${MONTH}, ${DAY}, ${HOUR} etc` referring to the writing datetime. | +| S3 Bucket Format | string | Additional string format on how to store data under S3 Bucket Path. Default value is `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_`. | | S3 Region | string | See [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions) for all region codes. | | Access Key ID | string | AWS/Minio credential. | | Secret Access Key | string | AWS/Minio credential. | @@ -30,20 +30,20 @@ Check out common troubleshooting issues for the S3 destination connector on our ⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend you to provision a dedicated S3 resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️ -The full path of the output data with S3 path format `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}` is: +The full path of the output data with the default S3 path format `${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_` is: ```text -////. +///__. ``` For example: ```text -testing_bucket/data_output_path/public/users/2021_01_01/123e4567-e89b-12d3-a456-426614174000.csv.gz -↑ ↑ ↑ ↑ ↑ ↑ ↑ -| | | | | | format extension -| | | | | | -| | | | | uuid +testing_bucket/data_output_path/public/users/2021_01_01_1234567890_0.csv.gz +↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ +| | | | | | | format extension +| | | | | | unique incremental part id +| | | | | milliseconds since epoch | | | | upload date in YYYY_MM_DD | | | stream name | | source namespace (if it exists) @@ -51,6 +51,29 @@ testing_bucket/data_output_path/public/users/2021_01_01/123e4567-e89b-12d3-a456- bucket name ``` +The rationales behind this naming pattern are: + +1. Each stream has its own directory. +2. The data output files can be sorted by upload time. +3. The upload time composes of a date part and millis part so that it is both readable and unique. 
+ +It is possible to further customize the bucket path by using the following variables in the format string: +- `${NAMESPACE}`: Namespace where the stream comes from, or the one configured in the connection's namespace fields. +- `${STREAM_NAME}`: Name of the stream. +- `${YEAR}`: Year in which the sync wrote the output data. +- `${MONTH}`: Month in which the sync wrote the output data. +- `${DAY}`: Day on which the sync wrote the output data. +- `${HOUR}`: Hour in which the sync wrote the output data. +- `${MINUTE}`: Minute in which the sync wrote the output data. +- `${SECOND}`: Second in which the sync wrote the output data. +- `${MILLISECOND}`: Millisecond in which the sync wrote the output data. +- `${EPOCH}`: Milliseconds since epoch at which the sync wrote the output data. +- `${UUID}`: A random UUID string. + +Note: +- Multiple `/` characters in the S3 path are collapsed into a single `/` character. +- The part id variable uses a sequential ID, unless the output bucket contains too many files, in which case a `UUID` is used instead. + Please note that the stream name may contain a prefix, if it is configured on the connection. A data sync may create multiple files as the output files can be partitioned by size (targeting a size of 200MB compressed or lower). @@ -228,6 +251,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | |:--------| :--- | :--- |:---------------------------------------------------------------------------------------------------------------------------| +| 0.3.0 | 2022-04-04 | [\#11666](https://github.com/airbytehq/airbyte/pull/11666) | 0.2.12 introduced breaking changes because files are now compressed by default; this release also fixes the naming to be more compatible with older versions. | | 0.2.13 | 2022-03-29 | [\#11496](https://github.com/airbytehq/airbyte/pull/11496) | Fix S3 bucket path to be included with S3 bucket format | | 0.2.12 | 2022-03-28 | [\#11294](https://github.com/airbytehq/airbyte/pull/11294) | Change to serialized buffering strategy to reduce memory consumption | | 0.2.11 | 2022-03-23 | [\#11173](https://github.com/airbytehq/airbyte/pull/11173) | Added support for AWS Glue crawler | diff --git a/docs/integrations/sources/google-analytics-v4.md b/docs/integrations/sources/google-analytics-v4.md index 54b527a856e8b..16e2ee7c8be51 100644 --- a/docs/integrations/sources/google-analytics-v4.md +++ b/docs/integrations/sources/google-analytics-v4.md @@ -2,13 +2,13 @@ ## Features -| Feature | Supported? | -| :--- | :--- | -| Full Refresh Sync | Yes | -| Incremental Sync | Yes | -| Replicate Incremental Deletes | No | -| SSL connection | Yes | -| Custom Reports | Yes | +| Feature | Supported? | +|:------------------------------|:-----------| +| Full Refresh Sync | Yes | +| Incremental Sync | Yes | +| Replicate Incremental Deletes | No | +| SSL connection | Yes | +| Custom Reports | Yes | ### Supported Tables @@ -140,21 +140,22 @@ Google Analytics API may return provisional or incomplete data.
When this occurs ## Changelog -| Version | Date | Pull Request | Subject | -| :--- | :--- | :--- | :--- | -| 0.1.16 | 2022-01-26 | [9480](https://github.com/airbytehq/airbyte/pull/9480) | Reintroduce `window_in_days` and log warning when sampling occurs | -| 0.1.15 | 2021-12-28 | [9165](https://github.com/airbytehq/airbyte/pull/9165) | Update titles and descriptions | -| 0.1.14 | 2021-12-09 | [8656](https://github.com/airbytehq/airbyte/pull/8656) | Fix date-format in schemas | -| 0.1.13 | 2021-12-09 | [8676](https://github.com/airbytehq/airbyte/pull/8676) | Fix `window_in_days` validation issue | -| 0.1.12 | 2021-12-03 | [8175](https://github.com/airbytehq/airbyte/pull/8175) | Fix validation of unknown metric(s) or dimension(s) error | -| 0.1.11 | 2021-11-30 | [8264](https://github.com/airbytehq/airbyte/pull/8264) | Corrected date range | -| 0.1.10 | 2021-11-19 | [8087](https://github.com/airbytehq/airbyte/pull/8087) | Support `start_date` before the account has any data | -| 0.1.9 | 2021-10-27 | [7410](https://github.com/airbytehq/airbyte/pull/7410) | Add check for correct permission for requested `view_id` | -| 0.1.8 | 2021-10-13 | [7020](https://github.com/airbytehq/airbyte/pull/7020) | Add intermediary auth config support | -| 0.1.7 | 2021-10-07 | [6414](https://github.com/airbytehq/airbyte/pull/6414) | Declare oauth parameters in google sources | -| 0.1.6 | 2021-09-27 | [6459](https://github.com/airbytehq/airbyte/pull/6459) | Update OAuth Spec File | -| 0.1.3 | 2021-09-21 | [6357](https://github.com/airbytehq/airbyte/pull/6357) | Fix oauth workflow parameters | -| 0.1.2 | 2021-09-20 | [6306](https://github.com/airbytehq/airbyte/pull/6306) | Support of airbyte OAuth initialization flow | -| 0.1.1 | 2021-08-25 | [5655](https://github.com/airbytehq/airbyte/pull/5655) | Corrected validation of empty custom report | -| 0.1.0 | 2021-08-10 | [5290](https://github.com/airbytehq/airbyte/pull/5290) | Initial Release | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------| +| 0.1.17 | 2022-03-31 | [11512](https://github.com/airbytehq/airbyte/pull/11512) | Improved Unit and Acceptance tests coverage, fix `read` with abnormally large state values | +| 0.1.16 | 2022-01-26 | [9480](https://github.com/airbytehq/airbyte/pull/9480) | Reintroduce `window_in_days` and log warning when sampling occurs | +| 0.1.15 | 2021-12-28 | [9165](https://github.com/airbytehq/airbyte/pull/9165) | Update titles and descriptions | +| 0.1.14 | 2021-12-09 | [8656](https://github.com/airbytehq/airbyte/pull/8656) | Fix date-format in schemas | +| 0.1.13 | 2021-12-09 | [8676](https://github.com/airbytehq/airbyte/pull/8676) | Fix `window_in_days` validation issue | +| 0.1.12 | 2021-12-03 | [8175](https://github.com/airbytehq/airbyte/pull/8175) | Fix validation of unknown metric(s) or dimension(s) error | +| 0.1.11 | 2021-11-30 | [8264](https://github.com/airbytehq/airbyte/pull/8264) | Corrected date range | +| 0.1.10 | 2021-11-19 | [8087](https://github.com/airbytehq/airbyte/pull/8087) | Support `start_date` before the account has any data | +| 0.1.9 | 2021-10-27 | [7410](https://github.com/airbytehq/airbyte/pull/7410) | Add check for correct permission for requested `view_id` | +| 0.1.8 | 2021-10-13 | [7020](https://github.com/airbytehq/airbyte/pull/7020) | Add intermediary auth config support | +| 0.1.7 | 
2021-10-07 | [6414](https://github.com/airbytehq/airbyte/pull/6414) | Declare oauth parameters in google sources | +| 0.1.6 | 2021-09-27 | [6459](https://github.com/airbytehq/airbyte/pull/6459) | Update OAuth Spec File | +| 0.1.3 | 2021-09-21 | [6357](https://github.com/airbytehq/airbyte/pull/6357) | Fix oauth workflow parameters | +| 0.1.2 | 2021-09-20 | [6306](https://github.com/airbytehq/airbyte/pull/6306) | Support of airbyte OAuth initialization flow | +| 0.1.1 | 2021-08-25 | [5655](https://github.com/airbytehq/airbyte/pull/5655) | Corrected validation of empty custom report | +| 0.1.0 | 2021-08-10 | [5290](https://github.com/airbytehq/airbyte/pull/5290) | Initial Release | diff --git a/docs/integrations/sources/linkedin-ads.md b/docs/integrations/sources/linkedin-ads.md index fa17b967f170f..301e9b0a16373 100644 --- a/docs/integrations/sources/linkedin-ads.md +++ b/docs/integrations/sources/linkedin-ads.md @@ -37,13 +37,13 @@ This Source is capable of syncing the following data as streams: ### Features -| Feature | Supported?\(Yes/No\) | Notes | -| :---------------------------------------- | :------------------- | :---- | -| Full Refresh Overwrite Sync | Yes | | -| Full Refresh Append Sync | Yes | | -| Incremental - Append Sync | Yes | | -| Incremental - Append + Deduplication Sync | Yes | | -| Namespaces | No | | +| Feature | Supported?\(Yes/No\) | +| :---------------------------------------- | :------------------- | +| Full Refresh Overwrite Sync | Yes | +| Full Refresh Append Sync | Yes | +| Incremental - Append Sync | Yes | +| Incremental - Append + Deduplication Sync | Yes | +| Namespaces | No | ### Performance considerations @@ -79,7 +79,7 @@ The API user account should be assigned one of the following roles: ### Authentication There are 2 authentication methods: -##### Generate the Access\_Token +##### Generate the `Access Token` The source LinkedIn uses `access_token` provided in the UI connector's settings to make API requests. Access tokens expire after `2 months from generating date (60 days)` and require a user to manually authenticate again. If you receive a `401 invalid token response`, the error logs will state that your access token has expired and to re-authenticate your connection to generate a new token. This is described more [here](https://docs.microsoft.com/en-us/linkedin/shared/authentication/authorization-code-flow?context=linkedin/context). 1. **Login to LinkedIn as the API user.** 2. **Create an App** [here](https://www.linkedin.com/developers/apps): @@ -130,12 +130,13 @@ The source LinkedIn supports the oAuth2 protocol. 
Everyone can use it directly v ## Changelog -| Version | Date | Pull Request | Subject | -| :------ | :--------- | :----------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------- | -| 0.1.5 | 2021-12-21 | [8984](https://github.com/airbytehq/airbyte/pull/8984) | Update connector fields title/description | -| 0.1.4 | 2021-12-02 | [8382](https://github.com/airbytehq/airbyte/pull/8382) | Modify log message in rate-limit cases | -| 0.1.3 | 2021-11-11 | [7839](https://github.com/airbytehq/airbyte/pull/7839) | Added oauth support | -| 0.1.2 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | -| 0.1.1 | 2021-10-02 | [6610](https://github.com/airbytehq/airbyte/pull/6610) | Fix for `Campaigns/targetingCriteria` transformation, coerced `Creatives/variables/values` to string by default | -| 0.1.0 | 2021-09-05 | [5285](https://github.com/airbytehq/airbyte/pull/5285) | Initial release of Native LinkedIn Ads connector for Airbyte | +| Version | Date | Pull Request | Subject | +| :------ | :--------- | :----------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------- | +| 0.1.6 | 2022-04-04 | [11690](https://github.com/airbytehq/airbyte/pull/11690) | Small documentation corrections | +| 0.1.5 | 2021-12-21 | [8984](https://github.com/airbytehq/airbyte/pull/8984) | Update connector fields title/description | +| 0.1.4 | 2021-12-02 | [8382](https://github.com/airbytehq/airbyte/pull/8382) | Modify log message in rate-limit cases | +| 0.1.3 | 2021-11-11 | [7839](https://github.com/airbytehq/airbyte/pull/7839) | Added oauth support | +| 0.1.2 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | +| 0.1.1 | 2021-10-02 | [6610](https://github.com/airbytehq/airbyte/pull/6610) | Fix for `Campaigns/targetingCriteria` transformation, coerced `Creatives/variables/values` to string by default | +| 0.1.0 | 2021-09-05 | [5285](https://github.com/airbytehq/airbyte/pull/5285) | Initial release of Native LinkedIn Ads connector for Airbyte | diff --git a/tools/bin/ci_integration_workflow_launcher.sh b/tools/bin/ci_integration_workflow_launcher.sh index f1dfbb8978b33..8abbb3f10bd8b 100755 --- a/tools/bin/ci_integration_workflow_launcher.sh +++ b/tools/bin/ci_integration_workflow_launcher.sh @@ -29,12 +29,14 @@ fi CONNECTORS=$(./gradlew integrationTest --dry-run | grep 'integrationTest SKIPPED' | cut -d: -f 4 | sort | uniq) echo "$CONNECTORS" | while read -r connector; do - echo "Issuing request for connector $connector..." - curl \ - -i \ - -X POST \ - -H "Accept: application/vnd.github.v3+json" \ - -H "Authorization: Bearer $GITHUB_TOKEN" \ - "$REPO_API/actions/workflows/$WORKFLOW_ID/dispatches" \ - -d "{\"ref\":\"master\", \"inputs\": { \"connector\": \"$connector\"} }" + if [ ! -z "$connector" ]; then + echo "Issuing request for connector $connector..." + curl \ + -i \ + -X POST \ + -H "Accept: application/vnd.github.v3+json" \ + -H "Authorization: Bearer $GITHUB_TOKEN" \ + "$REPO_API/actions/workflows/$WORKFLOW_ID/dispatches" \ + -d "{\"ref\":\"master\", \"inputs\": { \"connector\": \"$connector\"} }" + fi done
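As a side note on the launcher change above: when `$CONNECTORS` is empty, `echo` still emits a single newline, so `while read` runs once with an empty `connector` value, and the new `-z` guard prevents a workflow dispatch for that empty name. A minimal shell sketch of that behavior (illustration only, not part of this change):

```bash
#!/usr/bin/env bash
# Illustration only: an empty variable piped through `while read` still yields
# one iteration with an empty value, because echo prints a trailing newline.
CONNECTORS=""

echo "$CONNECTORS" | while read -r connector; do
  if [ -z "$connector" ]; then
    # Without the guard in the launcher script, this empty value would be sent
    # to the workflow-dispatch API.
    echo "skipping empty connector line"
  else
    echo "would dispatch workflow for: $connector"
  fi
done
```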