Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
E022974 committed Aug 28, 2024
1 parent 418f850 commit 9f3d0cc
Show file tree
Hide file tree
Showing 41 changed files with 647 additions and 470 deletions.
6 changes: 0 additions & 6 deletions .docker/config/broker/kafka_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin" password="admin"
user_admin="admin";
};

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin" password="admin";
};
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ dependencies {
runtimeOnly("ch.qos.logback:logback-classic")

testImplementation("org.mockito:mockito-core")
testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:testcontainers")
testImplementation("org.testcontainers:kafka")
testImplementation("org.testcontainers:junit-jupiter:1.20.0")
testImplementation("org.testcontainers:testcontainers:1.20.0")
testImplementation("org.testcontainers:kafka:1.20.0")
testImplementation("org.mockito:mockito-junit-jupiter:5.12.0")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.11.0")
testImplementation("io.projectreactor:reactor-test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
}

sendEventLog(connector, status, existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec());
connector.getSpec(), "");

return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status));
});
Expand Down Expand Up @@ -180,7 +180,7 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
}

Connector connectorToDelete = optionalConnector.get();
sendEventLog(connectorToDelete, ApplyStatus.deleted, connectorToDelete.getSpec(), null);
sendEventLog(connectorToDelete, ApplyStatus.deleted, connectorToDelete.getSpec(), null, "");

return connectorService
.delete(ns, optionalConnector.get())
Expand Down Expand Up @@ -264,7 +264,7 @@ public Flux<Connector> importResources(String namespace, @QueryValue(defaultValu
return unsynchronizedConnector;
}

sendEventLog(unsynchronizedConnector, ApplyStatus.created, null, unsynchronizedConnector.getSpec());
sendEventLog(unsynchronizedConnector, ApplyStatus.created, null, unsynchronizedConnector.getSpec(), "");

return connectorService.createOrUpdate(unsynchronizedConnector);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public List<ConsumerGroupResetOffsetsResponse> resetOffsets(String namespace, St
consumerGroupResetOffsets.getSpec().getMethod());

if (!dryrun) {
sendEventLog(consumerGroupResetOffsets, ApplyStatus.changed, null, consumerGroupResetOffsets.getSpec());
sendEventLog(consumerGroupResetOffsets, ApplyStatus.changed, null,
consumerGroupResetOffsets.getSpec(), "");
consumerGroupService.alterConsumerGroupOffsets(ns, consumerGroup, preparedOffsets);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public HttpResponse<Namespace> apply(@Valid @Body Namespace namespace,
}

sendEventLog(namespace, status, existingNamespace.<Object>map(Namespace::getSpec).orElse(null),
namespace.getSpec());
namespace.getSpec(), "");

return formatHttpResponse(namespaceService.createOrUpdate(namespace), status);
}
Expand Down Expand Up @@ -136,7 +136,7 @@ public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "f
}

var namespaceToDelete = optionalNamespace.get();
sendEventLog(namespaceToDelete, ApplyStatus.deleted, namespaceToDelete.getSpec(), null);
sendEventLog(namespaceToDelete, ApplyStatus.deleted, namespaceToDelete.getSpec(), null, "");
namespaceService.delete(optionalNamespace.get());
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public HttpResponse<RoleBinding> apply(String namespace, @Valid @Body RoleBindin
}

sendEventLog(roleBinding, status, existingRoleBinding.<Object>map(RoleBinding::getSpec).orElse(null),
roleBinding.getSpec());
roleBinding.getSpec(), "");
roleBindingService.create(roleBinding);
return formatHttpResponse(roleBinding, status);
}
Expand All @@ -116,7 +116,7 @@ public HttpResponse<Void> delete(String namespace, String name,
}

var roleBindingToDelete = roleBinding.get();
sendEventLog(roleBindingToDelete, ApplyStatus.deleted, roleBindingToDelete.getSpec(), null);
sendEventLog(roleBindingToDelete, ApplyStatus.deleted, roleBindingToDelete.getSpec(), null, "");
roleBindingService.delete(roleBindingToDelete);
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
oldSchemas.isEmpty() ? null : oldSchemas.stream()
.max(Comparator.comparingInt(
(Schema s) -> s.getSpec().getId())),
schema.getSpec());
schema.getSpec(), "");

return formatHttpResponse(schema, status);
});
Expand All @@ -142,16 +142,19 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
}

