Commit

🎉 Source File - add support for custom encoding (#15293)
* added support for custom encoding

* fixed unit test for utf16

* updated docs

* bumped connector version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
2 people authored and girarda committed Aug 11, 2022
1 parent f049223 commit 28d0c7a
Showing 8 changed files with 74 additions and 42 deletions.
@@ -271,7 +271,7 @@
 - name: File
   sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
   dockerRepository: airbyte/source-file
-  dockerImageTag: 0.2.15
+  dockerImageTag: 0.2.16
   documentationUrl: https://docs.airbyte.io/integrations/sources/file
   icon: file.svg
   sourceType: file
@@ -2255,7 +2255,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-file:0.2.15"
+- dockerImage: "airbyte/source-file:0.2.16"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/file"
     connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
@@ -17,5 +17,5 @@ COPY source_file ./source_file
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.2.15
+LABEL io.airbyte.version=0.2.16
 LABEL io.airbyte.name=airbyte/source-file
Binary file not shown (new UTF-16 sample file, integration_tests/sample_files/test_utf16.csv, referenced by the unit test below).
70 changes: 33 additions & 37 deletions airbyte-integrations/connectors/source-file/source_file/client.py
@@ -54,10 +54,14 @@ class URLFile:
     ```
     """

-    def __init__(self, url: str, provider: dict):
+    def __init__(self, url: str, provider: dict, binary=None, encoding=None):
         self._url = url
         self._provider = provider
         self._file = None
+        self.args = {
+            "mode": "rb" if binary else "r",
+            "encoding": encoding,
+        }

     def __enter__(self):
         return self._file
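The new args dict computes the smart_open keyword arguments once in __init__ instead of re-deriving a mode string in every _open_* helper. A minimal standalone sketch of the same idea, assuming only the smart_open package (open_url is a hypothetical helper, not connector code):

import smart_open

def open_url(url: str, binary: bool = False, encoding: str = None):
    # Build the kwargs once: binary sources skip text decoding,
    # text sources pass the codec straight through to smart_open.
    args = {
        "mode": "rb" if binary else "r",
        "encoding": encoding,  # None falls back to the default codec
    }
    return smart_open.open(url, **args)

# e.g. a UTF-16 CSV opened as text:
# fp = open_url("https://example.com/data_utf16.csv", encoding="utf_16")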
@@ -74,29 +78,28 @@ def close(self):
         self._file.close()
         self._file = None

-    def open(self, binary=False):
+    def open(self):
         self.close()
         try:
-            self._file = self._open(binary=binary)
+            self._file = self._open()
         except google.api_core.exceptions.NotFound as err:
             raise FileNotFoundError(self.url) from err
         return self

-    def _open(self, binary):
-        mode = "rb" if binary else "r"
+    def _open(self):
         storage = self.storage_scheme
         url = self.url

         if storage == "gs://":
-            return self._open_gcs_url(binary=binary)
+            return self._open_gcs_url()
         elif storage == "s3://":
-            return self._open_aws_url(binary=binary)
+            return self._open_aws_url()
         elif storage == "azure://":
-            return self._open_azblob_url(binary=binary)
+            return self._open_azblob_url()
         elif storage == "webhdfs://":
             host = self._provider["host"]
             port = self._provider["port"]
-            return smart_open.open(f"webhdfs://{host}:{port}/{url}", mode=mode)
+            return smart_open.open(f"webhdfs://{host}:{port}/{url}", **self.args)
         elif storage in ("ssh://", "scp://", "sftp://"):
             user = self._provider["user"]
             host = self._provider["host"]
@@ -114,19 +117,15 @@ def _open(self, binary):
uri = f"{storage}{user}:{password}@{host}:{port}/{url}"
else:
uri = f"{storage}{user}@{host}:{port}/{url}"
return smart_open.open(uri, transport_params=transport_params, mode=mode)
return smart_open.open(uri, transport_params=transport_params, **self.args)
elif storage in ("https://", "http://"):
transport_params = None
if "user_agent" in self._provider and self._provider["user_agent"]:
airbyte_version = environ.get("AIRBYTE_VERSION", "0.0")
transport_params = {"headers": {"Accept-Encoding": "identity", "User-Agent": f"Airbyte/{airbyte_version}"}}
logger.info(f"TransportParams: {transport_params}")
return smart_open.open(
self.full_url,
mode=mode,
transport_params=transport_params,
)
return smart_open.open(self.full_url, mode=mode)
return smart_open.open(self.full_url, transport_params=transport_params, **self.args)
return smart_open.open(self.full_url, **self.args)

@property
def url(self) -> str:
@@ -168,8 +167,7 @@ def storage_scheme(self) -> str:
logger.error(f"Unknown Storage provider in: {self.full_url}")
return ""

def _open_gcs_url(self, binary) -> object:
mode = "rb" if binary else "r"
def _open_gcs_url(self) -> object:
service_account_json = self._provider.get("service_account_json")
credentials = None
if service_account_json:
@@ -185,28 +183,27 @@ def _open_gcs_url(self, binary) -> object:
             client = GCSClient(credentials=credentials, project=credentials._project_id)
         else:
             client = GCSClient.create_anonymous_client()
-        file_to_close = smart_open.open(self.full_url, transport_params=dict(client=client), mode=mode)
+        file_to_close = smart_open.open(self.full_url, transport_params={"client": client}, **self.args)

         return file_to_close

-    def _open_aws_url(self, binary):
-        mode = "rb" if binary else "r"
+    def _open_aws_url(self):
         aws_access_key_id = self._provider.get("aws_access_key_id")
         aws_secret_access_key = self._provider.get("aws_secret_access_key")
         use_aws_account = aws_access_key_id and aws_secret_access_key

         if use_aws_account:
             aws_access_key_id = self._provider.get("aws_access_key_id", "")
             aws_secret_access_key = self._provider.get("aws_secret_access_key", "")
-            result = smart_open.open(f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}", mode=mode)
+            url = f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}"
+            result = smart_open.open(url, **self.args)
         else:
             config = botocore.client.Config(signature_version=botocore.UNSIGNED)
             params = {"client": boto3.client("s3", config=config)}
-            result = smart_open.open(self.full_url, transport_params=params, mode=mode)
+            result = smart_open.open(self.full_url, transport_params=params, **self.args)
         return result

-    def _open_azblob_url(self, binary):
-        mode = "rb" if binary else "r"
+    def _open_azblob_url(self):
         storage_account = self._provider.get("storage_account")
         storage_acc_url = f"https://{storage_account}.blob.core.windows.net"
         sas_token = self._provider.get("sas_token", None)
@@ -220,14 +217,15 @@ def _open_azblob_url(self, binary):
             # assuming anonymous public read access given no credential
             client = BlobServiceClient(account_url=storage_acc_url)

-        result = smart_open.open(f"{self.storage_scheme}{self.url}", transport_params=dict(client=client), mode=mode)
-        return result
+        url = f"{self.storage_scheme}{self.url}"
+        return smart_open.open(url, transport_params=dict(client=client), **self.args)


 class Client:
     """Class that manages reading and parsing data from streams"""

     reader_class = URLFile
+    binary_formats = {"excel", "feather", "parquet", "orc", "pickle"}

     def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: str = None):
         self._dataset_name = dataset_name
@@ -243,6 +241,9 @@ def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: str = None):
             logger.error(error_msg)
             raise ConfigurationError(error_msg) from err

+        self.binary_source = self._reader_format in self.binary_formats
+        self.encoding = self._reader_options.get("encoding")
+
     @property
     def stream_name(self) -> str:
         if self._dataset_name:
@@ -336,17 +337,12 @@ def dtype_to_json_type(dtype) -> str:

     @property
     def reader(self) -> reader_class:
-        return self.reader_class(url=self._url, provider=self._provider)
-
-    @property
-    def binary_source(self):
-        binary_formats = {"excel", "feather", "parquet", "orc", "pickle"}
-        return self._reader_format in binary_formats
+        return self.reader_class(url=self._url, provider=self._provider, binary=self.binary_source, encoding=self.encoding)

     def read(self, fields: Iterable = None) -> Iterable[dict]:
         """Read data from the stream"""
-        with self.reader.open(binary=self.binary_source) as fp:
-            if self._reader_format == "json" or self._reader_format == "jsonl":
+        with self.reader.open() as fp:
+            if self._reader_format in ["json", "jsonl"]:
                 yield from self.load_nested_json(fp)
             elif self._reader_format == "yaml":
                 fields = set(fields) if fields else None
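With this hunk, binary_source stops being a property recomputed on every access and becomes a value fixed in Client.__init__, and the reader receives both binary and encoding up front. An illustration of the resulting wiring, assuming the names shown in this diff (values are made up):

from source_file.client import Client

client = Client(
    dataset_name="events",
    url="https://example.com/events.parquet",
    provider={"storage": "HTTPS"},
    format="parquet",  # listed in Client.binary_formats
)
assert client.binary_source      # parquet is opened in "rb" mode
assert client.encoding is None   # no reader_options given, so no text codec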
@@ -376,8 +372,8 @@ def _stream_properties(self, fp):
     def streams(self) -> Iterable:
         """Discovers available streams"""
         # TODO handle discovery of directories of multiple files instead
-        with self.reader.open(binary=self.binary_source) as fp:
-            if self._reader_format == "json" or self._reader_format == "jsonl":
+        with self.reader.open() as fp:
+            if self._reader_format in ["json", "jsonl"]:
                 json_schema = self.load_nested_json_schema(fp)
             else:
                 json_schema = {
@@ -83,7 +83,7 @@ def check(self, logger, config: Mapping) -> AirbyteConnectionStatus:
         client = self._get_client(config)
         logger.info(f"Checking access to {client.reader.full_url}...")
         try:
-            with client.reader.open(binary=client.binary_source):
+            with client.reader.open():
                 return AirbyteConnectionStatus(status=Status.SUCCEEDED)
         except Exception as err:
             reason = f"Failed to load {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+import logging
+from pathlib import Path
+
+from source_file.source import SourceFile
+
+HERE = Path(__file__).parent.absolute()
+
+
+def test_csv_with_utf16_encoding():
+
+    config_local_csv_utf16 = {
+        "dataset_name": "AAA",
+        "format": "csv",
+        "reader_options": '{"encoding":"utf_16"}',
+        "url": f"{HERE}/../integration_tests/sample_files/test_utf16.csv",
+        "provider": {"storage": "local"},
+    }
+    expected_schema = {
+        "$schema": "http://json-schema.org/draft-07/schema#",
+        "properties": {
+            "header1": {"type": ["string", "null"]},
+            "header2": {"type": ["number", "null"]},
+            "header3": {"type": ["number", "null"]},
+            "header4": {"type": ["boolean", "null"]},
+        },
+        "type": "object",
+    }
+
+    catalog = SourceFile().discover(logger=logging.getLogger("airbyte"), config=config_local_csv_utf16)
+    stream = next(iter(catalog.streams))
+    assert stream.json_schema == expected_schema
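The new unit test stops at schema discovery. A hypothetical follow-up, not part of this commit, would read records through the same config to confirm the UTF-16 payload decodes end to end:

from source_file.client import Client

def test_csv_with_utf16_encoding_reads_records():
    client = Client(
        dataset_name="AAA",
        url=f"{HERE}/../integration_tests/sample_files/test_utf16.csv",
        provider={"storage": "local"},
        format="csv",
        reader_options='{"encoding":"utf_16"}',
    )
    records = list(client.read())
    assert records, "expected at least one decoded row"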
3 changes: 2 additions & 1 deletion docs/integrations/sources/file.md
@@ -126,7 +126,7 @@ In order to read large files from a remote location, this connector uses the [sm
 ## Changelog

 | Version | Date | Pull Request | Subject |
-|---------|------------|----------------------------------------------------------| ------------------------------------------------- |
+|---------|------------|----------------------------------------------------------|---------------------------------------------------|
+| 0.2.16 | 2022-08-10 | [15293](https://github.com/airbytehq/airbyte/pull/15293) | added support for encoding reader option |
 | 0.2.15 | 2022-08-05 | [15269](https://github.com/airbytehq/airbyte/pull/15269) | Bump `smart-open` version to 6.0.0 |
 | 0.2.12 | 2022-07-12 | [14535](https://github.com/airbytehq/airbyte/pull/14535) | Fix invalid schema generation for JSON files |
 | 0.2.11 | 2022-07-12 | [9974](https://github.com/airbytehq/airbyte/pull/14588) | Add support to YAML format |
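For users, the option rides in reader_options as a JSON string; any codec name Python's codecs module recognizes (utf_16, latin_1, cp1252, ...) should pass through to the reader. A sketch of a source config mirroring the unit test above (field values are illustrative):

config = {
    "dataset_name": "my_utf16_dataset",
    "format": "csv",
    "reader_options": '{"encoding": "utf_16"}',
    "url": "s3://my-bucket/exports/data_utf16.csv",
    "provider": {"storage": "S3"},
}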