CDK: add support for streams with state attribute #9746
@eugene-kulak what is the motivation for this change? How is it intended to be used? We also need to add docs for any such change to the CDK.
@sherifnada the motivation is partially explained in the ticket itself. This is rather an alpha version of the feature that doesn't require any change to Stream, but I will update the docstrings. Do you think it is worth mentioning in the end-user docs?
added IncrementalMixin
One comment on maintaining an invariant
-    def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
-        logger.info(f"Setting state of {stream_name} stream to {stream_state}")
-        connector_state[stream_name] = stream_state
+    def _checkpoint_state(self, stream, stream_state, connector_state):
@keu could we maintain the invariant that the state output from this method is always the same as stream.state? I think the only thing I'm concerned about is that if I define both stream.state and get_updated_state, then there are two potentially different states floating around, which will lead to confusing behavior.
Can we always maintain the invariant that whatever is stored in stream.state contains the state object being output?
@sherifnada I'm not sure how we can achieve this. _checkpoint_state will always return the value from stream.state if there is one; if not, it will fall back to the state obtained from get_updated_state. So what is the problem here?
So the contract is that the IncrementalMixin implementation always takes precedence over get_updated_state? Sounds fine w me.
Should we add this to the docs?
sure, will do
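For reference, a minimal sketch of the precedence agreed on in this thread. It uses simplified stand-in classes, not the actual CDK code: LegacyStream, StatefulStream, and checkpoint_state are hypothetical names, and the cursor logic is made up for illustration. The only behavior taken from the discussion is that stream.state wins when it exists and the get_updated_state result is used otherwise.

```python
class LegacyStream:
    """Hypothetical stream that only implements get_updated_state."""

    name = "legacy"

    def get_updated_state(self, current_stream_state, latest_record):
        # Illustrative cursor logic: keep the largest cursor value seen so far.
        previous = current_stream_state.get("cursor", 0)
        return {"cursor": max(previous, latest_record["cursor"])}


class StatefulStream(LegacyStream):
    """Hypothetical stream that also exposes a state attribute."""

    name = "stateful"

    def __init__(self):
        self._cursor = 0

    @property
    def state(self):
        return {"cursor": self._cursor}

    @state.setter
    def state(self, value):
        self._cursor = value["cursor"]


def checkpoint_state(stream, stream_state, connector_state):
    """Prefer stream.state; fall back to the state from get_updated_state."""
    try:
        connector_state[stream.name] = stream.state
    except AttributeError:
        # The stream has no state attribute, so use the legacy value.
        connector_state[stream.name] = stream_state
    return connector_state


connector_state = {}
legacy = LegacyStream()
legacy_state = legacy.get_updated_state({}, {"cursor": 5})
checkpoint_state(legacy, legacy_state, connector_state)

stateful = StatefulStream()
stateful.state = {"cursor": 9}
# The legacy value passed here is ignored because stream.state exists.
checkpoint_state(stateful, {"cursor": 5}, connector_state)
```

One caveat of this pattern: an AttributeError raised inside the property getter itself would also trigger the fallback, so a buggy getter could silently revert to the legacy state.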
@@ -20,6 +20,40 @@ def package_name_from_class(cls: object) -> str:
    return module.__name__.split(".")[0]


class IncrementalMixin(ABC):
    """Mixing to make stream incremental.
-    """Mixing to make stream incremental.
+    """Mixin to make stream incremental.
@@ -137,7 +171,7 @@ def state_checkpoint_interval(self) -> Optional[int]:
        return None

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
let's put a deprecation notice?
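One possible shape for such a notice, sketched with the standard library warnings module. This is an assumption about how it could look, not what the PR ended up doing: ExampleStream is a hypothetical class, the message wording is invented, and the empty return value is only a placeholder.

```python
import warnings


class ExampleStream:
    """Hypothetical stream used only to illustrate a deprecation notice."""

    def get_updated_state(self, current_stream_state, latest_record):
        # Warn callers that the legacy hook is on its way out.
        warnings.warn(
            "get_updated_state is deprecated; expose state through the state attribute instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return {}  # placeholder return value for the sketch
```

With stacklevel=2 the warning points at the caller rather than at the method body, which makes it easier to find the code that still relies on the old hook.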
/publish-cdk dry-run=true
Codecov Report
@@ Coverage Diff @@
## master #9746 +/- ##
=========================================
Coverage ? 66.83%
=========================================
Files ? 6
Lines ? 603
Branches ? 0
=========================================
Hits ? 403
Misses ? 200
Partials ? 0
/publish-cdk dry-run=false
What
Update AbstractSource to support streams that have the state attribute, and use it to set and retrieve state.

How
Updated AbstractSource._read_incremental and AbstractSource._checkpoint_state to first try to use the state attribute and handle AttributeError if it doesn't exist.
Also fixed timer usage.
I didn't use hasattr because it will trigger a property call.
I didn't use dir because it doesn't support dynamic attributes (if we ever have any).

Recommended reading order
abstract_source.py
test_abstract_source.py
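
The reasoning in the How section about hasattr and dir can be checked with a small sketch. The classes below are hypothetical stand-ins, not CDK streams, and read_state is an illustrative helper name. It shows that hasattr runs a property getter, that dir does not list attributes served through __getattr__, and that a single try/except AttributeError handles both cases with one attribute access.

```python
class PropertyStream:
    """Hypothetical stream whose state property counts how often it is read."""

    def __init__(self):
        self.calls = 0

    @property
    def state(self):
        self.calls += 1
        return {"cursor": 1}


class DynamicStream:
    """Hypothetical stream that serves state dynamically via __getattr__."""

    def __getattr__(self, item):
        if item == "state":
            return {"cursor": 2}
        raise AttributeError(item)


class PlainStream:
    """Hypothetical stream without any state attribute."""


def read_state(stream, default=None):
    """Read stream.state with one attribute access, falling back to default."""
    try:
        return stream.state
    except AttributeError:
        return default


prop = PropertyStream()
hasattr(prop, "state")            # evaluates the property getter once
calls_after_hasattr = prop.calls  # the getter has already run at this point

# dir() does not see attributes that only exist through __getattr__.
dynamic_listed = "state" in dir(DynamicStream())
```

Checking with hasattr and then reading the attribute would evaluate the getter twice, which matters if the getter is expensive or has side effects; the try/except form reads it once.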