Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional sync timing information #17643

Merged
merged 14 commits into from
Oct 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));

final long startTime = System.currentTimeMillis();
final long replicationStartTime = startTime;
long replicationEndTime;
long sourceReadStartTime = -1;
long destinationWriteStartTime = -1;
final ThreadedTimeHolder timeHolder = new ThreadedTimeHolder();

final AtomicReference<FailureReason> replicationRunnableFailureRef = new AtomicReference<>();
final AtomicReference<FailureReason> destinationRunnableFailureRef = new AtomicReference<>();

Expand All @@ -146,12 +152,14 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
// closed first (which is what we want).
try (destination; source) {
destination.start(destinationConfig, jobRoot);
sourceReadStartTime = System.currentTimeMillis();
source.start(sourceConfig, jobRoot);
destinationWriteStartTime = System.currentTimeMillis();

// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc),
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc, timeHolder),
executors).whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof DestinationException) {
Expand All @@ -163,7 +171,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
});

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter),
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeHolder),
executors).whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof SourceException) {
Expand Down Expand Up @@ -192,6 +200,8 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
executors.shutdownNow();
}

replicationEndTime = System.currentTimeMillis();

final ReplicationStatus outputStatus;
// First check if the process was cancelled. Cancellation takes precedence over failures.
if (cancelled.get()) {
Expand All @@ -212,7 +222,13 @@ else if (hasFailed.get()) {
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage())
.withMaxSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null));
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withReplicationStartTime(replicationStartTime)
.withReplicationEndTime(replicationEndTime > 0 ? replicationEndTime : null)
.withSourceReadStartTime(sourceReadStartTime > 0 ? sourceReadStartTime : null)
.withSourceReadEndTime(timeHolder.getSourceReadEndTime() > 0 ? timeHolder.getSourceReadEndTime() : null)
.withDestinationWriteStartTime(destinationWriteStartTime > 0 ? destinationWriteStartTime : null)
.withDestinationWriteEndTime(timeHolder.getDestinationWriteEndTime() > 0 ? timeHolder.getDestinationWriteEndTime() : null);

if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
Expand Down Expand Up @@ -310,6 +326,29 @@ else if (hasFailed.get()) {

}

private static class ThreadedTimeHolder {

private long sourceReadEndTime = -1;
private long destinationWriteEndTime = -1;

public synchronized void trackSourceReadEndTime() {
this.sourceReadEndTime = System.currentTimeMillis();
}

public synchronized void trackDestinationWriteEndTime() {
this.destinationWriteEndTime = System.currentTimeMillis();
}

public synchronized long getSourceReadEndTime() {
return this.sourceReadEndTime;
}

public synchronized long getDestinationWriteEndTime() {
return this.destinationWriteEndTime;
}

}

@SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
private static Runnable getReplicationRunnable(final AirbyteSource source,
final AirbyteDestination destination,
Expand All @@ -318,7 +357,8 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter) {
final WorkerMetricReporter metricReporter,
final ThreadedTimeHolder timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
Expand Down Expand Up @@ -362,6 +402,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
}
}
}
timeHolder.trackSourceReadEndTime();
LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted()));
if (!validationErrors.isEmpty()) {
validationErrors.forEach((stream, errorPair) -> {
Expand Down Expand Up @@ -431,7 +472,8 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid
private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker messageTracker,
final Map<String, String> mdc) {
final Map<String, String> mdc,
final ThreadedTimeHolder timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Destination output thread started.");
Expand All @@ -448,6 +490,7 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de
messageTracker.acceptFromDestination(messageOptional.get());
}
}
timeHolder.trackDestinationWriteEndTime();
if (!cancelled.get() && destination.getExitValue() != 0) {
throw new DestinationException("Destination process exited with non-zero exit code " + destination.getExitValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,11 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
Jsons.jsonNode(actual));
assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ","));

