-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Publish PR 9029: clickhouse normalization #9072
Changes from 8 commits
c6ae621
b88225e
cc499c2
a2517a3
4a51799
a57495c
c56ef54
f8ccfd6
f24ea4b
a6e4c31
4f9f8ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,3 +59,4 @@ models: | |
|
||
vars: | ||
dbt_utils_dispatch_list: ['airbyte_utils'] | ||
destination: "mssql" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,3 +61,4 @@ models: | |
|
||
vars: | ||
dbt_utils_dispatch_list: ['airbyte_utils'] | ||
destination: "myssql" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,3 +59,4 @@ models: | |
|
||
vars: | ||
dbt_utils_dispatch_list: ['airbyte_utils'] | ||
destination: "oracle" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,3 +59,4 @@ models: | |
|
||
vars: | ||
dbt_utils_dispatch_list: ['airbyte_utils'] | ||
destination: "oracle" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -699,6 +699,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup | |
{{ sql_table_comment }} | ||
), | ||
{{ '{% endif %}' }} | ||
{{ '{%- if var("destination") == "clickhouse" %}' }} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the code block inside the if have some jinja variables, and I don't think there is an option to deep jinja variable attribution. I thought this would be more easy to read. file.sql
{{ my_custom_code }}
This will break There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, let's move it to python and avoid adding a dbt environment variable for this in the Here is a PR to split the jinja templating in multiple rendering: |
||
input_data_with_active_row_num as ( | ||
select *, | ||
row_number() over ( | ||
|
@@ -710,6 +711,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup | |
) as _airbyte_active_row_num | ||
from input_data | ||
), | ||
{{ '{%- endif %}' }} | ||
scd_data as ( | ||
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key | ||
select | ||
|
@@ -725,19 +727,36 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup | |
{{ field }}, | ||
{%- endfor %} | ||
{{ cursor_field }} as {{ airbyte_start_at }}, | ||
{{ lag_begin }}({{ cursor_field }}) over ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think most of this doesn't need to be conditional on
arguably even the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made this to match the version before the Clickhouse PR got merged. Also the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tl;dr I think both versions work correctly, but feel free to merge as-is I'm running both versions on my laptop and (so far) they've produced identical (except whitespace) output for snowflake. Haven't actually been able to run clickhouse locally, but I expect it would also work correctly with both versions. That said, I don't feel super strongly about this; definitely don't consider this a blocker. for clickhouse, the refactored version will produce {{ lag_begin }}({{ cursor_field }}) over (
partition by {{ primary_key_partition | join(", ") }}
order by
{{ cursor_field }} {{ order_null }},
{{ cursor_field }} desc,
{{ col_emitted_at }} desc{{ cdc_updated_at_order }}
{{ lag_end }}
) as {{ airbyte_end_at }},
case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, (tweaked whitespace for clarity) and for other destinations: {{ lag_begin }}({{ cursor_field }}) over (
partition by {{ primary_key_partition | join(", ") }}
order by
{{ cursor_field }} {{ order_null }},
{{ cursor_field }} desc,
{{ col_emitted_at }} desc{{ cdc_updated_at_order }}
{{ lag_end }}
) as {{ airbyte_end_at }},
case when row_number() over (
partition by {{ primary_key_partition | join(", ") }}
order by
{{ cursor_field }} {{ order_null }},
{{ cursor_field }} desc,
{{ col_emitted_at }} desc{{ cdc_updated_at_order }}
) = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, I.e. the |
||
{{ '{%- if var("destination") == "clickhouse" %}' }} | ||
case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, | ||
{{ lag_begin }}({{ cursor_field }}) over ( | ||
partition by {{ primary_key_partition | join(", ") }} | ||
order by | ||
{{ cursor_field }} {{ order_null }}, | ||
{{ cursor_field }} desc, | ||
{{ col_emitted_at }} desc{{ cdc_updated_at_order }} | ||
{{ lag_end }} | ||
) as {{ airbyte_end_at }}, | ||
case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, | ||
{{ '{%- else %}' }} | ||
lag({{ cursor_field }}) over ( | ||
partition by {{ primary_key_partition | join(", ") }} | ||
order by | ||
{{ cursor_field }} {{ order_null }}, | ||
{{ cursor_field }} desc, | ||
{{ col_emitted_at }} desc{{ cdc_updated_at_order }} | ||
) as {{ airbyte_end_at }}, | ||
case when row_number() over ( | ||
partition by {{ primary_key_partition | join(", ") }} | ||
order by | ||
{{ cursor_field }} {{ order_null }}, | ||
{{ cursor_field }} desc, | ||
{{ col_emitted_at }} desc{{ cdc_updated_at_order }} | ||
) = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, | ||
{{ '{%- endif %}' }} | ||
{{ col_ab_id }}, | ||
{{ col_emitted_at }}, | ||
{{ hash_id }} | ||
from input_data_with_active_row_num | ||
from {{ input_data_table }} | ||
), | ||
dedup_data as ( | ||
select | ||
|
@@ -784,11 +803,13 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup | |
|
||
lag_begin = "lag" | ||
lag_end = "" | ||
input_data_table = "input_data" | ||
if self.destination_type == DestinationType.CLICKHOUSE: | ||
# ClickHouse doesn't support lag() yet, this is a workaround solution | ||
# Ref: https://clickhouse.com/docs/en/sql-reference/window-functions/ | ||
lag_begin = "anyOrNull" | ||
lag_end = "ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING" | ||
input_data_table = "input_data_with_active_row_num" | ||
|
||
enable_left_join_null = "" | ||
cast_begin = "cast(" | ||
|
@@ -859,6 +880,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup | |
lag_begin=lag_begin, | ||
lag_end=lag_end, | ||
enable_left_join_null=enable_left_join_null, | ||
input_data_table=input_data_table, | ||
) | ||
return sql | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline@eof (I wish prettier would do this for us)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to supply env vars at runtime for dbt?
when normalization runs to generate the dbt project/files ("at compile time"), we already know we are doing it for a certain destination, isn't that enough?
(copied comment from #9063 (comment))