Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚨 Add ability to enforce SSL in MongoDB source connector #17590

Merged
merged 17 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@
- name: MongoDb
sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.17
dockerImageTag: 0.1.18
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6801,7 +6801,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mongodb-v2:0.1.17"
- dockerImage: "airbyte/source-mongodb-v2:0.1.18"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
changelogUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mongodb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.name=airbyte/source-mongodb-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

package io.airbyte.integrations.source.mongodb;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.spec_modification.SpecModifyingSource;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,7 +26,21 @@ public MongodbSourceStrictEncrypt() {
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) throws Exception {
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
  // Work out which kind of MongoDB instance the configuration describes.
  final JsonNode instanceTypeNode = config.get(MongoDbSourceUtils.INSTANCE_TYPE);
  final String instanceName = instanceTypeNode.get(MongoDbSourceUtils.INSTANCE).asText();
  final MongoInstanceType instanceType = MongoInstanceType.fromValue(instanceName);

  // Only a standalone instance is inspected for its TLS setting; the TLS helper is not
  // consulted for any other instance type.
  final boolean standaloneWithoutTls = instanceType.equals(MongoInstanceType.STANDALONE)
      && !MongoDbSourceUtils.tlsEnabledForStandaloneInstance(config, instanceTypeNode);

  if (!standaloneWithoutTls) {
    // Nothing to reject here, so defer to the regular connection check.
    return super.check(config);
  }

  // A standalone instance without TLS is rejected before the parent check is attempted.
  final AirbyteConnectionStatus rejected = new AirbyteConnectionStatus()
      .withStatus(Status.FAILED)
      .withMessage("TLS connection must be used to read from MongoDB.");
  return rejected;
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
// removing tls property for a standalone instance to disable possibility to switch off a tls
// connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mongodb.client.MongoCollection;
Expand All @@ -17,6 +18,8 @@
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -78,7 +81,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put("auth_source", "admin")
.build());

