Source Zendesk Chat: Process large amount of data in batches for incremental #13387
Conversation
"api_pagination_limit": { | ||
"type": "integer", | ||
"title": "Api Pagination Limit (0=No Limit)", | ||
"description": "Limit up to how many pages will be queried in the API to force the writing of data", | ||
"default": 0, | ||
"minimum": 0 | ||
}, |
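For context, a minimal sketch of how an option like this could be enforced in a CDK-style stream's pagination logic. The class and the `next_url` cursor field are illustrative assumptions, not the exact PR code:

```python
from typing import Any, Mapping, Optional

import requests


class LimitedPaginationStream:
    """Sketch: stop requesting new pages once a configured limit is hit."""

    def __init__(self, api_pagination_limit: int = 0):
        self._page_limit = api_pagination_limit  # 0 means "no limit"
        self._pages_read = 0

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        self._pages_read += 1
        # Returning None ends the sync early, forcing the records (and state)
        # accumulated so far to be written instead of paginating indefinitely.
        if self._page_limit and self._pages_read >= self._page_limit:
            return None
        next_url = response.json().get("next_url")  # assumed cursor field
        return {"next_url": next_url} if next_url else None
```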
@RobertoBonnet shouldn't this only take effect after 100k requests? Exposing this to users adds overhead to the process here.
Your point is that this limit should not be user-configurable? With 0, the limit doesn't exist. Yesterday we processed data with a limit of 15k pages: around 130 GB and 23M rows were processed in 24h. After that, the stream state wasn't updated; maybe the BigQuery destination has a problem with long runs. Another day we tried to process without a limit. It took 5 days and ended with an invalid-response error, so we lost 5 days of processing. What is your suggestion?
My question is: why not implement this #12591 (comment) in code instead of giving users a complex configuration?
- Limit the sync to 1000 pages and finish? Not the best option, because users will think the connector has a problem and ask why data is missing; maybe one solution is to add a warning in the documentation.
- After 1000 pages, reset the connection and wait until the server is OK to start getting data again?
- Implement the requests/min change in the connector after 1000 pages so users don't need to worry about it (see the sketch below).

@sherifnada can you give your opinion here? This is a problem that impacts users with large Zendesk accounts that get stuck because of the Zendesk side.
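A hedged sketch of that third option, throttling inside the connector after a page threshold so users never see a knob; the threshold and delay values are assumptions:

```python
import time
from typing import Callable, Iterator, Optional


def fetch_all_pages(
    fetch_page: Callable[[], Optional[dict]],
    threshold: int = 1000,
    slow_delay_seconds: float = 1.0,
) -> Iterator[dict]:
    """Yield pages at full speed, then back off once `threshold` pages are read."""
    pages_read = 0
    while True:
        if pages_read >= threshold:
            # Past the threshold, pace requests so the API stops throttling us.
            time.sleep(slow_delay_seconds)
        page = fetch_page()
        if page is None:  # no more pages
            return
        yield page
        pages_read += 1
```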
The changes I commented on in that issue do not affect Zendesk Chat. The objective of this PR is to get pieces of the data without having to wait for the whole process. In Zendesk Support, for example, it would take 14 days to process everything at once. On Zendesk Chat we were processing for 5 days and couldn't finish.
@RobertoBonnet btw, thanks for all the help with the Zendesk connectors! The discussion is about finding the best approach to solve the problem, aligned with CDK development.
@marcosmarxm sure, no problem. You have a good point. Here at Hurb we are using our Zendesk Chat connector (with the changes in this PR) and Zendesk Support (we had to make some changes to be able to process a lot of data; I haven't opened a PR yet, as I need to mature some things and analyze the changes you recently shipped). For Zendesk Support I have already managed to update the data: look at the amount.
airbyte-integrations/connectors/source-zendesk-chat/source_zendesk_chat/streams.py
@@ -104,11 +104,21 @@ def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime:

class TimeIncrementalStream(BaseIncrementalStream, ABC):
    state_checkpoint_interval = 1000

    def __init__(self, start_date, **kwargs):
@RobertoBonnet I believe this is the only line necessary in the current PR. Since this connector wasn't checkpointing previously, a halfway failure causes all progress to be lost. But with this line, the connector will "save" its progress every 1000 records, so even if a halfway failure occurs, we don't lose data.
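For readers unfamiliar with the CDK attribute: with `state_checkpoint_interval` set, a STATE message is emitted every N records so the destination can commit partial progress. A simplified sketch of that behavior (illustrative only; the real loop lives inside the Airbyte CDK):

```python
from typing import Any, Dict, Iterable, Iterator


def read_with_checkpoints(
    records: Iterable[Dict[str, Any]],
    cursor_field: str,
    checkpoint_interval: int = 1000,
) -> Iterator[Dict[str, Any]]:
    """Emit a STATE message every `checkpoint_interval` records."""
    state: Dict[str, Any] = {}
    for count, record in enumerate(records, start=1):
        yield {"type": "RECORD", "record": record}
        state[cursor_field] = record[cursor_field]
        if count % checkpoint_interval == 0:
            # A failure after this point only loses records since this checkpoint.
            yield {"type": "STATE", "state": dict(state)}
    yield {"type": "STATE", "state": state}  # final state on a clean finish
```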
@sherifnada @marcosmarxm That makes sense. Increasing the number of records we process per request, along with the checkpoints, will help a lot. Could I do that?
Is it also possible to use this on the ID-based incremental streams? The docs don't mention whether those records are returned in ascending ID order as far as I can see, but it would be really surprising if they weren't.
If records are returned in ascending order of ID, we may be able to achieve the same effect by putting this line in BaseIncrementalStream.
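If the ascending-ID assumption holds, the suggested change would amount to something like this (a sketch; `BaseStream` here is a stand-in for the connector's actual base class):

```python
from abc import ABC


class BaseStream:  # stand-in for the connector's real base class
    pass


class BaseIncrementalStream(BaseStream, ABC):
    # Checkpoint every 1000 records for all incremental streams, time-based
    # and ID-based alike, assuming records arrive in ascending cursor order.
    state_checkpoint_interval = 1000
```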
@sherifnada These routes that I changed are based on the record's update date.
@marcosmarxm @sherifnada I'm trying to run a full refresh with this version.
/test connector=connectors/source-zendesk-chat

Build Failed. Test summary info:
/test connector=connectors/source-zendesk-chat

Build Failed. Test summary info:
@RobertoBonnet I merged this, with some changes to your contribution, in #14214. Thanks so much for this; hope to see more.
Closed here because this was merged in #14214.
@marcosmarxm OK. I was traveling and had no opportunity to follow what was happening. I'm going back to work today.
You can update to version 0.1.8 and see your changes there.
What
Make the connector able to process a large amount of data more smoothly, making data available faster. #13379
How
Make it possible to configure a limit on the maximum number of pages that will be processed per incremental sync, as sketched below.
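For example, a source config enabling the cap might look like this; every key except `api_pagination_limit` is an illustrative assumption:

```python
# Illustrative source configuration; 0 would disable the cap entirely.
config = {
    "start_date": "2021-01-01T00:00:00Z",  # assumed connector field
    "access_token": "<your_token>",        # assumed connector field
    "api_pagination_limit": 1000,          # stop after 1000 pages per sync
}
```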
then checking in your changesTests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.