Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Normalization correctly propagates deletions to the final tables #12846

Merged
merged 47 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
b3715d8
first pass at deletions
edgao May 9, 2022
ce8970b
wip do deletions
edgao May 12, 2022
d270b92
update tests to reflect deleted rows
edgao May 13, 2022
ab5590a
revert final table generation
edgao May 13, 2022
0be2776
wip incremental delete
edgao May 14, 2022
eb2f0ee
refactor
edgao May 15, 2022
448edf6
remove todo
edgao May 15, 2022
76dc17b
fix macros
edgao May 17, 2022
4661e9e
use subquery for delete
edgao May 18, 2022
ea597c8
wip new_data model
edgao May 19, 2022
854a98b
wip
edgao May 20, 2022
36aa68b
wip move delete hook to scd
edgao May 21, 2022
26f5668
drop new_data model; clean up code
edgao May 24, 2022
2976e5c
wip better delete logic (need to verify performance)
edgao May 25, 2022
883f4d6
better delete logic+comments
edgao May 27, 2022
bdfd5ab
add record to test for edge case
edgao May 27, 2022
2608145
Merge branch 'master' into edgao/cdc_deletions
edgao May 27, 2022
ee3fe8f
slight tweaks
edgao May 27, 2022
578dd2f
better codegen
edgao May 28, 2022
5a08295
redshift does not support ctid in delete
edgao May 31, 2022
8239f11
clickhouse deletes
edgao May 31, 2022
8cba970
regenerate most test outputs (missing mssql + clickhouse)
edgao Jun 1, 2022
eb08d9f
mysql on m1
edgao Jun 1, 2022
eb29d6f
better clickhouse delete
edgao Jun 1, 2022
a176e0f
fix for clickhouse (no more CTEs in where clause)
edgao Jun 2, 2022
328539c
regenerate test outputs
edgao Jun 2, 2022
965777a
make mssql test run on m1 mac
edgao Jun 3, 2022
d828dfb
Revert "make mssql test run on m1 mac"
edgao Jun 3, 2022
600b084
fix drop view for mssql
edgao Jun 3, 2022
1a295e8
fix for linter
edgao Jun 3, 2022
d6f7ca6
much simpler query
edgao Jun 9, 2022
b542b4c
cleanup
edgao Jun 9, 2022
d205505
cleanup
edgao Jun 9, 2022
09814a3
remove new_data model
edgao Jun 9, 2022
e36b305
remove do_deletions flag
edgao Jun 10, 2022
05c560c
regenerate outputs
edgao Jun 10, 2022
20438e1
add test case case for cross-sync deletion
edgao Jun 13, 2022
dfee8e0
faster query
edgao Jun 13, 2022
90b2f86
regenerate outputs
edgao Jun 13, 2022
0556673
better sql
edgao Jun 13, 2022
c46495c
regenerate output
edgao Jun 13, 2022
a47cf05
simplify query
edgao Jun 13, 2022
087d556
regenerate output
edgao Jun 14, 2022
176b66f
Merge branch 'master' into edgao/cdc_deletions
edgao Jun 14, 2022
9526c19
bump versions + changelog
edgao Jun 14, 2022
cf841f7
Merge branch 'master' into edgao/cdc_deletions
edgao Jun 14, 2022
f428900
Merge branch 'master' into edgao/cdc_deletions
edgao Jun 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,43 @@
- incremental_clause controls the predicate to filter on new data to process incrementally
#}

{% macro incremental_clause(col_emitted_at) -%}
{{ adapter.dispatch('incremental_clause')(col_emitted_at) }}
{% macro incremental_clause(col_emitted_at, tablename) -%}
{{ adapter.dispatch('incremental_clause')(col_emitted_at, tablename) }}
{%- endmacro %}

{%- macro default__incremental_clause(col_emitted_at) -%}
{%- macro default__incremental_clause(col_emitted_at, tablename) -%}
{% if is_incremental() %}
and coalesce(
cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}),
cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ tablename }}),
{# -- if {{ col_emitted_at }} is NULL in either table, the previous comparison would evaluate to NULL, #}
{# -- so we coalesce and make sure the row is always returned for incremental processing instead #}
true)
{% endif %}
{%- endmacro -%}

{# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #}
{%- macro snowflake__incremental_clause(col_emitted_at) -%}
{%- macro snowflake__incremental_clause(col_emitted_at, tablename) -%}
{% if is_incremental() %}
{% if get_max_normalized_cursor(col_emitted_at) %}
{% if get_max_normalized_cursor(col_emitted_at, tablename) %}
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
cast('{{ get_max_normalized_cursor(col_emitted_at, tablename) }}' as {{ type_timestamp_with_timezone() }})
{% endif %}
{% endif %}
{%- endmacro -%}

{%- macro sqlserver__incremental_clause(col_emitted_at) -%}
{%- macro sqlserver__incremental_clause(col_emitted_at, tablename) -%}
{% if is_incremental() %}
and ((select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}) is null
and ((select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ tablename }}) is null
or cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
(select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}))
(select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ tablename }}))
{% endif %}
{%- endmacro -%}

