Skip to content

Commit f13313e

Browse files
Order CDC records by cdc log position if present (#6688)
1 parent 4769819 commit f13313e

File tree

1 file changed

+4
-0
lines changed
  • airbyte-integrations/bases/base-normalization/normalization/transform_catalog

1 file changed

+4
-0
lines changed

airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py

+4
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,10 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup
597597
cdc_active_row_pattern = f"and {col_cdc_deleted_at} is null "
598598
cdc_updated_order_pattern = f", {col_cdc_updated_at} desc"
599599

600+
if "_ab_cdc_log_pos" in column_names.keys():
601+
col_cdc_log_pos = self.name_transformer.normalize_column_name("_ab_cdc_log_pos")
602+
cdc_updated_order_pattern += f", {col_cdc_log_pos} desc"
603+
600604
sql = template.render(
601605
order_null=order_null,
602606
airbyte_start_at=self.name_transformer.normalize_column_name("_airbyte_start_at"),

0 commit comments

Comments
 (0)