Skip to content

Commit 2b1be0d

Browse files
authored
[INLONG-3815][Sort] Fix meta field sync error (#4055)
1 parent 3ff161e commit 2b1be0d

File tree

5 files changed

+184
-61
lines changed

5 files changed

+184
-61
lines changed

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public CanalJsonFormat() {
7676
@JsonIgnore
7777
@Override
7878
public String getFormat() {
79-
return "canal-json";
79+
return "canal-json-inlong";
8080
}
8181

8282
/**
@@ -89,13 +89,13 @@ public Map<String, String> generateOptions() {
8989
Map<String, String> options = new HashMap<>(16);
9090
options.put("format", getFormat());
9191
if (this.ignoreParseErrors != null) {
92-
options.put("canal-json.ignore-parse-errors", this.ignoreParseErrors.toString());
92+
options.put("canal-json-inlong.ignore-parse-errors", this.ignoreParseErrors.toString());
9393
}
94-
options.put("canal-json.timestamp-format.standard", this.timestampFormatStandard);
95-
options.put("canal-json.map-null-key.mode", this.mapNullKeyMode);
96-
options.put("canal-json.map-null-key.literal", this.mapNullKeyLiteral);
94+
options.put("canal-json-inlong.timestamp-format.standard", this.timestampFormatStandard);
95+
options.put("canal-json-inlong.map-null-key.mode", this.mapNullKeyMode);
96+
options.put("canal-json-inlong.map-null-key.literal", this.mapNullKeyLiteral);
9797
if (this.encodeDecimalAsPlainNumber != null) {
98-
options.put("canal-json.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString());
98+
options.put("canal-json-inlong.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString());
9999
}
100100
return options;
101101
}

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public Map<String, String> tableOptions() {
110110
options.putAll(format.generateOptions(true));
111111
}
112112
} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
113-
options.put("connector", "kafka");
113+
options.put("connector", "kafka-inlong");
114114
options.putAll(format.generateOptions(false));
115115
} else {
116116
throw new IllegalArgumentException("kafka load Node format is IllegalArgument");

inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -628,34 +628,34 @@ private String parseKafkaLoadNodeMetaField(BuiltInFieldInfo metaField) {
628628
metaType = "STRING METADATA FROM 'value.database'";
629629
break;
630630
case MYSQL_METADATA_EVENT_TIME:
631-
metaType = "TIMESTAMP(3) METADATA FROM 'value.op_ts'";
631+
metaType = "TIMESTAMP(3) METADATA FROM 'value.event-timestamp'";
632632
break;
633633
case MYSQL_METADATA_EVENT_TYPE:
634-
metaType = "STRING METADATA FROM 'value.op_type'";
634+
metaType = "STRING METADATA FROM 'value.op-type'";
635635
break;
636636
case MYSQL_METADATA_DATA:
637637
metaType = "STRING METADATA FROM 'value.data'";
638638
break;
639639
case MYSQL_METADATA_IS_DDL:
640-
metaType = "BOOLEAN METADATA FROM 'value.is_ddl'";
640+
metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
641641
break;
642642
case METADATA_TS:
643-
metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ts'";
643+
metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
644644
break;
645645
case METADATA_SQL_TYPE:
646-
metaType = "MAP<STRING, INT> METADATA FROM 'value.sql_type'";
646+
metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
647647
break;
648648
case METADATA_MYSQL_TYPE:
649-
metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql_type'";
649+
metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
650650
break;
651651
case METADATA_PK_NAMES:
652-
metaType = "ARRAY<STRING> METADATA FROM 'value.pk_names'";
652+
metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
653653
break;
654654
case METADATA_BATCH_ID:
655-
metaType = "BIGINT METADATA FROM 'value.batch_id'";
655+
metaType = "BIGINT METADATA FROM 'value.batch-id'";
656656
break;
657657
case METADATA_UPDATE_BEFORE:
658-
metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before'";
658+
metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
659659
break;
660660
default:
661661
metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();

inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

+1
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@
3232

3333
org.apache.inlong.sort.singletenant.flink.cdc.mysql.table.MySqlTableInlongSourceFactory
3434
org.apache.inlong.sort.singletenant.flink.connectors.hive.table.catalog.factories.HiveTableInlongFactory
35+
org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory

0 commit comments

Comments
 (0)