
SQL Sources should include previous sync's cursor value in SELECT statement as not to skip rows added mid-sync #14732

Closed
evantahler opened this issue Jul 14, 2022 · 16 comments · Fixed by #15535

@evantahler
Contributor

evantahler commented Jul 14, 2022

Description

Consider the following users table.

| id | email | name | created_at | updated_at |
|----|-------|------|------------|------------|
| 1 | evan@example.com | Evan | 2022-01-01 00:00:00 | 2022-01-01 00:00:00 |
| 2 | brian@example.com | Brian | 2022-01-02 00:00:00 | 2022-01-02 00:00:00 |
| 3 | andy@example.com | Andy | 2022-01-03 00:00:00 | 2022-01-03 00:00:00 |

A common setup would be an incremental sync using the updated_at timestamp column as the cursor, with de-duped history. As users are added or updated, their updated_at value changes, and the next sync picks them up because the cursor only ever moves forward in time.

There's a problem, however, at the boundary of the sync. If the sync was running at 2022-01-03 00:00:00, right when a new user was added, that user would not be included in the sync:

| id | email | name | created_at | updated_at |
|----|-------|------|------------|------------|
| 1 | evan@example.com | Evan | 2022-01-01 00:00:00 | 2022-01-01 00:00:00 |
| 2 | brian@example.com | Brian | 2022-01-02 00:00:00 | 2022-01-02 00:00:00 |
| 3 | andy@example.com | Andy | 2022-01-03 00:00:00 | 2022-01-03 00:00:00 |
| 4 | greg@example.com | Greg | 2022-01-03 00:00:00 | 2022-01-03 00:00:00 |

We won't pick up user 4 (at least until something about them changes and their updated_at is bumped) because subsequent syncs use a strict greater-than comparison to choose which records to import. This means records inserted while a sync is running can be missed.

Local confirmation

To confirm this behavior, create a source table with the 3 rows listed in the first example and run the sync. You will have 3 records in your destination. Then add a 4th row with an updated_at matching the current cursor ("2022-01-03 00:00:00" in this example). Sync again, and see that user 4 did not make it to the destination.
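A minimal reproduction sketch, assuming a Postgres source and the users table from the example above:

CREATE TABLE users (
  id         INTEGER PRIMARY KEY,
  email      TEXT,
  name       TEXT,
  created_at TIMESTAMP,
  updated_at TIMESTAMP
);

INSERT INTO users VALUES
  (1, 'evan@example.com',  'Evan',  '2022-01-01 00:00:00', '2022-01-01 00:00:00'),
  (2, 'brian@example.com', 'Brian', '2022-01-02 00:00:00', '2022-01-02 00:00:00'),
  (3, 'andy@example.com',  'Andy',  '2022-01-03 00:00:00', '2022-01-03 00:00:00');

-- Run the first incremental sync (the cursor settles at 2022-01-03 00:00:00), then:
INSERT INTO users VALUES
  (4, 'greg@example.com', 'Greg', '2022-01-03 00:00:00', '2022-01-03 00:00:00');

-- Run the sync again: user 4 is never emitted, because the incremental query
-- only selects rows with updated_at > '2022-01-03 00:00:00'.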

Possible Solutions

  1. Rather than selecting records from SQL sources with "SELECT %s FROM %s WHERE %s > ?", we could use >=, e.g. "SELECT %s FROM %s WHERE %s >= ?". This solution is the most robust because it works for numeric as well as time cursors. The negative side effect is that it will almost always create a duplicate entry (the previous sync's most recent record is very likely to be returned again at the start of the subsequent sync), and normalization will need to deal with it. This is not a problem with de-duped history, but is odd in all other sync modes.

  2. Building on solution 1 above (using >=), we could store both the cursor value and an offset in the STATE. This solves the duplicate-row problem introduced above: the next sync continues from the same cursor value plus the stored offset, so it only picks up records at that cursor value that we haven't seen yet:

SELECT * from users LIMIT 100 # got 3 records
SELECT * from users LIMIT 100 OFFSET 3 # got 0 records, so the first sync is done
# first sync ends; the last row seen at the final cursor value was at offset 0,
# so the cursor is {value: "2022-01-03 00:00:00", offset: 0}

# Next sync starts
SELECT * from users WHERE updated_at >= '2022-01-03 00:00:00' LIMIT 100 OFFSET 1 # previous offset + 1 skips the already-seen row; 1 new row found!

So, the state for an incremental SQL cursor source should look like:

{ users: { offset: 0, cursor: "2022-01-03 00:00:00"}}
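For concreteness, here is what the solution-1 change looks like with the example table's literal values (the production query is templated, as quoted above):

-- Today: strict greater-than, so rows sharing the last cursor value are skipped.
SELECT * FROM users WHERE updated_at > '2022-01-03 00:00:00';

-- Solution 1: greater-than-or-equal, so those rows are re-read.
-- Trade-off: rows already synced at that cursor value are emitted again.
SELECT * FROM users WHERE updated_at >= '2022-01-03 00:00:00';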

Probably bad solutions

  1. If we could detect that the cursor was a time field, we could append AND ${cursor} < NOW() - INTERVAL '2 seconds' (see the sketch after this list). In this way, we would ignore "too recent" values that might be inserted while the sync is running. This only helps "time"-like cursors, not numeric ones.
  2. Introduce a sleep and one more read before the sync is determined to be complete, e.g.:
SELECT * from users LIMIT 100 # got some records
SELECT * from users LIMIT 100 OFFSET 3 # got 0 records after using offset
# SLEEP 2. Sleep enough time to ensure that the cursor's minimum resolution (often 1 second) has passed
SELECT * from users LIMIT 100  OFFSET 3 # got 1 record that was inserted during the sleep
# LOOP or END if still 0 rows
  3. Never send any RECORDS from the latest cursor value. You'll never see any duplicates, but the destination will always lag some amount behind the source.
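A sketch of what the NOW()-aware query in option 1 above could look like (Postgres syntax; the 2-second interval is arbitrary):

-- Emit rows past the cursor, but ignore rows "too recent" to be considered
-- complete (inserts at the current timestamp may still be in flight).
SELECT *
FROM users
WHERE updated_at > '2022-01-03 00:00:00'
  AND updated_at < NOW() - INTERVAL '2 seconds';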

Misc

This approach assumes that our source databases have an implicit, stable sort that returns rows in the same order without needing an ORDER BY. This is true for Postgres and MySQL... is it true for all DB sources? We may also need to add an ORDER BY clause.
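If an explicit ordering does turn out to be required, the incremental query would look something like this (a sketch; the cursor column is assumed to be updated_at):

-- Explicit ORDER BY on the cursor column so that OFFSET-based paging is repeatable.
-- Rows sharing the same updated_at still have no guaranteed relative order
-- unless a tiebreaker column (e.g. the primary key) is added to the ORDER BY.
SELECT *
FROM users
WHERE updated_at >= '2022-01-03 00:00:00'
ORDER BY updated_at
LIMIT 100 OFFSET 1;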

@evantahler evantahler added type/bug Something isn't working needs-triage labels Jul 14, 2022
@evantahler evantahler changed the title SQL Sources should include previous sync's cursor value in SELECT statement SQL Sources should include previous sync's cursor value in SELECT statement or be time-aware Jul 14, 2022
@evantahler evantahler changed the title SQL Sources should include previous sync's cursor value in SELECT statement or be time-aware SQL Sources should include previous sync's cursor value in SELECT statement or be NOW()-aware Jul 14, 2022
@evantahler evantahler changed the title SQL Sources should include previous sync's cursor value in SELECT statement or be NOW()-aware SQL Sources should include previous sync's cursor value in SELECT statement as not to skip rows added mid-sync Jul 14, 2022
@grishick
Contributor

Hey team! Please add your planning poker estimate with ZenHub @edgao @ryankfu @subodh1810 @tuliren

@tuliren
Contributor

tuliren commented Jul 26, 2022

The OFFSET approach (the second possible solution) probably does not work. The return order of the records is unknown, other than by updated_at, so it is possible that the newly added record shows up as the first row:

| id | email | name | created_at | updated_at |
|----|-------|------|------------|------------|
| 1 | greg@example.com | Greg | 2022-01-03 00:00:00 | 2022-01-03 00:00:00 |
| 2 | andy@example.com | Andy | 2022-01-03 00:00:00 | 2022-01-03 00:00:00 |

in which case it would be skipped by a query with OFFSET 1.

@evantahler
Contributor Author

evantahler commented Jul 28, 2022

The return order of the records is unknown other than the updated_at

We will probably need to also sort by the cursor column for this solution. There's added db load.

@tuliren tuliren self-assigned this Aug 3, 2022
@tuliren
Contributor

tuliren commented Aug 4, 2022

We will probably need to also sort by the cursor column for this solution. There's added db load.

Oh, right. I missed the sorting part in the MISC section.


The chosen solution, solution 1, can lead to a lot of duplication. Imagine a connection that runs every 5 minutes: if the source database has no updates, the destination will receive 12 duplicate copies per hour of the records at the cursor value.

There are a few ways to reduce the amount of duplication.

  • Track the number of rows synced together with the cursor value, so the state becomes { users: { count: 3, cursor: "2022-01-03 00:00:00"}}. Before each incremental sync, count the number of rows with cursorColumn = cursorValue. If the result differs from the one in the state, we know there are changes and use >= for the incremental query; otherwise, use > to avoid the duplication.
  • The above approach has one flaw. If someone removed one row and added another, the row count remains the same but the data is different. A more precise approach is to store not just the count, but a hash of all rows with cursorColumn = cursorValue. The state becomes { users: { hash: "c2RmO2xqYXM7bG", cursor: "2022-01-03 00:00:00"}}. Only query with >= if the hash is different. The downside of this approach is that we need to do a lot of hashing on the fly. (A sketch of both checks follows this list.)
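A sketch of both checks against the example table, assuming a Postgres source (md5 and string_agg are used purely for illustration):

-- Row-count check: if the count at the saved cursor value matches the state,
-- query with > ; if it differs, re-read the boundary with >= .
SELECT COUNT(*) FROM users WHERE updated_at = '2022-01-03 00:00:00';

-- Hash check: also catches a delete + insert that leaves the count unchanged.
SELECT md5(string_agg(u::text, ',' ORDER BY u.id))
FROM users u
WHERE u.updated_at = '2022-01-03 00:00:00';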

(Another approach is to exclude the max cursor value from the current sync, i.e. query with cursorColumn >= cursorValue and cursorColumn < currentMaxCursorValue. The rows with currentMaxCursorValue would then be synced in full next time. On second thought, though, this does not work: the destination never gets the rows at currentMaxCursorValue until a later sync, which would be pretty frustrating for users.)

So maybe we should just implement solution 2.

@tuliren
Contributor

tuliren commented Aug 5, 2022

Actually solution 2 is very hard to implement with our current code structure. Right now, we chain together a series of iterators ahead of time. Each iterator is created based on one concrete SQL query. To implement solution 2, we need to keep firing queries inside the iterator until it reaches an offset with no data. If we go with this solution, it will be significantly more complicated than a few hours of work.


Update: actually, we can get the total number of records first and pre-define the offsets, so this is not a problem.
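For example, a sketch of the pre-defined-offset idea:

-- Count the rows to sync up front...
SELECT COUNT(*) FROM users WHERE updated_at >= '2022-01-03 00:00:00';
-- ...then pre-define one query per page instead of looping until an empty page:
SELECT * FROM users WHERE updated_at >= '2022-01-03 00:00:00' ORDER BY updated_at LIMIT 100 OFFSET 0;
SELECT * FROM users WHERE updated_at >= '2022-01-03 00:00:00' ORDER BY updated_at LIMIT 100 OFFSET 100;
-- ...and so on, up to ceil(count / 100) pages.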

@edgao
Contributor

edgao commented Aug 5, 2022

I'm still not sure how sorting helps here - what happens if there are three records, all with updated_at set to 2022-08-05, we run a sync on Aug 5, and while we're running that sync, a new record is inserted? Is there any guarantee that on a subsequent sync, that new record will be sorted after the older records?

(also, what if in between the first and second syncs, one of the records gets updated to have updated_at=2022-08-06? then our stored offset of 3 would be explicitly wrong)


I actually kind of like the hash-based approach - pretty sure hashing is fast (probably much faster than IO, assuming we use a reasonable hash algo). Just not sure how complicated that implementation would be.

@evantahler
Contributor Author

evantahler commented Aug 5, 2022

Clarification question: the hash-based approach is to prevent the source from sending duplicate data it has already sent, correct? However, we already have the ability in the normalizer to de-dupe data. Could we split this problem into 2 phases:

  1. (re-)send data from the latest cursor value and let the normalizer sort it out
  2. be smarter in the source about not re-sending data

Asked another way, do Airbyte users expect at-least once delivery or at-most once delivery of records?

I'm worried that the hash based approach could be a memory problem. Many times in the past I've updated all of a table's million users at once to add a new field or migrate something... and they all had the same updated_at value after that. There are solutions to this problem, like using a local file to store "seen hashes" and persisting it to S3 between syncs, but I don't think storing all of that data in STATE is the right approach.

@bleonard
Contributor

bleonard commented Aug 5, 2022

When I first saw airbyte, I actually thought the airbyte_id was a hash of the values and that's how it knew it already had that one when it got a duplicate from the >= (solution 1 approach). In general, is there a way to use >= and then discard results already in the destination?

@tuliren
Contributor

tuliren commented Aug 5, 2022

When I first saw airbyte, I actually thought the airbyte_id was a hash of the values

Me too. We should totally leverage this ID to do more. But at this point, it is not trivial to change this behavior.

is there a way to use >= and then discard results already in the destination?

The de-dup mode does that. However, for people using the non-dedup incremental mode, this is not possible / there is no cheap solution.

@tuliren
Contributor

tuliren commented Aug 5, 2022

The hash-based approach is to prevent the source from sending duplicate data it's already sent, correct?

Yes.

Could we split this problem into 2 phases:

  1. (re-)send data from the latest cursor value and let the normalizer sort it out
  2. be smarter in the source about not re-sending data

Yes. 1) is already taken care of by the normalization, if the user chooses the de-dup mode. The solutions discussed here are mainly concerned with 2).

do Airbyte users expect at-least once delivery or at-most once delivery of records?

At least once.

I'm worried that the hash based approach could be a memory problem.

Yes, OOME can be a real problem. The implementation should only keep a constant number of things in memory. I think we can tolerate false positives, i.e. data being duplicated with a small probability.

@tuliren
Contributor

tuliren commented Aug 6, 2022

I'm still not sure how sorting helps here - what happens if there are three records, all with updated_at set to 2022-08-05, we run a sync on Aug 5, and while we're running that sync, a new record is inserted? Is there any guarantee that on a subsequent sync, that new record will be sorted after the older records?

Agree that sorting won't help. The only sort column we can rely on is the cursor column. Other columns may not be indexed at all.

@tuliren
Contributor

tuliren commented Aug 6, 2022

@sherifnada, maybe it's time for HyperMinHash?


Update. Given that this is a relatively rare case, introducing HyperMinHash and the extra overhead does not seem justifiable. Storing the row count in the state message may be a good compromise.

@tuliren
Contributor

tuliren commented Aug 10, 2022

TODOs for storing the number of records that have the cursor value:

@cgardens
Contributor

@tuliren thanks for the presentation yesterday. I kept thinking about it and I'm wondering if this approach would help (for Postgres only)?

Disclaimer: when I was thinking through this, I was only thinking about Postgres. I understand that the purpose of this issue is to find a default implementation that is not specific to Postgres. That said, in yesterday's presentation there was some reference to why using xmin wouldn't work even for Postgres, which I wanted to understand.

My understanding is that xmin is the last transaction number to affect a row. It is always ascending (and wraps around). So if, in your state for a given cursor value, you saved the min xmin (the minimum xmin that was synced for that cursor value) and the max xmin (the maximum xmin that was synced for that cursor value), I think you can always* accurately detect when there are new records. Here's the algorithm I'm thinking about.

Algorithm to handle simple case:

xmin, cursor
1, 2022-08-05
2, 2022-08-05

=> state = { cursor: 2022-08-05, xmin_max: 2 }

On the next sync, check the max xmin in the table for records with that cursor value. If the max xmin in the table > xmin_max in the state, then there are new rows to sync; otherwise, do nothing.

This breaks in the case where the xmin wraps around...

Algorithm to handle wrap around case:

xmin, cursor
200, 2022-08-05
201, 2022-08-05

=> state = { cursor: 2022-08-05, xmin_min: 200, xmin_max: 201 }

when we query the table again we find that xmin has wrapped around:

xmin, cursor
200, 2022-08-05
201, 2022-08-05
100, 2022-08-05

The record with xmin = 100 is new and we should sync it. The previous algorithm wouldn't detect it, but now that we have stored the extra xmin_min we can: check whether xmin_max in the table for that cursor value = xmin_max in the state AND xmin_min in the table for that cursor value = xmin_min in the state. If both match, we can assume there are no new records to sync. Otherwise we should sync any records that fall outside of those bounds.

*The one case this would still fail on is if the xmin wrapped around AND landed inside the bounds of xmin_min and xmin_max. I think this should be vanishingly unlikely though (and probably less likely than the edge case described yesterday?).
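A sketch of the bookkeeping query this would need, in Postgres (xmin is a system column; casting it through text to bigint is one common way to compare it numerically):

-- Min/max xmin among the rows at the saved cursor value, to compare against
-- the xmin_min / xmin_max stored in the state.
SELECT MIN(xmin::text::bigint) AS xmin_min,
       MAX(xmin::text::bigint) AS xmin_max
FROM users
WHERE updated_at = '2022-08-05';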

@edgao
Contributor

edgao commented Aug 16, 2022

replying in #5022 (comment) to keep postgres-specific discussion in its own issue

@grishick
Contributor

Work left:

  • rebase
  • re-run integration tests after rebase for all JDBC connectors
  • publish all JDBC connectors
