Skip to content

Commit

Permalink
🐛Source-postgres: added materialized views processing (#9116)
Browse files Browse the repository at this point in the history
* [9012] Source-postgres: added materialized views processing
  • Loading branch information
etsybaev authored Jan 5, 2022
1 parent 40db22c commit d791972
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.3.17",
"dockerImageTag": "0.4.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres",
"icon": "postgresql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.0
dockerImageTag: 0.4.1
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.version=0.4.1
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
}

@Override
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(super.getCheckOperations(config));
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config)
throws Exception {
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(
super.getCheckOperations(config));

if (isCdc(config)) {
checkOperations.add(database -> {
Expand All @@ -129,21 +131,24 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
ps.setString(2, PostgresUtils.getPluginValue(config.get("replication_method")));
ps.setString(3, config.get("database").asText());

LOGGER.info("Attempting to find the named replication slot using the query: " + ps.toString());
LOGGER.info(
"Attempting to find the named replication slot using the query: " + ps.toString());

return ps;
}, sourceOperations::rowToJson).collect(toList());

if (matchingSlots.size() != 1) {
throw new RuntimeException("Expected exactly one replication slot but found " + matchingSlots.size()
+ ". Please read the docs and add a replication slot to your database.");
throw new RuntimeException(
"Expected exactly one replication slot but found " + matchingSlots.size()
+ ". Please read the docs and add a replication slot to your database.");
}

});

checkOperations.add(database -> {
final List<JsonNode> matchingPublications = database.query(connection -> {
final PreparedStatement ps = connection.prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?");
final PreparedStatement ps = connection
.prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?");
ps.setString(1, config.get("replication_method").get("publication").asText());

LOGGER.info("Attempting to find the publication using the query: " + ps.toString());
Expand All @@ -152,8 +157,9 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
}, sourceOperations::rowToJson).collect(toList());

if (matchingPublications.size() != 1) {
throw new RuntimeException("Expected exactly one publication but found " + matchingPublications.size()
+ ". Please read the docs and add a publication to your database.");
throw new RuntimeException(
"Expected exactly one publication but found " + matchingPublications.size()
+ ". Please read the docs and add a publication to your database.");
}

});
Expand All @@ -163,7 +169,9 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
}

@Override
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state)
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final JsonNode state)
throws Exception {
// this check is used to ensure that have the pgoutput slot available so Debezium won't attempt to
// create it.
Expand All @@ -177,7 +185,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
}

@Override
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final JdbcDatabase database,
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final StateManager stateManager,
Expand All @@ -192,10 +201,13 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
*/
final JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig)) {
final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, PostgresCdcTargetPosition.targetPosition(database),
final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig,
PostgresCdcTargetPosition.targetPosition(database),
PostgresCdcProperties.getDebeziumProperties(sourceConfig), catalog, false);
return handler.getIncrementalIterators(new PostgresCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new PostgresCdcStateHandler(stateManager), new PostgresCdcConnectorMetadataInjector(), emittedAt);
return handler.getIncrementalIterators(
new PostgresCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new PostgresCdcStateHandler(stateManager), new PostgresCdcConnectorMetadataInjector(),
emittedAt);

} else {
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
Expand Down Expand Up @@ -228,13 +240,47 @@ private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stre
}

@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException {
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
final String schema)
throws SQLException {
return database.query(connection -> {
final PreparedStatement ps = connection.prepareStatement(
"SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n"
+ "FROM information_schema.table_privileges\n"
+ "WHERE grantee = ? AND privilege_type = 'SELECT'");
ps.setString(1, database.getDatabaseConfig().get("username").asText());
"""
SELECT DISTINCT table_catalog,
table_schema,
table_name,
privilege_type
FROM information_schema.table_privileges
WHERE grantee = ?
AND privilege_type = 'SELECT'
UNION ALL
SELECT r.rolname AS table_catalog,
n.nspname AS table_schema,
c.relname AS table_name,
-- the initial query is supposed to get a SELECT type. Since we use a UNION query
-- to get Views that we can read (i.e. select) - then lets fill this columns with SELECT
-- value to keep the backward-compatibility
COALESCE ('SELECT') AS privilege_type
FROM pg_class c
JOIN pg_namespace n
ON n.oid = relnamespace
JOIN pg_roles r
ON r.oid = relowner,
Unnest(COALESCE(relacl::text[], Format('{%s=arwdDxt/%s}', rolname, rolname)::text[])) acl,
Regexp_split_to_array(acl, '=|/') s
WHERE r.rolname = ?
AND nspname = 'public'
-- 'm' means Materialized View
AND c.relkind = 'm'
AND (
-- all grants
c.relacl IS NULL
-- read grant
OR s[2] = 'r');
""");
final String username = database.getDatabaseConfig().get("username").asText();
ps.setString(1, username);
ps.setString(2, username);
return ps;
}, sourceOperations::rowToJson)
.collect(toSet())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";

private PostgreSQLContainer<?> container;
private JsonNode config;
Expand Down Expand Up @@ -67,6 +68,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));");
ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
ctx.fetch("CREATE MATERIALIZED VIEW testview AS select * from id_and_name where id = '2';");
return null;
});

Expand Down Expand Up @@ -113,6 +115,15 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
STREAM_NAME2,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME_MATERIALIZED_VIEW,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 0.4.1 | 2022-01-05 | [9116](https://github.com/airbytehq/airbyte/pull/9116) | Added materialized views processing |
| 0.4.0 | 2021-12-13 | [8726](https://github.com/airbytehq/airbyte/pull/8726) | Support all Postgres types |
| 0.3.17 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key |
| 0.3.16 | 2021-11-28 | [7995](https://github.com/airbytehq/airbyte/pull/7995) | Fixed money type with amount > 1000 |
Expand Down

0 comments on commit d791972

Please sign in to comment.