Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source redshift: implement privileges check #9744

Merged
merged 34 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a98a8e6
fix: postgres priviledge check for redshift
ethanve Jan 6, 2022
ee65b0d
add missing imports
alafanechere Jan 24, 2022
f17818c
Merge branch 'master' into augustin/fix/redshift-privileges
alafanechere Jan 24, 2022
fcecd98
Merge branch 'master' into augustin/fix/redshift-privileges
alafanechere Jan 26, 2022
0396618
bump version
alafanechere Jan 26, 2022
33ada8f
format
alafanechere Jan 26, 2022
3ba2418
Update docs/integrations/sources/redshift.md
alafanechere Jan 27, 2022
7ecc9e4
remove not used fields from query results
alafanechere Jan 31, 2022
3d996e4
improve changelog
alafanechere Jan 31, 2022
72690a7
create unreadable table
alafanechere Jan 31, 2022
fb2b6a6
Merge branch 'master' into augustin/fix/redshift-privileges
alafanechere Jan 31, 2022
5eaf663
update documentation url in spec.json
alafanechere Jan 31, 2022
b079fdc
fix tests
alafanechere Jan 31, 2022
4ce02a6
fix tests
alafanechere Jan 31, 2022
c945988
bump version to 0.3.8 in dockerfile
alafanechere Feb 4, 2022
b9b9227
Fix method override
tuliren Feb 4, 2022
1c9ed2d
Limit schemas in jdbc test
tuliren Feb 4, 2022
81591a2
Revert jdbc acceptance test change
tuliren Feb 5, 2022
ceba782
Fix sql statement
tuliren Feb 5, 2022
81e73de
Fix empty stream name and wrong username
tuliren Feb 5, 2022
1137d6a
Close the connection that queries for privileges in redshift
tuliren Feb 5, 2022
0c3e8a7
Add more comment
tuliren Feb 5, 2022
f214661
fix SQL query to check privileges
alafanechere Feb 21, 2022
7f2eec5
Merge branch 'master' into augustin/fix/redshift-privileges
alafanechere Feb 21, 2022
a7782f4
bump to 0.3.9
alafanechere Feb 21, 2022
d698334
fix SQL query to check privileges
alafanechere Feb 21, 2022
d940fc0
fix SQL query to check privileges
alafanechere Feb 21, 2022
5285228
use specific test user
alafanechere Feb 22, 2022
87d343e
Merge branch 'master' into augustin/fix/redshift-privileges
alafanechere Feb 22, 2022
cddf939
Merge branch 'master' into augustin/fix/redshift-privileges
alafanechere Feb 23, 2022
e29b817
make changes suggested by liren
alafanechere Feb 28, 2022
f7aa4f2
update source_specs.yaml
alafanechere Mar 1, 2022
b47cb35
Merge branch 'master' into augustin/fix/redshift-privileges
alafanechere Mar 1, 2022
740c8bc
format
alafanechere Mar 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@
- name: Redshift
sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.3.8
dockerImageTag: 0.3.9
documentationUrl: https://docs.airbyte.io/integrations/sources/redshift
icon: redshift.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6631,9 +6631,9 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-redshift:0.3.8"
- dockerImage: "airbyte/source-redshift:0.3.9"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Redshift Source Spec"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-redshift/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.8
LABEL io.airbyte.version=0.3.9
LABEL io.airbyte.name=airbyte/source-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
Expand Down Expand Up @@ -93,6 +97,27 @@ public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
}

@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException {
return new HashSet<>(database.bufferedResultSetQuery(
connection -> {
connection.setAutoCommit(true);
final PreparedStatement ps = connection.prepareStatement(
"SELECT schemaname, tablename "
+ "FROM pg_tables "
+ "WHERE has_table_privilege(schemaname||'.'||tablename, 'select') = true AND schemaname = ?;");
ps.setString(1, schema);
return ps.executeQuery();
},
resultSet -> {
final JsonNode json = sourceOperations.rowToJson(resultSet);
return JdbcPrivilegeDto.builder()
.schemaName(json.get("schemaname").asText())
.tableName(json.get("tablename").asText())
.build();
}));
}

