Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unchanged status on schemas #344

Merged
merged 10 commits into from
Jan 21, 2024
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ group = "com.michelin.ns4kafka"

repositories {
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}
}

dependencies {
Expand All @@ -34,6 +37,8 @@ dependencies {
implementation("io.swagger.core.v3:swagger-annotations")
implementation("jakarta.annotation:jakarta.annotation-api")
implementation("jakarta.validation:jakarta.validation-api")
implementation('io.confluent:kafka-schema-registry-client:7.5.1')


compileOnly("org.projectlombok:lombok")
compileOnly("com.google.code.findbugs:jsr305") // https://github.com/micronaut-projects/micronaut-core/pull/5691
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.michelin.ns4kafka.services.SchemaService;
import com.michelin.ns4kafka.utils.enums.ApplyStatus;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Body;
Expand Down Expand Up @@ -100,52 +101,63 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
}

return schemaService
.validateSchemaCompatibility(ns.getMetadata().getCluster(), schema)
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(validationErrors, schema.getKind(),
schema.getMetadata().getName()));
}
.getLatestSubject(ns, schema.getMetadata().getName())
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
if (latestSubjectOptional.isPresent()) {
var newSchema = new AvroSchema(schema.getSpec().getSchema());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a constructor for AvroSchema that takes schema references, here you take a risk to compare without references later in canonicalString().
It was done automatically on the registry side before

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @twobeeb. Doing the comparison without the potential references is incorrect.

var actualSchema = new AvroSchema(latestSubjectOptional.get().getSpec().getSchema());

return schemaService
.getLatestSubject(ns, schema.getMetadata().getName())
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
schema.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
schema.getMetadata().setCluster(ns.getMetadata().getCluster());
schema.getMetadata().setNamespace(ns.getMetadata().getName());
latestSubjectOptional.ifPresent(
value -> schema.getSpec().setCompatibility(value.getSpec().getCompatibility()));

if (dryrun) {
// Cannot compute the "unchanged" apply status before getting the ID at registration
return Mono.just(formatHttpResponse(schema,
latestSubjectOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created));
if (newSchema.canonicalString().equals(actualSchema.canonicalString())) {
return Mono.just(formatHttpResponse(schema, ApplyStatus.unchanged));
}

return schemaService
.register(ns, schema)
.map(id -> {
ApplyStatus status;

if (latestSubjectOptional.isEmpty()) {
status = ApplyStatus.created;
sendEventLog(schema.getKind(), schema.getMetadata(), status, null,
schema.getSpec());
} else if (id > latestSubjectOptional.get().getSpec().getId()) {
status = ApplyStatus.changed;
sendEventLog(schema.getKind(), schema.getMetadata(), status,
latestSubjectOptional.get().getSpec(),
schema.getSpec());
} else {
status = ApplyStatus.unchanged;
schema.getSpec().setCompatibility(latestSubjectOptional.get().getSpec().getCompatibility());
}

return schemaService
.validateSchemaCompatibility(ns.getMetadata().getCluster(), schema)
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(
validationErrors,
schema.getKind(),
schema.getMetadata().getName()));
}

schema.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
schema.getMetadata().setCluster(ns.getMetadata().getCluster());
schema.getMetadata().setNamespace(ns.getMetadata().getName());

if (dryrun) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this:

if (dryrun) {
  return Mono.just(formatHttpResponse(schema,
          latestSubjectOptional.isPresent()
                  ? ApplyStatus.changed : ApplyStatus.created));
}
  
return schemaService
      .register(ns, schema)
      .map(id -> {
          ApplyStatus status;
  
          if (latestSubjectOptional.isEmpty()) {
              status = ApplyStatus.created;
              sendEventLog(schema.getKind(), schema.getMetadata(), status, null,
                  schema.getSpec());
          } else if (id > latestSubjectOptional.get().getSpec().getId()) {
              status = ApplyStatus.changed;
              sendEventLog(schema.getKind(), schema.getMetadata(), status,
                  latestSubjectOptional.get().getSpec(),
                  schema.getSpec());
          } else {
              status = ApplyStatus.unchanged;
          }
  
          return formatHttpResponse(schema, status);
      });
});

could be now simplified to this:

