Skip to content
This repository was archived by the owner on Feb 23, 2023. It is now read-only.

Commit 68a6047

Browse files
artembilansdeleuze
authored andcommitted
Add more support with Spring for Apache Kafka
Closes gh-921
1 parent a5962a8 commit 68a6047

File tree

4 files changed

+52
-8
lines changed

4 files changed

+52
-8
lines changed

samples/kafka/src/main/java/com/example/kafka/KafkaApplication.java

+36-7
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@
1010
import org.springframework.kafka.config.TopicBuilder;
1111
import org.springframework.kafka.core.ConsumerFactory;
1212
import org.springframework.kafka.core.KafkaTemplate;
13+
import org.springframework.nativex.hint.TypeHint;
1314

15+
import com.fasterxml.jackson.annotation.JsonCreator;
16+
import com.fasterxml.jackson.annotation.JsonProperty;
17+
18+
@TypeHint(types = KafkaApplication.Greeting.class)
1419
@SpringBootApplication
1520
public class KafkaApplication {
1621

@@ -19,8 +24,8 @@ public static void main(String[] args) {
1924
}
2025

2126
@KafkaListener(id = "graal", topics = "graal")
22-
public void listen(String in) {
23-
System.out.println("++++++Received:" + in);
27+
public void listen(Greeting in) {
28+
System.out.println("++++++Received: " + in);
2429
}
2530

2631
@Bean
@@ -29,15 +34,39 @@ public NewTopic topic() {
2934
}
3035

3136
@Bean
32-
public ApplicationRunner runner(KafkaTemplate<String, String> template,
33-
ConsumerFactory<String, String> cf) {
37+
public ApplicationRunner runner(KafkaTemplate<String, Greeting> template,
38+
ConsumerFactory<String, Greeting> cf) {
39+
40+
cf.addListener(new ConsumerFactory.Listener<String, Greeting>() {
3441

35-
cf.addListener(new ConsumerFactory.Listener() { });
42+
});
3643
return args -> {
37-
template.send("graal", "foo");
38-
System.out.println("++++++Sent:foo");
44+
Greeting data = new Greeting("Hello from GraalVM!");
45+
template.send("graal", data);
46+
System.out.println("++++++Sent: " + data);
3947
Thread.sleep(5000);
4048
};
4149
}
4250

51+
public static class Greeting {
52+
53+
private final String message;
54+
55+
@JsonCreator
56+
public Greeting(@JsonProperty("message") String message) {
57+
this.message = message;
58+
}
59+
60+
public String getMessage() {
61+
return this.message;
62+
}
63+
64+
@Override public String toString() {
65+
return "Greeting{" +
66+
"message='" + this.message + '\'' +
67+
'}';
68+
}
69+
70+
}
71+
4372
}
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
11
spring.kafka.consumer.auto-offset-reset=earliest
2+
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
3+
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.kafka
4+
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

samples/kafka/verify.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env bash
22
sleep 2
3-
if [[ `cat target/native/test-output.txt | grep "++++++Received:foo"` ]]; then
3+
if [[ `cat target/native/test-output.txt | grep "++++++Received: Greeting{message='Hello from GraalVM!'}"` ]]; then
44
exit 0
55
else
66
exit 1

spring-native-configuration/src/main/java/org/springframework/kafka/annotation/KafkaHints.java

+12
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@
5353
import org.springframework.kafka.listener.ContainerProperties;
5454
import org.springframework.kafka.support.LoggingProducerListener;
5555
import org.springframework.kafka.support.ProducerListener;
56+
import org.springframework.kafka.support.serializer.JsonDeserializer;
5657
import org.springframework.kafka.support.serializer.JsonSerializer;
5758
import org.springframework.nativex.hint.AccessBits;
59+
import org.springframework.nativex.hint.InitializationHint;
60+
import org.springframework.nativex.hint.InitializationTime;
5861
import org.springframework.nativex.hint.NativeHint;
5962
import org.springframework.nativex.hint.JdkProxyHint;
6063
import org.springframework.nativex.hint.TypeHint;
@@ -118,6 +121,7 @@
118121
DefaultPartitioner.class,
119122
StringDeserializer.class,
120123
StringSerializer.class,
124+
JsonDeserializer.class,
121125
JsonSerializer.class,
122126
ByteArrayDeserializer.class,
123127
ByteArraySerializer.class
@@ -173,5 +177,13 @@
173177
})
174178
}
175179
)
180+
@NativeHint(trigger = org.springframework.kafka.support.KafkaUtils.class,
181+
initialization =
182+
@InitializationHint(initTime = InitializationTime.BUILD,
183+
types = {
184+
org.springframework.kafka.support.JacksonUtils.class,
185+
org.springframework.kafka.support.KafkaUtils.class,
186+
org.springframework.kafka.support.JacksonPresent.class
187+
}))
176188
public class KafkaHints implements NativeConfiguration {
177189
}

0 commit comments

Comments
 (0)