Skip to content

Commit 9bf7f99

Browse files
committed
Fix meta field sync error
1 parent c352a37 commit 9bf7f99

File tree

5 files changed

+184
-61
lines changed

5 files changed

+184
-61
lines changed

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public CanalJsonFormat() {
7676
@JsonIgnore
7777
@Override
7878
public String getFormat() {
79-
return "canal-json";
79+
return "canal-json-inlong";
8080
}
8181

8282
/**
@@ -89,13 +89,13 @@ public Map<String, String> generateOptions() {
8989
Map<String, String> options = new HashMap<>(16);
9090
options.put("format", getFormat());
9191
if (this.ignoreParseErrors != null) {
92-
options.put("canal-json.ignore-parse-errors", this.ignoreParseErrors.toString());
92+
options.put("canal-json-inlong.ignore-parse-errors", this.ignoreParseErrors.toString());
9393
}
94-
options.put("canal-json.timestamp-format.standard", this.timestampFormatStandard);
95-
options.put("canal-json.map-null-key.mode", this.mapNullKeyMode);
96-
options.put("canal-json.map-null-key.literal", this.mapNullKeyLiteral);
94+
options.put("canal-json-inlong.timestamp-format.standard", this.timestampFormatStandard);
95+
options.put("canal-json-inlong.map-null-key.mode", this.mapNullKeyMode);
96+
options.put("canal-json-inlong.map-null-key.literal", this.mapNullKeyLiteral);
9797
if (this.encodeDecimalAsPlainNumber != null) {
98-
options.put("canal-json.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString());
98+
options.put("canal-json-inlong.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString());
9999
}
100100
return options;
101101
}

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public Map<String, String> tableOptions() {
110110
options.putAll(format.generateOptions(true));
111111
}
112112
} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
113-
options.put("connector", "kafka");
113+
options.put("connector", "kafka-inlong");
114114
options.putAll(format.generateOptions(false));
115115
} else {
116116
throw new IllegalArgumentException("kafka load Node format is IllegalArgument");

inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -619,34 +619,34 @@ private String parseKafkaLoadNodeMetaField(BuiltInFieldInfo metaField) {
619619
metaType = "STRING METADATA FROM 'value.database'";
620620
break;
621621
case MYSQL_METADATA_EVENT_TIME:
622-
metaType = "TIMESTAMP(3) METADATA FROM 'value.op_ts'";
622+
metaType = "TIMESTAMP(3) METADATA FROM 'value.event-timestamp'";
623623
break;
624624
case MYSQL_METADATA_EVENT_TYPE:
625-
metaType = "STRING METADATA FROM 'value.op_type'";
625+
metaType = "STRING METADATA FROM 'value.op-type'";
626626
break;
627627
case MYSQL_METADATA_DATA:
628628
metaType = "STRING METADATA FROM 'value.data'";
629629
break;
630630
case MYSQL_METADATA_IS_DDL:
631-
metaType = "BOOLEAN METADATA FROM 'value.is_ddl'";
631+
metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
632632
break;
633633
case METADATA_TS:
634-
metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ts'";
634+
metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
635635
break;
636636
case METADATA_SQL_TYPE:
637-
metaType = "MAP<STRING, INT> METADATA FROM 'value.sql_type'";
637+
metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
638638
break;
639639
case METADATA_MYSQL_TYPE:
640-
metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql_type'";
640+
metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
641641
break;
642642
case METADATA_PK_NAMES:
643-
metaType = "ARRAY<STRING> METADATA FROM 'value.pk_names'";
643+
metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
644644
break;
645645
case METADATA_BATCH_ID:
646-
metaType = "BIGINT METADATA FROM 'value.batch_id'";
646+
metaType = "BIGINT METADATA FROM 'value.batch-id'";
647647
break;
648648
case METADATA_UPDATE_BEFORE:
649-
metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before'";
649+
metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
650650
break;
651651
default:
652652
metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();

inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

+1
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@
3232

3333
org.apache.inlong.sort.singletenant.flink.cdc.mysql.table.MySqlTableInlongSourceFactory
3434
org.apache.inlong.sort.singletenant.flink.connectors.hive.table.catalog.factories.HiveTableInlongFactory
35+
org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory

0 commit comments

Comments
 (0)