diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt
index c39986a2ab..3396ec5eee 100644
--- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt
+++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt
@@ -25,7 +25,7 @@ write zilla:begin.ext ${kafka:beginEx()
                                        .name("dev.cities")
                                        .partitionCount(1)
                                        .replicas(1)
-                                       .config("cleanup.policy", "compact")
+                                       .config("cleanup.policy", "delete")
                                        .build()
                                     .timeout(30000)
                                     .validateOnly("false")
diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt
index 2504756e81..8e4570a35b 100644
--- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt
+++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt
@@ -29,7 +29,7 @@ read zilla:begin.ext ${kafka:matchBeginEx()
                                        .name("dev.cities")
                                        .partitionCount(1)
                                        .replicas(1)
-                                       .config("cleanup.policy", "compact")
+                                       .config("cleanup.policy", "delete")
                                        .build()
                                    .timeout(30000)
                                    .validateOnly("false")
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaAvroSchemaTemplate.java
index 03a64ea96c..3e17017ba5 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaAvroSchemaTemplate.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaAvroSchemaTemplate.java
@@ -21,15 +21,15 @@ protected String convertPgsqlTypeToAvro(
     {
         return switch (pgsqlType.toLowerCase())
         {
-        case "varchar", "text", "char", "bpchar" -> // Blank-padded char in PG
-            "string";
-        case "int", "integer", "serial" -> "int";
-        case "bigint", "bigserial" -> "long";
-        case "boolean", "bool" -> "boolean";
-        case "real", "float4" -> "float";
-        case "double precision", "float8" -> "double"; // Timestamp with time zone
-        case "timestamp", "timestamptz", "date", "time" ->
-            "timestamp-millis"; // Avro logical type for date/time values
+        case "varchar", "text", "char", "bpchar" -> "\\\"string\\\"";
+        case "int", "integer", "serial" -> "\\\"int\\\"";
+        case "numeric" -> "\\\"double\\\"";
+        case "bigint", "bigserial" -> "\\\"long\\\"";
+        case "boolean", "bool" -> "\\\"boolean\\\"";
+        case "real", "float4" -> "\\\"float\\\"";
+        case "double", "double precision", "float8" -> "\\\"double\\\"";
+        case "timestamp", "timestampz", "date", "time" ->
+            "{ \\\"type\\\": \\\"long\\\", \\\"logicalTyp\\\": \\\"timestamp-millis\\\" }";
         default -> null;
         };
     }
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java
index c92a3319cb..cecef1e11f 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java
@@ -60,7 +60,7 @@ public String generateSchema(
             String avroType = convertPgsqlTypeToAvro(pgsqlType);
 
             schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\",");
-            schemaBuilder.append(" \\\"type\\\": [\\\"").append(avroType).append("\\\", \\\"null\\\"] },");
+            schemaBuilder.append(" \\\"type\\\": [").append(avroType).append(", \\\"null\\\"] },");
         }
 
         // Remove the last comma and close the fields array
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
index a6a712fe86..fe53279963 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
@@ -60,7 +60,7 @@ public String generateSchema(
             String avroType = convertPgsqlTypeToAvro(pgsqlType);
 
             schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\",");
-            schemaBuilder.append(" \\\"type\\\": \\\"").append(avroType).append("\\\"},");
+            schemaBuilder.append(" \\\"type\\\": ").append(avroType).append("},");
         }
 
         // Remove the last comma and close the fields array
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
index e173ce5ad6..d7ca9e8180 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
@@ -1256,6 +1256,7 @@ else if (server.commandsProcessed == 0)
 
             final PgsqlKafkaBindingConfig binding = server.binding;
             final String primaryKey = binding.avroValueSchema.primaryKey(createTable);
+            final int primaryKeyCount = binding.avroValueSchema.primaryKeyCount(createTable);
 
             int versionId = NO_ERROR_SCHEMA_VERSION_ID;
             if (primaryKey != null)
@@ -1263,7 +1264,6 @@ else if (server.commandsProcessed == 0)
                 //TODO: assign versionId to avoid test failure
                 final String subjectKey = String.format("%s.%s-key", server.database, topic);
 
-                final int primaryKeyCount = binding.avroValueSchema.primaryKeyCount(createTable);
                 String keySchema = primaryKeyCount > 1
                     ? binding.avroKeySchema.generateSchema(server.database, createTable)
                     : AVRO_KEY_SCHEMA;
@@ -1278,7 +1278,7 @@ else if (server.commandsProcessed == 0)
 
             if (versionId != NO_VERSION_ID)
             {
-                final String policy = primaryKey != null
+                final String policy = primaryKey != null && primaryKeyCount == 1
                     ? "compact"
                     : "delete";
 
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt
index 3da60e9f42..ac1ef564cb 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt
@@ -154,10 +154,10 @@ write "CREATE SINK distinct_cities_sink\n"
       "   connector='kafka',\n"
       "   properties.bootstrap.server='localhost:9092',\n"
       "   topic='dev.distinct_cities',\n"
-      "   primary_key='city'\n"
+      "   primary_key='id'\n"
       ") FORMAT UPSERT ENCODE AVRO (\n"
       "   schema.registry='http://localhost:8081'\n"
-      ");"
+      ") KEY ENCODE TEXT;"
      [0x00]
 write flush
 
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
index b0e6c2c687..102aeed230 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
@@ -162,10 +162,10 @@ read "CREATE SINK distinct_cities_sink\n"
      "   connector='kafka',\n"
      "   properties.bootstrap.server='localhost:9092',\n"
      "   topic='dev.distinct_cities',\n"
-     "   primary_key='city'\n"
+     "   primary_key='id'\n"
      ") FORMAT UPSERT ENCODE AVRO (\n"
      "   schema.registry='http://localhost:8081'\n"
-     ");"
+     ") KEY ENCODE TEXT;"
      [0x00]
 
 write advise zilla:flush ${pgsql:flushEx()
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt
index e702643dc8..f9451bc7bb 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt
@@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx()
                               .build()}
 write "CREATE TABLE IF NOT EXISTS cities (\n"
       "    *,\n"
-      "    PRIMARY KEY (key)\n"
+      "    PRIMARY KEY (id)\n"
       ")\n"
       "INCLUDE KEY AS key\n"
       "INCLUDE header 'zilla:correlation-id' AS correlation_id\n"
@@ -46,7 +46,7 @@ write "CREATE TABLE IF NOT EXISTS cities (\n"
       "   topic='dev.cities',\n"
       "   scan.startup.mode='latest',\n"
       "   scan.startup.timestamp.millis='140000000'\n"
-      ") FORMAT UPSERT ENCODE AVRO (\n"
+      ") FORMAT PLAIN ENCODE AVRO (\n"
       "   schema.registry = 'http://localhost:8081'\n"
       ");"
       [0x00]
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt
index 00c261f9db..189026d000 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt
@@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx()
                               .build()}
 read "CREATE TABLE IF NOT EXISTS cities (\n"
      "    *,\n"
-     "    PRIMARY KEY (key)\n"
+     "    PRIMARY KEY (id)\n"
      ")\n"
      "INCLUDE KEY AS key\n"
      "INCLUDE header 'zilla:correlation-id' AS correlation_id\n"
@@ -50,7 +50,7 @@ read "CREATE TABLE IF NOT EXISTS cities (\n"
      "   topic='dev.cities',\n"
      "   scan.startup.mode='latest',\n"
      "   scan.startup.timestamp.millis='140000000'\n"
-     ") FORMAT UPSERT ENCODE AVRO (\n"
+     ") FORMAT PLAIN ENCODE AVRO (\n"
      "   schema.registry = 'http://localhost:8081'\n"
      ");"
      [0x00]
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt
index be0c185f67..be60e5fe79 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt
@@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx()
                               .build()}
 write "CREATE TABLE IF NOT EXISTS cities (\n"
       "    *,\n"
