
Different serialization behaviour for Postgres hstore type between normal and CDC sync #11953

Closed
Tolsto opened this issue Apr 12, 2022 · 3 comments · Fixed by #13367
Labels
autoteam community type/bug Something isn't working

Comments

@Tolsto

Tolsto commented Apr 12, 2022

Environment

  • Airbyte version: 0.35.65-alpha
  • OS Version / Instance: Ubuntu 20.04
  • Deployment: Docker
  • Source Connector and version: Postgres 0.4.9
  • Destination Connector and version: Snowflake 0.4.24
  • Severity: Normal
  • Step where error happened: Sync Job

Current Behavior

The Postgres source connector shows a different serialization behaviour for the hstore type between normal and CDC syncs.
Consider the following source data where the payload column is of type hstore:

postgres=> SELECT * from public.typetest
 id |       payload
----+----------------------
  2 | "my_key"=>"my value"
(1 row)

When using CDC, the Postgres source connector will serialize the hstore as a string with value {"my_key":"my value"}.
However, when using a source without CDC enabled, the serialized string has the value "my_key"=>"my value".
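To make the gap concrete, here is a minimal, self-contained sketch (not Airbyte code; the class name and the simplified escaping are my own assumptions) that converts Postgres's hstore text output into the JSON string the CDC path already emits:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HstoreToJson {

    // Matches one "key"=>"value" pair (or "key"=>NULL) in hstore text output.
    private static final Pattern PAIR = Pattern.compile(
        "\"((?:[^\"\\\\]|\\\\.)*)\"\\s*=>\\s*(?:\"((?:[^\"\\\\]|\\\\.)*)\"|NULL)");

    // Converts hstore text, e.g. "my_key"=>"my value", into a JSON object string.
    public static String toJson(String hstoreText) {
        Map<String, String> map = new LinkedHashMap<>();
        Matcher m = PAIR.matcher(hstoreText);
        while (m.find()) {
            map.put(unescape(m.group(1)), m.group(2) == null ? null : unescape(m.group(2)));
        }
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : map.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append('"').append(escapeJson(e.getKey())).append("\":");
            sb.append(e.getValue() == null ? "null" : '"' + escapeJson(e.getValue()) + '"');
        }
        return sb.append('}').toString();
    }

    // Drops hstore's backslash escapes: \" becomes " and \\ becomes \.
    private static String unescape(String s) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '\\' && i + 1 < s.length()) c = s.charAt(++i);
            out.append(c);
        }
        return out.toString();
    }

    // Escapes backslashes and double quotes for embedding in a JSON string.
    private static String escapeJson(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(toJson("\"my_key\"=>\"my value\""));  // {"my_key":"my value"}
    }
}
```

Running it on the example row above yields `{"my_key":"my value"}`, i.e. the same string the CDC sync produces.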

Expected Behavior

The values should be consistent between the two different sync strategies.
The JSON string that the CDC source generates is preferable, as it is easier to parse. It might even be best to serialize it as an object rather than a string, since it could then be mapped directly to a VARIANT in Snowflake without an additional custom transformation that parses the varchar.

Steps to Reproduce

  1. Create a Postgres source table with a column of type hstore and some sample data.
  2. Set up a replication slot with the wal2json plugin and publication
  3. Create a source in Airbyte that uses the Postgres connector and CDC
  4. Create a second source in Airbyte that uses the Postgres connector and normal sync
  5. Create connections for the two sources and sync with a destination
@Tolsto Tolsto added needs-triage type/bug Something isn't working labels Apr 12, 2022
@bleonard bleonard added autoteam team/tse Technical Support Engineers labels Apr 26, 2022
@grishick grishick added team/databases and removed team/tse Technical Support Engineers labels May 10, 2022
@yurii-bidiuk yurii-bidiuk self-assigned this May 27, 2022
@yurii-bidiuk
Contributor

I added some test data of type hstore to a Postgres DB:
Test data: '"weight"=>"11.2 ounces", "ISBN-13"=>"978-1449370000", "language"=>"English", "paperback"=>"243", "publisher"=>"postgresqltutorial.com"'
[screenshots: hstore1, hstore2]

How the hstore type is currently represented by source-postgres:
Normal mode: it is represented the same way it is stored in the DB (key-value): "weight"=>"11.2 ounces", "ISBN-13"=>"978-1449370000", "language"=>"English", "paperback"=>"243", "publisher"=>"postgresqltutorial.com"
CDC mode: it is represented in JSON format: {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}

The acceptance criterion for this ticket is to represent the hstore type in JSON format for normal sync mode as well.

In normal mode, all Postgres values are mapped in PostgresSourceOperations.setJsonField(). In theory, to represent hstore values as JSON we would add one more case to the switch block, something like:

...
case HSTORE -> putHstoreAsJson(json, columnName, resultSet, colIndex);
...

or add one more if-else condition:

...
if (columnTypeName.equalsIgnoreCase("hstore")) {
      putHstoreAsJson(json, columnName, resultSet, colIndex);
} else {
...
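For illustration, a minimal sketch of what the hypothetical putHstoreAsJson helper might do. In the real connector the raw text would come from resultSet.getString(colIndex) and the target would be a Jackson ObjectNode; a plain Map stands in here so the sketch needs no external dependencies, and escaped quotes inside keys or values are not handled:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HstoreField {

    // Hypothetical helper: parses hstore text (e.g. "a"=>"1", "b"=>"2") into a
    // key-value map and stores it under the column name. A real implementation
    // would read the raw text from the ResultSet and write into an ObjectNode.
    static void putHstoreAsJson(Map<String, Object> json, String columnName, String raw) {
        Map<String, String> parsed = new LinkedHashMap<>();
        // Split on commas that precede an opening quote; assumes no escaped quotes.
        for (String pair : raw.split(",\\s*(?=\")")) {
            String[] kv = pair.split("=>", 2);
            parsed.put(stripQuotes(kv[0]), stripQuotes(kv[1]));
        }
        json.put(columnName, parsed);
    }

    // Removes the surrounding double quotes from an hstore key or value.
    private static String stripQuotes(String quoted) {
        String t = quoted.trim();
        return t.substring(1, t.length() - 1);
    }
}
```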

But there is a problem with the hstore type that makes these approaches impossible to implement here: there is no way to tell from the ResultSet or PgResultSetMetaData that a column has the hstore type. PgResultSetMetaData.getColumnTypeName() and PgResultSetMetaData.getColumnType() return varchar for columns of type hstore, so we cannot detect inside the setJsonField() method that a column is an hstore.

Workaround:
We cannot detect that a column has the hstore type inside the setJsonField() method, but we can during the DISCOVER phase, in the discoverInternal() method.

  1. Build the JSON schema for hstore columns and set "airbyte_type": "hstore". The schema for these fields will look like:
"some_hstore_column": {
            "type": "string",
            "airbyte_type": "hstore"
}
  2. Pass the JSON schema to PostgresSourceOperations
  3. In PostgresSourceOperations.setJsonField(), use the provided JSON schema to verify that the column has the hstore type and map its value to JSON format
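Step 3 above could be sketched roughly like this, reducing the discovered JSON schema to a column-name-to-airbyte_type map for brevity (the class and method names are hypothetical):

```java
import java.util.Map;

public class SchemaAwareField {

    // Hypothetical check for step 3: setJsonField consults the schema built
    // during DISCOVER to see whether a column that JDBC reports as varchar
    // was actually declared as hstore.
    static boolean shouldMapAsHstore(Map<String, String> airbyteTypes, String columnName) {
        return "hstore".equals(airbyteTypes.get(columnName));
    }
}
```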

Note: this approach will only work for new connections, not for already configured ones. To make it work for existing connections, the customer should refresh the source schema in the UI.

My pessimistic estimate for this implementation is 13 story points, as we need to change how the JSON schema is built while keeping it compatible with the other JDBC source connectors.

@grishick could you please share your opinion? Are these steps OK for you?

cc: @tuliren @edgao @alexandr-shegeda

@grishick
Contributor

grishick commented Jun 1, 2022

@yurii-bidiuk have you looked into using PgResultSetMetaData::getColumnClassName? If I am reading the code of the Postgres JDBC driver correctly, it should return java.util.Map, and that should be an indication that this column is to be encoded as a map.
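A minimal sketch of that check (class and method names are mine; the real code would call getColumnClassName(i) on the PgResultSetMetaData for the column in question):

```java
public class HstoreDetect {

    // Per the suggestion above: for hstore columns, pgjdbc reports the column's
    // Java class as java.util.Map, while getColumnTypeName() can surface as
    // varchar. So the class name is the signal to branch on.
    static boolean isHstoreColumn(String columnClassName) {
        return "java.util.Map".equals(columnClassName);
    }
}
```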

@yurii-bidiuk
Contributor

@grishick my apologies. By mistake, I used the wrong column to verify the hstore type, which is why it always returned 'varchar' instead of 'hstore'.
Here is the PR for this ticket: #11367


7 participants