Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
E022974 committed Aug 28, 2024
1 parent b42607c commit 4aa20bb
Show file tree
Hide file tree
Showing 19 changed files with 368 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
}

sendEventLog(connector, status, existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec());
connector.getSpec(), "");

return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status));
});
Expand Down Expand Up @@ -180,7 +180,7 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
}

Connector connectorToDelete = optionalConnector.get();
sendEventLog(connectorToDelete, ApplyStatus.deleted, connectorToDelete.getSpec(), null);
sendEventLog(connectorToDelete, ApplyStatus.deleted, connectorToDelete.getSpec(), null, "");

return connectorService
.delete(ns, optionalConnector.get())
Expand Down Expand Up @@ -264,7 +264,7 @@ public Flux<Connector> importResources(String namespace, @QueryValue(defaultValu
return unsynchronizedConnector;
}

sendEventLog(unsynchronizedConnector, ApplyStatus.created, null, unsynchronizedConnector.getSpec());
sendEventLog(unsynchronizedConnector, ApplyStatus.created, null, unsynchronizedConnector.getSpec(), "");

return connectorService.createOrUpdate(unsynchronizedConnector);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public List<ConsumerGroupResetOffsetsResponse> resetOffsets(String namespace, St
consumerGroupResetOffsets.getSpec().getMethod());

if (!dryrun) {
sendEventLog(consumerGroupResetOffsets, ApplyStatus.changed, null, consumerGroupResetOffsets.getSpec());
sendEventLog(consumerGroupResetOffsets, ApplyStatus.changed, null,
consumerGroupResetOffsets.getSpec(), "");
consumerGroupService.alterConsumerGroupOffsets(ns, consumerGroup, preparedOffsets);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public HttpResponse<Namespace> apply(@Valid @Body Namespace namespace,
}

sendEventLog(namespace, status, existingNamespace.<Object>map(Namespace::getSpec).orElse(null),
namespace.getSpec());
namespace.getSpec(), "");

return formatHttpResponse(namespaceService.createOrUpdate(namespace), status);
}
Expand Down Expand Up @@ -136,7 +136,7 @@ public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "f
}

var namespaceToDelete = optionalNamespace.get();
sendEventLog(namespaceToDelete, ApplyStatus.deleted, namespaceToDelete.getSpec(), null);
sendEventLog(namespaceToDelete, ApplyStatus.deleted, namespaceToDelete.getSpec(), null, "");
namespaceService.delete(optionalNamespace.get());
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public HttpResponse<RoleBinding> apply(String namespace, @Valid @Body RoleBindin
}

sendEventLog(roleBinding, status, existingRoleBinding.<Object>map(RoleBinding::getSpec).orElse(null),
roleBinding.getSpec());
roleBinding.getSpec(), "");
roleBindingService.create(roleBinding);
return formatHttpResponse(roleBinding, status);
}
Expand All @@ -116,7 +116,7 @@ public HttpResponse<Void> delete(String namespace, String name,
}

var roleBindingToDelete = roleBinding.get();
sendEventLog(roleBindingToDelete, ApplyStatus.deleted, roleBindingToDelete.getSpec(), null);
sendEventLog(roleBindingToDelete, ApplyStatus.deleted, roleBindingToDelete.getSpec(), null, "");
roleBindingService.delete(roleBindingToDelete);
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
oldSchemas.isEmpty() ? null : oldSchemas.stream()
.max(Comparator.comparingInt(
(Schema s) -> s.getSpec().getId())),
schema.getSpec());
schema.getSpec(), "");

return formatHttpResponse(schema, status);
});
Expand All @@ -142,16 +142,19 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
}

