Skip to content

Commit 9dafec6

Browse files
marcosmarxmMarcos Marx
and
Marcos Marx
authored
Destination Kafka: correct spec json and data types in config (#6040)
* correct spec json and data types in config * bump version * correct tests * correct config parser NPE * format files Co-authored-by: Marcos Marx <marcosmarx@MacBook-Pro-de-Marcos.local>
1 parent b596194 commit 9dafec6

File tree

9 files changed

+53
-52
lines changed

9 files changed

+53
-52
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9f760101-60ae-462f-9ee6-b7a9dafd454d.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "9f760101-60ae-462f-9ee6-b7a9dafd454d",
33
"name": "Kafka",
44
"dockerRepository": "airbyte/destination-kafka",
5-
"dockerImageTag": "0.1.1",
5+
"dockerImageTag": "0.1.2",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/kafka"
77
}

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
- destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
7979
name: Kafka
8080
dockerRepository: airbyte/destination-kafka
81-
dockerImageTag: 0.1.1
81+
dockerImageTag: 0.1.2
8282
documentationUrl: https://docs.airbyte.io/integrations/destinations/kafka
8383
- destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89
8484
name: DynamoDB

airbyte-integrations/connectors/destination-kafka/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.1
11+
LABEL io.airbyte.version=0.1.2
1212
LABEL io.airbyte.name=airbyte/destination-kafka

airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaDestinationConfig.java

+16-16
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private KafkaDestinationConfig(String topicPattern, boolean sync, JsonNode confi
5555
public static KafkaDestinationConfig getKafkaDestinationConfig(JsonNode config) {
5656
return new KafkaDestinationConfig(
5757
config.get("topic_pattern").asText(),
58-
config.has("sync_producer") && config.get("sync_producer").booleanValue(),
58+
config.has("sync_producer") && config.get("sync_producer").asBoolean(),
5959
config);
6060
}
6161

@@ -64,27 +64,27 @@ private KafkaProducer<String, JsonNode> buildKafkaProducer(JsonNode config) {
6464
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText())
6565
.putAll(propertiesByProtocol(config))
6666
.put(ProducerConfig.CLIENT_ID_CONFIG,
67-
config.has("client_id") ? config.get("client_id").asText() : null)
67+
config.has("client_id") ? config.get("client_id").asText() : "")
6868
.put(ProducerConfig.ACKS_CONFIG, config.get("acks").asText())
69-
.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").booleanValue())
69+
.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").asBoolean())
7070
.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.get("compression_type").asText())
71-
.put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").intValue())
72-
.put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").longValue())
71+
.put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").asInt())
72+
.put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").asLong())
7373
.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
74-
config.get("max_in_flight_requests_per_connection").intValue())
74+
config.get("max_in_flight_requests_per_connection").asInt())
7575
.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText())
76-
.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").longValue())
77-
.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").intValue())
78-
.put(ProducerConfig.RETRIES_CONFIG, config.get("retries").intValue())
76+
.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").asLong())
77+
.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").asInt())
78+
.put(ProducerConfig.RETRIES_CONFIG, config.get("retries").asInt())
7979
.put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
80-
config.get("socket_connection_setup_timeout_ms").longValue())
80+
config.get("socket_connection_setup_timeout_ms").asLong())
8181
.put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
82-
config.get("socket_connection_setup_timeout_max_ms").longValue())
83-
.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").longValue())
84-
.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").intValue())
85-
.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").intValue())
86-
.put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").intValue())
87-
.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").intValue())
82+
config.get("socket_connection_setup_timeout_max_ms").asLong())
83+
.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").asInt())
84+
.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").asInt())
85+
.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").asInt())
86+
.put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").asInt())
87+
.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").asInt())
8888
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
8989
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())
9090
.build();

airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json

+18-18
Original file line numberDiff line numberDiff line change
@@ -162,19 +162,19 @@
162162
"title": "Batch size",
163163
"description": "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.",
164164
"type": "integer",
165-
"default": 16384
165+
"examples": [16384]
166166
},
167167
"linger_ms": {
168168
"title": "Linger ms",
169169
"description": "The producer groups together any records that arrive in between request transmissions into a single batched request.",
170-
"type": "number",
171-
"default": 0
170+
"type": "string",
171+
"examples": [0]
172172
},
173173
"max_in_flight_requests_per_connection": {
174174
"title": "Max in flight requests per connection",
175175
"description": "The maximum number of unacknowledged requests the client will send on a single connection before blocking.",
176176
"type": "integer",
177-
"default": 5
177+
"examples": [5]
178178
},
179179
"client_dns_lookup": {
180180
"title": "Client DNS lookup",
@@ -191,62 +191,62 @@
191191
"buffer_memory": {
192192
"title": "Buffer memory",
193193
"description": "The total bytes of memory the producer can use to buffer records waiting to be sent to the server.",
194-
"type": "number",
195-
"default": 33554432
194+
"type": "string",
195+
"examples": 33554432
196196
},
197197
"max_request_size": {
198198
"title": "Max request size",
199199
"description": "The maximum size of a request in bytes.",
200200
"type": "integer",
201-
"default": 1048576
201+
"examples": [1048576]
202202
},
203203
"retries": {
204204
"title": "Retries",
205205
"description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.",
206206
"type": "integer",
207-
"default": 2147483647
207+
"examples": [2147483647]
208208
},
209209
"socket_connection_setup_timeout_ms": {
210210
"title": "Socket connection setup timeout",
211211
"description": "The amount of time the client will wait for the socket connection to be established.",
212-
"type": "number",
213-
"default": 10000
212+
"type": "string",
213+
"examples": [10000]
214214
},
215215
"socket_connection_setup_timeout_max_ms": {
216216
"title": "Socket connection setup max timeout",
217217
"description": "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum.",
218-
"type": "number",
219-
"default": 30000
218+
"type": "string",
219+
"examples": [30000]
220220
},
221221
"max_block_ms": {
222222
"title": "Max block ms",
223223
"description": "The configuration controls how long the KafkaProducer's send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() methods will block.",
224-
"type": "number",
225-
"default": 60000
224+
"type": "string",
225+
"examples": [60000]
226226
},
227227
"request_timeout_ms": {
228228
"title": "Request timeout",
229229
"description": "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.",
230230
"type": "integer",
231-
"default": 30000
231+
"examples": [30000]
232232
},
233233
"delivery_timeout_ms": {
234234
"title": "Delivery timeout",
235235
"description": "An upper bound on the time to report success or failure after a call to 'send()' returns.",
236236
"type": "integer",
237-
"default": 120000
237+
"examples": [120000]
238238
},
239239
"send_buffer_bytes": {
240240
"title": "Send buffer bytes",
241241
"description": "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.",
242242
"type": "integer",
243-
"default": 131072
243+
"examples": [131072]
244244
},
245245
"receive_buffer_bytes": {
246246
"title": "Receive buffer bytes",
247247
"description": "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.",
248248
"type": "integer",
249-
"default": 32768
249+
"examples": [32768]
250250
}
251251
}
252252
}

