From f143c8f02ab130ca695d8d7ea431971f5d680a56 Mon Sep 17 00:00:00 2001
From: midavadim
Date: Wed, 10 Aug 2022 21:13:51 +0300
Subject: [PATCH] :tada: Source File - add support for custom encoding (#15293)

* added support for custom encoding

* fixed unit test for utf16

* updated docs

* bumped connector version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III
---
 .../resources/seed/source_definitions.yaml    |  2 +-
 .../src/main/resources/seed/source_specs.yaml |  2 +-
 .../connectors/source-file/Dockerfile         |  2 +-
 .../sample_files/test_utf16.csv               | Bin 0 -> 132 bytes
 .../source-file/source_file/client.py         | 70 +++++++++---------
 .../source-file/source_file/source.py         |  2 +-
 .../source-file/unit_tests/test_source.py     | 35 +++++++++
 docs/integrations/sources/file.md             |  3 +-
 8 files changed, 74 insertions(+), 42 deletions(-)
 create mode 100644 airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_utf16.csv
 create mode 100644 airbyte-integrations/connectors/source-file/unit_tests/test_source.py

diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 076e639c7bcd9..444de55646cd8 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -271,7 +271,7 @@
 - name: File
   sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
   dockerRepository: airbyte/source-file
-  dockerImageTag: 0.2.15
+  dockerImageTag: 0.2.16
   documentationUrl: https://docs.airbyte.io/integrations/sources/file
   icon: file.svg
   sourceType: file
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index b3ecd1a2fc7f3..3ba04f127e8e4 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -2255,7 +2255,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-file:0.2.15"
+- dockerImage: "airbyte/source-file:0.2.16"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/file"
     connectionSpecification:
diff --git a/airbyte-integrations/connectors/source-file/Dockerfile b/airbyte-integrations/connectors/source-file/Dockerfile
index 1a0769dd0cc6d..9cbf49648e627 100644
--- a/airbyte-integrations/connectors/source-file/Dockerfile
+++ b/airbyte-integrations/connectors/source-file/Dockerfile
@@ -17,5 +17,5 @@ COPY source_file ./source_file
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.2.15
+LABEL io.airbyte.version=0.2.16
 LABEL io.airbyte.name=airbyte/source-file
diff --git a/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_utf16.csv b/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_utf16.csv
new file mode 100644
index 0000000000000000000000000000000000000000..e786a4a6567ae8b408752239787439dd524f7e71
GIT binary patch
literal 132
zcmezWFM}bKA(0^kNER^|GUzZM^NrB>#%O#K1}=sYpw0>)sRPz+z@Wzf(p3U9trRGO
UEDzERk~aX-X+S%37>dCv0Iq%(?EnA(

literal 0
HcmV?d00001

diff --git a/airbyte-integrations/connectors/source-file/source_file/client.py b/airbyte-integrations/connectors/source-file/source_file/client.py
index cda6b8c40db0c..02e256cb5b35d 100644
--- a/airbyte-integrations/connectors/source-file/source_file/client.py
+++ b/airbyte-integrations/connectors/source-file/source_file/client.py
@@ -54,10 +54,14 @@ class URLFile:
     ```
     """
 
-    def __init__(self, url: str, provider: dict):
+    def __init__(self, url: str, provider: dict, binary=None, encoding=None):
         self._url = url
         self._provider = provider
         self._file = None
+        self.args = {
+            "mode": "rb" if binary else "r",
+            "encoding": encoding,
+        }
 
     def __enter__(self):
         return self._file
@@ -74,29 +78,28 @@ def close(self):
             self._file.close()
             self._file = None
 
-    def open(self, binary=False):
+    def open(self):
         self.close()
         try:
-            self._file = self._open(binary=binary)
+            self._file = self._open()
         except google.api_core.exceptions.NotFound as err:
             raise FileNotFoundError(self.url) from err
         return self
 
-    def _open(self, binary):
-        mode = "rb" if binary else "r"
+    def _open(self):
         storage = self.storage_scheme
         url = self.url
 
         if storage == "gs://":
-            return self._open_gcs_url(binary=binary)
+            return self._open_gcs_url()
         elif storage == "s3://":
-            return self._open_aws_url(binary=binary)
+            return self._open_aws_url()
         elif storage == "azure://":
-            return self._open_azblob_url(binary=binary)
+            return self._open_azblob_url()
         elif storage == "webhdfs://":
             host = self._provider["host"]
             port = self._provider["port"]
-            return smart_open.open(f"webhdfs://{host}:{port}/{url}", mode=mode)
+            return smart_open.open(f"webhdfs://{host}:{port}/{url}", **self.args)
         elif storage in ("ssh://", "scp://", "sftp://"):
             user = self._provider["user"]
             host = self._provider["host"]
@@ -114,19 +117,15 @@ def _open(self, binary):
                 uri = f"{storage}{user}:{password}@{host}:{port}/{url}"
             else:
                 uri = f"{storage}{user}@{host}:{port}/{url}"
-            return smart_open.open(uri, transport_params=transport_params, mode=mode)
+            return smart_open.open(uri, transport_params=transport_params, **self.args)
         elif storage in ("https://", "http://"):
             transport_params = None
             if "user_agent" in self._provider and self._provider["user_agent"]:
                 airbyte_version = environ.get("AIRBYTE_VERSION", "0.0")
                 transport_params = {"headers": {"Accept-Encoding": "identity", "User-Agent": f"Airbyte/{airbyte_version}"}}
             logger.info(f"TransportParams: {transport_params}")
-            return smart_open.open(
-                self.full_url,
-                mode=mode,
-                transport_params=transport_params,
-            )
-        return smart_open.open(self.full_url, mode=mode)
+            return smart_open.open(self.full_url, transport_params=transport_params, **self.args)
+        return smart_open.open(self.full_url, **self.args)
 
     @property
     def url(self) -> str:
@@ -168,8 +167,7 @@ def storage_scheme(self) -> str:
         logger.error(f"Unknown Storage provider in: {self.full_url}")
         return ""
 
-    def _open_gcs_url(self, binary) -> object:
-        mode = "rb" if binary else "r"
+    def _open_gcs_url(self) -> object:
         service_account_json = self._provider.get("service_account_json")
         credentials = None
         if service_account_json:
@@ -185,12 +183,11 @@ def _open_gcs_url(self, binary) -> object:
             client = GCSClient(credentials=credentials, project=credentials._project_id)
         else:
             client = GCSClient.create_anonymous_client()
-        file_to_close = smart_open.open(self.full_url, transport_params=dict(client=client), mode=mode)
+        file_to_close = smart_open.open(self.full_url, transport_params={"client": client}, **self.args)
 
         return file_to_close
 
-    def _open_aws_url(self, binary):
-        mode = "rb" if binary else "r"
+    def _open_aws_url(self):
         aws_access_key_id = self._provider.get("aws_access_key_id")
         aws_secret_access_key = self._provider.get("aws_secret_access_key")
         use_aws_account = aws_access_key_id and aws_secret_access_key
@@ -198,15 +195,15 @@ def _open_aws_url(self, binary):
         if use_aws_account:
             aws_access_key_id = self._provider.get("aws_access_key_id", "")
             aws_secret_access_key = self._provider.get("aws_secret_access_key", "")
-            result = smart_open.open(f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}", mode=mode)
+            url = f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}"
+            result = smart_open.open(url, **self.args)
         else:
             config = botocore.client.Config(signature_version=botocore.UNSIGNED)
             params = {"client": boto3.client("s3", config=config)}
-            result = smart_open.open(self.full_url, transport_params=params, mode=mode)
+            result = smart_open.open(self.full_url, transport_params=params, **self.args)
         return result
 
-    def _open_azblob_url(self, binary):
-        mode = "rb" if binary else "r"
+    def _open_azblob_url(self):
         storage_account = self._provider.get("storage_account")
         storage_acc_url = f"https://{storage_account}.blob.core.windows.net"
         sas_token = self._provider.get("sas_token", None)
@@ -220,14 +217,15 @@ def _open_azblob_url(self, binary):
             # assuming anonymous public read access given no credential
             client = BlobServiceClient(account_url=storage_acc_url)
 
-        result = smart_open.open(f"{self.storage_scheme}{self.url}", transport_params=dict(client=client), mode=mode)
-        return result
+        url = f"{self.storage_scheme}{self.url}"
+        return smart_open.open(url, transport_params=dict(client=client), **self.args)
 
 
 class Client:
     """Class that manages reading and parsing data from streams"""
 
     reader_class = URLFile
+    binary_formats = {"excel", "feather", "parquet", "orc", "pickle"}
 
     def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: str = None):
         self._dataset_name = dataset_name
@@ -243,6 +241,9 @@ def __init__(self, dataset_name: str, url: str, provider: dict, format: str = No
             logger.error(error_msg)
             raise ConfigurationError(error_msg) from err
 
+        self.binary_source = self._reader_format in self.binary_formats
+        self.encoding = self._reader_options.get("encoding")
+
     @property
     def stream_name(self) -> str:
         if self._dataset_name:
@@ -336,17 +337,12 @@ def dtype_to_json_type(dtype) -> str:
 
     @property
     def reader(self) -> reader_class:
-        return self.reader_class(url=self._url, provider=self._provider)
-
-    @property
-    def binary_source(self):
-        binary_formats = {"excel", "feather", "parquet", "orc", "pickle"}
-        return self._reader_format in binary_formats
+        return self.reader_class(url=self._url, provider=self._provider, binary=self.binary_source, encoding=self.encoding)
 
     def read(self, fields: Iterable = None) -> Iterable[dict]:
         """Read data from the stream"""
-        with self.reader.open(binary=self.binary_source) as fp:
-            if self._reader_format == "json" or self._reader_format == "jsonl":
+        with self.reader.open() as fp:
+            if self._reader_format in ["json", "jsonl"]:
                 yield from self.load_nested_json(fp)
             elif self._reader_format == "yaml":
                 fields = set(fields) if fields else None
@@ -376,8 +372,8 @@ def _stream_properties(self, fp):
     def streams(self) -> Iterable:
         """Discovers available streams"""
         # TODO handle discovery of directories of multiple files instead
-        with self.reader.open(binary=self.binary_source) as fp:
-            if self._reader_format == "json" or self._reader_format == "jsonl":
+        with self.reader.open() as fp:
+            if self._reader_format in ["json", "jsonl"]:
                 json_schema = self.load_nested_json_schema(fp)
             else:
                 json_schema = {
diff --git a/airbyte-integrations/connectors/source-file/source_file/source.py b/airbyte-integrations/connectors/source-file/source_file/source.py
index bac45796dfc04..ba8ae07334c82 100644
--- a/airbyte-integrations/connectors/source-file/source_file/source.py
+++ b/airbyte-integrations/connectors/source-file/source_file/source.py
@@ -83,7 +83,7 @@ def check(self, logger, config: Mapping) -> AirbyteConnectionStatus:
         client = self._get_client(config)
         logger.info(f"Checking access to {client.reader.full_url}...")
         try:
-            with client.reader.open(binary=client.binary_source):
+            with client.reader.open():
                 return AirbyteConnectionStatus(status=Status.SUCCEEDED)
         except Exception as err:
             reason = f"Failed to load {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py
new file mode 100644
index 0000000000000..8f6ed5277c620
--- /dev/null
+++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py
@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+import logging
+from pathlib import Path
+
+from source_file.source import SourceFile
+
+HERE = Path(__file__).parent.absolute()
+
+
+def test_csv_with_utf16_encoding():
+
+    config_local_csv_utf16 = {
+        "dataset_name": "AAA",
+        "format": "csv",
+        "reader_options": '{"encoding":"utf_16"}',
+        "url": f"{HERE}/../integration_tests/sample_files/test_utf16.csv",
+        "provider": {"storage": "local"},
+    }
+    expected_schema = {
+        "$schema": "http://json-schema.org/draft-07/schema#",
+        "properties": {
+            "header1": {"type": ["string", "null"]},
+            "header2": {"type": ["number", "null"]},
+            "header3": {"type": ["number", "null"]},
+            "header4": {"type": ["boolean", "null"]},
+        },
+        "type": "object",
+    }
+
+    catalog = SourceFile().discover(logger=logging.getLogger("airbyte"), config=config_local_csv_utf16)
+    stream = next(iter(catalog.streams))
+    assert stream.json_schema == expected_schema
diff --git a/docs/integrations/sources/file.md b/docs/integrations/sources/file.md
index 34511122f6300..77f7ef18ea9e9 100644
--- a/docs/integrations/sources/file.md
+++ b/docs/integrations/sources/file.md
@@ -126,7 +126,8 @@ In order to read large files from a remote location, this connector uses the [sm
 ## Changelog
 
 | Version | Date | Pull Request | Subject |
-|---------|------------|----------------------------------------------------------| ------------------------------------------------- |
+|---------|------------|----------------------------------------------------------|---------------------------------------------------|
+| 0.2.16 | 2022-08-10 | [15293](https://github.com/airbytehq/airbyte/pull/15293) | Add support for the `encoding` reader option |
 | 0.2.15 | 2022-08-05 | [15269](https://github.com/airbytehq/airbyte/pull/15269) | Bump `smart-open` version to 6.0.0 |
 | 0.2.12 | 2022-07-12 | [14535](https://github.com/airbytehq/airbyte/pull/14535) | Fix invalid schema generation for JSON files |
 | 0.2.11 | 2022-07-12 | [9974](https://github.com/airbytehq/airbyte/pull/14588) | Add support to YAML format |
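
The core of the `client.py` change is that `URLFile` now receives `binary` and `encoding` once, stores them as keyword arguments, and forwards them to every `smart_open.open()` call instead of threading a `binary` flag through each `_open_*` helper. The snippet below is a minimal, self-contained sketch of that pattern, not the connector's actual class: the name `EncodedURLFile`, the sample file, and its contents are made up for illustration.

```python
import smart_open


class EncodedURLFile:
    """Open a local or remote URL with an optional text encoding (illustrative sketch)."""

    def __init__(self, url: str, binary: bool = False, encoding: str = None):
        self._url = url
        self._file = None
        # smart_open.open() mirrors the builtin open(): binary mode reads raw
        # bytes, text mode decodes with the requested codec.
        self.args = {"mode": "rb" if binary else "r", "encoding": encoding}

    def open(self):
        self.close()
        self._file = smart_open.open(self._url, **self.args)
        return self

    def close(self):
        if self._file is not None:
            self._file.close()
            self._file = None

    def __enter__(self):
        return self._file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


# Write a small UTF-16 CSV so the example is self-contained, then read it back
# as text through the wrapper, the same way Client.read() uses the reader.
with open("sample.csv", "w", encoding="utf_16") as out:
    out.write("header1,header2\ntext_value,1.5\n")

with EncodedURLFile("sample.csv", binary=False, encoding="utf_16").open() as fp:
    print(fp.readline())
```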
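On the `Client` side, the new behavior comes down to two small decisions: the configured format determines whether the stream is opened in binary mode, and an optional `encoding` is taken from the JSON `reader_options` string. A condensed restatement of that logic is sketched below; the helper name `resolve_open_args` is illustrative and not part of the connector.

```python
import json

BINARY_FORMATS = {"excel", "feather", "parquet", "orc", "pickle"}


def resolve_open_args(reader_format: str, reader_options: str = None) -> dict:
    """Return the keyword arguments a URLFile-style reader would be built with."""
    options = json.loads(reader_options) if reader_options else {}
    return {
        # Binary formats are opened with mode="rb"; text formats with mode="r".
        "binary": reader_format in BINARY_FORMATS,
        # Typically only set for text formats such as csv, json, or yaml.
        "encoding": options.get("encoding"),
    }


print(resolve_open_args("csv", '{"encoding": "utf_16"}'))
# {'binary': False, 'encoding': 'utf_16'}
print(resolve_open_args("parquet"))
# {'binary': True, 'encoding': None}
```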
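The new unit test drives discovery against `test_utf16.csv`, which is committed as a binary blob, so its exact contents are not visible in this patch. Assuming columns matching the expected schema (string, number, number, boolean), a sketch like the following can build an equivalent fixture and shows why the option matters: the file decodes cleanly as UTF-16 but fails with the default UTF-8 codec. The file name and column values here are invented for the example.

```python
import csv

rows = [
    ["header1", "header2", "header3", "header4"],
    ["text", "1", "0.5", "True"],
]

# Write the fixture in UTF-16, as the committed sample file is.
with open("test_utf16_sample.csv", "w", encoding="utf_16", newline="") as out:
    csv.writer(out).writerows(rows)

# Reading it back with the matching reader option works as expected.
with open("test_utf16_sample.csv", "r", encoding="utf_16", newline="") as fp:
    assert next(csv.reader(fp)) == rows[0]

# Decoding the same bytes as UTF-8 fails, which is exactly the situation the
# new "encoding" reader option addresses.
try:
    with open("test_utf16_sample.csv", "r", encoding="utf_8") as fp:
        fp.read()
except UnicodeError as err:
    print(f"expected failure without the encoding option: {err}")
```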