{% macro get_max_normalized_cursor(col_emitted_at) %}
{% macro get_max_normalized_cursor(col_emitted_at, tablename) %}
{% if execute and is_incremental() %}
{% if env_var('INCREMENTAL_CURSOR', 'UNSET') == 'UNSET' %}
{% set query %}
select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}
select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ tablename }}
{% endset %}
{% set max_cursor = run_query(query).columns[0][0] %}
{% do return(max_cursor) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,12 @@ def setup_mysql_db(self):
"MYSQL_INITDB_SKIP_TZINFO=yes",
"-e",
f"MYSQL_DATABASE={config['database']}",
"-e",
"MYSQL_ROOT_HOST=%",
"-p",
f"{config['port']}:3306",
"-d",
"mysql",
"mysql/mysql-server",
]
print("Executing: ", " ".join(commands))
subprocess.call(commands)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,29 @@
# This file is necessary to install dbt-utils with dbt deps
# the content will be overwritten by the transform function

# Name your package! Package names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "airbyte_utils"
name: airbyte_utils
version: "1.0"
config-version: 2

# This setting configures which "profile" dbt uses for this project. Profiles contain
# database connection information, and should be configured in the ~/.dbt/profiles.yml file
profile: "normalize"

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that source models can be found
# in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]

target-path: "../build" # directory which will store compiled SQL files
log-path: "../logs" # directory which will store DBT logs
packages-install-path: "/dbt" # directory which will store external DBT dependencies

clean-targets: # directories to be removed by `dbt clean`
- "build"
- "dbt_modules"

profile: normalize
model-paths:
- models
docs-paths:
- docs
analysis-paths:
- analysis
test-paths:
- tests
seed-paths:
- data
macro-paths:
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
quoting:
database: true
# Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785)
# all schemas should be unquoted
schema: false
identifier: true

# You can define configurations for models in the `model-paths` directory here.
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
Expand All @@ -57,7 +41,79 @@ models:
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

dispatch:
- macro_namespace: dbt_utils
search_order: ["airbyte_utils", "dbt_utils"]
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
nested_stream_with_complex_columns_resulting_into_long_names_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_stg: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_scd_new_data: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_scd: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab1: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab2: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab3: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
some_stream_that_was_empty_ab1: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_ab2: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_stg: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_scd_new_data: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_scd: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty: test_normalization._airbyte_raw_some_stream_that_was_empty
simple_stream_with_namespace_resulting_into_long_names_ab1: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names_ab2: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names_ab3: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_scalar_ab1: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar_ab2: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar_ab3: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_array_ab1: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array_ab2: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array_ab3: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array: test_normalization._airbyte_raw_conflict_stream_array
unnest_alias_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias: test_normalization._airbyte_raw_unnest_alias
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
conflict_stream_name_conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
unnest_alias_children_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children: test_normalization._airbyte_raw_unnest_alias
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
unnest_alias_children_owner_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes: test_normalization._airbyte_raw_unnest_alias
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
OPTIONS()
as (

-- depends_on: ref('nested_stream_with_complex_columns_resulting_into_long_names_stg')
-- depends on: `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_scd_new_data`
with

input_data as (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@


create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_scd_new_data`
OPTIONS()
as
-- depends_on: ref('nested_stream_with_complex_columns_resulting_into_long_names_stg')

select * from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_stg`

;

Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ select
from {{ source('test_normalization', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }} as table_alias
-- nested_stream_with_complex_columns_resulting_into_long_names
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ select
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_ab1') }}
-- nested_stream_with_complex_columns_resulting_into_long_names
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partit
{{ cross_join_unnest('partition', 'DATA') }}
where 1 = 1
and DATA is not null
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_scd')
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1
and {{ adapter.quote('partition') }} is not null
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_partit
{{ cross_join_unnest('partition', 'double_array_data') }}
where 1 = 1
and double_array_data is not null
{{ incremental_clause('_airbyte_emitted_at') }}
{{ incremental_clause('_airbyte_emitted_at', this) }}

Loading