-      "    PRIMARY KEY (key)\n"
+      "    PRIMARY KEY (id)\n"
       ")\n"
       "INCLUDE KEY AS key\n"
       "WITH (\n"
@@ -43,7 +43,7 @@ write "CREATE TABLE IF NOT EXISTS cities (\n"
       "   topic='dev.cities',\n"
       "   scan.startup.mode='latest',\n"
       "   scan.startup.timestamp.millis='140000000'\n"
-      ") FORMAT UPSERT ENCODE AVRO (\n"
+      ") FORMAT PLAIN ENCODE AVRO (\n"
       "   schema.registry = 'http://localhost:8081'\n"
       ");"
       [0x00]
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt
index 7554c745e1..8d2121373d 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt
@@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx()
                               .build()}
 read "CREATE TABLE IF NOT EXISTS cities (\n"
      "    *,\n"
-     "    PRIMARY KEY (key)\n"
+     "    PRIMARY KEY (id)\n"
      ")\n"
      "INCLUDE KEY AS key\n"
      "WITH (\n"
@@ -47,7 +47,7 @@ read "CREATE TABLE IF NOT EXISTS cities (\n"
      "   topic='dev.cities',\n"
      "   scan.startup.mode='latest',\n"
      "   scan.startup.timestamp.millis='140000000'\n"
-     ") FORMAT UPSERT ENCODE AVRO (\n"
+     ") FORMAT PLAIN ENCODE AVRO (\n"
      "   schema.registry = 'http://localhost:8081'\n"
      ");"
      [0x00]
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt
index a2cb2b4ad2..ab484b7434 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt
@@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx()
                               .build()}
 write "CREATE TABLE IF NOT EXISTS cities (\n"
       "    *,\n"
-      "    PRIMARY KEY (key)\n"
+      "    PRIMARY KEY (id)\n"
       ")\n"
       "INCLUDE KEY AS key\n"
       "WITH (\n"
