diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt index c39986a2ab..3396ec5eee 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt +++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt @@ -25,7 +25,7 @@ write zilla:begin.ext ${kafka:beginEx() .name("dev.cities") .partitionCount(1) .replicas(1) - .config("cleanup.policy", "compact") + .config("cleanup.policy", "delete") .build() .timeout(30000) .validateOnly("false") diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt index 2504756e81..8e4570a35b 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt +++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt @@ -29,7 +29,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .name("dev.cities") .partitionCount(1) .replicas(1) - .config("cleanup.policy", "compact") + .config("cleanup.policy", "delete") .build() .timeout(30000) .validateOnly("false") diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaAvroSchemaTemplate.java index 03a64ea96c..3e17017ba5 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaAvroSchemaTemplate.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaAvroSchemaTemplate.java @@ -21,15 +21,15 @@ protected String convertPgsqlTypeToAvro( { return switch (pgsqlType.toLowerCase()) { - case "varchar", "text", "char", "bpchar" -> // Blank-padded char in PG - "string"; - case "int", "integer", "serial" -> "int"; - case "bigint", "bigserial" -> "long"; - case "boolean", "bool" -> "boolean"; - case "real", "float4" -> "float"; - case "double precision", "float8" -> "double"; // Timestamp with time zone - case "timestamp", "timestamptz", "date", "time" -> - "timestamp-millis"; // Avro logical type for date/time values + case "varchar", "text", "char", "bpchar" -> "\\\"string\\\""; + case "int", "integer", "serial" -> "\\\"int\\\""; + case "numeric" -> "\\\"double\\\""; + case "bigint", "bigserial" -> "\\\"long\\\""; + case "boolean", "bool" -> "\\\"boolean\\\""; + case "real", "float4" -> "\\\"float\\\""; + case "double", "double precision", "float8" -> "\\\"double\\\""; + case "timestamp", "timestampz", "date", "time" -> + "{ \\\"type\\\": \\\"long\\\", \\\"logicalTyp\\\": \\\"timestamp-millis\\\" }"; default -> null; }; } diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java index c92a3319cb..cecef1e11f 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java @@ -60,7 +60,7 @@ public String generateSchema( String avroType = convertPgsqlTypeToAvro(pgsqlType); schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\","); - schemaBuilder.append(" \\\"type\\\": [\\\"").append(avroType).append("\\\", \\\"null\\\"] },"); + schemaBuilder.append(" \\\"type\\\": [").append(avroType).append(", \\\"null\\\"] },"); } // Remove the last comma and close the fields array diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java index a6a712fe86..fe53279963 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java @@ -60,7 +60,7 @@ public String generateSchema( String avroType = convertPgsqlTypeToAvro(pgsqlType); schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\","); - schemaBuilder.append(" \\\"type\\\": \\\"").append(avroType).append("\\\"},"); + schemaBuilder.append(" \\\"type\\\": ").append(avroType).append("},"); } // Remove the last comma and close the fields array diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index e173ce5ad6..d7ca9e8180 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -1256,6 +1256,7 @@ else if (server.commandsProcessed == 0) final PgsqlKafkaBindingConfig binding = server.binding; final String primaryKey = binding.avroValueSchema.primaryKey(createTable); + final int primaryKeyCount = binding.avroValueSchema.primaryKeyCount(createTable); int versionId = NO_ERROR_SCHEMA_VERSION_ID; if (primaryKey != null) @@ -1263,7 +1264,6 @@ else if (server.commandsProcessed == 0) //TODO: assign versionId to avoid test failure final String subjectKey = String.format("%s.%s-key", server.database, topic); - final int primaryKeyCount = binding.avroValueSchema.primaryKeyCount(createTable); String keySchema = primaryKeyCount > 1 ? binding.avroKeySchema.generateSchema(server.database, createTable) : AVRO_KEY_SCHEMA; @@ -1278,7 +1278,7 @@ else if (server.commandsProcessed == 0) if (versionId != NO_VERSION_ID) { - final String policy = primaryKey != null + final String policy = primaryKey != null && primaryKeyCount == 1 ? "compact" : "delete"; diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt index 3da60e9f42..ac1ef564cb 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt @@ -154,10 +154,10 @@ write "CREATE SINK distinct_cities_sink\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" " topic='dev.distinct_cities',\n" - " primary_key='city'\n" + " primary_key='id'\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry='http://localhost:8081'\n" - ");" + ") KEY ENCODE TEXT;" [0x00] write flush diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt index b0e6c2c687..102aeed230 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt @@ -162,10 +162,10 @@ read "CREATE SINK distinct_cities_sink\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" " topic='dev.distinct_cities',\n" - " primary_key='city'\n" + " primary_key='id'\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry='http://localhost:8081'\n" - ");" + ") KEY ENCODE TEXT;" [0x00] write advise zilla:flush ${pgsql:flushEx() diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt index e702643dc8..f9451bc7bb 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt @@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx() .build()} write "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (key)\n" + " PRIMARY KEY (id)\n" ")\n" "INCLUDE KEY AS key\n" "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" @@ -46,7 +46,7 @@ write "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT UPSERT ENCODE AVRO (\n" + ") FORMAT PLAIN ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt index 00c261f9db..189026d000 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt @@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx() .build()} read "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (key)\n" + " PRIMARY KEY (id)\n" ")\n" "INCLUDE KEY AS key\n" "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" @@ -50,7 +50,7 @@ read "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT UPSERT ENCODE AVRO (\n" + ") FORMAT PLAIN ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt index be0c185f67..be60e5fe79 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt @@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx() .build()} write "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (key)\n" + " PRIMARY KEY (id)\n" ")\n" "INCLUDE KEY AS key\n" "WITH (\n" @@ -43,7 +43,7 @@ write "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT UPSERT ENCODE AVRO (\n" + ") FORMAT PLAIN ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt index 7554c745e1..8d2121373d 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt @@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx() .build()} read "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (key)\n" + " PRIMARY KEY (id)\n" ")\n" "INCLUDE KEY AS key\n" "WITH (\n" @@ -47,7 +47,7 @@ read "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT UPSERT ENCODE AVRO (\n" + ") FORMAT PLAIN ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt index a2cb2b4ad2..ab484b7434 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt @@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx() .build()} write "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (key)\n" + " PRIMARY KEY (id)\n" ")\n" "INCLUDE KEY AS key\n" "WITH (\n" @@ -43,7 +43,7 @@ write "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT UPSERT ENCODE AVRO (\n" + ") FORMAT PLAIN ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt index 6596416bdd..39350568c4 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt @@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx() .build()} read "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (key)\n" + " PRIMARY KEY (id)\n" ")\n" "INCLUDE KEY AS key\n" "WITH (\n" @@ -47,7 +47,7 @@ read "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT UPSERT ENCODE AVRO (\n" + ") FORMAT PLAIN ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java index 1c34fcba3c..b9ffc65efb 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java @@ -27,11 +27,12 @@ public class RisingwaveCreateSinkTemplate extends RisingwaveCommandTemplate WITH ( connector='kafka', properties.bootstrap.server='%s', - topic='%s.%s', - primary_key='%s' + topic='%s.%s'%s ) FORMAT UPSERT ENCODE AVRO ( schema.registry='%s' - );\u0000"""; + ) KEY ENCODE TEXT;\u0000"""; + + private final String primaryKeyFormat = ",\n primary_key='%s'"; private final String bootstrapServer; private final String schemaRegistry; @@ -51,7 +52,13 @@ public String generate( { CreateView createView = (CreateView) statement; String viewName = createView.getView().getName(); - String primaryKey = columns.keySet().iterator().next(); + + String textPrimaryKey = columns.entrySet().stream() + .filter(e -> e.getKey().toLowerCase().contains("id") && "character varying".equals(e.getValue())) + .map(Map.Entry::getKey) + .findFirst() + .orElse(null); + String primaryKey = textPrimaryKey != null ? primaryKeyFormat.formatted(textPrimaryKey) : ""; return String.format(sqlFormat, viewName, viewName, bootstrapServer, database, viewName, primaryKey, schemaRegistry); } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java index 8922b37cff..ac4d8507dc 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java @@ -23,7 +23,7 @@ public class RisingwaveCreateTableTemplate extends RisingwaveCommandTemplate private final String sqlFormat = """ CREATE TABLE IF NOT EXISTS %s ( *, - PRIMARY KEY (key) + PRIMARY KEY (%s) ) INCLUDE KEY AS key%s WITH ( @@ -32,7 +32,7 @@ PRIMARY KEY (key) topic='%s.%s', scan.startup.mode='latest', scan.startup.timestamp.millis='%d' - ) FORMAT UPSERT ENCODE AVRO ( + ) FORMAT PLAIN ENCODE AVRO ( schema.registry = '%s' );\u0000"""; @@ -57,6 +57,8 @@ public String generate( CreateTable createTable = command.createTable; String table = createTable.getTable().getName(); + String primaryKey = primaryKey(createTable); + includeBuilder.setLength(0); final Map<String, String> includes = command.includes; if (includes != null && !includes.isEmpty()) @@ -66,7 +68,7 @@ public String generate( includeBuilder.delete(includeBuilder.length() - 1, includeBuilder.length()); } - return String.format(sqlFormat, table, includeBuilder, bootstrapServer, database, + return String.format(sqlFormat, table, primaryKey, includeBuilder, bootstrapServer, database, table, scanStartupMil, schemaRegistry); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java index 54079411bd..516ceaffec 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java @@ -31,6 +31,7 @@ public final class RisingwavePgsqlTypeMapping TYPE_MAPPINGS.put("timestamp without time zone", "TIMESTAMP"); TYPE_MAPPINGS.put("timestamp with time zone", "TIMESTAMPZ"); TYPE_MAPPINGS.put("double precision", "DOUBLE"); + TYPE_MAPPINGS.put("numeric", "NUMERIC"); } private RisingwavePgsqlTypeMapping()