/**
* Delete all schemas under the given subject.
* Delete all schema versions under the given subject, or a specific version of the schema if specified.
*
* @param namespace The current namespace
* @param subject The current subject to delete
* @param namespace The namespace
* @param subject The subject
* @param version The version of the schema to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{subject}")
public Mono<HttpResponse<Void>> delete(String namespace, @PathVariable String subject,
public Mono<HttpResponse<Void>> delete(String namespace,
@PathVariable String subject,
@QueryValue Optional<String> version,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

Expand All @@ -160,25 +163,29 @@ public Mono<HttpResponse<Void>> delete(String namespace, @PathVariable String su
return Mono.error(new ResourceValidationException(SCHEMA, subject, invalidOwner(subject)));
}

return schemaService.getLatestSubject(ns, subject)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
if (latestSubjectOptional.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

Schema schemaToDelete = latestSubjectOptional.get();
sendEventLog(schemaToDelete, ApplyStatus.deleted, schemaToDelete.getSpec(), null);

return schemaService
.delete(ns, subject)
.map(deletedSchemaIds -> HttpResponse.noContent());
});
return version
.map(v -> schemaService.getSubject(ns, subject, v))
.orElseGet(() -> schemaService.getLatestSubject(ns, subject))
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(subjectOptional -> {
if (subjectOptional.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return (version.isEmpty() ? schemaService.deleteAllVersions(ns, subject) :
schemaService.deleteVersion(ns, subject, version.get()))
.map(deletedVersionIds -> {
Schema deletedSchema = subjectOptional.get();
sendEventLog(deletedSchema, ApplyStatus.deleted, deletedSchema.getSpec(), null,
version.map(v -> "").orElseGet(() -> String.valueOf(deletedVersionIds)));
return HttpResponse.noContent();
});
});
}

/**
Expand Down Expand Up @@ -221,7 +228,7 @@ public Mono<HttpResponse<SchemaCompatibilityState>> config(String namespace, @Pa
.updateSubjectCompatibility(ns, latestSubjectOptional.get(), compatibility)
.map(schemaCompatibility -> {
sendEventLog(latestSubjectOptional.get(), ApplyStatus.changed,
latestSubjectOptional.get().getSpec().getCompatibility(), compatibility);
latestSubjectOptional.get().getSpec().getCompatibility(), compatibility, "");

return HttpResponse.ok(state);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ HttpResponse<KafkaStream> apply(String namespace, @Body @Valid KafkaStream strea
}

sendEventLog(stream, status, existingStream.<Object>map(KafkaStream::getMetadata).orElse(null),
stream.getMetadata());
stream.getMetadata(), "");

return formatHttpResponse(streamService.create(stream), status);
}
Expand Down Expand Up @@ -126,7 +126,7 @@ HttpResponse<Void> delete(String namespace, String stream, @QueryValue(defaultVa
}

var streamToDelete = optionalStream.get();
sendEventLog(streamToDelete, ApplyStatus.deleted, streamToDelete.getMetadata(), null);
sendEventLog(streamToDelete, ApplyStatus.deleted, streamToDelete.getMetadata(), null, "");
streamService.delete(ns, optionalStream.get());
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, Stri
.build())
.build();

sendEventLog(response, ApplyStatus.changed, null, response.getSpec());
sendEventLog(response, ApplyStatus.changed, null, response.getSpec(), "");
return HttpResponse.ok(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
}

sendEventLog(accessControlEntry, status, existingAcl.<Object>map(AccessControlEntry::getSpec).orElse(null),
accessControlEntry.getSpec());
accessControlEntry.getSpec(), "");

return formatHttpResponse(aclService.create(accessControlEntry), status);
}
Expand Down Expand Up @@ -174,7 +174,7 @@ public HttpResponse<Void> delete(Authentication authentication, String namespace
return HttpResponse.noContent();
}

sendEventLog(accessControlEntry, ApplyStatus.deleted, accessControlEntry.getSpec(), null);
sendEventLog(accessControlEntry, ApplyStatus.deleted, accessControlEntry.getSpec(), null, "");

aclService.delete(accessControlEntry);
return HttpResponse.noContent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public Mono<HttpResponse<ConnectCluster>> apply(String namespace, @Body @Valid C
}

sendEventLog(connectCluster, status,
existingConnectCluster.<Object>map(ConnectCluster::getSpec).orElse(null), connectCluster.getSpec());
existingConnectCluster.<Object>map(ConnectCluster::getSpec).orElse(null),
connectCluster.getSpec(), "");

return Mono.just(formatHttpResponse(connectClusterService.create(connectCluster), status));
});
Expand Down Expand Up @@ -161,7 +162,7 @@ public HttpResponse<Void> delete(String namespace, String connectCluster,
}

ConnectCluster connectClusterToDelete = optionalConnectCluster.get();
sendEventLog(connectClusterToDelete, ApplyStatus.deleted, connectClusterToDelete.getSpec(), null);
sendEventLog(connectClusterToDelete, ApplyStatus.deleted, connectClusterToDelete.getSpec(), null, "");

connectClusterService.delete(connectClusterToDelete);
return HttpResponse.noContent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public <T> HttpResponse<T> formatHttpResponse(T body, ApplyStatus status) {
* @param after the resource after the operation
*/
public void sendEventLog(MetadataResource resource, ApplyStatus operation, Object before,
Object after) {
Object after, String version) {
AuditLog auditLog = new AuditLog(securityService.username().orElse(""),
securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN), Date.from(Instant.now()),
resource.getKind(), resource.getMetadata(), operation, before, after);
resource.getKind(), resource.getMetadata(), operation, before, after, version);
applicationEventPublisher.publishEvent(auditLog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public HttpResponse<ResourceQuota> apply(String namespace, @Body @Valid Resource
}

sendEventLog(quota, status, resourceQuotaOptional.<Object>map(ResourceQuota::getSpec).orElse(null),
quota.getSpec());
quota.getSpec(), "");

