-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support unchanged status on schemas #344
Changes from 5 commits
251f13b
219963c
0b7374a
0976934
61a084d
6f8987b
b503449
08ddc10
8b8f79e
c4be16b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import com.michelin.ns4kafka.services.SchemaService; | ||
import com.michelin.ns4kafka.utils.enums.ApplyStatus; | ||
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException; | ||
import io.confluent.kafka.schemaregistry.avro.AvroSchema; | ||
import io.micronaut.http.HttpResponse; | ||
import io.micronaut.http.HttpStatus; | ||
import io.micronaut.http.annotation.Body; | ||
|
@@ -100,52 +101,63 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc | |
} | ||
|
||
return schemaService | ||
.validateSchemaCompatibility(ns.getMetadata().getCluster(), schema) | ||
.flatMap(validationErrors -> { | ||
if (!validationErrors.isEmpty()) { | ||
return Mono.error(new ResourceValidationException(validationErrors, schema.getKind(), | ||
schema.getMetadata().getName())); | ||
} | ||
.getLatestSubject(ns, schema.getMetadata().getName()) | ||
.map(Optional::of) | ||
.defaultIfEmpty(Optional.empty()) | ||
.flatMap(latestSubjectOptional -> { | ||
if (latestSubjectOptional.isPresent()) { | ||
var newSchema = new AvroSchema(schema.getSpec().getSchema()); | ||
var actualSchema = new AvroSchema(latestSubjectOptional.get().getSpec().getSchema()); | ||
|
||
return schemaService | ||
.getLatestSubject(ns, schema.getMetadata().getName()) | ||
.map(Optional::of) | ||
.defaultIfEmpty(Optional.empty()) | ||
.flatMap(latestSubjectOptional -> { | ||
schema.getMetadata().setCreationTimestamp(Date.from(Instant.now())); | ||
schema.getMetadata().setCluster(ns.getMetadata().getCluster()); | ||
schema.getMetadata().setNamespace(ns.getMetadata().getName()); | ||
latestSubjectOptional.ifPresent( | ||
value -> schema.getSpec().setCompatibility(value.getSpec().getCompatibility())); | ||
|
||
if (dryrun) { | ||
// Cannot compute the "unchanged" apply status before getting the ID at registration | ||
return Mono.just(formatHttpResponse(schema, | ||
latestSubjectOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created)); | ||
if (newSchema.canonicalString().equals(actualSchema.canonicalString())) { | ||
return Mono.just(formatHttpResponse(schema, ApplyStatus.unchanged)); | ||
} | ||
|
||
return schemaService | ||
.register(ns, schema) | ||
.map(id -> { | ||
ApplyStatus status; | ||
|
||
if (latestSubjectOptional.isEmpty()) { | ||
status = ApplyStatus.created; | ||
sendEventLog(schema.getKind(), schema.getMetadata(), status, null, | ||
schema.getSpec()); | ||
} else if (id > latestSubjectOptional.get().getSpec().getId()) { | ||
status = ApplyStatus.changed; | ||
sendEventLog(schema.getKind(), schema.getMetadata(), status, | ||
latestSubjectOptional.get().getSpec(), | ||
schema.getSpec()); | ||
} else { | ||
status = ApplyStatus.unchanged; | ||
schema.getSpec().setCompatibility(latestSubjectOptional.get().getSpec().getCompatibility()); | ||
} | ||
|
||
return schemaService | ||
.validateSchemaCompatibility(ns.getMetadata().getCluster(), schema) | ||
.flatMap(validationErrors -> { | ||
if (!validationErrors.isEmpty()) { | ||
return Mono.error(new ResourceValidationException( | ||
validationErrors, | ||
schema.getKind(), | ||
schema.getMetadata().getName())); | ||
} | ||
|
||
schema.getMetadata().setCreationTimestamp(Date.from(Instant.now())); | ||
schema.getMetadata().setCluster(ns.getMetadata().getCluster()); | ||
schema.getMetadata().setNamespace(ns.getMetadata().getName()); | ||
|
||
if (dryrun) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this: if (dryrun) {
return Mono.just(formatHttpResponse(schema,
latestSubjectOptional.isPresent()
? ApplyStatus.changed : ApplyStatus.created));
}
return schemaService
.register(ns, schema)
.map(id -> {
ApplyStatus status;
if (latestSubjectOptional.isEmpty()) {
status = ApplyStatus.created;
sendEventLog(schema.getKind(), schema.getMetadata(), status, null,
schema.getSpec());
} else if (id > latestSubjectOptional.get().getSpec().getId()) {
status = ApplyStatus.changed;
sendEventLog(schema.getKind(), schema.getMetadata(), status,
latestSubjectOptional.get().getSpec(),
schema.getSpec());
} else {
status = ApplyStatus.unchanged;
}
return formatHttpResponse(schema, status);
});
}); could be now simplified to this: ApplyStatus status = latestSubjectOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
if (dryrun) {
return Mono.just(formatHttpResponse(schema, status);
}
return schemaService
.register(ns, schema)
.map(id -> formatHttpResponse(schema, status)); Since we now use the canonical string to compute the status, we do not need the schema registry response for the final HTTP response, what do you think ? I'm gonna try this out There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case of reapplying a schema v1 that already has been deployed, the schema registry will return the ID of the schema v1 without any change (unchanged status). In this case, Ns4Kafka will return "changed" The previous implementation will return "unchanged" as expected, except in dry-run mode that will return "changed" 😅 The "newSchema" parameter needs to be different from all schemas version to be considered "changed", not only the latest one |
||
return Mono.just(formatHttpResponse(schema, | ||
latestSubjectOptional.isPresent() | ||
? ApplyStatus.changed : ApplyStatus.created)); | ||
} | ||
|
||
return formatHttpResponse(schema, status); | ||
return schemaService | ||
.register(ns, schema) | ||
.map(id -> { | ||
ApplyStatus status; | ||
|
||
if (latestSubjectOptional.isEmpty()) { | ||
status = ApplyStatus.created; | ||
sendEventLog(schema.getKind(), schema.getMetadata(), status, null, | ||
schema.getSpec()); | ||
} else if (id > latestSubjectOptional.get().getSpec().getId()) { | ||
status = ApplyStatus.changed; | ||
sendEventLog(schema.getKind(), schema.getMetadata(), status, | ||
latestSubjectOptional.get().getSpec(), | ||
schema.getSpec()); | ||
} else { | ||
status = ApplyStatus.unchanged; | ||
} | ||
|
||
return formatHttpResponse(schema, status); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a constructor for
AvroSchema
that takes schema references, here you take a risk to compare without references later incanonicalString()
.It was done automatically on the registry side before
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point @twobeeb. Doing the comparison without the potential references is incorrect.