ApplyStatus status = latestSubjectOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
if (dryrun) {
    return Mono.just(formatHttpResponse(schema, status);
}

return schemaService
    .register(ns, schema)
    .map(id -> formatHttpResponse(schema, status));

Since we now use the canonical string to compute the status, we do not need the schema registry response for the final HTTP response, what do you think ? I'm gonna try this out

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of reapplying a schema v1 that already has been deployed, the schema registry will return the ID of the schema v1 without any change (unchanged status). In this case, Ns4Kafka will return "changed"

The previous implementation will return "unchanged" as expected, except in dry-run mode that will return "changed" 😅

The "newSchema" parameter needs to be different from all schemas version to be considered "changed", not only the latest one

return Mono.just(formatHttpResponse(schema,
latestSubjectOptional.isPresent()
? ApplyStatus.changed : ApplyStatus.created));
}

return formatHttpResponse(schema, status);
return schemaService
.register(ns, schema)
.map(id -> {
ApplyStatus status;

if (latestSubjectOptional.isEmpty()) {
status = ApplyStatus.created;
sendEventLog(schema.getKind(), schema.getMetadata(), status, null,
schema.getSpec());
} else if (id > latestSubjectOptional.get().getSpec().getId()) {
status = ApplyStatus.changed;
sendEventLog(schema.getKind(), schema.getMetadata(), status,
latestSubjectOptional.get().getSpec(),
schema.getSpec());
} else {
status = ApplyStatus.unchanged;
}

return formatHttpResponse(schema, status);
});
});
});
});
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -77,18 +78,19 @@ void applyCreated() {
void applyChanged() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Schema schemaV2 = buildSchemaV2();

when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true);
when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Mono.just(List.of()));
when(schemaService.validateSchemaCompatibility("local", schemaV2)).thenReturn(Mono.just(List.of()));
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName()))
.thenReturn(Mono.just(schema));
when(schemaService.register(namespace, schema)).thenReturn(Mono.just(2));
when(schemaService.register(namespace, schemaV2)).thenReturn(Mono.just(2));
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());

StepVerifier.create(schemaController.apply("myNamespace", schema, false))
StepVerifier.create(schemaController.apply("myNamespace", schemaV2, false))
.consumeNextWith(response -> {
assertEquals("changed", response.header("X-Ns4kafka-Result"));
assertTrue(response.getBody().isPresent());
Expand All @@ -104,9 +106,7 @@ void applyUnchanged() {

when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true);
when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Mono.just(List.of()));
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Mono.just(schema));
when(schemaService.register(namespace, schema)).thenReturn(Mono.just(1));

StepVerifier.create(schemaController.apply("myNamespace", schema, false))
.consumeNextWith(response -> {
Expand Down Expand Up @@ -180,30 +180,34 @@ void applyDryRunCreated() {
void applyDryRunChanged() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Schema schemaV2 = buildSchemaV2();

when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true);
when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Mono.just(List.of()));
when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName()))
.thenReturn(true);
when(schemaService.validateSchemaCompatibility("local", schemaV2)).thenReturn(Mono.just(List.of()));
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Mono.just(schema));

StepVerifier.create(schemaController.apply("myNamespace", schema, true))
StepVerifier.create(schemaController.apply("myNamespace", schemaV2, true))
.consumeNextWith(response -> {
assertEquals("changed", response.header("X-Ns4kafka-Result"));
assertTrue(response.getBody().isPresent());
assertEquals("prefix.subject-value", response.getBody().get().getMetadata().getName());
})
.verifyComplete();

verify(schemaService, never()).register(namespace, schema);
verify(schemaService, never()).register(namespace, schemaV2);
}

@Test
void applyDryRunNotCompatible() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Schema schemaV2 = buildSchemaV2();

when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true);
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Mono.just(schemaV2));
when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(
Mono.just(List.of("Not compatible")));

Expand Down Expand Up @@ -446,6 +450,28 @@ private Schema buildSchema() {
.build();
}

private Schema buildSchemaV2() {
return Schema.builder()
.metadata(ObjectMeta.builder()
.name("prefix.subject-value")
.build())
.spec(Schema.SchemaSpec.builder()
.id(1)
.version(2)
.schema(
"{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\","
+ "\"name\":\"PersonAvro\""
+ ",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"],\"default\":null,"
+ "\"doc\":\"First name of the person\"},{\"name\":\"lastName\",\"type\":[\"null\","
+ "\"string\"],\"default\":null,\"doc\":\"Last name of the person\"},"
+ "{\"name\":\"dateOfBirth\",\"type\":[\"null\",{\"type\":\"long\","
+ "\"logicalType\":\"timestamp-millis\"}],\"default\":null,"
+ "\"doc\":\"Date of birth of the person\"},{\"name\":\"birthPlace\",\"type\":[\"null\","
+ "\"string\"],\"default\":null,\"doc\":\"Place of birth\"}]}")
.build())
.build();
}

