From d3b6ddc70db83aa8e7cff08a9c1a008a99598036 Mon Sep 17 00:00:00 2001 From: yunqingmoswu Date: Sun, 1 May 2022 14:26:37 +0800 Subject: [PATCH] Fix meta field sync error --- .../protocol/node/format/CanalJsonFormat.java | 12 +- .../protocol/node/load/KafkaLoadNode.java | 2 +- .../flink/parser/impl/FlinkSqlParser.java | 18 +- .../org.apache.flink.table.factories.Factory | 1 + .../flink/parser/MetaFieldSyncTest.java | 212 ++++++++++++++---- 5 files changed, 184 insertions(+), 61 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java index 717ca5b057d..a0d3ba40785 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java @@ -76,7 +76,7 @@ public CanalJsonFormat() { @JsonIgnore @Override public String getFormat() { - return "canal-json"; + return "canal-json-inlong"; } /** @@ -89,13 +89,13 @@ public Map generateOptions() { Map options = new HashMap<>(16); options.put("format", getFormat()); if (this.ignoreParseErrors != null) { - options.put("canal-json.ignore-parse-errors", this.ignoreParseErrors.toString()); + options.put("canal-json-inlong.ignore-parse-errors", this.ignoreParseErrors.toString()); } - options.put("canal-json.timestamp-format.standard", this.timestampFormatStandard); - options.put("canal-json.map-null-key.mode", this.mapNullKeyMode); - options.put("canal-json.map-null-key.literal", this.mapNullKeyLiteral); + options.put("canal-json-inlong.timestamp-format.standard", this.timestampFormatStandard); + options.put("canal-json-inlong.map-null-key.mode", this.mapNullKeyMode); + options.put("canal-json-inlong.map-null-key.literal", this.mapNullKeyLiteral); if (this.encodeDecimalAsPlainNumber != null) { - options.put("canal-json.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString()); + options.put("canal-json-inlong.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString()); } return options; } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java index ae1a1179c66..7ef0d8cb3df 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java @@ -110,7 +110,7 @@ public Map tableOptions() { options.putAll(format.generateOptions(true)); } } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) { - options.put("connector", "kafka"); + options.put("connector", "kafka-inlong"); options.putAll(format.generateOptions(false)); } else { throw new IllegalArgumentException("kafka load Node format is IllegalArgument"); diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java index f900b23054e..5117ca9b59b 100644 --- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java +++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java @@ -628,34 +628,34 @@ private String parseKafkaLoadNodeMetaField(BuiltInFieldInfo metaField) { metaType = "STRING METADATA FROM 'value.database'"; break; case MYSQL_METADATA_EVENT_TIME: - metaType = "TIMESTAMP(3) METADATA FROM 'value.op_ts'"; + metaType = "TIMESTAMP(3) METADATA FROM 'value.event-timestamp'"; break; case MYSQL_METADATA_EVENT_TYPE: - metaType = "STRING METADATA FROM 'value.op_type'"; + metaType = "STRING METADATA FROM 'value.op-type'"; break; case MYSQL_METADATA_DATA: metaType = "STRING METADATA FROM 'value.data'"; break; case MYSQL_METADATA_IS_DDL: - metaType = "BOOLEAN METADATA FROM 'value.is_ddl'"; + metaType = "BOOLEAN METADATA FROM 'value.is-ddl'"; break; case METADATA_TS: - metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ts'"; + metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'"; break; case METADATA_SQL_TYPE: - metaType = "MAP METADATA FROM 'value.sql_type'"; + metaType = "MAP METADATA FROM 'value.sql-type'"; break; case METADATA_MYSQL_TYPE: - metaType = "MAP METADATA FROM 'value.mysql_type'"; + metaType = "MAP METADATA FROM 'value.mysql-type'"; break; case METADATA_PK_NAMES: - metaType = "ARRAY METADATA FROM 'value.pk_names'"; + metaType = "ARRAY METADATA FROM 'value.pk-names'"; break; case METADATA_BATCH_ID: - metaType = "BIGINT METADATA FROM 'value.batch_id'"; + metaType = "BIGINT METADATA FROM 'value.batch-id'"; break; case METADATA_UPDATE_BEFORE: - metaType = "ARRAY> METADATA FROM 'value.update_before'"; + metaType = "ARRAY> METADATA FROM 'value.update-before'"; break; default: metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString(); diff --git a/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 65aff5ec602..9c7b7e25142 100644 --- a/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -32,3 +32,4 @@ org.apache.inlong.sort.singletenant.flink.cdc.mysql.table.MySqlTableInlongSourceFactory org.apache.inlong.sort.singletenant.flink.connectors.hive.table.catalog.factories.HiveTableInlongFactory +org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory \ No newline at end of file diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java index 103e13a15da..2237cb67ed5 100644 --- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java +++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java @@ -34,16 +34,13 @@ import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.GroupInfo; import org.apache.inlong.sort.protocol.StreamInfo; +import org.apache.inlong.sort.protocol.enums.ScanStartupMode; import org.apache.inlong.sort.protocol.node.Node; +import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode; -import org.apache.inlong.sort.protocol.node.format.JsonFormat; +import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode; import org.apache.inlong.sort.protocol.transformation.FieldRelationShip; -import org.apache.inlong.sort.protocol.transformation.FilterFunction; -import org.apache.inlong.sort.protocol.transformation.StringConstantParam; -import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; -import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; -import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip; import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser; import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult; @@ -56,51 +53,71 @@ import java.util.stream.Collectors; /** - * Test for mysql meta field + * Test for meta field sync */ public class MetaFieldSyncTest extends AbstractTestBase { - private MySqlExtractNode buildMySQLExtractNode() { + private Node buildMySQLExtractNode() { List fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), new FieldInfo("name", new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo()), new FieldInfo("salary", new FloatFormatInfo()), new FieldInfo("ts", new TimestampFormatInfo()), - new BuiltInFieldInfo("database", new TimestampFormatInfo(), BuiltInField.MYSQL_METADATA_DATABASE), - new BuiltInFieldInfo("table", new TimestampFormatInfo(), BuiltInField.MYSQL_METADATA_TABLE), - new BuiltInFieldInfo("pk_names", new TimestampFormatInfo(), BuiltInField.METADATA_PK_NAMES), - new BuiltInFieldInfo("event_time", new TimestampFormatInfo(), BuiltInField.MYSQL_METADATA_EVENT_TIME), - new BuiltInFieldInfo("event_type", new TimestampFormatInfo(), BuiltInField.MYSQL_METADATA_EVENT_TYPE), - new BuiltInFieldInfo("isddl", new TimestampFormatInfo(), BuiltInField.MYSQL_METADATA_IS_DDL), - new BuiltInFieldInfo("batch_id", new TimestampFormatInfo(), BuiltInField.METADATA_BATCH_ID), - new BuiltInFieldInfo("mysql_type", new TimestampFormatInfo(), BuiltInField.METADATA_MYSQL_TYPE), - new BuiltInFieldInfo("sql_type", new TimestampFormatInfo(), BuiltInField.METADATA_SQL_TYPE), + new BuiltInFieldInfo("database", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_DATABASE), + new BuiltInFieldInfo("table", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_TABLE), + new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()), + BuiltInField.METADATA_PK_NAMES), + new BuiltInFieldInfo("event_time", new TimestampFormatInfo(), + BuiltInField.MYSQL_METADATA_EVENT_TIME), + new BuiltInFieldInfo("event_type", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_EVENT_TYPE), + new BuiltInFieldInfo("isddl", new BooleanFormatInfo(), + BuiltInField.MYSQL_METADATA_IS_DDL), + new BuiltInFieldInfo("batch_id", new LongFormatInfo(), + BuiltInField.METADATA_BATCH_ID), + new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(), + new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE), + new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(), + new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE), new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS), - new BuiltInFieldInfo("up_before", new TimestampFormatInfo(), BuiltInField.METADATA_UPDATE_BEFORE) + new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(), + new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE) ); - return new MySqlExtractNode("1", "mysql_input", fields, null, null, "id", - Arrays.asList("sort"), "localhost", "inlong", "password", - "test", null, null, null, null); + return new MySqlExtractNode("1", "mysql_input", fields, null, null, + "id", Collections.singletonList("mysql_table"), + "localhost", "inlong", "inlong", + "inlong", null, null, null, null); } - private KafkaLoadNode buildKafkaNode() { + private Node buildKafkaLoadNode() { List fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), new FieldInfo("name", new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo()), new FieldInfo("salary", new FloatFormatInfo()), new FieldInfo("ts", new TimestampFormatInfo()), - new FieldInfo("database", new StringFormatInfo()), - new FieldInfo("table", new StringFormatInfo()), - new FieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo())), - new FieldInfo("event_time", new TimestampFormatInfo()), - new FieldInfo("event_type", new StringFormatInfo()), - new FieldInfo("isddl", new BooleanFormatInfo()), - new FieldInfo("batch_id", new LongFormatInfo()), - new FieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(), new StringFormatInfo())), - new FieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(), new IntFormatInfo())), - new FieldInfo("meta_ts", new TimestampFormatInfo()), - new FieldInfo("up_before", - new ArrayFormatInfo(new MapFormatInfo(new StringFormatInfo(), new StringFormatInfo()))) + new BuiltInFieldInfo("database", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_DATABASE), + new BuiltInFieldInfo("table", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_TABLE), + new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()), + BuiltInField.METADATA_PK_NAMES), + new BuiltInFieldInfo("event_time", new TimestampFormatInfo(), + BuiltInField.MYSQL_METADATA_EVENT_TIME), + new BuiltInFieldInfo("event_type", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_EVENT_TYPE), + new BuiltInFieldInfo("isddl", new BooleanFormatInfo(), + BuiltInField.MYSQL_METADATA_IS_DDL), + new BuiltInFieldInfo("batch_id", new LongFormatInfo(), + BuiltInField.METADATA_BATCH_ID), + new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(), + new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE), + new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(), + new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE), + new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS), + new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(), + new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE) ); List relations = Arrays .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()), @@ -134,14 +151,110 @@ private KafkaLoadNode buildKafkaNode() { new FieldRelationShip(new FieldInfo("up_before", new TimestampFormatInfo()), new FieldInfo("up_before", new TimestampFormatInfo())) ); - List filters = Arrays.asList(new SingleValueFilterFunction(EmptyOperator.getInstance(), + return new KafkaLoadNode("2", "kafka_output", fields, relations, + null, "topic1", "localhost:9092", + new CanalJsonFormat(), null, + null, "id"); + } + + private KafkaExtractNode buildKafkaExtractNode() { + List fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("name", new StringFormatInfo()), + new FieldInfo("age", new IntFormatInfo()), + new FieldInfo("salary", new FloatFormatInfo()), + new FieldInfo("ts", new TimestampFormatInfo()), + new BuiltInFieldInfo("database", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_DATABASE), + new BuiltInFieldInfo("table", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_TABLE), + new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()), + BuiltInField.METADATA_PK_NAMES), + new BuiltInFieldInfo("event_time", new TimestampFormatInfo(), + BuiltInField.MYSQL_METADATA_EVENT_TIME), + new BuiltInFieldInfo("event_type", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_EVENT_TYPE), + new BuiltInFieldInfo("isddl", new BooleanFormatInfo(), + BuiltInField.MYSQL_METADATA_IS_DDL), + new BuiltInFieldInfo("batch_id", new LongFormatInfo(), + BuiltInField.METADATA_BATCH_ID), + new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(), + new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE), + new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(), + new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE), + new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS), + new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(), + new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE) + ); + return new KafkaExtractNode("3", "kafka_input", fields, + null, null, "topic1", "localhost:9092", + new CanalJsonFormat(), ScanStartupMode.EARLIEST_OFFSET, + null); + } + + private Node buildKafkaLoadNode2() { + List fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), new FieldInfo("name", new StringFormatInfo()), - EqualOperator.getInstance(), new StringConstantParam("test"))); - KafkaLoadNode node = new KafkaLoadNode("2", "kafka_output2", fields, relations, - null, "worker123", "localhost:9092", - new JsonFormat(), null, + new FieldInfo("age", new IntFormatInfo()), + new FieldInfo("salary", new FloatFormatInfo()), + new FieldInfo("ts", new TimestampFormatInfo()), + new BuiltInFieldInfo("database", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_DATABASE), + new BuiltInFieldInfo("table", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_TABLE), + new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()), + BuiltInField.METADATA_PK_NAMES), + new BuiltInFieldInfo("event_time", new TimestampFormatInfo(), + BuiltInField.MYSQL_METADATA_EVENT_TIME), + new BuiltInFieldInfo("event_type", new StringFormatInfo(), + BuiltInField.MYSQL_METADATA_EVENT_TYPE), + new BuiltInFieldInfo("isddl", new BooleanFormatInfo(), + BuiltInField.MYSQL_METADATA_IS_DDL), + new BuiltInFieldInfo("batch_id", new LongFormatInfo(), + BuiltInField.METADATA_BATCH_ID), + new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(), + new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE), + new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(), + new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE), + new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS), + new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(), + new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE) + ); + List relations = Arrays + .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("id", new LongFormatInfo())), + new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()), + new FieldInfo("name", new StringFormatInfo())), + new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()), + new FieldInfo("age", new IntFormatInfo())), + new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()), + new FieldInfo("ts", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("database", new TimestampFormatInfo()), + new FieldInfo("database", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("table", new TimestampFormatInfo()), + new FieldInfo("table", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("pk_names", new TimestampFormatInfo()), + new FieldInfo("pk_names", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("event_time", new TimestampFormatInfo()), + new FieldInfo("event_time", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("event_type", new TimestampFormatInfo()), + new FieldInfo("event_type", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("isddl", new TimestampFormatInfo()), + new FieldInfo("isddl", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("batch_id", new TimestampFormatInfo()), + new FieldInfo("batch_id", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("mysql_type", new TimestampFormatInfo()), + new FieldInfo("mysql_type", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("sql_type", new TimestampFormatInfo()), + new FieldInfo("sql_type", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("meta_ts", new TimestampFormatInfo()), + new FieldInfo("meta_ts", new TimestampFormatInfo())), + new FieldRelationShip(new FieldInfo("up_before", new TimestampFormatInfo()), + new FieldInfo("up_before", new TimestampFormatInfo())) + ); + return new KafkaLoadNode("4", "kafka_output2", fields, relations, + null, "topic2", "localhost:9092", + new CanalJsonFormat(), null, null, "id"); - return node; } public NodeRelationShip buildNodeRelation(List inputs, List outputs) { @@ -152,6 +265,7 @@ public NodeRelationShip buildNodeRelation(List inputs, List outputs) /** * Test meta field sync test + * It contains mysql cdc to kafka canal-json, kafka canal-json to kafka canal-json test * * @throws Exception The exception may throws when execute the case */ @@ -166,11 +280,19 @@ public void testMetaFieldSyncTest() throws Exception { env.setParallelism(1); env.enableCheckpointing(10000); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); - Node inputNode = buildMySQLExtractNode(); - Node outputNode = buildKafkaNode(); - StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode), - Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode), - Collections.singletonList(outputNode)))); + Node mysqlInputNode = buildMySQLExtractNode(); + Node kafkaOutputNode = buildKafkaLoadNode(); + Node kafkaInputNode = buildKafkaExtractNode(); + Node kafkaOutputNode2 = buildKafkaLoadNode2(); + StreamInfo streamInfo = new StreamInfo("1", + Arrays.asList(mysqlInputNode, kafkaInputNode, kafkaOutputNode, kafkaOutputNode2), + Arrays.asList( + buildNodeRelation(Collections.singletonList(mysqlInputNode), + Collections.singletonList(kafkaOutputNode)), + buildNodeRelation(Collections.singletonList(kafkaInputNode), + Collections.singletonList(kafkaOutputNode2)) + ) + ); GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo)); FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo); FlinkSqlParseResult result = parser.parse();