Skip to content

Commit 201180c

Browse files
committed
Native Hints for Avro SchemaRegistryClient
1 parent 4eebffc commit 201180c

File tree

6 files changed

+70
-9
lines changed

6 files changed

+70
-9
lines changed

docker-compose.yml

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
version: '3.1'
2+
services:
3+
avro:
4+
image: spring-native-kafka-avro:0.0.1-SNAPSHOT
5+
ports:
6+
- 9999:9999
7+
depends_on:
8+
- schema-registry
9+
kafka:
10+
image: wurstmeister/kafka:latest
11+
ports:
12+
- "9092:9092"
13+
environment:
14+
- KAFKA_ADVERTISED_HOST_NAME=kafka
15+
- KAFKA_ADVERTISED_PORT=9092
16+
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
17+
depends_on:
18+
- zookeeper
19+
zookeeper:
20+
image: wurstmeister/zookeeper:latest
21+
ports:
22+
- "2181:2181"
23+
environment:
24+
- KAFKA_ADVERTISED_HOST_NAME=zookeeper
25+
schema-registry:
26+
image: confluentinc/cp-schema-registry:latest
27+
ports:
28+
- "8081:8081"
29+
environment:
30+
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
31+
- SCHEMA_REGISTRY_HOST_NAME=schema-registry

gradlew

File mode changed from 100644 to 100755.

src/main/java/com/sample/config/AotConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import com.sample.avro.Address;
44
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
55
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
6+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTypeConverter;
7+
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
8+
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
69
import io.confluent.kafka.serializers.KafkaAvroSerializer;
710
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
811
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
@@ -35,6 +38,9 @@
3538
SpecificAvroSerde.class,
3639
KafkaAvroSerializer.class,
3740
ErrorMessage.class,
41+
RegisterSchemaRequest.class,
42+
RegisterSchemaResponse.class,
43+
SchemaTypeConverter.class,
3844
SchemaString.class
3945
})
4046
)

src/main/java/com/sample/controllers/KafkaController.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.sample.model.AddrReq;
55
import com.sample.topology.Producer;
66
import org.springframework.beans.factory.annotation.Autowired;
7+
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
78
import org.springframework.web.bind.annotation.*;
89

910
import java.util.UUID;
@@ -14,13 +15,19 @@ public class KafkaController {
1415

1516
private final Producer producer;
1617

18+
private final StreamsBuilderFactoryBean fb;
19+
1720
@Autowired
18-
KafkaController(Producer producer) {
21+
KafkaController(Producer producer, StreamsBuilderFactoryBean fb) {
1922
this.producer = producer;
23+
this.fb = fb;
2024
}
2125

2226
@PostMapping
2327
public void sendMessageToKafkaTopic(@RequestBody AddrReq addr) {
28+
if (!this.fb.isRunning()) {
29+
fb.start();
30+
}
2431
Address address = Address.newBuilder().setAddressId(UUID.randomUUID().toString()).setAddressPostcode(addr.getPostcode()).build();
2532
this.producer.sendMessage(address);
2633
}

src/main/java/com/sample/topology/Producer.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
import org.springframework.beans.factory.annotation.Value;
77
import org.springframework.context.annotation.Configuration;
88
import org.springframework.kafka.core.KafkaTemplate;
9+
import org.springframework.kafka.support.SendResult;
10+
11+
import java.util.concurrent.ExecutionException;
12+
import java.util.concurrent.Future;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.TimeoutException;
915

1016
@Configuration
1117
@Slf4j
@@ -21,7 +27,18 @@ public Producer(KafkaTemplate<String, Address> kafkaTemplate) {
2127
}
2228

2329
public void sendMessage(Address address) {
24-
this.kafkaTemplate.send(this.TOPIC, address.getAddressId(), address);
30+
Future<SendResult<String, Address>> future = this.kafkaTemplate.send(this.TOPIC, address.getAddressId(), address);
31+
try {
32+
SendResult<String, Address> result = future.get(10, TimeUnit.SECONDS);
33+
log.info(result.getRecordMetadata().toString());
34+
} catch (InterruptedException e) {
35+
Thread.currentThread().interrupt();
36+
e.printStackTrace();
37+
} catch (ExecutionException e) {
38+
e.printStackTrace();
39+
} catch (TimeoutException e) {
40+
e.printStackTrace();
41+
}
2542
log.info(String.format("Produced address -> %s", address));
2643
}
2744

src/main/resources/application.yml

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,32 @@
11
server:
2-
port: 8080
2+
port: 9999
33

44
spring:
55
application:
66
name: "spring-native-kafka-streams"
77

88
kafka:
9-
bootstrap-servers: localhost:9092
9+
bootstrap-servers: kafka:9092
1010
streams:
11-
bootstrap-servers: localhost:9092
11+
bootstrap-servers: kafka:9092
1212
application-id: ${spring.application.name}
1313
client-id: ${spring.application.name}-stream
1414
auto-offset-reset: earliest
1515
properties:
1616
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
1717
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
1818
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
19+
auto-startup: false
1920
properties:
20-
schema.registry.url: http://localhost:8081
21+
schema.registry.url: http://schema-registry:8081
2122
bootstrap.servers: ${spring.kafka.bootstrap-servers}
2223
producer:
2324
key-serializer: org.apache.kafka.common.serialization.StringSerializer
2425
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
2526

26-
2727
logging:
28-
pattern:
29-
console: "%d{yyyy-MM-dd HH:mm:ss} ${LOG_LEVEL_PATTERN:-%5p} %m%n"
28+
# pattern:
29+
# console: "%d{yyyy-MM-dd HH:mm:ss} ${LOG_LEVEL_PATTERN:-%5p} %m%n"
3030
level:
3131
org.springframework.kafka.config: DEBUG
3232

0 commit comments

Comments (0)