🐛 Normalization correctly propagates deletions to the final tables #12846

Merged 47 commits on Jun 14, 2022
b3715d8
first pass at deletions
edgao May 9, 2022
ce8970b
wip do deletions
edgao May 12, 2022
d270b92
update tests to reflect deleted rows
edgao May 13, 2022
ab5590a
revert final table generation
edgao May 13, 2022
0be2776
wip incremental delete
edgao May 14, 2022
eb2f0ee
refactor
edgao May 15, 2022
448edf6
remove todo
edgao May 15, 2022
76dc17b
fix macros
edgao May 17, 2022
4661e9e
use subquery for delete
edgao May 18, 2022
ea597c8
wip new_data model
edgao May 19, 2022
854a98b
wip
edgao May 20, 2022
36aa68b
wip move delete hook to scd
edgao May 21, 2022
26f5668
drop new_data model; clean up code
edgao May 24, 2022
2976e5c
wip better delete logic (need to verify performance)
edgao May 25, 2022
883f4d6
better delete logic+comments
edgao May 27, 2022
bdfd5ab
add record to test for edge case
edgao May 27, 2022
2608145
Merge branch 'master' into edgao/cdc_deletions
edgao May 27, 2022
ee3fe8f
slight tweaks
edgao May 27, 2022
578dd2f
better codegen
edgao May 28, 2022
5a08295
redshift does not support ctid in delete
edgao May 31, 2022
8239f11
clickhouse deletes
edgao May 31, 2022
8cba970
regenerate most test outputs (missing mssql + clickhouse)
edgao Jun 1, 2022
eb08d9f
mysql on m1
edgao Jun 1, 2022
eb29d6f
better clickhouse delete
edgao Jun 1, 2022
a176e0f
fix for clickhouse (no more CTEs in where clause)
edgao Jun 2, 2022
328539c
regenerate test outputs
edgao Jun 2, 2022
965777a
make mssql test run on m1 mac
edgao Jun 3, 2022
d828dfb
Revert "make mssql test run on m1 mac"
edgao Jun 3, 2022
600b084
fix drop view for mssql
edgao Jun 3, 2022
1a295e8
fix for linter
edgao Jun 3, 2022
d6f7ca6
much simpler query
edgao Jun 9, 2022
b542b4c
cleanup
edgao Jun 9, 2022
d205505
cleanup
edgao Jun 9, 2022
09814a3
remove new_data model
edgao Jun 9, 2022
e36b305
remove do_deletions flag
edgao Jun 10, 2022
05c560c
regenerate outputs
edgao Jun 10, 2022
20438e1
add test case case for cross-sync deletion
edgao Jun 13, 2022
dfee8e0
faster query
edgao Jun 13, 2022
90b2f86
regenerate outputs
edgao Jun 13, 2022
0556673
better sql
edgao Jun 13, 2022
c46495c
regenerate output
edgao Jun 13, 2022
a47cf05
simplify query
edgao Jun 13, 2022
087d556
regenerate output
edgao Jun 14, 2022
176b66f
Merge branch 'master' into edgao/cdc_deletions
edgao Jun 14, 2022
9526c19
bump versions + changelog
edgao Jun 14, 2022
cf841f7
Merge branch 'master' into edgao/cdc_deletions
edgao Jun 14, 2022
f428900
Merge branch 'master' into edgao/cdc_deletions
edgao Jun 14, 2022
@@ -119,6 +119,38 @@
"cursor_field": [],
"destination_sync_mode": "append_dedup",
"primary_key": [["id"]]
+    },
+    {
+      "stream": {
+        "name": "dedup_cdc_excluded",
+        "json_schema": {
+          "type": ["null", "object"],
+          "properties": {
+            "id": {
+              "type": "integer"
+            },
+            "name": {
+              "type": ["string", "null"]
+            },
+            "_ab_cdc_lsn": {
+              "type": ["null", "number"]
+            },
+            "_ab_cdc_updated_at": {
+              "type": ["null", "number"]
+            },
+            "_ab_cdc_deleted_at": {
+              "type": ["null", "number"]
+            }
+          }
+        },
+        "supported_sync_modes": ["full_refresh", "incremental"],
+        "source_defined_cursor": true,
+        "default_cursor_field": []
+      },
+      "sync_mode": "incremental",
+      "cursor_field": ["_ab_cdc_lsn"],
+      "destination_sync_mode": "append_dedup",
+      "primary_key": [["id"]]
}
]
}
@@ -11,3 +11,5 @@
{"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":8,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623949314663,"_ab_cdc_lsn":26985264,"_ab_cdc_deleted_at":null},"emitted_at":1623960160}}
{"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":9,"name":"opel","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623950868109,"_ab_cdc_lsn":28009440,"_ab_cdc_deleted_at":null},"emitted_at":1623961660}}
{"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":9,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623950868371,"_ab_cdc_lsn":28010232,"_ab_cdc_deleted_at":1623950868371},"emitted_at":1623961660}}

+ {"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":8,"name":"ford","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1625000000000,"_ab_cdc_lsn":29020252,"_ab_cdc_deleted_at":1625000000000},"emitted_at":1625000000000}}
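
The record added above carries a non-null `_ab_cdc_deleted_at`, which is what the final table is expected to react to. A minimal sketch of that expectation, assuming (as a simplification, not the actual normalization logic) that the latest version of each `id` is the one with the highest `_ab_cdc_lsn`; the function name and the sample records are hypothetical:

```python
from typing import Any, Dict, List


def final_table_rows(records: List[Dict[str, Any]]) -> Dict[int, Dict[str, Any]]:
    """Keep the latest version of each id, then drop ids whose latest version is a CDC delete."""
    latest: Dict[int, Dict[str, Any]] = {}
    for rec in records:
        current = latest.get(rec["id"])
        # Assumption: a higher _ab_cdc_lsn means a more recent change.
        if current is None or rec["_ab_cdc_lsn"] > current["_ab_cdc_lsn"]:
            latest[rec["id"]] = rec
    # A row survives only if its most recent version is not a deletion.
    return {key: rec for key, rec in latest.items() if rec["_ab_cdc_deleted_at"] is None}


# Hypothetical records, not taken from the test fixtures.
records = [
    {"id": 7, "name": "audi", "_ab_cdc_lsn": 100, "_ab_cdc_deleted_at": None},
    {"id": 8, "name": "vw", "_ab_cdc_lsn": 200, "_ab_cdc_deleted_at": None},
    {"id": 8, "name": "ford", "_ab_cdc_lsn": 300, "_ab_cdc_deleted_at": 1625000000000},
]

remaining = final_table_rows(records)
print(sorted(remaining))  # [7]
```

Here id 8 disappears because its most recent version is a delete, even though an earlier sync had inserted it.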
@@ -18,13 +18,13 @@ union all

union all

- select distinct '_airbyte_raw_dedup_cdc_excluded' as label, count(*) as row_count, 3 as expected_count
+ select distinct '_airbyte_raw_dedup_cdc_excluded' as label, count(*) as row_count, 4 as expected_count
from test_normalization._airbyte_raw_dedup_cdc_excluded
union all
- select distinct 'dedup_cdc_excluded_scd' as label, count(*) as row_count, 10 as expected_count
+ select distinct 'dedup_cdc_excluded_scd' as label, count(*) as row_count, 11 as expected_count
from test_normalization.dedup_cdc_excluded_scd
union all
- select distinct 'dedup_cdc_excluded' as label, count(*) as row_count, 4 as expected_count
+ select distinct 'dedup_cdc_excluded' as label, count(*) as row_count, 3 as expected_count
from test_normalization.dedup_cdc_excluded
)
select *
@@ -849,7 +849,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup
jinja_variables["scd_columns_sql"] = scd_columns_sql
sql = Template(
"""
- -- depends on: {{ from_table }}
+ -- depends_on: {{ from_table }}
with
{{ '{% if is_incremental() %}' }}
new_data as (
@@ -1120,11 +1120,15 @@ def add_to_outputs(

  final_table_name = self.tables_registry.get_file_name(schema, self.json_path, self.stream_name, "", truncate_name)
  active_row_column_name = self.name_transformer.normalize_column_name("_airbyte_active_row")
+ clickhouse_nullable_join_setting = ""
  if self.destination_type == DestinationType.CLICKHOUSE:
      # Clickhouse has special delete syntax
      delete_statement = "alter table {{ final_table_relation }} delete"
      unique_key_reference = self.get_unique_key(in_jinja=False)
      noop_delete_statement = "alter table {{ this }} delete where 1=0"
+     # Without this, our LEFT JOIN would return empty string for non-matching rows, so our COUNT would include those rows.
+     # We want to exclude them (this is the default behavior in other DBs) so we have to set join_use_nulls=1
+     clickhouse_nullable_join_setting = "SETTINGS join_use_nulls=1"
  elif self.destination_type == DestinationType.BIGQUERY:
      # Bigquery doesn't like the "delete from project.schema.table where project.schema.table.column in" syntax;
      # it requires "delete from project.schema.table table_alias where table_alias.column in"
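
The ClickHouse comment in the hunk above says that a LEFT JOIN which fills non-matching rows with an empty string would make `COUNT` include those rows. A rough illustration of that effect, using SQLite from the Python standard library rather than ClickHouse; the `coalesce(..., '')` call only simulates the empty-string fill, and the table and column names are hypothetical:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table scd (unique_key text, active_row integer);
    insert into scd values ('a', 1), ('b', 0);
""")


def keys_with_zero_active(joined_column: str) -> list:
    """Return keys whose left join finds no active row, counting `joined_column`."""
    query = f"""
        select scd.unique_key
        from scd
        left join (
            select unique_key as active_unique_key from scd where active_row = 1
        ) active on scd.unique_key = active.active_unique_key
        group by scd.unique_key
        having count({joined_column}) = 0
        order by scd.unique_key
    """
    return [row[0] for row in conn.execute(query)]


# NULL fill for non-matching rows: count() skips the NULL, so 'b' is found.
null_fill = keys_with_zero_active("active.active_unique_key")
# Simulated empty-string fill: count() sees '' as a value, so nothing is found.
empty_fill = keys_with_zero_active("coalesce(active.active_unique_key, '')")

print(null_fill)   # ['b']
print(empty_fill)  # []
```

With the empty-string fill, the `having count(...) = 0` filter never matches, so no rows would be selected for deletion.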
@@ -1154,17 +1158,24 @@ def add_to_outputs(
{{ '%}' }}

-- Delete records which are no longer active:
- -- The first subquery finds the most recent increment to the SCD table
- -- The second subquery finds, within that increment, the records which are still active
- -- We want to delete rows which are in that increment, but are not active
+ -- This query is equivalent, but the left join version is more performant:
+ -- delete from final_table where unique_key in (
+ -- select unique_key from scd_table where 1 = 1 <incremental_clause(normalized_at, final_table)>
+ -- ) and unique_key not in (
+ -- select unique_key from scd_table where active_row = 1 <incremental_clause(normalized_at, final_table)>
+ -- )
{{ delete_statement }} where {{ unique_key_reference }} in (
- select {{ unique_key }}
- from {{ '{{ this }}' }}
- where 1 = 1 {{ normalized_at_incremental_clause }}
- ) and {{ unique_key_reference }} not in (
- select {{ unique_key }}
+ select distinct {{ unique_key }}
from {{ '{{ this }}' }}
- where {{ active_row_column_name }} = 1 {{ normalized_at_incremental_clause }}
+ left join (
+ select {{ unique_key }} as active_unique_key
+ from {{ '{{ this }}' }}
+ where {{ active_row_column_name }} = 1 {{ normalized_at_incremental_clause }}
+ ) active_recent_scd_rows on {{ unique_key }} = active_unique_key
+ where 1=1 {{ normalized_at_incremental_clause }}
+ group by {{ unique_key }}
+ having count(active_unique_key) = 0

ChristopheDuong (Contributor), Jun 13, 2022:
Is it mandatory to use a having clause? I'm not sure about performance, but it might be less efficient than an approach that filters with where instead, if that is possible.

Contributor Author:
I'm running some manual performance tests. At first glance neither version has been significantly faster, but the non-having version does at least feel more readable 🤷

+ {{ clickhouse_nullable_join_setting }}
)
{{ '{% else %}' }}
-- We have to have a non-empty query, so just do a noop delete
@@ -1183,6 +1194,7 @@ def add_to_outputs(
self.get_normalized_at(in_jinja=True),
),
unique_key_reference=unique_key_reference,
+ clickhouse_nullable_join_setting=clickhouse_nullable_join_setting,
)
hooks.append(deletion_hook)
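
The template comment in the deletion hook states that the `in` / `not in` form and the left join form are equivalent. A small sketch that checks this on toy data with SQLite, under these assumptions: `scd_table`, `final_table` and the `in_increment` column are hypothetical stand-ins (the last one replaces the rendered incremental clause), and the `anti_join_where` variant is only a guess at what a "non-having version" could look like, not code from this PR:

```python
import sqlite3

# Hypothetical stand-in for the rendered incremental clause.
INCREMENT = "in_increment = 1"

ACTIVE_RECENT = f"""
    select unique_key as active_unique_key
    from scd_table
    where active_row = 1 and {INCREMENT}
"""

QUERIES = {
    "not_in": f"""
        delete from final_table where unique_key in (
            select unique_key from scd_table where {INCREMENT}
        ) and unique_key not in (
            select unique_key from scd_table where active_row = 1 and {INCREMENT}
        )
    """,
    "left_join_having": f"""
        delete from final_table where unique_key in (
            select distinct unique_key
            from scd_table
            left join ({ACTIVE_RECENT}) active_recent_scd_rows on unique_key = active_unique_key
            where {INCREMENT}
            group by unique_key
            having count(active_unique_key) = 0
        )
    """,
    "anti_join_where": f"""
        delete from final_table where unique_key in (
            select distinct unique_key
            from scd_table
            left join ({ACTIVE_RECENT}) active_recent_scd_rows on unique_key = active_unique_key
            where {INCREMENT} and active_unique_key is null
        )
    """,
}


def remaining_keys(delete_sql: str) -> list:
    """Build a fresh toy database, run one delete variant, return the surviving keys."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table scd_table (unique_key text, active_row integer, in_increment integer);
        create table final_table (unique_key text);
        insert into scd_table values
            ('k1', 0, 1), ('k1', 1, 1),  -- updated in the latest increment, still active
            ('k2', 0, 1), ('k2', 0, 1),  -- in the latest increment, no active row left
            ('k3', 1, 0);                -- not part of the latest increment
        insert into final_table values ('k1'), ('k2'), ('k3');
    """)
    conn.execute(delete_sql)
    rows = conn.execute("select unique_key from final_table order by unique_key")
    return [row[0] for row in rows]


results = {name: remaining_keys(sql) for name, sql in QUERIES.items()}
print(results)
```

All three variants delete only `k2` on this data. The sketch says nothing about relative performance, which depends on the destination database.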
