Skip to content
This repository was archived by the owner on Feb 23, 2023. It is now read-only.

Add more support with Spring for Apache Kafka #921

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions samples/kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@

<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.7.5-SNAPSHOT</spring-kafka.version>
</properties>

<dependencies>
Original file line number Diff line number Diff line change
@@ -10,7 +10,12 @@
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.nativex.hint.TypeHint;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

@TypeHint(types = KafkaApplication.Greeting.class)
@SpringBootApplication
public class KafkaApplication {

@@ -19,8 +24,8 @@ public static void main(String[] args) {
}

@KafkaListener(id = "graal", topics = "graal")
public void listen(String in) {
System.out.println("++++++Received:" + in);
public void listen(Greeting in) {
System.out.println("++++++Received: " + in);
}

@Bean
@@ -29,15 +34,39 @@ public NewTopic topic() {
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
ConsumerFactory<String, String> cf) {
public ApplicationRunner runner(KafkaTemplate<String, Greeting> template,
ConsumerFactory<String, Greeting> cf) {

cf.addListener(new ConsumerFactory.Listener<String, Greeting>() {

cf.addListener(new ConsumerFactory.Listener() { });
});
return args -> {
template.send("graal", "foo");
System.out.println("++++++Sent:foo");
Greeting data = new Greeting("Hello from GraalVM!");
template.send("graal", data);
System.out.println("++++++Sent: " + data);
Thread.sleep(5000);
};
}

public static class Greeting {

private final String message;

@JsonCreator
public Greeting(@JsonProperty("message") String message) {
this.message = message;
}

public String getMessage() {
return this.message;
}

@Override public String toString() {
return "Greeting{" +
"message='" + this.message + '\'' +
'}';
}

}

}
3 changes: 3 additions & 0 deletions samples/kafka/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.kafka
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
2 changes: 1 addition & 1 deletion samples/kafka/verify.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
sleep 2
if [[ `cat target/native/test-output.txt | grep "++++++Received:foo"` ]]; then
if [[ `cat target/native/test-output.txt | grep "++++++Received: Greeting{message='Hello from GraalVM!'}"` ]]; then
exit 0
else
exit 1
Original file line number Diff line number Diff line change
@@ -53,8 +53,11 @@
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.nativex.hint.AccessBits;
import org.springframework.nativex.hint.InitializationHint;
import org.springframework.nativex.hint.InitializationTime;
import org.springframework.nativex.hint.NativeHint;
import org.springframework.nativex.hint.JdkProxyHint;
import org.springframework.nativex.hint.TypeHint;
@@ -118,6 +121,7 @@
DefaultPartitioner.class,
StringDeserializer.class,
StringSerializer.class,
JsonDeserializer.class,
JsonSerializer.class,
ByteArrayDeserializer.class,
ByteArraySerializer.class
@@ -173,5 +177,16 @@
})
}
)
@NativeHint(trigger = com.fasterxml.jackson.databind.ObjectMapper.class,
initialization =
@InitializationHint(initTime = InitializationTime.BUILD,
types = {
org.springframework.kafka.support.JacksonUtils.class,
org.springframework.kafka.support.JacksonPresent.class
}))
@NativeHint(trigger = io.micrometer.core.instrument.MeterRegistry.class,
initialization =
@InitializationHint(initTime = InitializationTime.BUILD,
types = org.springframework.kafka.support.KafkaUtils.class))
public class KafkaHints implements NativeConfiguration {
}