Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source MongoDB fetch authorized collections only #9238

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
93d53ac
fix for jdk 17
Dec 15, 2021
ede8d38
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 15, 2021
27be3fe
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 15, 2021
ca75033
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
72ab46f
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
aec3384
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
7efd5aa
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 20, 2021
6a773f9
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 21, 2021
fa18537
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 21, 2021
b0ba37b
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 22, 2021
fa31a0d
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 23, 2021
67e0bd6
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 24, 2021
ec0d1bd
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 28, 2021
23598ec
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 28, 2021
59d63e6
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 30, 2021
4d10720
Source MongoDB show authorized collections
Dec 31, 2021
e2cb62c
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 10, 2022
f622758
add javadoc
Jan 10, 2022
aae25d6
fixed checkstyle
Jan 10, 2022
632583a
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 10, 2022
2bc76ce
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 11, 2022
0556234
add CHANGELOG
Jan 11, 2022
131c45f
Merge branch 'master' into vmaltsev/8752-source-mongodb-show-authoriz…
Jan 11, 2022
a1c94e1
fix checkstyle
Jan 11, 2022
a51ec08
refactoring
Jan 12, 2022
45dfc2d
bump version anf fix checkstyle
Jan 12, 2022
9baf2e1
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 12, 2022
9942d14
Merge branch 'master' into vmaltsev/8752-source-mongodb-show-authoriz…
Jan 12, 2022
ed4f1ba
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 13, 2022
6d073b7
Merge branch 'master' into vmaltsev/8752-source-mongodb-show-authoriz…
Jan 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e",
"name": "MongoDb",
"dockerRepository": "airbyte/source-mongodb-v2",
"dockerImageTag": "0.1.10",
"dockerImageTag": "0.1.11",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"icon": "mongodb.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@
- name: MongoDb
sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.10
dockerImageTag: 0.1.11
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -51,11 +49,11 @@ public class SnowflakeInternalStagingConsumerFactory {
private final String CURRENT_SYNC_PATH = UUID.randomUUID().toString();

public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SnowflakeStagingSqlOperations sqlOperations,
final SnowflakeSQLNameTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
final JdbcDatabase database,
final SnowflakeStagingSqlOperations sqlOperations,
final SnowflakeSQLNameTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog);

return new BufferedStreamConsumer(
Expand Down Expand Up @@ -135,10 +133,10 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon
}

private RecordWriter recordWriterFunction(final JdbcDatabase database,
final SqlOperations snowflakeSqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final SnowflakeSQLNameTransformer namingResolver) {
final SqlOperations snowflakeSqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final SnowflakeSQLNameTransformer namingResolver) {
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig =
writeConfigs.stream()
.collect(Collectors.toUnmodifiableMap(
Expand All @@ -160,9 +158,9 @@ private RecordWriter recordWriterFunction(final JdbcDatabase database,
}

private OnCloseFunction onCloseFunction(final JdbcDatabase database,
final SnowflakeStagingSqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final SnowflakeSQLNameTransformer namingResolver) {
final SnowflakeStagingSqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final SnowflakeSQLNameTransformer namingResolver) {
return (hasFailed) -> {
if (!hasFailed) {
final List<String> queryList = new ArrayList<>();
Expand All @@ -176,14 +174,14 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,

final String path = namingResolver.getStagingPath(schemaName, dstTableName, CURRENT_SYNC_PATH);
LOGGER.info("Uploading data from stage: stream {}. schema {}, tmp table {}, stage path {}", writeConfig.getStreamName(), schemaName,
srcTableName,
path);
srcTableName,
path);
try {
sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName);
} catch (SQLException e){
} catch (Exception e) {
sqlOperations.cleanUpStage(database, path);
LOGGER.info("Cleaning stage path {}", path);
throw new RuntimeException("Failed to upload data from stage "+ path, e);
throw new RuntimeException("Failed to upload data from stage " + path, e);
}

sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public String getStageName(String schemaName, String outputTableName) {
}

public String getStagingPath(String schemaName, String tableName, String currentSyncPath) {
return (getStageName(schemaName,tableName)+"/staged/"+currentSyncPath).toUpperCase();
return (getStageName(schemaName, tableName) + "/staged/" + currentSyncPath).toUpperCase();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@ public void cleanUpStage(JdbcDatabase database, String path) throws SQLException
public boolean isSchemaExists(JdbcDatabase database, String outputSchema) throws Exception {
return database.query(SHOW_SCHEMAS).map(schemas -> schemas.get(NAME).asText()).anyMatch(outputSchema::equalsIgnoreCase);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mongodb-v2

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'org.mongodb:mongodb-driver-sync:4.3.0'
implementation 'org.mongodb:mongodb-driver-sync:4.4.0'

testImplementation 'org.testcontainers:mongodb:1.15.3'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public List<CheckedConsumer<MongoDatabase, Exception>> getCheckOperations(final
throws Exception {
final List<CheckedConsumer<MongoDatabase, Exception>> checkList = new ArrayList<>();
checkList.add(database -> {
if (database.getCollectionNames().isEmpty()) {
if (getAuthorizedCollections(database).isEmpty()) {
throw new Exception("Unable to execute any operation on the source!");
} else {
LOGGER.info("The source passed the basic operation test!");
Expand All @@ -114,7 +114,7 @@ protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDat
throws Exception {
final List<TableInfo<CommonField<BsonType>>> tableInfos = new ArrayList<>();

for (final String collectionName : database.getCollectionNames()) {
for (final String collectionName : getAuthorizedCollections(database)) {
final MongoCollection<Document> collection = database.getCollection(collectionName);
final Map<String, BsonType> uniqueFields = MongoUtils.getUniqueFields(collection);

Expand All @@ -135,6 +135,27 @@ protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDat
return tableInfos;
}

private Set<String> getAuthorizedCollections(MongoDatabase database) {
/*
* db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command
* returns only those collections for which the user has privileges. For example, if a user has find
* action on specific collections, the command returns only those collections; or, if a user has
* find or any other action, on the database resource, the command lists all collections in the
* database.
*/
Document document = database.getDatabase().runCommand(new Document("listCollections", 1)
.append("authorizedCollections", true)
.append("nameOnly", true))
.append("filter", "{ 'type': 'collection' }");
return document.toBsonDocument()
.get("cursor").asDocument()
.getArray("firstBatch")
.stream()
.map(bsonValue -> bsonValue.asDocument().getString("name").getValue())
.collect(Collectors.toSet());

}

@Override
protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDatabase database, final String schema) throws Exception {
// MondoDb doesn't support schemas
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.11 | 2022-01-10 | [9238](https://github.com/airbytehq/airbyte/pull/9238) | Return only those collections for which the user has privileges |
| 0.1.10 | 2021-12-30 | [9202](https://github.com/airbytehq/airbyte/pull/9202) | Update connector fields title/description |
| 0.1.9 | 2021-12-07 | [8491](https://github.com/airbytehq/airbyte/pull/8491) | Configure 10000 limit doc reading during Discovery step |
| 0.1.8 | 2021-11-29 | [8306](https://github.com/airbytehq/airbyte/pull/8306) | Added milliseconds for date format for cursor |
Expand Down