private SchemaList buildSchemaList() {
return SchemaList.builder()
.metadata(ObjectMeta.builder()
Expand Down
98 changes: 98 additions & 0 deletions src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.michelin.ns4kafka.models.schema.SchemaList;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityResponse;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Property;
import io.micronaut.core.type.Argument;
Expand Down Expand Up @@ -458,4 +460,100 @@ void registerSchema() {

assertEquals(HttpStatus.NOT_FOUND, getException.getStatus());
}

@Test
void registerSameSchemaTwice() {
Schema schema = Schema.builder()
.metadata(ObjectMeta.builder()
.name("ns1-subject3-value")
.build())
.spec(Schema.SchemaSpec.builder()
.schema(
"{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\","
+ "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"],"
+ "\"default\":null,\"doc\":\"First name of the person\"},"
+ "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null,"
+ "\"doc\":\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\","
+ "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null,"
+ "\"doc\":\"Date of birth of the person\"}]}")
.build())
.build();

// Apply schema
var createResponse =
ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas")
.bearerAuth(token)
.body(schema), Schema.class);

assertEquals("created", createResponse.header("X-Ns4kafka-Result"));

// Get all schemas
var getResponse =
ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.GET, "/api/namespaces/ns1/schemas")
.bearerAuth(token), Argument.listOf(SchemaList.class));

assertTrue(getResponse.getBody().isPresent());
assertTrue(getResponse.getBody().get()
.stream()
.anyMatch(schemaList -> schemaList.getMetadata().getName().equals("ns1-subject3-value")));

// Apply the same schema with swapped fields
Schema sameSchemaWithSwappedFields = Schema.builder()
.metadata(ObjectMeta.builder()
.name("ns1-subject3-value")
.build())
.spec(Schema.SchemaSpec.builder()
.schema(
"{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\","
+ "\"name\":\"PersonAvro\",\"fields\":[ {\"name\":\"lastName\",\"type\":[\"null\",\"string\"],"
+ "\"default\":null, \"doc\":\"Last name of the person\"},"
+ "{\"name\":\"firstName\",\"type\":[\"null\",\"string\"], \"default\":null,"
+ "\"doc\":\"First name of the person\"}, {\"name\":\"dateOfBirth\",\"type\":[\"null\","
+ "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null,"
+ "\"doc\":\"Date of birth of the person\"}]}")
.build())
.build();

var createSwappedFieldsResponse =
ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas")
.bearerAuth(token)
.body(sameSchemaWithSwappedFields), Schema.class);

// Expects a new version
assertEquals("changed", createSwappedFieldsResponse.header("X-Ns4kafka-Result"));

// Get the latest version to check if we are on v2
SchemaResponse schemaAfterApply = schemaRegistryClient.toBlocking()
.retrieve(HttpRequest.GET("/subjects/ns1-subject3-value/versions/latest"),
SchemaResponse.class);

Assertions.assertNotNull(schemaAfterApply.id());
assertEquals(2, schemaAfterApply.version());
assertEquals("ns1-subject3-value", schemaAfterApply.subject());

// Apply again the schema with swapped fields
var createAgainResponse =
ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas")
.bearerAuth(token)
.body(sameSchemaWithSwappedFields), Schema.class);

// Expects no new schema version but an unchanged status
assertEquals("unchanged", createAgainResponse.header("X-Ns4kafka-Result"));

// Apply again with SR client to be sure that
RegisterSchemaRequest request = new RegisterSchemaRequest();
request.setSchema(sameSchemaWithSwappedFields.getSpec().getSchema());

schemaRegistryClient.toBlocking()
.exchange(HttpRequest.create(HttpMethod.POST, "/subjects/ns1-subject3-value/versions")
.body(request), RegisterSchemaResponse.class);

SchemaResponse schemaAfterPostOnRegistry = schemaRegistryClient.toBlocking()
.retrieve(HttpRequest.GET("/subjects/ns1-subject3-value/versions/latest"),
SchemaResponse.class);

Assertions.assertNotNull(schemaAfterPostOnRegistry.id());
assertEquals(2, schemaAfterPostOnRegistry.version());
assertEquals("ns1-subject3-value", schemaAfterPostOnRegistry.subject());
}
}