Add support for Avro serialization in Kafka #687
Comments
The 42201 error is returned by the registry server - can you capture the JSON for the schema from the non-native app, to see what the difference is?
Here is the debug logging for the non-native instance. Your stack trace has an extra trailing square bracket `]` at the end of the payload; otherwise they look the same.
It's not MY stack trace, it's yours 😄 but that would explain why the registry flags it as invalid. Looking at the serializer, it uses this to obtain the schema. It looks like... But that Avro code seems to be the smoking gun.
I didn't scroll right when I pasted the stack trace in 😒
I got your project built and see that it works as plain Java. I finally got the image to build, but now I need Docker containers for Kafka and the registry; I see you have a Kubernetes profile, which I assume you use for this. If you can jump-start me, it would save me some time; then I want to add some debug logic to the serializer to figure out where the junk is coming from.
Never mind; I got everything working. I reproduced it and will start diagnosing - probably tomorrow now - stay tuned.
Strange, I added this to your app...

```java
@Bean
ApplicationRunner runner() {
    return args -> {
        Thread.sleep(30000);
        Address address = Address.newBuilder()
                .setAddressId(UUID.randomUUID().toString())
                .setAddressPostcode("12345")
                .build();
        System.out.println(AvroSchemaUtils.getSchema(address, false).toString(true));
    };
}
```

and got no extra trailing `]`.
Progress. I was getting a different problem from yours; registration failed due to an empty schema. After some debugging, I added these to the reflection hints:

- `RegisterSchemaRequest.class`
- `RegisterSchemaResponse.class`
- `SchemaTypeConverter.class`

and now schema registration and publishing are successful. I am not yet seeing anything on the consumer (Streams) side, though.
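For reference, one way such hints can be declared is with spring-native's `@TypeHint` annotation on the application class. This is a sketch only: the enclosing `DemoApplication` class is hypothetical, and it assumes spring-native and the Confluent schema-registry client classes named above are on the classpath.

```java
// Sketch: register the registry-client DTOs for reflection in the native image.
@TypeHint(types = {
        RegisterSchemaRequest.class,
        RegisterSchemaResponse.class,
        SchemaTypeConverter.class
})
@SpringBootApplication
public class DemoApplication { // hypothetical application class
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
```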
Correction; it is working fine for me now...
I added code to the producer to collect the record metadata.

```java
public void sendMessage(Address address) {
    Future<SendResult<String, Address>> future =
            this.kafkaTemplate.send(this.TOPIC, address.getAddressId(), address);
    try {
        SendResult<String, Address> result = future.get(10, TimeUnit.SECONDS);
        log.info(result.getRecordMetadata().toString());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException | TimeoutException e) {
        e.printStackTrace();
    }
    log.info(String.format("Produced address -> %s", address));
}
```
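The bounded wait in that producer can be illustrated independently of Kafka with a plain `CompletableFuture`; the simulated metadata string below is made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGet {
    public static void main(String[] args) {
        // Simulate an async send that completes with fake record "metadata".
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "topic-0@42");
        try {
            // Bound the wait so a hung broker cannot block the caller forever.
            String metadata = future.get(10, TimeUnit.SECONDS);
            System.out.println(metadata);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
```

The same `get(timeout, unit)` / multi-catch shape applies to the real `Future<SendResult<...>>` returned by `KafkaTemplate.send`.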
Is the suggestion here that a `TypeHint` is needed for all generated classes?
@pdebuitlear At the moment, yes; I am looking at a mechanism to detect and auto-register Avro generated types. |
- detect Apache Avro generated types and register for reflection
- detect Apache Avro generated types used in listeners, and register for reflection
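A possible shape for that detection is walking a type's superclass chain looking for Avro's generated-record base class. This is a sketch only: the real marker would be `org.apache.avro.specific.SpecificRecordBase`, stubbed here with a fake base class so the example runs without Avro on the classpath.

```java
public class AvroTypeDetector {
    // Returns true if any supertype of `type` has the given fully-qualified name.
    // In real use, fqcn would be "org.apache.avro.specific.SpecificRecordBase".
    static boolean hasSupertypeNamed(Class<?> type, String fqcn) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            if (fqcn.equals(c.getName())) {
                return true;
            }
        }
        return false;
    }

    // Stand-ins for SpecificRecordBase and a generated record class.
    static class FakeRecordBase {}
    static class FakeAddress extends FakeRecordBase {}

    public static void main(String[] args) {
        String marker = FakeRecordBase.class.getName();
        System.out.println(hasSupertypeNamed(FakeAddress.class, marker)); // true
        System.out.println(hasSupertypeNamed(String.class, marker));      // false
    }
}
```

Types matching the marker could then be registered for reflection automatically instead of requiring a manual hint per class.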
There seem to be two issues with Avro in Spring Native, neither of which I see with the same code running outside of Spring Native.
Sample code here.
The first issue is that classes auto-generated by Avro are not being registered for reflection automatically; I needed to add a hint for the class in question. This is not ideal.
The second issue is that when I try to invoke the serialization logic by publishing a message to the broker, I get the following exception indicating that the input schema is invalid. This works outside of Spring Native.
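For context, the serialization setup assumed in this report is the standard Confluent one; the property keys and serializer class names below are the stock Confluent/Kafka ones, while the URLs are illustrative placeholders.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Illustrative endpoints; replace with your broker and registry.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("schema.registry.url", "http://localhost:8081");
        // Standard Confluent Avro value serializer.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        System.out.println(props.getProperty("schema.registry.url"));
    }
}
```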