Skip to content

Commit 5e07d95

Browse files
eddumelendezfokion
authored andcommitted
Add redpandadata/redpanda as a compatible image (testcontainers#7898)
Test has been added for all compatible images.
1 parent a17d8c3 commit 5e07d95

File tree

4 files changed

+146
-80
lines changed

4 files changed

+146
-80
lines changed

modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
/**
3131
* Testcontainers implementation for Redpanda.
3232
* <p>
33-
* Supported images: {@code docker.redpanda.com/redpandadata/redpanda}, {@code docker.redpanda.com/vectorized/redpanda}
33+
* Supported images: {@code redpandadata/redpanda}, {@code docker.redpanda.com/redpandadata/redpanda}
3434
* <p>
3535
* Exposed ports:
3636
* <ul>
@@ -43,11 +43,15 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {
4343

4444
private static final String REDPANDA_FULL_IMAGE_NAME = "docker.redpanda.com/redpandadata/redpanda";
4545

46+
private static final String IMAGE_NAME = "redpandadata/redpanda";
47+
4648
@Deprecated
4749
private static final String REDPANDA_OLD_FULL_IMAGE_NAME = "docker.redpanda.com/vectorized/redpanda";
4850

4951
private static final DockerImageName REDPANDA_IMAGE = DockerImageName.parse(REDPANDA_FULL_IMAGE_NAME);
5052

53+
private static final DockerImageName IMAGE = DockerImageName.parse(IMAGE_NAME);
54+
5155
@Deprecated
5256
private static final DockerImageName REDPANDA_OLD_IMAGE = DockerImageName.parse(REDPANDA_OLD_FULL_IMAGE_NAME);
5357

@@ -75,10 +79,14 @@ public RedpandaContainer(String image) {
7579

7680
public RedpandaContainer(DockerImageName imageName) {
7781
super(imageName);
78-
imageName.assertCompatibleWith(REDPANDA_OLD_IMAGE, REDPANDA_IMAGE);
82+
imageName.assertCompatibleWith(REDPANDA_OLD_IMAGE, REDPANDA_IMAGE, IMAGE);
7983

8084
boolean isLessThanBaseVersion = new ComparableVersion(imageName.getVersionPart()).isLessThan("v22.2.1");
81-
if (REDPANDA_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart()) && isLessThanBaseVersion) {
85+
boolean isPublicCompatibleImage =
86+
REDPANDA_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart()) ||
87+
IMAGE_NAME.equals(imageName.getUnversionedPart()) ||
88+
REDPANDA_OLD_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart());
89+
if (isPublicCompatibleImage && isLessThanBaseVersion) {
8290
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
8391
}
8492

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package org.testcontainers.redpanda;
2+
3+
import com.google.common.collect.ImmutableMap;
4+
import org.apache.kafka.clients.admin.AdminClient;
5+
import org.apache.kafka.clients.admin.AdminClientConfig;
6+
import org.apache.kafka.clients.admin.NewTopic;
7+
import org.apache.kafka.clients.consumer.ConsumerConfig;
8+
import org.apache.kafka.clients.consumer.ConsumerRecord;
9+
import org.apache.kafka.clients.consumer.ConsumerRecords;
10+
import org.apache.kafka.clients.consumer.KafkaConsumer;
11+
import org.apache.kafka.clients.producer.KafkaProducer;
12+
import org.apache.kafka.clients.producer.ProducerConfig;
13+
import org.apache.kafka.clients.producer.ProducerRecord;
14+
import org.apache.kafka.common.serialization.StringDeserializer;
15+
import org.apache.kafka.common.serialization.StringSerializer;
16+
import org.rnorth.ducttape.unreliables.Unreliables;
17+
18+
import java.time.Duration;
19+
import java.util.Collection;
20+
import java.util.Collections;
21+
import java.util.UUID;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
import static org.assertj.core.api.Assertions.tuple;
26+
27+
public class AbstractRedpanda {
28+
29+
protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
30+
testKafkaFunctionality(bootstrapServers, 1, 1);
31+
}
32+
33+
protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
34+
try (
35+
AdminClient adminClient = AdminClient.create(
36+
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
37+
);
38+
KafkaProducer<String, String> producer = new KafkaProducer<>(
39+
ImmutableMap.of(
40+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
41+
bootstrapServers,
42+
ProducerConfig.CLIENT_ID_CONFIG,
43+
UUID.randomUUID().toString()
44+
),
45+
new StringSerializer(),
46+
new StringSerializer()
47+
);
48+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
49+
ImmutableMap.of(
50+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
51+
bootstrapServers,
52+
ConsumerConfig.GROUP_ID_CONFIG,
53+
"tc-" + UUID.randomUUID(),
54+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
55+
"earliest"
56+
),
57+
new StringDeserializer(),
58+
new StringDeserializer()
59+
);
60+
) {
61+
String topicName = "messages-" + UUID.randomUUID();
62+
63+
Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
64+
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
65+
66+
consumer.subscribe(Collections.singletonList(topicName));
67+
68+
producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
69+
70+
Unreliables.retryUntilTrue(
71+
10,
72+
TimeUnit.SECONDS,
73+
() -> {
74+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
75+
76+
if (records.isEmpty()) {
77+
return false;
78+
}
79+
80+
assertThat(records)
81+
.hasSize(1)
82+
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
83+
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
84+
85+
return true;
86+
}
87+
);
88+
89+
consumer.unsubscribe();
90+
}
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.testcontainers.redpanda;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.junit.runners.Parameterized;
6+
7+
@RunWith(Parameterized.class)
8+
public class CompatibleImageTest extends AbstractRedpanda {
9+
10+
private final String image;
11+
12+
public CompatibleImageTest(String image) {
13+
this.image = image;
14+
}
15+
16+
@Parameterized.Parameters(name = "{0}")
17+
public static String[] image() {
18+
return new String[] { "docker.redpanda.com/vectorized/redpanda:v22.2.1", "redpandadata/redpanda:v22.2.1" };
19+
}
20+
21+
@Test
22+
public void shouldProduceAndConsumeMessage() throws Exception {
23+
try (RedpandaContainer container = new RedpandaContainer(this.image)) {
24+
container.start();
25+
testKafkaFunctionality(container.getBootstrapServers());
26+
}
27+
}
28+
}

modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java

+15-77
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,16 @@
88
import org.apache.kafka.clients.admin.AdminClient;
99
import org.apache.kafka.clients.admin.AdminClientConfig;
1010
import org.apache.kafka.clients.admin.NewTopic;
11-
import org.apache.kafka.clients.consumer.ConsumerConfig;
12-
import org.apache.kafka.clients.consumer.ConsumerRecord;
13-
import org.apache.kafka.clients.consumer.ConsumerRecords;
14-
import org.apache.kafka.clients.consumer.KafkaConsumer;
15-
import org.apache.kafka.clients.producer.KafkaProducer;
16-
import org.apache.kafka.clients.producer.ProducerConfig;
17-
import org.apache.kafka.clients.producer.ProducerRecord;
1811
import org.apache.kafka.common.config.SaslConfigs;
1912
import org.apache.kafka.common.errors.SaslAuthenticationException;
2013
import org.apache.kafka.common.errors.TopicAuthorizationException;
21-
import org.apache.kafka.common.serialization.StringDeserializer;
22-
import org.apache.kafka.common.serialization.StringSerializer;
2314
import org.awaitility.Awaitility;
2415
import org.junit.Test;
25-
import org.rnorth.ducttape.unreliables.Unreliables;
2616
import org.testcontainers.containers.GenericContainer;
2717
import org.testcontainers.containers.Network;
2818
import org.testcontainers.images.builder.Transferable;
2919
import org.testcontainers.utility.DockerImageName;
3020

31-
import java.time.Duration;
3221
import java.util.Collection;
3322
import java.util.Collections;
3423
import java.util.HashMap;
@@ -39,9 +28,8 @@
3928

4029
import static org.assertj.core.api.Assertions.assertThat;
4130
import static org.assertj.core.api.Assertions.assertThatThrownBy;
42-
import static org.assertj.core.api.Assertions.tuple;
4331

44-
public class RedpandaContainerTest {
32+
public class RedpandaContainerTest extends AbstractRedpanda {
4533

4634
private static final String REDPANDA_IMAGE = "docker.redpanda.com/redpandadata/redpanda:v22.2.1";
4735

@@ -78,6 +66,20 @@ public void testNotCompatibleVersion() {
7866
.hasMessageContaining("Redpanda version must be >= v22.2.1");
7967
}
8068

69+
@Test
70+
public void vectorizedRedpandaImageVersion2221ShouldNotBeCompatible() {
71+
assertThatThrownBy(() -> new RedpandaContainer("docker.redpanda.com/vectorized/redpanda:v21.11.19"))
72+
.isInstanceOf(IllegalArgumentException.class)
73+
.hasMessageContaining("Redpanda version must be >= v22.2.1");
74+
}
75+
76+
@Test
77+
public void redpandadataRedpandaImageVersion2221ShouldNotBeCompatible() {
78+
assertThatThrownBy(() -> new RedpandaContainer("redpandadata/redpanda:v21.11.19"))
79+
.isInstanceOf(IllegalArgumentException.class)
80+
.hasMessageContaining("Redpanda version must be >= v22.2.1");
81+
}
82+
8183
@Test
8284
public void testSchemaRegistry() {
8385
try (RedpandaContainer container = new RedpandaContainer(REDPANDA_DOCKER_IMAGE)) {
@@ -359,70 +361,6 @@ public void testRestProxy() {
359361
}
360362
}
361363

362-
private void testKafkaFunctionality(String bootstrapServers) throws Exception {
363-
testKafkaFunctionality(bootstrapServers, 1, 1);
364-
}
365-
366-
private void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
367-
try (
368-
AdminClient adminClient = AdminClient.create(
369-
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
370-
);
371-
KafkaProducer<String, String> producer = new KafkaProducer<>(
372-
ImmutableMap.of(
373-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
374-
bootstrapServers,
375-
ProducerConfig.CLIENT_ID_CONFIG,
376-
UUID.randomUUID().toString()
377-
),
378-
new StringSerializer(),
379-
new StringSerializer()
380-
);
381-
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
382-
ImmutableMap.of(
383-
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
384-
bootstrapServers,
385-
ConsumerConfig.GROUP_ID_CONFIG,
386-
"tc-" + UUID.randomUUID(),
387-
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
388-
"earliest"
389-
),
390-
new StringDeserializer(),
391-
new StringDeserializer()
392-
);
393-
) {
394-
String topicName = "messages-" + UUID.randomUUID();
395-
396-
Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
397-
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
398-
399-
consumer.subscribe(Collections.singletonList(topicName));
400-
401-
producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
402-
403-
Unreliables.retryUntilTrue(
404-
10,
405-
TimeUnit.SECONDS,
406-
() -> {
407-
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
408-
409-
if (records.isEmpty()) {
410-
return false;
411-
}
412-
413-
assertThat(records)
414-
.hasSize(1)
415-
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
416-
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
417-
418-
return true;
419-
}
420-
);
421-
422-
consumer.unsubscribe();
423-
}
424-
}
425-
426364
private AdminClient getAdminClient(RedpandaContainer redpanda) {
427365
String bootstrapServer = String.format("%s:%s", redpanda.getHost(), redpanda.getMappedPort(9092));
428366
// createAdminClient {

0 commit comments

Comments
 (0)