public static void main(final String[] args) throws Exception {
final Source source = new RedshiftSource();
LOGGER.info("starting source: {}", RedshiftSource.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/redshift",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Redshift Source Spec",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ private static JsonNode getStaticConfig() {
@BeforeEach
public void setup() throws Exception {
config = getStaticConfig();

super.setup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
// This test case expects an active redshift cluster that is useable from outside of vpc
protected ObjectNode config;
protected JdbcDatabase database;
protected String testUserName;
protected String testUserPassword;
protected String schemaName;
protected String schemaToIgnore;
protected String streamName;
Expand All @@ -53,22 +55,30 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
config = getStaticConfig();

database = createDatabase(config);

testUserName = "foo";
testUserPassword = "BarBarBar1&";
createTestUser(database, config, testUserName, testUserPassword);
schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase();
schemaToIgnore = schemaName + "shouldIgnore";
streamName = "customer";

// limit the connection to one schema only
config = config.set("schemas", Jsons.jsonNode(List.of(schemaName)));

// use test user user
config = config.set("username", Jsons.jsonNode(testUserName));
config = config.set("password", Jsons.jsonNode(testUserPassword));

// create a test data
createTestData(database, schemaName);
createTestData(database, schemaName, streamName, testUserName, true);
createTestData(database, schemaName, "not_readable", testUserName, false);

// create a schema with data that will not be used for testing, but would be used to check schema
// filtering. This one should not be visible in results
createTestData(database, schemaToIgnore);
createTestData(database, schemaToIgnore, streamName, testUserName, true);
}

protected static JdbcDatabase createDatabase(final JsonNode config) {
protected JdbcDatabase createDatabase(final JsonNode config) {
return Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
Expand All @@ -79,15 +89,30 @@ protected static JdbcDatabase createDatabase(final JsonNode config) {
RedshiftSource.DRIVER_CLASS);
}

protected void createTestData(final JdbcDatabase database, final String schemaName)
protected void createTestUser(final JdbcDatabase database, final JsonNode config, final String testUserName, final String testUserPassword)
throws SQLException {
final String createTestUserQuery = String.format("CREATE USER %s PASSWORD '%s'", testUserName, testUserPassword);
database.execute(connection -> {
connection.createStatement().execute(createTestUserQuery);
});
final String grantSelectOnPgTablesQuery = String.format("GRANT SELECT ON TABLE pg_tables TO %s ", testUserName);
database.execute(connection -> {
connection.createStatement().execute(grantSelectOnPgTablesQuery);
});
}

protected void createTestData(final JdbcDatabase database,
final String schemaName,
final String tableName,
final String testUserName,
final Boolean isReadableByTestUser)
throws SQLException {
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
final String createSchemaQuery = String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName);
database.execute(connection -> {
connection.createStatement().execute(createSchemaQuery);
});

streamName = "customer";
final String fqTableName = JdbcUtils.getFullyQualifiedTableName(schemaName, streamName);
final String fqTableName = JdbcUtils.getFullyQualifiedTableName(schemaName, tableName);
final String createTestTable =
String.format(
"CREATE TABLE IF NOT EXISTS %s (c_custkey INTEGER, c_name VARCHAR(16), c_nation VARCHAR(16));\n",
Expand All @@ -101,6 +126,22 @@ protected void createTestData(final JdbcDatabase database, final String schemaNa
database.execute(connection -> {
connection.createStatement().execute(insertTestData);
});

if (!isReadableByTestUser) {
final String revokeSelect = String.format("REVOKE SELECT ON TABLE %s FROM %s;\n", fqTableName, testUserName);
database.execute(connection -> {
connection.createStatement().execute(revokeSelect);
});
} else {
final String grantUsageQuery = String.format("GRANT USAGE ON SCHEMA %s TO %s;\n", schemaName, testUserName);
database.execute(connection -> {
connection.createStatement().execute(grantUsageQuery);
});
final String grantSelectQuery = String.format("GRANT SELECT ON TABLE %s TO %s;\n", fqTableName, testUserName);
database.execute(connection -> {
connection.createStatement().execute(grantSelectQuery);
});
}
}

@Override
Expand All @@ -109,6 +150,10 @@ protected void tearDown(final TestDestinationEnv testEnv) throws SQLException {
.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaName)));
database.execute(connection -> connection.createStatement()
.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaToIgnore)));
database.execute(connection -> connection.createStatement()
.execute(String.format("REVOKE SELECT ON table pg_tables FROM %s", testUserName)));
database.execute(connection -> connection.createStatement()
.execute(String.format("DROP USER IF EXISTS %s", testUserName)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

public class RedshiftSslSourceAcceptanceTest extends RedshiftSourceAcceptanceTest {

protected static JdbcDatabase createDatabase(final JsonNode config) {
@Override
protected JdbcDatabase createDatabase(final JsonNode config) {
return Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ All Redshift connections are encrypted using SSL

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.3 .8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.3.9 | 2022-02-21 | [9744](https://github.com/airbytehq/airbyte/pull/9744) | List only the tables on which the user has SELECT permissions.
| 0.3.8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.3.7 | 2022-01-26 | [9721](https://github.com/airbytehq/airbyte/pull/9721) | Added schema selection |
| 0.3.6 | 2022-01-20 | [8617](https://github.com/airbytehq/airbyte/pull/8617) | Update connector fields title/description |
| 0.3.5 | 2021-12-24 | [8958](https://github.com/airbytehq/airbyte/pull/8958) | Add support for JdbcType.ARRAY |
Expand Down