From c5d4a973631ccae7918b9d7881f875a265f30619 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Thu, 6 Jan 2022 18:59:09 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fix=20normalization=20issue=20wi?= =?UTF-8?q?th=20quoted=20&=20case=20sensitive=20columns=20(#9317)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bases/base-normalization/.gitignore | 1 + .../multiple_column_names_conflicts_stg.sql | 83 +++++++++++++++++ .../models/generated/sources.yml | 1 + .../test_normalization/dedup_cdc_excluded.sql | 34 ------- .../multiple_column_names_conflicts_stg.sql | 86 +++++++++++++++++ .../dedup_cdc_excluded_ab1.sql | 21 ----- .../dedup_cdc_excluded_ab2.sql | 21 ----- .../test_normalization/exchange_rate_ab1.sql | 24 ----- .../test_normalization/exchange_rate_ab2.sql | 24 ----- .../test_normalization/exchange_rate_ab3.sql | 24 ----- .../test_normalization/pos_dedup_cdcx_ab1.sql | 22 ----- .../test_normalization/pos_dedup_cdcx_ab2.sql | 22 ----- .../test_normalization/dedup_cdc_excluded.sql | 24 ----- .../models/generated/sources.yml | 1 + .../dedup_cdc_excluded_scd.sql | 5 - .../test_normalization/dedup_cdc_excluded.sql | 5 - .../multiple_column_names_conflicts_stg.sql | 76 +++++++++++++++ .../models/generated/sources.yml | 1 + .../multiple_column_names_conflicts_stg.sql | 72 ++++++++++++++ .../models/generated/sources.yml | 1 + .../multiple_column_names_conflicts_stg.sql | 87 +++++++++++++++++ .../models/generated/sources.yml | 1 + .../multiple_column_names_conflicts_scd.sql | 93 +++++++++++++++++++ .../multiple_column_names_conflicts.sql | 28 ++++++ .../multiple_column_names_conflicts_stg.sql | 85 +++++++++++++++++ .../multiple_column_names_conflicts_ab1.sql | 24 +++++ .../multiple_column_names_conflicts_ab2.sql | 24 +++++ .../multiple_column_names_conflicts_scd.sql} | 80 ++++++++-------- .../multiple_column_names_conflicts.sql | 27 ++++++ .../multiple_column_names_conflicts_stg.sql | 24 +++++ .../models/generated/sources.yml | 1 + .../multiple_column_names_conflicts_scd.sql | 14 +++ .../multiple_column_names_conflicts.sql | 14 +++ .../multiple_column_names_conflicts_stg.sql | 14 +++ .../multiple_column_names_conflicts_stg.sql | 62 +++++++++++++ .../models/generated/sources.yml | 1 + .../MULTIPLE_COLUMN_NAMES_CONFLICTS_STG.sql | 81 ++++++++++++++++ .../models/generated/sources.yml | 1 + .../data_input/catalog.json | 38 ++++++++ .../data_input/messages.txt | 2 + .../destination_name_transformer.py | 38 ++++++++ .../transform_catalog/stream_processor.py | 8 +- .../src/main/resources/spec.json | 2 +- 43 files changed, 1030 insertions(+), 267 deletions(-) create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab1.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab2.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab1.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab2.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql delete mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/multiple_column_names_conflicts_ab1.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/multiple_column_names_conflicts_ab2.sql rename airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/{clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql => postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql} (53%) create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql create mode 100644 airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/first_output/airbyte_views/TEST_NORMALIZATION/MULTIPLE_COLUMN_NAMES_CONFLICTS_STG.sql diff --git a/airbyte-integrations/bases/base-normalization/.gitignore b/airbyte-integrations/bases/base-normalization/.gitignore index 707446495c018..5e426c453be52 100644 --- a/airbyte-integrations/bases/base-normalization/.gitignore +++ b/airbyte-integrations/bases/base-normalization/.gitignore @@ -20,6 +20,7 @@ integration_tests/normalization_test_output/**/*.yml # Simple Streams !integration_tests/normalization_test_output/**/dedup_exchange_rate*.sql !integration_tests/normalization_test_output/**/exchange_rate.sql +!integration_tests/normalization_test_output/**/test_simple_streams/first_output/airbyte_views/**/multiple_column_names_conflicts_stg.sql # Nested Streams # Parent table !integration_tests/normalization_test_output/**/nested_stream_with*_names_ab*.sql diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..f5079fc4f3003 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,83 @@ + + + create or replace view `dataline-integration-testing`._airbyte_test_normalization.`multiple_column_names_conflicts_stg` + OPTIONS() + as +with __dbt__cte__multiple_column_names_conflicts_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: `dataline-integration-testing`.test_normalization._airbyte_raw_multiple_column_names_conflicts +select + json_extract_scalar(_airbyte_data, "$['id']") as id, + json_extract_scalar(_airbyte_data, "$['User Id']") as User_Id, + json_extract_scalar(_airbyte_data, "$['user_id']") as user_id_1, + json_extract_scalar(_airbyte_data, "$['User id']") as User_id_2, + json_extract_scalar(_airbyte_data, "$['user id']") as user_id_3, + json_extract_scalar(_airbyte_data, "$['User@Id']") as User_Id_4, + json_extract_scalar(_airbyte_data, "$['UserId']") as UserId, + _airbyte_ab_id, + _airbyte_emitted_at, + CURRENT_TIMESTAMP() as _airbyte_normalized_at +from `dataline-integration-testing`.test_normalization._airbyte_raw_multiple_column_names_conflicts as table_alias +-- multiple_column_names_conflicts +where 1 = 1 + +), __dbt__cte__multiple_column_names_conflicts_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab1 +select + cast(id as + int64 +) as id, + cast(User_Id as + string +) as User_Id, + cast(user_id_1 as + float64 +) as user_id_1, + cast(User_id_2 as + float64 +) as User_id_2, + cast(user_id_3 as + float64 +) as user_id_3, + cast(User_Id_4 as + string +) as User_Id_4, + cast(UserId as + float64 +) as UserId, + _airbyte_ab_id, + _airbyte_emitted_at, + CURRENT_TIMESTAMP() as _airbyte_normalized_at +from __dbt__cte__multiple_column_names_conflicts_ab1 +-- multiple_column_names_conflicts +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab2 +select + to_hex(md5(cast(concat(coalesce(cast(id as + string +), ''), '-', coalesce(cast(User_Id as + string +), ''), '-', coalesce(cast(user_id_1 as + string +), ''), '-', coalesce(cast(User_id_2 as + string +), ''), '-', coalesce(cast(user_id_3 as + string +), ''), '-', coalesce(cast(User_Id_4 as + string +), ''), '-', coalesce(cast(UserId as + string +), '')) as + string +))) as _airbyte_multiple_column_names_conflicts_hashid, + tmp.* +from __dbt__cte__multiple_column_names_conflicts_ab2 tmp +-- multiple_column_names_conflicts +where 1 = 1 +; + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/sources.yml index 45c338b893cab..0e116b2bbec5d 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/sources.yml @@ -9,5 +9,6 @@ sources: - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate + - name: _airbyte_raw_multiple_column_names_conflicts - name: _airbyte_raw_pos_dedup_cdcx - name: _airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql deleted file mode 100644 index 8aea31930d35c..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql +++ /dev/null @@ -1,34 +0,0 @@ - - - - - create table test_normalization.dedup_cdc_excluded - - - - engine = MergeTree() - - order by (tuple()) - - as ( - --- Final base SQL model --- depends_on: test_normalization.dedup_cdc_excluded_scd -select - _airbyte_unique_key, - id, - name, - _ab_cdc_lsn, - _ab_cdc_updated_at, - _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - now() as _airbyte_normalized_at, - _airbyte_dedup_cdc_excluded_hashid -from test_normalization.dedup_cdc_excluded_scd --- dedup_cdc_excluded from test_normalization._airbyte_raw_dedup_cdc_excluded -where 1 = 1 -and _airbyte_active_row = 1 - - ) - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..b22cc4439922a --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,86 @@ + + + create view _airbyte_test_normalization.multiple_column_names_conflicts_stg__dbt_tmp + + as ( + +with __dbt__cte__multiple_column_names_conflicts_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: test_normalization._airbyte_raw_multiple_column_names_conflicts +select + JSONExtractRaw(_airbyte_data, 'id') as id, + JSONExtractRaw(_airbyte_data, 'User Id') as "User Id", + JSONExtractRaw(_airbyte_data, 'user_id') as user_id, + JSONExtractRaw(_airbyte_data, 'User id') as "User id", + JSONExtractRaw(_airbyte_data, 'user id') as "user id", + JSONExtractRaw(_airbyte_data, 'User@Id') as "User@Id", + JSONExtractRaw(_airbyte_data, 'UserId') as UserId, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from test_normalization._airbyte_raw_multiple_column_names_conflicts as table_alias +-- multiple_column_names_conflicts +where 1 = 1 + +), __dbt__cte__multiple_column_names_conflicts_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab1 +select + accurateCastOrNull(id, ' + BIGINT +') as id, + nullif(accurateCastOrNull(trim(BOTH '"' from "User Id"), 'String'), 'null') as "User Id", + accurateCastOrNull(user_id, ' + Float64 +') as user_id, + accurateCastOrNull("User id", ' + Float64 +') as "User id", + accurateCastOrNull("user id", ' + Float64 +') as "user id", + nullif(accurateCastOrNull(trim(BOTH '"' from "User@Id"), 'String'), 'null') as "User@Id", + accurateCastOrNull(UserId, ' + Float64 +') as UserId, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from __dbt__cte__multiple_column_names_conflicts_ab1 +-- multiple_column_names_conflicts +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab2 +select + assumeNotNull(hex(MD5( + + toString(id) || '~' || + + + toString("User Id") || '~' || + + + toString(user_id) || '~' || + + + toString("User id") || '~' || + + + toString("user id") || '~' || + + + toString("User@Id") || '~' || + + + toString(UserId) + + ))) as _airbyte_multiple_co__ames_conflicts_hashid, + tmp.* +from __dbt__cte__multiple_column_names_conflicts_ab2 tmp +-- multiple_column_names_conflicts +where 1 = 1 + + ) \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab1.sql deleted file mode 100644 index 5b9ee4b6b6820..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab1.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema --- depends_on: {{ source('test_normalization', '_airbyte_raw_dedup_cdc_excluded') }} -select - {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id, - {{ json_extract_scalar('_airbyte_data', ['name'], ['name']) }} as name, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_lsn'], ['_ab_cdc_lsn']) }} as _ab_cdc_lsn, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_updated_at'], ['_ab_cdc_updated_at']) }} as _ab_cdc_updated_at, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_deleted_at'], ['_ab_cdc_deleted_at']) }} as _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ source('test_normalization', '_airbyte_raw_dedup_cdc_excluded') }} as table_alias --- dedup_cdc_excluded -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab2.sql deleted file mode 100644 index 6f7e747a0699a..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/dedup_cdc_excluded_ab2.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type --- depends_on: {{ ref('dedup_cdc_excluded_ab1') }} -select - accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id, - nullif(accurateCastOrNull(trim(BOTH '"' from name), '{{ dbt_utils.type_string() }}'), 'null') as name, - accurateCastOrNull(_ab_cdc_lsn, '{{ dbt_utils.type_float() }}') as _ab_cdc_lsn, - accurateCastOrNull(_ab_cdc_updated_at, '{{ dbt_utils.type_float() }}') as _ab_cdc_updated_at, - accurateCastOrNull(_ab_cdc_deleted_at, '{{ dbt_utils.type_float() }}') as _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ ref('dedup_cdc_excluded_ab1') }} --- dedup_cdc_excluded -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql deleted file mode 100644 index f9b9da32d25d1..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab1.sql +++ /dev/null @@ -1,24 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema --- depends_on: {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} -select - {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id, - {{ json_extract_scalar('_airbyte_data', ['currency'], ['currency']) }} as currency, - {{ json_extract_scalar('_airbyte_data', ['date'], ['date']) }} as date, - {{ json_extract_scalar('_airbyte_data', ['timestamp_col'], ['timestamp_col']) }} as timestamp_col, - {{ json_extract_scalar('_airbyte_data', ['HKD@spéçiäl & characters'], ['HKD@spéçiäl & characters']) }} as {{ quote('HKD@spéçiäl & characters') }}, - {{ json_extract_scalar('_airbyte_data', ['HKD_special___characters'], ['HKD_special___characters']) }} as HKD_special___characters, - {{ json_extract_scalar('_airbyte_data', ['NZD'], ['NZD']) }} as NZD, - {{ json_extract_scalar('_airbyte_data', ['USD'], ['USD']) }} as USD, - {{ json_extract_scalar('_airbyte_data', ['column`_\'with"_quotes'], ['column___with__quotes']) }} as {{ quote('column`_\'with""_quotes') }}, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }} as table_alias --- exchange_rate -where 1 = 1 - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql deleted file mode 100644 index 49cb5ea4c759b..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab2.sql +++ /dev/null @@ -1,24 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type --- depends_on: {{ ref('exchange_rate_ab1') }} -select - accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id, - nullif(accurateCastOrNull(trim(BOTH '"' from currency), '{{ dbt_utils.type_string() }}'), 'null') as currency, - parseDateTimeBestEffortOrNull(trim(BOTH '"' from {{ empty_string_to_null('date') }})) as date, - parseDateTime64BestEffortOrNull(trim(BOTH '"' from {{ empty_string_to_null('timestamp_col') }})) as timestamp_col, - accurateCastOrNull({{ quote('HKD@spéçiäl & characters') }}, '{{ dbt_utils.type_float() }}') as {{ quote('HKD@spéçiäl & characters') }}, - nullif(accurateCastOrNull(trim(BOTH '"' from HKD_special___characters), '{{ dbt_utils.type_string() }}'), 'null') as HKD_special___characters, - accurateCastOrNull(NZD, '{{ dbt_utils.type_float() }}') as NZD, - accurateCastOrNull(USD, '{{ dbt_utils.type_float() }}') as USD, - nullif(accurateCastOrNull(trim(BOTH '"' from {{ quote('column`_\'with""_quotes') }}), '{{ dbt_utils.type_string() }}'), 'null') as {{ quote('column`_\'with""_quotes') }}, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ ref('exchange_rate_ab1') }} --- exchange_rate -where 1 = 1 - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql deleted file mode 100644 index c45103fae85c5..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/exchange_rate_ab3.sql +++ /dev/null @@ -1,24 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to build a hash column based on the values of this record --- depends_on: {{ ref('exchange_rate_ab2') }} -select - {{ dbt_utils.surrogate_key([ - 'id', - 'currency', - 'date', - 'timestamp_col', - quote('HKD@spéçiäl & characters'), - 'HKD_special___characters', - 'NZD', - 'USD', - quote('column`_\'with""_quotes'), - ]) }} as _airbyte_exchange_rate_hashid, - tmp.* -from {{ ref('exchange_rate_ab2') }} tmp --- exchange_rate -where 1 = 1 - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab1.sql deleted file mode 100644 index 909b7bd2366b6..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab1.sql +++ /dev/null @@ -1,22 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema --- depends_on: {{ source('test_normalization', '_airbyte_raw_pos_dedup_cdcx') }} -select - {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id, - {{ json_extract_scalar('_airbyte_data', ['name'], ['name']) }} as name, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_lsn'], ['_ab_cdc_lsn']) }} as _ab_cdc_lsn, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_updated_at'], ['_ab_cdc_updated_at']) }} as _ab_cdc_updated_at, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_deleted_at'], ['_ab_cdc_deleted_at']) }} as _ab_cdc_deleted_at, - {{ json_extract_scalar('_airbyte_data', ['_ab_cdc_log_pos'], ['_ab_cdc_log_pos']) }} as _ab_cdc_log_pos, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ source('test_normalization', '_airbyte_raw_pos_dedup_cdcx') }} as table_alias --- pos_dedup_cdcx -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab2.sql deleted file mode 100644 index 0b9192b2620a4..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_ctes/test_normalization/pos_dedup_cdcx_ab2.sql +++ /dev/null @@ -1,22 +0,0 @@ -{{ config( - unique_key = '_airbyte_ab_id', - schema = "_airbyte_test_normalization", - tags = [ "top-level-intermediate" ] -) }} --- SQL model to cast each column to its adequate SQL type converted from the JSON schema type --- depends_on: {{ ref('pos_dedup_cdcx_ab1') }} -select - accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id, - nullif(accurateCastOrNull(trim(BOTH '"' from name), '{{ dbt_utils.type_string() }}'), 'null') as name, - accurateCastOrNull(_ab_cdc_lsn, '{{ dbt_utils.type_float() }}') as _ab_cdc_lsn, - accurateCastOrNull(_ab_cdc_updated_at, '{{ dbt_utils.type_float() }}') as _ab_cdc_updated_at, - accurateCastOrNull(_ab_cdc_deleted_at, '{{ dbt_utils.type_float() }}') as _ab_cdc_deleted_at, - accurateCastOrNull(_ab_cdc_log_pos, '{{ dbt_utils.type_float() }}') as _ab_cdc_log_pos, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at -from {{ ref('pos_dedup_cdcx_ab1') }} --- pos_dedup_cdcx -where 1 = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql deleted file mode 100644 index 6a6248e7cb6a8..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql +++ /dev/null @@ -1,24 +0,0 @@ -{{ config( - unique_key = "_airbyte_unique_key", - schema = "test_normalization", - tags = [ "top-level" ] -) }} --- Final base SQL model --- depends_on: {{ ref('dedup_cdc_excluded_scd') }} -select - _airbyte_unique_key, - id, - name, - _ab_cdc_lsn, - _ab_cdc_updated_at, - _ab_cdc_deleted_at, - _airbyte_ab_id, - _airbyte_emitted_at, - {{ current_timestamp() }} as _airbyte_normalized_at, - _airbyte_dedup_cdc_excluded_hashid -from {{ ref('dedup_cdc_excluded_scd') }} --- dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_dedup_cdc_excluded') }} -where 1 = 1 -and _airbyte_active_row = 1 -{{ incremental_clause('_airbyte_emitted_at') }} - diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/sources.yml index 45c338b893cab..0e116b2bbec5d 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/sources.yml @@ -9,5 +9,6 @@ sources: - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate + - name: _airbyte_raw_multiple_column_names_conflicts - name: _airbyte_raw_pos_dedup_cdcx - name: _airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql deleted file mode 100644 index 029806e67c97d..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql +++ /dev/null @@ -1,5 +0,0 @@ - - insert into test_normalization.dedup_cdc_excluded_scd ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "name", "_ab_cdc_lsn", "_ab_cdc_updated_at", "_ab_cdc_deleted_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_cdc_excluded_hashid") - select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "name", "_ab_cdc_lsn", "_ab_cdc_updated_at", "_ab_cdc_deleted_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_cdc_excluded_hashid" - from dedup_cdc_excluded_scd__dbt_tmp - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql deleted file mode 100644 index bd7ed508ea036..0000000000000 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/second_output/airbyte_incremental/test_normalization/dedup_cdc_excluded.sql +++ /dev/null @@ -1,5 +0,0 @@ - - insert into test_normalization.dedup_cdc_excluded ("_airbyte_unique_key", "id", "name", "_ab_cdc_lsn", "_ab_cdc_updated_at", "_ab_cdc_deleted_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_cdc_excluded_hashid") - select "_airbyte_unique_key", "id", "name", "_ab_cdc_lsn", "_ab_cdc_updated_at", "_ab_cdc_deleted_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_cdc_excluded_hashid" - from dedup_cdc_excluded__dbt_tmp - \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..6f5a0cc6eb2d8 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,76 @@ +USE [test_normalization]; + execute('create view _airbyte_test_normalization."multiple_column_names_conflicts_stg__dbt_tmp" as + +with __dbt__cte__multiple_column_names_conflicts_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "test_normalization".test_normalization._airbyte_raw_multiple_column_names_conflicts +select + json_value(_airbyte_data, ''$."id"'') as id, + json_value(_airbyte_data, ''$."User Id"'') as "User Id", + json_value(_airbyte_data, ''$."user_id"'') as user_id, + json_value(_airbyte_data, ''$."User id"'') as "User id_1", + json_value(_airbyte_data, ''$."user id"'') as "user id_2", + json_value(_airbyte_data, ''$."User@Id"'') as "User@Id", + json_value(_airbyte_data, ''$."UserId"'') as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + SYSDATETIME() as _airbyte_normalized_at +from "test_normalization".test_normalization._airbyte_raw_multiple_column_names_conflicts as table_alias +-- multiple_column_names_conflicts +where 1 = 1 + +), __dbt__cte__multiple_column_names_conflicts_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab1 +select + cast(id as + bigint +) as id, + cast("User Id" as + VARCHAR(max)) as "User Id", + cast(user_id as + float +) as user_id, + cast("User id_1" as + float +) as "User id_1", + cast("user id_2" as + float +) as "user id_2", + cast("User@Id" as + VARCHAR(max)) as "User@Id", + cast(userid as + float +) as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + SYSDATETIME() as _airbyte_normalized_at +from __dbt__cte__multiple_column_names_conflicts_ab1 +-- multiple_column_names_conflicts +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab2 +select + convert(varchar(32), HashBytes(''md5'', coalesce(cast( + + + + concat(concat(coalesce(cast(id as + VARCHAR(max)), ''''), ''-'', coalesce(cast("User Id" as + VARCHAR(max)), ''''), ''-'', coalesce(cast(user_id as + VARCHAR(max)), ''''), ''-'', coalesce(cast("User id_1" as + VARCHAR(max)), ''''), ''-'', coalesce(cast("user id_2" as + VARCHAR(max)), ''''), ''-'', coalesce(cast("User@Id" as + VARCHAR(max)), ''''), ''-'', coalesce(cast(userid as + VARCHAR(max)), ''''),''''), '''') as + VARCHAR(max)), '''')), 2) as _airbyte_multiple_col__ames_conflicts_hashid, + tmp.* +from __dbt__cte__multiple_column_names_conflicts_ab2 tmp +-- multiple_column_names_conflicts +where 1 = 1 + + '); + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/sources.yml index 45c338b893cab..0e116b2bbec5d 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mssql/test_simple_streams/models/generated/sources.yml @@ -9,5 +9,6 @@ sources: - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate + - name: _airbyte_raw_multiple_column_names_conflicts - name: _airbyte_raw_pos_dedup_cdcx - name: _airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..ba1ab02165406 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,72 @@ + + create view _airbyte_test_normalization.`multiple_column_names_conflicts_stg__dbt_tmp` as ( + +with __dbt__CTE__multiple_column_names_conflicts_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: test_normalization._airbyte_raw_multiple_column_names_conflicts +select + json_value(_airbyte_data, + '$."id"') as id, + json_value(_airbyte_data, + '$."User Id"') as `User Id`, + json_value(_airbyte_data, + '$."user_id"') as user_id, + json_value(_airbyte_data, + '$."User id"') as `User id_1`, + json_value(_airbyte_data, + '$."user id"') as `user id_2`, + json_value(_airbyte_data, + '$."User@Id"') as `User@Id`, + json_value(_airbyte_data, + '$."UserId"') as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + + CURRENT_TIMESTAMP + as _airbyte_normalized_at +from test_normalization._airbyte_raw_multiple_column_names_conflicts as table_alias +-- multiple_column_names_conflicts +where 1 = 1 + +), __dbt__CTE__multiple_column_names_conflicts_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__CTE__multiple_column_names_conflicts_ab1 +select + cast(id as + signed +) as id, + cast(`User Id` as char) as `User Id`, + cast(user_id as + float +) as user_id, + cast(`User id_1` as + float +) as `User id_1`, + cast(`user id_2` as + float +) as `user id_2`, + cast(`User@Id` as char) as `User@Id`, + cast(userid as + float +) as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + + CURRENT_TIMESTAMP + as _airbyte_normalized_at +from __dbt__CTE__multiple_column_names_conflicts_ab1 +-- multiple_column_names_conflicts +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__CTE__multiple_column_names_conflicts_ab2 +select + md5(cast(concat(coalesce(cast(id as char), ''), '-', coalesce(cast(`User Id` as char), ''), '-', coalesce(cast(user_id as char), ''), '-', coalesce(cast(`User id_1` as char), ''), '-', coalesce(cast(`user id_2` as char), ''), '-', coalesce(cast(`User@Id` as char), ''), '-', coalesce(cast(userid as char), '')) as char)) as _airbyte_multiple_col__ames_conflicts_hashid, + tmp.* +from __dbt__CTE__multiple_column_names_conflicts_ab2 tmp +-- multiple_column_names_conflicts +where 1 = 1 + + ); diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/sources.yml index 45c338b893cab..0e116b2bbec5d 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/mysql/test_simple_streams/models/generated/sources.yml @@ -9,5 +9,6 @@ sources: - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate + - name: _airbyte_raw_multiple_column_names_conflicts - name: _airbyte_raw_pos_dedup_cdcx - name: _airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..5c34c11584562 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,87 @@ + + create view test_normalization.multiple_column_names_conflicts_stg__dbt_tmp as + +with dbt__cte__multiple_column_names_conflicts_ab1__ as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: test_normalization.airbyte_raw_multiple_column_names_conflicts +select + json_value("_AIRBYTE_DATA", '$."id"') as id, + json_value("_AIRBYTE_DATA", '$."User Id"') as user_id, + json_value("_AIRBYTE_DATA", '$."user_id"') as user_id_1, + json_value("_AIRBYTE_DATA", '$."User id"') as user_id_2, + json_value("_AIRBYTE_DATA", '$."user id"') as user_id_3, + json_value("_AIRBYTE_DATA", '$."User@Id"') as user_id_4, + json_value("_AIRBYTE_DATA", '$."UserId"') as userid, + "_AIRBYTE_AB_ID", + "_AIRBYTE_EMITTED_AT", + + CURRENT_TIMESTAMP + as "_AIRBYTE_NORMALIZED_AT" +from test_normalization.airbyte_raw_multiple_column_names_conflicts +-- multiple_column_names_conflicts +where 1 = 1 + +), dbt__cte__multiple_column_names_conflicts_ab2__ as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: dbt__cte__multiple_column_names_conflicts_ab1__ +select + cast(id as + numeric +) as id, + cast(user_id as varchar2(4000)) as user_id, + cast(user_id_1 as + float +) as user_id_1, + cast(user_id_2 as + float +) as user_id_2, + cast(user_id_3 as + float +) as user_id_3, + cast(user_id_4 as varchar2(4000)) as user_id_4, + cast(userid as + float +) as userid, + "_AIRBYTE_AB_ID", + "_AIRBYTE_EMITTED_AT", + + CURRENT_TIMESTAMP + as "_AIRBYTE_NORMALIZED_AT" +from dbt__cte__multiple_column_names_conflicts_ab1__ +-- multiple_column_names_conflicts +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: dbt__cte__multiple_column_names_conflicts_ab2__ +select + ora_hash( + + id || '~' || + + + user_id || '~' || + + + user_id_1 || '~' || + + + user_id_2 || '~' || + + + user_id_3 || '~' || + + + user_id_4 || '~' || + + + userid + + ) as "_AIRBYTE_MULTIPLE_COLUMN_NAMES_CONFLICTS_HASHID", + tmp.* +from dbt__cte__multiple_column_names_conflicts_ab2__ tmp +-- multiple_column_names_conflicts +where 1 = 1 + + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/sources.yml index c6b4b6023ba5f..3faad76c57b34 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/oracle/test_simple_streams/models/generated/sources.yml @@ -9,5 +9,6 @@ sources: - name: airbyte_raw_dedup_cdc_excluded - name: airbyte_raw_dedup_exchange_rate - name: airbyte_raw_exchange_rate + - name: airbyte_raw_multiple_column_names_conflicts - name: airbyte_raw_pos_dedup_cdcx - name: airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql new file mode 100644 index 0000000000000..e94644c18a173 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql @@ -0,0 +1,93 @@ + + + + create table "postgres".test_normalization."multiple_column_names_conflicts_scd" + as ( + +-- depends_on: ref('multiple_column_names_conflicts_stg') +with + +input_data as ( + select * + from "postgres"._airbyte_test_normalization."multiple_column_names_conflicts_stg" + -- multiple_column_names_conflicts from "postgres".test_normalization._airbyte_raw_multiple_column_names_conflicts +), + +scd_data as ( + -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key + select + md5(cast(coalesce(cast("id" as + varchar +), '') as + varchar +)) as _airbyte_unique_key, + "id", + "User Id", + user_id, + "User id", + "user id", + "User@Id", + userid, + _airbyte_emitted_at as _airbyte_start_at, + lag(_airbyte_emitted_at) over ( + partition by "id" + order by + _airbyte_emitted_at is null asc, + _airbyte_emitted_at desc, + _airbyte_emitted_at desc + ) as _airbyte_end_at, + case when row_number() over ( + partition by "id" + order by + _airbyte_emitted_at is null asc, + _airbyte_emitted_at desc, + _airbyte_emitted_at desc + ) = 1 then 1 else 0 end as _airbyte_active_row, + _airbyte_ab_id, + _airbyte_emitted_at, + _airbyte_multiple_co__ames_conflicts_hashid + from input_data +), +dedup_data as ( + select + -- we need to ensure de-duplicated rows for merge/update queries + -- additionally, we generate a unique key for the scd table + row_number() over ( + partition by + _airbyte_unique_key, + _airbyte_start_at, + _airbyte_emitted_at + order by _airbyte_active_row desc, _airbyte_ab_id + ) as _airbyte_row_num, + md5(cast(coalesce(cast(_airbyte_unique_key as + varchar +), '') || '-' || coalesce(cast(_airbyte_start_at as + varchar +), '') || '-' || coalesce(cast(_airbyte_emitted_at as + varchar +), '') as + varchar +)) as _airbyte_unique_key_scd, + scd_data.* + from scd_data +) +select + _airbyte_unique_key, + _airbyte_unique_key_scd, + "id", + "User Id", + user_id, + "User id", + "user id", + "User@Id", + userid, + _airbyte_start_at, + _airbyte_end_at, + _airbyte_active_row, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at, + _airbyte_multiple_co__ames_conflicts_hashid +from dedup_data where _airbyte_row_num = 1 + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql new file mode 100644 index 0000000000000..eba2d8af4fcee --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql @@ -0,0 +1,28 @@ + + + + create table "postgres".test_normalization."multiple_column_names_conflicts" + as ( + +-- Final base SQL model +-- depends_on: "postgres".test_normalization."multiple_column_names_conflicts_scd" +select + _airbyte_unique_key, + "id", + "User Id", + user_id, + "User id", + "user id", + "User@Id", + userid, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at, + _airbyte_multiple_co__ames_conflicts_hashid +from "postgres".test_normalization."multiple_column_names_conflicts_scd" +-- multiple_column_names_conflicts from "postgres".test_normalization._airbyte_raw_multiple_column_names_conflicts +where 1 = 1 +and _airbyte_active_row = 1 + + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..7a2c133f995f7 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/first_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,85 @@ + + + + create table "postgres"._airbyte_test_normalization."multiple_column_names_conflicts_stg" + as ( + +with __dbt__cte__multiple_column_names_conflicts_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "postgres".test_normalization._airbyte_raw_multiple_column_names_conflicts +select + jsonb_extract_path_text(_airbyte_data, 'id') as "id", + jsonb_extract_path_text(_airbyte_data, 'User Id') as "User Id", + jsonb_extract_path_text(_airbyte_data, 'user_id') as user_id, + jsonb_extract_path_text(_airbyte_data, 'User id') as "User id", + jsonb_extract_path_text(_airbyte_data, 'user id') as "user id", + jsonb_extract_path_text(_airbyte_data, 'User@Id') as "User@Id", + jsonb_extract_path_text(_airbyte_data, 'UserId') as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from "postgres".test_normalization._airbyte_raw_multiple_column_names_conflicts as table_alias +-- multiple_column_names_conflicts +where 1 = 1 + +), __dbt__cte__multiple_column_names_conflicts_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab1 +select + cast("id" as + bigint +) as "id", + cast("User Id" as + varchar +) as "User Id", + cast(user_id as + float +) as user_id, + cast("User id" as + float +) as "User id", + cast("user id" as + float +) as "user id", + cast("User@Id" as + varchar +) as "User@Id", + cast(userid as + float +) as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + now() as _airbyte_normalized_at +from __dbt__cte__multiple_column_names_conflicts_ab1 +-- multiple_column_names_conflicts +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab2 +select + md5(cast(coalesce(cast("id" as + varchar +), '') || '-' || coalesce(cast("User Id" as + varchar +), '') || '-' || coalesce(cast(user_id as + varchar +), '') || '-' || coalesce(cast("User id" as + varchar +), '') || '-' || coalesce(cast("user id" as + varchar +), '') || '-' || coalesce(cast("User@Id" as + varchar +), '') || '-' || coalesce(cast(userid as + varchar +), '') as + varchar +)) as _airbyte_multiple_co__ames_conflicts_hashid, + tmp.* +from __dbt__cte__multiple_column_names_conflicts_ab2 tmp +-- multiple_column_names_conflicts +where 1 = 1 + + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/multiple_column_names_conflicts_ab1.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/multiple_column_names_conflicts_ab1.sql new file mode 100644 index 0000000000000..7268a550c1560 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/multiple_column_names_conflicts_ab1.sql @@ -0,0 +1,24 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_test_normalization", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: {{ source('test_normalization', '_airbyte_raw_multiple_column_names_conflicts') }} +select + {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as {{ adapter.quote('id') }}, + {{ json_extract_scalar('_airbyte_data', ['User Id'], ['User Id']) }} as {{ adapter.quote('User Id') }}, + {{ json_extract_scalar('_airbyte_data', ['user_id'], ['user_id']) }} as user_id, + {{ json_extract_scalar('_airbyte_data', ['User id'], ['User id']) }} as {{ adapter.quote('User id') }}, + {{ json_extract_scalar('_airbyte_data', ['user id'], ['user id']) }} as {{ adapter.quote('user id') }}, + {{ json_extract_scalar('_airbyte_data', ['User@Id'], ['User@Id']) }} as {{ adapter.quote('User@Id') }}, + {{ json_extract_scalar('_airbyte_data', ['UserId'], ['UserId']) }} as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ source('test_normalization', '_airbyte_raw_multiple_column_names_conflicts') }} as table_alias +-- multiple_column_names_conflicts +where 1 = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/multiple_column_names_conflicts_ab2.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/multiple_column_names_conflicts_ab2.sql new file mode 100644 index 0000000000000..afed155ffbd8d --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_ctes/test_normalization/multiple_column_names_conflicts_ab2.sql @@ -0,0 +1,24 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_test_normalization", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('multiple_column_names_conflicts_ab1') }} +select + cast({{ adapter.quote('id') }} as {{ dbt_utils.type_bigint() }}) as {{ adapter.quote('id') }}, + cast({{ adapter.quote('User Id') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('User Id') }}, + cast(user_id as {{ dbt_utils.type_float() }}) as user_id, + cast({{ adapter.quote('User id') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('User id') }}, + cast({{ adapter.quote('user id') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('user id') }}, + cast({{ adapter.quote('User@Id') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('User@Id') }}, + cast(userid as {{ dbt_utils.type_float() }}) as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ ref('multiple_column_names_conflicts_ab1') }} +-- multiple_column_names_conflicts +where 1 = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql similarity index 53% rename from airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql rename to airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql index 9e3c81ac18178..3fcf2e971cbc6 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/renamed_dedup_cdc_excluded_scd.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql @@ -1,18 +1,19 @@ {{ config( + indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}], unique_key = "_airbyte_unique_key_scd", schema = "test_normalization", - post_hook = ['drop view _airbyte_test_normalization.renamed_dedup_cdc_excluded_stg'], + post_hook = ['delete from _airbyte_test_normalization.multiple_column_names_conflicts_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.multiple_column_names_conflicts_stg)'], tags = [ "top-level" ] ) }} --- depends_on: ref('renamed_dedup_cdc_excluded_stg') +-- depends_on: ref('multiple_column_names_conflicts_stg') with {% if is_incremental() %} new_data as ( -- retrieve incremental "new" data select * - from {{ ref('renamed_dedup_cdc_excluded_stg') }} - -- renamed_dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} + from {{ ref('multiple_column_names_conflicts_stg') }} + -- multiple_column_names_conflicts from {{ source('test_normalization', '_airbyte_raw_multiple_column_names_conflicts') }} where 1 = 1 {{ incremental_clause('_airbyte_emitted_at') }} ), @@ -20,7 +21,7 @@ new_data_ids as ( -- build a subset of _airbyte_unique_key from rows that are new select distinct {{ dbt_utils.surrogate_key([ - 'id', + adapter.quote('id'), ]) }} as _airbyte_unique_key from new_data ), @@ -31,58 +32,58 @@ empty_new_data as ( previous_active_scd_data as ( -- retrieve "incomplete old" data that needs to be updated with an end date because of new changes select - {{ star_intersect(ref('renamed_dedup_cdc_excluded_stg'), this, from_alias='inc_data', intersect_alias='this_data') }} + {{ star_intersect(ref('multiple_column_names_conflicts_stg'), this, from_alias='inc_data', intersect_alias='this_data') }} from {{ this }} as this_data -- make a join with new_data using primary key to filter active data that need to be updated only join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes) - --left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id + left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id where _airbyte_active_row = 1 ), input_data as ( - select {{ dbt_utils.star(ref('renamed_dedup_cdc_excluded_stg')) }} from new_data + select {{ dbt_utils.star(ref('multiple_column_names_conflicts_stg')) }} from new_data union all - select {{ dbt_utils.star(ref('renamed_dedup_cdc_excluded_stg')) }} from previous_active_scd_data + select {{ dbt_utils.star(ref('multiple_column_names_conflicts_stg')) }} from previous_active_scd_data ), {% else %} input_data as ( select * - from {{ ref('renamed_dedup_cdc_excluded_stg') }} - -- renamed_dedup_cdc_excluded from {{ source('test_normalization', '_airbyte_raw_renamed_dedup_cdc_excluded') }} + from {{ ref('multiple_column_names_conflicts_stg') }} + -- multiple_column_names_conflicts from {{ source('test_normalization', '_airbyte_raw_multiple_column_names_conflicts') }} ), {% endif %} -input_data_with_active_row_num as ( - select *, - row_number() over ( - partition by id - order by - _ab_cdc_updated_at is null asc, - _ab_cdc_updated_at desc, - _airbyte_emitted_at desc - ) as _airbyte_active_row_num - from input_data -), scd_data as ( -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key select {{ dbt_utils.surrogate_key([ - 'id', + adapter.quote('id'), ]) }} as _airbyte_unique_key, - id, - _ab_cdc_updated_at, - _ab_cdc_updated_at as _airbyte_start_at, - case when _airbyte_active_row_num = 1 then 1 else 0 end as _airbyte_active_row, - anyOrNull(_ab_cdc_updated_at) over ( - partition by id + {{ adapter.quote('id') }}, + {{ adapter.quote('User Id') }}, + user_id, + {{ adapter.quote('User id') }}, + {{ adapter.quote('user id') }}, + {{ adapter.quote('User@Id') }}, + userid, + _airbyte_emitted_at as _airbyte_start_at, + lag(_airbyte_emitted_at) over ( + partition by {{ adapter.quote('id') }} order by - _ab_cdc_updated_at is null asc, - _ab_cdc_updated_at desc, + _airbyte_emitted_at is null asc, + _airbyte_emitted_at desc, _airbyte_emitted_at desc - ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) as _airbyte_end_at, + ) as _airbyte_end_at, + case when row_number() over ( + partition by {{ adapter.quote('id') }} + order by + _airbyte_emitted_at is null asc, + _airbyte_emitted_at desc, + _airbyte_emitted_at desc + ) = 1 then 1 else 0 end as _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, - _airbyte_renamed_dedup_cdc_excluded_hashid - from input_data_with_active_row_num + _airbyte_multiple_co__ames_conflicts_hashid + from input_data ), dedup_data as ( select @@ -106,14 +107,19 @@ dedup_data as ( select _airbyte_unique_key, _airbyte_unique_key_scd, - id, - _ab_cdc_updated_at, + {{ adapter.quote('id') }}, + {{ adapter.quote('User Id') }}, + user_id, + {{ adapter.quote('User id') }}, + {{ adapter.quote('user id') }}, + {{ adapter.quote('User@Id') }}, + userid, _airbyte_start_at, _airbyte_end_at, _airbyte_active_row, _airbyte_ab_id, _airbyte_emitted_at, {{ current_timestamp() }} as _airbyte_normalized_at, - _airbyte_renamed_dedup_cdc_excluded_hashid + _airbyte_multiple_co__ames_conflicts_hashid from dedup_data where _airbyte_row_num = 1 diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql new file mode 100644 index 0000000000000..9aa1f765c0c8f --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql @@ -0,0 +1,27 @@ +{{ config( + indexes = [{'columns':['_airbyte_unique_key'],'unique':True}], + unique_key = "_airbyte_unique_key", + schema = "test_normalization", + tags = [ "top-level" ] +) }} +-- Final base SQL model +-- depends_on: {{ ref('multiple_column_names_conflicts_scd') }} +select + _airbyte_unique_key, + {{ adapter.quote('id') }}, + {{ adapter.quote('User Id') }}, + user_id, + {{ adapter.quote('User id') }}, + {{ adapter.quote('user id') }}, + {{ adapter.quote('User@Id') }}, + userid, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at, + _airbyte_multiple_co__ames_conflicts_hashid +from {{ ref('multiple_column_names_conflicts_scd') }} +-- multiple_column_names_conflicts from {{ source('test_normalization', '_airbyte_raw_multiple_column_names_conflicts') }} +where 1 = 1 +and _airbyte_active_row = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..85ac753575979 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,24 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_test_normalization", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to build a hash column based on the values of this record +-- depends_on: {{ ref('multiple_column_names_conflicts_ab2') }} +select + {{ dbt_utils.surrogate_key([ + adapter.quote('id'), + adapter.quote('User Id'), + 'user_id', + adapter.quote('User id'), + adapter.quote('user id'), + adapter.quote('User@Id'), + 'userid', + ]) }} as _airbyte_multiple_co__ames_conflicts_hashid, + tmp.* +from {{ ref('multiple_column_names_conflicts_ab2') }} tmp +-- multiple_column_names_conflicts +where 1 = 1 +{{ incremental_clause('_airbyte_emitted_at') }} + diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/sources.yml index 45c338b893cab..0e116b2bbec5d 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/models/generated/sources.yml @@ -9,5 +9,6 @@ sources: - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate + - name: _airbyte_raw_multiple_column_names_conflicts - name: _airbyte_raw_pos_dedup_cdcx - name: _airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql new file mode 100644 index 0000000000000..77ba6202fe818 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/scd/test_normalization/multiple_column_names_conflicts_scd.sql @@ -0,0 +1,14 @@ + + delete + from "postgres".test_normalization."multiple_column_names_conflicts_scd" + where (_airbyte_unique_key_scd) in ( + select (_airbyte_unique_key_scd) + from "multiple_column_names_conflicts_scd__dbt_tmp" + ); + + insert into "postgres".test_normalization."multiple_column_names_conflicts_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "User Id", "user_id", "User id", "user id", "User@Id", "userid", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_multiple_co__ames_conflicts_hashid") + ( + select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "User Id", "user_id", "User id", "user id", "User@Id", "userid", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_multiple_co__ames_conflicts_hashid" + from "multiple_column_names_conflicts_scd__dbt_tmp" + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql new file mode 100644 index 0000000000000..55bd30540c228 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts.sql @@ -0,0 +1,14 @@ + + delete + from "postgres".test_normalization."multiple_column_names_conflicts" + where (_airbyte_unique_key) in ( + select (_airbyte_unique_key) + from "multiple_column_names_conflicts__dbt_tmp" + ); + + insert into "postgres".test_normalization."multiple_column_names_conflicts" ("_airbyte_unique_key", "id", "User Id", "user_id", "User id", "user id", "User@Id", "userid", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_multiple_co__ames_conflicts_hashid") + ( + select "_airbyte_unique_key", "id", "User Id", "user_id", "User id", "user id", "User@Id", "userid", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_multiple_co__ames_conflicts_hashid" + from "multiple_column_names_conflicts__dbt_tmp" + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..9acf3e0a0ee38 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/postgres/test_simple_streams/second_output/airbyte_incremental/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,14 @@ + + delete + from "postgres"._airbyte_test_normalization."multiple_column_names_conflicts_stg" + where (_airbyte_ab_id) in ( + select (_airbyte_ab_id) + from "multiple_column_names_conflicts_stg__dbt_tmp" + ); + + insert into "postgres"._airbyte_test_normalization."multiple_column_names_conflicts_stg" ("_airbyte_multiple_co__ames_conflicts_hashid", "id", "User Id", "user_id", "User id", "user id", "User@Id", "userid", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at") + ( + select "_airbyte_multiple_co__ames_conflicts_hashid", "id", "User Id", "user_id", "User id", "user id", "User@Id", "userid", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at" + from "multiple_column_names_conflicts_stg__dbt_tmp" + ); + \ No newline at end of file diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql new file mode 100644 index 0000000000000..0777ba0c53932 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/first_output/airbyte_views/test_normalization/multiple_column_names_conflicts_stg.sql @@ -0,0 +1,62 @@ + + + create view "integrationtests"._airbyte_test_normalization."multiple_column_names_conflicts_stg__dbt_tmp" as ( + +with __dbt__cte__multiple_column_names_conflicts_ab1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "integrationtests".test_normalization._airbyte_raw_multiple_column_names_conflicts +select + case when json_extract_path_text(_airbyte_data, 'id', true) != '' then json_extract_path_text(_airbyte_data, 'id', true) end as id, + case when json_extract_path_text(_airbyte_data, 'User Id', true) != '' then json_extract_path_text(_airbyte_data, 'User Id', true) end as "user id", + case when json_extract_path_text(_airbyte_data, 'user_id', true) != '' then json_extract_path_text(_airbyte_data, 'user_id', true) end as user_id, + case when json_extract_path_text(_airbyte_data, 'User id', true) != '' then json_extract_path_text(_airbyte_data, 'User id', true) end as "user id_1", + case when json_extract_path_text(_airbyte_data, 'user id', true) != '' then json_extract_path_text(_airbyte_data, 'user id', true) end as "user id_2", + case when json_extract_path_text(_airbyte_data, 'User@Id', true) != '' then json_extract_path_text(_airbyte_data, 'User@Id', true) end as "user@id", + case when json_extract_path_text(_airbyte_data, 'UserId', true) != '' then json_extract_path_text(_airbyte_data, 'UserId', true) end as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + getdate() as _airbyte_normalized_at +from "integrationtests".test_normalization._airbyte_raw_multiple_column_names_conflicts as table_alias +-- multiple_column_names_conflicts +where 1 = 1 + +), __dbt__cte__multiple_column_names_conflicts_ab2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab1 +select + cast(id as + bigint +) as id, + cast("user id" as varchar) as "user id", + cast(user_id as + float +) as user_id, + cast("user id_1" as + float +) as "user id_1", + cast("user id_2" as + float +) as "user id_2", + cast("user@id" as varchar) as "user@id", + cast(userid as + float +) as userid, + _airbyte_ab_id, + _airbyte_emitted_at, + getdate() as _airbyte_normalized_at +from __dbt__cte__multiple_column_names_conflicts_ab1 +-- multiple_column_names_conflicts +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__multiple_column_names_conflicts_ab2 +select + md5(cast(coalesce(cast(id as varchar), '') || '-' || coalesce(cast("user id" as varchar), '') || '-' || coalesce(cast(user_id as varchar), '') || '-' || coalesce(cast("user id_1" as varchar), '') || '-' || coalesce(cast("user id_2" as varchar), '') || '-' || coalesce(cast("user@id" as varchar), '') || '-' || coalesce(cast(userid as varchar), '') as varchar)) as _airbyte_multiple_column_names_conflicts_hashid, + tmp.* +from __dbt__cte__multiple_column_names_conflicts_ab2 tmp +-- multiple_column_names_conflicts +where 1 = 1 + + ) ; diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/sources.yml index 45c338b893cab..0e116b2bbec5d 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/redshift/test_simple_streams/models/generated/sources.yml @@ -9,5 +9,6 @@ sources: - name: _airbyte_raw_dedup_cdc_excluded - name: _airbyte_raw_dedup_exchange_rate - name: _airbyte_raw_exchange_rate + - name: _airbyte_raw_multiple_column_names_conflicts - name: _airbyte_raw_pos_dedup_cdcx - name: _airbyte_raw_renamed_dedup_cdc_excluded diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/first_output/airbyte_views/TEST_NORMALIZATION/MULTIPLE_COLUMN_NAMES_CONFLICTS_STG.sql b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/first_output/airbyte_views/TEST_NORMALIZATION/MULTIPLE_COLUMN_NAMES_CONFLICTS_STG.sql new file mode 100644 index 0000000000000..c9a26f11445b2 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/first_output/airbyte_views/TEST_NORMALIZATION/MULTIPLE_COLUMN_NAMES_CONFLICTS_STG.sql @@ -0,0 +1,81 @@ + + create or replace view "AIRBYTE_DATABASE"._AIRBYTE_TEST_NORMALIZATION."MULTIPLE_COLUMN_NAMES_CONFLICTS_STG" as ( + +with __dbt__cte__MULTIPLE_COLUMN_NAMES_CONFLICTS_AB1 as ( + +-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema +-- depends_on: "AIRBYTE_DATABASE".TEST_NORMALIZATION._AIRBYTE_RAW_MULTIPLE_COLUMN_NAMES_CONFLICTS +select + to_varchar(get_path(parse_json(_airbyte_data), '"id"')) as ID, + to_varchar(get_path(parse_json(_airbyte_data), '"User Id"')) as "User Id", + to_varchar(get_path(parse_json(_airbyte_data), '"user_id"')) as USER_ID, + to_varchar(get_path(parse_json(_airbyte_data), '"User id"')) as "User id", + to_varchar(get_path(parse_json(_airbyte_data), '"user id"')) as "user id", + to_varchar(get_path(parse_json(_airbyte_data), '"User@Id"')) as "User@Id", + to_varchar(get_path(parse_json(_airbyte_data), '"UserId"')) as USERID, + _AIRBYTE_AB_ID, + _AIRBYTE_EMITTED_AT, + convert_timezone('UTC', current_timestamp()) as _AIRBYTE_NORMALIZED_AT +from "AIRBYTE_DATABASE".TEST_NORMALIZATION._AIRBYTE_RAW_MULTIPLE_COLUMN_NAMES_CONFLICTS as table_alias +-- MULTIPLE_COLUMN_NAMES_CONFLICTS +where 1 = 1 + +), __dbt__cte__MULTIPLE_COLUMN_NAMES_CONFLICTS_AB2 as ( + +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: __dbt__cte__MULTIPLE_COLUMN_NAMES_CONFLICTS_AB1 +select + cast(ID as + bigint +) as ID, + cast("User Id" as + varchar +) as "User Id", + cast(USER_ID as + float +) as USER_ID, + cast("User id" as + float +) as "User id", + cast("user id" as + float +) as "user id", + cast("User@Id" as + varchar +) as "User@Id", + cast(USERID as + float +) as USERID, + _AIRBYTE_AB_ID, + _AIRBYTE_EMITTED_AT, + convert_timezone('UTC', current_timestamp()) as _AIRBYTE_NORMALIZED_AT +from __dbt__cte__MULTIPLE_COLUMN_NAMES_CONFLICTS_AB1 +-- MULTIPLE_COLUMN_NAMES_CONFLICTS +where 1 = 1 + +)-- SQL model to build a hash column based on the values of this record +-- depends_on: __dbt__cte__MULTIPLE_COLUMN_NAMES_CONFLICTS_AB2 +select + md5(cast(coalesce(cast(ID as + varchar +), '') || '-' || coalesce(cast("User Id" as + varchar +), '') || '-' || coalesce(cast(USER_ID as + varchar +), '') || '-' || coalesce(cast("User id" as + varchar +), '') || '-' || coalesce(cast("user id" as + varchar +), '') || '-' || coalesce(cast("User@Id" as + varchar +), '') || '-' || coalesce(cast(USERID as + varchar +), '') as + varchar +)) as _AIRBYTE_MULTIPLE_COLUMN_NAMES_CONFLICTS_HASHID, + tmp.* +from __dbt__cte__MULTIPLE_COLUMN_NAMES_CONFLICTS_AB2 tmp +-- MULTIPLE_COLUMN_NAMES_CONFLICTS +where 1 = 1 + + ); diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/sources.yml b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/sources.yml index c09293ca67262..bec4269ba6bf8 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/sources.yml +++ b/airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/snowflake/test_simple_streams/models/generated/sources.yml @@ -9,5 +9,6 @@ sources: - name: _AIRBYTE_RAW_DEDUP_CDC_EXCLUDED - name: _AIRBYTE_RAW_DEDUP_EXCHANGE_RATE - name: _AIRBYTE_RAW_EXCHANGE_RATE + - name: _AIRBYTE_RAW_MULTIPLE_COLUMN_NAMES_CONFLICTS - name: _AIRBYTE_RAW_POS_DEDUP_CDCX - name: _AIRBYTE_RAW_RENAMED_DEDUP_CDC_EXCLUDED diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json index 9b44f5e68d18a..dc25bf3713604 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/catalog.json @@ -177,6 +177,44 @@ "cursor_field": [], "destination_sync_mode": "append_dedup", "primary_key": [["id"]] + }, + { + "stream": { + "name": "multiple_column_names_conflicts", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": "integer" + }, + "User Id": { + "type": ["string", "null"] + }, + "user_id": { + "type": ["null", "number"] + }, + "User id": { + "type": ["null", "number"] + }, + "user id": { + "type": ["null", "number"] + }, + "User@Id": { + "type": ["null", "string"] + }, + "UserId": { + "type": ["null", "number"] + } + } + }, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "full_refresh", + "cursor_field": [], + "destination_sync_mode": "append_dedup", + "primary_key": [["id"]] } ] } diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt index fa3af2c3f2548..e84ca1f63c79a 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/test_simple_streams/data_input/messages.txt @@ -41,3 +41,5 @@ {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":4,"name":null,"_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010232,"_ab_cdc_log_pos": 33279,"_ab_cdc_deleted_at":1623850868371},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":5,"name":"lotus","_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010048,"_ab_cdc_log_pos": 33280,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} {"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":5,"name":"lily","_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010232,"_ab_cdc_log_pos": 33281,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}} + +{"type":"RECORD","record":{"stream":"multiple_column_names_conflicts","data":{"id":1,"User Id":"chris","user_id":42,"User id":300,"user id": 102,"UserId":101},"emitted_at":1623959926}} diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py index 352fa8b9f93d1..ab7b3894d6134 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py @@ -223,6 +223,44 @@ def __normalize_identifier_case(self, input_name: str, is_quoted: bool = False) raise KeyError(f"Unknown destination type {self.destination_type}") return result + def normalize_column_identifier_case_for_lookup(self, input_name: str, is_quoted: bool = False) -> str: + """ + This function adds an additional normalization regarding the column name casing to determine if multiple columns + are in collisions. On certain destinations/settings, case sensitivity matters, in others it does not. + We separate this from standard identifier normalization "__normalize_identifier_case", + so the generated SQL queries are keeping the original casing from the catalog. + But we still need to determine if casing matters or not, thus by using this function. + """ + result = input_name + if self.destination_type.value == DestinationType.BIGQUERY.value: + # Columns are considered identical regardless of casing + result = input_name.lower() + elif self.destination_type.value == DestinationType.REDSHIFT.value: + # Columns are considered identical regardless of casing (even quoted ones) + result = input_name.lower() + elif self.destination_type.value == DestinationType.POSTGRES.value: + if not is_quoted and not self.needs_quotes(input_name): + result = input_name.lower() + elif self.destination_type.value == DestinationType.SNOWFLAKE.value: + if not is_quoted and not self.needs_quotes(input_name): + result = input_name.upper() + elif self.destination_type.value == DestinationType.MYSQL.value: + # Columns are considered identical regardless of casing (even quoted ones) + result = input_name.lower() + elif self.destination_type.value == DestinationType.MSSQL.value: + # Columns are considered identical regardless of casing (even quoted ones) + result = input_name.lower() + elif self.destination_type.value == DestinationType.ORACLE.value: + if not is_quoted and not self.needs_quotes(input_name): + result = input_name.lower() + else: + result = input_name.upper() + elif self.destination_type.value == DestinationType.CLICKHOUSE.value: + pass + else: + raise KeyError(f"Unknown destination type {self.destination_type}") + return result + # Static Functions diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 8cae3703bc9fc..d5ba201b9746b 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -314,15 +314,17 @@ def extract_column_names(self) -> Dict[str, Tuple[str, str]]: field_names = set() for field in fields: field_name = self.name_transformer.normalize_column_name(field, in_jinja=False) + field_name_lookup = self.name_transformer.normalize_column_identifier_case_for_lookup(field_name) jinja_name = self.name_transformer.normalize_column_name(field, in_jinja=True) - if field_name in field_names: + if field_name_lookup in field_names: # TODO handle column name duplicates or collisions deterministically in this stream for i in range(1, 1000): field_name = self.name_transformer.normalize_column_name(f"{field}_{i}", in_jinja=False) + field_name_lookup = self.name_transformer.normalize_column_identifier_case_for_lookup(field_name) jinja_name = self.name_transformer.normalize_column_name(f"{field}_{i}", in_jinja=True) - if field_name not in field_names: + if field_name_lookup not in field_names: break - field_names.add(field_name) + field_names.add(field_name_lookup) result[field] = (field_name, jinja_name) return result diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-clickhouse/src/main/resources/spec.json index 1edbbb8465d94..e0345d4780fe5 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-clickhouse/src/main/resources/spec.json @@ -2,7 +2,7 @@ "documentationUrl": "https://docs.airbyte.io/integrations/destinations/clickhouse", "supportsIncremental": true, "supportsNormalization": true, - "supportsDBT": false, + "supportsDBT": true, "supported_destination_sync_modes": ["overwrite", "append", "append_dedup"], "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#",