Skip to content

Commit

Permalink
Source file: fix csv schema discovery (#15870)
Browse files Browse the repository at this point in the history
* #174 source file: fix csv schema discovery

* #174 source file: upd changelog

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
davydov-d and octavia-squidington-iii authored Aug 23, 2022
1 parent 6b34451 commit 81bfb5c
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@
- name: File
sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerRepository: airbyte/source-file
dockerImageTag: 0.2.19
dockerImageTag: 0.2.20
documentationUrl: https://docs.airbyte.io/integrations/sources/file
icon: file.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2291,7 +2291,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-file:0.2.19"
- dockerImage: "airbyte/source-file:0.2.20"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/file"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_file ./source_file
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.19
LABEL io.airbyte.version=0.2.20
LABEL io.airbyte.name=airbyte/source-file
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


from pathlib import Path
from unittest.mock import patch

import pytest
from airbyte_cdk import AirbyteLogger
Expand Down Expand Up @@ -56,3 +57,29 @@ def run_load_nested_json_schema(config, expected_columns=10, expected_rows=42):
df = data_list[0]
assert len(df) == expected_rows # DataFrame should have 42 items
return df


# https://github.com/airbytehq/alpha-beta-issues/issues/174
# This test recreates every condition under which the bug is reproduced, i.e.
# - the chunk size is smaller than the file size, so the file is read in several chunks
# - the column type detected in the last chunk is not `string`
@patch("source_file.client.Client.CSV_CHUNK_SIZE", 1)
def test_csv_schema():
    """Check that discovery reports every column of discover.csv as a nullable string.

    CSV_CHUNK_SIZE is patched to 1 for the duration of the test, so the
    chunk size is smaller than the sample file.
    """
    column_names = ("Address", "City", "First Name", "Last Name", "State", "zip_code")
    expected_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {name: {"type": ["string", "null"]} for name in column_names},
        "type": "object",
    }

    csv_location = SAMPLE_DIRECTORY.parent.joinpath("discover.csv")
    config = {
        "dataset_name": "test",
        "format": "csv",
        "url": str(csv_location),
        "provider": {"storage": "local"},
    }

    discovered = SourceFile().discover(logger=AirbyteLogger(), config=config)
    streams = discovered.dict()["streams"]

    # exactly one stream is expected for a single CSV file
    assert len(streams) == 1
    assert streams[0]["json_schema"] == expected_schema
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
First Name,Last Name,Address,City,State,zip_code
John,Doe,120 jefferson st.,Riverside, NJ,8075
Jack,McGinnis,220 hobo Av.,Phila, PA,9119
"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,8075
Stephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD,91234
,Blankman,,SomeTown, SD,unknown
"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,3333
18 changes: 13 additions & 5 deletions airbyte-integrations/connectors/source-file/source_file/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def _open_azblob_url(self):
class Client:
"""Class that manages reading and parsing data from streams"""

CSV_CHUNK_SIZE = 10_000
reader_class = URLFile
binary_formats = {"excel", "feather", "parquet", "orc", "pickle"}

Expand Down Expand Up @@ -313,7 +314,7 @@ def load_dataframes(self, fp, skip_data=False) -> Iterable:

reader_options = {**self._reader_options}
if self._reader_format == "csv":
reader_options["chunksize"] = 10000
reader_options["chunksize"] = self.CSV_CHUNK_SIZE
if skip_data:
reader_options["nrows"] = 0
reader_options["index_col"] = 0
Expand All @@ -323,17 +324,22 @@ def load_dataframes(self, fp, skip_data=False) -> Iterable:
yield reader(fp, **reader_options)

@staticmethod
def dtype_to_json_type(current_type: str, dtype) -> str:
    """Convert a Pandas Dataframe type to an Airbyte type, widening across dataframes.

    The same column can show up in several dataframes with different dtypes.
    The type chosen for earlier dataframes is merged with the dtype of the
    current one: identical kinds are kept, conflicting kinds fall back to
    "string", the broadest type.

    :param current_type: Airbyte type ("string", "number" or "boolean") chosen for
        this column from previous dataframes; falsy (None or "") if the column
        has not been seen yet
    :param dtype: Pandas Dataframe type of the column in the current dataframe
    :return: Corresponding Airbyte Type
    """
    if current_type == "string":
        # previous column values were of the string type, no sense to look further
        return current_type
    if dtype == object:
        return "string"
    # current_type holds an Airbyte type, so it must be compared with "number",
    # not with the pandas dtype names; otherwise a numeric column read in more
    # than one dataframe would always be degraded to "string"
    if dtype in ("int64", "float64") and (not current_type or current_type == "number"):
        return "number"
    if dtype == "bool" and (not current_type or current_type == "boolean"):
        return "boolean"
    # unsupported dtype, or a dtype that conflicts with the previously chosen type
    return "string"

Expand Down Expand Up @@ -379,7 +385,9 @@ def _stream_properties(self, fp):
fields = {}
for df in df_list:
for col in df.columns:
fields[col] = self.dtype_to_json_type(df[col].dtype)
# if data type of the same column differs in dataframes, we choose the broadest one
prev_frame_column_type = fields.get(col)
fields[col] = self.dtype_to_json_type(prev_frame_column_type, df[col].dtype)
return {field: {"type": [fields[field], "null"]} for field in fields}

@property
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ In order to read large files from a remote location, this connector uses the [sm

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|---------------------------------------------------|
| 0.2.20 | 2022-08-23 | [15870](https://github.com/airbytehq/airbyte/pull/15870) | Fix CSV schema discovery |
| 0.2.19 | 2022-08-19 | [15768](https://github.com/airbytehq/airbyte/pull/15768) | Convert 'nan' to 'null' |
| 0.2.18 | 2022-08-16 | [15698](https://github.com/airbytehq/airbyte/pull/15698) | Cache binary stream to file for discover |
| 0.2.17 | 2022-08-11 | [15501](https://github.com/airbytehq/airbyte/pull/15501) | Cache binary stream to file |
Expand Down

0 comments on commit 81bfb5c

Please sign in to comment.