From f77238f97b6be97f884667631b93364e32794456 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 15 Nov 2021 16:47:03 +0200 Subject: [PATCH 1/7] updated source-mongodb-v2 performance --- .../b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../io/airbyte/db/mongodb/MongoUtils.java | 75 ++++++++++++++----- .../connectors/source-mongodb-v2/Dockerfile | 2 +- .../MongoDbSourceAbstractAcceptanceTest.java | 4 +- .../MongoDbSourceAtlasAcceptanceTest.java | 10 ++- ...MongoDbSourceStandaloneAcceptanceTest.java | 10 ++- docs/integrations/sources/mongodb-v2.md | 1 + 8 files changed, 79 insertions(+), 27 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json index 25b676bac981a..763684f7fa839 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e", "name": "MongoDb", "dockerRepository": "airbyte/source-mongodb-v2", - "dockerImageTag": "0.1.3", + "dockerImageTag": "0.1.4", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2", "icon": "mongodb.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 050f62d8e455b..800e26a7b6a15 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -353,7 +353,7 @@ - name: MongoDb sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e dockerRepository: airbyte/source-mongodb-v2 - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2 icon: mongodb.svg sourceType: database diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index 8f93e26251be9..6c8c08e57593a 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -12,16 +12,22 @@ import com.google.api.client.util.DateTime; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.mongodb.client.AggregateIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.DataTypeUtils; import io.airbyte.protocol.models.JsonSchemaPrimitive; + +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.bson.BsonBinary; import org.bson.BsonDateTime; import org.bson.BsonDocument; @@ -44,7 +50,9 @@ public class MongoUtils { private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class); - private static final int DISCOVERY_BATCH_SIZE = 10000; + private static final String MISSING_TYPE = "missing"; + private static final String NULL_TYPE = "null"; + private static final String TYPE = "type"; private static final String AIRBYTE_SUFFIX = "_aibyte_transform"; public static JsonSchemaPrimitive getType(final BsonType dataType) { @@ -174,27 +182,60 @@ private static ObjectNode readField(final BsonReader reader, * @return map of unique fields and its type */ public static Map getUniqueFields(final MongoCollection collection) { - final Map uniqueFields = new HashMap<>(); - try (final MongoCursor cursor = collection.find().batchSize(DISCOVERY_BATCH_SIZE).iterator()) { - while (cursor.hasNext()) { - final BsonDocument document = toBsonDocument(cursor.next()); - try (final BsonReader reader = new BsonDocumentReader(document)) { - reader.readStartDocument(); - while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { - final var fieldName = reader.readName(); + + Map result = new HashMap<>(); + var allkeys = getFieldsName(collection); + allkeys.forEach(key -> { + var types = getTypes(collection, key); + addUniqueType(result, collection, key, types); + }); + + return result; + } + + private static List getFieldsName(MongoCollection collection) { + AggregateIterable output = collection.aggregate(Arrays.asList( + new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$$ROOT"))), + new Document("$unwind", "$arrayofkeyvalue"), + new Document("$group", new Document("_id", null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k"))) + )); + return (List) output.cursor().next().get("allkeys"); + } + + private static void addUniqueType(Map map, MongoCollection collection, + String fieldName, Set types) { + if (types.size() != 1) { + map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING); + } else { + var a = collection.find(new Document(fieldName, new Document("$type", types.stream().findFirst().get()))).first(); + var b = toBsonDocument(a); + try (final BsonReader reader = new BsonDocumentReader(b)) { + reader.readStartDocument(); + while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + if (reader.readName().equals(fieldName)) { final var fieldType = reader.getCurrentBsonType(); - reader.skipValue(); - if (uniqueFields.containsKey(fieldName) && fieldType.compareTo(uniqueFields.get(fieldName)) != 0) { - uniqueFields.replace(fieldName + AIRBYTE_SUFFIX, BsonType.STRING); - } else { - uniqueFields.put(fieldName, fieldType); - } + map.put(fieldName, fieldType); } - reader.readEndDocument(); + reader.skipValue(); } + reader.readEndDocument(); } } - return uniqueFields; + } + + private static Set getTypes(MongoCollection collection, String fieldName) { + var searchField = "$" + fieldName; + var docTypes = collection.aggregate(Arrays.asList( + new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); + Set types = new HashSet<>(); + while (docTypes.hasNext()) { + types.add(String.valueOf(docTypes.next().get(TYPE))); + } + types.remove(MISSING_TYPE); + if (types.size() > 1) { + types.remove(NULL_TYPE); + } + return types; } private static BsonDocument toBsonDocument(final Document document) { diff --git a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile index 4d824fd24a304..2781dc6458b1d 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java index 6fcf627a59e30..bda0ad6a8214d 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java @@ -57,7 +57,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { DATABASE_NAME + "." + COLLECTION_NAME, Field.of("_id", JsonSchemaPrimitive.STRING), Field.of("id", JsonSchemaPrimitive.STRING), - Field.of("name", JsonSchemaPrimitive.STRING)) + Field.of("name", JsonSchemaPrimitive.STRING), + Field.of("test", JsonSchemaPrimitive.STRING), + Field.of("test_array", JsonSchemaPrimitive.ARRAY)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of("_id"))))); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java index 52f7b7850cdbe..f42bc90611ec0 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java @@ -15,6 +15,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; + +import org.bson.BsonArray; +import org.bson.BsonString; import org.bson.Document; public class MongoDbSourceAtlasAcceptanceTest extends MongoDbSourceAbstractAcceptanceTest { @@ -54,9 +57,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc database = new MongoDatabase(connectionString, DATABASE_NAME); final MongoCollection collection = database.createCollection(COLLECTION_NAME); - final var doc1 = new Document("id", "0001").append("name", "Test"); - final var doc2 = new Document("id", "0002").append("name", "Mongo"); - final var doc3 = new Document("id", "0003").append("name", "Source"); + final var doc1 = new Document("id", "0001").append("name", "Test") + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); collection.insertMany(List.of(doc1, doc2, doc3)); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java index 1f1ae4ae2b59e..9f5818efbf393 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java @@ -13,6 +13,9 @@ import io.airbyte.db.mongodb.MongoDatabase; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import java.util.List; + +import org.bson.BsonArray; +import org.bson.BsonString; import org.bson.Document; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.utility.DockerImageName; @@ -46,9 +49,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc database = new MongoDatabase(connectionString, DATABASE_NAME); final MongoCollection collection = database.createCollection(COLLECTION_NAME); - final var doc1 = new Document("id", "0001").append("name", "Test"); - final var doc2 = new Document("id", "0002").append("name", "Mongo"); - final var doc3 = new Document("id", "0003").append("name", "Source"); + final var doc1 = new Document("id", "0001").append("name", "Test") + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); collection.insertMany(List.of(doc1, doc2, doc3)); } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 9a066a0f27c62..6213f7edb6921 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -102,6 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.4 | 2021-11-15 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Updated Performance | | 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing | | 0.1.2 | 2021-10-07 | [6860](https://github.com/airbytehq/airbyte/pull/6860) | Added filter to avoid MongoDb system collections | | 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL | From 6ac8c9382dcceebc963a5340be40fe541c6fa7ef Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 15 Nov 2021 16:52:54 +0200 Subject: [PATCH 2/7] updated code style --- .../io/airbyte/db/mongodb/MongoUtils.java | 20 +++++++++---------- .../MongoDbSourceAtlasAcceptanceTest.java | 3 +-- ...MongoDbSourceStandaloneAcceptanceTest.java | 3 +-- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index 6c8c08e57593a..86ae3f1e09d2e 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -14,13 +14,10 @@ import com.google.common.collect.Lists; import com.mongodb.client.AggregateIterable; import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoCursor; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.DataTypeUtils; import io.airbyte.protocol.models.JsonSchemaPrimitive; - -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -195,15 +192,16 @@ public static Map getUniqueFields(final MongoCollection getFieldsName(MongoCollection collection) { AggregateIterable output = collection.aggregate(Arrays.asList( - new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$$ROOT"))), - new Document("$unwind", "$arrayofkeyvalue"), - new Document("$group", new Document("_id", null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k"))) - )); - return (List) output.cursor().next().get("allkeys"); + new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$$ROOT"))), + new Document("$unwind", "$arrayofkeyvalue"), + new Document("$group", new Document("_id", null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k"))))); + return (List) output.cursor().next().get("allkeys"); } - private static void addUniqueType(Map map, MongoCollection collection, - String fieldName, Set types) { + private static void addUniqueType(Map map, + MongoCollection collection, + String fieldName, + Set types) { if (types.size() != 1) { map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING); } else { @@ -226,7 +224,7 @@ private static void addUniqueType(Map map, MongoCollection getTypes(MongoCollection collection, String fieldName) { var searchField = "$" + fieldName; var docTypes = collection.aggregate(Arrays.asList( - new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); + new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); Set types = new HashSet<>(); while (docTypes.hasNext()) { types.add(String.valueOf(docTypes.next().get(TYPE))); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java index f42bc90611ec0..5974760cc2a79 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java @@ -15,7 +15,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; - import org.bson.BsonArray; import org.bson.BsonString; import org.bson.Document; @@ -58,7 +57,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc final MongoCollection collection = database.createCollection(COLLECTION_NAME); final var doc1 = new Document("id", "0001").append("name", "Test") - .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java index 9f5818efbf393..e72715457b5f4 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java @@ -13,7 +13,6 @@ import io.airbyte.db.mongodb.MongoDatabase; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import java.util.List; - import org.bson.BsonArray; import org.bson.BsonString; import org.bson.Document; @@ -50,7 +49,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc final MongoCollection collection = database.createCollection(COLLECTION_NAME); final var doc1 = new Document("id", "0001").append("name", "Test") - .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); From 9ff2bfdc42cb338f6ed6d9adc22035b93f0e3e58 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 15 Nov 2021 18:54:05 +0200 Subject: [PATCH 3/7] fixed remarks --- .../java/io/airbyte/db/mongodb/MongoUtils.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index 86ae3f1e09d2e..763ba762dd7a1 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -205,9 +205,10 @@ private static void addUniqueType(Map map, if (types.size() != 1) { map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING); } else { - var a = collection.find(new Document(fieldName, new Document("$type", types.stream().findFirst().get()))).first(); - var b = toBsonDocument(a); - try (final BsonReader reader = new BsonDocumentReader(b)) { + var document = collection.find(new Document(fieldName, + new Document("$type", types.stream().findFirst().get()))).first(); + var bsonDoc = toBsonDocument(document); + try (final BsonReader reader = new BsonDocumentReader(bsonDoc)) { reader.readStartDocument(); while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { if (reader.readName().equals(fieldName)) { @@ -223,13 +224,15 @@ private static void addUniqueType(Map map, private static Set getTypes(MongoCollection collection, String fieldName) { var searchField = "$" + fieldName; - var docTypes = collection.aggregate(Arrays.asList( - new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); + var docTypes = collection.aggregate(List.of( + new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); Set types = new HashSet<>(); while (docTypes.hasNext()) { - types.add(String.valueOf(docTypes.next().get(TYPE))); + var type = String.valueOf(docTypes.next().get(TYPE)); + if (!MISSING_TYPE.equals(type)) { + types.add(type); + } } - types.remove(MISSING_TYPE); if (types.size() > 1) { types.remove(NULL_TYPE); } From 4eb9d09f0ccf185e83216f828e9633021b1538cc Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 15 Nov 2021 19:05:55 +0200 Subject: [PATCH 4/7] fixed remarks --- .../main/java/io/airbyte/db/mongodb/MongoUtils.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index 763ba762dd7a1..a080d490a4de0 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -206,7 +206,7 @@ private static void addUniqueType(Map map, map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING); } else { var document = collection.find(new Document(fieldName, - new Document("$type", types.stream().findFirst().get()))).first(); + new Document("$type", types.stream().findFirst().get()))).first(); var bsonDoc = toBsonDocument(document); try (final BsonReader reader = new BsonDocumentReader(bsonDoc)) { reader.readStartDocument(); @@ -225,18 +225,15 @@ private static void addUniqueType(Map map, private static Set getTypes(MongoCollection collection, String fieldName) { var searchField = "$" + fieldName; var docTypes = collection.aggregate(List.of( - new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); + new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); Set types = new HashSet<>(); while (docTypes.hasNext()) { var type = String.valueOf(docTypes.next().get(TYPE)); - if (!MISSING_TYPE.equals(type)) { + if (!MISSING_TYPE.equals(type) && !NULL_TYPE.equals(type)) { types.add(type); } } - if (types.size() > 1) { - types.remove(NULL_TYPE); - } - return types; + return types.isEmpty() ? Set.of(NULL_TYPE) : types; } private static BsonDocument toBsonDocument(final Document document) { From 451fe715498bc59a577d2d94324ee7fb3f6293a6 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 15 Nov 2021 19:12:17 +0200 Subject: [PATCH 5/7] fixed remarks --- .../lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java | 4 ++-- docs/integrations/sources/mongodb-v2.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index a080d490a4de0..c969f7fddd2cf 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -94,7 +94,7 @@ private static void formatDocument(final Document document, final ObjectNode obj try (final BsonReader reader = new BsonDocumentReader(bsonDocument)) { readDocument(reader, objectNode, columnNames); } catch (final Exception e) { - LOGGER.error("Exception while parsing BsonDocument: ", e.getMessage()); + LOGGER.error("Exception while parsing BsonDocument: {}", e.getMessage()); throw new RuntimeException(e); } } @@ -240,7 +240,7 @@ private static BsonDocument toBsonDocument(final Document document) { try { return document.toBsonDocument(); } catch (final Exception e) { - LOGGER.error("Exception while converting Document to BsonDocument: ", e.getMessage()); + LOGGER.error("Exception while converting Document to BsonDocument: {}", e.getMessage()); throw new RuntimeException(e); } } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 6213f7edb6921..2490ee6964693 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -102,7 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.1.4 | 2021-11-15 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Updated Performance | +| 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance | | 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing | | 0.1.2 | 2021-10-07 | [6860](https://github.com/airbytehq/airbyte/pull/6860) | Added filter to avoid MongoDb system collections | | 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL | From ae3c06e76d4b515ef9d098c31d98eb48d7d9b7fb Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 18 Nov 2021 10:46:49 +0200 Subject: [PATCH 6/7] updated strict encrypt source mongodb version --- .../source-mongodb-strict-encrypt/Dockerfile | 2 +- ...MongodbSourceStrictEncryptAcceptanceTest.java | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile index f0188fa585ecf..87ef2134436da 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-mongodb-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java index c7cda18cc7d73..a4f2176523f08 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java @@ -29,6 +29,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; + +import org.bson.BsonArray; +import org.bson.BsonString; import org.bson.Document; import org.junit.jupiter.api.Test; @@ -77,7 +80,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put("auth_source", "admin") .build()); - final String connectionString = String.format("mongodb://%s:%s@%s:%s/%s?authSource=admin&ssl=true", + final String connectionString = String.format("mongodb://%s:%s@%s:%s/%s?authSource=admin&directConnection=false&ssl=true", config.get("user").asText(), config.get("password").asText(), config.get("instance_type").get("host").asText(), @@ -87,9 +90,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc database = new MongoDatabase(connectionString, DATABASE_NAME); final MongoCollection collection = database.createCollection(COLLECTION_NAME); - final var doc1 = new Document("id", "0001").append("name", "Test"); - final var doc2 = new Document("id", "0002").append("name", "Mongo"); - final var doc3 = new Document("id", "0003").append("name", "Source"); + final var doc1 = new Document("id", "0001").append("name", "Test") + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); collection.insertMany(List.of(doc1, doc2, doc3)); } @@ -117,7 +121,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { DATABASE_NAME + "." + COLLECTION_NAME, Field.of("_id", JsonSchemaPrimitive.STRING), Field.of("id", JsonSchemaPrimitive.STRING), - Field.of("name", JsonSchemaPrimitive.STRING)) + Field.of("name", JsonSchemaPrimitive.STRING), + Field.of("test", JsonSchemaPrimitive.STRING), + Field.of("test_array", JsonSchemaPrimitive.ARRAY)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of("_id"))))); } From a1d23ffbdd159f8db572150c9f6295ac54e1eec8 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 18 Nov 2021 17:27:52 +0200 Subject: [PATCH 7/7] updated source mongodb work with empty collections --- .../b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json | 2 +- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- .../lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java | 6 +++++- .../connectors/source-mongodb-strict-encrypt/Dockerfile | 2 +- .../connectors/source-mongodb-v2/Dockerfile | 2 +- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json index 8fc2f6403f189..92c06b63504b9 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e", "name": "MongoDb", "dockerRepository": "airbyte/source-mongodb-v2", - "dockerImageTag": "0.1.5", + "dockerImageTag": "0.1.6", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2", "icon": "mongodb.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 0396136ea2dfc..469bbbe0da635 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -359,7 +359,7 @@ - name: MongoDb sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e dockerRepository: airbyte/source-mongodb-v2 - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2 icon: mongodb.svg sourceType: database diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index 9d6ebb70a105a..f40d9d79e35db 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -195,7 +195,11 @@ private static List getFieldsName(MongoCollection collection) new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$$ROOT"))), new Document("$unwind", "$arrayofkeyvalue"), new Document("$group", new Document("_id", null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k"))))); - return (List) output.cursor().next().get("allkeys"); + if (output.cursor().hasNext()) { + return (List) output.cursor().next().get("allkeys"); + } else { + return Collections.emptyList(); + } } private static void addUniqueType(Map map, diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile index 497949621afa2..3017fb613eff6 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-mongodb-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile index 91ce5c47ea783..38e1303821345 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/source-mongodb-v2