diff --git a/.github/workflows/on_pull_request.yml b/.github/workflows/on_pull_request.yml
index 63f575d9..b3d2c3ee 100644
--- a/.github/workflows/on_pull_request.yml
+++ b/.github/workflows/on_pull_request.yml
@@ -13,10 +13,10 @@ jobs:
         with:
           fetch-depth: 0
 
-      - name: Set up JDK 17
+      - name: Set up JDK 21
        uses: actions/setup-java@v4
         with:
-          java-version: '17'
+          java-version: '21'
           distribution: 'temurin'
 
       - name: Cache SonarCloud packages
diff --git a/.github/workflows/on_push_master.yml b/.github/workflows/on_push_master.yml
index a5a945c7..f34618c9 100644
--- a/.github/workflows/on_push_master.yml
+++ b/.github/workflows/on_push_master.yml
@@ -19,10 +19,10 @@ jobs:
         with:
           fetch-depth: 0
 
-      - name: Set up JDK 17
+      - name: Set up JDK 21
        uses: actions/setup-java@v4
         with:
-          java-version: '17'
+          java-version: '21'
           distribution: 'temurin'
 
       - name: Cache SonarCloud packages
@@ -41,7 +41,7 @@ jobs:
 
       - name: Check Style
         run: ./gradlew checkstyleMain checkstyleTest
-      
+
       - name: Build and analyze
         id: build_jar
         env:
diff --git a/.github/workflows/on_push_tag.yml b/.github/workflows/on_push_tag.yml
index 2388dc30..6ffc91ed 100644
--- a/.github/workflows/on_push_tag.yml
+++ b/.github/workflows/on_push_tag.yml
@@ -19,10 +19,10 @@ jobs:
         with:
           fetch-depth: 0
 
-      - name: Set up JDK 17
+      - name: Set up JDK 21
        uses: actions/setup-java@v4
         with:
-          java-version: '17'
+          java-version: '21'
           distribution: 'temurin'
 
       - name: Cache Gradle packages
diff --git a/.github/workflows/tag.yml b/.github/workflows/tag.yml
index 0f2badc7..476dc63f 100644
--- a/.github/workflows/tag.yml
+++ b/.github/workflows/tag.yml
@@ -17,10 +17,10 @@ jobs:
         with:
           token: ${{ secrets.CI_CD_TOKEN }}
 
-      - name: Set up JDK 17
+      - name: Set up JDK 21
        uses: actions/setup-java@v4
         with:
-          java-version: '17'
+          java-version: '21'
           distribution: 'temurin'
 
       - name: Import GPG key
diff --git a/README.md b/README.md
index 42e2dc32..cf323d2e 100644
--- a/README.md
+++ b/README.md
@@ -63,7 +63,7 @@ continuous integration/continuous delivery (CI/CD) pipelines.
 ## Download
 
 You can download Ns4Kafka as a fat jar from the project's releases page on GitHub
-at https://github.com/michelin/ns4kafka/releases.
+at https://github.com/michelin/ns4kafka/releases. Please note that Java 21 is required to run the application.
 
 Additionally, a Docker image of the solution is available at
 https://hub.docker.com/repository/docker/michelin/ns4kafka.
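All four workflows move from JDK 17 to 21 on the same Temurin distribution, and the README now states the runtime requirement explicitly. An application can also surface that constraint itself at startup; a minimal sketch of such a guard (a hypothetical helper, not part of this change):

```java
// Hypothetical startup guard, not part of this change: fail fast with a
// clear message when launched on a JRE older than Java 21.
public final class JavaVersionCheck {

    public static void main(String[] args) {
        int feature = Runtime.version().feature(); // 21 on a Java 21 runtime
        if (feature < 21) {
            System.err.println("Java 21 is required, but this JRE is Java " + feature);
            System.exit(1);
        }
        System.out.println("Running on Java " + feature);
    }
}
```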
diff --git a/build.gradle b/build.gradle
index 6bdb1abc..86c34895 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,11 +48,7 @@ dependencies {
     testImplementation("org.mockito:mockito-core")
     testImplementation("org.testcontainers:junit-jupiter")
     testImplementation("org.testcontainers:testcontainers")
-    testImplementation("org.testcontainers:kafka") {
-        version {
-            strictly '1.15.3'
-        }
-    }
+    testImplementation("org.testcontainers:kafka")
     testImplementation("org.mockito:mockito-junit-jupiter:5.9.0")
     testImplementation("io.projectreactor:reactor-test")
 
@@ -65,8 +61,8 @@ application {
 }
 
 java {
-    sourceCompatibility = JavaVersion.toVersion('17')
-    targetCompatibility = JavaVersion.toVersion('17')
+    sourceCompatibility = JavaVersion.toVersion('21')
+    targetCompatibility = JavaVersion.toVersion('21')
 }
 
 run {
@@ -88,7 +84,7 @@ shadowJar {
 }
 
 dockerfile {
-    baseImage = "eclipse-temurin:17-jre-alpine"
+    baseImage = "eclipse-temurin:21-jre-alpine"
 }
 
 if (project.hasProperty("releaseLatest")) {
diff --git a/gradle.properties b/gradle.properties
index 54f101a7..56a2e88c 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1 +1 @@
-micronautVersion=4.2.0
+micronautVersion=4.2.4
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 033e24c4..7f93135c 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 9f4197d5..3fa8f862 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
 networkTimeout=10000
 validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME
diff --git a/gradlew b/gradlew
index fcb6fca1..1aa94a42 100755
--- a/gradlew
+++ b/gradlew
@@ -83,7 +83,8 @@ done
 # This is normally unused
 # shellcheck disable=SC2034
 APP_BASE_NAME=${0##*/}
-APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
 
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD=maximum
@@ -144,7 +145,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
     case $MAX_FD in #(
       max*)
         # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         MAX_FD=$( ulimit -H -n ) ||
             warn "Could not query maximum file descriptor limit"
     esac
@@ -152,7 +153,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
       '' | soft) :;; #(
       *)
         # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         ulimit -n "$MAX_FD" ||
             warn "Could not set maximum file descriptor limit to $MAX_FD"
     esac
@@ -201,11 +202,11 @@ fi
 
 # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
 DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
 
-# Collect all arguments for the java command;
-#   * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
-#     shell script including quotes and variable substitutions, so put them in
-#     double quotes to make sure that they get re-expanded; and
-#   * put everything else in single quotes, so that it's not re-expanded.
+# Collect all arguments for the java command:
+#   * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+#     and any embedded shellness will be escaped.
+#   * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
+#     treated as '${Hostname}' itself on the command line.
 
 set -- \
         "-Dorg.gradle.appname=$APP_BASE_NAME" \
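Two things happen in the build: the hard strictly '1.15.3' pin on the Testcontainers Kafka module is dropped so the version can follow regular dependency management, and the compile target moves to 21. Class files built at compatibility 21 carry major version 65, which the old eclipse-temurin:17 runtime (capped at major 61) would reject with an UnsupportedClassVersionError; hence the Docker base image moves in the same change. A standalone sketch, not project code, for inspecting what a build actually produced:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Standalone sketch: print the class-file version of a compiled .class file.
// A Java 17 build emits major version 61; a Java 21 build emits 65.
public final class ClassFileVersion {

    public static void main(String[] args) throws IOException {
        try (InputStream in = Files.newInputStream(Path.of(args[0]));
             DataInputStream data = new DataInputStream(in)) {
            int magic = data.readInt();           // always 0xCAFEBABE
            int minor = data.readUnsignedShort();
            int major = data.readUnsignedShort(); // 65 for Java 21 bytecode
            System.out.printf("magic=%08X minor=%d major=%d%n", magic, minor, major);
        }
    }
}
```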
diff --git a/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationConnectTest.java b/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationConnectTest.java
index 6290fc1a..9e161333 100644
--- a/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationConnectTest.java
+++ b/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationConnectTest.java
@@ -12,7 +12,7 @@
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public abstract class AbstractIntegrationConnectTest extends AbstractIntegrationTest {
-    public KafkaConnectContainer connect;
+    public KafkaConnectContainer connectContainer;
 
     /**
      * Starts the Kafka Connect container.
@@ -23,8 +23,8 @@ public abstract class AbstractIntegrationConnectTest extends AbstractIntegrationTest {
     @Override
     public Map<String, String> getProperties() {
         Map<String, String> brokerProps = super.getProperties();
-        if (connect == null || !connect.isRunning()) {
-            connect =
+        if (connectContainer == null || !connectContainer.isRunning()) {
+            connectContainer =
                 new KafkaConnectContainer(DockerImageName.parse("confluentinc/cp-kafka-connect:" + CONFLUENT_VERSION),
                     "kafka:9092")
                     .withEnv("CONNECT_SASL_MECHANISM", "PLAIN")
@@ -33,11 +33,11 @@ public Map<String, String> getProperties() {
                         "org.apache.kafka.common.security.plain.PlainLoginModule "
                             + "required username=\"admin\" password=\"admin\";")
                     .withNetwork(network);
-            connect.start();
+            connectContainer.start();
         }
 
         Map<String, String> properties = new HashMap<>(brokerProps);
-        properties.put("ns4kafka.managed-clusters.test-cluster.connects.test-connect.url", connect.getUrl());
+        properties.put("ns4kafka.managed-clusters.test-cluster.connects.test-connect.url", connectContainer.getUrl());
         return properties;
     }
 }
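The public field rename from connect to connectContainer spells out what the field holds, and the guard around it keeps a single Connect container per test class (the PER_CLASS lifecycle), starting one only when none is running. The same reuse pattern in isolation; a sketch against the plain Testcontainers API with a placeholder image:

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

// Sketch of the start-once-and-reuse guard above, with a placeholder image.
public final class ReusableContainerHolder {

    private static GenericContainer<?> container;

    // Create and start the container only if it is absent or stopped,
    // so every test in the class shares one running instance.
    public static synchronized GenericContainer<?> getContainer() {
        if (container == null || !container.isRunning()) {
            container = new GenericContainer<>(DockerImageName.parse("alpine:3.19"))
                .withCommand("sleep", "infinity");
            container.start();
        }
        return container;
    }
}
```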
.withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", - "org.apache.kafka.common.security.plain.PlainLoginModule required " - + "username=\"admin\" password=\"admin\" " - + "user_admin=\"admin\" " - + "user_client=\"client\";") - .withEnv("KAFKA_SASL_JAAS_CONFIG", - "org.apache.kafka.common.security.plain.PlainLoginModule required " - + "username=\"admin\" password=\"admin\";") - .withEnv( - "KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", - "org.apache.kafka.common.security.plain.PlainLoginModule required " - + "username=\"admin\" password=\"admin\" " - + "user_admin=\"admin\" " - + "user_client=\"client\";") + .withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", getJaasConfig()) + .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", getJaasConfig()) .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer") .withEnv("KAFKA_SUPER_USERS", "User:admin") .withNetworkAliases("kafka") @@ -62,7 +45,6 @@ public Map getProperties() { "kafka.security.protocol", "SASL_PLAINTEXT", "kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";", - "ns4kafka.managed-clusters.test-cluster.config.bootstrap.servers", kafka.getHost() + ":" + kafka.getMappedPort(9093), "ns4kafka.managed-clusters.test-cluster.config.sasl.mechanism", "PLAIN", @@ -72,6 +54,18 @@ public Map getProperties() { ); } + /** + * Get the JAAS config. + * + * @return The JAAS config + */ + private static String getJaasConfig() { + return "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"admin\" password=\"admin\" " + + "user_admin=\"admin\" " + + "user_client=\"client\";"; + } + /** * Getter for admin client service. * diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectTest.java index 681b4ba2..6368fb03 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ConnectTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectTest.java @@ -30,6 +30,7 @@ import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor; import com.michelin.ns4kafka.validation.ConnectValidator; import com.michelin.ns4kafka.validation.TopicValidator; +import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Property; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; @@ -41,7 +42,6 @@ import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import jakarta.inject.Inject; import java.net.MalformedURLException; -import java.net.URL; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,9 +53,14 @@ @MicronautTest @Property(name = "micronaut.security.gitlab.enabled", value = "false") class ConnectTest extends AbstractIntegrationConnectTest { + @Inject + private ApplicationContext applicationContext; + @Inject @Client("/") - HttpClient client; + HttpClient ns4KafkaClient; + + private HttpClient connectClient; @Inject List topicAsyncExecutorList; @@ -67,6 +72,8 @@ class ConnectTest extends AbstractIntegrationConnectTest { @BeforeAll void init() { + connectClient = applicationContext.createBean(HttpClient.class, connectContainer.getUrl()); + Namespace ns1 = Namespace.builder() .metadata(ObjectMeta.builder() .name("ns1") @@ -103,13 +110,14 @@ void init() { UsernamePasswordCredentials credentials = new UsernamePasswordCredentials("admin", "admin"); HttpResponse response = - 
diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectTest.java
index 681b4ba2..6368fb03 100644
--- a/src/test/java/com/michelin/ns4kafka/integration/ConnectTest.java
+++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectTest.java
@@ -30,6 +30,7 @@
 import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor;
 import com.michelin.ns4kafka.validation.ConnectValidator;
 import com.michelin.ns4kafka.validation.TopicValidator;
+import io.micronaut.context.ApplicationContext;
 import io.micronaut.context.annotation.Property;
 import io.micronaut.http.HttpMethod;
 import io.micronaut.http.HttpRequest;
@@ -41,7 +42,6 @@
 import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
 import jakarta.inject.Inject;
 import java.net.MalformedURLException;
-import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -53,9 +53,14 @@
 @MicronautTest
 @Property(name = "micronaut.security.gitlab.enabled", value = "false")
 class ConnectTest extends AbstractIntegrationConnectTest {
+    @Inject
+    private ApplicationContext applicationContext;
+
     @Inject
     @Client("/")
-    HttpClient client;
+    HttpClient ns4KafkaClient;
+
+    private HttpClient connectClient;
 
     @Inject
     List<TopicAsyncExecutor> topicAsyncExecutorList;
@@ -67,6 +72,8 @@ class ConnectTest extends AbstractIntegrationConnectTest {
 
     @BeforeAll
     void init() {
+        connectClient = applicationContext.createBean(HttpClient.class, connectContainer.getUrl());
+
         Namespace ns1 = Namespace.builder()
             .metadata(ObjectMeta.builder()
                 .name("ns1")
@@ -103,13 +110,14 @@ void init() {
 
         UsernamePasswordCredentials credentials = new UsernamePasswordCredentials("admin", "admin");
         HttpResponse<BearerAccessRefreshToken> response =
-            client.toBlocking().exchange(HttpRequest.POST("/login", credentials), BearerAccessRefreshToken.class);
+            ns4KafkaClient.toBlocking()
+                .exchange(HttpRequest.POST("/login", credentials), BearerAccessRefreshToken.class);
 
         token = response.getBody().get().getAccessToken();
 
-        client.toBlocking()
+        ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces").bearerAuth(token).body(ns1));
-        client.toBlocking().exchange(
+        ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/role-bindings").bearerAuth(token).body(rb1));
 
         AccessControlEntry aclConnect = AccessControlEntry.builder()
@@ -126,7 +134,7 @@ void init() {
                 .build())
             .build();
 
-        client.toBlocking().exchange(
+        ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/acls").bearerAuth(token).body(aclConnect));
 
         AccessControlEntry aclTopic = AccessControlEntry.builder()
@@ -143,14 +151,13 @@ void init() {
                 .build())
             .build();
 
-        client.toBlocking()
+        ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/acls").bearerAuth(token).body(aclTopic));
     }
 
     @Test
-    void createConnect() throws MalformedURLException {
-        HttpClient connectCli = HttpClient.create(new URL(connect.getUrl()));
-        ServerInfo actual = connectCli.toBlocking().retrieve(HttpRequest.GET("/"), ServerInfo.class);
+    void createConnect() {
+        ServerInfo actual = connectClient.toBlocking().retrieve(HttpRequest.GET("/"), ServerInfo.class);
         assertEquals("7.4.1-ccs", actual.version());
     }
 
@@ -167,7 +174,7 @@ void createNamespaceWithoutConnect() {
                 .build())
             .build();
 
-        assertDoesNotThrow(() -> client.toBlocking()
+        assertDoesNotThrow(() -> ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces").bearerAuth(token).body(ns)));
     }
 
@@ -212,10 +219,10 @@ void deployConnectors() throws InterruptedException, MalformedURLException {
                 .build())
             .build();
 
-        client.toBlocking()
+        ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/topics").bearerAuth(token).body(topic));
         topicAsyncExecutorList.forEach(TopicAsyncExecutor::run);
 
-        client.toBlocking().exchange(
+        ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors").bearerAuth(token)
                 .body(connectorWithNullParameter));
 
@@ -235,7 +242,7 @@ void deployConnectors() throws InterruptedException, MalformedURLException {
                 .build())
             .build();
 
-        client.toBlocking().exchange(
+        ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors").bearerAuth(token)
                 .body(connectorWithEmptyParameter));
 
@@ -255,7 +262,7 @@ void deployConnectors() throws InterruptedException, MalformedURLException {
                 .build())
             .build();
 
-        client.toBlocking().exchange(
+        ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors").bearerAuth(token)
                 .body(connectorWithFillParameter));
 
@@ -264,24 +271,22 @@ void deployConnectors() throws InterruptedException, MalformedURLException {
 
         Thread.sleep(2000);
 
-        HttpClient connectCli = HttpClient.create(new URL(connect.getUrl()));
-
         // "File" property is present, but null
-        ConnectorInfo actualConnectorWithNullParameter = connectCli.toBlocking()
+        ConnectorInfo actualConnectorWithNullParameter = connectClient.toBlocking()
             .retrieve(HttpRequest.GET("/connectors/ns1-connectorWithNullParameter"), ConnectorInfo.class);
 
         assertTrue(actualConnectorWithNullParameter.config().containsKey("file"));
         Assertions.assertNull(actualConnectorWithNullParameter.config().get("file"));
 
         // "File" property is present, but empty
-        ConnectorInfo actualConnectorWithEmptyParameter = connectCli.toBlocking()
+        ConnectorInfo actualConnectorWithEmptyParameter = connectClient.toBlocking()
             .retrieve(HttpRequest.GET("/connectors/ns1-connectorWithEmptyParameter"), ConnectorInfo.class);
 
         assertTrue(actualConnectorWithEmptyParameter.config().containsKey("file"));
         assertTrue(actualConnectorWithEmptyParameter.config().get("file").isEmpty());
 
         // "File" property is present
-        ConnectorInfo actualConnectorWithFillParameter = connectCli.toBlocking()
+        ConnectorInfo actualConnectorWithFillParameter = connectClient.toBlocking()
             .retrieve(HttpRequest.GET("/connectors/ns1-connectorWithFillParameter"), ConnectorInfo.class);
 
         assertTrue(actualConnectorWithFillParameter.config().containsKey("file"));
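These assertions rely on the difference between a key that is absent, a key mapped to null, and a key mapped to an empty string, which is why containsKey and get are checked separately (and why the test builds its specs with HashMap rather than Map.of, which rejects null values). The distinction in isolation:

```java
import java.util.HashMap;
import java.util.Map;

// The three "file" states distinguished by the assertions above.
public final class NullVsEmptyVsFilled {

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>(); // Map.of would reject null values
        config.put("file", null);
        System.out.println(config.containsKey("file")); // true: key present
        System.out.println(config.get("file"));         // null: no value

        config.put("file", "");
        System.out.println(config.get("file").isEmpty()); // true: present, empty

        config.put("file", "test");
        System.out.println(config.get("file"));           // test: present, filled
    }
}
```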
@@ -290,8 +295,6 @@ void deployConnectors() throws InterruptedException, MalformedURLException {
 
     @Test
     void updateConnectorsWithNullProperty() throws InterruptedException, MalformedURLException {
-
-
         ConnectorSpecs connectorSpecs = ConnectorSpecs.builder()
             .config(Map.of("connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
                 "tasks.max", "1",
@@ -306,8 +309,7 @@ void updateConnectorsWithNullProperty() throws InterruptedException, MalformedURLException {
         updatedConnectorSpecs.put("topics", "ns1-to1");
         updatedConnectorSpecs.put("file", null);
 
-        HttpClient connectCli = HttpClient.create(new URL(connect.getUrl()));
-        HttpResponse<ConnectorInfo> connectorInfo = connectCli.toBlocking()
+        HttpResponse<ConnectorInfo> connectorInfo = connectClient.toBlocking()
             .exchange(HttpRequest.PUT("/connectors/ns1-connector/config", connectorSpecs), ConnectorInfo.class);
 
         // "File" property is present and fill
@@ -329,7 +331,7 @@ void updateConnectorsWithNullProperty() throws InterruptedException, MalformedURLException {
                 .build())
             .build();
 
-        client.toBlocking()
+        ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/topics").bearerAuth(token).body(topic));
         topicAsyncExecutorList.forEach(TopicAsyncExecutor::run);
 
@@ -345,7 +347,7 @@ void updateConnectorsWithNullProperty() throws InterruptedException, MalformedURLException {
                 .build())
             .build();
 
-        client.toBlocking().exchange(
+        ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors").bearerAuth(token)
                 .body(updateConnector));
 
@@ -354,8 +356,8 @@ void updateConnectorsWithNullProperty() throws InterruptedException, MalformedURLException {
 
         Thread.sleep(2000);
 
-        ConnectorInfo actualConnector =
-            connectCli.toBlocking().retrieve(HttpRequest.GET("/connectors/ns1-connector"), ConnectorInfo.class);
+        ConnectorInfo actualConnector = connectClient.toBlocking()
+            .retrieve(HttpRequest.GET("/connectors/ns1-connector"), ConnectorInfo.class);
 
         // "File" property is present, but null
         assertTrue(actualConnector.config().containsKey("file"));
@@ -393,10 +395,10 @@ void restartConnector() throws InterruptedException {
                 .build())
             .build();
 
-        client.toBlocking()
+        ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/topics").bearerAuth(token).body(to));
         topicAsyncExecutorList.forEach(TopicAsyncExecutor::run);
-        client.toBlocking()
+        ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors").bearerAuth(token).body(co));
 
         Flux.fromIterable(connectorAsyncExecutorList).flatMap(ConnectorAsyncExecutor::runHealthCheck).subscribe();
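runHealthCheck() returns a Flux, and building a Flux pipeline executes nothing by itself; the trailing subscribe() is what actually starts the health checks, which is also why the test sleeps afterwards to let the asynchronous work land. A toy sketch of that laziness, with stand-in values:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Reactor sketch: assembling a pipeline does nothing; work starts at subscribe().
public final class SubscribeExample {

    public static void main(String[] args) {
        Flux<String> healthChecks = Flux.fromIterable(List.of("connect-1", "connect-2"))
            .flatMap(name -> Flux.just(name + ": healthy")); // stands in for runHealthCheck()
        healthChecks.subscribe(System.out::println); // nothing runs until here
    }
}
```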
@@ -410,7 +412,7 @@ void restartConnector() throws InterruptedException {
                 .action(ChangeConnectorState.ConnectorAction.restart).build())
             .build();
 
-        HttpResponse<ChangeConnectorState> actual = client.toBlocking().exchange(
+        HttpResponse<ChangeConnectorState> actual = ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors/ns1-co1/change-state").bearerAuth(token)
                 .body(restartState), ChangeConnectorState.class);
         assertEquals(HttpStatus.OK, actual.status());
@@ -447,10 +449,10 @@ void pauseAndResumeConnector() throws MalformedURLException, InterruptedException {
                 .build())
             .build();
 
-        client.toBlocking()
+        ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/topics").bearerAuth(token).body(to));
         topicAsyncExecutorList.forEach(TopicAsyncExecutor::run);
-        client.toBlocking()
+        ns4KafkaClient.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors").bearerAuth(token).body(co));
 
         Flux.fromIterable(connectorAsyncExecutorList).flatMap(ConnectorAsyncExecutor::runHealthCheck).subscribe();
@@ -464,15 +466,14 @@ void pauseAndResumeConnector() throws MalformedURLException, InterruptedException {
             .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder()
                 .action(ChangeConnectorState.ConnectorAction.pause).build())
             .build();
-        client.toBlocking().exchange(
+        ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors/ns1-co2/change-state").bearerAuth(token)
                 .body(pauseState));
 
         Thread.sleep(2000);
 
-        HttpClient connectCli = HttpClient.create(new URL(connect.getUrl()));
-        ConnectorStateInfo actual =
-            connectCli.toBlocking().retrieve(HttpRequest.GET("/connectors/ns1-co2/status"), ConnectorStateInfo.class);
+        ConnectorStateInfo actual = connectClient.toBlocking()
+            .retrieve(HttpRequest.GET("/connectors/ns1-co2/status"), ConnectorStateInfo.class);
         assertEquals("PAUSED", actual.connector().getState());
         assertEquals("PAUSED", actual.tasks().get(0).getState());
         assertEquals("PAUSED", actual.tasks().get(1).getState());
@@ -484,15 +485,15 @@ void pauseAndResumeConnector() throws MalformedURLException, InterruptedException {
             .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder()
                 .action(ChangeConnectorState.ConnectorAction.resume).build())
             .build();
-        client.toBlocking().exchange(
+        ns4KafkaClient.toBlocking().exchange(
             HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors/ns1-co2/change-state").bearerAuth(token)
                 .body(resumeState));
 
         Thread.sleep(2000);
 
         // verify resumed directly on connect cluster
-        actual =
-            connectCli.toBlocking().retrieve(HttpRequest.GET("/connectors/ns1-co2/status"), ConnectorStateInfo.class);
+        actual = connectClient.toBlocking()
+            .retrieve(HttpRequest.GET("/connectors/ns1-co2/status"), ConnectorStateInfo.class);
         assertEquals("RUNNING", actual.connector().getState());
         assertEquals("RUNNING", actual.tasks().get(0).getState());
         assertEquals("RUNNING", actual.tasks().get(1).getState());
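With the last HttpClient.create(new URL(...)) gone from ConnectTest, every call to the Connect container goes through a single client created from the ApplicationContext; as the SchemaTest comment below spells out, a client built this way picks up its configuration from application.yml rather than the Micronaut defaults. The pattern in miniature, with an assumed target URL:

```java
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.client.HttpClient;
import java.net.URI;

// Sketch of the refactored client creation: a context-managed HttpClient
// parameterized with the target URL (the URL here is an assumption).
public final class ContextManagedClientExample {

    public static void main(String[] args) throws Exception {
        try (ApplicationContext context = ApplicationContext.run()) {
            // Unlike HttpClient.create(...), a bean obtained from the context is
            // configured from application.yml (timeouts, read-idle, and so on).
            HttpClient client = context.createBean(HttpClient.class,
                URI.create("http://localhost:8083").toURL());
            System.out.println(client.toBlocking().retrieve("/"));
            client.close();
        }
    }
}
```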
diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java
index 7c4032bc..4b1e53bc 100644
--- a/src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java
+++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java
@@ -51,6 +51,7 @@ class SchemaTest extends AbstractIntegrationSchemaRegistryTest {
 
     @BeforeAll
     void init() {
+        // Create HTTP client as bean to load client configuration from application.yml
         schemaRegistryClient = applicationContext.createBean(HttpClient.class, schemaRegistryContainer.getUrl());
 
         Namespace namespace =
             Namespace.builder()