// remove times so we can do the rest of the object <> object comparison.
actual.getReplicationAttemptSummary().withStartTime(null);
actual.getReplicationAttemptSummary().withEndTime(null);
// remove times, so we can do the rest of the object <> object comparison.
actual.getReplicationAttemptSummary().withStartTime(null).withEndTime(null).getTotalStats().withReplicationStartTime(null)
.withReplicationEndTime(null)
.withSourceReadStartTime(null).withSourceReadEndTime(null)
.withDestinationWriteStartTime(null).withDestinationWriteEndTime(null);

assertEquals(replicationOutput, actual);
}
Expand Down Expand Up @@ -631,7 +633,9 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
.withDestinationStateMessagesEmitted(null)));

assertNotNull(actual);
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats());
// null out timing stats for assertion matching
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats().withReplicationStartTime(null).withReplicationEndTime(null)
.withSourceReadStartTime(null).withSourceReadEndTime(null).withDestinationWriteStartTime(null).withDestinationWriteEndTime(null));
assertEquals(expectedStreamStats, actual.getReplicationAttemptSummary().getStreamStats());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,15 @@ properties:
type: integer
meanSecondsBetweenStateMessageEmittedandCommitted:
type: integer
replicationStartTime:
type: integer
replicationEndTime:
type: integer
sourceReadStartTime:
type: integer
sourceReadEndTime:
type: integer
destinationWriteStartTime:
type: integer
destinationWriteEndTime:
type: integer
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.ScheduleData;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.Job;
Expand Down Expand Up @@ -110,6 +112,9 @@ public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
final JobOutput jobOutput = lastAttempt.getOutput().get();
if (jobOutput.getSync() != null) {
final StandardSyncSummary syncSummary = jobOutput.getSync().getStandardSyncSummary();
final SyncStats totalStats = syncSummary.getTotalStats();
final NormalizationSummary normalizationSummary = jobOutput.getSync().getNormalizationSummary();

if (syncSummary.getStartTime() != null)
metadata.put("sync_start_time", syncSummary.getStartTime());
if (syncSummary.getEndTime() != null && syncSummary.getStartTime() != null)
Expand All @@ -118,22 +123,42 @@ public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
metadata.put("volume_mb", syncSummary.getBytesSynced());
if (syncSummary.getRecordsSynced() != null)
metadata.put("volume_rows", syncSummary.getRecordsSynced());
if (syncSummary.getTotalStats().getSourceStateMessagesEmitted() != null)
if (totalStats.getSourceStateMessagesEmitted() != null)
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
if (syncSummary.getTotalStats().getDestinationStateMessagesEmitted() != null)
if (totalStats.getDestinationStateMessagesEmitted() != null)
metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
if (syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted() != null)
if (totalStats.getMaxSecondsBeforeSourceStateMessageEmitted() != null)
metadata.put("max_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted());
if (syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted() != null)
totalStats.getMaxSecondsBeforeSourceStateMessageEmitted());
if (totalStats.getMeanSecondsBeforeSourceStateMessageEmitted() != null)
metadata.put("mean_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted());
if (syncSummary.getTotalStats().getMaxSecondsBetweenStateMessageEmittedandCommitted() != null)
totalStats.getMeanSecondsBeforeSourceStateMessageEmitted());
if (totalStats.getMaxSecondsBetweenStateMessageEmittedandCommitted() != null)
metadata.put("max_seconds_between_state_message_emit_and_commit",
syncSummary.getTotalStats().getMaxSecondsBetweenStateMessageEmittedandCommitted());
if (syncSummary.getTotalStats().getMeanSecondsBetweenStateMessageEmittedandCommitted() != null)
totalStats.getMaxSecondsBetweenStateMessageEmittedandCommitted());
if (totalStats.getMeanSecondsBetweenStateMessageEmittedandCommitted() != null)
metadata.put("mean_seconds_between_state_message_emit_and_commit",
syncSummary.getTotalStats().getMeanSecondsBetweenStateMessageEmittedandCommitted());
totalStats.getMeanSecondsBetweenStateMessageEmittedandCommitted());