@@ -43,7 +43,7 @@ write "CREATE TABLE IF NOT EXISTS cities (\n"
       "   topic='dev.cities',\n"
       "   scan.startup.mode='latest',\n"
       "   scan.startup.timestamp.millis='140000000'\n"
-      ") FORMAT UPSERT ENCODE AVRO (\n"
+      ") FORMAT PLAIN ENCODE AVRO (\n"
       "   schema.registry = 'http://localhost:8081'\n"
       ");"
       [0x00]
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt
index 6596416bdd..39350568c4 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt
@@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx()
                               .build()}
 read "CREATE TABLE IF NOT EXISTS cities (\n"
      "    *,\n"
-     "    PRIMARY KEY (key)\n"
+     "    PRIMARY KEY (id)\n"
      ")\n"
      "INCLUDE KEY AS key\n"
      "WITH (\n"
@@ -47,7 +47,7 @@ read "CREATE TABLE IF NOT EXISTS cities (\n"
      "   topic='dev.cities',\n"
      "   scan.startup.mode='latest',\n"
      "   scan.startup.timestamp.millis='140000000'\n"
-     ") FORMAT UPSERT ENCODE AVRO (\n"
+     ") FORMAT PLAIN ENCODE AVRO (\n"
      "   schema.registry = 'http://localhost:8081'\n"
      ");"
      [0x00]
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java
index 1c34fcba3c..b9ffc65efb 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java
@@ -27,11 +27,12 @@ public class RisingwaveCreateSinkTemplate extends RisingwaveCommandTemplate
         WITH (
            connector='kafka',
            properties.bootstrap.server='%s',
-           topic='%s.%s',
-           primary_key='%s'
+           topic='%s.%s'%s
         ) FORMAT UPSERT ENCODE AVRO (
            schema.registry='%s'
-        );\u0000""";
+        ) KEY ENCODE TEXT;\u0000""";
+
+    private final String primaryKeyFormat = ",\n   primary_key='%s'";
 
     private final String bootstrapServer;
     private final String schemaRegistry;
@@ -51,7 +52,13 @@ public String generate(
     {
         CreateView createView = (CreateView) statement;
         String viewName = createView.getView().getName();
-        String primaryKey = columns.keySet().iterator().next();
+
+        String textPrimaryKey = columns.entrySet().stream()
+            .filter(e -> e.getKey().toLowerCase().contains("id") && "character varying".equals(e.getValue()))
+            .map(Map.Entry::getKey)
+            .findFirst()
+            .orElse(null);
+        String primaryKey = textPrimaryKey != null ? primaryKeyFormat.formatted(textPrimaryKey) : "";
 
         return String.format(sqlFormat, viewName, viewName, bootstrapServer, database, viewName, primaryKey, schemaRegistry);
     }
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java
index 8922b37cff..ac4d8507dc 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java
@@ -23,7 +23,7 @@ public class RisingwaveCreateTableTemplate extends RisingwaveCommandTemplate
     private final String sqlFormat = """
         CREATE TABLE IF NOT EXISTS %s (
             *,
-            PRIMARY KEY (key)
+            PRIMARY KEY (%s)
         )
         INCLUDE KEY AS key%s
         WITH (
@@ -32,7 +32,7 @@ PRIMARY KEY (key)
            topic='%s.%s',
            scan.startup.mode='latest',
            scan.startup.timestamp.millis='%d'
-        ) FORMAT UPSERT ENCODE AVRO (
+        ) FORMAT PLAIN ENCODE AVRO (
            schema.registry = '%s'
         );\u0000""";
 
@@ -57,6 +57,8 @@ public String generate(
         CreateTable createTable = command.createTable;
         String table = createTable.getTable().getName();
 
+        String primaryKey = primaryKey(createTable);
+
         includeBuilder.setLength(0);
         final Map<String, String> includes = command.includes;
         if (includes != null && !includes.isEmpty())
@@ -66,7 +68,7 @@ public String generate(
             includeBuilder.delete(includeBuilder.length() - 1, includeBuilder.length());
         }
 
-        return String.format(sqlFormat, table, includeBuilder, bootstrapServer, database,
+        return String.format(sqlFormat, table, primaryKey, includeBuilder, bootstrapServer, database,
             table, scanStartupMil, schemaRegistry);
     }
 }
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java
index 54079411bd..516ceaffec 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java
@@ -31,6 +31,7 @@ public final class RisingwavePgsqlTypeMapping
         TYPE_MAPPINGS.put("timestamp without time zone", "TIMESTAMP");
         TYPE_MAPPINGS.put("timestamp with time zone", "TIMESTAMPZ");
         TYPE_MAPPINGS.put("double precision", "DOUBLE");
+        TYPE_MAPPINGS.put("numeric", "NUMERIC");
     }
 
     private RisingwavePgsqlTypeMapping()