diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java index 17246a5854ca0..93917d9752b9c 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java @@ -10,6 +10,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.airbyte.commons.json.Jsons; +import io.airbyte.validation.json.JsonSchemaValidator; import java.util.List; import java.util.Optional; import org.slf4j.Logger; @@ -17,10 +18,12 @@ public class JsonSecretsProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(JsonSecretsProcessor.class); + public static String AIRBYTE_SECRET_FIELD = "airbyte_secret"; public static final String PROPERTIES_FIELD = "properties"; - private static final Logger LOGGER = LoggerFactory.getLogger(JsonSecretsProcessor.class); + private static final JsonSchemaValidator VALIDATOR = new JsonSchemaValidator(); @VisibleForTesting static String SECRETS_MASK = "**********"; @@ -118,8 +121,16 @@ public JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNo if (src.has(key)) { final var arrayNode = (ArrayNode) fieldSchema.get(combinationKey.get()); for (int i = 0; i < arrayNode.size(); i++) { - // Absorb field values if any of the combination option is declaring it as secrets - combinationCopy = copySecrets(src.get(key), combinationCopy, arrayNode.get(i)); + final JsonNode childSchema = arrayNode.get(i); + /* + * when traversing a oneOf or anyOf if multiple schema in the oneOf or anyOf have the SAME key, but + * a different type, then, without this test, we can try to apply the wrong schema to the object + * resulting in errors 
because of type mismatches. + */ + if (VALIDATOR.test(childSchema, combinationCopy)) { + // Absorb field values if any of the combination option is declaring it as secrets + combinationCopy = copySecrets(src.get(key), combinationCopy, childSchema); + } } } dstCopy.set(key, combinationCopy); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessorTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessorTest.java index 7bc26ef5ef416..7678024f8c7f1 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessorTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessorTest.java @@ -62,6 +62,97 @@ public class JsonSecretsProcessorTest { + " }\n" + " }"); + private static final JsonNode ONE_OF_WITH_SAME_KEY_IN_SUB_SCHEMAS = Jsons.deserialize( + "{\n" + + " \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n" + + " \"title\": \"S3 Destination Spec\",\n" + + " \"type\": \"object\",\n" + + " \"required\": [\n" + + " \"client_id\",\n" + + " \"format\"\n" + + " ],\n" + + " \"additionalProperties\": false,\n" + + " \"properties\": {\n" + + " \"client_id\": {\n" + + " \"title\": \"client it\",\n" + + " \"type\": \"string\",\n" + + " \"default\": \"\"\n" + + " },\n" + + " \"format\": {\n" + + " \"title\": \"Output Format\",\n" + + " \"type\": \"object\",\n" + + " \"description\": \"Output data format\",\n" + + " \"oneOf\": [\n" + + " {\n" + + " \"title\": \"Avro: Apache Avro\",\n" + + " \"required\": [\"format_type\", \"compression_codec\"],\n" + + " \"properties\": {\n" + + " \"format_type\": {\n" + + " \"type\": \"string\",\n" + + " \"enum\": [\"Avro\"],\n" + + " \"default\": \"Avro\"\n" + + " },\n" + + " \"compression_codec\": {\n" + + " \"title\": \"Compression Codec\",\n" + + " \"description\": \"The compression algorithm used to 
compress data. Default to no compression.\",\n" + + " \"type\": \"object\",\n" + + " \"oneOf\": [\n" + + " {\n" + + " \"title\": \"no compression\",\n" + + " \"required\": [\"codec\"],\n" + + " \"properties\": {\n" + + " \"codec\": {\n" + + " \"type\": \"string\",\n" + + " \"enum\": [\"no compression\"],\n" + + " \"default\": \"no compression\"\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"title\": \"Deflate\",\n" + + " \"required\": [\"codec\", \"compression_level\"],\n" + + " \"properties\": {\n" + + " \"codec\": {\n" + + " \"type\": \"string\",\n" + + " \"enum\": [\"Deflate\"],\n" + + " \"default\": \"Deflate\"\n" + + " },\n" + + " \"compression_level\": {\n" + + " \"type\": \"integer\",\n" + + " \"default\": 0,\n" + + " \"minimum\": 0,\n" + + " \"maximum\": 9\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"title\": \"Parquet: Columnar Storage\",\n" + + " \"required\": [\"format_type\"],\n" + + " \"properties\": {\n" + + " \"format_type\": {\n" + + " \"type\": \"string\",\n" + + " \"enum\": [\"Parquet\"],\n" + + " \"default\": \"Parquet\"\n" + + " },\n" + + " \"compression_codec\": {\n" + + " \"type\": \"string\",\n" + + " \"enum\": [\n" + + " \"UNCOMPRESSED\",\n" + + " \"GZIP\"\n" + + " ],\n" + + " \"default\": \"UNCOMPRESSED\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + " }"); + JsonSecretsProcessor processor = new JsonSecretsProcessor(); @Test @@ -228,4 +319,26 @@ public void testCopySecretNotInSrcInnerObject() { assertEquals(expected, actual); } + // test the case where multiple sub schemas of a oneOf contain the same key but a different type. 
+  @Test
+  void testHandlesSameKeyInOneOf() {
+    // src selects the Avro option of the oneOf; there compression_codec is an object.
+    final JsonNode compressionCodecObject = Jsons.jsonNode(ImmutableMap.of("codec", "no compression"));
+    final JsonNode avroConfig = Jsons.jsonNode(ImmutableMap.of(
+        "format_type", "Avro",
+        "compression_codec", compressionCodecObject));
+    final JsonNode src = Jsons.jsonNode(ImmutableMap.of("client_id", "whatever", "format", avroConfig));
+
+    // dst selects the Parquet option; there the same compression_codec key is a plain string.
+    final JsonNode parquetConfig = Jsons.jsonNode(ImmutableMap.of(
+        "format_type", "Parquet",
+        "compression_codec", "GZIP"));
+    final JsonNode dst = Jsons.jsonNode(ImmutableMap.of("client_id", "whatever", "format", parquetConfig));
+
+    // The call must not fail on the type mismatch. The schema declares no airbyte_secret field, so
+    // nothing should be copied from src and the result is expected to equal dst.
+    final JsonNode actual = new JsonSecretsProcessor().copySecrets(src, dst, ONE_OF_WITH_SAME_KEY_IN_SUB_SCHEMAS);
+    assertEquals(dst, actual);
+  }
+
 }