Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish PR 9029: clickhouse normalization #9072

Merged
merged 11 commits into from
Jan 5, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,3 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "clickhouse"
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,3 @@ models:

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
destination: "mssql"
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,3 @@ models:

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
destination: "myssql"
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,3 @@ models:

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
destination: "oracle"
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,3 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "default"
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,3 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "clickhouse"
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

as (

-- depends_on: ref('dedup_cdc_excluded_stg')
with

input_data as (
select *
from _airbyte_test_normalization.dedup_cdc_excluded_ab3
from _airbyte_test_normalization.dedup_cdc_excluded_stg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my education: what do ab3 and stg mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you use incremental + dedup sync mode dbt will create the stg tables for you, but other sync methods the extraction of data, data type conversion and hashing are made using sub-queries/tables with suffix ab1, ab2 and ab3.

-- dedup_cdc_excluded from test_normalization._airbyte_raw_dedup_cdc_excluded
),

Expand Down Expand Up @@ -45,15 +46,15 @@ scd_data as (
_ab_cdc_updated_at,
_ab_cdc_deleted_at,
_airbyte_emitted_at as _airbyte_start_at,
case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
anyOrNull(_airbyte_emitted_at) over (
partition by id
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc, _ab_cdc_updated_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_cdc_excluded_hashid
Expand All @@ -65,7 +66,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at, accurateCastOrNull(_ab_cdc_deleted_at, 'String'), accurateCastOrNull(_ab_cdc_updated_at, 'String')
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it expected that we're now sorting on active_row in additional to ab_id?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this prevents cases having multiple _airbyte_ab_id where there is some rows with 0 (not active) and making sure to select the active row.

) as _airbyte_row_num,
assumeNotNull(hex(MD5(

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

as (

-- depends_on: ref('dedup_exchange_rate_stg')
with

input_data as (
select *
from _airbyte_test_normalization.dedup_exchange_rate_ab3
from _airbyte_test_normalization.dedup_exchange_rate_stg
-- dedup_exchange_rate from test_normalization._airbyte_raw_dedup_exchange_rate
),

Expand Down Expand Up @@ -54,15 +55,15 @@ scd_data as (
NZD,
USD,
date as _airbyte_start_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
anyOrNull(date) over (
partition by id, currency, cast(NZD as String)
order by
date is null asc,
date desc,
_airbyte_emitted_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_exchange_rate_hashid
Expand All @@ -74,7 +75,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
assumeNotNull(hex(MD5(

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

as (

-- depends_on: ref('renamed_dedup_cdc_excluded_stg')
with

input_data as (
select *
from _airbyte_test_normalization.renamed_dedup_cdc_excluded_ab3
from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg
-- renamed_dedup_cdc_excluded from test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
),

Expand All @@ -41,15 +42,15 @@ scd_data as (
))) as _airbyte_unique_key,
id,
_airbyte_emitted_at as _airbyte_start_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
anyOrNull(_airbyte_emitted_at) over (
partition by id
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_renamed_dedup_cdc_excluded_hashid
Expand All @@ -61,7 +62,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
assumeNotNull(hex(MD5(

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.dedup_cdc_excluded_scd
select
_airbyte_unique_key,
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.dedup_exchange_rate_scd
select
_airbyte_unique_key,
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.renamed_dedup_cdc_excluded_scd
select
_airbyte_unique_key,
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
with __dbt__cte__exchange_rate_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization._airbyte_raw_exchange_rate
select
JSONExtractRaw(_airbyte_data, 'id') as id,
JSONExtractRaw(_airbyte_data, 'currency') as currency,
Expand All @@ -33,6 +34,7 @@ where 1 = 1
), __dbt__cte__exchange_rate_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__exchange_rate_ab1
select
accurateCastOrNull(id, '
BIGINT
Expand Down Expand Up @@ -60,6 +62,7 @@ where 1 = 1
), __dbt__cte__exchange_rate_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__exchange_rate_ab2
select
assumeNotNull(hex(MD5(

Expand Down Expand Up @@ -95,6 +98,7 @@ from __dbt__cte__exchange_rate_ab2 tmp
-- exchange_rate
where 1 = 1
)-- Final base SQL model
-- depends_on: __dbt__cte__exchange_rate_ab3
select
id,
currency,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@


create view _airbyte_test_normalization.dedup_exchange_rate_ab3__dbt_tmp
create view _airbyte_test_normalization.dedup_exchange_rate_stg__dbt_tmp

as (

with __dbt__cte__dedup_exchange_rate_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization._airbyte_raw_dedup_exchange_rate
select
JSONExtractRaw(_airbyte_data, 'id') as id,
JSONExtractRaw(_airbyte_data, 'currency') as currency,
Expand All @@ -26,6 +27,7 @@ where 1 = 1
), __dbt__cte__dedup_exchange_rate_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__dedup_exchange_rate_ab1
select
accurateCastOrNull(id, '
BIGINT
Expand All @@ -51,6 +53,7 @@ from __dbt__cte__dedup_exchange_rate_ab1
where 1 = 1

)-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__dedup_exchange_rate_ab2
select
assumeNotNull(hex(MD5(

Expand Down
Loading