feat(avro-decimal-format): add configurable serialization for Avro decimals (#281)

* feat(avro-decimal-format): add configurable serialization for Avro decimals

Issue: #280

Clients using the http-sink connector often cannot handle Base64-encoded Avro decimals.
This enhancement introduces a configurable option (`avro.decimal.format`) allowing users
to choose between `BASE64` (default) and `NUMERIC` serialization formats.

- Preserves the existing Base64 behavior by default.
- Enables direct numeric representation for improved compatibility with JSON-based consumers.
- Reduces the need for additional conversions on the target system.

This change enhances flexibility and ensures broader consumer support for Avro decimal values.
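
For example, a sink connector can opt in to numeric decimals with a configuration
like the following (a minimal sketch; the topic, URL, and authorization settings
are illustrative):

    name=http-sink
    connector.class=io.aiven.kafka.connect.http.HttpSinkConnector
    topics=orders
    http.url=http://localhost:8080/events
    http.authorization.type=none
    decimal.format=NUMERIC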

* test: Add decimal value conversion tests in RecordValueConverter

- Introduce parameterized tests verifying both NUMERIC and BASE64 decimal formats.
- Cover multiple scales (e.g., 2, 3, 5) for realistic decimal usage.
- Validate correct serialization in arrays, nested fields, and single fields.
- Ensure no conditional logic in test methods; all scenarios driven by @CsvSource.

* chore: Remove dangling comment

* chore: rename test variable from `finalSnippet` to `expectedValue` for clarity

* chore: fix lint

* chore: rename `avro.decimal.format` to `decimal.format`

---------

Co-authored-by: Maycon <maycon_comput@yahoo.com.br>
jmaycon and mayconcomput authored Feb 26, 2025
1 parent ad0abbe commit 9b94970
Showing 10 changed files with 300 additions and 19 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
@@ -125,11 +125,11 @@ dependencies {
implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion")
implementation("org.slf4j:slf4j-api:$slf4japiVersion")

testRuntimeOnly("org.apache.kafka:connect-json:$kafkaVersion")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion")
testRuntimeOnly("org.junit.jupiter:junit-jupiter:$jupiterVersion")

testImplementation("org.apache.kafka:connect-api:$kafkaVersion")
testImplementation("org.apache.kafka:connect-json:$kafkaVersion")
testImplementation("org.junit.jupiter:junit-jupiter-api:$jupiterVersion")
testImplementation("org.junit.jupiter:junit-jupiter-params:$jupiterVersion")
testImplementation("org.mockito:mockito-core:$mockitoVersion")
10 changes: 10 additions & 0 deletions docs/sink-connector-config-options.rst
@@ -215,4 +215,14 @@ Errors Handling
* Default: null
* Importance: low

Formatting
^^^^^^^^^^^^^^^

``decimal.format``
Controls which format this converter will serialize decimals in. It can be either `BASE64` (default) or `NUMERIC`.

* Type: string
* Default: BASE64
* Valid Values: [BASE64, NUMERIC]
* Importance: low
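
For example, a decimal field holding ``14.36`` (scale 2) is delivered as
``{"price": "BZw="}`` under ``BASE64`` and as ``{"price": 14.36}`` under
``NUMERIC`` (values taken from the integration test in this commit).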

@@ -18,12 +18,15 @@

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -35,6 +38,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Decimal;

import io.aiven.kafka.connect.http.mockserver.BodyRecorderHandler;
import io.aiven.kafka.connect.http.mockserver.MockServer;
@@ -47,6 +51,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
@@ -185,6 +191,58 @@ final void testBasicDelivery() throws ExecutionException, InterruptedException {
assertThat(bodyRecorderHandler.recorderBodies()).containsExactlyElementsOf(expectedBodies);
}

@ParameterizedTest
@CsvSource({
"BASE64 , BZw=", // Expected Base64-encoded representation of 14.36
"NUMERIC, 14.36"
})
void testDecimalFormat(final String decimalFormat, final String expectedValue) throws Exception {
final BodyRecorderHandler bodyRecorderHandler = new BodyRecorderHandler();
mockServer.addHandler(bodyRecorderHandler);
mockServer.start();

// Define Avro Schema with only one field: Decimal (Precision 10, Scale 2)
final var priceValue = new BigDecimal("14.36");
final var schema = new Schema.Parser()
.parse("{\"type\":\"record\",\"name\":\"record\","
+ "\"fields\":["
+ "{\"name\":\"price\",\"type\":{"
+ "\"type\":\"bytes\","
+ "\"logicalType\":\"decimal\","
+ "\"precision\":10,"
+ "\"scale\":2"
+ "}}]}");

final var valueRecord = new GenericData.Record(schema);

// Convert BigDecimal to Avro's expected bytes format
final var encodedPrice = Decimal.fromLogical(Decimal.schema(2), priceValue);
final var priceBuffer = ByteBuffer.wrap(encodedPrice);
valueRecord.put("price", priceBuffer);

// Configure connector decimal formatting
final Map<String, String> config = basicConnectorConfig();
config.put("decimal.format", decimalFormat);

connectRunner.createConnector(config);

// Send message asynchronously
sendMessageAsync(0, TEST_TOPIC, valueRecord);
producer.flush();

final List<String> actualReceivedValue = await("All expected requests received by HTTP server")
.atMost(Duration.ofSeconds(5000))
.pollInterval(Duration.ofMillis(100))
.until(() -> {
final List<String> recordedBodies = bodyRecorderHandler.recorderBodies();
return recordedBodies.isEmpty() ? null : recordedBodies;
}, Objects::nonNull);

assertThat(actualReceivedValue).hasSize(1)
.first()
.asString().contains(expectedValue);
}

private Map<String, String> basicConnectorConfig() {
final var config = new HashMap<String, String>();
config.put("name", CONNECTOR_NAME);
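As a cross-check of the values asserted in the test above, the expected Base64
literal can be reproduced by hand; the following is a minimal standalone sketch,
not part of this commit:

import java.math.BigDecimal;
import java.util.Base64;

public class DecimalBase64Check {
    public static void main(final String[] args) {
        // 14.36 at scale 2 has unscaled value 1436, whose two's-complement
        // bytes are {0x05, 0x9C}; Base64-encoding them yields "BZw=".
        final byte[] unscaled = new BigDecimal("14.36").unscaledValue().toByteArray();
        System.out.println(Base64.getEncoder().encodeToString(unscaled)); // prints BZw=
    }
}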
HttpSinkConfig.java
@@ -34,6 +34,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.DecimalFormat;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

@@ -80,6 +81,9 @@ public final class HttpSinkConfig extends AbstractConfig {
private static final String TIMEOUT_GROUP = "Timeout";
private static final String HTTP_TIMEOUT_CONFIG = "http.timeout";

private static final String FORMATTING_GROUP = "Format";
private static final String AVRO_DECIMAL_FORMAT_CONFIG = "decimal.format";

public static final String NAME_CONFIG = "name";

private static final String ERRORS_GROUP = "Errors Handling";
@@ -92,6 +96,7 @@ public static ConfigDef configDef() {
addRetriesConfigGroup(configDef);
addTimeoutConfigGroup(configDef);
addErrorsConfigGroup(configDef);
addFormatConfigGroup(configDef);
return configDef;
}

@@ -586,7 +591,23 @@ private static void addErrorsConfigGroup(final ConfigDef configDef) {
groupCounter++,
ConfigDef.Width.SHORT,
HTTP_TIMEOUT_CONFIG
);
}

@SuppressFBWarnings("DLS_DEAD_LOCAL_STORE") // groupCounter++ below is a dead store, kept for symmetry with the other config groups
private static void addFormatConfigGroup(final ConfigDef configDef) {
int groupCounter = 0;
configDef.define(
AVRO_DECIMAL_FORMAT_CONFIG,
ConfigDef.Type.STRING,
"BASE64",
ConfigDef.Importance.LOW,
"Controls which format this converter will serialize "
+ "decimals in. and can be either BASE64 (default) or NUMERIC.",
FORMATING_GROUP,
groupCounter++,
ConfigDef.Width.MEDIUM,
AVRO_DECIMAL_FORMAT_CONFIG
);
}

@@ -782,6 +803,10 @@ public final InetSocketAddress proxy() {
return new InetSocketAddress(getString(HTTP_PROXY_HOST), getInt(HTTP_PROXY_PORT));
}

public DecimalFormat decimalFormat() {
return DecimalFormat.valueOf(getString(AVRO_DECIMAL_FORMAT_CONFIG));
}

public final boolean sslTrustAllCertificates() {
return getBoolean(HTTP_SSL_TRUST_ALL_CERTIFICATES);
}
JsonRecordValueConverter.java
@@ -19,16 +19,19 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.sink.SinkRecord;

class JsonRecordValueConverter implements RecordValueConverter.Converter {

private final JsonConverter jsonConverter;

public JsonRecordValueConverter() {
public JsonRecordValueConverter(final DecimalFormat decimalFormat) {
this.jsonConverter = new JsonConverter();
jsonConverter.configure(Map.of("schemas.enable", false, "converter.type", "value"));
jsonConverter.configure(Map.of("schemas.enable", false, "converter.type", "value",
JsonConverterConfig.DECIMAL_FORMAT_CONFIG, decimalFormat.name()));
}

@Override
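The constructor above forwards the configured format to Kafka's JsonConverter
via its decimal.format option (added by KIP-481). The following minimal
standalone sketch, not part of this commit, shows the effect of each setting:

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.json.JsonConverter;

public class DecimalFormatDemo {
    public static void main(final String[] args) {
        for (final String format : new String[] {"BASE64", "NUMERIC"}) {
            final JsonConverter converter = new JsonConverter();
            converter.configure(Map.of(
                    "schemas.enable", false,
                    "converter.type", "value",
                    "decimal.format", format));
            // Serialize a Connect Decimal (scale 2) and print the raw JSON.
            final byte[] json = converter.fromConnectData(
                    "some-topic", Decimal.schema(2), new BigDecimal("14.36"));
            System.out.println(format + ": " + new String(json, StandardCharsets.UTF_8));
            // BASE64 prints the quoted string "BZw="; NUMERIC prints 14.36
        }
    }
}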
RecordValueConverter.java
@@ -25,18 +25,32 @@

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;

public class RecordValueConverter {
private static final ConcurrentHashMap<Class<?>, Converter> RUNTIME_CLASS_TO_CONVERTER_CACHE =
new ConcurrentHashMap<>();
private final JsonRecordValueConverter jsonRecordValueConverter = new JsonRecordValueConverter();

private final Map<Class<?>, Converter> converters = Map.of(
String.class, record -> (String) record.value(),
Map.class, jsonRecordValueConverter,
Struct.class, jsonRecordValueConverter
);
private final Map<Class<?>, Converter> converters;

public static RecordValueConverter create(final HttpSinkConfig config) {
RUNTIME_CLASS_TO_CONVERTER_CACHE.clear(); // Avoid state being preserved on task restarts
final DecimalFormat decimalFormat = config.decimalFormat();
final JsonRecordValueConverter jsonRecordValueConverter = new JsonRecordValueConverter(decimalFormat);
final Map<Class<?>, RecordValueConverter.Converter> converters = Map.of(
String.class, record -> (String) record.value(),
Map.class, jsonRecordValueConverter,
Struct.class, jsonRecordValueConverter
);
return new RecordValueConverter(converters);
}

private RecordValueConverter(final Map<Class<?>, Converter> converters) {
this.converters = converters;
}

interface Converter {
String convert(final SinkRecord record);
@@ -109,5 +123,4 @@ private static void validateConvertibility(
recordClazz, implementedTypes));
}
}

}
BatchRecordSender.java
@@ -24,6 +24,7 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.http.converter.RecordValueConverter;
import io.aiven.kafka.connect.http.sender.HttpSender;

final class BatchRecordSender extends RecordSender {
@@ -33,11 +34,12 @@ final class BatchRecordSender extends RecordSender {
private final String batchSeparator;

protected BatchRecordSender(final HttpSender httpSender,
final RecordValueConverter recordValueConverter,
final int batchMaxSize,
final String batchPrefix,
final String batchSuffix,
final String batchSeparator) {
super(httpSender);
super(httpSender, recordValueConverter);
this.batchMaxSize = batchMaxSize;
this.batchPrefix = batchPrefix;
this.batchSuffix = batchSuffix;
RecordSender.java
@@ -28,26 +28,29 @@ public abstract class RecordSender {

protected final HttpSender httpSender;

protected final RecordValueConverter recordValueConverter = new RecordValueConverter();
protected final RecordValueConverter recordValueConverter;

protected RecordSender(final HttpSender httpSender) {
protected RecordSender(final HttpSender httpSender, final RecordValueConverter recordValueConverter) {
this.httpSender = httpSender;
this.recordValueConverter = recordValueConverter;
}

public abstract void send(final Collection<SinkRecord> records);

public abstract void send(final SinkRecord record);

public static RecordSender createRecordSender(final HttpSender httpSender, final HttpSinkConfig config) {
final RecordValueConverter recordValueConverter = RecordValueConverter.create(config);
if (config.batchingEnabled()) {
return new BatchRecordSender(
httpSender,
recordValueConverter,
config.batchMaxSize(),
config.batchPrefix(),
config.batchSuffix(),
config.batchSeparator());
} else {
return new SingleRecordSender(httpSender);
return new SingleRecordSender(httpSender, recordValueConverter);
}
}

SingleRecordSender.java
@@ -20,12 +20,13 @@

import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.http.converter.RecordValueConverter;
import io.aiven.kafka.connect.http.sender.HttpSender;

final class SingleRecordSender extends RecordSender {

protected SingleRecordSender(final HttpSender httpSender) {
super(httpSender);
protected SingleRecordSender(final HttpSender httpSender, final RecordValueConverter recordValueConverter) {
super(httpSender, recordValueConverter);
}

@Override