Publish PR 9029: clickhouse normalization #9072

Merged (11 commits, Jan 5, 2022)
@@ -8,3 +8,4 @@
!dbt-project-template-mssql
!dbt-project-template-mysql
!dbt-project-template-oracle
!dbt-project-template-clickhouse
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.61
LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.name=airbyte/normalization
6 changes: 6 additions & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -69,10 +69,15 @@ task airbyteDockerOracle(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('oracle')
dependsOn assemble
}
task airbyteDockerClickhouse(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('clickhouse')
dependsOn assemble
}

airbyteDocker.dependsOn(airbyteDockerMSSql)
airbyteDocker.dependsOn(airbyteDockerMySql)
airbyteDocker.dependsOn(airbyteDockerOracle)
airbyteDocker.dependsOn(airbyteDockerClickhouse)

task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs) {
module = "pytest"
@@ -86,6 +91,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-oracle:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mssql:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-clickhouse:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
@@ -62,3 +62,6 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "clickhouse"
Contributor:
nit: newline@eof (I wish prettier would do this for us)

@ChristopheDuong (Contributor) commented Jan 3, 2022:
Why do we need to supply env vars at runtime for dbt?

When normalization runs to generate the dbt project/files ("at compile time"), we already know we are doing it for a certain destination; isn't that enough?

(copied comment from #9063 (comment))
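To make the compile-time vs. runtime distinction in this comment concrete, here is a hedged sketch (function names and SQL are invented for illustration; this is not the actual normalization code) of the two strategies: branching in Python when the file is generated, versus emitting a Jinja guard that dbt resolves from `var("destination")` when it runs.

```python
# Hypothetical sketch of the two strategies discussed above; neither function
# is the real normalization code.
def compile_time_sql(destination: str) -> str:
    # Branch in Python: the generated file contains only the SQL for this
    # destination, and no dbt var is needed at runtime.
    if destination == "clickhouse":
        return "select *, row_number() over () as _airbyte_active_row_num from input_data"
    return "select * from input_data"

def runtime_sql() -> str:
    # Emit a Jinja guard instead: every generated file carries both branches,
    # and dbt resolves var("destination") when it runs.
    return (
        '{%- if var("destination") == "clickhouse" %}\n'
        "select *, row_number() over () as _airbyte_active_row_num from input_data\n"
        "{%- else %}\n"
        "select * from input_data\n"
        "{%- endif %}"
    )

print(compile_time_sql("postgres"))
```

The PR as merged uses the second strategy, which is why the `destination` var is added to every `dbt_project.yml`.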

@@ -59,3 +59,4 @@ models:

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
destination: "mssql"
@@ -61,3 +61,4 @@ models:

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
destination: "mysql"
@@ -59,3 +59,4 @@ models:

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
destination: "oracle"
@@ -61,3 +61,6 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "default"
@@ -10,3 +10,5 @@ services:
image: airbyte/normalization-mysql:${VERSION}
normalization-oracle:
image: airbyte/normalization-oracle:${VERSION}
normalization-clickhouse:
image: airbyte/normalization-clickhouse:${VERSION}
@@ -62,3 +62,6 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "clickhouse"
@@ -59,3 +59,4 @@ models:

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
destination: "oracle"
@@ -61,3 +61,6 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "default"
@@ -61,3 +61,6 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "default"
@@ -61,3 +61,6 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']

vars:
destination: "default"
@@ -699,6 +699,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup
{{ sql_table_comment }}
),
{{ '{% endif %}' }}
{{ '{%- if var("destination") == "clickhouse" %}' }}
Contributor:

Why do we need this `if` as Jinja (runtime) instead of doing an `if` in Python and changing the generated SQL code when normalization runs (compile time)?

@marcosmarxm (Member, Author) commented Jan 3, 2022:
Because the code block inside the `if` has some Jinja variables, and I don't think there is an option for deep Jinja variable attribution. I thought this would be easier to read.

file.sql:

    {{ my_custom_code }}

run.py:

    s = "{{ another_jinja }}"
    a = "select * from table"
    render(my_custom_code=s, another_jinja=a)

This will break.

What do you think, @ChristopheDuong? I can move the logic to a Python if clause too.

Contributor:

Yes, let's move it to Python and avoid adding a dbt environment variable for this in the dbt_project.yml.

Here is a PR that splits the Jinja templating into multiple renderings: #9278
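The `{{ '{% ... %}' }}` escaping pattern that appears throughout this diff can be sketched with a minimal two-pass example (the template text here is illustrative, not the shipped code): the Python-time render substitutes ordinary variables but emits the quoted dbt-time tags as literal text for dbt's second pass.

```python
from jinja2 import Template

# Python-time pass: a Jinja expression containing a quoted string, e.g.
# {{ '{%- endif %}' }}, renders to that string verbatim, so the dbt-time tags
# survive into the generated .sql file while {{ table }} is substituted now.
template_text = (
    "{{ '{%- if var(\"destination\") == \"clickhouse\" %}' }}\n"
    "select * from {{ table }}\n"
    "{{ '{%- endif %}' }}"
)
first_pass = Template(template_text).render(table="input_data")
print(first_pass)
```

dbt then evaluates the surviving `{%- if %}` guard against `var("destination")` in its own rendering pass.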

input_data_with_active_row_num as (
select *,
row_number() over (
@@ -710,6 +711,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup
) as _airbyte_active_row_num
from input_data
),
{{ '{%- endif %}' }}
scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
@@ -725,19 +727,36 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup
{{ field }},
{%- endfor %}
{{ cursor_field }} as {{ airbyte_start_at }},
{{ lag_begin }}({{ cursor_field }}) over (
Contributor:

I think most of this doesn't need to be conditional on the destination, since lag_begin and lag_end are set for all destinations anyway. This feels a bit clearer to me (since it's easier to see the ClickHouse-specific changes):

      {{ lag_begin }}({{ cursor_field }}) over (
        partition by {{ primary_key_partition | join(", ") }}
        order by
            {{ cursor_field }} {{ order_null }},
            {{ cursor_field }} desc,
            {{ col_emitted_at }} desc{{ cdc_updated_at_order }}
        {{ lag_end }}
      ) as {{ airbyte_end_at }},
      {{ '{%- if var("destination") == "clickhouse" %}' }}
        case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }},
      {{ '{%- else %}' }}
        case when row_number() over (
          partition by {{ primary_key_partition | join(", ") }}
          order by
              {{ cursor_field }} {{ order_null }},
              {{ cursor_field }} desc,
              {{ col_emitted_at }} desc{{ cdc_updated_at_order }}
        ) = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }},
      {{ '{%- endif %}' }}

arguably even the = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, could be extracted, but that felt a bit messier

Member (Author):

I made this to match the version before the ClickHouse PR got merged. Also, the _airbyte_active_row_num is conditional on the ClickHouse destination.

Contributor:

tl;dr I think both versions work correctly, but feel free to merge as-is

I'm running both versions on my laptop and (so far) they've produced identical (except whitespace) output for snowflake. Haven't actually been able to run clickhouse locally, but I expect it would also work correctly with both versions. That said, I don't feel super strongly about this; definitely don't consider this a blocker.

For ClickHouse, the refactored version will produce:

      {{ lag_begin }}({{ cursor_field }}) over (
        partition by {{ primary_key_partition | join(", ") }}
        order by
            {{ cursor_field }} {{ order_null }},
            {{ cursor_field }} desc,
            {{ col_emitted_at }} desc{{ cdc_updated_at_order }}
        {{ lag_end }}
      ) as {{ airbyte_end_at }},
      case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }},

(tweaked whitespace for clarity)

and for other destinations:

      {{ lag_begin }}({{ cursor_field }}) over (
        partition by {{ primary_key_partition | join(", ") }}
        order by
            {{ cursor_field }} {{ order_null }},
            {{ cursor_field }} desc,
            {{ col_emitted_at }} desc{{ cdc_updated_at_order }}
        {{ lag_end }}
      ) as {{ airbyte_end_at }},
      case when row_number() over (
        partition by {{ primary_key_partition | join(", ") }}
        order by
            {{ cursor_field }} {{ order_null }},
            {{ cursor_field }} desc,
            {{ col_emitted_at }} desc{{ cdc_updated_at_order }}
      ) = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }},

I.e. the airbyte_end_at column is defined identically (since lag_begin="lag" and lag_end=""), but the active_row column is different (as you pointed out, clickhouse uses _airbyte_active_row_num, other destinations use the case when row_number() thing).
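The equivalence being discussed can be modeled in plain Python (a toy sketch, not the generated SQL): both the inline `row_number()` form and the precomputed `_airbyte_active_row_num` form mark the newest row per primary key as the active one.

```python
from collections import defaultdict

# Toy change records: two versions of id=1, one of id=2.
rows = [
    {"id": 1, "updated_at": 1},
    {"id": 1, "updated_at": 3},
    {"id": 2, "updated_at": 2},
]

def rank_rows(rows):
    # Emulates row_number() over (partition by id order by updated_at desc).
    by_key = defaultdict(list)
    for r in rows:
        by_key[r["id"]].append(r)
    ranked = []
    for group in by_key.values():
        group.sort(key=lambda r: r["updated_at"], reverse=True)
        for n, r in enumerate(group, start=1):
            ranked.append({**r, "_airbyte_active_row_num": n})
    return ranked

# Inline form: compute the rank and test == 1 in one step.
inline_flags = {(r["id"], r["updated_at"]): int(r["_airbyte_active_row_num"] == 1)
                for r in rank_rows(rows)}

# ClickHouse-style form: the rank was materialized by an earlier CTE, and the
# SCD model only tests `_airbyte_active_row_num = 1`; the flags are identical.
ch_flags = {(r["id"], r["updated_at"]): int(r["_airbyte_active_row_num"] == 1)
            for r in rank_rows(rows)}

assert inline_flags == ch_flags
print(ch_flags)
```

Either way, only (1, 3) and (2, 2) come out flagged active, matching the reviewer's observation that the two SQL variants produce the same `active_row` values.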

{{ '{%- if var("destination") == "clickhouse" %}' }}
case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }},
{{ lag_begin }}({{ cursor_field }}) over (
partition by {{ primary_key_partition | join(", ") }}
order by
{{ cursor_field }} {{ order_null }},
{{ cursor_field }} desc,
{{ col_emitted_at }} desc{{ cdc_updated_at_order }}
{{ lag_end }}
) as {{ airbyte_end_at }},
case when _airbyte_active_row_num = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }},
{{ '{%- else %}' }}
lag({{ cursor_field }}) over (
partition by {{ primary_key_partition | join(", ") }}
order by
{{ cursor_field }} {{ order_null }},
{{ cursor_field }} desc,
{{ col_emitted_at }} desc{{ cdc_updated_at_order }}
) as {{ airbyte_end_at }},
case when row_number() over (
partition by {{ primary_key_partition | join(", ") }}
order by
{{ cursor_field }} {{ order_null }},
{{ cursor_field }} desc,
{{ col_emitted_at }} desc{{ cdc_updated_at_order }}
) = 1{{ cdc_active_row }} then 1 else 0 end as {{ active_row }},
{{ '{%- endif %}' }}
{{ col_ab_id }},
{{ col_emitted_at }},
{{ hash_id }}
from input_data_with_active_row_num
from {{ input_data_table }}
),
dedup_data as (
select
@@ -784,11 +803,13 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup

lag_begin = "lag"
lag_end = ""
input_data_table = "input_data"
if self.destination_type == DestinationType.CLICKHOUSE:
# ClickHouse doesn't support lag() yet, this is a workaround solution
# Ref: https://clickhouse.com/docs/en/sql-reference/window-functions/
lag_begin = "anyOrNull"
lag_end = "ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING"
input_data_table = "input_data_with_active_row_num"

enable_left_join_null = ""
cast_begin = "cast("
@@ -859,6 +880,7 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup
lag_begin=lag_begin,
lag_end=lag_end,
enable_left_join_null=enable_left_join_null,
input_data_table=input_data_table,
)
return sql
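The parameter selection in the hunk above can be restated as a small standalone sketch (the helper below is illustrative, mirroring but not reproducing `generate_scd_type_2_model`): ClickHouse has no `lag()`, so `anyOrNull(...)` over a one-row-preceding frame stands in for it, and the SCD select reads from the extra CTE carrying `_airbyte_active_row_num`.

```python
# Illustrative helper (not the real method): collects the destination-specific
# template parameters the same way the diff above sets them.
def scd_template_params(destination: str) -> dict:
    params = {
        "lag_begin": "lag",
        "lag_end": "",
        "input_data_table": "input_data",
    }
    if destination == "clickhouse":
        # ClickHouse doesn't support lag() yet; anyOrNull over a
        # ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING frame emulates it.
        params["lag_begin"] = "anyOrNull"
        params["lag_end"] = "ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING"
        params["input_data_table"] = "input_data_with_active_row_num"
    return params

print(scd_template_params("clickhouse")["lag_begin"])
```

For every other destination the defaults pass through unchanged, which is why only the ClickHouse branch of the generated SQL needs the extra CTE.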

@@ -14,7 +14,7 @@
public class NormalizationRunnerFactory {

public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization";
public static final String NORMALIZATION_VERSION = "0.1.61";
public static final String NORMALIZATION_VERSION = "0.1.62";

static final Map<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>> NORMALIZATION_MAPPING =
ImmutableMap.<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>>builder()
1 change: 1 addition & 0 deletions build.gradle
@@ -82,6 +82,7 @@ def createSpotlessTarget = { pattern ->
'dbt-project-template-mssql',
'dbt-project-template-mysql',
'dbt-project-template-oracle',
'dbt-project-template-clickhouse',
'dbt_test_config',
'normalization_test_output',
'tools',
1 change: 1 addition & 0 deletions settings.gradle
@@ -97,6 +97,7 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD"
include ':airbyte-integrations:connectors:destination-snowflake'
include ':airbyte-integrations:connectors:destination-oracle'
include ':airbyte-integrations:connectors:destination-mssql'
include ':airbyte-integrations:connectors:destination-clickhouse'

//Needed by destination-bigquery
include ':airbyte-integrations:connectors:destination-s3'