diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json index 58c40f87a119a..e97a5f07c782b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750", "name": "Postgres", "dockerRepository": "airbyte/source-postgres", - "dockerImageTag": "0.3.17", + "dockerImageTag": "0.4.1", "documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres", "icon": "postgresql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 1d360c7fca4de..79ff39c818bbb 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -537,7 +537,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 0.4.0 + dockerImageTag: 0.4.1 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 5ad1f8e4340fb..a303e5b1dd99d 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 85cfddff1d742..7b76636deed89 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.0 +LABEL io.airbyte.version=0.4.1 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 4fcdc9a09bbb2..9cdc63b7a1801 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -117,8 +117,10 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception { } @Override - public List> getCheckOperations(final JsonNode config) throws Exception { - final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); + public List> getCheckOperations(final JsonNode config) + throws Exception { + final List> checkOperations = new ArrayList<>( + super.getCheckOperations(config)); if (isCdc(config)) { checkOperations.add(database -> { @@ -129,21 +131,24 @@ public List> getCheckOperations(final J ps.setString(2, PostgresUtils.getPluginValue(config.get("replication_method"))); ps.setString(3, config.get("database").asText()); - LOGGER.info("Attempting to find the named replication slot using the query: " + ps.toString()); + LOGGER.info( + "Attempting to find the named replication slot using the query: " + ps.toString()); return ps; }, sourceOperations::rowToJson).collect(toList()); if (matchingSlots.size() != 1) { - throw new RuntimeException("Expected exactly one replication slot but found " + matchingSlots.size() - + ". Please read the docs and add a replication slot to your database."); + throw new RuntimeException( + "Expected exactly one replication slot but found " + matchingSlots.size() + + ". Please read the docs and add a replication slot to your database."); } }); checkOperations.add(database -> { final List matchingPublications = database.query(connection -> { - final PreparedStatement ps = connection.prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?"); + final PreparedStatement ps = connection + .prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?"); ps.setString(1, config.get("replication_method").get("publication").asText()); LOGGER.info("Attempting to find the publication using the query: " + ps.toString()); @@ -152,8 +157,9 @@ public List> getCheckOperations(final J }, sourceOperations::rowToJson).collect(toList()); if (matchingPublications.size() != 1) { - throw new RuntimeException("Expected exactly one publication but found " + matchingPublications.size() - + ". Please read the docs and add a publication to your database."); + throw new RuntimeException( + "Expected exactly one publication but found " + matchingPublications.size() + + ". Please read the docs and add a publication to your database."); } }); @@ -163,7 +169,9 @@ public List> getCheckOperations(final J } @Override - public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) + public AutoCloseableIterator read(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final JsonNode state) throws Exception { // this check is used to ensure that have the pgoutput slot available so Debezium won't attempt to // create it. @@ -177,7 +185,8 @@ public AutoCloseableIterator read(final JsonNode config, final C } @Override - public List> getIncrementalIterators(final JdbcDatabase database, + public List> getIncrementalIterators( + final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final Map>> tableNameToTable, final StateManager stateManager, @@ -192,10 +201,13 @@ public List> getIncrementalIterators(final */ final JsonNode sourceConfig = database.getSourceConfig(); if (isCdc(sourceConfig)) { - final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, PostgresCdcTargetPosition.targetPosition(database), + final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, + PostgresCdcTargetPosition.targetPosition(database), PostgresCdcProperties.getDebeziumProperties(sourceConfig), catalog, false); - return handler.getIncrementalIterators(new PostgresCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()), - new PostgresCdcStateHandler(stateManager), new PostgresCdcConnectorMetadataInjector(), emittedAt); + return handler.getIncrementalIterators( + new PostgresCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()), + new PostgresCdcStateHandler(stateManager), new PostgresCdcConnectorMetadataInjector(), + emittedAt); } else { return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt); @@ -228,13 +240,47 @@ private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stre } @Override - public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException { + public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, + final String schema) + throws SQLException { return database.query(connection -> { final PreparedStatement ps = connection.prepareStatement( - "SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n" - + "FROM information_schema.table_privileges\n" - + "WHERE grantee = ? AND privilege_type = 'SELECT'"); - ps.setString(1, database.getDatabaseConfig().get("username").asText()); + """ + SELECT DISTINCT table_catalog, + table_schema, + table_name, + privilege_type + FROM information_schema.table_privileges + WHERE grantee = ? + AND privilege_type = 'SELECT' + UNION ALL + SELECT r.rolname AS table_catalog, + n.nspname AS table_schema, + c.relname AS table_name, + -- the initial query is supposed to get a SELECT type. Since we use a UNION query + -- to get Views that we can read (i.e. select) - then lets fill this columns with SELECT + -- value to keep the backward-compatibility + COALESCE ('SELECT') AS privilege_type + FROM pg_class c + JOIN pg_namespace n + ON n.oid = relnamespace + JOIN pg_roles r + ON r.oid = relowner, + Unnest(COALESCE(relacl::text[], Format('{%s=arwdDxt/%s}', rolname, rolname)::text[])) acl, + Regexp_split_to_array(acl, '=|/') s + WHERE r.rolname = ? + AND nspname = 'public' + -- 'm' means Materialized View + AND c.relkind = 'm' + AND ( + -- all grants + c.relacl IS NULL + -- read grant + OR s[2] = 'r'); + """); + final String username = database.getDatabaseConfig().get("username").asText(); + ps.setString(1, username); + ps.setString(2, username); return ps; }, sourceOperations::rowToJson) .collect(toSet()) diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java index c5a3709631f8e..9b8b0a634324e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java @@ -31,6 +31,7 @@ public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest { private static final String STREAM_NAME = "public.id_and_name"; private static final String STREAM_NAME2 = "public.starships"; + private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview"; private PostgreSQLContainer container; private JsonNode config; @@ -67,6 +68,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + ctx.fetch("CREATE MATERIALIZED VIEW testview AS select * from id_and_name where id = '2';"); return null; }); @@ -113,6 +115,15 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { STREAM_NAME2, Field.of("id", JsonSchemaPrimitive.NUMBER), Field.of("name", JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME_MATERIALIZED_VIEW, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); } diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 3695a3316c1d7..84350062df93e 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -257,6 +257,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 0.4.1 | 2022-01-05 | [9116](https://github.com/airbytehq/airbyte/pull/9116) | Added materialized views processing | | 0.4.0 | 2021-12-13 | [8726](https://github.com/airbytehq/airbyte/pull/8726) | Support all Postgres types | | 0.3.17 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | | 0.3.16 | 2021-11-28 | [7995](https://github.com/airbytehq/airbyte/pull/7995) | Fixed money type with amount > 1000 |