Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Aug 12, 2022
1 parent 7c25f5b commit 8f19838
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ properties:
recordsCommitted:
type: integer # if unset, committed records could not be computed
meanSecondsBeforeStateMessageEmitted:
type: double
type: integer
maxSecondsBeforeStateMessageEmitted:
type: integer
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class JobTrackerTest {
.put("volume_mb", SYNC_BYTES_SYNC)
.put("count_state_messages_from_source", 3L)
.put("count_state_messages_from_destination", 1L)
.put("max_seconds_before_state_message_emitted", 5L)
.put("mean_seconds_before_state_message_emitted", 4L)
.build();
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
Expand Down Expand Up @@ -496,6 +498,8 @@ private Attempt getAttemptMock() {
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
when(syncStats.getSourceStateMessagesEmitted()).thenReturn(3L);
when(syncStats.getDestinationStateMessagesEmitted()).thenReturn(1L);
when(syncStats.getMaxSecondsBeforeStateMessageEmitted()).thenReturn(5L);
when(syncStats.getMeanSecondsBeforeStateMessageEmitted()).thenReturn(4L);
return attempt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.google.common.collect.HashBiMap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.AtomicDouble;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
Expand Down Expand Up @@ -44,7 +43,7 @@ public class AirbyteMessageTracker implements MessageTracker {
private final AtomicLong totalSourceEmittedStateMessages;
private final AtomicLong totalDestinationEmittedStateMessages;
private final AtomicLong maxSecondsToReceiveStateMessage;
private final AtomicDouble meanSecondsToReceiveStateMessage;
private final AtomicLong meanSecondsToReceiveStateMessage;
private final Map<Short, Long> streamToRunningCount;
private final HashFunction hashFunction;
private final BiMap<String, Short> streamNameToIndex;
Expand Down Expand Up @@ -82,7 +81,7 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final
this.totalSourceEmittedStateMessages = new AtomicLong(0L);
this.totalDestinationEmittedStateMessages = new AtomicLong(0L);
this.maxSecondsToReceiveStateMessage = new AtomicLong(0L);
this.meanSecondsToReceiveStateMessage = new AtomicDouble(0.0);
this.meanSecondsToReceiveStateMessage = new AtomicLong(0L);
this.streamToRunningCount = new HashMap<>();
this.streamNameToIndex = HashBiMap.create();
this.hashFunction = Hashing.murmur3_32_fixed();
Expand Down Expand Up @@ -349,7 +348,7 @@ public Long getMaxSecondsToReceiveStateMessage() {
}

@Override
public Double getMeanSecondsToReceiveStateMessage() {
public Long getMeanSecondsToReceiveStateMessage() {
return meanSecondsToReceiveStateMessage.get();
}

Expand All @@ -362,7 +361,7 @@ private void updateMaxAndMeanSecondsToReceiveStateMessage(final DateTime stateMe
if (meanSecondsToReceiveStateMessage.get() == 0) {
meanSecondsToReceiveStateMessage.set(secondsSinceLastStateMessage);
} else {
final Double newMeanSeconds =
final Long newMeanSeconds =
calculateMean(meanSecondsToReceiveStateMessage.get(), totalSourceEmittedStateMessages.get(), secondsSinceLastStateMessage);
meanSecondsToReceiveStateMessage.set(newMeanSeconds);
}
Expand All @@ -381,9 +380,10 @@ private Long calculateSecondsSinceLastStateEmitted(final DateTime stateMessageRe
}

@VisibleForTesting
protected Double calculateMean(final Double currentMean, final Long totalCount, final Long newDataPoint) {
protected Long calculateMean(final Long currentMean, final Long totalCount, final Long newDataPoint) {
final Long previousCount = totalCount - 1;
return (currentMean * previousCount / totalCount) + (Double.valueOf(newDataPoint) / totalCount);
final double result = (Double.valueOf(currentMean * previousCount) / totalCount) + (Double.valueOf(newDataPoint) / totalCount);
return (long) result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public interface MessageTracker {

Long getMaxSecondsToReceiveStateMessage();

Double getMeanSecondsToReceiveStateMessage();
Long getMeanSecondsToReceiveStateMessage();

AirbyteTraceMessage getFirstDestinationErrorTraceMessage();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(1L);
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
when(messageTracker.getMaxSecondsToReceiveStateMessage()).thenReturn(5L);
when(messageTracker.getMeanSecondsToReceiveStateMessage()).thenReturn(4L);

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
Expand All @@ -467,6 +469,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
.withBytesEmitted(100L)
.withSourceStateMessagesEmitted(3L)
.withDestinationStateMessagesEmitted(1L)
.withMaxSecondsBeforeStateMessageEmitted(5L)
.withMeanSecondsBeforeStateMessageEmitted(4L)
.withRecordsCommitted(12L)) // since success, should use emitted count
.withStreamStats(Collections.singletonList(
new StreamSyncStats()
Expand All @@ -476,7 +480,9 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
.withRecordsEmitted(12L)
.withRecordsCommitted(12L) // since success, should use emitted count
.withSourceStateMessagesEmitted(null)
.withDestinationStateMessagesEmitted(null)))))
.withDestinationStateMessagesEmitted(null)
.withMaxSecondsBeforeStateMessageEmitted(null)
.withMeanSecondsBeforeStateMessageEmitted(null)))))
.withOutputCatalog(syncInput.getCatalog())
.withState(new State().withState(expectedState));

Expand Down Expand Up @@ -548,6 +554,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
when(messageTracker.getStreamToCommittedRecords()).thenReturn(Optional.of(Collections.singletonMap(STREAM1, 6L)));
when(messageTracker.getMaxSecondsToReceiveStateMessage()).thenReturn(10L);
when(messageTracker.getMeanSecondsToReceiveStateMessage()).thenReturn(8L);

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
Expand All @@ -565,6 +573,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
.withBytesEmitted(100L)
.withSourceStateMessagesEmitted(3L)
.withDestinationStateMessagesEmitted(2L)
.withMaxSecondsBeforeStateMessageEmitted(10L)
.withMeanSecondsBeforeStateMessageEmitted(8L)
.withRecordsCommitted(6L);
final List<StreamSyncStats> expectedStreamStats = Collections.singletonList(
new StreamSyncStats()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ void testErrorTraceMessageFailureWithNoTraceErrors() throws Exception {
@Test
void testCalculateMean() throws Exception {
// Mean for 3 state messages is 5, 4th state message is 9, new mean should be 6
assertEquals(6.0, messageTracker.calculateMean(5.0, 4L, 9L));
assertEquals(6L, messageTracker.calculateMean(5L, 4L, 9L));

// Mean for 5 state messages is 10, 4th state message is 12, new mean should be 10.33...
assertEquals(10.333333333333334, messageTracker.calculateMean(10.0, 6L, 12L));
// Mean for 5 state messages is 10, 4th state message is 12, new mean is 10.33 rounded down to 10
assertEquals(10L, messageTracker.calculateMean(10L, 6L, 12L));
}

}

0 comments on commit 8f19838

Please sign in to comment.