airbyte-integrations/connectors/destination-kafka/src/test-integration/java/io/airbyte/integrations/destination/kafka/KafkaDestinationAcceptanceTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ protected JsonNode getConfig() {
7575
.put("enable_idempotence", true)
7676
.put("compression_type", "none")
7777
.put("batch_size", 16384)
78-
.put("linger_ms", 0)
78+
.put("linger_ms", "0")
7979
.put("max_in_flight_requests_per_connection", 5)
8080
.put("client_dns_lookup", "use_all_dns_ips")
81-
.put("buffer_memory", 33554432)
81+
.put("buffer_memory", "33554432")
8282
.put("max_request_size", 1048576)
8383
.put("retries", 2147483647)
84-
.put("socket_connection_setup_timeout_ms", 10000)
85-
.put("socket_connection_setup_timeout_max_ms", 30000)
86-
.put("max_block_ms", 60000)
84+
.put("socket_connection_setup_timeout_ms", "10000")
85+
.put("socket_connection_setup_timeout_max_ms", "30000")
86+
.put("max_block_ms", "60000")
8787
.put("request_timeout_ms", 30000)
8888
.put("delivery_timeout_ms", 120000)
8989
.put("send_buffer_bytes", -1)

airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -122,16 +122,16 @@ private JsonNode getConfig(String topicPattern) {
122122
.put("transactional_id", "txn-id")
123123
.put("enable_idempotence", true)
124124
.put("compression_type", "none")
125-
.put("batch_size", 16384)
126-
.put("linger_ms", 0)
127-
.put("max_in_flight_requests_per_connection", 5)
125+
.put("batch_size", "16384")
126+
.put("linger_ms", "0")
127+
.put("max_in_flight_requests_per_connection", "5")
128128
.put("client_dns_lookup", "use_all_dns_ips")
129129
.put("buffer_memory", 33554432)
130130
.put("max_request_size", 1048576)
131131
.put("retries", 1)
132-
.put("socket_connection_setup_timeout_ms", 10)
133-
.put("socket_connection_setup_timeout_max_ms", 30)
134-
.put("max_block_ms", 100)
132+
.put("socket_connection_setup_timeout_ms", "10")
133+
.put("socket_connection_setup_timeout_max_ms", "30")
134+
.put("max_block_ms", "100")
135135
.put("request_timeout_ms", 100)
136136
.put("delivery_timeout_ms", 120)
137137
.put("send_buffer_bytes", -1)

airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ public KeenTimestampService(ConfiguredAirbyteCatalog catalog, boolean timestampI
7575
/**
7676
* Tries to inject keen.timestamp field to the given message data. If the stream contains cursor
7777
* field, it's value is tried to be parsed to timestamp. If this procedure fails, stream is removed
78-
* from timestamp-parsable stream map, so parsing is not tried for future messages in the same stream.
79-
* If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and whole data
80-
* is returned. Otherwise, keen.timestamp is set to emittedAt value
78+
* from timestamp-parsable stream map, so parsing is not tried for future messages in the same
79+
* stream. If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and
80+
* whole data is returned. Otherwise, keen.timestamp is set to emittedAt value
8181
*
8282
* @param message AirbyteRecordMessage containing record data
8383
* @return Record data together with keen.timestamp field

docs/integrations/destinations/kafka.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -114,5 +114,6 @@ More info about this can be found in the [Kafka producer configs documentation s
114114

115115
| Version | Date | Pull Request | Subject |
116116
| :------ | :-------- | :----- | :------ |
117+
| 0.1.2 | 2021-09-14 | [#6040](https://github.com/airbytehq/airbyte/pull/6040) | Change spec.json and config parser |
117118
| 0.1.1 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
118-
| 0.1.0 | 2021-07-21 | [3746](https://github.com/airbytehq/airbyte/pull/3746) | Initial Release |
119+
| 0.1.0 | 2021-07-21 | [#3746](https://github.com/airbytehq/airbyte/pull/3746) | Initial Release |

0 commit comments

Comments
 (0)