return formatHttpResponse(resourceQuotaService.create(quota), status);
}
Expand All @@ -130,7 +130,7 @@ public HttpResponse<Void> delete(String namespace, String name,
}

ResourceQuota resourceQuotaToDelete = resourceQuota.get();
sendEventLog(resourceQuotaToDelete, ApplyStatus.deleted, resourceQuotaToDelete.getSpec(), null);
sendEventLog(resourceQuotaToDelete, ApplyStatus.deleted, resourceQuotaToDelete.getSpec(), null, "");
resourceQuotaService.delete(resourceQuotaToDelete);
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic,
return formatHttpResponse(topic, status);
}

sendEventLog(topic, status, existingTopic.<Object>map(Topic::getSpec).orElse(null), topic.getSpec());
sendEventLog(topic, status, existingTopic.<Object>map(Topic::getSpec).orElse(null), topic.getSpec(), "");

return formatHttpResponse(topicService.create(topic), status);
}
Expand Down Expand Up @@ -178,7 +178,7 @@ public HttpResponse<Void> delete(String namespace, String topic,
}

Topic topicToDelete = optionalTopic.get();
sendEventLog(topicToDelete, ApplyStatus.deleted, topicToDelete.getSpec(), null);
sendEventLog(topicToDelete, ApplyStatus.deleted, topicToDelete.getSpec(), null, "");
topicService.delete(optionalTopic.get());

return HttpResponse.noContent();
Expand Down Expand Up @@ -214,7 +214,7 @@ public List<Topic> importResources(String namespace, @QueryValue(defaultValue =
return unsynchronizedTopics
.stream()
.map(topic -> {
sendEventLog(topic, ApplyStatus.created, null, topic.getSpec());
sendEventLog(topic, ApplyStatus.created, null, topic.getSpec(), "");
return topicService.create(topic);
})
.toList();
Expand Down Expand Up @@ -256,7 +256,7 @@ public List<DeleteRecordsResponse> deleteRecords(String namespace, String topic,
if (dryrun) {
deletedRecords = recordsToDelete;
} else {
sendEventLog(optionalTopic.get(), ApplyStatus.deleted, null, null);
sendEventLog(optionalTopic.get(), ApplyStatus.deleted, null, null, "");
deletedRecords = topicService.deleteRecords(optionalTopic.get(), recordsToDelete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ public class ConsoleLogListener implements ApplicationEventListener<AuditLog> {

@Override
public void onApplicationEvent(AuditLog event) {
log.info("{} {} {} {} {} in namespace {} on cluster {}.",
log.info("{} {} {} {}{} {} in namespace {} on cluster {}.",
event.isAdmin() ? "Admin" : "User",
event.getUser(),
event.getOperation(),
event.getKind(),
event.getMetadata().getName(),
event.getVersion().isEmpty() ? "" : " version " + event.getVersion(),
event.getMetadata().getNamespace(),
event.getMetadata().getCluster()
);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/michelin/ns4kafka/model/AuditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ public class AuditLog {
private ApplyStatus operation;
private Object before;
private Object after;
private String version;
}
27 changes: 22 additions & 5 deletions src/main/java/com/michelin/ns4kafka/service/SchemaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,37 @@ public Mono<Integer> register(Namespace namespace, Schema schema) {
}

/**
* Delete all schemas under the given subject.
* Delete all the schema versions under the given subject.
*
* @param namespace The current namespace
* @param subject The current subject to delete
* @param namespace The namespace
* @param subject The subject to delete
* @return The list of deleted versions
*/
public Mono<Integer[]> delete(Namespace namespace, String subject) {
public Mono<Integer[]> deleteAllVersions(Namespace namespace, String subject) {
return schemaRegistryClient
.deleteSubject(namespace.getMetadata().getCluster(), subject, false)
.flatMap(ids -> schemaRegistryClient
.flatMap(softDeletedVersionIds -> schemaRegistryClient
.deleteSubject(namespace.getMetadata().getCluster(),
subject, true));
}

/**
* Delete the schema version under the given subject.
*
* @param namespace The namespace
* @param subject The subject
* @param version The version of the schema to delete
* @return The latest subject after deletion
*/
public Mono<Integer> deleteVersion(Namespace namespace, String subject, String version) {
return schemaRegistryClient
.deleteSubjectVersion(namespace.getMetadata().getCluster(), subject, version, false)
.flatMap(softDeletedVersionIds -> schemaRegistryClient
.deleteSubjectVersion(namespace.getMetadata().getCluster(),
subject, Integer.toString(softDeletedVersionIds), true)
);
}

/**
* Validate the schema compatibility.
*
Expand Down
Loading

0 comments on commit 4aa20bb

Please sign in to comment.