if (totalStats.getReplicationStartTime() != null)
metadata.put("replication_start_time", totalStats.getReplicationStartTime());
if (totalStats.getReplicationEndTime() != null)
metadata.put("replication_end_time", totalStats.getReplicationEndTime());
if (totalStats.getSourceReadStartTime() != null)
metadata.put("source_read_start_time", totalStats.getSourceReadStartTime());
if (totalStats.getSourceReadEndTime() != null)
metadata.put("source_read_end_time", totalStats.getSourceReadEndTime());
if (totalStats.getDestinationWriteStartTime() != null)
metadata.put("destination_write_start_time", totalStats.getDestinationWriteStartTime());
if (totalStats.getDestinationWriteEndTime() != null)
metadata.put("destination_write_end_time", totalStats.getDestinationWriteEndTime());

if (normalizationSummary != null) {
if (normalizationSummary.getStartTime() != null)
metadata.put("normalization_start_time", normalizationSummary.getStartTime());
if (normalizationSummary.getEndTime() != null)
metadata.put("normalization_end_time", normalizationSummary.getEndTime());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Metadata;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.Schedule;
import io.airbyte.config.Schedule.TimeUnit;
import io.airbyte.config.StandardCheckConnectionOutput;
Expand Down Expand Up @@ -124,6 +125,14 @@ class JobTrackerTest {
.put("mean_seconds_before_source_state_message_emitted", 4L)
.put("max_seconds_between_state_message_emit_and_commit", 7L)
.put("mean_seconds_between_state_message_emit_and_commit", 6L)
.put("replication_start_time", 7L)
.put("replication_end_time", 8L)
.put("source_read_start_time", 9L)
.put("source_read_end_time", 10L)
.put("destination_write_start_time", 11L)
.put("destination_write_end_time", 12L)
.put("normalization_start_time", 13L)
.put("normalization_end_time", 14L)
.build();
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
Expand Down Expand Up @@ -566,13 +575,15 @@ private Attempt getAttemptMock() {
final JobOutput jobOutput = mock(JobOutput.class);
final StandardSyncOutput syncOutput = mock(StandardSyncOutput.class);
final StandardSyncSummary syncSummary = mock(StandardSyncSummary.class);
final NormalizationSummary normalizationSummary = mock(NormalizationSummary.class);
final SyncStats syncStats = mock(SyncStats.class);

when(syncSummary.getStartTime()).thenReturn(SYNC_START_TIME);
when(syncSummary.getEndTime()).thenReturn(SYNC_END_TIME);
when(syncSummary.getBytesSynced()).thenReturn(SYNC_BYTES_SYNC);
when(syncSummary.getRecordsSynced()).thenReturn(SYNC_RECORDS_SYNC);
when(syncOutput.getStandardSyncSummary()).thenReturn(syncSummary);
when(syncOutput.getNormalizationSummary()).thenReturn(normalizationSummary);
when(syncSummary.getTotalStats()).thenReturn(syncStats);
when(jobOutput.getSync()).thenReturn(syncOutput);
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
Expand All @@ -582,6 +593,15 @@ private Attempt getAttemptMock() {
when(syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()).thenReturn(4L);
when(syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()).thenReturn(7L);
when(syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()).thenReturn(6L);
when(syncStats.getReplicationStartTime()).thenReturn(7L);
when(syncStats.getReplicationEndTime()).thenReturn(8L);
when(syncStats.getSourceReadStartTime()).thenReturn(9L);
when(syncStats.getSourceReadEndTime()).thenReturn(10L);
when(syncStats.getDestinationWriteStartTime()).thenReturn(11L);
when(syncStats.getDestinationWriteEndTime()).thenReturn(12L);
when(normalizationSummary.getStartTime()).thenReturn(13L);
when(normalizationSummary.getEndTime()).thenReturn(14L);

return attempt;
}

Expand Down
Loading