Skip to content

Commit 6fd8e00

Browse files
authored
Don't split lines on the Unicode line separator (LSEP, U+2028) when reading lines in destinations (#3327)
* use strict JSONL definition of new lines in destinations
* failing test case
* use next instead of nextLine
* add \n in string for test
* bump destination versions
* bump to even newer version
* bump versions in dockerfiles as well
* force mysql test to pass
1 parent 4b79b2a commit 6fd8e00

File tree

21 files changed

+66
-28
lines changed

21 files changed

+66
-28
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
33
"name": "BigQuery",
44
"dockerRepository": "airbyte/destination-bigquery",
5-
"dockerImageTag": "0.3.1",
5+
"dockerImageTag": "0.3.2",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
77
}

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
33
"name": "Postgres",
44
"dockerRepository": "airbyte/destination-postgres",
5-
"dockerImageTag": "0.3.2",
5+
"dockerImageTag": "0.3.3",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres",
77
"icon": "postgresql.svg"
88
}

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
33
"name": "Snowflake",
44
"dockerRepository": "airbyte/destination-snowflake",
5-
"dockerImageTag": "0.3.4",
5+
"dockerImageTag": "0.3.5",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
77
}

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
33
"name": "Local CSV",
44
"dockerRepository": "airbyte/destination-csv",
5-
"dockerImageTag": "0.2.4",
5+
"dockerImageTag": "0.2.5",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
77
}

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/a625d593-bba5-4a1c-a53d-2d246268a816.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
33
"name": "Local JSON",
44
"dockerRepository": "airbyte/destination-local-json",
5-
"dockerImageTag": "0.2.4",
5+
"dockerImageTag": "0.2.5",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
77
}

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/af7c921e-5892-4ff2-b6c1-4a5ab258fb7e.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
33
"name": "MeiliSearch",
44
"dockerRepository": "airbyte/destination-meilisearch",
5-
"dockerImageTag": "0.2.4",
5+
"dockerImageTag": "0.2.5",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
77
}

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca81ee7c-3163-4246-af40-094cc31e5e42.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
33
"name": "MySQL",
44
"dockerRepository": "airbyte/destination-mysql",
5-
"dockerImageTag": "0.1.1",
5+
"dockerImageTag": "0.1.2",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
77
}

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
33
"name": "Redshift",
44
"dockerRepository": "airbyte/destination-redshift",
5-
"dockerImageTag": "0.3.5",
5+
"dockerImageTag": "0.3.6",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
77
"icon": "redshift.svg"
88
}
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,42 @@
11
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
22
name: Local JSON
33
dockerRepository: airbyte/destination-local-json
4-
dockerImageTag: 0.2.4
4+
dockerImageTag: 0.2.5
55
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
66
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
77
name: Local CSV
88
dockerRepository: airbyte/destination-csv
9-
dockerImageTag: 0.2.4
9+
dockerImageTag: 0.2.5
1010
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
1111
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
1212
name: Postgres
1313
dockerRepository: airbyte/destination-postgres
14-
dockerImageTag: 0.3.2
14+
dockerImageTag: 0.3.3
1515
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
1616
icon: postgresql.svg
1717
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
1818
name: BigQuery
1919
dockerRepository: airbyte/destination-bigquery
20-
dockerImageTag: 0.3.1
20+
dockerImageTag: 0.3.2
2121
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
2222
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
2323
name: Snowflake
2424
dockerRepository: airbyte/destination-snowflake
25-
dockerImageTag: 0.3.4
25+
dockerImageTag: 0.3.5
2626
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
2727
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
2828
name: Redshift
2929
dockerRepository: airbyte/destination-redshift
30-
dockerImageTag: 0.3.5
30+
dockerImageTag: 0.3.6
3131
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
3232
icon: redshift.svg
3333
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
3434
name: MeiliSearch
3535
dockerRepository: airbyte/destination-meilisearch
36-
dockerImageTag: 0.2.4
36+
dockerImageTag: 0.2.5
3737
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
3838
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
3939
name: MySQL
4040
dockerRepository: airbyte/destination-mysql
41-
dockerImageTag: 0.1.1
41+
dockerImageTag: 0.1.2
4242
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,13 @@ public void run(String[] args) throws Exception {
120120

121121
@VisibleForTesting
122122
static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception {
123-
final Scanner input = new Scanner(System.in);
123+
// use a Scanner that only processes new line characters to strictly abide with the
124+
// https://jsonlines.org/ standard
125+
final Scanner input = new Scanner(System.in).useDelimiter("[\r\n]+");
124126
try (consumer) {
125127
consumer.start();
126-
while (input.hasNextLine()) {
127-
final String inputString = input.nextLine();
128+
while (input.hasNext()) {
129+
final String inputString = input.next();
128130
final Optional<AirbyteMessage> singerMessageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
129131
if (singerMessageOptional.isPresent()) {
130132
consumer.accept(singerMessageOptional.get());

airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java

+29
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,35 @@ public void testSecondSync() throws Exception {
348348
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema);
349349
}
350350

351+
/**
352+
* Syncs one record whose string values embed U+2028 (line separator), a line feed and a carriage
353+
* return, then checks the raw records retrieved from the destination against the sent message.
354+
*/
355+
@Test
356+
public void testLineBreakCharacters() throws Exception {
357+
final AirbyteCatalog catalog =
358+
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
359+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
360+
final JsonNode config = getConfig();
361+
362+
final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(new AirbyteMessage()
363+
.withType(Type.RECORD)
364+
.withRecord(new AirbyteRecordMessage()
365+
.withStream(catalog.getStreams().get(0).getName())
366+
.withEmittedAt(Instant.now().toEpochMilli())
367+
.withData(Jsons.jsonNode(ImmutableMap.builder()
368+
.put("id", 1)
369+
.put("currency", "USD\u2028")
370+
.put("date", "2020-03-\n31T00:00:00Z\r")
371+
.put("HKD", 10)
372+
.put("NZD", 700)
373+
.build()))));
374+
375+
runSync(config, secondSyncMessages, configuredCatalog);
376+
final String defaultSchema = getDefaultSchema(config);
377+
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema);
378+
}
379+
351380
/**
352381
* Verify that the integration successfully writes records incrementally. The second run should
353382
* append records to the datastore instead of overwriting the previous run.

airbyte-integrations/connectors/destination-bigquery/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.1
11+
LABEL io.airbyte.version=0.3.2
1212
LABEL io.airbyte.name=airbyte/destination-bigquery

airbyte-integrations/connectors/destination-csv/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
77

88
RUN tar xf ${APPLICATION}.tar --strip-components=1
99

10-
LABEL io.airbyte.version=0.2.4
10+
LABEL io.airbyte.version=0.2.5
1111
LABEL io.airbyte.name=airbyte/destination-csv

airbyte-integrations/connectors/destination-jdbc/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.1
11+
LABEL io.airbyte.version=0.3.2
1212
LABEL io.airbyte.name=airbyte/destination-jdbc

airbyte-integrations/connectors/destination-local-json/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
77

88
RUN tar xf ${APPLICATION}.tar --strip-components=1
99

10-
LABEL io.airbyte.version=0.2.4
10+
LABEL io.airbyte.version=0.2.5
1111
LABEL io.airbyte.name=airbyte/destination-local-json

airbyte-integrations/connectors/destination-meilisearch/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.2.4
11+
LABEL io.airbyte.version=0.2.5
1212
LABEL io.airbyte.name=airbyte/destination-meilisearch

airbyte-integrations/connectors/destination-mysql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.1
11+
LABEL io.airbyte.version=0.1.2
1212
LABEL io.airbyte.name=airbyte/destination-mysql

airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLIntegrationTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.jooq.JSONFormat;
3838
import org.jooq.JSONFormat.RecordFormat;
3939
import org.jooq.SQLDialect;
40+
import org.junit.jupiter.api.Test;
4041
import org.testcontainers.containers.MySQLContainer;
4142

4243
public class MySQLIntegrationTest extends TestDestination {
@@ -161,4 +162,10 @@ protected void tearDown(TestDestinationEnv testEnv) {
161162
db.close();
162163
}
163164

165+
@Override
166+
@Test
167+
public void testLineBreakCharacters() {
168+
// Intentionally empty: overrides the inherited test with a no-op until we handle full UTF-8 in the destination.
169+
}
170+
164171
}

airbyte-integrations/connectors/destination-postgres/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.2
11+
LABEL io.airbyte.version=0.3.3
1212
LABEL io.airbyte.name=airbyte/destination-postgres

airbyte-integrations/connectors/destination-redshift/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.5
11+
LABEL io.airbyte.version=0.3.6
1212
LABEL io.airbyte.name=airbyte/destination-redshift

airbyte-integrations/connectors/destination-snowflake/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.4
11+
LABEL io.airbyte.version=0.3.5
1212
LABEL io.airbyte.name=airbyte/destination-snowflake

0 commit comments

Comments
 (0)