-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track state message counts during syncs #15526
Conversation
db6de06
to
0280965
Compare
properties: | ||
recordsEmitted: | ||
type: integer | ||
bytesEmitted: | ||
type: integer | ||
stateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2 | ||
sourceStateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the name of this property because this was counting the state messages emitted from a source connector, not the total number of state messages emitted. I updated additionalProperties to true because I think that is needed to ensure that we don't have issues with older data that have the stateMessagesEmitted
property, but let me know if I'm mistaken. cc: @benmoriceau since I think you might have experience with this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 on the name change - please also update the PRD and tech spec to match
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 from me! It's great that the event handlers (handleSourceEmittedState) already exist!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, a couple of questions/nit.
properties: | ||
recordsEmitted: | ||
type: integer | ||
bytesEmitted: | ||
type: integer | ||
stateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2 | ||
sourceStateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OPT: I would remove this todo. I don't think we benefit from marking this attribute as required so I would not add this constraint.
@@ -129,8 +131,10 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage) | |||
* correctly. | |||
*/ | |||
private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) { | |||
log.info("source emitted state message"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if logging a message per StateMessage is a bit too noisy for info. Does it make sense to have that in debug?
@@ -150,6 +154,9 @@ private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) { | |||
* committed in the {@link StateDeltaTracker}. Also record this state as the last committed state. | |||
*/ | |||
private void handleDestinationEmittedState(final AirbyteStateMessage stateMessage) { | |||
log.info("destination emitted state message"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
@@ -441,7 +441,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException { | |||
when(messageTracker.getDestinationOutputState()).thenReturn(Optional.of(new State().withState(expectedState))); | |||
when(messageTracker.getTotalRecordsEmitted()).thenReturn(12L); | |||
when(messageTracker.getTotalBytesEmitted()).thenReturn(100L); | |||
when(messageTracker.getTotalStateMessagesEmitted()).thenReturn(3L); | |||
when(messageTracker.getTotalSourceStateMessagesEmitted()).thenReturn(3L); | |||
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(3L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use a different value to make sure we do not confuse the two fields. (bad copy paste typically)
@@ -109,6 +109,8 @@ public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job | |||
metadata.put("duration", Math.round((syncSummary.getEndTime() - syncSummary.getStartTime()) / 1000.0)); | |||
metadata.put("volume_mb", syncSummary.getBytesSynced()); | |||
metadata.put("volume_rows", syncSummary.getRecordsSynced()); | |||
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted()); | |||
metadata.put("count_destination_messages_from_source", syncSummary.getTotalStats().getDestinationStateMessagesEmitted()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should source
be removed from this name and/or be count_state_messages_from_destination
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes! thank you for noticing this
@@ -111,6 +112,8 @@ class JobTrackerTest { | |||
.put("duration", SYNC_DURATION) | |||
.put("volume_rows", SYNC_RECORDS_SYNC) | |||
.put("volume_mb", SYNC_BYTES_SYNC) | |||
.put("count_state_messages_from_source", 1L) | |||
.put("count_destination_messages_from_source", 1L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above: is this name correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add count of state messages emitted from source & destination to Segment tracking