Track state message counts during syncs #15526

Merged
merged 4 commits into from
Aug 11, 2022

@alovew alovew commented Aug 10, 2022

Add count of state messages emitted from source & destination to Segment tracking

@github-actions github-actions bot added the area/platform, area/scheduler, area/server, and area/worker labels Aug 10, 2022
@alovew alovew force-pushed the anne/count-state-messages-tracking branch from db6de06 to 0280965 Compare August 10, 2022 19:43
@alovew alovew temporarily deployed to more-secrets August 10, 2022 19:45 Inactive
@terencecho terencecho temporarily deployed to more-secrets August 10, 2022 21:07 Inactive
 properties:
   recordsEmitted:
     type: integer
   bytesEmitted:
     type: integer
-  stateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2
+  sourceStateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2
Contributor Author

I changed the name of this property because this was counting the state messages emitted from a source connector, not the total number of state messages emitted. I updated additionalProperties to true because I think that is needed to ensure that we don't have issues with older data that have the stateMessagesEmitted property, but let me know if I'm mistaken. cc: @benmoriceau since I think you might have experience with this
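
The compatibility concern above can be pictured with a small sketch. It assumes the stored stats are available as a plain map of property names to values; the class SyncStatsCompat and its method are hypothetical illustrations and are not part of this PR. A reader prefers the renamed property and falls back to the legacy one when older data is encountered.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, not part of the PR: reads the renamed property and
// falls back to the legacy name so that older persisted data still resolves.
final class SyncStatsCompat {

  static final String CURRENT_KEY = "sourceStateMessagesEmitted";
  static final String LEGACY_KEY = "stateMessagesEmitted";

  static OptionalLong readSourceStateMessagesEmitted(final Map<String, Object> stats) {
    // Prefer the new property name.
    Object value = stats.get(CURRENT_KEY);
    if (value == null) {
      // Older data may only carry the legacy property name.
      value = stats.get(LEGACY_KEY);
    }
    return value instanceof Number
        ? OptionalLong.of(((Number) value).longValue())
        : OptionalLong.empty();
  }
}
```

Calling readSourceStateMessagesEmitted on a map that only contains stateMessagesEmitted returns the legacy value, and a map with neither key yields an empty result rather than an error.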

Contributor

👍 on the name change - please also update the PRD and tech spec to match

@alovew alovew requested a review from benmoriceau August 10, 2022 22:41
Contributor

@evantahler evantahler left a comment

👍 from me! It's great that the event handlers (handleSourceEmittedState) already exist!

Contributor

@gosusnp gosusnp left a comment

Looks good, with a couple of questions and nits.

 properties:
   recordsEmitted:
     type: integer
   bytesEmitted:
     type: integer
-  stateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2
+  sourceStateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2
Contributor

OPT: I would remove this todo. I don't think we benefit from marking this attribute as required so I would not add this constraint.

@@ -129,8 +131,10 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage)
    * correctly.
    */
   private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) {
+    log.info("source emitted state message");
Contributor

I wonder if logging a message per StateMessage is a bit too noisy for info. Does it make sense to have that in debug?

@@ -150,6 +154,9 @@ private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) {
    * committed in the {@link StateDeltaTracker}. Also record this state as the last committed state.
    */
   private void handleDestinationEmittedState(final AirbyteStateMessage stateMessage) {
+    log.info("destination emitted state message");
Contributor

Same as above.
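
As a rough illustration of the suggestion in these two comments, the sketch below counts state messages per side and logs each one below info level. It uses java.util.logging so that it runs without extra dependencies, with Level.FINE standing in for debug; the project's actual logging facade is not shown in this thread, and the class StateMessageCounter and its handler names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: per-side state message counters with low-verbosity logging.
final class StateMessageCounter {

  private static final Logger LOG = Logger.getLogger(StateMessageCounter.class.getName());

  private final AtomicLong sourceCount = new AtomicLong();
  private final AtomicLong destinationCount = new AtomicLong();

  void onSourceState() {
    sourceCount.incrementAndGet();
    // FINE is hidden under the default INFO threshold, so one line per
    // state message does not flood the sync logs.
    LOG.log(Level.FINE, "source emitted state message");
  }

  void onDestinationState() {
    destinationCount.incrementAndGet();
    LOG.log(Level.FINE, "destination emitted state message");
  }

  long getTotalSourceStateMessagesEmitted() {
    return sourceCount.get();
  }

  long getTotalDestinationStateMessagesEmitted() {
    return destinationCount.get();
  }
}
```

The counters carry the information needed for tracking, so the per-message log line is only useful when debugging and can stay at the lower level.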

@@ -441,7 +441,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
     when(messageTracker.getDestinationOutputState()).thenReturn(Optional.of(new State().withState(expectedState)));
     when(messageTracker.getTotalRecordsEmitted()).thenReturn(12L);
     when(messageTracker.getTotalBytesEmitted()).thenReturn(100L);
-    when(messageTracker.getTotalStateMessagesEmitted()).thenReturn(3L);
+    when(messageTracker.getTotalSourceStateMessagesEmitted()).thenReturn(3L);
+    when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(3L);
Contributor

I would use a different value to make sure we do not confuse the two fields. (bad copy paste typically)
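
A small sketch of why distinct stub values matter: if both getters return the same number, a mapping that accidentally reads the source count twice still passes the check. The classes and methods below are hypothetical and use plain suppliers instead of the mocked tracker from the test.

```java
import java.util.function.LongSupplier;

// Hypothetical demo: identical stub values can hide a swapped or duplicated field.
final class DistinctStubValuesDemo {

  static final class Stats {
    final long sourceStateMessagesEmitted;
    final long destinationStateMessagesEmitted;

    Stats(final long source, final long destination) {
      this.sourceStateMessagesEmitted = source;
      this.destinationStateMessagesEmitted = destination;
    }
  }

  // Correct mapping: each field is read from its own getter.
  static Stats map(final LongSupplier source, final LongSupplier destination) {
    return new Stats(source.getAsLong(), destination.getAsLong());
  }

  // Copy-paste bug: the destination field is filled from the source getter.
  static Stats mapWithCopyPasteBug(final LongSupplier source, final LongSupplier destination) {
    return new Stats(source.getAsLong(), source.getAsLong());
  }

  static boolean matches(final Stats stats, final long expectedSource, final long expectedDestination) {
    return stats.sourceStateMessagesEmitted == expectedSource
        && stats.destinationStateMessagesEmitted == expectedDestination;
  }
}
```

With both stubs returning 3, matches accepts the buggy mapping; with stubs of 3 and 1, only the correct mapping satisfies it.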

@@ -109,6 +109,8 @@ public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job
     metadata.put("duration", Math.round((syncSummary.getEndTime() - syncSummary.getStartTime()) / 1000.0));
     metadata.put("volume_mb", syncSummary.getBytesSynced());
     metadata.put("volume_rows", syncSummary.getRecordsSynced());
+    metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
+    metadata.put("count_destination_messages_from_source", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
Contributor

Should source be removed from this name and/or be count_state_messages_from_destination?

Contributor Author

yes! thank you for noticing this
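
For illustration only, the metadata keys could look like the sketch below once the rename is applied. The destination key follows the name suggested in the comment above; the key that was finally merged is not shown in this thread, and the class StateMessageMetadata is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of symmetric tracking keys for the two counters.
final class StateMessageMetadata {

  static Map<String, Object> build(final long sourceCount, final long destinationCount) {
    final Map<String, Object> metadata = new LinkedHashMap<>();
    metadata.put("count_state_messages_from_source", sourceCount);
    // Assumed key name, taken from the reviewer's suggestion.
    metadata.put("count_state_messages_from_destination", destinationCount);
    return metadata;
  }
}
```

Keeping the two keys parallel makes it harder to mix up which side a count came from when the events are queried later.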

@@ -111,6 +112,8 @@ class JobTrackerTest {
       .put("duration", SYNC_DURATION)
       .put("volume_rows", SYNC_RECORDS_SYNC)
       .put("volume_mb", SYNC_BYTES_SYNC)
+      .put("count_state_messages_from_source", 1L)
+      .put("count_destination_messages_from_source", 1L)
Contributor

Same comment as above: is this name correct?

@alovew alovew temporarily deployed to more-secrets August 11, 2022 17:27 Inactive
Contributor

@jdpgrailsdev jdpgrailsdev left a comment

:shipit:

@alovew alovew temporarily deployed to more-secrets August 11, 2022 18:50 Inactive
@alovew alovew merged commit 6b59236 into master Aug 11, 2022
@alovew alovew deleted the anne/count-state-messages-tracking branch August 11, 2022 19:38
5 participants