
🎉 MySQL destination: normalization #4163

Merged 46 commits on Jul 4, 2021

Commits
55752a9
Add mysql dbt package
tuliren Jun 16, 2021
049cfa2
Add mysql normalization support in java
tuliren Jun 16, 2021
9165e80
Add mysql normalization support in python
tuliren Jun 16, 2021
94c10be
Fix unit tests
tuliren Jun 16, 2021
173ad67
Update readme
tuliren Jun 16, 2021
3d9cbf7
Setup mysql container in integration test
tuliren Jun 18, 2021
9e8e939
Add macros
tuliren Jun 18, 2021
47945bf
Depend on dbt-mysql from git repo
tuliren Jun 18, 2021
7f04076
Remove mysql limitation test
tuliren Jun 18, 2021
b9e3066
Test normalization
tuliren Jun 18, 2021
f13a95b
Revert protocol format change
tuliren Jun 18, 2021
dee3479
Fix mysql json macros
tuliren Jun 18, 2021
abe8eb7
Fix two more macros
tuliren Jun 22, 2021
a6acc91
Fix table name length
tuliren Jun 22, 2021
bbc3905
Fix array macro
tuliren Jun 23, 2021
51fcb2a
Fix equality test macro
tuliren Jun 23, 2021
5cbe550
Update replace-identifiers
tuliren Jun 23, 2021
2396590
Merge branch 'master' into liren/mysql-destination-normalization
tuliren Jun 25, 2021
b7fe9cc
Add more identifiers to replace
tuliren Jun 27, 2021
4a50b25
Fix unnest macro
tuliren Jun 27, 2021
5c6642a
Fix equality macro
tuliren Jun 28, 2021
b71bd6d
Check in mysql test output
tuliren Jun 28, 2021
5dc1cbb
Merge branch 'master' into liren/mysql-destination-normalization
tuliren Jun 28, 2021
b681fd0
Update column limit test for mysql
tuliren Jun 28, 2021
02a8670
Escape parentheses
tuliren Jun 28, 2021
ffa5789
Remove unnecessary mysql test
tuliren Jun 28, 2021
0cf0a17
Remove mysql output for easier code review
tuliren Jun 28, 2021
411425f
Remove unnecessary mysql test
tuliren Jun 28, 2021
c43d06e
Remove parentheses
tuliren Jun 28, 2021
091f137
Update dependencies
tuliren Jun 28, 2021
78e030c
Skip mysql instead of manually write out types
tuliren Jun 28, 2021
e89122d
Bump version
tuliren Jul 1, 2021
ebf5cc4
Merge branch 'master' into liren/mysql-destination-normalization
tuliren Jul 1, 2021
f94b1c8
Check in unit test for mysql name transformer
tuliren Jul 1, 2021
9310950
Fix type conversion
tuliren Jul 1, 2021
1c68996
Use json_value to extract scalar json fields
tuliren Jul 1, 2021
4e2a0dd
Move dbt-mysql to Dockerfile (#4459)
ChristopheDuong Jul 1, 2021
5c49cc3
Format code
tuliren Jul 1, 2021
46cab59
Check in mysql dbt output
tuliren Jul 1, 2021
aefc479
Remove unnecessary quote
tuliren Jul 1, 2021
84e6e1c
Update mysql equality test to match 0.19.0
tuliren Jul 1, 2021
0202369
Check in schema_test update
tuliren Jul 1, 2021
967e1d8
Update readme
tuliren Jul 4, 2021
73a1d25
Merge branch 'master' into liren/mysql-destination-normalization
tuliren Jul 4, 2021
f972e4b
Bump base normalization version
tuliren Jul 4, 2021
289ba24
Update document
tuliren Jul 4, 2021
5 changes: 3 additions & 2 deletions airbyte-integrations/bases/base-normalization/Dockerfile
@@ -1,4 +1,4 @@
-FROM fishtownanalytics/dbt:0.19.1
+FROM fishtownanalytics/dbt:0.19.0
COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

WORKDIR /airbyte
@@ -14,6 +14,7 @@ RUN pip install .

WORKDIR /airbyte/normalization_code
RUN pip install .
+RUN pip install git+https://github.com/dbeatty10/dbt-mysql@96655ea9f7fca7be90c9112ce8ffbb5aac1d3716#egg=dbt-mysql

WORKDIR /airbyte/normalization_code/dbt-template/
# Download external dbt dependencies
@@ -23,5 +24,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

-LABEL io.airbyte.version=0.1.34
+LABEL io.airbyte.version=0.1.35
LABEL io.airbyte.name=airbyte/normalization
10 changes: 10 additions & 0 deletions airbyte-integrations/bases/base-normalization/README.md
@@ -54,6 +54,7 @@ allowed characters, if quotes are needed or not, and the length limitations:
- [postgres](../../../docs/integrations/destinations/postgres.md)
- [redshift](../../../docs/integrations/destinations/redshift.md)
- [snowflake](../../../docs/integrations/destinations/snowflake.md)
- [mysql](../../../docs/integrations/destinations/mysql.md)

Rules about truncations, for example for both of these strings which are too long for the postgres 64 limit:
- `Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii`
@@ -216,6 +217,15 @@ A nice improvement would be to add csv/json seed files as expected output data f
The integration tests would verify that the content of such tables in the destination would match
these seed files or fail.

### Debug dbt operations with local database
This only works for testing databases launched in local containers (e.g. postgres and mysql).

- In `dbt_integration_test.py`, comment out the `tear_down_db` method so that the relevant database container is not deleted.
- Find the name of the database container in the logs (e.g. by searching `Executing`).
- Connect to the container by running `docker exec -it <container-name> bash` on the command line.
- Connect to the database inside the container (e.g. `mysql -u root` for mysql).
- Test the generated dbt operations directly in the database.

## Standard Destination Tests

Generally, to invoke standard destination tests, you run with gradle using:
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -25,6 +25,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
@@ -28,6 +28,10 @@
) as _airbyte_nested_data
{%- endmacro %}

{% macro mysql__cross_join_unnest(stream_name, array_col) -%}
left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
{%- endmacro %}

{% macro redshift__cross_join_unnest(stream_name, array_col) -%}
left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
{%- endmacro %}
@@ -36,7 +40,7 @@
cross join table(flatten({{ array_col }})) as {{ array_col }}
{%- endmacro %}

{# unnested_column_value ------------------------------------------------- #}
{# unnested_column_value -- this macro is related to unnest_cte #}

{% macro unnested_column_value(column_col) -%}
{{ adapter.dispatch('unnested_column_value')(column_col) }}
@@ -58,6 +62,10 @@
_airbyte_nested_data
{%- endmacro %}

{% macro mysql__unnested_column_value(column_col) -%}
_airbyte_nested_data
{%- endmacro %}

{# unnest_cte ------------------------------------------------- #}

{% macro unnest_cte(table_name, stream_name, column_col) -%}
@@ -97,3 +105,37 @@ joined as (
where numbers.generated_number <= json_array_length({{ column_col }}, true)
)
{%- endmacro %}

{% macro mysql__unnest_cte(table_name, stream_name, column_col) -%}
{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- call statement('max_json_array_length', fetch_result=True) -%}
with max_value as (
select max(json_length({{ column_col }})) as max_number_of_items
from {{ ref(table_name) }}
)
select
case when max_number_of_items is not null and max_number_of_items > 1
then max_number_of_items
else 1 end as max_number_of_items
from max_value
{%- endcall -%}

{%- set max_length = load_result('max_json_array_length') -%}
with numbers as (
{{ dbt_utils.generate_series(max_length["data"][0][0]) }}
),
joined as (
select
_airbyte_{{ stream_name }}_hashid as _airbyte_hashid,
{# -- json_extract(column_col, '$[i][0]') as _airbyte_nested_data #}
json_extract({{ column_col }}, concat("$[", numbers.generated_number - 1, "][0]")) as _airbyte_nested_data
from {{ ref(table_name) }}
cross join numbers
-- only generate the number of records in the cross join that corresponds
-- to the number of items in {{ table_name }}.{{ column_col }}
where numbers.generated_number <= json_length({{ column_col }})
)
{%- endmacro %}
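As an aside (not part of the PR), the index arithmetic in `mysql__unnest_cte` can be illustrated in Python: the macro asks MySQL for the maximum JSON array length (defaulting to 1), generates a 1-based number series, and extracts element `n - 1` from each row's array. A minimal sketch, with the hypothetical helper name `unnest_paths`:

```python
def unnest_paths(max_items: int) -> list:
    # Mirrors the macro's guard ("else 1 end"): always generate at least one index.
    n = max(max_items, 1)
    # json_extract(col, concat("$[", generated_number - 1, "][0]")) for each number
    return [f"$[{i - 1}][0]" for i in range(1, n + 1)]

print(unnest_paths(3))  # ['$[0][0]', '$[1][0]', '$[2][0]']
```

The off-by-one (`generated_number - 1`) exists because `dbt_utils.generate_series` is 1-based while MySQL JSON array paths are 0-based.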
@@ -19,3 +19,44 @@
{% macro snowflake__type_json() %}
variant
{% endmacro %}

{%- macro mysql__type_json() -%}
json
{%- endmacro -%}


{# string ------------------------------------------------- #}

{%- macro mysql__type_string() -%}
char
{%- endmacro -%}


{# float ------------------------------------------------- #}
{% macro mysql__type_float() %}
float
{% endmacro %}


{# int ------------------------------------------------- #}
{% macro default__type_int() %}
signed
{% endmacro %}


{# bigint ------------------------------------------------- #}
{% macro mysql__type_bigint() %}
signed
{% endmacro %}


{# numeric ------------------------------------------------- #}
{% macro mysql__type_numeric() %}
float
{% endmacro %}


{# timestamp ------------------------------------------------- #}
{% macro mysql__type_timestamp() %}
time
{% endmacro %}
@@ -0,0 +1,3 @@
{% macro mysql__except() %}
{% do exceptions.warn("MySQL does not support EXCEPT operator") %}
{% endmacro %}
@@ -2,8 +2,9 @@
Adapter Macros for the following functions:
- Bigquery: JSON_EXTRACT(json_string_expr, json_path_format) -> https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions
- Snowflake: JSON_EXTRACT_PATH_TEXT( <column_identifier> , '<path_name>' ) -> https://docs.snowflake.com/en/sql-reference/functions/json_extract_path_text.html
- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ...] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
- Postgres: json_extract_path_text(<from_json>, 'path' [, 'path' [, ...}}) -> https://www.postgresql.org/docs/12/functions-json.html
- MySQL: JSON_EXTRACT(json_doc, 'path' [, 'path'] ...) -> https://dev.mysql.com/doc/refman/8.0/en/json-search-functions.html
#}

{# format_json_path -------------------------------------------------- #}
@@ -23,6 +24,11 @@
{{ "'" ~ json_path_list|join("','") ~ "'" }}
{%- endmacro %}

{% macro mysql__format_json_path(json_path_list) -%}
{# -- '$."x"."y"."z"' #}
{{ "'$.\"" ~ json_path_list|join(".") ~ "\"'" }}
{%- endmacro %}
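For intuition (an illustration, not code from the PR), the Jinja concatenation above can be reproduced in Python to see the exact literal it emits; note that the whole joined path ends up inside a single pair of double quotes:

```python
def mysql_format_json_path(json_path_list):
    # Same concatenation as the Jinja expression: "'$.\"" ~ join(".") ~ "\"'"
    return "'$.\"" + ".".join(json_path_list) + "\"'"

print(mysql_format_json_path(["x", "y", "z"]))  # '$."x.y.z"'
```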

{% macro redshift__format_json_path(json_path_list) -%}
{{ "'" ~ json_path_list|join("','") ~ "'" }}
{%- endmacro %}
@@ -49,6 +55,10 @@
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract(json_column, json_path_list) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}
@@ -75,6 +85,10 @@
jsonb_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract_scalar(json_column, json_path_list) -%}
json_value({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}
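The practical difference between `json_extract` and `json_value` above: the former returns a JSON fragment (strings stay quoted), while the latter returns an unwrapped scalar. A rough Python analogy using the standard `json` module (the helper names are illustrative, not the MySQL API):

```python
import json

def json_extract(doc: str, keys):
    # Like MySQL JSON_EXTRACT: the result is still JSON, so strings stay quoted.
    cur = json.loads(doc)
    for k in keys:
        cur = cur[k]
    return json.dumps(cur)

def json_value(doc: str, keys):
    # Like MySQL JSON_VALUE: the result is an unquoted scalar value.
    cur = json.loads(doc)
    for k in keys:
        cur = cur[k]
    return cur

doc = '{"a": {"b": "x"}}'
print(json_extract(doc, ["a", "b"]))  # "x"  (JSON string, with quotes)
print(json_value(doc, ["a", "b"]))    # x    (bare scalar)
```

This is why the PR switches scalar fields to `json_value`: casting a quoted JSON fragment to a SQL type would keep the quote characters in the data.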

{% macro redshift__json_extract_scalar(json_column, json_path_list) -%}
case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
{%- endmacro %}
@@ -101,6 +115,10 @@
jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro mysql__json_extract_array(json_column, json_path_list) -%}
json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
{%- endmacro %}

{% macro redshift__json_extract_array(json_column, json_path_list) -%}
json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true)
{%- endmacro %}
@@ -0,0 +1,60 @@
{#
-- Adapted from https://github.com/dbt-labs/dbt-utils/blob/0-19-0-updates/macros/schema_tests/equality.sql
-- dbt-utils version: 0.6.4
-- This macro needs to be updated accordingly when dbt-utils is upgraded.
-- This is needed because MySQL does not support the EXCEPT operator!
#}

{% macro mysql__test_equality(model, compare_model, compare_columns=None) %}

{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- do dbt_utils._is_relation(model, 'test_equality') -%}

{%- if not compare_columns -%}
{%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
{%- set compare_columns = adapter.get_columns_in_relation(model) | map(attribute='quoted') -%}
{%- endif -%}

{% set compare_cols_csv = compare_columns | join(', ') %}

with a as (
select * from {{ model }}
),

b as (
select * from {{ compare_model }}
),

a_minus_b as (
select {{ compare_cols_csv }} from a
where ({{ compare_cols_csv }}) not in
(select {{ compare_cols_csv }} from b)
),

b_minus_a as (
select {{ compare_cols_csv }} from b
where ({{ compare_cols_csv }}) not in
(select {{ compare_cols_csv }} from a)
),

unioned as (
select * from a_minus_b
union all
select * from b_minus_a
),

final as (
select (select count(*) from unioned) +
(select abs(
(select count(*) from a_minus_b) -
(select count(*) from b_minus_a)
))
as count
)

select count from final

{% endmacro %}
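Since MySQL lacks `EXCEPT`, the macro above emulates set difference with `NOT IN` subqueries. A simplified Python model of the final count it computes (assuming distinct rows; the helper name is hypothetical):

```python
def equality_mismatch_count(a_rows, b_rows):
    # a_minus_b / b_minus_a mirror the macro's `not in` CTEs
    a_minus_b = [r for r in a_rows if r not in b_rows]
    b_minus_a = [r for r in b_rows if r not in a_rows]
    # final CTE: size of the union of both differences, plus the absolute
    # difference of their counts
    return len(a_minus_b) + len(b_minus_a) + abs(len(a_minus_b) - len(b_minus_a))

print(equality_mismatch_count([(1,), (2,)], [(2,), (1,)]))  # 0
```

A result of 0 means the two relations match; any positive value fails the dbt test.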
@@ -42,12 +42,17 @@
class DbtIntegrationTest(object):
def __init__(self):
self.target_schema = "test_normalization"
self.container_name = "test_normalization_db_" + self.random_string(3)
self.container_prefix = f"test_normalization_db_{self.random_string(3)}"
self.db_names = ["postgres", "mysql"]

@staticmethod
def random_string(length: int) -> str:
return "".join(random.choice(string.ascii_lowercase) for i in range(length))

def setup_db(self):
self.setup_postgres_db()
self.setup_mysql_db()

def setup_postgres_db(self):
print("Starting localhost postgres container for tests")
port = self.find_free_port()
@@ -64,7 +69,7 @@ def setup_postgres_db(self):
"run",
"--rm",
"--name",
f"{self.container_name}",
f"{self.container_prefix}_postgres",
"-e",
f"POSTGRES_USER={config['username']}",
"-e",
@@ -81,6 +86,42 @@
with open("../secrets/postgres.json", "w") as fh:
fh.write(json.dumps(config))

def setup_mysql_db(self):
print("Starting localhost mysql container for tests")
port = self.find_free_port()
config = {
"type": "mysql",
"host": "localhost",
"port": port,
"database": self.target_schema,
"username": "root",
"password": "",
}
commands = [
"docker",
"run",
"--rm",
"--name",
f"{self.container_prefix}_mysql",
"-e",
"MYSQL_ALLOW_EMPTY_PASSWORD=yes",
"-e",
"MYSQL_INITDB_SKIP_TZINFO=yes",
"-e",
f"MYSQL_DATABASE={config['database']}",
"-p",
f"{config['port']}:3306",
"-d",
"mysql",
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)

if not os.path.exists("../secrets"):
os.makedirs("../secrets")
with open("../secrets/mysql.json", "w") as fh:
fh.write(json.dumps(config))

@staticmethod
def find_free_port():
"""
@@ -92,12 +133,13 @@ def find_free_port():
s.close()
return addr[1]
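The truncated `find_free_port` helper binds to port 0 so the OS assigns a free ephemeral port; a self-contained sketch of that pattern (consistent with the visible tail of the method, but reconstructed here, not copied from the PR):

```python
import socket

def find_free_port() -> int:
    # Binding to port 0 makes the OS pick a currently free ephemeral port.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(("localhost", 0))
    addr = s.getsockname()
    s.close()
    return addr[1]

port = find_free_port()
```

The returned port is then passed to `docker run -p <port>:3306` so each test container gets a non-conflicting host port.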

def tear_down_postgres_db(self):
print("Stopping localhost postgres container for tests")
try:
subprocess.call(["docker", "kill", f"{self.container_name}"])
except Exception as e:
print(f"WARN: Exception while shutting down postgres db: {e}")
def tear_down_db(self):
for db_name in self.db_names:
print(f"Stopping localhost {db_name} container for tests")
try:
subprocess.call(["docker", "kill", f"{self.container_prefix}_{db_name}"])
except Exception as e:
print(f"WARN: Exception while shutting down {db_name}: {e}")

@staticmethod
def change_current_test_dir(request):
Expand Down
@@ -4,11 +4,11 @@ models:
- name: exchange_rate
tests:
- dbt_utils.equality:
-description: check_streams_are_equal
-In this integration test, we are sending the same records to both streams
-exchange_rate and dedup_exchange_rate.
-The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
-the final table with append or overwrite mode from exchange_rate.
+# description: check_streams_are_equal
+# In this integration test, we are sending the same records to both streams
+# exchange_rate and dedup_exchange_rate.
+# The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
+# the final table with append or overwrite mode from exchange_rate.
compare_model: ref('dedup_exchange_rate_scd')
compare_columns:
- id