From 2b0d0bdef634c2284a3e4945aa0551e5693419f2 Mon Sep 17 00:00:00 2001 From: VitaliiMaltsev <39538064+VitaliiMaltsev@users.noreply.github.com> Date: Thu, 13 Jan 2022 15:41:34 +0200 Subject: [PATCH] Source MongoDB fetch authorized collections only (#9238) * fix for jdk 17 * Source MongoDB show authorized collections * add javadoc * fixed checkstyle * add CHANGELOG * fix checkstyle * refactoring * bump version anf fix checkstyle Co-authored-by: vmaltsev --- .../b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- ...owflakeInternalStagingConsumerFactory.java | 34 +++++++++---------- .../SnowflakeSQLNameTransformer.java | 3 +- .../SnowflakeStagingSqlOperations.java | 1 + .../connectors/source-mongodb-v2/Dockerfile | 2 +- .../connectors/source-mongodb-v2/build.gradle | 2 +- .../MongoDbSource.java | 25 ++++++++++++-- docs/integrations/sources/mongodb-v2.md | 1 + 9 files changed, 47 insertions(+), 25 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json index ca312b3e32319..3543414abd73c 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e", "name": "MongoDb", "dockerRepository": "airbyte/source-mongodb-v2", - "dockerImageTag": "0.1.10", + "dockerImageTag": "0.1.11", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2", "icon": "mongodb.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 6f853b92ecad3..caf5401fa0ff1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -426,7 +426,7 @@ - name: MongoDb sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e dockerRepository: airbyte/source-mongodb-v2 - dockerImageTag: 0.1.10 + dockerImageTag: 0.1.11 documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2 icon: mongodb.svg sourceType: database diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java index 4017ca7618794..52844d93c786d 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java @@ -22,8 +22,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; - -import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -51,11 +49,11 @@ public class SnowflakeInternalStagingConsumerFactory { private final String CURRENT_SYNC_PATH = UUID.randomUUID().toString(); public AirbyteMessageConsumer create(final Consumer outputRecordCollector, - final JdbcDatabase database, - final SnowflakeStagingSqlOperations sqlOperations, - final SnowflakeSQLNameTransformer namingResolver, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog) { + final JdbcDatabase database, + final SnowflakeStagingSqlOperations sqlOperations, + final SnowflakeSQLNameTransformer namingResolver, + final JsonNode config, + final ConfiguredAirbyteCatalog catalog) { final List writeConfigs = createWriteConfigs(namingResolver, config, catalog); return new BufferedStreamConsumer( @@ -135,10 +133,10 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon } private RecordWriter recordWriterFunction(final JdbcDatabase database, - final SqlOperations snowflakeSqlOperations, - final List writeConfigs, - final ConfiguredAirbyteCatalog catalog, - final SnowflakeSQLNameTransformer namingResolver) { + final SqlOperations snowflakeSqlOperations, + final List writeConfigs, + final ConfiguredAirbyteCatalog catalog, + final SnowflakeSQLNameTransformer namingResolver) { final Map pairToWriteConfig = writeConfigs.stream() .collect(Collectors.toUnmodifiableMap( @@ -160,9 +158,9 @@ private RecordWriter recordWriterFunction(final JdbcDatabase database, } private OnCloseFunction onCloseFunction(final JdbcDatabase database, - final SnowflakeStagingSqlOperations sqlOperations, - final List writeConfigs, - final SnowflakeSQLNameTransformer namingResolver) { + final SnowflakeStagingSqlOperations sqlOperations, + final List writeConfigs, + final SnowflakeSQLNameTransformer namingResolver) { return (hasFailed) -> { if (!hasFailed) { final List queryList = new ArrayList<>(); @@ -176,14 +174,14 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database, final String path = namingResolver.getStagingPath(schemaName, dstTableName, CURRENT_SYNC_PATH); LOGGER.info("Uploading data from stage: stream {}. schema {}, tmp table {}, stage path {}", writeConfig.getStreamName(), schemaName, - srcTableName, - path); + srcTableName, + path); try { sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName); - } catch (SQLException e){ + } catch (Exception e) { sqlOperations.cleanUpStage(database, path); LOGGER.info("Cleaning stage path {}", path); - throw new RuntimeException("Failed to upload data from stage "+ path, e); + throw new RuntimeException("Failed to upload data from stage " + path, e); } sqlOperations.createTableIfNotExists(database, schemaName, dstTableName); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java index 18c97338f39a6..373c3aa099830 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java @@ -18,6 +18,7 @@ public String getStageName(String schemaName, String outputTableName) { } public String getStagingPath(String schemaName, String tableName, String currentSyncPath) { - return (getStageName(schemaName,tableName)+"/staged/"+currentSyncPath).toUpperCase(); + return (getStageName(schemaName, tableName) + "/staged/" + currentSyncPath).toUpperCase(); } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java index 6fa6a7c65df55..a1ba41ed47fa5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java @@ -85,4 +85,5 @@ public void cleanUpStage(JdbcDatabase database, String path) throws SQLException public boolean isSchemaExists(JdbcDatabase database, String outputSchema) throws Exception { return database.query(SHOW_SCHEMAS).map(schemas -> schemas.get(NAME).asText()).anyMatch(outputSchema::equalsIgnoreCase); } + } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile index de0380fd37d97..6984e28285045 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mongodb-v2 COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.10 +LABEL io.airbyte.version=0.1.11 LABEL io.airbyte.name=airbyte/source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 89f3ccef41950..b8a449c3c8799 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -16,7 +16,7 @@ dependencies { implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) implementation project(':airbyte-integrations:connectors:source-relational-db') - implementation 'org.mongodb:mongodb-driver-sync:4.3.0' + implementation 'org.mongodb:mongodb-driver-sync:4.4.0' testImplementation 'org.testcontainers:mongodb:1.15.3' diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java index 9af7c89f6c8f1..98e10ef509049 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java @@ -90,7 +90,7 @@ public List> getCheckOperations(final throws Exception { final List> checkList = new ArrayList<>(); checkList.add(database -> { - if (database.getCollectionNames().isEmpty()) { + if (getAuthorizedCollections(database).isEmpty()) { throw new Exception("Unable to execute any operation on the source!"); } else { LOGGER.info("The source passed the basic operation test!"); @@ -114,7 +114,7 @@ protected List>> discoverInternal(final MongoDat throws Exception { final List>> tableInfos = new ArrayList<>(); - for (final String collectionName : database.getCollectionNames()) { + for (final String collectionName : getAuthorizedCollections(database)) { final MongoCollection collection = database.getCollection(collectionName); final Map uniqueFields = MongoUtils.getUniqueFields(collection); @@ -135,6 +135,27 @@ protected List>> discoverInternal(final MongoDat return tableInfos; } + private Set getAuthorizedCollections(MongoDatabase database) { + /* + * db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command + * returns only those collections for which the user has privileges. For example, if a user has find + * action on specific collections, the command returns only those collections; or, if a user has + * find or any other action, on the database resource, the command lists all collections in the + * database. + */ + Document document = database.getDatabase().runCommand(new Document("listCollections", 1) + .append("authorizedCollections", true) + .append("nameOnly", true)) + .append("filter", "{ 'type': 'collection' }"); + return document.toBsonDocument() + .get("cursor").asDocument() + .getArray("firstBatch") + .stream() + .map(bsonValue -> bsonValue.asDocument().getString("name").getValue()) + .collect(Collectors.toSet()); + + } + @Override protected List>> discoverInternal(final MongoDatabase database, final String schema) throws Exception { // MondoDb doesn't support schemas diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 0db57aa9c996c..7f93671fd9417 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -102,6 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.11 | 2022-01-10 | [9238](https://github.com/airbytehq/airbyte/pull/9238) | Return only those collections for which the user has privileges | | 0.1.10 | 2021-12-30 | [9202](https://github.com/airbytehq/airbyte/pull/9202) | Update connector fields title/description | | 0.1.9 | 2021-12-07 | [8491](https://github.com/airbytehq/airbyte/pull/8491) | Configure 10000 limit doc reading during Discovery step | | 0.1.8 | 2021-11-29 | [8306](https://github.com/airbytehq/airbyte/pull/8306) | Added milliseconds for date format for cursor |