From 6b592368b721c15c0003da8e05f8ce3f8823f015 Mon Sep 17 00:00:00 2001 From: Anne <102554163+alovew@users.noreply.github.com> Date: Thu, 11 Aug 2022 12:38:37 -0700 Subject: [PATCH] Track state message counts during syncs (#15526) * Track state message counts from source and destination connectors during syncs - Add new field to SyncStats and update existing field --- .../src/main/resources/types/SyncStats.yaml | 8 ++++++-- .../job_tracker/TrackingMetadata.java | 2 ++ .../job_tracker/JobTrackerTest.java | 7 +++++++ .../server/converters/JobConverter.java | 4 ++-- .../server/converters/JobConverterTest.java | 4 ++-- .../general/DefaultReplicationWorker.java | 6 ++++-- .../internal/AirbyteMessageTracker.java | 18 +++++++++++++----- .../workers/internal/MessageTracker.java | 8 +++++--- .../scheduling/SyncCheckConnectionFailure.java | 5 +++-- .../general/DefaultReplicationWorkerTest.java | 18 ++++++++++++------ .../internal/AirbyteMessageTrackerTest.java | 2 +- 11 files changed, 57 insertions(+), 25 deletions(-) diff --git a/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml b/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml index 5c38885e6dc2f..6616996cb33d8 100644 --- a/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml +++ b/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml @@ -7,13 +7,17 @@ type: object required: - recordsEmitted - bytesEmitted -additionalProperties: false +additionalProperties: true properties: recordsEmitted: type: integer bytesEmitted: type: integer - stateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2 + sourceStateMessagesEmitted: + description: Number of State messages emitted by the Source Connector + type: integer + destinationStateMessagesEmitted: + description: Number of State messages emitted by the Destination Connector type: integer recordsCommitted: type: integer # if unset, committed records could not be computed diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java index 7bbe227b19c0a..42db33fca844d 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java @@ -109,6 +109,8 @@ public static ImmutableMap generateJobAttemptMetadata(final Job metadata.put("duration", Math.round((syncSummary.getEndTime() - syncSummary.getStartTime()) / 1000.0)); metadata.put("volume_mb", syncSummary.getBytesSynced()); metadata.put("volume_rows", syncSummary.getRecordsSynced()); + metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted()); + metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted()); } } diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java index 970efef1c3000..56f5347677af9 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java @@ -37,6 +37,7 @@ import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.SyncStats; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.CatalogHelpers; @@ -111,6 +112,8 @@ class JobTrackerTest { .put("duration", SYNC_DURATION) .put("volume_rows", SYNC_RECORDS_SYNC) .put("volume_mb", SYNC_BYTES_SYNC) + .put("count_state_messages_from_source", 3L) + .put("count_state_messages_from_destination", 1L) .build(); private static final ImmutableMap SYNC_CONFIG_METADATA = ImmutableMap.builder() .put(JobTracker.CONFIG + ".source.key", JobTracker.SET) @@ -481,14 +484,18 @@ private Attempt getAttemptMock() { final JobOutput jobOutput = mock(JobOutput.class); final StandardSyncOutput syncOutput = mock(StandardSyncOutput.class); final StandardSyncSummary syncSummary = mock(StandardSyncSummary.class); + final SyncStats syncStats = mock(SyncStats.class); when(syncSummary.getStartTime()).thenReturn(SYNC_START_TIME); when(syncSummary.getEndTime()).thenReturn(SYNC_END_TIME); when(syncSummary.getBytesSynced()).thenReturn(SYNC_BYTES_SYNC); when(syncSummary.getRecordsSynced()).thenReturn(SYNC_RECORDS_SYNC); when(syncOutput.getStandardSyncSummary()).thenReturn(syncSummary); + when(syncSummary.getTotalStats()).thenReturn(syncStats); when(jobOutput.getSync()).thenReturn(syncOutput); when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput)); + when(syncStats.getSourceStateMessagesEmitted()).thenReturn(3L); + when(syncStats.getDestinationStateMessagesEmitted()).thenReturn(1L); return attempt; } diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 4df3a51331b2a..b431dea76cd1d 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -158,7 +158,7 @@ private static AttemptStats getTotalAttemptStats(final Attempt attempt) { return new AttemptStats() .bytesEmitted(totalStats.getBytesEmitted()) .recordsEmitted(totalStats.getRecordsEmitted()) - .stateMessagesEmitted(totalStats.getStateMessagesEmitted()) + .stateMessagesEmitted(totalStats.getSourceStateMessagesEmitted()) .recordsCommitted(totalStats.getRecordsCommitted()); } @@ -175,7 +175,7 @@ private static List getAttemptStreamStats(final Attempt atte .stats(new AttemptStats() .bytesEmitted(streamStat.getStats().getBytesEmitted()) .recordsEmitted(streamStat.getStats().getRecordsEmitted()) - .stateMessagesEmitted(streamStat.getStats().getStateMessagesEmitted()) + .stateMessagesEmitted(streamStat.getStats().getSourceStateMessagesEmitted()) .recordsCommitted(streamStat.getStats().getRecordsCommitted()))) .collect(Collectors.toList()); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java index 5e37872494f55..047e5e65cdee1 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java @@ -99,14 +99,14 @@ class JobConverterTest { .withTotalStats(new SyncStats() .withRecordsEmitted(RECORDS_EMITTED) .withBytesEmitted(BYTES_EMITTED) - .withStateMessagesEmitted(STATE_MESSAGES_EMITTED) + .withSourceStateMessagesEmitted(STATE_MESSAGES_EMITTED) .withRecordsCommitted(RECORDS_COMMITTED)) .withStreamStats(Lists.newArrayList(new StreamSyncStats() .withStreamName(STREAM_NAME) .withStats(new SyncStats() .withRecordsEmitted(RECORDS_EMITTED) .withBytesEmitted(BYTES_EMITTED) - .withStateMessagesEmitted(STATE_MESSAGES_EMITTED) + .withSourceStateMessagesEmitted(STATE_MESSAGES_EMITTED) .withRecordsCommitted(RECORDS_COMMITTED)))))); private JobConverter jobConverter; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 4b5e26425a1ad..f1b7085493899 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -201,7 +201,8 @@ else if (hasFailed.get()) { final SyncStats totalSyncStats = new SyncStats() .withRecordsEmitted(messageTracker.getTotalRecordsEmitted()) .withBytesEmitted(messageTracker.getTotalBytesEmitted()) - .withStateMessagesEmitted(messageTracker.getTotalStateMessagesEmitted()); + .withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted()) + .withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted()); if (outputStatus == ReplicationStatus.COMPLETED) { totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted()); @@ -217,7 +218,8 @@ else if (hasFailed.get()) { final SyncStats syncStats = new SyncStats() .withRecordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream)) .withBytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream)) - .withStateMessagesEmitted(null); // TODO (parker) populate per-stream state messages emitted once supported in V2 + .withSourceStateMessagesEmitted(null) + .withDestinationStateMessagesEmitted(null); if (outputStatus == ReplicationStatus.COMPLETED) { syncStats.setRecordsCommitted(messageTracker.getStreamToEmittedRecords().get(stream)); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index 1f603d72ce9fb..81d25e1ed0918 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -38,7 +38,8 @@ public class AirbyteMessageTracker implements MessageTracker { private final AtomicReference sourceOutputState; private final AtomicReference destinationOutputState; - private final AtomicLong totalEmittedStateMessages; + private final AtomicLong totalSourceEmittedStateMessages; + private final AtomicLong totalDestinationEmittedStateMessages; private final Map streamToRunningCount; private final HashFunction hashFunction; private final BiMap streamNameToIndex; @@ -71,7 +72,8 @@ public AirbyteMessageTracker() { protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final StateAggregator stateAggregator) { this.sourceOutputState = new AtomicReference<>(); this.destinationOutputState = new AtomicReference<>(); - this.totalEmittedStateMessages = new AtomicLong(0L); + this.totalSourceEmittedStateMessages = new AtomicLong(0L); + this.totalDestinationEmittedStateMessages = new AtomicLong(0L); this.streamToRunningCount = new HashMap<>(); this.streamNameToIndex = HashBiMap.create(); this.hashFunction = Hashing.murmur3_32_fixed(); @@ -130,7 +132,7 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage) */ private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) { sourceOutputState.set(new State().withState(stateMessage.getData())); - totalEmittedStateMessages.incrementAndGet(); + totalSourceEmittedStateMessages.incrementAndGet(); final int stateHash = getStateHashCode(stateMessage); try { if (!unreliableCommittedCounts) { @@ -150,6 +152,7 @@ private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) { * committed in the {@link StateDeltaTracker}. Also record this state as the last committed state. */ private void handleDestinationEmittedState(final AirbyteStateMessage stateMessage) { + totalDestinationEmittedStateMessages.incrementAndGet(); stateAggregator.ingest(stateMessage); destinationOutputState.set(stateAggregator.getAggregated()); try { @@ -315,8 +318,13 @@ public Optional getTotalRecordsCommitted() { } @Override - public Long getTotalStateMessagesEmitted() { - return totalEmittedStateMessages.get(); + public Long getTotalSourceStateMessagesEmitted() { + return totalSourceEmittedStateMessages.get(); + } + + @Override + public Long getTotalDestinationStateMessagesEmitted() { + return totalDestinationEmittedStateMessages.get(); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java index 4012bece62667..94266bf0f34c1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java @@ -98,11 +98,13 @@ public interface MessageTracker { Optional getTotalRecordsCommitted(); /** - * Get the overall emitted state message count. + * Get the count of state messages emitted from the source connector. * - * @return returns the total count of emitted state messages. + * @return returns the total count of state messages emitted from the source. */ - Long getTotalStateMessagesEmitted(); + Long getTotalSourceStateMessagesEmitted(); + + Long getTotalDestinationStateMessagesEmitted(); AirbyteTraceMessage getFirstDestinationErrorTraceMessage(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java index ac3c623bbe0fb..8755221f51786 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java @@ -68,8 +68,9 @@ public StandardSyncOutput buildFailureOutput() { .withTotalStats(new SyncStats() .withRecordsEmitted(0L) .withBytesEmitted(0L) - .withStateMessagesEmitted(0L) - .withRecordsCommitted(0L)));; + .withSourceStateMessagesEmitted(0L) + .withDestinationStateMessagesEmitted(0L) + .withRecordsCommitted(0L))); if (failureOutput.getFailureReason() != null) { syncOutput.setFailures(List.of(failureOutput.getFailureReason().withFailureOrigin(origin))); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index 6d8a5d890c302..b51e9bad82b63 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -441,7 +441,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException { when(messageTracker.getDestinationOutputState()).thenReturn(Optional.of(new State().withState(expectedState))); when(messageTracker.getTotalRecordsEmitted()).thenReturn(12L); when(messageTracker.getTotalBytesEmitted()).thenReturn(100L); - when(messageTracker.getTotalStateMessagesEmitted()).thenReturn(3L); + when(messageTracker.getTotalSourceStateMessagesEmitted()).thenReturn(3L); + when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(1L); when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L)); when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L)); @@ -464,7 +465,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException { .withTotalStats(new SyncStats() .withRecordsEmitted(12L) .withBytesEmitted(100L) - .withStateMessagesEmitted(3L) + .withSourceStateMessagesEmitted(3L) + .withDestinationStateMessagesEmitted(1L) .withRecordsCommitted(12L)) // since success, should use emitted count .withStreamStats(Collections.singletonList( new StreamSyncStats() @@ -473,7 +475,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException { .withBytesEmitted(100L) .withRecordsEmitted(12L) .withRecordsCommitted(12L) // since success, should use emitted count - .withStateMessagesEmitted(null))))) + .withSourceStateMessagesEmitted(null) + .withDestinationStateMessagesEmitted(null))))) .withOutputCatalog(syncInput.getCatalog()) .withState(new State().withState(expectedState)); @@ -540,7 +543,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception { when(messageTracker.getTotalRecordsEmitted()).thenReturn(12L); when(messageTracker.getTotalBytesEmitted()).thenReturn(100L); when(messageTracker.getTotalRecordsCommitted()).thenReturn(Optional.of(6L)); - when(messageTracker.getTotalStateMessagesEmitted()).thenReturn(3L); + when(messageTracker.getTotalSourceStateMessagesEmitted()).thenReturn(3L); + when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(2L); when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L)); when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L)); when(messageTracker.getStreamToCommittedRecords()).thenReturn(Optional.of(Collections.singletonMap(STREAM1, 6L))); @@ -559,7 +563,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception { final SyncStats expectedTotalStats = new SyncStats() .withRecordsEmitted(12L) .withBytesEmitted(100L) - .withStateMessagesEmitted(3L) + .withSourceStateMessagesEmitted(3L) + .withDestinationStateMessagesEmitted(2L) .withRecordsCommitted(6L); final List expectedStreamStats = Collections.singletonList( new StreamSyncStats() @@ -568,7 +573,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception { .withBytesEmitted(100L) .withRecordsEmitted(12L) .withRecordsCommitted(6L) - .withStateMessagesEmitted(null))); + .withSourceStateMessagesEmitted(null) + .withDestinationStateMessagesEmitted(null))); assertNotNull(actual); assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats()); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java index 68e3d304cdb60..df7f1f988b7c7 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java @@ -58,7 +58,7 @@ void testGetTotalRecordsStatesAndBytesEmitted() { assertEquals(3, messageTracker.getTotalRecordsEmitted()); assertEquals(3L * Jsons.getEstimatedByteSize(r1.getRecord().getData()), messageTracker.getTotalBytesEmitted()); - assertEquals(2, messageTracker.getTotalStateMessagesEmitted()); + assertEquals(2, messageTracker.getTotalSourceStateMessagesEmitted()); } @Test