Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unchanged status on schemas #344

Merged
merged 10 commits into from
Jan 21, 2024
48 changes: 48 additions & 0 deletions .docker/resources/user/schema-refs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
---
apiVersion: v1
kind: Schema
metadata:
name: abc.header-value
spec:
schema: |
{
"namespace": "io.github.michelin.ns4kafka.avro",
"type": "record",
"name": "KafkaHeader",
"fields": [
{
"name": "id",
"type": "long"
}
]
}
---
apiVersion: v1
kind: Schema
metadata:
name: abc.person-value
spec:
schema: |
{
"namespace": "io.github.michelin.ns4kafka.avro",
"type": "record",
"name": "KafkaPerson",
"fields": [
{
"name": "header",
"type": "io.github.michelin.ns4kafka.avro.KafkaHeader"
},
{
"name": "firstName",
"type": "string"
},
{
"name": "lastName",
"type": "string"
}
]
}
references:
- name: io.github.michelin.ns4kafka.avro.KafkaHeader
subject: abc.header-value
version: 1
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ group = "com.michelin.ns4kafka"

repositories {
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}
}

dependencies {
Expand All @@ -34,6 +37,8 @@ dependencies {
implementation("io.swagger.core.v3:swagger-annotations")
implementation("jakarta.annotation:jakarta.annotation-api")
implementation("jakarta.validation:jakarta.validation-api")
implementation('io.confluent:kafka-schema-registry-client:7.5.1')


compileOnly("org.projectlombok:lombok")
compileOnly("com.google.code.findbugs:jsr305") // https://github.com/micronaut-projects/micronaut-core/pull/5691
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

// Validate ownership
if (!connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) {
return Mono.error(new ResourceValidationException(
List.of(String.format(NAMESPACE_NOT_OWNER, connector.getMetadata().getName())),
Expand Down
106 changes: 55 additions & 51 deletions src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.michelin.ns4kafka.services.SchemaService;
import com.michelin.ns4kafka.utils.enums.ApplyStatus;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Body;
Expand All @@ -25,8 +26,10 @@
import jakarta.inject.Inject;
import jakarta.validation.Valid;
import java.time.Instant;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -38,6 +41,8 @@
@Controller(value = "/api/namespaces/{namespace}/schemas")
@ExecuteOn(TaskExecutors.IO)
public class SchemaController extends NamespacedResourceController {
private static final String NAMESPACE_NOT_OWNER = "Namespace not owner of this schema %s.";

@Inject
SchemaService schemaService;

Expand Down Expand Up @@ -83,66 +88,65 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

// Validate TopicNameStrategy
// https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/subject/TopicNameStrategy.java
if (!schema.getMetadata().getName().endsWith("-key") && !schema.getMetadata().getName().endsWith("-value")) {
return Mono.error(
new ResourceValidationException(List.of("Invalid value " + schema.getMetadata().getName()
+ " for name: subject must end with -key or -value"), schema.getKind(),
schema.getMetadata().getName()));
}

// Validate ownership
if (!schemaService.isNamespaceOwnerOfSubject(ns, schema.getMetadata().getName())) {
return Mono.error(
new ResourceValidationException(List.of(String.format("Namespace not owner of this schema %s.",
schema.getMetadata().getName())), schema.getKind(), schema.getMetadata().getName()));
return Mono.error(new ResourceValidationException(
List.of(String.format(NAMESPACE_NOT_OWNER, schema.getMetadata().getName())),
schema.getKind(), schema.getMetadata().getName()));
}

return schemaService
.validateSchemaCompatibility(ns.getMetadata().getCluster(), schema)
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(validationErrors, schema.getKind(),
return schemaService.validateSchema(ns, schema)
.flatMap(errors -> {
if (!errors.isEmpty()) {
return Mono.error(new ResourceValidationException(errors, schema.getKind(),
schema.getMetadata().getName()));
}

return schemaService
.getLatestSubject(ns, schema.getMetadata().getName())
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
schema.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
schema.getMetadata().setCluster(ns.getMetadata().getCluster());
schema.getMetadata().setNamespace(ns.getMetadata().getName());
latestSubjectOptional.ifPresent(
value -> schema.getSpec().setCompatibility(value.getSpec().getCompatibility()));

if (dryrun) {
// Cannot compute the "unchanged" apply status before getting the ID at registration
return Mono.just(formatHttpResponse(schema,
latestSubjectOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created));
return schemaService.getAllSubjectVersions(ns, schema.getMetadata().getName())
.collectList()
.flatMap(subjects -> {
// If new schema matches any of the existing schemas, return unchanged
boolean unchanged = subjects.stream().anyMatch(subject -> {
var actualSchema = new AvroSchema(subject.getSpec().getSchema(),
schemaService.getReferences(subject), schemaService.getSchemaReferences(subject, ns),
null);
var newSchema = new AvroSchema(schema.getSpec().getSchema(),
schemaService.getReferences(schema), schemaService.getSchemaReferences(schema, ns),
null);

return Objects.equals(newSchema.canonicalString(), actualSchema.canonicalString())
&& Objects.equals(newSchema.references(), actualSchema.references());
});

if (unchanged) {
return Mono.just(formatHttpResponse(schema, ApplyStatus.unchanged));
}

return schemaService
.register(ns, schema)
.map(id -> {
ApplyStatus status;

if (latestSubjectOptional.isEmpty()) {
status = ApplyStatus.created;
sendEventLog(schema.getKind(), schema.getMetadata(), status, null,
schema.getSpec());
} else if (id > latestSubjectOptional.get().getSpec().getId()) {
status = ApplyStatus.changed;
sendEventLog(schema.getKind(), schema.getMetadata(), status,
latestSubjectOptional.get().getSpec(),
schema.getSpec());
} else {
status = ApplyStatus.unchanged;
.validateSchemaCompatibility(ns.getMetadata().getCluster(), schema)
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(
validationErrors, schema.getKind(), schema.getMetadata().getName()));
}

return formatHttpResponse(schema, status);
schema.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
schema.getMetadata().setCluster(ns.getMetadata().getCluster());
schema.getMetadata().setNamespace(ns.getMetadata().getName());

ApplyStatus status = subjects.isEmpty() ? ApplyStatus.created : ApplyStatus.changed;
if (dryrun) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this:

if (dryrun) {
  return Mono.just(formatHttpResponse(schema,
          latestSubjectOptional.isPresent()
                  ? ApplyStatus.changed : ApplyStatus.created));
}
  
return schemaService
      .register(ns, schema)
      .map(id -> {
          ApplyStatus status;
  
          if (latestSubjectOptional.isEmpty()) {
              status = ApplyStatus.created;
              sendEventLog(schema.getKind(), schema.getMetadata(), status, null,
                  schema.getSpec());
          } else if (id > latestSubjectOptional.get().getSpec().getId()) {
              status = ApplyStatus.changed;
              sendEventLog(schema.getKind(), schema.getMetadata(), status,
                  latestSubjectOptional.get().getSpec(),
                  schema.getSpec());
          } else {
              status = ApplyStatus.unchanged;
          }
  
          return formatHttpResponse(schema, status);
      });
});

could be now simplified to this:

ApplyStatus status = latestSubjectOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
if (dryrun) {
    return Mono.just(formatHttpResponse(schema, status);
}

return schemaService
    .register(ns, schema)
    .map(id -> formatHttpResponse(schema, status));

Since we now use the canonical string to compute the status, we do not need the schema registry response for the final HTTP response, what do you think ? I'm gonna try this out

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of reapplying a schema v1 that already has been deployed, the schema registry will return the ID of the schema v1 without any change (unchanged status). In this case, Ns4Kafka will return "changed"

The previous implementation will return "unchanged" as expected, except in dry-run mode that will return "changed" 😅

The "newSchema" parameter needs to be different from all schemas version to be considered "changed", not only the latest one

return Mono.just(formatHttpResponse(schema, status));
}

return schemaService
.register(ns, schema)
.map(id -> {
sendEventLog(schema.getKind(), schema.getMetadata(), status,
subjects.isEmpty() ? null : subjects.stream()
.max(Comparator.comparingInt((Schema s) -> s.getSpec().getId())),
schema.getSpec());

return formatHttpResponse(schema, status);
});
});
});
});
Expand All @@ -165,7 +169,7 @@ public Mono<HttpResponse<Void>> deleteSubject(String namespace, @PathVariable St
// Validate ownership
if (!schemaService.isNamespaceOwnerOfSubject(ns, subject)) {
return Mono.error(new ResourceValidationException(
List.of(String.format("Namespace not owner of this schema %s.", subject)),
List.of(String.format(NAMESPACE_NOT_OWNER, subject)),
AccessControlEntry.ResourceType.SCHEMA.toString(), subject));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ public Mono<List<String>> validateLocally(Namespace namespace, Connector connect
.collect(Collectors.joining(", "));
return Mono.just(
List.of("Invalid value " + connector.getSpec().getConnectCluster()
+ " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "]"));
+ " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "]."));
}

// If class doesn't exist, no need to go further
// If class does not exist, no need to go further
if (StringUtils.isEmpty(connector.getSpec().getConfig().get(CONNECTOR_CLASS))) {
return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null"));
return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null."));
}

// Connector type exists on this target connect cluster ?
// Connector type exists on this target connect cluster
return kafkaConnectClient.connectPlugins(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster())
.map(connectorPluginInfos -> {
Expand All @@ -141,8 +141,8 @@ public Mono<List<String>> validateLocally(Namespace namespace, Connector connect
.findFirst();

if (connectorType.isEmpty()) {
return List.of("Failed to find any class that implements Connector and which name matches "
+ connector.getSpec().getConfig().get(CONNECTOR_CLASS));
return List.of("Failed to find any class that implements connector and which name matches "
+ connector.getSpec().getConfig().get(CONNECTOR_CLASS) + ".");
}

return namespace.getSpec().getConnectValidator() != null
Expand Down
Loading