Skip to content

Commit

Permalink
Normalization fix Prefix Tables starting with number (#9301)
Browse files Browse the repository at this point in the history
* add normalization-clickhouse docker build step

* bump normalization version

* small changes gradle

* fix settings gradle

* fix eof file

* correct clickhouse normalization

* Refactor jinja template for scd (#9278)

* merge chris code and regenerate sql files

* correct scd post-hook generation for snowflake

* fix scd table for snowflake prefix table with number

* scd fix for all destinations

* use quote

* use normalize column for post-hook

* change logic to apply quote

* add logic to handle prefix for mssql and oracle

* run tests

* correct unit test

* bump normalization version

Co-authored-by: James Zhao <james.zhao@sinoreps.com>
Co-authored-by: Edward Gao <edward.gao@airbyte.io>
Co-authored-by: Christophe Duong <christophe.duong@gmail.com>
  • Loading branch information
4 people authored Jan 7, 2022
1 parent 4190dbc commit 511819b
Show file tree
Hide file tree
Showing 69 changed files with 354 additions and 708 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.version=0.1.63
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}},
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg'],
post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_complex_columns_resulting_into_long_names_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}},
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
partition_by = {"field": "_airbyte_active_row", "data_type": "int64", "range": {"start": 0, "end": 1, "interval": 1}},
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_cdc_excluded_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_cdc_excluded_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_cdc_excluded_stg')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_co__lting_into_long_names_stg'],
post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_co__lting_into_long_names_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_co__lting_into_long_names_stg')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.nested_stream_with_co_1g_into_long_names_stg'],
post_hook = ["drop view _airbyte_test_normalization.nested_stream_with_co_1g_into_long_names_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_co_1g_into_long_names_stg')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_1_prefix_startwith_number
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{ config(
unique_key = "{{ quote('_AIRBYTE_UNIQUE_KEY_SCD') }}",
schema = "test_normalization",
post_hook = ['drop view test_normalization.dedup_exchange_rate_stg'],
post_hook = ["drop view test_normalization.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: airbyte_raw_1_prefix_startwith_number
- name: airbyte_raw_dedup_cdc_excluded
- name: airbyte_raw_dedup_exchange_rate
- name: airbyte_raw_exchange_rate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['delete from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg)'],
post_hook = ["delete from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.nested_stream_with_c__lting_into_long_names_stg)"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_c__lting_into_long_names_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ['delete from _airbyte_test_normalization.some_stream_that_was_empty_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.some_stream_that_was_empty_stg)'],
post_hook = ["delete from _airbyte_test_normalization.some_stream_that_was_empty_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.some_stream_that_was_empty_stg)"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('some_stream_that_was_empty_stg')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ profile: 'normalize'
# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that source models can be found
# in the "models/" directory. You probably won't need to change these!
source-paths: ["modified_models"]
source-paths: ["models"]
docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@



create table "postgres".test_normalization."1_prefix_startwith_number_scd"
as (

-- depends_on: ref('1_prefix_startwith_number_stg')
with

input_data as (
select *
from "postgres"._airbyte_test_normalization."1_prefix_startwith_number_stg"
-- 1_prefix_startwith_number from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number
),

scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
md5(cast(coalesce(cast("id" as
varchar
), '') as
varchar
)) as _airbyte_unique_key,
"id",
"date",
"text",
"date" as _airbyte_start_at,
lag("date") over (
partition by "id"
order by
"date" is null asc,
"date" desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when row_number() over (
partition by "id"
order by
"date" is null asc,
"date" desc,
_airbyte_emitted_at desc
) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_1_prefix_startwith_number_hashid
from input_data
),
dedup_data as (
select
-- we need to ensure de-duplicated rows for merge/update queries
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by
_airbyte_unique_key,
_airbyte_start_at,
_airbyte_emitted_at
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
md5(cast(coalesce(cast(_airbyte_unique_key as
varchar
), '') || '-' || coalesce(cast(_airbyte_start_at as
varchar
), '') || '-' || coalesce(cast(_airbyte_emitted_at as
varchar
), '') as
varchar
)) as _airbyte_unique_key_scd,
scd_data.*
from scd_data
)
select
_airbyte_unique_key,
_airbyte_unique_key_scd,
"id",
"date",
"text",
_airbyte_start_at,
_airbyte_end_at,
_airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_1_prefix_startwith_number_hashid
from dedup_data where _airbyte_row_num = 1
);

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@



create table "postgres".test_normalization."1_prefix_startwith_number"
as (

-- Final base SQL model
-- depends_on: "postgres".test_normalization."1_prefix_startwith_number_scd"
select
_airbyte_unique_key,
"id",
"date",
"text",
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_1_prefix_startwith_number_hashid
from "postgres".test_normalization."1_prefix_startwith_number_scd"
-- 1_prefix_startwith_number from "postgres".test_normalization._airbyte_raw_1_prefix_startwith_number
where 1 = 1
and _airbyte_active_row = 1

);

Loading

0 comments on commit 511819b

Please sign in to comment.