/**
* Delete all schemas under the given subject.
* Delete all schema versions under the given subject, or a specific version of the schema if specified.
*
* @param namespace The current namespace
* @param subject The current subject to delete
* @param namespace The namespace
* @param subject The subject
* @param version The version of the schema to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{subject}")
public Mono<HttpResponse<Void>> delete(String namespace, @PathVariable String subject,
public Mono<HttpResponse<Void>> delete(String namespace,
@PathVariable String subject,
@QueryValue Optional<String> version,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

Expand All @@ -160,25 +163,29 @@ public Mono<HttpResponse<Void>> delete(String namespace, @PathVariable String su
return Mono.error(new ResourceValidationException(SCHEMA, subject, invalidOwner(subject)));
}

return schemaService.getLatestSubject(ns, subject)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
if (latestSubjectOptional.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

Schema schemaToDelete = latestSubjectOptional.get();
sendEventLog(schemaToDelete, ApplyStatus.deleted, schemaToDelete.getSpec(), null);

return schemaService
.delete(ns, subject)
.map(deletedSchemaIds -> HttpResponse.noContent());
});
return version
.map(v -> schemaService.getSubject(ns, subject, v))
.orElseGet(() -> schemaService.getLatestSubject(ns, subject))
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(subjectOptional -> {
if (subjectOptional.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return (version.isEmpty() ? schemaService.deleteAllVersions(ns, subject) :
schemaService.deleteVersion(ns, subject, version.get()))
.map(deletedVersionIds -> {
Schema deletedSchema = subjectOptional.get();
sendEventLog(deletedSchema, ApplyStatus.deleted, deletedSchema.getSpec(), null,
version.map(v -> "").orElseGet(() -> String.valueOf(deletedVersionIds)));
return HttpResponse.noContent();
});
});
}

/**
Expand Down Expand Up @@ -221,7 +228,7 @@ public Mono<HttpResponse<SchemaCompatibilityState>> config(String namespace, @Pa
.updateSubjectCompatibility(ns, latestSubjectOptional.get(), compatibility)
.map(schemaCompatibility -> {
sendEventLog(latestSubjectOptional.get(), ApplyStatus.changed,
latestSubjectOptional.get().getSpec().getCompatibility(), compatibility);
latestSubjectOptional.get().getSpec().getCompatibility(), compatibility, "");

return HttpResponse.ok(state);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ HttpResponse<KafkaStream> apply(String namespace, @Body @Valid KafkaStream strea
}

sendEventLog(stream, status, existingStream.<Object>map(KafkaStream::getMetadata).orElse(null),
stream.getMetadata());
stream.getMetadata(), "");

return formatHttpResponse(streamService.create(stream), status);
}
Expand Down Expand Up @@ -126,7 +126,7 @@ HttpResponse<Void> delete(String namespace, String stream, @QueryValue(defaultVa
}

var streamToDelete = optionalStream.get();
sendEventLog(streamToDelete, ApplyStatus.deleted, streamToDelete.getMetadata(), null);
sendEventLog(streamToDelete, ApplyStatus.deleted, streamToDelete.getMetadata(), null, "");
streamService.delete(ns, optionalStream.get());
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, Stri
.build())
.build();

sendEventLog(response, ApplyStatus.changed, null, response.getSpec());
sendEventLog(response, ApplyStatus.changed, null, response.getSpec(), "");
return HttpResponse.ok(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
}

sendEventLog(accessControlEntry, status, existingAcl.<Object>map(AccessControlEntry::getSpec).orElse(null),
accessControlEntry.getSpec());
accessControlEntry.getSpec(), "");

return formatHttpResponse(aclService.create(accessControlEntry), status);
}
Expand Down Expand Up @@ -174,7 +174,7 @@ public HttpResponse<Void> delete(Authentication authentication, String namespace
return HttpResponse.noContent();
}

sendEventLog(accessControlEntry, ApplyStatus.deleted, accessControlEntry.getSpec(), null);
sendEventLog(accessControlEntry, ApplyStatus.deleted, accessControlEntry.getSpec(), null, "");

aclService.delete(accessControlEntry);
return HttpResponse.noContent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public Mono<HttpResponse<ConnectCluster>> apply(String namespace, @Body @Valid C
}

sendEventLog(connectCluster, status,
existingConnectCluster.<Object>map(ConnectCluster::getSpec).orElse(null), connectCluster.getSpec());
existingConnectCluster.<Object>map(ConnectCluster::getSpec).orElse(null),
connectCluster.getSpec(), "");

return Mono.just(formatHttpResponse(connectClusterService.create(connectCluster), status));
});
Expand Down Expand Up @@ -161,7 +162,7 @@ public HttpResponse<Void> delete(String namespace, String connectCluster,
}

ConnectCluster connectClusterToDelete = optionalConnectCluster.get();
sendEventLog(connectClusterToDelete, ApplyStatus.deleted, connectClusterToDelete.getSpec(), null);
sendEventLog(connectClusterToDelete, ApplyStatus.deleted, connectClusterToDelete.getSpec(), null, "");

connectClusterService.delete(connectClusterToDelete);
return HttpResponse.noContent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public <T> HttpResponse<T> formatHttpResponse(T body, ApplyStatus status) {
* @param after the resource after the operation
*/
public void sendEventLog(MetadataResource resource, ApplyStatus operation, Object before,
Object after) {
Object after, String version) {
AuditLog auditLog = new AuditLog(securityService.username().orElse(""),
securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN), Date.from(Instant.now()),
resource.getKind(), resource.getMetadata(), operation, before, after);
resource.getKind(), resource.getMetadata(), operation, before, after, version);
applicationEventPublisher.publishEvent(auditLog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public HttpResponse<ResourceQuota> apply(String namespace, @Body @Valid Resource
}

sendEventLog(quota, status, resourceQuotaOptional.<Object>map(ResourceQuota::getSpec).orElse(null),
quota.getSpec());
quota.getSpec(), "");

