From de56d4713cf15af42a4cd24e5744e790d9a98dd2 Mon Sep 17 00:00:00 2001 From: Marcos Marx Date: Tue, 4 Jan 2022 23:28:14 -0300 Subject: [PATCH] Publish PR 9029: clickhouse normalization (#9072) * add normalization-clickhouse docker build step * bump normalization version * small changes gradle * fix settings gradle * fix eof file * correct clickhouse normalization * Refactor jinja template for scd (#9278) * merge chris code and regenerate sql files Co-authored-by: James Zhao Co-authored-by: Edward Gao Co-authored-by: Christophe Duong --- .../bases/base-normalization/.dockerignore | 1 + .../bases/base-normalization/Dockerfile | 2 +- .../bases/base-normalization/build.gradle | 6 + .../base-normalization/docker-compose.yaml | 2 + .../dedup_cdc_excluded_scd.sql | 9 +- .../dedup_exchange_rate_scd.sql | 9 +- .../renamed_dedup_cdc_excluded_scd.sql | 9 +- .../test_normalization/dedup_cdc_excluded.sql | 1 + .../dedup_exchange_rate.sql | 1 + .../renamed_dedup_cdc_excluded.sql | 1 + .../test_normalization/exchange_rate.sql | 4 + .../dedup_cdc_excluded_ab3.sql | 71 ----- .../dedup_exchange_rate_stg.sql} | 5 +- .../test_normalization/pos_dedup_cdcx_ab3.sql | 78 ------ .../renamed_dedup_cdc_excluded_ab3.sql | 45 ---- .../dedup_cdc_excluded_ab1.sql | 1 + .../dedup_cdc_excluded_ab2.sql | 1 + .../dedup_exchange_rate_ab1.sql | 1 + .../dedup_exchange_rate_ab2.sql | 1 + .../test_normalization/exchange_rate_ab1.sql | 1 + .../test_normalization/exchange_rate_ab2.sql | 1 + .../test_normalization/exchange_rate_ab3.sql | 1 + .../test_normalization/pos_dedup_cdcx_ab1.sql | 1 + .../test_normalization/pos_dedup_cdcx_ab2.sql | 1 + .../renamed_dedup_cdc_excluded_ab1.sql | 1 + .../renamed_dedup_cdc_excluded_ab2.sql | 1 + .../dedup_cdc_excluded_scd.sql | 26 +- .../dedup_exchange_rate_scd.sql | 26 +- .../renamed_dedup_cdc_excluded_scd.sql | 26 +- .../test_normalization/dedup_cdc_excluded.sql | 1 + .../dedup_exchange_rate.sql | 1 + .../renamed_dedup_cdc_excluded.sql | 1 + .../test_normalization/exchange_rate.sql | 1 + .../dedup_cdc_excluded_ab3.sql | 20 -- ...te_ab3.sql => dedup_exchange_rate_stg.sql} | 1 + .../test_normalization/pos_dedup_cdcx_ab3.sql | 21 -- .../renamed_dedup_cdc_excluded_ab3.sql | 16 -- .../test_normalization/exchange_rate.sql | 4 + .../dedup_cdc_excluded_ab3.sql | 71 ----- .../dedup_exchange_rate_stg.sql} | 5 +- .../test_normalization/pos_dedup_cdcx_ab3.sql | 78 ------ .../renamed_dedup_cdc_excluded_ab3.sql | 45 ---- .../transform_catalog/stream_processor.py | 253 ++++++++++-------- .../NormalizationRunnerFactory.java | 2 +- build.gradle | 1 + settings.gradle | 1 + 46 files changed, 254 insertions(+), 601 deletions(-) delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql rename airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/{second_output/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql => first_output/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql} (91%) delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql rename airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/{dedup_exchange_rate_ab3.sql => dedup_exchange_rate_stg.sql} (92%) delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql rename airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/{first_output/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql => second_output/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql} (91%) delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql diff --git a/airbyte-integrations/bases/base-normalization/.dockerignore b/airbyte-integrations/bases/base-normalization/.dockerignore index 2e284d6c45646..e6fbfb3101a91 100644 --- a/airbyte-integrations/bases/base-normalization/.dockerignore +++ b/airbyte-integrations/bases/base-normalization/.dockerignore @@ -8,3 +8,4 @@ !dbt-project-template-mssql !dbt-project-template-mysql !dbt-project-template-oracle +!dbt-project-template-clickhouse diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index a9efa7a9379f6..be915f01ab0d0 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.61 +LABEL io.airbyte.version=0.1.62 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/build.gradle b/airbyte-integrations/bases/base-normalization/build.gradle index ccdda2ce39dd4..eff597c11a3e1 100644 --- a/airbyte-integrations/bases/base-normalization/build.gradle +++ b/airbyte-integrations/bases/base-normalization/build.gradle @@ -69,10 +69,15 @@ task airbyteDockerOracle(type: Exec, dependsOn: checkSshScriptCopy) { configure buildAirbyteDocker('oracle') dependsOn assemble } +task airbyteDockerClickhouse(type: Exec, dependsOn: checkSshScriptCopy) { + configure buildAirbyteDocker('clickhouse') + dependsOn assemble +} airbyteDocker.dependsOn(airbyteDockerMSSql) airbyteDocker.dependsOn(airbyteDockerMySql) airbyteDocker.dependsOn(airbyteDockerOracle) +airbyteDocker.dependsOn(airbyteDockerClickhouse) task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs) { module = "pytest" @@ -86,6 +91,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker' dependsOn ':airbyte-integrations:connectors:destination-oracle:airbyteDocker' dependsOn ':airbyte-integrations:connectors:destination-mssql:airbyteDocker' + dependsOn ':airbyte-integrations:connectors:destination-clickhouse:airbyteDocker' } integrationTest.dependsOn("customIntegrationTestPython") diff --git a/airbyte-integrations/bases/base-normalization/docker-compose.yaml b/airbyte-integrations/bases/base-normalization/docker-compose.yaml index ced1d036d206f..8dd94275765be 100644 --- a/airbyte-integrations/bases/base-normalization/docker-compose.yaml +++ b/airbyte-integrations/bases/base-normalization/docker-compose.yaml @@ -10,3 +10,5 @@ services: image: airbyte/normalization-mysql:${VERSION} normalization-oracle: image: airbyte/normalization-oracle:${VERSION} + normalization-clickhouse: + image: airbyte/normalization-clickhouse:${VERSION} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql index 749e5b38562b4..99e574c63fda6 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql @@ -12,11 +12,12 @@ as ( +-- depends_on: ref('dedup_cdc_excluded_stg') with input_data as ( select * - from _airbyte_test_normalization.dedup_cdc_excluded_ab3 + from _airbyte_test_normalization.dedup_cdc_excluded_stg -- dedup_cdc_excluded from test_normalization._airbyte_raw_dedup_cdc_excluded ), @@ -45,15 +46,15 @@ scd_data as ( _ab_cdc_updated_at, _ab_cdc_deleted_at, _airbyte_emitted_at as _airbyte_start_at, + case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row, anyOrNull(_airbyte_emitted_at) over ( partition by id order by _airbyte_emitted_at is null asc, _airbyte_emitted_at desc, _airbyte_emitted_at desc, _ab_cdc_updated_at desc - ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING + ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING ) as _airbyte_end_at, - case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, _airbyte_dedup_cdc_excluded_hashid @@ -65,7 +66,7 @@ dedup_data as ( -- additionally, we generate a unique key for the scd table row_number() over ( partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at, accurateCastOrNull(_ab_cdc_deleted_at, 'String'), accurateCastOrNull(_ab_cdc_updated_at, 'String') - order by _airbyte_ab_id + order by _airbyte_active_row desc, _airbyte_ab_id ) as _airbyte_row_num, assumeNotNull(hex(MD5( diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 49c1843204fee..2486691308c65 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -12,11 +12,12 @@ as ( +-- depends_on: ref('dedup_exchange_rate_stg') with input_data as ( select * - from _airbyte_test_normalization.dedup_exchange_rate_ab3 + from _airbyte_test_normalization.dedup_exchange_rate_stg -- dedup_exchange_rate from test_normalization._airbyte_raw_dedup_exchange_rate ), @@ -54,15 +55,15 @@ scd_data as ( NZD, USD, date as _airbyte_start_at, + case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, anyOrNull(date) over ( partition by id, currency, cast(NZD as String) order by date is null asc, date desc, _airbyte_emitted_at desc - ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING + ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING ) as _airbyte_end_at, - case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, _airbyte_dedup_exchange_rate_hashid @@ -74,7 +75,7 @@ dedup_data as ( -- additionally, we generate a unique key for the scd table row_number() over ( partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at - order by _airbyte_ab_id + order by _airbyte_active_row desc, _airbyte_ab_id ) as _airbyte_row_num, assumeNotNull(hex(MD5( diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql index 61f46aa4665c4..4fa7b03259e21 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql @@ -12,11 +12,12 @@ as ( +-- depends_on: ref('renamed_dedup_cdc_excluded_stg') with input_data as ( select * - from _airbyte_test_normalization.renamed_dedup_cdc_excluded_ab3 + from _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg -- renamed_dedup_cdc_excluded from test_normalization._airbyte_raw_renamed_dedup_cdc_excluded ), @@ -41,15 +42,15 @@ scd_data as ( ))) as _airbyte_unique_key, id, _airbyte_emitted_at as _airbyte_start_at, + case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, anyOrNull(_airbyte_emitted_at) over ( partition by id order by _airbyte_emitted_at is null asc, _airbyte_emitted_at desc, _airbyte_emitted_at desc - ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING + ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING ) as _airbyte_end_at, - case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, _airbyte_renamed_dedup_cdc_excluded_hashid @@ -61,7 +62,7 @@ dedup_data as ( -- additionally, we generate a unique key for the scd table row_number() over ( partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at - order by _airbyte_ab_id + order by _airbyte_active_row desc, _airbyte_ab_id ) as _airbyte_row_num, assumeNotNull(hex(MD5( diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql index 4b2055de8600d..8aea31930d35c 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql @@ -13,6 +13,7 @@ as ( -- Final base SQL model +-- depends_on: test_normalization.dedup_cdc_excluded_scd select _airbyte_unique_key, id, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_exchange_rate.sql index d1dc1abdc7142..28204615e97cf 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_exchange_rate.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_exchange_rate.sql @@ -13,6 +13,7 @@ as ( -- Final base SQL model +-- depends_on: test_normalization.dedup_exchange_rate_scd select _airbyte_unique_key, id, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql index 5295b9baa8dae..b16b5361120f0 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql @@ -13,6 +13,7 @@ as ( -- Final base SQL model +-- depends_on: test_normalization.renamed_dedup_cdc_excluded_scd select _airbyte_unique_key, id, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_tables/test_normalization/exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_tables/test_normalization/exchange_rate.sql index 0eb15bc43e455..2ee3d293b8403 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_tables/test_normalization/exchange_rate.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_tables/test_normalization/exchange_rate.sql @@ -14,6 +14,7 @@ with __dbt__cte__exchange_rate_ab1 as ( -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: test_normalization._airbyte_raw_exchange_rate select JSONExtractRaw(_airbyte_data, 'id') as id, JSONExtractRaw(_airbyte_data, 'currency') as currency, @@ -33,6 +34,7 @@ where 1 = 1 ), __dbt__cte__exchange_rate_ab2 as ( -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__exchange_rate_ab1 select accurateCastOrNull(id, ' BIGINT @@ -60,6 +62,7 @@ where 1 = 1 ), __dbt__cte__exchange_rate_ab3 as ( -- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__exchange_rate_ab2 select assumeNotNull(hex(MD5( @@ -95,6 +98,7 @@ from __dbt__cte__exchange_rate_ab2 tmp -- exchange_rate where 1 = 1 )-- Final base SQL model +-- depends_on: __dbt__cte__exchange_rate_ab3 select id, currency, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql deleted file mode 100644 index fe2bf632dbf20..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql +++ /dev/null @@ -1,71 +0,0 @@ - - - create view _airbyte_test_normalization.dedup_cdc_excluded_ab3__dbt_tmp - - as ( - -with __dbt__cte__dedup_cdc_excluded_ab1 as ( - --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema -select - JSONExtractRaw(_airbyte_data, 'id') as id, - JSONExtractRaw(_airbyte_data, 'name') as name, - JSONExtractRaw(_airbyte_data, '_ab_cdc_lsn') as _ab_cdc_lsn, - JSONExtractRaw(_airbyte_data, '_ab_cdc_updated_at') as _ab_cdc_updated_at, - JSONExtractRaw(_airbyte_data, '_ab_cdc_deleted_at') as _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from test_normalization._airbyte_raw_dedup_cdc_excluded as table_alias --- dedup_cdc_excluded -where 1 = 1 - -), __dbt__cte__dedup_cdc_excluded_ab2 as ( - --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type -select - accurateCastOrNull(id, ' - BIGINT -') as id, - nullif(accurateCastOrNull(trim(BOTH '"' from name), 'String'), 'null') as name, - accurateCastOrNull(_ab_cdc_lsn, ' - Float64 -') as _ab_cdc_lsn, - accurateCastOrNull(_ab_cdc_updated_at, ' - Float64 -') as _ab_cdc_updated_at, - accurateCastOrNull(_ab_cdc_deleted_at, ' - Float64 -') as _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from __dbt__cte__dedup_cdc_excluded_ab1 --- dedup_cdc_excluded -where 1 = 1 - -)-- SQL model to build a hash column based on the values of this record -select - assumeNotNull(hex(MD5( - - toString(id) || '~' || - - - toString(name) || '~' || - - - toString(_ab_cdc_lsn) || '~' || - - - toString(_ab_cdc_updated_at) || '~' || - - - toString(_ab_cdc_deleted_at) - - ))) as _airbyte_dedup_cdc_excluded_hashid, - tmp.* -from __dbt__cte__dedup_cdc_excluded_ab2 tmp --- dedup_cdc_excluded -where 1 = 1 - - ) \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql similarity index 91% rename from airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql rename to airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql index 28abd1a79a7f2..799af4ec78aba 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql @@ -1,12 +1,13 @@ - create view _airbyte_test_normalization.dedup_exchange_rate_ab3__dbt_tmp + create view _airbyte_test_normalization.dedup_exchange_rate_stg__dbt_tmp as ( with __dbt__cte__dedup_exchange_rate_ab1 as ( -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: test_normalization._airbyte_raw_dedup_exchange_rate select JSONExtractRaw(_airbyte_data, 'id') as id, JSONExtractRaw(_airbyte_data, 'currency') as currency, @@ -26,6 +27,7 @@ where 1 = 1 ), __dbt__cte__dedup_exchange_rate_ab2 as ( -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__dedup_exchange_rate_ab1 select accurateCastOrNull(id, ' BIGINT @@ -51,6 +53,7 @@ from __dbt__cte__dedup_exchange_rate_ab1 where 1 = 1 )-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__dedup_exchange_rate_ab2 select assumeNotNull(hex(MD5( diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql deleted file mode 100644 index 9f515f09a4a44..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql +++ /dev/null @@ -1,78 +0,0 @@ - - - create view _airbyte_test_normalization.pos_dedup_cdcx_ab3__dbt_tmp - - as ( - -with __dbt__cte__pos_dedup_cdcx_ab1 as ( - --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema -select - JSONExtractRaw(_airbyte_data, 'id') as id, - JSONExtractRaw(_airbyte_data, 'name') as name, - JSONExtractRaw(_airbyte_data, '_ab_cdc_lsn') as _ab_cdc_lsn, - JSONExtractRaw(_airbyte_data, '_ab_cdc_updated_at') as _ab_cdc_updated_at, - JSONExtractRaw(_airbyte_data, '_ab_cdc_deleted_at') as _ab_cdc_deleted_at, - JSONExtractRaw(_airbyte_data, '_ab_cdc_log_pos') as _ab_cdc_log_pos, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from test_normalization._airbyte_raw_pos_dedup_cdcx as table_alias --- pos_dedup_cdcx -where 1 = 1 - -), __dbt__cte__pos_dedup_cdcx_ab2 as ( - --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type -select - accurateCastOrNull(id, ' - BIGINT -') as id, - nullif(accurateCastOrNull(trim(BOTH '"' from name), 'String'), 'null') as name, - accurateCastOrNull(_ab_cdc_lsn, ' - Float64 -') as _ab_cdc_lsn, - accurateCastOrNull(_ab_cdc_updated_at, ' - Float64 -') as _ab_cdc_updated_at, - accurateCastOrNull(_ab_cdc_deleted_at, ' - Float64 -') as _ab_cdc_deleted_at, - accurateCastOrNull(_ab_cdc_log_pos, ' - Float64 -') as _ab_cdc_log_pos, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from __dbt__cte__pos_dedup_cdcx_ab1 --- pos_dedup_cdcx -where 1 = 1 - -)-- SQL model to build a hash column based on the values of this record -select - assumeNotNull(hex(MD5( - - toString(id) || '~' || - - - toString(name) || '~' || - - - toString(_ab_cdc_lsn) || '~' || - - - toString(_ab_cdc_updated_at) || '~' || - - - toString(_ab_cdc_deleted_at) || '~' || - - - toString(_ab_cdc_log_pos) - - ))) as _airbyte_pos_dedup_cdcx_hashid, - tmp.* -from __dbt__cte__pos_dedup_cdcx_ab2 tmp --- pos_dedup_cdcx -where 1 = 1 - - ) \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql deleted file mode 100644 index 43c5b8ad9e18a..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql +++ /dev/null @@ -1,45 +0,0 @@ - - - create view _airbyte_test_normalization.renamed_dedup_cdc_excluded_ab3__dbt_tmp - - as ( - -with __dbt__cte__renamed_dedup_cdc_excluded_ab1 as ( - --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema -select - JSONExtractRaw(_airbyte_data, 'id') as id, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from test_normalization._airbyte_raw_renamed_dedup_cdc_excluded as table_alias --- renamed_dedup_cdc_excluded -where 1 = 1 - -), __dbt__cte__renamed_dedup_cdc_excluded_ab2 as ( - --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type -select - accurateCastOrNull(id, ' - BIGINT -') as id, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from __dbt__cte__renamed_dedup_cdc_excluded_ab1 --- renamed_dedup_cdc_excluded -where 1 = 1 - -)-- SQL model to build a hash column based on the values of this record -select - assumeNotNull(hex(MD5( - - toString(id) - - ))) as _airbyte_renamed_dedup_cdc_excluded_hashid, - tmp.* -from __dbt__cte__renamed_dedup_cdc_excluded_ab2 tmp --- renamed_dedup_cdc_excluded -where 1 = 1 - - ) \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab1.sql index ad250a2de1969..5b9ee4b6b6820 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab1.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab1.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ source('test_normalization', '_airbyte_raw_dedup_cdc_excluded') }} select {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id, {{ json_extract_scalar('_airbyte_data', ['name'], ['name']) }} as name, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab2.sql index b4921f53776b7..6f7e747a0699a 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab2.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab2.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('dedup_cdc_excluded_ab1') }} select accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id, nullif(accurateCastOrNull(trim(BOTH '"' from name), '{{ dbt_utils.type_string() }}'), 'null') as name, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab1.sql index bdfc716769aee..6e998ca141418 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab1.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab1.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} select {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id, {{ json_extract_scalar('_airbyte_data', ['currency'], ['currency']) }} as currency, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab2.sql index 351ccad8f300e..ee41ee94585ee 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab2.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_exchange_rate_ab2.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('dedup_exchange_rate_ab1') }} select accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id, nullif(accurateCastOrNull(trim(BOTH '"' from currency), '{{ dbt_utils.type_string() }}'), 'null') as currency, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql index 59f1c4bcfba0c..f9b9da32d25d1 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} select {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id, {{ json_extract_scalar('_airbyte_data', ['currency'], ['currency']) }} as currency, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql index a48a14a7aecc3..49cb5ea4c759b 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('exchange_rate_ab1') }} select accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id, nullif(accurateCastOrNull(trim(BOTH '"' from currency), '{{ dbt_utils.type_string() }}'), 'null') as currency, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql index d6593d4eb8f1a..c45103fae85c5 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to build a hash column based on the values of this record +-- depends_on: {{ ref('exchange_rate_ab2') }} select {{ dbt_utils.surrogate_key([ 'id', diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab1.sql index 182bcd7dbfd4f..909b7bd2366b6 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab1.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab1.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ source('test_normalization', '_airbyte_raw_pos_dedup_cdcx') }} select {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id, {{ json_extract_scalar('_airbyte_data', ['name'], ['name']) }} as name, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab2.sql index 3769adf4d02e0..0b9192b2620a4 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab2.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab2.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('pos_dedup_cdcx_ab1') }} select accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id, nullif(accurateCastOrNull(trim(BOTH '"' from name), '{{ dbt_utils.type_string() }}'), 'null') as name, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab1.sql index 4504a7bbffa32..a09668e69387e 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab1.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab1.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} select {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id, _airbyte_ab_id, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab2.sql index 8b248db9590f7..2fd528509bc5a 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab2.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/renamed_dedup_cdc_excluded_ab2.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('renamed_dedup_cdc_excluded_ab1') }} select accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id, _airbyte_ab_id, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql index eca9d38763a06..eedb5184f0a89 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql @@ -1,15 +1,17 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", + post_hook = ['drop view _airbyte_test_normalization.dedup_cdc_excluded_stg'], tags = [ "top-level" ] ) }} +-- depends_on: ref('dedup_cdc_excluded_stg') with {% if is_incremental() %} new_data as ( -- retrieve incremental "new" data select * - from {{ ref('dedup_cdc_excluded_ab3') }} + from {{ ref('dedup_cdc_excluded_stg') }} -- dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_dedup_cdc_excluded') }} where 1 = 1 {{ incremental_clause('_airbyte_emitted_at') }} @@ -22,26 +24,30 @@ new_data_ids as ( ]) }} as _airbyte_unique_key from new_data ), +empty_new_data as ( + -- build an empty table to only keep the table's column types + select * from new_data where 1 = 0 +), previous_active_scd_data as ( -- retrieve "incomplete old" data that needs to be updated with an end date because of new changes select - {{ star_intersect(ref('dedup_cdc_excluded_ab3'), this, from_alias='inc_data', intersect_alias='this_data') }} + {{ star_intersect(ref('dedup_cdc_excluded_stg'), this, from_alias='inc_data', intersect_alias='this_data') }} from {{ this }} as this_data -- make a join with new_data using primary key to filter active data that need to be updated only join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key - -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro) - --left join {{ ref('dedup_cdc_excluded_ab3') }} as inc_data on 1 = 0 + -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes) + --left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id where _airbyte_active_row = 1 ), input_data as ( - select {{ dbt_utils.star(ref('dedup_cdc_excluded_ab3')) }} from new_data + select {{ dbt_utils.star(ref('dedup_cdc_excluded_stg')) }} from new_data union all - select {{ dbt_utils.star(ref('dedup_cdc_excluded_ab3')) }} from previous_active_scd_data + select {{ dbt_utils.star(ref('dedup_cdc_excluded_stg')) }} from previous_active_scd_data ), {% else %} input_data as ( select * - from {{ ref('dedup_cdc_excluded_ab3') }} + from {{ ref('dedup_cdc_excluded_stg') }} -- dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_dedup_cdc_excluded') }} ), {% endif %} @@ -68,15 +74,15 @@ scd_data as ( _ab_cdc_updated_at, _ab_cdc_deleted_at, _airbyte_emitted_at as _airbyte_start_at, + case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row, anyOrNull(_airbyte_emitted_at) over ( partition by id order by _airbyte_emitted_at is null asc, _airbyte_emitted_at desc, _airbyte_emitted_at desc, _ab_cdc_updated_at desc - ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING + ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING ) as _airbyte_end_at, - case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, _airbyte_dedup_cdc_excluded_hashid @@ -88,7 +94,7 @@ dedup_data as ( -- additionally, we generate a unique key for the scd table row_number() over ( partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at, accurateCastOrNull(_ab_cdc_deleted_at, '{{ dbt_utils.type_string() }}'), accurateCastOrNull(_ab_cdc_updated_at, '{{ dbt_utils.type_string() }}') - order by _airbyte_ab_id + order by _airbyte_active_row desc, _airbyte_ab_id ) as _airbyte_row_num, {{ dbt_utils.surrogate_key([ '_airbyte_unique_key', diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql index 97757d03ce77d..13744503505c2 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql @@ -1,15 +1,17 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", + post_hook = ['drop view _airbyte_test_normalization.dedup_exchange_rate_stg'], tags = [ "top-level" ] ) }} +-- depends_on: ref('dedup_exchange_rate_stg') with {% if is_incremental() %} new_data as ( -- retrieve incremental "new" data select * - from {{ ref('dedup_exchange_rate_ab3') }} + from {{ ref('dedup_exchange_rate_stg') }} -- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} where 1 = 1 {{ incremental_clause('_airbyte_emitted_at') }} @@ -24,26 +26,30 @@ new_data_ids as ( ]) }} as _airbyte_unique_key from new_data ), +empty_new_data as ( + -- build an empty table to only keep the table's column types + select * from new_data where 1 = 0 +), previous_active_scd_data as ( -- retrieve "incomplete old" data that needs to be updated with an end date because of new changes select - {{ star_intersect(ref('dedup_exchange_rate_ab3'), this, from_alias='inc_data', intersect_alias='this_data') }} + {{ star_intersect(ref('dedup_exchange_rate_stg'), this, from_alias='inc_data', intersect_alias='this_data') }} from {{ this }} as this_data -- make a join with new_data using primary key to filter active data that need to be updated only join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key - -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro) - --left join {{ ref('dedup_exchange_rate_ab3') }} as inc_data on 1 = 0 + -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes) + --left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id where _airbyte_active_row = 1 ), input_data as ( - select {{ dbt_utils.star(ref('dedup_exchange_rate_ab3')) }} from new_data + select {{ dbt_utils.star(ref('dedup_exchange_rate_stg')) }} from new_data union all - select {{ dbt_utils.star(ref('dedup_exchange_rate_ab3')) }} from previous_active_scd_data + select {{ dbt_utils.star(ref('dedup_exchange_rate_stg')) }} from previous_active_scd_data ), {% else %} input_data as ( select * - from {{ ref('dedup_exchange_rate_ab3') }} + from {{ ref('dedup_exchange_rate_stg') }} -- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} ), {% endif %} @@ -75,15 +81,15 @@ scd_data as ( NZD, USD, date as _airbyte_start_at, + case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, anyOrNull(date) over ( partition by id, currency, cast(NZD as {{ dbt_utils.type_string() }}) order by date is null asc, date desc, _airbyte_emitted_at desc - ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING + ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING ) as _airbyte_end_at, - case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, _airbyte_dedup_exchange_rate_hashid @@ -95,7 +101,7 @@ dedup_data as ( -- additionally, we generate a unique key for the scd table row_number() over ( partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at - order by _airbyte_ab_id + order by _airbyte_active_row desc, _airbyte_ab_id ) as _airbyte_row_num, {{ dbt_utils.surrogate_key([ '_airbyte_unique_key', diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql index c0dcee2b2ccbb..525bee19a04f1 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql @@ -1,15 +1,17 @@ {{ config( unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", + post_hook = ['drop view _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg'], tags = [ "top-level" ] ) }} +-- depends_on: ref('renamed_dedup_cdc_excluded_stg') with {% if is_incremental() %} new_data as ( -- retrieve incremental "new" data select * - from {{ ref('renamed_dedup_cdc_excluded_ab3') }} + from {{ ref('renamed_dedup_cdc_excluded_stg') }} -- renamed_dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} where 1 = 1 {{ incremental_clause('_airbyte_emitted_at') }} @@ -22,26 +24,30 @@ new_data_ids as ( ]) }} as _airbyte_unique_key from new_data ), +empty_new_data as ( + -- build an empty table to only keep the table's column types + select * from new_data where 1 = 0 +), previous_active_scd_data as ( -- retrieve "incomplete old" data that needs to be updated with an end date because of new changes select - {{ star_intersect(ref('renamed_dedup_cdc_excluded_ab3'), this, from_alias='inc_data', intersect_alias='this_data') }} + {{ star_intersect(ref('renamed_dedup_cdc_excluded_stg'), this, from_alias='inc_data', intersect_alias='this_data') }} from {{ this }} as this_data -- make a join with new_data using primary key to filter active data that need to be updated only join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key - -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro) - --left join {{ ref('renamed_dedup_cdc_excluded_ab3') }} as inc_data on 1 = 0 + -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes) + --left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id where _airbyte_active_row = 1 ), input_data as ( - select {{ dbt_utils.star(ref('renamed_dedup_cdc_excluded_ab3')) }} from new_data + select {{ dbt_utils.star(ref('renamed_dedup_cdc_excluded_stg')) }} from new_data union all - select {{ dbt_utils.star(ref('renamed_dedup_cdc_excluded_ab3')) }} from previous_active_scd_data + select {{ dbt_utils.star(ref('renamed_dedup_cdc_excluded_stg')) }} from previous_active_scd_data ), {% else %} input_data as ( select * - from {{ ref('renamed_dedup_cdc_excluded_ab3') }} + from {{ ref('renamed_dedup_cdc_excluded_stg') }} -- renamed_dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} ), {% endif %} @@ -64,15 +70,15 @@ scd_data as ( ]) }} as _airbyte_unique_key, id, _airbyte_emitted_at as _airbyte_start_at, + case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, anyOrNull(_airbyte_emitted_at) over ( partition by id order by _airbyte_emitted_at is null asc, _airbyte_emitted_at desc, _airbyte_emitted_at desc - ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING + ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING ) as _airbyte_end_at, - case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, _airbyte_renamed_dedup_cdc_excluded_hashid @@ -84,7 +90,7 @@ dedup_data as ( -- additionally, we generate a unique key for the scd table row_number() over ( partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at - order by _airbyte_ab_id + order by _airbyte_active_row desc, _airbyte_ab_id ) as _airbyte_row_num, {{ dbt_utils.surrogate_key([ '_airbyte_unique_key', diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql index 086676f173719..6a6248e7cb6a8 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql @@ -4,6 +4,7 @@ tags = [ "top-level" ] ) }} -- Final base SQL model +-- depends_on: {{ ref('dedup_cdc_excluded_scd') }} select _airbyte_unique_key, id, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate.sql index 3fb8238f2479b..180310a437ff6 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_exchange_rate.sql @@ -4,6 +4,7 @@ tags = [ "top-level" ] ) }} -- Final base SQL model +-- depends_on: {{ ref('dedup_exchange_rate_scd') }} select _airbyte_unique_key, id, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql index 8a8ff85f59024..d9f20813f833e 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/renamed_dedup_cdc_excluded.sql @@ -4,6 +4,7 @@ tags = [ "top-level" ] ) }} -- Final base SQL model +-- depends_on: {{ ref('renamed_dedup_cdc_excluded_scd') }} select _airbyte_unique_key, id, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_tables/test_normalization/exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_tables/test_normalization/exchange_rate.sql index c370f10264a3f..77fba32c34999 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_tables/test_normalization/exchange_rate.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_tables/test_normalization/exchange_rate.sql @@ -4,6 +4,7 @@ tags = [ "top-level" ] ) }} -- Final base SQL model +-- depends_on: {{ ref('exchange_rate_ab3') }} select id, currency, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql deleted file mode 100644 index 446204f691ebc..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql +++ /dev/null @@ -1,20 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to build a hash column based on the values of this record -select - {{ dbt_utils.surrogate_key([ - 'id', - 'name', - '_ab_cdc_lsn', - '_ab_cdc_updated_at', - '_ab_cdc_deleted_at', - ]) }} as _airbyte_dedup_cdc_excluded_hashid, - tmp.* -from {{ ref('dedup_cdc_excluded_ab2') }} tmp --- dedup_cdc_excluded -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql similarity index 92% rename from airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql rename to airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql index b14a1fb639b1b..0b4900731039d 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql @@ -4,6 +4,7 @@ tags = [ "top-level-intermediate" ] ) }} -- SQL model to build a hash column based on the values of this record +-- depends_on: {{ ref('dedup_exchange_rate_ab2') }} select {{ dbt_utils.surrogate_key([ 'id', diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql deleted file mode 100644 index dbe0c313b238b..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to build a hash column based on the values of this record -select - {{ dbt_utils.surrogate_key([ - 'id', - 'name', - '_ab_cdc_lsn', - '_ab_cdc_updated_at', - '_ab_cdc_deleted_at', - '_ab_cdc_log_pos', - ]) }} as _airbyte_pos_dedup_cdcx_hashid, - tmp.* -from {{ ref('pos_dedup_cdcx_ab2') }} tmp --- pos_dedup_cdcx -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql deleted file mode 100644 index 2356b929f1f38..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql +++ /dev/null @@ -1,16 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to build a hash column based on the values of this record -select - {{ dbt_utils.surrogate_key([ - 'id', - ]) }} as _airbyte_renamed_dedup_cdc_excluded_hashid, - tmp.* -from {{ ref('renamed_dedup_cdc_excluded_ab2') }} tmp --- renamed_dedup_cdc_excluded -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_tables/test_normalization/exchange_rate.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_tables/test_normalization/exchange_rate.sql index 0eb15bc43e455..2ee3d293b8403 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_tables/test_normalization/exchange_rate.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_tables/test_normalization/exchange_rate.sql @@ -14,6 +14,7 @@ with __dbt__cte__exchange_rate_ab1 as ( -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: test_normalization._airbyte_raw_exchange_rate select JSONExtractRaw(_airbyte_data, 'id') as id, JSONExtractRaw(_airbyte_data, 'currency') as currency, @@ -33,6 +34,7 @@ where 1 = 1 ), __dbt__cte__exchange_rate_ab2 as ( -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__exchange_rate_ab1 select accurateCastOrNull(id, ' BIGINT @@ -60,6 +62,7 @@ where 1 = 1 ), __dbt__cte__exchange_rate_ab3 as ( -- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__exchange_rate_ab2 select assumeNotNull(hex(MD5( @@ -95,6 +98,7 @@ from __dbt__cte__exchange_rate_ab2 tmp -- exchange_rate where 1 = 1 )-- Final base SQL model +-- depends_on: __dbt__cte__exchange_rate_ab3 select id, currency, diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql deleted file mode 100644 index fe2bf632dbf20..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_cdc_excluded_ab3.sql +++ /dev/null @@ -1,71 +0,0 @@ - - - create view _airbyte_test_normalization.dedup_cdc_excluded_ab3__dbt_tmp - - as ( - -with __dbt__cte__dedup_cdc_excluded_ab1 as ( - --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema -select - JSONExtractRaw(_airbyte_data, 'id') as id, - JSONExtractRaw(_airbyte_data, 'name') as name, - JSONExtractRaw(_airbyte_data, '_ab_cdc_lsn') as _ab_cdc_lsn, - JSONExtractRaw(_airbyte_data, '_ab_cdc_updated_at') as _ab_cdc_updated_at, - JSONExtractRaw(_airbyte_data, '_ab_cdc_deleted_at') as _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from test_normalization._airbyte_raw_dedup_cdc_excluded as table_alias --- dedup_cdc_excluded -where 1 = 1 - -), __dbt__cte__dedup_cdc_excluded_ab2 as ( - --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type -select - accurateCastOrNull(id, ' - BIGINT -') as id, - nullif(accurateCastOrNull(trim(BOTH '"' from name), 'String'), 'null') as name, - accurateCastOrNull(_ab_cdc_lsn, ' - Float64 -') as _ab_cdc_lsn, - accurateCastOrNull(_ab_cdc_updated_at, ' - Float64 -') as _ab_cdc_updated_at, - accurateCastOrNull(_ab_cdc_deleted_at, ' - Float64 -') as _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from __dbt__cte__dedup_cdc_excluded_ab1 --- dedup_cdc_excluded -where 1 = 1 - -)-- SQL model to build a hash column based on the values of this record -select - assumeNotNull(hex(MD5( - - toString(id) || '~' || - - - toString(name) || '~' || - - - toString(_ab_cdc_lsn) || '~' || - - - toString(_ab_cdc_updated_at) || '~' || - - - toString(_ab_cdc_deleted_at) - - ))) as _airbyte_dedup_cdc_excluded_hashid, - tmp.* -from __dbt__cte__dedup_cdc_excluded_ab2 tmp --- dedup_cdc_excluded -where 1 = 1 - - ) \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql similarity index 91% rename from airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql rename to airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql index 28abd1a79a7f2..799af4ec78aba 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/dedup_exchange_rate_ab3.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/dedup_exchange_rate_stg.sql @@ -1,12 +1,13 @@ - create view _airbyte_test_normalization.dedup_exchange_rate_ab3__dbt_tmp + create view _airbyte_test_normalization.dedup_exchange_rate_stg__dbt_tmp as ( with __dbt__cte__dedup_exchange_rate_ab1 as ( -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: test_normalization._airbyte_raw_dedup_exchange_rate select JSONExtractRaw(_airbyte_data, 'id') as id, JSONExtractRaw(_airbyte_data, 'currency') as currency, @@ -26,6 +27,7 @@ where 1 = 1 ), __dbt__cte__dedup_exchange_rate_ab2 as ( -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__dedup_exchange_rate_ab1 select accurateCastOrNull(id, ' BIGINT @@ -51,6 +53,7 @@ from __dbt__cte__dedup_exchange_rate_ab1 where 1 = 1 )-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__dedup_exchange_rate_ab2 select assumeNotNull(hex(MD5( diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql deleted file mode 100644 index 9f515f09a4a44..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/pos_dedup_cdcx_ab3.sql +++ /dev/null @@ -1,78 +0,0 @@ - - - create view _airbyte_test_normalization.pos_dedup_cdcx_ab3__dbt_tmp - - as ( - -with __dbt__cte__pos_dedup_cdcx_ab1 as ( - --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema -select - JSONExtractRaw(_airbyte_data, 'id') as id, - JSONExtractRaw(_airbyte_data, 'name') as name, - JSONExtractRaw(_airbyte_data, '_ab_cdc_lsn') as _ab_cdc_lsn, - JSONExtractRaw(_airbyte_data, '_ab_cdc_updated_at') as _ab_cdc_updated_at, - JSONExtractRaw(_airbyte_data, '_ab_cdc_deleted_at') as _ab_cdc_deleted_at, - JSONExtractRaw(_airbyte_data, '_ab_cdc_log_pos') as _ab_cdc_log_pos, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from test_normalization._airbyte_raw_pos_dedup_cdcx as table_alias --- pos_dedup_cdcx -where 1 = 1 - -), __dbt__cte__pos_dedup_cdcx_ab2 as ( - --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type -select - accurateCastOrNull(id, ' - BIGINT -') as id, - nullif(accurateCastOrNull(trim(BOTH '"' from name), 'String'), 'null') as name, - accurateCastOrNull(_ab_cdc_lsn, ' - Float64 -') as _ab_cdc_lsn, - accurateCastOrNull(_ab_cdc_updated_at, ' - Float64 -') as _ab_cdc_updated_at, - accurateCastOrNull(_ab_cdc_deleted_at, ' - Float64 -') as _ab_cdc_deleted_at, - accurateCastOrNull(_ab_cdc_log_pos, ' - Float64 -') as _ab_cdc_log_pos, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from __dbt__cte__pos_dedup_cdcx_ab1 --- pos_dedup_cdcx -where 1 = 1 - -)-- SQL model to build a hash column based on the values of this record -select - assumeNotNull(hex(MD5( - - toString(id) || '~' || - - - toString(name) || '~' || - - - toString(_ab_cdc_lsn) || '~' || - - - toString(_ab_cdc_updated_at) || '~' || - - - toString(_ab_cdc_deleted_at) || '~' || - - - toString(_ab_cdc_log_pos) - - ))) as _airbyte_pos_dedup_cdcx_hashid, - tmp.* -from __dbt__cte__pos_dedup_cdcx_ab2 tmp --- pos_dedup_cdcx -where 1 = 1 - - ) \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql deleted file mode 100644 index 43c5b8ad9e18a..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_views/test_normalization/renamed_dedup_cdc_excluded_ab3.sql +++ /dev/null @@ -1,45 +0,0 @@ - - - create view _airbyte_test_normalization.renamed_dedup_cdc_excluded_ab3__dbt_tmp - - as ( - -with __dbt__cte__renamed_dedup_cdc_excluded_ab1 as ( - --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema -select - JSONExtractRaw(_airbyte_data, 'id') as id, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from test_normalization._airbyte_raw_renamed_dedup_cdc_excluded as table_alias --- renamed_dedup_cdc_excluded -where 1 = 1 - -), __dbt__cte__renamed_dedup_cdc_excluded_ab2 as ( - --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type -select - accurateCastOrNull(id, ' - BIGINT -') as id, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at -from __dbt__cte__renamed_dedup_cdc_excluded_ab1 --- renamed_dedup_cdc_excluded -where 1 = 1 - -)-- SQL model to build a hash column based on the values of this record -select - assumeNotNull(hex(MD5( - - toString(id) - - ))) as _airbyte_renamed_dedup_cdc_excluded_hashid, - tmp.* -from __dbt__cte__renamed_dedup_cdc_excluded_ab2 tmp --- renamed_dedup_cdc_excluded -where 1 = 1 - - ) \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index e41b683b6b1bc..1a8993ddf8cc3 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -649,7 +649,144 @@ def safe_cast_to_string(definition: Dict, column_name: str, destination_type: De return col def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: - scd_sql_template = """ + order_null = "is null asc" + if self.destination_type.value == DestinationType.ORACLE.value: + order_null = "asc nulls last" + if self.destination_type.value == DestinationType.MSSQL.value: + # SQL Server treats NULL values as the lowest values, then sorted in ascending order, NULLs come first. + order_null = "desc" + + lag_begin = "lag" + lag_end = "" + input_data_table = "input_data" + if self.destination_type == DestinationType.CLICKHOUSE: + # ClickHouse doesn't support lag() yet, this is a workaround solution + # Ref: https://clickhouse.com/docs/en/sql-reference/window-functions/ + lag_begin = "anyOrNull" + lag_end = "ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING" + input_data_table = "input_data_with_active_row_num" + + enable_left_join_null = "" + cast_begin = "cast(" + cast_as = " as " + cast_end = ")" + if self.destination_type == DestinationType.CLICKHOUSE: + enable_left_join_null = "--" + cast_begin = "accurateCastOrNull(" + cast_as = ", '" + cast_end = "')" + + # TODO move all cdc columns out of scd models + cdc_active_row_pattern = "" + cdc_updated_order_pattern = "" + cdc_cols = "" + quoted_cdc_cols = "" + if "_ab_cdc_deleted_at" in column_names.keys(): + col_cdc_deleted_at = self.name_transformer.normalize_column_name("_ab_cdc_deleted_at") + col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at") + quoted_col_cdc_deleted_at = self.name_transformer.normalize_column_name("_ab_cdc_deleted_at", in_jinja=True) + quoted_col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at", in_jinja=True) + cdc_active_row_pattern = f" and {col_cdc_deleted_at} is null" + cdc_updated_order_pattern = f", {col_cdc_updated_at} desc" + cdc_cols = ( + f", {cast_begin}{col_cdc_deleted_at}{cast_as}" + + "{{ dbt_utils.type_string() }}" + + f"{cast_end}" + + f", {cast_begin}{col_cdc_updated_at}{cast_as}" + + "{{ dbt_utils.type_string() }}" + + f"{cast_end}" + ) + quoted_cdc_cols = f", {quoted_col_cdc_deleted_at}, {quoted_col_cdc_updated_at}" + + if "_ab_cdc_log_pos" in column_names.keys(): + col_cdc_log_pos = self.name_transformer.normalize_column_name("_ab_cdc_log_pos") + quoted_col_cdc_log_pos = self.name_transformer.normalize_column_name("_ab_cdc_log_pos", in_jinja=True) + cdc_updated_order_pattern += f", {col_cdc_log_pos} desc" + cdc_cols += f", {cast_begin}{col_cdc_log_pos}{cast_as}" + "{{ dbt_utils.type_string() }}" + f"{cast_end}" + quoted_cdc_cols += f", {quoted_col_cdc_log_pos}" + + jinja_variables = { + "active_row": self.name_transformer.normalize_column_name("_airbyte_active_row"), + "airbyte_end_at": self.name_transformer.normalize_column_name("_airbyte_end_at"), + "airbyte_row_num": self.name_transformer.normalize_column_name("_airbyte_row_num"), + "airbyte_start_at": self.name_transformer.normalize_column_name("_airbyte_start_at"), + "airbyte_unique_key_scd": self.name_transformer.normalize_column_name(f"{self.airbyte_unique_key}_scd"), + "cdc_active_row": cdc_active_row_pattern, + "cdc_cols": cdc_cols, + "cdc_updated_at_order": cdc_updated_order_pattern, + "col_ab_id": self.get_ab_id(), + "col_emitted_at": self.get_emitted_at(), + "col_normalized_at": self.get_normalized_at(), + "cursor_field": self.get_cursor_field(column_names), + "enable_left_join_null": enable_left_join_null, + "fields": self.list_fields(column_names), + "from_table": from_table, + "hash_id": self.hash_id(), + "input_data_table": input_data_table, + "lag_begin": lag_begin, + "lag_end": lag_end, + "order_null": order_null, + "parent_hash_id": self.parent_hash_id(), + "primary_key_partition": self.get_primary_key_partition(column_names), + "primary_keys": self.list_primary_keys(column_names), + "quoted_airbyte_row_num": self.name_transformer.normalize_column_name("_airbyte_row_num", in_jinja=True), + "quoted_airbyte_start_at": self.name_transformer.normalize_column_name("_airbyte_start_at", in_jinja=True), + "quoted_cdc_cols": quoted_cdc_cols, + "quoted_col_emitted_at": self.get_emitted_at(in_jinja=True), + "quoted_unique_key": self.get_unique_key(in_jinja=True), + "sql_table_comment": self.sql_table_comment(include_from_table=True), + "unique_key": self.get_unique_key(), + } + if self.destination_type == DestinationType.CLICKHOUSE: + clickhouse_active_row_sql = Template( + """ +input_data_with_active_row_num as ( + select *, + row_number() over ( + partition by {{ primary_key_partition | join(", ") }} + order by + {{ cursor_field }} {{ order_null }}, + {{ cursor_field }} desc, + {{ col_emitted_at }} desc{{ cdc_updated_at_order }} + ) as _airbyte_active_row_num + from input_data +),""" + ).render(jinja_variables) + jinja_variables["clickhouse_active_row_sql"] = clickhouse_active_row_sql + scd_columns_sql = Template( + """ + case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, + {{ lag_begin }}({{ cursor_field }}) over ( + partition by {{ primary_key_partition | join(", ") }} + order by + {{ cursor_field }} {{ order_null }}, + {{ cursor_field }} desc, + {{ col_emitted_at }} desc{{ cdc_updated_at_order }} + {{ lag_end }} + ) as {{ airbyte_end_at }}""" + ).render(jinja_variables) + jinja_variables["scd_columns_sql"] = scd_columns_sql + else: + scd_columns_sql = Template( + """ + lag({{ cursor_field }}) over ( + partition by {{ primary_key_partition | join(", ") }} + order by + {{ cursor_field }} {{ order_null }}, + {{ cursor_field }} desc, + {{ col_emitted_at }} desc{{ cdc_updated_at_order }} + ) as {{ airbyte_end_at }}, + case when row_number() over ( + partition by {{ primary_key_partition | join(", ") }} + order by + {{ cursor_field }} {{ order_null }}, + {{ cursor_field }} desc, + {{ col_emitted_at }} desc{{ cdc_updated_at_order }} + ) = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}""" + ).render(jinja_variables) + jinja_variables["scd_columns_sql"] = scd_columns_sql + sql = Template( + """ -- depends_on: {{ from_table }} with {{ '{% if is_incremental() %}' }} @@ -699,17 +836,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup {{ sql_table_comment }} ), {{ '{% endif %}' }} -input_data_with_active_row_num as ( - select *, - row_number() over ( - partition by {{ primary_key_partition | join(", ") }} - order by - {{ cursor_field }} {{ order_null }}, - {{ cursor_field }} desc, - {{ col_emitted_at }} desc{{ cdc_updated_at_order }} - ) as _airbyte_active_row_num - from input_data -), +{{ clickhouse_active_row_sql }} scd_data as ( -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key select @@ -725,19 +852,11 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup {{ field }}, {%- endfor %} {{ cursor_field }} as {{ airbyte_start_at }}, - {{ lag_begin }}({{ cursor_field }}) over ( - partition by {{ primary_key_partition | join(", ") }} - order by - {{ cursor_field }} {{ order_null }}, - {{ cursor_field }} desc, - {{ col_emitted_at }} desc{{ cdc_updated_at_order }} - {{ lag_end }} - ) as {{ airbyte_end_at }}, - case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, + {{ scd_columns_sql }}, {{ col_ab_id }}, {{ col_emitted_at }}, {{ hash_id }} - from input_data_with_active_row_num + from {{ input_data_table }} ), dedup_data as ( select @@ -772,94 +891,8 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup {{ '{{ current_timestamp() }}' }} as {{ col_normalized_at }}, {{ hash_id }} from dedup_data where {{ airbyte_row_num }} = 1 - """ - template = Template(scd_sql_template) - - order_null = "is null asc" - if self.destination_type.value == DestinationType.ORACLE.value: - order_null = "asc nulls last" - if self.destination_type.value == DestinationType.MSSQL.value: - # SQL Server treats NULL values as the lowest values, then sorted in ascending order, NULLs come first. - order_null = "desc" - - lag_begin = "lag" - lag_end = "" - if self.destination_type == DestinationType.CLICKHOUSE: - # ClickHouse doesn't support lag() yet, this is a workaround solution - # Ref: https://clickhouse.com/docs/en/sql-reference/window-functions/ - lag_begin = "anyOrNull" - lag_end = "ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING" - - enable_left_join_null = "" - cast_begin = "cast(" - cast_as = " as " - cast_end = ")" - if self.destination_type == DestinationType.CLICKHOUSE: - enable_left_join_null = "--" - cast_begin = "accurateCastOrNull(" - cast_as = ", '" - cast_end = "')" - - # TODO move all cdc columns out of scd models - cdc_active_row_pattern = "" - cdc_updated_order_pattern = "" - cdc_cols = "" - quoted_cdc_cols = "" - if "_ab_cdc_deleted_at" in column_names.keys(): - col_cdc_deleted_at = self.name_transformer.normalize_column_name("_ab_cdc_deleted_at") - col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at") - quoted_col_cdc_deleted_at = self.name_transformer.normalize_column_name("_ab_cdc_deleted_at", in_jinja=True) - quoted_col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at", in_jinja=True) - cdc_active_row_pattern = f" and {col_cdc_deleted_at} is null" - cdc_updated_order_pattern = f", {col_cdc_updated_at} desc" - cdc_cols = ( - f", {cast_begin}{col_cdc_deleted_at}{cast_as}" - + "{{ dbt_utils.type_string() }}" - + f"{cast_end}" - + f", {cast_begin}{col_cdc_updated_at}{cast_as}" - + "{{ dbt_utils.type_string() }}" - + f"{cast_end}" - ) - quoted_cdc_cols = f", {quoted_col_cdc_deleted_at}, {quoted_col_cdc_updated_at}" - - if "_ab_cdc_log_pos" in column_names.keys(): - col_cdc_log_pos = self.name_transformer.normalize_column_name("_ab_cdc_log_pos") - quoted_col_cdc_log_pos = self.name_transformer.normalize_column_name("_ab_cdc_log_pos", in_jinja=True) - cdc_updated_order_pattern += f", {col_cdc_log_pos} desc" - cdc_cols += f", {cast_begin}{col_cdc_log_pos}{cast_as}" + "{{ dbt_utils.type_string() }}" + f"{cast_end}" - quoted_cdc_cols += f", {quoted_col_cdc_log_pos}" - - sql = template.render( - order_null=order_null, - airbyte_start_at=self.name_transformer.normalize_column_name("_airbyte_start_at"), - quoted_airbyte_start_at=self.name_transformer.normalize_column_name("_airbyte_start_at", in_jinja=True), - airbyte_end_at=self.name_transformer.normalize_column_name("_airbyte_end_at"), - active_row=self.name_transformer.normalize_column_name("_airbyte_active_row"), - airbyte_row_num=self.name_transformer.normalize_column_name("_airbyte_row_num"), - quoted_airbyte_row_num=self.name_transformer.normalize_column_name("_airbyte_row_num", in_jinja=True), - airbyte_unique_key_scd=self.name_transformer.normalize_column_name(f"{self.airbyte_unique_key}_scd"), - unique_key=self.get_unique_key(), - quoted_unique_key=self.get_unique_key(in_jinja=True), - col_ab_id=self.get_ab_id(), - col_emitted_at=self.get_emitted_at(), - quoted_col_emitted_at=self.get_emitted_at(in_jinja=True), - col_normalized_at=self.get_normalized_at(), - parent_hash_id=self.parent_hash_id(), - fields=self.list_fields(column_names), - cursor_field=self.get_cursor_field(column_names), - primary_keys=self.list_primary_keys(column_names), - primary_key_partition=self.get_primary_key_partition(column_names), - hash_id=self.hash_id(), - from_table=from_table, - sql_table_comment=self.sql_table_comment(include_from_table=True), - cdc_active_row=cdc_active_row_pattern, - cdc_updated_at_order=cdc_updated_order_pattern, - cdc_cols=cdc_cols, - quoted_cdc_cols=quoted_cdc_cols, - lag_begin=lag_begin, - lag_end=lag_end, - enable_left_join_null=enable_left_join_null, - ) +""" + ).render(jinja_variables) return sql def get_cursor_field(self, column_names: Dict[str, Tuple[str, str]], in_jinja: bool = False) -> str: diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index c2454395b6501..adcee9eb3b7a4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -14,7 +14,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.1.61"; + public static final String NORMALIZATION_VERSION = "0.1.62"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/build.gradle b/build.gradle index a6e51434a74c6..91393c72c57f0 100644 --- a/build.gradle +++ b/build.gradle @@ -82,6 +82,7 @@ def createSpotlessTarget = { pattern -> 'dbt-project-template-mssql', 'dbt-project-template-mysql', 'dbt-project-template-oracle', + 'dbt-project-template-clickhouse', 'dbt_test_config', 'normalization_test_output', 'tools', diff --git a/settings.gradle b/settings.gradle index 56da8a917448b..b50a3f1933dea 100644 --- a/settings.gradle +++ b/settings.gradle @@ -97,6 +97,7 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" include ':airbyte-integrations:connectors:destination-snowflake' include ':airbyte-integrations:connectors:destination-oracle' include ':airbyte-integrations:connectors:destination-mssql' + include ':airbyte-integrations:connectors:destination-clickhouse' //Needed by destination-bigquery include ':airbyte-integrations:connectors:destination-s3'