Skip to content

Commit

Permalink
Publish PR 9029: clickhouse normalization (#9072)
Browse files Browse the repository at this point in the history
* add normalization-clickhouse docker build step

* bump normalization version

* small changes gradle

* fix settings gradle

* fix eof file

* correct clickhouse normalization

* Refactor jinja template for scd (#9278)

* merge chris code and regenerate sql files

Co-authored-by: James Zhao <james.zhao@sinoreps.com>
Co-authored-by: Edward Gao <edward.gao@airbyte.io>
Co-authored-by: Christophe Duong <christophe.duong@gmail.com>
  • Loading branch information
4 people authored Jan 5, 2022
1 parent bd71999 commit de56d47
Show file tree
Hide file tree
Showing 46 changed files with 254 additions and 601 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
!dbt-project-template-mssql
!dbt-project-template-mysql
!dbt-project-template-oracle
!dbt-project-template-clickhouse
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.61
LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.name=airbyte/normalization
6 changes: 6 additions & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,15 @@ task airbyteDockerOracle(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('oracle')
dependsOn assemble
}
task airbyteDockerClickhouse(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('clickhouse')
dependsOn assemble
}

airbyteDocker.dependsOn(airbyteDockerMSSql)
airbyteDocker.dependsOn(airbyteDockerMySql)
airbyteDocker.dependsOn(airbyteDockerOracle)
airbyteDocker.dependsOn(airbyteDockerClickhouse)

task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs) {
module = "pytest"
Expand All @@ -86,6 +91,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-oracle:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mssql:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-clickhouse:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ services:
image: airbyte/normalization-mysql:${VERSION}
normalization-oracle:
image: airbyte/normalization-oracle:${VERSION}
normalization-clickhouse:
image: airbyte/normalization-clickhouse:${VERSION}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

as (

-- depends_on: ref('dedup_cdc_excluded_stg')
with

input_data as (
select *
from _airbyte_test_normalization.dedup_cdc_excluded_ab3
from _airbyte_test_normalization.dedup_cdc_excluded_stg
-- dedup_cdc_excluded from test_normalization._airbyte_raw_dedup_cdc_excluded
),

Expand Down Expand Up @@ -45,15 +46,15 @@ scd_data as (
_ab_cdc_updated_at,
_ab_cdc_deleted_at,
_airbyte_emitted_at as _airbyte_start_at,
case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
anyOrNull(_airbyte_emitted_at) over (
partition by id
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc, _ab_cdc_updated_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_cdc_excluded_hashid
Expand All @@ -65,7 +66,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at, accurateCastOrNull(_ab_cdc_deleted_at, 'String'), accurateCastOrNull(_ab_cdc_updated_at, 'String')
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
assumeNotNull(hex(MD5(

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

as (

-- depends_on: ref('dedup_exchange_rate_stg')
with

input_data as (
select *
from _airbyte_test_normalization.dedup_exchange_rate_ab3
from _airbyte_test_normalization.dedup_exchange_rate_stg
-- dedup_exchange_rate from test_normalization._airbyte_raw_dedup_exchange_rate
),

Expand Down Expand Up @@ -54,15 +55,15 @@ scd_data as (
NZD,
USD,
date as _airbyte_start_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
anyOrNull(date) over (
partition by id, currency, cast(NZD as String)
order by
date is null asc,
date desc,
_airbyte_emitted_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_dedup_exchange_rate_hashid
Expand All @@ -74,7 +75,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
assumeNotNull(hex(MD5(

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

as (

-- depends_on: ref('renamed_dedup_cdc_excluded_stg')
with

input_data as (
select *
from _airbyte_test_normalization.renamed_dedup_cdc_excluded_ab3
from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg
-- renamed_dedup_cdc_excluded from test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
),

Expand All @@ -41,15 +42,15 @@ scd_data as (
))) as _airbyte_unique_key,
id,
_airbyte_emitted_at as _airbyte_start_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
anyOrNull(_airbyte_emitted_at) over (
partition by id
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
) as _airbyte_end_at,
case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_renamed_dedup_cdc_excluded_hashid
Expand All @@ -61,7 +62,7 @@ dedup_data as (
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
order by _airbyte_ab_id
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
assumeNotNull(hex(MD5(

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.dedup_cdc_excluded_scd
select
_airbyte_unique_key,
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.dedup_exchange_rate_scd
select
_airbyte_unique_key,
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
as (

-- Final base SQL model
-- depends_on: test_normalization.renamed_dedup_cdc_excluded_scd
select
_airbyte_unique_key,
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
with __dbt__cte__exchange_rate_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization._airbyte_raw_exchange_rate
select
JSONExtractRaw(_airbyte_data, 'id') as id,
JSONExtractRaw(_airbyte_data, 'currency') as currency,
Expand All @@ -33,6 +34,7 @@ where 1 = 1
), __dbt__cte__exchange_rate_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__exchange_rate_ab1
select
accurateCastOrNull(id, '
BIGINT
Expand Down Expand Up @@ -60,6 +62,7 @@ where 1 = 1
), __dbt__cte__exchange_rate_ab3 as (

-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__exchange_rate_ab2
select
assumeNotNull(hex(MD5(

Expand Down Expand Up @@ -95,6 +98,7 @@ from __dbt__cte__exchange_rate_ab2 tmp
-- exchange_rate
where 1 = 1
)-- Final base SQL model
-- depends_on: __dbt__cte__exchange_rate_ab3
select
id,
currency,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@


create view _airbyte_test_normalization.dedup_exchange_rate_ab3__dbt_tmp
create view _airbyte_test_normalization.dedup_exchange_rate_stg__dbt_tmp

as (

with __dbt__cte__dedup_exchange_rate_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization._airbyte_raw_dedup_exchange_rate
select
JSONExtractRaw(_airbyte_data, 'id') as id,
JSONExtractRaw(_airbyte_data, 'currency') as currency,
Expand All @@ -26,6 +27,7 @@ where 1 = 1
), __dbt__cte__dedup_exchange_rate_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__dedup_exchange_rate_ab1
select
accurateCastOrNull(id, '
BIGINT
Expand All @@ -51,6 +53,7 @@ from __dbt__cte__dedup_exchange_rate_ab1
where 1 = 1

)-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__dedup_exchange_rate_ab2
select
assumeNotNull(hex(MD5(

Expand Down
Loading

0 comments on commit de56d47

Please sign in to comment.