From 62e5528e5787ed80ba696536285c494c9bc089bb Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Tue, 13 Feb 2024 08:31:54 -0800 Subject: [PATCH 1/2] Don't emit final state if there is an underlying stream failure (#34869) Co-authored-by: Xiaohan Song --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../core/src/main/resources/version.properties | 2 +- .../relationaldb/state/SourceStateIterator.java | 16 ++++++++++------ .../state/SourceStateIteratorTest.java | 9 +++++++++ .../connectors/source-mysql/build.gradle | 2 +- .../connectors/source-mysql/metadata.yaml | 2 +- docs/integrations/sources/mysql.md | 9 +++++---- 7 files changed, 28 insertions(+), 13 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 8fe186d4e5186..327bf2795170b 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.20.5 | 2024-02-13 | [\#34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit final state in SourceStateIterator if there is an underlying stream failure. | | 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. | | 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. | | 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. 
| diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 5ca3b8c87d355..09d1c8ab8e97d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.20.4 \ No newline at end of file +version=0.20.5 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java index 203244800b421..5166ae2898ae0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java @@ -34,15 +34,19 @@ public SourceStateIterator(final Iterator messageIterator, @CheckForNull @Override protected AirbyteMessage computeNext() { + boolean iteratorHasNextValue = false; try { iteratorHasNextValue = messageIterator.hasNext(); - } catch (Exception ex) { - LOGGER.info("Caught exception while trying to get the next from message iterator. Treating hasNext to false. ", ex); + } catch (final Exception ex) { + // If the initial snapshot is incomplete for this stream, throw an exception failing the sync. This + // will ensure the platform retry logic + // kicks in and keeps retrying the sync until the initial snapshot is complete. 
+ throw new RuntimeException(ex); } if (iteratorHasNextValue) { if (sourceStateIteratorManager.shouldEmitStateMessage(recordCount, lastCheckpoint)) { - AirbyteStateMessage stateMessage = sourceStateIteratorManager.generateStateMessageAtCheckpoint(); + final AirbyteStateMessage stateMessage = sourceStateIteratorManager.generateStateMessageAtCheckpoint(); stateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); recordCount = 0L; @@ -62,12 +66,12 @@ protected AirbyteMessage computeNext() { } } else if (!hasEmittedFinalState) { hasEmittedFinalState = true; - final AirbyteStateMessage finalStateMessage = sourceStateIteratorManager.createFinalStateMessage(); - finalStateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); + final AirbyteStateMessage finalStateMessageForStream = sourceStateIteratorManager.createFinalStateMessage(); + finalStateMessageForStream.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); recordCount = 0L; return new AirbyteMessage() .withType(Type.STATE) - .withState(finalStateMessage); + .withState(finalStateMessageForStream); } else { return endOfData(); } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java index 7a32b03607b3d..34560be119d98 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java @@ -5,11 +5,13 @@ package io.airbyte.cdk.integrations.source.relationaldb.state; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static 
org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -89,4 +91,11 @@ void testShouldSendEndOfData() { assertEquals(null, sourceStateIterator.computeNext()); } + @Test + void testShouldRethrowExceptions() { + processRecordMessage(); + doThrow(new ArrayIndexOutOfBoundsException("unexpected error")).when(messageIterator).hasNext(); + assertThrows(RuntimeException.class, () -> sourceStateIterator.computeNext()); + } + } diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 24c52f470102f..11b9d5b4e156f 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.3' + cdkVersionRequired = '0.20.5' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 16b607af14ba5..a50e4a1846552 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.5 + dockerImageTag: 3.3.6 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 64f8de10e4041..7ad708fe1736c 100644 --- a/docs/integrations/sources/mysql.md +++ 
b/docs/integrations/sources/mysql.md @@ -223,10 +223,11 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name | -| 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 | -| 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. | -| 3.3.2 | 2024-01-08 | [33005](https://github.com/airbytehq/airbyte/pull/33005) | Adding count stats for incremental sync in AirbyteStateMessage +| 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit state in SourceStateIterator when there is an underlying stream failure. | +| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name | +| 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 | +| 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. | +| 3.3.2 | 2024-01-08 | [33005](https://github.com/airbytehq/airbyte/pull/33005) | Adding count stats for incremental sync in AirbyteStateMessage | | 3.3.1 | 2024-01-03 | [33312](https://github.com/airbytehq/airbyte/pull/33312) | Adding count stats in AirbyteStateMessage | | 3.3.0 | 2023-12-19 | [33436](https://github.com/airbytehq/airbyte/pull/33436) | Remove LEGACY state flag | | 3.2.4 | 2023-12-12 | [33356](https://github.com/airbytehq/airbyte/pull/33210) | Support for better debugging tools.. 
| From 5d665ec23781334ecae46208e97b1fa53ce4bb19 Mon Sep 17 00:00:00 2001 From: Tyler B <104733644+tybernstein@users.noreply.github.com> Date: Tue, 13 Feb 2024 12:47:25 -0500 Subject: [PATCH 2/2] Remove IAM Role Setup instructions from s3.md (#35190) --- docs/integrations/sources/s3.md | 75 +++------------------------------ 1 file changed, 7 insertions(+), 68 deletions(-) diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 387ad9befaf31..ee1ea400a1967 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -15,7 +15,7 @@ Please note that using cloud storage may incur egress costs. Egress refers to da ### Step 1: Set up Amazon S3 -**If you are syncing from a private bucket**, you need to authenticate the connection. This can be done either by using an `IAM User` (with `AWS Access Key ID` and `Secret Access Key`) or an `IAM Role` (with `Role ARN`). Begin by creating a policy with the necessary permissions: +**If you are syncing from a private bucket**, you will need to provide both an `AWS Access Key ID` and `AWS Secret Access Key` to authenticate the connection. The IAM user associated with the credentials must be granted `read` and `list` permissions for the bucket and its objects. If you are unfamiliar with configuring AWS permissions, you can follow these steps to obtain the necessary permissions and credentials: #### Create a Policy @@ -47,70 +47,11 @@ At this time, object-level permissions alone are not sufficient to successfully ::: 4. Give your policy a descriptive name, then click **Create policy**. - -#### Option 1: Using an IAM Role (Most secure) - - -:::note -This authentication method is currently in the testing phase. To enable it for your workspace, please contact our Support Team. -::: - - -1. In the IAM dashboard, click **Roles**, then **Create role**. -2. Choose the appropriate trust entity and attach the policy you created. -3. Set up a trust relationship for the role. 
For example for **AWS account** trusted entity use default AWS account on your instance (it will be used to assume role). To use **External ID** set it to environment variables as `export AWS_ASSUME_ROLE_EXTERNAL_ID="{your-external-id}"`. Edit the trust relationship policy to reflect this: -``` -{ - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Principal": { - "AWS": "arn:aws:iam::{your-aws-account-id}:user/{your-username}" - }, - "Action": "sts:AssumeRole", - "Condition": { - "StringEquals": { - "sts:ExternalId": "{your-external-id}" - } - } - } - ] -} -``` - - -2. Choose the **AWS account** trusted entity type. -3. Set up a trust relationship for the role. This allows the Airbyte instance's AWS account to assume this role. You will also need to specify an external ID, which is a secret key that the trusting service (Airbyte) and the trusted role (the role you're creating) both know. This ID is used to prevent the "confused deputy" problem. The External ID should be your Airbyte workspace ID, which can be found in the URL of your workspace page. Edit the trust relationship policy to include the external ID: -``` -{ - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Principal": { - "AWS": "arn:aws:iam::094410056844:user/delegated_access_user" - }, - "Action": "sts:AssumeRole", - "Condition": { - "StringEquals": { - "sts:ExternalId": "{your-airbyte-workspace-id}" - } - } - } - ] -} -``` - -4. Complete the role creation and note the Role ARN. - -#### Option 2: Using an IAM User - -1. In the IAM dashboard, click **Users**. Select an existing IAM user or create a new one by clicking **Add users**. -2. If you are using an _existing_ IAM user, click the **Add permissions** dropdown menu and select **Add permissions**. If you are creating a _new_ user, you will be taken to the Permissions screen after selecting a name. -3. Select **Attach policies directly**, then find and check the box for your new policy. 
Click **Next**, then **Add permissions**. -4. After successfully creating your user, select the **Security credentials** tab and click **Create access key**. You will be prompted to select a use case and add optional tags to your access key. Click **Create access key** to generate the keys. - +5. In the IAM dashboard, click **Users**. Select an existing IAM user or create a new one by clicking **Add users**. +6. If you are using an _existing_ IAM user, click the **Add permissions** dropdown menu and select **Add permissions**. If you are creating a _new_ user, you will be taken to the Permissions screen after selecting a name. +7. Select **Attach policies directly**, then find and check the box for your new policy. Click **Next**, then **Add permissions**. +8. After successfully creating your user, select the **Security credentials** tab and click **Create access key**. You will be prompted to select a use case and add optional tags to your access key. Click **Create access key** to generate the keys. + :::caution Your `Secret Access Key` will only be visible once upon creation. Be sure to copy and store it securely for future use. ::: @@ -130,9 +71,7 @@ For more information on managing your access keys, please refer to the 3. Give a **Name** to the stream 4. (Optional) - If you want to enforce a specific schema, you can enter a **Input schema**. By default, this value is set to `{}` and will automatically infer the schema from the file\(s\) you are replicating. For details on providing a custom schema, refer to the [User Schema section](#user-schema). 5. Optionally, enter the **Globs** which dictates which files to be synced. This is a regular expression that allows Airbyte to pattern match the specific files to replicate. If you are replicating all the files within your bucket, use `**` as the pattern. For more precise pattern matching options, refer to the [Path Patterns section](#path-patterns) below. -6. 
**To authenticate your private bucket**: - If using an IAM role, enter the **AWS Role ARN**. - If using IAM user credentials, fill the **AWS Access Key ID** and **AWS Secret Access Key** fields with the appropriate credentials. +6. **If you are syncing from a private bucket**, you must fill the **AWS Access Key ID** and **AWS Secret Access Key** fields with the appropriate credentials to authenticate the connection. All other fields are optional and can be left empty. Refer to the [S3 Provider Settings section](#s3-provider-settings) below for more information on each field.