return formatHttpResponse(resourceQuotaService.create(quota), status);
}
Expand All @@ -130,7 +130,7 @@ public HttpResponse<Void> delete(String namespace, String name,
}

ResourceQuota resourceQuotaToDelete = resourceQuota.get();
sendEventLog(resourceQuotaToDelete, ApplyStatus.deleted, resourceQuotaToDelete.getSpec(), null);
sendEventLog(resourceQuotaToDelete, ApplyStatus.deleted, resourceQuotaToDelete.getSpec(), null, "");
resourceQuotaService.delete(resourceQuotaToDelete);
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic,
return formatHttpResponse(topic, status);
}

sendEventLog(topic, status, existingTopic.<Object>map(Topic::getSpec).orElse(null), topic.getSpec());
sendEventLog(topic, status, existingTopic.<Object>map(Topic::getSpec).orElse(null), topic.getSpec(), "");

return formatHttpResponse(topicService.create(topic), status);
}
Expand Down Expand Up @@ -178,7 +178,7 @@ public HttpResponse<Void> delete(String namespace, String topic,
}

Topic topicToDelete = optionalTopic.get();
sendEventLog(topicToDelete, ApplyStatus.deleted, topicToDelete.getSpec(), null);
sendEventLog(topicToDelete, ApplyStatus.deleted, topicToDelete.getSpec(), null, "");
topicService.delete(optionalTopic.get());

return HttpResponse.noContent();
Expand Down Expand Up @@ -214,7 +214,7 @@ public List<Topic> importResources(String namespace, @QueryValue(defaultValue =
return unsynchronizedTopics
.stream()
.map(topic -> {
sendEventLog(topic, ApplyStatus.created, null, topic.getSpec());
sendEventLog(topic, ApplyStatus.created, null, topic.getSpec(), "");
return topicService.create(topic);
})
.toList();
Expand Down Expand Up @@ -256,7 +256,7 @@ public List<DeleteRecordsResponse> deleteRecords(String namespace, String topic,
if (dryrun) {
deletedRecords = recordsToDelete;
} else {
sendEventLog(optionalTopic.get(), ApplyStatus.deleted, null, null);
sendEventLog(optionalTopic.get(), ApplyStatus.deleted, null, null, "");
deletedRecords = topicService.deleteRecords(optionalTopic.get(), recordsToDelete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ public class ConsoleLogListener implements ApplicationEventListener<AuditLog> {

@Override
public void onApplicationEvent(AuditLog event) {
log.info("{} {} {} {} {} in namespace {} on cluster {}.",
log.info("{} {} {} {}{} {} in namespace {} on cluster {}.",
event.isAdmin() ? "Admin" : "User",
event.getUser(),
event.getOperation(),
event.getKind(),
event.getMetadata().getName(),
event.getVersion().isEmpty() ? "" : " version " + event.getVersion(),
event.getMetadata().getNamespace(),
event.getMetadata().getCluster()
);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/michelin/ns4kafka/model/AuditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ public class AuditLog {
private ApplyStatus operation;
private Object before;
private Object after;
private String version;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,30 @@
*/
@Slf4j
public abstract class KafkaStore<T> {
private final Map<String, T> store;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final ReentrantLock offsetUpdateLock;
private final Condition offsetReachedThreshold;
@Inject
ApplicationContext applicationContext;

@Inject
AdminClient adminClient;

@Inject
KafkaStoreProperties kafkaStoreProperties;

@Inject
@Named(TaskExecutors.SCHEDULED)
TaskScheduler taskScheduler;

@Property(name = "ns4kafka.store.kafka.init-timeout")
int initTimeout;

private final Map<String, T> store;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final ReentrantLock offsetUpdateLock;
private final Condition offsetReachedThreshold;
String kafkaTopic;
Producer<String, T> kafkaProducer;
long offsetInSchemasTopic = -1;
long lastWrittenOffset = -1;
@Property(name = "ns4kafka.store.kafka.init-timeout")
int initTimeout;

KafkaStore(String kafkaTopic, Producer<String, T> kafkaProducer) {
this.kafkaTopic = kafkaTopic;
Expand Down
Loading

0 comments on commit 9f3d0cc

Please sign in to comment.