var credentials = String.format("%s:%s@", config.get("user").asText(),
final var credentials = String.format("%s:%s@", config.get("user").asText(),
config.get(JdbcUtils.PASSWORD_KEY).asText());
final String connectionString = String.format("mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true",
credentials,
Expand Down Expand Up @@ -146,4 +149,24 @@ void testSpec() throws Exception {
assertEquals(expected, actual);
}

@Test
void testCheck() throws Exception {
  // The strict-encrypt source is expected to reject the configuration with this exact status.
  final AirbyteConnectionStatus expected = new AirbyteConnectionStatus()
      .withStatus(Status.FAILED)
      .withMessage("TLS connection must be used to read from MongoDB.");

  // Instance section describing a standalone deployment with TLS explicitly switched off.
  final JsonNode standaloneWithoutTls = Jsons.jsonNode(ImmutableMap.builder()
      .put("instance", MongoInstanceType.STANDALONE.getType())
      .put("tls", false)
      .build());

  // Clone the base config before overriding its instance type section.
  final ObjectNode configUnderTest = (ObjectNode) Jsons.clone(getConfig());
  configUnderTest.put(INSTANCE_TYPE, standaloneWithoutTls);

  final AirbyteConnectionStatus actual = new MongodbSourceStrictEncrypt().check(configUnderTest);

  assertEquals(expected, actual);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mongodb-v2

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@
package io.airbyte.integrations.source.mongodb;

import static com.mongodb.client.model.Filters.gt;
import static org.bson.BsonType.DATE_TIME;
import static org.bson.BsonType.DECIMAL128;
import static org.bson.BsonType.DOCUMENT;
import static org.bson.BsonType.DOUBLE;
import static org.bson.BsonType.INT32;
import static org.bson.BsonType.INT64;
import static org.bson.BsonType.OBJECT_ID;
import static org.bson.BsonType.STRING;
import static org.bson.BsonType.TIMESTAMP;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -55,20 +46,6 @@ public class MongoDbSource extends AbstractDbSource<BsonType, MongoDatabase> {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=%s&ssl=%s";
private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?authSource=%s&retryWrites=true&w=majority&tls=true";
private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=%s&directConnection=false&ssl=true";
private static final String USER = "user";
private static final String INSTANCE_TYPE = "instance_type";
private static final String INSTANCE = "instance";
private static final String CLUSTER_URL = "cluster_url";
private static final String SERVER_ADDRESSES = "server_addresses";
private static final String REPLICA_SET = "replica_set";
private static final String AUTH_SOURCE = "auth_source";
private static final String PRIMARY_KEY = "_id";
private static final Set<BsonType> ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME,
INT32, TIMESTAMP, INT64, DECIMAL128);

public static void main(final String[] args) throws Exception {
final Source source = new MongoDbSource();
LOGGER.info("starting source: {}", MongoDbSource.class);
Expand All @@ -78,8 +55,8 @@ public static void main(final String[] args) throws Exception {

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final var credentials = config.has(USER) && config.has(JdbcUtils.PASSWORD_KEY)
? String.format("%s:%s@", config.get(USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText())
final var credentials = config.has(MongoDbSourceUtils.USER) && config.has(JdbcUtils.PASSWORD_KEY)
? String.format("%s:%s@", config.get(MongoDbSourceUtils.USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText())
: StringUtils.EMPTY;

return Jsons.jsonNode(ImmutableMap.builder()
Expand Down Expand Up @@ -132,7 +109,7 @@ protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDat
.nameSpace(database.getName())
.name(collectionName)
.fields(fields)
.primaryKeys(List.of(PRIMARY_KEY))
.primaryKeys(List.of(MongoDbSourceUtils.PRIMARY_KEY))
.build();

tableInfos.add(tableInfo);
Expand Down Expand Up @@ -214,7 +191,7 @@ public boolean isCursorType(final BsonType bsonType) {
// when we have no cursor field here, at least id could be used as cursor here.
// This logic will be used further when we implement the part which will show only the list of possible
// cursor fields on UI
return ALLOWED_CURSOR_TYPES.contains(bsonType);
return MongoDbSourceUtils.ALLOWED_CURSOR_TYPES.contains(bsonType);
}

private AutoCloseableIterator<JsonNode> queryTable(final MongoDatabase database,
Expand All @@ -234,31 +211,30 @@ private AutoCloseableIterator<JsonNode> queryTable(final MongoDatabase database,
private String buildConnectionString(final JsonNode config, final String credentials) {
final StringBuilder connectionStrBuilder = new StringBuilder();

final JsonNode instanceConfig = config.get(INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(INSTANCE).asText());
final JsonNode instanceConfig = config.get(MongoDbSourceUtils.INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbSourceUtils.INSTANCE).asText());
switch (instance) {
case STANDALONE -> {
// supports backward compatibility and secure only connector
final var tls = config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean()
: (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true);
connectionStrBuilder.append(
String.format(MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
String.format(MongoDbSourceUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
instanceConfig.get(JdbcUtils.PORT_KEY).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(), config.get(AUTH_SOURCE).asText(), tls));
config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(MongoDbSourceUtils.AUTH_SOURCE).asText(), MongoDbSourceUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)));
}
case REPLICA -> {
connectionStrBuilder.append(
String.format(MONGODB_REPLICA_URL, credentials, instanceConfig.get(SERVER_ADDRESSES).asText(),
String.format(MongoDbSourceUtils.MONGODB_REPLICA_URL, credentials, instanceConfig.get(MongoDbSourceUtils.SERVER_ADDRESSES).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(AUTH_SOURCE).asText()));
if (instanceConfig.has(REPLICA_SET)) {
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(REPLICA_SET).asText()));
config.get(MongoDbSourceUtils.AUTH_SOURCE).asText()));
if (instanceConfig.has(MongoDbSourceUtils.REPLICA_SET)) {
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoDbSourceUtils.REPLICA_SET).asText()));
}
}
case ATLAS -> {
connectionStrBuilder.append(
String.format(MONGODB_CLUSTER_URL, credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(AUTH_SOURCE).asText()));
String.format(MongoDbSourceUtils.MONGODB_CLUSTER_URL, credentials,
instanceConfig.get(MongoDbSourceUtils.CLUSTER_URL).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(MongoDbSourceUtils.AUTH_SOURCE).asText()));
}
default -> throw new IllegalArgumentException("Unsupported instance type: " + instance);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.airbyte.integrations.source.mongodb;

import static org.bson.BsonType.DATE_TIME;
import static org.bson.BsonType.DECIMAL128;
import static org.bson.BsonType.DOCUMENT;
import static org.bson.BsonType.DOUBLE;
import static org.bson.BsonType.INT32;
import static org.bson.BsonType.INT64;
import static org.bson.BsonType.OBJECT_ID;
import static org.bson.BsonType.STRING;
import static org.bson.BsonType.TIMESTAMP;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcUtils;
import java.util.Set;
import org.bson.BsonType;

public final class MongoDbSourceUtils {

private MongoDbSourceUtils() {}

public static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=%s&ssl=%s";
public static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?authSource=%s&retryWrites=true&w=majority&tls=true";
public static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=%s&directConnection=false&ssl=true";
public static final String USER = "user";
public static final String INSTANCE_TYPE = "instance_type";
public static final String INSTANCE = "instance";
public static final String CLUSTER_URL = "cluster_url";
public static final String SERVER_ADDRESSES = "server_addresses";
public static final String REPLICA_SET = "replica_set";
public static final String AUTH_SOURCE = "auth_source";
public static final String PRIMARY_KEY = "_id";
public static final Set<BsonType> ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME,
INT32, TIMESTAMP, INT64, DECIMAL128);

/**
* Determines whether TLS/SSL should be enabled for a standalone instance of MongoDB.
*/
public static boolean tlsEnabledForStandaloneInstance(final JsonNode config, final JsonNode instanceConfig) {
Copy link
Contributor

@ryankfu ryankfu Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should both the javadoc comment and the method name be more generalized? It seems you're passing in an instanceConfig so it appears this can be extended in the future to handle other instance types yet this method is specifically for Standalone instances, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is specifically for Standalone instances. For Atlas and replica set deployments, we are overwriting the arguments to always have tls/ssl=true in the params (see MONGODB_REPLICA_URL and MONGODB_CLUSTER_URL above). I was essentially duplicating the logic in the source-mongodb-strict-encrypt's check() method, so I pulled it out here. The naming is specifically a warning to not rely on this method for non-standalone instances.

return config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean()
: (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true);
}
}
39 changes: 20 additions & 19 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,23 @@ For more information regarding configuration parameters, please see [MongoDb Doc

## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.17 | 2022-09-08 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | Fixed bug with empty strings in fields with __aibyte_transform_ |
| 0.1.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field |
| 0.1.15 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors |
| 0.1.14 | 2022-05-05 | [12428](https://github.com/airbytehq/airbyte/pull/12428) | JsonSchema: Add properties to fields with type 'object' |
| 0.1.13 | 2022-02-21 | [10276](https://github.com/airbytehq/airbyte/pull/10276) | Create a custom codec registry to handle DBRef MongoDB objects |
| 0.1.12 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | (unpublished) Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.11 | 2022-01-10 | [9238](https://github.com/airbytehq/airbyte/pull/9238) | Return only those collections for which the user has privileges |
| 0.1.10 | 2021-12-30 | [9202](https://github.com/airbytehq/airbyte/pull/9202) | Update connector fields title/description |
| 0.1.9 | 2021-12-07 | [8491](https://github.com/airbytehq/airbyte/pull/8491) | Configure 10000 limit doc reading during Discovery step |
| 0.1.8 | 2021-11-29 | [8306](https://github.com/airbytehq/airbyte/pull/8306) | Added milliseconds for date format for cursor |
| 0.1.7 | 2021-11-22 | [8161](https://github.com/airbytehq/airbyte/pull/8161) | Updated Performance and updated cursor for timestamp type |
| 0.1.5 | 2021-11-17 | [8046](https://github.com/airbytehq/airbyte/pull/8046) | Added milliseconds to convert timestamp to datetime format |
| 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance |
| 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing |
| 0.1.2 | 2021-10-07 | [6860](https://github.com/airbytehq/airbyte/pull/6860) | Added filter to avoid MongoDb system collections |
| 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL |
| 0.1.0 | 2021-08-30 | [5530](https://github.com/airbytehq/airbyte/pull/5530) | New source: MongoDb ported to java |
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:----------------------------------------------------------------------------------------------------------|
| 0.1.18 | 2022-10-05 | [17590](https://github.com/airbytehq/airbyte/pull/17590) | Add ability to enforce SSL in MongoDB connector and check logic |
| 0.1.17 | 2022-09-08 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | Fixed bug with empty strings in fields with __aibyte_transform_ |
| 0.1.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field |
| 0.1.15 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors |
| 0.1.14 | 2022-05-05 | [12428](https://github.com/airbytehq/airbyte/pull/12428) | JsonSchema: Add properties to fields with type 'object' |
| 0.1.13 | 2022-02-21 | [10276](https://github.com/airbytehq/airbyte/pull/10276) | Create a custom codec registry to handle DBRef MongoDB objects |
| 0.1.12 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | (unpublished) Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.11 | 2022-01-10 | [9238](https://github.com/airbytehq/airbyte/pull/9238) | Return only those collections for which the user has privileges |
| 0.1.10 | 2021-12-30 | [9202](https://github.com/airbytehq/airbyte/pull/9202) | Update connector fields title/description |
| 0.1.9 | 2021-12-07 | [8491](https://github.com/airbytehq/airbyte/pull/8491) | Configure 10000 limit doc reading during Discovery step |
| 0.1.8 | 2021-11-29 | [8306](https://github.com/airbytehq/airbyte/pull/8306) | Added milliseconds for date format for cursor |
| 0.1.7 | 2021-11-22 | [8161](https://github.com/airbytehq/airbyte/pull/8161) | Updated Performance and updated cursor for timestamp type |
| 0.1.5 | 2021-11-17 | [8046](https://github.com/airbytehq/airbyte/pull/8046) | Added milliseconds to convert timestamp to datetime format |
| 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance |
| 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing |
| 0.1.2 | 2021-10-07 | [6860](https://github.com/airbytehq/airbyte/pull/6860) | Added filter to avoid MongoDb system collections |
| 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL |
| 0.1.0 | 2021-08-30 | [5530](https://github.com/airbytehq/airbyte/pull/5530) | New source: MongoDb ported to java |