diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/__init__.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/__init__.py
index 56d8b388e93e0..c5207727808b4 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/v4/__init__.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/__init__.py
@@ -3,6 +3,8 @@
 #
 
 from .config import Config
+from .legacy_config_transformer import LegacyConfigTransformer
+from .source import SourceS3
 from .stream_reader import SourceS3StreamReader
 
-__all__ = ["Config", "SourceS3StreamReader"]
+__all__ = ["Config", "LegacyConfigTransformer", "SourceS3", "SourceS3StreamReader"]
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/config.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
index 6cc00f8e800c3..de6ab28973221 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
@@ -9,6 +9,11 @@
 
 
 class Config(AbstractFileBasedSpec):
+    """
+    NOTE: When this Spec is changed, legacy_config_transformer.py must also be modified to pick up the changes,
+    because it is responsible for converting legacy S3 v3 configs into v4 configs using the File-Based CDK.
+    """
+
     @classmethod
     def documentation_url(cls) -> AnyUrl:
         return AnyUrl("https://docs.airbyte.com/integrations/sources/s3", scheme="https")
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py
new file mode 100644
index 0000000000000..8406b682f2499
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py
@@ -0,0 +1,60 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from datetime import datetime
+from typing import Any, List, Mapping
+
+from source_s3.source import SourceS3Spec
+
+SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
+MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+class LegacyConfigTransformer:
+    """
+    Class that takes in S3 source configs in the legacy format and transforms them into
+    configs that can be used by the new S3 source built with the file-based CDK.
+    """
+
+    @classmethod
+    def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
+        transformed_config = {
+            "bucket": legacy_config.provider.bucket,
+            "streams": [
+                {
+                    "name": legacy_config.dataset,
+                    "file_type": legacy_config.format.filetype,
+                    "globs": cls._create_globs(legacy_config.path_pattern, legacy_config.provider.path_prefix),
+                    "validation_policy": "Emit Record",
+                    # todo: add formats on a per-type basis in follow up PRs
+                }
+            ],
+        }
+
+        if legacy_config.provider.start_date:
+            transformed_config["start_date"] = cls._transform_seconds_to_micros(legacy_config.provider.start_date)
+        if legacy_config.provider.aws_access_key_id:
+            transformed_config["aws_access_key_id"] = legacy_config.provider.aws_access_key_id
+        if legacy_config.provider.aws_secret_access_key:
+            transformed_config["aws_secret_access_key"] = legacy_config.provider.aws_secret_access_key
+        if legacy_config.provider.endpoint:
+            transformed_config["endpoint"] = legacy_config.provider.endpoint
+        if legacy_config.user_schema and legacy_config.user_schema != "{}":
+            transformed_config["streams"][0]["input_schema"] = legacy_config.user_schema
+
+        return transformed_config
+
+    @classmethod
+    def _create_globs(cls, path_pattern: str, path_prefix: str) -> List[str]:
+        if path_prefix:
+            return [path_prefix + path_pattern]
+        return [path_pattern]
+
+    @classmethod
+    def _transform_seconds_to_micros(cls, datetime_str: str) -> str:
+        try:
+            parsed_datetime = datetime.strptime(datetime_str, SECONDS_FORMAT)
+            return datetime.strftime(parsed_datetime, MICROS_FORMAT)
+        except ValueError as e:
+            raise e
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/source.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/source.py
new file mode 100644
index 0000000000000..8302673b3ce7a
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/source.py
@@ -0,0 +1,23 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from typing import Any, Mapping
+
+from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
+from source_s3.source import SourceS3Spec
+from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer
+
+
+class SourceS3(FileBasedSource):
+    def read_config(self, config_path: str) -> Mapping[str, Any]:
+        """
+        Used to override the default read_config so that when the new file-based S3 connector processes a config
+        in the legacy format, it can be transformed into the new config. This happens in the entrypoint before we
+        validate the config against the new spec.
+        """
+        config = super().read_config(config_path)
+        if not config.get("streams"):
+            parsed_legacy_config = SourceS3Spec(**config)
+            return LegacyConfigTransformer.convert(parsed_legacy_config)
+        return config
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py b/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py
new file mode 100644
index 0000000000000..e0b5dd777be10
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py
@@ -0,0 +1,86 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+import pytest
+from source_s3.source import SourceS3Spec
+from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer
+
+
+@pytest.mark.parametrize(
+    "legacy_config, expected_config",
+    [
+        pytest.param(
+            {
+                "dataset": "test_data",
+                "provider": {
+                    "storage": "S3",
+                    "bucket": "test_bucket",
+                    "aws_access_key_id": "some_access_key",
+                    "aws_secret_access_key": "some_secret",
+                    "endpoint": "https://external-s3.com",
+                    "path_prefix": "a_folder/",
+                    "start_date": "2022-01-01T01:02:03Z"
+
+                },
+                "format": {
+                    "filetype": "csv",
+                    "delimiter": "^",
+                    "quote_char": "|",
+                    "escape_char": "!",
+                    "double_quote": True,
+                    "quoting_behavior": "Quote All"
+                },
+                "path_pattern": "**/*.csv",
+                "schema": '{"col1": "string", "col2": "integer"}'
+            },
+            {
+                "bucket": "test_bucket",
+                "aws_access_key_id": "some_access_key",
+                "aws_secret_access_key": "some_secret",
+                "endpoint": "https://external-s3.com",
+                "start_date": "2022-01-01T01:02:03.000000Z",
+                "streams": [
+                    {
+                        "name": "test_data",
+                        "file_type": "csv",
+                        "globs": ["a_folder/**/*.csv"],
+                        "validation_policy": "Emit Record",
+                        "input_schema": '{"col1": "string", "col2": "integer"}',
+                    }
+                ]
+            }
+            , id="test_convert_legacy_config"
+        ),
+        pytest.param(
+            {
+                "dataset": "test_data",
+                "provider": {
+                    "storage": "S3",
+                    "bucket": "test_bucket",
+                },
+                "format": {
+                    "filetype": "avro",
+                },
+                "path_pattern": "**/*.csv",
+            },
+            {
+                "bucket": "test_bucket",
+                "streams": [
+                    {
+                        "name": "test_data",
+                        "file_type": "avro",
+                        "globs": ["**/*.csv"],
+                        "validation_policy": "Emit Record",
+                    }
+                ]
+            }
+            , id="test_convert_no_optional_fields"
+        ),
+    ]
+)
+def test_convert_legacy_config(legacy_config, expected_config):
+    parsed_legacy_config = SourceS3Spec(**legacy_config)
+    actual_config = LegacyConfigTransformer.convert(parsed_legacy_config)
+
+    assert actual_config == expected_config
diff --git a/airbyte-integrations/connectors/source-s3/v4_main.py b/airbyte-integrations/connectors/source-s3/v4_main.py
index 368be291c6424..8344046c7119b 100644
--- a/airbyte-integrations/connectors/source-s3/v4_main.py
+++ b/airbyte-integrations/connectors/source-s3/v4_main.py
@@ -6,11 +6,10 @@
 import sys
 
 from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
-from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
-from source_s3.v4 import Config, SourceS3StreamReader
+from source_s3.v4 import Config, SourceS3, SourceS3StreamReader
 
 if __name__ == "__main__":
     args = sys.argv[1:]
     catalog_path = AirbyteEntrypoint.extract_catalog(args)
-    source = FileBasedSource(SourceS3StreamReader(), Config, catalog_path)
+    source = SourceS3(SourceS3StreamReader(), Config, catalog_path)
     launch(source, args)
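
For reference, a minimal usage sketch of the conversion path that SourceS3.read_config delegates to. It is not part of the committed files above; it reuses the no-optional-fields legacy config from the unit tests and only the names exported by source_s3.v4 in this change, and assumes the source-s3 package is importable.

# Standalone sketch of the legacy (v3) -> v4 config conversion introduced in this diff.
from source_s3.source import SourceS3Spec
from source_s3.v4 import LegacyConfigTransformer

# Minimal legacy config, identical to the "test_convert_no_optional_fields" case above.
legacy_config = {
    "dataset": "test_data",
    "provider": {"storage": "S3", "bucket": "test_bucket"},
    "format": {"filetype": "avro"},
    "path_pattern": "**/*.csv",
}

# Parse with the legacy spec model, then convert to the file-based (v4) shape,
# exactly as SourceS3.read_config does when a config has no "streams" key.
parsed_legacy_config = SourceS3Spec(**legacy_config)
v4_config = LegacyConfigTransformer.convert(parsed_legacy_config)

# Expected shape, per the unit tests above:
# {"bucket": "test_bucket",
#  "streams": [{"name": "test_data", "file_type": "avro",
#               "globs": ["**/*.csv"], "validation_policy": "Emit Record"}]}
print(v4_config)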