SQL Sources should include previous sync's cursor value in SELECT statement as not to skip rows added mid-sync #14732
Comments
Hey team! Please add your planning poker estimate with ZenHub @edgao @ryankfu @subodh1810 @tuliren
The new row can carry the same cursor value as the previous sync's max, which will be skipped with a query with `>`.
We will probably also need to sort by the cursor column for this solution. There's added DB load.
Oh, right. I missed the sorting part.

The chosen solution, solution 1, can lead to lots of duplications. Imagine a connection that runs every 5 minutes: if the source database has no updates, the destination will get 12x duplicated records for the cursor value per hour. There are a few ways to reduce the amount of duplication.
(Another approach is to exclude the max cursor value in the current sync, i.e. query with `cursor >= <previous cursor> AND cursor < <current max cursor>`.)

So maybe we should just implement solution 2.
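A minimal sketch of what that bounded query could look like in the JDBC source's template style (the parameter wiring and names are assumptions for illustration, not the current implementation):

```java
// Sketch only: the lower bound comes from the saved state, the upper bound is the max
// cursor value observed at the start of this sync. Rows landing at the current max are
// left for the *next* sync, so nothing inserted mid-sync can be silently skipped.
String template = "SELECT %s FROM %s WHERE %s >= ? AND %s < ?";
String query = String.format(template, "id, name, updated_at", "users", "updated_at", "updated_at");
// statement.setTimestamp(1, previousCursor);   // from the stream state
// statement.setTimestamp(2, currentMaxCursor); // e.g. SELECT MAX(updated_at) taken at sync start
```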
Actually solution 2 is very hard to implement with our current code structure. Right now, we chain together a series of iterators ahead of time. Each iterator is created based on one concrete SQL query. To implement solution 2, we need to keep firing queries inside the iterator until it reaches an offset with no data. If we go with this solution, it will be significantly more complicated than a few hours of work.

Update: actually we can get the total number of records first and pre-define the offsets, so this is not a problem.
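A rough sketch of that "count first, pre-define the offsets" idea (class name, page size, and column names are illustrative assumptions, not the actual iterator code):

```java
import java.util.ArrayList;
import java.util.List;

class PagedQueryPlanner {

  // Page size is an illustrative assumption, not an Airbyte constant.
  private static final int PAGE_SIZE = 10_000;

  // Build one query per page up front, so the existing "chain of iterators created
  // ahead of time" structure can stay as it is. A real implementation would bind the
  // cursor as a parameter instead of formatting it into the SQL.
  List<String> buildPagedQueries(long totalRows, String cursorValue) {
    List<String> queries = new ArrayList<>();
    for (long offset = 0; offset < totalRows; offset += PAGE_SIZE) {
      // A stable ORDER BY on the cursor column is required for OFFSET to mean anything.
      queries.add(String.format(
          "SELECT id, name, updated_at FROM users WHERE updated_at >= '%s' "
              + "ORDER BY updated_at LIMIT %d OFFSET %d",
          cursorValue, PAGE_SIZE, offset));
    }
    return queries;
  }
}
```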
I'm still not sure how sorting helps here - what happens if there are three records, all with updated_at set to the same value? We'd sync all three and store an offset of 3. (Also, what if in between the first and second syncs, one of the records gets updated to have updated_at=2022-08-06? Then our stored offset of 3 would be explicitly wrong.)

I actually kind of like the hash-based approach - pretty sure hashing is fast (probably much faster than IO, assuming we use a reasonable hash algo). Just not sure how complicated that implementation would be.
Clarification question: The hash-based approach is to prevent the source from sending duplicate data it's already sent, correct? However, we have the ability in the normalizer to de-dupe data already. Could we split this problem into 2 phases: 1) de-duplicating records in the destination (which normalization can already do), and 2) making sure the source emits every record at least once, even if that means re-sending some?
Asked another way, do Airbyte users expect at-least-once delivery or at-most-once delivery of records? I'm worried that the hash-based approach could be a memory problem. Many times in the past I've updated all million rows of a table at once.
When I first saw Airbyte, I actually thought the record ID was used for this kind of de-duplication.
Me too. We should totally leverage this ID to do more. But at this point, it is not trivial to change this behavior.
The de-dup mode does that. However, for people using the non-dedup incremental mode, this is not possible / there is no cheap solution.
Yes.
Yes. 1) is already taken care of by the normalization, if the user chooses the de-dup mode. The solutions discussed here are mainly concerned with 2).
At least once.
Yes, OOME can be a real problem. The implementation should only keep a constant number of things in memory. I think we can tolerate false positives, i.e. data is duplicated with a small probability.
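One way to keep memory bounded while making sure any error turns into a duplicate rather than a skipped record is to hash only the rows that share the boundary cursor value, and stop tracking if there are too many of them. A sketch with an assumed cap and naming, not anything from the Airbyte codebase:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

// Sketch: only rows that share the final cursor value of the sync are hashed. If there
// are too many of them, tracking is abandoned and they are simply re-emitted next sync,
// so any failure mode produces duplicates, never silently dropped records.
class BoundaryDeduper {
  private static final int MAX_TRACKED = 100_000; // illustrative cap
  private final Set<String> seenHashes = new HashSet<>();
  private boolean overflowed = false;

  void record(String serializedRow) {
    if (overflowed) return;
    seenHashes.add(sha256(serializedRow));
    if (seenHashes.size() > MAX_TRACKED) { // give up rather than grow unbounded
      seenHashes.clear();
      overflowed = true;
    }
  }

  /** True only if we are certain this exact row was already emitted last sync. */
  boolean alreadyEmitted(String serializedRow) {
    return !overflowed && seenHashes.contains(sha256(serializedRow));
  }

  private static String sha256(String s) {
    try {
      byte[] digest = MessageDigest.getInstance("SHA-256")
          .digest(s.getBytes(StandardCharsets.UTF_8));
      StringBuilder sb = new StringBuilder();
      for (byte b : digest) sb.append(String.format("%02x", b));
      return sb.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }
}
```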
Agree that sorting won't help. The only sort column we can rely on is the cursor column. Other columns may not be indexed at all.
@sherifnada, maybe it's time for […].

Update: given that this is a relatively rare case, introducing […].
TODOs for storing the number of records that have the cursor value: […]
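For illustration, a sketch of how a stored per-cursor record count could drive the choice of comparison operator (the `state` fields and the `countRowsAtCursor` helper are hypothetical, not the actual implementation):

```java
// Sketch: `state` remembers the last cursor value and how many rows carried exactly that
// value when the previous sync finished.
long rowsAtCursorNow = countRowsAtCursor(connection, "users", "updated_at", state.cursorValue());

// If more rows now share the boundary value, re-read it with >= (accepting duplicates);
// otherwise > is enough. Note: a row swapped in and out with the count unchanged would
// still be missed, so this narrows the window rather than closing it completely.
String comparison = rowsAtCursorNow > state.recordCountAtCursor() ? ">=" : ">";
String query = String.format(
    "SELECT %s FROM %s WHERE %s %s ? ORDER BY %s",
    "id, name, updated_at", "users", "updated_at", comparison, "updated_at");
// countRowsAtCursor(...) would run: SELECT COUNT(*) FROM users WHERE updated_at = ?
```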
@liren Tu (Airbyte) thanks for the presentation yesterday. Kept thinking about it and I'm wondering if this approach would help (for postgres only)? Disclaimer: when I was thinking through this, I was just thinking about postgres. I understand that the purpose of this issue is to find a default implementation, not just for postgres. That said, in the presentation yesterday there was some reference to why using xmin wouldn't work even for postgres, which I wanted to understand.

My understanding is that xmin is the last transaction number to affect a row. It is always ascending (and wraps around). So if, in your state for a given cursor value, you saved the min xmin (the minimum xmin that was synced for that cursor value) and the max xmin (the maximum xmin that was synced for that cursor value), I think you can always* accurately detect when there are new records. Here's the algorithm I'm thinking about.

Algorithm to handle the simple case: on the next sync, check the max xmin in the table for records with that cursor value. If it is greater than the max xmin saved in the state, there are new records to pick up. This breaks in the case where the xmin wraps around...

Algorithm to handle the wrap-around case: […] when we query the table again we find that xmin has wrapped around: […] The record with the wrapped-around xmin falls outside the saved min/max bounds, so it can still be detected as new.

*The one case that this would still fail on would be if the xmin wrapped around AND got inside the bounds of the saved min/max xmin.
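For reference, a minimal Postgres-only sketch of reading the xmin values that would be compared against the saved bounds (table, column, and state names are illustrative; wrap-around handling is omitted):

```java
// Postgres-only: xmin is of type xid and has to be cast through text to read it as a number.
String probe = "SELECT xmin::text::bigint AS row_xmin FROM users WHERE updated_at = ?";
// try (PreparedStatement ps = connection.prepareStatement(probe)) {
//   ps.setTimestamp(1, savedCursorValue);
//   ... compare each row_xmin against state.minXmin() / state.maxXmin() ...
// }
```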
replying in #5022 (comment) to keep postgres-specific discussion in its own issue
Work left: […]
Description
Consider the following `users` table. A common sync would be to create an incremental sync using the `updated_at` timestamp column + de-duped history. As users are added or updated, the `updated_at` column is changed and the next sync will pick them up, as the cursor can only move forward in time.

There's a problem, however, at the boundary of the sync. If the sync itself was running at `2022-01-03 00:00:00`, right when a new user was added, that user would not be included in the sync: we won't pick up user 4 (at least until something about them changes and their `updated_at` is bumped), because in subsequent syncs we use the greater-than comparison to choose which records to import. This can lead to missing records if they were inserted as the sync was running.

Local confirmation
To confirm this behavior, create a source table with the 3 rows listed in the first example and run the sync. You will have 3 records in your destination. Then add a 4th row with an `updated_at` matching the current cursor (`2022-01-03 00:00:00` in this example). Sync again, and see that user 4 did not make it to the destination.
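Something along these lines should set up the repro locally (table layout, names, and timestamps are illustrative, chosen to match the example above):

```java
// Illustrative repro data for a local source database.
String[] setup = {
    "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(50), updated_at TIMESTAMP)",
    "INSERT INTO users VALUES (1, 'alice', '2022-01-01 00:00:00')",
    "INSERT INTO users VALUES (2, 'bob',   '2022-01-02 00:00:00')",
    "INSERT INTO users VALUES (3, 'carol', '2022-01-03 00:00:00')"
};
// Run the first incremental sync: 3 records land in the destination and the stored
// cursor becomes '2022-01-03 00:00:00'. Then insert a 4th row at exactly that value:
String lateInsert = "INSERT INTO users VALUES (4, 'dave', '2022-01-03 00:00:00')";
// The next sync filters with `updated_at > '2022-01-03 00:00:00'`, so user 4 is skipped.
```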
Possible Solutions

1. Rather than selecting records from SQL sources matching `"SELECT %s FROM %s WHERE %s > ?"`, we could use `>=`, e.g. `"SELECT %s FROM %s WHERE %s >= ?"`. This solution is the most robust because it also works for numeric (vs. time) cursors. It will have the negative side effect of almost always creating a duplicate entry (the previous sync's most recent record is very likely to be returned again at the start of a subsequent sync), and normalization will need to deal with it. This is not a problem with de-duped history, but is weird in all other sync modes.
2. Building from solution 1 above (using `>=`), we could store both the cursor value and the offset in the STATE. This solves the duplicate-row problem introduced above: the state remembers the previous sync's cursor and OFFSET, so the next sync can continue from the same cursor position + offset and pick up any records under that same cursor value that we haven't seen yet. So, the state for an incremental SQL cursor source should look something like:
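A sketch of one possible shape, with illustrative field names rather than an actual Airbyte state schema:

```java
// Illustrative only: the cursor value plus how far into that exact cursor value the
// previous sync already read, so the next sync can resume with >= and skip what it has seen.
public record IncrementalCursorState(
    String cursorField,  // e.g. "updated_at"
    String cursorValue,  // e.g. "2022-01-03 00:00:00"
    long offset          // rows already emitted at exactly this cursor value
) {}
```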
Probably bad solutions

- We could add a clause like `AND ${cursor} <= NOW() - INTERVAL '2 seconds'`. In this way, we would ignore "too recent" values that might be inserted at the time of the sync. This doesn't solve the problem with numeric cursors, only "time"-like cursors.

Misc
This approach assumes that our source databases have an implicit, static sort which will return rows in the same order without needing an `ORDER BY`. This is true for postgres and mysql... is it true for all DB sources? We may also need to add an `ORDER BY` clause.
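If an explicit ordering turns out to be needed, the change would presumably stay within the query template; a sketch, assuming the existing placeholder style:

```java
// Sketch: same placeholder style as the existing "SELECT %s FROM %s WHERE %s > ?" template,
// with an explicit, deterministic ordering on the cursor column.
String orderedTemplate = "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC";
String query = String.format(orderedTemplate, "id, name, updated_at", "users", "updated_at", "updated_at");
```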